From 6468619d736840481ec17ac433e7175eeb8c5ee8 Mon Sep 17 00:00:00 2001 From: "Justin Terry (VM)" Date: Mon, 14 Jan 2019 14:33:51 -0800 Subject: [PATCH] Implement the Runtime v2 Shim async task model for runhcs Changes the requirement of a Runtime v2 shim in order to avoid race conditions between shim and shim client sending async events. Places a requirement of what events and what order a shim must comply to. Signed-off-by: Justin Terry (VM) --- runtime/v2/README.md | 23 +++++++++++-- runtime/v2/binary.go | 8 ----- runtime/v2/process.go | 8 +---- runtime/v2/runhcs/process.go | 16 +++++++-- runtime/v2/runhcs/service.go | 65 ++++++++++++++++++++++++++++++++++-- runtime/v2/shim.go | 20 ----------- 6 files changed, 98 insertions(+), 42 deletions(-) diff --git a/runtime/v2/README.md b/runtime/v2/README.md index 619ed7380..1aeb20b4d 100644 --- a/runtime/v2/README.md +++ b/runtime/v2/README.md @@ -149,8 +149,27 @@ Filesystems are provided by the containerd snapshotters. ### Events -The shim MUST publish a `runtime.TaskExitEventTopic` when the container exits. -If the shim collects Out of Memory events, it SHOULD also publish a `runtime.TaskOOMEventTopic`. +The Runtime v2 supports an async event model. In order for an upstream caller (such as Docker) to get these events in the correct order, a Runtime v2 shim MUST implement the following events where `Compliance=MUST`. This avoids race conditions between the shim and shim client where, for example, a call to `Start` can signal a `TaskExitEventTopic` before even returning the results from the `Start` call. With these guarantees of a Runtime v2 shim, a call to `Start` is required to have published the async event `TaskStartEventTopic` before the shim can publish the `TaskExitEventTopic`. 
+ +#### Tasks + +| Topic | Compliance | Description | +| ----- | ---------- | ----------- | +| `runtime.TaskCreateEventTopic` | MUST | When a task is successfully created | +| `runtime.TaskStartEventTopic` | MUST (follow `TaskCreateEventTopic`) | When a task is successfully started | +| `runtime.TaskExitEventTopic` | MUST (follow `TaskStartEventTopic`) | When a task exits, expectedly or unexpectedly | +| `runtime.TaskDeleteEventTopic` | MUST (follow `TaskExitEventTopic` or `TaskCreateEventTopic` if never started) | When a task is removed from a shim | +| `runtime.TaskPausedEventTopic` | SHOULD | When a task is successfully paused | +| `runtime.TaskResumedEventTopic` | SHOULD (follow `TaskPausedEventTopic`) | When a task is successfully resumed | +| `runtime.TaskCheckpointedEventTopic` | SHOULD | When a task is checkpointed | +| `runtime.TaskOOMEventTopic` | SHOULD | If the shim collects Out of Memory events | + +#### Execs + +| Topic | Compliance | Description | +| ----- | ---------- | ----------- | +| `runtime.TaskExecAddedEventTopic` | MUST (follow `TaskCreateEventTopic`) | When an exec is successfully added | +| `runtime.TaskExecStartedEventTopic` | MUST (follow `TaskExecAddedEventTopic`) | When an exec is successfully started | ### Other diff --git a/runtime/v2/binary.go b/runtime/v2/binary.go index 41de0d3e0..8cad501e6 100644 --- a/runtime/v2/binary.go +++ b/runtime/v2/binary.go @@ -24,7 +24,6 @@ import ( gruntime "runtime" "strings" - eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/log" "github.com/containerd/containerd/runtime" @@ -152,13 +151,6 @@ func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) { // remove self from the runtime task list // this seems dirty but it cleans up the API across runtimes, tasks, and the service b.rtTasks.Delete(ctx, b.bundle.ID) - // shim will send the exit event - b.events.Publish(ctx, runtime.TaskDeleteEventTopic, 
&eventstypes.TaskDelete{ - ContainerID: b.bundle.ID, - ExitStatus: response.ExitStatus, - ExitedAt: response.ExitedAt, - Pid: response.Pid, - }) return &runtime.Exit{ Status: response.ExitStatus, Timestamp: response.ExitedAt, diff --git a/runtime/v2/process.go b/runtime/v2/process.go index dbff8fd2c..b41935f6a 100644 --- a/runtime/v2/process.go +++ b/runtime/v2/process.go @@ -19,7 +19,6 @@ package v2 import ( "context" - eventstypes "github.com/containerd/containerd/api/events" tasktypes "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/runtime" @@ -114,18 +113,13 @@ func (p *process) CloseIO(ctx context.Context) error { // Start the process func (p *process) Start(ctx context.Context) error { - response, err := p.shim.task.Start(ctx, &task.StartRequest{ + _, err := p.shim.task.Start(ctx, &task.StartRequest{ ID: p.shim.ID(), ExecID: p.id, }) if err != nil { return errdefs.FromGRPC(err) } - p.shim.events.Publish(ctx, runtime.TaskExecStartedEventTopic, &eventstypes.TaskExecStarted{ - ContainerID: p.shim.ID(), - Pid: response.Pid, - ExecID: p.id, - }) return nil } diff --git a/runtime/v2/runhcs/process.go b/runtime/v2/runhcs/process.go index 0b3a395fe..b57d80dae 100644 --- a/runtime/v2/runhcs/process.go +++ b/runtime/v2/runhcs/process.go @@ -59,6 +59,8 @@ func newProcess(ctx context.Context, s *service, id string, pid uint32, pr *pipe func waitForProcess(ctx context.Context, process *process, p *os.Process, s *service) { pid := uint32(p.Pid) + process.startedWg.Add(1) + // Store the default non-exited value for calls to stat process.exit.Store(&processExit{ pid: pid, @@ -87,9 +89,11 @@ func waitForProcess(ctx context.Context, process *process, p *os.Process, s *ser // Wait for the relay process.relay.wait() - // close the client io, and free upstream waiters - process.close() + // Wait for the started event to fire if it hasn't already + process.startedWg.Wait() + // We publish the exit 
before freeing upstream so that the exit event always + // happens before any delete event. s.publisher.Publish( ctx, runtime.TaskExitEventTopic, @@ -100,6 +104,9 @@ func waitForProcess(ctx context.Context, process *process, p *os.Process, s *ser ExitStatus: uint32(status), ExitedAt: now, }) + + // close the client io, and free upstream waiters + process.close() } func newExecProcess(ctx context.Context, s *service, cid, id string, pr *pipeRelay, bundle, stdin, stdout, stderr string, terminal bool) (*process, error) { @@ -114,6 +121,8 @@ func newExecProcess(ctx context.Context, s *service, cid, id string, pr *pipeRel relay: pr, waitBlock: make(chan struct{}), } + process.startedWg.Add(1) + // Store the default non-exited value for calls to stat process.exit.Store(&processExit{ pid: 0, // This is updated when the call to Start happens and the state is overwritten in waitForProcess. @@ -139,7 +148,8 @@ type process struct { // started track if the process has ever been started and will not be reset // for the lifetime of the process object. - started bool + started bool + startedWg sync.WaitGroup waitBlock chan struct{} // exit holds the exit value for all calls to `stat`. 
By default a diff --git a/runtime/v2/runhcs/service.go b/runtime/v2/runhcs/service.go index 6851ec646..f45414093 100644 --- a/runtime/v2/runhcs/service.go +++ b/runtime/v2/runhcs/service.go @@ -37,6 +37,7 @@ import ( winio "github.com/Microsoft/go-winio" "github.com/Microsoft/hcsshim/pkg/go-runhcs" + eventstypes "github.com/containerd/containerd/api/events" containerd_types "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" @@ -44,6 +45,7 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime/v2/runhcs/options" "github.com/containerd/containerd/runtime/v2/shim" taskAPI "github.com/containerd/containerd/runtime/v2/task" @@ -593,6 +595,22 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (*ta } s.processes[r.ID] = process + s.publisher.Publish(ctx, + runtime.TaskCreateEventTopic, + &eventstypes.TaskCreate{ + ContainerID: process.id, + Bundle: process.bundle, + Rootfs: r.Rootfs, + IO: &eventstypes.TaskIO{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, + }, + Checkpoint: "", + Pid: uint32(pid), + }) + return &taskAPI.CreateTaskResponse{ Pid: uint32(pid), }, nil @@ -711,9 +729,29 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. 
time.Sleep(1 * time.Second) } } - stat := p.stat() + + pid := p.stat().pid + if r.ExecID != "" { + s.publisher.Publish(ctx, + runtime.TaskExecStartedEventTopic, + &eventstypes.TaskExecStarted{ + ContainerID: p.cid, + ExecID: p.id, + Pid: pid, + }) + } else { + s.publisher.Publish(ctx, + runtime.TaskStartEventTopic, + &eventstypes.TaskStart{ + ContainerID: p.id, + Pid: pid, + }) + } + + p.startedWg.Done() + return &taskAPI.StartResponse{ - Pid: stat.pid, + Pid: pid, }, nil } @@ -750,6 +788,14 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP s.mu.Lock() delete(s.processes, p.id) s.mu.Unlock() + + s.publisher.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{ + ContainerID: p.id, + Pid: exit.pid, + ExitStatus: exit.exitStatus, + ExitedAt: exit.exitedAt, + }) + return &taskAPI.DeleteResponse{ ExitedAt: exit.exitedAt, ExitStatus: exit.exitStatus, @@ -780,6 +826,10 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E return nil, err } + s.publisher.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{ + r.ID, + }) + return empty, nil } @@ -799,6 +849,10 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes return nil, err } + s.publisher.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{ + r.ID, + }) + return empty, nil } @@ -910,6 +964,13 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty } s.processes[r.ExecID] = process + s.publisher.Publish(ctx, + runtime.TaskExecAddedEventTopic, + &eventstypes.TaskExecAdded{ + ContainerID: process.cid, + ExecID: process.id, + }) + return empty, nil } diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index c0d3833c5..94814e754 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -24,7 +24,6 @@ import ( "path/filepath" "time" - eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types" tasktypes 
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" @@ -163,12 +162,6 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) { // remove self from the runtime task list // this seems dirty but it cleans up the API across runtimes, tasks, and the service s.rtTasks.Delete(ctx, s.ID()) - s.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{ - ContainerID: s.ID(), - ExitStatus: response.ExitStatus, - ExitedAt: response.ExitedAt, - Pid: response.Pid, - }) return &runtime.Exit{ Status: response.ExitStatus, Timestamp: response.ExitedAt, @@ -212,9 +205,6 @@ func (s *shim) Pause(ctx context.Context) error { }); err != nil { return errdefs.FromGRPC(err) } - s.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{ - ContainerID: s.ID(), - }) return nil } @@ -224,9 +214,6 @@ func (s *shim) Resume(ctx context.Context) error { }); err != nil { return errdefs.FromGRPC(err) } - s.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{ - ContainerID: s.ID(), - }) return nil } @@ -238,10 +225,6 @@ func (s *shim) Start(ctx context.Context) error { return errdefs.FromGRPC(err) } s.taskPid = int(response.Pid) - s.events.Publish(ctx, runtime.TaskStartEventTopic, &eventstypes.TaskStart{ - ContainerID: s.ID(), - Pid: uint32(s.taskPid), - }) return nil } @@ -340,9 +323,6 @@ func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any) if _, err := s.task.Checkpoint(ctx, request); err != nil { return errdefs.FromGRPC(err) } - s.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventstypes.TaskCheckpointed{ - ContainerID: s.ID(), - }) return nil }