Merge pull request #1246 from stevvooe/events-refactor

events: refactor event distribution
This commit is contained in:
Michael Crosby
2017-07-26 08:31:12 -04:00
committed by GitHub
30 changed files with 669 additions and 644 deletions

View File

@@ -24,23 +24,22 @@ func init() {
plugin.MetadataPlugin,
},
Init: func(ic *plugin.InitContext) (interface{}, error) {
e := events.GetPoster(ic.Context)
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
return NewService(m.(*bolt.DB), e), nil
return NewService(m.(*bolt.DB), ic.Events), nil
},
})
}
type Service struct {
db *bolt.DB
emitter events.Poster
db *bolt.DB
publisher events.Publisher
}
func NewService(db *bolt.DB, evts events.Poster) api.ContainersServer {
return &Service{db: db, emitter: evts}
func NewService(db *bolt.DB, publisher events.Publisher) api.ContainersServer {
return &Service{db: db, publisher: publisher}
}
func (s *Service) Register(server *grpc.Server) error {
@@ -94,7 +93,7 @@ func (s *Service) Create(ctx context.Context, req *api.CreateContainerRequest) (
}); err != nil {
return &resp, errdefs.ToGRPC(err)
}
if err := s.emit(ctx, "/containers/create", &eventsapi.ContainerCreate{
if err := s.publisher.Publish(ctx, "/containers/create", &eventsapi.ContainerCreate{
ID: resp.Container.ID,
Image: resp.Container.Image,
Runtime: &eventsapi.ContainerCreate_Runtime{
@@ -136,7 +135,7 @@ func (s *Service) Update(ctx context.Context, req *api.UpdateContainerRequest) (
return &resp, errdefs.ToGRPC(err)
}
if err := s.emit(ctx, "/containers/update", &eventsapi.ContainerUpdate{
if err := s.publisher.Publish(ctx, "/containers/update", &eventsapi.ContainerUpdate{
ID: resp.Container.ID,
Image: resp.Container.Image,
Labels: resp.Container.Labels,
@@ -155,7 +154,7 @@ func (s *Service) Delete(ctx context.Context, req *api.DeleteContainerRequest) (
return &empty.Empty{}, errdefs.ToGRPC(err)
}
if err := s.emit(ctx, "/containers/delete", &eventsapi.ContainerDelete{
if err := s.publisher.Publish(ctx, "/containers/delete", &eventsapi.ContainerDelete{
ID: req.ID,
}); err != nil {
return &empty.Empty{}, err
@@ -175,12 +174,3 @@ func (s *Service) withStoreView(ctx context.Context, fn func(ctx context.Context
func (s *Service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error {
return s.db.Update(s.withStore(ctx, fn))
}
func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error {
emitterCtx := events.WithTopic(ctx, topic)
if err := s.emitter.Post(emitterCtx, evt); err != nil {
return err
}
return nil
}

View File

@@ -23,8 +23,8 @@ import (
)
type Service struct {
store content.Store
emitter events.Poster
store content.Store
publisher events.Publisher
}
var bufPool = sync.Pool{
@@ -58,8 +58,8 @@ func NewService(ic *plugin.InitContext) (interface{}, error) {
}
cs := metadata.NewContentStore(m.(*bolt.DB), c.(content.Store))
return &Service{
store: cs,
emitter: events.GetPoster(ic.Context),
store: cs,
publisher: ic.Events,
}, nil
}
@@ -149,7 +149,7 @@ func (s *Service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*e
return nil, errdefs.ToGRPC(err)
}
if err := s.emit(ctx, "/content/delete", &eventsapi.ContentDelete{
if err := s.publisher.Publish(ctx, "/content/delete", &eventsapi.ContentDelete{
Digest: req.Digest,
}); err != nil {
return nil, err
@@ -459,12 +459,3 @@ func (s *Service) Abort(ctx context.Context, req *api.AbortRequest) (*empty.Empt
return &empty.Empty{}, nil
}
func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error {
emitterCtx := events.WithTopic(ctx, topic)
if err := s.emitter.Post(emitterCtx, evt); err != nil {
return err
}
return nil
}

View File

@@ -1,14 +1,13 @@
package events
import (
"fmt"
"time"
api "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/plugin"
"github.com/golang/protobuf/ptypes/empty"
"github.com/sirupsen/logrus"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
@@ -18,18 +17,17 @@ func init() {
Type: plugin.GRPCPlugin,
ID: "events",
Init: func(ic *plugin.InitContext) (interface{}, error) {
return NewService(ic.Emitter), nil
return NewService(ic.Events), nil
},
})
}
type Service struct {
emitter *events.Emitter
timeouts map[string]*time.Timer
events *events.Exchange
}
func NewService(e *events.Emitter) api.EventsServer {
return &Service{emitter: e}
func NewService(events *events.Exchange) api.EventsServer {
return &Service{events: events}
}
func (s *Service) Register(server *grpc.Server) error {
@@ -37,28 +35,36 @@ func (s *Service) Register(server *grpc.Server) error {
return nil
}
func (s *Service) Stream(req *api.StreamEventsRequest, srv api.Events_StreamServer) error {
clientID := fmt.Sprintf("%d", time.Now().UnixNano())
func (s *Service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeServer) error {
ctx, cancel := context.WithCancel(srv.Context())
defer cancel()
filter, err := filters.ParseAll(req.Filters...)
if err != nil {
return errdefs.ToGRPC(err)
}
eventq, errq := s.events.Subscribe(ctx, filter)
for {
e := <-s.emitter.Events(srv.Context(), clientID)
// upon the client event timeout this will be nil; ignore
if e == nil {
select {
case ev := <-eventq:
if err := srv.Send(ev); err != nil {
return errors.Wrapf(err, "failed sending event to subscriber")
}
case err := <-errq:
if err != nil {
return errors.Wrapf(err, "subscription error")
}
return nil
}
if err := srv.Send(e); err != nil {
logrus.WithFields(logrus.Fields{
"client": clientID,
}).Debug("error sending event; unsubscribing client")
s.emitter.Remove(clientID)
return err
}
}
}
func (s *Service) Post(ctx context.Context, r *api.PostEventRequest) (*empty.Empty, error) {
ctx = events.WithTopic(ctx, r.Envelope.Topic)
if err := s.emitter.Post(ctx, r.Envelope); err != nil {
return nil, err
func (s *Service) Publish(ctx context.Context, r *api.PublishRequest) (*empty.Empty, error) {
if err := s.events.Forward(ctx, r.Envelope); err != nil {
return nil, errdefs.ToGRPC(err)
}
return &empty.Empty{}, nil
}

View File

@@ -24,25 +24,24 @@ func init() {
plugin.MetadataPlugin,
},
Init: func(ic *plugin.InitContext) (interface{}, error) {
e := events.GetPoster(ic.Context)
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
return NewService(m.(*bolt.DB), e), nil
return NewService(m.(*bolt.DB), ic.Events), nil
},
})
}
type Service struct {
db *bolt.DB
emitter events.Poster
db *bolt.DB
publisher events.Publisher
}
func NewService(db *bolt.DB, evts events.Poster) imagesapi.ImagesServer {
func NewService(db *bolt.DB, publisher events.Publisher) imagesapi.ImagesServer {
return &Service{
db: db,
emitter: evts,
db: db,
publisher: publisher,
}
}
@@ -100,7 +99,7 @@ func (s *Service) Create(ctx context.Context, req *imagesapi.CreateImageRequest)
return nil, errdefs.ToGRPC(err)
}
if err := s.emit(ctx, "/images/create", &eventsapi.ImageCreate{
if err := s.publisher.Publish(ctx, "/images/create", &eventsapi.ImageCreate{
Name: resp.Image.Name,
Labels: resp.Image.Labels,
}); err != nil {
@@ -139,7 +138,7 @@ func (s *Service) Update(ctx context.Context, req *imagesapi.UpdateImageRequest)
return nil, errdefs.ToGRPC(err)
}
if err := s.emit(ctx, "/images/update", &eventsapi.ImageUpdate{
if err := s.publisher.Publish(ctx, "/images/update", &eventsapi.ImageUpdate{
Name: resp.Image.Name,
Labels: resp.Image.Labels,
}); err != nil {
@@ -156,7 +155,7 @@ func (s *Service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest)
return nil, err
}
if err := s.emit(ctx, "/images/delete", &eventsapi.ImageDelete{
if err := s.publisher.Publish(ctx, "/images/delete", &eventsapi.ImageDelete{
Name: req.Name,
}); err != nil {
return nil, err
@@ -176,12 +175,3 @@ func (s *Service) withStoreView(ctx context.Context, fn func(ctx context.Context
func (s *Service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error {
return s.db.Update(s.withStore(ctx, fn))
}
func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error {
emitterCtx := events.WithTopic(ctx, topic)
if err := s.emitter.Post(emitterCtx, evt); err != nil {
return err
}
return nil
}

View File

@@ -25,27 +25,26 @@ func init() {
plugin.MetadataPlugin,
},
Init: func(ic *plugin.InitContext) (interface{}, error) {
e := events.GetPoster(ic.Context)
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
return NewService(m.(*bolt.DB), e), nil
return NewService(m.(*bolt.DB), ic.Events), nil
},
})
}
type Service struct {
db *bolt.DB
emitter events.Poster
db *bolt.DB
publisher events.Publisher
}
var _ api.NamespacesServer = &Service{}
func NewService(db *bolt.DB, evts events.Poster) api.NamespacesServer {
func NewService(db *bolt.DB, publisher events.Publisher) api.NamespacesServer {
return &Service{
db: db,
emitter: evts,
db: db,
publisher: publisher,
}
}
@@ -119,7 +118,7 @@ func (s *Service) Create(ctx context.Context, req *api.CreateNamespaceRequest) (
return &resp, err
}
if err := s.emit(ctx, "/namespaces/create", &eventsapi.NamespaceCreate{
if err := s.publisher.Publish(ctx, "/namespaces/create", &eventsapi.NamespaceCreate{
Name: req.Namespace.Name,
Labels: req.Namespace.Labels,
}); err != nil {
@@ -172,7 +171,7 @@ func (s *Service) Update(ctx context.Context, req *api.UpdateNamespaceRequest) (
return &resp, err
}
if err := s.emit(ctx, "/namespaces/update", &eventsapi.NamespaceUpdate{
if err := s.publisher.Publish(ctx, "/namespaces/update", &eventsapi.NamespaceUpdate{
Name: req.Namespace.Name,
Labels: req.Namespace.Labels,
}); err != nil {
@@ -189,7 +188,7 @@ func (s *Service) Delete(ctx context.Context, req *api.DeleteNamespaceRequest) (
return &empty.Empty{}, err
}
if err := s.emit(ctx, "/namespaces/delete", &eventsapi.NamespaceDelete{
if err := s.publisher.Publish(ctx, "/namespaces/delete", &eventsapi.NamespaceDelete{
Name: req.Name,
}); err != nil {
return &empty.Empty{}, err
@@ -209,12 +208,3 @@ func (s *Service) withStoreView(ctx context.Context, fn func(ctx context.Context
func (s *Service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) error {
return s.db.Update(s.withStore(ctx, fn))
}
func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error {
emitterCtx := events.WithTopic(ctx, topic)
if err := s.emitter.Post(emitterCtx, evt); err != nil {
return err
}
return nil
}

View File

@@ -45,11 +45,10 @@ var empty = &protoempty.Empty{}
type service struct {
snapshotters map[string]snapshot.Snapshotter
defaultSnapshotterName string
emitter events.Poster
publisher events.Publisher
}
func newService(ic *plugin.InitContext) (interface{}, error) {
evts := events.GetPoster(ic.Context)
rawSnapshotters, err := ic.GetAll(plugin.SnapshotPlugin)
if err != nil {
return nil, err
@@ -72,7 +71,7 @@ func newService(ic *plugin.InitContext) (interface{}, error) {
return &service{
snapshotters: snapshotters,
defaultSnapshotterName: cfg.Default,
emitter: evts,
publisher: ic.Events,
}, nil
}
@@ -105,7 +104,7 @@ func (s *service) Prepare(ctx context.Context, pr *snapshotapi.PrepareSnapshotRe
return nil, errdefs.ToGRPC(err)
}
if err := s.emit(ctx, "/snapshot/prepare", &eventsapi.SnapshotPrepare{
if err := s.publisher.Publish(ctx, "/snapshot/prepare", &eventsapi.SnapshotPrepare{
Key: pr.Key,
Parent: pr.Parent,
}); err != nil {
@@ -162,7 +161,7 @@ func (s *service) Commit(ctx context.Context, cr *snapshotapi.CommitSnapshotRequ
return nil, errdefs.ToGRPC(err)
}
if err := s.emit(ctx, "/snapshot/commit", &eventsapi.SnapshotCommit{
if err := s.publisher.Publish(ctx, "/snapshot/commit", &eventsapi.SnapshotCommit{
Key: cr.Key,
Name: cr.Name,
}); err != nil {
@@ -183,7 +182,7 @@ func (s *service) Remove(ctx context.Context, rr *snapshotapi.RemoveSnapshotRequ
return nil, errdefs.ToGRPC(err)
}
if err := s.emit(ctx, "/snapshot/remove", &eventsapi.SnapshotRemove{
if err := s.publisher.Publish(ctx, "/snapshot/remove", &eventsapi.SnapshotRemove{
Key: rr.Key,
}); err != nil {
return nil, err
@@ -293,12 +292,3 @@ func fromMounts(mounts []mount.Mount) []*types.Mount {
}
return out
}
func (s *service) emit(ctx context.Context, topic string, evt interface{}) error {
emitterCtx := events.WithTopic(ctx, topic)
if err := s.emitter.Post(emitterCtx, evt); err != nil {
return err
}
return nil
}

View File

@@ -67,20 +67,19 @@ func New(ic *plugin.InitContext) (interface{}, error) {
r := rr.(runtime.Runtime)
runtimes[r.ID()] = r
}
e := events.GetPoster(ic.Context)
return &Service{
runtimes: runtimes,
db: m.(*bolt.DB),
store: cs,
emitter: e,
runtimes: runtimes,
db: m.(*bolt.DB),
store: cs,
publisher: ic.Events,
}, nil
}
type Service struct {
runtimes map[string]runtime.Runtime
db *bolt.DB
store content.Store
emitter events.Poster
runtimes map[string]runtime.Runtime
db *bolt.DB
store content.Store
publisher events.Publisher
}
func (s *Service) Register(server *grpc.Server) error {
@@ -502,12 +501,3 @@ func (s *Service) getRuntime(name string) (runtime.Runtime, error) {
}
return runtime, nil
}
func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error {
emitterCtx := events.WithTopic(ctx, topic)
if err := s.emitter.Post(emitterCtx, evt); err != nil {
return err
}
return nil
}