Merge pull request #956 from ehazlett/events-service

Events Service
This commit is contained in:
Michael Crosby
2017-06-20 13:23:19 -07:00
committed by GitHub
50 changed files with 8284 additions and 69 deletions

View File

@@ -3,7 +3,9 @@ package containers
import (
"github.com/boltdb/bolt"
api "github.com/containerd/containerd/api/services/containers"
"github.com/containerd/containerd/api/types/event"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin"
"github.com/golang/protobuf/ptypes/empty"
@@ -20,21 +22,23 @@ func init() {
plugin.MetadataPlugin,
},
Init: func(ic *plugin.InitContext) (interface{}, error) {
e := events.GetPoster(ic.Context)
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
return NewService(m.(*bolt.DB)), nil
return NewService(m.(*bolt.DB), e), nil
},
})
}
type Service struct {
db *bolt.DB
db *bolt.DB
emitter events.Poster
}
func NewService(db *bolt.DB) api.ContainersServer {
return &Service{db: db}
func NewService(db *bolt.DB, evts events.Poster) api.ContainersServer {
return &Service{db: db, emitter: evts}
}
func (s *Service) Register(server *grpc.Server) error {
@@ -52,6 +56,7 @@ func (s *Service) Get(ctx context.Context, req *api.GetContainerRequest) (*api.G
}
containerpb := containerToProto(&container)
resp.Container = containerpb
return nil
})
}
@@ -74,7 +79,7 @@ func (s *Service) List(ctx context.Context, req *api.ListContainersRequest) (*ap
func (s *Service) Create(ctx context.Context, req *api.CreateContainerRequest) (*api.CreateContainerResponse, error) {
var resp api.CreateContainerResponse
return &resp, s.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
container := containerFromProto(&req.Container)
created, err := store.Create(ctx, container)
@@ -83,14 +88,26 @@ func (s *Service) Create(ctx context.Context, req *api.CreateContainerRequest) (
}
resp.Container = containerToProto(&created)
return nil
})
}); err != nil {
return &resp, err
}
if err := s.emit(ctx, "/containers/create", event.ContainerCreate{
ContainerID: resp.Container.ID,
Image: resp.Container.Image,
Runtime: resp.Container.Runtime,
}); err != nil {
return &resp, err
}
return &resp, nil
}
func (s *Service) Update(ctx context.Context, req *api.UpdateContainerRequest) (*api.UpdateContainerResponse, error) {
var resp api.UpdateContainerResponse
return &resp, s.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
container := containerFromProto(&req.Container)
current, err := store.Get(ctx, container.ID)
@@ -134,14 +151,38 @@ func (s *Service) Update(ctx context.Context, req *api.UpdateContainerRequest) (
}
resp.Container = containerToProto(&created)
return nil
})
}); err != nil {
return &resp, err
}
if err := s.emit(ctx, "/containers/update", event.ContainerUpdate{
ContainerID: resp.Container.ID,
Image: resp.Container.Image,
Labels: resp.Container.Labels,
RootFS: resp.Container.RootFS,
}); err != nil {
return &resp, err
}
return &resp, nil
}
func (s *Service) Delete(ctx context.Context, req *api.DeleteContainerRequest) (*empty.Empty, error) {
return &empty.Empty{}, s.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
return mapGRPCError(store.Delete(ctx, req.ID), req.ID)
})
}); err != nil {
return &empty.Empty{}, mapGRPCError(err, req.ID)
}
if err := s.emit(ctx, "/containers/delete", event.ContainerDelete{
ContainerID: req.ID,
}); err != nil {
return &empty.Empty{}, err
}
return &empty.Empty{}, nil
}
func (s *Service) withStore(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) func(tx *bolt.Tx) error {
@@ -155,3 +196,12 @@ func (s *Service) withStoreView(ctx context.Context, fn func(ctx context.Context
func (s *Service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error {
return s.db.Update(s.withStore(ctx, fn))
}
func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error {
emitterCtx := events.WithTopic(ctx, topic)
if err := s.emitter.Post(emitterCtx, evt); err != nil {
return err
}
return nil
}

View File

@@ -6,7 +6,9 @@ import (
"github.com/Sirupsen/logrus"
api "github.com/containerd/containerd/api/services/content"
"github.com/containerd/containerd/api/types/event"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/plugin"
"github.com/golang/protobuf/ptypes/empty"
@@ -18,7 +20,8 @@ import (
)
type Service struct {
store content.Store
store content.Store
emitter events.Poster
}
var bufPool = sync.Pool{
@@ -46,7 +49,8 @@ func NewService(ic *plugin.InitContext) (interface{}, error) {
return nil, err
}
return &Service{
store: c.(content.Store),
store: c.(content.Store),
emitter: events.GetPoster(ic.Context),
}, nil
}
@@ -124,6 +128,12 @@ func (s *Service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*e
return nil, serverErrorToGRPC(err, req.Digest.String())
}
if err := s.emit(ctx, "/content/delete", event.ContentDelete{
Digest: req.Digest,
}); err != nil {
return nil, err
}
return &empty.Empty{}, nil
}
@@ -409,3 +419,12 @@ func (s *Service) Abort(ctx context.Context, req *api.AbortRequest) (*empty.Empt
return &empty.Empty{}, nil
}
func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error {
emitterCtx := events.WithTopic(ctx, topic)
if err := s.emitter.Post(emitterCtx, evt); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,54 @@
package events
import (
"fmt"
"time"
"github.com/Sirupsen/logrus"
api "github.com/containerd/containerd/api/services/events"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/plugin"
"google.golang.org/grpc"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "events",
Init: func(ic *plugin.InitContext) (interface{}, error) {
return NewService(ic.Emitter), nil
},
})
}
type Service struct {
emitter *events.Emitter
timeouts map[string]*time.Timer
}
func NewService(e *events.Emitter) api.EventsServer {
return &Service{emitter: e}
}
func (s *Service) Register(server *grpc.Server) error {
api.RegisterEventsServer(server, s)
return nil
}
func (s *Service) EventStream(req *api.EventStreamRequest, srv api.Events_EventStreamServer) error {
clientID := fmt.Sprintf("%d", time.Now().UnixNano())
for {
e := <-s.emitter.Events(srv.Context(), clientID)
// upon the client event timeout this will be nil; ignore
if e == nil {
return nil
}
if err := srv.Send(e); err != nil {
logrus.WithFields(logrus.Fields{
"client": clientID,
}).Debug("error sending event; unsubscribing client")
s.emitter.Remove(clientID)
return err
}
}
}

View File

@@ -11,10 +11,12 @@ import (
"github.com/boltdb/bolt"
api "github.com/containerd/containerd/api/services/execution"
"github.com/containerd/containerd/api/types/descriptor"
"github.com/containerd/containerd/api/types/event"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/archive"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
@@ -69,11 +71,13 @@ func New(ic *plugin.InitContext) (interface{}, error) {
if err != nil {
return nil, err
}
e := events.GetPoster(ic.Context)
return &Service{
runtimes: runtimes,
db: m.(*bolt.DB),
collector: c,
store: ct.(content.Store),
emitter: e,
}, nil
}
@@ -82,6 +86,7 @@ type Service struct {
db *bolt.DB
collector *collector
store content.Store
emitter events.Poster
}
func (s *Service) Register(server *grpc.Server) error {
@@ -159,6 +164,13 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create
if err != nil {
log.G(ctx).Error(err)
}
if err := s.emit(ctx, "/tasks/create", event.TaskCreate{
ContainerID: r.ContainerID,
}); err != nil {
return nil, err
}
return &api.CreateResponse{
ContainerID: r.ContainerID,
Pid: state.Pid,
@@ -173,6 +185,13 @@ func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_proto
if err := c.Start(ctx); err != nil {
return nil, err
}
if err := s.emit(ctx, "/tasks/start", event.TaskStart{
ContainerID: r.ContainerID,
}); err != nil {
return nil, err
}
return empty, nil
}
@@ -189,6 +208,13 @@ func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*api.Delete
if err != nil {
return nil, err
}
if err := s.emit(ctx, "/tasks/delete", event.TaskDelete{
ContainerID: r.ContainerID,
}); err != nil {
return nil, err
}
return &api.DeleteResponse{
ExitStatus: exit.Status,
ExitedAt: exit.Timestamp,
@@ -489,6 +515,15 @@ func (s *Service) getRuntime(name string) (plugin.Runtime, error) {
return runtime, nil
}
func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error {
emitterCtx := events.WithTopic(ctx, topic)
if err := s.emitter.Post(emitterCtx, evt); err != nil {
return err
}
return nil
}
type grpcEventWriter struct {
server api.Tasks_EventsServer
}

View File

@@ -3,6 +3,8 @@ package images
import (
"github.com/boltdb/bolt"
imagesapi "github.com/containerd/containerd/api/services/images"
"github.com/containerd/containerd/api/types/event"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin"
@@ -19,21 +21,26 @@ func init() {
plugin.MetadataPlugin,
},
Init: func(ic *plugin.InitContext) (interface{}, error) {
e := events.GetPoster(ic.Context)
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
return NewService(m.(*bolt.DB)), nil
return NewService(m.(*bolt.DB), e), nil
},
})
}
type Service struct {
db *bolt.DB
db *bolt.DB
emitter events.Poster
}
func NewService(db *bolt.DB) imagesapi.ImagesServer {
return &Service{db: db}
func NewService(db *bolt.DB, evts events.Poster) imagesapi.ImagesServer {
return &Service{
db: db,
emitter: evts,
}
}
func (s *Service) Register(server *grpc.Server) error {
@@ -56,9 +63,20 @@ func (s *Service) Get(ctx context.Context, req *imagesapi.GetRequest) (*imagesap
}
func (s *Service) Put(ctx context.Context, req *imagesapi.PutRequest) (*empty.Empty, error) {
return &empty.Empty{}, s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error {
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error {
return mapGRPCError(store.Put(ctx, req.Image.Name, descFromProto(&req.Image.Target)), req.Image.Name)
})
}); err != nil {
return &empty.Empty{}, err
}
if err := s.emit(ctx, "/images/put", event.ImagePut{
Name: req.Image.Name,
Labels: req.Image.Labels,
}); err != nil {
return &empty.Empty{}, err
}
return &empty.Empty{}, nil
}
func (s *Service) List(ctx context.Context, _ *imagesapi.ListRequest) (*imagesapi.ListResponse, error) {
@@ -76,9 +94,19 @@ func (s *Service) List(ctx context.Context, _ *imagesapi.ListRequest) (*imagesap
}
func (s *Service) Delete(ctx context.Context, req *imagesapi.DeleteRequest) (*empty.Empty, error) {
return &empty.Empty{}, s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error {
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error {
return mapGRPCError(store.Delete(ctx, req.Name), req.Name)
})
}); err != nil {
return &empty.Empty{}, err
}
if err := s.emit(ctx, "/images/delete", event.ImageDelete{
Name: req.Name,
}); err != nil {
return &empty.Empty{}, err
}
return &empty.Empty{}, nil
}
func (s *Service) withStore(ctx context.Context, fn func(ctx context.Context, store images.Store) error) func(tx *bolt.Tx) error {
@@ -92,3 +120,12 @@ func (s *Service) withStoreView(ctx context.Context, fn func(ctx context.Context
func (s *Service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error {
return s.db.Update(s.withStore(ctx, fn))
}
func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error {
emitterCtx := events.WithTopic(ctx, topic)
if err := s.emitter.Post(emitterCtx, evt); err != nil {
return err
}
return nil
}

View File

@@ -5,6 +5,8 @@ import (
"github.com/boltdb/bolt"
api "github.com/containerd/containerd/api/services/namespaces"
"github.com/containerd/containerd/api/types/event"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin"
@@ -22,23 +24,28 @@ func init() {
plugin.MetadataPlugin,
},
Init: func(ic *plugin.InitContext) (interface{}, error) {
e := events.GetPoster(ic.Context)
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
return NewService(m.(*bolt.DB)), nil
return NewService(m.(*bolt.DB), e), nil
},
})
}
type Service struct {
db *bolt.DB
db *bolt.DB
emitter events.Poster
}
var _ api.NamespacesServer = &Service{}
func NewService(db *bolt.DB) api.NamespacesServer {
return &Service{db: db}
func NewService(db *bolt.DB, evts events.Poster) api.NamespacesServer {
return &Service{
db: db,
emitter: evts,
}
}
func (s *Service) Register(server *grpc.Server) error {
@@ -94,7 +101,7 @@ func (s *Service) List(ctx context.Context, req *api.ListNamespacesRequest) (*ap
func (s *Service) Create(ctx context.Context, req *api.CreateNamespaceRequest) (*api.CreateNamespaceResponse, error) {
var resp api.CreateNamespaceResponse
return &resp, s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
if err := store.Create(ctx, req.Namespace.Name, req.Namespace.Labels); err != nil {
return mapGRPCError(err, req.Namespace.Name)
}
@@ -107,13 +114,24 @@ func (s *Service) Create(ctx context.Context, req *api.CreateNamespaceRequest) (
resp.Namespace = req.Namespace
return nil
})
}); err != nil {
return &resp, err
}
if err := s.emit(ctx, "/namespaces/create", event.NamespaceCreate{
Name: req.Namespace.Name,
Labels: req.Namespace.Labels,
}); err != nil {
return &resp, err
}
return &resp, nil
}
func (s *Service) Update(ctx context.Context, req *api.UpdateNamespaceRequest) (*api.UpdateNamespaceResponse, error) {
var resp api.UpdateNamespaceResponse
return &resp, s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 {
for _, path := range req.UpdateMask.Paths {
switch {
@@ -149,14 +167,34 @@ func (s *Service) Update(ctx context.Context, req *api.UpdateNamespaceRequest) (
}
return nil
})
}); err != nil {
return &resp, err
}
if err := s.emit(ctx, "/namespaces/update", event.NamespaceUpdate{
Name: req.Namespace.Name,
Labels: req.Namespace.Labels,
}); err != nil {
return &resp, err
}
return &resp, nil
}
func (s *Service) Delete(ctx context.Context, req *api.DeleteNamespaceRequest) (*empty.Empty, error) {
return &empty.Empty{}, s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
return mapGRPCError(store.Delete(ctx, req.Name), req.Name)
})
}); err != nil {
return &empty.Empty{}, err
}
if err := s.emit(ctx, "/namespaces/delete", event.NamespaceDelete{
Name: req.Name,
}); err != nil {
return &empty.Empty{}, err
}
return &empty.Empty{}, nil
}
func (s *Service) withStore(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) func(tx *bolt.Tx) error {
@@ -170,3 +208,12 @@ func (s *Service) withStoreView(ctx context.Context, fn func(ctx context.Context
func (s *Service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) error {
return s.db.Update(s.withStore(ctx, fn))
}
func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error {
emitterCtx := events.WithTopic(ctx, topic)
if err := s.emitter.Post(emitterCtx, evt); err != nil {
return err
}
return nil
}

View File

@@ -4,7 +4,9 @@ import (
gocontext "context"
snapshotapi "github.com/containerd/containerd/api/services/snapshot"
"github.com/containerd/containerd/api/types/event"
mounttypes "github.com/containerd/containerd/api/types/mount"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/plugin"
@@ -23,11 +25,12 @@ func init() {
plugin.SnapshotPlugin,
},
Init: func(ic *plugin.InitContext) (interface{}, error) {
e := events.GetPoster(ic.Context)
s, err := ic.Get(plugin.SnapshotPlugin)
if err != nil {
return nil, err
}
return newService(s.(snapshot.Snapshotter))
return newService(s.(snapshot.Snapshotter), e)
},
})
}
@@ -36,11 +39,13 @@ var empty = &protoempty.Empty{}
type service struct {
snapshotter snapshot.Snapshotter
emitter events.Poster
}
func newService(snapshotter snapshot.Snapshotter) (*service, error) {
func newService(snapshotter snapshot.Snapshotter, evts events.Poster) (*service, error) {
return &service{
snapshotter: snapshotter,
emitter: evts,
}, nil
}
@@ -57,6 +62,14 @@ func (s *service) Prepare(ctx context.Context, pr *snapshotapi.PrepareRequest) (
if err != nil {
return nil, grpcError(err)
}
if err := s.emit(ctx, "/snapshot/prepare", event.SnapshotPrepare{
Key: pr.Key,
Parent: pr.Parent,
}); err != nil {
return nil, err
}
return fromMounts(mounts), nil
}
@@ -89,6 +102,13 @@ func (s *service) Commit(ctx context.Context, cr *snapshotapi.CommitRequest) (*p
if err := s.snapshotter.Commit(ctx, cr.Name, cr.Key); err != nil {
return nil, grpcError(err)
}
if err := s.emit(ctx, "/snapshot/commit", event.SnapshotCommit{
Key: cr.Key,
Name: cr.Name,
}); err != nil {
return nil, err
}
return empty, nil
}
@@ -99,6 +119,12 @@ func (s *service) Remove(ctx context.Context, rr *snapshotapi.RemoveRequest) (*p
if err := s.snapshotter.Remove(ctx, rr.Key); err != nil {
return nil, grpcError(err)
}
if err := s.emit(ctx, "/snapshot/remove", event.SnapshotRemove{
Key: rr.Key,
}); err != nil {
return nil, err
}
return empty, nil
}
@@ -210,3 +236,12 @@ func fromMounts(mounts []mount.Mount) *snapshotapi.MountsResponse {
}
return resp
}
func (s *service) emit(ctx context.Context, topic string, evt interface{}) error {
emitterCtx := events.WithTopic(ctx, topic)
if err := s.emitter.Post(emitterCtx, evt); err != nil {
return err
}
return nil
}