Use direct function call.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu
2018-03-13 04:41:21 +00:00
parent edb38dfecc
commit f0a500a390
16 changed files with 195 additions and 118 deletions

View File

@@ -19,15 +19,15 @@ package server
import (
"errors"
"github.com/containerd/containerd"
eventtypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/services/events/v1"
containerdio "github.com/containerd/containerd/cio"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/typeurl"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
ctrdutil "github.com/containerd/cri-containerd/pkg/containerd/util"
"github.com/containerd/cri-containerd/pkg/store"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
@@ -48,6 +48,7 @@ type eventMonitor struct {
// Create new event monitor. New event monitor will start subscribing containerd event. All events
// happen after it should be monitored.
func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store) *eventMonitor {
// event subscribe doesn't need namespace.
ctx, cancel := context.WithCancel(context.Background())
return &eventMonitor{
containerStore: c,
@@ -58,12 +59,12 @@ func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store) *eventMonit
}
// subscribe starts to subscribe containerd events.
func (em *eventMonitor) subscribe(client *containerd.Client) {
func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
filters := []string{
`topic=="/tasks/exit"`,
`topic=="/tasks/oom"`,
}
em.ch, em.errCh = client.Subscribe(em.ctx, filters...)
em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
}
// start starts the event monitor which monitors and handles all container events. It returns
@@ -98,6 +99,7 @@ func (em *eventMonitor) stop() {
// handleEvent handles a containerd event.
func (em *eventMonitor) handleEvent(evt *events.Envelope) {
ctx := ctrdutil.NamespacedContext()
any, err := typeurl.UnmarshalAny(evt.Event)
if err != nil {
logrus.WithError(err).Errorf("Failed to convert event envelope %+v", evt)
@@ -113,7 +115,7 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
logrus.Infof("TaskExit event %+v", e)
cntr, err := em.containerStore.Get(e.ContainerID)
if err == nil {
handleContainerExit(e, cntr)
handleContainerExit(ctx, e, cntr)
return
} else if err != store.ErrNotExist {
logrus.WithError(err).Errorf("Failed to get container %q", e.ContainerID)
@@ -122,7 +124,7 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
// Use GetAll to include sandbox in unknown state.
sb, err := em.sandboxStore.GetAll(e.ContainerID)
if err == nil {
handleSandboxExit(e, sb)
handleSandboxExit(ctx, e, sb)
return
} else if err != store.ErrNotExist {
logrus.WithError(err).Errorf("Failed to get sandbox %q", e.ContainerID)
@@ -151,13 +153,13 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
}
// handleContainerExit handles TaskExit event for container.
func handleContainerExit(e *eventtypes.TaskExit, cntr containerstore.Container) {
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container) {
if e.Pid != cntr.Status.Get().Pid {
// Non-init process died, ignore the event.
return
}
// Attach container IO so that `Delete` could cleanup the stream properly.
task, err := cntr.Container.Task(context.Background(),
task, err := cntr.Container.Task(ctx,
func(*containerdio.FIFOSet) (containerdio.IO, error) {
return cntr.IO, nil
},
@@ -169,7 +171,7 @@ func handleContainerExit(e *eventtypes.TaskExit, cntr containerstore.Container)
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(context.Background()); err != nil {
if _, err = task.Delete(ctx); err != nil {
// TODO(random-liu): [P0] Enqueue the event and retry.
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to stop container %q", e.ContainerID)
@@ -199,13 +201,13 @@ func handleContainerExit(e *eventtypes.TaskExit, cntr containerstore.Container)
}
// handleSandboxExit handles TaskExit event for sandbox.
func handleSandboxExit(e *eventtypes.TaskExit, sb sandboxstore.Sandbox) {
func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox) {
if e.Pid != sb.Status.Get().Pid {
// Non-init process died, ignore the event.
return
}
// No stream attached to sandbox container.
task, err := sb.Container.Task(context.Background(), nil)
task, err := sb.Container.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to load task for sandbox %q", e.ContainerID)
@@ -213,7 +215,7 @@ func handleSandboxExit(e *eventtypes.TaskExit, sb sandboxstore.Sandbox) {
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(context.Background()); err != nil {
if _, err = task.Delete(ctx); err != nil {
// TODO(random-liu): [P0] Enqueue the event and retry.
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to stop sandbox %q", e.ContainerID)