events: add protos

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>

events: update events package to include emitter and use envelope proto

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>

events: add events service

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>

events: enable events service and update ctr events to use events service

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>

event listeners

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>

events: helper func for emitting in services

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>

events: improved cli for containers and tasks

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>

create event envelope with poster

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>

events: introspect event data to use for type url

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>

events: use pb encoding; add event types

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>

events: instrument content and snapshot services with events

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>

events: instrument image service with events

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>

events: instrument namespace service with events

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>

events: add namespace support

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>

events: only send events from namespace requested from client

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>

events: switch to go-events for broadcasting

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>
This commit is contained in:
Evan Hazlett
2017-06-02 15:06:38 -04:00
parent 7e3b7dead6
commit 935645b03a
50 changed files with 8290 additions and 69 deletions

92
events/convert.go Normal file
View File

@@ -0,0 +1,92 @@
package events
import (
"path"
"strings"
"github.com/containerd/containerd/api/types/event"
"github.com/gogo/protobuf/proto"
protobuf "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
)
func getUrl(name string) string {
base := "types.containerd.io"
return path.Join(base, strings.Join([]string{
"containerd",
EventVersion,
"types",
"event",
name,
}, "."))
}
func convertToAny(evt Event) (*protobuf.Any, error) {
url := ""
var pb proto.Message
switch v := evt.(type) {
case event.ContainerCreate:
url = getUrl("ContainerCreate")
pb = &v
case event.ContainerDelete:
url = getUrl("ContainerDelete")
pb = &v
case event.TaskCreate:
url = getUrl("TaskCreate")
pb = &v
case event.TaskStart:
url = getUrl("TaskStart")
pb = &v
case event.TaskDelete:
url = getUrl("TaskDelete")
pb = &v
case event.ContentDelete:
url = getUrl("ContentDelete")
pb = &v
case event.SnapshotPrepare:
url = getUrl("SnapshotPrepare")
pb = &v
case event.SnapshotCommit:
url = getUrl("SnapshotCommit")
pb = &v
case event.SnapshotRemove:
url = getUrl("SnapshotRemove")
pb = &v
case event.ImagePut:
url = getUrl("ImagePut")
pb = &v
case event.ImageDelete:
url = getUrl("ImageDelete")
pb = &v
case event.NamespaceCreate:
url = getUrl("NamespaceCreate")
pb = &v
case event.NamespaceUpdate:
url = getUrl("NamespaceUpdate")
pb = &v
case event.NamespaceDelete:
url = getUrl("NamespaceDelete")
pb = &v
case event.RuntimeCreate:
url = getUrl("RuntimeCreate")
pb = &v
case event.RuntimeEvent:
url = getUrl("RuntimeEvent")
pb = &v
case event.RuntimeDelete:
url = getUrl("RuntimeDelete")
pb = &v
default:
return nil, errors.Errorf("unsupported event type: %T", v)
}
val, err := proto.Marshal(pb)
if err != nil {
return nil, err
}
return &protobuf.Any{
TypeUrl: url,
Value: val,
}, nil
}

69
events/emitter.go Normal file
View File

@@ -0,0 +1,69 @@
package events
import (
"context"
"sync"
"github.com/containerd/containerd/api/types/event"
"github.com/containerd/containerd/namespaces"
goevents "github.com/docker/go-events"
)
const (
EventVersion = "v1"
)
type Emitter struct {
sinks map[string]*eventSink
broadcaster *goevents.Broadcaster
m sync.Mutex
}
func NewEmitter() *Emitter {
return &Emitter{
sinks: make(map[string]*eventSink),
broadcaster: goevents.NewBroadcaster(),
m: sync.Mutex{},
}
}
func (e *Emitter) Post(ctx context.Context, evt Event) error {
if err := e.broadcaster.Write(&sinkEvent{
ctx: ctx,
event: evt,
}); err != nil {
return err
}
return nil
}
func (e *Emitter) Events(ctx context.Context, clientID string) chan *event.Envelope {
e.m.Lock()
if _, ok := e.sinks[clientID]; !ok {
ns, _ := namespaces.Namespace(ctx)
s := &eventSink{
ch: make(chan *event.Envelope),
ns: ns,
}
e.sinks[clientID] = s
e.broadcaster.Add(s)
}
ch := e.sinks[clientID].ch
e.m.Unlock()
return ch
}
func (e *Emitter) Remove(clientID string) {
e.m.Lock()
if v, ok := e.sinks[clientID]; ok {
e.broadcaster.Remove(v)
delete(e.sinks, clientID)
}
e.m.Unlock()
}
func (e *Emitter) Close() error {
return e.broadcaster.Close()
}

3
events/event.go Normal file
View File

@@ -0,0 +1,3 @@
package events
type Event interface{}

View File

@@ -1,9 +0,0 @@
package events
type Event interface{}
type Envelope struct {
Tx int64 `json:",omitempty"`
Topic string
Event interface{}
}

View File

@@ -5,6 +5,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
)
var (
@@ -13,7 +14,7 @@ var (
// Poster posts the event.
type Poster interface {
Post(ctx context.Context, event Event)
Post(ctx context.Context, evt Event) error
}
type posterKey struct{}
@@ -24,19 +25,21 @@ func WithPoster(ctx context.Context, poster Poster) context.Context {
func GetPoster(ctx context.Context) Poster {
poster := ctx.Value(posterKey{})
if poster == nil {
logger := log.G(ctx)
tx, _ := getTx(ctx)
topic := getTopic(ctx)
// likely means we don't have a configured event system. Just return
// the default poster, which merely logs events.
return posterFunc(func(ctx context.Context, event Event) {
fields := logrus.Fields{"event": event}
return posterFunc(func(ctx context.Context, evt Event) error {
fields := logrus.Fields{"event": evt}
if topic != "" {
fields["topic"] = topic
}
ns, _ := namespaces.Namespace(ctx)
fields["ns"] = ns
if tx != nil {
fields["tx.id"] = tx.id
@@ -45,15 +48,18 @@ func GetPoster(ctx context.Context) Poster {
}
}
logger.WithFields(fields).Info("event posted")
log.G(ctx).WithFields(fields).Debug("event fired")
return nil
})
}
return poster.(Poster)
}
type posterFunc func(ctx context.Context, event Event)
type posterFunc func(ctx context.Context, evt Event) error
func (fn posterFunc) Post(ctx context.Context, event Event) {
fn(ctx, event)
func (fn posterFunc) Post(ctx context.Context, evt Event) error {
fn(ctx, evt)
return nil
}

59
events/sink.go Normal file
View File

@@ -0,0 +1,59 @@
package events
import (
"context"
"time"
"github.com/Sirupsen/logrus"
"github.com/containerd/containerd/api/types/event"
"github.com/containerd/containerd/namespaces"
goevents "github.com/docker/go-events"
"github.com/pkg/errors"
)
type sinkEvent struct {
ctx context.Context
event Event
}
type eventSink struct {
ns string
ch chan *event.Envelope
}
func (s *eventSink) Write(evt goevents.Event) error {
e, ok := evt.(*sinkEvent)
if !ok {
return errors.New("event is not a sink event")
}
topic := getTopic(e.ctx)
ns, _ := namespaces.Namespace(e.ctx)
if ns != "" && ns != s.ns {
// ignore events not intended for this ns
return nil
}
eventData, err := convertToAny(e.event)
if err != nil {
return err
}
logrus.WithFields(logrus.Fields{
"topic": topic,
"type": eventData.TypeUrl,
"ns": ns,
}).Debug("event")
s.ch <- &event.Envelope{
Timestamp: time.Now(),
Topic: topic,
Event: eventData,
}
return nil
}
func (s *eventSink) Close() error {
close(s.ch)
return nil
}