diff --git a/events/events.go b/events/events.go index efe2f598b..87b94c310 100644 --- a/events/events.go +++ b/events/events.go @@ -2,10 +2,19 @@ package events import ( "context" + "time" - events "github.com/containerd/containerd/api/services/events/v1" + "github.com/gogo/protobuf/types" ) +// Envelope provides the packaging for an event. +type Envelope struct { + Timestamp time.Time + Namespace string + Topic string + Event *types.Any +} + // Event is a generic interface for any type of event type Event interface{} @@ -16,16 +25,10 @@ type Publisher interface { // Forwarder forwards an event to the underlying event bus type Forwarder interface { - Forward(ctx context.Context, envelope *events.Envelope) error -} - -type publisherFunc func(ctx context.Context, topic string, event Event) error - -func (fn publisherFunc) Publish(ctx context.Context, topic string, event Event) error { - return fn(ctx, topic, event) + Forward(ctx context.Context, envelope *Envelope) error } // Subscriber allows callers to subscribe to events type Subscriber interface { - Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) + Subscribe(ctx context.Context, filters ...string) (ch <-chan *Envelope, errs <-chan error) } diff --git a/events/exchange/exchange.go b/events/exchange/exchange.go index 3fefb9c25..3178fc407 100644 --- a/events/exchange/exchange.go +++ b/events/exchange/exchange.go @@ -5,7 +5,6 @@ import ( "strings" "time" - v1 "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/containerd/filters" @@ -31,11 +30,15 @@ func NewExchange() *Exchange { } } +var _ events.Publisher = &Exchange{} +var _ events.Forwarder = &Exchange{} +var _ events.Subscriber = &Exchange{} + // Forward accepts an envelope to be direcly distributed on the exchange. // // This is useful when an event is forwaded on behalf of another namespace or // when the event is propagated on behalf of another publisher. -func (e *Exchange) Forward(ctx context.Context, envelope *v1.Envelope) (err error) { +func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) { if err := validateEnvelope(envelope); err != nil { return err } @@ -64,7 +67,7 @@ func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event var ( namespace string encoded *types.Any - envelope v1.Envelope + envelope events.Envelope ) namespace, err = namespaces.NamespaceRequired(ctx) @@ -109,9 +112,9 @@ func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event // Zero or more filters may be provided as strings. Only events that match // *any* of the provided filters will be sent on the channel. The filters use // the standard containerd filters package syntax. -func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *v1.Envelope, errs <-chan error) { +func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) { var ( - evch = make(chan *v1.Envelope) + evch = make(chan *events.Envelope) errq = make(chan error, 1) channel = goevents.NewChannel(0) queue = goevents.NewQueue(channel) @@ -151,7 +154,7 @@ func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *v1.E for { select { case ev := <-channel.C: - env, ok := ev.(*v1.Envelope) + env, ok := ev.(*events.Envelope) if !ok { // TODO(stevvooe): For the most part, we are well protected // from this condition. Both Forward and Publish protect @@ -205,7 +208,7 @@ func validateTopic(topic string) error { return nil } -func validateEnvelope(envelope *v1.Envelope) error { +func validateEnvelope(envelope *events.Envelope) error { if err := namespaces.Validate(envelope.Namespace); err != nil { return errors.Wrapf(err, "event envelope has invalid namespace") } diff --git a/events/exchange/exchange_test.go b/events/exchange/exchange_test.go index 485843d5c..36f9f28d6 100644 --- a/events/exchange/exchange_test.go +++ b/events/exchange/exchange_test.go @@ -9,7 +9,6 @@ import ( "time" eventstypes "github.com/containerd/containerd/api/events" - v1 "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/containerd/namespaces" @@ -57,7 +56,7 @@ func TestExchangeBasic(t *testing.T) { wg.Wait() for _, subscriber := range []struct { - eventq <-chan *v1.Envelope + eventq <-chan *events.Envelope errq <-chan error cancel func() }{ @@ -133,7 +132,7 @@ func TestExchangeValidateTopic(t *testing.T) { t.Fatal(err) } - envelope := v1.Envelope{ + envelope := events.Envelope{ Timestamp: time.Now().UTC(), Namespace: namespace, Topic: testcase.input, diff --git a/services/events/service.go b/services/events/service.go index 540f9af22..5f75ffacc 100644 --- a/services/events/service.go +++ b/services/events/service.go @@ -3,6 +3,7 @@ package events import ( api "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/plugin" ptypes "github.com/gogo/protobuf/types" @@ -44,7 +45,7 @@ func (s *service) Publish(ctx context.Context, r *api.PublishRequest) (*ptypes.E } func (s *service) Forward(ctx context.Context, r *api.ForwardRequest) (*ptypes.Empty, error) { - if err := s.events.Forward(ctx, r.Envelope); err != nil { + if err := s.events.Forward(ctx, fromProto(r.Envelope)); err != nil { return nil, errdefs.ToGRPC(err) } @@ -59,7 +60,7 @@ func (s *service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeS for { select { case ev := <-eventq: - if err := srv.Send(ev); err != nil { + if err := srv.Send(toProto(ev)); err != nil { return errors.Wrapf(err, "failed sending event to subscriber") } case err := <-errq: @@ -71,3 +72,21 @@ func (s *service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeS } } } + +func toProto(env *events.Envelope) *api.Envelope { + return &api.Envelope{ + Timestamp: env.Timestamp, + Namespace: env.Namespace, + Topic: env.Topic, + Event: env.Event, + } +} + +func fromProto(env *api.Envelope) *events.Envelope { + return &events.Envelope{ + Timestamp: env.Timestamp, + Namespace: env.Namespace, + Topic: env.Topic, + Event: env.Event, + } +}