Wire up client bridges
Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
parent
4b1ebef3c5
commit
9e5c207e4c
@ -107,7 +107,10 @@ func (c *controllerLocal) Create(ctx context.Context, sandboxID string, opts ...
|
||||
return fmt.Errorf("failed to start new sandbox: %w", err)
|
||||
}
|
||||
|
||||
svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
|
||||
svc, err := sandbox.NewClient(shim.Client())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var options *anypb.Any
|
||||
if coptions.Options != nil {
|
||||
@ -136,7 +139,11 @@ func (c *controllerLocal) Start(ctx context.Context, sandboxID string) (sandbox.
|
||||
return sandbox.ControllerInstance{}, fmt.Errorf("unable to find sandbox %q", sandboxID)
|
||||
}
|
||||
|
||||
svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
|
||||
svc, err := sandbox.NewClient(shim.Client())
|
||||
if err != nil {
|
||||
return sandbox.ControllerInstance{}, err
|
||||
}
|
||||
|
||||
resp, err := svc.StartSandbox(ctx, &runtimeAPI.StartSandboxRequest{SandboxID: sandboxID})
|
||||
if err != nil {
|
||||
return sandbox.ControllerInstance{}, fmt.Errorf("failed to start sandbox %s: %w", sandboxID, errdefs.FromGRPC(err))
|
||||
@ -258,6 +265,5 @@ func (c *controllerLocal) getSandbox(ctx context.Context, id string) (runtimeAPI
|
||||
return nil, errdefs.ErrNotFound
|
||||
}
|
||||
|
||||
svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
|
||||
return svc, nil
|
||||
return sandbox.NewClient(shim.Client())
|
||||
}
|
||||
|
@ -47,7 +47,7 @@ func NewTaskClient(client interface{}) (v2.TaskService, error) {
|
||||
return &grpcBridge{client}, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("unsupported client type %T", client)
|
||||
return nil, fmt.Errorf("unsupported shim client type %T", client)
|
||||
}
|
||||
|
||||
// grpcBridge implements `v2.TaskService` interface for GRPC shim server.
|
||||
|
@ -425,7 +425,11 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr
|
||||
|
||||
// Cast to shim task and call task service to create a new container task instance.
|
||||
// This will not be required once shim service / client implemented.
|
||||
shimTask := newShimTask(shim)
|
||||
shimTask, err := newShimTask(shim)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t, err := shimTask.Create(ctx, opts)
|
||||
if err != nil {
|
||||
// NOTE: ctx contains required namespace information.
|
||||
@ -443,7 +447,7 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr
|
||||
}
|
||||
|
||||
shimTask.Shutdown(dctx)
|
||||
shimTask.Client().Close()
|
||||
shimTask.Close()
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("failed to create shim task: %w", err)
|
||||
@ -458,7 +462,7 @@ func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newShimTask(shim), nil
|
||||
return newShimTask(shim)
|
||||
}
|
||||
|
||||
// Tasks lists all tasks
|
||||
@ -469,7 +473,11 @@ func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, erro
|
||||
}
|
||||
out := make([]runtime.Task, len(shims))
|
||||
for i := range shims {
|
||||
out[i] = newShimTask(shims[i])
|
||||
newClient, err := newShimTask(shims[i])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out[i] = newClient
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
@ -486,10 +494,12 @@ func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
sandboxed = container.SandboxID != ""
|
||||
shimTask = newShimTask(shim)
|
||||
)
|
||||
shimTask, err := newShimTask(shim)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sandboxed := container.SandboxID != ""
|
||||
|
||||
exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) {
|
||||
m.manager.shims.Delete(ctx, id)
|
||||
|
@ -121,8 +121,13 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan
|
||||
}
|
||||
ctx, cancel := timeout.WithContext(ctx, loadTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Check connectivity, TaskService is the only required service, so create a temp one to check connection.
|
||||
s := newShimTask(shim)
|
||||
s, err := newShimTask(shim)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if _, err := s.PID(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -176,6 +181,8 @@ func cleanupAfterDeadShim(ctx context.Context, id string, rt *runtime.NSMap[Shim
|
||||
|
||||
// ShimInstance represents running shim process managed by ShimManager.
|
||||
type ShimInstance interface {
|
||||
io.Closer
|
||||
|
||||
// ID of the shim.
|
||||
ID() string
|
||||
// Namespace of this shim.
|
||||
@ -183,7 +190,7 @@ type ShimInstance interface {
|
||||
// Bundle is a file system path to shim's bundle.
|
||||
Bundle() string
|
||||
// Client returns the underlying TTRPC client for this shim.
|
||||
Client() *ttrpc.Client
|
||||
Client() interface{}
|
||||
// Delete will close the client and remove bundle from disk.
|
||||
Delete(ctx context.Context) error
|
||||
}
|
||||
@ -208,10 +215,15 @@ func (s *shim) Bundle() string {
|
||||
return s.bundle.Path
|
||||
}
|
||||
|
||||
func (s *shim) Client() *ttrpc.Client {
|
||||
func (s *shim) Client() interface{} {
|
||||
return s.client
|
||||
}
|
||||
|
||||
// Close closes the underlying client connection.
|
||||
func (s *shim) Close() error {
|
||||
return s.client.Close()
|
||||
}
|
||||
|
||||
func (s *shim) Delete(ctx context.Context) error {
|
||||
var (
|
||||
result *multierror.Error
|
||||
@ -241,11 +253,16 @@ type shimTask struct {
|
||||
task task.TaskService
|
||||
}
|
||||
|
||||
func newShimTask(shim ShimInstance) *shimTask {
|
||||
func newShimTask(shim ShimInstance) (*shimTask, error) {
|
||||
taskClient, err := NewTaskClient(shim.Client())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &shimTask{
|
||||
ShimInstance: shim,
|
||||
task: task.NewTaskClient(shim.Client()),
|
||||
}
|
||||
task: taskClient,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *shimTask) Shutdown(ctx context.Context) error {
|
||||
|
@ -144,7 +144,10 @@ func (m *ShimManager) loadShims(ctx context.Context) error {
|
||||
cleanupAfterDeadShim(ctx, id, m.shims, m.events, binaryCall)
|
||||
continue
|
||||
}
|
||||
shim := newShimTask(instance)
|
||||
shim, err := newShimTask(instance)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// There are 3 possibilities for the loaded shim here:
|
||||
// 1. It could be a shim that is running a task.
|
||||
|
Loading…
Reference in New Issue
Block a user