Cleanup v2 shim

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2021-08-04 10:38:05 -07:00
parent 4282ec12cd
commit d30d897ef9
8 changed files with 107 additions and 67 deletions

View File

@ -72,5 +72,5 @@ type PlatformRuntime interface {
// Add adds a task into runtime. // Add adds a task into runtime.
Add(ctx context.Context, task Task) error Add(ctx context.Context, task Task) error
// Delete removes a task. // Delete removes a task.
Delete(ctx context.Context, taskID string) Delete(ctx context.Context, taskID string) (*Exit, error)
} }

View File

@ -47,6 +47,14 @@ type Process interface {
Start(ctx context.Context) error Start(ctx context.Context) error
// Wait for the process to exit // Wait for the process to exit
Wait(ctx context.Context) (*Exit, error) Wait(ctx context.Context) (*Exit, error)
}
// ExecProcess is a process spawned inside a container via a Task.Exec call.
// The only difference from a regular `Process` is that an exec process can delete itself,
// while a task process requires slightly more complex logic and must be deleted through the task manager.
type ExecProcess interface {
// Process is embedded, so an ExecProcess also offers every Process method.
Process
// Delete deletes the process // Delete deletes the process
// It returns an *Exit for the deleted process, or an error.
Delete(ctx context.Context) (*Exit, error) Delete(ctx context.Context) (*Exit, error)
} }
@ -56,7 +64,7 @@ type Task interface {
Process Process
// PID of the process // PID of the process
PID() uint32 PID(ctx context.Context) (uint32, error)
// Namespace that the task exists in // Namespace that the task exists in
Namespace() string Namespace() string
// Pause pauses the container process // Pause pauses the container process
@ -64,7 +72,7 @@ type Task interface {
// Resume unpauses the container process // Resume unpauses the container process
Resume(ctx context.Context) error Resume(ctx context.Context) error
// Exec adds a process into the container // Exec adds a process into the container
Exec(ctx context.Context, id string, opts ExecOpts) (Process, error) Exec(ctx context.Context, id string, opts ExecOpts) (ExecProcess, error)
// Pids returns all pids // Pids returns all pids
Pids(ctx context.Context) ([]ProcessInfo, error) Pids(ctx context.Context) ([]ProcessInfo, error)
// Checkpoint checkpoints a container to an image with live system data // Checkpoint checkpoints a container to an image with live system data
@ -72,7 +80,7 @@ type Task interface {
// Update sets the provided resources to a running task // Update sets the provided resources to a running task
Update(ctx context.Context, resources *types.Any, annotations map[string]string) error Update(ctx context.Context, resources *types.Any, annotations map[string]string) error
// Process returns a process within the task for the provided id // Process returns a process within the task for the provided id
Process(ctx context.Context, id string) (Process, error) Process(ctx context.Context, id string) (ExecProcess, error)
// Stats returns runtime specific metrics for a task // Stats returns runtime specific metrics for a task
Stats(ctx context.Context) (*types.Any, error) Stats(ctx context.Context) (*types.Any, error)
} }

View File

@ -43,8 +43,8 @@ import (
"github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/runctypes" "github.com/containerd/containerd/runtime/linux/runctypes"
v1 "github.com/containerd/containerd/runtime/v1" v1 "github.com/containerd/containerd/runtime/v1"
shim "github.com/containerd/containerd/runtime/v1/shim/v1" "github.com/containerd/containerd/runtime/v1/shim/v1"
runc "github.com/containerd/go-runc" "github.com/containerd/go-runc"
"github.com/containerd/typeurl" "github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types" ptypes "github.com/gogo/protobuf/types"
ocispec "github.com/opencontainers/image-spec/specs-go/v1" ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -315,8 +315,20 @@ func (r *Runtime) Add(ctx context.Context, task runtime.Task) error {
} }
// Delete a runtime task // Delete a runtime task
// Left: the pre-change signature, which returned nothing. Right: the
// post-change signature, which returns the deleted task's *runtime.Exit.
// The post-change body looks the task up by id, deletes it, and only on
// success removes the entry from r.tasks. A lookup or delete error is
// returned as-is without calling r.tasks.Delete here.
func (r *Runtime) Delete(ctx context.Context, id string) { func (r *Runtime) Delete(ctx context.Context, id string) (*runtime.Exit, error) {
task, err := r.tasks.Get(ctx, id)
if err != nil {
return nil, err
}
// NOTE(review): single-value type assertion; it panics if r.tasks ever
// holds a value whose dynamic type is not *Task.
s := task.(*Task)
exit, err := s.Delete(ctx)
if err != nil {
return nil, err
}
// Present in both revisions: drop the entry from the runtime's task list.
r.tasks.Delete(ctx, id) r.tasks.Delete(ctx, id)
return exit, nil
} }
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {

View File

@ -85,8 +85,8 @@ func (t *Task) Namespace() string {
} }
// PID of the task // PID of the task
// Left: the pre-change form returning only the pid. Right: the post-change
// form, which accepts a context and returns an error as well (presumably to
// match the updated Task interface; confirm against the interface definition).
// This implementation does not use the context: it converts the stored t.pid
// to uint32 and always returns a nil error.
func (t *Task) PID() uint32 { func (t *Task) PID(_ctx context.Context) (uint32, error) {
return uint32(t.pid) return uint32(t.pid), nil
} }
// Delete the task and return the exit status // Delete the task and return the exit status
@ -226,7 +226,7 @@ func (t *Task) Kill(ctx context.Context, signal uint32, all bool) error {
} }
// Exec creates a new process inside the task // Exec creates a new process inside the task
func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) { func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) {
if err := identifiers.Validate(id); err != nil { if err := identifiers.Validate(id); err != nil {
return nil, errors.Wrapf(err, "invalid exec id") return nil, errors.Wrapf(err, "invalid exec id")
} }
@ -316,7 +316,7 @@ func (t *Task) Update(ctx context.Context, resources *types.Any, _ map[string]st
} }
// Process returns a specific process inside the task by the process id // Process returns a specific process inside the task by the process id
func (t *Task) Process(ctx context.Context, id string) (runtime.Process, error) { func (t *Task) Process(ctx context.Context, id string) (runtime.ExecProcess, error) {
p := &Process{ p := &Process{
id: id, id: id,
t: t, t: t,

View File

@ -24,7 +24,6 @@ import (
gruntime "runtime" gruntime "runtime"
"strings" "strings"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime"
@ -36,14 +35,12 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// shimBinary returns a *binary populated from the bundle, the runtime name,
// and the containerd and containerd TTRPC addresses.
// Left: the pre-change signature, which also took a context, an events
// exchange and a runtime task list. Right: the post-change signature, which
// drops those three parameters.
func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, containerdTTRPCAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary { func shimBinary(bundle *Bundle, runtime, containerdAddress string, containerdTTRPCAddress string) *binary {
return &binary{ return &binary{
bundle: bundle, bundle: bundle,
runtime: runtime, runtime: runtime,
containerdAddress: containerdAddress, containerdAddress: containerdAddress,
containerdTTRPCAddress: containerdTTRPCAddress, containerdTTRPCAddress: containerdTTRPCAddress,
// The next two assignments exist only on the pre-change side.
events: events,
rtTasks: rt,
} }
} }
@ -52,8 +49,6 @@ type binary struct {
containerdAddress string containerdAddress string
containerdTTRPCAddress string containerdTTRPCAddress string
bundle *Bundle bundle *Bundle
events *exchange.Exchange
rtTasks *runtime.TaskList
} }
func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) { func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
@ -123,11 +118,9 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
} }
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog)) client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog))
return &shim{ return &shim{
bundle: b.bundle, bundle: b.bundle,
client: client, client: client,
task: task.NewTaskClient(client), task: task.NewTaskClient(client),
events: b.events,
rtTasks: b.rtTasks,
}, nil }, nil
} }

View File

@ -162,7 +162,7 @@ func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string,
topts = opts.RuntimeOptions topts = opts.RuntimeOptions
} }
b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks) b := shimBinary(bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress)
shim, err := b.Start(ctx, topts, func() { shim, err := b.Start(ctx, topts, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected") log.G(ctx).WithField("id", id).Info("shim disconnected")
@ -185,7 +185,7 @@ func (m *TaskManager) deleteShim(shim *shim) {
dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout) dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
defer cancel() defer cancel()
_, errShim := shim.Delete(dctx) _, errShim := shim.delete(dctx, m.tasks.Delete)
if errShim != nil { if errShim != nil {
if errdefs.IsDeadlineExceeded(errShim) { if errdefs.IsDeadlineExceeded(errShim) {
dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout) dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout)
@ -207,8 +207,19 @@ func (m *TaskManager) Add(ctx context.Context, task runtime.Task) error {
} }
// Delete a runtime task // Delete a runtime task
// Left: the pre-change version, which returned nothing and only removed the
// entry from m.tasks. Right: the post-change version, which fetches the task,
// deletes it through the shim's unexported delete method, and returns the
// resulting *runtime.Exit. m.tasks.Delete is handed to delete as the callback
// that removes the task-list entry.
func (m *TaskManager) Delete(ctx context.Context, id string) { func (m *TaskManager) Delete(ctx context.Context, id string) (*runtime.Exit, error) {
m.tasks.Delete(ctx, id) task, err := m.tasks.Get(ctx, id)
if err != nil {
return nil, err
}
// NOTE(review): single-value type assertion; it panics if m.tasks ever
// holds a value whose dynamic type is not *shim. The local variable also
// shadows the type name `shim` for the rest of this function.
shim := task.(*shim)
exit, err := shim.delete(ctx, m.tasks.Delete)
if err != nil {
return nil, err
}
// NOTE(review): err is always nil here because the check above returned
// otherwise; `return exit, nil` would state that directly.
return exit, err
} }
// Tasks lists all tasks // Tasks lists all tasks
@ -287,8 +298,8 @@ func (m *TaskManager) loadTasks(ctx context.Context) error {
bundle.Delete() bundle.Delete()
continue continue
} }
binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks) binaryCall := shimBinary(bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress)
shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() { shim, err := loadShim(ctx, bundle, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected") log.G(ctx).WithField("id", id).Info("shim disconnected")
cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, binaryCall) cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, binaryCall)

View File

@ -62,7 +62,7 @@ func loadAddress(path string) (string, error) {
return string(data), nil return string(data), nil
} }
func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt *runtime.TaskList, onClose func()) (_ *shim, err error) { func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shim, err error) {
address, err := loadAddress(filepath.Join(bundle.Path, "address")) address, err := loadAddress(filepath.Join(bundle.Path, "address"))
if err != nil { if err != nil {
return nil, err return nil, err
@ -117,15 +117,15 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
} }
}() }()
s := &shim{ s := &shim{
client: client, client: client,
task: task.NewTaskClient(client), task: task.NewTaskClient(client),
bundle: bundle, bundle: bundle,
events: events,
rtTasks: rt,
} }
ctx, cancel := timeout.WithContext(ctx, loadTimeout) ctx, cancel := timeout.WithContext(ctx, loadTimeout)
defer cancel() defer cancel()
if err := s.Connect(ctx); err != nil {
// Check connectivity
if _, err := s.PID(ctx); err != nil {
return nil, err return nil, err
} }
return s, nil return s, nil
@ -186,23 +186,9 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskLi
var _ runtime.Task = &shim{} var _ runtime.Task = &shim{}
// shim groups the bundle, the ttrpc client, and the task service client.
// Left: the pre-change struct, which additionally carried taskPid, events and
// rtTasks, followed by the pre-change Connect method. Right: the post-change
// struct, which keeps only the first three fields and has no Connect method.
type shim struct { type shim struct {
bundle *Bundle bundle *Bundle
client *ttrpc.Client client *ttrpc.Client
task task.TaskService task task.TaskService
// Pre-change only: the three fields below.
taskPid int
events *exchange.Exchange
rtTasks *runtime.TaskList
}
// Connect (pre-change only) sends a ConnectRequest carrying this shim's ID
// and stores the response's TaskPid in s.taskPid. A request error is returned
// unchanged.
func (s *shim) Connect(ctx context.Context) error {
response, err := s.task.Connect(ctx, &task.ConnectRequest{
ID: s.ID(),
})
if err != nil {
return err
}
s.taskPid = int(response.TaskPid)
return nil
} }
func (s *shim) Shutdown(ctx context.Context) error { func (s *shim) Shutdown(ctx context.Context) error {
@ -227,8 +213,15 @@ func (s *shim) ID() string {
} }
// PID of the task // PID of the task
// Left: the pre-change version, which returned the stored s.taskPid.
// Right: the post-change version, which sends a ConnectRequest for this
// shim's ID through s.task on every call and returns the TaskPid from the
// response. A request error is converted with errdefs.FromGRPC and returned
// together with a zero pid.
func (s *shim) PID() uint32 { func (s *shim) PID(ctx context.Context) (uint32, error) {
return uint32(s.taskPid) response, err := s.task.Connect(ctx, &task.ConnectRequest{
ID: s.ID(),
})
if err != nil {
return 0, errdefs.FromGRPC(err)
}
return response.TaskPid, nil
} }
func (s *shim) Namespace() string { func (s *shim) Namespace() string {
@ -239,7 +232,7 @@ func (s *shim) Close() error {
return s.client.Close() return s.client.Close()
} }
func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) { func (s *shim) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) {
response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{ response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{
ID: s.ID(), ID: s.ID(),
}) })
@ -264,7 +257,7 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
// So we should remove the record and prevent duplicate events from // So we should remove the record and prevent duplicate events from
// ttrpc-callback-on-close. // ttrpc-callback-on-close.
if shimErr == nil { if shimErr == nil {
s.rtTasks.Delete(ctx, s.ID()) removeTask(ctx, s.ID())
} }
if err := s.waitShutdown(ctx); err != nil { if err := s.waitShutdown(ctx); err != nil {
@ -275,7 +268,7 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
// remove self from the runtime task list // remove self from the runtime task list
// this seems dirty but it cleans up the API across runtimes, tasks, and the service // this seems dirty but it cleans up the API across runtimes, tasks, and the service
s.rtTasks.Delete(ctx, s.ID()) removeTask(ctx, s.ID())
if err := s.bundle.Delete(); err != nil { if err := s.bundle.Delete(); err != nil {
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle") log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle")
} }
@ -311,11 +304,12 @@ func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Tas
Options: m.Options, Options: m.Options,
}) })
} }
response, err := s.task.Create(ctx, request)
_, err := s.task.Create(ctx, request)
if err != nil { if err != nil {
return nil, errdefs.FromGRPC(err) return nil, errdefs.FromGRPC(err)
} }
s.taskPid = int(response.Pid)
return s, nil return s, nil
} }
@ -338,13 +332,12 @@ func (s *shim) Resume(ctx context.Context) error {
} }
// Start sends a StartRequest carrying this shim's ID through s.task.
// A request error is converted with errdefs.FromGRPC and returned.
// Left: the pre-change version, which stored the response's Pid in s.taskPid.
// Right: the post-change version, which discards the response.
func (s *shim) Start(ctx context.Context) error { func (s *shim) Start(ctx context.Context) error {
response, err := s.task.Start(ctx, &task.StartRequest{ _, err := s.task.Start(ctx, &task.StartRequest{
ID: s.ID(), ID: s.ID(),
}) })
if err != nil { if err != nil {
return errdefs.FromGRPC(err) return errdefs.FromGRPC(err)
} }
// Pre-change only: remember the pid reported by the shim.
s.taskPid = int(response.Pid)
return nil return nil
} }
@ -359,7 +352,7 @@ func (s *shim) Kill(ctx context.Context, signal uint32, all bool) error {
return nil return nil
} }
func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) { func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) {
if err := identifiers.Validate(id); err != nil { if err := identifiers.Validate(id); err != nil {
return nil, errors.Wrapf(err, "invalid exec id %s", id) return nil, errors.Wrapf(err, "invalid exec id %s", id)
} }
@ -422,6 +415,10 @@ func (s *shim) CloseIO(ctx context.Context) error {
} }
func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) { func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) {
taskPid, err := s.PID(ctx)
if err != nil {
return nil, err
}
response, err := s.task.Wait(ctx, &task.WaitRequest{ response, err := s.task.Wait(ctx, &task.WaitRequest{
ID: s.ID(), ID: s.ID(),
}) })
@ -429,7 +426,7 @@ func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) {
return nil, errdefs.FromGRPC(err) return nil, errdefs.FromGRPC(err)
} }
return &runtime.Exit{ return &runtime.Exit{
Pid: uint32(s.taskPid), Pid: taskPid,
Timestamp: response.ExitedAt, Timestamp: response.ExitedAt,
Status: response.ExitStatus, Status: response.ExitStatus,
}, nil }, nil
@ -468,7 +465,7 @@ func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) {
return response.Stats, nil return response.Stats, nil
} }
func (s *shim) Process(ctx context.Context, id string) (runtime.Process, error) { func (s *shim) Process(ctx context.Context, id string) (runtime.ExecProcess, error) {
p := &process{ p := &process{
id: id, id: id,
shim: s, shim: s,

View File

@ -215,9 +215,13 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.
if err := l.monitor.Monitor(c, labels); err != nil { if err := l.monitor.Monitor(c, labels); err != nil {
return nil, errors.Wrap(err, "monitor task") return nil, errors.Wrap(err, "monitor task")
} }
pid, err := c.PID(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to get task pid")
}
return &api.CreateTaskResponse{ return &api.CreateTaskResponse{
ContainerID: r.ContainerID, ContainerID: r.ContainerID,
Pid: c.PID(), Pid: pid,
}, nil }, nil
} }
@ -245,17 +249,32 @@ func (l *local) Start(ctx context.Context, r *api.StartRequest, _ ...grpc.CallOp
} }
func (l *local) Delete(ctx context.Context, r *api.DeleteTaskRequest, _ ...grpc.CallOption) (*api.DeleteResponse, error) { func (l *local) Delete(ctx context.Context, r *api.DeleteTaskRequest, _ ...grpc.CallOption) (*api.DeleteResponse, error) {
t, err := l.getTask(ctx, r.ContainerID) container, err := l.getContainer(ctx, r.ContainerID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Find runtime manager
rtime, err := l.getRuntime(container.Runtime.Name)
if err != nil {
return nil, err
}
// Get task object
t, err := rtime.Get(ctx, container.ID)
if err != nil {
return nil, status.Errorf(codes.NotFound, "task %v not found", container.ID)
}
if err := l.monitor.Stop(t); err != nil { if err := l.monitor.Stop(t); err != nil {
return nil, err return nil, err
} }
exit, err := t.Delete(ctx)
exit, err := rtime.Delete(ctx, r.ContainerID)
if err != nil { if err != nil {
return nil, errdefs.ToGRPC(err) return nil, errdefs.ToGRPC(err)
} }
return &api.DeleteResponse{ return &api.DeleteResponse{
ExitStatus: exit.Status, ExitStatus: exit.Status,
ExitedAt: exit.Timestamp, ExitedAt: exit.Timestamp,