Initial GRPC client support
Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
		| @@ -23,8 +23,13 @@ import ( | ||||
| 	"io" | ||||
| 	"os" | ||||
| 	"path/filepath" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/containerd/ttrpc" | ||||
| 	"github.com/hashicorp/go-multierror" | ||||
| 	"google.golang.org/grpc" | ||||
|  | ||||
| 	eventstypes "github.com/containerd/containerd/api/events" | ||||
| 	"github.com/containerd/containerd/api/runtime/task/v2" | ||||
| 	"github.com/containerd/containerd/api/types" | ||||
| @@ -32,13 +37,12 @@ import ( | ||||
| 	"github.com/containerd/containerd/events/exchange" | ||||
| 	"github.com/containerd/containerd/identifiers" | ||||
| 	"github.com/containerd/containerd/log" | ||||
| 	"github.com/containerd/containerd/pkg/dialer" | ||||
| 	"github.com/containerd/containerd/pkg/timeout" | ||||
| 	"github.com/containerd/containerd/protobuf" | ||||
| 	ptypes "github.com/containerd/containerd/protobuf/types" | ||||
| 	"github.com/containerd/containerd/runtime" | ||||
| 	client "github.com/containerd/containerd/runtime/v2/shim" | ||||
| 	"github.com/containerd/ttrpc" | ||||
| 	"github.com/hashicorp/go-multierror" | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| @@ -61,23 +65,18 @@ func loadAddress(path string) (string, error) { | ||||
| 	return string(data), nil | ||||
| } | ||||
|  | ||||
| func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstance, err error) { | ||||
| func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstance, retErr error) { | ||||
| 	address, err := loadAddress(filepath.Join(bundle.Path, "address")) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	conn, err := client.Connect(address, client.AnonReconnectDialer) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	defer func() { | ||||
| 		if err != nil { | ||||
| 			conn.Close() | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	// TODO: figure out how to negotiate protocol between containerd and shim | ||||
| 	isGRPC := strings.Contains(strings.ToLower(address), "grpc") | ||||
|  | ||||
| 	shimCtx, cancelShimLog := context.WithCancel(ctx) | ||||
| 	defer func() { | ||||
| 		if err != nil { | ||||
| 		if retErr != nil { | ||||
| 			cancelShimLog() | ||||
| 		} | ||||
| 	}() | ||||
| @@ -86,7 +85,7 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan | ||||
| 		return nil, fmt.Errorf("open shim log pipe when reload: %w", err) | ||||
| 	} | ||||
| 	defer func() { | ||||
| 		if err != nil { | ||||
| 		if retErr != nil { | ||||
| 			f.Close() | ||||
| 		} | ||||
| 	}() | ||||
| @@ -109,16 +108,51 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan | ||||
| 		cancelShimLog() | ||||
| 		f.Close() | ||||
| 	} | ||||
| 	client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog)) | ||||
| 	defer func() { | ||||
|  | ||||
| 	shim := &shim{bundle: bundle} | ||||
|  | ||||
| 	if !isGRPC { | ||||
| 		conn, err := client.Connect(address, client.AnonReconnectDialer) | ||||
| 		if err != nil { | ||||
| 			client.Close() | ||||
| 			return nil, fmt.Errorf("failed to create TTRPC connection: %w", err) | ||||
| 		} | ||||
| 	}() | ||||
| 	shim := &shim{ | ||||
| 		bundle: bundle, | ||||
| 		client: client, | ||||
| 		defer func() { | ||||
| 			if retErr != nil { | ||||
| 				conn.Close() | ||||
| 			} | ||||
| 		}() | ||||
|  | ||||
| 		ttrpcClient := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog)) | ||||
| 		defer func() { | ||||
| 			if retErr != nil { | ||||
| 				ttrpcClient.Close() | ||||
| 			} | ||||
| 		}() | ||||
|  | ||||
| 		shim.client = ttrpcClient | ||||
| 	} else { | ||||
| 		// GRPC shim | ||||
|  | ||||
| 		var ( | ||||
| 			gopts []grpc.DialOption | ||||
| 		) | ||||
|  | ||||
| 		conn, err := grpc.DialContext(ctx, dialer.DialAddress(address), gopts...) | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("failed to create GRPC connection: %w", err) | ||||
| 		} | ||||
|  | ||||
| 		defer func() { | ||||
| 			if retErr != nil { | ||||
| 				conn.Close() | ||||
| 			} | ||||
| 		}() | ||||
|  | ||||
| 		// TODO: figure out how to invoke onCloseWithShimLog callback when shim connection is closed. | ||||
|  | ||||
| 		shim.client = conn | ||||
| 	} | ||||
|  | ||||
| 	ctx, cancel := timeout.WithContext(ctx, loadTimeout) | ||||
| 	defer cancel() | ||||
|  | ||||
| @@ -131,6 +165,7 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan | ||||
| 	if _, err := s.PID(ctx); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return shim, nil | ||||
| } | ||||
|  | ||||
| @@ -189,15 +224,16 @@ type ShimInstance interface { | ||||
| 	Namespace() string | ||||
| 	// Bundle is a file system path to shim's bundle. | ||||
| 	Bundle() string | ||||
| 	// Client returns the underlying TTRPC client for this shim. | ||||
| 	Client() interface{} | ||||
| 	// Client returns the underlying TTRPC or GRPC client object for this shim. | ||||
| 	// The underlying object can be either *ttrpc.Client or grpc.ClientConnInterface. | ||||
| 	Client() any | ||||
| 	// Delete will close the client and remove bundle from disk. | ||||
| 	Delete(ctx context.Context) error | ||||
| } | ||||
|  | ||||
| type shim struct { | ||||
| 	bundle *Bundle | ||||
| 	client *ttrpc.Client | ||||
| 	client any | ||||
| } | ||||
|  | ||||
| var _ ShimInstance = (*shim)(nil) | ||||
| @@ -215,13 +251,17 @@ func (s *shim) Bundle() string { | ||||
| 	return s.bundle.Path | ||||
| } | ||||
|  | ||||
| func (s *shim) Client() interface{} { | ||||
| func (s *shim) Client() any { | ||||
| 	return s.client | ||||
| } | ||||
|  | ||||
| // Close closes the underlying client connection. | ||||
| func (s *shim) Close() error { | ||||
| 	return s.client.Close() | ||||
| 	if ttrpcClient, ok := s.client.(*ttrpc.Client); ok { | ||||
| 		return ttrpcClient.Close() | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (s *shim) Delete(ctx context.Context) error { | ||||
| @@ -229,12 +269,14 @@ func (s *shim) Delete(ctx context.Context) error { | ||||
| 		result *multierror.Error | ||||
| 	) | ||||
|  | ||||
| 	if err := s.client.Close(); err != nil { | ||||
| 		result = multierror.Append(result, fmt.Errorf("failed to close ttrpc client: %w", err)) | ||||
| 	} | ||||
| 	if ttrpcClient, ok := s.client.(*ttrpc.Client); ok { | ||||
| 		if err := ttrpcClient.Close(); err != nil { | ||||
| 			result = multierror.Append(result, fmt.Errorf("failed to close ttrpc client: %w", err)) | ||||
| 		} | ||||
|  | ||||
| 	if err := s.client.UserOnCloseWait(ctx); err != nil { | ||||
| 		result = multierror.Append(result, fmt.Errorf("close wait error: %w", err)) | ||||
| 		if err := ttrpcClient.UserOnCloseWait(ctx); err != nil { | ||||
| 			result = multierror.Append(result, fmt.Errorf("close wait error: %w", err)) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if err := s.bundle.Delete(); err != nil { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Maksym Pavlenko
					Maksym Pavlenko