sandbox: make event monitor in CRI independent

Signed-off-by: Abel Feng <fshb1988@gmail.com>
Abel Feng 2023-10-23 17:16:09 +08:00
parent 9a2b85561a
commit d0da3d1caf
12 changed files with 486 additions and 409 deletions
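
The change below moves the CRI event monitor out of criService and into a new internal/cri/server/events package that dispatches through an EventHandler interface, so the CRI service and the podsandbox controller can each plug in their own handler (and the podsandbox controller no longer needs the CRIService back-reference for backoff). A condensed sketch of the resulting wiring, pulled from the hunks below (illustrative only, not a complete file):

// The monitor is generic and only knows about EventHandler.
type EventHandler interface {
	HandleEvent(any interface{}) error
}

// CRI side: criService registers its handler when the service is built.
c.eventMonitor = events.NewEventMonitor(&criEventHandler{c: c})
c.eventMonitor.Subscribe(c.client, []string{`topic=="/tasks/oom"`, `topic~="/images/"`})

// podsandbox side: the controller owns its own monitor, used mainly to back off
// and retry pause-container cleanup on task exit.
eventMonitor := events.NewEventMonitor(&podSandboxEventHandler{controller: &c})
eventMonitor.Subscribe(client, []string{`topic=="/tasks/exit"`})
eventMonitor.Start()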

View File

@ -23,16 +23,16 @@ import (
"io"
"time"
containerd "github.com/containerd/containerd/v2/client"
containerdio "github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/errdefs"
"github.com/containerd/log"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
containerd "github.com/containerd/containerd/v2/client"
cio "github.com/containerd/containerd/v2/internal/cri/io"
containerstore "github.com/containerd/containerd/v2/internal/cri/store/container"
sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/v2/internal/cri/util"
containerdio "github.com/containerd/containerd/v2/pkg/cio"
cioutil "github.com/containerd/containerd/v2/pkg/ioutil"
)
@ -171,7 +171,7 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
}
// It handles the TaskExit event and update container state after this.
c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh)
c.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh)
c.generateAndSendContainerEvent(ctx, id, sandboxID, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)

View File

@ -89,7 +89,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
}
// Don't return for unknown state, some cleanup needs to be done.
if state == runtime.ContainerState_CONTAINER_UNKNOWN {
return cleanupUnknownContainer(ctx, id, container, sandboxID, c)
return c.cleanupUnknownContainer(ctx, id, container, sandboxID)
}
return nil
}
@ -104,11 +104,11 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to wait for task for %q: %w", id, err)
}
return cleanupUnknownContainer(ctx, id, container, sandboxID, c)
return c.cleanupUnknownContainer(ctx, id, container, sandboxID)
}
exitCtx, exitCancel := context.WithCancel(context.Background())
stopCh := c.eventMonitor.startContainerExitMonitor(exitCtx, id, task.Pid(), exitCh)
stopCh := c.startContainerExitMonitor(exitCtx, id, task.Pid(), exitCh)
defer func() {
exitCancel()
// This ensures that exit monitor is stopped before
@ -207,13 +207,13 @@ func (c *criService) waitContainerStop(ctx context.Context, container containers
}
// cleanupUnknownContainer cleanup stopped container in unknown state.
func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container, sandboxID string, c *criService) error {
func (c *criService) cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container, sandboxID string) error {
// Reuse handleContainerExit to do the cleanup.
return handleContainerExit(ctx, &eventtypes.TaskExit{
return c.handleContainerExit(ctx, &eventtypes.TaskExit{
ContainerID: id,
ID: id,
Pid: 0,
ExitStatus: unknownExitCode,
ExitedAt: protobuf.ToTimestamp(time.Now()),
}, cntr, sandboxID, c)
}, cntr, sandboxID)
}

View File

@ -18,34 +18,24 @@ package server
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/containerd/typeurl/v2"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/utils/clock"
eventtypes "github.com/containerd/containerd/v2/api/events"
apitasks "github.com/containerd/containerd/v2/api/services/tasks/v1"
containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/internal/cri/constants"
containerstore "github.com/containerd/containerd/v2/internal/cri/store/container"
sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/v2/internal/cri/util"
containerdio "github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/containerd/v2/protobuf"
"github.com/containerd/errdefs"
)
const (
backOffInitDuration = 1 * time.Second
backOffMaxDuration = 5 * time.Minute
backOffExpireCheckDuration = 1 * time.Second
// handleEventTimeout is the timeout for handling 1 event. Event monitor
// handles events in serial, if one event blocks the event monitor, no
// other events can be handled.
@ -54,62 +44,8 @@ const (
handleEventTimeout = 10 * time.Second
)
// eventMonitor monitors containerd event and updates internal state correspondingly.
type eventMonitor struct {
c *criService
ch <-chan *events.Envelope
errCh <-chan error
ctx context.Context
cancel context.CancelFunc
backOff *backOff
}
type backOff struct {
// queuePoolMu is mutex used to protect the queuePool map
queuePoolMu sync.Mutex
queuePool map[string]*backOffQueue
// tickerMu is mutex used to protect the ticker.
tickerMu sync.Mutex
ticker *time.Ticker
minDuration time.Duration
maxDuration time.Duration
checkDuration time.Duration
clock clock.Clock
}
type backOffQueue struct {
events []interface{}
expireTime time.Time
duration time.Duration
clock clock.Clock
}
// Create new event monitor. New event monitor will start subscribing containerd event. All events
// happen after it should be monitored.
func newEventMonitor(c *criService) *eventMonitor {
ctx, cancel := context.WithCancel(context.Background())
return &eventMonitor{
c: c,
ctx: ctx,
cancel: cancel,
backOff: newBackOff(),
}
}
// subscribe starts to subscribe containerd events.
func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
// note: filters are any match, if you want any match but not in namespace foo
// then you have to manually filter namespace foo
filters := []string{
`topic=="/tasks/oom"`,
`topic~="/images/"`,
}
em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
}
// startSandboxExitMonitor starts an exit monitor for a given sandbox.
func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
func (c *criService) startSandboxExitMonitor(ctx context.Context, id string, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
stopCh := make(chan struct{})
go func() {
defer close(stopCh)
@ -135,9 +71,9 @@ func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string,
dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
defer dcancel()
sb, err := em.c.sandboxStore.Get(e.GetSandboxID())
sb, err := c.sandboxStore.Get(id)
if err == nil {
if err := handleSandboxExit(dctx, sb, e.ExitStatus, e.ExitedAt.AsTime(), em.c); err != nil {
if err := c.handleSandboxExit(dctx, sb, exitStatus, exitedAt); err != nil {
return err
}
return nil
@ -148,7 +84,7 @@ func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string,
}()
if err != nil {
log.L.WithError(err).Errorf("failed to handle sandbox TaskExit event %+v", e)
em.backOff.enBackOff(id, e)
c.eventMonitor.Backoff(id, e)
}
return
case <-ctx.Done():
@ -157,8 +93,26 @@ func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string,
return stopCh
}
// handleSandboxExit handles sandbox exit event.
func (c *criService) handleSandboxExit(ctx context.Context, sb sandboxstore.Sandbox, exitStatus uint32, exitTime time.Time) error {
if err := sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
status.State = sandboxstore.StateNotReady
status.Pid = 0
status.ExitStatus = exitStatus
status.ExitedAt = exitTime
return status, nil
}); err != nil {
return fmt.Errorf("failed to update sandbox state: %w", err)
}
// Using channel to propagate the information of sandbox stop
sb.Stop()
c.generateAndSendContainerEvent(ctx, sb.ID, sb.ID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT)
return nil
}
// startContainerExitMonitor starts an exit monitor for a given container.
func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
func (c *criService) startContainerExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
stopCh := make(chan struct{})
go func() {
defer close(stopCh)
@ -186,9 +140,9 @@ func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string
dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
defer dcancel()
cntr, err := em.c.containerStore.Get(e.ID)
cntr, err := c.containerStore.Get(e.ID)
if err == nil {
if err := handleContainerExit(dctx, e, cntr, cntr.SandboxID, em.c); err != nil {
if err := c.handleContainerExit(dctx, e, cntr, cntr.SandboxID); err != nil {
return err
}
return nil
@ -199,7 +153,7 @@ func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string
}()
if err != nil {
log.L.WithError(err).Errorf("failed to handle container TaskExit event %+v", e)
em.backOff.enBackOff(id, e)
c.eventMonitor.Backoff(id, e)
}
return
case <-ctx.Done():
@ -208,177 +162,8 @@ func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string
return stopCh
}
func convertEvent(e typeurl.Any) (string, interface{}, error) {
id := ""
evt, err := typeurl.UnmarshalAny(e)
if err != nil {
return "", nil, fmt.Errorf("failed to unmarshalany: %w", err)
}
switch e := evt.(type) {
case *eventtypes.TaskOOM:
id = e.ContainerID
case *eventtypes.SandboxExit:
id = e.SandboxID
case *eventtypes.ImageCreate:
id = e.Name
case *eventtypes.ImageUpdate:
id = e.Name
case *eventtypes.ImageDelete:
id = e.Name
default:
return "", nil, errors.New("unsupported event")
}
return id, evt, nil
}
// start starts the event monitor which monitors and handles all subscribed events.
// It returns an error channel for the caller to wait for stop errors from the
// event monitor.
//
// NOTE:
// 1. start must be called after subscribe.
// 2. The task exit event has been handled in individual startSandboxExitMonitor
// or startContainerExitMonitor goroutine at the first. If the goroutine fails,
// it puts the event into backoff retry queue and event monitor will handle
// it later.
func (em *eventMonitor) start() <-chan error {
errCh := make(chan error)
if em.ch == nil || em.errCh == nil {
panic("event channel is nil")
}
backOffCheckCh := em.backOff.start()
go func() {
defer close(errCh)
for {
select {
case e := <-em.ch:
log.L.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
if e.Namespace != constants.K8sContainerdNamespace {
log.L.Debugf("Ignoring events in namespace - %q", e.Namespace)
break
}
id, evt, err := convertEvent(e.Event)
if err != nil {
log.L.WithError(err).Errorf("Failed to convert event %+v", e)
break
}
if em.backOff.isInBackOff(id) {
log.L.Infof("Events for %q is in backoff, enqueue event %+v", id, evt)
em.backOff.enBackOff(id, evt)
break
}
if err := em.handleEvent(evt); err != nil {
log.L.WithError(err).Errorf("Failed to handle event %+v for %s", evt, id)
em.backOff.enBackOff(id, evt)
}
case err := <-em.errCh:
// Close errCh in defer directly if there is no error.
if err != nil {
log.L.WithError(err).Error("Failed to handle event stream")
errCh <- err
}
return
case <-backOffCheckCh:
ids := em.backOff.getExpiredIDs()
for _, id := range ids {
queue := em.backOff.deBackOff(id)
for i, evt := range queue.events {
if err := em.handleEvent(evt); err != nil {
log.L.WithError(err).Errorf("Failed to handle backOff event %+v for %s", evt, id)
em.backOff.reBackOff(id, queue.events[i:], queue.duration)
break
}
}
}
}
}
}()
return errCh
}
// stop stops the event monitor. It will close the event channel.
// Once event monitor is stopped, it can't be started.
func (em *eventMonitor) stop() {
em.backOff.stop()
em.cancel()
}
// handleEvent handles a containerd event.
func (em *eventMonitor) handleEvent(any interface{}) error {
ctx := ctrdutil.NamespacedContext()
ctx, cancel := context.WithTimeout(ctx, handleEventTimeout)
defer cancel()
switch e := any.(type) {
case *eventtypes.TaskExit:
log.L.Infof("TaskExit event %+v", e)
// Use ID instead of ContainerID to rule out TaskExit event for exec.
cntr, err := em.c.containerStore.Get(e.ID)
if err == nil {
if err := handleContainerExit(ctx, e, cntr, cntr.SandboxID, em.c); err != nil {
return fmt.Errorf("failed to handle container TaskExit event: %w", err)
}
return nil
} else if !errdefs.IsNotFound(err) {
return fmt.Errorf("can't find container for TaskExit event: %w", err)
}
sb, err := em.c.sandboxStore.Get(e.ID)
if err == nil {
if err := handleSandboxExit(ctx, sb, e.ExitStatus, e.ExitedAt.AsTime(), em.c); err != nil {
return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err)
}
return nil
} else if !errdefs.IsNotFound(err) {
return fmt.Errorf("can't find sandbox for TaskExit event: %w", err)
}
return nil
case *eventtypes.SandboxExit:
log.L.Infof("SandboxExit event %+v", e)
sb, err := em.c.sandboxStore.Get(e.GetSandboxID())
if err == nil {
if err := handleSandboxExit(ctx, sb, e.ExitStatus, e.ExitedAt.AsTime(), em.c); err != nil {
return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err)
}
return nil
} else if !errdefs.IsNotFound(err) {
return fmt.Errorf("can't find sandbox for TaskExit event: %w", err)
}
return nil
case *eventtypes.TaskOOM:
log.L.Infof("TaskOOM event %+v", e)
// For TaskOOM, we only care which container it belongs to.
cntr, err := em.c.containerStore.Get(e.ContainerID)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("can't find container for TaskOOM event: %w", err)
}
return nil
}
err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
status.Reason = oomExitReason
return status, nil
})
if err != nil {
return fmt.Errorf("failed to update container status for TaskOOM event: %w", err)
}
// TODO: ImageService should handle these events directly
case *eventtypes.ImageCreate:
log.L.Infof("ImageCreate event %+v", e)
return em.c.UpdateImage(ctx, e.Name)
case *eventtypes.ImageUpdate:
log.L.Infof("ImageUpdate event %+v", e)
return em.c.UpdateImage(ctx, e.Name)
case *eventtypes.ImageDelete:
log.L.Infof("ImageDelete event %+v", e)
return em.c.UpdateImage(ctx, e.Name)
}
return nil
}
// handleContainerExit handles TaskExit event for container.
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container, sandboxID string, c *criService) error {
func (c *criService) handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container, sandboxID string) error {
// Attach container IO so that `Delete` could cleanup the stream properly.
task, err := cntr.Container.Task(ctx,
func(*containerdio.FIFOSet) (containerdio.IO, error) {
@ -426,7 +211,7 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
// ErrNotFound. If we don't delete the shim instance in io.containerd.service.v1.tasks-service,
// shim will be leaky.
//
// Based on containerd/containerd#7496 issue, when host is under IO
// Based on containerd/containerd/v2#7496 issue, when host is under IO
// pressure, the umount2 syscall will take more than 10 seconds so that
// the CRI plugin will cancel this task.Delete call. However, the shim
// server isn't aware about this. After return from umount2 syscall, the
@ -476,116 +261,78 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
return nil
}
// handleSandboxExit handles sandbox exit event.
func handleSandboxExit(ctx context.Context, sb sandboxstore.Sandbox, exitStatus uint32, exitTime time.Time, c *criService) error {
if err := sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
status.State = sandboxstore.StateNotReady
status.Pid = 0
status.ExitStatus = exitStatus
status.ExitedAt = exitTime
return status, nil
}); err != nil {
return fmt.Errorf("failed to update sandbox state: %w", err)
type criEventHandler struct {
c *criService
}
// HandleEvent handles a containerd event.
func (ce *criEventHandler) HandleEvent(any interface{}) error {
ctx := ctrdutil.NamespacedContext()
ctx, cancel := context.WithTimeout(ctx, handleEventTimeout)
defer cancel()
switch e := any.(type) {
case *eventtypes.TaskExit:
log.L.Infof("TaskExit event %+v", e)
// Use ID instead of ContainerID to rule out TaskExit event for exec.
cntr, err := ce.c.containerStore.Get(e.ID)
if err == nil {
if err := ce.c.handleContainerExit(ctx, e, cntr, cntr.SandboxID); err != nil {
return fmt.Errorf("failed to handle container TaskExit event: %w", err)
}
return nil
} else if !errdefs.IsNotFound(err) {
return fmt.Errorf("can't find container for TaskExit event: %w", err)
}
sb, err := ce.c.sandboxStore.Get(e.ID)
if err == nil {
if err := ce.c.handleSandboxExit(ctx, sb, e.ExitStatus, e.ExitedAt.AsTime()); err != nil {
return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err)
}
return nil
} else if !errdefs.IsNotFound(err) {
return fmt.Errorf("can't find sandbox for TaskExit event: %w", err)
}
return nil
case *eventtypes.SandboxExit:
log.L.Infof("SandboxExit event %+v", e)
sb, err := ce.c.sandboxStore.Get(e.GetSandboxID())
if err == nil {
if err := ce.c.handleSandboxExit(ctx, sb, e.ExitStatus, e.ExitedAt.AsTime()); err != nil {
return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err)
}
return nil
} else if !errdefs.IsNotFound(err) {
return fmt.Errorf("can't find sandbox for TaskExit event: %w", err)
}
return nil
case *eventtypes.TaskOOM:
log.L.Infof("TaskOOM event %+v", e)
// For TaskOOM, we only care which container it belongs to.
cntr, err := ce.c.containerStore.Get(e.ContainerID)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("can't find container for TaskOOM event: %w", err)
}
return nil
}
err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
status.Reason = oomExitReason
return status, nil
})
if err != nil {
return fmt.Errorf("failed to update container status for TaskOOM event: %w", err)
}
case *eventtypes.ImageCreate:
log.L.Infof("ImageCreate event %+v", e)
return ce.c.UpdateImage(ctx, e.Name)
case *eventtypes.ImageUpdate:
log.L.Infof("ImageUpdate event %+v", e)
return ce.c.UpdateImage(ctx, e.Name)
case *eventtypes.ImageDelete:
log.L.Infof("ImageDelete event %+v", e)
return ce.c.UpdateImage(ctx, e.Name)
}
// Using channel to propagate the information of sandbox stop
sb.Stop()
c.generateAndSendContainerEvent(ctx, sb.ID, sb.ID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT)
return nil
}
func newBackOff() *backOff {
return &backOff{
queuePool: map[string]*backOffQueue{},
minDuration: backOffInitDuration,
maxDuration: backOffMaxDuration,
checkDuration: backOffExpireCheckDuration,
clock: clock.RealClock{},
}
}
func (b *backOff) getExpiredIDs() []string {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()
var ids []string
for id, q := range b.queuePool {
if q.isExpire() {
ids = append(ids, id)
}
}
return ids
}
func (b *backOff) isInBackOff(key string) bool {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()
if _, ok := b.queuePool[key]; ok {
return true
}
return false
}
// enBackOff start to backOff and put event to the tail of queue
func (b *backOff) enBackOff(key string, evt interface{}) {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()
if queue, ok := b.queuePool[key]; ok {
queue.events = append(queue.events, evt)
return
}
b.queuePool[key] = newBackOffQueue([]interface{}{evt}, b.minDuration, b.clock)
}
// enBackOff get out the whole queue
func (b *backOff) deBackOff(key string) *backOffQueue {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()
queue := b.queuePool[key]
delete(b.queuePool, key)
return queue
}
// enBackOff start to backOff again and put events to the queue
func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()
duration := 2 * oldDuration
if duration > b.maxDuration {
duration = b.maxDuration
}
b.queuePool[key] = newBackOffQueue(events, duration, b.clock)
}
func (b *backOff) start() <-chan time.Time {
b.tickerMu.Lock()
defer b.tickerMu.Unlock()
b.ticker = time.NewTicker(b.checkDuration)
return b.ticker.C
}
func (b *backOff) stop() {
b.tickerMu.Lock()
defer b.tickerMu.Unlock()
if b.ticker != nil {
b.ticker.Stop()
}
}
func newBackOffQueue(events []interface{}, init time.Duration, c clock.Clock) *backOffQueue {
return &backOffQueue{
events: events,
duration: init,
expireTime: c.Now().Add(init),
clock: c,
}
}
func (q *backOffQueue) isExpire() bool {
// return time.Now >= expireTime
return !q.clock.Now().Before(q.expireTime)
}

View File

@ -0,0 +1,289 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/containerd/log"
"github.com/containerd/typeurl/v2"
"k8s.io/utils/clock"
eventtypes "github.com/containerd/containerd/v2/api/events"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/internal/cri/constants"
)
const (
backOffInitDuration = 1 * time.Second
backOffMaxDuration = 5 * time.Minute
backOffExpireCheckDuration = 1 * time.Second
)
type EventHandler interface {
HandleEvent(any interface{}) error
}
// EventMonitor monitors containerd events and updates internal state accordingly.
type EventMonitor struct {
ch <-chan *events.Envelope
errCh <-chan error
ctx context.Context
cancel context.CancelFunc
backOff *backOff
eventHandler EventHandler
}
type backOff struct {
// queuePoolMu is mutex used to protect the queuePool map
queuePoolMu sync.Mutex
queuePool map[string]*backOffQueue
// tickerMu is mutex used to protect the ticker.
tickerMu sync.Mutex
ticker *time.Ticker
minDuration time.Duration
maxDuration time.Duration
checkDuration time.Duration
clock clock.Clock
}
type backOffQueue struct {
events []interface{}
expireTime time.Time
duration time.Duration
clock clock.Clock
}
// NewEventMonitor creates a new event monitor. The new event monitor will start subscribing to containerd events.
// All events that happen after it is created should be monitored.
func NewEventMonitor(eventHandler EventHandler) *EventMonitor {
ctx, cancel := context.WithCancel(context.Background())
return &EventMonitor{
ctx: ctx,
cancel: cancel,
backOff: newBackOff(),
eventHandler: eventHandler,
}
}
// Subscribe starts subscribing to containerd events.
func (em *EventMonitor) Subscribe(subscriber events.Subscriber, filters []string) {
em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
}
func convertEvent(e typeurl.Any) (string, interface{}, error) {
id := ""
evt, err := typeurl.UnmarshalAny(e)
if err != nil {
return "", nil, fmt.Errorf("failed to unmarshalany: %w", err)
}
switch e := evt.(type) {
case *eventtypes.TaskOOM:
id = e.ContainerID
case *eventtypes.SandboxExit:
id = e.SandboxID
case *eventtypes.ImageCreate:
id = e.Name
case *eventtypes.ImageUpdate:
id = e.Name
case *eventtypes.ImageDelete:
id = e.Name
case *eventtypes.TaskExit:
id = e.ContainerID
default:
return "", nil, errors.New("unsupported event")
}
return id, evt, nil
}
// Start starts the event monitor which monitors and handles all subscribed events.
// It returns an error channel for the caller to wait for stop errors from the
// event monitor.
//
// NOTE:
// 1. Start must be called after Subscribe.
// 2. The task exit event is handled first in the individual startSandboxExitMonitor
// or startContainerExitMonitor goroutine. If that goroutine fails, it puts the
// event into the backoff retry queue and the event monitor will handle it later.
func (em *EventMonitor) Start() <-chan error {
errCh := make(chan error)
if em.ch == nil || em.errCh == nil {
panic("event channel is nil")
}
backOffCheckCh := em.backOff.start()
go func() {
defer close(errCh)
for {
select {
case e := <-em.ch:
log.L.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
if e.Namespace != constants.K8sContainerdNamespace {
log.L.Debugf("Ignoring events in namespace - %q", e.Namespace)
break
}
id, evt, err := convertEvent(e.Event)
if err != nil {
log.L.WithError(err).Errorf("Failed to convert event %+v", e)
break
}
if em.backOff.isInBackOff(id) {
log.L.Infof("Events for %q is in backoff, enqueue event %+v", id, evt)
em.backOff.enBackOff(id, evt)
break
}
if err := em.eventHandler.HandleEvent(evt); err != nil {
log.L.WithError(err).Errorf("Failed to handle event %+v for %s", evt, id)
em.backOff.enBackOff(id, evt)
}
case err := <-em.errCh:
// Close errCh in defer directly if there is no error.
if err != nil {
log.L.WithError(err).Error("Failed to handle event stream")
errCh <- err
}
return
case <-backOffCheckCh:
ids := em.backOff.getExpiredIDs()
for _, id := range ids {
queue := em.backOff.deBackOff(id)
for i, evt := range queue.events {
if err := em.eventHandler.HandleEvent(evt); err != nil {
log.L.WithError(err).Errorf("Failed to handle backOff event %+v for %s", evt, id)
em.backOff.reBackOff(id, queue.events[i:], queue.duration)
break
}
}
}
}
}
}()
return errCh
}
func (em *EventMonitor) Backoff(key string, evt interface{}) {
em.backOff.enBackOff(key, evt)
}
// Stop stops the event monitor. It will close the event channel.
// Once event monitor is stopped, it can't be started.
func (em *EventMonitor) Stop() {
em.backOff.stop()
em.cancel()
}
func newBackOff() *backOff {
return &backOff{
queuePool: map[string]*backOffQueue{},
minDuration: backOffInitDuration,
maxDuration: backOffMaxDuration,
checkDuration: backOffExpireCheckDuration,
clock: clock.RealClock{},
}
}
func (b *backOff) getExpiredIDs() []string {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()
var ids []string
for id, q := range b.queuePool {
if q.isExpire() {
ids = append(ids, id)
}
}
return ids
}
func (b *backOff) isInBackOff(key string) bool {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()
if _, ok := b.queuePool[key]; ok {
return true
}
return false
}
// enBackOff starts the backoff for key and appends the event to the tail of its queue
func (b *backOff) enBackOff(key string, evt interface{}) {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()
if queue, ok := b.queuePool[key]; ok {
queue.events = append(queue.events, evt)
return
}
b.queuePool[key] = newBackOffQueue([]interface{}{evt}, b.minDuration, b.clock)
}
// deBackOff takes out the whole queue for key
func (b *backOff) deBackOff(key string) *backOffQueue {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()
queue := b.queuePool[key]
delete(b.queuePool, key)
return queue
}
// reBackOff starts the backoff again and puts the events back into the queue
func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()
duration := 2 * oldDuration
if duration > b.maxDuration {
duration = b.maxDuration
}
b.queuePool[key] = newBackOffQueue(events, duration, b.clock)
}
func (b *backOff) start() <-chan time.Time {
b.tickerMu.Lock()
defer b.tickerMu.Unlock()
b.ticker = time.NewTicker(b.checkDuration)
return b.ticker.C
}
func (b *backOff) stop() {
b.tickerMu.Lock()
defer b.tickerMu.Unlock()
if b.ticker != nil {
b.ticker.Stop()
}
}
func newBackOffQueue(events []interface{}, init time.Duration, c clock.Clock) *backOffQueue {
return &backOffQueue{
events: events,
duration: init,
expireTime: c.Now().Add(init),
clock: c,
}
}
func (q *backOffQueue) isExpire() bool {
// return time.Now >= expireTime
return !q.clock.Now().Before(q.expireTime)
}
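
For reference, the lifecycle of the new EventMonitor as exercised by criService later in this commit is: construct it with a handler, Subscribe with filters, Start to obtain an error channel, and Stop on shutdown; failed per-ID handling can be requeued with Backoff and is retried with exponential backoff (1s doubling up to 5m, checked every second). A minimal in-package sketch under those assumptions; the wrapper function below is hypothetical and not part of this commit:

// runEventMonitor is a hypothetical helper showing the intended call order.
func runEventMonitor(subscriber events.Subscriber, handler EventHandler) (*EventMonitor, <-chan error) {
	em := NewEventMonitor(handler)
	// Filters are any-match; Subscribe must be called before Start.
	em.Subscribe(subscriber, []string{`topic=="/tasks/oom"`, `topic~="/images/"`})
	errCh := em.Start()
	// The caller waits on errCh and calls em.Stop() on shutdown; failed
	// handling can be requeued with em.Backoff(id, evt) and is retried
	// once its backoff window (1s, doubling up to 5m) expires.
	return em, errCh
}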

View File

@ -14,7 +14,7 @@
limitations under the License.
*/
package server
package events
import (
"testing"

View File

@ -31,6 +31,7 @@ import (
"github.com/containerd/containerd/v2/core/sandbox"
criconfig "github.com/containerd/containerd/v2/internal/cri/config"
"github.com/containerd/containerd/v2/internal/cri/constants"
"github.com/containerd/containerd/v2/internal/cri/server/events"
"github.com/containerd/containerd/v2/internal/cri/server/podsandbox/types"
imagestore "github.com/containerd/containerd/v2/internal/cri/store/image"
ctrdutil "github.com/containerd/containerd/v2/internal/cri/util"
@ -85,18 +86,19 @@ func init() {
imageService: criImagePlugin.(ImageService),
store: NewStore(),
}
eventMonitor := events.NewEventMonitor(&podSandboxEventHandler{
controller: &c,
})
eventMonitor.Subscribe(client, []string{`topic=="/tasks/exit"`})
eventMonitor.Start()
c.eventMonitor = eventMonitor
return &c, nil
},
})
}
// CRIService interface contains things required by controller, but not yet refactored from criService.
// TODO: this will be removed in subsequent iterations.
type CRIService interface {
// TODO: we should implement Event backoff in Controller.
BackOffEvent(id string, event interface{})
}
// RuntimeService specifies dependencies to CRI runtime service.
type RuntimeService interface {
Config() criconfig.Config
@ -123,18 +125,13 @@ type Controller struct {
imageService ImageService
// os is an interface for all required os operations.
os osinterface.OS
// cri is CRI service that provides missing gaps needed by controller.
cri CRIService
// eventMonitor is the event monitor for the podsandbox controller to handle sandbox task exit events;
// actually we only use its backoff mechanism to make sure the pause container is cleaned up.
eventMonitor *events.EventMonitor
store *Store
}
func (c *Controller) Init(
cri CRIService,
) {
c.cri = cri
}
var _ sandbox.Controller = (*Controller)(nil)
func (c *Controller) Platform(_ctx context.Context, _sandboxID string) (platforms.Platform, error) {
@ -172,11 +169,7 @@ func (c *Controller) waitSandboxExit(ctx context.Context, p *types.PodSandbox, e
defer dcancel()
event := &eventtypes.TaskExit{ExitStatus: exitStatus, ExitedAt: protobuf.ToTimestamp(exitedAt)}
if err := handleSandboxTaskExit(dctx, p, event); err != nil {
// TODO will backoff the event to the controller's own EventMonitor, but not cri's,
// because we should call handleSandboxTaskExit again the next time
// eventMonitor handle this event. but now it goes into cri's EventMonitor,
// the handleSandboxTaskExit will not be called anymore
c.cri.BackOffEvent(p.ID, e)
c.eventMonitor.Backoff(p.ID, event)
}
return nil
case <-ctx.Done():

View File

@ -0,0 +1,61 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podsandbox
import (
"context"
"fmt"
"time"
"github.com/containerd/log"
eventtypes "github.com/containerd/containerd/v2/api/events"
ctrdutil "github.com/containerd/containerd/v2/internal/cri/util"
)
const (
// handleEventTimeout is the timeout for handling one event. The event monitor
// handles events serially; if one event blocks the event monitor, no
// other events can be handled.
// Add a timeout for each event handling; events that time out will be requeued and
// handled again in the future.
handleEventTimeout = 10 * time.Second
)
type podSandboxEventHandler struct {
controller *Controller
}
func (p *podSandboxEventHandler) HandleEvent(any interface{}) error {
switch e := any.(type) {
case *eventtypes.TaskExit:
log.L.Infof("TaskExit event in podsandbox handler %+v", e)
// Use ID instead of ContainerID to rule out TaskExit event for exec.
sb := p.controller.store.Get(e.ID)
if sb == nil {
return nil
}
ctx := ctrdutil.NamespacedContext()
ctx, cancel := context.WithTimeout(ctx, handleEventTimeout)
defer cancel()
if err := handleSandboxTaskExit(ctx, sb, e); err != nil {
return fmt.Errorf("failed to handle container TaskExit event: %w", err)
}
return nil
}
return nil
}
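
The handler above closes the retry loop started in waitSandboxExit: when handleSandboxTaskExit fails, the controller enqueues the TaskExit event with its own monitor's Backoff, and once the backoff entry expires the monitor's Start loop hands the event back to podSandboxEventHandler.HandleEvent, which calls handleSandboxTaskExit again. A condensed sketch of that round trip, taken from the hunks in this commit (error paths trimmed):

// In Controller.waitSandboxExit: on failure, requeue for the monitor to retry.
event := &eventtypes.TaskExit{ExitStatus: exitStatus, ExitedAt: protobuf.ToTimestamp(exitedAt)}
if err := handleSandboxTaskExit(dctx, p, event); err != nil {
	c.eventMonitor.Backoff(p.ID, event)
}

// Later, EventMonitor.Start drains expired backoff queues and calls the
// registered handler, so HandleEvent above looks up the sandbox by e.ID and
// gets another chance to clean up the pause container.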

View File

@ -21,7 +21,6 @@ import (
"fmt"
"path"
"path/filepath"
"time"
"github.com/containerd/log"
"github.com/containerd/typeurl/v2"
@ -54,10 +53,6 @@ const (
unknownExitCode = 255
)
const (
handleEventTimeout = 10 * time.Second
)
// getSandboxRootDir returns the root directory for managing sandbox files,
// e.g. hosts files.
func (c *Controller) getSandboxRootDir(id string) string {

View File

@ -95,8 +95,8 @@ func (c *Controller) stopSandboxContainer(ctx context.Context, podSandbox *types
go func() {
defer close(stopCh)
err := c.waitSandboxExit(exitCtx, podSandbox, exitCh)
if err != nil {
log.G(ctx).WithError(err).Errorf("Failed to wait pod sandbox exit %+v", err)
if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
log.G(ctx).WithError(err).Errorf("Failed to wait sandbox exit %+v", err)
}
}()
defer func() {

View File

@ -157,7 +157,7 @@ func (c *criService) recover(ctx context.Context) error {
log.G(ctx).WithError(err).Error("failed to wait sandbox")
continue
}
c.eventMonitor.startSandboxExitMonitor(context.Background(), sb.ID, exitCh)
c.startSandboxExitMonitor(context.Background(), sb.ID, exitCh)
}
// Recover all containers.
containers, err := c.client.Containers(ctx, filterLabel(crilabels.ContainerKindLabel, crilabels.ContainerKindContainer))
@ -387,7 +387,7 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
status.Reason = unknownExitReason
} else {
// Start exit monitor.
c.eventMonitor.startContainerExitMonitor(context.Background(), id, status.Pid, exitCh)
c.startContainerExitMonitor(context.Background(), id, status.Pid, exitCh)
}
case containerd.Stopped:
// Task is stopped. Update status and delete the task.

View File

@ -406,7 +406,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
//
// TaskOOM from containerd may come before sandbox is added to store,
// but we don't care about sandbox TaskOOM right now, so it is fine.
c.eventMonitor.startSandboxExitMonitor(context.Background(), id, exitCh)
c.startSandboxExitMonitor(context.Background(), id, exitCh)
// Send CONTAINER_STARTED event with ContainerId equal to SandboxId.
c.generateAndSendContainerEvent(ctx, id, id, runtime.ContainerEventType_CONTAINER_STARTED_EVENT)

View File

@ -42,7 +42,7 @@ import (
"github.com/containerd/containerd/v2/internal/cri/config"
criconfig "github.com/containerd/containerd/v2/internal/cri/config"
"github.com/containerd/containerd/v2/internal/cri/nri"
"github.com/containerd/containerd/v2/internal/cri/server/podsandbox"
"github.com/containerd/containerd/v2/internal/cri/server/events"
containerstore "github.com/containerd/containerd/v2/internal/cri/store/container"
imagestore "github.com/containerd/containerd/v2/internal/cri/store/image"
"github.com/containerd/containerd/v2/internal/cri/store/label"
@ -137,7 +137,7 @@ type criService struct {
// streamServer is the streaming server serves container streaming request.
streamServer streaming.Server
// eventMonitor is the monitor monitors containerd events.
eventMonitor *eventMonitor
eventMonitor *events.EventMonitor
// initialized indicates whether the server is initialized. All GRPC services
// should return error before the server is initialized.
initialized atomic.Bool
@ -218,7 +218,7 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi
return nil, nil, fmt.Errorf("failed to create stream server: %w", err)
}
c.eventMonitor = newEventMonitor(c)
c.eventMonitor = events.NewEventMonitor(&criEventHandler{c: c})
c.cniNetConfMonitor = make(map[string]*cniNetConfSyncer)
for name, i := range c.netPlugin {
@ -237,10 +237,6 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi
}
}
// Initialize pod sandbox controller
podSandboxController := options.SandboxControllers[string(criconfig.ModePodSandbox)].(*podsandbox.Controller)
podSandboxController.Init(c)
c.nri = options.NRI
c.runtimeHandlers, err = c.introspectRuntimeHandlers(ctx)
@ -251,16 +247,12 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi
return c, c, nil
}
// BackOffEvent is a temporary workaround to call eventMonitor from controller.Stop.
// TODO: get rid of this.
func (c *criService) BackOffEvent(id string, event interface{}) {
c.eventMonitor.backOff.enBackOff(id, event)
}
// Run starts the CRI service.
func (c *criService) Run(ready func()) error {
log.L.Info("Start subscribing containerd event")
c.eventMonitor.subscribe(c.client)
// note: filters are any-match; if you want any match except namespace foo,
// then you have to manually filter out namespace foo
c.eventMonitor.Subscribe(c.client, []string{`topic=="/tasks/oom"`, `topic~="/images/"`})
log.L.Infof("Start recovering state")
if err := c.recover(ctrdutil.NamespacedContext()); err != nil {
@ -269,7 +261,7 @@ func (c *criService) Run(ready func()) error {
// Start event handler.
log.L.Info("Start event monitor")
eventMonitorErrCh := c.eventMonitor.start()
eventMonitorErrCh := c.eventMonitor.Start()
// Start CNI network conf syncers
cniNetConfMonitorErrCh := make(chan error, len(c.cniNetConfMonitor))
@ -355,7 +347,7 @@ func (c *criService) Close() error {
log.L.WithError(err).Errorf("failed to stop cni network conf monitor for %s", name)
}
}
c.eventMonitor.stop()
c.eventMonitor.Stop()
if err := c.streamServer.Stop(); err != nil {
return fmt.Errorf("failed to stop stream server: %w", err)
}