Merge pull request #3207 from crosbymichael/ttrpc-deps

Requeue events in the ttrpc publisher
This commit is contained in:
Phil Estes 2019-04-12 17:54:12 +01:00 committed by GitHub
commit 906e8bc7dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 75 additions and 17 deletions

View File

@ -26,21 +26,37 @@ import (
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/typeurl"
"github.com/sirupsen/logrus"
)
const (
queueSize = 2048
maxRequeue = 5
)
type item struct {
ev *v1.Envelope
ctx context.Context
count int
}
func newPublisher(address string) *remoteEventsPublisher {
return &remoteEventsPublisher{
l := &remoteEventsPublisher{
dialer: newDialier(func() (net.Conn, error) {
return connect(address, dial)
}),
closed: make(chan struct{}),
requeue: make(chan *item, queueSize),
}
go l.processQueue()
return l
}
type remoteEventsPublisher struct {
dialer *dialer
closed chan struct{}
closer sync.Once
requeue chan *item
}
func (l *remoteEventsPublisher) Done() <-chan struct{} {
@ -55,11 +71,42 @@ func (l *remoteEventsPublisher) Close() (err error) {
return err
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
func (l *remoteEventsPublisher) processQueue() {
for i := range l.requeue {
if i.count > maxRequeue {
logrus.Errorf("evicting %s from queue because of retry count", i.ev.Topic)
// drop the event
continue
}
client, err := l.dialer.Get()
if err != nil {
return err
l.dialer.Put(err)
l.queue(i)
logrus.WithError(err).Error("get events client")
continue
}
if _, err := client.Forward(i.ctx, &v1.ForwardRequest{
Envelope: i.ev,
}); err != nil {
l.dialer.Put(err)
logrus.WithError(err).Error("forward event")
l.queue(i)
}
}
}
func (l *remoteEventsPublisher) queue(i *item) {
go func() {
i.count++
// re-queue after a short delay
time.Sleep(time.Duration(1*i.count) * time.Second)
l.requeue <- i
}()
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
@ -68,15 +115,26 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
if err != nil {
return err
}
if _, err := client.Forward(ctx, &v1.ForwardRequest{
Envelope: &v1.Envelope{
i := &item{
ev: &v1.Envelope{
Timestamp: time.Now(),
Namespace: ns,
Topic: topic,
Event: any,
},
ctx: ctx,
}
client, err := l.dialer.Get()
if err != nil {
l.dialer.Put(err)
l.queue(i)
return err
}
if _, err := client.Forward(i.ctx, &v1.ForwardRequest{
Envelope: i.ev,
}); err != nil {
l.dialer.Put(err)
l.queue(i)
return err
}
return nil

View File

@ -107,7 +107,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
}
var (
grpcServer = grpc.NewServer(serverOpts...)
hrpc = grpc.NewServer(tcpServerOpts...)
tcpServer = grpc.NewServer(tcpServerOpts...)
grpcServices []plugin.Service
tcpServices []plugin.TCPService
@ -115,7 +115,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
s = &Server{
grpcServer: grpcServer,
hrpc: hrpc,
tcpServer: tcpServer,
ttrpcServer: ttrpcServer,
events: exchange.NewExchange(),
config: config,
@ -199,7 +199,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
}
}
for _, service := range tcpServices {
if err := service.RegisterTCP(hrpc); err != nil {
if err := service.RegisterTCP(tcpServer); err != nil {
return nil, err
}
}
@ -210,7 +210,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
type Server struct {
grpcServer *grpc.Server
ttrpcServer *ttrpc.Server
hrpc *grpc.Server
tcpServer *grpc.Server
events *exchange.Exchange
config *srvconfig.Config
plugins []*plugin.Plugin
@ -243,8 +243,8 @@ func (s *Server) ServeMetrics(l net.Listener) error {
// ServeTCP allows services to serve over tcp
func (s *Server) ServeTCP(l net.Listener) error {
grpc_prometheus.Register(s.hrpc)
return trapClosedConnErr(s.hrpc.Serve(l))
grpc_prometheus.Register(s.tcpServer)
return trapClosedConnErr(s.tcpServer.Serve(l))
}
// ServeDebug provides a debug endpoint