Add shim bootstrap params

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2023-02-06 12:12:32 -08:00
parent fc2e761e26
commit a82e37a5a2
3 changed files with 92 additions and 49 deletions

View File

@ -26,7 +26,6 @@ import (
gruntime "runtime" gruntime "runtime"
"strings" "strings"
"github.com/containerd/ttrpc"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/containerd/containerd/api/runtime/task/v2" "github.com/containerd/containerd/api/runtime/task/v2"
@ -122,10 +121,7 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
return nil, fmt.Errorf("%s: %w", out, err) return nil, fmt.Errorf("%s: %w", out, err)
} }
address := strings.TrimSpace(string(out)) address := strings.TrimSpace(string(out))
conn, err := client.Connect(address, client.AnonDialer)
if err != nil {
return nil, err
}
onCloseWithShimLog := func() { onCloseWithShimLog := func() {
onClose() onClose()
cancelShimLog() cancelShimLog()
@ -135,10 +131,15 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
if err := os.WriteFile(filepath.Join(b.bundle.Path, "shim-binary-path"), []byte(b.runtime), 0600); err != nil { if err := os.WriteFile(filepath.Join(b.bundle.Path, "shim-binary-path"), []byte(b.runtime), 0600); err != nil {
return nil, err return nil, err
} }
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog))
conn, err := makeConnection(ctx, address, onCloseWithShimLog)
if err != nil {
return nil, err
}
return &shim{ return &shim{
bundle: b.bundle, bundle: b.bundle,
client: client, client: conn,
}, nil }, nil
} }

View File

@ -18,6 +18,7 @@ package v2
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -71,9 +72,6 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan
return nil, err return nil, err
} }
// TODO: figure out how to negotiate protocol between containerd and shim
isGRPC := strings.Contains(strings.ToLower(address), "grpc")
shimCtx, cancelShimLog := context.WithCancel(ctx) shimCtx, cancelShimLog := context.WithCancel(ctx)
defer func() { defer func() {
if retErr != nil { if retErr != nil {
@ -109,48 +107,20 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan
f.Close() f.Close()
} }
shim := &shim{bundle: bundle} conn, err := makeConnection(ctx, address, onCloseWithShimLog)
if err != nil {
return nil, err
}
if !isGRPC { defer func() {
conn, err := client.Connect(address, client.AnonReconnectDialer) if retErr != nil {
if err != nil { conn.Close()
return nil, fmt.Errorf("failed to create TTRPC connection: %w", err)
} }
defer func() { }()
if retErr != nil {
conn.Close()
}
}()
ttrpcClient := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog)) shim := &shim{
defer func() { bundle: bundle,
if retErr != nil { client: conn,
ttrpcClient.Close()
}
}()
shim.client = ttrpcClient
} else {
// GRPC shim
var (
gopts []grpc.DialOption
)
conn, err := grpc.DialContext(ctx, dialer.DialAddress(address), gopts...)
if err != nil {
return nil, fmt.Errorf("failed to create GRPC connection: %w", err)
}
defer func() {
if retErr != nil {
conn.Close()
}
}()
// TODO: figure out how to invoke onCloseWithShimLog callback when shim connection is closed.
shim.client = conn
} }
ctx, cancel := timeout.WithContext(ctx, loadTimeout) ctx, cancel := timeout.WithContext(ctx, loadTimeout)
@ -231,6 +201,68 @@ type ShimInstance interface {
Delete(ctx context.Context) error Delete(ctx context.Context) error
} }
// makeConnection creates a new TTRPC or GRPC connection object from address.
// address can be either a socket path for TTRPC or JSON serialized BootstrapParams.
func makeConnection(ctx context.Context, address string, onClose func()) (_ io.Closer, retErr error) {
var (
payload = []byte(address)
params client.BootstrapParams
)
if json.Valid(payload) {
if err := json.Unmarshal([]byte(address), &params); err != nil {
return nil, fmt.Errorf("unable to unmarshal bootstrap params: %w", err)
}
} else {
// Use TTRPC for legacy shims
params.Address = address
params.Protocol = "ttrpc"
}
switch strings.ToLower(params.Protocol) {
case "ttrpc":
conn, err := client.Connect(params.Address, client.AnonReconnectDialer)
if err != nil {
return nil, fmt.Errorf("failed to create TTRPC connection: %w", err)
}
defer func() {
if retErr != nil {
conn.Close()
}
}()
ttrpcClient := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose))
defer func() {
if retErr != nil {
ttrpcClient.Close()
}
}()
return ttrpcClient, nil
case "grpc":
var (
gopts []grpc.DialOption
)
conn, err := grpc.DialContext(ctx, dialer.DialAddress(params.Address), gopts...)
if err != nil {
return nil, fmt.Errorf("failed to create GRPC connection: %w", err)
}
defer func() {
if retErr != nil {
conn.Close()
}
}()
// TODO: figure out how to invoke onCloseWithShimLog callback when shim connection is closed.
return conn, nil
default:
return nil, fmt.Errorf("unexpected protocol: %q", params.Protocol)
}
}
type shim struct { type shim struct {
bundle *Bundle bundle *Bundle
client any client any

View File

@ -57,6 +57,16 @@ type StartOpts struct {
Debug bool Debug bool
} }
// BootstrapParams is a JSON payload returned in stdout from shim.Start call.
type BootstrapParams struct {
// Address is a address containerd should use to connect to shim.
Address string `json:"address"`
// Protocol is either TTRPC or GRPC.
Protocol string `json:"protocol"`
// Caps is a list of capabilities supported by shim implementation (reserved for future)
//Caps []string `json:"caps"`
}
type StopStatus struct { type StopStatus struct {
Pid int Pid int
ExitStatus int ExitStatus int