diff --git a/pkg/os/os.go b/pkg/os/os.go index b7c1384db..128ffb8f9 100644 --- a/pkg/os/os.go +++ b/pkg/os/os.go @@ -47,7 +47,7 @@ type OS interface { // RealOS is used to dispatch the real system level operations. type RealOS struct{} -// MkdirAll will will call os.MkdirAll to create a directory. +// MkdirAll will call os.MkdirAll to create a directory. func (RealOS) MkdirAll(path string, perm os.FileMode) error { return os.MkdirAll(path, perm) } @@ -79,7 +79,7 @@ func (RealOS) ResolveSymbolicLink(path string) (string, error) { return filepath.EvalSymlinks(path) } -// CopyFile copys src file to dest file +// CopyFile will copy src file to dest file func (RealOS) CopyFile(src, dest string, perm os.FileMode) error { in, err := os.Open(src) if err != nil { diff --git a/pkg/server/events.go b/pkg/server/events.go index 58f0bfab5..80ba45c91 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -18,14 +18,18 @@ package server import ( "errors" + "fmt" + "time" eventtypes "github.com/containerd/containerd/api/events" containerdio "github.com/containerd/containerd/cio" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/typeurl" + gogotypes "github.com/gogo/protobuf/types" "github.com/sirupsen/logrus" "golang.org/x/net/context" + "k8s.io/apimachinery/pkg/util/clock" ctrdutil "github.com/containerd/cri/pkg/containerd/util" "github.com/containerd/cri/pkg/store" @@ -33,6 +37,12 @@ import ( sandboxstore "github.com/containerd/cri/pkg/store/sandbox" ) +const ( + backOffInitDuration = 1 * time.Second + backOffMaxDuration = 5 * time.Minute + backOffExpireCheckDuration = 1 * time.Second +) + // eventMonitor monitors containerd event and updates internal state correspondingly. // TODO(random-liu): [P1] Figure out is it possible to drop event during containerd // is running. If it is, we should do periodically list to sync state with containerd. @@ -43,6 +53,23 @@ type eventMonitor struct { errCh <-chan error ctx context.Context cancel context.CancelFunc + backOff *backOff +} + +type backOff struct { + queuePool map[string]*backOffQueue + ticker *time.Ticker + minDuration time.Duration + maxDuration time.Duration + checkDuration time.Duration + clock clock.Clock +} + +type backOffQueue struct { + events []interface{} + expireTime time.Time + duration time.Duration + clock clock.Clock } // Create new event monitor. New event monitor will start subscribing containerd event. All events @@ -55,6 +82,8 @@ func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store) *eventMonit sandboxStore: s, ctx: ctx, cancel: cancel, + backOff: newBackOff(backOffInitDuration, backOffMaxDuration, + backOffExpireCheckDuration, clock.RealClock{}), } } @@ -67,6 +96,24 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) { em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...) } +func convertEvent(e *gogotypes.Any) (string, interface{}, error) { + containerID := "" + evt, err := typeurl.UnmarshalAny(e) + if err != nil { + return "", nil, fmt.Errorf("failed to unmarshalany %+v", err) + } + + switch evt.(type) { + case *eventtypes.TaskExit: + containerID = evt.(*eventtypes.TaskExit).ContainerID + case *eventtypes.TaskOOM: + containerID = evt.(*eventtypes.TaskOOM).ContainerID + default: + return "", nil, fmt.Errorf("unsupported event") + } + return containerID, evt, nil +} + // start starts the event monitor which monitors and handles all container events. It returns // a channel for the caller to wait for the event monitor to stop. start must be called after // subscribe. @@ -76,15 +123,40 @@ func (em *eventMonitor) start() (<-chan struct{}, error) { } closeCh := make(chan struct{}) go func() { + backOffCheckCh := em.backOff.start() for { select { case e := <-em.ch: logrus.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic) - em.handleEvent(e) + cID, evt, err := convertEvent(e.Event) + if err != nil { + logrus.WithError(err).Errorf("Failed to convert event %+v", e) + break + } + if em.backOff.isInBackOff(cID) { + em.backOff.enBackOff(cID, evt) + break + } + if err := em.handleEvent(evt); err != nil { + logrus.WithError(err).Errorf("Failed to handle event %+v for container %s", evt, cID) + em.backOff.enBackOff(cID, evt) + } case err := <-em.errCh: logrus.WithError(err).Error("Failed to handle event stream") close(closeCh) return + case <-backOffCheckCh: + cIDs := em.backOff.getExpiredContainers() + for _, cID := range cIDs { + queue := em.backOff.deBackOff(cID) + for i, any := range queue.events { + if err := em.handleEvent(any); err != nil { + logrus.WithError(err).Errorf("Failed to handle backOff event %+v for container %s", any, cID) + em.backOff.reBackOff(cID, queue.events[i:], queue.duration) + break + } + } + } } } }() @@ -94,17 +166,13 @@ func (em *eventMonitor) start() (<-chan struct{}, error) { // stop stops the event monitor. It will close the event channel. // Once event monitor is stopped, it can't be started. func (em *eventMonitor) stop() { + em.backOff.stop() em.cancel() } // handleEvent handles a containerd event. -func (em *eventMonitor) handleEvent(evt *events.Envelope) { +func (em *eventMonitor) handleEvent(any interface{}) error { ctx := ctrdutil.NamespacedContext() - any, err := typeurl.UnmarshalAny(evt.Event) - if err != nil { - logrus.WithError(err).Errorf("Failed to convert event envelope %+v", evt) - return - } switch any.(type) { // If containerd-shim exits unexpectedly, there will be no corresponding event. // However, containerd could not retrieve container state in that case, so it's @@ -112,51 +180,59 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) { // TODO(random-liu): [P2] Handle containerd-shim exit. case *eventtypes.TaskExit: e := any.(*eventtypes.TaskExit) - logrus.Infof("TaskExit event %+v", e) cntr, err := em.containerStore.Get(e.ContainerID) if err == nil { - handleContainerExit(ctx, e, cntr) - return + if err := handleContainerExit(ctx, e, cntr); err != nil { + return fmt.Errorf("failed to handle container TaskExit event:%+v", err) + } + return nil } else if err != store.ErrNotExist { - logrus.WithError(err).Errorf("Failed to get container %q", e.ContainerID) - return + return fmt.Errorf("can't find container for TaskExit event:%+v", err) } // Use GetAll to include sandbox in unknown state. sb, err := em.sandboxStore.GetAll(e.ContainerID) if err == nil { - handleSandboxExit(ctx, e, sb) - return + if err := handleSandboxExit(ctx, e, sb); err != nil { + return fmt.Errorf("failed to handle sandbox TaskExit event:%+v", err) + } + return nil } else if err != store.ErrNotExist { - logrus.WithError(err).Errorf("Failed to get sandbox %q", e.ContainerID) - return + return fmt.Errorf("can't find sandbox for TaskExit event:%+v", err) } + return nil case *eventtypes.TaskOOM: e := any.(*eventtypes.TaskOOM) logrus.Infof("TaskOOM event %+v", e) cntr, err := em.containerStore.Get(e.ContainerID) if err != nil { - if _, err := em.sandboxStore.Get(e.ContainerID); err == nil { - return + if err != store.ErrNotExist { + return fmt.Errorf("can't find container for TaskOOM event:%+v", err) } - logrus.WithError(err).Errorf("Failed to get container %q", e.ContainerID) - return + if _, err = em.sandboxStore.Get(e.ContainerID); err != nil { + if err != store.ErrNotExist { + return fmt.Errorf("can't find sandbox for TaskOOM event:%+v", err) + } + return nil + } + return nil } err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) { status.Reason = oomExitReason return status, nil }) if err != nil { - logrus.WithError(err).Errorf("Failed to update container %q oom", e.ContainerID) - return + return fmt.Errorf("failed to update container status for TaskOOM event:%+v", err) } } + + return nil } // handleContainerExit handles TaskExit event for container. -func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container) { +func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container) error { if e.Pid != cntr.Status.Get().Pid { // Non-init process died, ignore the event. - return + return nil } // Attach container IO so that `Delete` could cleanup the stream properly. task, err := cntr.Container.Task(ctx, @@ -166,16 +242,13 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta ) if err != nil { if !errdefs.IsNotFound(err) { - logrus.WithError(err).Errorf("failed to load task for container %q", e.ContainerID) - return + return fmt.Errorf("failed to load task for container: %v", err) } } else { // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker if _, err = task.Delete(ctx); err != nil { - // TODO(random-liu): [P0] Enqueue the event and retry. if !errdefs.IsNotFound(err) { - logrus.WithError(err).Errorf("failed to stop container %q", e.ContainerID) - return + return fmt.Errorf("failed to stop container: %v", err) } // Move on to make sure container status is updated. } @@ -192,34 +265,30 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta return status, nil }) if err != nil { - logrus.WithError(err).Errorf("Failed to update container %q state", e.ContainerID) - // TODO(random-liu): [P0] Enqueue the event and retry. - return + return fmt.Errorf("failed to update container state: %v", err) } // Using channel to propagate the information of container stop cntr.Stop() + return nil } // handleSandboxExit handles TaskExit event for sandbox. -func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox) { +func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox) error { if e.Pid != sb.Status.Get().Pid { // Non-init process died, ignore the event. - return + return nil } // No stream attached to sandbox container. task, err := sb.Container.Task(ctx, nil) if err != nil { if !errdefs.IsNotFound(err) { - logrus.WithError(err).Errorf("failed to load task for sandbox %q", e.ContainerID) - return + return fmt.Errorf("failed to load task for sandbox: %v", err) } } else { // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker if _, err = task.Delete(ctx); err != nil { - // TODO(random-liu): [P0] Enqueue the event and retry. if !errdefs.IsNotFound(err) { - logrus.WithError(err).Errorf("failed to stop sandbox %q", e.ContainerID) - return + return fmt.Errorf("failed to stop sandbox: %v", err) } // Move on to make sure container status is updated. } @@ -238,10 +307,84 @@ func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxst return status, nil }) if err != nil { - logrus.WithError(err).Errorf("Failed to update sandbox %q state", e.ContainerID) - // TODO(random-liu): [P0] Enqueue the event and retry. - return + return fmt.Errorf("failed to update sandbox state: %v", err) } // Using channel to propagate the information of sandbox stop sb.Stop() + return nil +} + +func newBackOff(min, max, check time.Duration, c clock.Clock) *backOff { + return &backOff{ + queuePool: map[string]*backOffQueue{}, + minDuration: min, + maxDuration: max, + checkDuration: check, + clock: c, + } +} + +func (b *backOff) getExpiredContainers() []string { + var containers []string + for c, q := range b.queuePool { + if q.isExpire() { + containers = append(containers, c) + } + } + return containers +} + +func (b *backOff) isInBackOff(key string) bool { + if _, ok := b.queuePool[key]; ok { + return true + } + return false +} + +// enBackOff start to backOff and put event to the tail of queue +func (b *backOff) enBackOff(key string, evt interface{}) { + if queue, ok := b.queuePool[key]; ok { + queue.events = append(queue.events, evt) + return + } + b.queuePool[key] = newBackOffQueue([]interface{}{evt}, b.minDuration, b.clock) +} + +// enBackOff get out the whole queue +func (b *backOff) deBackOff(key string) *backOffQueue { + queue := b.queuePool[key] + delete(b.queuePool, key) + return queue +} + +// enBackOff start to backOff again and put events to the queue +func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) { + duration := 2 * oldDuration + if duration > b.maxDuration { + duration = b.maxDuration + } + b.queuePool[key] = newBackOffQueue(events, duration, b.clock) +} + +func (b *backOff) start() <-chan time.Time { + b.ticker = time.NewTicker(b.checkDuration) + return b.ticker.C +} + +func (b *backOff) stop() { + b.ticker.Stop() +} + +func newBackOffQueue(events []interface{}, init time.Duration, c clock.Clock) *backOffQueue { + return &backOffQueue{ + events: events, + duration: init, + expireTime: c.Now().Add(init), + clock: c, + } +} + +func (q *backOffQueue) isExpire() bool { + // return time.Now >= expireTime + return !q.clock.Now().Before(q.expireTime) } diff --git a/pkg/server/events_test.go b/pkg/server/events_test.go new file mode 100644 index 000000000..c326f462c --- /dev/null +++ b/pkg/server/events_test.go @@ -0,0 +1,137 @@ +/* +Copyright 2018 The containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "testing" + "time" + + eventtypes "github.com/containerd/containerd/api/events" + //"github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/typeurl" + //gogotypes "github.com/gogo/protobuf/types" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/clock" +) + +// TestBackOff tests the logic of backOff struct. +func TestBackOff(t *testing.T) { + testStartTime := time.Now() + testClock := clock.NewFakeClock(testStartTime) + inputQueues := map[string]*backOffQueue{ + "container1": { + events: []interface{}{ + &eventtypes.TaskExit{ContainerID: "container1", ID: "1"}, + &eventtypes.TaskExit{ContainerID: "container1", ID: "2"}, + }, + }, + "container2": { + events: []interface{}{ + &eventtypes.TaskExit{ContainerID: "container2", ID: "1"}, + &eventtypes.TaskExit{ContainerID: "container2", ID: "2"}, + }, + }, + } + expectedQueues := map[string]*backOffQueue{ + "container2": { + events: []interface{}{ + &eventtypes.TaskExit{ContainerID: "container2", ID: "1"}, + &eventtypes.TaskExit{ContainerID: "container2", ID: "2"}, + }, + expireTime: testClock.Now().Add(backOffInitDuration), + duration: backOffInitDuration, + clock: testClock, + }, + "container1": { + events: []interface{}{ + &eventtypes.TaskExit{ContainerID: "container1", ID: "1"}, + &eventtypes.TaskExit{ContainerID: "container1", ID: "2"}, + }, + expireTime: testClock.Now().Add(backOffInitDuration), + duration: backOffInitDuration, + clock: testClock, + }, + } + + t.Logf("Should be able to backOff a event") + actual := newBackOff(backOffInitDuration, backOffMaxDuration, backOffExpireCheckDuration, testClock) + for k, queue := range inputQueues { + for _, event := range queue.events { + actual.enBackOff(k, event) + } + } + assert.Equal(t, actual.queuePool, expectedQueues) + + t.Logf("Should be able to check if the container is in backOff state") + for k, queue := range inputQueues { + for _, e := range queue.events { + any, err := typeurl.MarshalAny(e) + assert.NoError(t, err) + key, _, err := convertEvent(any) + assert.NoError(t, err) + assert.Equal(t, k, key) + assert.Equal(t, actual.isInBackOff(key), true) + } + } + + t.Logf("Should be able to check that a container isn't in backOff state") + notExistKey := "containerNotExist" + assert.Equal(t, actual.isInBackOff(notExistKey), false) + + t.Logf("Should be able to get all keys which are expired for backOff") + testClock.Sleep(backOffInitDuration) + expKeyMap := map[string]struct{}{} + for k := range inputQueues { + expKeyMap[k] = struct{}{} + } + actKeyList := actual.getExpiredContainers() + actKeyMap := map[string]struct{}{} //assert.Equal can't compare slice without order + for _, k := range actKeyList { + actKeyMap[k] = struct{}{} + } + assert.Equal(t, actKeyMap, expKeyMap) + + t.Logf("Should be able to get out all backOff events") + doneQueues := map[string]*backOffQueue{} + for k := range inputQueues { + actQueue := actual.deBackOff(k) + doneQueues[k] = actQueue + assert.Equal(t, actQueue, expectedQueues[k]) + } + + t.Logf("Should not get out the event again after having got out the backOff event") + for k := range inputQueues { + var expect *backOffQueue + actQueue := actual.deBackOff(k) + assert.Equal(t, actQueue, expect) + } + + t.Logf("Should be able to reBackOff") + for k, queue := range doneQueues { + failEventIndex := 1 + events := queue.events[failEventIndex:] + actual.reBackOff(k, events, queue.duration) + actQueue := actual.deBackOff(k) + expQueue := &backOffQueue{ + events: events, + expireTime: testClock.Now().Add(2 * queue.duration), + duration: 2 * queue.duration, + clock: testClock, + } + assert.Equal(t, actQueue, expQueue) + } +} diff --git a/pkg/server/helpers.go b/pkg/server/helpers.go index 5a30c8d9d..d59cc71bb 100644 --- a/pkg/server/helpers.go +++ b/pkg/server/helpers.go @@ -126,9 +126,9 @@ func makeSandboxName(s *runtime.PodSandboxMetadata) string { func makeContainerName(c *runtime.ContainerMetadata, s *runtime.PodSandboxMetadata) string { return strings.Join([]string{ c.Name, // 0 - s.Name, // 1: sandbox name - s.Namespace, // 2: sandbox namespace - s.Uid, // 3: sandbox uid + s.Name, // 1: pod name + s.Namespace, // 2: pod namespace + s.Uid, // 3: pod uid fmt.Sprintf("%d", c.Attempt), // 4 }, nameDelimiter) }