123 lines
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			123 lines
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
|    Copyright The containerd Authors.
 | |
| 
 | |
|    Licensed under the Apache License, Version 2.0 (the "License");
 | |
|    you may not use this file except in compliance with the License.
 | |
|    You may obtain a copy of the License at
 | |
| 
 | |
|        http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
|    Unless required by applicable law or agreed to in writing, software
 | |
|    distributed under the License is distributed on an "AS IS" BASIS,
 | |
|    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|    See the License for the specific language governing permissions and
 | |
|    limitations under the License.
 | |
| */
 | |
| 
 | |
| package containerd
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 
 | |
| 	eventsapi "github.com/containerd/containerd/api/services/events/v1"
 | |
| 	"github.com/containerd/containerd/errdefs"
 | |
| 	"github.com/containerd/containerd/events"
 | |
| 	"github.com/containerd/typeurl"
 | |
| )
 | |
| 
 | |
| // EventService handles the publish, forward and subscribe of events.
 | |
| type EventService interface {
 | |
| 	events.Publisher
 | |
| 	events.Forwarder
 | |
| 	events.Subscriber
 | |
| }
 | |
| 
 | |
| // NewEventServiceFromClient returns a new event service which communicates
 | |
| // over a GRPC connection.
 | |
| func NewEventServiceFromClient(client eventsapi.EventsClient) EventService {
 | |
| 	return &eventRemote{
 | |
| 		client: client,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| type eventRemote struct {
 | |
| 	client eventsapi.EventsClient
 | |
| }
 | |
| 
 | |
| func (e *eventRemote) Publish(ctx context.Context, topic string, event events.Event) error {
 | |
| 	any, err := typeurl.MarshalAny(event)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	req := &eventsapi.PublishRequest{
 | |
| 		Topic: topic,
 | |
| 		Event: any,
 | |
| 	}
 | |
| 	if _, err := e.client.Publish(ctx, req); err != nil {
 | |
| 		return errdefs.FromGRPC(err)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (e *eventRemote) Forward(ctx context.Context, envelope *events.Envelope) error {
 | |
| 	req := &eventsapi.ForwardRequest{
 | |
| 		Envelope: &eventsapi.Envelope{
 | |
| 			Timestamp: envelope.Timestamp,
 | |
| 			Namespace: envelope.Namespace,
 | |
| 			Topic:     envelope.Topic,
 | |
| 			Event:     envelope.Event,
 | |
| 		},
 | |
| 	}
 | |
| 	if _, err := e.client.Forward(ctx, req); err != nil {
 | |
| 		return errdefs.FromGRPC(err)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (e *eventRemote) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) {
 | |
| 	var (
 | |
| 		evq  = make(chan *events.Envelope)
 | |
| 		errq = make(chan error, 1)
 | |
| 	)
 | |
| 
 | |
| 	errs = errq
 | |
| 	ch = evq
 | |
| 
 | |
| 	session, err := e.client.Subscribe(ctx, &eventsapi.SubscribeRequest{
 | |
| 		Filters: filters,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		errq <- err
 | |
| 		close(errq)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	go func() {
 | |
| 		defer close(errq)
 | |
| 
 | |
| 		for {
 | |
| 			ev, err := session.Recv()
 | |
| 			if err != nil {
 | |
| 				errq <- err
 | |
| 				return
 | |
| 			}
 | |
| 
 | |
| 			select {
 | |
| 			case evq <- &events.Envelope{
 | |
| 				Timestamp: ev.Timestamp,
 | |
| 				Namespace: ev.Namespace,
 | |
| 				Topic:     ev.Topic,
 | |
| 				Event:     ev.Event,
 | |
| 			}:
 | |
| 			case <-ctx.Done():
 | |
| 				if cerr := ctx.Err(); cerr != context.Canceled {
 | |
| 					errq <- cerr
 | |
| 				}
 | |
| 				return
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	return ch, errs
 | |
| }
 | 
