Move events from shim into linux runtime

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2017-11-09 13:15:08 -05:00
parent 1fe5a251c4
commit a7343b0773
4 changed files with 53 additions and 57 deletions

View File

@ -5,6 +5,7 @@ package linux
import ( import (
"context" "context"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
shim "github.com/containerd/containerd/linux/shim/v1" shim "github.com/containerd/containerd/linux/shim/v1"
@ -96,12 +97,17 @@ func (p *Process) CloseIO(ctx context.Context) error {
// Start the process // Start the process
func (p *Process) Start(ctx context.Context) error { func (p *Process) Start(ctx context.Context) error {
_, err := p.t.shim.Start(ctx, &shim.StartRequest{ r, err := p.t.shim.Start(ctx, &shim.StartRequest{
ID: p.id, ID: p.id,
}) })
if err != nil { if err != nil {
return errdefs.FromGRPC(err) return errdefs.FromGRPC(err)
} }
p.t.events.Publish(ctx, runtime.TaskExecStartedEventTopic, &eventsapi.TaskExecStarted{
ContainerID: p.t.id,
Pid: r.Pid,
ExecID: p.id,
})
return nil return nil
} }

View File

@ -242,14 +242,14 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
} }
}() }()
runtime := r.config.Runtime rt := r.config.Runtime
if ropts != nil && ropts.Runtime != "" { if ropts != nil && ropts.Runtime != "" {
runtime = ropts.Runtime rt = ropts.Runtime
} }
sopts := &shim.CreateTaskRequest{ sopts := &shim.CreateTaskRequest{
ID: id, ID: id,
Bundle: bundle.path, Bundle: bundle.path,
Runtime: runtime, Runtime: rt,
Stdin: opts.IO.Stdin, Stdin: opts.IO.Stdin,
Stdout: opts.IO.Stdout, Stdout: opts.IO.Stdout,
Stderr: opts.IO.Stderr, Stderr: opts.IO.Stderr,
@ -268,7 +268,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
if err != nil { if err != nil {
return nil, errdefs.FromGRPC(err) return nil, errdefs.FromGRPC(err)
} }
t, err := newTask(id, namespace, int(cr.Pid), s, r.monitor) t, err := newTask(id, namespace, int(cr.Pid), s, r.monitor, r.events)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -285,6 +285,20 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
return nil, err return nil, err
} }
} }
r.events.Publish(ctx, runtime.TaskCreateEventTopic, &eventsapi.TaskCreate{
ContainerID: sopts.ID,
Bundle: sopts.Bundle,
Rootfs: sopts.Rootfs,
IO: &eventsapi.TaskIO{
Stdin: sopts.Stdin,
Stdout: sopts.Stdout,
Stderr: sopts.Stderr,
Terminal: sopts.Terminal,
},
Checkpoint: sopts.Checkpoint,
Pid: uint32(t.pid),
})
return t, nil return t, nil
} }
@ -322,6 +336,12 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er
if err := bundle.Delete(); err != nil { if err := bundle.Delete(); err != nil {
log.G(ctx).WithError(err).Error("failed to delete bundle") log.G(ctx).WithError(err).Error("failed to delete bundle")
} }
r.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventsapi.TaskDelete{
ContainerID: lc.id,
ExitStatus: rsp.ExitStatus,
ExitedAt: rsp.ExitedAt,
Pid: rsp.Pid,
})
return &runtime.Exit{ return &runtime.Exit{
Status: rsp.ExitStatus, Status: rsp.ExitStatus,
Timestamp: rsp.ExitedAt, Timestamp: rsp.ExitedAt,
@ -391,7 +411,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
continue continue
} }
t, err := newTask(id, ns, pid, s, r.monitor) t, err := newTask(id, ns, pid, s, r.monitor, r.events)
if err != nil { if err != nil {
log.G(ctx).WithError(err).Error("loading task type") log.G(ctx).WithError(err).Error("loading task type")
continue continue

View File

@ -107,19 +107,6 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh
s.bundle = r.Bundle s.bundle = r.Bundle
pid := process.Pid() pid := process.Pid()
s.processes[r.ID] = process s.processes[r.ID] = process
s.events <- &eventsapi.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
Rootfs: r.Rootfs,
IO: &eventsapi.TaskIO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: r.Checkpoint,
Pid: uint32(pid),
}
return &shimapi.CreateTaskResponse{ return &shimapi.CreateTaskResponse{
Pid: uint32(pid), Pid: uint32(pid),
}, nil }, nil
@ -136,19 +123,6 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.
if err := p.Start(ctx); err != nil { if err := p.Start(ctx); err != nil {
return nil, err return nil, err
} }
if r.ID == s.id {
s.events <- &eventsapi.TaskStart{
ContainerID: s.id,
Pid: uint32(p.Pid()),
}
} else {
pid := p.Pid()
s.events <- &eventsapi.TaskExecStarted{
ContainerID: s.id,
ExecID: r.ID,
Pid: uint32(pid),
}
}
return &shimapi.StartResponse{ return &shimapi.StartResponse{
ID: p.ID(), ID: p.ID(),
Pid: uint32(p.Pid()), Pid: uint32(p.Pid()),
@ -169,12 +143,6 @@ func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimap
} }
delete(s.processes, s.id) delete(s.processes, s.id)
s.platform.Close() s.platform.Close()
s.events <- &eventsapi.TaskDelete{
ContainerID: s.id,
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
Pid: uint32(p.Pid()),
}
return &shimapi.DeleteResponse{ return &shimapi.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()), ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(), ExitedAt: p.ExitedAt(),
@ -223,11 +191,6 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*goo
return nil, errdefs.ToGRPC(err) return nil, errdefs.ToGRPC(err)
} }
s.processes[r.ID] = process s.processes[r.ID] = process
s.events <- &eventsapi.TaskExecAdded{
ContainerID: s.id,
ExecID: r.ID,
}
return empty, nil return empty, nil
} }
@ -303,9 +266,6 @@ func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_
if err := p.(*proc.Init).Pause(ctx); err != nil { if err := p.(*proc.Init).Pause(ctx); err != nil {
return nil, err return nil, err
} }
s.events <- &eventsapi.TaskPaused{
ContainerID: s.id,
}
return empty, nil return empty, nil
} }
@ -320,9 +280,6 @@ func (s *Service) Resume(ctx context.Context, r *google_protobuf.Empty) (*google
if err := p.(*proc.Init).Resume(ctx); err != nil { if err := p.(*proc.Init).Resume(ctx); err != nil {
return nil, err return nil, err
} }
s.events <- &eventsapi.TaskResumed{
ContainerID: s.id,
}
return empty, nil return empty, nil
} }
@ -409,9 +366,6 @@ func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskReque
if err := p.(*proc.Init).Checkpoint(ctx, r); err != nil { if err := p.(*proc.Init).Checkpoint(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err) return nil, errdefs.ToGRPC(err)
} }
s.events <- &eventsapi.TaskCheckpointed{
ContainerID: s.id,
}
return empty, nil return empty, nil
} }

View File

@ -9,8 +9,10 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/containerd/cgroups" "github.com/containerd/cgroups"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/linux/shim/client" "github.com/containerd/containerd/linux/shim/client"
shim "github.com/containerd/containerd/linux/shim/v1" shim "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime"
@ -25,9 +27,10 @@ type Task struct {
namespace string namespace string
cg cgroups.Cgroup cg cgroups.Cgroup
monitor runtime.TaskMonitor monitor runtime.TaskMonitor
events *exchange.Exchange
} }
func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime.TaskMonitor) (*Task, error) { func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime.TaskMonitor, events *exchange.Exchange) (*Task, error) {
var ( var (
err error err error
cg cgroups.Cgroup cg cgroups.Cgroup
@ -45,6 +48,7 @@ func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime
namespace: namespace, namespace: namespace,
cg: cg, cg: cg,
monitor: monitor, monitor: monitor,
events: events,
}, nil }, nil
} }
@ -82,6 +86,10 @@ func (t *Task) Start(ctx context.Context) error {
return err return err
} }
} }
t.events.Publish(ctx, runtime.TaskStartEventTopic, &eventsapi.TaskStart{
ContainerID: t.id,
Pid: uint32(t.pid),
})
return nil return nil
} }
@ -123,11 +131,13 @@ func (t *Task) State(ctx context.Context) (runtime.State, error) {
// Pause the task and all processes // Pause the task and all processes
func (t *Task) Pause(ctx context.Context) error { func (t *Task) Pause(ctx context.Context) error {
_, err := t.shim.Pause(ctx, empty) if _, err := t.shim.Pause(ctx, empty); err != nil {
if err != nil { return errdefs.FromGRPC(err)
err = errdefs.FromGRPC(err)
} }
return err t.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventsapi.TaskPaused{
ContainerID: t.id,
})
return nil
} }
// Resume the task and all processes // Resume the task and all processes
@ -135,6 +145,9 @@ func (t *Task) Resume(ctx context.Context) error {
if _, err := t.shim.Resume(ctx, empty); err != nil { if _, err := t.shim.Resume(ctx, empty); err != nil {
return errdefs.FromGRPC(err) return errdefs.FromGRPC(err)
} }
t.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventsapi.TaskResumed{
ContainerID: t.id,
})
return nil return nil
} }
@ -223,6 +236,9 @@ func (t *Task) Checkpoint(ctx context.Context, path string, options *types.Any)
if _, err := t.shim.Checkpoint(ctx, r); err != nil { if _, err := t.shim.Checkpoint(ctx, r); err != nil {
return errdefs.FromGRPC(err) return errdefs.FromGRPC(err)
} }
t.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventsapi.TaskCheckpointed{
ContainerID: t.id,
})
return nil return nil
} }