Merge pull request #8052 from mxpv/grpc_shim

Initial GRPC shims support
This commit is contained in:
Derek McGowan 2023-02-14 18:53:22 -08:00 committed by GitHub
commit 4b80a2be96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 467 additions and 51 deletions

View File

@ -107,7 +107,10 @@ func (c *controllerLocal) Create(ctx context.Context, sandboxID string, opts ...
return fmt.Errorf("failed to start new sandbox: %w", err)
}
svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
svc, err := sandbox.NewClient(shim.Client())
if err != nil {
return err
}
var options *anypb.Any
if coptions.Options != nil {
@ -136,7 +139,11 @@ func (c *controllerLocal) Start(ctx context.Context, sandboxID string) (sandbox.
return sandbox.ControllerInstance{}, fmt.Errorf("unable to find sandbox %q", sandboxID)
}
svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
svc, err := sandbox.NewClient(shim.Client())
if err != nil {
return sandbox.ControllerInstance{}, err
}
resp, err := svc.StartSandbox(ctx, &runtimeAPI.StartSandboxRequest{SandboxID: sandboxID})
if err != nil {
return sandbox.ControllerInstance{}, fmt.Errorf("failed to start sandbox %s: %w", sandboxID, errdefs.FromGRPC(err))
@ -258,6 +265,5 @@ func (c *controllerLocal) getSandbox(ctx context.Context, id string) (runtimeAPI
return nil, errdefs.ErrNotFound
}
svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
return svc, nil
return sandbox.NewClient(shim.Client())
}

View File

@ -26,7 +26,6 @@ import (
gruntime "runtime"
"strings"
"github.com/containerd/ttrpc"
"github.com/sirupsen/logrus"
"github.com/containerd/containerd/api/runtime/task/v2"
@ -122,10 +121,7 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
return nil, fmt.Errorf("%s: %w", out, err)
}
address := strings.TrimSpace(string(out))
conn, err := client.Connect(address, client.AnonDialer)
if err != nil {
return nil, err
}
onCloseWithShimLog := func() {
onClose()
cancelShimLog()
@ -135,10 +131,15 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
if err := os.WriteFile(filepath.Join(b.bundle.Path, "shim-binary-path"), []byte(b.runtime), 0600); err != nil {
return nil, err
}
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog))
conn, err := makeConnection(ctx, address, onCloseWithShimLog)
if err != nil {
return nil, err
}
return &shim{
bundle: b.bundle,
client: client,
client: conn,
}, nil
}

217
runtime/v2/bridge.go Normal file
View File

@ -0,0 +1,217 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v2
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
v2 "github.com/containerd/containerd/api/runtime/task/v2"
v3 "github.com/containerd/containerd/api/runtime/task/v3"
"github.com/containerd/ttrpc"
)
// NewTaskClient returns a new task client interface which handles both GRPC and TTRPC servers depending on the
// client object type passed in.
//
// Supported client types are:
// - *ttrpc.Client
// - grpc.ClientConnInterface
//
// In 1.7 we support TaskService v2 (for backward compatibility with existing shims) and GRPC TaskService v3.
// In 2.0 we'll switch to TaskService v3 only for both TTRPC and GRPC, which will remove overhead of mapping v2 structs to v3 structs.
func NewTaskClient(client interface{}) (v2.TaskService, error) {
switch c := client.(type) {
case *ttrpc.Client:
return v2.NewTaskClient(c), nil
case grpc.ClientConnInterface:
return &grpcBridge{v3.NewTaskClient(c)}, nil
default:
return nil, fmt.Errorf("unsupported shim client type %T", c)
}
}
// grpcBridge implements `v2.TaskService` interface for GRPC shim server.
type grpcBridge struct {
client v3.TaskClient
}
var _ v2.TaskService = (*grpcBridge)(nil)
func (g *grpcBridge) State(ctx context.Context, request *v2.StateRequest) (*v2.StateResponse, error) {
resp, err := g.client.State(ctx, &v3.StateRequest{
ID: request.GetID(),
ExecID: request.GetExecID(),
})
return &v2.StateResponse{
ID: resp.GetID(),
Bundle: resp.GetBundle(),
Pid: resp.GetPid(),
Status: resp.GetStatus(),
Stdin: resp.GetStdin(),
Stdout: resp.GetStdout(),
Stderr: resp.GetStderr(),
Terminal: resp.GetTerminal(),
ExitStatus: resp.GetExitStatus(),
ExitedAt: resp.GetExitedAt(),
ExecID: resp.GetExecID(),
}, err
}
func (g *grpcBridge) Create(ctx context.Context, request *v2.CreateTaskRequest) (*v2.CreateTaskResponse, error) {
resp, err := g.client.Create(ctx, &v3.CreateTaskRequest{
ID: request.GetID(),
Bundle: request.GetBundle(),
Rootfs: request.GetRootfs(),
Terminal: request.GetTerminal(),
Stdin: request.GetStdin(),
Stdout: request.GetStdout(),
Stderr: request.GetStderr(),
Checkpoint: request.GetCheckpoint(),
ParentCheckpoint: request.GetParentCheckpoint(),
Options: request.GetOptions(),
})
return &v2.CreateTaskResponse{Pid: resp.GetPid()}, err
}
func (g *grpcBridge) Start(ctx context.Context, request *v2.StartRequest) (*v2.StartResponse, error) {
resp, err := g.client.Start(ctx, &v3.StartRequest{
ID: request.GetID(),
ExecID: request.GetExecID(),
})
return &v2.StartResponse{Pid: resp.GetPid()}, err
}
func (g *grpcBridge) Delete(ctx context.Context, request *v2.DeleteRequest) (*v2.DeleteResponse, error) {
resp, err := g.client.Delete(ctx, &v3.DeleteRequest{
ID: request.GetID(),
ExecID: request.GetExecID(),
})
return &v2.DeleteResponse{
Pid: resp.GetPid(),
ExitStatus: resp.GetExitStatus(),
ExitedAt: resp.GetExitedAt(),
}, err
}
func (g *grpcBridge) Pids(ctx context.Context, request *v2.PidsRequest) (*v2.PidsResponse, error) {
resp, err := g.client.Pids(ctx, &v3.PidsRequest{ID: request.GetID()})
return &v2.PidsResponse{Processes: resp.GetProcesses()}, err
}
func (g *grpcBridge) Pause(ctx context.Context, request *v2.PauseRequest) (*emptypb.Empty, error) {
return g.client.Pause(ctx, &v3.PauseRequest{ID: request.GetID()})
}
func (g *grpcBridge) Resume(ctx context.Context, request *v2.ResumeRequest) (*emptypb.Empty, error) {
return g.client.Resume(ctx, &v3.ResumeRequest{ID: request.GetID()})
}
func (g *grpcBridge) Checkpoint(ctx context.Context, request *v2.CheckpointTaskRequest) (*emptypb.Empty, error) {
return g.client.Checkpoint(ctx, &v3.CheckpointTaskRequest{
ID: request.GetID(),
Path: request.GetPath(),
Options: request.GetOptions(),
})
}
func (g *grpcBridge) Kill(ctx context.Context, request *v2.KillRequest) (*emptypb.Empty, error) {
return g.client.Kill(ctx, &v3.KillRequest{
ID: request.GetID(),
ExecID: request.GetExecID(),
Signal: request.GetSignal(),
All: request.GetAll(),
})
}
func (g *grpcBridge) Exec(ctx context.Context, request *v2.ExecProcessRequest) (*emptypb.Empty, error) {
return g.client.Exec(ctx, &v3.ExecProcessRequest{
ID: request.GetID(),
ExecID: request.GetExecID(),
Terminal: request.GetTerminal(),
Stdin: request.GetStdin(),
Stdout: request.GetStdout(),
Stderr: request.GetStderr(),
Spec: request.GetSpec(),
})
}
func (g *grpcBridge) ResizePty(ctx context.Context, request *v2.ResizePtyRequest) (*emptypb.Empty, error) {
return g.client.ResizePty(ctx, &v3.ResizePtyRequest{
ID: request.GetID(),
ExecID: request.GetExecID(),
Width: request.GetWidth(),
Height: request.GetHeight(),
})
}
func (g *grpcBridge) CloseIO(ctx context.Context, request *v2.CloseIORequest) (*emptypb.Empty, error) {
return g.client.CloseIO(ctx, &v3.CloseIORequest{
ID: request.GetID(),
ExecID: request.GetExecID(),
Stdin: request.GetStdin(),
})
}
func (g *grpcBridge) Update(ctx context.Context, request *v2.UpdateTaskRequest) (*emptypb.Empty, error) {
return g.client.Update(ctx, &v3.UpdateTaskRequest{
ID: request.GetID(),
Resources: request.GetResources(),
Annotations: request.GetAnnotations(),
})
}
func (g *grpcBridge) Wait(ctx context.Context, request *v2.WaitRequest) (*v2.WaitResponse, error) {
resp, err := g.client.Wait(ctx, &v3.WaitRequest{
ID: request.GetID(),
ExecID: request.GetExecID(),
})
return &v2.WaitResponse{
ExitStatus: resp.GetExitStatus(),
ExitedAt: resp.GetExitedAt(),
}, err
}
func (g *grpcBridge) Stats(ctx context.Context, request *v2.StatsRequest) (*v2.StatsResponse, error) {
resp, err := g.client.Stats(ctx, &v3.StatsRequest{ID: request.GetID()})
return &v2.StatsResponse{Stats: resp.GetStats()}, err
}
func (g *grpcBridge) Connect(ctx context.Context, request *v2.ConnectRequest) (*v2.ConnectResponse, error) {
resp, err := g.client.Connect(ctx, &v3.ConnectRequest{ID: request.GetID()})
return &v2.ConnectResponse{
ShimPid: resp.GetShimPid(),
TaskPid: resp.GetTaskPid(),
Version: resp.GetVersion(),
}, err
}
func (g *grpcBridge) Shutdown(ctx context.Context, request *v2.ShutdownRequest) (*emptypb.Empty, error) {
return g.client.Shutdown(ctx, &v3.ShutdownRequest{
ID: request.GetID(),
Now: request.GetNow(),
})
}

View File

@ -425,7 +425,11 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr
// Cast to shim task and call task service to create a new container task instance.
// This will not be required once shim service / client implemented.
shimTask := newShimTask(shim)
shimTask, err := newShimTask(shim)
if err != nil {
return nil, err
}
t, err := shimTask.Create(ctx, opts)
if err != nil {
// NOTE: ctx contains required namespace information.
@ -443,7 +447,7 @@ func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.Cr
}
shimTask.Shutdown(dctx)
shimTask.Client().Close()
shimTask.Close()
}
return nil, fmt.Errorf("failed to create shim task: %w", err)
@ -458,7 +462,7 @@ func (m *TaskManager) Get(ctx context.Context, id string) (runtime.Task, error)
if err != nil {
return nil, err
}
return newShimTask(shim), nil
return newShimTask(shim)
}
// Tasks lists all tasks
@ -469,7 +473,11 @@ func (m *TaskManager) Tasks(ctx context.Context, all bool) ([]runtime.Task, erro
}
out := make([]runtime.Task, len(shims))
for i := range shims {
out[i] = newShimTask(shims[i])
newClient, err := newShimTask(shims[i])
if err != nil {
return nil, err
}
out[i] = newClient
}
return out, nil
}
@ -486,10 +494,12 @@ func (m *TaskManager) Delete(ctx context.Context, taskID string) (*runtime.Exit,
return nil, err
}
var (
sandboxed = container.SandboxID != ""
shimTask = newShimTask(shim)
)
shimTask, err := newShimTask(shim)
if err != nil {
return nil, err
}
sandboxed := container.SandboxID != ""
exit, err := shimTask.delete(ctx, sandboxed, func(ctx context.Context, id string) {
m.manager.shims.Delete(ctx, id)

View File

@ -18,13 +18,20 @@ package v2
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"github.com/containerd/ttrpc"
"github.com/hashicorp/go-multierror"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/runtime/task/v2"
"github.com/containerd/containerd/api/types"
@ -32,13 +39,12 @@ import (
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/dialer"
"github.com/containerd/containerd/pkg/timeout"
"github.com/containerd/containerd/protobuf"
ptypes "github.com/containerd/containerd/protobuf/types"
"github.com/containerd/containerd/runtime"
client "github.com/containerd/containerd/runtime/v2/shim"
"github.com/containerd/ttrpc"
"github.com/hashicorp/go-multierror"
)
const (
@ -61,23 +67,15 @@ func loadAddress(path string) (string, error) {
return string(data), nil
}
func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstance, err error) {
func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstance, retErr error) {
address, err := loadAddress(filepath.Join(bundle.Path, "address"))
if err != nil {
return nil, err
}
conn, err := client.Connect(address, client.AnonReconnectDialer)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
conn.Close()
}
}()
shimCtx, cancelShimLog := context.WithCancel(ctx)
defer func() {
if err != nil {
if retErr != nil {
cancelShimLog()
}
}()
@ -86,7 +84,7 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan
return nil, fmt.Errorf("open shim log pipe when reload: %w", err)
}
defer func() {
if err != nil {
if retErr != nil {
f.Close()
}
}()
@ -109,23 +107,36 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan
cancelShimLog()
f.Close()
}
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog))
conn, err := makeConnection(ctx, address, onCloseWithShimLog)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
client.Close()
if retErr != nil {
conn.Close()
}
}()
shim := &shim{
bundle: bundle,
client: client,
client: conn,
}
ctx, cancel := timeout.WithContext(ctx, loadTimeout)
defer cancel()
// Check connectivity, TaskService is the only required service, so create a temp one to check connection.
s := newShimTask(shim)
s, err := newShimTask(shim)
if err != nil {
return nil, err
}
if _, err := s.PID(ctx); err != nil {
return nil, err
}
return shim, nil
}
@ -176,21 +187,86 @@ func cleanupAfterDeadShim(ctx context.Context, id string, rt *runtime.NSMap[Shim
// ShimInstance represents running shim process managed by ShimManager.
type ShimInstance interface {
io.Closer
// ID of the shim.
ID() string
// Namespace of this shim.
Namespace() string
// Bundle is a file system path to shim's bundle.
Bundle() string
// Client returns the underlying TTRPC client for this shim.
Client() *ttrpc.Client
// Client returns the underlying TTRPC or GRPC client object for this shim.
// The underlying object can be either *ttrpc.Client or grpc.ClientConnInterface.
Client() any
// Delete will close the client and remove bundle from disk.
Delete(ctx context.Context) error
}
// makeConnection creates a new TTRPC or GRPC connection object from address.
// address can be either a socket path for TTRPC or JSON serialized BootstrapParams.
func makeConnection(ctx context.Context, address string, onClose func()) (_ io.Closer, retErr error) {
var (
payload = []byte(address)
params client.BootstrapParams
)
if json.Valid(payload) {
if err := json.Unmarshal([]byte(address), &params); err != nil {
return nil, fmt.Errorf("unable to unmarshal bootstrap params: %w", err)
}
} else {
// Use TTRPC for legacy shims
params.Address = address
params.Protocol = "ttrpc"
}
switch strings.ToLower(params.Protocol) {
case "ttrpc":
conn, err := client.Connect(params.Address, client.AnonReconnectDialer)
if err != nil {
return nil, fmt.Errorf("failed to create TTRPC connection: %w", err)
}
defer func() {
if retErr != nil {
conn.Close()
}
}()
ttrpcClient := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose))
defer func() {
if retErr != nil {
ttrpcClient.Close()
}
}()
return ttrpcClient, nil
case "grpc":
gopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
conn, err := grpc.DialContext(ctx, dialer.DialAddress(params.Address), gopts...)
if err != nil {
return nil, fmt.Errorf("failed to create GRPC connection: %w", err)
}
defer func() {
if retErr != nil {
conn.Close()
}
}()
// TODO: figure out how to invoke onCloseWithShimLog callback when shim connection is closed.
return conn, nil
default:
return nil, fmt.Errorf("unexpected protocol: %q", params.Protocol)
}
}
type shim struct {
bundle *Bundle
client *ttrpc.Client
client any
}
var _ ShimInstance = (*shim)(nil)
@ -208,21 +284,32 @@ func (s *shim) Bundle() string {
return s.bundle.Path
}
func (s *shim) Client() *ttrpc.Client {
func (s *shim) Client() any {
return s.client
}
// Close closes the underlying client connection.
func (s *shim) Close() error {
if ttrpcClient, ok := s.client.(*ttrpc.Client); ok {
return ttrpcClient.Close()
}
return nil
}
func (s *shim) Delete(ctx context.Context) error {
var (
result *multierror.Error
)
if err := s.client.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close ttrpc client: %w", err))
}
if ttrpcClient, ok := s.client.(*ttrpc.Client); ok {
if err := ttrpcClient.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close ttrpc client: %w", err))
}
if err := s.client.UserOnCloseWait(ctx); err != nil {
result = multierror.Append(result, fmt.Errorf("close wait error: %w", err))
if err := ttrpcClient.UserOnCloseWait(ctx); err != nil {
result = multierror.Append(result, fmt.Errorf("close wait error: %w", err))
}
}
if err := s.bundle.Delete(); err != nil {
@ -241,11 +328,16 @@ type shimTask struct {
task task.TaskService
}
func newShimTask(shim ShimInstance) *shimTask {
func newShimTask(shim ShimInstance) (*shimTask, error) {
taskClient, err := NewTaskClient(shim.Client())
if err != nil {
return nil, err
}
return &shimTask{
ShimInstance: shim,
task: task.NewTaskClient(shim.Client()),
}
task: taskClient,
}, nil
}
func (s *shimTask) Shutdown(ctx context.Context) error {

View File

@ -57,6 +57,16 @@ type StartOpts struct {
Debug bool
}
// BootstrapParams is a JSON payload returned in stdout from shim.Start call.
type BootstrapParams struct {
// Address is a address containerd should use to connect to shim.
Address string `json:"address"`
// Protocol is either TTRPC or GRPC.
Protocol string `json:"protocol"`
// Caps is a list of capabilities supported by shim implementation (reserved for future)
//Caps []string `json:"caps"`
}
type StopStatus struct {
Pid int
ExitStatus int

View File

@ -144,7 +144,10 @@ func (m *ShimManager) loadShims(ctx context.Context) error {
cleanupAfterDeadShim(ctx, id, m.shims, m.events, binaryCall)
continue
}
shim := newShimTask(instance)
shim, err := newShimTask(instance)
if err != nil {
return err
}
// There are 3 possibilities for the loaded shim here:
// 1. It could be a shim that is running a task.

77
sandbox/bridge.go Normal file
View File

@ -0,0 +1,77 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sandbox
import (
"context"
"fmt"
"github.com/containerd/ttrpc"
"google.golang.org/grpc"
api "github.com/containerd/containerd/api/runtime/sandbox/v1"
)
// NewClient returns a new sandbox client that handles both GRPC and TTRPC clients.
func NewClient(client interface{}) (api.TTRPCSandboxService, error) {
switch c := client.(type) {
case *ttrpc.Client:
return api.NewTTRPCSandboxClient(c), nil
case grpc.ClientConnInterface:
return &grpcBridge{api.NewSandboxClient(c)}, nil
default:
return nil, fmt.Errorf("unsupported client type %T", client)
}
}
type grpcBridge struct {
client api.SandboxClient
}
var _ api.TTRPCSandboxService = (*grpcBridge)(nil)
func (g *grpcBridge) CreateSandbox(ctx context.Context, request *api.CreateSandboxRequest) (*api.CreateSandboxResponse, error) {
return g.client.CreateSandbox(ctx, request)
}
func (g *grpcBridge) StartSandbox(ctx context.Context, request *api.StartSandboxRequest) (*api.StartSandboxResponse, error) {
return g.client.StartSandbox(ctx, request)
}
func (g *grpcBridge) Platform(ctx context.Context, request *api.PlatformRequest) (*api.PlatformResponse, error) {
return g.client.Platform(ctx, request)
}
func (g *grpcBridge) StopSandbox(ctx context.Context, request *api.StopSandboxRequest) (*api.StopSandboxResponse, error) {
return g.client.StopSandbox(ctx, request)
}
func (g *grpcBridge) WaitSandbox(ctx context.Context, request *api.WaitSandboxRequest) (*api.WaitSandboxResponse, error) {
return g.client.WaitSandbox(ctx, request)
}
func (g *grpcBridge) SandboxStatus(ctx context.Context, request *api.SandboxStatusRequest) (*api.SandboxStatusResponse, error) {
return g.client.SandboxStatus(ctx, request)
}
func (g *grpcBridge) PingSandbox(ctx context.Context, request *api.PingRequest) (*api.PingResponse, error) {
return g.client.PingSandbox(ctx, request)
}
func (g *grpcBridge) ShutdownSandbox(ctx context.Context, request *api.ShutdownSandboxRequest) (*api.ShutdownSandboxResponse, error) {
return g.client.ShutdownSandbox(ctx, request)
}