diff --git a/pkg/server/container_stop.go b/pkg/server/container_stop.go index 361067fac..b24bdb9b9 100644 --- a/pkg/server/container_stop.go +++ b/pkg/server/container_stop.go @@ -28,15 +28,10 @@ import ( "golang.org/x/sys/unix" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" - "github.com/containerd/cri-containerd/pkg/store" containerstore "github.com/containerd/cri-containerd/pkg/store/container" ) const ( - // stopCheckPollInterval is the the interval to check whether a container - // is stopped successfully. - stopCheckPollInterval = 100 * time.Millisecond - // killContainerTimeout is the timeout that we wait for the container to // be SIGKILLed. killContainerTimeout = 2 * time.Minute @@ -104,7 +99,7 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont } } - err = c.waitContainerStop(ctx, id, timeout) + err = c.waitContainerStop(ctx, container, timeout) if err == nil { return nil } @@ -130,41 +125,22 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont } // Wait for a fixed timeout until container stop is observed by event monitor. - if err := c.waitContainerStop(ctx, id, killContainerTimeout); err != nil { + if err := c.waitContainerStop(ctx, container, killContainerTimeout); err != nil { return fmt.Errorf("an error occurs during waiting for container %q to stop: %v", id, err) } return nil } -// waitContainerStop polls container state until timeout exceeds or container is stopped. -func (c *criContainerdService) waitContainerStop(ctx context.Context, id string, timeout time.Duration) error { - ticker := time.NewTicker(stopCheckPollInterval) - defer ticker.Stop() +// waitContainerStop waits for the container to stop until the timeout expires or the context is cancelled. 
+func (c *criContainerdService) waitContainerStop(ctx context.Context, container containerstore.Container, timeout time.Duration) error { timeoutTimer := time.NewTimer(timeout) defer timeoutTimer.Stop() - for { - // Poll once before waiting for stopCheckPollInterval. - container, err := c.containerStore.Get(id) - if err != nil { - if err != store.ErrNotExist { - return fmt.Errorf("failed to get container %q: %v", id, err) - } - // Do not return error here because container was removed means - // it is already stopped. - logrus.Warnf("Container %q was removed during stopping", id) - return nil - } - // TODO(random-liu): Use channel with event handler instead of polling. - if container.Status.Get().State() == runtime.ContainerState_CONTAINER_EXITED { - return nil - } - select { - case <-ctx.Done(): - return fmt.Errorf("wait container %q is cancelled", id) - case <-timeoutTimer.C: - return fmt.Errorf("wait container %q stop timeout", id) - case <-ticker.C: - continue - } + select { + case <-ctx.Done(): + return fmt.Errorf("wait container %q is cancelled", container.ID) + case <-timeoutTimer.C: + return fmt.Errorf("wait container %q stop timeout", container.ID) + case <-container.Stopped(): + return nil } } diff --git a/pkg/server/container_stop_test.go b/pkg/server/container_stop_test.go index 6daa3d644..97e894797 100644 --- a/pkg/server/container_stop_test.go +++ b/pkg/server/container_stop_test.go @@ -39,7 +39,7 @@ func TestWaitContainerStop(t *testing.T) { CreatedAt: time.Now().UnixNano(), StartedAt: time.Now().UnixNano(), }, - timeout: 2 * stopCheckPollInterval, + timeout: 200 * time.Millisecond, expectErr: true, }, "should return error if context is cancelled": { @@ -51,11 +51,6 @@ func TestWaitContainerStop(t *testing.T) { cancel: true, expectErr: true, }, - "should not return error if container is removed before timeout": { - status: nil, - timeout: time.Hour, - expectErr: false, - }, "should not return error if container is stopped before timeout": { status: 
&containerstore.Status{ CreatedAt: time.Now().UnixNano(), @@ -67,13 +62,15 @@ func TestWaitContainerStop(t *testing.T) { }, } { c := newTestCRIContainerdService() - if test.status != nil { - container, err := containerstore.NewContainer( - containerstore.Metadata{ID: id}, - containerstore.WithFakeStatus(*test.status), - ) - assert.NoError(t, err) - assert.NoError(t, c.containerStore.Add(container)) + container, err := containerstore.NewContainer( + containerstore.Metadata{ID: id}, + containerstore.WithFakeStatus(*test.status), + ) + assert.NoError(t, err) + assert.NoError(t, c.containerStore.Add(container)) + if test.status.FinishedAt != 0 { + // Fake the TaskExit event + container.Stop() } ctx := context.Background() if test.cancel { @@ -81,7 +78,7 @@ func TestWaitContainerStop(t *testing.T) { cancel() ctx = cancelledCtx } - err := c.waitContainerStop(ctx, id, test.timeout) + err = c.waitContainerStop(ctx, container, test.timeout) assert.Equal(t, test.expectErr, err != nil, desc) } } diff --git a/pkg/server/events.go b/pkg/server/events.go index 4a00853ab..0df4e681e 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -155,6 +155,8 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) { // TODO(random-liu): [P0] Enqueue the event and retry. 
return } + // Close the stop channel to notify waiters that the container has stopped. + cntr.Stop() case *eventtypes.TaskOOM: e := any.(*eventtypes.TaskOOM) logrus.Infof("TaskOOM event %+v", e) diff --git a/pkg/store/container/container.go b/pkg/store/container/container.go index 6f636e228..2026ed903 100644 --- a/pkg/store/container/container.go +++ b/pkg/store/container/container.go @@ -21,6 +21,7 @@ import ( "github.com/containerd/containerd" "github.com/docker/docker/pkg/truncindex" + "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" cio "github.com/containerd/cri-containerd/pkg/server/io" "github.com/containerd/cri-containerd/pkg/store" @@ -37,7 +38,8 @@ type Container struct { Container containerd.Container // Container IO IO *cio.ContainerIO - // TODO(random-liu): Add stop channel to get rid of stop poll waiting. + // StopCh is used to propagate the stop information of the container. + StopCh } // Opts sets specific information to newly created Container. @@ -67,6 +69,9 @@ func WithStatus(status Status, root string) Opts { return err } c.Status = s + if s.Get().State() == runtime.ContainerState_CONTAINER_EXITED { + c.Stop() + } return nil } } @@ -75,6 +80,7 @@ func WithStatus(status Status, root string) Opts { func NewContainer(metadata Metadata, opts ...Opts) (Container, error) { c := Container{ Metadata: metadata, + StopCh: StopCh(make(chan struct{})), } for _, o := range opts { if err := o(&c); err != nil { @@ -89,6 +95,19 @@ func (c *Container) Delete() error { return c.Status.Delete() } +// StopCh is used to propagate the stop information of a container. +type StopCh chan struct{} + +// Stop closes the stop channel of the container. It must be called at most once, because closing an already closed channel panics. +func (ch StopCh) Stop() { + close(ch) +} + +// Stopped returns the stop channel of the container as a receive-only channel. +func (ch StopCh) Stopped() <-chan struct{} { + return ch +} + // Store stores all Containers. type Store struct { lock sync.RWMutex