From 61c1fdb098ee4eb36c386b0380622254578716a3 Mon Sep 17 00:00:00 2001 From: Yanqiang Miao Date: Fri, 26 Jan 2018 10:50:54 +0800 Subject: [PATCH] Use channel to propagate the stop info of sandbox Signed-off-by: Yanqiang Miao --- pkg/server/container_stop.go | 2 +- pkg/server/container_stop_test.go | 4 ---- pkg/server/events.go | 2 ++ pkg/server/sandbox_stop.go | 29 ++++++++--------------------- pkg/server/sandbox_stop_test.go | 2 +- pkg/store/container/container.go | 17 ++--------------- pkg/store/container/fake_status.go | 4 ++++ pkg/store/sandbox/sandbox.go | 9 ++++++++- pkg/store/util.go | 30 ++++++++++++++++++++++++++++++ 9 files changed, 56 insertions(+), 43 deletions(-) create mode 100644 pkg/store/util.go diff --git a/pkg/server/container_stop.go b/pkg/server/container_stop.go index a57e31514..1ee2dead7 100644 --- a/pkg/server/container_stop.go +++ b/pkg/server/container_stop.go @@ -129,7 +129,7 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont return nil } -// waitContainerStop waits for container to be stopped until timeout exceeds or centext is cancelled. +// waitContainerStop waits for container to be stopped until timeout exceeds or context is cancelled. func (c *criContainerdService) waitContainerStop(ctx context.Context, container containerstore.Container, timeout time.Duration) error { timeoutTimer := time.NewTimer(timeout) defer timeoutTimer.Stop() diff --git a/pkg/server/container_stop_test.go b/pkg/server/container_stop_test.go index 97e894797..569e9c99e 100644 --- a/pkg/server/container_stop_test.go +++ b/pkg/server/container_stop_test.go @@ -68,10 +68,6 @@ func TestWaitContainerStop(t *testing.T) { ) assert.NoError(t, err) assert.NoError(t, c.containerStore.Add(container)) - if test.status.FinishedAt != 0 { - // Fake the TaskExit event - container.Stop() - } ctx := context.Background() if test.cancel { cancelledCtx, cancel := context.WithCancel(ctx) diff --git a/pkg/server/events.go b/pkg/server/events.go index 23572f52f..74c552558 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -235,4 +235,6 @@ func handleSandboxExit(e *eventtypes.TaskExit, sb sandboxstore.Sandbox) { // TODO(random-liu): [P0] Enqueue the event and retry. return } + // Using channel to propagate the information of sandbox stop + sb.Stop() } diff --git a/pkg/server/sandbox_stop.go b/pkg/server/sandbox_stop.go index 7cdb8573a..dd4860ad6 100644 --- a/pkg/server/sandbox_stop.go +++ b/pkg/server/sandbox_stop.go @@ -31,10 +31,6 @@ import ( sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox" ) -// stopCheckPollInterval is the the interval to check whether a sandbox -// is stopped successfully. -const stopCheckPollInterval = 100 * time.Millisecond - // StopPodSandbox stops the sandbox. If there are any running containers in the // sandbox, they should be forcibly terminated. func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandboxRequest) (*runtime.StopPodSandboxResponse, error) { @@ -125,25 +121,16 @@ func (c *criContainerdService) stopSandboxContainer(ctx context.Context, sandbox return c.waitSandboxStop(ctx, sandbox, killContainerTimeout) } -// waitSandboxStop polls sandbox state until timeout exceeds or sandbox is stopped. +// waitSandboxStop waits for sandbox to be stopped until timeout exceeds or context is cancelled. func (c *criContainerdService) waitSandboxStop(ctx context.Context, sandbox sandboxstore.Sandbox, timeout time.Duration) error { - ticker := time.NewTicker(stopCheckPollInterval) - defer ticker.Stop() timeoutTimer := time.NewTimer(timeout) defer timeoutTimer.Stop() - for { - // Poll once before waiting for stopCheckPollInterval. - // TODO(random-liu): Use channel with event handler instead of polling. - if sandbox.Status.Get().State == sandboxstore.StateNotReady { - return nil - } - select { - case <-ctx.Done(): - return fmt.Errorf("wait sandbox container %q is cancelled", sandbox.ID) - case <-timeoutTimer.C: - return fmt.Errorf("wait sandbox container %q stop timeout", sandbox.ID) - case <-ticker.C: - continue - } + select { + case <-ctx.Done(): + return fmt.Errorf("wait sandbox container %q is cancelled", sandbox.ID) + case <-timeoutTimer.C: + return fmt.Errorf("wait sandbox container %q stop timeout", sandbox.ID) + case <-sandbox.Stopped(): + return nil } } diff --git a/pkg/server/sandbox_stop_test.go b/pkg/server/sandbox_stop_test.go index 09ff82fb8..10d520146 100644 --- a/pkg/server/sandbox_stop_test.go +++ b/pkg/server/sandbox_stop_test.go @@ -36,7 +36,7 @@ func TestWaitSandboxStop(t *testing.T) { }{ "should return error if timeout exceeds": { state: sandboxstore.StateReady, - timeout: 2 * stopCheckPollInterval, + timeout: 200 * time.Millisecond, expectErr: true, }, "should return error if context is cancelled": { diff --git a/pkg/store/container/container.go b/pkg/store/container/container.go index 2026ed903..0c274fc52 100644 --- a/pkg/store/container/container.go +++ b/pkg/store/container/container.go @@ -39,7 +39,7 @@ type Container struct { // Container IO IO *cio.ContainerIO // StopCh is used to propagate the stop information of the container. - StopCh + store.StopCh } // Opts sets specific information to newly created Container. @@ -80,7 +80,7 @@ func WithStatus(status Status, root string) Opts { func NewContainer(metadata Metadata, opts ...Opts) (Container, error) { c := Container{ Metadata: metadata, - StopCh: StopCh(make(chan struct{})), + StopCh: store.StopCh(make(chan struct{})), } for _, o := range opts { if err := o(&c); err != nil { @@ -95,19 +95,6 @@ func (c *Container) Delete() error { return c.Status.Delete() } -// StopCh is used to propagate the stop information of a container. -type StopCh chan struct{} - -// Stop close stopCh of the container. -func (ch StopCh) Stop() { - close(ch) -} - -// Stopped return the stopCh of the container as a readonly channel. -func (ch StopCh) Stopped() <-chan struct{} { - return ch -} - // Store stores all Containers. type Store struct { lock sync.RWMutex diff --git a/pkg/store/container/fake_status.go b/pkg/store/container/fake_status.go index 7cc7c83e8..3503bdc8b 100644 --- a/pkg/store/container/fake_status.go +++ b/pkg/store/container/fake_status.go @@ -22,6 +22,10 @@ import "sync" func WithFakeStatus(status Status) Opts { return func(c *Container) error { c.Status = &fakeStatusStorage{status: status} + if status.FinishedAt != 0 { + // Fake the TaskExit event + c.Stop() + } return nil } } diff --git a/pkg/store/sandbox/sandbox.go b/pkg/store/sandbox/sandbox.go index 0b69c2b23..dfcc7dde0 100644 --- a/pkg/store/sandbox/sandbox.go +++ b/pkg/store/sandbox/sandbox.go @@ -36,15 +36,22 @@ type Sandbox struct { Container containerd.Container // CNI network namespace client NetNS *NetNS + // StopCh is used to propagate the stop information of the sandbox. + store.StopCh } // NewSandbox creates an internally used sandbox type. This functions reminds // the caller that a sandbox must have a status. func NewSandbox(metadata Metadata, status Status) Sandbox { - return Sandbox{ + s := Sandbox{ Metadata: metadata, Status: StoreStatus(status), + StopCh: store.StopCh(make(chan struct{})), } + if status.State == StateNotReady { + s.Stop() + } + return s } // Store stores all sandboxes. diff --git a/pkg/store/util.go b/pkg/store/util.go new file mode 100644 index 000000000..8a7b6fcf9 --- /dev/null +++ b/pkg/store/util.go @@ -0,0 +1,30 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +// StopCh is used to propagate the stop information of a container. +type StopCh chan struct{} + +// Stop close stopCh of the container. +func (ch StopCh) Stop() { + close(ch) +} + +// Stopped return the stopCh of the container as a readonly channel. +func (ch StopCh) Stopped() <-chan struct{} { + return ch +}