From 97d247cd0f63e7eaeb4145ad6d5c56ddcb8b6245 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 11 Apr 2019 15:06:06 -0400 Subject: [PATCH 1/2] Rename `hrpc` to `tcpServer` Signed-off-by: Michael Crosby --- services/server/server.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/services/server/server.go b/services/server/server.go index c5f30d924..ed4e8b9f5 100644 --- a/services/server/server.go +++ b/services/server/server.go @@ -107,7 +107,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { } var ( grpcServer = grpc.NewServer(serverOpts...) - hrpc = grpc.NewServer(tcpServerOpts...) + tcpServer = grpc.NewServer(tcpServerOpts...) grpcServices []plugin.Service tcpServices []plugin.TCPService @@ -115,7 +115,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { s = &Server{ grpcServer: grpcServer, - hrpc: hrpc, + tcpServer: tcpServer, ttrpcServer: ttrpcServer, events: exchange.NewExchange(), config: config, @@ -199,7 +199,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { } } for _, service := range tcpServices { - if err := service.RegisterTCP(hrpc); err != nil { + if err := service.RegisterTCP(tcpServer); err != nil { return nil, err } } @@ -210,7 +210,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { type Server struct { grpcServer *grpc.Server ttrpcServer *ttrpc.Server - hrpc *grpc.Server + tcpServer *grpc.Server events *exchange.Exchange config *srvconfig.Config plugins []*plugin.Plugin @@ -243,8 +243,8 @@ func (s *Server) ServeMetrics(l net.Listener) error { // ServeTCP allows services to serve over tcp func (s *Server) ServeTCP(l net.Listener) error { - grpc_prometheus.Register(s.hrpc) - return trapClosedConnErr(s.hrpc.Serve(l)) + grpc_prometheus.Register(s.tcpServer) + return trapClosedConnErr(s.tcpServer.Serve(l)) } // ServeDebug provides a debug endpoint From 63c7a879b698c73a1e922e96f6fb8018a1975415 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 11 Apr 2019 15:41:56 -0400 Subject: [PATCH 2/2] Requeue events in the shim publisher Signed-off-by: Michael Crosby --- runtime/v2/shim/publisher.go | 80 +++++++++++++++++++++++++++++++----- 1 file changed, 69 insertions(+), 11 deletions(-) diff --git a/runtime/v2/shim/publisher.go b/runtime/v2/shim/publisher.go index 915d5cd4d..da83201e3 100644 --- a/runtime/v2/shim/publisher.go +++ b/runtime/v2/shim/publisher.go @@ -26,21 +26,37 @@ import ( "github.com/containerd/containerd/events" "github.com/containerd/containerd/namespaces" "github.com/containerd/typeurl" + "github.com/sirupsen/logrus" ) +const ( + queueSize = 2048 + maxRequeue = 5 +) + +type item struct { + ev *v1.Envelope + ctx context.Context + count int +} + func newPublisher(address string) *remoteEventsPublisher { - return &remoteEventsPublisher{ + l := &remoteEventsPublisher{ dialer: newDialier(func() (net.Conn, error) { return connect(address, dial) }), - closed: make(chan struct{}), + closed: make(chan struct{}), + requeue: make(chan *item, queueSize), } + go l.processQueue() + return l } type remoteEventsPublisher struct { - dialer *dialer - closed chan struct{} - closer sync.Once + dialer *dialer + closed chan struct{} + closer sync.Once + requeue chan *item } func (l *remoteEventsPublisher) Done() <-chan struct{} { @@ -55,11 +71,42 @@ func (l *remoteEventsPublisher) Close() (err error) { return err } -func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { - client, err := l.dialer.Get() - if err != nil { - return err +func (l *remoteEventsPublisher) processQueue() { + for i := range l.requeue { + if i.count > maxRequeue { + logrus.Errorf("evicting %s from queue because of retry count", i.ev.Topic) + // drop the event + continue + } + + client, err := l.dialer.Get() + if err != nil { + l.dialer.Put(err) + + l.queue(i) + logrus.WithError(err).Error("get events client") + continue + } + if _, err := client.Forward(i.ctx, &v1.ForwardRequest{ + Envelope: i.ev, + }); err != nil { + l.dialer.Put(err) + logrus.WithError(err).Error("forward event") + l.queue(i) + } } +} + +func (l *remoteEventsPublisher) queue(i *item) { + go func() { + i.count++ + // re-queue after a short delay + time.Sleep(time.Duration(1*i.count) * time.Second) + l.requeue <- i + }() +} + +func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return err @@ -68,15 +115,26 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event if err != nil { return err } - if _, err := client.Forward(ctx, &v1.ForwardRequest{ - Envelope: &v1.Envelope{ + i := &item{ + ev: &v1.Envelope{ Timestamp: time.Now(), Namespace: ns, Topic: topic, Event: any, }, + ctx: ctx, + } + client, err := l.dialer.Get() + if err != nil { + l.dialer.Put(err) + l.queue(i) + return err + } + if _, err := client.Forward(i.ctx, &v1.ForwardRequest{ + Envelope: i.ev, }); err != nil { l.dialer.Put(err) + l.queue(i) return err } return nil