Rename task manager to shim manager

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2021-08-25 17:00:27 -07:00
parent c8e88447ad
commit 2d5d3541e6
5 changed files with 118 additions and 105 deletions

View File

@ -60,7 +60,7 @@ type binary struct {
bundle *Bundle bundle *Bundle
} }
func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) { func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shimTask, err error) {
args := []string{"-id", b.bundle.ID} args := []string{"-id", b.bundle.ID}
switch logrus.GetLevel() { switch logrus.GetLevel() {
case logrus.DebugLevel, logrus.TraceLevel: case logrus.DebugLevel, logrus.TraceLevel:
@ -128,9 +128,11 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
f.Close() f.Close()
} }
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog)) client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog))
return &shim{ return &shimTask{
shim: &shim{
bundle: b.bundle, bundle: b.bundle,
client: client, client: client,
},
task: task.NewTaskClient(client), task: task.NewTaskClient(client),
}, nil }, nil
} }

View File

@ -48,7 +48,7 @@ type Config struct {
func init() { func init() {
plugin.Register(&plugin.Registration{ plugin.Register(&plugin.Registration{
Type: plugin.RuntimePluginV2, Type: plugin.RuntimePluginV2,
ID: "task", ID: "shim",
Requires: []plugin.Type{ Requires: []plugin.Type{
plugin.EventPlugin, plugin.EventPlugin,
plugin.MetadataPlugin, plugin.MetadataPlugin,
@ -81,7 +81,7 @@ func init() {
cs := metadata.NewContainerStore(m.(*metadata.DB)) cs := metadata.NewContainerStore(m.(*metadata.DB))
events := ep.(*exchange.Exchange) events := ep.(*exchange.Exchange)
return New(ic.Context, &ManagerConfig{ return NewShimManager(ic.Context, &ManagerConfig{
Root: ic.Root, Root: ic.Root,
State: ic.State, State: ic.State,
Address: ic.Address, Address: ic.Address,
@ -104,49 +104,53 @@ type ManagerConfig struct {
SchedCore bool SchedCore bool
} }
// New task manager for v2 shims // NewShimManager creates a manager for v2 shims
func New(ctx context.Context, config *ManagerConfig) (*TaskManager, error) { func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, error) {
for _, d := range []string{config.Root, config.State} { for _, d := range []string{config.Root, config.State} {
if err := os.MkdirAll(d, 0711); err != nil { if err := os.MkdirAll(d, 0711); err != nil {
return nil, err return nil, err
} }
} }
m := &TaskManager{
m := &ShimManager{
root: config.Root, root: config.Root,
state: config.State, state: config.State,
containerdAddress: config.Address, containerdAddress: config.Address,
containerdTTRPCAddress: config.TTRPCAddress, containerdTTRPCAddress: config.TTRPCAddress,
schedCore: config.SchedCore, list: runtime.NewTaskList(),
tasks: runtime.NewTaskList(),
events: config.Events, events: config.Events,
containers: config.Store, containers: config.Store,
} }
if err := m.loadExistingTasks(ctx); err != nil { if err := m.loadExistingTasks(ctx); err != nil {
return nil, err return nil, err
} }
return m, nil return m, nil
} }
// TaskManager manages v2 shim's and their tasks // ShimManager manages currently running shim processes.
type TaskManager struct { // It is mainly responsible for launching new shims and for proper shutdown and cleanup of existing instances.
// The manager is unaware of the underlying services shim provides and lets higher level services consume them,
// but don't care about lifecycle management.
type ShimManager struct {
root string root string
state string state string
containerdAddress string containerdAddress string
containerdTTRPCAddress string containerdTTRPCAddress string
schedCore bool schedCore bool
list *runtime.TaskList
tasks *runtime.TaskList
events *exchange.Exchange events *exchange.Exchange
containers containers.Store containers containers.Store
} }
// ID of the task manager // ID of the shim manager
func (m *TaskManager) ID() string { func (m *ShimManager) ID() string {
return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "task") return fmt.Sprintf("%s.%s", plugin.RuntimePluginV2, "shim")
} }
// Create a new task // Create a new task
func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) { func (m *ShimManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) {
bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value) bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value)
if err != nil { if err != nil {
return nil, err return nil, err
@ -163,7 +167,7 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create
} }
defer func() { defer func() {
if retErr != nil { if retErr != nil {
m.deleteShim(shim) m.cleanupShim(shim)
} }
}() }()
@ -172,14 +176,14 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create
return nil, errors.Wrap(err, "failed to create shim") return nil, errors.Wrap(err, "failed to create shim")
} }
if err := m.tasks.Add(ctx, t); err != nil { if err := m.list.Add(ctx, t); err != nil {
return nil, errors.Wrap(err, "failed to add task") return nil, errors.Wrap(err, "failed to add task")
} }
return t, nil return t, nil
} }
func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) { func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shimTask, error) {
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -199,12 +203,12 @@ func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string,
shim, err := b.Start(ctx, topts, func() { shim, err := b.Start(ctx, topts, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected") log.G(ctx).WithField("id", id).Info("shim disconnected")
cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, b) cleanupAfterDeadShim(context.Background(), id, ns, m.list, m.events, b)
// Remove self from the runtime task list. Even though the cleanupAfterDeadShim() // Remove self from the runtime task list. Even though the cleanupAfterDeadShim()
// would publish taskExit event, but the shim.Delete() would always failed with ttrpc // would publish taskExit event, but the shim.Delete() would always failed with ttrpc
// disconnect and there is no chance to remove this dead task from runtime task lists. // disconnect and there is no chance to remove this dead task from runtime task lists.
// Thus it's better to delete it here. // Thus it's better to delete it here.
m.tasks.Delete(ctx, id) m.list.Delete(ctx, id)
}) })
if err != nil { if err != nil {
return nil, errors.Wrap(err, "start failed") return nil, errors.Wrap(err, "start failed")
@ -213,12 +217,12 @@ func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string,
return shim, nil return shim, nil
} }
// deleteShim attempts to properly delete and cleanup shim after error // cleanupShim attempts to properly delete and cleanup shim after error
func (m *TaskManager) deleteShim(shim *shim) { func (m *ShimManager) cleanupShim(shim *shimTask) {
dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout) dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
defer cancel() defer cancel()
_, errShim := shim.delete(dctx, m.tasks.Delete) _, errShim := shim.delete(dctx, m.list.Delete)
if errShim != nil { if errShim != nil {
if errdefs.IsDeadlineExceeded(errShim) { if errdefs.IsDeadlineExceeded(errShim) {
dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout) dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout)
@ -230,24 +234,24 @@ func (m *TaskManager) deleteShim(shim *shim) {
} }
// Get a specific task // Get a specific task
func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) { func (m *ShimManager) Get(ctx context.Context, id string) (runtime.Task, error) {
return m.tasks.Get(ctx, id) return m.list.Get(ctx, id)
} }
// Add a runtime task // Add a runtime task
func (m *TaskManager) Add(ctx context.Context, task runtime.Task) error { func (m *ShimManager) Add(ctx context.Context, task runtime.Task) error {
return m.tasks.Add(ctx, task) return m.list.Add(ctx, task)
} }
// Delete a runtime task // Delete a runtime task
func (m *TaskManager) Delete(ctx context.Context, id string) (*runtime.Exit, error) { func (m *ShimManager) Delete(ctx context.Context, id string) (*runtime.Exit, error) {
task, err := m.tasks.Get(ctx, id) task, err := m.list.Get(ctx, id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
shim := task.(*shim) shim := task.(*shimTask)
exit, err := shim.delete(ctx, m.tasks.Delete) exit, err := shim.delete(ctx, m.list.Delete)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -256,11 +260,11 @@ func (m *TaskManager) Delete(ctx context.Context, id string) (*runtime.Exit, err
} }
// Tasks lists all tasks // Tasks lists all tasks
func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { func (m *ShimManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) {
return m.tasks.GetAll(ctx, all) return m.list.GetAll(ctx, all)
} }
func (m *TaskManager) loadExistingTasks(ctx context.Context) error { func (m *ShimManager) loadExistingTasks(ctx context.Context) error {
nsDirs, err := os.ReadDir(m.state) nsDirs, err := os.ReadDir(m.state)
if err != nil { if err != nil {
return err return err
@ -275,7 +279,7 @@ func (m *TaskManager) loadExistingTasks(ctx context.Context) error {
continue continue
} }
log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace") log.G(ctx).WithField("namespace", ns).Debug("loading tasks in namespace")
if err := m.loadTasks(namespaces.WithNamespace(ctx, ns)); err != nil { if err := m.loadShims(namespaces.WithNamespace(ctx, ns)); err != nil {
log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace") log.G(ctx).WithField("namespace", ns).WithError(err).Error("loading tasks in namespace")
continue continue
} }
@ -287,7 +291,7 @@ func (m *TaskManager) loadExistingTasks(ctx context.Context) error {
return nil return nil
} }
func (m *TaskManager) loadTasks(ctx context.Context) error { func (m *ShimManager) loadShims(ctx context.Context) error {
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return err return err
@ -341,20 +345,20 @@ func (m *TaskManager) loadTasks(ctx context.Context) error {
shim, err := loadShim(ctx, bundle, func() { shim, err := loadShim(ctx, bundle, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected") log.G(ctx).WithField("id", id).Info("shim disconnected")
cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, binaryCall) cleanupAfterDeadShim(context.Background(), id, ns, m.list, m.events, binaryCall)
// Remove self from the runtime task list. // Remove self from the runtime task list.
m.tasks.Delete(ctx, id) m.list.Delete(ctx, id)
}) })
if err != nil { if err != nil {
cleanupAfterDeadShim(ctx, id, ns, m.tasks, m.events, binaryCall) cleanupAfterDeadShim(ctx, id, ns, m.list, m.events, binaryCall)
continue continue
} }
m.tasks.Add(ctx, shim) m.list.Add(ctx, shim)
} }
return nil return nil
} }
func (m *TaskManager) container(ctx context.Context, id string) (*containers.Container, error) { func (m *ShimManager) container(ctx context.Context, id string) (*containers.Container, error) {
container, err := m.containers.Get(ctx, id) container, err := m.containers.Get(ctx, id)
if err != nil { if err != nil {
return nil, err return nil, err
@ -362,7 +366,7 @@ func (m *TaskManager) container(ctx context.Context, id string) (*containers.Con
return &container, nil return &container, nil
} }
func (m *TaskManager) cleanupWorkDirs(ctx context.Context) error { func (m *ShimManager) cleanupWorkDirs(ctx context.Context) error {
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return err return err
@ -375,7 +379,7 @@ func (m *TaskManager) cleanupWorkDirs(ctx context.Context) error {
// if the task was not loaded, cleanup and empty working directory // if the task was not loaded, cleanup and empty working directory
// this can happen on a reboot where /run for the bundle state is cleaned up // this can happen on a reboot where /run for the bundle state is cleaned up
// but that persistent working dir is left // but that persistent working dir is left
if _, err := m.tasks.Get(ctx, d.Name()); err != nil { if _, err := m.list.Get(ctx, d.Name()); err != nil {
path := filepath.Join(m.root, ns, d.Name()) path := filepath.Join(m.root, ns, d.Name())
if err := os.RemoveAll(path); err != nil { if err := os.RemoveAll(path); err != nil {
log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path) log.G(ctx).WithError(err).Errorf("cleanup working dir %s", path)

View File

@ -29,7 +29,7 @@ import (
type process struct { type process struct {
id string id string
shim *shim shim *shimTask
} }
func (p *process) ID() string { func (p *process) ID() string {

View File

@ -61,7 +61,7 @@ func loadAddress(path string) (string, error) {
return string(data), nil return string(data), nil
} }
func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shim, err error) { func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimTask, err error) {
address, err := loadAddress(filepath.Join(bundle.Path, "address")) address, err := loadAddress(filepath.Join(bundle.Path, "address"))
if err != nil { if err != nil {
return nil, err return nil, err
@ -115,10 +115,12 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shim, err
client.Close() client.Close()
} }
}() }()
s := &shim{ s := &shimTask{
client: client, shim: &shim{
task: task.NewTaskClient(client),
bundle: bundle, bundle: bundle,
client: client,
},
task: task.NewTaskClient(client),
} }
ctx, cancel := timeout.WithContext(ctx, loadTimeout) ctx, cancel := timeout.WithContext(ctx, loadTimeout)
defer cancel() defer cancel()
@ -182,28 +184,9 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskLi
}) })
} }
var _ runtime.Task = &shim{}
type shim struct { type shim struct {
bundle *Bundle bundle *Bundle
client *ttrpc.Client client *ttrpc.Client
task task.TaskService
}
func (s *shim) Shutdown(ctx context.Context) error {
_, err := s.task.Shutdown(ctx, &task.ShutdownRequest{
ID: s.ID(),
})
if err != nil && !errors.Is(err, ttrpc.ErrClosed) {
return errdefs.FromGRPC(err)
}
return nil
}
func (s *shim) waitShutdown(ctx context.Context) error {
ctx, cancel := timeout.WithContext(ctx, shutdownTimeout)
defer cancel()
return s.Shutdown(ctx)
} }
// ID of the shim/task // ID of the shim/task
@ -211,18 +194,6 @@ func (s *shim) ID() string {
return s.bundle.ID return s.bundle.ID
} }
// PID of the task
func (s *shim) PID(ctx context.Context) (uint32, error) {
response, err := s.task.Connect(ctx, &task.ConnectRequest{
ID: s.ID(),
})
if err != nil {
return 0, errdefs.FromGRPC(err)
}
return response.TaskPid, nil
}
func (s *shim) Namespace() string { func (s *shim) Namespace() string {
return s.bundle.Namespace return s.bundle.Namespace
} }
@ -231,7 +202,43 @@ func (s *shim) Close() error {
return s.client.Close() return s.client.Close()
} }
func (s *shim) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) { var _ runtime.Task = &shimTask{}
// shimTask wraps shim process and adds task service client for compatibility with existing shim manager.
type shimTask struct {
*shim
task task.TaskService
}
func (s *shimTask) Shutdown(ctx context.Context) error {
_, err := s.task.Shutdown(ctx, &task.ShutdownRequest{
ID: s.ID(),
})
if err != nil && !errors.Is(err, ttrpc.ErrClosed) {
return errdefs.FromGRPC(err)
}
return nil
}
func (s *shimTask) waitShutdown(ctx context.Context) error {
ctx, cancel := timeout.WithContext(ctx, shutdownTimeout)
defer cancel()
return s.Shutdown(ctx)
}
// PID of the task
func (s *shimTask) PID(ctx context.Context) (uint32, error) {
response, err := s.task.Connect(ctx, &task.ConnectRequest{
ID: s.ID(),
})
if err != nil {
return 0, errdefs.FromGRPC(err)
}
return response.TaskPid, nil
}
func (s *shimTask) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) {
response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{ response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{
ID: s.ID(), ID: s.ID(),
}) })
@ -281,7 +288,7 @@ func (s *shim) delete(ctx context.Context, removeTask func(ctx context.Context,
}, nil }, nil
} }
func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Task, error) { func (s *shimTask) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Task, error) {
topts := opts.TaskOptions topts := opts.TaskOptions
if topts == nil { if topts == nil {
topts = opts.RuntimeOptions topts = opts.RuntimeOptions
@ -312,7 +319,7 @@ func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Tas
return s, nil return s, nil
} }
func (s *shim) Pause(ctx context.Context) error { func (s *shimTask) Pause(ctx context.Context) error {
if _, err := s.task.Pause(ctx, &task.PauseRequest{ if _, err := s.task.Pause(ctx, &task.PauseRequest{
ID: s.ID(), ID: s.ID(),
}); err != nil { }); err != nil {
@ -321,7 +328,7 @@ func (s *shim) Pause(ctx context.Context) error {
return nil return nil
} }
func (s *shim) Resume(ctx context.Context) error { func (s *shimTask) Resume(ctx context.Context) error {
if _, err := s.task.Resume(ctx, &task.ResumeRequest{ if _, err := s.task.Resume(ctx, &task.ResumeRequest{
ID: s.ID(), ID: s.ID(),
}); err != nil { }); err != nil {
@ -330,7 +337,7 @@ func (s *shim) Resume(ctx context.Context) error {
return nil return nil
} }
func (s *shim) Start(ctx context.Context) error { func (s *shimTask) Start(ctx context.Context) error {
_, err := s.task.Start(ctx, &task.StartRequest{ _, err := s.task.Start(ctx, &task.StartRequest{
ID: s.ID(), ID: s.ID(),
}) })
@ -340,7 +347,7 @@ func (s *shim) Start(ctx context.Context) error {
return nil return nil
} }
func (s *shim) Kill(ctx context.Context, signal uint32, all bool) error { func (s *shimTask) Kill(ctx context.Context, signal uint32, all bool) error {
if _, err := s.task.Kill(ctx, &task.KillRequest{ if _, err := s.task.Kill(ctx, &task.KillRequest{
ID: s.ID(), ID: s.ID(),
Signal: signal, Signal: signal,
@ -351,7 +358,7 @@ func (s *shim) Kill(ctx context.Context, signal uint32, all bool) error {
return nil return nil
} }
func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) { func (s *shimTask) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) {
if err := identifiers.Validate(id); err != nil { if err := identifiers.Validate(id); err != nil {
return nil, errors.Wrapf(err, "invalid exec id %s", id) return nil, errors.Wrapf(err, "invalid exec id %s", id)
} }
@ -373,7 +380,7 @@ func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runt
}, nil }, nil
} }
func (s *shim) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) { func (s *shimTask) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) {
resp, err := s.task.Pids(ctx, &task.PidsRequest{ resp, err := s.task.Pids(ctx, &task.PidsRequest{
ID: s.ID(), ID: s.ID(),
}) })
@ -390,7 +397,7 @@ func (s *shim) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) {
return processList, nil return processList, nil
} }
func (s *shim) ResizePty(ctx context.Context, size runtime.ConsoleSize) error { func (s *shimTask) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
_, err := s.task.ResizePty(ctx, &task.ResizePtyRequest{ _, err := s.task.ResizePty(ctx, &task.ResizePtyRequest{
ID: s.ID(), ID: s.ID(),
Width: size.Width, Width: size.Width,
@ -402,7 +409,7 @@ func (s *shim) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
return nil return nil
} }
func (s *shim) CloseIO(ctx context.Context) error { func (s *shimTask) CloseIO(ctx context.Context) error {
_, err := s.task.CloseIO(ctx, &task.CloseIORequest{ _, err := s.task.CloseIO(ctx, &task.CloseIORequest{
ID: s.ID(), ID: s.ID(),
Stdin: true, Stdin: true,
@ -413,7 +420,7 @@ func (s *shim) CloseIO(ctx context.Context) error {
return nil return nil
} }
func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) { func (s *shimTask) Wait(ctx context.Context) (*runtime.Exit, error) {
taskPid, err := s.PID(ctx) taskPid, err := s.PID(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -431,7 +438,7 @@ func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) {
}, nil }, nil
} }
func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any) error { func (s *shimTask) Checkpoint(ctx context.Context, path string, options *ptypes.Any) error {
request := &task.CheckpointTaskRequest{ request := &task.CheckpointTaskRequest{
ID: s.ID(), ID: s.ID(),
Path: path, Path: path,
@ -443,7 +450,7 @@ func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any)
return nil return nil
} }
func (s *shim) Update(ctx context.Context, resources *ptypes.Any, annotations map[string]string) error { func (s *shimTask) Update(ctx context.Context, resources *ptypes.Any, annotations map[string]string) error {
if _, err := s.task.Update(ctx, &task.UpdateTaskRequest{ if _, err := s.task.Update(ctx, &task.UpdateTaskRequest{
ID: s.ID(), ID: s.ID(),
Resources: resources, Resources: resources,
@ -454,7 +461,7 @@ func (s *shim) Update(ctx context.Context, resources *ptypes.Any, annotations ma
return nil return nil
} }
func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) { func (s *shimTask) Stats(ctx context.Context) (*ptypes.Any, error) {
response, err := s.task.Stats(ctx, &task.StatsRequest{ response, err := s.task.Stats(ctx, &task.StatsRequest{
ID: s.ID(), ID: s.ID(),
}) })
@ -464,7 +471,7 @@ func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) {
return response.Stats, nil return response.Stats, nil
} }
func (s *shim) Process(ctx context.Context, id string) (runtime.ExecProcess, error) { func (s *shimTask) Process(ctx context.Context, id string) (runtime.ExecProcess, error) {
p := &process{ p := &process{
id: id, id: id,
shim: s, shim: s,
@ -475,7 +482,7 @@ func (s *shim) Process(ctx context.Context, id string) (runtime.ExecProcess, err
return p, nil return p, nil
} }
func (s *shim) State(ctx context.Context) (runtime.State, error) { func (s *shimTask) State(ctx context.Context) (runtime.State, error) {
response, err := s.task.State(ctx, &task.StateRequest{ response, err := s.task.State(ctx, &task.StateRequest{
ID: s.ID(), ID: s.ID(),
}) })

View File

@ -111,7 +111,7 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) {
store: db.ContentStore(), store: db.ContentStore(),
publisher: ep.(events.Publisher), publisher: ep.(events.Publisher),
monitor: monitor.(runtime.TaskMonitor), monitor: monitor.(runtime.TaskMonitor),
v2Runtime: v2r.(*v2.TaskManager), v2Runtime: v2r.(*v2.ShimManager),
} }
for _, r := range runtimes { for _, r := range runtimes {
tasks, err := r.Tasks(ic.Context, true) tasks, err := r.Tasks(ic.Context, true)
@ -139,7 +139,7 @@ type local struct {
publisher events.Publisher publisher events.Publisher
monitor runtime.TaskMonitor monitor runtime.TaskMonitor
v2Runtime *v2.TaskManager v2Runtime *v2.ShimManager
} }
func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) { func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {