Merge pull request #9910 from dmcgowan/ttrpc-proxy-interfaces

Add ttrpc to proxy interfaces
This commit is contained in:
Akihiro Suda 2024-03-05 01:01:22 +00:00 committed by GitHub
commit 580ae05986
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 323 additions and 15 deletions

View File

@ -27,9 +27,7 @@ import (
"time"
containersapi "github.com/containerd/containerd/v2/api/services/containers/v1"
contentapi "github.com/containerd/containerd/v2/api/services/content/v1"
diffapi "github.com/containerd/containerd/v2/api/services/diff/v1"
eventsapi "github.com/containerd/containerd/v2/api/services/events/v1"
imagesapi "github.com/containerd/containerd/v2/api/services/images/v1"
introspectionapi "github.com/containerd/containerd/v2/api/services/introspection/v1"
leasesapi "github.com/containerd/containerd/v2/api/services/leases/v1"
@ -43,6 +41,7 @@ import (
"github.com/containerd/containerd/v2/core/content"
contentproxy "github.com/containerd/containerd/v2/core/content/proxy"
"github.com/containerd/containerd/v2/core/events"
eventsproxy "github.com/containerd/containerd/v2/core/events/proxy"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/leases"
leasesproxy "github.com/containerd/containerd/v2/core/leases/proxy"
@ -622,7 +621,7 @@ func (c *Client) ContentStore() content.Store {
}
c.connMu.Lock()
defer c.connMu.Unlock()
return contentproxy.NewContentStore(contentapi.NewContentClient(c.conn))
return contentproxy.NewContentStore(c.conn)
}
// SnapshotService returns the underlying snapshotter for the provided snapshotter name
@ -708,7 +707,7 @@ func (c *Client) EventService() EventService {
}
c.connMu.Lock()
defer c.connMu.Unlock()
return NewEventServiceFromClient(eventsapi.NewEventsClient(c.conn))
return eventsproxy.NewRemoteEvents(c.conn)
}
// SandboxStore returns the underlying sandbox store client
@ -738,8 +737,9 @@ func (c *Client) VersionService() versionservice.VersionClient {
return versionservice.NewVersionClient(c.conn)
}
// Conn returns the underlying GRPC connection object
func (c *Client) Conn() *grpc.ClientConn {
// Conn returns the underlying RPC connection object
// Either *grpc.ClientConn or *ttrpc.Conn
func (c *Client) Conn() any {
c.connMu.Lock()
defer c.connMu.Unlock()
return c.conn

View File

@ -46,7 +46,6 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
csapi "github.com/containerd/containerd/v2/api/services/content/v1"
diffapi "github.com/containerd/containerd/v2/api/services/diff/v1"
sbapi "github.com/containerd/containerd/v2/api/services/sandbox/v1"
ssapi "github.com/containerd/containerd/v2/api/services/snapshots/v1"
@ -507,7 +506,7 @@ func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]plugin.Regist
case string(plugins.ContentPlugin), "content":
t = plugins.ContentPlugin
f = func(conn *grpc.ClientConn) interface{} {
return csproxy.NewContentStore(csapi.NewContentClient(conn))
return csproxy.NewContentStore(conn)
}
case string(plugins.SandboxControllerPlugin), "sandbox":
t = plugins.SandboxControllerPlugin

View File

@ -27,7 +27,7 @@ type remoteReaderAt struct {
ctx context.Context
digest digest.Digest
size int64
client contentapi.ContentClient
client contentapi.TTRPCContentClient
}
func (ra *remoteReaderAt) Size() int64 {

View File

@ -18,6 +18,7 @@ package proxy
import (
"context"
"fmt"
"io"
contentapi "github.com/containerd/containerd/v2/api/services/content/v1"
@ -25,19 +26,41 @@ import (
"github.com/containerd/containerd/v2/protobuf"
protobuftypes "github.com/containerd/containerd/v2/protobuf/types"
"github.com/containerd/errdefs"
"github.com/containerd/ttrpc"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)
type proxyContentStore struct {
client contentapi.ContentClient
// client is the rpc content client
// NOTE: ttrpc is used because it is the smaller interface shared with grpc
client contentapi.TTRPCContentClient
}
// NewContentStore returns a new content store which communicates over a GRPC
// connection using the containerd content GRPC API.
func NewContentStore(client contentapi.ContentClient) content.Store {
return &proxyContentStore{
client: client,
func NewContentStore(client any) content.Store {
switch c := client.(type) {
case contentapi.ContentClient:
return &proxyContentStore{
client: convertClient{c},
}
case grpc.ClientConnInterface:
return &proxyContentStore{
client: convertClient{contentapi.NewContentClient(c)},
}
case contentapi.TTRPCContentClient:
return &proxyContentStore{
client: c,
}
case *ttrpc.Client:
return &proxyContentStore{
client: contentapi.NewTTRPCContentClient(c),
}
default:
panic(fmt.Errorf("unsupported content client %T: %w", client, errdefs.ErrNotImplemented))
}
}
@ -191,7 +214,7 @@ func (pcs *proxyContentStore) Abort(ctx context.Context, ref string) error {
return nil
}
func (pcs *proxyContentStore) negotiate(ctx context.Context, ref string, size int64, expected digest.Digest) (contentapi.Content_WriteClient, int64, error) {
func (pcs *proxyContentStore) negotiate(ctx context.Context, ref string, size int64, expected digest.Digest) (contentapi.TTRPCContent_WriteClient, int64, error) {
wrclient, err := pcs.client.Write(ctx)
if err != nil {
return nil, 0, err
@ -214,6 +237,70 @@ func (pcs *proxyContentStore) negotiate(ctx context.Context, ref string, size in
return wrclient, resp.Offset, nil
}
type convertClient struct {
contentapi.ContentClient
}
func (c convertClient) Info(ctx context.Context, req *contentapi.InfoRequest) (*contentapi.InfoResponse, error) {
return c.ContentClient.Info(ctx, req)
}
func (c convertClient) Update(ctx context.Context, req *contentapi.UpdateRequest) (*contentapi.UpdateResponse, error) {
return c.ContentClient.Update(ctx, req)
}
type convertListClient struct {
contentapi.Content_ListClient
}
func (c convertClient) List(ctx context.Context, req *contentapi.ListContentRequest) (contentapi.TTRPCContent_ListClient, error) {
lc, err := c.ContentClient.List(ctx, req)
if lc == nil {
return nil, err
}
return convertListClient{lc}, err
}
func (c convertClient) Delete(ctx context.Context, req *contentapi.DeleteContentRequest) (*emptypb.Empty, error) {
return c.ContentClient.Delete(ctx, req)
}
type convertReadClient struct {
contentapi.Content_ReadClient
}
func (c convertClient) Read(ctx context.Context, req *contentapi.ReadContentRequest) (contentapi.TTRPCContent_ReadClient, error) {
rc, err := c.ContentClient.Read(ctx, req)
if rc == nil {
return nil, err
}
return convertReadClient{rc}, err
}
func (c convertClient) Status(ctx context.Context, req *contentapi.StatusRequest) (*contentapi.StatusResponse, error) {
return c.ContentClient.Status(ctx, req)
}
func (c convertClient) ListStatuses(ctx context.Context, req *contentapi.ListStatusesRequest) (*contentapi.ListStatusesResponse, error) {
return c.ContentClient.ListStatuses(ctx, req)
}
type convertWriteClient struct {
contentapi.Content_WriteClient
}
func (c convertClient) Write(ctx context.Context) (contentapi.TTRPCContent_WriteClient, error) {
wc, err := c.ContentClient.Write(ctx)
if wc == nil {
return nil, err
}
return convertWriteClient{wc}, err
}
func (c convertClient) Abort(ctx context.Context, req *contentapi.AbortRequest) (*emptypb.Empty, error) {
return c.ContentClient.Abort(ctx, req)
}
func infoToGRPC(info *content.Info) *contentapi.Info {
return &contentapi.Info{
Digest: info.Digest.String(),

View File

@ -30,7 +30,7 @@ import (
type remoteWriter struct {
ref string
client contentapi.Content_WriteClient
client contentapi.TTRPCContent_WriteClient
offset int64
digest digest.Digest
}

View File

@ -0,0 +1,222 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"context"
"fmt"
api "github.com/containerd/containerd/v2/api/services/events/v1"
"github.com/containerd/containerd/v2/api/types"
"github.com/containerd/containerd/v2/core/events"
"github.com/containerd/containerd/v2/protobuf"
"github.com/containerd/errdefs"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl/v2"
"google.golang.org/grpc"
)
type EventService interface {
events.Publisher
events.Forwarder
events.Subscriber
}
func NewRemoteEvents(client any) EventService {
switch c := client.(type) {
case api.EventsClient:
return &grpcEventsProxy{
client: c,
}
case api.TTRPCEventsClient:
return &ttrpcEventsProxy{
client: c,
}
case grpc.ClientConnInterface:
return &grpcEventsProxy{
client: api.NewEventsClient(c),
}
case *ttrpc.Client:
return &ttrpcEventsProxy{
client: api.NewTTRPCEventsClient(c),
}
default:
panic(fmt.Errorf("unsupported events client %T: %w", client, errdefs.ErrNotImplemented))
}
}
type grpcEventsProxy struct {
client api.EventsClient
}
func (p *grpcEventsProxy) Publish(ctx context.Context, topic string, event events.Event) error {
evt, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
req := &api.PublishRequest{
Topic: topic,
Event: protobuf.FromAny(evt),
}
if _, err := p.client.Publish(ctx, req); err != nil {
return errdefs.FromGRPC(err)
}
return nil
}
func (p *grpcEventsProxy) Forward(ctx context.Context, envelope *events.Envelope) error {
req := &api.ForwardRequest{
Envelope: &types.Envelope{
Timestamp: protobuf.ToTimestamp(envelope.Timestamp),
Namespace: envelope.Namespace,
Topic: envelope.Topic,
Event: protobuf.FromAny(envelope.Event),
},
}
if _, err := p.client.Forward(ctx, req); err != nil {
return errdefs.FromGRPC(err)
}
return nil
}
func (p *grpcEventsProxy) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) {
var (
evq = make(chan *events.Envelope)
errq = make(chan error, 1)
)
errs = errq
ch = evq
session, err := p.client.Subscribe(ctx, &api.SubscribeRequest{
Filters: filters,
})
if err != nil {
errq <- err
close(errq)
return
}
go func() {
defer close(errq)
for {
ev, err := session.Recv()
if err != nil {
errq <- err
return
}
select {
case evq <- &events.Envelope{
Timestamp: protobuf.FromTimestamp(ev.Timestamp),
Namespace: ev.Namespace,
Topic: ev.Topic,
Event: ev.Event,
}:
case <-ctx.Done():
if cerr := ctx.Err(); cerr != context.Canceled {
errq <- cerr
}
return
}
}
}()
return ch, errs
}
type ttrpcEventsProxy struct {
client api.TTRPCEventsClient
}
func (p *ttrpcEventsProxy) Publish(ctx context.Context, topic string, event events.Event) error {
evt, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
req := &api.PublishRequest{
Topic: topic,
Event: protobuf.FromAny(evt),
}
if _, err := p.client.Publish(ctx, req); err != nil {
return errdefs.FromGRPC(err)
}
return nil
}
func (p *ttrpcEventsProxy) Forward(ctx context.Context, envelope *events.Envelope) error {
req := &api.ForwardRequest{
Envelope: &types.Envelope{
Timestamp: protobuf.ToTimestamp(envelope.Timestamp),
Namespace: envelope.Namespace,
Topic: envelope.Topic,
Event: protobuf.FromAny(envelope.Event),
},
}
if _, err := p.client.Forward(ctx, req); err != nil {
return errdefs.FromGRPC(err)
}
return nil
}
func (p *ttrpcEventsProxy) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) {
var (
evq = make(chan *events.Envelope)
errq = make(chan error, 1)
)
errs = errq
ch = evq
session, err := p.client.Subscribe(ctx, &api.SubscribeRequest{
Filters: filters,
})
if err != nil {
errq <- err
close(errq)
return
}
go func() {
defer close(errq)
for {
ev, err := session.Recv()
if err != nil {
errq <- err
return
}
select {
case evq <- &events.Envelope{
Timestamp: protobuf.FromTimestamp(ev.Timestamp),
Namespace: ev.Namespace,
Topic: ev.Topic,
Event: ev.Event,
}:
case <-ctx.Done():
if cerr := ctx.Err(); cerr != context.Canceled {
errq <- cerr
}
return
}
}
}()
return ch, errs
}