From 9e5c207e4cc78841248e6a1e8fcca0420514d8a3 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Sun, 5 Feb 2023 12:28:44 -0800 Subject: [PATCH] Wire up client bridges Signed-off-by: Maksym Pavlenko --- plugins/sandbox/controller.go | 14 ++++++++++---- runtime/v2/bridge.go | 2 +- runtime/v2/manager.go | 26 ++++++++++++++++++-------- runtime/v2/shim.go | 29 +++++++++++++++++++++++------ runtime/v2/shim_load.go | 5 ++++- 5 files changed, 56 insertions(+), 20 deletions(-) diff --git a/plugins/sandbox/controller.go b/plugins/sandbox/controller.go index d2483fcaa..f1bb69ca6 100644 --- a/plugins/sandbox/controller.go +++ b/plugins/sandbox/controller.go @@ -107,7 +107,10 @@ func (c *controllerLocal) Create(ctx context.Context, sandboxID string, opts ... return fmt.Errorf("failed to start new sandbox: %w", err) } - svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client()) + svc, err := sandbox.NewClient(shim.Client()) + if err != nil { + return err + } var options *anypb.Any if coptions.Options != nil { @@ -136,7 +139,11 @@ func (c *controllerLocal) Start(ctx context.Context, sandboxID string) (sandbox. return sandbox.ControllerInstance{}, fmt.Errorf("unable to find sandbox %q", sandboxID) } - svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client()) + svc, err := sandbox.NewClient(shim.Client()) + if err != nil { + return sandbox.ControllerInstance{}, err + } + resp, err := svc.StartSandbox(ctx, &runtimeAPI.StartSandboxRequest{SandboxID: sandboxID}) if err != nil { return sandbox.ControllerInstance{}, fmt.Errorf("failed to start sandbox %s: %w", sandboxID, errdefs.FromGRPC(err)) @@ -258,6 +265,5 @@ func (c *controllerLocal) getSandbox(ctx context.Context, id string) (runtimeAPI return nil, errdefs.ErrNotFound } - svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client()) - return svc, nil + return sandbox.NewClient(shim.Client()) } diff --git a/runtime/v2/bridge.go b/runtime/v2/bridge.go index 29cf8d043..9d93a83a0 100644 --- a/runtime/v2/bridge.go +++ b/runtime/v2/bridge.go @@ -47,7 +47,7 @@ func NewTaskClient(client interface{}) (v2.TaskService, error) { return &grpcBridge{client}, nil } - return nil, fmt.Errorf("unsupported client type %T", client) + return nil, fmt.Errorf("unsupported shim client type %T", client) } // grpcBridge implements `v2.TaskService` interface for GRPC shim server. diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 909023606..73e1af771 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -425,7 +425,11 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr // Cast to shim task and call task service to create a new container task instance. // This will not be required once shim service / client implemented. - shimTask := newShimTask(shim) + shimTask, err := newShimTask(shim) + if err != nil { + return nil, err + } + t, err := shimTask.Create(ctx, opts) if err != nil { // NOTE: ctx contains required namespace information. @@ -443,7 +447,7 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr } shimTask.Shutdown(dctx) - shimTask.Client().Close() + shimTask.Close() } return nil, fmt.Errorf("failed to create shim task: %w", err) @@ -458,7 +462,7 @@ func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error) if err != nil { return nil, err } - return newShimTask(shim), nil + return newShimTask(shim) } // Tasks lists all tasks @@ -469,7 +473,11 @@ func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, erro } out := make([]runtime.Task, len(shims)) for i := range shims { - out[i] = newShimTask(shims[i]) + newClient, err := newShimTask(shims[i]) + if err != nil { + return nil, err + } + out[i] = newClient } return out, nil } @@ -486,10 +494,12 @@ func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit, return nil, err } - var ( - sandboxed = container.SandboxID != "" - shimTask = newShimTask(shim) - ) + shimTask, err := newShimTask(shim) + if err != nil { + return nil, err + } + + sandboxed := container.SandboxID != "" exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) { m.manager.shims.Delete(ctx, id) diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index b3db3c799..a7741bfde 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -121,8 +121,13 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan } ctx, cancel := timeout.WithContext(ctx, loadTimeout) defer cancel() + // Check connectivity, TaskService is the only required service, so create a temp one to check connection. - s := newShimTask(shim) + s, err := newShimTask(shim) + if err != nil { + return nil, err + } + if _, err := s.PID(ctx); err != nil { return nil, err } @@ -176,6 +181,8 @@ func cleanupAfterDeadShim(ctx context.Context, id string, rt *runtime.NSMap[Shim // ShimInstance represents running shim process managed by ShimManager. type ShimInstance interface { + io.Closer + // ID of the shim. ID() string // Namespace of this shim. @@ -183,7 +190,7 @@ type ShimInstance interface { // Bundle is a file system path to shim's bundle. Bundle() string // Client returns the underlying TTRPC client for this shim. - Client() *ttrpc.Client + Client() interface{} // Delete will close the client and remove bundle from disk. Delete(ctx context.Context) error } @@ -208,10 +215,15 @@ func (s *shim) Bundle() string { return s.bundle.Path } -func (s *shim) Client() *ttrpc.Client { +func (s *shim) Client() interface{} { return s.client } +// Close closes the underlying client connection. +func (s *shim) Close() error { + return s.client.Close() +} + func (s *shim) Delete(ctx context.Context) error { var ( result *multierror.Error @@ -241,11 +253,16 @@ type shimTask struct { task task.TaskService } -func newShimTask(shim ShimInstance) *shimTask { +func newShimTask(shim ShimInstance) (*shimTask, error) { + taskClient, err := NewTaskClient(shim.Client()) + if err != nil { + return nil, err + } + return &shimTask{ ShimInstance: shim, - task: task.NewTaskClient(shim.Client()), - } + task: taskClient, + }, nil } func (s *shimTask) Shutdown(ctx context.Context) error { diff --git a/runtime/v2/shim_load.go b/runtime/v2/shim_load.go index 3360897ef..7214bfc6a 100644 --- a/runtime/v2/shim_load.go +++ b/runtime/v2/shim_load.go @@ -144,7 +144,10 @@ func (m *ShimManager) loadShims(ctx context.Context) error { cleanupAfterDeadShim(ctx, id, m.shims, m.events, binaryCall) continue } - shim := newShimTask(instance) + shim, err := newShimTask(instance) + if err != nil { + return err + } // There are 3 possibilities for the loaded shim here: // 1. It could be a shim that is running a task.