Handle containerd event reliably

fix #434

Signed-off-by: yanxuean <yan.xuean@zte.com.cn>
This commit is contained in:
yanxuean
2018-03-01 01:01:01 +08:00
parent 7583bce4ab
commit c751847350
2 changed files with 322 additions and 42 deletions

View File

@@ -18,14 +18,18 @@ package server
import (
"errors"
"fmt"
"time"
eventtypes "github.com/containerd/containerd/api/events"
containerdio "github.com/containerd/containerd/cio"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/typeurl"
gogotypes "github.com/gogo/protobuf/types"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/util/clock"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
"github.com/containerd/cri/pkg/store"
@@ -33,6 +37,12 @@ import (
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)
const (
backOffInitDuration = 1 * time.Second
backOffMaxDuration = 5 * time.Minute
backOffExpireCheckDuration = 1 * time.Second
)
// eventMonitor monitors containerd event and updates internal state correspondingly.
// TODO(random-liu): [P1] Figure out is it possible to drop event during containerd
// is running. If it is, we should do periodically list to sync state with containerd.
@@ -43,6 +53,23 @@ type eventMonitor struct {
errCh <-chan error
ctx context.Context
cancel context.CancelFunc
backOff *backOff
}
type backOff struct {
queuePool map[string]*backOffQueue
ticker *time.Ticker
minDuration time.Duration
maxDuration time.Duration
checkDuration time.Duration
clock clock.Clock
}
type backOffQueue struct {
events []interface{}
expireTime time.Time
duration time.Duration
clock clock.Clock
}
// Create new event monitor. New event monitor will start subscribing containerd event. All events
@@ -55,6 +82,8 @@ func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store) *eventMonit
sandboxStore: s,
ctx: ctx,
cancel: cancel,
backOff: newBackOff(backOffInitDuration, backOffMaxDuration,
backOffExpireCheckDuration, clock.RealClock{}),
}
}
@@ -67,6 +96,24 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
}
func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
containerID := ""
evt, err := typeurl.UnmarshalAny(e)
if err != nil {
return "", nil, fmt.Errorf("failed to unmarshalany %+v", err)
}
switch evt.(type) {
case *eventtypes.TaskExit:
containerID = evt.(*eventtypes.TaskExit).ContainerID
case *eventtypes.TaskOOM:
containerID = evt.(*eventtypes.TaskOOM).ContainerID
default:
return "", nil, fmt.Errorf("unsupported event")
}
return containerID, evt, nil
}
// start starts the event monitor which monitors and handles all container events. It returns
// a channel for the caller to wait for the event monitor to stop. start must be called after
// subscribe.
@@ -76,15 +123,40 @@ func (em *eventMonitor) start() (<-chan struct{}, error) {
}
closeCh := make(chan struct{})
go func() {
backOffCheckCh := em.backOff.start()
for {
select {
case e := <-em.ch:
logrus.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
em.handleEvent(e)
cID, evt, err := convertEvent(e.Event)
if err != nil {
logrus.WithError(err).Errorf("Failed to convert event %+v", e)
break
}
if em.backOff.isInBackOff(cID) {
em.backOff.enBackOff(cID, evt)
break
}
if err := em.handleEvent(evt); err != nil {
logrus.WithError(err).Errorf("Failed to handle event %+v for container %s", evt, cID)
em.backOff.enBackOff(cID, evt)
}
case err := <-em.errCh:
logrus.WithError(err).Error("Failed to handle event stream")
close(closeCh)
return
case <-backOffCheckCh:
cIDs := em.backOff.getExpiredContainers()
for _, cID := range cIDs {
queue := em.backOff.deBackOff(cID)
for i, any := range queue.events {
if err := em.handleEvent(any); err != nil {
logrus.WithError(err).Errorf("Failed to handle backOff event %+v for container %s", any, cID)
em.backOff.reBackOff(cID, queue.events[i:], queue.duration)
break
}
}
}
}
}
}()
@@ -94,17 +166,13 @@ func (em *eventMonitor) start() (<-chan struct{}, error) {
// stop stops the event monitor. It will close the event channel.
// Once event monitor is stopped, it can't be started.
func (em *eventMonitor) stop() {
em.backOff.stop()
em.cancel()
}
// handleEvent handles a containerd event.
func (em *eventMonitor) handleEvent(evt *events.Envelope) {
func (em *eventMonitor) handleEvent(any interface{}) error {
ctx := ctrdutil.NamespacedContext()
any, err := typeurl.UnmarshalAny(evt.Event)
if err != nil {
logrus.WithError(err).Errorf("Failed to convert event envelope %+v", evt)
return
}
switch any.(type) {
// If containerd-shim exits unexpectedly, there will be no corresponding event.
// However, containerd could not retrieve container state in that case, so it's
@@ -112,51 +180,59 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
// TODO(random-liu): [P2] Handle containerd-shim exit.
case *eventtypes.TaskExit:
e := any.(*eventtypes.TaskExit)
logrus.Infof("TaskExit event %+v", e)
cntr, err := em.containerStore.Get(e.ContainerID)
if err == nil {
handleContainerExit(ctx, e, cntr)
return
if err := handleContainerExit(ctx, e, cntr); err != nil {
return fmt.Errorf("failed to handle container TaskExit event:%+v", err)
}
return nil
} else if err != store.ErrNotExist {
logrus.WithError(err).Errorf("Failed to get container %q", e.ContainerID)
return
return fmt.Errorf("can't find container for TaskExit event:%+v", err)
}
// Use GetAll to include sandbox in unknown state.
sb, err := em.sandboxStore.GetAll(e.ContainerID)
if err == nil {
handleSandboxExit(ctx, e, sb)
return
if err := handleSandboxExit(ctx, e, sb); err != nil {
return fmt.Errorf("failed to handle sandbox TaskExit event:%+v", err)
}
return nil
} else if err != store.ErrNotExist {
logrus.WithError(err).Errorf("Failed to get sandbox %q", e.ContainerID)
return
return fmt.Errorf("can't find sandbox for TaskExit event:%+v", err)
}
return nil
case *eventtypes.TaskOOM:
e := any.(*eventtypes.TaskOOM)
logrus.Infof("TaskOOM event %+v", e)
cntr, err := em.containerStore.Get(e.ContainerID)
if err != nil {
if _, err := em.sandboxStore.Get(e.ContainerID); err == nil {
return
if err != store.ErrNotExist {
return fmt.Errorf("can't find container for TaskOOM event:%+v", err)
}
logrus.WithError(err).Errorf("Failed to get container %q", e.ContainerID)
return
if _, err = em.sandboxStore.Get(e.ContainerID); err != nil {
if err != store.ErrNotExist {
return fmt.Errorf("can't find sandbox for TaskOOM event:%+v", err)
}
return nil
}
return nil
}
err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
status.Reason = oomExitReason
return status, nil
})
if err != nil {
logrus.WithError(err).Errorf("Failed to update container %q oom", e.ContainerID)
return
return fmt.Errorf("failed to update container status for TaskOOM event:%+v", err)
}
}
return nil
}
// handleContainerExit handles TaskExit event for container.
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container) {
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container) error {
if e.Pid != cntr.Status.Get().Pid {
// Non-init process died, ignore the event.
return
return nil
}
// Attach container IO so that `Delete` could cleanup the stream properly.
task, err := cntr.Container.Task(ctx,
@@ -166,16 +242,13 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
)
if err != nil {
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to load task for container %q", e.ContainerID)
return
return fmt.Errorf("failed to load task for container: %v", err)
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(ctx); err != nil {
// TODO(random-liu): [P0] Enqueue the event and retry.
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to stop container %q", e.ContainerID)
return
return fmt.Errorf("failed to stop container: %v", err)
}
// Move on to make sure container status is updated.
}
@@ -192,34 +265,30 @@ func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr conta
return status, nil
})
if err != nil {
logrus.WithError(err).Errorf("Failed to update container %q state", e.ContainerID)
// TODO(random-liu): [P0] Enqueue the event and retry.
return
return fmt.Errorf("failed to update container state: %v", err)
}
// Using channel to propagate the information of container stop
cntr.Stop()
return nil
}
// handleSandboxExit handles TaskExit event for sandbox.
func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox) {
func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox) error {
if e.Pid != sb.Status.Get().Pid {
// Non-init process died, ignore the event.
return
return nil
}
// No stream attached to sandbox container.
task, err := sb.Container.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to load task for sandbox %q", e.ContainerID)
return
return fmt.Errorf("failed to load task for sandbox: %v", err)
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(ctx); err != nil {
// TODO(random-liu): [P0] Enqueue the event and retry.
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to stop sandbox %q", e.ContainerID)
return
return fmt.Errorf("failed to stop sandbox: %v", err)
}
// Move on to make sure container status is updated.
}
@@ -238,10 +307,84 @@ func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxst
return status, nil
})
if err != nil {
logrus.WithError(err).Errorf("Failed to update sandbox %q state", e.ContainerID)
// TODO(random-liu): [P0] Enqueue the event and retry.
return
return fmt.Errorf("failed to update sandbox state: %v", err)
}
// Using channel to propagate the information of sandbox stop
sb.Stop()
return nil
}
func newBackOff(min, max, check time.Duration, c clock.Clock) *backOff {
return &backOff{
queuePool: map[string]*backOffQueue{},
minDuration: min,
maxDuration: max,
checkDuration: check,
clock: c,
}
}
func (b *backOff) getExpiredContainers() []string {
var containers []string
for c, q := range b.queuePool {
if q.isExpire() {
containers = append(containers, c)
}
}
return containers
}
func (b *backOff) isInBackOff(key string) bool {
if _, ok := b.queuePool[key]; ok {
return true
}
return false
}
// enBackOff start to backOff and put event to the tail of queue
func (b *backOff) enBackOff(key string, evt interface{}) {
if queue, ok := b.queuePool[key]; ok {
queue.events = append(queue.events, evt)
return
}
b.queuePool[key] = newBackOffQueue([]interface{}{evt}, b.minDuration, b.clock)
}
// enBackOff get out the whole queue
func (b *backOff) deBackOff(key string) *backOffQueue {
queue := b.queuePool[key]
delete(b.queuePool, key)
return queue
}
// enBackOff start to backOff again and put events to the queue
func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) {
duration := 2 * oldDuration
if duration > b.maxDuration {
duration = b.maxDuration
}
b.queuePool[key] = newBackOffQueue(events, duration, b.clock)
}
func (b *backOff) start() <-chan time.Time {
b.ticker = time.NewTicker(b.checkDuration)
return b.ticker.C
}
func (b *backOff) stop() {
b.ticker.Stop()
}
func newBackOffQueue(events []interface{}, init time.Duration, c clock.Clock) *backOffQueue {
return &backOffQueue{
events: events,
duration: init,
expireTime: c.Now().Add(init),
clock: c,
}
}
func (q *backOffQueue) isExpire() bool {
// return time.Now >= expireTime
return !q.clock.Now().Before(q.expireTime)
}