This change further plumbs the components required for implementing event filters. Specifically, we now have the ability to filter on the `topic` and `namespace`. In the course of implementing this functionality, it was found that there were mismatches in the events API that created extra serialization round trips. A modification to `typeurl.MarshalAny` and a clear separation between publishing and forwarding allow us to avoid these serialization issues. Unfortunately, this has required a few tweaks to the GRPC API, so this is a breaking change. `Publish` and `Forward` have been clearly separated in the GRPC API. `Publish` honors the contextual namespace and performs timestamping while `Forward` simply validates and forwards. The behavior of `Subscribe` is to propagate events for all namespaces unless specifically filtered (and hence the relation to this particular change. The following is an example of using filters to monitor the task events generated while running the [bucketbench tool](https://github.com/estesp/bucketbench): ``` $ ctr events 'topic~=/tasks/.+,namespace==bb' ... 2017-07-28 22:19:51.78944874 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-6-8","pid":25889} 2017-07-28 22:19:51.791893688 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-4-8","pid":25882} 2017-07-28 22:19:51.792608389 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-2-9","pid":25860} 2017-07-28 22:19:51.793035217 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-5-6","pid":25869} 2017-07-28 22:19:51.802659622 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-0-7","pid":25877} 2017-07-28 22:19:51.805192898 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-3-6","pid":25856} 2017-07-28 22:19:51.832374931 +0000 UTC bb /tasks/exit {"container_id":"bb-ctr-8-6","id":"bb-ctr-8-6","pid":25864,"exited_at":"2017-07-28T22:19:51.832013043Z"} 2017-07-28 22:19:51.84001249 +0000 UTC bb /tasks/exit {"container_id":"bb-ctr-2-9","id":"bb-ctr-2-9","pid":25860,"exited_at":"2017-07-28T22:19:51.839717714Z"} 2017-07-28 22:19:51.840272635 +0000 UTC bb /tasks/exit {"container_id":"bb-ctr-7-6","id":"bb-ctr-7-6","pid":25855,"exited_at":"2017-07-28T22:19:51.839796335Z"} ... ``` In addition to the events changes, we now display the namespace origin of the event in the cli tool. This will be followed by a PR to add individual field filtering for the events API for each event type. Signed-off-by: Stephen J Day <stephen.day@docker.com>
247 lines
6.0 KiB
Go
247 lines
6.0 KiB
Go
package events
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"time"
|
|
|
|
events "github.com/containerd/containerd/api/services/events/v1"
|
|
"github.com/containerd/containerd/errdefs"
|
|
"github.com/containerd/containerd/filters"
|
|
"github.com/containerd/containerd/identifiers"
|
|
"github.com/containerd/containerd/log"
|
|
"github.com/containerd/containerd/namespaces"
|
|
"github.com/containerd/containerd/typeurl"
|
|
goevents "github.com/docker/go-events"
|
|
"github.com/gogo/protobuf/types"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type Exchange struct {
|
|
broadcaster *goevents.Broadcaster
|
|
}
|
|
|
|
func NewExchange() *Exchange {
|
|
return &Exchange{
|
|
broadcaster: goevents.NewBroadcaster(),
|
|
}
|
|
}
|
|
|
|
// Forward accepts an envelope to be direcly distributed on the exchange.
|
|
//
|
|
// This is useful when an event is forwaded on behalf of another namespace or
|
|
// when the event is propagated on behalf of another publisher.
|
|
func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) {
|
|
if err := validateEnvelope(envelope); err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
logger := log.G(ctx).WithFields(logrus.Fields{
|
|
"topic": envelope.Topic,
|
|
"ns": envelope.Namespace,
|
|
"type": envelope.Event.TypeUrl,
|
|
})
|
|
|
|
if err != nil {
|
|
logger.WithError(err).Error("error forwarding event")
|
|
} else {
|
|
logger.Debug("event forwarded")
|
|
}
|
|
}()
|
|
|
|
return e.broadcaster.Write(envelope)
|
|
}
|
|
|
|
// Publish packages and sends an event. The caller will be considered the
|
|
// initial publisher of the event. This means the timestamp will be calculated
|
|
// at this point and this method may read from the calling context.
|
|
func (e *Exchange) Publish(ctx context.Context, topic string, event Event) (err error) {
|
|
var (
|
|
namespace string
|
|
encoded *types.Any
|
|
envelope events.Envelope
|
|
)
|
|
|
|
namespace, err = namespaces.NamespaceRequired(ctx)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed publishing event")
|
|
}
|
|
if err := validateTopic(topic); err != nil {
|
|
return errors.Wrapf(err, "envelope topic %q", topic)
|
|
}
|
|
|
|
encoded, err = typeurl.MarshalAny(event)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
envelope.Timestamp = time.Now().UTC()
|
|
envelope.Namespace = namespace
|
|
envelope.Topic = topic
|
|
envelope.Event = encoded
|
|
|
|
defer func() {
|
|
logger := log.G(ctx).WithFields(logrus.Fields{
|
|
"topic": envelope.Topic,
|
|
"ns": envelope.Namespace,
|
|
"type": envelope.Event.TypeUrl,
|
|
})
|
|
|
|
if err != nil {
|
|
logger.WithError(err).Error("error publishing event")
|
|
} else {
|
|
logger.Debug("event published")
|
|
}
|
|
}()
|
|
|
|
return e.broadcaster.Write(&envelope)
|
|
}
|
|
|
|
// Subscribe to events on the exchange. Events are sent through the returned
|
|
// channel ch. If an error is encountered, it will be sent on channel errs and
|
|
// errs will be closed. To end the subscription, cancel the provided context.
|
|
//
|
|
// Zero or more filters may be provided as strings. Only events that match
|
|
// *any* of the provided filters will be sent on the channel. The filters use
|
|
// the standard containerd filters package syntax.
|
|
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) {
|
|
var (
|
|
evch = make(chan *events.Envelope)
|
|
errq = make(chan error, 1)
|
|
channel = goevents.NewChannel(0)
|
|
queue = goevents.NewQueue(channel)
|
|
dst goevents.Sink = queue
|
|
)
|
|
|
|
closeAll := func() {
|
|
defer close(errq)
|
|
defer e.broadcaster.Remove(dst)
|
|
defer queue.Close()
|
|
defer channel.Close()
|
|
}
|
|
|
|
ch = evch
|
|
errs = errq
|
|
|
|
if len(fs) > 0 {
|
|
filter, err := filters.ParseAll(fs...)
|
|
if err != nil {
|
|
errq <- errors.Wrapf(err, "failed parsing subscription filters")
|
|
closeAll()
|
|
return
|
|
}
|
|
|
|
dst = goevents.NewFilter(queue, goevents.MatcherFunc(func(gev goevents.Event) bool {
|
|
return filter.Match(adapt(gev))
|
|
}))
|
|
}
|
|
|
|
e.broadcaster.Add(dst)
|
|
|
|
go func() {
|
|
defer closeAll()
|
|
|
|
var err error
|
|
loop:
|
|
for {
|
|
select {
|
|
case ev := <-channel.C:
|
|
env, ok := ev.(*events.Envelope)
|
|
if !ok {
|
|
// TODO(stevvooe): For the most part, we are well protected
|
|
// from this condition. Both Forward and Publish protect
|
|
// from this.
|
|
err = errors.Errorf("invalid envelope encountered %#v; please file a bug", ev)
|
|
break
|
|
}
|
|
|
|
select {
|
|
case evch <- env:
|
|
case <-ctx.Done():
|
|
break loop
|
|
}
|
|
case <-ctx.Done():
|
|
break loop
|
|
}
|
|
}
|
|
|
|
if err == nil {
|
|
if cerr := ctx.Err(); cerr != context.Canceled {
|
|
err = cerr
|
|
}
|
|
}
|
|
|
|
errq <- err
|
|
}()
|
|
|
|
return
|
|
}
|
|
|
|
func validateTopic(topic string) error {
|
|
if topic == "" {
|
|
return errors.Wrap(errdefs.ErrInvalidArgument, "must not be empty")
|
|
}
|
|
|
|
if topic[0] != '/' {
|
|
return errors.Wrapf(errdefs.ErrInvalidArgument, "must start with '/'", topic)
|
|
}
|
|
|
|
if len(topic) == 1 {
|
|
return errors.Wrapf(errdefs.ErrInvalidArgument, "must have at least one component", topic)
|
|
}
|
|
|
|
components := strings.Split(topic[1:], "/")
|
|
for _, component := range components {
|
|
if err := identifiers.Validate(component); err != nil {
|
|
return errors.Wrapf(err, "failed validation on component %q", component)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func validateEnvelope(envelope *events.Envelope) error {
|
|
if err := namespaces.Validate(envelope.Namespace); err != nil {
|
|
return errors.Wrapf(err, "event envelope has invalid namespace")
|
|
}
|
|
|
|
if err := validateTopic(envelope.Topic); err != nil {
|
|
return errors.Wrapf(err, "envelope topic %q", envelope.Topic)
|
|
}
|
|
|
|
if envelope.Timestamp.IsZero() {
|
|
return errors.Wrapf(errdefs.ErrInvalidArgument, "timestamp must be set on forwarded event")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func adapt(ev interface{}) filters.Adaptor {
|
|
switch ev := ev.(type) {
|
|
case *events.Envelope:
|
|
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
|
|
if len(fieldpath) == 0 {
|
|
return "", false
|
|
}
|
|
|
|
switch fieldpath[0] {
|
|
case "namespace":
|
|
return ev.Namespace, len(ev.Namespace) > 0
|
|
case "topic":
|
|
return ev.Topic, len(ev.Topic) > 0
|
|
default:
|
|
// TODO(stevvooe): Handle event fields.
|
|
return "", false
|
|
}
|
|
})
|
|
case filters.Adaptor:
|
|
return ev
|
|
}
|
|
|
|
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
|
|
return "", false
|
|
})
|
|
}
|