Bridge task service v2

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2023-10-13 10:49:06 -07:00
parent daaf67662f
commit f66c46806a
No known key found for this signature in database
3 changed files with 171 additions and 56 deletions

View File

@ -20,14 +20,38 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/containerd/ttrpc"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/emptypb"
v2 "github.com/containerd/containerd/api/runtime/task/v2" v2 "github.com/containerd/containerd/api/runtime/task/v2"
v3 "github.com/containerd/containerd/api/runtime/task/v3" v3 "github.com/containerd/containerd/api/runtime/task/v3"
"github.com/containerd/ttrpc"
api "github.com/containerd/containerd/api/runtime/task/v3" // Current version used by TaskServiceClient
) )
// TaskServiceClient exposes a client interface to shims, which aims to hide
// the underlying complexity and backward compatibility (v2 task service vs v3, TTRPC vs GRPC, etc).
type TaskServiceClient interface {
State(context.Context, *api.StateRequest) (*api.StateResponse, error)
Create(context.Context, *api.CreateTaskRequest) (*api.CreateTaskResponse, error)
Start(context.Context, *api.StartRequest) (*api.StartResponse, error)
Delete(context.Context, *api.DeleteRequest) (*api.DeleteResponse, error)
Pids(context.Context, *api.PidsRequest) (*api.PidsResponse, error)
Pause(context.Context, *api.PauseRequest) (*emptypb.Empty, error)
Resume(context.Context, *api.ResumeRequest) (*emptypb.Empty, error)
Checkpoint(context.Context, *api.CheckpointTaskRequest) (*emptypb.Empty, error)
Kill(context.Context, *api.KillRequest) (*emptypb.Empty, error)
Exec(context.Context, *api.ExecProcessRequest) (*emptypb.Empty, error)
ResizePty(context.Context, *api.ResizePtyRequest) (*emptypb.Empty, error)
CloseIO(context.Context, *api.CloseIORequest) (*emptypb.Empty, error)
Update(context.Context, *api.UpdateTaskRequest) (*emptypb.Empty, error)
Wait(context.Context, *api.WaitRequest) (*api.WaitResponse, error)
Stats(context.Context, *api.StatsRequest) (*api.StatsResponse, error)
Connect(context.Context, *api.ConnectRequest) (*api.ConnectResponse, error)
Shutdown(context.Context, *api.ShutdownRequest) (*emptypb.Empty, error)
}
// NewTaskClient returns a new task client interface which handles both GRPC and TTRPC servers depending on the // NewTaskClient returns a new task client interface which handles both GRPC and TTRPC servers depending on the
// client object type passed in. // client object type passed in.
// //
@ -35,33 +59,47 @@ import (
// - *ttrpc.Client // - *ttrpc.Client
// - grpc.ClientConnInterface // - grpc.ClientConnInterface
// //
// In 1.7 we support TaskService v2 (for backward compatibility with existing shims) and GRPC TaskService v3. // Currently supported servers:
// In 2.0 we'll switch to TaskService v3 only for both TTRPC and GRPC, which will remove overhead of mapping v2 structs to v3 structs. // - TTRPC v2 (compatibility with shims before 2.0)
func NewTaskClient(client interface{}) (v2.TaskService, error) { // - TTRPC v3
// - GRPC v3
func NewTaskClient(client interface{}, version int) (TaskServiceClient, error) {
switch c := client.(type) { switch c := client.(type) {
case *ttrpc.Client: case *ttrpc.Client:
return v2.NewTaskClient(c), nil switch version {
case 2:
return &ttrpcV2Bridge{client: v2.NewTaskClient(c)}, nil
case 3:
return v3.NewTTRPCTaskClient(c), nil
default:
return nil, fmt.Errorf("containerd client supports only v2 and v3 TTRPC task client (got %d)", version)
}
case grpc.ClientConnInterface: case grpc.ClientConnInterface:
return &grpcBridge{v3.NewTaskClient(c)}, nil if version != 3 {
return nil, fmt.Errorf("containerd client supports only v3 GRPC task service (got %d)", version)
}
return &grpcV3Bridge{v3.NewTaskClient(c)}, nil
default: default:
return nil, fmt.Errorf("unsupported shim client type %T", c) return nil, fmt.Errorf("unsupported shim client type %T", c)
} }
} }
// grpcBridge implements `v2.TaskService` interface for GRPC shim server. // ttrpcV2Bridge is a bridge from TTRPC v2 task service.
type grpcBridge struct { type ttrpcV2Bridge struct {
client v3.TaskClient client v2.TaskService
} }
var _ v2.TaskService = (*grpcBridge)(nil) var _ TaskServiceClient = (*ttrpcV2Bridge)(nil)
func (g *grpcBridge) State(ctx context.Context, request *v2.StateRequest) (*v2.StateResponse, error) { func (b *ttrpcV2Bridge) State(ctx context.Context, request *api.StateRequest) (*api.StateResponse, error) {
resp, err := g.client.State(ctx, &v3.StateRequest{ resp, err := b.client.State(ctx, &v2.StateRequest{
ID: request.GetID(), ID: request.GetID(),
ExecID: request.GetExecID(), ExecID: request.GetExecID(),
}) })
return &v2.StateResponse{ return &v3.StateResponse{
ID: resp.GetID(), ID: resp.GetID(),
Bundle: resp.GetBundle(), Bundle: resp.GetBundle(),
Pid: resp.GetPid(), Pid: resp.GetPid(),
@ -76,8 +114,8 @@ func (g *grpcBridge) State(ctx context.Context, request *v2.StateRequest) (*v2.S
}, err }, err
} }
func (g *grpcBridge) Create(ctx context.Context, request *v2.CreateTaskRequest) (*v2.CreateTaskResponse, error) { func (b *ttrpcV2Bridge) Create(ctx context.Context, request *api.CreateTaskRequest) (*api.CreateTaskResponse, error) {
resp, err := g.client.Create(ctx, &v3.CreateTaskRequest{ resp, err := b.client.Create(ctx, &v2.CreateTaskRequest{
ID: request.GetID(), ID: request.GetID(),
Bundle: request.GetBundle(), Bundle: request.GetBundle(),
Rootfs: request.GetRootfs(), Rootfs: request.GetRootfs(),
@ -90,54 +128,54 @@ func (g *grpcBridge) Create(ctx context.Context, request *v2.CreateTaskRequest)
Options: request.GetOptions(), Options: request.GetOptions(),
}) })
return &v2.CreateTaskResponse{Pid: resp.GetPid()}, err return &api.CreateTaskResponse{Pid: resp.GetPid()}, err
} }
func (g *grpcBridge) Start(ctx context.Context, request *v2.StartRequest) (*v2.StartResponse, error) { func (b *ttrpcV2Bridge) Start(ctx context.Context, request *api.StartRequest) (*api.StartResponse, error) {
resp, err := g.client.Start(ctx, &v3.StartRequest{ resp, err := b.client.Start(ctx, &v2.StartRequest{
ID: request.GetID(), ID: request.GetID(),
ExecID: request.GetExecID(), ExecID: request.GetExecID(),
}) })
return &v2.StartResponse{Pid: resp.GetPid()}, err return &api.StartResponse{Pid: resp.GetPid()}, err
} }
func (g *grpcBridge) Delete(ctx context.Context, request *v2.DeleteRequest) (*v2.DeleteResponse, error) { func (b *ttrpcV2Bridge) Delete(ctx context.Context, request *api.DeleteRequest) (*api.DeleteResponse, error) {
resp, err := g.client.Delete(ctx, &v3.DeleteRequest{ resp, err := b.client.Delete(ctx, &v2.DeleteRequest{
ID: request.GetID(), ID: request.GetID(),
ExecID: request.GetExecID(), ExecID: request.GetExecID(),
}) })
return &v2.DeleteResponse{ return &api.DeleteResponse{
Pid: resp.GetPid(), Pid: resp.GetPid(),
ExitStatus: resp.GetExitStatus(), ExitStatus: resp.GetExitStatus(),
ExitedAt: resp.GetExitedAt(), ExitedAt: resp.GetExitedAt(),
}, err }, err
} }
func (g *grpcBridge) Pids(ctx context.Context, request *v2.PidsRequest) (*v2.PidsResponse, error) { func (b *ttrpcV2Bridge) Pids(ctx context.Context, request *api.PidsRequest) (*api.PidsResponse, error) {
resp, err := g.client.Pids(ctx, &v3.PidsRequest{ID: request.GetID()}) resp, err := b.client.Pids(ctx, &v2.PidsRequest{ID: request.GetID()})
return &v2.PidsResponse{Processes: resp.GetProcesses()}, err return &api.PidsResponse{Processes: resp.GetProcesses()}, err
} }
func (g *grpcBridge) Pause(ctx context.Context, request *v2.PauseRequest) (*emptypb.Empty, error) { func (b *ttrpcV2Bridge) Pause(ctx context.Context, request *api.PauseRequest) (*emptypb.Empty, error) {
return g.client.Pause(ctx, &v3.PauseRequest{ID: request.GetID()}) return b.client.Pause(ctx, &v2.PauseRequest{ID: request.GetID()})
} }
func (g *grpcBridge) Resume(ctx context.Context, request *v2.ResumeRequest) (*emptypb.Empty, error) { func (b *ttrpcV2Bridge) Resume(ctx context.Context, request *api.ResumeRequest) (*emptypb.Empty, error) {
return g.client.Resume(ctx, &v3.ResumeRequest{ID: request.GetID()}) return b.client.Resume(ctx, &v2.ResumeRequest{ID: request.GetID()})
} }
func (g *grpcBridge) Checkpoint(ctx context.Context, request *v2.CheckpointTaskRequest) (*emptypb.Empty, error) { func (b *ttrpcV2Bridge) Checkpoint(ctx context.Context, request *api.CheckpointTaskRequest) (*emptypb.Empty, error) {
return g.client.Checkpoint(ctx, &v3.CheckpointTaskRequest{ return b.client.Checkpoint(ctx, &v2.CheckpointTaskRequest{
ID: request.GetID(), ID: request.GetID(),
Path: request.GetPath(), Path: request.GetPath(),
Options: request.GetOptions(), Options: request.GetOptions(),
}) })
} }
func (g *grpcBridge) Kill(ctx context.Context, request *v2.KillRequest) (*emptypb.Empty, error) { func (b *ttrpcV2Bridge) Kill(ctx context.Context, request *api.KillRequest) (*emptypb.Empty, error) {
return g.client.Kill(ctx, &v3.KillRequest{ return b.client.Kill(ctx, &v2.KillRequest{
ID: request.GetID(), ID: request.GetID(),
ExecID: request.GetExecID(), ExecID: request.GetExecID(),
Signal: request.GetSignal(), Signal: request.GetSignal(),
@ -145,8 +183,8 @@ func (g *grpcBridge) Kill(ctx context.Context, request *v2.KillRequest) (*emptyp
}) })
} }
func (g *grpcBridge) Exec(ctx context.Context, request *v2.ExecProcessRequest) (*emptypb.Empty, error) { func (b *ttrpcV2Bridge) Exec(ctx context.Context, request *api.ExecProcessRequest) (*emptypb.Empty, error) {
return g.client.Exec(ctx, &v3.ExecProcessRequest{ return b.client.Exec(ctx, &v2.ExecProcessRequest{
ID: request.GetID(), ID: request.GetID(),
ExecID: request.GetExecID(), ExecID: request.GetExecID(),
Terminal: request.GetTerminal(), Terminal: request.GetTerminal(),
@ -157,8 +195,8 @@ func (g *grpcBridge) Exec(ctx context.Context, request *v2.ExecProcessRequest) (
}) })
} }
func (g *grpcBridge) ResizePty(ctx context.Context, request *v2.ResizePtyRequest) (*emptypb.Empty, error) { func (b *ttrpcV2Bridge) ResizePty(ctx context.Context, request *api.ResizePtyRequest) (*emptypb.Empty, error) {
return g.client.ResizePty(ctx, &v3.ResizePtyRequest{ return b.client.ResizePty(ctx, &v2.ResizePtyRequest{
ID: request.GetID(), ID: request.GetID(),
ExecID: request.GetExecID(), ExecID: request.GetExecID(),
Width: request.GetWidth(), Width: request.GetWidth(),
@ -166,52 +204,128 @@ func (g *grpcBridge) ResizePty(ctx context.Context, request *v2.ResizePtyRequest
}) })
} }
func (g *grpcBridge) CloseIO(ctx context.Context, request *v2.CloseIORequest) (*emptypb.Empty, error) { func (b *ttrpcV2Bridge) CloseIO(ctx context.Context, request *api.CloseIORequest) (*emptypb.Empty, error) {
return g.client.CloseIO(ctx, &v3.CloseIORequest{ return b.client.CloseIO(ctx, &v2.CloseIORequest{
ID: request.GetID(), ID: request.GetID(),
ExecID: request.GetExecID(), ExecID: request.GetExecID(),
Stdin: request.GetStdin(), Stdin: request.GetStdin(),
}) })
} }
func (g *grpcBridge) Update(ctx context.Context, request *v2.UpdateTaskRequest) (*emptypb.Empty, error) { func (b *ttrpcV2Bridge) Update(ctx context.Context, request *api.UpdateTaskRequest) (*emptypb.Empty, error) {
return g.client.Update(ctx, &v3.UpdateTaskRequest{ return b.client.Update(ctx, &v2.UpdateTaskRequest{
ID: request.GetID(), ID: request.GetID(),
Resources: request.GetResources(), Resources: request.GetResources(),
Annotations: request.GetAnnotations(), Annotations: request.GetAnnotations(),
}) })
} }
func (g *grpcBridge) Wait(ctx context.Context, request *v2.WaitRequest) (*v2.WaitResponse, error) { func (b *ttrpcV2Bridge) Wait(ctx context.Context, request *api.WaitRequest) (*api.WaitResponse, error) {
resp, err := g.client.Wait(ctx, &v3.WaitRequest{ resp, err := b.client.Wait(ctx, &v2.WaitRequest{
ID: request.GetID(), ID: request.GetID(),
ExecID: request.GetExecID(), ExecID: request.GetExecID(),
}) })
return &v2.WaitResponse{ return &api.WaitResponse{
ExitStatus: resp.GetExitStatus(), ExitStatus: resp.GetExitStatus(),
ExitedAt: resp.GetExitedAt(), ExitedAt: resp.GetExitedAt(),
}, err }, err
} }
func (g *grpcBridge) Stats(ctx context.Context, request *v2.StatsRequest) (*v2.StatsResponse, error) { func (b *ttrpcV2Bridge) Stats(ctx context.Context, request *api.StatsRequest) (*api.StatsResponse, error) {
resp, err := g.client.Stats(ctx, &v3.StatsRequest{ID: request.GetID()}) resp, err := b.client.Stats(ctx, &v2.StatsRequest{ID: request.GetID()})
return &v2.StatsResponse{Stats: resp.GetStats()}, err return &api.StatsResponse{Stats: resp.GetStats()}, err
} }
func (g *grpcBridge) Connect(ctx context.Context, request *v2.ConnectRequest) (*v2.ConnectResponse, error) { func (b *ttrpcV2Bridge) Connect(ctx context.Context, request *api.ConnectRequest) (*api.ConnectResponse, error) {
resp, err := g.client.Connect(ctx, &v3.ConnectRequest{ID: request.GetID()}) resp, err := b.client.Connect(ctx, &v2.ConnectRequest{ID: request.GetID()})
return &v2.ConnectResponse{ return &api.ConnectResponse{
ShimPid: resp.GetShimPid(), ShimPid: resp.GetShimPid(),
TaskPid: resp.GetTaskPid(), TaskPid: resp.GetTaskPid(),
Version: resp.GetVersion(), Version: resp.GetVersion(),
}, err }, err
} }
func (g *grpcBridge) Shutdown(ctx context.Context, request *v2.ShutdownRequest) (*emptypb.Empty, error) { func (b *ttrpcV2Bridge) Shutdown(ctx context.Context, request *api.ShutdownRequest) (*emptypb.Empty, error) {
return g.client.Shutdown(ctx, &v3.ShutdownRequest{ return b.client.Shutdown(ctx, &v2.ShutdownRequest{
ID: request.GetID(), ID: request.GetID(),
Now: request.GetNow(), Now: request.GetNow(),
}) })
} }
// grpcV3Bridge implements task service client for v3 GRPC server.
// GRPC uses same request/response structures as TTRPC, so it just wraps GRPC calls.
type grpcV3Bridge struct {
client v3.TaskClient
}
var _ TaskServiceClient = (*grpcV3Bridge)(nil)
func (g *grpcV3Bridge) State(ctx context.Context, request *api.StateRequest) (*api.StateResponse, error) {
return g.client.State(ctx, request)
}
func (g *grpcV3Bridge) Create(ctx context.Context, request *api.CreateTaskRequest) (*api.CreateTaskResponse, error) {
return g.client.Create(ctx, request)
}
func (g *grpcV3Bridge) Start(ctx context.Context, request *api.StartRequest) (*api.StartResponse, error) {
return g.client.Start(ctx, request)
}
func (g *grpcV3Bridge) Delete(ctx context.Context, request *api.DeleteRequest) (*api.DeleteResponse, error) {
return g.client.Delete(ctx, request)
}
func (g *grpcV3Bridge) Pids(ctx context.Context, request *api.PidsRequest) (*api.PidsResponse, error) {
return g.client.Pids(ctx, request)
}
func (g *grpcV3Bridge) Pause(ctx context.Context, request *api.PauseRequest) (*emptypb.Empty, error) {
return g.client.Pause(ctx, request)
}
func (g *grpcV3Bridge) Resume(ctx context.Context, request *api.ResumeRequest) (*emptypb.Empty, error) {
return g.client.Resume(ctx, request)
}
func (g *grpcV3Bridge) Checkpoint(ctx context.Context, request *api.CheckpointTaskRequest) (*emptypb.Empty, error) {
return g.client.Checkpoint(ctx, request)
}
func (g *grpcV3Bridge) Kill(ctx context.Context, request *api.KillRequest) (*emptypb.Empty, error) {
return g.client.Kill(ctx, request)
}
func (g *grpcV3Bridge) Exec(ctx context.Context, request *api.ExecProcessRequest) (*emptypb.Empty, error) {
return g.client.Exec(ctx, request)
}
func (g *grpcV3Bridge) ResizePty(ctx context.Context, request *api.ResizePtyRequest) (*emptypb.Empty, error) {
return g.client.ResizePty(ctx, request)
}
func (g *grpcV3Bridge) CloseIO(ctx context.Context, request *api.CloseIORequest) (*emptypb.Empty, error) {
return g.client.CloseIO(ctx, request)
}
func (g *grpcV3Bridge) Update(ctx context.Context, request *api.UpdateTaskRequest) (*emptypb.Empty, error) {
return g.client.Update(ctx, request)
}
func (g *grpcV3Bridge) Wait(ctx context.Context, request *api.WaitRequest) (*api.WaitResponse, error) {
return g.client.Wait(ctx, request)
}
func (g *grpcV3Bridge) Stats(ctx context.Context, request *api.StatsRequest) (*api.StatsResponse, error) {
return g.client.Stats(ctx, request)
}
func (g *grpcV3Bridge) Connect(ctx context.Context, request *api.ConnectRequest) (*api.ConnectResponse, error) {
return g.client.Connect(ctx, request)
}
func (g *grpcV3Bridge) Shutdown(ctx context.Context, request *api.ShutdownRequest) (*emptypb.Empty, error) {
return g.client.Shutdown(ctx, request)
}

View File

@ -20,7 +20,7 @@ import (
"context" "context"
"errors" "errors"
"github.com/containerd/containerd/api/runtime/task/v2" task "github.com/containerd/containerd/api/runtime/task/v3"
tasktypes "github.com/containerd/containerd/api/types/task" tasktypes "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/protobuf" "github.com/containerd/containerd/protobuf"

View File

@ -33,7 +33,7 @@ import (
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
eventstypes "github.com/containerd/containerd/api/events" eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/runtime/task/v2" task "github.com/containerd/containerd/api/runtime/task/v3"
"github.com/containerd/containerd/api/types" "github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/events/exchange"
@ -375,11 +375,12 @@ var _ runtime.Task = &shimTask{}
// shimTask wraps shim process and adds task service client for compatibility with existing shim manager. // shimTask wraps shim process and adds task service client for compatibility with existing shim manager.
type shimTask struct { type shimTask struct {
ShimInstance ShimInstance
task task.TaskService task TaskServiceClient
} }
func newShimTask(shim ShimInstance) (*shimTask, error) { func newShimTask(shim ShimInstance) (*shimTask, error) {
taskClient, err := NewTaskClient(shim.Client()) // TODO: Fix version
taskClient, err := NewTaskClient(shim.Client(), 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }