From 05a3171bb4e334c6ea1fb500365dcae67fb8ab40 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Thu, 2 May 2024 17:11:45 -0700 Subject: [PATCH] Update transfer proxy to support ttrpc Signed-off-by: Derek McGowan --- client/transfer.go | 3 +- core/transfer/proxy/transfer.go | 49 ++++++++++++++++++++++++++++----- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/client/transfer.go b/client/transfer.go index fb8e5e7d2..5bd02aa6d 100644 --- a/client/transfer.go +++ b/client/transfer.go @@ -19,7 +19,6 @@ package client import ( "context" - transferapi "github.com/containerd/containerd/api/services/transfer/v1" "github.com/containerd/containerd/v2/core/streaming" streamproxy "github.com/containerd/containerd/v2/core/streaming/proxy" "github.com/containerd/containerd/v2/core/transfer" @@ -33,7 +32,7 @@ func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{} } defer done(ctx) - return proxy.NewTransferrer(transferapi.NewTransferClient(c.conn), c.streamCreator()).Transfer(ctx, src, dest, opts...) + return proxy.NewTransferrer(c.conn, c.streamCreator()).Transfer(ctx, src, dest, opts...) } func (c *Client) streamCreator() streaming.StreamCreator { diff --git a/core/transfer/proxy/transfer.go b/core/transfer/proxy/transfer.go index b2582e4b6..f314f6fe0 100644 --- a/core/transfer/proxy/transfer.go +++ b/core/transfer/proxy/transfer.go @@ -19,9 +19,12 @@ package proxy import ( "context" "errors" + "fmt" "io" + "google.golang.org/grpc" "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/emptypb" transferapi "github.com/containerd/containerd/api/services/transfer/v1" transfertypes "github.com/containerd/containerd/api/types/transfer" @@ -29,25 +32,57 @@ import ( "github.com/containerd/containerd/v2/core/transfer" tstreaming "github.com/containerd/containerd/v2/core/transfer/streaming" "github.com/containerd/containerd/v2/pkg/oci" + "github.com/containerd/errdefs" "github.com/containerd/log" + "github.com/containerd/ttrpc" "github.com/containerd/typeurl/v2" ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) type proxyTransferrer struct { - client transferapi.TransferClient + client transferapi.TTRPCTransferService streamCreator streaming.StreamCreator } -// NewTransferrer returns a new transferrer which communicates over a GRPC -// connection using the containerd transfer API -func NewTransferrer(client transferapi.TransferClient, sc streaming.StreamCreator) transfer.Transferrer { - return &proxyTransferrer{ - client: client, - streamCreator: sc, +// NewTransferrer returns a new transferrer which can communicate over a GRPC +// or TTRPC connection using the containerd transfer API +func NewTransferrer(client any, sc streaming.StreamCreator) transfer.Transferrer { + switch c := client.(type) { + case transferapi.TransferClient: + return &proxyTransferrer{ + client: convertClient{c}, + streamCreator: sc, + } + case grpc.ClientConnInterface: + return &proxyTransferrer{ + client: convertClient{transferapi.NewTransferClient(c)}, + streamCreator: sc, + } + case transferapi.TTRPCTransferService: + return &proxyTransferrer{ + client: c, + streamCreator: sc, + } + case *ttrpc.Client: + return &proxyTransferrer{ + client: transferapi.NewTTRPCTransferClient(c), + streamCreator: sc, + } + case transfer.Transferrer: + return c + default: + panic(fmt.Errorf("unsupported stream client %T: %w", client, errdefs.ErrNotImplemented)) } } +type convertClient struct { + transferapi.TransferClient +} + +func (c convertClient) Transfer(ctx context.Context, r *transferapi.TransferRequest) (*emptypb.Empty, error) { + return c.TransferClient.Transfer(ctx, r) +} + func (p *proxyTransferrer) Transfer(ctx context.Context, src interface{}, dst interface{}, opts ...transfer.Opt) error { o := &transfer.Config{} for _, opt := range opts {