diff --git a/runtime/v2/runc/service.go b/runtime/v2/runc/service.go index 3e074d960..83c490689 100644 --- a/runtime/v2/runc/service.go +++ b/runtime/v2/runc/service.go @@ -94,7 +94,8 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, // service is the shim implementation of a remote shim over GRPC type service struct { - mu sync.Mutex + mu sync.Mutex + eventSendMu sync.Mutex context context.Context task rproc.Process @@ -311,6 +312,21 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * s.cg = cg } s.task = process + + s.send(&eventstypes.TaskCreate{ + ContainerID: r.ID, + Bundle: r.Bundle, + Rootfs: r.Rootfs, + IO: &eventstypes.TaskIO{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, + }, + Checkpoint: r.Checkpoint, + Pid: uint32(pid), + }) + return &taskAPI.CreateTaskResponse{ Pid: uint32(pid), }, nil @@ -323,9 +339,14 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. if err != nil { return nil, err } + + // hold the send lock so that the start events are sent before any exit events in the error case + s.eventSendMu.Lock() if err := p.Start(ctx); err != nil { + s.eventSendMu.Unlock() return nil, err } + // case for restore if s.getCgroup() == nil && p.Pid() > 0 { cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid())) @@ -334,6 +355,19 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. } s.setCgroup(cg) } + if r.ExecID != "" { + s.send(&eventstypes.TaskExecStarted{ + ContainerID: s.id, + ExecID: r.ExecID, + Pid: uint32(p.Pid()), + }) + } else { + s.send(&eventstypes.TaskStart{ + ContainerID: s.id, + Pid: uint32(p.Pid()), + }) + } + s.eventSendMu.Unlock() return &taskAPI.StartResponse{ Pid: uint32(p.Pid()), }, nil @@ -357,8 +391,16 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP delete(s.processes, r.ExecID) s.mu.Unlock() } - if isTask && s.platform != nil { - s.platform.Close() + if isTask { + if s.platform != nil { + s.platform.Close() + } + s.send(&eventstypes.TaskDelete{ + ContainerID: s.id, + Pid: uint32(p.Pid()), + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }) } return &taskAPI.DeleteResponse{ ExitStatus: uint32(p.ExitStatus()), @@ -393,6 +435,11 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty s.mu.Lock() s.processes[r.ExecID] = process s.mu.Unlock() + + s.send(&eventstypes.TaskExecAdded{ + ContainerID: s.id, + ExecID: process.ID(), + }) return empty, nil } @@ -461,6 +508,9 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E if err := p.(*proc.Init).Pause(ctx); err != nil { return nil, err } + s.send(&eventstypes.TaskPaused{ + p.ID(), + }) return empty, nil } @@ -475,6 +525,9 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes if err := p.(*proc.Init).Resume(ctx); err != nil { return nil, err } + s.send(&eventstypes.TaskResumed{ + p.ID(), + }) return empty, nil } @@ -642,6 +695,16 @@ func (s *service) processExits() { } } +func (s *service) send(evt interface{}) { + s.events <- evt +} + +func (s *service) sendL(evt interface{}) { + s.eventSendMu.Lock() + s.events <- evt + s.eventSendMu.Unlock() +} + func (s *service) checkProcesses(e runcC.Exit) { shouldKillAll, err := shouldKillAllOnExit(s.bundle) if err != nil { @@ -660,13 +723,13 @@ func (s *service) checkProcesses(e runcC.Exit) { } } p.SetExited(e.Status) - s.events <- &eventstypes.TaskExit{ + s.sendL(&eventstypes.TaskExit{ ContainerID: s.id, ID: p.ID(), Pid: uint32(e.Pid), ExitStatus: uint32(e.Status), ExitedAt: p.ExitedAt(), - } + }) return } }