Merge pull request #570 from miaoyq/fixes-569
Use channel to pass the stop info instead of polling for container stop
commit 11042a4141
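
The heart of the change, reduced to a self-contained sketch (waitStopped and its parameters are illustrative stand-ins, not the PR's API): instead of polling the container store on a 100ms ticker, the waiter blocks on a channel that is closed when the container's TaskExit event is observed.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// waitStopped blocks until the stopped channel is closed, the timeout fires,
// or the context is cancelled; no polling involved.
func waitStopped(ctx context.Context, stopped <-chan struct{}, timeout time.Duration) error {
    timer := time.NewTimer(timeout)
    defer timer.Stop() // release the timer if we return early
    select {
    case <-ctx.Done():
        return errors.New("wait is cancelled")
    case <-timer.C:
        return errors.New("wait timed out")
    case <-stopped:
        return nil
    }
}

func main() {
    stopped := make(chan struct{})
    go func() {
        // Stand-in for the event monitor observing TaskExit and calling Stop().
        time.Sleep(50 * time.Millisecond)
        close(stopped)
    }()
    fmt.Println(waitStopped(context.Background(), stopped, time.Second)) // <nil>
}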

@@ -28,15 +28,10 @@ import (
     "golang.org/x/sys/unix"
     "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"

-    "github.com/containerd/cri-containerd/pkg/store"
     containerstore "github.com/containerd/cri-containerd/pkg/store/container"
 )

 const (
-    // stopCheckPollInterval is the interval to check whether a container
-    // is stopped successfully.
-    stopCheckPollInterval = 100 * time.Millisecond
-
     // killContainerTimeout is the timeout that we wait for the container to
     // be SIGKILLed.
     killContainerTimeout = 2 * time.Minute

@@ -104,7 +99,7 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont
         }
     }

-    err = c.waitContainerStop(ctx, id, timeout)
+    err = c.waitContainerStop(ctx, container, timeout)
     if err == nil {
         return nil
     }

@@ -130,41 +125,22 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont
     }

     // Wait for a fixed timeout until container stop is observed by event monitor.
-    if err := c.waitContainerStop(ctx, id, killContainerTimeout); err != nil {
+    if err := c.waitContainerStop(ctx, container, killContainerTimeout); err != nil {
         return fmt.Errorf("an error occurs during waiting for container %q to stop: %v", id, err)
     }
     return nil
 }

-// waitContainerStop polls container state until timeout exceeds or container is stopped.
-func (c *criContainerdService) waitContainerStop(ctx context.Context, id string, timeout time.Duration) error {
-    ticker := time.NewTicker(stopCheckPollInterval)
-    defer ticker.Stop()
+// waitContainerStop waits for container to be stopped until timeout exceeds or context is cancelled.
+func (c *criContainerdService) waitContainerStop(ctx context.Context, container containerstore.Container, timeout time.Duration) error {
     timeoutTimer := time.NewTimer(timeout)
     defer timeoutTimer.Stop()
-    for {
-        // Poll once before waiting for stopCheckPollInterval.
-        container, err := c.containerStore.Get(id)
-        if err != nil {
-            if err != store.ErrNotExist {
-                return fmt.Errorf("failed to get container %q: %v", id, err)
-            }
-            // Do not return error here because container was removed means
-            // it is already stopped.
-            logrus.Warnf("Container %q was removed during stopping", id)
-            return nil
-        }
-        // TODO(random-liu): Use channel with event handler instead of polling.
-        if container.Status.Get().State() == runtime.ContainerState_CONTAINER_EXITED {
-            return nil
-        }
-        select {
-        case <-ctx.Done():
-            return fmt.Errorf("wait container %q is cancelled", id)
-        case <-timeoutTimer.C:
-            return fmt.Errorf("wait container %q stop timeout", id)
-        case <-ticker.C:
-            continue
-        }
+    select {
+    case <-ctx.Done():
+        return fmt.Errorf("wait container %q is cancelled", container.ID)
+    case <-timeoutTimer.C:
+        return fmt.Errorf("wait container %q stop timeout", container.ID)
+    case <-container.Stopped():
+        return nil
     }
 }

@@ -39,7 +39,7 @@ func TestWaitContainerStop(t *testing.T) {
             CreatedAt: time.Now().UnixNano(),
             StartedAt: time.Now().UnixNano(),
         },
-        timeout:   2 * stopCheckPollInterval,
+        timeout:   200 * time.Millisecond,
         expectErr: true,
     },
     "should return error if context is cancelled": {

@@ -51,11 +51,6 @@ func TestWaitContainerStop(t *testing.T) {
         cancel:    true,
         expectErr: true,
     },
-    "should not return error if container is removed before timeout": {
-        status:    nil,
-        timeout:   time.Hour,
-        expectErr: false,
-    },
     "should not return error if container is stopped before timeout": {
         status: &containerstore.Status{
             CreatedAt: time.Now().UnixNano(),

@@ -67,13 +62,15 @@ func TestWaitContainerStop(t *testing.T) {
         },
     } {
         c := newTestCRIContainerdService()
-        if test.status != nil {
-            container, err := containerstore.NewContainer(
-                containerstore.Metadata{ID: id},
-                containerstore.WithFakeStatus(*test.status),
-            )
-            assert.NoError(t, err)
-            assert.NoError(t, c.containerStore.Add(container))
+        container, err := containerstore.NewContainer(
+            containerstore.Metadata{ID: id},
+            containerstore.WithFakeStatus(*test.status),
+        )
+        assert.NoError(t, err)
+        assert.NoError(t, c.containerStore.Add(container))
+        if test.status.FinishedAt != 0 {
+            // Fake the TaskExit event
+            container.Stop()
         }
         ctx := context.Background()
         if test.cancel {

@@ -81,7 +78,7 @@ func TestWaitContainerStop(t *testing.T) {
            cancel()
            ctx = cancelledCtx
         }
-        err := c.waitContainerStop(ctx, id, test.timeout)
+        err = c.waitContainerStop(ctx, container, test.timeout)
         assert.Equal(t, test.expectErr, err != nil, desc)
     }
 }
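
The rewritten test drives every case through the channel: closing it up front stands in for the TaskExit event. A sketch of the same table-driven shape against the illustrative waitStopped helper from the sketch above (again an assumption-laden illustration, not the PR's test):

package main

import (
    "context"
    "testing"
    "time"
)

func TestWaitStopped(t *testing.T) {
    for desc, test := range map[string]struct {
        stop      bool // close the channel up front, faking TaskExit
        cancel    bool // cancel the context before waiting
        timeout   time.Duration
        expectErr bool
    }{
        "should return error on timeout":              {timeout: 200 * time.Millisecond, expectErr: true},
        "should return error if context is cancelled": {timeout: time.Hour, cancel: true, expectErr: true},
        "should not return error if already stopped":  {timeout: time.Hour, stop: true},
    } {
        stopped := make(chan struct{})
        if test.stop {
            close(stopped)
        }
        ctx, cancel := context.WithCancel(context.Background())
        if test.cancel {
            cancel()
        }
        err := waitStopped(ctx, stopped, test.timeout)
        cancel()
        if (err != nil) != test.expectErr {
            t.Errorf("%s: unexpected error: %v", desc, err)
        }
    }
}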

@@ -155,6 +155,8 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
             // TODO(random-liu): [P0] Enqueue the event and retry.
             return
         }
+        // Using channel to propagate the information of container stop
+        cntr.Stop()
     case *eventtypes.TaskOOM:
         e := any.(*eventtypes.TaskOOM)
         logrus.Infof("TaskOOM event %+v", e)
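
The single cntr.Stop() call above is enough because close() on a channel is a broadcast: every goroutine blocked in waitContainerStop for that container wakes at once, and any later receive returns immediately. A minimal illustration (not the PR's code):

package main

import (
    "fmt"
    "sync"
)

func main() {
    stopped := make(chan struct{})
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            <-stopped // a receive on a closed channel returns immediately
            fmt.Printf("waiter %d observed the stop\n", i)
        }(i)
    }
    close(stopped) // what StopCh.Stop() does once per container exit
    wg.Wait()
}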

@@ -21,6 +21,7 @@ import (

     "github.com/containerd/containerd"
     "github.com/docker/docker/pkg/truncindex"
+    "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"

     cio "github.com/containerd/cri-containerd/pkg/server/io"
     "github.com/containerd/cri-containerd/pkg/store"

@@ -37,7 +38,8 @@ type Container struct {
     Container containerd.Container
     // Container IO
     IO *cio.ContainerIO
-    // TODO(random-liu): Add stop channel to get rid of stop poll waiting.
+    // StopCh is used to propagate the stop information of the container.
+    StopCh
 }

 // Opts sets specific information to newly created Container.

@@ -67,6 +69,9 @@ func WithStatus(status Status, root string) Opts {
            return err
         }
         c.Status = s
+        if s.Get().State() == runtime.ContainerState_CONTAINER_EXITED {
+            c.Stop()
+        }
         return nil
     }
 }

@@ -75,6 +80,7 @@ func WithStatus(status Status, root string) Opts {
 func NewContainer(metadata Metadata, opts ...Opts) (Container, error) {
     c := Container{
         Metadata: metadata,
+        StopCh:   StopCh(make(chan struct{})),
     }
     for _, o := range opts {
         if err := o(&c); err != nil {

@@ -89,6 +95,19 @@ func (c *Container) Delete() error {
     return c.Status.Delete()
 }

+// StopCh is used to propagate the stop information of a container.
+type StopCh chan struct{}
+
+// Stop closes the stopCh of the container.
+func (ch StopCh) Stop() {
+    close(ch)
+}
+
+// Stopped returns the stopCh of the container as a readonly channel.
+func (ch StopCh) Stopped() <-chan struct{} {
+    return ch
+}
+
 // Store stores all Containers.
 type Store struct {
     lock sync.RWMutex
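
A note on the design: close() panics when called on an already-closed channel, so StopCh relies on Stop() having a single caller per container (the event monitor on TaskExit, or WithStatus when an already-exited container is restored). If a design ever needed to stop from several places, the close could be guarded with sync.Once; a hypothetical double-close-safe variant, not part of this PR:

package container

import "sync"

// safeStopCh is a hypothetical variant of StopCh that tolerates multiple
// Stop() calls by guarding the close with sync.Once.
type safeStopCh struct {
    ch   chan struct{}
    once sync.Once
}

func newSafeStopCh() *safeStopCh {
    return &safeStopCh{ch: make(chan struct{})}
}

// Stop closes the channel at most once, even under concurrent callers.
func (s *safeStopCh) Stop() {
    s.once.Do(func() { close(s.ch) })
}

// Stopped returns the channel as receive-only, mirroring StopCh.Stopped().
func (s *safeStopCh) Stopped() <-chan struct{} {
    return s.ch
}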