From ec04e4f638cae8cfe61ab9f193d4e4d9f46ea790 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Thu, 2 May 2024 16:57:17 -0700 Subject: [PATCH 1/2] Add streaming proxy Signed-off-by: Derek McGowan Signed-off-by: Derek McGowan --- client/transfer.go | 72 +---------------- core/streaming/proxy/streaming.go | 128 ++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+), 70 deletions(-) create mode 100644 core/streaming/proxy/streaming.go diff --git a/client/transfer.go b/client/transfer.go index 6b62d1459..fb8e5e7d2 100644 --- a/client/transfer.go +++ b/client/transfer.go @@ -18,17 +18,12 @@ package client import ( "context" - "errors" - "io" - streamingapi "github.com/containerd/containerd/api/services/streaming/v1" transferapi "github.com/containerd/containerd/api/services/transfer/v1" "github.com/containerd/containerd/v2/core/streaming" + streamproxy "github.com/containerd/containerd/v2/core/streaming/proxy" "github.com/containerd/containerd/v2/core/transfer" "github.com/containerd/containerd/v2/core/transfer/proxy" - "github.com/containerd/containerd/v2/pkg/protobuf" - "github.com/containerd/errdefs" - "github.com/containerd/typeurl/v2" ) func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{}, opts ...transfer.Opt) error { @@ -42,68 +37,5 @@ func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{} } func (c *Client) streamCreator() streaming.StreamCreator { - return &streamCreator{ - client: streamingapi.NewStreamingClient(c.conn), - } -} - -type streamCreator struct { - client streamingapi.StreamingClient -} - -func (sc *streamCreator) Create(ctx context.Context, id string) (streaming.Stream, error) { - stream, err := sc.client.Stream(ctx) - if err != nil { - return nil, err - } - - a, err := typeurl.MarshalAny(&streamingapi.StreamInit{ - ID: id, - }) - if err != nil { - return nil, err - } - err = stream.Send(protobuf.FromAny(a)) - if err != nil { - if !errors.Is(err, io.EOF) { - err = errdefs.FromGRPC(err) - } - return nil, err - } - - // Receive an ack that stream is init and ready - if _, err = stream.Recv(); err != nil { - if !errors.Is(err, io.EOF) { - err = errdefs.FromGRPC(err) - } - return nil, err - } - - return &clientStream{ - s: stream, - }, nil -} - -type clientStream struct { - s streamingapi.Streaming_StreamClient -} - -func (cs *clientStream) Send(a typeurl.Any) (err error) { - err = cs.s.Send(protobuf.FromAny(a)) - if !errors.Is(err, io.EOF) { - err = errdefs.FromGRPC(err) - } - return -} - -func (cs *clientStream) Recv() (a typeurl.Any, err error) { - a, err = cs.s.Recv() - if !errors.Is(err, io.EOF) { - err = errdefs.FromGRPC(err) - } - return -} - -func (cs *clientStream) Close() error { - return cs.s.CloseSend() + return streamproxy.NewStreamCreator(c.conn) } diff --git a/core/streaming/proxy/streaming.go b/core/streaming/proxy/streaming.go new file mode 100644 index 000000000..11c66e70e --- /dev/null +++ b/core/streaming/proxy/streaming.go @@ -0,0 +1,128 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package proxy + +import ( + "context" + "errors" + "fmt" + "io" + + streamingapi "github.com/containerd/containerd/api/services/streaming/v1" + "github.com/containerd/containerd/v2/core/streaming" + "github.com/containerd/containerd/v2/pkg/protobuf" + "github.com/containerd/errdefs" + "github.com/containerd/ttrpc" + "github.com/containerd/typeurl/v2" + "google.golang.org/grpc" +) + +// NewStreamCreator returns a new stream creator which can communicate over a GRPC +// or TTRPC connection using the containerd streaming API. +func NewStreamCreator(client any) streaming.StreamCreator { + switch c := client.(type) { + case streamingapi.StreamingClient: + return &streamCreator{ + client: convertClient{c}, + } + case grpc.ClientConnInterface: + return &streamCreator{ + client: convertClient{streamingapi.NewStreamingClient(c)}, + } + case streamingapi.TTRPCStreamingClient: + return &streamCreator{ + client: c, + } + case *ttrpc.Client: + return &streamCreator{ + client: streamingapi.NewTTRPCStreamingClient(c), + } + case streaming.StreamCreator: + return c + default: + panic(fmt.Errorf("unsupported stream client %T: %w", client, errdefs.ErrNotImplemented)) + } +} + +type convertClient struct { + streamingapi.StreamingClient +} + +func (c convertClient) Stream(ctx context.Context) (streamingapi.TTRPCStreaming_StreamClient, error) { + return c.StreamingClient.Stream(ctx) +} + +type streamCreator struct { + client streamingapi.TTRPCStreamingClient +} + +func (sc *streamCreator) Create(ctx context.Context, id string) (streaming.Stream, error) { + stream, err := sc.client.Stream(ctx) + if err != nil { + return nil, err + } + + a, err := typeurl.MarshalAny(&streamingapi.StreamInit{ + ID: id, + }) + if err != nil { + return nil, err + } + err = stream.Send(protobuf.FromAny(a)) + if err != nil { + if !errors.Is(err, io.EOF) { + err = errdefs.FromGRPC(err) + } + return nil, err + } + + // Receive an ack that stream is init and ready + if _, err = stream.Recv(); err != nil { + if !errors.Is(err, io.EOF) { + err = errdefs.FromGRPC(err) + } + return nil, err + } + + return &clientStream{ + s: stream, + }, nil +} + +type clientStream struct { + s streamingapi.TTRPCStreaming_StreamClient +} + +func (cs *clientStream) Send(a typeurl.Any) (err error) { + err = cs.s.Send(protobuf.FromAny(a)) + if !errors.Is(err, io.EOF) { + err = errdefs.FromGRPC(err) + } + return +} + +func (cs *clientStream) Recv() (a typeurl.Any, err error) { + a, err = cs.s.Recv() + if !errors.Is(err, io.EOF) { + err = errdefs.FromGRPC(err) + } + return +} + +func (cs *clientStream) Close() error { + return cs.s.CloseSend() +} From 05a3171bb4e334c6ea1fb500365dcae67fb8ab40 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Thu, 2 May 2024 17:11:45 -0700 Subject: [PATCH 2/2] Update transfer proxy to support ttrpc Signed-off-by: Derek McGowan --- client/transfer.go | 3 +- core/transfer/proxy/transfer.go | 49 ++++++++++++++++++++++++++++----- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/client/transfer.go b/client/transfer.go index fb8e5e7d2..5bd02aa6d 100644 --- a/client/transfer.go +++ b/client/transfer.go @@ -19,7 +19,6 @@ package client import ( "context" - transferapi "github.com/containerd/containerd/api/services/transfer/v1" "github.com/containerd/containerd/v2/core/streaming" streamproxy "github.com/containerd/containerd/v2/core/streaming/proxy" "github.com/containerd/containerd/v2/core/transfer" @@ -33,7 +32,7 @@ func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{} } defer done(ctx) - return proxy.NewTransferrer(transferapi.NewTransferClient(c.conn), c.streamCreator()).Transfer(ctx, src, dest, opts...) + return proxy.NewTransferrer(c.conn, c.streamCreator()).Transfer(ctx, src, dest, opts...) } func (c *Client) streamCreator() streaming.StreamCreator { diff --git a/core/transfer/proxy/transfer.go b/core/transfer/proxy/transfer.go index b2582e4b6..f314f6fe0 100644 --- a/core/transfer/proxy/transfer.go +++ b/core/transfer/proxy/transfer.go @@ -19,9 +19,12 @@ package proxy import ( "context" "errors" + "fmt" "io" + "google.golang.org/grpc" "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/emptypb" transferapi "github.com/containerd/containerd/api/services/transfer/v1" transfertypes "github.com/containerd/containerd/api/types/transfer" @@ -29,25 +32,57 @@ import ( "github.com/containerd/containerd/v2/core/transfer" tstreaming "github.com/containerd/containerd/v2/core/transfer/streaming" "github.com/containerd/containerd/v2/pkg/oci" + "github.com/containerd/errdefs" "github.com/containerd/log" + "github.com/containerd/ttrpc" "github.com/containerd/typeurl/v2" ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) type proxyTransferrer struct { - client transferapi.TransferClient + client transferapi.TTRPCTransferService streamCreator streaming.StreamCreator } -// NewTransferrer returns a new transferrer which communicates over a GRPC -// connection using the containerd transfer API -func NewTransferrer(client transferapi.TransferClient, sc streaming.StreamCreator) transfer.Transferrer { - return &proxyTransferrer{ - client: client, - streamCreator: sc, +// NewTransferrer returns a new transferrer which can communicate over a GRPC +// or TTRPC connection using the containerd transfer API +func NewTransferrer(client any, sc streaming.StreamCreator) transfer.Transferrer { + switch c := client.(type) { + case transferapi.TransferClient: + return &proxyTransferrer{ + client: convertClient{c}, + streamCreator: sc, + } + case grpc.ClientConnInterface: + return &proxyTransferrer{ + client: convertClient{transferapi.NewTransferClient(c)}, + streamCreator: sc, + } + case transferapi.TTRPCTransferService: + return &proxyTransferrer{ + client: c, + streamCreator: sc, + } + case *ttrpc.Client: + return &proxyTransferrer{ + client: transferapi.NewTTRPCTransferClient(c), + streamCreator: sc, + } + case transfer.Transferrer: + return c + default: + panic(fmt.Errorf("unsupported stream client %T: %w", client, errdefs.ErrNotImplemented)) } } +type convertClient struct { + transferapi.TransferClient +} + +func (c convertClient) Transfer(ctx context.Context, r *transferapi.TransferRequest) (*emptypb.Empty, error) { + return c.TransferClient.Transfer(ctx, r) +} + func (p *proxyTransferrer) Transfer(ctx context.Context, src interface{}, dst interface{}, opts ...transfer.Opt) error { o := &transfer.Config{} for _, opt := range opts {