Implement the Runtime v2 Shim async task model for runhcs

Changes the requirement of a Runtime v2 shim in order to avoid race conditions
between shim and shim client sending async events. Places a requirement of what
events and what order a shim must comply to.

Signed-off-by: Justin Terry (VM) <juterry@microsoft.com>
This commit is contained in:
Justin Terry (VM) 2019-01-14 14:33:51 -08:00
parent 3acf6f1835
commit 6468619d73
6 changed files with 98 additions and 42 deletions

View File

@ -149,8 +149,27 @@ Filesystems are provided by the containerd snapshotters.
### Events
The shim MUST publish a `runtime.TaskExitEventTopic` when the container exits.
If the shim collects Out of Memory events, it SHOULD also publish a `runtime.TaskOOMEventTopic`.
The Runtime v2 supports an async event model. In order for the an upstream caller (such as Docker) to get these events in the correct order a Runtime v2 shim MUST implement the following events where `Compliance=MUST`. This avoids race conditions between the shim and shim client where for example a call to `Start` can signal a `TaskExitEventTopic` before even returning the results from the `Start` call. With these guarantees of a Runtime v2 shim a call to `Start` is required to have published the async event `TaskStartEventTopic` before the shim can publish the `TaskExitEventTopic`.
#### Tasks
| Topic | Compliance | Description |
| ----- | ---------- | ----------- |
| `runtime.TaskCreateEventTopic` | MUST | When a task is successfully created |
| `runtime.TaskStartEventTopic` | MUST (follow `TaskCreateEventTopic`) | When a task is successfully started |
| `runtime.TaskExitEventTopic` | MUST (follow `TaskStartEventTopic`) | When a task exits expected or unexpected |
| `runtime.TaskDeleteEventTopic` | MUST (follow `TaskExitEventTopic` or `TaskCreateEventTopic` if never started) | When a task is removed from a shim |
| `runtime.TaskPausedEventTopic` | SHOULD | When a task is successfully paused |
| `runtime.TaskResumedEventTopic` | SHOULD (follow `TaskPausedEventTopic`) | When a task is successfully resumed |
| `runtime.TaskCheckpointedEventTopic` | SHOULD | When a task is checkpointed |
| `runtime.TaskOOMEventTopic` | SHOULD | If the shim collects Out of Memory events |
#### Execs
| Topic | Compliance | Description |
| ----- | ---------- | ----------- |
| `runtime.TaskExecAddedEventTopic` | MUST (follow `TaskCreateEventTopic` ) | When an exec is successfully added |
| `runtime.TaskExecStartedEventTopic` | MUST (follow `TaskExecStartedEventTopic`) | When an exec is successfully started |
### Other

View File

@ -24,7 +24,6 @@ import (
gruntime "runtime"
"strings"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/runtime"
@ -152,13 +151,6 @@ func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) {
// remove self from the runtime task list
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
b.rtTasks.Delete(ctx, b.bundle.ID)
// shim will send the exit event
b.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
ContainerID: b.bundle.ID,
ExitStatus: response.ExitStatus,
ExitedAt: response.ExitedAt,
Pid: response.Pid,
})
return &runtime.Exit{
Status: response.ExitStatus,
Timestamp: response.ExitedAt,

View File

@ -19,7 +19,6 @@ package v2
import (
"context"
eventstypes "github.com/containerd/containerd/api/events"
tasktypes "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/runtime"
@ -114,18 +113,13 @@ func (p *process) CloseIO(ctx context.Context) error {
// Start the process
func (p *process) Start(ctx context.Context) error {
response, err := p.shim.task.Start(ctx, &task.StartRequest{
_, err := p.shim.task.Start(ctx, &task.StartRequest{
ID: p.shim.ID(),
ExecID: p.id,
})
if err != nil {
return errdefs.FromGRPC(err)
}
p.shim.events.Publish(ctx, runtime.TaskExecStartedEventTopic, &eventstypes.TaskExecStarted{
ContainerID: p.shim.ID(),
Pid: response.Pid,
ExecID: p.id,
})
return nil
}

View File

@ -59,6 +59,8 @@ func newProcess(ctx context.Context, s *service, id string, pid uint32, pr *pipe
func waitForProcess(ctx context.Context, process *process, p *os.Process, s *service) {
pid := uint32(p.Pid)
process.startedWg.Add(1)
// Store the default non-exited value for calls to stat
process.exit.Store(&processExit{
pid: pid,
@ -87,9 +89,11 @@ func waitForProcess(ctx context.Context, process *process, p *os.Process, s *ser
// Wait for the relay
process.relay.wait()
// close the client io, and free upstream waiters
process.close()
// Wait for the started event to fire if it hasn't already
process.startedWg.Wait()
// We publish the exit before freeing upstream so that the exit event always
// happens before any delete event.
s.publisher.Publish(
ctx,
runtime.TaskExitEventTopic,
@ -100,6 +104,9 @@ func waitForProcess(ctx context.Context, process *process, p *os.Process, s *ser
ExitStatus: uint32(status),
ExitedAt: now,
})
// close the client io, and free upstream waiters
process.close()
}
func newExecProcess(ctx context.Context, s *service, cid, id string, pr *pipeRelay, bundle, stdin, stdout, stderr string, terminal bool) (*process, error) {
@ -114,6 +121,8 @@ func newExecProcess(ctx context.Context, s *service, cid, id string, pr *pipeRel
relay: pr,
waitBlock: make(chan struct{}),
}
process.startedWg.Add(1)
// Store the default non-exited value for calls to stat
process.exit.Store(&processExit{
pid: 0, // This is updated when the call to Start happens and the state is overwritten in waitForProcess.
@ -140,6 +149,7 @@ type process struct {
// started track if the process has ever been started and will not be reset
// for the lifetime of the process object.
started bool
startedWg sync.WaitGroup
waitBlock chan struct{}
// exit holds the exit value for all calls to `stat`. By default a

View File

@ -37,6 +37,7 @@ import (
winio "github.com/Microsoft/go-winio"
"github.com/Microsoft/hcsshim/pkg/go-runhcs"
eventstypes "github.com/containerd/containerd/api/events"
containerd_types "github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
@ -44,6 +45,7 @@ import (
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/v2/runhcs/options"
"github.com/containerd/containerd/runtime/v2/shim"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
@ -593,6 +595,22 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (*ta
}
s.processes[r.ID] = process
s.publisher.Publish(ctx,
runtime.TaskCreateEventTopic,
&eventstypes.TaskCreate{
ContainerID: process.id,
Bundle: process.bundle,
Rootfs: r.Rootfs,
IO: &eventstypes.TaskIO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: "",
Pid: uint32(pid),
})
return &taskAPI.CreateTaskResponse{
Pid: uint32(pid),
}, nil
@ -711,9 +729,29 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
time.Sleep(1 * time.Second)
}
}
stat := p.stat()
pid := p.stat().pid
if r.ExecID != "" {
s.publisher.Publish(ctx,
runtime.TaskExecStartedEventTopic,
&eventstypes.TaskExecStarted{
ContainerID: p.cid,
ExecID: p.id,
Pid: pid,
})
} else {
s.publisher.Publish(ctx,
runtime.TaskStartEventTopic,
&eventstypes.TaskStart{
ContainerID: p.id,
Pid: pid,
})
}
p.startedWg.Done()
return &taskAPI.StartResponse{
Pid: stat.pid,
Pid: pid,
}, nil
}
@ -750,6 +788,14 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
s.mu.Lock()
delete(s.processes, p.id)
s.mu.Unlock()
s.publisher.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
ContainerID: p.id,
Pid: exit.pid,
ExitStatus: exit.exitStatus,
ExitedAt: exit.exitedAt,
})
return &taskAPI.DeleteResponse{
ExitedAt: exit.exitedAt,
ExitStatus: exit.exitStatus,
@ -780,6 +826,10 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
return nil, err
}
s.publisher.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{
r.ID,
})
return empty, nil
}
@ -799,6 +849,10 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
return nil, err
}
s.publisher.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{
r.ID,
})
return empty, nil
}
@ -910,6 +964,13 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
}
s.processes[r.ExecID] = process
s.publisher.Publish(ctx,
runtime.TaskExecAddedEventTopic,
&eventstypes.TaskExecAdded{
ContainerID: process.cid,
ExecID: process.id,
})
return empty, nil
}

View File

@ -24,7 +24,6 @@ import (
"path/filepath"
"time"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types"
tasktypes "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
@ -163,12 +162,6 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
// remove self from the runtime task list
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
s.rtTasks.Delete(ctx, s.ID())
s.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
ContainerID: s.ID(),
ExitStatus: response.ExitStatus,
ExitedAt: response.ExitedAt,
Pid: response.Pid,
})
return &runtime.Exit{
Status: response.ExitStatus,
Timestamp: response.ExitedAt,
@ -212,9 +205,6 @@ func (s *shim) Pause(ctx context.Context) error {
}); err != nil {
return errdefs.FromGRPC(err)
}
s.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{
ContainerID: s.ID(),
})
return nil
}
@ -224,9 +214,6 @@ func (s *shim) Resume(ctx context.Context) error {
}); err != nil {
return errdefs.FromGRPC(err)
}
s.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{
ContainerID: s.ID(),
})
return nil
}
@ -238,10 +225,6 @@ func (s *shim) Start(ctx context.Context) error {
return errdefs.FromGRPC(err)
}
s.taskPid = int(response.Pid)
s.events.Publish(ctx, runtime.TaskStartEventTopic, &eventstypes.TaskStart{
ContainerID: s.ID(),
Pid: uint32(s.taskPid),
})
return nil
}
@ -340,9 +323,6 @@ func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any)
if _, err := s.task.Checkpoint(ctx, request); err != nil {
return errdefs.FromGRPC(err)
}
s.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventstypes.TaskCheckpointed{
ContainerID: s.ID(),
})
return nil
}