Merge pull request #4682 from fuweid/cri-handle-exit-event-separate

cri: handle sandbox/container exit event in parallel
This commit is contained in:
Derek McGowan 2021-01-23 23:22:28 -08:00 committed by GitHub
commit f615c58dcc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 117 additions and 36 deletions

View File

@ -148,10 +148,8 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
return nil, errors.Wrapf(err, "failed to update container %q state", id) return nil, errors.Wrapf(err, "failed to update container %q state", id)
} }
// start the monitor after updating container state, this ensures that // It handles the TaskExit event and update container state after this.
// event monitor receives the TaskExit event and update container state c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh)
// after this.
c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh)
return &runtime.StartContainerResponse{}, nil return &runtime.StartContainerResponse{}, nil
} }

View File

@ -88,7 +88,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
} }
exitCtx, exitCancel := context.WithCancel(context.Background()) exitCtx, exitCancel := context.WithCancel(context.Background())
stopCh := c.eventMonitor.startExitMonitor(exitCtx, id, task.Pid(), exitCh) stopCh := c.eventMonitor.startContainerExitMonitor(exitCtx, id, task.Pid(), exitCh)
defer func() { defer func() {
exitCancel() exitCancel()
// This ensures that exit monitor is stopped before // This ensures that exit monitor is stopped before

View File

@ -50,17 +50,12 @@ const (
// Add a timeout for each event handling, events that timeout will be requeued and // Add a timeout for each event handling, events that timeout will be requeued and
// handled again in the future. // handled again in the future.
handleEventTimeout = 10 * time.Second handleEventTimeout = 10 * time.Second
exitChannelSize = 1024
) )
// eventMonitor monitors containerd event and updates internal state correspondingly. // eventMonitor monitors containerd event and updates internal state correspondingly.
// TODO(random-liu): Handle event for each container in a separate goroutine.
type eventMonitor struct { type eventMonitor struct {
c *criService c *criService
ch <-chan *events.Envelope ch <-chan *events.Envelope
// exitCh receives container/sandbox exit events from exit monitors.
exitCh chan *eventtypes.TaskExit
errCh <-chan error errCh <-chan error
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@ -68,6 +63,9 @@ type eventMonitor struct {
} }
type backOff struct { type backOff struct {
// queuePoolMu is mutex used to protect the queuePool map
queuePoolMu sync.Mutex
queuePool map[string]*backOffQueue queuePool map[string]*backOffQueue
// tickerMu is mutex used to protect the ticker. // tickerMu is mutex used to protect the ticker.
tickerMu sync.Mutex tickerMu sync.Mutex
@ -93,7 +91,6 @@ func newEventMonitor(c *criService) *eventMonitor {
c: c, c: c,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
exitCh: make(chan *eventtypes.TaskExit, exitChannelSize),
backOff: newBackOff(), backOff: newBackOff(),
} }
} }
@ -109,8 +106,8 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...) em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
} }
// startExitMonitor starts an exit monitor for a given container/sandbox. // startSandboxExitMonitor starts an exit monitor for a given sandbox.
func (em *eventMonitor) startExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} { func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
go func() { go func() {
defer close(stopCh) defer close(stopCh)
@ -118,17 +115,93 @@ func (em *eventMonitor) startExitMonitor(ctx context.Context, id string, pid uin
case exitRes := <-exitCh: case exitRes := <-exitCh:
exitStatus, exitedAt, err := exitRes.Result() exitStatus, exitedAt, err := exitRes.Result()
if err != nil { if err != nil {
logrus.WithError(err).Errorf("Failed to get task exit status for %q", id) logrus.WithError(err).Errorf("failed to get task exit status for %q", id)
exitStatus = unknownExitCode exitStatus = unknownExitCode
exitedAt = time.Now() exitedAt = time.Now()
} }
em.exitCh <- &eventtypes.TaskExit{
e := &eventtypes.TaskExit{
ContainerID: id, ContainerID: id,
ID: id, ID: id,
Pid: pid, Pid: pid,
ExitStatus: exitStatus, ExitStatus: exitStatus,
ExitedAt: exitedAt, ExitedAt: exitedAt,
} }
logrus.Debugf("received exit event %+v", e)
err = func() error {
dctx := ctrdutil.NamespacedContext()
dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
defer dcancel()
sb, err := em.c.sandboxStore.Get(e.ID)
if err == nil {
if err := handleSandboxExit(dctx, e, sb); err != nil {
return err
}
return nil
} else if err != store.ErrNotExist {
return errors.Wrapf(err, "failed to get sandbox %s", e.ID)
}
return nil
}()
if err != nil {
logrus.WithError(err).Errorf("failed to handle sandbox TaskExit event %+v", e)
em.backOff.enBackOff(id, e)
}
return
case <-ctx.Done():
}
}()
return stopCh
}
// startContainerExitMonitor starts an exit monitor for a given container.
func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
stopCh := make(chan struct{})
go func() {
defer close(stopCh)
select {
case exitRes := <-exitCh:
exitStatus, exitedAt, err := exitRes.Result()
if err != nil {
logrus.WithError(err).Errorf("failed to get task exit status for %q", id)
exitStatus = unknownExitCode
exitedAt = time.Now()
}
e := &eventtypes.TaskExit{
ContainerID: id,
ID: id,
Pid: pid,
ExitStatus: exitStatus,
ExitedAt: exitedAt,
}
logrus.Debugf("received exit event %+v", e)
err = func() error {
dctx := ctrdutil.NamespacedContext()
dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
defer dcancel()
cntr, err := em.c.containerStore.Get(e.ID)
if err == nil {
if err := handleContainerExit(dctx, e, cntr); err != nil {
return err
}
return nil
} else if err != store.ErrNotExist {
return errors.Wrapf(err, "failed to get container %s", e.ID)
}
return nil
}()
if err != nil {
logrus.WithError(err).Errorf("failed to handle container TaskExit event %+v", e)
em.backOff.enBackOff(id, e)
}
return
case <-ctx.Done(): case <-ctx.Done():
} }
}() }()
@ -157,9 +230,16 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
return id, evt, nil return id, evt, nil
} }
// start starts the event monitor which monitors and handles all subscribed events. It returns // start starts the event monitor which monitors and handles all subscribed events.
// an error channel for the caller to wait for stop errors from the event monitor. // It returns an error channel for the caller to wait for stop errors from the
// start must be called after subscribe. // event monitor.
//
// NOTE:
// 1. start must be called after subscribe.
// 2. The task exit event is first handled in the individual
// startSandboxExitMonitor or startContainerExitMonitor goroutine. If that
// handling fails, the goroutine puts the event into the backoff retry queue
// and the event monitor handles it later.
func (em *eventMonitor) start() <-chan error { func (em *eventMonitor) start() <-chan error {
errCh := make(chan error) errCh := make(chan error)
if em.ch == nil || em.errCh == nil { if em.ch == nil || em.errCh == nil {
@ -170,18 +250,6 @@ func (em *eventMonitor) start() <-chan error {
defer close(errCh) defer close(errCh)
for { for {
select { select {
case e := <-em.exitCh:
logrus.Debugf("Received exit event %+v", e)
id := e.ID
if em.backOff.isInBackOff(id) {
logrus.Infof("Events for %q is in backoff, enqueue event %+v", id, e)
em.backOff.enBackOff(id, e)
break
}
if err := em.handleEvent(e); err != nil {
logrus.WithError(err).Errorf("Failed to handle exit event %+v for %s", e, id)
em.backOff.enBackOff(id, e)
}
case e := <-em.ch: case e := <-em.ch:
logrus.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic) logrus.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
if e.Namespace != constants.K8sContainerdNamespace { if e.Namespace != constants.K8sContainerdNamespace {
@ -388,6 +456,9 @@ func newBackOff() *backOff {
} }
func (b *backOff) getExpiredIDs() []string { func (b *backOff) getExpiredIDs() []string {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()
var ids []string var ids []string
for id, q := range b.queuePool { for id, q := range b.queuePool {
if q.isExpire() { if q.isExpire() {
@ -398,6 +469,9 @@ func (b *backOff) getExpiredIDs() []string {
} }
func (b *backOff) isInBackOff(key string) bool { func (b *backOff) isInBackOff(key string) bool {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()
if _, ok := b.queuePool[key]; ok { if _, ok := b.queuePool[key]; ok {
return true return true
} }
@ -406,6 +480,9 @@ func (b *backOff) isInBackOff(key string) bool {
// enBackOff starts to back off and puts the event at the tail of the queue // enBackOff starts to back off and puts the event at the tail of the queue
func (b *backOff) enBackOff(key string, evt interface{}) { func (b *backOff) enBackOff(key string, evt interface{}) {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()
if queue, ok := b.queuePool[key]; ok { if queue, ok := b.queuePool[key]; ok {
queue.events = append(queue.events, evt) queue.events = append(queue.events, evt)
return return
@ -415,6 +492,9 @@ func (b *backOff) enBackOff(key string, evt interface{}) {
// deBackOff removes the whole queue for the key and returns it // deBackOff removes the whole queue for the key and returns it
func (b *backOff) deBackOff(key string) *backOffQueue { func (b *backOff) deBackOff(key string) *backOffQueue {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()
queue := b.queuePool[key] queue := b.queuePool[key]
delete(b.queuePool, key) delete(b.queuePool, key)
return queue return queue
@ -422,6 +502,9 @@ func (b *backOff) deBackOff(key string) *backOffQueue {
// reBackOff starts to back off again and puts the events into the queue // reBackOff starts to back off again and puts the events into the queue
func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) { func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()
duration := 2 * oldDuration duration := 2 * oldDuration
if duration > b.maxDuration { if duration > b.maxDuration {
duration = b.maxDuration duration = b.maxDuration

View File

@ -290,7 +290,7 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
status.Reason = unknownExitReason status.Reason = unknownExitReason
} else { } else {
// Start exit monitor. // Start exit monitor.
c.eventMonitor.startExitMonitor(context.Background(), id, status.Pid, exitCh) c.eventMonitor.startContainerExitMonitor(context.Background(), id, status.Pid, exitCh)
} }
case containerd.Stopped: case containerd.Stopped:
// Task is stopped. Update status and delete the task. // Task is stopped. Update status and delete the task.
@ -389,7 +389,7 @@ func (c *criService) loadSandbox(ctx context.Context, cntr containerd.Container)
// Task is running, set sandbox state as READY. // Task is running, set sandbox state as READY.
status.State = sandboxstore.StateReady status.State = sandboxstore.StateReady
status.Pid = t.Pid() status.Pid = t.Pid()
c.eventMonitor.startExitMonitor(context.Background(), meta.ID, status.Pid, exitCh) c.eventMonitor.startSandboxExitMonitor(context.Background(), meta.ID, status.Pid, exitCh)
} }
} else { } else {
// Task is not running. Delete the task and set sandbox state as NOTREADY. // Task is not running. Delete the task and set sandbox state as NOTREADY.

View File

@ -331,7 +331,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
// //
// TaskOOM from containerd may come before sandbox is added to store, // TaskOOM from containerd may come before sandbox is added to store,
// but we don't care about sandbox TaskOOM right now, so it is fine. // but we don't care about sandbox TaskOOM right now, so it is fine.
c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh) c.eventMonitor.startSandboxExitMonitor(context.Background(), id, task.Pid(), exitCh)
return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
} }

View File

@ -134,7 +134,7 @@ func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxst
} }
exitCtx, exitCancel := context.WithCancel(context.Background()) exitCtx, exitCancel := context.WithCancel(context.Background())
stopCh := c.eventMonitor.startExitMonitor(exitCtx, id, task.Pid(), exitCh) stopCh := c.eventMonitor.startSandboxExitMonitor(exitCtx, id, task.Pid(), exitCh)
defer func() { defer func() {
exitCancel() exitCancel()
// This ensures that exit monitor is stopped before // This ensures that exit monitor is stopped before