From 347346e3cf26d2edd52dae967ee2653410cce68d Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Fri, 1 Mar 2024 14:04:19 -0800 Subject: [PATCH] Add ttrpc support to content proxy Signed-off-by: Derek McGowan --- client/client.go | 3 +- cmd/containerd/server/server.go | 3 +- core/content/proxy/content_reader.go | 2 +- core/content/proxy/content_store.go | 97 ++++++++++++++++++++++++++-- core/content/proxy/content_writer.go | 2 +- 5 files changed, 96 insertions(+), 11 deletions(-) diff --git a/client/client.go b/client/client.go index a84376120..b8c0566ca 100644 --- a/client/client.go +++ b/client/client.go @@ -27,7 +27,6 @@ import ( "time" containersapi "github.com/containerd/containerd/v2/api/services/containers/v1" - contentapi "github.com/containerd/containerd/v2/api/services/content/v1" diffapi "github.com/containerd/containerd/v2/api/services/diff/v1" imagesapi "github.com/containerd/containerd/v2/api/services/images/v1" introspectionapi "github.com/containerd/containerd/v2/api/services/introspection/v1" @@ -622,7 +621,7 @@ func (c *Client) ContentStore() content.Store { } c.connMu.Lock() defer c.connMu.Unlock() - return contentproxy.NewContentStore(contentapi.NewContentClient(c.conn)) + return contentproxy.NewContentStore(c.conn) } // SnapshotService returns the underlying snapshotter for the provided snapshotter name diff --git a/cmd/containerd/server/server.go b/cmd/containerd/server/server.go index bf40c6c41..d53866b2f 100644 --- a/cmd/containerd/server/server.go +++ b/cmd/containerd/server/server.go @@ -46,7 +46,6 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" - csapi "github.com/containerd/containerd/v2/api/services/content/v1" diffapi "github.com/containerd/containerd/v2/api/services/diff/v1" sbapi "github.com/containerd/containerd/v2/api/services/sandbox/v1" ssapi "github.com/containerd/containerd/v2/api/services/snapshots/v1" @@ -507,7 +506,7 @@ func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]plugin.Regist case string(plugins.ContentPlugin), "content": t = plugins.ContentPlugin f = func(conn *grpc.ClientConn) interface{} { - return csproxy.NewContentStore(csapi.NewContentClient(conn)) + return csproxy.NewContentStore(conn) } case string(plugins.SandboxControllerPlugin), "sandbox": t = plugins.SandboxControllerPlugin diff --git a/core/content/proxy/content_reader.go b/core/content/proxy/content_reader.go index f983e4288..529502ef9 100644 --- a/core/content/proxy/content_reader.go +++ b/core/content/proxy/content_reader.go @@ -27,7 +27,7 @@ type remoteReaderAt struct { ctx context.Context digest digest.Digest size int64 - client contentapi.ContentClient + client contentapi.TTRPCContentClient } func (ra *remoteReaderAt) Size() int64 { diff --git a/core/content/proxy/content_store.go b/core/content/proxy/content_store.go index 5d7edb770..2372bf286 100644 --- a/core/content/proxy/content_store.go +++ b/core/content/proxy/content_store.go @@ -18,6 +18,7 @@ package proxy import ( "context" + "fmt" "io" contentapi "github.com/containerd/containerd/v2/api/services/content/v1" @@ -25,19 +26,41 @@ import ( "github.com/containerd/containerd/v2/protobuf" protobuftypes "github.com/containerd/containerd/v2/protobuf/types" "github.com/containerd/errdefs" + "github.com/containerd/ttrpc" digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" ) type proxyContentStore struct { - client contentapi.ContentClient + // client is the rpc content client + // NOTE: ttrpc is used because it is the smaller interface shared with grpc + client contentapi.TTRPCContentClient } // NewContentStore returns a new content store which communicates over a GRPC // connection using the containerd content GRPC API. -func NewContentStore(client contentapi.ContentClient) content.Store { - return &proxyContentStore{ - client: client, +func NewContentStore(client any) content.Store { + switch c := client.(type) { + case contentapi.ContentClient: + return &proxyContentStore{ + client: convertClient{c}, + } + case grpc.ClientConnInterface: + return &proxyContentStore{ + client: convertClient{contentapi.NewContentClient(c)}, + } + case contentapi.TTRPCContentClient: + return &proxyContentStore{ + client: c, + } + case *ttrpc.Client: + return &proxyContentStore{ + client: contentapi.NewTTRPCContentClient(c), + } + default: + panic(fmt.Errorf("unsupported content client %T: %w", client, errdefs.ErrNotImplemented)) } } @@ -191,7 +214,7 @@ func (pcs *proxyContentStore) Abort(ctx context.Context, ref string) error { return nil } -func (pcs *proxyContentStore) negotiate(ctx context.Context, ref string, size int64, expected digest.Digest) (contentapi.Content_WriteClient, int64, error) { +func (pcs *proxyContentStore) negotiate(ctx context.Context, ref string, size int64, expected digest.Digest) (contentapi.TTRPCContent_WriteClient, int64, error) { wrclient, err := pcs.client.Write(ctx) if err != nil { return nil, 0, err @@ -214,6 +237,70 @@ func (pcs *proxyContentStore) negotiate(ctx context.Context, ref string, size in return wrclient, resp.Offset, nil } +type convertClient struct { + contentapi.ContentClient +} + +func (c convertClient) Info(ctx context.Context, req *contentapi.InfoRequest) (*contentapi.InfoResponse, error) { + return c.ContentClient.Info(ctx, req) +} + +func (c convertClient) Update(ctx context.Context, req *contentapi.UpdateRequest) (*contentapi.UpdateResponse, error) { + return c.ContentClient.Update(ctx, req) +} + +type convertListClient struct { + contentapi.Content_ListClient +} + +func (c convertClient) List(ctx context.Context, req *contentapi.ListContentRequest) (contentapi.TTRPCContent_ListClient, error) { + lc, err := c.ContentClient.List(ctx, req) + if lc == nil { + return nil, err + } + return convertListClient{lc}, err +} + +func (c convertClient) Delete(ctx context.Context, req *contentapi.DeleteContentRequest) (*emptypb.Empty, error) { + return c.ContentClient.Delete(ctx, req) +} + +type convertReadClient struct { + contentapi.Content_ReadClient +} + +func (c convertClient) Read(ctx context.Context, req *contentapi.ReadContentRequest) (contentapi.TTRPCContent_ReadClient, error) { + rc, err := c.ContentClient.Read(ctx, req) + if rc == nil { + return nil, err + } + return convertReadClient{rc}, err +} + +func (c convertClient) Status(ctx context.Context, req *contentapi.StatusRequest) (*contentapi.StatusResponse, error) { + return c.ContentClient.Status(ctx, req) +} + +func (c convertClient) ListStatuses(ctx context.Context, req *contentapi.ListStatusesRequest) (*contentapi.ListStatusesResponse, error) { + return c.ContentClient.ListStatuses(ctx, req) +} + +type convertWriteClient struct { + contentapi.Content_WriteClient +} + +func (c convertClient) Write(ctx context.Context) (contentapi.TTRPCContent_WriteClient, error) { + wc, err := c.ContentClient.Write(ctx) + if wc == nil { + return nil, err + } + return convertWriteClient{wc}, err +} + +func (c convertClient) Abort(ctx context.Context, req *contentapi.AbortRequest) (*emptypb.Empty, error) { + return c.ContentClient.Abort(ctx, req) +} + func infoToGRPC(info *content.Info) *contentapi.Info { return &contentapi.Info{ Digest: info.Digest.String(), diff --git a/core/content/proxy/content_writer.go b/core/content/proxy/content_writer.go index 1b32e0aae..7738f9a8e 100644 --- a/core/content/proxy/content_writer.go +++ b/core/content/proxy/content_writer.go @@ -30,7 +30,7 @@ import ( type remoteWriter struct { ref string - client contentapi.Content_WriteClient + client contentapi.TTRPCContent_WriteClient offset int64 digest digest.Digest }