Merge pull request #7280 from mxpv/runtime
Runtime cleanup (Shim manager and task service)
This commit is contained in:
		
							
								
								
									
										142
									
								
								runtime/nsmap.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										142
									
								
								runtime/nsmap.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,142 @@ | ||||
| /* | ||||
|    Copyright The containerd Authors. | ||||
|  | ||||
|    Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|    you may not use this file except in compliance with the License. | ||||
|    You may obtain a copy of the License at | ||||
|  | ||||
|        http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
|    Unless required by applicable law or agreed to in writing, software | ||||
|    distributed under the License is distributed on an "AS IS" BASIS, | ||||
|    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|    See the License for the specific language governing permissions and | ||||
|    limitations under the License. | ||||
| */ | ||||
|  | ||||
| package runtime | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/containerd/containerd/errdefs" | ||||
| 	"github.com/containerd/containerd/namespaces" | ||||
| ) | ||||
|  | ||||
| type object interface { | ||||
| 	ID() string | ||||
| } | ||||
|  | ||||
| // NSMap extends Map type with a notion of namespaces passed via Context. | ||||
| type NSMap[T object] struct { | ||||
| 	mu      sync.Mutex | ||||
| 	objects map[string]map[string]T | ||||
| } | ||||
|  | ||||
| // NewNSMap returns a new NSMap | ||||
| func NewNSMap[T object]() *NSMap[T] { | ||||
| 	return &NSMap[T]{ | ||||
| 		objects: make(map[string]map[string]T), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Get a task | ||||
| func (m *NSMap[T]) Get(ctx context.Context, id string) (T, error) { | ||||
| 	m.mu.Lock() | ||||
| 	defer m.mu.Unlock() | ||||
| 	namespace, err := namespaces.NamespaceRequired(ctx) | ||||
| 	var t T | ||||
| 	if err != nil { | ||||
| 		return t, err | ||||
| 	} | ||||
| 	tasks, ok := m.objects[namespace] | ||||
| 	if !ok { | ||||
| 		return t, errdefs.ErrNotFound | ||||
| 	} | ||||
| 	t, ok = tasks[id] | ||||
| 	if !ok { | ||||
| 		return t, errdefs.ErrNotFound | ||||
| 	} | ||||
| 	return t, nil | ||||
| } | ||||
|  | ||||
| // GetAll objects under a namespace | ||||
| func (m *NSMap[T]) GetAll(ctx context.Context, noNS bool) ([]T, error) { | ||||
| 	m.mu.Lock() | ||||
| 	defer m.mu.Unlock() | ||||
| 	var o []T | ||||
| 	if noNS { | ||||
| 		for ns := range m.objects { | ||||
| 			for _, t := range m.objects[ns] { | ||||
| 				o = append(o, t) | ||||
| 			} | ||||
| 		} | ||||
| 		return o, nil | ||||
| 	} | ||||
| 	namespace, err := namespaces.NamespaceRequired(ctx) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	tasks, ok := m.objects[namespace] | ||||
| 	if !ok { | ||||
| 		return o, nil | ||||
| 	} | ||||
| 	for _, t := range tasks { | ||||
| 		o = append(o, t) | ||||
| 	} | ||||
| 	return o, nil | ||||
| } | ||||
|  | ||||
| // Add a task | ||||
| func (m *NSMap[T]) Add(ctx context.Context, t T) error { | ||||
| 	namespace, err := namespaces.NamespaceRequired(ctx) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return m.AddWithNamespace(namespace, t) | ||||
| } | ||||
|  | ||||
| // AddWithNamespace adds a task with the provided namespace | ||||
| func (m *NSMap[T]) AddWithNamespace(namespace string, t T) error { | ||||
| 	m.mu.Lock() | ||||
| 	defer m.mu.Unlock() | ||||
|  | ||||
| 	id := t.ID() | ||||
| 	if _, ok := m.objects[namespace]; !ok { | ||||
| 		m.objects[namespace] = make(map[string]T) | ||||
| 	} | ||||
| 	if _, ok := m.objects[namespace][id]; ok { | ||||
| 		return fmt.Errorf("%s: %w", id, errdefs.ErrAlreadyExists) | ||||
| 	} | ||||
| 	m.objects[namespace][id] = t | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Delete a task | ||||
| func (m *NSMap[T]) Delete(ctx context.Context, id string) { | ||||
| 	m.mu.Lock() | ||||
| 	defer m.mu.Unlock() | ||||
| 	namespace, err := namespaces.NamespaceRequired(ctx) | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
| 	tasks, ok := m.objects[namespace] | ||||
| 	if ok { | ||||
| 		delete(tasks, id) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (m *NSMap[T]) IsEmpty() bool { | ||||
| 	m.mu.Lock() | ||||
| 	defer m.mu.Unlock() | ||||
|  | ||||
| 	for ns := range m.objects { | ||||
| 		if len(m.objects[ns]) > 0 { | ||||
| 			return false | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return true | ||||
| } | ||||
| @@ -1,144 +0,0 @@ | ||||
| /* | ||||
|    Copyright The containerd Authors. | ||||
|  | ||||
|    Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|    you may not use this file except in compliance with the License. | ||||
|    You may obtain a copy of the License at | ||||
|  | ||||
|        http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
|    Unless required by applicable law or agreed to in writing, software | ||||
|    distributed under the License is distributed on an "AS IS" BASIS, | ||||
|    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|    See the License for the specific language governing permissions and | ||||
|    limitations under the License. | ||||
| */ | ||||
|  | ||||
| package runtime | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/containerd/containerd/namespaces" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	// ErrTaskNotExists is returned when a task does not exist | ||||
| 	ErrTaskNotExists = errors.New("task does not exist") | ||||
| 	// ErrTaskAlreadyExists is returned when a task already exists | ||||
| 	ErrTaskAlreadyExists = errors.New("task already exists") | ||||
| ) | ||||
|  | ||||
| // NewTaskList returns a new TaskList | ||||
| func NewTaskList() *TaskList { | ||||
| 	return &TaskList{ | ||||
| 		tasks: make(map[string]map[string]Task), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // TaskList holds and provides locking around tasks | ||||
| type TaskList struct { | ||||
| 	mu    sync.Mutex | ||||
| 	tasks map[string]map[string]Task | ||||
| } | ||||
|  | ||||
| // Get a task | ||||
| func (l *TaskList) Get(ctx context.Context, id string) (Task, error) { | ||||
| 	l.mu.Lock() | ||||
| 	defer l.mu.Unlock() | ||||
| 	namespace, err := namespaces.NamespaceRequired(ctx) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	tasks, ok := l.tasks[namespace] | ||||
| 	if !ok { | ||||
| 		return nil, ErrTaskNotExists | ||||
| 	} | ||||
| 	t, ok := tasks[id] | ||||
| 	if !ok { | ||||
| 		return nil, ErrTaskNotExists | ||||
| 	} | ||||
| 	return t, nil | ||||
| } | ||||
|  | ||||
| // GetAll tasks under a namespace | ||||
| func (l *TaskList) GetAll(ctx context.Context, noNS bool) ([]Task, error) { | ||||
| 	l.mu.Lock() | ||||
| 	defer l.mu.Unlock() | ||||
| 	var o []Task | ||||
| 	if noNS { | ||||
| 		for ns := range l.tasks { | ||||
| 			for _, t := range l.tasks[ns] { | ||||
| 				o = append(o, t) | ||||
| 			} | ||||
| 		} | ||||
| 		return o, nil | ||||
| 	} | ||||
| 	namespace, err := namespaces.NamespaceRequired(ctx) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	tasks, ok := l.tasks[namespace] | ||||
| 	if !ok { | ||||
| 		return o, nil | ||||
| 	} | ||||
| 	for _, t := range tasks { | ||||
| 		o = append(o, t) | ||||
| 	} | ||||
| 	return o, nil | ||||
| } | ||||
|  | ||||
| // Add a task | ||||
| func (l *TaskList) Add(ctx context.Context, t Task) error { | ||||
| 	namespace, err := namespaces.NamespaceRequired(ctx) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return l.AddWithNamespace(namespace, t) | ||||
| } | ||||
|  | ||||
| // AddWithNamespace adds a task with the provided namespace | ||||
| func (l *TaskList) AddWithNamespace(namespace string, t Task) error { | ||||
| 	l.mu.Lock() | ||||
| 	defer l.mu.Unlock() | ||||
|  | ||||
| 	id := t.ID() | ||||
| 	if _, ok := l.tasks[namespace]; !ok { | ||||
| 		l.tasks[namespace] = make(map[string]Task) | ||||
| 	} | ||||
| 	if _, ok := l.tasks[namespace][id]; ok { | ||||
| 		return fmt.Errorf("%s: %w", id, ErrTaskAlreadyExists) | ||||
| 	} | ||||
| 	l.tasks[namespace][id] = t | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Delete a task | ||||
| func (l *TaskList) Delete(ctx context.Context, id string) { | ||||
| 	l.mu.Lock() | ||||
| 	defer l.mu.Unlock() | ||||
| 	namespace, err := namespaces.NamespaceRequired(ctx) | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
| 	tasks, ok := l.tasks[namespace] | ||||
| 	if ok { | ||||
| 		delete(tasks, id) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (l *TaskList) IsEmpty() bool { | ||||
| 	l.mu.Lock() | ||||
| 	defer l.mu.Unlock() | ||||
|  | ||||
| 	for ns := range l.tasks { | ||||
| 		if len(l.tasks[ns]) > 0 { | ||||
| 			return false | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return true | ||||
| } | ||||
| @@ -124,7 +124,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { | ||||
| 	r := &Runtime{ | ||||
| 		root:       ic.Root, | ||||
| 		state:      ic.State, | ||||
| 		tasks:      runtime.NewTaskList(), | ||||
| 		tasks:      runtime.NewNSMap[runtime.Task](), | ||||
| 		containers: metadata.NewContainerStore(m.(*metadata.DB)), | ||||
| 		address:    ic.Address, | ||||
| 		events:     ep.(*exchange.Exchange), | ||||
| @@ -148,7 +148,7 @@ type Runtime struct { | ||||
| 	state   string | ||||
| 	address string | ||||
|  | ||||
| 	tasks      *runtime.TaskList | ||||
| 	tasks      *runtime.NSMap[runtime.Task] | ||||
| 	containers containers.Store | ||||
| 	events     *exchange.Exchange | ||||
|  | ||||
|   | ||||
| @@ -48,11 +48,11 @@ type Task struct { | ||||
| 	namespace string | ||||
| 	cg        cgroups.Cgroup | ||||
| 	events    *exchange.Exchange | ||||
| 	tasks     *runtime.TaskList | ||||
| 	tasks     *runtime.NSMap[runtime.Task] | ||||
| 	bundle    *bundle | ||||
| } | ||||
|  | ||||
| func newTask(id, namespace string, pid int, shim *client.Client, events *exchange.Exchange, list *runtime.TaskList, bundle *bundle) (*Task, error) { | ||||
| func newTask(id, namespace string, pid int, shim *client.Client, events *exchange.Exchange, list *runtime.NSMap[runtime.Task], bundle *bundle) (*Task, error) { | ||||
| 	var ( | ||||
| 		err error | ||||
| 		cg  cgroups.Cgroup | ||||
|   | ||||
| @@ -25,7 +25,6 @@ import ( | ||||
| 	"strings" | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/containerd/containerd/api/runtime/task/v2" | ||||
| 	"github.com/containerd/containerd/containers" | ||||
| 	"github.com/containerd/containerd/errdefs" | ||||
| 	"github.com/containerd/containerd/events/exchange" | ||||
| @@ -143,7 +142,7 @@ func NewShimManager(ctx context.Context, config *ManagerConfig) (*ShimManager, e | ||||
| 		state:                  config.State, | ||||
| 		containerdAddress:      config.Address, | ||||
| 		containerdTTRPCAddress: config.TTRPCAddress, | ||||
| 		shims:                  runtime.NewTaskList(), | ||||
| 		shims:                  runtime.NewNSMap[ShimInstance](), | ||||
| 		events:                 config.Events, | ||||
| 		containers:             config.Store, | ||||
| 		schedCore:              config.SchedCore, | ||||
| @@ -167,7 +166,7 @@ type ShimManager struct { | ||||
| 	containerdAddress      string | ||||
| 	containerdTTRPCAddress string | ||||
| 	schedCore              bool | ||||
| 	shims                  *runtime.TaskList | ||||
| 	shims                  *runtime.NSMap[ShimInstance] | ||||
| 	events                 *exchange.Exchange | ||||
| 	containers             containers.Store | ||||
| 	// runtimePaths is a cache of `runtime names` -> `resolved fs path` | ||||
| @@ -181,7 +180,7 @@ func (m *ShimManager) ID() string { | ||||
| } | ||||
|  | ||||
| // Start launches a new shim instance | ||||
| func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimProcess, retErr error) { | ||||
| func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimInstance, retErr error) { | ||||
| 	bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| @@ -236,18 +235,11 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	// NOTE: temporarily keep this wrapper around until containerd's task service depends on it. | ||||
| 	// This will no longer be required once we migrate to client side task management. | ||||
| 	shimTask := &shimTask{ | ||||
| 		shim: shim, | ||||
| 		task: task.NewTaskClient(shim.client), | ||||
| 	} | ||||
|  | ||||
| 	if err := m.shims.Add(ctx, shimTask); err != nil { | ||||
| 	if err := m.shims.Add(ctx, shim); err != nil { | ||||
| 		return nil, fmt.Errorf("failed to add task: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	return shimTask, nil | ||||
| 	return shim, nil | ||||
| } | ||||
|  | ||||
| func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) { | ||||
| @@ -372,29 +364,22 @@ func (m *ShimManager) cleanupShim(shim *shim) { | ||||
| 	dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	_ = shim.delete(dctx) | ||||
| 	_ = shim.Delete(dctx) | ||||
| 	m.shims.Delete(dctx, shim.ID()) | ||||
| } | ||||
|  | ||||
| func (m *ShimManager) Get(ctx context.Context, id string) (ShimProcess, error) { | ||||
| 	proc, err := m.shims.Get(ctx, id) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	shimTask := proc.(*shimTask) | ||||
| 	return shimTask, nil | ||||
| func (m *ShimManager) Get(ctx context.Context, id string) (ShimInstance, error) { | ||||
| 	return m.shims.Get(ctx, id) | ||||
| } | ||||
|  | ||||
| // Delete a runtime task | ||||
| func (m *ShimManager) Delete(ctx context.Context, id string) error { | ||||
| 	proc, err := m.shims.Get(ctx, id) | ||||
| 	shim, err := m.shims.Get(ctx, id) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	shimTask := proc.(*shimTask) | ||||
| 	err = shimTask.shim.delete(ctx) | ||||
| 	err = shim.Delete(ctx) | ||||
| 	m.shims.Delete(ctx, id) | ||||
|  | ||||
| 	return err | ||||
| @@ -431,15 +416,15 @@ func (m *TaskManager) ID() string { | ||||
|  | ||||
| // Create launches new shim instance and creates new task | ||||
| func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) { | ||||
| 	process, err := m.manager.Start(ctx, taskID, opts) | ||||
| 	shim, err := m.manager.Start(ctx, taskID, opts) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("failed to start shim: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	// Cast to shim task and call task service to create a new container task instance. | ||||
| 	// This will not be required once shim service / client implemented. | ||||
| 	shim := process.(*shimTask) | ||||
| 	t, err := shim.Create(ctx, opts) | ||||
| 	shimTask := newShimTask(shim) | ||||
| 	t, err := shimTask.Create(ctx, opts) | ||||
| 	if err != nil { | ||||
| 		// NOTE: ctx contains required namespace information. | ||||
| 		m.manager.shims.Delete(ctx, taskID) | ||||
| @@ -448,15 +433,15 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr | ||||
| 		defer cancel() | ||||
|  | ||||
| 		sandboxed := opts.SandboxID != "" | ||||
| 		_, errShim := shim.delete(dctx, sandboxed, func(context.Context, string) {}) | ||||
| 		_, errShim := shimTask.delete(dctx, sandboxed, func(context.Context, string) {}) | ||||
| 		if errShim != nil { | ||||
| 			if errdefs.IsDeadlineExceeded(errShim) { | ||||
| 				dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout) | ||||
| 				defer cancel() | ||||
| 			} | ||||
|  | ||||
| 			shim.Shutdown(dctx) | ||||
| 			shim.Close() | ||||
| 			shimTask.Shutdown(dctx) | ||||
| 			shimTask.Client().Close() | ||||
| 		} | ||||
|  | ||||
| 		return nil, fmt.Errorf("failed to create shim task: %w", err) | ||||
| @@ -467,17 +452,29 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr | ||||
|  | ||||
| // Get a specific task | ||||
| func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) { | ||||
| 	return m.manager.shims.Get(ctx, id) | ||||
| 	shim, err := m.manager.shims.Get(ctx, id) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return newShimTask(shim), nil | ||||
| } | ||||
|  | ||||
| // Tasks lists all tasks | ||||
| func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) { | ||||
| 	return m.manager.shims.GetAll(ctx, all) | ||||
| 	shims, err := m.manager.shims.GetAll(ctx, all) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	out := make([]runtime.Task, len(shims)) | ||||
| 	for i := range shims { | ||||
| 		out[i] = newShimTask(shims[i]) | ||||
| 	} | ||||
| 	return out, nil | ||||
| } | ||||
|  | ||||
| // Delete deletes the task and shim instance | ||||
| func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, error) { | ||||
| 	item, err := m.manager.shims.Get(ctx, taskID) | ||||
| 	shim, err := m.manager.shims.Get(ctx, taskID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @@ -487,8 +484,11 @@ func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	sandboxed := container.SandboxID != "" | ||||
| 	shimTask := item.(*shimTask) | ||||
| 	var ( | ||||
| 		sandboxed = container.SandboxID != "" | ||||
| 		shimTask  = newShimTask(shim) | ||||
| 	) | ||||
|  | ||||
| 	exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) { | ||||
| 		m.manager.shims.Delete(ctx, id) | ||||
| 	}) | ||||
|   | ||||
| @@ -63,7 +63,7 @@ func loadAddress(path string) (string, error) { | ||||
| 	return string(data), nil | ||||
| } | ||||
|  | ||||
| func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimTask, err error) { | ||||
| func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstance, err error) { | ||||
| 	address, err := loadAddress(filepath.Join(bundle.Path, "address")) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| @@ -117,24 +117,21 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimTask, | ||||
| 			client.Close() | ||||
| 		} | ||||
| 	}() | ||||
| 	s := &shimTask{ | ||||
| 		shim: &shim{ | ||||
| 			bundle: bundle, | ||||
| 			client: client, | ||||
| 		}, | ||||
| 		task: task.NewTaskClient(client), | ||||
| 	shim := &shim{ | ||||
| 		bundle: bundle, | ||||
| 		client: client, | ||||
| 	} | ||||
| 	ctx, cancel := timeout.WithContext(ctx, loadTimeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	// Check connectivity | ||||
| 	// Check connectivity, TaskService is the only required service, so create a temp one to check connection. | ||||
| 	s := newShimTask(shim) | ||||
| 	if _, err := s.PID(ctx); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return s, nil | ||||
| 	return shim, nil | ||||
| } | ||||
|  | ||||
| func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskList, events *exchange.Exchange, binaryCall *binary) { | ||||
| func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.NSMap[ShimInstance], events *exchange.Exchange, binaryCall *binary) { | ||||
| 	ctx = namespaces.WithNamespace(ctx, ns) | ||||
| 	ctx, cancel := timeout.WithContext(ctx, cleanupTimeout) | ||||
| 	defer cancel() | ||||
| @@ -186,10 +183,8 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskLi | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // ShimProcess represents a shim instance managed by the shim service. | ||||
| type ShimProcess interface { | ||||
| 	runtime.Process | ||||
|  | ||||
| // ShimInstance represents running shim process managed by ShimManager. | ||||
| type ShimInstance interface { | ||||
| 	// ID of the shim. | ||||
| 	ID() string | ||||
| 	// Namespace of this shim. | ||||
| @@ -198,6 +193,8 @@ type ShimProcess interface { | ||||
| 	Bundle() string | ||||
| 	// Client returns the underlying TTRPC client for this shim. | ||||
| 	Client() *ttrpc.Client | ||||
| 	// Delete will close the client and remove bundle from disk. | ||||
| 	Delete(ctx context.Context) error | ||||
| } | ||||
|  | ||||
| type shim struct { | ||||
| @@ -205,6 +202,8 @@ type shim struct { | ||||
| 	client *ttrpc.Client | ||||
| } | ||||
|  | ||||
| var _ ShimInstance = (*shim)(nil) | ||||
|  | ||||
| // ID of the shim/task | ||||
| func (s *shim) ID() string { | ||||
| 	return s.bundle.ID | ||||
| @@ -218,16 +217,16 @@ func (s *shim) Bundle() string { | ||||
| 	return s.bundle.Path | ||||
| } | ||||
|  | ||||
| func (s *shim) Close() error { | ||||
| 	return s.client.Close() | ||||
| func (s *shim) Client() *ttrpc.Client { | ||||
| 	return s.client | ||||
| } | ||||
|  | ||||
| func (s *shim) delete(ctx context.Context) error { | ||||
| func (s *shim) Delete(ctx context.Context) error { | ||||
| 	var ( | ||||
| 		result *multierror.Error | ||||
| 	) | ||||
|  | ||||
| 	if err := s.Close(); err != nil { | ||||
| 	if err := s.client.Close(); err != nil { | ||||
| 		result = multierror.Append(result, fmt.Errorf("failed to close ttrpc client: %w", err)) | ||||
| 	} | ||||
|  | ||||
| @@ -247,12 +246,15 @@ var _ runtime.Task = &shimTask{} | ||||
|  | ||||
| // shimTask wraps shim process and adds task service client for compatibility with existing shim manager. | ||||
| type shimTask struct { | ||||
| 	*shim | ||||
| 	ShimInstance | ||||
| 	task task.TaskService | ||||
| } | ||||
|  | ||||
| func (s *shimTask) Client() *ttrpc.Client { | ||||
| 	return s.client | ||||
| func newShimTask(shim ShimInstance) *shimTask { | ||||
| 	return &shimTask{ | ||||
| 		ShimInstance: shim, | ||||
| 		task:         task.NewTaskClient(shim.Client()), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (s *shimTask) Shutdown(ctx context.Context) error { | ||||
| @@ -319,7 +321,7 @@ func (s *shimTask) delete(ctx context.Context, sandboxed bool, removeTask func(c | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if err := s.shim.delete(ctx); err != nil { | ||||
| 	if err := s.ShimInstance.Delete(ctx); err != nil { | ||||
| 		log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete shim") | ||||
| 	} | ||||
|  | ||||
| @@ -345,7 +347,7 @@ func (s *shimTask) Create(ctx context.Context, opts runtime.CreateOpts) (runtime | ||||
| 	} | ||||
| 	request := &task.CreateTaskRequest{ | ||||
| 		ID:         s.ID(), | ||||
| 		Bundle:     s.bundle.Path, | ||||
| 		Bundle:     s.Bundle(), | ||||
| 		Stdin:      opts.IO.Stdin, | ||||
| 		Stdout:     opts.IO.Stdout, | ||||
| 		Stderr:     opts.IO.Stderr, | ||||
|   | ||||
| @@ -130,7 +130,7 @@ func (m *ShimManager) loadShims(ctx context.Context) error { | ||||
| 				ttrpcAddress: m.containerdTTRPCAddress, | ||||
| 				schedCore:    m.schedCore, | ||||
| 			}) | ||||
| 		shim, err := loadShim(ctx, bundle, func() { | ||||
| 		instance, err := loadShim(ctx, bundle, func() { | ||||
| 			log.G(ctx).WithField("id", id).Info("shim disconnected") | ||||
|  | ||||
| 			cleanupAfterDeadShim(context.Background(), id, ns, m.shims, m.events, binaryCall) | ||||
| @@ -141,6 +141,7 @@ func (m *ShimManager) loadShims(ctx context.Context) error { | ||||
| 			cleanupAfterDeadShim(ctx, id, ns, m.shims, m.events, binaryCall) | ||||
| 			continue | ||||
| 		} | ||||
| 		shim := newShimTask(instance) | ||||
|  | ||||
| 		// There are 3 possibilities for the loaded shim here: | ||||
| 		// 1. It could be a shim that is running a task. | ||||
| @@ -159,7 +160,7 @@ func (m *ShimManager) loadShims(ctx context.Context) error { | ||||
| 			// No need to do anything for removeTask since we never added this shim. | ||||
| 			shim.delete(ctx, false, func(ctx context.Context, id string) {}) | ||||
| 		} else { | ||||
| 			m.shims.Add(ctx, shim) | ||||
| 			m.shims.Add(ctx, shim.ShimInstance) | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
|   | ||||
| @@ -228,7 +228,7 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc. | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	_, err = rtime.Get(ctx, r.ContainerID) | ||||
| 	if err != nil && err != runtime.ErrTaskNotExists { | ||||
| 	if err != nil && !errdefs.IsNotFound(err) { | ||||
| 		return nil, errdefs.ToGRPC(err) | ||||
| 	} | ||||
| 	if err == nil { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Fu Wei
					Fu Wei