From 1d8b1bc75b42e19a602915bd764e5bafb4019679 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Wed, 10 Aug 2022 14:13:10 -0700 Subject: [PATCH] Cleanup shim manager Signed-off-by: Maksym Pavlenko --- runtime/v2/manager.go | 72 ++++++++++++++++++++--------------------- runtime/v2/shim.go | 50 ++++++++++++++-------------- runtime/v2/shim_load.go | 5 +-- 3 files changed, 65 insertions(+), 62 deletions(-) diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index e9218b891..fd2717c0d 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -25,7 +25,6 @@ import ( "strings" "sync" - "github.com/containerd/containerd/api/runtime/task/v2" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events/exchange" @@ -143,7 +142,7 @@ func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, e state: config.State, containerdAddress: config.Address, containerdTTRPCAddress: config.TTRPCAddress, - shims: runtime.NewNSMap[runtime.Task](), + shims: runtime.NewNSMap[ShimInstance](), events: config.Events, containers: config.Store, schedCore: config.SchedCore, @@ -167,7 +166,7 @@ type ShimManager struct { containerdAddress string containerdTTRPCAddress string schedCore bool - shims *runtime.NSMap[runtime.Task] + shims *runtime.NSMap[ShimInstance] events *exchange.Exchange containers containers.Store // runtimePaths is a cache of `runtime names` -> `resolved fs path` @@ -181,7 +180,7 @@ func (m *ShimManager) ID() string { } // Start launches a new shim instance -func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimProcess, retErr error) { +func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimInstance, retErr error) { bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec) if err != nil { return nil, err @@ -236,18 +235,11 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts 
runtime.CreateO } }() - // NOTE: temporarily keep this wrapper around until containerd's task service depends on it. - // This will no longer be required once we migrate to client side task management. - shimTask := &shimTask{ - shim: shim, - task: task.NewTaskClient(shim.client), - } - - if err := m.shims.Add(ctx, shimTask); err != nil { + if err := m.shims.Add(ctx, shim); err != nil { return nil, fmt.Errorf("failed to add task: %w", err) } - return shimTask, nil + return shim, nil } func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) { @@ -372,29 +364,22 @@ func (m *ShimManager) cleanupShim(shim *shim) { dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout) defer cancel() - _ = shim.delete(dctx) + _ = shim.Delete(dctx) m.shims.Delete(dctx, shim.ID()) } -func (m *ShimManager) Get(ctx context.Context, id string) (ShimProcess, error) { - proc, err := m.shims.Get(ctx, id) - if err != nil { - return nil, err - } - - shimTask := proc.(*shimTask) - return shimTask, nil +func (m *ShimManager) Get(ctx context.Context, id string) (ShimInstance, error) { + return m.shims.Get(ctx, id) } // Delete a runtime task func (m *ShimManager) Delete(ctx context.Context, id string) error { - proc, err := m.shims.Get(ctx, id) + shim, err := m.shims.Get(ctx, id) if err != nil { return err } - shimTask := proc.(*shimTask) - err = shimTask.shim.delete(ctx) + err = shim.Delete(ctx) m.shims.Delete(ctx, id) return err @@ -431,15 +416,15 @@ func (m *TaskManager) ID() string { // Create launches new shim instance and creates new task func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) { - process, err := m.manager.Start(ctx, taskID, opts) + shim, err := m.manager.Start(ctx, taskID, opts) if err != nil { return nil, fmt.Errorf("failed to start shim: %w", err) } // Cast to shim task and call task service to create a new container task instance. 
// This will not be required once shim service / client implemented. - shim := process.(*shimTask) - t, err := shim.Create(ctx, opts) + shimTask := newShimTask(shim) + t, err := shimTask.Create(ctx, opts) if err != nil { // NOTE: ctx contains required namespace information. m.manager.shims.Delete(ctx, taskID) @@ -448,15 +433,15 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr defer cancel() sandboxed := opts.SandboxID != "" - _, errShim := shim.delete(dctx, sandboxed, func(context.Context, string) {}) + _, errShim := shimTask.delete(dctx, sandboxed, func(context.Context, string) {}) if errShim != nil { if errdefs.IsDeadlineExceeded(errShim) { dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout) defer cancel() } - shim.Shutdown(dctx) - shim.Close() + shimTask.Shutdown(dctx) + shimTask.Client().Close() } return nil, fmt.Errorf("failed to create shim task: %w", err) @@ -467,17 +452,29 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr // Get a specific task func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) { - return m.manager.shims.Get(ctx, id) + shim, err := m.manager.shims.Get(ctx, id) + if err != nil { + return nil, err + } + return newShimTask(shim), nil } // Tasks lists all tasks func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { - return m.manager.shims.GetAll(ctx, all) + shims, err := m.manager.shims.GetAll(ctx, all) + if err != nil { + return nil, err + } + out := make([]runtime.Task, len(shims)) + for i := range shims { + out[i] = newShimTask(shims[i]) + } + return out, nil } // Delete deletes the task and shim instance func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, error) { - item, err := m.manager.shims.Get(ctx, taskID) + shim, err := m.manager.shims.Get(ctx, taskID) if err != nil { return nil, err } @@ -487,8 +484,11 @@ func (m *TaskManager) Delete(ctx context.Context, 
taskID string) (*runtime.Exit, return nil, err } - sandboxed := container.SandboxID != "" - shimTask := item.(*shimTask) + var ( + sandboxed = container.SandboxID != "" + shimTask = newShimTask(shim) + ) + exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) { m.manager.shims.Delete(ctx, id) }) diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index fb7d68f27..88c6aa1bf 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -63,7 +63,7 @@ func loadAddress(path string) (string, error) { return string(data), nil } -func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimTask, err error) { +func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstance, err error) { address, err := loadAddress(filepath.Join(bundle.Path, "address")) if err != nil { return nil, err @@ -117,24 +117,21 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimTask, client.Close() } }() - s := &shimTask{ - shim: &shim{ - bundle: bundle, - client: client, - }, - task: task.NewTaskClient(client), + shim := &shim{ + bundle: bundle, + client: client, } ctx, cancel := timeout.WithContext(ctx, loadTimeout) defer cancel() - - // Check connectivity + // Check connectivity, TaskService is the only required service, so create a temp one to check connection. 
+ s := newShimTask(shim) if _, err := s.PID(ctx); err != nil { return nil, err } - return s, nil + return shim, nil } -func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.NSMap[runtime.Task], events *exchange.Exchange, binaryCall *binary) { +func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.NSMap[ShimInstance], events *exchange.Exchange, binaryCall *binary) { ctx = namespaces.WithNamespace(ctx, ns) ctx, cancel := timeout.WithContext(ctx, cleanupTimeout) defer cancel() @@ -186,10 +183,8 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.NSMap[ }) } -// ShimProcess represents a shim instance managed by the shim service. -type ShimProcess interface { - runtime.Process - +// ShimInstance represents a running shim process managed by ShimManager. +type ShimInstance interface { // ID of the shim. ID() string // Namespace of this shim. @@ -198,6 +193,8 @@ type ShimProcess interface { Bundle() string // Client returns the underlying TTRPC client for this shim. Client() *ttrpc.Client + // Delete will close the client and remove the bundle from disk. 
+ Delete(ctx context.Context) error } type shim struct { @@ -205,6 +202,8 @@ type shim struct { client *ttrpc.Client } +var _ ShimInstance = (*shim)(nil) + // ID of the shim/task func (s *shim) ID() string { return s.bundle.ID @@ -218,16 +217,16 @@ func (s *shim) Bundle() string { return s.bundle.Path } -func (s *shim) Close() error { - return s.client.Close() +func (s *shim) Client() *ttrpc.Client { + return s.client } -func (s *shim) delete(ctx context.Context) error { +func (s *shim) Delete(ctx context.Context) error { var ( result *multierror.Error ) - if err := s.Close(); err != nil { + if err := s.client.Close(); err != nil { result = multierror.Append(result, fmt.Errorf("failed to close ttrpc client: %w", err)) } @@ -247,12 +246,15 @@ var _ runtime.Task = &shimTask{} // shimTask wraps shim process and adds task service client for compatibility with existing shim manager. type shimTask struct { - *shim + ShimInstance task task.TaskService } -func (s *shimTask) Client() *ttrpc.Client { - return s.client +func newShimTask(shim ShimInstance) *shimTask { + return &shimTask{ + ShimInstance: shim, + task: task.NewTaskClient(shim.Client()), + } } func (s *shimTask) Shutdown(ctx context.Context) error { @@ -319,7 +321,7 @@ func (s *shimTask) delete(ctx context.Context, sandboxed bool, removeTask func(c } } - if err := s.shim.delete(ctx); err != nil { + if err := s.ShimInstance.Delete(ctx); err != nil { log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete shim") } @@ -345,7 +347,7 @@ func (s *shimTask) Create(ctx context.Context, opts runtime.CreateOpts) (runtime } request := &task.CreateTaskRequest{ ID: s.ID(), - Bundle: s.bundle.Path, + Bundle: s.Bundle(), Stdin: opts.IO.Stdin, Stdout: opts.IO.Stdout, Stderr: opts.IO.Stderr, diff --git a/runtime/v2/shim_load.go b/runtime/v2/shim_load.go index 760a6a7f2..9047c3eca 100644 --- a/runtime/v2/shim_load.go +++ b/runtime/v2/shim_load.go @@ -130,7 +130,7 @@ func (m *ShimManager) loadShims(ctx 
context.Context) error { ttrpcAddress: m.containerdTTRPCAddress, schedCore: m.schedCore, }) - shim, err := loadShim(ctx, bundle, func() { + instance, err := loadShim(ctx, bundle, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") cleanupAfterDeadShim(context.Background(), id, ns, m.shims, m.events, binaryCall) @@ -141,6 +141,7 @@ func (m *ShimManager) loadShims(ctx context.Context) error { cleanupAfterDeadShim(ctx, id, ns, m.shims, m.events, binaryCall) continue } + shim := newShimTask(instance) // There are 3 possibilities for the loaded shim here: // 1. It could be a shim that is running a task. @@ -159,7 +160,7 @@ func (m *ShimManager) loadShims(ctx context.Context) error { // No need to do anything for removeTask since we never added this shim. shim.delete(ctx, false, func(ctx context.Context, id string) {}) } else { - m.shims.Add(ctx, shim) + m.shims.Add(ctx, shim.ShimInstance) } } return nil