containerd: export Subscribe method on client
Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
0baecaa7cf
commit
9255e752b3
45
client.go
45
client.go
@ -353,11 +353,50 @@ func (c *Client) ListImages(ctx context.Context) ([]Image, error) {
|
||||
return images, nil
|
||||
}
|
||||
|
||||
// Events returns an event subscription for the provided filters
|
||||
func (c *Client) Events(ctx context.Context, filters ...string) (eventsapi.Events_SubscribeClient, error) {
|
||||
return c.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{
|
||||
// Subscribe to events that match one or more of the provided filters.
|
||||
//
|
||||
// Callers should listen on both the envelope channel and errs channel. If the
|
||||
// errs channel returns nil or an error, the subscriber should terminate.
|
||||
//
|
||||
// To cancel shutdown reciept of events, cancel the provided context. The errs
|
||||
// channel will be closed and return a nil error.
|
||||
func (c *Client) Subscribe(ctx context.Context, filters ...string) (ch <-chan *eventsapi.Envelope, errs <-chan error) {
|
||||
var (
|
||||
evq = make(chan *eventsapi.Envelope)
|
||||
errq = make(chan error, 1)
|
||||
)
|
||||
|
||||
errs = errq
|
||||
ch = evq
|
||||
|
||||
session, err := c.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{
|
||||
Filters: filters,
|
||||
})
|
||||
if err != nil {
|
||||
errq <- err
|
||||
close(errq)
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer close(errq)
|
||||
|
||||
for {
|
||||
ev, err := session.Recv()
|
||||
if err != nil {
|
||||
errq <- err
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case evq <- ev:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return ch, errs
|
||||
}
|
||||
|
||||
// Close closes the clients connection to containerd
|
||||
|
@ -22,3 +22,7 @@ type publisherFunc func(ctx context.Context, topic string, event Event) error
|
||||
func (fn publisherFunc) Publish(ctx context.Context, topic string, event Event) error {
|
||||
return fn(ctx, topic, event)
|
||||
}
|
||||
|
||||
type Subscriber interface {
|
||||
Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user