From 63c7a879b698c73a1e922e96f6fb8018a1975415 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 11 Apr 2019 15:41:56 -0400 Subject: [PATCH] Requeue events in the shim publisher Signed-off-by: Michael Crosby --- runtime/v2/shim/publisher.go | 80 +++++++++++++++++++++++++++++++----- 1 file changed, 69 insertions(+), 11 deletions(-) diff --git a/runtime/v2/shim/publisher.go b/runtime/v2/shim/publisher.go index 915d5cd4d..da83201e3 100644 --- a/runtime/v2/shim/publisher.go +++ b/runtime/v2/shim/publisher.go @@ -26,21 +26,37 @@ import ( "github.com/containerd/containerd/events" "github.com/containerd/containerd/namespaces" "github.com/containerd/typeurl" + "github.com/sirupsen/logrus" ) +const ( + queueSize = 2048 + maxRequeue = 5 +) + +type item struct { + ev *v1.Envelope + ctx context.Context + count int +} + func newPublisher(address string) *remoteEventsPublisher { - return &remoteEventsPublisher{ + l := &remoteEventsPublisher{ dialer: newDialier(func() (net.Conn, error) { return connect(address, dial) }), - closed: make(chan struct{}), + closed: make(chan struct{}), + requeue: make(chan *item, queueSize), } + go l.processQueue() + return l } type remoteEventsPublisher struct { - dialer *dialer - closed chan struct{} - closer sync.Once + dialer *dialer + closed chan struct{} + closer sync.Once + requeue chan *item } func (l *remoteEventsPublisher) Done() <-chan struct{} { @@ -55,11 +71,42 @@ func (l *remoteEventsPublisher) Close() (err error) { return err } -func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { - client, err := l.dialer.Get() - if err != nil { - return err +func (l *remoteEventsPublisher) processQueue() { + for i := range l.requeue { + if i.count > maxRequeue { + logrus.Errorf("evicting %s from queue because of retry count", i.ev.Topic) + // drop the event + continue + } + + client, err := l.dialer.Get() + if err != nil { + l.dialer.Put(err) + + l.queue(i) + logrus.WithError(err).Error("get events client") + continue + } + if _, err := client.Forward(i.ctx, &v1.ForwardRequest{ + Envelope: i.ev, + }); err != nil { + l.dialer.Put(err) + logrus.WithError(err).Error("forward event") + l.queue(i) + } } +} + +func (l *remoteEventsPublisher) queue(i *item) { + go func() { + i.count++ + // re-queue after a short delay + time.Sleep(time.Duration(1*i.count) * time.Second) + l.requeue <- i + }() +} + +func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return err @@ -68,15 +115,26 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event if err != nil { return err } - if _, err := client.Forward(ctx, &v1.ForwardRequest{ - Envelope: &v1.Envelope{ + i := &item{ + ev: &v1.Envelope{ Timestamp: time.Now(), Namespace: ns, Topic: topic, Event: any, }, + ctx: ctx, + } + client, err := l.dialer.Get() + if err != nil { + l.dialer.Put(err) + l.queue(i) + return err + } + if _, err := client.Forward(i.ctx, &v1.ForwardRequest{ + Envelope: i.ev, }); err != nil { l.dialer.Put(err) + l.queue(i) return err } return nil