Requeue events in the shim publisher

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2019-04-11 15:41:56 -04:00
parent 97d247cd0f
commit 63c7a879b6

View File

@ -26,21 +26,37 @@ import (
"github.com/containerd/containerd/events" "github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/typeurl" "github.com/containerd/typeurl"
"github.com/sirupsen/logrus"
) )
const (
queueSize = 2048
maxRequeue = 5
)
type item struct {
ev *v1.Envelope
ctx context.Context
count int
}
func newPublisher(address string) *remoteEventsPublisher { func newPublisher(address string) *remoteEventsPublisher {
return &remoteEventsPublisher{ l := &remoteEventsPublisher{
dialer: newDialier(func() (net.Conn, error) { dialer: newDialier(func() (net.Conn, error) {
return connect(address, dial) return connect(address, dial)
}), }),
closed: make(chan struct{}), closed: make(chan struct{}),
requeue: make(chan *item, queueSize),
} }
go l.processQueue()
return l
} }
type remoteEventsPublisher struct { type remoteEventsPublisher struct {
dialer *dialer dialer *dialer
closed chan struct{} closed chan struct{}
closer sync.Once closer sync.Once
requeue chan *item
} }
func (l *remoteEventsPublisher) Done() <-chan struct{} { func (l *remoteEventsPublisher) Done() <-chan struct{} {
@ -55,11 +71,42 @@ func (l *remoteEventsPublisher) Close() (err error) {
return err return err
} }
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { func (l *remoteEventsPublisher) processQueue() {
for i := range l.requeue {
if i.count > maxRequeue {
logrus.Errorf("evicting %s from queue because of retry count", i.ev.Topic)
// drop the event
continue
}
client, err := l.dialer.Get() client, err := l.dialer.Get()
if err != nil { if err != nil {
return err l.dialer.Put(err)
l.queue(i)
logrus.WithError(err).Error("get events client")
continue
} }
if _, err := client.Forward(i.ctx, &v1.ForwardRequest{
Envelope: i.ev,
}); err != nil {
l.dialer.Put(err)
logrus.WithError(err).Error("forward event")
l.queue(i)
}
}
}
func (l *remoteEventsPublisher) queue(i *item) {
go func() {
i.count++
// re-queue after a short delay
time.Sleep(time.Duration(1*i.count) * time.Second)
l.requeue <- i
}()
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return err return err
@ -68,15 +115,26 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
if err != nil { if err != nil {
return err return err
} }
if _, err := client.Forward(ctx, &v1.ForwardRequest{ i := &item{
Envelope: &v1.Envelope{ ev: &v1.Envelope{
Timestamp: time.Now(), Timestamp: time.Now(),
Namespace: ns, Namespace: ns,
Topic: topic, Topic: topic,
Event: any, Event: any,
}, },
ctx: ctx,
}
client, err := l.dialer.Get()
if err != nil {
l.dialer.Put(err)
l.queue(i)
return err
}
if _, err := client.Forward(i.ctx, &v1.ForwardRequest{
Envelope: i.ev,
}); err != nil { }); err != nil {
l.dialer.Put(err) l.dialer.Put(err)
l.queue(i)
return err return err
} }
return nil return nil