From d30d897ef93e7838f0c0828e0e85619b93331131 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Wed, 4 Aug 2021 10:38:05 -0700 Subject: [PATCH] Cleanup v2 shim Signed-off-by: Maksym Pavlenko --- runtime/runtime.go | 2 +- runtime/task.go | 14 ++++++-- runtime/v1/linux/runtime.go | 18 ++++++++-- runtime/v1/linux/task.go | 8 ++--- runtime/v2/binary.go | 15 +++----- runtime/v2/manager.go | 23 +++++++++---- runtime/v2/shim.go | 69 ++++++++++++++++++------------------- services/tasks/local.go | 25 ++++++++++++-- 8 files changed, 107 insertions(+), 67 deletions(-) diff --git a/runtime/runtime.go b/runtime/runtime.go index 3d758fb97..52974a3a9 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -72,5 +72,5 @@ type PlatformRuntime interface { // Add adds a task into runtime. Add(ctx context.Context, task Task) error // Delete remove a task. - Delete(ctx context.Context, taskID string) + Delete(ctx context.Context, taskID string) (*Exit, error) } diff --git a/runtime/task.go b/runtime/task.go index c9876ed4a..e453fa3a9 100644 --- a/runtime/task.go +++ b/runtime/task.go @@ -47,6 +47,14 @@ type Process interface { Start(ctx context.Context) error // Wait for the process to exit Wait(ctx context.Context) (*Exit, error) +} + +// ExecProcess is a process spawned in container via Task.Exec call. +// The only difference from a regular `Process` is that exec process can delete self, +// while task process requires slightly more complex logic and needs to be deleted through the task manager. +type ExecProcess interface { + Process + // Delete deletes the process Delete(ctx context.Context) (*Exit, error) } @@ -56,7 +64,7 @@ type Task interface { Process // PID of the process - PID() uint32 + PID(ctx context.Context) (uint32, error) // Namespace that the task exists in Namespace() string // Pause pauses the container process @@ -64,7 +72,7 @@ type Task interface { // Resume unpauses the container process Resume(ctx context.Context) error // Exec adds a process into the container - Exec(ctx context.Context, id string, opts ExecOpts) (Process, error) + Exec(ctx context.Context, id string, opts ExecOpts) (ExecProcess, error) // Pids returns all pids Pids(ctx context.Context) ([]ProcessInfo, error) // Checkpoint checkpoints a container to an image with live system data @@ -72,7 +80,7 @@ type Task interface { // Update sets the provided resources to a running task Update(ctx context.Context, resources *types.Any, annotations map[string]string) error // Process returns a process within the task for the provided id - Process(ctx context.Context, id string) (Process, error) + Process(ctx context.Context, id string) (ExecProcess, error) // Stats returns runtime specific metrics for a task Stats(ctx context.Context) (*types.Any, error) } diff --git a/runtime/v1/linux/runtime.go b/runtime/v1/linux/runtime.go index aa6d3f314..8ed7297d5 100644 --- a/runtime/v1/linux/runtime.go +++ b/runtime/v1/linux/runtime.go @@ -43,8 +43,8 @@ import ( "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime/linux/runctypes" v1 "github.com/containerd/containerd/runtime/v1" - shim "github.com/containerd/containerd/runtime/v1/shim/v1" - runc "github.com/containerd/go-runc" + "github.com/containerd/containerd/runtime/v1/shim/v1" + "github.com/containerd/go-runc" "github.com/containerd/typeurl" ptypes "github.com/gogo/protobuf/types" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -315,8 +315,20 @@ func (r *Runtime) Add(ctx context.Context, task runtime.Task) error { } // Delete a runtime task -func (r *Runtime) Delete(ctx context.Context, id string) { +func (r *Runtime) Delete(ctx context.Context, id string) (*runtime.Exit, error) { + task, err := r.tasks.Get(ctx, id) + if err != nil { + return nil, err + } + + s := task.(*Task) + exit, err := s.Delete(ctx) + if err != nil { + return nil, err + } + r.tasks.Delete(ctx, id) + return exit, nil } func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { diff --git a/runtime/v1/linux/task.go b/runtime/v1/linux/task.go index 1e9e50ca5..b1ece32f2 100644 --- a/runtime/v1/linux/task.go +++ b/runtime/v1/linux/task.go @@ -85,8 +85,8 @@ func (t *Task) Namespace() string { } // PID of the task -func (t *Task) PID() uint32 { - return uint32(t.pid) +func (t *Task) PID(_ctx context.Context) (uint32, error) { + return uint32(t.pid), nil } // Delete the task and return the exit status @@ -226,7 +226,7 @@ func (t *Task) Kill(ctx context.Context, signal uint32, all bool) error { } // Exec creates a new process inside the task -func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) { +func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) { if err := identifiers.Validate(id); err != nil { return nil, errors.Wrapf(err, "invalid exec id") } @@ -316,7 +316,7 @@ func (t *Task) Update(ctx context.Context, resources *types.Any, _ map[string]st } // Process returns a specific process inside the task by the process id -func (t *Task) Process(ctx context.Context, id string) (runtime.Process, error) { +func (t *Task) Process(ctx context.Context, id string) (runtime.ExecProcess, error) { p := &Process{ id: id, t: t, diff --git a/runtime/v2/binary.go b/runtime/v2/binary.go index a214cb3b3..8f01f0f83 100644 --- a/runtime/v2/binary.go +++ b/runtime/v2/binary.go @@ -24,7 +24,6 @@ import ( gruntime "runtime" "strings" - "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/runtime" @@ -36,14 +35,12 @@ import ( "github.com/sirupsen/logrus" ) -func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, containerdTTRPCAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary { +func shimBinary(bundle *Bundle, runtime, containerdAddress string, containerdTTRPCAddress string) *binary { return &binary{ bundle: bundle, runtime: runtime, containerdAddress: containerdAddress, containerdTTRPCAddress: containerdTTRPCAddress, - events: events, - rtTasks: rt, } } @@ -52,8 +49,6 @@ type binary struct { containerdAddress string containerdTTRPCAddress string bundle *Bundle - events *exchange.Exchange - rtTasks *runtime.TaskList } func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) { @@ -123,11 +118,9 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ } client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog)) return &shim{ - bundle: b.bundle, - client: client, - task: task.NewTaskClient(client), - events: b.events, - rtTasks: b.rtTasks, + bundle: b.bundle, + client: client, + task: task.NewTaskClient(client), }, nil } diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index f8cdd9c6c..c3362993d 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -162,7 +162,7 @@ func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string, topts = opts.RuntimeOptions } - b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks) + b := shimBinary(bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress) shim, err := b.Start(ctx, topts, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") @@ -185,7 +185,7 @@ func (m *TaskManager) deleteShim(shim *shim) { dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout) defer cancel() - _, errShim := shim.Delete(dctx) + _, errShim := shim.delete(dctx, m.tasks.Delete) if errShim != nil { if errdefs.IsDeadlineExceeded(errShim) { dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout) @@ -207,8 +207,19 @@ func (m *TaskManager) Add(ctx context.Context, task runtime.Task) error { } // Delete a runtime task -func (m *TaskManager) Delete(ctx context.Context, id string) { - m.tasks.Delete(ctx, id) +func (m *TaskManager) Delete(ctx context.Context, id string) (*runtime.Exit, error) { + task, err := m.tasks.Get(ctx, id) + if err != nil { + return nil, err + } + + shim := task.(*shim) + exit, err := shim.delete(ctx, m.tasks.Delete) + if err != nil { + return nil, err + } + + return exit, err } // Tasks lists all tasks @@ -287,8 +298,8 @@ func (m *TaskManager) loadTasks(ctx context.Context) error { bundle.Delete() continue } - binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks) - shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() { + binaryCall := shimBinary(bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress) + shim, err := loadShim(ctx, bundle, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, binaryCall) diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index c2c290fa3..43a19591b 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -62,7 +62,7 @@ func loadAddress(path string) (string, error) { return string(data), nil } -func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt *runtime.TaskList, onClose func()) (_ *shim, err error) { +func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shim, err error) { address, err := loadAddress(filepath.Join(bundle.Path, "address")) if err != nil { return nil, err @@ -117,15 +117,15 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt } }() s := &shim{ - client: client, - task: task.NewTaskClient(client), - bundle: bundle, - events: events, - rtTasks: rt, + client: client, + task: task.NewTaskClient(client), + bundle: bundle, } ctx, cancel := timeout.WithContext(ctx, loadTimeout) defer cancel() - if err := s.Connect(ctx); err != nil { + + // Check connectivity + if _, err := s.PID(ctx); err != nil { return nil, err } return s, nil @@ -186,23 +186,9 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskLi var _ runtime.Task = &shim{} type shim struct { - bundle *Bundle - client *ttrpc.Client - task task.TaskService - taskPid int - events *exchange.Exchange - rtTasks *runtime.TaskList -} - -func (s *shim) Connect(ctx context.Context) error { - response, err := s.task.Connect(ctx, &task.ConnectRequest{ - ID: s.ID(), - }) - if err != nil { - return err - } - s.taskPid = int(response.TaskPid) - return nil + bundle *Bundle + client *ttrpc.Client + task task.TaskService } func (s *shim) Shutdown(ctx context.Context) error { @@ -227,8 +213,15 @@ func (s *shim) ID() string { } // PID of the task -func (s *shim) PID() uint32 { - return uint32(s.taskPid) +func (s *shim) PID(ctx context.Context) (uint32, error) { + response, err := s.task.Connect(ctx, &task.ConnectRequest{ + ID: s.ID(), + }) + if err != nil { + return 0, errdefs.FromGRPC(err) + } + + return response.TaskPid, nil } func (s *shim) Namespace() string { @@ -239,7 +232,7 @@ func (s *shim) Close() error { return s.client.Close() } -func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) { +func (s *shim) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) { response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{ ID: s.ID(), }) @@ -264,7 +257,7 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) { // So we should remove the record and prevent duplicate events from // ttrpc-callback-on-close. if shimErr == nil { - s.rtTasks.Delete(ctx, s.ID()) + removeTask(ctx, s.ID()) } if err := s.waitShutdown(ctx); err != nil { @@ -275,7 +268,7 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) { // remove self from the runtime task list // this seems dirty but it cleans up the API across runtimes, tasks, and the service - s.rtTasks.Delete(ctx, s.ID()) + removeTask(ctx, s.ID()) if err := s.bundle.Delete(); err != nil { log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle") } @@ -311,11 +304,12 @@ func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Tas Options: m.Options, }) } - response, err := s.task.Create(ctx, request) + + _, err := s.task.Create(ctx, request) if err != nil { return nil, errdefs.FromGRPC(err) } - s.taskPid = int(response.Pid) + return s, nil } @@ -338,13 +332,12 @@ func (s *shim) Resume(ctx context.Context) error { } func (s *shim) Start(ctx context.Context) error { - response, err := s.task.Start(ctx, &task.StartRequest{ + _, err := s.task.Start(ctx, &task.StartRequest{ ID: s.ID(), }) if err != nil { return errdefs.FromGRPC(err) } - s.taskPid = int(response.Pid) return nil } @@ -359,7 +352,7 @@ func (s *shim) Kill(ctx context.Context, signal uint32, all bool) error { return nil } -func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) { +func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) { if err := identifiers.Validate(id); err != nil { return nil, errors.Wrapf(err, "invalid exec id %s", id) } @@ -422,6 +415,10 @@ func (s *shim) CloseIO(ctx context.Context) error { } func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) { + taskPid, err := s.PID(ctx) + if err != nil { + return nil, err + } response, err := s.task.Wait(ctx, &task.WaitRequest{ ID: s.ID(), }) @@ -429,7 +426,7 @@ func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) { return nil, errdefs.FromGRPC(err) } return &runtime.Exit{ - Pid: uint32(s.taskPid), + Pid: taskPid, Timestamp: response.ExitedAt, Status: response.ExitStatus, }, nil @@ -468,7 +465,7 @@ func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) { return response.Stats, nil } -func (s *shim) Process(ctx context.Context, id string) (runtime.Process, error) { +func (s *shim) Process(ctx context.Context, id string) (runtime.ExecProcess, error) { p := &process{ id: id, shim: s, diff --git a/services/tasks/local.go b/services/tasks/local.go index cd5280a7f..f24f7cae4 100644 --- a/services/tasks/local.go +++ b/services/tasks/local.go @@ -215,9 +215,13 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc. if err := l.monitor.Monitor(c, labels); err != nil { return nil, errors.Wrap(err, "monitor task") } + pid, err := c.PID(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to get task pid") + } return &api.CreateTaskResponse{ ContainerID: r.ContainerID, - Pid: c.PID(), + Pid: pid, }, nil } @@ -245,17 +249,32 @@ func (l *local) Start(ctx context.Context, r *api.StartRequest, _ ...grpc.CallOp } func (l *local) Delete(ctx context.Context, r *api.DeleteTaskRequest, _ ...grpc.CallOption) (*api.DeleteResponse, error) { - t, err := l.getTask(ctx, r.ContainerID) + container, err := l.getContainer(ctx, r.ContainerID) if err != nil { return nil, err } + + // Find runtime manager + rtime, err := l.getRuntime(container.Runtime.Name) + if err != nil { + return nil, err + } + + // Get task object + t, err := rtime.Get(ctx, container.ID) + if err != nil { + return nil, status.Errorf(codes.NotFound, "task %v not found", container.ID) + } + if err := l.monitor.Stop(t); err != nil { return nil, err } - exit, err := t.Delete(ctx) + + exit, err := rtime.Delete(ctx, r.ContainerID) if err != nil { return nil, errdefs.ToGRPC(err) } + return &api.DeleteResponse{ ExitStatus: exit.Status, ExitedAt: exit.Timestamp,