|
|
|
|
@@ -36,7 +36,6 @@ import (
|
|
|
|
|
ctrdutil "github.com/containerd/containerd/pkg/cri/util"
|
|
|
|
|
"github.com/containerd/containerd/protobuf"
|
|
|
|
|
"github.com/containerd/typeurl/v2"
|
|
|
|
|
"github.com/sirupsen/logrus"
|
|
|
|
|
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
|
|
|
|
"k8s.io/utils/clock"
|
|
|
|
|
)
|
|
|
|
|
@@ -264,18 +263,18 @@ func (em *eventMonitor) start() <-chan error {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
if em.backOff.isInBackOff(id) {
|
|
|
|
|
logrus.Infof("Events for %q is in backoff, enqueue event %+v", id, evt)
|
|
|
|
|
log.L.Infof("Events for %q is in backoff, enqueue event %+v", id, evt)
|
|
|
|
|
em.backOff.enBackOff(id, evt)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
if err := em.handleEvent(evt); err != nil {
|
|
|
|
|
logrus.WithError(err).Errorf("Failed to handle event %+v for %s", evt, id)
|
|
|
|
|
log.L.WithError(err).Errorf("Failed to handle event %+v for %s", evt, id)
|
|
|
|
|
em.backOff.enBackOff(id, evt)
|
|
|
|
|
}
|
|
|
|
|
case err := <-em.errCh:
|
|
|
|
|
// Close errCh in defer directly if there is no error.
|
|
|
|
|
if err != nil {
|
|
|
|
|
logrus.WithError(err).Error("Failed to handle event stream")
|
|
|
|
|
log.L.WithError(err).Error("Failed to handle event stream")
|
|
|
|
|
errCh <- err
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
@@ -285,7 +284,7 @@ func (em *eventMonitor) start() <-chan error {
|
|
|
|
|
queue := em.backOff.deBackOff(id)
|
|
|
|
|
for i, evt := range queue.events {
|
|
|
|
|
if err := em.handleEvent(evt); err != nil {
|
|
|
|
|
logrus.WithError(err).Errorf("Failed to handle backOff event %+v for %s", evt, id)
|
|
|
|
|
log.L.WithError(err).Errorf("Failed to handle backOff event %+v for %s", evt, id)
|
|
|
|
|
em.backOff.reBackOff(id, queue.events[i:], queue.duration)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
@@ -312,7 +311,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
|
|
|
|
|
|
|
|
|
|
switch e := any.(type) {
|
|
|
|
|
case *eventtypes.TaskExit:
|
|
|
|
|
logrus.Infof("TaskExit event %+v", e)
|
|
|
|
|
log.G(ctx).Infof("TaskExit event %+v", e)
|
|
|
|
|
// Use ID instead of ContainerID to rule out TaskExit event for exec.
|
|
|
|
|
cntr, err := em.c.containerStore.Get(e.ID)
|
|
|
|
|
if err == nil {
|
|
|
|
|
@@ -334,7 +333,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
case *eventtypes.TaskOOM:
|
|
|
|
|
logrus.Infof("TaskOOM event %+v", e)
|
|
|
|
|
log.G(ctx).Infof("TaskOOM event %+v", e)
|
|
|
|
|
// For TaskOOM, we only care which container it belongs to.
|
|
|
|
|
cntr, err := em.c.containerStore.Get(e.ContainerID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
@@ -351,13 +350,13 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
|
|
|
|
|
return fmt.Errorf("failed to update container status for TaskOOM event: %w", err)
|
|
|
|
|
}
|
|
|
|
|
case *eventtypes.ImageCreate:
|
|
|
|
|
logrus.Infof("ImageCreate event %+v", e)
|
|
|
|
|
log.G(ctx).Infof("ImageCreate event %+v", e)
|
|
|
|
|
return em.c.updateImage(ctx, e.Name)
|
|
|
|
|
case *eventtypes.ImageUpdate:
|
|
|
|
|
logrus.Infof("ImageUpdate event %+v", e)
|
|
|
|
|
log.G(ctx).Infof("ImageUpdate event %+v", e)
|
|
|
|
|
return em.c.updateImage(ctx, e.Name)
|
|
|
|
|
case *eventtypes.ImageDelete:
|
|
|
|
|
logrus.Infof("ImageDelete event %+v", e)
|
|
|
|
|
log.G(ctx).Infof("ImageDelete event %+v", e)
|
|
|
|
|
return em.c.updateImage(ctx, e.Name)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -436,7 +435,7 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
|
|
|
|
|
return fmt.Errorf("failed to cleanup container %s in task-service: %w", cntr.Container.ID(), err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
logrus.Infof("Ensure that container %s in task-service has been cleanup successfully", cntr.Container.ID())
|
|
|
|
|
log.G(ctx).Infof("Ensure that container %s in task-service has been cleanup successfully", cntr.Container.ID())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
|
|
|
|
|
@@ -449,7 +448,7 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
|
|
|
|
|
// Unknown state can only transit to EXITED state, so we need
|
|
|
|
|
// to handle unknown state here.
|
|
|
|
|
if status.Unknown {
|
|
|
|
|
logrus.Debugf("Container %q transited from UNKNOWN to EXITED", cntr.ID)
|
|
|
|
|
log.G(ctx).Debugf("Container %q transited from UNKNOWN to EXITED", cntr.ID)
|
|
|
|
|
status.Unknown = false
|
|
|
|
|
}
|
|
|
|
|
return status, nil
|
|
|
|
|
@@ -522,7 +521,7 @@ func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxst
|
|
|
|
|
return fmt.Errorf("failed to cleanup sandbox %s in task-service: %w", sb.Container.ID(), err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
logrus.Infof("Ensure that sandbox %s in task-service has been cleanup successfully", sb.Container.ID())
|
|
|
|
|
log.G(ctx).Infof("Ensure that sandbox %s in task-service has been cleanup successfully", sb.Container.ID())
|
|
|
|
|
}
|
|
|
|
|
err = sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
|
|
|
|
|
status.State = sandboxstore.StateNotReady
|
|
|
|
|
|