Merge pull request #1776 from stevvooe/decouple-events-grpc

events: decouple events package from grpc
This commit is contained in:
Phil Estes 2017-11-17 11:44:46 -05:00 committed by GitHub
commit 2edc475818
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 45 additions and 21 deletions

View File

@ -2,10 +2,19 @@ package events
import ( import (
"context" "context"
"time"
events "github.com/containerd/containerd/api/services/events/v1" "github.com/gogo/protobuf/types"
) )
// Envelope provides the packaging for an event.
type Envelope struct {
Timestamp time.Time
Namespace string
Topic string
Event *types.Any
}
// Event is a generic interface for any type of event // Event is a generic interface for any type of event
type Event interface{} type Event interface{}
@ -16,16 +25,10 @@ type Publisher interface {
// Forwarder forwards an event to the underlying event bus // Forwarder forwards an event to the underlying event bus
type Forwarder interface { type Forwarder interface {
Forward(ctx context.Context, envelope *events.Envelope) error Forward(ctx context.Context, envelope *Envelope) error
}
type publisherFunc func(ctx context.Context, topic string, event Event) error
func (fn publisherFunc) Publish(ctx context.Context, topic string, event Event) error {
return fn(ctx, topic, event)
} }
// Subscriber allows callers to subscribe to events // Subscriber allows callers to subscribe to events
type Subscriber interface { type Subscriber interface {
Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) Subscribe(ctx context.Context, filters ...string) (ch <-chan *Envelope, errs <-chan error)
} }

View File

@ -5,7 +5,6 @@ import (
"strings" "strings"
"time" "time"
v1 "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events" "github.com/containerd/containerd/events"
"github.com/containerd/containerd/filters" "github.com/containerd/containerd/filters"
@ -31,11 +30,15 @@ func NewExchange() *Exchange {
} }
} }
var _ events.Publisher = &Exchange{}
var _ events.Forwarder = &Exchange{}
var _ events.Subscriber = &Exchange{}
// Forward accepts an envelope to be direcly distributed on the exchange. // Forward accepts an envelope to be direcly distributed on the exchange.
// //
// This is useful when an event is forwaded on behalf of another namespace or // This is useful when an event is forwaded on behalf of another namespace or
// when the event is propagated on behalf of another publisher. // when the event is propagated on behalf of another publisher.
func (e *Exchange) Forward(ctx context.Context, envelope *v1.Envelope) (err error) { func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) {
if err := validateEnvelope(envelope); err != nil { if err := validateEnvelope(envelope); err != nil {
return err return err
} }
@ -64,7 +67,7 @@ func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event
var ( var (
namespace string namespace string
encoded *types.Any encoded *types.Any
envelope v1.Envelope envelope events.Envelope
) )
namespace, err = namespaces.NamespaceRequired(ctx) namespace, err = namespaces.NamespaceRequired(ctx)
@ -109,9 +112,9 @@ func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event
// Zero or more filters may be provided as strings. Only events that match // Zero or more filters may be provided as strings. Only events that match
// *any* of the provided filters will be sent on the channel. The filters use // *any* of the provided filters will be sent on the channel. The filters use
// the standard containerd filters package syntax. // the standard containerd filters package syntax.
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *v1.Envelope, errs <-chan error) { func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) {
var ( var (
evch = make(chan *v1.Envelope) evch = make(chan *events.Envelope)
errq = make(chan error, 1) errq = make(chan error, 1)
channel = goevents.NewChannel(0) channel = goevents.NewChannel(0)
queue = goevents.NewQueue(channel) queue = goevents.NewQueue(channel)
@ -151,7 +154,7 @@ func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *v1.E
for { for {
select { select {
case ev := <-channel.C: case ev := <-channel.C:
env, ok := ev.(*v1.Envelope) env, ok := ev.(*events.Envelope)
if !ok { if !ok {
// TODO(stevvooe): For the most part, we are well protected // TODO(stevvooe): For the most part, we are well protected
// from this condition. Both Forward and Publish protect // from this condition. Both Forward and Publish protect
@ -205,7 +208,7 @@ func validateTopic(topic string) error {
return nil return nil
} }
func validateEnvelope(envelope *v1.Envelope) error { func validateEnvelope(envelope *events.Envelope) error {
if err := namespaces.Validate(envelope.Namespace); err != nil { if err := namespaces.Validate(envelope.Namespace); err != nil {
return errors.Wrapf(err, "event envelope has invalid namespace") return errors.Wrapf(err, "event envelope has invalid namespace")
} }

View File

@ -9,7 +9,6 @@ import (
"time" "time"
eventstypes "github.com/containerd/containerd/api/events" eventstypes "github.com/containerd/containerd/api/events"
v1 "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events" "github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
@ -57,7 +56,7 @@ func TestExchangeBasic(t *testing.T) {
wg.Wait() wg.Wait()
for _, subscriber := range []struct { for _, subscriber := range []struct {
eventq <-chan *v1.Envelope eventq <-chan *events.Envelope
errq <-chan error errq <-chan error
cancel func() cancel func()
}{ }{
@ -133,7 +132,7 @@ func TestExchangeValidateTopic(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
envelope := v1.Envelope{ envelope := events.Envelope{
Timestamp: time.Now().UTC(), Timestamp: time.Now().UTC(),
Namespace: namespace, Namespace: namespace,
Topic: testcase.input, Topic: testcase.input,

View File

@ -3,6 +3,7 @@ package events
import ( import (
api "github.com/containerd/containerd/api/services/events/v1" api "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
ptypes "github.com/gogo/protobuf/types" ptypes "github.com/gogo/protobuf/types"
@ -44,7 +45,7 @@ func (s *service) Publish(ctx context.Context, r *api.PublishRequest) (*ptypes.E
} }
func (s *service) Forward(ctx context.Context, r *api.ForwardRequest) (*ptypes.Empty, error) { func (s *service) Forward(ctx context.Context, r *api.ForwardRequest) (*ptypes.Empty, error) {
if err := s.events.Forward(ctx, r.Envelope); err != nil { if err := s.events.Forward(ctx, fromProto(r.Envelope)); err != nil {
return nil, errdefs.ToGRPC(err) return nil, errdefs.ToGRPC(err)
} }
@ -59,7 +60,7 @@ func (s *service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeS
for { for {
select { select {
case ev := <-eventq: case ev := <-eventq:
if err := srv.Send(ev); err != nil { if err := srv.Send(toProto(ev)); err != nil {
return errors.Wrapf(err, "failed sending event to subscriber") return errors.Wrapf(err, "failed sending event to subscriber")
} }
case err := <-errq: case err := <-errq:
@ -71,3 +72,21 @@ func (s *service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeS
} }
} }
} }
func toProto(env *events.Envelope) *api.Envelope {
return &api.Envelope{
Timestamp: env.Timestamp,
Namespace: env.Namespace,
Topic: env.Topic,
Event: env.Event,
}
}
func fromProto(env *api.Envelope) *events.Envelope {
return &events.Envelope{
Timestamp: env.Timestamp,
Namespace: env.Namespace,
Topic: env.Topic,
Event: env.Event,
}
}