diff --git a/runtime/v2/bridge.go b/runtime/v2/bridge.go index 4075d6c2d..6262e0efd 100644 --- a/runtime/v2/bridge.go +++ b/runtime/v2/bridge.go @@ -20,14 +20,38 @@ import ( "context" "fmt" + "github.com/containerd/ttrpc" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" v2 "github.com/containerd/containerd/api/runtime/task/v2" v3 "github.com/containerd/containerd/api/runtime/task/v3" - "github.com/containerd/ttrpc" + + api "github.com/containerd/containerd/api/runtime/task/v3" // Current version used by TaskServiceClient ) +// TaskServiceClient exposes a client interface to shims, which aims to hide +// the underlying complexity and backward compatibility (v2 task service vs v3, TTRPC vs GRPC, etc). +type TaskServiceClient interface { + State(context.Context, *api.StateRequest) (*api.StateResponse, error) + Create(context.Context, *api.CreateTaskRequest) (*api.CreateTaskResponse, error) + Start(context.Context, *api.StartRequest) (*api.StartResponse, error) + Delete(context.Context, *api.DeleteRequest) (*api.DeleteResponse, error) + Pids(context.Context, *api.PidsRequest) (*api.PidsResponse, error) + Pause(context.Context, *api.PauseRequest) (*emptypb.Empty, error) + Resume(context.Context, *api.ResumeRequest) (*emptypb.Empty, error) + Checkpoint(context.Context, *api.CheckpointTaskRequest) (*emptypb.Empty, error) + Kill(context.Context, *api.KillRequest) (*emptypb.Empty, error) + Exec(context.Context, *api.ExecProcessRequest) (*emptypb.Empty, error) + ResizePty(context.Context, *api.ResizePtyRequest) (*emptypb.Empty, error) + CloseIO(context.Context, *api.CloseIORequest) (*emptypb.Empty, error) + Update(context.Context, *api.UpdateTaskRequest) (*emptypb.Empty, error) + Wait(context.Context, *api.WaitRequest) (*api.WaitResponse, error) + Stats(context.Context, *api.StatsRequest) (*api.StatsResponse, error) + Connect(context.Context, *api.ConnectRequest) (*api.ConnectResponse, error) + Shutdown(context.Context, *api.ShutdownRequest) (*emptypb.Empty, error) +} + // NewTaskClient returns a new task client interface which handles both GRPC and TTRPC servers depending on the // client object type passed in. // @@ -35,33 +59,47 @@ import ( // - *ttrpc.Client // - grpc.ClientConnInterface // -// In 1.7 we support TaskService v2 (for backward compatibility with existing shims) and GRPC TaskService v3. -// In 2.0 we'll switch to TaskService v3 only for both TTRPC and GRPC, which will remove overhead of mapping v2 structs to v3 structs. -func NewTaskClient(client interface{}) (v2.TaskService, error) { +// Currently supported servers: +// - TTRPC v2 (compatibility with shims before 2.0) +// - TTRPC v3 +// - GRPC v3 +func NewTaskClient(client interface{}, version int) (TaskServiceClient, error) { switch c := client.(type) { case *ttrpc.Client: - return v2.NewTaskClient(c), nil + switch version { + case 2: + return &ttrpcV2Bridge{client: v2.NewTaskClient(c)}, nil + case 3: + return v3.NewTTRPCTaskClient(c), nil + default: + return nil, fmt.Errorf("containerd client supports only v2 and v3 TTRPC task client (got %d)", version) + } + case grpc.ClientConnInterface: - return &grpcBridge{v3.NewTaskClient(c)}, nil + if version != 3 { + return nil, fmt.Errorf("containerd client supports only v3 GRPC task service (got %d)", version) + } + + return &grpcV3Bridge{v3.NewTaskClient(c)}, nil default: return nil, fmt.Errorf("unsupported shim client type %T", c) } } -// grpcBridge implements `v2.TaskService` interface for GRPC shim server. -type grpcBridge struct { - client v3.TaskClient +// ttrpcV2Bridge is a bridge from TTRPC v2 task service. +type ttrpcV2Bridge struct { + client v2.TaskService } -var _ v2.TaskService = (*grpcBridge)(nil) +var _ TaskServiceClient = (*ttrpcV2Bridge)(nil) -func (g *grpcBridge) State(ctx context.Context, request *v2.StateRequest) (*v2.StateResponse, error) { - resp, err := g.client.State(ctx, &v3.StateRequest{ +func (b *ttrpcV2Bridge) State(ctx context.Context, request *api.StateRequest) (*api.StateResponse, error) { + resp, err := b.client.State(ctx, &v2.StateRequest{ ID: request.GetID(), ExecID: request.GetExecID(), }) - return &v2.StateResponse{ + return &v3.StateResponse{ ID: resp.GetID(), Bundle: resp.GetBundle(), Pid: resp.GetPid(), @@ -76,8 +114,8 @@ func (g *grpcBridge) State(ctx context.Context, request *v2.StateRequest) (*v2.S }, err } -func (g *grpcBridge) Create(ctx context.Context, request *v2.CreateTaskRequest) (*v2.CreateTaskResponse, error) { - resp, err := g.client.Create(ctx, &v3.CreateTaskRequest{ +func (b *ttrpcV2Bridge) Create(ctx context.Context, request *api.CreateTaskRequest) (*api.CreateTaskResponse, error) { + resp, err := b.client.Create(ctx, &v2.CreateTaskRequest{ ID: request.GetID(), Bundle: request.GetBundle(), Rootfs: request.GetRootfs(), @@ -90,54 +128,54 @@ func (g *grpcBridge) Create(ctx context.Context, request *v2.CreateTaskRequest) Options: request.GetOptions(), }) - return &v2.CreateTaskResponse{Pid: resp.GetPid()}, err + return &api.CreateTaskResponse{Pid: resp.GetPid()}, err } -func (g *grpcBridge) Start(ctx context.Context, request *v2.StartRequest) (*v2.StartResponse, error) { - resp, err := g.client.Start(ctx, &v3.StartRequest{ +func (b *ttrpcV2Bridge) Start(ctx context.Context, request *api.StartRequest) (*api.StartResponse, error) { + resp, err := b.client.Start(ctx, &v2.StartRequest{ ID: request.GetID(), ExecID: request.GetExecID(), }) - return &v2.StartResponse{Pid: resp.GetPid()}, err + return &api.StartResponse{Pid: resp.GetPid()}, err } -func (g *grpcBridge) Delete(ctx context.Context, request *v2.DeleteRequest) (*v2.DeleteResponse, error) { - resp, err := g.client.Delete(ctx, &v3.DeleteRequest{ +func (b *ttrpcV2Bridge) Delete(ctx context.Context, request *api.DeleteRequest) (*api.DeleteResponse, error) { + resp, err := b.client.Delete(ctx, &v2.DeleteRequest{ ID: request.GetID(), ExecID: request.GetExecID(), }) - return &v2.DeleteResponse{ + return &api.DeleteResponse{ Pid: resp.GetPid(), ExitStatus: resp.GetExitStatus(), ExitedAt: resp.GetExitedAt(), }, err } -func (g *grpcBridge) Pids(ctx context.Context, request *v2.PidsRequest) (*v2.PidsResponse, error) { - resp, err := g.client.Pids(ctx, &v3.PidsRequest{ID: request.GetID()}) - return &v2.PidsResponse{Processes: resp.GetProcesses()}, err +func (b *ttrpcV2Bridge) Pids(ctx context.Context, request *api.PidsRequest) (*api.PidsResponse, error) { + resp, err := b.client.Pids(ctx, &v2.PidsRequest{ID: request.GetID()}) + return &api.PidsResponse{Processes: resp.GetProcesses()}, err } -func (g *grpcBridge) Pause(ctx context.Context, request *v2.PauseRequest) (*emptypb.Empty, error) { - return g.client.Pause(ctx, &v3.PauseRequest{ID: request.GetID()}) +func (b *ttrpcV2Bridge) Pause(ctx context.Context, request *api.PauseRequest) (*emptypb.Empty, error) { + return b.client.Pause(ctx, &v2.PauseRequest{ID: request.GetID()}) } -func (g *grpcBridge) Resume(ctx context.Context, request *v2.ResumeRequest) (*emptypb.Empty, error) { - return g.client.Resume(ctx, &v3.ResumeRequest{ID: request.GetID()}) +func (b *ttrpcV2Bridge) Resume(ctx context.Context, request *api.ResumeRequest) (*emptypb.Empty, error) { + return b.client.Resume(ctx, &v2.ResumeRequest{ID: request.GetID()}) } -func (g *grpcBridge) Checkpoint(ctx context.Context, request *v2.CheckpointTaskRequest) (*emptypb.Empty, error) { - return g.client.Checkpoint(ctx, &v3.CheckpointTaskRequest{ +func (b *ttrpcV2Bridge) Checkpoint(ctx context.Context, request *api.CheckpointTaskRequest) (*emptypb.Empty, error) { + return b.client.Checkpoint(ctx, &v2.CheckpointTaskRequest{ ID: request.GetID(), Path: request.GetPath(), Options: request.GetOptions(), }) } -func (g *grpcBridge) Kill(ctx context.Context, request *v2.KillRequest) (*emptypb.Empty, error) { - return g.client.Kill(ctx, &v3.KillRequest{ +func (b *ttrpcV2Bridge) Kill(ctx context.Context, request *api.KillRequest) (*emptypb.Empty, error) { + return b.client.Kill(ctx, &v2.KillRequest{ ID: request.GetID(), ExecID: request.GetExecID(), Signal: request.GetSignal(), @@ -145,8 +183,8 @@ func (g *grpcBridge) Kill(ctx context.Context, request *v2.KillRequest) (*emptyp }) } -func (g *grpcBridge) Exec(ctx context.Context, request *v2.ExecProcessRequest) (*emptypb.Empty, error) { - return g.client.Exec(ctx, &v3.ExecProcessRequest{ +func (b *ttrpcV2Bridge) Exec(ctx context.Context, request *api.ExecProcessRequest) (*emptypb.Empty, error) { + return b.client.Exec(ctx, &v2.ExecProcessRequest{ ID: request.GetID(), ExecID: request.GetExecID(), Terminal: request.GetTerminal(), @@ -157,8 +195,8 @@ func (g *grpcBridge) Exec(ctx context.Context, request *v2.ExecProcessRequest) ( }) } -func (g *grpcBridge) ResizePty(ctx context.Context, request *v2.ResizePtyRequest) (*emptypb.Empty, error) { - return g.client.ResizePty(ctx, &v3.ResizePtyRequest{ +func (b *ttrpcV2Bridge) ResizePty(ctx context.Context, request *api.ResizePtyRequest) (*emptypb.Empty, error) { + return b.client.ResizePty(ctx, &v2.ResizePtyRequest{ ID: request.GetID(), ExecID: request.GetExecID(), Width: request.GetWidth(), @@ -166,52 +204,128 @@ func (g *grpcBridge) ResizePty(ctx context.Context, request *v2.ResizePtyRequest }) } -func (g *grpcBridge) CloseIO(ctx context.Context, request *v2.CloseIORequest) (*emptypb.Empty, error) { - return g.client.CloseIO(ctx, &v3.CloseIORequest{ +func (b *ttrpcV2Bridge) CloseIO(ctx context.Context, request *api.CloseIORequest) (*emptypb.Empty, error) { + return b.client.CloseIO(ctx, &v2.CloseIORequest{ ID: request.GetID(), ExecID: request.GetExecID(), Stdin: request.GetStdin(), }) } -func (g *grpcBridge) Update(ctx context.Context, request *v2.UpdateTaskRequest) (*emptypb.Empty, error) { - return g.client.Update(ctx, &v3.UpdateTaskRequest{ +func (b *ttrpcV2Bridge) Update(ctx context.Context, request *api.UpdateTaskRequest) (*emptypb.Empty, error) { + return b.client.Update(ctx, &v2.UpdateTaskRequest{ ID: request.GetID(), Resources: request.GetResources(), Annotations: request.GetAnnotations(), }) } -func (g *grpcBridge) Wait(ctx context.Context, request *v2.WaitRequest) (*v2.WaitResponse, error) { - resp, err := g.client.Wait(ctx, &v3.WaitRequest{ +func (b *ttrpcV2Bridge) Wait(ctx context.Context, request *api.WaitRequest) (*api.WaitResponse, error) { + resp, err := b.client.Wait(ctx, &v2.WaitRequest{ ID: request.GetID(), ExecID: request.GetExecID(), }) - return &v2.WaitResponse{ + return &api.WaitResponse{ ExitStatus: resp.GetExitStatus(), ExitedAt: resp.GetExitedAt(), }, err } -func (g *grpcBridge) Stats(ctx context.Context, request *v2.StatsRequest) (*v2.StatsResponse, error) { - resp, err := g.client.Stats(ctx, &v3.StatsRequest{ID: request.GetID()}) - return &v2.StatsResponse{Stats: resp.GetStats()}, err +func (b *ttrpcV2Bridge) Stats(ctx context.Context, request *api.StatsRequest) (*api.StatsResponse, error) { + resp, err := b.client.Stats(ctx, &v2.StatsRequest{ID: request.GetID()}) + return &api.StatsResponse{Stats: resp.GetStats()}, err } -func (g *grpcBridge) Connect(ctx context.Context, request *v2.ConnectRequest) (*v2.ConnectResponse, error) { - resp, err := g.client.Connect(ctx, &v3.ConnectRequest{ID: request.GetID()}) +func (b *ttrpcV2Bridge) Connect(ctx context.Context, request *api.ConnectRequest) (*api.ConnectResponse, error) { + resp, err := b.client.Connect(ctx, &v2.ConnectRequest{ID: request.GetID()}) - return &v2.ConnectResponse{ + return &api.ConnectResponse{ ShimPid: resp.GetShimPid(), TaskPid: resp.GetTaskPid(), Version: resp.GetVersion(), }, err } -func (g *grpcBridge) Shutdown(ctx context.Context, request *v2.ShutdownRequest) (*emptypb.Empty, error) { - return g.client.Shutdown(ctx, &v3.ShutdownRequest{ +func (b *ttrpcV2Bridge) Shutdown(ctx context.Context, request *api.ShutdownRequest) (*emptypb.Empty, error) { + return b.client.Shutdown(ctx, &v2.ShutdownRequest{ ID: request.GetID(), Now: request.GetNow(), }) } + +// grpcV3Bridge implements task service client for v3 GRPC server. +// GRPC uses same request/response structures as TTRPC, so it just wraps GRPC calls. +type grpcV3Bridge struct { + client v3.TaskClient +} + +var _ TaskServiceClient = (*grpcV3Bridge)(nil) + +func (g *grpcV3Bridge) State(ctx context.Context, request *api.StateRequest) (*api.StateResponse, error) { + return g.client.State(ctx, request) +} + +func (g *grpcV3Bridge) Create(ctx context.Context, request *api.CreateTaskRequest) (*api.CreateTaskResponse, error) { + return g.client.Create(ctx, request) +} + +func (g *grpcV3Bridge) Start(ctx context.Context, request *api.StartRequest) (*api.StartResponse, error) { + return g.client.Start(ctx, request) +} + +func (g *grpcV3Bridge) Delete(ctx context.Context, request *api.DeleteRequest) (*api.DeleteResponse, error) { + return g.client.Delete(ctx, request) +} + +func (g *grpcV3Bridge) Pids(ctx context.Context, request *api.PidsRequest) (*api.PidsResponse, error) { + return g.client.Pids(ctx, request) +} + +func (g *grpcV3Bridge) Pause(ctx context.Context, request *api.PauseRequest) (*emptypb.Empty, error) { + return g.client.Pause(ctx, request) +} + +func (g *grpcV3Bridge) Resume(ctx context.Context, request *api.ResumeRequest) (*emptypb.Empty, error) { + return g.client.Resume(ctx, request) +} + +func (g *grpcV3Bridge) Checkpoint(ctx context.Context, request *api.CheckpointTaskRequest) (*emptypb.Empty, error) { + return g.client.Checkpoint(ctx, request) +} + +func (g *grpcV3Bridge) Kill(ctx context.Context, request *api.KillRequest) (*emptypb.Empty, error) { + return g.client.Kill(ctx, request) +} + +func (g *grpcV3Bridge) Exec(ctx context.Context, request *api.ExecProcessRequest) (*emptypb.Empty, error) { + return g.client.Exec(ctx, request) +} + +func (g *grpcV3Bridge) ResizePty(ctx context.Context, request *api.ResizePtyRequest) (*emptypb.Empty, error) { + return g.client.ResizePty(ctx, request) +} + +func (g *grpcV3Bridge) CloseIO(ctx context.Context, request *api.CloseIORequest) (*emptypb.Empty, error) { + return g.client.CloseIO(ctx, request) +} + +func (g *grpcV3Bridge) Update(ctx context.Context, request *api.UpdateTaskRequest) (*emptypb.Empty, error) { + return g.client.Update(ctx, request) +} + +func (g *grpcV3Bridge) Wait(ctx context.Context, request *api.WaitRequest) (*api.WaitResponse, error) { + return g.client.Wait(ctx, request) +} + +func (g *grpcV3Bridge) Stats(ctx context.Context, request *api.StatsRequest) (*api.StatsResponse, error) { + return g.client.Stats(ctx, request) +} + +func (g *grpcV3Bridge) Connect(ctx context.Context, request *api.ConnectRequest) (*api.ConnectResponse, error) { + return g.client.Connect(ctx, request) +} + +func (g *grpcV3Bridge) Shutdown(ctx context.Context, request *api.ShutdownRequest) (*emptypb.Empty, error) { + return g.client.Shutdown(ctx, request) +} diff --git a/runtime/v2/process.go b/runtime/v2/process.go index e2c9a5c0d..83e18151f 100644 --- a/runtime/v2/process.go +++ b/runtime/v2/process.go @@ -20,7 +20,7 @@ import ( "context" "errors" - "github.com/containerd/containerd/api/runtime/task/v2" + task "github.com/containerd/containerd/api/runtime/task/v3" tasktypes "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/protobuf" diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index 4b6172602..fe827df23 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -33,7 +33,7 @@ import ( "google.golang.org/grpc/credentials/insecure" eventstypes "github.com/containerd/containerd/api/events" - "github.com/containerd/containerd/api/runtime/task/v2" + task "github.com/containerd/containerd/api/runtime/task/v3" "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events/exchange" @@ -375,11 +375,12 @@ var _ runtime.Task = &shimTask{} // shimTask wraps shim process and adds task service client for compatibility with existing shim manager. type shimTask struct { ShimInstance - task task.TaskService + task TaskServiceClient } func newShimTask(shim ShimInstance) (*shimTask, error) { - taskClient, err := NewTaskClient(shim.Client()) + // TODO: Fix version + taskClient, err := NewTaskClient(shim.Client(), 0) if err != nil { return nil, err }