Merge pull request #8120 from dcantah/grpc-shim-callbackfn

runtime/v2: Call onCloseWithShimLog for grpc shims
This commit is contained in:
Maksym Pavlenko 2023-02-22 10:29:11 -08:00 committed by GitHub
commit 921f49b5f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -29,8 +29,8 @@ import (
"github.com/containerd/ttrpc"
"github.com/hashicorp/go-multierror"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
eventstypes "github.com/containerd/containerd/api/events"
@ -221,7 +221,7 @@ func makeConnection(ctx context.Context, address string, onClose func()) (_ io.C
params.Protocol = "ttrpc"
}
log.G(ctx).WithFields(logrus.Fields{
log.G(ctx).WithFields(log.Fields{
"address": params.Address,
"protocol": params.Protocol,
}).Debug("shim bootstrap parameters")
@ -238,38 +238,78 @@ func makeConnection(ctx context.Context, address string, onClose func()) (_ io.C
}
}()
ttrpcClient := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose))
defer func() {
if retErr != nil {
ttrpcClient.Close()
}
}()
return ttrpcClient, nil
return ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose)), nil
case "grpc":
ctx, cancel := context.WithTimeout(ctx, time.Second*100)
defer cancel()
gopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
}
conn, err := grpc.DialContext(ctx, dialer.DialAddress(params.Address), gopts...)
if err != nil {
return nil, fmt.Errorf("failed to create GRPC connection: %w", err)
}
defer func() {
if retErr != nil {
conn.Close()
}
}()
// TODO: figure out how to invoke onCloseWithShimLog callback when shim connection is closed.
return conn, nil
return grpcDialContext(ctx, dialer.DialAddress(params.Address), onClose, gopts...)
default:
return nil, fmt.Errorf("unexpected protocol: %q", params.Protocol)
}
}
// grpcDialContext and the underlying grpcConn type exist solely
// so we can have something similar to ttrpc.WithOnClose to have
// a callback run when the connection is severed or explicitly closed.
func grpcDialContext(
ctx context.Context,
target string,
onClose func(),
gopts ...grpc.DialOption,
) (*grpcConn, error) {
client, err := grpc.DialContext(ctx, target, gopts...)
if err != nil {
return nil, fmt.Errorf("failed to create GRPC connection: %w", err)
}
done := make(chan struct{})
go func() {
gctx := context.Background()
sourceState := connectivity.Ready
for {
if client.WaitForStateChange(gctx, sourceState) {
state := client.GetState()
if state == connectivity.Idle || state == connectivity.Shutdown {
break
}
// Could be transient failure. Lets see if we can get back to a working
// state.
log.G(gctx).WithFields(log.Fields{
"state": state,
"addr": target,
}).Warn("shim grpc connection unexpected state")
sourceState = state
}
}
onClose()
close(done)
}()
return &grpcConn{
ClientConn: client,
onCloseDone: done,
}, nil
}
type grpcConn struct {
*grpc.ClientConn
onCloseDone chan struct{}
}
func (gc *grpcConn) UserOnCloseWait(ctx context.Context) error {
select {
case <-gc.onCloseDone:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
type shim struct {
bundle *Bundle
client any
@ -300,6 +340,10 @@ func (s *shim) Close() error {
return ttrpcClient.Close()
}
if grpcClient, ok := s.client.(*grpcConn); ok {
return grpcClient.Close()
}
return nil
}
@ -318,6 +362,16 @@ func (s *shim) Delete(ctx context.Context) error {
}
}
if grpcClient, ok := s.client.(*grpcConn); ok {
if err := grpcClient.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close grpc client: %w", err))
}
if err := grpcClient.UserOnCloseWait(ctx); err != nil {
result = multierror.Append(result, fmt.Errorf("close wait error: %w", err))
}
}
if err := s.bundle.Delete(); err != nil {
log.G(ctx).WithField("id", s.ID()).WithError(err).Error("failed to delete bundle")
result = multierror.Append(result, fmt.Errorf("failed to delete bundle: %w", err))