Add ttrpc support to content proxy

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2024-03-01 14:04:19 -08:00
parent 9104e6a24f
commit 347346e3cf
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
5 changed files with 96 additions and 11 deletions

View File

@ -27,7 +27,6 @@ import (
"time"
containersapi "github.com/containerd/containerd/v2/api/services/containers/v1"
contentapi "github.com/containerd/containerd/v2/api/services/content/v1"
diffapi "github.com/containerd/containerd/v2/api/services/diff/v1"
imagesapi "github.com/containerd/containerd/v2/api/services/images/v1"
introspectionapi "github.com/containerd/containerd/v2/api/services/introspection/v1"
@ -622,7 +621,7 @@ func (c *Client) ContentStore() content.Store {
}
c.connMu.Lock()
defer c.connMu.Unlock()
return contentproxy.NewContentStore(contentapi.NewContentClient(c.conn))
return contentproxy.NewContentStore(c.conn)
}
// SnapshotService returns the underlying snapshotter for the provided snapshotter name

View File

@ -46,7 +46,6 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
csapi "github.com/containerd/containerd/v2/api/services/content/v1"
diffapi "github.com/containerd/containerd/v2/api/services/diff/v1"
sbapi "github.com/containerd/containerd/v2/api/services/sandbox/v1"
ssapi "github.com/containerd/containerd/v2/api/services/snapshots/v1"
@ -507,7 +506,7 @@ func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]plugin.Regist
case string(plugins.ContentPlugin), "content":
t = plugins.ContentPlugin
f = func(conn *grpc.ClientConn) interface{} {
return csproxy.NewContentStore(csapi.NewContentClient(conn))
return csproxy.NewContentStore(conn)
}
case string(plugins.SandboxControllerPlugin), "sandbox":
t = plugins.SandboxControllerPlugin

View File

@ -27,7 +27,7 @@ type remoteReaderAt struct {
ctx context.Context
digest digest.Digest
size int64
client contentapi.ContentClient
client contentapi.TTRPCContentClient
}
func (ra *remoteReaderAt) Size() int64 {

View File

@ -18,6 +18,7 @@ package proxy
import (
"context"
"fmt"
"io"
contentapi "github.com/containerd/containerd/v2/api/services/content/v1"
@ -25,19 +26,41 @@ import (
"github.com/containerd/containerd/v2/protobuf"
protobuftypes "github.com/containerd/containerd/v2/protobuf/types"
"github.com/containerd/errdefs"
"github.com/containerd/ttrpc"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)
type proxyContentStore struct {
client contentapi.ContentClient
// client is the rpc content client
// NOTE: ttrpc is used because it is the smaller interface shared with grpc
client contentapi.TTRPCContentClient
}
// NewContentStore returns a new content store which communicates over a GRPC
// connection using the containerd content GRPC API.
func NewContentStore(client contentapi.ContentClient) content.Store {
return &proxyContentStore{
client: client,
func NewContentStore(client any) content.Store {
switch c := client.(type) {
case contentapi.ContentClient:
return &proxyContentStore{
client: convertClient{c},
}
case grpc.ClientConnInterface:
return &proxyContentStore{
client: convertClient{contentapi.NewContentClient(c)},
}
case contentapi.TTRPCContentClient:
return &proxyContentStore{
client: c,
}
case *ttrpc.Client:
return &proxyContentStore{
client: contentapi.NewTTRPCContentClient(c),
}
default:
panic(fmt.Errorf("unsupported content client %T: %w", client, errdefs.ErrNotImplemented))
}
}
@ -191,7 +214,7 @@ func (pcs *proxyContentStore) Abort(ctx context.Context, ref string) error {
return nil
}
func (pcs *proxyContentStore) negotiate(ctx context.Context, ref string, size int64, expected digest.Digest) (contentapi.Content_WriteClient, int64, error) {
func (pcs *proxyContentStore) negotiate(ctx context.Context, ref string, size int64, expected digest.Digest) (contentapi.TTRPCContent_WriteClient, int64, error) {
wrclient, err := pcs.client.Write(ctx)
if err != nil {
return nil, 0, err
@ -214,6 +237,70 @@ func (pcs *proxyContentStore) negotiate(ctx context.Context, ref string, size in
return wrclient, resp.Offset, nil
}
type convertClient struct {
contentapi.ContentClient
}
func (c convertClient) Info(ctx context.Context, req *contentapi.InfoRequest) (*contentapi.InfoResponse, error) {
return c.ContentClient.Info(ctx, req)
}
func (c convertClient) Update(ctx context.Context, req *contentapi.UpdateRequest) (*contentapi.UpdateResponse, error) {
return c.ContentClient.Update(ctx, req)
}
type convertListClient struct {
contentapi.Content_ListClient
}
func (c convertClient) List(ctx context.Context, req *contentapi.ListContentRequest) (contentapi.TTRPCContent_ListClient, error) {
lc, err := c.ContentClient.List(ctx, req)
if lc == nil {
return nil, err
}
return convertListClient{lc}, err
}
func (c convertClient) Delete(ctx context.Context, req *contentapi.DeleteContentRequest) (*emptypb.Empty, error) {
return c.ContentClient.Delete(ctx, req)
}
type convertReadClient struct {
contentapi.Content_ReadClient
}
func (c convertClient) Read(ctx context.Context, req *contentapi.ReadContentRequest) (contentapi.TTRPCContent_ReadClient, error) {
rc, err := c.ContentClient.Read(ctx, req)
if rc == nil {
return nil, err
}
return convertReadClient{rc}, err
}
func (c convertClient) Status(ctx context.Context, req *contentapi.StatusRequest) (*contentapi.StatusResponse, error) {
return c.ContentClient.Status(ctx, req)
}
func (c convertClient) ListStatuses(ctx context.Context, req *contentapi.ListStatusesRequest) (*contentapi.ListStatusesResponse, error) {
return c.ContentClient.ListStatuses(ctx, req)
}
type convertWriteClient struct {
contentapi.Content_WriteClient
}
func (c convertClient) Write(ctx context.Context) (contentapi.TTRPCContent_WriteClient, error) {
wc, err := c.ContentClient.Write(ctx)
if wc == nil {
return nil, err
}
return convertWriteClient{wc}, err
}
func (c convertClient) Abort(ctx context.Context, req *contentapi.AbortRequest) (*emptypb.Empty, error) {
return c.ContentClient.Abort(ctx, req)
}
func infoToGRPC(info *content.Info) *contentapi.Info {
return &contentapi.Info{
Digest: info.Digest.String(),

View File

@ -30,7 +30,7 @@ import (
type remoteWriter struct {
ref string
client contentapi.Content_WriteClient
client contentapi.TTRPCContent_WriteClient
offset int64
digest digest.Digest
}