Merge pull request #2958 from crosbymichael/runcv2-events

Move task events to runc v2 shim
This commit is contained in:
Phil Estes 2019-01-25 16:24:01 -05:00 committed by GitHub
commit 9f705f40b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -95,6 +95,7 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
// service is the shim implementation of a remote shim over GRPC // service is the shim implementation of a remote shim over GRPC
type service struct { type service struct {
mu sync.Mutex mu sync.Mutex
eventSendMu sync.Mutex
context context.Context context context.Context
task rproc.Process task rproc.Process
@ -311,6 +312,21 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
s.cg = cg s.cg = cg
} }
s.task = process s.task = process
s.send(&eventstypes.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
Rootfs: r.Rootfs,
IO: &eventstypes.TaskIO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: r.Checkpoint,
Pid: uint32(pid),
})
return &taskAPI.CreateTaskResponse{ return &taskAPI.CreateTaskResponse{
Pid: uint32(pid), Pid: uint32(pid),
}, nil }, nil
@ -323,9 +339,14 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
if err != nil { if err != nil {
return nil, err return nil, err
} }
// hold the send lock so that the start events are sent before any exit events in the error case
s.eventSendMu.Lock()
if err := p.Start(ctx); err != nil { if err := p.Start(ctx); err != nil {
s.eventSendMu.Unlock()
return nil, err return nil, err
} }
// case for restore // case for restore
if s.getCgroup() == nil && p.Pid() > 0 { if s.getCgroup() == nil && p.Pid() > 0 {
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid())) cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
@ -334,6 +355,19 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
} }
s.setCgroup(cg) s.setCgroup(cg)
} }
if r.ExecID != "" {
s.send(&eventstypes.TaskExecStarted{
ContainerID: s.id,
ExecID: r.ExecID,
Pid: uint32(p.Pid()),
})
} else {
s.send(&eventstypes.TaskStart{
ContainerID: s.id,
Pid: uint32(p.Pid()),
})
}
s.eventSendMu.Unlock()
return &taskAPI.StartResponse{ return &taskAPI.StartResponse{
Pid: uint32(p.Pid()), Pid: uint32(p.Pid()),
}, nil }, nil
@ -357,9 +391,17 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
delete(s.processes, r.ExecID) delete(s.processes, r.ExecID)
s.mu.Unlock() s.mu.Unlock()
} }
if isTask && s.platform != nil { if isTask {
if s.platform != nil {
s.platform.Close() s.platform.Close()
} }
s.send(&eventstypes.TaskDelete{
ContainerID: s.id,
Pid: uint32(p.Pid()),
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
})
}
return &taskAPI.DeleteResponse{ return &taskAPI.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()), ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(), ExitedAt: p.ExitedAt(),
@ -393,6 +435,11 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
s.mu.Lock() s.mu.Lock()
s.processes[r.ExecID] = process s.processes[r.ExecID] = process
s.mu.Unlock() s.mu.Unlock()
s.send(&eventstypes.TaskExecAdded{
ContainerID: s.id,
ExecID: process.ID(),
})
return empty, nil return empty, nil
} }
@ -461,6 +508,9 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
if err := p.(*proc.Init).Pause(ctx); err != nil { if err := p.(*proc.Init).Pause(ctx); err != nil {
return nil, err return nil, err
} }
s.send(&eventstypes.TaskPaused{
p.ID(),
})
return empty, nil return empty, nil
} }
@ -475,6 +525,9 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
if err := p.(*proc.Init).Resume(ctx); err != nil { if err := p.(*proc.Init).Resume(ctx); err != nil {
return nil, err return nil, err
} }
s.send(&eventstypes.TaskResumed{
p.ID(),
})
return empty, nil return empty, nil
} }
@ -642,6 +695,16 @@ func (s *service) processExits() {
} }
} }
func (s *service) send(evt interface{}) {
s.events <- evt
}
func (s *service) sendL(evt interface{}) {
s.eventSendMu.Lock()
s.events <- evt
s.eventSendMu.Unlock()
}
func (s *service) checkProcesses(e runcC.Exit) { func (s *service) checkProcesses(e runcC.Exit) {
shouldKillAll, err := shouldKillAllOnExit(s.bundle) shouldKillAll, err := shouldKillAllOnExit(s.bundle)
if err != nil { if err != nil {
@ -660,13 +723,13 @@ func (s *service) checkProcesses(e runcC.Exit) {
} }
} }
p.SetExited(e.Status) p.SetExited(e.Status)
s.events <- &eventstypes.TaskExit{ s.sendL(&eventstypes.TaskExit{
ContainerID: s.id, ContainerID: s.id,
ID: p.ID(), ID: p.ID(),
Pid: uint32(e.Pid), Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status), ExitStatus: uint32(e.Status),
ExitedAt: p.ExitedAt(), ExitedAt: p.ExitedAt(),
} })
return return
} }
} }