diff --git a/internal/cri/server/container_start.go b/internal/cri/server/container_start.go index 8207335a5..549b6dffc 100644 --- a/internal/cri/server/container_start.go +++ b/internal/cri/server/container_start.go @@ -23,16 +23,16 @@ import ( "io" "time" - containerd "github.com/containerd/containerd/v2/client" - containerdio "github.com/containerd/containerd/v2/pkg/cio" "github.com/containerd/errdefs" "github.com/containerd/log" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" + containerd "github.com/containerd/containerd/v2/client" cio "github.com/containerd/containerd/v2/internal/cri/io" containerstore "github.com/containerd/containerd/v2/internal/cri/store/container" sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox" ctrdutil "github.com/containerd/containerd/v2/internal/cri/util" + containerdio "github.com/containerd/containerd/v2/pkg/cio" cioutil "github.com/containerd/containerd/v2/pkg/ioutil" ) @@ -171,7 +171,7 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain } // It handles the TaskExit event and update container state after this. - c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh) + c.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh) c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_STARTED_EVENT) diff --git a/internal/cri/server/container_stop.go b/internal/cri/server/container_stop.go index ea667e061..4da35c9dc 100644 --- a/internal/cri/server/container_stop.go +++ b/internal/cri/server/container_stop.go @@ -89,7 +89,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore } // Don't return for unknown state, some cleanup needs to be done. if state == runtime.ContainerState_CONTAINER_UNKNOWN { - return cleanupUnknownContainer(ctx, id, container, sandboxID, c) + return c.cleanupUnknownContainer(ctx, id, container, sandboxID) } return nil } @@ -104,11 +104,11 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore if !errdefs.IsNotFound(err) { return fmt.Errorf("failed to wait for task for %q: %w", id, err) } - return cleanupUnknownContainer(ctx, id, container, sandboxID, c) + return c.cleanupUnknownContainer(ctx, id, container, sandboxID) } exitCtx, exitCancel := context.WithCancel(context.Background()) - stopCh := c.eventMonitor.startContainerExitMonitor(exitCtx, id, task.Pid(), exitCh) + stopCh := c.startContainerExitMonitor(exitCtx, id, task.Pid(), exitCh) defer func() { exitCancel() // This ensures that exit monitor is stopped before @@ -207,13 +207,13 @@ func (c *criService) waitContainerStop(ctx context.Context, container containers } // cleanupUnknownContainer cleanup stopped container in unknown state. -func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container, sandboxID string, c *criService) error { +func (c *criService) cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container, sandboxID string) error { // Reuse handleContainerExit to do the cleanup. 
- return handleContainerExit(ctx, &eventtypes.TaskExit{ + return c.handleContainerExit(ctx, &eventtypes.TaskExit{ ContainerID: id, ID: id, Pid: 0, ExitStatus: unknownExitCode, ExitedAt: protobuf.ToTimestamp(time.Now()), - }, cntr, sandboxID, c) + }, cntr, sandboxID) } diff --git a/internal/cri/server/events.go b/internal/cri/server/events.go index 49fa1d9a7..0cf860c38 100644 --- a/internal/cri/server/events.go +++ b/internal/cri/server/events.go @@ -18,34 +18,24 @@ package server import ( "context" - "errors" "fmt" - "sync" "time" + "github.com/containerd/errdefs" "github.com/containerd/log" - "github.com/containerd/typeurl/v2" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" - "k8s.io/utils/clock" eventtypes "github.com/containerd/containerd/v2/api/events" apitasks "github.com/containerd/containerd/v2/api/services/tasks/v1" containerd "github.com/containerd/containerd/v2/client" - "github.com/containerd/containerd/v2/core/events" - "github.com/containerd/containerd/v2/internal/cri/constants" containerstore "github.com/containerd/containerd/v2/internal/cri/store/container" sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox" ctrdutil "github.com/containerd/containerd/v2/internal/cri/util" containerdio "github.com/containerd/containerd/v2/pkg/cio" "github.com/containerd/containerd/v2/protobuf" - "github.com/containerd/errdefs" ) const ( - backOffInitDuration = 1 * time.Second - backOffMaxDuration = 5 * time.Minute - backOffExpireCheckDuration = 1 * time.Second - // handleEventTimeout is the timeout for handling 1 event. Event monitor // handles events in serial, if one event blocks the event monitor, no // other events can be handled. @@ -54,62 +44,8 @@ const ( handleEventTimeout = 10 * time.Second ) -// eventMonitor monitors containerd event and updates internal state correspondingly. -type eventMonitor struct { - c *criService - ch <-chan *events.Envelope - errCh <-chan error - ctx context.Context - cancel context.CancelFunc - backOff *backOff -} - -type backOff struct { - // queuePoolMu is mutex used to protect the queuePool map - queuePoolMu sync.Mutex - - queuePool map[string]*backOffQueue - // tickerMu is mutex used to protect the ticker. - tickerMu sync.Mutex - ticker *time.Ticker - minDuration time.Duration - maxDuration time.Duration - checkDuration time.Duration - clock clock.Clock -} - -type backOffQueue struct { - events []interface{} - expireTime time.Time - duration time.Duration - clock clock.Clock -} - -// Create new event monitor. New event monitor will start subscribing containerd event. All events -// happen after it should be monitored. -func newEventMonitor(c *criService) *eventMonitor { - ctx, cancel := context.WithCancel(context.Background()) - return &eventMonitor{ - c: c, - ctx: ctx, - cancel: cancel, - backOff: newBackOff(), - } -} - -// subscribe starts to subscribe containerd events. -func (em *eventMonitor) subscribe(subscriber events.Subscriber) { - // note: filters are any match, if you want any match but not in namespace foo - // then you have to manually filter namespace foo - filters := []string{ - `topic=="/tasks/oom"`, - `topic~="/images/"`, - } - em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...) -} - // startSandboxExitMonitor starts an exit monitor for a given sandbox. 
-func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, exitCh <-chan containerd.ExitStatus) <-chan struct{} { +func (c *criService) startSandboxExitMonitor(ctx context.Context, id string, exitCh <-chan containerd.ExitStatus) <-chan struct{} { stopCh := make(chan struct{}) go func() { defer close(stopCh) @@ -135,9 +71,9 @@ func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout) defer dcancel() - sb, err := em.c.sandboxStore.Get(e.GetSandboxID()) + sb, err := c.sandboxStore.Get(id) if err == nil { - if err := handleSandboxExit(dctx, sb, e.ExitStatus, e.ExitedAt.AsTime(), em.c); err != nil { + if err := c.handleSandboxExit(dctx, sb, exitStatus, exitedAt); err != nil { return err } return nil @@ -148,7 +84,7 @@ func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, }() if err != nil { log.L.WithError(err).Errorf("failed to handle sandbox TaskExit event %+v", e) - em.backOff.enBackOff(id, e) + c.eventMonitor.Backoff(id, e) } return case <-ctx.Done(): @@ -157,8 +93,26 @@ func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, return stopCh } +// handleSandboxExit handles sandbox exit event. +func (c *criService) handleSandboxExit(ctx context.Context, sb sandboxstore.Sandbox, exitStatus uint32, exitTime time.Time) error { + if err := sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { + status.State = sandboxstore.StateNotReady + status.Pid = 0 + status.ExitStatus = exitStatus + status.ExitedAt = exitTime + return status, nil + }); err != nil { + return fmt.Errorf("failed to update sandbox state: %w", err) + } + + // Using channel to propagate the information of sandbox stop + sb.Stop() + c.generateAndSendContainerEvent(ctx, sb.ID, sb.ID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT) + return nil +} + // startContainerExitMonitor starts an exit monitor for a given container. 
-func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} { +func (c *criService) startContainerExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} { stopCh := make(chan struct{}) go func() { defer close(stopCh) @@ -186,9 +140,9 @@ func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout) defer dcancel() - cntr, err := em.c.containerStore.Get(e.ID) + cntr, err := c.containerStore.Get(e.ID) if err == nil { - if err := handleContainerExit(dctx, e, cntr, cntr.SandboxID, em.c); err != nil { + if err := c.handleContainerExit(dctx, e, cntr, cntr.SandboxID); err != nil { return err } return nil @@ -199,7 +153,7 @@ func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string }() if err != nil { log.L.WithError(err).Errorf("failed to handle container TaskExit event %+v", e) - em.backOff.enBackOff(id, e) + c.eventMonitor.Backoff(id, e) } return case <-ctx.Done(): @@ -208,177 +162,8 @@ func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string return stopCh } -func convertEvent(e typeurl.Any) (string, interface{}, error) { - id := "" - evt, err := typeurl.UnmarshalAny(e) - if err != nil { - return "", nil, fmt.Errorf("failed to unmarshalany: %w", err) - } - - switch e := evt.(type) { - case *eventtypes.TaskOOM: - id = e.ContainerID - case *eventtypes.SandboxExit: - id = e.SandboxID - case *eventtypes.ImageCreate: - id = e.Name - case *eventtypes.ImageUpdate: - id = e.Name - case *eventtypes.ImageDelete: - id = e.Name - default: - return "", nil, errors.New("unsupported event") - } - return id, evt, nil -} - -// start starts the event monitor which monitors and handles all subscribed events. -// It returns an error channel for the caller to wait for stop errors from the -// event monitor. -// -// NOTE: -// 1. start must be called after subscribe. -// 2. The task exit event has been handled in individual startSandboxExitMonitor -// or startContainerExitMonitor goroutine at the first. If the goroutine fails, -// it puts the event into backoff retry queue and event monitor will handle -// it later. -func (em *eventMonitor) start() <-chan error { - errCh := make(chan error) - if em.ch == nil || em.errCh == nil { - panic("event channel is nil") - } - backOffCheckCh := em.backOff.start() - go func() { - defer close(errCh) - for { - select { - case e := <-em.ch: - log.L.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic) - if e.Namespace != constants.K8sContainerdNamespace { - log.L.Debugf("Ignoring events in namespace - %q", e.Namespace) - break - } - id, evt, err := convertEvent(e.Event) - if err != nil { - log.L.WithError(err).Errorf("Failed to convert event %+v", e) - break - } - if em.backOff.isInBackOff(id) { - log.L.Infof("Events for %q is in backoff, enqueue event %+v", id, evt) - em.backOff.enBackOff(id, evt) - break - } - if err := em.handleEvent(evt); err != nil { - log.L.WithError(err).Errorf("Failed to handle event %+v for %s", evt, id) - em.backOff.enBackOff(id, evt) - } - case err := <-em.errCh: - // Close errCh in defer directly if there is no error. 
- if err != nil { - log.L.WithError(err).Error("Failed to handle event stream") - errCh <- err - } - return - case <-backOffCheckCh: - ids := em.backOff.getExpiredIDs() - for _, id := range ids { - queue := em.backOff.deBackOff(id) - for i, evt := range queue.events { - if err := em.handleEvent(evt); err != nil { - log.L.WithError(err).Errorf("Failed to handle backOff event %+v for %s", evt, id) - em.backOff.reBackOff(id, queue.events[i:], queue.duration) - break - } - } - } - } - } - }() - return errCh -} - -// stop stops the event monitor. It will close the event channel. -// Once event monitor is stopped, it can't be started. -func (em *eventMonitor) stop() { - em.backOff.stop() - em.cancel() -} - -// handleEvent handles a containerd event. -func (em *eventMonitor) handleEvent(any interface{}) error { - ctx := ctrdutil.NamespacedContext() - ctx, cancel := context.WithTimeout(ctx, handleEventTimeout) - defer cancel() - - switch e := any.(type) { - case *eventtypes.TaskExit: - log.L.Infof("TaskExit event %+v", e) - // Use ID instead of ContainerID to rule out TaskExit event for exec. - cntr, err := em.c.containerStore.Get(e.ID) - if err == nil { - if err := handleContainerExit(ctx, e, cntr, cntr.SandboxID, em.c); err != nil { - return fmt.Errorf("failed to handle container TaskExit event: %w", err) - } - return nil - } else if !errdefs.IsNotFound(err) { - return fmt.Errorf("can't find container for TaskExit event: %w", err) - } - sb, err := em.c.sandboxStore.Get(e.ID) - if err == nil { - if err := handleSandboxExit(ctx, sb, e.ExitStatus, e.ExitedAt.AsTime(), em.c); err != nil { - return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err) - } - return nil - } else if !errdefs.IsNotFound(err) { - return fmt.Errorf("can't find sandbox for TaskExit event: %w", err) - } - return nil - case *eventtypes.SandboxExit: - log.L.Infof("SandboxExit event %+v", e) - sb, err := em.c.sandboxStore.Get(e.GetSandboxID()) - if err == nil { - if err := handleSandboxExit(ctx, sb, e.ExitStatus, e.ExitedAt.AsTime(), em.c); err != nil { - return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err) - } - return nil - } else if !errdefs.IsNotFound(err) { - return fmt.Errorf("can't find sandbox for TaskExit event: %w", err) - } - return nil - case *eventtypes.TaskOOM: - log.L.Infof("TaskOOM event %+v", e) - // For TaskOOM, we only care which container it belongs to. - cntr, err := em.c.containerStore.Get(e.ContainerID) - if err != nil { - if !errdefs.IsNotFound(err) { - return fmt.Errorf("can't find container for TaskOOM event: %w", err) - } - return nil - } - err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) { - status.Reason = oomExitReason - return status, nil - }) - if err != nil { - return fmt.Errorf("failed to update container status for TaskOOM event: %w", err) - } - // TODO: ImageService should handle these events directly - case *eventtypes.ImageCreate: - log.L.Infof("ImageCreate event %+v", e) - return em.c.UpdateImage(ctx, e.Name) - case *eventtypes.ImageUpdate: - log.L.Infof("ImageUpdate event %+v", e) - return em.c.UpdateImage(ctx, e.Name) - case *eventtypes.ImageDelete: - log.L.Infof("ImageDelete event %+v", e) - return em.c.UpdateImage(ctx, e.Name) - } - - return nil -} - // handleContainerExit handles TaskExit event for container. 
-func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container, sandboxID string, c *criService) error { +func (c *criService) handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container, sandboxID string) error { // Attach container IO so that `Delete` could cleanup the stream properly. task, err := cntr.Container.Task(ctx, func(*containerdio.FIFOSet) (containerdio.IO, error) { @@ -426,7 +211,7 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta // ErrNotFound. If we don't delete the shim instance in io.containerd.service.v1.tasks-service, // shim will be leaky. // // Based on containerd/containerd#7496 issue, when host is under IO // pressure, the umount2 syscall will take more than 10 seconds so that // the CRI plugin will cancel this task.Delete call. However, the shim // server isn't aware about this. After return from umount2 syscall, the @@ -476,116 +261,78 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta return nil } -// handleSandboxExit handles sandbox exit event. -func handleSandboxExit(ctx context.Context, sb sandboxstore.Sandbox, exitStatus uint32, exitTime time.Time, c *criService) error { - if err := sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { - status.State = sandboxstore.StateNotReady - status.Pid = 0 - status.ExitStatus = exitStatus - status.ExitedAt = exitTime - return status, nil - }); err != nil { - return fmt.Errorf("failed to update sandbox state: %w", err) +type criEventHandler struct { + c *criService +} + +// HandleEvent handles a containerd event. +func (ce *criEventHandler) HandleEvent(any interface{}) error { + ctx := ctrdutil.NamespacedContext() + ctx, cancel := context.WithTimeout(ctx, handleEventTimeout) + defer cancel() + + switch e := any.(type) { + case *eventtypes.TaskExit: + log.L.Infof("TaskExit event %+v", e) + // Use ID instead of ContainerID to rule out TaskExit event for exec. + cntr, err := ce.c.containerStore.Get(e.ID) + if err == nil { + if err := ce.c.handleContainerExit(ctx, e, cntr, cntr.SandboxID); err != nil { + return fmt.Errorf("failed to handle container TaskExit event: %w", err) + } + return nil + } else if !errdefs.IsNotFound(err) { + return fmt.Errorf("can't find container for TaskExit event: %w", err) + } + sb, err := ce.c.sandboxStore.Get(e.ID) + if err == nil { + if err := ce.c.handleSandboxExit(ctx, sb, e.ExitStatus, e.ExitedAt.AsTime()); err != nil { + return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err) + } + return nil + } else if !errdefs.IsNotFound(err) { + return fmt.Errorf("can't find sandbox for TaskExit event: %w", err) + } + return nil + case *eventtypes.SandboxExit: + log.L.Infof("SandboxExit event %+v", e) + sb, err := ce.c.sandboxStore.Get(e.GetSandboxID()) + if err == nil { + if err := ce.c.handleSandboxExit(ctx, sb, e.ExitStatus, e.ExitedAt.AsTime()); err != nil { + return fmt.Errorf("failed to handle SandboxExit event: %w", err) + } + return nil + } else if !errdefs.IsNotFound(err) { + return fmt.Errorf("can't find sandbox for SandboxExit event: %w", err) + } + return nil + case *eventtypes.TaskOOM: + log.L.Infof("TaskOOM event %+v", e) + // For TaskOOM, we only care which container it belongs to.
+ cntr, err := ce.c.containerStore.Get(e.ContainerID) + if err != nil { + if !errdefs.IsNotFound(err) { + return fmt.Errorf("can't find container for TaskOOM event: %w", err) + } + return nil + } + err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) { + status.Reason = oomExitReason + return status, nil + }) + if err != nil { + return fmt.Errorf("failed to update container status for TaskOOM event: %w", err) + } + case *eventtypes.ImageCreate: + log.L.Infof("ImageCreate event %+v", e) + return ce.c.UpdateImage(ctx, e.Name) + case *eventtypes.ImageUpdate: + log.L.Infof("ImageUpdate event %+v", e) + return ce.c.UpdateImage(ctx, e.Name) + case *eventtypes.ImageDelete: + log.L.Infof("ImageDelete event %+v", e) + return ce.c.UpdateImage(ctx, e.Name) } - // Using channel to propagate the information of sandbox stop - sb.Stop() - c.generateAndSendContainerEvent(ctx, sb.ID, sb.ID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT) return nil } - -func newBackOff() *backOff { - return &backOff{ - queuePool: map[string]*backOffQueue{}, - minDuration: backOffInitDuration, - maxDuration: backOffMaxDuration, - checkDuration: backOffExpireCheckDuration, - clock: clock.RealClock{}, - } -} - -func (b *backOff) getExpiredIDs() []string { - b.queuePoolMu.Lock() - defer b.queuePoolMu.Unlock() - - var ids []string - for id, q := range b.queuePool { - if q.isExpire() { - ids = append(ids, id) - } - } - return ids -} - -func (b *backOff) isInBackOff(key string) bool { - b.queuePoolMu.Lock() - defer b.queuePoolMu.Unlock() - - if _, ok := b.queuePool[key]; ok { - return true - } - return false -} - -// enBackOff start to backOff and put event to the tail of queue -func (b *backOff) enBackOff(key string, evt interface{}) { - b.queuePoolMu.Lock() - defer b.queuePoolMu.Unlock() - - if queue, ok := b.queuePool[key]; ok { - queue.events = append(queue.events, evt) - return - } - b.queuePool[key] = newBackOffQueue([]interface{}{evt}, b.minDuration, b.clock) -} - -// enBackOff get out the whole queue -func (b *backOff) deBackOff(key string) *backOffQueue { - b.queuePoolMu.Lock() - defer b.queuePoolMu.Unlock() - - queue := b.queuePool[key] - delete(b.queuePool, key) - return queue -} - -// enBackOff start to backOff again and put events to the queue -func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) { - b.queuePoolMu.Lock() - defer b.queuePoolMu.Unlock() - - duration := 2 * oldDuration - if duration > b.maxDuration { - duration = b.maxDuration - } - b.queuePool[key] = newBackOffQueue(events, duration, b.clock) -} - -func (b *backOff) start() <-chan time.Time { - b.tickerMu.Lock() - defer b.tickerMu.Unlock() - b.ticker = time.NewTicker(b.checkDuration) - return b.ticker.C -} - -func (b *backOff) stop() { - b.tickerMu.Lock() - defer b.tickerMu.Unlock() - if b.ticker != nil { - b.ticker.Stop() - } -} - -func newBackOffQueue(events []interface{}, init time.Duration, c clock.Clock) *backOffQueue { - return &backOffQueue{ - events: events, - duration: init, - expireTime: c.Now().Add(init), - clock: c, - } -} - -func (q *backOffQueue) isExpire() bool { - // return time.Now >= expireTime - return !q.clock.Now().Before(q.expireTime) -} diff --git a/internal/cri/server/events/events.go b/internal/cri/server/events/events.go new file mode 100644 index 000000000..01a01c4e3 --- /dev/null +++ b/internal/cri/server/events/events.go @@ -0,0 +1,289 @@ +/* + Copyright The containerd Authors. 
+ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/containerd/log" + "github.com/containerd/typeurl/v2" + "k8s.io/utils/clock" + + eventtypes "github.com/containerd/containerd/v2/api/events" + "github.com/containerd/containerd/v2/core/events" + "github.com/containerd/containerd/v2/internal/cri/constants" +) + +const ( + backOffInitDuration = 1 * time.Second + backOffMaxDuration = 5 * time.Minute + backOffExpireCheckDuration = 1 * time.Second +) + +type EventHandler interface { + HandleEvent(any interface{}) error +} + +// EventMonitor monitors containerd events and updates internal state correspondingly. +type EventMonitor struct { + ch <-chan *events.Envelope + errCh <-chan error + ctx context.Context + cancel context.CancelFunc + backOff *backOff + eventHandler EventHandler +} + +type backOff struct { + // queuePoolMu is mutex used to protect the queuePool map + queuePoolMu sync.Mutex + + queuePool map[string]*backOffQueue + // tickerMu is mutex used to protect the ticker. + tickerMu sync.Mutex + ticker *time.Ticker + minDuration time.Duration + maxDuration time.Duration + checkDuration time.Duration + clock clock.Clock +} + +type backOffQueue struct { + events []interface{} + expireTime time.Time + duration time.Duration + clock clock.Clock +} + +// NewEventMonitor creates a new event monitor. A new event monitor will start subscribing to containerd events. All events +// that happen after it is created should be monitored. +func NewEventMonitor(eventHandler EventHandler) *EventMonitor { + ctx, cancel := context.WithCancel(context.Background()) + return &EventMonitor{ + ctx: ctx, + cancel: cancel, + backOff: newBackOff(), + eventHandler: eventHandler, + } +} + +// Subscribe starts subscribing to containerd events. +func (em *EventMonitor) Subscribe(subscriber events.Subscriber, filters []string) { + em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...) +} + +func convertEvent(e typeurl.Any) (string, interface{}, error) { + id := "" + evt, err := typeurl.UnmarshalAny(e) + if err != nil { + return "", nil, fmt.Errorf("failed to unmarshal any: %w", err) + } + + switch e := evt.(type) { + case *eventtypes.TaskOOM: + id = e.ContainerID + case *eventtypes.SandboxExit: + id = e.SandboxID + case *eventtypes.ImageCreate: + id = e.Name + case *eventtypes.ImageUpdate: + id = e.Name + case *eventtypes.ImageDelete: + id = e.Name + case *eventtypes.TaskExit: + id = e.ContainerID + default: + return "", nil, errors.New("unsupported event") + } + return id, evt, nil +} + +// Start starts the event monitor which monitors and handles all subscribed events. +// It returns an error channel for the caller to wait for stop errors from the +// event monitor. +// +// NOTE: +// 1. Start must be called after Subscribe. +// 2. The task exit event is handled first in an individual startSandboxExitMonitor +// or startContainerExitMonitor goroutine.
If the goroutine fails, +// it puts the event into backoff retry queue and event monitor will handle +// it later. +func (em *EventMonitor) Start() <-chan error { + errCh := make(chan error) + if em.ch == nil || em.errCh == nil { + panic("event channel is nil") + } + backOffCheckCh := em.backOff.start() + go func() { + defer close(errCh) + for { + select { + case e := <-em.ch: + log.L.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic) + if e.Namespace != constants.K8sContainerdNamespace { + log.L.Debugf("Ignoring events in namespace - %q", e.Namespace) + break + } + id, evt, err := convertEvent(e.Event) + if err != nil { + log.L.WithError(err).Errorf("Failed to convert event %+v", e) + break + } + if em.backOff.isInBackOff(id) { + log.L.Infof("Events for %q is in backoff, enqueue event %+v", id, evt) + em.backOff.enBackOff(id, evt) + break + } + if err := em.eventHandler.HandleEvent(evt); err != nil { + log.L.WithError(err).Errorf("Failed to handle event %+v for %s", evt, id) + em.backOff.enBackOff(id, evt) + } + case err := <-em.errCh: + // Close errCh in defer directly if there is no error. + if err != nil { + log.L.WithError(err).Error("Failed to handle event stream") + errCh <- err + } + return + case <-backOffCheckCh: + ids := em.backOff.getExpiredIDs() + for _, id := range ids { + queue := em.backOff.deBackOff(id) + for i, evt := range queue.events { + if err := em.eventHandler.HandleEvent(evt); err != nil { + log.L.WithError(err).Errorf("Failed to handle backOff event %+v for %s", evt, id) + em.backOff.reBackOff(id, queue.events[i:], queue.duration) + break + } + } + } + } + } + }() + return errCh +} + +func (em *EventMonitor) Backoff(key string, evt interface{}) { + em.backOff.enBackOff(key, evt) +} + +// Stop stops the event monitor. It will close the event channel. +// Once event monitor is stopped, it can't be started. 
+func (em *EventMonitor) Stop() { + em.backOff.stop() + em.cancel() +} + +func newBackOff() *backOff { + return &backOff{ + queuePool: map[string]*backOffQueue{}, + minDuration: backOffInitDuration, + maxDuration: backOffMaxDuration, + checkDuration: backOffExpireCheckDuration, + clock: clock.RealClock{}, + } +} + +func (b *backOff) getExpiredIDs() []string { + b.queuePoolMu.Lock() + defer b.queuePoolMu.Unlock() + + var ids []string + for id, q := range b.queuePool { + if q.isExpire() { + ids = append(ids, id) + } + } + return ids +} + +func (b *backOff) isInBackOff(key string) bool { + b.queuePoolMu.Lock() + defer b.queuePoolMu.Unlock() + + if _, ok := b.queuePool[key]; ok { + return true + } + return false +} + +// enBackOff starts a backoff and puts the event at the tail of the queue +func (b *backOff) enBackOff(key string, evt interface{}) { + b.queuePoolMu.Lock() + defer b.queuePoolMu.Unlock() + + if queue, ok := b.queuePool[key]; ok { + queue.events = append(queue.events, evt) + return + } + b.queuePool[key] = newBackOffQueue([]interface{}{evt}, b.minDuration, b.clock) +} + +// deBackOff takes out and returns the whole queue +func (b *backOff) deBackOff(key string) *backOffQueue { + b.queuePoolMu.Lock() + defer b.queuePoolMu.Unlock() + + queue := b.queuePool[key] + delete(b.queuePool, key) + return queue +} + +// reBackOff starts the backoff again with a doubled duration and requeues the events +func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) { + b.queuePoolMu.Lock() + defer b.queuePoolMu.Unlock() + + duration := 2 * oldDuration + if duration > b.maxDuration { + duration = b.maxDuration + } + b.queuePool[key] = newBackOffQueue(events, duration, b.clock) +} + +func (b *backOff) start() <-chan time.Time { + b.tickerMu.Lock() + defer b.tickerMu.Unlock() + b.ticker = time.NewTicker(b.checkDuration) + return b.ticker.C +} + +func (b *backOff) stop() { + b.tickerMu.Lock() + defer b.tickerMu.Unlock() + if b.ticker != nil { + b.ticker.Stop() + } +} + +func newBackOffQueue(events []interface{}, init time.Duration, c clock.Clock) *backOffQueue { + return &backOffQueue{ + events: events, + duration: init, + expireTime: c.Now().Add(init), + clock: c, + } +} + +func (q *backOffQueue) isExpire() bool { + // return time.Now >= expireTime + return !q.clock.Now().Before(q.expireTime) +} diff --git a/internal/cri/server/events_test.go b/internal/cri/server/events/events_test.go similarity index 99% rename from internal/cri/server/events_test.go rename to internal/cri/server/events/events_test.go index 43fd2f576..f74f0c07e 100644 --- a/internal/cri/server/events_test.go +++ b/internal/cri/server/events/events_test.go @@ -14,7 +14,7 @@ limitations under the License.
*/ -package server +package events import ( "testing" diff --git a/internal/cri/server/podsandbox/controller.go b/internal/cri/server/podsandbox/controller.go index b68b06433..35ce184f6 100644 --- a/internal/cri/server/podsandbox/controller.go +++ b/internal/cri/server/podsandbox/controller.go @@ -31,6 +31,7 @@ import ( "github.com/containerd/containerd/v2/core/sandbox" criconfig "github.com/containerd/containerd/v2/internal/cri/config" "github.com/containerd/containerd/v2/internal/cri/constants" + "github.com/containerd/containerd/v2/internal/cri/server/events" "github.com/containerd/containerd/v2/internal/cri/server/podsandbox/types" imagestore "github.com/containerd/containerd/v2/internal/cri/store/image" ctrdutil "github.com/containerd/containerd/v2/internal/cri/util" @@ -85,18 +86,19 @@ func init() { imageService: criImagePlugin.(ImageService), store: NewStore(), } + + eventMonitor := events.NewEventMonitor(&podSandboxEventHandler{ + controller: &c, + }) + eventMonitor.Subscribe(client, []string{`topic=="/tasks/exit"`}) + eventMonitor.Start() + c.eventMonitor = eventMonitor + return &c, nil }, }) } -// CRIService interface contains things required by controller, but not yet refactored from criService. -// TODO: this will be removed in subsequent iterations. -type CRIService interface { - // TODO: we should implement Event backoff in Controller. - BackOffEvent(id string, event interface{}) -} - // RuntimeService specifies dependencies to CRI runtime service. type RuntimeService interface { Config() criconfig.Config @@ -123,18 +125,13 @@ type Controller struct { imageService ImageService // os is an interface for all required os operations. os osinterface.OS - // cri is CRI service that provides missing gaps needed by controller. - cri CRIService + // eventMonitor is the event monitor for the podsandbox controller to handle sandbox task exit events. + // Currently we only use its backoff mechanism to make sure the pause container is cleaned up. + eventMonitor *events.EventMonitor store *Store } -func (c *Controller) Init( - cri CRIService, -) { - c.cri = cri -} - var _ sandbox.Controller = (*Controller)(nil) func (c *Controller) Platform(_ctx context.Context, _sandboxID string) (platforms.Platform, error) { @@ -172,11 +169,7 @@ func (c *Controller) waitSandboxExit(ctx context.Context, p *types.PodSandbox, e defer dcancel() event := &eventtypes.TaskExit{ExitStatus: exitStatus, ExitedAt: protobuf.ToTimestamp(exitedAt)} if err := handleSandboxTaskExit(dctx, p, event); err != nil { - // TODO will backoff the event to the controller's own EventMonitor, but not cri's, - // because we should call handleSandboxTaskExit again the next time - // eventMonitor handle this event. but now it goes into cri's EventMonitor, - // the handleSandboxTaskExit will not be called anymore - c.cri.BackOffEvent(p.ID, e) + c.eventMonitor.Backoff(p.ID, event) } return nil case <-ctx.Done(): diff --git a/internal/cri/server/podsandbox/events.go b/internal/cri/server/podsandbox/events.go new file mode 100644 index 000000000..9cff891d5 --- /dev/null +++ b/internal/cri/server/podsandbox/events.go @@ -0,0 +1,61 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package podsandbox + +import ( + "context" + "fmt" + "time" + + "github.com/containerd/log" + + eventtypes "github.com/containerd/containerd/v2/api/events" + ctrdutil "github.com/containerd/containerd/v2/internal/cri/util" +) + +const ( + // handleEventTimeout is the timeout for handling one event. The event monitor + // handles events serially; if one event blocks the event monitor, no + // other events can be handled. + // Add a timeout for each event handling; events that time out will be requeued and + // handled again in the future. + handleEventTimeout = 10 * time.Second +) + +type podSandboxEventHandler struct { + controller *Controller +} + +func (p *podSandboxEventHandler) HandleEvent(any interface{}) error { + switch e := any.(type) { + case *eventtypes.TaskExit: + log.L.Infof("TaskExit event in podsandbox handler %+v", e) + // Use ID instead of ContainerID to rule out TaskExit event for exec. + sb := p.controller.store.Get(e.ID) + if sb == nil { + return nil + } + ctx := ctrdutil.NamespacedContext() + ctx, cancel := context.WithTimeout(ctx, handleEventTimeout) + defer cancel() + if err := handleSandboxTaskExit(ctx, sb, e); err != nil { + return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err) + } + return nil + } + return nil +} diff --git a/internal/cri/server/podsandbox/helpers.go b/internal/cri/server/podsandbox/helpers.go index 3a0c376b6..8354f6c8c 100644 --- a/internal/cri/server/podsandbox/helpers.go +++ b/internal/cri/server/podsandbox/helpers.go @@ -21,7 +21,6 @@ import ( "fmt" "path" "path/filepath" - "time" "github.com/containerd/log" "github.com/containerd/typeurl/v2" @@ -54,10 +53,6 @@ const ( unknownExitCode = 255 ) -const ( - handleEventTimeout = 10 * time.Second -) - // getSandboxRootDir returns the root directory for managing sandbox files, // e.g. hosts files. func (c *Controller) getSandboxRootDir(id string) string { diff --git a/internal/cri/server/podsandbox/sandbox_stop.go b/internal/cri/server/podsandbox/sandbox_stop.go index be5e892c0..fef1022ec 100644 --- a/internal/cri/server/podsandbox/sandbox_stop.go +++ b/internal/cri/server/podsandbox/sandbox_stop.go @@ -95,8 +95,8 @@ func (c *Controller) stopSandboxContainer(ctx context.Context, podSandbox *types go func() { defer close(stopCh) err := c.waitSandboxExit(exitCtx, podSandbox, exitCh) - if err != nil { - log.G(ctx).WithError(err).Errorf("Failed to wait pod sandbox exit %+v", err) + if err != nil && err != context.Canceled && err != context.DeadlineExceeded { + log.G(ctx).WithError(err).Errorf("Failed to wait sandbox exit %+v", err) } }() defer func() { diff --git a/internal/cri/server/restart.go b/internal/cri/server/restart.go index 49b0c5832..3ac3d8032 100644 --- a/internal/cri/server/restart.go +++ b/internal/cri/server/restart.go @@ -157,7 +157,7 @@ func (c *criService) recover(ctx context.Context) error { log.G(ctx).WithError(err).Error("failed to wait sandbox") continue } - c.eventMonitor.startSandboxExitMonitor(context.Background(), sb.ID, exitCh) + c.startSandboxExitMonitor(context.Background(), sb.ID, exitCh) } // Recover all containers.
containers, err := c.client.Containers(ctx, filterLabel(crilabels.ContainerKindLabel, crilabels.ContainerKindContainer)) @@ -387,7 +387,7 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe status.Reason = unknownExitReason } else { // Start exit monitor. - c.eventMonitor.startContainerExitMonitor(context.Background(), id, status.Pid, exitCh) + c.startContainerExitMonitor(context.Background(), id, status.Pid, exitCh) } case containerd.Stopped: // Task is stopped. Update status and delete the task. diff --git a/internal/cri/server/sandbox_run.go b/internal/cri/server/sandbox_run.go index f01cc4ef9..7da4a30b9 100644 --- a/internal/cri/server/sandbox_run.go +++ b/internal/cri/server/sandbox_run.go @@ -406,7 +406,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox // // TaskOOM from containerd may come before sandbox is added to store, // but we don't care about sandbox TaskOOM right now, so it is fine. - c.eventMonitor.startSandboxExitMonitor(context.Background(), id, exitCh) + c.startSandboxExitMonitor(context.Background(), id, exitCh) // Send CONTAINER_STARTED event with ContainerId equal to SandboxId. c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT) diff --git a/internal/cri/server/service.go b/internal/cri/server/service.go index 9e4ba10b3..a379b4a03 100644 --- a/internal/cri/server/service.go +++ b/internal/cri/server/service.go @@ -42,7 +42,7 @@ import ( "github.com/containerd/containerd/v2/internal/cri/config" criconfig "github.com/containerd/containerd/v2/internal/cri/config" "github.com/containerd/containerd/v2/internal/cri/nri" - "github.com/containerd/containerd/v2/internal/cri/server/podsandbox" + "github.com/containerd/containerd/v2/internal/cri/server/events" containerstore "github.com/containerd/containerd/v2/internal/cri/store/container" imagestore "github.com/containerd/containerd/v2/internal/cri/store/image" "github.com/containerd/containerd/v2/internal/cri/store/label" @@ -137,7 +137,7 @@ type criService struct { // streamServer is the streaming server serves container streaming request. streamServer streaming.Server // eventMonitor is the monitor monitors containerd events. - eventMonitor *eventMonitor + eventMonitor *events.EventMonitor // initialized indicates whether the server is initialized. All GRPC services // should return error before the server is initialized. initialized atomic.Bool @@ -218,7 +218,7 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi return nil, nil, fmt.Errorf("failed to create stream server: %w", err) } - c.eventMonitor = newEventMonitor(c) + c.eventMonitor = events.NewEventMonitor(&criEventHandler{c: c}) c.cniNetConfMonitor = make(map[string]*cniNetConfSyncer) for name, i := range c.netPlugin { @@ -237,10 +237,6 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi } } - // Initialize pod sandbox controller - podSandboxController := options.SandboxControllers[string(criconfig.ModePodSandbox)].(*podsandbox.Controller) - podSandboxController.Init(c) - c.nri = options.NRI c.runtimeHandlers, err = c.introspectRuntimeHandlers(ctx) @@ -251,16 +247,12 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi return c, c, nil } -// BackOffEvent is a temporary workaround to call eventMonitor from controller.Stop. -// TODO: get rid of this. 
-func (c *criService) BackOffEvent(id string, event interface{}) { - c.eventMonitor.backOff.enBackOff(id, event) -} - // Run starts the CRI service. func (c *criService) Run(ready func()) error { log.L.Info("Start subscribing containerd event") - c.eventMonitor.subscribe(c.client) + // Note: filters are OR'ed (any match); if you want any match except namespace foo, + // you have to filter out namespace foo manually. + c.eventMonitor.Subscribe(c.client, []string{`topic=="/tasks/oom"`, `topic~="/images/"`}) log.L.Infof("Start recovering state") if err := c.recover(ctrdutil.NamespacedContext()); err != nil { @@ -269,7 +261,7 @@ func (c *criService) Run(ready func()) error { // Start event handler. log.L.Info("Start event monitor") - eventMonitorErrCh := c.eventMonitor.start() + eventMonitorErrCh := c.eventMonitor.Start() // Start CNI network conf syncers cniNetConfMonitorErrCh := make(chan error, len(c.cniNetConfMonitor)) @@ -355,7 +347,7 @@ func (c *criService) Close() error { log.L.WithError(err).Errorf("failed to stop cni network conf monitor for %s", name) } } - c.eventMonitor.stop() + c.eventMonitor.Stop() if err := c.streamServer.Stop(); err != nil { return fmt.Errorf("failed to stop stream server: %w", err) }
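
For reference, a minimal sketch of the lifecycle the extracted events package expects from a caller, mirroring the wiring above in NewCRIService and the podsandbox controller's init: implement EventHandler, construct the monitor, Subscribe before Start, and Stop on shutdown. The loggingHandler type and runMonitor function are hypothetical illustrations, not part of this change; the containerd client is assumed to satisfy events.Subscriber, and since the events package is internal it only compiles inside the containerd tree.

package example

import (
	"github.com/containerd/log"

	containerd "github.com/containerd/containerd/v2/client"
	"github.com/containerd/containerd/v2/internal/cri/server/events"
)

// loggingHandler is a hypothetical EventHandler; the real implementations in
// this change are criEventHandler and podSandboxEventHandler.
type loggingHandler struct{}

func (h *loggingHandler) HandleEvent(evt interface{}) error {
	log.L.Infof("event %+v", evt)
	return nil
}

func runMonitor(client *containerd.Client) <-chan error {
	em := events.NewEventMonitor(&loggingHandler{})
	// Subscribe must be called before Start; filters are OR'ed together.
	em.Subscribe(client, []string{`topic=="/tasks/exit"`})
	// Start returns an error channel that reports a broken event stream;
	// the caller should eventually call em.Stop().
	return em.Start()
}

A handler failure can also be retried later by handing the event back via em.Backoff(id, evt), which is exactly what the exit monitors above do.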
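The constants backOffInitDuration (1s) and backOffMaxDuration (5m), together with reBackOff's doubling, imply the retry cadence an ID sees while its events keep failing. A standalone sketch (plain Go, no containerd imports) that just prints that schedule:

package main

import (
	"fmt"
	"time"
)

// Reproduces the backoff schedule implied by backOffInitDuration,
// reBackOff's doubling, and backOffMaxDuration: 1s, 2s, 4s, ... capped at 5m.
func main() {
	const (
		initDur = 1 * time.Second
		maxDur  = 5 * time.Minute
	)
	d := initDur
	for i := 0; i < 12; i++ {
		fmt.Printf("retry %2d after %v\n", i+1, d)
		d *= 2
		if d > maxDur {
			d = maxDur
		}
	}
	// The delay doubles from 1s through 256s, then stays at 5m0s
	// from retry 10 onward.
}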
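Both startContainerExitMonitor and startSandboxExitMonitor above share one shape: a goroutine waits on the task's exit channel, handles the exit under a bounded context, and closes a stop channel so callers (like stopContainer's deferred exitCancel() followed by <-stopCh) can be sure handling finished before cleanup proceeds. A self-contained sketch of that shape, with int standing in for containerd.ExitStatus and the backoff enqueue reduced to a print:

package main

import (
	"context"
	"fmt"
	"time"
)

// startExitMonitor mimics the monitors in this diff: it returns a channel
// that is closed once the exit has been handled (or the context is done).
func startExitMonitor(ctx context.Context, exitCh <-chan int, handle func(int) error) <-chan struct{} {
	stopCh := make(chan struct{})
	go func() {
		defer close(stopCh)
		select {
		case code := <-exitCh:
			if err := handle(code); err != nil {
				// The real monitors enqueue the event via Backoff here.
				fmt.Println("handle failed, would enqueue into backoff:", err)
			}
		case <-ctx.Done():
		}
	}()
	return stopCh
}

func main() {
	exitCh := make(chan int, 1)
	exitCh <- 0
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	// The deferred cancel plus the <-stopCh below mirrors stopContainer's
	// exitCancel() followed by <-stopCh: no handler is still running
	// against a container that is about to be cleaned up.
	defer cancel()
	stopCh := startExitMonitor(ctx, exitCh, func(code int) error {
		fmt.Println("container exited with", code)
		return nil
	})
	<-stopCh
}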