Use event service post for shim events

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby
2017-07-07 16:12:33 -07:00
parent f39693eabe
commit 6578565216
12 changed files with 149 additions and 108 deletions

View File

@@ -8,7 +8,6 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"google.golang.org/grpc"
@@ -84,22 +83,17 @@ func New(ic *plugin.InitContext) (interface{}, error) {
return nil, err
}
cfg := ic.Config.(*Config)
c, cancel := context.WithCancel(ic.Context)
r := &Runtime{
root: ic.Root,
remote: !cfg.NoShim,
shim: cfg.Shim,
runtime: cfg.Runtime,
events: make(chan *eventsapi.RuntimeEvent, 2048),
eventsContext: c,
eventsCancel: cancel,
monitor: monitor.(runtime.TaskMonitor),
tasks: newTaskList(),
emitter: events.GetPoster(ic.Context),
db: m.(*bolt.DB),
root: ic.Root,
remote: !cfg.NoShim,
shim: cfg.Shim,
runtime: cfg.Runtime,
monitor: monitor.(runtime.TaskMonitor),
tasks: newTaskList(),
emitter: events.GetPoster(ic.Context),
db: m.(*bolt.DB),
address: ic.Address,
}
// set the events output for a monitor if it generates events
r.monitor.Events(r.events)
tasks, err := r.restoreTasks(ic.Context)
if err != nil {
return nil, err
@@ -108,9 +102,6 @@ func New(ic *plugin.InitContext) (interface{}, error) {
if err := r.tasks.addWithNamespace(t.namespace, t); err != nil {
return nil, err
}
if err := r.handleEvents(ic.Context, t.shim); err != nil {
return nil, err
}
}
return r, nil
}
@@ -120,14 +111,12 @@ type Runtime struct {
shim string
runtime string
remote bool
address string
events chan *eventsapi.RuntimeEvent
eventsContext context.Context
eventsCancel func()
monitor runtime.TaskMonitor
tasks *taskList
emitter events.Poster
db *bolt.DB
monitor runtime.TaskMonitor
tasks *taskList
emitter events.Poster
db *bolt.DB
}
func (r *Runtime) ID() string {
@@ -148,7 +137,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
bundle.Delete()
}
}()
s, err := bundle.NewShim(ctx, r.shim, r.remote)
s, err := bundle.NewShim(ctx, r.shim, r.address, r.remote)
if err != nil {
return nil, err
}
@@ -159,9 +148,6 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
}
}
}()
if err = r.handleEvents(ctx, s); err != nil {
return nil, err
}
sopts := &shim.CreateTaskRequest{
ID: id,
Bundle: bundle.path,
@@ -332,48 +318,6 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
return o, nil
}
func (r *Runtime) handleEvents(ctx context.Context, s *client.Client) error {
events, err := s.Stream(r.eventsContext, &shim.StreamEventsRequest{})
if err != nil {
return err
}
go r.forward(ctx, events)
return nil
}
// forward forwards events from a shim to the events service and monitors
func (r *Runtime) forward(ctx context.Context, events shim.Shim_StreamClient) {
for {
e, err := events.Recv()
if err != nil {
if !strings.HasSuffix(err.Error(), "transport is closing") {
log.G(r.eventsContext).WithError(err).Error("get event from shim")
}
return
}
r.events <- e
if err := r.emit(ctx, "/runtime/"+getTopic(e), e); err != nil {
return
}
}
}
func getTopic(e *eventsapi.RuntimeEvent) string {
switch e.Type {
case eventsapi.RuntimeEvent_CREATE:
return "task-create"
case eventsapi.RuntimeEvent_START:
return "task-start"
case eventsapi.RuntimeEvent_EXEC_ADDED:
return "task-execadded"
case eventsapi.RuntimeEvent_OOM:
return "task-oom"
case eventsapi.RuntimeEvent_EXIT:
return "task-exit"
}
return ""
}
func (r *Runtime) terminate(ctx context.Context, bundle *bundle, ns, id string) error {
ctx = namespaces.WithNamespace(ctx, ns)
rt, err := r.getRuntime(ctx, ns, id)