diff --git a/client/client.go b/client/client.go index b78030b71..a84376120 100644 --- a/client/client.go +++ b/client/client.go @@ -29,7 +29,6 @@ import ( containersapi "github.com/containerd/containerd/v2/api/services/containers/v1" contentapi "github.com/containerd/containerd/v2/api/services/content/v1" diffapi "github.com/containerd/containerd/v2/api/services/diff/v1" - eventsapi "github.com/containerd/containerd/v2/api/services/events/v1" imagesapi "github.com/containerd/containerd/v2/api/services/images/v1" introspectionapi "github.com/containerd/containerd/v2/api/services/introspection/v1" leasesapi "github.com/containerd/containerd/v2/api/services/leases/v1" @@ -43,6 +42,7 @@ import ( "github.com/containerd/containerd/v2/core/content" contentproxy "github.com/containerd/containerd/v2/core/content/proxy" "github.com/containerd/containerd/v2/core/events" + eventsproxy "github.com/containerd/containerd/v2/core/events/proxy" "github.com/containerd/containerd/v2/core/images" "github.com/containerd/containerd/v2/core/leases" leasesproxy "github.com/containerd/containerd/v2/core/leases/proxy" @@ -708,7 +708,7 @@ func (c *Client) EventService() EventService { } c.connMu.Lock() defer c.connMu.Unlock() - return NewEventServiceFromClient(eventsapi.NewEventsClient(c.conn)) + return eventsproxy.NewRemoteEvents(c.conn) } // SandboxStore returns the underlying sandbox store client diff --git a/core/events/proxy/remote_events.go b/core/events/proxy/remote_events.go new file mode 100644 index 000000000..f7c7f8721 --- /dev/null +++ b/core/events/proxy/remote_events.go @@ -0,0 +1,222 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package proxy + +import ( + "context" + "fmt" + + api "github.com/containerd/containerd/v2/api/services/events/v1" + "github.com/containerd/containerd/v2/api/types" + "github.com/containerd/containerd/v2/core/events" + "github.com/containerd/containerd/v2/protobuf" + "github.com/containerd/errdefs" + "github.com/containerd/ttrpc" + "github.com/containerd/typeurl/v2" + "google.golang.org/grpc" +) + +type EventService interface { + events.Publisher + events.Forwarder + events.Subscriber +} + +func NewRemoteEvents(client any) EventService { + switch c := client.(type) { + case api.EventsClient: + return &grpcEventsProxy{ + client: c, + } + case api.TTRPCEventsClient: + return &ttrpcEventsProxy{ + client: c, + } + case grpc.ClientConnInterface: + return &grpcEventsProxy{ + client: api.NewEventsClient(c), + } + case *ttrpc.Client: + return &ttrpcEventsProxy{ + client: api.NewTTRPCEventsClient(c), + } + default: + panic(fmt.Errorf("unsupported events client %T: %w", client, errdefs.ErrNotImplemented)) + } +} + +type grpcEventsProxy struct { + client api.EventsClient +} + +func (p *grpcEventsProxy) Publish(ctx context.Context, topic string, event events.Event) error { + evt, err := typeurl.MarshalAny(event) + if err != nil { + return err + } + req := &api.PublishRequest{ + Topic: topic, + Event: protobuf.FromAny(evt), + } + if _, err := p.client.Publish(ctx, req); err != nil { + return errdefs.FromGRPC(err) + } + return nil +} + +func (p *grpcEventsProxy) Forward(ctx context.Context, envelope *events.Envelope) error { + req := &api.ForwardRequest{ + Envelope: &types.Envelope{ + Timestamp: protobuf.ToTimestamp(envelope.Timestamp), + Namespace: envelope.Namespace, + Topic: envelope.Topic, + Event: protobuf.FromAny(envelope.Event), + }, + } + if _, err := p.client.Forward(ctx, req); err != nil { + return errdefs.FromGRPC(err) + } + return nil +} + +func (p *grpcEventsProxy) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) { + var ( + evq = make(chan *events.Envelope) + errq = make(chan error, 1) + ) + + errs = errq + ch = evq + + session, err := p.client.Subscribe(ctx, &api.SubscribeRequest{ + Filters: filters, + }) + if err != nil { + errq <- err + close(errq) + return + } + + go func() { + defer close(errq) + + for { + ev, err := session.Recv() + if err != nil { + errq <- err + return + } + + select { + case evq <- &events.Envelope{ + Timestamp: protobuf.FromTimestamp(ev.Timestamp), + Namespace: ev.Namespace, + Topic: ev.Topic, + Event: ev.Event, + }: + case <-ctx.Done(): + if cerr := ctx.Err(); cerr != context.Canceled { + errq <- cerr + } + return + } + } + }() + + return ch, errs +} + +type ttrpcEventsProxy struct { + client api.TTRPCEventsClient +} + +func (p *ttrpcEventsProxy) Publish(ctx context.Context, topic string, event events.Event) error { + evt, err := typeurl.MarshalAny(event) + if err != nil { + return err + } + req := &api.PublishRequest{ + Topic: topic, + Event: protobuf.FromAny(evt), + } + if _, err := p.client.Publish(ctx, req); err != nil { + return errdefs.FromGRPC(err) + } + return nil +} + +func (p *ttrpcEventsProxy) Forward(ctx context.Context, envelope *events.Envelope) error { + req := &api.ForwardRequest{ + Envelope: &types.Envelope{ + Timestamp: protobuf.ToTimestamp(envelope.Timestamp), + Namespace: envelope.Namespace, + Topic: envelope.Topic, + Event: protobuf.FromAny(envelope.Event), + }, + } + if _, err := p.client.Forward(ctx, req); err != nil { + return errdefs.FromGRPC(err) + } + return nil +} + +func (p *ttrpcEventsProxy) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) { + var ( + evq = make(chan *events.Envelope) + errq = make(chan error, 1) + ) + + errs = errq + ch = evq + + session, err := p.client.Subscribe(ctx, &api.SubscribeRequest{ + Filters: filters, + }) + if err != nil { + errq <- err + close(errq) + return + } + + go func() { + defer close(errq) + + for { + ev, err := session.Recv() + if err != nil { + errq <- err + return + } + + select { + case evq <- &events.Envelope{ + Timestamp: protobuf.FromTimestamp(ev.Timestamp), + Namespace: ev.Namespace, + Topic: ev.Topic, + Event: ev.Event, + }: + case <-ctx.Done(): + if cerr := ctx.Err(); cerr != context.Canceled { + errq <- cerr + } + return + } + } + }() + + return ch, errs +}