runc-shim: process exec exits before init

For a given container, as long as the init process is the init process
of that PID namespace, we always receive the exits for execs before we
receive them for the init process.

It's important that we uphold this invariant for the outside world by
always emitting a TastExit event for a container's exec before we emit
one for the init process because this is the expected behavior from
callers, and changing this creates issues - such as Docker, which will
delete the container after receiving a TaskExit for the init process,
and then not be able to handle the exec's exit after having deleted
the container (see: https://github.com/containerd/containerd/issues/9719).

Since 5cd6210ad0, if an exec is starting
at the same time that an init exits, if the exec is an "early exit"
i.e. we haven't emitted a TaskStart for it/put it in `s.running` by the
time we receive it's exit, we notify concurrent calls to `s.Start()` of
the exit and continue processing exits, which will cause us to process
the Init's exit before the exec, and emit it, which we don't want to do.

This commit introduces a map `s.pendingExecs` to keep track of the
number of pending execs keyed by container, which allows us to skip
processing exits for inits if there are pending execs, and instead
have the closure returned by `s.preStart` handle the init exit after
emitting the exec's exit.

Signed-off-by: Laura Brehm <laurabrehm@hey.com>
This commit is contained in:
Laura Brehm 2024-02-15 12:43:27 +00:00
parent 9a2b85561a
commit 892dc54bd2
No known key found for this signature in database
GPG Key ID: CFBF847B4A313468

View File

@ -80,6 +80,7 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
shutdown: sd, shutdown: sd,
containers: make(map[string]*runc.Container), containers: make(map[string]*runc.Container),
running: make(map[int][]containerProcess), running: make(map[int][]containerProcess),
pendingExecs: make(map[*runc.Container]int),
exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}), exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
} }
go s.processExits() go s.processExits()
@ -113,8 +114,9 @@ type service struct {
containers map[string]*runc.Container containers map[string]*runc.Container
lifecycleMu sync.Mutex lifecycleMu sync.Mutex
running map[int][]containerProcess // pid -> running process, guarded by lifecycleMu running map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
pendingExecs map[*runc.Container]int // container -> num pending execs, guarded by lifecycleMu
// Subscriptions to exits for PIDs. Adding/deleting subscriptions and // Subscriptions to exits for PIDs. Adding/deleting subscriptions and
// dereferencing the subscription pointers must only be done while holding // dereferencing the subscription pointers must only be done while holding
// lifecycleMu. // lifecycleMu.
@ -129,26 +131,23 @@ type containerProcess struct {
} }
// preStart prepares for starting a container process and handling its exit. // preStart prepares for starting a container process and handling its exit.
// The container being started should be passed in as c when starting the // The container being started should be passed in as c when starting the container
// container init process for an already-created container. c should be nil when // init process for an already-created container. c should be nil when creating a
// creating a container or when starting an exec. // container or when starting an exec.
// //
// The returned handleStarted closure records that the process has started so // The returned handleStarted closure records that the process has started so
// that its exit can be handled efficiently. If the process has already exited, // that its exit can be handled efficiently. If the process has already exited,
// it handles the exit immediately. handleStarted should be called after the // it handles the exit immediately. In addition, if the process is an exec and
// event announcing the start of the process has been published. // its container's init process has already exited, that exit is also processed.
// Note that handleStarted needs to be aware of whether s.mu is already held // handleStarted should be called after the event announcing the start of the
// when it is called. If s.mu has been held, we don't need to lock it when // process has been published. Note that s.lifecycleMu must not be held when
// calling handleProcessExit. // calling handleStarted.
// //
// The returned cleanup closure releases resources used to handle early exits. // The returned cleanup closure releases resources used to handle early exits.
// It must be called before the caller of preStart returns, otherwise severe // It must be called before the caller of preStart returns, otherwise severe
// memory leaks will occur. // memory leaks will occur.
func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Container, process.Process, bool), cleanup func()) { func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Container, process.Process), cleanup func()) {
exits := make(map[int][]runcC.Exit) exits := make(map[int][]runcC.Exit)
s.lifecycleMu.Lock()
defer s.lifecycleMu.Unlock()
s.exitSubscribers[&exits] = struct{}{} s.exitSubscribers[&exits] = struct{}{}
if c != nil { if c != nil {
@ -168,30 +167,65 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
} }
} }
handleStarted = func(c *runc.Container, p process.Process, muLocked bool) { handleStarted = func(c *runc.Container, p process.Process) {
var pid int var pid int
if p != nil { if p != nil {
pid = p.Pid() pid = p.Pid()
} }
_, init := p.(*process.Init)
s.lifecycleMu.Lock() s.lifecycleMu.Lock()
var initExits []runcC.Exit
var initCps []containerProcess
if !init {
s.pendingExecs[c]--
initPid := c.Pid()
iExits, initExited := exits[initPid]
if initExited && s.pendingExecs[c] == 0 {
// c's init process has exited before handleStarted was called and
// this is the last pending exec process start - we need to process
// the exit for the init process after processing this exec, so:
// - delete c from the s.pendingExecs map
// - keep the exits for the init pid to process later (after we process
// this exec's exits)
// - get the necessary containerProcesses for the init process (that we
// need to process the exits), and remove them from s.running (which we skipped
// doing in processExits).
delete(s.pendingExecs, c)
initExits = iExits
var skipped []containerProcess
for _, initPidCp := range s.running[initPid] {
if initPidCp.Container == c {
initCps = append(initCps, initPidCp)
} else {
skipped = append(skipped, initPidCp)
}
}
if len(skipped) == 0 {
delete(s.running, initPid)
} else {
s.running[initPid] = skipped
}
}
}
ees, exited := exits[pid] ees, exited := exits[pid]
delete(s.exitSubscribers, &exits) delete(s.exitSubscribers, &exits)
exits = nil exits = nil
if pid == 0 { // no-op if pid == 0 || exited {
s.lifecycleMu.Unlock()
} else if exited {
s.lifecycleMu.Unlock() s.lifecycleMu.Unlock()
for _, ee := range ees { for _, ee := range ees {
if muLocked { s.handleProcessExit(ee, c, p)
s.handleProcessExit(ee, c, p) }
} else { for _, eee := range initExits {
s.mu.Lock() for _, cp := range initCps {
s.handleProcessExit(ee, c, p) s.handleProcessExit(eee, cp.Container, cp.Process)
s.mu.Unlock()
} }
} }
} else { } else {
// Process start was successful, add to `s.running`.
s.running[pid] = append(s.running[pid], containerProcess{ s.running[pid] = append(s.running[pid], containerProcess{
Container: c, Container: c,
Process: p, Process: p,
@ -216,7 +250,9 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.lifecycleMu.Lock()
handleStarted, cleanup := s.preStart(nil) handleStarted, cleanup := s.preStart(nil)
s.lifecycleMu.Unlock()
defer cleanup() defer cleanup()
container, err := runc.NewContainer(ctx, s.platform, r) container, err := runc.NewContainer(ctx, s.platform, r)
@ -244,7 +280,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
// could happen would also cause the container.Pid() call above to // could happen would also cause the container.Pid() call above to
// nil-deference panic. // nil-deference panic.
proc, _ := container.Process("") proc, _ := container.Process("")
handleStarted(container, proc, true) handleStarted(container, proc)
return &taskAPI.CreateTaskResponse{ return &taskAPI.CreateTaskResponse{
Pid: uint32(container.Pid()), Pid: uint32(container.Pid()),
@ -264,14 +300,19 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
} }
var cinit *runc.Container var cinit *runc.Container
s.lifecycleMu.Lock()
if r.ExecID == "" { if r.ExecID == "" {
cinit = container cinit = container
} else {
s.pendingExecs[container]++
} }
handleStarted, cleanup := s.preStart(cinit) handleStarted, cleanup := s.preStart(cinit)
s.lifecycleMu.Unlock()
defer cleanup() defer cleanup()
p, err := container.Start(ctx, r) p, err := container.Start(ctx, r)
if err != nil { if err != nil {
handleStarted(container, p, false) handleStarted(container, p)
return nil, errdefs.ToGRPC(err) return nil, errdefs.ToGRPC(err)
} }
@ -311,7 +352,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
Pid: uint32(p.Pid()), Pid: uint32(p.Pid()),
}) })
} }
handleStarted(container, p, false) handleStarted(container, p)
return &taskAPI.StartResponse{ return &taskAPI.StartResponse{
Pid: uint32(p.Pid()), Pid: uint32(p.Pid()),
}, nil }, nil
@ -635,14 +676,27 @@ func (s *service) processExits() {
// Handle the exit for a created/started process. If there's more than // Handle the exit for a created/started process. If there's more than
// one, assume they've all exited. One of them will be the correct // one, assume they've all exited. One of them will be the correct
// process. // process.
cps := s.running[e.Pid] var cps, skipped []containerProcess
delete(s.running, e.Pid) for _, cp := range s.running[e.Pid] {
if s.pendingExecs[cp.Container] != 0 {
// This exit relates to a container for which we have pending execs. In
// order to ensure order between execs and the init process for a given
// container, skip processing this exit here and let the `handleStarted`
// closure for the pending exec publish it.
skipped = append(skipped, cp)
} else {
cps = append(cps, cp)
}
}
if len(skipped) > 0 {
s.running[e.Pid] = skipped
} else {
delete(s.running, e.Pid)
}
s.lifecycleMu.Unlock() s.lifecycleMu.Unlock()
for _, cp := range cps { for _, cp := range cps {
s.mu.Lock()
s.handleProcessExit(e, cp.Container, cp.Process) s.handleProcessExit(e, cp.Container, cp.Process)
s.mu.Unlock()
} }
} }
} }