events: refactor event distribution

In the course of setting out to add filters and address some cleanup, it
was found that we had a few problems in the events subsystem that needed
addressing before moving forward.

The biggest change was to move to the more standard terminology of
publish and subscribe. We make this terminology change across the Go
interface and the GRPC API, making the behavior more familier. The
previous system was very context-oriented, which is no longer required.

With this, we've removed a large amount of dead and unneeded code. Event
transactions, context storage and the concept of `Poster` is gone. This
has been replaced in most places with a `Publisher`, which matches the
actual usage throughout the codebase, removing the need for helpers.

There are still some questions around the way events are handled in the
shim. Right now, we've preserved some of the existing bugs which may
require more extensive changes to resolve correctly.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day
2017-07-24 22:19:02 -07:00
parent a9ab45fb77
commit a615a6fe5d
30 changed files with 669 additions and 644 deletions

View File

@@ -1,67 +0,0 @@
package events
import (
"context"
"sync"
events "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/namespaces"
goevents "github.com/docker/go-events"
)
const (
EventVersion = "v1"
)
type Emitter struct {
sinks map[string]*eventSink
broadcaster *goevents.Broadcaster
m sync.Mutex
}
func NewEmitter() *Emitter {
return &Emitter{
sinks: make(map[string]*eventSink),
broadcaster: goevents.NewBroadcaster(),
m: sync.Mutex{},
}
}
func (e *Emitter) Post(ctx context.Context, evt Event) error {
if err := e.broadcaster.Write(&sinkEvent{
ctx: ctx,
event: evt,
}); err != nil {
return err
}
return nil
}
func (e *Emitter) Events(ctx context.Context, clientID string) chan *events.Envelope {
e.m.Lock()
if _, ok := e.sinks[clientID]; !ok {
ns, _ := namespaces.Namespace(ctx)
s := &eventSink{
ch: make(chan *events.Envelope),
ns: ns,
}
e.sinks[clientID] = s
e.m.Unlock()
e.broadcaster.Add(s)
return s.ch
}
ch := e.sinks[clientID].ch
e.m.Unlock()
return ch
}
func (e *Emitter) Remove(clientID string) {
e.m.Lock()
if v, ok := e.sinks[clientID]; ok {
e.broadcaster.Remove(v)
delete(e.sinks, clientID)
}
e.m.Unlock()
}

View File

@@ -1,3 +0,0 @@
package events
type Event interface{}

24
events/events.go Normal file
View File

@@ -0,0 +1,24 @@
package events
import (
"context"
events "github.com/containerd/containerd/api/services/events/v1"
)
type Event interface{}
// Publisher posts the event.
type Publisher interface {
Publish(ctx context.Context, topic string, event Event) error
}
type Forwarder interface {
Forward(ctx context.Context, envelope *events.Envelope) error
}
type publisherFunc func(ctx context.Context, topic string, event Event) error
func (fn publisherFunc) Publish(ctx context.Context, topic string, event Event) error {
return fn(ctx, topic, event)
}

View File

@@ -1,33 +0,0 @@
package events
import (
"context"
"fmt"
"testing"
)
func TestBasicEvent(t *testing.T) {
ctx := context.Background()
// simulate a layer pull with events
ctx, commit, _ := WithTx(ctx)
G(ctx).Post(ctx, "pull ubuntu")
for layer := 0; layer < 4; layer++ {
// make a subtransaction for each layer
ctx, commit, _ := WithTx(ctx)
G(ctx).Post(ctx, fmt.Sprintf("fetch layer %v", layer))
ctx = WithTopic(ctx, "content")
// simulate sub-operations with a separate topic, on the content store
G(ctx).Post(ctx, fmt.Sprint("received sha:256"))
G(ctx).Post(ctx, fmt.Sprintf("unpack layer %v", layer))
commit()
}
commit()
}

162
events/exchange.go Normal file
View File

@@ -0,0 +1,162 @@
package events
import (
"context"
"strings"
"time"
events "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/typeurl"
goevents "github.com/docker/go-events"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type Exchange struct {
broadcaster *goevents.Broadcaster
}
func NewExchange() *Exchange {
return &Exchange{
broadcaster: goevents.NewBroadcaster(),
}
}
// Forward accepts an envelope to be direcly distributed on the exchange.
func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) error {
log.G(ctx).WithFields(logrus.Fields{
"topic": envelope.Topic,
"ns": envelope.Namespace,
"type": envelope.Event.TypeUrl,
}).Debug("forward event")
if err := namespaces.Validate(envelope.Namespace); err != nil {
return errors.Wrapf(err, "event envelope has invalid namespace")
}
if err := validateTopic(envelope.Topic); err != nil {
return errors.Wrapf(err, "envelope topic %q", envelope.Topic)
}
return e.broadcaster.Write(envelope)
}
// Publish packages and sends an event. The caller will be considered the
// initial publisher of the event. This means the timestamp will be calculated
// at this point and this method may read from the calling context.
func (e *Exchange) Publish(ctx context.Context, topic string, event Event) error {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return errors.Wrapf(err, "failed publishing event")
}
if err := validateTopic(topic); err != nil {
return errors.Wrapf(err, "envelope topic %q", topic)
}
evany, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
env := events.Envelope{
Timestamp: time.Now().UTC(),
Topic: topic,
Event: evany,
}
if err := e.broadcaster.Write(&env); err != nil {
return err
}
log.G(ctx).WithFields(logrus.Fields{
"topic": topic,
"type": evany.TypeUrl,
"ns": namespace,
}).Debug("published event")
return nil
}
// Subscribe to events on the exchange. Events are sent through the returned
// channel ch. If an error is encountered, it will be sent on channel errs and
// errs will be closed. To end the subscription, cancel the provided context.
func (e *Exchange) Subscribe(ctx context.Context, filters ...filters.Filter) (ch <-chan *events.Envelope, errs <-chan error) {
var (
evch = make(chan *events.Envelope)
errq = make(chan error, 1)
channel = goevents.NewChannel(0)
queue = goevents.NewQueue(channel)
)
// TODO(stevvooe): Insert the filter!
e.broadcaster.Add(queue)
go func() {
defer close(errq)
defer e.broadcaster.Remove(queue)
defer queue.Close()
var err error
loop:
for {
select {
case ev := <-channel.C:
env, ok := ev.(*events.Envelope)
if !ok {
// TODO(stevvooe): For the most part, we are well protected
// from this condition. Both Forward and Publish protect
// from this.
err = errors.Errorf("invalid envelope encountered %#v; please file a bug", ev)
break
}
select {
case evch <- env:
case <-ctx.Done():
break loop
}
case <-ctx.Done():
break loop
}
}
if err == nil {
if cerr := ctx.Err(); cerr != context.Canceled {
err = cerr
}
}
errq <- err
}()
ch = evch
errs = errq
return
}
func validateTopic(topic string) error {
if topic == "" {
return errors.Wrap(errdefs.ErrInvalidArgument, "must not be empty")
}
if topic[0] != '/' {
return errors.Wrapf(errdefs.ErrInvalidArgument, "must start with '/'", topic)
}
if len(topic) == 1 {
return errors.Wrapf(errdefs.ErrInvalidArgument, "must have at least one component", topic)
}
components := strings.Split(topic[1:], "/")
for _, component := range components {
if err := identifiers.Validate(component); err != nil {
return errors.Wrapf(err, "failed validation on component %q", component)
}
}
return nil
}

149
events/exchange_test.go Normal file
View File

@@ -0,0 +1,149 @@
package events
import (
"context"
"fmt"
"reflect"
"sync"
"testing"
events "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/typeurl"
"github.com/pkg/errors"
)
func TestExchangeBasic(t *testing.T) {
ctx := namespaces.WithNamespace(context.Background(), t.Name())
testevents := []Event{
&events.ContainerCreate{ID: "asdf"},
&events.ContainerCreate{ID: "qwer"},
&events.ContainerCreate{ID: "zxcv"},
}
exchange := NewExchange()
t.Log("subscribe")
var cancel1, cancel2 func()
// Create two subscribers for same set of events and make sure they
// traverse the exchange.
ctx1, cancel1 := context.WithCancel(ctx)
eventq1, errq1 := exchange.Subscribe(ctx1)
ctx2, cancel2 := context.WithCancel(ctx)
eventq2, errq2 := exchange.Subscribe(ctx2)
t.Log("publish")
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for _, event := range testevents {
fmt.Println("publish", event)
if err := exchange.Publish(ctx, "/test", event); err != nil {
fmt.Println("publish error", err)
t.Fatal(err)
}
}
t.Log("finished publishing")
}()
t.Log("waiting")
wg.Wait()
for _, subscriber := range []struct {
eventq <-chan *events.Envelope
errq <-chan error
cancel func()
}{
{
eventq: eventq1,
errq: errq1,
cancel: cancel1,
},
{
eventq: eventq2,
errq: errq2,
cancel: cancel2,
},
} {
var received []Event
subscribercheck:
for {
select {
case env := <-subscriber.eventq:
ev, err := typeurl.UnmarshalAny(env.Event)
if err != nil {
t.Fatal(err)
}
received = append(received, ev.(*events.ContainerCreate))
case err := <-subscriber.errq:
if err != nil {
t.Fatal(err)
}
break subscribercheck
}
if reflect.DeepEqual(received, testevents) {
// when we do this, we expect the errs channel to be closed and
// this will return.
subscriber.cancel()
}
}
}
}
func TestExchangeValidateTopic(t *testing.T) {
namespace := t.Name()
ctx := namespaces.WithNamespace(context.Background(), namespace)
exchange := NewExchange()
for _, testcase := range []struct {
input string
err error
}{
{
input: "/test",
},
{
input: "/test/test",
},
{
input: "test",
err: errdefs.ErrInvalidArgument,
},
} {
t.Run(testcase.input, func(t *testing.T) {
event := &events.ContainerCreate{ID: t.Name()}
if err := exchange.Publish(ctx, testcase.input, event); errors.Cause(err) != testcase.err {
if err == nil {
t.Fatalf("expected error %v, received nil", testcase.err)
} else {
t.Fatalf("expected error %v, received %v", testcase.err, err)
}
}
evany, err := typeurl.MarshalAny(event)
if err != nil {
t.Fatal(err)
}
envelope := events.Envelope{
Namespace: namespace,
Topic: testcase.input,
Event: evany,
}
// make sure we get same errors with forward.
if err := exchange.Forward(ctx, &envelope); errors.Cause(err) != testcase.err {
if err == nil {
t.Fatalf("expected error %v, received nil", testcase.err)
} else {
t.Fatalf("expected error %v, received %v", testcase.err, err)
}
}
})
}
}

View File

@@ -1,65 +0,0 @@
package events
import (
"context"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/sirupsen/logrus"
)
var (
G = GetPoster
)
// Poster posts the event.
type Poster interface {
Post(ctx context.Context, evt Event) error
}
type posterKey struct{}
func WithPoster(ctx context.Context, poster Poster) context.Context {
return context.WithValue(ctx, posterKey{}, poster)
}
func GetPoster(ctx context.Context) Poster {
poster := ctx.Value(posterKey{})
if poster == nil {
tx, _ := getTx(ctx)
topic := getTopic(ctx)
// likely means we don't have a configured event system. Just return
// the default poster, which merely logs events.
return posterFunc(func(ctx context.Context, evt Event) error {
fields := logrus.Fields{"event": evt}
if topic != "" {
fields["topic"] = topic
}
ns, _ := namespaces.Namespace(ctx)
fields["ns"] = ns
if tx != nil {
fields["tx.id"] = tx.id
if tx.parent != nil {
fields["tx.parent.id"] = tx.parent.id
}
}
log.G(ctx).WithFields(fields).Debug("event fired")
return nil
})
}
return poster.(Poster)
}
type posterFunc func(ctx context.Context, evt Event) error
func (fn posterFunc) Post(ctx context.Context, evt Event) error {
fn(ctx, evt)
return nil
}

View File

@@ -1,65 +0,0 @@
package events
import (
"context"
"time"
"github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/typeurl"
goevents "github.com/docker/go-events"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type sinkEvent struct {
ctx context.Context
event Event
}
type eventSink struct {
ns string
ch chan *events.Envelope
}
func (s *eventSink) Write(evt goevents.Event) error {
e, ok := evt.(*sinkEvent)
if !ok {
return errors.New("event is not a sink event")
}
ns, _ := namespaces.Namespace(e.ctx)
if ns != "" && ns != s.ns {
// ignore events not intended for this ns
return nil
}
if ev, ok := e.event.(*events.Envelope); ok {
s.ch <- ev
return nil
}
topic := getTopic(e.ctx)
eventData, err := typeurl.MarshalAny(e.event)
if err != nil {
return err
}
log.G(e.ctx).WithFields(logrus.Fields{
"topic": topic,
"type": eventData.TypeUrl,
"ns": ns,
}).Debug("event")
s.ch <- &events.Envelope{
Timestamp: time.Now(),
Topic: topic,
Event: eventData,
}
return nil
}
func (s *eventSink) Close() error {
return nil
}

View File

@@ -1,35 +0,0 @@
package events
import "context"
type topicKey struct{}
// WithTopic returns a context with a new topic set, such that events emitted
// from the resulting context will be marked with the topic.
//
// A topic groups events by the target module they operate on. This is
// primarily designed to support multi-module log compaction of events. In
// typical journaling systems, the entries operate on a single data structure.
// When compacting the journal, we can replace all former log entries with a
// summary data structure that will result in the same state.
//
// By providing a compaction mechanism by topic, we can prune down to a data
// structure oriented towards a single topic, leaving unrelated messages alone.
func WithTopic(ctx context.Context, topic string) context.Context {
return context.WithValue(ctx, topicKey{}, topic)
}
func getTopic(ctx context.Context) string {
topic := ctx.Value(topicKey{})
if topic == nil {
return ""
}
return topic.(string)
}
// RegisterCompactor sets the compacter for the given topic.
func RegisterCompactor(topic string, compactor interface{}) {
panic("not implemented")
}

View File

@@ -1,96 +0,0 @@
package events
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
)
var txCounter int64 // replace this with something that won't break
// nextTXID provides the next transaction identifier.
func nexttxID() int64 {
// TODO(stevvooe): Need to coordinate this with existing transaction logs.
// For now, this is a toy, but not a racy one.
return atomic.AddInt64(&txCounter, 1)
}
type transaction struct {
ctx context.Context
id int64
parent *transaction // if nil, no parent transaction
finish sync.Once
start, end time.Time // informational
}
// begin creates a sub-transaction.
func (tx *transaction) begin(ctx context.Context, poster Poster) *transaction {
id := nexttxID()
child := &transaction{
ctx: ctx,
id: id,
parent: tx,
start: time.Now(),
}
// post the transaction started event
poster.Post(ctx, child.makeTransactionEvent("begin")) // transactions are really just events
return child
}
// commit writes out the transaction.
func (tx *transaction) commit(poster Poster) {
tx.finish.Do(func() {
tx.end = time.Now()
poster.Post(tx.ctx, tx.makeTransactionEvent("commit"))
})
}
func (tx *transaction) rollback(poster Poster, cause error) {
tx.finish.Do(func() {
tx.end = time.Now()
event := tx.makeTransactionEvent("rollback")
event = fmt.Sprintf("%s error=%q", event, cause.Error())
poster.Post(tx.ctx, event)
})
}
func (tx *transaction) makeTransactionEvent(status string) Event {
// TODO(stevvooe): obviously, we need more structure than this.
event := fmt.Sprintf("%v %v", status, tx.id)
if tx.parent != nil {
event += " parent=" + fmt.Sprint(tx.parent.id)
}
return event
}
type txKey struct{}
func getTx(ctx context.Context) (*transaction, bool) {
tx := ctx.Value(txKey{})
if tx == nil {
return nil, false
}
return tx.(*transaction), true
}
// WithTx returns a new context with an event transaction, such that events
// posted to the underlying context will be committed to the event log as a
// group, organized by a transaction id, when commit is called.
func WithTx(pctx context.Context) (ctx context.Context, commit func(), rollback func(err error)) {
poster := G(pctx)
parent, _ := getTx(pctx)
tx := parent.begin(pctx, poster)
return context.WithValue(pctx, txKey{}, tx), func() {
tx.commit(poster)
}, func(err error) {
tx.rollback(poster, err)
}
}