From a7343b0773c93d0265184e133180c43eb5d5cae9 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 9 Nov 2017 13:15:08 -0500 Subject: [PATCH] Move events from shim into linux runtime Signed-off-by: Michael Crosby --- linux/process.go | 8 +++++++- linux/runtime.go | 30 +++++++++++++++++++++++----- linux/shim/service.go | 46 ------------------------------------------- linux/task.go | 26 +++++++++++++++++++----- 4 files changed, 53 insertions(+), 57 deletions(-) diff --git a/linux/process.go b/linux/process.go index 0febff9e7..7660ee898 100644 --- a/linux/process.go +++ b/linux/process.go @@ -5,6 +5,7 @@ package linux import ( "context" + eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" shim "github.com/containerd/containerd/linux/shim/v1" @@ -96,12 +97,17 @@ func (p *Process) CloseIO(ctx context.Context) error { // Start the process func (p *Process) Start(ctx context.Context) error { - _, err := p.t.shim.Start(ctx, &shim.StartRequest{ + r, err := p.t.shim.Start(ctx, &shim.StartRequest{ ID: p.id, }) if err != nil { return errdefs.FromGRPC(err) } + p.t.events.Publish(ctx, runtime.TaskExecStartedEventTopic, &eventsapi.TaskExecStarted{ + ContainerID: p.t.id, + Pid: r.Pid, + ExecID: p.id, + }) return nil } diff --git a/linux/runtime.go b/linux/runtime.go index 605a8dae2..e5b474827 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -242,14 +242,14 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts } }() - runtime := r.config.Runtime + rt := r.config.Runtime if ropts != nil && ropts.Runtime != "" { - runtime = ropts.Runtime + rt = ropts.Runtime } sopts := &shim.CreateTaskRequest{ ID: id, Bundle: bundle.path, - Runtime: runtime, + Runtime: rt, Stdin: opts.IO.Stdin, Stdout: opts.IO.Stdout, Stderr: opts.IO.Stderr, @@ -268,7 +268,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts if err != nil { return nil, errdefs.FromGRPC(err) } - t, err := newTask(id, namespace, int(cr.Pid), s, r.monitor) + t, err := newTask(id, namespace, int(cr.Pid), s, r.monitor, r.events) if err != nil { return nil, err } @@ -285,6 +285,20 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts return nil, err } } + r.events.Publish(ctx, runtime.TaskCreateEventTopic, &eventsapi.TaskCreate{ + ContainerID: sopts.ID, + Bundle: sopts.Bundle, + Rootfs: sopts.Rootfs, + IO: &eventsapi.TaskIO{ + Stdin: sopts.Stdin, + Stdout: sopts.Stdout, + Stderr: sopts.Stderr, + Terminal: sopts.Terminal, + }, + Checkpoint: sopts.Checkpoint, + Pid: uint32(t.pid), + }) + return t, nil } @@ -322,6 +336,12 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er if err := bundle.Delete(); err != nil { log.G(ctx).WithError(err).Error("failed to delete bundle") } + r.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventsapi.TaskDelete{ + ContainerID: lc.id, + ExitStatus: rsp.ExitStatus, + ExitedAt: rsp.ExitedAt, + Pid: rsp.Pid, + }) return &runtime.Exit{ Status: rsp.ExitStatus, Timestamp: rsp.ExitedAt, @@ -391,7 +411,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { continue } - t, err := newTask(id, ns, pid, s, r.monitor) + t, err := newTask(id, ns, pid, s, r.monitor, r.events) if err != nil { log.G(ctx).WithError(err).Error("loading task type") continue diff --git a/linux/shim/service.go b/linux/shim/service.go index dd1ca62c8..25e9ac93a 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -107,19 +107,6 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh s.bundle = r.Bundle pid := process.Pid() s.processes[r.ID] = process - s.events <- &eventsapi.TaskCreate{ - ContainerID: r.ID, - Bundle: r.Bundle, - Rootfs: r.Rootfs, - IO: &eventsapi.TaskIO{ - Stdin: r.Stdin, - Stdout: r.Stdout, - Stderr: r.Stderr, - Terminal: r.Terminal, - }, - Checkpoint: r.Checkpoint, - Pid: uint32(pid), - } return &shimapi.CreateTaskResponse{ Pid: uint32(pid), }, nil @@ -136,19 +123,6 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi. if err := p.Start(ctx); err != nil { return nil, err } - if r.ID == s.id { - s.events <- &eventsapi.TaskStart{ - ContainerID: s.id, - Pid: uint32(p.Pid()), - } - } else { - pid := p.Pid() - s.events <- &eventsapi.TaskExecStarted{ - ContainerID: s.id, - ExecID: r.ID, - Pid: uint32(pid), - } - } return &shimapi.StartResponse{ ID: p.ID(), Pid: uint32(p.Pid()), @@ -169,12 +143,6 @@ func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimap } delete(s.processes, s.id) s.platform.Close() - s.events <- &eventsapi.TaskDelete{ - ContainerID: s.id, - ExitStatus: uint32(p.ExitStatus()), - ExitedAt: p.ExitedAt(), - Pid: uint32(p.Pid()), - } return &shimapi.DeleteResponse{ ExitStatus: uint32(p.ExitStatus()), ExitedAt: p.ExitedAt(), @@ -223,11 +191,6 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*goo return nil, errdefs.ToGRPC(err) } s.processes[r.ID] = process - - s.events <- &eventsapi.TaskExecAdded{ - ContainerID: s.id, - ExecID: r.ID, - } return empty, nil } @@ -303,9 +266,6 @@ func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_ if err := p.(*proc.Init).Pause(ctx); err != nil { return nil, err } - s.events <- &eventsapi.TaskPaused{ - ContainerID: s.id, - } return empty, nil } @@ -320,9 +280,6 @@ func (s *Service) Resume(ctx context.Context, r *google_protobuf.Empty) (*google if err := p.(*proc.Init).Resume(ctx); err != nil { return nil, err } - s.events <- &eventsapi.TaskResumed{ - ContainerID: s.id, - } return empty, nil } @@ -409,9 +366,6 @@ func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskReque if err := p.(*proc.Init).Checkpoint(ctx, r); err != nil { return nil, errdefs.ToGRPC(err) } - s.events <- &eventsapi.TaskCheckpointed{ - ContainerID: s.id, - } return empty, nil } diff --git a/linux/task.go b/linux/task.go index 91a5279a8..235e67166 100644 --- a/linux/task.go +++ b/linux/task.go @@ -9,8 +9,10 @@ import ( "google.golang.org/grpc" "github.com/containerd/cgroups" + eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/linux/shim/client" shim "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/runtime" @@ -25,9 +27,10 @@ type Task struct { namespace string cg cgroups.Cgroup monitor runtime.TaskMonitor + events *exchange.Exchange } -func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime.TaskMonitor) (*Task, error) { +func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime.TaskMonitor, events *exchange.Exchange) (*Task, error) { var ( err error cg cgroups.Cgroup @@ -45,6 +48,7 @@ func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime namespace: namespace, cg: cg, monitor: monitor, + events: events, }, nil } @@ -82,6 +86,10 @@ func (t *Task) Start(ctx context.Context) error { return err } } + t.events.Publish(ctx, runtime.TaskStartEventTopic, &eventsapi.TaskStart{ + ContainerID: t.id, + Pid: uint32(t.pid), + }) return nil } @@ -123,11 +131,13 @@ func (t *Task) State(ctx context.Context) (runtime.State, error) { // Pause the task and all processes func (t *Task) Pause(ctx context.Context) error { - _, err := t.shim.Pause(ctx, empty) - if err != nil { - err = errdefs.FromGRPC(err) + if _, err := t.shim.Pause(ctx, empty); err != nil { + return errdefs.FromGRPC(err) } - return err + t.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventsapi.TaskPaused{ + ContainerID: t.id, + }) + return nil } // Resume the task and all processes @@ -135,6 +145,9 @@ func (t *Task) Resume(ctx context.Context) error { if _, err := t.shim.Resume(ctx, empty); err != nil { return errdefs.FromGRPC(err) } + t.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventsapi.TaskResumed{ + ContainerID: t.id, + }) return nil } @@ -223,6 +236,9 @@ func (t *Task) Checkpoint(ctx context.Context, path string, options *types.Any) if _, err := t.shim.Checkpoint(ctx, r); err != nil { return errdefs.FromGRPC(err) } + t.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventsapi.TaskCheckpointed{ + ContainerID: t.id, + }) return nil }