runtime/v2: Call onCloseWithShimLog for grpc shims

We pass in a callback using the ttrpc.WithOnClose functionality
for shims that use ttrpc, but with the newly added ability to use
GRPC for shims this was left as a follow-up. It doesn't seem like
grpc-go has anything similar so some options (that I could see) are:

This change introduces a new grpcConn wrapper type for the connection
that exposes a method to get notified when the users callback has run,
the same in functionality as TTRPC's `UserOnCloseWait`. The callback
gets passed in in a new `grpcDialContext` function that will:

1. Dial the connection as normal
2. Spin off a goroutine that will monitor the connections state
until it transitions to idle or shutdown and will then run the
callback.

Signed-off-by: Danny Canter <danny@dcantah.dev>
This commit is contained in:
Danny Canter 2023-02-15 16:08:14 -08:00
parent 8cb00f45c9
commit 4278fbbc7e

View File

@ -31,6 +31,7 @@ import (
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
eventstypes "github.com/containerd/containerd/api/events" eventstypes "github.com/containerd/containerd/api/events"
@ -238,35 +239,75 @@ func makeConnection(ctx context.Context, address string, onClose func()) (_ io.C
} }
}() }()
ttrpcClient := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose)) return ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose)), nil
defer func() {
if retErr != nil {
ttrpcClient.Close()
}
}()
return ttrpcClient, nil
case "grpc": case "grpc":
ctx, cancel := context.WithTimeout(ctx, time.Second*100)
defer cancel()
gopts := []grpc.DialOption{ gopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
}
return grpcDialContext(ctx, dialer.DialAddress(params.Address), onClose, gopts...)
default:
return nil, fmt.Errorf("unexpected protocol: %q", params.Protocol)
}
} }
conn, err := grpc.DialContext(ctx, dialer.DialAddress(params.Address), gopts...) // grpcDialContext and the underlying grpcConn type exist solely
// so we can have something similar to ttrpc.WithOnClose to have
// a callback run when the connection is severed or explicitly closed.
func grpcDialContext(
ctx context.Context,
target string,
onClose func(),
gopts ...grpc.DialOption,
) (*grpcConn, error) {
client, err := grpc.DialContext(ctx, target, gopts...)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create GRPC connection: %w", err) return nil, fmt.Errorf("failed to create GRPC connection: %w", err)
} }
defer func() { done := make(chan struct{})
if retErr != nil { go func() {
conn.Close() gctx := context.Background()
sourceState := connectivity.Ready
for {
if client.WaitForStateChange(gctx, sourceState) {
state := client.GetState()
if state == connectivity.Idle || state == connectivity.Shutdown {
break
} }
// Could be transient failure. Lets see if we can get back to a working
// state.
log.G(gctx).WithFields(log.Fields{
"state": state,
"addr": target,
}).Warn("shim grpc connection unexpected state")
sourceState = state
}
}
onClose()
close(done)
}() }()
// TODO: figure out how to invoke onCloseWithShimLog callback when shim connection is closed. return &grpcConn{
ClientConn: client,
onCloseDone: done,
}, nil
}
return conn, nil type grpcConn struct {
default: *grpc.ClientConn
return nil, fmt.Errorf("unexpected protocol: %q", params.Protocol) onCloseDone chan struct{}
}
func (gc *grpcConn) UserOnCloseWait(ctx context.Context) error {
select {
case <-gc.onCloseDone:
return nil
case <-ctx.Done():
return ctx.Err()
} }
} }
@ -300,6 +341,10 @@ func (s *shim) Close() error {
return ttrpcClient.Close() return ttrpcClient.Close()
} }
if grpcClient, ok := s.client.(*grpcConn); ok {
return grpcClient.Close()
}
return nil return nil
} }
@ -318,6 +363,16 @@ func (s *shim) Delete(ctx context.Context) error {
} }
} }
if grpcClient, ok := s.client.(*grpcConn); ok {
if err := grpcClient.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close grpc client: %w", err))
}
if err := grpcClient.UserOnCloseWait(ctx); err != nil {
result = multierror.Append(result, fmt.Errorf("close wait error: %w", err))
}
}
if err := s.bundle.Delete(); err != nil { if err := s.bundle.Delete(); err != nil {
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle") log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle")
result = multierror.Append(result, fmt.Errorf("failed to delete bundle: %w", err)) result = multierror.Append(result, fmt.Errorf("failed to delete bundle: %w", err))