events: decouple events package from grpc
By defining a concrete, non-protobuf type for the events interface, we can completely decouple it from the grpc packages that are expensive at runtime. This does requires some allocation cost for converting between types, but the saving for the size of the shim are worth it. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
5c6e789dde
commit
2d05c4aa1a
@ -2,10 +2,19 @@ package events
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
events "github.com/containerd/containerd/api/services/events/v1"
|
||||
"github.com/gogo/protobuf/types"
|
||||
)
|
||||
|
||||
// Envelope provides the packaging for an event.
|
||||
type Envelope struct {
|
||||
Timestamp time.Time
|
||||
Namespace string
|
||||
Topic string
|
||||
Event *types.Any
|
||||
}
|
||||
|
||||
// Event is a generic interface for any type of event
|
||||
type Event interface{}
|
||||
|
||||
@ -16,16 +25,10 @@ type Publisher interface {
|
||||
|
||||
// Forwarder forwards an event to the underlying event bus
|
||||
type Forwarder interface {
|
||||
Forward(ctx context.Context, envelope *events.Envelope) error
|
||||
}
|
||||
|
||||
type publisherFunc func(ctx context.Context, topic string, event Event) error
|
||||
|
||||
func (fn publisherFunc) Publish(ctx context.Context, topic string, event Event) error {
|
||||
return fn(ctx, topic, event)
|
||||
Forward(ctx context.Context, envelope *Envelope) error
|
||||
}
|
||||
|
||||
// Subscriber allows callers to subscribe to events
|
||||
type Subscriber interface {
|
||||
Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error)
|
||||
Subscribe(ctx context.Context, filters ...string) (ch <-chan *Envelope, errs <-chan error)
|
||||
}
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
v1 "github.com/containerd/containerd/api/services/events/v1"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/events"
|
||||
"github.com/containerd/containerd/filters"
|
||||
@ -31,11 +30,15 @@ func NewExchange() *Exchange {
|
||||
}
|
||||
}
|
||||
|
||||
var _ events.Publisher = &Exchange{}
|
||||
var _ events.Forwarder = &Exchange{}
|
||||
var _ events.Subscriber = &Exchange{}
|
||||
|
||||
// Forward accepts an envelope to be direcly distributed on the exchange.
|
||||
//
|
||||
// This is useful when an event is forwaded on behalf of another namespace or
|
||||
// when the event is propagated on behalf of another publisher.
|
||||
func (e *Exchange) Forward(ctx context.Context, envelope *v1.Envelope) (err error) {
|
||||
func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) {
|
||||
if err := validateEnvelope(envelope); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -64,7 +67,7 @@ func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event
|
||||
var (
|
||||
namespace string
|
||||
encoded *types.Any
|
||||
envelope v1.Envelope
|
||||
envelope events.Envelope
|
||||
)
|
||||
|
||||
namespace, err = namespaces.NamespaceRequired(ctx)
|
||||
@ -109,9 +112,9 @@ func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event
|
||||
// Zero or more filters may be provided as strings. Only events that match
|
||||
// *any* of the provided filters will be sent on the channel. The filters use
|
||||
// the standard containerd filters package syntax.
|
||||
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *v1.Envelope, errs <-chan error) {
|
||||
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) {
|
||||
var (
|
||||
evch = make(chan *v1.Envelope)
|
||||
evch = make(chan *events.Envelope)
|
||||
errq = make(chan error, 1)
|
||||
channel = goevents.NewChannel(0)
|
||||
queue = goevents.NewQueue(channel)
|
||||
@ -151,7 +154,7 @@ func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *v1.E
|
||||
for {
|
||||
select {
|
||||
case ev := <-channel.C:
|
||||
env, ok := ev.(*v1.Envelope)
|
||||
env, ok := ev.(*events.Envelope)
|
||||
if !ok {
|
||||
// TODO(stevvooe): For the most part, we are well protected
|
||||
// from this condition. Both Forward and Publish protect
|
||||
@ -205,7 +208,7 @@ func validateTopic(topic string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateEnvelope(envelope *v1.Envelope) error {
|
||||
func validateEnvelope(envelope *events.Envelope) error {
|
||||
if err := namespaces.Validate(envelope.Namespace); err != nil {
|
||||
return errors.Wrapf(err, "event envelope has invalid namespace")
|
||||
}
|
||||
|
@ -9,7 +9,6 @@ import (
|
||||
"time"
|
||||
|
||||
eventstypes "github.com/containerd/containerd/api/events"
|
||||
v1 "github.com/containerd/containerd/api/services/events/v1"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/events"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
@ -57,7 +56,7 @@ func TestExchangeBasic(t *testing.T) {
|
||||
wg.Wait()
|
||||
|
||||
for _, subscriber := range []struct {
|
||||
eventq <-chan *v1.Envelope
|
||||
eventq <-chan *events.Envelope
|
||||
errq <-chan error
|
||||
cancel func()
|
||||
}{
|
||||
@ -133,7 +132,7 @@ func TestExchangeValidateTopic(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
envelope := v1.Envelope{
|
||||
envelope := events.Envelope{
|
||||
Timestamp: time.Now().UTC(),
|
||||
Namespace: namespace,
|
||||
Topic: testcase.input,
|
||||
|
@ -3,6 +3,7 @@ package events
|
||||
import (
|
||||
api "github.com/containerd/containerd/api/services/events/v1"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/events"
|
||||
"github.com/containerd/containerd/events/exchange"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
ptypes "github.com/gogo/protobuf/types"
|
||||
@ -44,7 +45,7 @@ func (s *service) Publish(ctx context.Context, r *api.PublishRequest) (*ptypes.E
|
||||
}
|
||||
|
||||
func (s *service) Forward(ctx context.Context, r *api.ForwardRequest) (*ptypes.Empty, error) {
|
||||
if err := s.events.Forward(ctx, r.Envelope); err != nil {
|
||||
if err := s.events.Forward(ctx, fromProto(r.Envelope)); err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
@ -59,7 +60,7 @@ func (s *service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeS
|
||||
for {
|
||||
select {
|
||||
case ev := <-eventq:
|
||||
if err := srv.Send(ev); err != nil {
|
||||
if err := srv.Send(toProto(ev)); err != nil {
|
||||
return errors.Wrapf(err, "failed sending event to subscriber")
|
||||
}
|
||||
case err := <-errq:
|
||||
@ -71,3 +72,21 @@ func (s *service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeS
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func toProto(env *events.Envelope) *api.Envelope {
|
||||
return &api.Envelope{
|
||||
Timestamp: env.Timestamp,
|
||||
Namespace: env.Namespace,
|
||||
Topic: env.Topic,
|
||||
Event: env.Event,
|
||||
}
|
||||
}
|
||||
|
||||
func fromProto(env *api.Envelope) *events.Envelope {
|
||||
return &events.Envelope{
|
||||
Timestamp: env.Timestamp,
|
||||
Namespace: env.Namespace,
|
||||
Topic: env.Topic,
|
||||
Event: env.Event,
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user