Merge pull request #10651 from laurazard/shim-refactor-without-pending

runc-shim: fix races/prevent init exits from being dropped

commit c8b095f3c2
@@ -81,7 +81,9 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
         shutdown:             sd,
         containers:           make(map[string]*runc.Container),
         running:              make(map[int][]containerProcess),
-        pendingExecs:         make(map[*runc.Container]int),
+        runningExecs:         make(map[*runc.Container]int),
+        execCountSubscribers: make(map[*runc.Container]chan<- int),
+        containerInitExit:    make(map[*runc.Container]runcC.Exit),
         exitSubscribers:      make(map[*map[int][]runcC.Exit]struct{}),
     }
     go s.processExits()
@@ -117,7 +119,19 @@ type service struct {
 
     lifecycleMu  sync.Mutex
    running      map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
-    pendingExecs map[*runc.Container]int    // container -> num pending execs, guarded by lifecycleMu
+    runningExecs map[*runc.Container]int    // container -> num running execs, guarded by lifecycleMu
+    // container -> subscription to exec exits/changes to s.runningExecs[container],
+    // guarded by lifecycleMu
+    execCountSubscribers map[*runc.Container]chan<- int
+    // container -> init exits, guarded by lifecycleMu
+    // Used to stash container init process exits, so that we can hold them
+    // until after we've made sure to publish all the container's exec exits.
+    // Also used to prevent starting new execs from being started if the
+    // container's init process (read: pid, not [process.Init]) has already been
+    // reaped by the shim.
+    // Note that this flag gets updated before the container's [process.Init.Status]
+    // is transitioned to "stopped".
+    containerInitExit map[*runc.Container]runcC.Exit
     // Subscriptions to exits for PIDs. Adding/deleting subscriptions and
     // dereferencing the subscription pointers must only be done while holding
     // lifecycleMu.
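
Note: the fields above replace the old pendingExecs counter with three pieces of state: a per-container count of running execs, an optional subscriber channel told about every change to that count, and a stash of init exits that arrive before the container's exec exits have all been published. What follows is a minimal, self-contained sketch (hypothetical names, not containerd's API) of the counter-with-subscriber shape these fields implement:

package main

import (
    "fmt"
    "sync"
)

// tracker reduces the shim's per-container bookkeeping to its core:
// a running-exec count plus an optional subscriber notified of every
// change, both guarded by one mutex (lifecycleMu in the shim).
type tracker struct {
    mu         sync.Mutex
    running    int
    subscriber chan<- int // nil unless an init exit is waiting
}

func (t *tracker) started() {
    t.mu.Lock()
    t.running++
    t.mu.Unlock()
}

func (t *tracker) exited() {
    t.mu.Lock()
    t.running--
    if t.subscriber != nil {
        // The waiter sizes the channel buffer to the number of
        // running execs, so this send never blocks under the lock.
        t.subscriber <- t.running
    }
    t.mu.Unlock()
}

func main() {
    t := &tracker{}
    t.started()
    t.started()

    // An init exit arrives: subscribe, then drain to zero.
    events := make(chan int, 2)
    t.mu.Lock()
    t.subscriber = events
    t.mu.Unlock()

    go t.exited()
    go t.exited()
    for n := range events {
        if n == 0 {
            break
        }
    }
    fmt.Println("all execs drained; init exit can be published last")
}

In the shim, lifecycleMu plays the role of mu, and the channel is registered only while an init exit is waiting to be published.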
@@ -138,8 +152,7 @@ type containerProcess struct {
 //
 // The returned handleStarted closure records that the process has started so
 // that its exit can be handled efficiently. If the process has already exited,
-// it handles the exit immediately. In addition, if the process is an exec and
-// its container's init process has already exited, that exit is also processed.
+// it handles the exit immediately.
 // handleStarted should be called after the event announcing the start of the
 // process has been published. Note that s.lifecycleMu must not be held when
 // calling handleStarted.
@@ -174,44 +187,8 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
             pid = p.Pid()
         }
 
-        _, init := p.(*process.Init)
         s.lifecycleMu.Lock()
 
-        var initExits []runcC.Exit
-        var initCps []containerProcess
-        if !init {
-            s.pendingExecs[c]--
-
-            initPid := c.Pid()
-            iExits, initExited := exits[initPid]
-            if initExited && s.pendingExecs[c] == 0 {
-                // c's init process has exited before handleStarted was called and
-                // this is the last pending exec process start - we need to process
-                // the exit for the init process after processing this exec, so:
-                // - delete c from the s.pendingExecs map
-                // - keep the exits for the init pid to process later (after we process
-                //   this exec's exits)
-                // - get the necessary containerProcesses for the init process (that we
-                //   need to process the exits), and remove them from s.running (which we skipped
-                //   doing in processExits).
-                delete(s.pendingExecs, c)
-                initExits = iExits
-                var skipped []containerProcess
-                for _, initPidCp := range s.running[initPid] {
-                    if initPidCp.Container == c {
-                        initCps = append(initCps, initPidCp)
-                    } else {
-                        skipped = append(skipped, initPidCp)
-                    }
-                }
-                if len(skipped) == 0 {
-                    delete(s.running, initPid)
-                } else {
-                    s.running[initPid] = skipped
-                }
-            }
-        }
-
         ees, exited := exits[pid]
         delete(s.exitSubscribers, &exits)
         exits = nil
@@ -220,11 +197,6 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
             for _, ee := range ees {
                 s.handleProcessExit(ee, c, p)
             }
-            for _, eee := range initExits {
-                for _, cp := range initCps {
-                    s.handleProcessExit(eee, cp.Container, cp.Process)
-                }
-            }
         } else {
             // Process start was successful, add to `s.running`.
             s.running[pid] = append(s.running[pid], containerProcess{
@@ -305,7 +277,11 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
     if r.ExecID == "" {
         cinit = container
     } else {
-        s.pendingExecs[container]++
+        if _, initExited := s.containerInitExit[container]; initExited {
+            s.lifecycleMu.Unlock()
+            return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container %s init process is not running", container.ID)
+        }
+        s.runningExecs[container]++
     }
     handleStarted, cleanup := s.preStart(cinit)
     s.lifecycleMu.Unlock()
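
Note: this guard closes the start-side race: the containerInitExit lookup and the runningExecs increment sit in the same lifecycleMu critical section (the lock is already held here and released after preStart), so an init exit cannot be recorded between the check and the increment. A compilable sketch of that check-then-increment shape, with assumed names:

package main

import (
    "errors"
    "fmt"
    "sync"
)

var errInitExited = errors.New("init process is not running")

// startExec models the guard in Start: refuse new execs once the
// container's init exit has been recorded, and bump the running-exec
// count in the same critical section so no exit can slip in between.
func startExec(mu *sync.Mutex, initExited *bool, runningExecs *int) error {
    mu.Lock()
    defer mu.Unlock()
    if *initExited {
        return errInitExited
    }
    *runningExecs++
    return nil
}

func main() {
    var (
        mu           sync.Mutex
        initExited   bool
        runningExecs int
    )
    fmt.Println(startExec(&mu, &initExited, &runningExecs)) // <nil>
    initExited = true
    fmt.Println(startExec(&mu, &initExited, &runningExecs)) // init process is not running
}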
@@ -313,6 +289,17 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 
     p, err := container.Start(ctx, r)
     if err != nil {
+        // If we failed to even start the process, s.runningExecs
+        // won't get decremented in s.handleProcessExit. We still need
+        // to update it.
+        if r.ExecID != "" {
+            s.lifecycleMu.Lock()
+            s.runningExecs[container]--
+            if ch, ok := s.execCountSubscribers[container]; ok {
+                ch <- s.runningExecs[container]
+            }
+            s.lifecycleMu.Unlock()
+        }
         handleStarted(container, p)
         return nil, errdefs.ToGRPC(err)
     }
@@ -677,48 +664,85 @@ func (s *service) processExits() {
         // Handle the exit for a created/started process. If there's more than
         // one, assume they've all exited. One of them will be the correct
         // process.
-        var cps, skipped []containerProcess
+        var cps []containerProcess
         for _, cp := range s.running[e.Pid] {
             _, init := cp.Process.(*process.Init)
-            if init && s.pendingExecs[cp.Container] != 0 {
-                // This exit relates to a container for which we have pending execs. In
-                // order to ensure order between execs and the init process for a given
-                // container, skip processing this exit here and let the `handleStarted`
-                // closure for the pending exec publish it.
-                skipped = append(skipped, cp)
-            } else {
-                cps = append(cps, cp)
-            }
+            if init {
+                s.containerInitExit[cp.Container] = e
+            }
+            cps = append(cps, cp)
         }
-        if len(skipped) > 0 {
-            s.running[e.Pid] = skipped
-        } else {
-            delete(s.running, e.Pid)
-        }
+        delete(s.running, e.Pid)
         s.lifecycleMu.Unlock()
 
         for _, cp := range cps {
-            s.handleProcessExit(e, cp.Container, cp.Process)
+            if ip, ok := cp.Process.(*process.Init); ok {
+                s.handleInitExit(e, cp.Container, ip)
+            } else {
+                s.handleProcessExit(e, cp.Container, cp.Process)
+            }
         }
     }
 }
 
 func (s *service) send(evt interface{}) {
     s.events <- evt
 }
 
-// s.mu must be locked when calling handleProcessExit
-func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
-    if ip, ok := p.(*process.Init); ok {
-        // Ensure all children are killed
-        if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
-            if err := ip.KillAll(s.context); err != nil {
-                log.G(s.context).WithError(err).WithField("id", ip.ID()).
-                    Error("failed to kill init's children")
-            }
-        }
-    }
+// handleInitExit processes container init process exits.
+// This is handled separately from non-init exits, because there
+// are some extra invariants we want to ensure in this case, namely:
+// - for a given container, the init process exit MUST be the last exit published
+// This is achieved by:
+// - killing all running container processes (if the container has a shared pid
+//   namespace, otherwise all other processes have been reaped already).
+// - waiting for the container's running exec counter to reach 0.
+// - finally, publishing the init exit.
+func (s *service) handleInitExit(e runcC.Exit, c *runc.Container, p *process.Init) {
+    // kill all running container processes
+    if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
+        if err := p.KillAll(s.context); err != nil {
+            log.G(s.context).WithError(err).WithField("id", p.ID()).
+                Error("failed to kill init's children")
+        }
+    }
 
+    s.lifecycleMu.Lock()
+    numRunningExecs := s.runningExecs[c]
+    if numRunningExecs == 0 {
+        delete(s.runningExecs, c)
+        s.lifecycleMu.Unlock()
+        s.handleProcessExit(e, c, p)
+        return
+    }
+
+    events := make(chan int, numRunningExecs)
+    s.execCountSubscribers[c] = events
+
+    s.lifecycleMu.Unlock()
+
+    go func() {
+        defer func() {
+            s.lifecycleMu.Lock()
+            defer s.lifecycleMu.Unlock()
+            delete(s.execCountSubscribers, c)
+            delete(s.runningExecs, c)
+        }()
+
+        // wait for running processes to exit
+        for {
+            if runningExecs := <-events; runningExecs == 0 {
+                break
+            }
+        }
+
+        // all running processes have exited now, and no new
+        // ones can start, so we can publish the init exit
+        s.handleProcessExit(e, c, p)
+    }()
+}
+
+func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
     p.SetExited(e.Status)
     s.send(&eventstypes.TaskExit{
         ContainerID: c.ID,
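
Note: handleInitExit is the heart of the fix. It snapshots the running-exec count under lifecycleMu; if it is zero the init exit is published immediately, otherwise it registers a buffered channel in execCountSubscribers and waits off the lock until the count drains to zero. The buffer is sized to numRunningExecs so exiting execs can send without blocking while they hold the lock. A standalone sketch of that drain pattern (hypothetical names, not the shim's API):

package main

import (
    "fmt"
    "sync"
)

// waitThenPublish mirrors the tail of handleInitExit: snapshot the
// count under the lock, subscribe if it is nonzero, then wait off the
// lock for it to reach zero before publishing the init exit.
func waitThenPublish(mu *sync.Mutex, running *int, subs map[string]chan<- int, id string, publish func()) {
    mu.Lock()
    n := *running
    if n == 0 {
        mu.Unlock()
        publish()
        return
    }
    events := make(chan int, n) // senders never block under the lock
    subs[id] = events
    mu.Unlock()

    go func() {
        defer func() {
            mu.Lock()
            delete(subs, id)
            mu.Unlock()
        }()
        for {
            if left := <-events; left == 0 {
                break
            }
        }
        publish()
    }()
}

func main() {
    var (
        mu   sync.Mutex
        n    = 1
        subs = map[string]chan<- int{}
        done = make(chan struct{})
    )
    waitThenPublish(&mu, &n, subs, "c1", func() {
        fmt.Println("init exit published last")
        close(done)
    })

    // An exec exits: decrement and notify, as handleProcessExit does.
    mu.Lock()
    n--
    if ch, ok := subs["c1"]; ok {
        ch <- n
    }
    mu.Unlock()
    <-done
}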
@@ -727,6 +751,14 @@ func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.P
         ExitStatus:  uint32(e.Status),
         ExitedAt:    protobuf.ToTimestamp(p.ExitedAt()),
     })
+    if _, init := p.(*process.Init); !init {
+        s.lifecycleMu.Lock()
+        s.runningExecs[c]--
+        if ch, ok := s.execCountSubscribers[c]; ok {
+            ch <- s.runningExecs[c]
+        }
+        s.lifecycleMu.Unlock()
+    }
 }
 
 func (s *service) getContainerPids(ctx context.Context, container *runc.Container) ([]uint32, error) {
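
Note: this last hunk is the other half of the protocol: every exec exit decrements the counter and pokes the subscriber, and the failed-start path in Start does the same for execs that never ran, so the waiter in handleInitExit always eventually observes zero. A plain sync.WaitGroup would not fit here, since the waiter subscribes only after the count is already nonzero and must coordinate under the same mutex that guards the other maps. A sketch of the shared decrement-and-notify step (assumed names):

package main

import (
    "fmt"
    "sync"
)

// decAndNotify captures the step shared by handleProcessExit (exec
// exits) and the error path in Start (execs that failed to start):
// decrement under the lock and wake any waiting init-exit handler.
func decAndNotify(mu *sync.Mutex, running map[string]int, subs map[string]chan<- int, id string) {
    mu.Lock()
    defer mu.Unlock()
    running[id]--
    if ch, ok := subs[id]; ok {
        ch <- running[id]
    }
}

func main() {
    var mu sync.Mutex
    running := map[string]int{"c1": 1}
    ch := make(chan int, 1)
    subs := map[string]chan<- int{"c1": ch}

    decAndNotify(&mu, running, subs, "c1")
    fmt.Println(<-ch) // 0: the init-exit waiter may now publish
}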