Merge pull request #2939 from jterry75/bug_publishstart
Implement the Runtime v2 Shim async task model for runhcs
This commit is contained in:
commit
f63d28984c
@ -149,8 +149,27 @@ Filesystems are provided by the containerd snapshotters.
|
||||
|
||||
### Events
|
||||
|
||||
The shim MUST publish a `runtime.TaskExitEventTopic` when the container exits.
|
||||
If the shim collects Out of Memory events, it SHOULD also publish a `runtime.TaskOOMEventTopic`.
|
||||
The Runtime v2 supports an async event model. In order for the an upstream caller (such as Docker) to get these events in the correct order a Runtime v2 shim MUST implement the following events where `Compliance=MUST`. This avoids race conditions between the shim and shim client where for example a call to `Start` can signal a `TaskExitEventTopic` before even returning the results from the `Start` call. With these guarantees of a Runtime v2 shim a call to `Start` is required to have published the async event `TaskStartEventTopic` before the shim can publish the `TaskExitEventTopic`.
|
||||
|
||||
#### Tasks
|
||||
|
||||
| Topic | Compliance | Description |
|
||||
| ----- | ---------- | ----------- |
|
||||
| `runtime.TaskCreateEventTopic` | MUST | When a task is successfully created |
|
||||
| `runtime.TaskStartEventTopic` | MUST (follow `TaskCreateEventTopic`) | When a task is successfully started |
|
||||
| `runtime.TaskExitEventTopic` | MUST (follow `TaskStartEventTopic`) | When a task exits expected or unexpected |
|
||||
| `runtime.TaskDeleteEventTopic` | MUST (follow `TaskExitEventTopic` or `TaskCreateEventTopic` if never started) | When a task is removed from a shim |
|
||||
| `runtime.TaskPausedEventTopic` | SHOULD | When a task is successfully paused |
|
||||
| `runtime.TaskResumedEventTopic` | SHOULD (follow `TaskPausedEventTopic`) | When a task is successfully resumed |
|
||||
| `runtime.TaskCheckpointedEventTopic` | SHOULD | When a task is checkpointed |
|
||||
| `runtime.TaskOOMEventTopic` | SHOULD | If the shim collects Out of Memory events |
|
||||
|
||||
#### Execs
|
||||
|
||||
| Topic | Compliance | Description |
|
||||
| ----- | ---------- | ----------- |
|
||||
| `runtime.TaskExecAddedEventTopic` | MUST (follow `TaskCreateEventTopic` ) | When an exec is successfully added |
|
||||
| `runtime.TaskExecStartedEventTopic` | MUST (follow `TaskExecStartedEventTopic`) | When an exec is successfully started |
|
||||
|
||||
### Other
|
||||
|
||||
|
@ -24,7 +24,6 @@ import (
|
||||
gruntime "runtime"
|
||||
"strings"
|
||||
|
||||
eventstypes "github.com/containerd/containerd/api/events"
|
||||
"github.com/containerd/containerd/events/exchange"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/runtime"
|
||||
@ -152,13 +151,6 @@ func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) {
|
||||
// remove self from the runtime task list
|
||||
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
|
||||
b.rtTasks.Delete(ctx, b.bundle.ID)
|
||||
// shim will send the exit event
|
||||
b.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
|
||||
ContainerID: b.bundle.ID,
|
||||
ExitStatus: response.ExitStatus,
|
||||
ExitedAt: response.ExitedAt,
|
||||
Pid: response.Pid,
|
||||
})
|
||||
return &runtime.Exit{
|
||||
Status: response.ExitStatus,
|
||||
Timestamp: response.ExitedAt,
|
||||
|
@ -19,7 +19,6 @@ package v2
|
||||
import (
|
||||
"context"
|
||||
|
||||
eventstypes "github.com/containerd/containerd/api/events"
|
||||
tasktypes "github.com/containerd/containerd/api/types/task"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/runtime"
|
||||
@ -114,18 +113,13 @@ func (p *process) CloseIO(ctx context.Context) error {
|
||||
|
||||
// Start the process
|
||||
func (p *process) Start(ctx context.Context) error {
|
||||
response, err := p.shim.task.Start(ctx, &task.StartRequest{
|
||||
_, err := p.shim.task.Start(ctx, &task.StartRequest{
|
||||
ID: p.shim.ID(),
|
||||
ExecID: p.id,
|
||||
})
|
||||
if err != nil {
|
||||
return errdefs.FromGRPC(err)
|
||||
}
|
||||
p.shim.events.Publish(ctx, runtime.TaskExecStartedEventTopic, &eventstypes.TaskExecStarted{
|
||||
ContainerID: p.shim.ID(),
|
||||
Pid: response.Pid,
|
||||
ExecID: p.id,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -59,6 +59,8 @@ func newProcess(ctx context.Context, s *service, id string, pid uint32, pr *pipe
|
||||
|
||||
func waitForProcess(ctx context.Context, process *process, p *os.Process, s *service) {
|
||||
pid := uint32(p.Pid)
|
||||
process.startedWg.Add(1)
|
||||
|
||||
// Store the default non-exited value for calls to stat
|
||||
process.exit.Store(&processExit{
|
||||
pid: pid,
|
||||
@ -87,9 +89,11 @@ func waitForProcess(ctx context.Context, process *process, p *os.Process, s *ser
|
||||
// Wait for the relay
|
||||
process.relay.wait()
|
||||
|
||||
// close the client io, and free upstream waiters
|
||||
process.close()
|
||||
// Wait for the started event to fire if it hasn't already
|
||||
process.startedWg.Wait()
|
||||
|
||||
// We publish the exit before freeing upstream so that the exit event always
|
||||
// happens before any delete event.
|
||||
s.publisher.Publish(
|
||||
ctx,
|
||||
runtime.TaskExitEventTopic,
|
||||
@ -100,6 +104,9 @@ func waitForProcess(ctx context.Context, process *process, p *os.Process, s *ser
|
||||
ExitStatus: uint32(status),
|
||||
ExitedAt: now,
|
||||
})
|
||||
|
||||
// close the client io, and free upstream waiters
|
||||
process.close()
|
||||
}
|
||||
|
||||
func newExecProcess(ctx context.Context, s *service, cid, id string, pr *pipeRelay, bundle, stdin, stdout, stderr string, terminal bool) (*process, error) {
|
||||
@ -114,6 +121,8 @@ func newExecProcess(ctx context.Context, s *service, cid, id string, pr *pipeRel
|
||||
relay: pr,
|
||||
waitBlock: make(chan struct{}),
|
||||
}
|
||||
process.startedWg.Add(1)
|
||||
|
||||
// Store the default non-exited value for calls to stat
|
||||
process.exit.Store(&processExit{
|
||||
pid: 0, // This is updated when the call to Start happens and the state is overwritten in waitForProcess.
|
||||
@ -139,7 +148,8 @@ type process struct {
|
||||
|
||||
// started track if the process has ever been started and will not be reset
|
||||
// for the lifetime of the process object.
|
||||
started bool
|
||||
started bool
|
||||
startedWg sync.WaitGroup
|
||||
|
||||
waitBlock chan struct{}
|
||||
// exit holds the exit value for all calls to `stat`. By default a
|
||||
|
@ -37,6 +37,7 @@ import (
|
||||
|
||||
winio "github.com/Microsoft/go-winio"
|
||||
"github.com/Microsoft/hcsshim/pkg/go-runhcs"
|
||||
eventstypes "github.com/containerd/containerd/api/events"
|
||||
containerd_types "github.com/containerd/containerd/api/types"
|
||||
"github.com/containerd/containerd/api/types/task"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
@ -44,6 +45,7 @@ import (
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/mount"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/runtime"
|
||||
"github.com/containerd/containerd/runtime/v2/runhcs/options"
|
||||
"github.com/containerd/containerd/runtime/v2/shim"
|
||||
taskAPI "github.com/containerd/containerd/runtime/v2/task"
|
||||
@ -593,6 +595,22 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (*ta
|
||||
}
|
||||
s.processes[r.ID] = process
|
||||
|
||||
s.publisher.Publish(ctx,
|
||||
runtime.TaskCreateEventTopic,
|
||||
&eventstypes.TaskCreate{
|
||||
ContainerID: process.id,
|
||||
Bundle: process.bundle,
|
||||
Rootfs: r.Rootfs,
|
||||
IO: &eventstypes.TaskIO{
|
||||
Stdin: r.Stdin,
|
||||
Stdout: r.Stdout,
|
||||
Stderr: r.Stderr,
|
||||
Terminal: r.Terminal,
|
||||
},
|
||||
Checkpoint: "",
|
||||
Pid: uint32(pid),
|
||||
})
|
||||
|
||||
return &taskAPI.CreateTaskResponse{
|
||||
Pid: uint32(pid),
|
||||
}, nil
|
||||
@ -711,9 +729,29 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
stat := p.stat()
|
||||
|
||||
pid := p.stat().pid
|
||||
if r.ExecID != "" {
|
||||
s.publisher.Publish(ctx,
|
||||
runtime.TaskExecStartedEventTopic,
|
||||
&eventstypes.TaskExecStarted{
|
||||
ContainerID: p.cid,
|
||||
ExecID: p.id,
|
||||
Pid: pid,
|
||||
})
|
||||
} else {
|
||||
s.publisher.Publish(ctx,
|
||||
runtime.TaskStartEventTopic,
|
||||
&eventstypes.TaskStart{
|
||||
ContainerID: p.id,
|
||||
Pid: pid,
|
||||
})
|
||||
}
|
||||
|
||||
p.startedWg.Done()
|
||||
|
||||
return &taskAPI.StartResponse{
|
||||
Pid: stat.pid,
|
||||
Pid: pid,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -750,6 +788,14 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
|
||||
s.mu.Lock()
|
||||
delete(s.processes, p.id)
|
||||
s.mu.Unlock()
|
||||
|
||||
s.publisher.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
|
||||
ContainerID: p.id,
|
||||
Pid: exit.pid,
|
||||
ExitStatus: exit.exitStatus,
|
||||
ExitedAt: exit.exitedAt,
|
||||
})
|
||||
|
||||
return &taskAPI.DeleteResponse{
|
||||
ExitedAt: exit.exitedAt,
|
||||
ExitStatus: exit.exitStatus,
|
||||
@ -780,6 +826,10 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.publisher.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{
|
||||
r.ID,
|
||||
})
|
||||
|
||||
return empty, nil
|
||||
}
|
||||
|
||||
@ -799,6 +849,10 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.publisher.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{
|
||||
r.ID,
|
||||
})
|
||||
|
||||
return empty, nil
|
||||
}
|
||||
|
||||
@ -910,6 +964,13 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
|
||||
}
|
||||
s.processes[r.ExecID] = process
|
||||
|
||||
s.publisher.Publish(ctx,
|
||||
runtime.TaskExecAddedEventTopic,
|
||||
&eventstypes.TaskExecAdded{
|
||||
ContainerID: process.cid,
|
||||
ExecID: process.id,
|
||||
})
|
||||
|
||||
return empty, nil
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,6 @@ import (
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
eventstypes "github.com/containerd/containerd/api/events"
|
||||
"github.com/containerd/containerd/api/types"
|
||||
tasktypes "github.com/containerd/containerd/api/types/task"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
@ -163,12 +162,6 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
|
||||
// remove self from the runtime task list
|
||||
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
|
||||
s.rtTasks.Delete(ctx, s.ID())
|
||||
s.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
|
||||
ContainerID: s.ID(),
|
||||
ExitStatus: response.ExitStatus,
|
||||
ExitedAt: response.ExitedAt,
|
||||
Pid: response.Pid,
|
||||
})
|
||||
return &runtime.Exit{
|
||||
Status: response.ExitStatus,
|
||||
Timestamp: response.ExitedAt,
|
||||
@ -212,9 +205,6 @@ func (s *shim) Pause(ctx context.Context) error {
|
||||
}); err != nil {
|
||||
return errdefs.FromGRPC(err)
|
||||
}
|
||||
s.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{
|
||||
ContainerID: s.ID(),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -224,9 +214,6 @@ func (s *shim) Resume(ctx context.Context) error {
|
||||
}); err != nil {
|
||||
return errdefs.FromGRPC(err)
|
||||
}
|
||||
s.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{
|
||||
ContainerID: s.ID(),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -238,10 +225,6 @@ func (s *shim) Start(ctx context.Context) error {
|
||||
return errdefs.FromGRPC(err)
|
||||
}
|
||||
s.taskPid = int(response.Pid)
|
||||
s.events.Publish(ctx, runtime.TaskStartEventTopic, &eventstypes.TaskStart{
|
||||
ContainerID: s.ID(),
|
||||
Pid: uint32(s.taskPid),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -340,9 +323,6 @@ func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any)
|
||||
if _, err := s.task.Checkpoint(ctx, request); err != nil {
|
||||
return errdefs.FromGRPC(err)
|
||||
}
|
||||
s.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventstypes.TaskCheckpointed{
|
||||
ContainerID: s.ID(),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user