From 4278fbbc7ebe0517371b75594dc2150a5124b3cd Mon Sep 17 00:00:00 2001 From: Danny Canter Date: Wed, 15 Feb 2023 16:08:14 -0800 Subject: [PATCH 1/2] runtime/v2: Call onCloseWithShimLog for grpc shims We pass in a callback using the ttrpc.WithOnClose functionality for shims that use ttrpc, but with the newly added ability to use GRPC for shims this was left as a follow-up. It doesn't seem like grpc-go has anything similar so some options (that I could see) are: This change introduces a new grpcConn wrapper type for the connection that exposes a method to get notified when the users callback has run, the same in functionality as TTRPC's `UserOnCloseWait`. The callback gets passed in in a new `grpcDialContext` function that will: 1. Dial the connection as normal 2. Spin off a goroutine that will monitor the connections state until it transitions to idle or shutdown and will then run the callback. Signed-off-by: Danny Canter --- runtime/v2/shim.go | 101 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 78 insertions(+), 23 deletions(-) diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index 0fc94e80e..1a7532b3b 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -31,6 +31,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/sirupsen/logrus" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" eventstypes "github.com/containerd/containerd/api/events" @@ -238,38 +239,78 @@ func makeConnection(ctx context.Context, address string, onClose func()) (_ io.C } }() - ttrpcClient := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose)) - defer func() { - if retErr != nil { - ttrpcClient.Close() - } - }() - - return ttrpcClient, nil + return ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose)), nil case "grpc": + ctx, cancel := context.WithTimeout(ctx, time.Second*100) + defer cancel() + gopts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), } - - conn, err := grpc.DialContext(ctx, dialer.DialAddress(params.Address), gopts...) - if err != nil { - return nil, fmt.Errorf("failed to create GRPC connection: %w", err) - } - - defer func() { - if retErr != nil { - conn.Close() - } - }() - - // TODO: figure out how to invoke onCloseWithShimLog callback when shim connection is closed. - - return conn, nil + return grpcDialContext(ctx, dialer.DialAddress(params.Address), onClose, gopts...) default: return nil, fmt.Errorf("unexpected protocol: %q", params.Protocol) } } +// grpcDialContext and the underlying grpcConn type exist solely +// so we can have something similar to ttrpc.WithOnClose to have +// a callback run when the connection is severed or explicitly closed. +func grpcDialContext( + ctx context.Context, + target string, + onClose func(), + gopts ...grpc.DialOption, +) (*grpcConn, error) { + client, err := grpc.DialContext(ctx, target, gopts...) + if err != nil { + return nil, fmt.Errorf("failed to create GRPC connection: %w", err) + } + + done := make(chan struct{}) + go func() { + gctx := context.Background() + sourceState := connectivity.Ready + for { + if client.WaitForStateChange(gctx, sourceState) { + state := client.GetState() + if state == connectivity.Idle || state == connectivity.Shutdown { + break + } + // Could be transient failure. Lets see if we can get back to a working + // state. + log.G(gctx).WithFields(log.Fields{ + "state": state, + "addr": target, + }).Warn("shim grpc connection unexpected state") + sourceState = state + } + } + onClose() + close(done) + }() + + return &grpcConn{ + ClientConn: client, + onCloseDone: done, + }, nil +} + +type grpcConn struct { + *grpc.ClientConn + onCloseDone chan struct{} +} + +func (gc *grpcConn) UserOnCloseWait(ctx context.Context) error { + select { + case <-gc.onCloseDone: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + type shim struct { bundle *Bundle client any @@ -300,6 +341,10 @@ func (s *shim) Close() error { return ttrpcClient.Close() } + if grpcClient, ok := s.client.(*grpcConn); ok { + return grpcClient.Close() + } + return nil } @@ -318,6 +363,16 @@ func (s *shim) Delete(ctx context.Context) error { } } + if grpcClient, ok := s.client.(*grpcConn); ok { + if err := grpcClient.Close(); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to close grpc client: %w", err)) + } + + if err := grpcClient.UserOnCloseWait(ctx); err != nil { + result = multierror.Append(result, fmt.Errorf("close wait error: %w", err)) + } + } + if err := s.bundle.Delete(); err != nil { log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle") result = multierror.Append(result, fmt.Errorf("failed to delete bundle: %w", err)) From 4728800abc5d540540350c8096adbdfd47787984 Mon Sep 17 00:00:00 2001 From: Danny Canter Date: Mon, 20 Feb 2023 18:29:56 -0800 Subject: [PATCH 2/2] runtime/v2: Get rid of last logrus.Fields usage https://github.com/containerd/containerd/pull/8143 added an alias for logrus.Fields and moved over most usages to this alias, but there was one straggler. Signed-off-by: Danny Canter --- runtime/v2/shim.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index 1a7532b3b..86d2e40d6 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -29,7 +29,6 @@ import ( "github.com/containerd/ttrpc" "github.com/hashicorp/go-multierror" - "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" @@ -222,7 +221,7 @@ func makeConnection(ctx context.Context, address string, onClose func()) (_ io.C params.Protocol = "ttrpc" } - log.G(ctx).WithFields(logrus.Fields{ + log.G(ctx).WithFields(log.Fields{ "address": params.Address, "protocol": params.Protocol, }).Debug("shim bootstrap parameters")