diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index 0fc94e80e..86d2e40d6 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -29,8 +29,8 @@ import ( "github.com/containerd/ttrpc" "github.com/hashicorp/go-multierror" - "github.com/sirupsen/logrus" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" eventstypes "github.com/containerd/containerd/api/events" @@ -221,7 +221,7 @@ func makeConnection(ctx context.Context, address string, onClose func()) (_ io.C params.Protocol = "ttrpc" } - log.G(ctx).WithFields(logrus.Fields{ + log.G(ctx).WithFields(log.Fields{ "address": params.Address, "protocol": params.Protocol, }).Debug("shim bootstrap parameters") @@ -238,38 +238,78 @@ func makeConnection(ctx context.Context, address string, onClose func()) (_ io.C } }() - ttrpcClient := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose)) - defer func() { - if retErr != nil { - ttrpcClient.Close() - } - }() - - return ttrpcClient, nil + return ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose)), nil case "grpc": + ctx, cancel := context.WithTimeout(ctx, time.Second*100) + defer cancel() + gopts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), } - - conn, err := grpc.DialContext(ctx, dialer.DialAddress(params.Address), gopts...) - if err != nil { - return nil, fmt.Errorf("failed to create GRPC connection: %w", err) - } - - defer func() { - if retErr != nil { - conn.Close() - } - }() - - // TODO: figure out how to invoke onCloseWithShimLog callback when shim connection is closed. - - return conn, nil + return grpcDialContext(ctx, dialer.DialAddress(params.Address), onClose, gopts...) default: return nil, fmt.Errorf("unexpected protocol: %q", params.Protocol) } } +// grpcDialContext and the underlying grpcConn type exist solely +// so we can have something similar to ttrpc.WithOnClose to have +// a callback run when the connection is severed or explicitly closed. +func grpcDialContext( + ctx context.Context, + target string, + onClose func(), + gopts ...grpc.DialOption, +) (*grpcConn, error) { + client, err := grpc.DialContext(ctx, target, gopts...) + if err != nil { + return nil, fmt.Errorf("failed to create GRPC connection: %w", err) + } + + done := make(chan struct{}) + go func() { + gctx := context.Background() + sourceState := connectivity.Ready + for { + if client.WaitForStateChange(gctx, sourceState) { + state := client.GetState() + if state == connectivity.Idle || state == connectivity.Shutdown { + break + } + // Could be transient failure. Lets see if we can get back to a working + // state. + log.G(gctx).WithFields(log.Fields{ + "state": state, + "addr": target, + }).Warn("shim grpc connection unexpected state") + sourceState = state + } + } + onClose() + close(done) + }() + + return &grpcConn{ + ClientConn: client, + onCloseDone: done, + }, nil +} + +type grpcConn struct { + *grpc.ClientConn + onCloseDone chan struct{} +} + +func (gc *grpcConn) UserOnCloseWait(ctx context.Context) error { + select { + case <-gc.onCloseDone: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + type shim struct { bundle *Bundle client any @@ -300,6 +340,10 @@ func (s *shim) Close() error { return ttrpcClient.Close() } + if grpcClient, ok := s.client.(*grpcConn); ok { + return grpcClient.Close() + } + return nil } @@ -318,6 +362,16 @@ func (s *shim) Delete(ctx context.Context) error { } } + if grpcClient, ok := s.client.(*grpcConn); ok { + if err := grpcClient.Close(); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to close grpc client: %w", err)) + } + + if err := grpcClient.UserOnCloseWait(ctx); err != nil { + result = multierror.Append(result, fmt.Errorf("close wait error: %w", err)) + } + } + if err := s.bundle.Delete(); err != nil { log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle") result = multierror.Append(result, fmt.Errorf("failed to delete bundle: %w", err))