diff --git a/runtime/v2/binary.go b/runtime/v2/binary.go index e16f8ac92..b847a6a8b 100644 --- a/runtime/v2/binary.go +++ b/runtime/v2/binary.go @@ -60,7 +60,7 @@ type binary struct { bundle *Bundle } -func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) { +func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shimTask, err error) { args := []string{"-id", b.bundle.ID} switch logrus.GetLevel() { case logrus.DebugLevel, logrus.TraceLevel: @@ -128,10 +128,12 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ f.Close() } client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog)) - return &shim{ - bundle: b.bundle, - client: client, - task: task.NewTaskClient(client), + return &shimTask{ + shim: &shim{ + bundle: b.bundle, + client: client, + }, + task: task.NewTaskClient(client), }, nil } diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 1a6e9b293..d7b23f050 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -48,7 +48,7 @@ type Config struct { func init() { plugin.Register(&plugin.Registration{ Type: plugin.RuntimePluginV2, - ID: "task", + ID: "shim", Requires: []plugin.Type{ plugin.EventPlugin, plugin.MetadataPlugin, @@ -81,7 +81,7 @@ func init() { cs := metadata.NewContainerStore(m.(*metadata.DB)) events := ep.(*exchange.Exchange) - return New(ic.Context, &ManagerConfig{ + return NewShimManager(ic.Context, &ManagerConfig{ Root: ic.Root, State: ic.State, Address: ic.Address, @@ -104,49 +104,53 @@ type ManagerConfig struct { SchedCore bool } -// New task manager for v2 shims -func New(ctx context.Context, config *ManagerConfig) (*TaskManager, error) { +// NewShimManager creates a manager for v2 shims +func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, error) { for _, d := range []string{config.Root, config.State} { if err := os.MkdirAll(d, 0711); err != nil { return nil, err } } - m := 
&TaskManager{ + + m := &ShimManager{ root: config.Root, state: config.State, containerdAddress: config.Address, containerdTTRPCAddress: config.TTRPCAddress, - schedCore: config.SchedCore, - tasks: runtime.NewTaskList(), + list: runtime.NewTaskList(), events: config.Events, containers: config.Store, } + if err := m.loadExistingTasks(ctx); err != nil { return nil, err } + return m, nil } -// TaskManager manages v2 shim's and their tasks -type TaskManager struct { +// ShimManager manages currently running shim processes. +// It is mainly responsible for launching new shims and for proper shutdown and cleanup of existing instances. +// The manager is unaware of the underlying services a shim provides and lets higher-level services consume them +// without having to care about lifecycle management. +type ShimManager struct { root string state string containerdAddress string containerdTTRPCAddress string schedCore bool - - tasks *runtime.TaskList - events *exchange.Exchange - containers containers.Store + list *runtime.TaskList + events *exchange.Exchange + containers containers.Store } -// ID of the task manager -func (m *TaskManager) ID() string { - return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "task") +// ID of the shim manager +func (m *ShimManager) ID() string { + return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "shim") } // Create a new task -func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) { +func (m *ShimManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) { bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value) if err != nil { return nil, err @@ -163,7 +167,7 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create } defer func() { if retErr != nil { - m.deleteShim(shim) + m.cleanupShim(shim) } }() @@ -172,14 +176,14 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create return 
nil, errors.Wrap(err, "failed to create shim") } - if err := m.tasks.Add(ctx, t); err != nil { + if err := m.list.Add(ctx, t); err != nil { return nil, errors.Wrap(err, "failed to add task") } return t, nil } -func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) { +func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shimTask, error) { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err @@ -199,12 +203,12 @@ func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string, shim, err := b.Start(ctx, topts, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") - cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, b) + cleanupAfterDeadShim(context.Background(), id, ns, m.list, m.events, b) // Remove self from the runtime task list. Even though the cleanupAfterDeadShim() // would publish taskExit event, but the shim.Delete() would always failed with ttrpc // disconnect and there is no chance to remove this dead task from runtime task lists. // Thus it's better to delete it here. 
- m.tasks.Delete(ctx, id) + m.list.Delete(ctx, id) }) if err != nil { return nil, errors.Wrap(err, "start failed") @@ -213,12 +217,12 @@ func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string, return shim, nil } -// deleteShim attempts to properly delete and cleanup shim after error -func (m *TaskManager) deleteShim(shim *shim) { +// cleanupShim attempts to properly delete and cleanup shim after error +func (m *ShimManager) cleanupShim(shim *shimTask) { dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout) defer cancel() - _, errShim := shim.delete(dctx, m.tasks.Delete) + _, errShim := shim.delete(dctx, m.list.Delete) if errShim != nil { if errdefs.IsDeadlineExceeded(errShim) { dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout) @@ -230,24 +234,24 @@ func (m *TaskManager) deleteShim(shim *shim) { } // Get a specific task -func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) { - return m.tasks.Get(ctx, id) +func (m *ShimManager) Get(ctx context.Context, id string) (runtime.Task, error) { + return m.list.Get(ctx, id) } // Add a runtime task -func (m *TaskManager) Add(ctx context.Context, task runtime.Task) error { - return m.tasks.Add(ctx, task) +func (m *ShimManager) Add(ctx context.Context, task runtime.Task) error { + return m.list.Add(ctx, task) } // Delete a runtime task -func (m *TaskManager) Delete(ctx context.Context, id string) (*runtime.Exit, error) { - task, err := m.tasks.Get(ctx, id) +func (m *ShimManager) Delete(ctx context.Context, id string) (*runtime.Exit, error) { + task, err := m.list.Get(ctx, id) if err != nil { return nil, err } - shim := task.(*shim) - exit, err := shim.delete(ctx, m.tasks.Delete) + shim := task.(*shimTask) + exit, err := shim.delete(ctx, m.list.Delete) if err != nil { return nil, err } @@ -256,11 +260,11 @@ func (m *TaskManager) Delete(ctx context.Context, id string) (*runtime.Exit, err } // Tasks lists all tasks -func (m *TaskManager) 
Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { - return m.tasks.GetAll(ctx, all) +func (m *ShimManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { + return m.list.GetAll(ctx, all) } -func (m *TaskManager) loadExistingTasks(ctx context.Context) error { +func (m *ShimManager) loadExistingTasks(ctx context.Context) error { nsDirs, err := os.ReadDir(m.state) if err != nil { return err @@ -275,7 +279,7 @@ func (m *TaskManager) loadExistingTasks(ctx context.Context) error { continue } log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace") - if err := m.loadTasks(namespaces.WithNamespace(ctx, ns)); err != nil { + if err := m.loadShims(namespaces.WithNamespace(ctx, ns)); err != nil { log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace") continue } @@ -287,7 +291,7 @@ func (m *TaskManager) loadExistingTasks(ctx context.Context) error { return nil } -func (m *TaskManager) loadTasks(ctx context.Context) error { +func (m *ShimManager) loadShims(ctx context.Context) error { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return err @@ -341,20 +345,20 @@ func (m *TaskManager) loadTasks(ctx context.Context) error { shim, err := loadShim(ctx, bundle, func() { log.G(ctx).WithField("id", id).Info("shim disconnected") - cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, binaryCall) + cleanupAfterDeadShim(context.Background(), id, ns, m.list, m.events, binaryCall) // Remove self from the runtime task list. 
- m.tasks.Delete(ctx, id) + m.list.Delete(ctx, id) }) if err != nil { - cleanupAfterDeadShim(ctx, id, ns, m.tasks, m.events, binaryCall) + cleanupAfterDeadShim(ctx, id, ns, m.list, m.events, binaryCall) continue } - m.tasks.Add(ctx, shim) + m.list.Add(ctx, shim) } return nil } -func (m *TaskManager) container(ctx context.Context, id string) (*containers.Container, error) { +func (m *ShimManager) container(ctx context.Context, id string) (*containers.Container, error) { container, err := m.containers.Get(ctx, id) if err != nil { return nil, err @@ -362,7 +366,7 @@ func (m *TaskManager) container(ctx context.Context, id string) (*containers.Con return &container, nil } -func (m *TaskManager) cleanupWorkDirs(ctx context.Context) error { +func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return err @@ -375,7 +379,7 @@ func (m *TaskManager) cleanupWorkDirs(ctx context.Context) error { // if the task was not loaded, cleanup and empty working directory // this can happen on a reboot where /run for the bundle state is cleaned up // but that persistent working dir is left - if _, err := m.tasks.Get(ctx, d.Name()); err != nil { + if _, err := m.list.Get(ctx, d.Name()); err != nil { path := filepath.Join(m.root, ns, d.Name()) if err := os.RemoveAll(path); err != nil { log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path) diff --git a/runtime/v2/process.go b/runtime/v2/process.go index 903a4282b..497219ae5 100644 --- a/runtime/v2/process.go +++ b/runtime/v2/process.go @@ -29,7 +29,7 @@ import ( type process struct { id string - shim *shim + shim *shimTask } func (p *process) ID() string { diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index 56e347563..2d9cb27c2 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -61,7 +61,7 @@ func loadAddress(path string) (string, error) { return string(data), nil } -func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ 
*shim, err error) { +func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimTask, err error) { address, err := loadAddress(filepath.Join(bundle.Path, "address")) if err != nil { return nil, err @@ -115,10 +115,12 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shim, err client.Close() } }() - s := &shim{ - client: client, - task: task.NewTaskClient(client), - bundle: bundle, + s := &shimTask{ + shim: &shim{ + bundle: bundle, + client: client, + }, + task: task.NewTaskClient(client), } ctx, cancel := timeout.WithContext(ctx, loadTimeout) defer cancel() @@ -182,28 +184,9 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskLi }) } -var _ runtime.Task = &shim{} - type shim struct { bundle *Bundle client *ttrpc.Client - task task.TaskService -} - -func (s *shim) Shutdown(ctx context.Context) error { - _, err := s.task.Shutdown(ctx, &task.ShutdownRequest{ - ID: s.ID(), - }) - if err != nil && !errors.Is(err, ttrpc.ErrClosed) { - return errdefs.FromGRPC(err) - } - return nil -} - -func (s *shim) waitShutdown(ctx context.Context) error { - ctx, cancel := timeout.WithContext(ctx, shutdownTimeout) - defer cancel() - return s.Shutdown(ctx) } // ID of the shim/task @@ -211,18 +194,6 @@ func (s *shim) ID() string { return s.bundle.ID } -// PID of the task -func (s *shim) PID(ctx context.Context) (uint32, error) { - response, err := s.task.Connect(ctx, &task.ConnectRequest{ - ID: s.ID(), - }) - if err != nil { - return 0, errdefs.FromGRPC(err) - } - - return response.TaskPid, nil -} - func (s *shim) Namespace() string { return s.bundle.Namespace } @@ -231,7 +202,43 @@ func (s *shim) Close() error { return s.client.Close() } -func (s *shim) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) { +var _ runtime.Task = &shimTask{} + +// shimTask wraps shim process and adds task service client for compatibility with existing shim manager. 
+type shimTask struct { + *shim + task task.TaskService +} + +func (s *shimTask) Shutdown(ctx context.Context) error { + _, err := s.task.Shutdown(ctx, &task.ShutdownRequest{ + ID: s.ID(), + }) + if err != nil && !errors.Is(err, ttrpc.ErrClosed) { + return errdefs.FromGRPC(err) + } + return nil +} + +func (s *shimTask) waitShutdown(ctx context.Context) error { + ctx, cancel := timeout.WithContext(ctx, shutdownTimeout) + defer cancel() + return s.Shutdown(ctx) +} + +// PID of the task +func (s *shimTask) PID(ctx context.Context) (uint32, error) { + response, err := s.task.Connect(ctx, &task.ConnectRequest{ + ID: s.ID(), + }) + if err != nil { + return 0, errdefs.FromGRPC(err) + } + + return response.TaskPid, nil +} + +func (s *shimTask) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) { response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{ ID: s.ID(), }) @@ -281,7 +288,7 @@ func (s *shim) delete(ctx context.Context, removeTask func(ctx context.Context, }, nil } -func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Task, error) { +func (s *shimTask) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Task, error) { topts := opts.TaskOptions if topts == nil { topts = opts.RuntimeOptions @@ -312,7 +319,7 @@ func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Tas return s, nil } -func (s *shim) Pause(ctx context.Context) error { +func (s *shimTask) Pause(ctx context.Context) error { if _, err := s.task.Pause(ctx, &task.PauseRequest{ ID: s.ID(), }); err != nil { @@ -321,7 +328,7 @@ func (s *shim) Pause(ctx context.Context) error { return nil } -func (s *shim) Resume(ctx context.Context) error { +func (s *shimTask) Resume(ctx context.Context) error { if _, err := s.task.Resume(ctx, &task.ResumeRequest{ ID: s.ID(), }); err != nil { @@ -330,7 +337,7 @@ func (s *shim) Resume(ctx context.Context) error { return nil } -func (s *shim) Start(ctx context.Context) 
error { +func (s *shimTask) Start(ctx context.Context) error { _, err := s.task.Start(ctx, &task.StartRequest{ ID: s.ID(), }) @@ -340,7 +347,7 @@ func (s *shim) Start(ctx context.Context) error { return nil } -func (s *shim) Kill(ctx context.Context, signal uint32, all bool) error { +func (s *shimTask) Kill(ctx context.Context, signal uint32, all bool) error { if _, err := s.task.Kill(ctx, &task.KillRequest{ ID: s.ID(), Signal: signal, @@ -351,7 +358,7 @@ func (s *shim) Kill(ctx context.Context, signal uint32, all bool) error { return nil } -func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) { +func (s *shimTask) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) { if err := identifiers.Validate(id); err != nil { return nil, errors.Wrapf(err, "invalid exec id %s", id) } @@ -373,7 +380,7 @@ func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runt }, nil } -func (s *shim) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) { +func (s *shimTask) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) { resp, err := s.task.Pids(ctx, &task.PidsRequest{ ID: s.ID(), }) @@ -390,7 +397,7 @@ func (s *shim) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) { return processList, nil } -func (s *shim) ResizePty(ctx context.Context, size runtime.ConsoleSize) error { +func (s *shimTask) ResizePty(ctx context.Context, size runtime.ConsoleSize) error { _, err := s.task.ResizePty(ctx, &task.ResizePtyRequest{ ID: s.ID(), Width: size.Width, @@ -402,7 +409,7 @@ func (s *shim) ResizePty(ctx context.Context, size runtime.ConsoleSize) error { return nil } -func (s *shim) CloseIO(ctx context.Context) error { +func (s *shimTask) CloseIO(ctx context.Context) error { _, err := s.task.CloseIO(ctx, &task.CloseIORequest{ ID: s.ID(), Stdin: true, @@ -413,7 +420,7 @@ func (s *shim) CloseIO(ctx context.Context) error { return nil } -func (s *shim) Wait(ctx context.Context) 
(*runtime.Exit, error) { +func (s *shimTask) Wait(ctx context.Context) (*runtime.Exit, error) { taskPid, err := s.PID(ctx) if err != nil { return nil, err @@ -431,7 +438,7 @@ func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) { }, nil } -func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any) error { +func (s *shimTask) Checkpoint(ctx context.Context, path string, options *ptypes.Any) error { request := &task.CheckpointTaskRequest{ ID: s.ID(), Path: path, @@ -443,7 +450,7 @@ func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any) return nil } -func (s *shim) Update(ctx context.Context, resources *ptypes.Any, annotations map[string]string) error { +func (s *shimTask) Update(ctx context.Context, resources *ptypes.Any, annotations map[string]string) error { if _, err := s.task.Update(ctx, &task.UpdateTaskRequest{ ID: s.ID(), Resources: resources, @@ -454,7 +461,7 @@ func (s *shim) Update(ctx context.Context, resources *ptypes.Any, annotations ma return nil } -func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) { +func (s *shimTask) Stats(ctx context.Context) (*ptypes.Any, error) { response, err := s.task.Stats(ctx, &task.StatsRequest{ ID: s.ID(), }) @@ -464,7 +471,7 @@ func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) { return response.Stats, nil } -func (s *shim) Process(ctx context.Context, id string) (runtime.ExecProcess, error) { +func (s *shimTask) Process(ctx context.Context, id string) (runtime.ExecProcess, error) { p := &process{ id: id, shim: s, @@ -475,7 +482,7 @@ func (s *shim) Process(ctx context.Context, id string) (runtime.ExecProcess, err return p, nil } -func (s *shim) State(ctx context.Context) (runtime.State, error) { +func (s *shimTask) State(ctx context.Context) (runtime.State, error) { response, err := s.task.State(ctx, &task.StateRequest{ ID: s.ID(), }) diff --git a/services/tasks/local.go b/services/tasks/local.go index 067a5b05c..edee5616c 100644 --- 
a/services/tasks/local.go +++ b/services/tasks/local.go @@ -111,7 +111,7 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) { store: db.ContentStore(), publisher: ep.(events.Publisher), monitor: monitor.(runtime.TaskMonitor), - v2Runtime: v2r.(*v2.TaskManager), + v2Runtime: v2r.(*v2.ShimManager), } for _, r := range runtimes { tasks, err := r.Tasks(ic.Context, true) @@ -139,7 +139,7 @@ type local struct { publisher events.Publisher monitor runtime.TaskMonitor - v2Runtime *v2.TaskManager + v2Runtime *v2.ShimManager } func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {