@@ -24,7 +24,6 @@ import (
|
||||
gruntime "runtime"
|
||||
"strings"
|
||||
|
||||
"github.com/containerd/containerd/events/exchange"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/runtime"
|
||||
@@ -36,14 +35,12 @@ import (
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, containerdTTRPCAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary {
|
||||
func shimBinary(bundle *Bundle, runtime, containerdAddress string, containerdTTRPCAddress string) *binary {
|
||||
return &binary{
|
||||
bundle: bundle,
|
||||
runtime: runtime,
|
||||
containerdAddress: containerdAddress,
|
||||
containerdTTRPCAddress: containerdTTRPCAddress,
|
||||
events: events,
|
||||
rtTasks: rt,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,8 +49,6 @@ type binary struct {
|
||||
containerdAddress string
|
||||
containerdTTRPCAddress string
|
||||
bundle *Bundle
|
||||
events *exchange.Exchange
|
||||
rtTasks *runtime.TaskList
|
||||
}
|
||||
|
||||
func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
|
||||
@@ -123,11 +118,9 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
|
||||
}
|
||||
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog))
|
||||
return &shim{
|
||||
bundle: b.bundle,
|
||||
client: client,
|
||||
task: task.NewTaskClient(client),
|
||||
events: b.events,
|
||||
rtTasks: b.rtTasks,
|
||||
bundle: b.bundle,
|
||||
client: client,
|
||||
task: task.NewTaskClient(client),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -168,7 +168,7 @@ func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string,
|
||||
topts = opts.RuntimeOptions
|
||||
}
|
||||
|
||||
b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
|
||||
b := shimBinary(bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress)
|
||||
shim, err := b.Start(ctx, topts, func() {
|
||||
log.G(ctx).WithField("id", id).Info("shim disconnected")
|
||||
|
||||
@@ -191,7 +191,7 @@ func (m *TaskManager) deleteShim(shim *shim) {
|
||||
dctx, cancel := timeout.WithContext(context.Background(), cleanupTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, errShim := shim.Delete(dctx)
|
||||
_, errShim := shim.delete(dctx, m.tasks.Delete)
|
||||
if errShim != nil {
|
||||
if errdefs.IsDeadlineExceeded(errShim) {
|
||||
dctx, cancel = timeout.WithContext(context.Background(), cleanupTimeout)
|
||||
@@ -213,8 +213,19 @@ func (m *TaskManager) Add(ctx context.Context, task runtime.Task) error {
|
||||
}
|
||||
|
||||
// Delete a runtime task
|
||||
func (m *TaskManager) Delete(ctx context.Context, id string) {
|
||||
m.tasks.Delete(ctx, id)
|
||||
func (m *TaskManager) Delete(ctx context.Context, id string) (*runtime.Exit, error) {
|
||||
task, err := m.tasks.Get(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
shim := task.(*shim)
|
||||
exit, err := shim.delete(ctx, m.tasks.Delete)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return exit, err
|
||||
}
|
||||
|
||||
// Tasks lists all tasks
|
||||
@@ -293,8 +304,8 @@ func (m *TaskManager) loadTasks(ctx context.Context) error {
|
||||
bundle.Delete()
|
||||
continue
|
||||
}
|
||||
binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
|
||||
shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() {
|
||||
binaryCall := shimBinary(bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress)
|
||||
shim, err := loadShim(ctx, bundle, func() {
|
||||
log.G(ctx).WithField("id", id).Info("shim disconnected")
|
||||
|
||||
cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, binaryCall)
|
||||
|
||||
@@ -62,7 +62,7 @@ func loadAddress(path string) (string, error) {
|
||||
return string(data), nil
|
||||
}
|
||||
|
||||
func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt *runtime.TaskList, onClose func()) (_ *shim, err error) {
|
||||
func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ *shim, err error) {
|
||||
address, err := loadAddress(filepath.Join(bundle.Path, "address"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -117,15 +117,15 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
|
||||
}
|
||||
}()
|
||||
s := &shim{
|
||||
client: client,
|
||||
task: task.NewTaskClient(client),
|
||||
bundle: bundle,
|
||||
events: events,
|
||||
rtTasks: rt,
|
||||
client: client,
|
||||
task: task.NewTaskClient(client),
|
||||
bundle: bundle,
|
||||
}
|
||||
ctx, cancel := timeout.WithContext(ctx, loadTimeout)
|
||||
defer cancel()
|
||||
if err := s.Connect(ctx); err != nil {
|
||||
|
||||
// Check connectivity
|
||||
if _, err := s.PID(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
@@ -186,23 +186,9 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskLi
|
||||
var _ runtime.Task = &shim{}
|
||||
|
||||
type shim struct {
|
||||
bundle *Bundle
|
||||
client *ttrpc.Client
|
||||
task task.TaskService
|
||||
taskPid int
|
||||
events *exchange.Exchange
|
||||
rtTasks *runtime.TaskList
|
||||
}
|
||||
|
||||
func (s *shim) Connect(ctx context.Context) error {
|
||||
response, err := s.task.Connect(ctx, &task.ConnectRequest{
|
||||
ID: s.ID(),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.taskPid = int(response.TaskPid)
|
||||
return nil
|
||||
bundle *Bundle
|
||||
client *ttrpc.Client
|
||||
task task.TaskService
|
||||
}
|
||||
|
||||
func (s *shim) Shutdown(ctx context.Context) error {
|
||||
@@ -227,8 +213,15 @@ func (s *shim) ID() string {
|
||||
}
|
||||
|
||||
// PID of the task
|
||||
func (s *shim) PID() uint32 {
|
||||
return uint32(s.taskPid)
|
||||
func (s *shim) PID(ctx context.Context) (uint32, error) {
|
||||
response, err := s.task.Connect(ctx, &task.ConnectRequest{
|
||||
ID: s.ID(),
|
||||
})
|
||||
if err != nil {
|
||||
return 0, errdefs.FromGRPC(err)
|
||||
}
|
||||
|
||||
return response.TaskPid, nil
|
||||
}
|
||||
|
||||
func (s *shim) Namespace() string {
|
||||
@@ -239,7 +232,7 @@ func (s *shim) Close() error {
|
||||
return s.client.Close()
|
||||
}
|
||||
|
||||
func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
|
||||
func (s *shim) delete(ctx context.Context, removeTask func(ctx context.Context, id string)) (*runtime.Exit, error) {
|
||||
response, shimErr := s.task.Delete(ctx, &task.DeleteRequest{
|
||||
ID: s.ID(),
|
||||
})
|
||||
@@ -264,7 +257,7 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
|
||||
// So we should remove the record and prevent duplicate events from
|
||||
// ttrpc-callback-on-close.
|
||||
if shimErr == nil {
|
||||
s.rtTasks.Delete(ctx, s.ID())
|
||||
removeTask(ctx, s.ID())
|
||||
}
|
||||
|
||||
if err := s.waitShutdown(ctx); err != nil {
|
||||
@@ -275,7 +268,7 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
|
||||
|
||||
// remove self from the runtime task list
|
||||
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
|
||||
s.rtTasks.Delete(ctx, s.ID())
|
||||
removeTask(ctx, s.ID())
|
||||
if err := s.bundle.Delete(); err != nil {
|
||||
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle")
|
||||
}
|
||||
@@ -311,11 +304,12 @@ func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Tas
|
||||
Options: m.Options,
|
||||
})
|
||||
}
|
||||
response, err := s.task.Create(ctx, request)
|
||||
|
||||
_, err := s.task.Create(ctx, request)
|
||||
if err != nil {
|
||||
return nil, errdefs.FromGRPC(err)
|
||||
}
|
||||
s.taskPid = int(response.Pid)
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
@@ -338,13 +332,12 @@ func (s *shim) Resume(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (s *shim) Start(ctx context.Context) error {
|
||||
response, err := s.task.Start(ctx, &task.StartRequest{
|
||||
_, err := s.task.Start(ctx, &task.StartRequest{
|
||||
ID: s.ID(),
|
||||
})
|
||||
if err != nil {
|
||||
return errdefs.FromGRPC(err)
|
||||
}
|
||||
s.taskPid = int(response.Pid)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -359,7 +352,7 @@ func (s *shim) Kill(ctx context.Context, signal uint32, all bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) {
|
||||
func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) {
|
||||
if err := identifiers.Validate(id); err != nil {
|
||||
return nil, errors.Wrapf(err, "invalid exec id %s", id)
|
||||
}
|
||||
@@ -422,6 +415,10 @@ func (s *shim) CloseIO(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) {
|
||||
taskPid, err := s.PID(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
response, err := s.task.Wait(ctx, &task.WaitRequest{
|
||||
ID: s.ID(),
|
||||
})
|
||||
@@ -429,7 +426,7 @@ func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) {
|
||||
return nil, errdefs.FromGRPC(err)
|
||||
}
|
||||
return &runtime.Exit{
|
||||
Pid: uint32(s.taskPid),
|
||||
Pid: taskPid,
|
||||
Timestamp: response.ExitedAt,
|
||||
Status: response.ExitStatus,
|
||||
}, nil
|
||||
@@ -468,7 +465,7 @@ func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) {
|
||||
return response.Stats, nil
|
||||
}
|
||||
|
||||
func (s *shim) Process(ctx context.Context, id string) (runtime.Process, error) {
|
||||
func (s *shim) Process(ctx context.Context, id string) (runtime.ExecProcess, error) {
|
||||
p := &process{
|
||||
id: id,
|
||||
shim: s,
|
||||
|
||||
Reference in New Issue
Block a user