Merge pull request #575 from miaoyq/use-channel-propagate-stop-info-of-sandbox
Use channel to propagate the stop info of sandbox
This commit is contained in:
commit
08644a7fff
@ -129,7 +129,7 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// waitContainerStop waits for container to be stopped until timeout exceeds or centext is cancelled.
|
// waitContainerStop waits for container to be stopped until timeout exceeds or context is cancelled.
|
||||||
func (c *criContainerdService) waitContainerStop(ctx context.Context, container containerstore.Container, timeout time.Duration) error {
|
func (c *criContainerdService) waitContainerStop(ctx context.Context, container containerstore.Container, timeout time.Duration) error {
|
||||||
timeoutTimer := time.NewTimer(timeout)
|
timeoutTimer := time.NewTimer(timeout)
|
||||||
defer timeoutTimer.Stop()
|
defer timeoutTimer.Stop()
|
||||||
|
@ -68,10 +68,6 @@ func TestWaitContainerStop(t *testing.T) {
|
|||||||
)
|
)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.NoError(t, c.containerStore.Add(container))
|
assert.NoError(t, c.containerStore.Add(container))
|
||||||
if test.status.FinishedAt != 0 {
|
|
||||||
// Fake the TaskExit event
|
|
||||||
container.Stop()
|
|
||||||
}
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
if test.cancel {
|
if test.cancel {
|
||||||
cancelledCtx, cancel := context.WithCancel(ctx)
|
cancelledCtx, cancel := context.WithCancel(ctx)
|
||||||
|
@ -235,4 +235,6 @@ func handleSandboxExit(e *eventtypes.TaskExit, sb sandboxstore.Sandbox) {
|
|||||||
// TODO(random-liu): [P0] Enqueue the event and retry.
|
// TODO(random-liu): [P0] Enqueue the event and retry.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// Using channel to propagate the information of sandbox stop
|
||||||
|
sb.Stop()
|
||||||
}
|
}
|
||||||
|
@ -31,10 +31,6 @@ import (
|
|||||||
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
|
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
|
||||||
)
|
)
|
||||||
|
|
||||||
// stopCheckPollInterval is the the interval to check whether a sandbox
|
|
||||||
// is stopped successfully.
|
|
||||||
const stopCheckPollInterval = 100 * time.Millisecond
|
|
||||||
|
|
||||||
// StopPodSandbox stops the sandbox. If there are any running containers in the
|
// StopPodSandbox stops the sandbox. If there are any running containers in the
|
||||||
// sandbox, they should be forcibly terminated.
|
// sandbox, they should be forcibly terminated.
|
||||||
func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandboxRequest) (*runtime.StopPodSandboxResponse, error) {
|
func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandboxRequest) (*runtime.StopPodSandboxResponse, error) {
|
||||||
@ -125,25 +121,16 @@ func (c *criContainerdService) stopSandboxContainer(ctx context.Context, sandbox
|
|||||||
return c.waitSandboxStop(ctx, sandbox, killContainerTimeout)
|
return c.waitSandboxStop(ctx, sandbox, killContainerTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
// waitSandboxStop polls sandbox state until timeout exceeds or sandbox is stopped.
|
// waitSandboxStop waits for sandbox to be stopped until timeout exceeds or context is cancelled.
|
||||||
func (c *criContainerdService) waitSandboxStop(ctx context.Context, sandbox sandboxstore.Sandbox, timeout time.Duration) error {
|
func (c *criContainerdService) waitSandboxStop(ctx context.Context, sandbox sandboxstore.Sandbox, timeout time.Duration) error {
|
||||||
ticker := time.NewTicker(stopCheckPollInterval)
|
|
||||||
defer ticker.Stop()
|
|
||||||
timeoutTimer := time.NewTimer(timeout)
|
timeoutTimer := time.NewTimer(timeout)
|
||||||
defer timeoutTimer.Stop()
|
defer timeoutTimer.Stop()
|
||||||
for {
|
select {
|
||||||
// Poll once before waiting for stopCheckPollInterval.
|
case <-ctx.Done():
|
||||||
// TODO(random-liu): Use channel with event handler instead of polling.
|
return fmt.Errorf("wait sandbox container %q is cancelled", sandbox.ID)
|
||||||
if sandbox.Status.Get().State == sandboxstore.StateNotReady {
|
case <-timeoutTimer.C:
|
||||||
return nil
|
return fmt.Errorf("wait sandbox container %q stop timeout", sandbox.ID)
|
||||||
}
|
case <-sandbox.Stopped():
|
||||||
select {
|
return nil
|
||||||
case <-ctx.Done():
|
|
||||||
return fmt.Errorf("wait sandbox container %q is cancelled", sandbox.ID)
|
|
||||||
case <-timeoutTimer.C:
|
|
||||||
return fmt.Errorf("wait sandbox container %q stop timeout", sandbox.ID)
|
|
||||||
case <-ticker.C:
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,7 +36,7 @@ func TestWaitSandboxStop(t *testing.T) {
|
|||||||
}{
|
}{
|
||||||
"should return error if timeout exceeds": {
|
"should return error if timeout exceeds": {
|
||||||
state: sandboxstore.StateReady,
|
state: sandboxstore.StateReady,
|
||||||
timeout: 2 * stopCheckPollInterval,
|
timeout: 200 * time.Millisecond,
|
||||||
expectErr: true,
|
expectErr: true,
|
||||||
},
|
},
|
||||||
"should return error if context is cancelled": {
|
"should return error if context is cancelled": {
|
||||||
|
@ -39,7 +39,7 @@ type Container struct {
|
|||||||
// Container IO
|
// Container IO
|
||||||
IO *cio.ContainerIO
|
IO *cio.ContainerIO
|
||||||
// StopCh is used to propagate the stop information of the container.
|
// StopCh is used to propagate the stop information of the container.
|
||||||
StopCh
|
store.StopCh
|
||||||
}
|
}
|
||||||
|
|
||||||
// Opts sets specific information to newly created Container.
|
// Opts sets specific information to newly created Container.
|
||||||
@ -80,7 +80,7 @@ func WithStatus(status Status, root string) Opts {
|
|||||||
func NewContainer(metadata Metadata, opts ...Opts) (Container, error) {
|
func NewContainer(metadata Metadata, opts ...Opts) (Container, error) {
|
||||||
c := Container{
|
c := Container{
|
||||||
Metadata: metadata,
|
Metadata: metadata,
|
||||||
StopCh: StopCh(make(chan struct{})),
|
StopCh: store.StopCh(make(chan struct{})),
|
||||||
}
|
}
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
if err := o(&c); err != nil {
|
if err := o(&c); err != nil {
|
||||||
@ -95,19 +95,6 @@ func (c *Container) Delete() error {
|
|||||||
return c.Status.Delete()
|
return c.Status.Delete()
|
||||||
}
|
}
|
||||||
|
|
||||||
// StopCh is used to propagate the stop information of a container.
|
|
||||||
type StopCh chan struct{}
|
|
||||||
|
|
||||||
// Stop close stopCh of the container.
|
|
||||||
func (ch StopCh) Stop() {
|
|
||||||
close(ch)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stopped return the stopCh of the container as a readonly channel.
|
|
||||||
func (ch StopCh) Stopped() <-chan struct{} {
|
|
||||||
return ch
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store stores all Containers.
|
// Store stores all Containers.
|
||||||
type Store struct {
|
type Store struct {
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
|
@ -22,6 +22,10 @@ import "sync"
|
|||||||
func WithFakeStatus(status Status) Opts {
|
func WithFakeStatus(status Status) Opts {
|
||||||
return func(c *Container) error {
|
return func(c *Container) error {
|
||||||
c.Status = &fakeStatusStorage{status: status}
|
c.Status = &fakeStatusStorage{status: status}
|
||||||
|
if status.FinishedAt != 0 {
|
||||||
|
// Fake the TaskExit event
|
||||||
|
c.Stop()
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,15 +36,22 @@ type Sandbox struct {
|
|||||||
Container containerd.Container
|
Container containerd.Container
|
||||||
// CNI network namespace client
|
// CNI network namespace client
|
||||||
NetNS *NetNS
|
NetNS *NetNS
|
||||||
|
// StopCh is used to propagate the stop information of the sandbox.
|
||||||
|
store.StopCh
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSandbox creates an internally used sandbox type. This functions reminds
|
// NewSandbox creates an internally used sandbox type. This functions reminds
|
||||||
// the caller that a sandbox must have a status.
|
// the caller that a sandbox must have a status.
|
||||||
func NewSandbox(metadata Metadata, status Status) Sandbox {
|
func NewSandbox(metadata Metadata, status Status) Sandbox {
|
||||||
return Sandbox{
|
s := Sandbox{
|
||||||
Metadata: metadata,
|
Metadata: metadata,
|
||||||
Status: StoreStatus(status),
|
Status: StoreStatus(status),
|
||||||
|
StopCh: store.StopCh(make(chan struct{})),
|
||||||
}
|
}
|
||||||
|
if status.State == StateNotReady {
|
||||||
|
s.Stop()
|
||||||
|
}
|
||||||
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store stores all sandboxes.
|
// Store stores all sandboxes.
|
||||||
|
30
pkg/store/util.go
Normal file
30
pkg/store/util.go
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package store
|
||||||
|
|
||||||
|
// StopCh is used to propagate the stop information of a container.
|
||||||
|
type StopCh chan struct{}
|
||||||
|
|
||||||
|
// Stop close stopCh of the container.
|
||||||
|
func (ch StopCh) Stop() {
|
||||||
|
close(ch)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stopped return the stopCh of the container as a readonly channel.
|
||||||
|
func (ch StopCh) Stopped() <-chan struct{} {
|
||||||
|
return ch
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user