Update transfer proxy to support ttrpc

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2024-05-02 17:11:45 -07:00
parent ec04e4f638
commit 05a3171bb4
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
2 changed files with 43 additions and 9 deletions

View File

@ -19,7 +19,6 @@ package client
import ( import (
"context" "context"
transferapi "github.com/containerd/containerd/api/services/transfer/v1"
"github.com/containerd/containerd/v2/core/streaming" "github.com/containerd/containerd/v2/core/streaming"
streamproxy "github.com/containerd/containerd/v2/core/streaming/proxy" streamproxy "github.com/containerd/containerd/v2/core/streaming/proxy"
"github.com/containerd/containerd/v2/core/transfer" "github.com/containerd/containerd/v2/core/transfer"
@ -33,7 +32,7 @@ func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{}
} }
defer done(ctx) defer done(ctx)
return proxy.NewTransferrer(transferapi.NewTransferClient(c.conn), c.streamCreator()).Transfer(ctx, src, dest, opts...) return proxy.NewTransferrer(c.conn, c.streamCreator()).Transfer(ctx, src, dest, opts...)
} }
func (c *Client) streamCreator() streaming.StreamCreator { func (c *Client) streamCreator() streaming.StreamCreator {

View File

@ -19,9 +19,12 @@ package proxy
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"io" "io"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/emptypb"
transferapi "github.com/containerd/containerd/api/services/transfer/v1" transferapi "github.com/containerd/containerd/api/services/transfer/v1"
transfertypes "github.com/containerd/containerd/api/types/transfer" transfertypes "github.com/containerd/containerd/api/types/transfer"
@ -29,23 +32,55 @@ import (
"github.com/containerd/containerd/v2/core/transfer" "github.com/containerd/containerd/v2/core/transfer"
tstreaming "github.com/containerd/containerd/v2/core/transfer/streaming" tstreaming "github.com/containerd/containerd/v2/core/transfer/streaming"
"github.com/containerd/containerd/v2/pkg/oci" "github.com/containerd/containerd/v2/pkg/oci"
"github.com/containerd/errdefs"
"github.com/containerd/log" "github.com/containerd/log"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl/v2" "github.com/containerd/typeurl/v2"
ocispec "github.com/opencontainers/image-spec/specs-go/v1" ocispec "github.com/opencontainers/image-spec/specs-go/v1"
) )
type proxyTransferrer struct { type proxyTransferrer struct {
client transferapi.TransferClient client transferapi.TTRPCTransferService
streamCreator streaming.StreamCreator streamCreator streaming.StreamCreator
} }
// NewTransferrer returns a new transferrer which communicates over a GRPC // NewTransferrer returns a new transferrer which can communicate over a GRPC
// connection using the containerd transfer API // or TTRPC connection using the containerd transfer API
func NewTransferrer(client transferapi.TransferClient, sc streaming.StreamCreator) transfer.Transferrer { func NewTransferrer(client any, sc streaming.StreamCreator) transfer.Transferrer {
switch c := client.(type) {
case transferapi.TransferClient:
return &proxyTransferrer{ return &proxyTransferrer{
client: client, client: convertClient{c},
streamCreator: sc, streamCreator: sc,
} }
case grpc.ClientConnInterface:
return &proxyTransferrer{
client: convertClient{transferapi.NewTransferClient(c)},
streamCreator: sc,
}
case transferapi.TTRPCTransferService:
return &proxyTransferrer{
client: c,
streamCreator: sc,
}
case *ttrpc.Client:
return &proxyTransferrer{
client: transferapi.NewTTRPCTransferClient(c),
streamCreator: sc,
}
case transfer.Transferrer:
return c
default:
panic(fmt.Errorf("unsupported stream client %T: %w", client, errdefs.ErrNotImplemented))
}
}
type convertClient struct {
transferapi.TransferClient
}
func (c convertClient) Transfer(ctx context.Context, r *transferapi.TransferRequest) (*emptypb.Empty, error) {
return c.TransferClient.Transfer(ctx, r)
} }
func (p *proxyTransferrer) Transfer(ctx context.Context, src interface{}, dst interface{}, opts ...transfer.Opt) error { func (p *proxyTransferrer) Transfer(ctx context.Context, src interface{}, dst interface{}, opts ...transfer.Opt) error {