Move events exchange into subpackage
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
232
events/exchange/exchange.go
Normal file
232
events/exchange/exchange.go
Normal file
@@ -0,0 +1,232 @@
|
||||
package exchange
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
v1 "github.com/containerd/containerd/api/services/events/v1"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/events"
|
||||
"github.com/containerd/containerd/filters"
|
||||
"github.com/containerd/containerd/identifiers"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/typeurl"
|
||||
goevents "github.com/docker/go-events"
|
||||
"github.com/gogo/protobuf/types"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Exchange broadcasts events
|
||||
type Exchange struct {
|
||||
broadcaster *goevents.Broadcaster
|
||||
}
|
||||
|
||||
// NewExchange returns a new event Exchange
|
||||
func NewExchange() *Exchange {
|
||||
return &Exchange{
|
||||
broadcaster: goevents.NewBroadcaster(),
|
||||
}
|
||||
}
|
||||
|
||||
// Forward accepts an envelope to be direcly distributed on the exchange.
|
||||
//
|
||||
// This is useful when an event is forwaded on behalf of another namespace or
|
||||
// when the event is propagated on behalf of another publisher.
|
||||
func (e *Exchange) Forward(ctx context.Context, envelope *v1.Envelope) (err error) {
|
||||
if err := validateEnvelope(envelope); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
logger := log.G(ctx).WithFields(logrus.Fields{
|
||||
"topic": envelope.Topic,
|
||||
"ns": envelope.Namespace,
|
||||
"type": envelope.Event.TypeUrl,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("error forwarding event")
|
||||
} else {
|
||||
logger.Debug("event forwarded")
|
||||
}
|
||||
}()
|
||||
|
||||
return e.broadcaster.Write(envelope)
|
||||
}
|
||||
|
||||
// Publish packages and sends an event. The caller will be considered the
|
||||
// initial publisher of the event. This means the timestamp will be calculated
|
||||
// at this point and this method may read from the calling context.
|
||||
func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event) (err error) {
|
||||
var (
|
||||
namespace string
|
||||
encoded *types.Any
|
||||
envelope v1.Envelope
|
||||
)
|
||||
|
||||
namespace, err = namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed publishing event")
|
||||
}
|
||||
if err := validateTopic(topic); err != nil {
|
||||
return errors.Wrapf(err, "envelope topic %q", topic)
|
||||
}
|
||||
|
||||
encoded, err = typeurl.MarshalAny(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
envelope.Timestamp = time.Now().UTC()
|
||||
envelope.Namespace = namespace
|
||||
envelope.Topic = topic
|
||||
envelope.Event = encoded
|
||||
|
||||
defer func() {
|
||||
logger := log.G(ctx).WithFields(logrus.Fields{
|
||||
"topic": envelope.Topic,
|
||||
"ns": envelope.Namespace,
|
||||
"type": envelope.Event.TypeUrl,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("error publishing event")
|
||||
} else {
|
||||
logger.Debug("event published")
|
||||
}
|
||||
}()
|
||||
|
||||
return e.broadcaster.Write(&envelope)
|
||||
}
|
||||
|
||||
// Subscribe to events on the exchange. Events are sent through the returned
|
||||
// channel ch. If an error is encountered, it will be sent on channel errs and
|
||||
// errs will be closed. To end the subscription, cancel the provided context.
|
||||
//
|
||||
// Zero or more filters may be provided as strings. Only events that match
|
||||
// *any* of the provided filters will be sent on the channel. The filters use
|
||||
// the standard containerd filters package syntax.
|
||||
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *v1.Envelope, errs <-chan error) {
|
||||
var (
|
||||
evch = make(chan *v1.Envelope)
|
||||
errq = make(chan error, 1)
|
||||
channel = goevents.NewChannel(0)
|
||||
queue = goevents.NewQueue(channel)
|
||||
dst goevents.Sink = queue
|
||||
)
|
||||
|
||||
closeAll := func() {
|
||||
defer close(errq)
|
||||
defer e.broadcaster.Remove(dst)
|
||||
defer queue.Close()
|
||||
defer channel.Close()
|
||||
}
|
||||
|
||||
ch = evch
|
||||
errs = errq
|
||||
|
||||
if len(fs) > 0 {
|
||||
filter, err := filters.ParseAll(fs...)
|
||||
if err != nil {
|
||||
errq <- errors.Wrapf(err, "failed parsing subscription filters")
|
||||
closeAll()
|
||||
return
|
||||
}
|
||||
|
||||
dst = goevents.NewFilter(queue, goevents.MatcherFunc(func(gev goevents.Event) bool {
|
||||
return filter.Match(adapt(gev))
|
||||
}))
|
||||
}
|
||||
|
||||
e.broadcaster.Add(dst)
|
||||
|
||||
go func() {
|
||||
defer closeAll()
|
||||
|
||||
var err error
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case ev := <-channel.C:
|
||||
env, ok := ev.(*v1.Envelope)
|
||||
if !ok {
|
||||
// TODO(stevvooe): For the most part, we are well protected
|
||||
// from this condition. Both Forward and Publish protect
|
||||
// from this.
|
||||
err = errors.Errorf("invalid envelope encountered %#v; please file a bug", ev)
|
||||
break
|
||||
}
|
||||
|
||||
select {
|
||||
case evch <- env:
|
||||
case <-ctx.Done():
|
||||
break loop
|
||||
}
|
||||
case <-ctx.Done():
|
||||
break loop
|
||||
}
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
if cerr := ctx.Err(); cerr != context.Canceled {
|
||||
err = cerr
|
||||
}
|
||||
}
|
||||
|
||||
errq <- err
|
||||
}()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func validateTopic(topic string) error {
|
||||
if topic == "" {
|
||||
return errors.Wrap(errdefs.ErrInvalidArgument, "must not be empty")
|
||||
}
|
||||
|
||||
if topic[0] != '/' {
|
||||
return errors.Wrapf(errdefs.ErrInvalidArgument, "must start with '/'")
|
||||
}
|
||||
|
||||
if len(topic) == 1 {
|
||||
return errors.Wrapf(errdefs.ErrInvalidArgument, "must have at least one component")
|
||||
}
|
||||
|
||||
components := strings.Split(topic[1:], "/")
|
||||
for _, component := range components {
|
||||
if err := identifiers.Validate(component); err != nil {
|
||||
return errors.Wrapf(err, "failed validation on component %q", component)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateEnvelope(envelope *v1.Envelope) error {
|
||||
if err := namespaces.Validate(envelope.Namespace); err != nil {
|
||||
return errors.Wrapf(err, "event envelope has invalid namespace")
|
||||
}
|
||||
|
||||
if err := validateTopic(envelope.Topic); err != nil {
|
||||
return errors.Wrapf(err, "envelope topic %q", envelope.Topic)
|
||||
}
|
||||
|
||||
if envelope.Timestamp.IsZero() {
|
||||
return errors.Wrapf(errdefs.ErrInvalidArgument, "timestamp must be set on forwarded event")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func adapt(ev interface{}) filters.Adaptor {
|
||||
if adaptor, ok := ev.(filters.Adaptor); ok {
|
||||
return adaptor
|
||||
}
|
||||
|
||||
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
|
||||
return "", false
|
||||
})
|
||||
}
|
||||
153
events/exchange/exchange_test.go
Normal file
153
events/exchange/exchange_test.go
Normal file
@@ -0,0 +1,153 @@
|
||||
package exchange
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
v1 "github.com/containerd/containerd/api/services/events/v1"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/events"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/typeurl"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func TestExchangeBasic(t *testing.T) {
|
||||
ctx := namespaces.WithNamespace(context.Background(), t.Name())
|
||||
testevents := []events.Event{
|
||||
&v1.ContainerCreate{ID: "asdf"},
|
||||
&v1.ContainerCreate{ID: "qwer"},
|
||||
&v1.ContainerCreate{ID: "zxcv"},
|
||||
}
|
||||
exchange := NewExchange()
|
||||
|
||||
t.Log("subscribe")
|
||||
var cancel1, cancel2 func()
|
||||
|
||||
// Create two subscribers for same set of events and make sure they
|
||||
// traverse the exchange.
|
||||
ctx1, cancel1 := context.WithCancel(ctx)
|
||||
eventq1, errq1 := exchange.Subscribe(ctx1)
|
||||
|
||||
ctx2, cancel2 := context.WithCancel(ctx)
|
||||
eventq2, errq2 := exchange.Subscribe(ctx2)
|
||||
|
||||
t.Log("publish")
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for _, event := range testevents {
|
||||
fmt.Println("publish", event)
|
||||
if err := exchange.Publish(ctx, "/test", event); err != nil {
|
||||
fmt.Println("publish error", err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
t.Log("finished publishing")
|
||||
}()
|
||||
|
||||
t.Log("waiting")
|
||||
wg.Wait()
|
||||
|
||||
for _, subscriber := range []struct {
|
||||
eventq <-chan *v1.Envelope
|
||||
errq <-chan error
|
||||
cancel func()
|
||||
}{
|
||||
{
|
||||
eventq: eventq1,
|
||||
errq: errq1,
|
||||
cancel: cancel1,
|
||||
},
|
||||
{
|
||||
eventq: eventq2,
|
||||
errq: errq2,
|
||||
cancel: cancel2,
|
||||
},
|
||||
} {
|
||||
var received []Event
|
||||
subscribercheck:
|
||||
for {
|
||||
select {
|
||||
case env := <-subscriber.eventq:
|
||||
ev, err := typeurl.UnmarshalAny(env.Event)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
received = append(received, ev.(*v1.ContainerCreate))
|
||||
case err := <-subscriber.errq:
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
break subscribercheck
|
||||
}
|
||||
|
||||
if reflect.DeepEqual(received, testevents) {
|
||||
// when we do this, we expect the errs channel to be closed and
|
||||
// this will return.
|
||||
subscriber.cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestExchangeValidateTopic(t *testing.T) {
|
||||
namespace := t.Name()
|
||||
ctx := namespaces.WithNamespace(context.Background(), namespace)
|
||||
exchange := NewExchange()
|
||||
|
||||
for _, testcase := range []struct {
|
||||
input string
|
||||
err error
|
||||
}{
|
||||
{
|
||||
input: "/test",
|
||||
},
|
||||
{
|
||||
input: "/test/test",
|
||||
},
|
||||
{
|
||||
input: "test",
|
||||
err: errdefs.ErrInvalidArgument,
|
||||
},
|
||||
} {
|
||||
t.Run(testcase.input, func(t *testing.T) {
|
||||
event := &v1.ContainerCreate{ID: t.Name()}
|
||||
if err := exchange.Publish(ctx, testcase.input, event); errors.Cause(err) != testcase.err {
|
||||
if err == nil {
|
||||
t.Fatalf("expected error %v, received nil", testcase.err)
|
||||
} else {
|
||||
t.Fatalf("expected error %v, received %v", testcase.err, err)
|
||||
}
|
||||
}
|
||||
|
||||
evany, err := typeurl.MarshalAny(event)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
envelope := v1.Envelope{
|
||||
Timestamp: time.Now().UTC(),
|
||||
Namespace: namespace,
|
||||
Topic: testcase.input,
|
||||
Event: evany,
|
||||
}
|
||||
|
||||
// make sure we get same errors with forward.
|
||||
if err := exchange.Forward(ctx, &envelope); errors.Cause(err) != testcase.err {
|
||||
if err == nil {
|
||||
t.Fatalf("expected error %v, received nil", testcase.err)
|
||||
} else {
|
||||
t.Fatalf("expected error %v, received %v", testcase.err, err)
|
||||
}
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user