diff --git a/runtime/v2/binary.go b/runtime/v2/binary.go index 77da655ca..44a0d5424 100644 --- a/runtime/v2/binary.go +++ b/runtime/v2/binary.go @@ -26,7 +26,6 @@ import ( gruntime "runtime" "strings" - "github.com/containerd/ttrpc" "github.com/sirupsen/logrus" "github.com/containerd/containerd/api/runtime/task/v2" @@ -122,10 +121,7 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ return nil, fmt.Errorf("%s: %w", out, err) } address := strings.TrimSpace(string(out)) - conn, err := client.Connect(address, client.AnonDialer) - if err != nil { - return nil, err - } + onCloseWithShimLog := func() { onClose() cancelShimLog() @@ -135,10 +131,15 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ if err := os.WriteFile(filepath.Join(b.bundle.Path, "shim-binary-path"), []byte(b.runtime), 0600); err != nil { return nil, err } - client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog)) + + conn, err := makeConnection(ctx, address, onCloseWithShimLog) + if err != nil { + return nil, err + } + return &shim{ bundle: b.bundle, - client: client, + client: conn, }, nil } diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index fdf7d491d..00b7b1caa 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -18,6 +18,7 @@ package v2 import ( "context" + "encoding/json" "errors" "fmt" "io" @@ -71,9 +72,6 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan return nil, err } - // TODO: figure out how to negotiate protocol between containerd and shim - isGRPC := strings.Contains(strings.ToLower(address), "grpc") - shimCtx, cancelShimLog := context.WithCancel(ctx) defer func() { if retErr != nil { @@ -109,48 +107,20 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan f.Close() } - shim := &shim{bundle: bundle} + conn, err := makeConnection(ctx, address, onCloseWithShimLog) + if err != nil { + return nil, err + } - if !isGRPC { - conn, err := client.Connect(address, client.AnonReconnectDialer) - if err != nil { - return nil, fmt.Errorf("failed to create TTRPC connection: %w", err) + defer func() { + if retErr != nil { + conn.Close() } - defer func() { - if retErr != nil { - conn.Close() - } - }() + }() - ttrpcClient := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog)) - defer func() { - if retErr != nil { - ttrpcClient.Close() - } - }() - - shim.client = ttrpcClient - } else { - // GRPC shim - - var ( - gopts []grpc.DialOption - ) - - conn, err := grpc.DialContext(ctx, dialer.DialAddress(address), gopts...) - if err != nil { - return nil, fmt.Errorf("failed to create GRPC connection: %w", err) - } - - defer func() { - if retErr != nil { - conn.Close() - } - }() - - // TODO: figure out how to invoke onCloseWithShimLog callback when shim connection is closed. - - shim.client = conn + shim := &shim{ + bundle: bundle, + client: conn, } ctx, cancel := timeout.WithContext(ctx, loadTimeout) @@ -231,6 +201,68 @@ type ShimInstance interface { Delete(ctx context.Context) error } +// makeConnection creates a new TTRPC or GRPC connection object from address. +// address can be either a socket path for TTRPC or JSON serialized BootstrapParams. +func makeConnection(ctx context.Context, address string, onClose func()) (_ io.Closer, retErr error) { + var ( + payload = []byte(address) + params client.BootstrapParams + ) + + if json.Valid(payload) { + if err := json.Unmarshal([]byte(address), ¶ms); err != nil { + return nil, fmt.Errorf("unable to unmarshal bootstrap params: %w", err) + } + } else { + // Use TTRPC for legacy shims + params.Address = address + params.Protocol = "ttrpc" + } + + switch strings.ToLower(params.Protocol) { + case "ttrpc": + conn, err := client.Connect(params.Address, client.AnonReconnectDialer) + if err != nil { + return nil, fmt.Errorf("failed to create TTRPC connection: %w", err) + } + defer func() { + if retErr != nil { + conn.Close() + } + }() + + ttrpcClient := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose)) + defer func() { + if retErr != nil { + ttrpcClient.Close() + } + }() + + return ttrpcClient, nil + case "grpc": + var ( + gopts []grpc.DialOption + ) + + conn, err := grpc.DialContext(ctx, dialer.DialAddress(params.Address), gopts...) + if err != nil { + return nil, fmt.Errorf("failed to create GRPC connection: %w", err) + } + + defer func() { + if retErr != nil { + conn.Close() + } + }() + + // TODO: figure out how to invoke onCloseWithShimLog callback when shim connection is closed. + + return conn, nil + default: + return nil, fmt.Errorf("unexpected protocol: %q", params.Protocol) + } +} + type shim struct { bundle *Bundle client any diff --git a/runtime/v2/shim/shim.go b/runtime/v2/shim/shim.go index f12228b98..17d916edd 100644 --- a/runtime/v2/shim/shim.go +++ b/runtime/v2/shim/shim.go @@ -57,6 +57,16 @@ type StartOpts struct { Debug bool } +// BootstrapParams is a JSON payload returned in stdout from shim.Start call. +type BootstrapParams struct { + // Address is a address containerd should use to connect to shim. + Address string `json:"address"` + // Protocol is either TTRPC or GRPC. + Protocol string `json:"protocol"` + // Caps is a list of capabilities supported by shim implementation (reserved for future) + //Caps []string `json:"caps"` +} + type StopStatus struct { Pid int ExitStatus int