Refactor runtime events into Task* types

This removes the RuntimeEvent super proto with enums into separate
runtime event protos to be inline with the other events that are output
by containerd.

This also renames the runtime events into Task* events.

Fixes #1071

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby
2017-07-11 16:12:14 -07:00
parent 8f1c11d862
commit 2b6d790ff4
17 changed files with 1831 additions and 2249 deletions

View File

@@ -54,7 +54,7 @@ func NewService(path, namespace, address string) (*Service, error) {
s := &Service{
path: path,
processes: make(map[string]process),
events: make(chan *events.RuntimeEvent, 4096),
events: make(chan interface{}, 4096),
namespace: namespace,
context: context,
}
@@ -69,9 +69,9 @@ type Service struct {
bundle string
mu sync.Mutex
processes map[string]process
events chan *events.RuntimeEvent
events chan interface{}
eventsMu sync.Mutex
deferredEvent *events.RuntimeEvent
deferredEvent interface{}
namespace string
context context.Context
}
@@ -96,11 +96,18 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh
ExitCh: make(chan int, 1),
}
reaper.Default.Register(pid, cmd)
s.events <- &events.RuntimeEvent{
Type: events.RuntimeEvent_CREATE,
ID: r.ID,
ContainerID: s.id,
Pid: uint32(pid),
s.events <- &events.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
Rootfs: r.Rootfs,
IO: &events.TaskIO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: r.Checkpoint,
Pid: uint32(pid),
}
go s.waitExit(process, pid, cmd)
return &shimapi.CreateTaskResponse{
@@ -115,9 +122,7 @@ func (s *Service) Start(ctx context.Context, r *google_protobuf.Empty) (*google_
if err := s.initProcess.Start(ctx); err != nil {
return nil, err
}
s.events <- &events.RuntimeEvent{
Type: events.RuntimeEvent_START,
ID: s.id,
s.events <- &events.TaskStart{
ContainerID: s.id,
Pid: uint32(s.initProcess.Pid()),
}
@@ -134,6 +139,12 @@ func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimap
s.mu.Lock()
delete(s.processes, p.ID())
s.mu.Unlock()
s.events <- &events.TaskDelete{
ContainerID: s.id,
ExitStatus: uint32(p.Status()),
ExitedAt: p.ExitedAt(),
Pid: uint32(p.Pid()),
}
return &shimapi.DeleteResponse{
ExitStatus: uint32(p.Status()),
ExitedAt: p.ExitedAt(),
@@ -184,10 +195,9 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*shi
reaper.Default.Register(pid, cmd)
s.processes[r.ID] = process
s.events <- &events.RuntimeEvent{
Type: events.RuntimeEvent_EXEC_ADDED,
ID: r.ID,
s.events <- &events.TaskExecAdded{
ContainerID: s.id,
ExecID: r.ID,
Pid: uint32(pid),
}
go s.waitExit(process, pid, cmd)
@@ -216,30 +226,6 @@ func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*
return empty, nil
}
func (s *Service) Stream(r *shimapi.StreamEventsRequest, stream shimapi.Shim_StreamServer) error {
s.eventsMu.Lock()
defer s.eventsMu.Unlock()
if s.deferredEvent != nil {
if err := stream.Send(s.deferredEvent); err != nil {
return err
}
s.deferredEvent = nil
}
for {
select {
case e := <-s.events:
if err := stream.Send(e); err != nil {
s.deferredEvent = e
return err
}
case <-stream.Context().Done():
return stream.Context().Err()
}
}
}
func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) {
if s.initProcess == nil {
return nil, errors.New(ErrContainerNotCreated)
@@ -283,11 +269,8 @@ func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_
if err := s.initProcess.Pause(ctx); err != nil {
return nil, err
}
s.events <- &events.RuntimeEvent{
Type: events.RuntimeEvent_PAUSED,
ID: s.id,
s.events <- &events.TaskPaused{
ContainerID: s.id,
Timestamp: time.Now(),
}
return empty, nil
}
@@ -299,11 +282,8 @@ func (s *Service) Resume(ctx context.Context, r *google_protobuf.Empty) (*google
if err := s.initProcess.Resume(ctx); err != nil {
return nil, err
}
s.events <- &events.RuntimeEvent{
Type: events.RuntimeEvent_RESUMED,
ID: s.id,
s.events <- &events.TaskResumed{
ContainerID: s.id,
Timestamp: time.Now(),
}
return empty, nil
}
@@ -356,11 +336,8 @@ func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskReque
if err := s.initProcess.Checkpoint(ctx, r); err != nil {
return nil, err
}
s.events <- &events.RuntimeEvent{
Type: events.RuntimeEvent_CHECKPOINTED,
ID: s.id,
s.events <- &events.TaskCheckpointed{
ContainerID: s.id,
Timestamp: time.Now(),
}
return empty, nil
}
@@ -386,10 +363,9 @@ func (s *Service) waitExit(p process, pid int, cmd *reaper.Cmd) {
p.Exited(status)
reaper.Default.Delete(pid)
s.events <- &events.RuntimeEvent{
Type: events.RuntimeEvent_EXIT,
ID: p.ID(),
s.events <- &events.TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(pid),
ExitStatus: uint32(status),
ExitedAt: p.ExitedAt(),
@@ -418,7 +394,7 @@ func (s *Service) forward(client poster) {
if _, err := client.Post(s.context, &events.PostEventRequest{
Envelope: &events.Envelope{
Timestamp: time.Now(),
Topic: "/runtime/" + getTopic(e),
Topic: "/task/" + getTopic(e),
Event: a,
},
}); err != nil {
@@ -427,18 +403,26 @@ func (s *Service) forward(client poster) {
}
}
func getTopic(e *events.RuntimeEvent) string {
switch e.Type {
case events.RuntimeEvent_CREATE:
func getTopic(e interface{}) string {
switch e.(type) {
case *events.TaskCreate:
return "task-create"
case events.RuntimeEvent_START:
case *events.TaskStart:
return "task-start"
case events.RuntimeEvent_EXEC_ADDED:
return "task-execadded"
case events.RuntimeEvent_OOM:
case *events.TaskOOM:
return "task-oom"
case events.RuntimeEvent_EXIT:
case *events.TaskExit:
return "task-exit"
case *events.TaskDelete:
return "task-delete"
case *events.TaskExecAdded:
return "task-exec-added"
case *events.TaskPaused:
return "task-paused"
case *events.TaskResumed:
return "task-resumed"
case *events.TaskCheckpointed:
return "task-checkpointed"
}
return "?"
}