From fc2e761e26d94ef9673f154f911e6a685478572d Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Sun, 5 Feb 2023 18:43:02 -0800 Subject: [PATCH] Initial GRPC client support Signed-off-by: Maksym Pavlenko --- runtime/v2/shim.go | 104 +++++++++++++++++++++++++++++++-------------- 1 file changed, 73 insertions(+), 31 deletions(-) diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index a7741bfde..fdf7d491d 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -23,8 +23,13 @@ import ( "io" "os" "path/filepath" + "strings" "time" + "github.com/containerd/ttrpc" + "github.com/hashicorp/go-multierror" + "google.golang.org/grpc" + eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/runtime/task/v2" "github.com/containerd/containerd/api/types" @@ -32,13 +37,12 @@ import ( "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/identifiers" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/pkg/dialer" "github.com/containerd/containerd/pkg/timeout" "github.com/containerd/containerd/protobuf" ptypes "github.com/containerd/containerd/protobuf/types" "github.com/containerd/containerd/runtime" client "github.com/containerd/containerd/runtime/v2/shim" - "github.com/containerd/ttrpc" - "github.com/hashicorp/go-multierror" ) const ( @@ -61,23 +65,18 @@ func loadAddress(path string) (string, error) { return string(data), nil } -func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstance, err error) { +func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstance, retErr error) { address, err := loadAddress(filepath.Join(bundle.Path, "address")) if err != nil { return nil, err } - conn, err := client.Connect(address, client.AnonReconnectDialer) - if err != nil { - return nil, err - } - defer func() { - if err != nil { - conn.Close() - } - }() + + // TODO: figure out how to negotiate protocol between containerd and shim + isGRPC := strings.Contains(strings.ToLower(address), "grpc") + shimCtx, cancelShimLog := context.WithCancel(ctx) defer func() { - if err != nil { + if retErr != nil { cancelShimLog() } }() @@ -86,7 +85,7 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan return nil, fmt.Errorf("open shim log pipe when reload: %w", err) } defer func() { - if err != nil { + if retErr != nil { f.Close() } }() @@ -109,16 +108,51 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan cancelShimLog() f.Close() } - client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog)) - defer func() { + + shim := &shim{bundle: bundle} + + if !isGRPC { + conn, err := client.Connect(address, client.AnonReconnectDialer) if err != nil { - client.Close() + return nil, fmt.Errorf("failed to create TTRPC connection: %w", err) } - }() - shim := &shim{ - bundle: bundle, - client: client, + defer func() { + if retErr != nil { + conn.Close() + } + }() + + ttrpcClient := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog)) + defer func() { + if retErr != nil { + ttrpcClient.Close() + } + }() + + shim.client = ttrpcClient + } else { + // GRPC shim + + var ( + gopts []grpc.DialOption + ) + + conn, err := grpc.DialContext(ctx, dialer.DialAddress(address), gopts...) + if err != nil { + return nil, fmt.Errorf("failed to create GRPC connection: %w", err) + } + + defer func() { + if retErr != nil { + conn.Close() + } + }() + + // TODO: figure out how to invoke onCloseWithShimLog callback when shim connection is closed. + + shim.client = conn } + ctx, cancel := timeout.WithContext(ctx, loadTimeout) defer cancel() @@ -131,6 +165,7 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan if _, err := s.PID(ctx); err != nil { return nil, err } + return shim, nil } @@ -189,15 +224,16 @@ type ShimInstance interface { Namespace() string // Bundle is a file system path to shim's bundle. Bundle() string - // Client returns the underlying TTRPC client for this shim. - Client() interface{} + // Client returns the underlying TTRPC or GRPC client object for this shim. + // The underlying object can be either *ttrpc.Client or grpc.ClientConnInterface. + Client() any // Delete will close the client and remove bundle from disk. Delete(ctx context.Context) error } type shim struct { bundle *Bundle - client *ttrpc.Client + client any } var _ ShimInstance = (*shim)(nil) @@ -215,13 +251,17 @@ func (s *shim) Bundle() string { return s.bundle.Path } -func (s *shim) Client() interface{} { +func (s *shim) Client() any { return s.client } // Close closes the underlying client connection. func (s *shim) Close() error { - return s.client.Close() + if ttrpcClient, ok := s.client.(*ttrpc.Client); ok { + return ttrpcClient.Close() + } + + return nil } func (s *shim) Delete(ctx context.Context) error { @@ -229,12 +269,14 @@ func (s *shim) Delete(ctx context.Context) error { result *multierror.Error ) - if err := s.client.Close(); err != nil { - result = multierror.Append(result, fmt.Errorf("failed to close ttrpc client: %w", err)) - } + if ttrpcClient, ok := s.client.(*ttrpc.Client); ok { + if err := ttrpcClient.Close(); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to close ttrpc client: %w", err)) + } - if err := s.client.UserOnCloseWait(ctx); err != nil { - result = multierror.Append(result, fmt.Errorf("close wait error: %w", err)) + if err := ttrpcClient.UserOnCloseWait(ctx); err != nil { + result = multierror.Append(result, fmt.Errorf("close wait error: %w", err)) + } } if err := s.bundle.Delete(); err != nil {