Move task events to runc v2 shim

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2019-01-25 12:28:45 -05:00
parent 9aac018896
commit 85aa8ad361

View File

@ -94,7 +94,8 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
// service is the shim implementation of a remote shim over GRPC
type service struct {
mu sync.Mutex
mu sync.Mutex
eventSendMu sync.Mutex
context context.Context
task rproc.Process
@ -311,6 +312,21 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
s.cg = cg
}
s.task = process
s.send(&eventstypes.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
Rootfs: r.Rootfs,
IO: &eventstypes.TaskIO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: r.Checkpoint,
Pid: uint32(pid),
})
return &taskAPI.CreateTaskResponse{
Pid: uint32(pid),
}, nil
@ -323,9 +339,14 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
if err != nil {
return nil, err
}
// hold the send lock so that the start events are sent before any exit events in the error case
s.eventSendMu.Lock()
if err := p.Start(ctx); err != nil {
s.eventSendMu.Unlock()
return nil, err
}
// case for restore
if s.getCgroup() == nil && p.Pid() > 0 {
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
@ -334,6 +355,19 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
}
s.setCgroup(cg)
}
if r.ExecID != "" {
s.send(&eventstypes.TaskExecStarted{
ContainerID: s.id,
ExecID: r.ExecID,
Pid: uint32(p.Pid()),
})
} else {
s.send(&eventstypes.TaskStart{
ContainerID: s.id,
Pid: uint32(p.Pid()),
})
}
s.eventSendMu.Unlock()
return &taskAPI.StartResponse{
Pid: uint32(p.Pid()),
}, nil
@ -357,8 +391,16 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
delete(s.processes, r.ExecID)
s.mu.Unlock()
}
if isTask && s.platform != nil {
s.platform.Close()
if isTask {
if s.platform != nil {
s.platform.Close()
}
s.send(&eventstypes.TaskDelete{
ContainerID: s.id,
Pid: uint32(p.Pid()),
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
})
}
return &taskAPI.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()),
@ -393,6 +435,11 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
s.mu.Lock()
s.processes[r.ExecID] = process
s.mu.Unlock()
s.send(&eventstypes.TaskExecAdded{
ContainerID: s.id,
ExecID: process.ID(),
})
return empty, nil
}
@ -461,6 +508,9 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
if err := p.(*proc.Init).Pause(ctx); err != nil {
return nil, err
}
s.send(&eventstypes.TaskPaused{
p.ID(),
})
return empty, nil
}
@ -475,6 +525,9 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
if err := p.(*proc.Init).Resume(ctx); err != nil {
return nil, err
}
s.send(&eventstypes.TaskResumed{
p.ID(),
})
return empty, nil
}
@ -642,6 +695,16 @@ func (s *service) processExits() {
}
}
func (s *service) send(evt interface{}) {
s.events <- evt
}
func (s *service) sendL(evt interface{}) {
s.eventSendMu.Lock()
s.events <- evt
s.eventSendMu.Unlock()
}
func (s *service) checkProcesses(e runcC.Exit) {
shouldKillAll, err := shouldKillAllOnExit(s.bundle)
if err != nil {
@ -660,13 +723,13 @@ func (s *service) checkProcesses(e runcC.Exit) {
}
}
p.SetExited(e.Status)
s.events <- &eventstypes.TaskExit{
s.sendL(&eventstypes.TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: p.ExitedAt(),
}
})
return
}
}