Merge pull request #9828 from laurazard/fix-exec-exit-better
runc-shim: process exec exits before init (w/o extra locks!)
This commit is contained in:
commit
d3d4c5d2ae
@ -80,6 +80,7 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
|
|||||||
shutdown: sd,
|
shutdown: sd,
|
||||||
containers: make(map[string]*runc.Container),
|
containers: make(map[string]*runc.Container),
|
||||||
running: make(map[int][]containerProcess),
|
running: make(map[int][]containerProcess),
|
||||||
|
pendingExecs: make(map[*runc.Container]int),
|
||||||
exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
|
exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
|
||||||
}
|
}
|
||||||
go s.processExits()
|
go s.processExits()
|
||||||
@ -113,8 +114,9 @@ type service struct {
|
|||||||
|
|
||||||
containers map[string]*runc.Container
|
containers map[string]*runc.Container
|
||||||
|
|
||||||
lifecycleMu sync.Mutex
|
lifecycleMu sync.Mutex
|
||||||
running map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
|
running map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
|
||||||
|
pendingExecs map[*runc.Container]int // container -> num pending execs, guarded by lifecycleMu
|
||||||
// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
|
// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
|
||||||
// dereferencing the subscription pointers must only be done while holding
|
// dereferencing the subscription pointers must only be done while holding
|
||||||
// lifecycleMu.
|
// lifecycleMu.
|
||||||
@ -129,26 +131,23 @@ type containerProcess struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// preStart prepares for starting a container process and handling its exit.
|
// preStart prepares for starting a container process and handling its exit.
|
||||||
// The container being started should be passed in as c when starting the
|
// The container being started should be passed in as c when starting the container
|
||||||
// container init process for an already-created container. c should be nil when
|
// init process for an already-created container. c should be nil when creating a
|
||||||
// creating a container or when starting an exec.
|
// container or when starting an exec.
|
||||||
//
|
//
|
||||||
// The returned handleStarted closure records that the process has started so
|
// The returned handleStarted closure records that the process has started so
|
||||||
// that its exit can be handled efficiently. If the process has already exited,
|
// that its exit can be handled efficiently. If the process has already exited,
|
||||||
// it handles the exit immediately. handleStarted should be called after the
|
// it handles the exit immediately. In addition, if the process is an exec and
|
||||||
// event announcing the start of the process has been published.
|
// its container's init process has already exited, that exit is also processed.
|
||||||
// Note that handleStarted needs to be aware of whether s.mu is already held
|
// handleStarted should be called after the event announcing the start of the
|
||||||
// when it is called. If s.mu has been held, we don't need to lock it when
|
// process has been published. Note that s.lifecycleMu must not be held when
|
||||||
// calling handleProcessExit.
|
// calling handleStarted.
|
||||||
//
|
//
|
||||||
// The returned cleanup closure releases resources used to handle early exits.
|
// The returned cleanup closure releases resources used to handle early exits.
|
||||||
// It must be called before the caller of preStart returns, otherwise severe
|
// It must be called before the caller of preStart returns, otherwise severe
|
||||||
// memory leaks will occur.
|
// memory leaks will occur.
|
||||||
func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Container, process.Process, bool), cleanup func()) {
|
func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Container, process.Process), cleanup func()) {
|
||||||
exits := make(map[int][]runcC.Exit)
|
exits := make(map[int][]runcC.Exit)
|
||||||
|
|
||||||
s.lifecycleMu.Lock()
|
|
||||||
defer s.lifecycleMu.Unlock()
|
|
||||||
s.exitSubscribers[&exits] = struct{}{}
|
s.exitSubscribers[&exits] = struct{}{}
|
||||||
|
|
||||||
if c != nil {
|
if c != nil {
|
||||||
@ -168,30 +167,65 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
handleStarted = func(c *runc.Container, p process.Process, muLocked bool) {
|
handleStarted = func(c *runc.Container, p process.Process) {
|
||||||
var pid int
|
var pid int
|
||||||
if p != nil {
|
if p != nil {
|
||||||
pid = p.Pid()
|
pid = p.Pid()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_, init := p.(*process.Init)
|
||||||
s.lifecycleMu.Lock()
|
s.lifecycleMu.Lock()
|
||||||
|
|
||||||
|
var initExits []runcC.Exit
|
||||||
|
var initCps []containerProcess
|
||||||
|
if !init {
|
||||||
|
s.pendingExecs[c]--
|
||||||
|
|
||||||
|
initPid := c.Pid()
|
||||||
|
iExits, initExited := exits[initPid]
|
||||||
|
if initExited && s.pendingExecs[c] == 0 {
|
||||||
|
// c's init process has exited before handleStarted was called and
|
||||||
|
// this is the last pending exec process start - we need to process
|
||||||
|
// the exit for the init process after processing this exec, so:
|
||||||
|
// - delete c from the s.pendingExecs map
|
||||||
|
// - keep the exits for the init pid to process later (after we process
|
||||||
|
// this exec's exits)
|
||||||
|
// - get the necessary containerProcesses for the init process (that we
|
||||||
|
// need to process the exits), and remove them from s.running (which we skipped
|
||||||
|
// doing in processExits).
|
||||||
|
delete(s.pendingExecs, c)
|
||||||
|
initExits = iExits
|
||||||
|
var skipped []containerProcess
|
||||||
|
for _, initPidCp := range s.running[initPid] {
|
||||||
|
if initPidCp.Container == c {
|
||||||
|
initCps = append(initCps, initPidCp)
|
||||||
|
} else {
|
||||||
|
skipped = append(skipped, initPidCp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(skipped) == 0 {
|
||||||
|
delete(s.running, initPid)
|
||||||
|
} else {
|
||||||
|
s.running[initPid] = skipped
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ees, exited := exits[pid]
|
ees, exited := exits[pid]
|
||||||
delete(s.exitSubscribers, &exits)
|
delete(s.exitSubscribers, &exits)
|
||||||
exits = nil
|
exits = nil
|
||||||
if pid == 0 { // no-op
|
if pid == 0 || exited {
|
||||||
s.lifecycleMu.Unlock()
|
|
||||||
} else if exited {
|
|
||||||
s.lifecycleMu.Unlock()
|
s.lifecycleMu.Unlock()
|
||||||
for _, ee := range ees {
|
for _, ee := range ees {
|
||||||
if muLocked {
|
s.handleProcessExit(ee, c, p)
|
||||||
s.handleProcessExit(ee, c, p)
|
}
|
||||||
} else {
|
for _, eee := range initExits {
|
||||||
s.mu.Lock()
|
for _, cp := range initCps {
|
||||||
s.handleProcessExit(ee, c, p)
|
s.handleProcessExit(eee, cp.Container, cp.Process)
|
||||||
s.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// Process start was successful, add to `s.running`.
|
||||||
s.running[pid] = append(s.running[pid], containerProcess{
|
s.running[pid] = append(s.running[pid], containerProcess{
|
||||||
Container: c,
|
Container: c,
|
||||||
Process: p,
|
Process: p,
|
||||||
@ -216,7 +250,9 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
|
|||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
s.lifecycleMu.Lock()
|
||||||
handleStarted, cleanup := s.preStart(nil)
|
handleStarted, cleanup := s.preStart(nil)
|
||||||
|
s.lifecycleMu.Unlock()
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
|
||||||
container, err := runc.NewContainer(ctx, s.platform, r)
|
container, err := runc.NewContainer(ctx, s.platform, r)
|
||||||
@ -244,7 +280,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
|
|||||||
// could happen would also cause the container.Pid() call above to
|
// could happen would also cause the container.Pid() call above to
|
||||||
// nil-deference panic.
|
// nil-deference panic.
|
||||||
proc, _ := container.Process("")
|
proc, _ := container.Process("")
|
||||||
handleStarted(container, proc, true)
|
handleStarted(container, proc)
|
||||||
|
|
||||||
return &taskAPI.CreateTaskResponse{
|
return &taskAPI.CreateTaskResponse{
|
||||||
Pid: uint32(container.Pid()),
|
Pid: uint32(container.Pid()),
|
||||||
@ -264,14 +300,19 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
|
|||||||
}
|
}
|
||||||
|
|
||||||
var cinit *runc.Container
|
var cinit *runc.Container
|
||||||
|
s.lifecycleMu.Lock()
|
||||||
if r.ExecID == "" {
|
if r.ExecID == "" {
|
||||||
cinit = container
|
cinit = container
|
||||||
|
} else {
|
||||||
|
s.pendingExecs[container]++
|
||||||
}
|
}
|
||||||
handleStarted, cleanup := s.preStart(cinit)
|
handleStarted, cleanup := s.preStart(cinit)
|
||||||
|
s.lifecycleMu.Unlock()
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
|
||||||
p, err := container.Start(ctx, r)
|
p, err := container.Start(ctx, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handleStarted(container, p, false)
|
handleStarted(container, p)
|
||||||
return nil, errdefs.ToGRPC(err)
|
return nil, errdefs.ToGRPC(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -311,7 +352,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
|
|||||||
Pid: uint32(p.Pid()),
|
Pid: uint32(p.Pid()),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
handleStarted(container, p, false)
|
handleStarted(container, p)
|
||||||
return &taskAPI.StartResponse{
|
return &taskAPI.StartResponse{
|
||||||
Pid: uint32(p.Pid()),
|
Pid: uint32(p.Pid()),
|
||||||
}, nil
|
}, nil
|
||||||
@ -635,14 +676,27 @@ func (s *service) processExits() {
|
|||||||
// Handle the exit for a created/started process. If there's more than
|
// Handle the exit for a created/started process. If there's more than
|
||||||
// one, assume they've all exited. One of them will be the correct
|
// one, assume they've all exited. One of them will be the correct
|
||||||
// process.
|
// process.
|
||||||
cps := s.running[e.Pid]
|
var cps, skipped []containerProcess
|
||||||
delete(s.running, e.Pid)
|
for _, cp := range s.running[e.Pid] {
|
||||||
|
if s.pendingExecs[cp.Container] != 0 {
|
||||||
|
// This exit relates to a container for which we have pending execs. In
|
||||||
|
// order to ensure order between execs and the init process for a given
|
||||||
|
// container, skip processing this exit here and let the `handleStarted`
|
||||||
|
// closure for the pending exec publish it.
|
||||||
|
skipped = append(skipped, cp)
|
||||||
|
} else {
|
||||||
|
cps = append(cps, cp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(skipped) > 0 {
|
||||||
|
s.running[e.Pid] = skipped
|
||||||
|
} else {
|
||||||
|
delete(s.running, e.Pid)
|
||||||
|
}
|
||||||
s.lifecycleMu.Unlock()
|
s.lifecycleMu.Unlock()
|
||||||
|
|
||||||
for _, cp := range cps {
|
for _, cp := range cps {
|
||||||
s.mu.Lock()
|
|
||||||
s.handleProcessExit(e, cp.Container, cp.Process)
|
s.handleProcessExit(e, cp.Container, cp.Process)
|
||||||
s.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user