Merge pull request #10163 from dmcgowan/transfer-ttrpc-support
Add support for ttrpc in transfer and streaming service
This commit is contained in:
		@@ -18,17 +18,11 @@ package client
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"errors"
 | 
					 | 
				
			||||||
	"io"
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	streamingapi "github.com/containerd/containerd/api/services/streaming/v1"
 | 
					 | 
				
			||||||
	transferapi "github.com/containerd/containerd/api/services/transfer/v1"
 | 
					 | 
				
			||||||
	"github.com/containerd/containerd/v2/core/streaming"
 | 
						"github.com/containerd/containerd/v2/core/streaming"
 | 
				
			||||||
 | 
						streamproxy "github.com/containerd/containerd/v2/core/streaming/proxy"
 | 
				
			||||||
	"github.com/containerd/containerd/v2/core/transfer"
 | 
						"github.com/containerd/containerd/v2/core/transfer"
 | 
				
			||||||
	"github.com/containerd/containerd/v2/core/transfer/proxy"
 | 
						"github.com/containerd/containerd/v2/core/transfer/proxy"
 | 
				
			||||||
	"github.com/containerd/containerd/v2/pkg/protobuf"
 | 
					 | 
				
			||||||
	"github.com/containerd/errdefs"
 | 
					 | 
				
			||||||
	"github.com/containerd/typeurl/v2"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{}, opts ...transfer.Opt) error {
 | 
					func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{}, opts ...transfer.Opt) error {
 | 
				
			||||||
@@ -38,72 +32,9 @@ func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{}
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	defer done(ctx)
 | 
						defer done(ctx)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return proxy.NewTransferrer(transferapi.NewTransferClient(c.conn), c.streamCreator()).Transfer(ctx, src, dest, opts...)
 | 
						return proxy.NewTransferrer(c.conn, c.streamCreator()).Transfer(ctx, src, dest, opts...)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *Client) streamCreator() streaming.StreamCreator {
 | 
					func (c *Client) streamCreator() streaming.StreamCreator {
 | 
				
			||||||
	return &streamCreator{
 | 
						return streamproxy.NewStreamCreator(c.conn)
 | 
				
			||||||
		client: streamingapi.NewStreamingClient(c.conn),
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type streamCreator struct {
 | 
					 | 
				
			||||||
	client streamingapi.StreamingClient
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (sc *streamCreator) Create(ctx context.Context, id string) (streaming.Stream, error) {
 | 
					 | 
				
			||||||
	stream, err := sc.client.Stream(ctx)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	a, err := typeurl.MarshalAny(&streamingapi.StreamInit{
 | 
					 | 
				
			||||||
		ID: id,
 | 
					 | 
				
			||||||
	})
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	err = stream.Send(protobuf.FromAny(a))
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		if !errors.Is(err, io.EOF) {
 | 
					 | 
				
			||||||
			err = errdefs.FromGRPC(err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Receive an ack that stream is init and ready
 | 
					 | 
				
			||||||
	if _, err = stream.Recv(); err != nil {
 | 
					 | 
				
			||||||
		if !errors.Is(err, io.EOF) {
 | 
					 | 
				
			||||||
			err = errdefs.FromGRPC(err)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return &clientStream{
 | 
					 | 
				
			||||||
		s: stream,
 | 
					 | 
				
			||||||
	}, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type clientStream struct {
 | 
					 | 
				
			||||||
	s streamingapi.Streaming_StreamClient
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (cs *clientStream) Send(a typeurl.Any) (err error) {
 | 
					 | 
				
			||||||
	err = cs.s.Send(protobuf.FromAny(a))
 | 
					 | 
				
			||||||
	if !errors.Is(err, io.EOF) {
 | 
					 | 
				
			||||||
		err = errdefs.FromGRPC(err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (cs *clientStream) Recv() (a typeurl.Any, err error) {
 | 
					 | 
				
			||||||
	a, err = cs.s.Recv()
 | 
					 | 
				
			||||||
	if !errors.Is(err, io.EOF) {
 | 
					 | 
				
			||||||
		err = errdefs.FromGRPC(err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (cs *clientStream) Close() error {
 | 
					 | 
				
			||||||
	return cs.s.CloseSend()
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										128
									
								
								core/streaming/proxy/streaming.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										128
									
								
								core/streaming/proxy/streaming.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,128 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					   Copyright The containerd Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					   Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					   you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					   You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					       http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					   Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					   distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					   See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					   limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package proxy
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"io"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						streamingapi "github.com/containerd/containerd/api/services/streaming/v1"
 | 
				
			||||||
 | 
						"github.com/containerd/containerd/v2/core/streaming"
 | 
				
			||||||
 | 
						"github.com/containerd/containerd/v2/pkg/protobuf"
 | 
				
			||||||
 | 
						"github.com/containerd/errdefs"
 | 
				
			||||||
 | 
						"github.com/containerd/ttrpc"
 | 
				
			||||||
 | 
						"github.com/containerd/typeurl/v2"
 | 
				
			||||||
 | 
						"google.golang.org/grpc"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewStreamCreator returns a new stream creator which can communicate over a GRPC
 | 
				
			||||||
 | 
					// or TTRPC connection using the containerd streaming API.
 | 
				
			||||||
 | 
					func NewStreamCreator(client any) streaming.StreamCreator {
 | 
				
			||||||
 | 
						switch c := client.(type) {
 | 
				
			||||||
 | 
						case streamingapi.StreamingClient:
 | 
				
			||||||
 | 
							return &streamCreator{
 | 
				
			||||||
 | 
								client: convertClient{c},
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						case grpc.ClientConnInterface:
 | 
				
			||||||
 | 
							return &streamCreator{
 | 
				
			||||||
 | 
								client: convertClient{streamingapi.NewStreamingClient(c)},
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						case streamingapi.TTRPCStreamingClient:
 | 
				
			||||||
 | 
							return &streamCreator{
 | 
				
			||||||
 | 
								client: c,
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						case *ttrpc.Client:
 | 
				
			||||||
 | 
							return &streamCreator{
 | 
				
			||||||
 | 
								client: streamingapi.NewTTRPCStreamingClient(c),
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						case streaming.StreamCreator:
 | 
				
			||||||
 | 
							return c
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							panic(fmt.Errorf("unsupported stream client %T: %w", client, errdefs.ErrNotImplemented))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type convertClient struct {
 | 
				
			||||||
 | 
						streamingapi.StreamingClient
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c convertClient) Stream(ctx context.Context) (streamingapi.TTRPCStreaming_StreamClient, error) {
 | 
				
			||||||
 | 
						return c.StreamingClient.Stream(ctx)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type streamCreator struct {
 | 
				
			||||||
 | 
						client streamingapi.TTRPCStreamingClient
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (sc *streamCreator) Create(ctx context.Context, id string) (streaming.Stream, error) {
 | 
				
			||||||
 | 
						stream, err := sc.client.Stream(ctx)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						a, err := typeurl.MarshalAny(&streamingapi.StreamInit{
 | 
				
			||||||
 | 
							ID: id,
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						err = stream.Send(protobuf.FromAny(a))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							if !errors.Is(err, io.EOF) {
 | 
				
			||||||
 | 
								err = errdefs.FromGRPC(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Receive an ack that stream is init and ready
 | 
				
			||||||
 | 
						if _, err = stream.Recv(); err != nil {
 | 
				
			||||||
 | 
							if !errors.Is(err, io.EOF) {
 | 
				
			||||||
 | 
								err = errdefs.FromGRPC(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return &clientStream{
 | 
				
			||||||
 | 
							s: stream,
 | 
				
			||||||
 | 
						}, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type clientStream struct {
 | 
				
			||||||
 | 
						s streamingapi.TTRPCStreaming_StreamClient
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (cs *clientStream) Send(a typeurl.Any) (err error) {
 | 
				
			||||||
 | 
						err = cs.s.Send(protobuf.FromAny(a))
 | 
				
			||||||
 | 
						if !errors.Is(err, io.EOF) {
 | 
				
			||||||
 | 
							err = errdefs.FromGRPC(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (cs *clientStream) Recv() (a typeurl.Any, err error) {
 | 
				
			||||||
 | 
						a, err = cs.s.Recv()
 | 
				
			||||||
 | 
						if !errors.Is(err, io.EOF) {
 | 
				
			||||||
 | 
							err = errdefs.FromGRPC(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (cs *clientStream) Close() error {
 | 
				
			||||||
 | 
						return cs.s.CloseSend()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -19,9 +19,12 @@ package proxy
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
	"io"
 | 
						"io"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"google.golang.org/grpc"
 | 
				
			||||||
	"google.golang.org/protobuf/types/known/anypb"
 | 
						"google.golang.org/protobuf/types/known/anypb"
 | 
				
			||||||
 | 
						"google.golang.org/protobuf/types/known/emptypb"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	transferapi "github.com/containerd/containerd/api/services/transfer/v1"
 | 
						transferapi "github.com/containerd/containerd/api/services/transfer/v1"
 | 
				
			||||||
	transfertypes "github.com/containerd/containerd/api/types/transfer"
 | 
						transfertypes "github.com/containerd/containerd/api/types/transfer"
 | 
				
			||||||
@@ -29,23 +32,55 @@ import (
 | 
				
			|||||||
	"github.com/containerd/containerd/v2/core/transfer"
 | 
						"github.com/containerd/containerd/v2/core/transfer"
 | 
				
			||||||
	tstreaming "github.com/containerd/containerd/v2/core/transfer/streaming"
 | 
						tstreaming "github.com/containerd/containerd/v2/core/transfer/streaming"
 | 
				
			||||||
	"github.com/containerd/containerd/v2/pkg/oci"
 | 
						"github.com/containerd/containerd/v2/pkg/oci"
 | 
				
			||||||
 | 
						"github.com/containerd/errdefs"
 | 
				
			||||||
	"github.com/containerd/log"
 | 
						"github.com/containerd/log"
 | 
				
			||||||
 | 
						"github.com/containerd/ttrpc"
 | 
				
			||||||
	"github.com/containerd/typeurl/v2"
 | 
						"github.com/containerd/typeurl/v2"
 | 
				
			||||||
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
 | 
						ocispec "github.com/opencontainers/image-spec/specs-go/v1"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type proxyTransferrer struct {
 | 
					type proxyTransferrer struct {
 | 
				
			||||||
	client        transferapi.TransferClient
 | 
						client        transferapi.TTRPCTransferService
 | 
				
			||||||
	streamCreator streaming.StreamCreator
 | 
						streamCreator streaming.StreamCreator
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewTransferrer returns a new transferrer which communicates over a GRPC
 | 
					// NewTransferrer returns a new transferrer which can communicate over a GRPC
 | 
				
			||||||
// connection using the containerd transfer API
 | 
					// or TTRPC connection using the containerd transfer API
 | 
				
			||||||
func NewTransferrer(client transferapi.TransferClient, sc streaming.StreamCreator) transfer.Transferrer {
 | 
					func NewTransferrer(client any, sc streaming.StreamCreator) transfer.Transferrer {
 | 
				
			||||||
 | 
						switch c := client.(type) {
 | 
				
			||||||
 | 
						case transferapi.TransferClient:
 | 
				
			||||||
		return &proxyTransferrer{
 | 
							return &proxyTransferrer{
 | 
				
			||||||
		client:        client,
 | 
								client:        convertClient{c},
 | 
				
			||||||
			streamCreator: sc,
 | 
								streamCreator: sc,
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
						case grpc.ClientConnInterface:
 | 
				
			||||||
 | 
							return &proxyTransferrer{
 | 
				
			||||||
 | 
								client:        convertClient{transferapi.NewTransferClient(c)},
 | 
				
			||||||
 | 
								streamCreator: sc,
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						case transferapi.TTRPCTransferService:
 | 
				
			||||||
 | 
							return &proxyTransferrer{
 | 
				
			||||||
 | 
								client:        c,
 | 
				
			||||||
 | 
								streamCreator: sc,
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						case *ttrpc.Client:
 | 
				
			||||||
 | 
							return &proxyTransferrer{
 | 
				
			||||||
 | 
								client:        transferapi.NewTTRPCTransferClient(c),
 | 
				
			||||||
 | 
								streamCreator: sc,
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						case transfer.Transferrer:
 | 
				
			||||||
 | 
							return c
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							panic(fmt.Errorf("unsupported stream client %T: %w", client, errdefs.ErrNotImplemented))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type convertClient struct {
 | 
				
			||||||
 | 
						transferapi.TransferClient
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c convertClient) Transfer(ctx context.Context, r *transferapi.TransferRequest) (*emptypb.Empty, error) {
 | 
				
			||||||
 | 
						return c.TransferClient.Transfer(ctx, r)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (p *proxyTransferrer) Transfer(ctx context.Context, src interface{}, dst interface{}, opts ...transfer.Opt) error {
 | 
					func (p *proxyTransferrer) Transfer(ctx context.Context, src interface{}, dst interface{}, opts ...transfer.Opt) error {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user