cri: handle sandbox/container exit event separately
The event monitor handles exit events one by one. If deleting a task fails, it slows down the termination of Pods. To reduce the impact, each exit event watcher now handles its own exit event separately. If handling fails, the watcher puts the event into the backoff queue and retries it later.

Signed-off-by: Wei Fu <fuweid89@gmail.com>
parent 643bb9b66d
commit e56de63099
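Illustrative sketch (not the CRI plugin's actual API): the pattern this commit moves to is that each container or sandbox gets its own exit-monitor goroutine, which handles its exit event directly and only parks the event in a backoff queue when handling fails, so one slow or failing task deletion no longer stalls every other terminating Pod. All types and helpers below are simplified stand-ins.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// exitEvent is a simplified stand-in for eventtypes.TaskExit.
type exitEvent struct {
	ID         string
	ExitStatus uint32
}

// backOff is a much-simplified stand-in for the CRI backoff queue:
// events that could not be handled are parked per ID for a later retry.
type backOff struct {
	mu    sync.Mutex
	queue map[string][]exitEvent
}

func newBackOff() *backOff {
	return &backOff{queue: map[string][]exitEvent{}}
}

func (b *backOff) enBackOff(id string, e exitEvent) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.queue[id] = append(b.queue[id], e)
}

// startExitMonitor mirrors the shape of startContainerExitMonitor and
// startSandboxExitMonitor in this commit: a per-task goroutine handles the
// exit event itself and falls back to the backoff queue only on failure.
func startExitMonitor(id string, exitCh <-chan exitEvent, handle func(exitEvent) error, bo *backOff) <-chan struct{} {
	stopCh := make(chan struct{})
	go func() {
		defer close(stopCh)
		e := <-exitCh
		if err := handle(e); err != nil {
			fmt.Printf("failed to handle exit event for %q: %v, enqueued for retry\n", id, err)
			bo.enBackOff(id, e)
		}
	}()
	return stopCh
}

func main() {
	bo := newBackOff()
	exitCh := make(chan exitEvent, 1)
	exitCh <- exitEvent{ID: "c1", ExitStatus: 1}

	// A handler that fails, e.g. because deleting the task timed out.
	failing := func(exitEvent) error { return errors.New("delete task: deadline exceeded") }

	<-startExitMonitor("c1", exitCh, failing, bo)
	fmt.Println("events queued for retry:", len(bo.queue["c1"]))
}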
@@ -148,10 +148,8 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
 		return nil, errors.Wrapf(err, "failed to update container %q state", id)
 	}
 
-	// start the monitor after updating container state, this ensures that
-	// event monitor receives the TaskExit event and update container state
-	// after this.
-	c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh)
+	// It handles the TaskExit event and update container state after this.
+	c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh)
 
 	return &runtime.StartContainerResponse{}, nil
 }
@@ -88,7 +88,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
 	}
 
 	exitCtx, exitCancel := context.WithCancel(context.Background())
-	stopCh := c.eventMonitor.startExitMonitor(exitCtx, id, task.Pid(), exitCh)
+	stopCh := c.eventMonitor.startContainerExitMonitor(exitCtx, id, task.Pid(), exitCh)
 	defer func() {
 		exitCancel()
 		// This ensures that exit monitor is stopped before
@@ -50,17 +50,12 @@ const (
 	// Add a timeout for each event handling, events that timeout will be requeued and
 	// handled again in the future.
 	handleEventTimeout = 10 * time.Second
-
-	exitChannelSize = 1024
 )
 
 // eventMonitor monitors containerd event and updates internal state correspondingly.
-// TODO(random-liu): Handle event for each container in a separate goroutine.
 type eventMonitor struct {
 	c  *criService
 	ch <-chan *events.Envelope
-	// exitCh receives container/sandbox exit events from exit monitors.
-	exitCh chan *eventtypes.TaskExit
 	errCh  <-chan error
 	ctx    context.Context
 	cancel context.CancelFunc
@@ -68,6 +63,9 @@ type eventMonitor struct {
 }
 
 type backOff struct {
+	// queuePoolMu is mutex used to protect the queuePool map
+	queuePoolMu sync.Mutex
+
 	queuePool map[string]*backOffQueue
 	// tickerMu is mutex used to protect the ticker.
 	tickerMu sync.Mutex
@@ -93,7 +91,6 @@ func newEventMonitor(c *criService) *eventMonitor {
 		c:       c,
 		ctx:     ctx,
 		cancel:  cancel,
-		exitCh:  make(chan *eventtypes.TaskExit, exitChannelSize),
 		backOff: newBackOff(),
 	}
 }
@@ -109,8 +106,8 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
 	em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
 }
 
-// startExitMonitor starts an exit monitor for a given container/sandbox.
-func (em *eventMonitor) startExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
+// startSandboxExitMonitor starts an exit monitor for a given sandbox.
+func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
 	stopCh := make(chan struct{})
 	go func() {
 		defer close(stopCh)
@@ -118,17 +115,93 @@ func (em *eventMonitor) startExitMonitor(ctx context.Context, id string, pid uin
 		case exitRes := <-exitCh:
 			exitStatus, exitedAt, err := exitRes.Result()
 			if err != nil {
-				logrus.WithError(err).Errorf("Failed to get task exit status for %q", id)
+				logrus.WithError(err).Errorf("failed to get task exit status for %q", id)
 				exitStatus = unknownExitCode
 				exitedAt = time.Now()
 			}
-			em.exitCh <- &eventtypes.TaskExit{
+
+			e := &eventtypes.TaskExit{
 				ContainerID: id,
 				ID:          id,
 				Pid:         pid,
 				ExitStatus:  exitStatus,
 				ExitedAt:    exitedAt,
 			}
+
+			logrus.Debugf("received exit event %+v", e)
+
+			err = func() error {
+				dctx := ctrdutil.NamespacedContext()
+				dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
+				defer dcancel()
+
+				sb, err := em.c.sandboxStore.Get(e.ID)
+				if err == nil {
+					if err := handleSandboxExit(dctx, e, sb); err != nil {
+						return err
+					}
+					return nil
+				} else if err != store.ErrNotExist {
+					return errors.Wrapf(err, "failed to get sandbox %s", e.ID)
+				}
+				return nil
+			}()
+			if err != nil {
+				logrus.WithError(err).Errorf("failed to handle sandbox TaskExit event %+v", e)
+				em.backOff.enBackOff(id, e)
+			}
+			return
+		case <-ctx.Done():
+		}
+	}()
+	return stopCh
+}
+
+// startContainerExitMonitor starts an exit monitor for a given container.
+func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
+	stopCh := make(chan struct{})
+	go func() {
+		defer close(stopCh)
+		select {
+		case exitRes := <-exitCh:
+			exitStatus, exitedAt, err := exitRes.Result()
+			if err != nil {
+				logrus.WithError(err).Errorf("failed to get task exit status for %q", id)
+				exitStatus = unknownExitCode
+				exitedAt = time.Now()
+			}
+
+			e := &eventtypes.TaskExit{
+				ContainerID: id,
+				ID:          id,
+				Pid:         pid,
+				ExitStatus:  exitStatus,
+				ExitedAt:    exitedAt,
+			}
+
+			logrus.Debugf("received exit event %+v", e)
+
+			err = func() error {
+				dctx := ctrdutil.NamespacedContext()
+				dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
+				defer dcancel()
+
+				cntr, err := em.c.containerStore.Get(e.ID)
+				if err == nil {
+					if err := handleContainerExit(dctx, e, cntr); err != nil {
+						return err
+					}
+					return nil
+				} else if err != store.ErrNotExist {
+					return errors.Wrapf(err, "failed to get container %s", e.ID)
+				}
+				return nil
+			}()
+			if err != nil {
+				logrus.WithError(err).Errorf("failed to handle container TaskExit event %+v", e)
+				em.backOff.enBackOff(id, e)
+			}
+			return
 		case <-ctx.Done():
 		}
 	}()
@@ -157,9 +230,16 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
 	return id, evt, nil
 }
 
-// start starts the event monitor which monitors and handles all subscribed events. It returns
-// an error channel for the caller to wait for stop errors from the event monitor.
-// start must be called after subscribe.
+// start starts the event monitor which monitors and handles all subscribed events.
+// It returns an error channel for the caller to wait for stop errors from the
+// event monitor.
+//
+// NOTE:
+// 1. start must be called after subscribe.
+// 2. The task exit event has been handled in individual startSandboxExitMonitor
+// or startContainerExitMonitor goroutine at the first. If the goroutine fails,
+// it puts the event into backoff retry queue and event monitor will handle
+// it later.
 func (em *eventMonitor) start() <-chan error {
 	errCh := make(chan error)
 	if em.ch == nil || em.errCh == nil {
@@ -170,18 +250,6 @@ func (em *eventMonitor) start() <-chan error {
 		defer close(errCh)
 		for {
 			select {
-			case e := <-em.exitCh:
-				logrus.Debugf("Received exit event %+v", e)
-				id := e.ID
-				if em.backOff.isInBackOff(id) {
-					logrus.Infof("Events for %q is in backoff, enqueue event %+v", id, e)
-					em.backOff.enBackOff(id, e)
-					break
-				}
-				if err := em.handleEvent(e); err != nil {
-					logrus.WithError(err).Errorf("Failed to handle exit event %+v for %s", e, id)
-					em.backOff.enBackOff(id, e)
-				}
 			case e := <-em.ch:
 				logrus.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
 				if e.Namespace != constants.K8sContainerdNamespace {
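To make the NOTE above concrete, below is a minimal, runnable model of the backoff retry that a failed exit-monitor goroutine falls back to: events parked in a per-ID queue, a periodic check for expired queues, and a delay that doubles on repeated failure up to a cap. All names, types, and durations are illustrative assumptions, and this single-goroutine sketch omits the queuePoolMu locking that the real code adds further down in this diff.

package main

import (
	"fmt"
	"time"
)

// backOffQueue holds events parked for one container or sandbox ID.
type backOffQueue struct {
	events     []string
	duration   time.Duration
	expireTime time.Time
}

type backOff struct {
	queuePool   map[string]*backOffQueue
	minDuration time.Duration
	maxDuration time.Duration
}

// enBackOff parks an event, starting a queue with the minimum delay if needed.
func (b *backOff) enBackOff(key, evt string) {
	if q, ok := b.queuePool[key]; ok {
		q.events = append(q.events, evt)
		return
	}
	b.queuePool[key] = &backOffQueue{
		events:     []string{evt},
		duration:   b.minDuration,
		expireTime: time.Now().Add(b.minDuration),
	}
}

// getExpiredIDs returns the IDs whose backoff delay has elapsed.
func (b *backOff) getExpiredIDs() []string {
	var ids []string
	for id, q := range b.queuePool {
		if time.Now().After(q.expireTime) {
			ids = append(ids, id)
		}
	}
	return ids
}

// reBackOff re-queues events that failed again, doubling the delay up to the
// cap, mirroring the duration handling in the reBackOff hunk further down.
func (b *backOff) reBackOff(key string, events []string, oldDuration time.Duration) {
	duration := 2 * oldDuration
	if duration > b.maxDuration {
		duration = b.maxDuration
	}
	b.queuePool[key] = &backOffQueue{
		events:     events,
		duration:   duration,
		expireTime: time.Now().Add(duration),
	}
}

func main() {
	b := &backOff{
		queuePool:   map[string]*backOffQueue{},
		minDuration: 10 * time.Millisecond,
		maxDuration: 40 * time.Millisecond,
	}
	b.enBackOff("sandbox-1", "TaskExit")

	// Periodic retry: drain expired queues and, when handling still fails,
	// re-queue with a doubled (capped) delay. Handling always "fails" here
	// so the doubling and the cap are visible in the output.
	for i := 0; i < 4; i++ {
		time.Sleep(50 * time.Millisecond)
		for _, id := range b.getExpiredIDs() {
			q := b.queuePool[id]
			delete(b.queuePool, id)
			fmt.Printf("retry %s after %v backoff (handling fails again)\n", id, q.duration)
			b.reBackOff(id, q.events, q.duration)
		}
	}
}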
@@ -388,6 +456,9 @@ func newBackOff() *backOff {
 }
 
 func (b *backOff) getExpiredIDs() []string {
+	b.queuePoolMu.Lock()
+	defer b.queuePoolMu.Unlock()
+
 	var ids []string
 	for id, q := range b.queuePool {
 		if q.isExpire() {
@@ -398,6 +469,9 @@ func (b *backOff) getExpiredIDs() []string {
 }
 
 func (b *backOff) isInBackOff(key string) bool {
+	b.queuePoolMu.Lock()
+	defer b.queuePoolMu.Unlock()
+
 	if _, ok := b.queuePool[key]; ok {
 		return true
 	}
@@ -406,6 +480,9 @@ func (b *backOff) isInBackOff(key string) bool {
 
 // enBackOff start to backOff and put event to the tail of queue
 func (b *backOff) enBackOff(key string, evt interface{}) {
+	b.queuePoolMu.Lock()
+	defer b.queuePoolMu.Unlock()
+
 	if queue, ok := b.queuePool[key]; ok {
 		queue.events = append(queue.events, evt)
 		return
@@ -415,6 +492,9 @@ func (b *backOff) enBackOff(key string, evt interface{}) {
 
 // enBackOff get out the whole queue
 func (b *backOff) deBackOff(key string) *backOffQueue {
+	b.queuePoolMu.Lock()
+	defer b.queuePoolMu.Unlock()
+
 	queue := b.queuePool[key]
 	delete(b.queuePool, key)
 	return queue
@@ -422,6 +502,9 @@ func (b *backOff) deBackOff(key string) *backOffQueue {
 
 // enBackOff start to backOff again and put events to the queue
 func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) {
+	b.queuePoolMu.Lock()
+	defer b.queuePoolMu.Unlock()
+
 	duration := 2 * oldDuration
 	if duration > b.maxDuration {
 		duration = b.maxDuration
@@ -290,7 +290,7 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
 			status.Reason = unknownExitReason
 		} else {
 			// Start exit monitor.
-			c.eventMonitor.startExitMonitor(context.Background(), id, status.Pid, exitCh)
+			c.eventMonitor.startContainerExitMonitor(context.Background(), id, status.Pid, exitCh)
 		}
 	case containerd.Stopped:
 		// Task is stopped. Updata status and delete the task.
|
|||||||
// Task is running, set sandbox state as READY.
|
// Task is running, set sandbox state as READY.
|
||||||
status.State = sandboxstore.StateReady
|
status.State = sandboxstore.StateReady
|
||||||
status.Pid = t.Pid()
|
status.Pid = t.Pid()
|
||||||
c.eventMonitor.startExitMonitor(context.Background(), meta.ID, status.Pid, exitCh)
|
c.eventMonitor.startSandboxExitMonitor(context.Background(), meta.ID, status.Pid, exitCh)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Task is not running. Delete the task and set sandbox state as NOTREADY.
|
// Task is not running. Delete the task and set sandbox state as NOTREADY.
|
||||||
|
@@ -331,7 +331,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
 	//
 	// TaskOOM from containerd may come before sandbox is added to store,
 	// but we don't care about sandbox TaskOOM right now, so it is fine.
-	c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh)
+	c.eventMonitor.startSandboxExitMonitor(context.Background(), id, task.Pid(), exitCh)
 
 	return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
 }
@@ -134,7 +134,7 @@ func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxst
 	}
 
 	exitCtx, exitCancel := context.WithCancel(context.Background())
-	stopCh := c.eventMonitor.startExitMonitor(exitCtx, id, task.Pid(), exitCh)
+	stopCh := c.eventMonitor.startSandboxExitMonitor(exitCtx, id, task.Pid(), exitCh)
 	defer func() {
 		exitCancel()
 		// This ensures that exit monitor is stopped before
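One more illustrative sketch (stand-in names, not the plugin's API) of the stop-path hand-off that stopContainer and stopSandboxContainer rely on: the caller starts the exit monitor with a cancellable context, and on the way out it cancels that context and waits on the returned stop channel, so the per-task monitor is guaranteed to have finished before the regular event handling takes over.

package main

import (
	"context"
	"fmt"
	"time"
)

// startExitMonitor is a stand-in for startContainerExitMonitor /
// startSandboxExitMonitor: it returns a channel that is closed once the
// monitor goroutine has finished, either after handling the exit or after
// its context is cancelled.
func startExitMonitor(ctx context.Context, exitCh <-chan int) <-chan struct{} {
	stopCh := make(chan struct{})
	go func() {
		defer close(stopCh)
		select {
		case code := <-exitCh:
			fmt.Println("handled exit code", code)
		case <-ctx.Done():
			fmt.Println("monitor cancelled before an exit arrived")
		}
	}()
	return stopCh
}

func main() {
	exitCh := make(chan int, 1)

	exitCtx, exitCancel := context.WithCancel(context.Background())
	stopCh := startExitMonitor(exitCtx, exitCh)
	defer func() {
		exitCancel()
		// This ensures that the exit monitor is stopped before the caller
		// moves on, matching the defer in the stop paths above.
		<-stopCh
	}()

	// Simulate the task exiting while it is being stopped.
	exitCh <- 0
	time.Sleep(10 * time.Millisecond)
}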