shim: move event context timeout to publsher

Before this change, if an event fails to send on the first attempt,
subsequent attempts will fail with context.Cancelled because the the
caller of publish passes a cancellable timeout, which the publisher uses
to send the event.

The publisher returns immediately if the send fails, but adds the event
to an async queue to try again.
Meanwhile the caller will return cancelling the context.

Additionally, subsequent attempts may fail to send because the timeout
was expected to be for a single request but the queue sleeps for
`attempt*time.Second`.

In the shim service, the timeout was set to 5s, which means the send
will fail with context.DeadlineExceeded before it reaches `maxRequeue`
(which is currently 5).

This change moves the timeout to the publisher so each send attempt gets
its own timeout.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
Brian Goff 2020-07-20 13:04:07 -07:00
parent e818fe27ce
commit d7b9cb0019
2 changed files with 9 additions and 4 deletions

View File

@ -769,9 +769,7 @@ func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
ns, _ := namespaces.Namespace(ctx)
ctx = namespaces.WithNamespace(context.Background(), ns)
for e := range s.events {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
err := publisher.Publish(ctx, runc.GetTopic(e), e)
cancel()
if err != nil {
logrus.WithError(err).Error("post event")
}

View File

@ -130,7 +130,9 @@ func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event
func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
service, err := l.client.EventsService()
if err == nil {
_, err = service.Forward(ctx, req)
fCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
_, err = service.Forward(fCtx, req)
cancel()
if err == nil {
return nil
}
@ -149,7 +151,12 @@ func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.Forw
if err != nil {
return err
}
if _, err = service.Forward(ctx, req); err != nil {
// try again with a fresh context, otherwise we may get a context timeout unexpectedly.
fCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
_, err = service.Forward(fCtx, req)
cancel()
if err != nil {
return err
}