diff --git a/client.go b/client.go index bc46f8632..ddf66bc8f 100644 --- a/client.go +++ b/client.go @@ -353,11 +353,50 @@ func (c *Client) ListImages(ctx context.Context) ([]Image, error) { return images, nil } -// Events returns an event subscription for the provided filters -func (c *Client) Events(ctx context.Context, filters ...string) (eventsapi.Events_SubscribeClient, error) { - return c.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{ +// Subscribe to events that match one or more of the provided filters. +// +// Callers should listen on both the envelope channel and errs channel. If the +// errs channel returns nil or an error, the subscriber should terminate. +// +// To cancel shutdown reciept of events, cancel the provided context. The errs +// channel will be closed and return a nil error. +func (c *Client) Subscribe(ctx context.Context, filters ...string) (ch <-chan *eventsapi.Envelope, errs <-chan error) { + var ( + evq = make(chan *eventsapi.Envelope) + errq = make(chan error, 1) + ) + + errs = errq + ch = evq + + session, err := c.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{ Filters: filters, }) + if err != nil { + errq <- err + close(errq) + return + } + + go func() { + defer close(errq) + + for { + ev, err := session.Recv() + if err != nil { + errq <- err + return + } + + select { + case evq <- ev: + case <-ctx.Done(): + return + } + } + }() + + return ch, errs } // Close closes the clients connection to containerd diff --git a/events/events.go b/events/events.go index d00c9e578..1ae0fc880 100644 --- a/events/events.go +++ b/events/events.go @@ -22,3 +22,7 @@ type publisherFunc func(ctx context.Context, topic string, event Event) error func (fn publisherFunc) Publish(ctx context.Context, topic string, event Event) error { return fn(ctx, topic, event) } + +type Subscriber interface { + Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) +}