From 2d05c4aa1a50cc5475e6c51b7eb336f90931a803 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Thu, 16 Nov 2017 17:52:17 -0800 Subject: [PATCH] events: decouple events package from grpc By defining a concrete, non-protobuf type for the events interface, we can completely decouple it from the grpc packages that are expensive at runtime. This does requires some allocation cost for converting between types, but the saving for the size of the shim are worth it. Signed-off-by: Stephen J Day --- events/events.go | 21 ++++++++++++--------- events/exchange/exchange.go | 17 ++++++++++------- events/exchange/exchange_test.go | 5 ++--- services/events/service.go | 23 +++++++++++++++++++++-- 4 files changed, 45 insertions(+), 21 deletions(-) diff --git a/events/events.go b/events/events.go index efe2f598b..87b94c310 100644 --- a/events/events.go +++ b/events/events.go @@ -2,10 +2,19 @@ package events import ( "context" + "time" - events "github.com/containerd/containerd/api/services/events/v1" + "github.com/gogo/protobuf/types" ) +// Envelope provides the packaging for an event. +type Envelope struct { + Timestamp time.Time + Namespace string + Topic string + Event *types.Any +} + // Event is a generic interface for any type of event type Event interface{} @@ -16,16 +25,10 @@ type Publisher interface { // Forwarder forwards an event to the underlying event bus type Forwarder interface { - Forward(ctx context.Context, envelope *events.Envelope) error -} - -type publisherFunc func(ctx context.Context, topic string, event Event) error - -func (fn publisherFunc) Publish(ctx context.Context, topic string, event Event) error { - return fn(ctx, topic, event) + Forward(ctx context.Context, envelope *Envelope) error } // Subscriber allows callers to subscribe to events type Subscriber interface { - Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) + Subscribe(ctx context.Context, filters ...string) (ch <-chan *Envelope, errs <-chan error) } diff --git a/events/exchange/exchange.go b/events/exchange/exchange.go index 3fefb9c25..3178fc407 100644 --- a/events/exchange/exchange.go +++ b/events/exchange/exchange.go @@ -5,7 +5,6 @@ import ( "strings" "time" - v1 "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/containerd/filters" @@ -31,11 +30,15 @@ func NewExchange() *Exchange { } } +var _ events.Publisher = &Exchange{} +var _ events.Forwarder = &Exchange{} +var _ events.Subscriber = &Exchange{} + // Forward accepts an envelope to be direcly distributed on the exchange. // // This is useful when an event is forwaded on behalf of another namespace or // when the event is propagated on behalf of another publisher. -func (e *Exchange) Forward(ctx context.Context, envelope *v1.Envelope) (err error) { +func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) { if err := validateEnvelope(envelope); err != nil { return err } @@ -64,7 +67,7 @@ func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event var ( namespace string encoded *types.Any - envelope v1.Envelope + envelope events.Envelope ) namespace, err = namespaces.NamespaceRequired(ctx) @@ -109,9 +112,9 @@ func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event // Zero or more filters may be provided as strings. Only events that match // *any* of the provided filters will be sent on the channel. The filters use // the standard containerd filters package syntax. -func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *v1.Envelope, errs <-chan error) { +func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) { var ( - evch = make(chan *v1.Envelope) + evch = make(chan *events.Envelope) errq = make(chan error, 1) channel = goevents.NewChannel(0) queue = goevents.NewQueue(channel) @@ -151,7 +154,7 @@ func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *v1.E for { select { case ev := <-channel.C: - env, ok := ev.(*v1.Envelope) + env, ok := ev.(*events.Envelope) if !ok { // TODO(stevvooe): For the most part, we are well protected // from this condition. Both Forward and Publish protect @@ -205,7 +208,7 @@ func validateTopic(topic string) error { return nil } -func validateEnvelope(envelope *v1.Envelope) error { +func validateEnvelope(envelope *events.Envelope) error { if err := namespaces.Validate(envelope.Namespace); err != nil { return errors.Wrapf(err, "event envelope has invalid namespace") } diff --git a/events/exchange/exchange_test.go b/events/exchange/exchange_test.go index 485843d5c..36f9f28d6 100644 --- a/events/exchange/exchange_test.go +++ b/events/exchange/exchange_test.go @@ -9,7 +9,6 @@ import ( "time" eventstypes "github.com/containerd/containerd/api/events" - v1 "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/containerd/namespaces" @@ -57,7 +56,7 @@ func TestExchangeBasic(t *testing.T) { wg.Wait() for _, subscriber := range []struct { - eventq <-chan *v1.Envelope + eventq <-chan *events.Envelope errq <-chan error cancel func() }{ @@ -133,7 +132,7 @@ func TestExchangeValidateTopic(t *testing.T) { t.Fatal(err) } - envelope := v1.Envelope{ + envelope := events.Envelope{ Timestamp: time.Now().UTC(), Namespace: namespace, Topic: testcase.input, diff --git a/services/events/service.go b/services/events/service.go index 540f9af22..5f75ffacc 100644 --- a/services/events/service.go +++ b/services/events/service.go @@ -3,6 +3,7 @@ package events import ( api "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/plugin" ptypes "github.com/gogo/protobuf/types" @@ -44,7 +45,7 @@ func (s *service) Publish(ctx context.Context, r *api.PublishRequest) (*ptypes.E } func (s *service) Forward(ctx context.Context, r *api.ForwardRequest) (*ptypes.Empty, error) { - if err := s.events.Forward(ctx, r.Envelope); err != nil { + if err := s.events.Forward(ctx, fromProto(r.Envelope)); err != nil { return nil, errdefs.ToGRPC(err) } @@ -59,7 +60,7 @@ func (s *service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeS for { select { case ev := <-eventq: - if err := srv.Send(ev); err != nil { + if err := srv.Send(toProto(ev)); err != nil { return errors.Wrapf(err, "failed sending event to subscriber") } case err := <-errq: @@ -71,3 +72,21 @@ func (s *service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeS } } } + +func toProto(env *events.Envelope) *api.Envelope { + return &api.Envelope{ + Timestamp: env.Timestamp, + Namespace: env.Namespace, + Topic: env.Topic, + Event: env.Event, + } +} + +func fromProto(env *api.Envelope) *events.Envelope { + return &events.Envelope{ + Timestamp: env.Timestamp, + Namespace: env.Namespace, + Topic: env.Topic, + Event: env.Event, + } +}