diff --git a/pkg/cri/server/container_start.go b/pkg/cri/server/container_start.go
index 285b98a06..90c807d7f 100644
--- a/pkg/cri/server/container_start.go
+++ b/pkg/cri/server/container_start.go
@@ -148,10 +148,8 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
 		return nil, errors.Wrapf(err, "failed to update container %q state", id)
 	}
 
-	// start the monitor after updating container state, this ensures that
-	// event monitor receives the TaskExit event and update container state
-	// after this.
-	c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh)
+	// The exit monitor handles the TaskExit event and updates container state after this.
+	c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh)
 
 	return &runtime.StartContainerResponse{}, nil
 }
diff --git a/pkg/cri/server/container_stop.go b/pkg/cri/server/container_stop.go
index 657d2e10d..c584a535b 100644
--- a/pkg/cri/server/container_stop.go
+++ b/pkg/cri/server/container_stop.go
@@ -88,7 +88,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
 	}
 
 	exitCtx, exitCancel := context.WithCancel(context.Background())
-	stopCh := c.eventMonitor.startExitMonitor(exitCtx, id, task.Pid(), exitCh)
+	stopCh := c.eventMonitor.startContainerExitMonitor(exitCtx, id, task.Pid(), exitCh)
 	defer func() {
 		exitCancel()
 		// This ensures that exit monitor is stopped before
diff --git a/pkg/cri/server/events.go b/pkg/cri/server/events.go
index 0a1ecea01..8d66319ff 100644
--- a/pkg/cri/server/events.go
+++ b/pkg/cri/server/events.go
@@ -50,17 +50,12 @@ const (
 	// Add a timeout for each event handling, events that timeout will be requeued and
 	// handled again in the future.
 	handleEventTimeout = 10 * time.Second
-
-	exitChannelSize = 1024
 )
 
 // eventMonitor monitors containerd event and updates internal state correspondingly.
-// TODO(random-liu): Handle event for each container in a separate goroutine.
 type eventMonitor struct {
-	c      *criService
-	ch     <-chan *events.Envelope
-	// exitCh receives container/sandbox exit events from exit monitors.
-	exitCh chan *eventtypes.TaskExit
+	c     *criService
+	ch    <-chan *events.Envelope
 	errCh <-chan error
 	ctx    context.Context
 	cancel context.CancelFunc
@@ -68,6 +63,9 @@ type eventMonitor struct {
 }
 
 type backOff struct {
+	// queuePoolMu is the mutex used to protect the queuePool map
+	queuePoolMu sync.Mutex
+
 	queuePool map[string]*backOffQueue
 	// tickerMu is mutex used to protect the ticker.
 	tickerMu sync.Mutex
@@ -93,7 +91,6 @@ func newEventMonitor(c *criService) *eventMonitor {
 		c:       c,
 		ctx:     ctx,
 		cancel:  cancel,
-		exitCh:  make(chan *eventtypes.TaskExit, exitChannelSize),
 		backOff: newBackOff(),
 	}
 }
@@ -109,8 +106,8 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
 	em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
 }
 
-// startExitMonitor starts an exit monitor for a given container/sandbox.
-func (em *eventMonitor) startExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
+// startSandboxExitMonitor starts an exit monitor for a given sandbox.
+func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
 	stopCh := make(chan struct{})
 	go func() {
 		defer close(stopCh)
@@ -118,17 +115,93 @@ func (em *eventMonitor) startExitMonitor(ctx context.Context, id string, pid uin
 		case exitRes := <-exitCh:
 			exitStatus, exitedAt, err := exitRes.Result()
 			if err != nil {
-				logrus.WithError(err).Errorf("Failed to get task exit status for %q", id)
+				logrus.WithError(err).Errorf("failed to get task exit status for %q", id)
 				exitStatus = unknownExitCode
 				exitedAt = time.Now()
 			}
-			em.exitCh <- &eventtypes.TaskExit{
+
+			e := &eventtypes.TaskExit{
 				ContainerID: id,
 				ID:          id,
 				Pid:         pid,
 				ExitStatus:  exitStatus,
 				ExitedAt:    exitedAt,
 			}
+
+			logrus.Debugf("received exit event %+v", e)
+
+			err = func() error {
+				dctx := ctrdutil.NamespacedContext()
+				dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
+				defer dcancel()
+
+				sb, err := em.c.sandboxStore.Get(e.ID)
+				if err == nil {
+					if err := handleSandboxExit(dctx, e, sb); err != nil {
+						return err
+					}
+					return nil
+				} else if err != store.ErrNotExist {
+					return errors.Wrapf(err, "failed to get sandbox %s", e.ID)
+				}
+				return nil
+			}()
+			if err != nil {
+				logrus.WithError(err).Errorf("failed to handle sandbox TaskExit event %+v", e)
+				em.backOff.enBackOff(id, e)
+			}
+			return
+		case <-ctx.Done():
+		}
+	}()
+	return stopCh
+}
+
+// startContainerExitMonitor starts an exit monitor for a given container.
+func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
+	stopCh := make(chan struct{})
+	go func() {
+		defer close(stopCh)
+		select {
+		case exitRes := <-exitCh:
+			exitStatus, exitedAt, err := exitRes.Result()
+			if err != nil {
+				logrus.WithError(err).Errorf("failed to get task exit status for %q", id)
+				exitStatus = unknownExitCode
+				exitedAt = time.Now()
+			}
+
+			e := &eventtypes.TaskExit{
+				ContainerID: id,
+				ID:          id,
+				Pid:         pid,
+				ExitStatus:  exitStatus,
+				ExitedAt:    exitedAt,
+			}
+
+			logrus.Debugf("received exit event %+v", e)
+
+			err = func() error {
+				dctx := ctrdutil.NamespacedContext()
+				dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
+				defer dcancel()
+
+				cntr, err := em.c.containerStore.Get(e.ID)
+				if err == nil {
+					if err := handleContainerExit(dctx, e, cntr); err != nil {
+						return err
+					}
+					return nil
+				} else if err != store.ErrNotExist {
+					return errors.Wrapf(err, "failed to get container %s", e.ID)
+				}
+				return nil
+			}()
+			if err != nil {
+				logrus.WithError(err).Errorf("failed to handle container TaskExit event %+v", e)
+				em.backOff.enBackOff(id, e)
+			}
+			return
 		case <-ctx.Done():
 		}
 	}()
@@ -157,9 +230,16 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
 	return id, evt, nil
 }
 
-// start starts the event monitor which monitors and handles all subscribed events. It returns
-// an error channel for the caller to wait for stop errors from the event monitor.
-// start must be called after subscribe.
+// start starts the event monitor which monitors and handles all subscribed events.
+// It returns an error channel for the caller to wait for stop errors from the
+// event monitor.
+//
+// NOTE:
+// 1. start must be called after subscribe.
+// 2. The task exit event is handled first in its own startSandboxExitMonitor
+//    or startContainerExitMonitor goroutine. If that goroutine fails to handle
+//    the event, it puts the event into the backoff retry queue and the event
+//    monitor handles it later.
 func (em *eventMonitor) start() <-chan error {
 	errCh := make(chan error)
 	if em.ch == nil || em.errCh == nil {
@@ -170,18 +250,6 @@ func (em *eventMonitor) start() <-chan error {
 		defer close(errCh)
 		for {
 			select {
-			case e := <-em.exitCh:
-				logrus.Debugf("Received exit event %+v", e)
-				id := e.ID
-				if em.backOff.isInBackOff(id) {
-					logrus.Infof("Events for %q is in backoff, enqueue event %+v", id, e)
-					em.backOff.enBackOff(id, e)
-					break
-				}
-				if err := em.handleEvent(e); err != nil {
-					logrus.WithError(err).Errorf("Failed to handle exit event %+v for %s", e, id)
-					em.backOff.enBackOff(id, e)
-				}
 			case e := <-em.ch:
 				logrus.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
 				if e.Namespace != constants.K8sContainerdNamespace {
@@ -388,6 +456,9 @@ func newBackOff() *backOff {
 }
 
 func (b *backOff) getExpiredIDs() []string {
+	b.queuePoolMu.Lock()
+	defer b.queuePoolMu.Unlock()
+
 	var ids []string
 	for id, q := range b.queuePool {
 		if q.isExpire() {
@@ -398,6 +469,9 @@ func (b *backOff) getExpiredIDs() []string {
 }
 
 func (b *backOff) isInBackOff(key string) bool {
+	b.queuePoolMu.Lock()
+	defer b.queuePoolMu.Unlock()
+
 	if _, ok := b.queuePool[key]; ok {
 		return true
 	}
@@ -406,6 +480,9 @@ func (b *backOff) isInBackOff(key string) bool {
 
 // enBackOff starts to back off and puts the event at the tail of the queue
 func (b *backOff) enBackOff(key string, evt interface{}) {
+	b.queuePoolMu.Lock()
+	defer b.queuePoolMu.Unlock()
+
 	if queue, ok := b.queuePool[key]; ok {
 		queue.events = append(queue.events, evt)
 		return
@@ -415,6 +492,9 @@ func (b *backOff) enBackOff(key string, evt interface{}) {
 
 // deBackOff takes out the whole queue
 func (b *backOff) deBackOff(key string) *backOffQueue {
+	b.queuePoolMu.Lock()
+	defer b.queuePoolMu.Unlock()
+
 	queue := b.queuePool[key]
 	delete(b.queuePool, key)
 	return queue
@@ -422,6 +502,9 @@ func (b *backOff) deBackOff(key string) *backOffQueue {
 
 // reBackOff starts to back off again and puts events back into the queue
 func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) {
+	b.queuePoolMu.Lock()
+	defer b.queuePoolMu.Unlock()
+
 	duration := 2 * oldDuration
 	if duration > b.maxDuration {
 		duration = b.maxDuration
diff --git a/pkg/cri/server/restart.go b/pkg/cri/server/restart.go
index a839675c2..68e7e7ee8 100644
--- a/pkg/cri/server/restart.go
+++ b/pkg/cri/server/restart.go
@@ -290,7 +290,7 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
 			status.Reason = unknownExitReason
 		} else {
 			// Start exit monitor.
-			c.eventMonitor.startExitMonitor(context.Background(), id, status.Pid, exitCh)
+			c.eventMonitor.startContainerExitMonitor(context.Background(), id, status.Pid, exitCh)
 		}
 	case containerd.Stopped:
 		// Task is stopped. Update status and delete the task.
@@ -389,7 +389,7 @@ func (c *criService) loadSandbox(ctx context.Context, cntr containerd.Container)
 			// Task is running, set sandbox state as READY.
 			status.State = sandboxstore.StateReady
 			status.Pid = t.Pid()
-			c.eventMonitor.startExitMonitor(context.Background(), meta.ID, status.Pid, exitCh)
+			c.eventMonitor.startSandboxExitMonitor(context.Background(), meta.ID, status.Pid, exitCh)
 		}
 	} else {
 		// Task is not running. Delete the task and set sandbox state as NOTREADY.
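
The queuePoolMu additions above are what make the rest of the patch safe: previously only the single event-monitor goroutine touched queuePool, but now every per-task exit monitor goroutine can call enBackOff concurrently, and Go maps are not safe for concurrent access. A minimal, self-contained sketch of that locking pattern follows; it is not containerd code, and retryPool, enqueue, and drain are illustrative names standing in for backOff, enBackOff, and deBackOff.

package main

import (
	"fmt"
	"sync"
)

// retryPool mirrors the patched backOff type: a map of per-ID event queues
// guarded by one mutex. Illustrative, not containerd code.
type retryPool struct {
	mu   sync.Mutex
	pool map[string][]interface{}
}

// enqueue appends an event to the queue for id, like enBackOff taking
// queuePoolMu before touching queuePool.
func (p *retryPool) enqueue(id string, evt interface{}) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.pool[id] = append(p.pool[id], evt)
}

// drain removes and returns the whole queue for id, like deBackOff.
func (p *retryPool) drain(id string) []interface{} {
	p.mu.Lock()
	defer p.mu.Unlock()
	q := p.pool[id]
	delete(p.pool, id)
	return q
}

func main() {
	p := &retryPool{pool: map[string][]interface{}{}}

	// Many exit monitors enqueue concurrently; without the mutex this is a
	// data race on the map (`go run -race` would flag it).
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			p.enqueue("task-1", n)
		}(i)
	}
	wg.Wait()
	fmt.Printf("queued %d events for task-1\n", len(p.drain("task-1")))
}
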
diff --git a/pkg/cri/server/sandbox_run.go b/pkg/cri/server/sandbox_run.go
index 909122672..39ded4966 100644
--- a/pkg/cri/server/sandbox_run.go
+++ b/pkg/cri/server/sandbox_run.go
@@ -331,7 +331,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
 	//
 	// TaskOOM from containerd may come before sandbox is added to store,
 	// but we don't care about sandbox TaskOOM right now, so it is fine.
-	c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh)
+	c.eventMonitor.startSandboxExitMonitor(context.Background(), id, task.Pid(), exitCh)
 
 	return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
 }
diff --git a/pkg/cri/server/sandbox_stop.go b/pkg/cri/server/sandbox_stop.go
index cafceff13..273273094 100644
--- a/pkg/cri/server/sandbox_stop.go
+++ b/pkg/cri/server/sandbox_stop.go
@@ -134,7 +134,7 @@ func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxst
 	}
 
 	exitCtx, exitCancel := context.WithCancel(context.Background())
-	stopCh := c.eventMonitor.startExitMonitor(exitCtx, id, task.Pid(), exitCh)
+	stopCh := c.eventMonitor.startSandboxExitMonitor(exitCtx, id, task.Pid(), exitCh)
 	defer func() {
 		exitCancel()
 		// This ensures that exit monitor is stopped before
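
Taken together, the call-site renames above all converge on the same shape: each started task gets its own goroutine that waits for the exit status, handles it inline under a timeout, and only falls back to the retry queue when handling fails, returning a stop channel the stop paths can wait on. Below is a self-contained sketch of that per-task monitor pattern under stated assumptions: exitEvent, handle, and requeue are illustrative stand-ins, not containerd APIs.

package main

import (
	"context"
	"fmt"
	"time"
)

// exitEvent is an illustrative stand-in for eventtypes.TaskExit.
type exitEvent struct {
	id         string
	exitStatus uint32
}

// startExitMonitor returns a channel that is closed once the exit event has
// been observed and either handled or queued for retry, so stop paths can
// block on it, mirroring the stopCh the patched monitors return.
func startExitMonitor(ctx context.Context, id string, exitCh <-chan uint32,
	handle func(context.Context, exitEvent) error, requeue func(exitEvent)) <-chan struct{} {

	stopCh := make(chan struct{})
	go func() {
		defer close(stopCh)
		select {
		case status := <-exitCh:
			e := exitEvent{id: id, exitStatus: status}
			// Bound the inline handling, like handleEventTimeout in the patch.
			hctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()
			if err := handle(hctx, e); err != nil {
				// On failure, defer to the backoff queue; the event monitor's
				// ticker retries it later.
				requeue(e)
			}
		case <-ctx.Done():
		}
	}()
	return stopCh
}

func main() {
	exitCh := make(chan uint32, 1)
	exitCh <- 0

	stopCh := startExitMonitor(context.Background(), "task-1", exitCh,
		func(_ context.Context, e exitEvent) error {
			fmt.Printf("handled exit of %s (status %d)\n", e.id, e.exitStatus)
			return nil
		},
		func(e exitEvent) { fmt.Printf("requeued %s\n", e.id) },
	)
	<-stopCh
}
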