By defining a concrete, non-protobuf type for the events interface, we can completely decouple it from the grpc packages that are expensive at runtime. This does requires some allocation cost for converting between types, but the saving for the size of the shim are worth it. Signed-off-by: Stephen J Day <stephen.day@docker.com>
236 lines
5.8 KiB
Go
236 lines
5.8 KiB
Go
package exchange
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/containerd/containerd/errdefs"
|
|
"github.com/containerd/containerd/events"
|
|
"github.com/containerd/containerd/filters"
|
|
"github.com/containerd/containerd/identifiers"
|
|
"github.com/containerd/containerd/log"
|
|
"github.com/containerd/containerd/namespaces"
|
|
"github.com/containerd/typeurl"
|
|
goevents "github.com/docker/go-events"
|
|
"github.com/gogo/protobuf/types"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// Exchange broadcasts events
|
|
type Exchange struct {
|
|
broadcaster *goevents.Broadcaster
|
|
}
|
|
|
|
// NewExchange returns a new event Exchange
|
|
func NewExchange() *Exchange {
|
|
return &Exchange{
|
|
broadcaster: goevents.NewBroadcaster(),
|
|
}
|
|
}
|
|
|
|
var _ events.Publisher = &Exchange{}
|
|
var _ events.Forwarder = &Exchange{}
|
|
var _ events.Subscriber = &Exchange{}
|
|
|
|
// Forward accepts an envelope to be direcly distributed on the exchange.
|
|
//
|
|
// This is useful when an event is forwaded on behalf of another namespace or
|
|
// when the event is propagated on behalf of another publisher.
|
|
func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) {
|
|
if err := validateEnvelope(envelope); err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
logger := log.G(ctx).WithFields(logrus.Fields{
|
|
"topic": envelope.Topic,
|
|
"ns": envelope.Namespace,
|
|
"type": envelope.Event.TypeUrl,
|
|
})
|
|
|
|
if err != nil {
|
|
logger.WithError(err).Error("error forwarding event")
|
|
} else {
|
|
logger.Debug("event forwarded")
|
|
}
|
|
}()
|
|
|
|
return e.broadcaster.Write(envelope)
|
|
}
|
|
|
|
// Publish packages and sends an event. The caller will be considered the
|
|
// initial publisher of the event. This means the timestamp will be calculated
|
|
// at this point and this method may read from the calling context.
|
|
func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event) (err error) {
|
|
var (
|
|
namespace string
|
|
encoded *types.Any
|
|
envelope events.Envelope
|
|
)
|
|
|
|
namespace, err = namespaces.NamespaceRequired(ctx)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed publishing event")
|
|
}
|
|
if err := validateTopic(topic); err != nil {
|
|
return errors.Wrapf(err, "envelope topic %q", topic)
|
|
}
|
|
|
|
encoded, err = typeurl.MarshalAny(event)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
envelope.Timestamp = time.Now().UTC()
|
|
envelope.Namespace = namespace
|
|
envelope.Topic = topic
|
|
envelope.Event = encoded
|
|
|
|
defer func() {
|
|
logger := log.G(ctx).WithFields(logrus.Fields{
|
|
"topic": envelope.Topic,
|
|
"ns": envelope.Namespace,
|
|
"type": envelope.Event.TypeUrl,
|
|
})
|
|
|
|
if err != nil {
|
|
logger.WithError(err).Error("error publishing event")
|
|
} else {
|
|
logger.Debug("event published")
|
|
}
|
|
}()
|
|
|
|
return e.broadcaster.Write(&envelope)
|
|
}
|
|
|
|
// Subscribe to events on the exchange. Events are sent through the returned
|
|
// channel ch. If an error is encountered, it will be sent on channel errs and
|
|
// errs will be closed. To end the subscription, cancel the provided context.
|
|
//
|
|
// Zero or more filters may be provided as strings. Only events that match
|
|
// *any* of the provided filters will be sent on the channel. The filters use
|
|
// the standard containerd filters package syntax.
|
|
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) {
|
|
var (
|
|
evch = make(chan *events.Envelope)
|
|
errq = make(chan error, 1)
|
|
channel = goevents.NewChannel(0)
|
|
queue = goevents.NewQueue(channel)
|
|
dst goevents.Sink = queue
|
|
)
|
|
|
|
closeAll := func() {
|
|
defer close(errq)
|
|
defer e.broadcaster.Remove(dst)
|
|
defer queue.Close()
|
|
defer channel.Close()
|
|
}
|
|
|
|
ch = evch
|
|
errs = errq
|
|
|
|
if len(fs) > 0 {
|
|
filter, err := filters.ParseAll(fs...)
|
|
if err != nil {
|
|
errq <- errors.Wrapf(err, "failed parsing subscription filters")
|
|
closeAll()
|
|
return
|
|
}
|
|
|
|
dst = goevents.NewFilter(queue, goevents.MatcherFunc(func(gev goevents.Event) bool {
|
|
return filter.Match(adapt(gev))
|
|
}))
|
|
}
|
|
|
|
e.broadcaster.Add(dst)
|
|
|
|
go func() {
|
|
defer closeAll()
|
|
|
|
var err error
|
|
loop:
|
|
for {
|
|
select {
|
|
case ev := <-channel.C:
|
|
env, ok := ev.(*events.Envelope)
|
|
if !ok {
|
|
// TODO(stevvooe): For the most part, we are well protected
|
|
// from this condition. Both Forward and Publish protect
|
|
// from this.
|
|
err = errors.Errorf("invalid envelope encountered %#v; please file a bug", ev)
|
|
break
|
|
}
|
|
|
|
select {
|
|
case evch <- env:
|
|
case <-ctx.Done():
|
|
break loop
|
|
}
|
|
case <-ctx.Done():
|
|
break loop
|
|
}
|
|
}
|
|
|
|
if err == nil {
|
|
if cerr := ctx.Err(); cerr != context.Canceled {
|
|
err = cerr
|
|
}
|
|
}
|
|
|
|
errq <- err
|
|
}()
|
|
|
|
return
|
|
}
|
|
|
|
func validateTopic(topic string) error {
|
|
if topic == "" {
|
|
return errors.Wrap(errdefs.ErrInvalidArgument, "must not be empty")
|
|
}
|
|
|
|
if topic[0] != '/' {
|
|
return errors.Wrapf(errdefs.ErrInvalidArgument, "must start with '/'")
|
|
}
|
|
|
|
if len(topic) == 1 {
|
|
return errors.Wrapf(errdefs.ErrInvalidArgument, "must have at least one component")
|
|
}
|
|
|
|
components := strings.Split(topic[1:], "/")
|
|
for _, component := range components {
|
|
if err := identifiers.Validate(component); err != nil {
|
|
return errors.Wrapf(err, "failed validation on component %q", component)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func validateEnvelope(envelope *events.Envelope) error {
|
|
if err := namespaces.Validate(envelope.Namespace); err != nil {
|
|
return errors.Wrapf(err, "event envelope has invalid namespace")
|
|
}
|
|
|
|
if err := validateTopic(envelope.Topic); err != nil {
|
|
return errors.Wrapf(err, "envelope topic %q", envelope.Topic)
|
|
}
|
|
|
|
if envelope.Timestamp.IsZero() {
|
|
return errors.Wrapf(errdefs.ErrInvalidArgument, "timestamp must be set on forwarded event")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func adapt(ev interface{}) filters.Adaptor {
|
|
if adaptor, ok := ev.(filters.Adaptor); ok {
|
|
return adaptor
|
|
}
|
|
|
|
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
|
|
return "", false
|
|
})
|
|
}
|