Use wait instead of TaskExit.
Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
@@ -50,13 +50,17 @@ const (
|
||||
// Add a timeout for each event handling, events that timeout will be requeued and
|
||||
// handled again in the future.
|
||||
handleEventTimeout = 10 * time.Second
|
||||
|
||||
exitChannelSize = 1024
|
||||
)
|
||||
|
||||
// eventMonitor monitors containerd event and updates internal state correspondingly.
|
||||
// TODO(random-liu): Handle event for each container in a separate goroutine.
|
||||
type eventMonitor struct {
|
||||
c *criService
|
||||
ch <-chan *events.Envelope
|
||||
c *criService
|
||||
ch <-chan *events.Envelope
|
||||
// exitCh receives container/sandbox exit events from exit monitors.
|
||||
exitCh chan *eventtypes.TaskExit
|
||||
errCh <-chan error
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
@@ -89,6 +93,7 @@ func newEventMonitor(c *criService) *eventMonitor {
|
||||
c: c,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
exitCh: make(chan *eventtypes.TaskExit, exitChannelSize),
|
||||
backOff: newBackOff(),
|
||||
}
|
||||
}
|
||||
@@ -98,13 +103,38 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
|
||||
// note: filters are any match, if you want any match but not in namespace foo
|
||||
// then you have to manually filter namespace foo
|
||||
filters := []string{
|
||||
`topic=="/tasks/exit"`,
|
||||
`topic=="/tasks/oom"`,
|
||||
`topic~="/images/"`,
|
||||
}
|
||||
em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
|
||||
}
|
||||
|
||||
// startExitMonitor starts an exit monitor for a given container/sandbox.
// It spawns a goroutine that waits for either a result on exitCh or for ctx
// to be done. When an exit result arrives, it is converted into a TaskExit
// event (ContainerID and ID are both set to id, Pid to pid) and sent on
// em.exitCh. If reading the exit result fails, the error is logged and the
// event carries unknownExitCode with the current time as ExitedAt.
// The returned channel is closed when the goroutine finishes, on either path.
|
||||
func (em *eventMonitor) startExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
|
||||
stopCh := make(chan struct{})
|
||||
go func() {
|
||||
// Closing stopCh tells the caller the monitor has stopped, whichever case fires.
defer close(stopCh)
|
||||
select {
|
||||
case exitRes := <-exitCh:
|
||||
exitStatus, exitedAt, err := exitRes.Result()
|
||||
if err != nil {
|
||||
logrus.WithError(err).Errorf("Failed to get task exit status for %q", id)
|
||||
// Fall back to placeholder values so an exit event is still emitted.
exitStatus = unknownExitCode
|
||||
exitedAt = time.Now()
|
||||
}
|
||||
em.exitCh <- &eventtypes.TaskExit{
|
||||
ContainerID: id,
|
||||
ID: id,
|
||||
Pid: pid,
|
||||
ExitStatus: exitStatus,
|
||||
ExitedAt: exitedAt,
|
||||
}
|
||||
// ctx is done: stop monitoring without emitting an exit event.
case <-ctx.Done():
|
||||
}
|
||||
}()
|
||||
return stopCh
|
||||
}
|
||||
|
||||
func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
|
||||
id := ""
|
||||
evt, err := typeurl.UnmarshalAny(e)
|
||||
@@ -113,8 +143,6 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
|
||||
}
|
||||
|
||||
switch e := evt.(type) {
|
||||
case *eventtypes.TaskExit:
|
||||
id = e.ContainerID
|
||||
case *eventtypes.TaskOOM:
|
||||
id = e.ContainerID
|
||||
case *eventtypes.ImageCreate:
|
||||
@@ -142,6 +170,18 @@ func (em *eventMonitor) start() <-chan error {
|
||||
defer close(errCh)
|
||||
for {
|
||||
select {
|
||||
case e := <-em.exitCh:
|
||||
logrus.Debugf("Received exit event %+v", e)
|
||||
id := e.ID
|
||||
if em.backOff.isInBackOff(id) {
|
||||
logrus.Infof("Events for %q is in backoff, enqueue event %+v", id, e)
|
||||
em.backOff.enBackOff(id, e)
|
||||
break
|
||||
}
|
||||
if err := em.handleEvent(e); err != nil {
|
||||
logrus.WithError(err).Errorf("Failed to handle exit event %+v for %s", e, id)
|
||||
em.backOff.enBackOff(id, e)
|
||||
}
|
||||
case e := <-em.ch:
|
||||
logrus.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
|
||||
if e.Namespace != constants.K8sContainerdNamespace {
|
||||
@@ -213,8 +253,7 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
|
||||
} else if err != store.ErrNotExist {
|
||||
return errors.Wrap(err, "can't find container for TaskExit event")
|
||||
}
|
||||
// Use GetAll to include sandbox in init state.
|
||||
sb, err := em.c.sandboxStore.GetAll(e.ID)
|
||||
sb, err := em.c.sandboxStore.Get(e.ID)
|
||||
if err == nil {
|
||||
if err := handleSandboxExit(ctx, e, sb); err != nil {
|
||||
return errors.Wrap(err, "failed to handle sandbox TaskExit event")
|
||||
@@ -322,15 +361,7 @@ func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxst
|
||||
}
|
||||
}
|
||||
err = sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
|
||||
// NOTE(random-liu): We SHOULD NOT change INIT state here.
|
||||
// If sandbox state is INIT when event monitor receives a TaskExit event,
|
||||
// it means that sandbox start has failed. In that case, `RunPodSandbox` will
|
||||
// cleanup everything immediately.
|
||||
// Once sandbox state goes out of INIT, it becomes visible to the user, which
|
||||
// is not what we want.
|
||||
if status.State != sandboxstore.StateInit {
|
||||
status.State = sandboxstore.StateNotReady
|
||||
}
|
||||
status.State = sandboxstore.StateNotReady
|
||||
status.Pid = 0
|
||||
return status, nil
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user