events: initial support for filters

This change further plumbs the components required for implementing
event filters. Specifically, we now have the ability to filter on the
`topic` and `namespace`.

In the course of implementing this functionality, it was found that
there were mismatches in the events API that created extra serialization
round trips. A modification to `typeurl.MarshalAny` and a clear
separation between publishing and forwarding allow us to avoid these
serialization issues.

Unfortunately, this has required a few tweaks to the GRPC API, so this
is a breaking change. `Publish` and `Forward` have been clearly separated in
the GRPC API. `Publish` honors the contextual namespace and performs
timestamping while `Forward` simply validates and forwards. The behavior
of `Subscribe` is to propagate events for all namespaces unless
specifically filtered (and hence the relation to this particular change.

The following is an example of using filters to monitor the task events
generated while running the [bucketbench tool](https://github.com/estesp/bucketbench):

```
$ ctr events 'topic~=/tasks/.+,namespace==bb'
...
2017-07-28 22:19:51.78944874 +0000 UTC   bb        /tasks/start   {"container_id":"bb-ctr-6-8","pid":25889}
2017-07-28 22:19:51.791893688 +0000 UTC   bb        /tasks/start   {"container_id":"bb-ctr-4-8","pid":25882}
2017-07-28 22:19:51.792608389 +0000 UTC   bb        /tasks/start   {"container_id":"bb-ctr-2-9","pid":25860}
2017-07-28 22:19:51.793035217 +0000 UTC   bb        /tasks/start   {"container_id":"bb-ctr-5-6","pid":25869}
2017-07-28 22:19:51.802659622 +0000 UTC   bb        /tasks/start   {"container_id":"bb-ctr-0-7","pid":25877}
2017-07-28 22:19:51.805192898 +0000 UTC   bb        /tasks/start   {"container_id":"bb-ctr-3-6","pid":25856}
2017-07-28 22:19:51.832374931 +0000 UTC   bb        /tasks/exit   {"container_id":"bb-ctr-8-6","id":"bb-ctr-8-6","pid":25864,"exited_at":"2017-07-28T22:19:51.832013043Z"}
2017-07-28 22:19:51.84001249 +0000 UTC   bb        /tasks/exit   {"container_id":"bb-ctr-2-9","id":"bb-ctr-2-9","pid":25860,"exited_at":"2017-07-28T22:19:51.839717714Z"}
2017-07-28 22:19:51.840272635 +0000 UTC   bb        /tasks/exit   {"container_id":"bb-ctr-7-6","id":"bb-ctr-7-6","pid":25855,"exited_at":"2017-07-28T22:19:51.839796335Z"}
...
```

In addition to the events changes, we now display the namespace origin
of the event in the cli tool.

This will be followed by a PR to add individual field filtering for the
events API for each event type.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day
2017-07-28 15:21:55 -07:00
parent ce07fa04ac
commit af2d7f0e55
14 changed files with 652 additions and 274 deletions

View File

@@ -13,6 +13,7 @@ import (
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/typeurl"
goevents "github.com/docker/go-events"
"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@@ -28,20 +29,27 @@ func NewExchange() *Exchange {
}
// Forward accepts an envelope to be direcly distributed on the exchange.
func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) error {
log.G(ctx).WithFields(logrus.Fields{
"topic": envelope.Topic,
"ns": envelope.Namespace,
"type": envelope.Event.TypeUrl,
}).Debug("forward event")
if err := namespaces.Validate(envelope.Namespace); err != nil {
return errors.Wrapf(err, "event envelope has invalid namespace")
//
// This is useful when an event is forwaded on behalf of another namespace or
// when the event is propagated on behalf of another publisher.
func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) {
if err := validateEnvelope(envelope); err != nil {
return err
}
if err := validateTopic(envelope.Topic); err != nil {
return errors.Wrapf(err, "envelope topic %q", envelope.Topic)
}
defer func() {
logger := log.G(ctx).WithFields(logrus.Fields{
"topic": envelope.Topic,
"ns": envelope.Namespace,
"type": envelope.Event.TypeUrl,
})
if err != nil {
logger.WithError(err).Error("error forwarding event")
} else {
logger.Debug("event forwarded")
}
}()
return e.broadcaster.Write(envelope)
}
@@ -49,8 +57,14 @@ func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) error
// Publish packages and sends an event. The caller will be considered the
// initial publisher of the event. This means the timestamp will be calculated
// at this point and this method may read from the calling context.
func (e *Exchange) Publish(ctx context.Context, topic string, event Event) error {
namespace, err := namespaces.NamespaceRequired(ctx)
func (e *Exchange) Publish(ctx context.Context, topic string, event Event) (err error) {
var (
namespace string
encoded *types.Any
envelope events.Envelope
)
namespace, err = namespaces.NamespaceRequired(ctx)
if err != nil {
return errors.Wrapf(err, "failed publishing event")
}
@@ -58,47 +72,76 @@ func (e *Exchange) Publish(ctx context.Context, topic string, event Event) error
return errors.Wrapf(err, "envelope topic %q", topic)
}
evany, err := typeurl.MarshalAny(event)
encoded, err = typeurl.MarshalAny(event)
if err != nil {
return err
}
env := events.Envelope{
Timestamp: time.Now().UTC(),
Topic: topic,
Event: evany,
}
if err := e.broadcaster.Write(&env); err != nil {
return err
}
log.G(ctx).WithFields(logrus.Fields{
"topic": topic,
"type": evany.TypeUrl,
"ns": namespace,
}).Debug("published event")
return nil
envelope.Timestamp = time.Now().UTC()
envelope.Namespace = namespace
envelope.Topic = topic
envelope.Event = encoded
defer func() {
logger := log.G(ctx).WithFields(logrus.Fields{
"topic": envelope.Topic,
"ns": envelope.Namespace,
"type": envelope.Event.TypeUrl,
})
if err != nil {
logger.WithError(err).Error("error publishing event")
} else {
logger.Debug("event published")
}
}()
return e.broadcaster.Write(&envelope)
}
// Subscribe to events on the exchange. Events are sent through the returned
// channel ch. If an error is encountered, it will be sent on channel errs and
// errs will be closed. To end the subscription, cancel the provided context.
func (e *Exchange) Subscribe(ctx context.Context, filters ...filters.Filter) (ch <-chan *events.Envelope, errs <-chan error) {
//
// Zero or more filters may be provided as strings. Only events that match
// *any* of the provided filters will be sent on the channel. The filters use
// the standard containerd filters package syntax.
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) {
var (
evch = make(chan *events.Envelope)
errq = make(chan error, 1)
channel = goevents.NewChannel(0)
queue = goevents.NewQueue(channel)
evch = make(chan *events.Envelope)
errq = make(chan error, 1)
channel = goevents.NewChannel(0)
queue = goevents.NewQueue(channel)
dst goevents.Sink = queue
)
// TODO(stevvooe): Insert the filter!
e.broadcaster.Add(queue)
go func() {
closeAll := func() {
defer close(errq)
defer e.broadcaster.Remove(queue)
defer e.broadcaster.Remove(dst)
defer queue.Close()
defer channel.Close()
}
ch = evch
errs = errq
if len(fs) > 0 {
filter, err := filters.ParseAll(fs...)
if err != nil {
errq <- errors.Wrapf(err, "failed parsing subscription filters")
closeAll()
return
}
dst = goevents.NewFilter(queue, goevents.MatcherFunc(func(gev goevents.Event) bool {
return filter.Match(adapt(gev))
}))
}
e.broadcaster.Add(dst)
go func() {
defer closeAll()
var err error
loop:
@@ -133,9 +176,6 @@ func (e *Exchange) Subscribe(ctx context.Context, filters ...filters.Filter) (ch
errq <- err
}()
ch = evch
errs = errq
return
}
@@ -161,3 +201,46 @@ func validateTopic(topic string) error {
return nil
}
func validateEnvelope(envelope *events.Envelope) error {
if err := namespaces.Validate(envelope.Namespace); err != nil {
return errors.Wrapf(err, "event envelope has invalid namespace")
}
if err := validateTopic(envelope.Topic); err != nil {
return errors.Wrapf(err, "envelope topic %q", envelope.Topic)
}
if envelope.Timestamp.IsZero() {
return errors.Wrapf(errdefs.ErrInvalidArgument, "timestamp must be set on forwarded event")
}
return nil
}
func adapt(ev interface{}) filters.Adaptor {
switch ev := ev.(type) {
case *events.Envelope:
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
if len(fieldpath) == 0 {
return "", false
}
switch fieldpath[0] {
case "namespace":
return ev.Namespace, len(ev.Namespace) > 0
case "topic":
return ev.Topic, len(ev.Topic) > 0
default:
// TODO(stevvooe): Handle event fields.
return "", false
}
})
case filters.Adaptor:
return ev
}
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
return "", false
})
}

View File

@@ -6,6 +6,7 @@ import (
"reflect"
"sync"
"testing"
"time"
events "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/errdefs"
@@ -131,10 +132,12 @@ func TestExchangeValidateTopic(t *testing.T) {
}
envelope := events.Envelope{
Timestamp: time.Now().UTC(),
Namespace: namespace,
Topic: testcase.input,
Event: evany,
}
// make sure we get same errors with forward.
if err := exchange.Forward(ctx, &envelope); errors.Cause(err) != testcase.err {
if err == nil {