runtime/v2/runc: handle early exits w/o big locks

eventSendMu is causing severe lock contention when multiple processes
start and exit concurrently. Replace it with a different scheme for
maintaining causality between the start and exit events of a process,
one which does not rely on big locks for synchronization.

Keep track of all processes for which a Task(Exec)Start event has been
published and which have not yet exited, in a map keyed by their PID.
Processing an exit is then as simple as looking up which process
corresponds to the PID. If no started process is known for that PID,
the PID must belong either to a process which was started by s.Start()
but which the s.Start() call has not yet added to the map of running
processes, or to a reparented process which we don't care about. Handle
the former case by having each s.Start() call subscribe to exit events
before starting the process: after publishing the TaskStart event, it
checks whether the PID exited in the window between starting the
process and publishing the event, and handles the exit itself if so.
Exit events for reparented processes received while no s.Start() calls
are in flight are immediately discarded, and events received during an
s.Start() call are discarded when that s.Start() call returns.
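
To make the scheme concrete, here is a minimal, self-contained sketch
of the idea in plain Go. It is illustrative only: the monitor,
exitEvent, and onExit names are invented for this sketch and do not
appear in the shim, and the container-specific details, the pid == 0
case, and error handling are omitted.

package main

import (
	"fmt"
	"sync"
)

type exitEvent struct{ Pid, Status int }

type monitor struct {
	mu          sync.Mutex                        // analogue of lifecycleMu: guards only the maps
	running     map[int]string                    // pid -> started process, like s.running
	subscribers map[*map[int][]exitEvent]struct{} // analogue of s.exitSubscribers
}

func newMonitor() *monitor {
	return &monitor{
		running:     make(map[int]string),
		subscribers: make(map[*map[int][]exitEvent]struct{}),
	}
}

// preStart subscribes the caller to exit events before it spawns a process.
// handleStarted is called after the start event has been published; cleanup
// must run on every return path, or the subscription leaks.
func (m *monitor) preStart() (handleStarted func(pid int, id string, onExit func(exitEvent)), cleanup func()) {
	exits := make(map[int][]exitEvent)

	m.mu.Lock()
	m.subscribers[&exits] = struct{}{}
	m.mu.Unlock()

	handleStarted = func(pid int, id string, onExit func(exitEvent)) {
		m.mu.Lock()
		ees, exited := exits[pid]
		delete(m.subscribers, &exits)
		exits = nil
		if !exited {
			// No early exit: future exits are routed through m.running.
			m.running[pid] = id
		}
		m.mu.Unlock()
		// If the process exited early, its buffered exit events are
		// replayed now, strictly after the start event was published.
		for _, ee := range ees {
			onExit(ee)
		}
	}
	cleanup = func() {
		m.mu.Lock()
		if exits != nil {
			delete(m.subscribers, &exits)
		}
		m.mu.Unlock()
	}
	return handleStarted, cleanup
}

// processExit is the reaper side: fan the exit out to all subscribers,
// then handle it for any process already recorded as running.
func (m *monitor) processExit(e exitEvent, onExit func(id string, e exitEvent)) {
	m.mu.Lock()
	for sub := range m.subscribers {
		(*sub)[e.Pid] = append((*sub)[e.Pid], e)
	}
	id, ok := m.running[e.Pid]
	delete(m.running, e.Pid)
	m.mu.Unlock()
	if ok {
		onExit(id, e)
	}
}

func main() {
	m := newMonitor()
	handleStarted, cleanup := m.preStart()
	defer cleanup()
	// Simulate an exit arriving between starting pid 42 and publishing
	// its start event: the subscription buffers it...
	m.processExit(exitEvent{Pid: 42, Status: 1}, func(string, exitEvent) {})
	// ...and handleStarted replays it once the start event is out.
	handleStarted(42, "task1", func(e exitEvent) {
		fmt.Printf("early exit handled: pid=%d status=%d\n", e.Pid, e.Status)
	})
}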

Co-authored-by: Laura Brehm <laurabrehm@hey.com>
Signed-off-by: Cory Snider <csnider@mirantis.com>
Cory Snider, 2023-06-09 14:15:14 -04:00
commit 5cd6210ad0 (parent 326cd0623e)
2 changed files with 152 additions and 50 deletions

runtime/v2/runc/container.go

@@ -362,7 +362,7 @@ func (c *Container) Start(ctx context.Context, r *task.StartRequest) (process.Pr
 		return nil, err
 	}
 	if err := p.Start(ctx); err != nil {
-		return nil, err
+		return p, err
 	}
 	if c.Cgroup() == nil && p.Pid() > 0 {
 		var cg interface{}
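
The hunk above makes Container.Start return the process handle even
when p.Start fails: the OS process may have been created (and may even
have exited already) before the failure was detected, and the error
path in service.go below passes the process to handleStarted so that a
buffered exit is not orphaned. A toy illustration of the pattern, with
invented start and handleStarted stand-ins rather than containerd APIs:

package main

import (
	"errors"
	"fmt"
)

type proc struct{ pid int }

// start returns the process handle alongside the error, because the OS
// process may exist (and may already have exited) even when start fails.
func start() (*proc, error) {
	p := &proc{pid: 42}
	return p, errors.New("runc start failed after the process was spawned")
}

func handleStarted(p *proc) {
	if p != nil {
		fmt.Printf("consuming exit subscription for pid %d\n", p.pid)
	}
}

func main() {
	p, err := start()
	if err != nil {
		handleStarted(p) // still consume the subscription on the error path
		fmt.Println("start error:", err)
		return
	}
}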

runtime/v2/runc/task/service.go

@@ -78,6 +78,8 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
 		ep:              ep,
 		shutdown:        sd,
 		containers:      make(map[string]*runc.Container),
+		running:         make(map[int][]containerProcess),
+		exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
 	}
 	go s.processExits()
 	runcC.Monitor = reaper.Default
@@ -101,7 +103,6 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
 // service is the shim implementation of a remote shim over GRPC
 type service struct {
 	mu          sync.Mutex
-	eventSendMu sync.Mutex
 
 	context  context.Context
 	events   chan interface{}
@@ -111,14 +112,103 @@ type service struct {
 	containers map[string]*runc.Container
 
+	lifecycleMu sync.Mutex
+	running     map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
+	// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
+	// dereferencing the subscription pointers must only be done while holding
+	// lifecycleMu.
+	exitSubscribers map[*map[int][]runcC.Exit]struct{}
+
 	shutdown shutdown.Service
 }
 
+type containerProcess struct {
+	Container *runc.Container
+	Process   process.Process
+}
+
+// preStart prepares for starting a container process and handling its exit.
+// The container being started should be passed in as c when starting the
+// container init process for an already-created container. c should be nil
+// when creating a container or when starting an exec.
+//
+// The returned handleStarted closure records that the process has started so
+// that its exit can be handled efficiently. If the process has already exited,
+// it handles the exit immediately. handleStarted should be called after the
+// event announcing the start of the process has been published.
+//
+// The returned cleanup closure releases resources used to handle early exits.
+// It must be called before the caller of preStart returns, otherwise severe
+// memory leaks will occur.
+func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Container, process.Process), cleanup func()) {
+	exits := make(map[int][]runcC.Exit)
+
+	s.lifecycleMu.Lock()
+	defer s.lifecycleMu.Unlock()
+	s.exitSubscribers[&exits] = struct{}{}
+
+	if c != nil {
+		// Remove container init process from s.running so it will once again be
+		// treated as an early exit if it exits before handleStarted is called.
+		pid := c.Pid()
+		var newRunning []containerProcess
+		for _, cp := range s.running[pid] {
+			if cp.Container != c {
+				newRunning = append(newRunning, cp)
+			}
+		}
+		if len(newRunning) > 0 {
+			s.running[pid] = newRunning
+		} else {
+			delete(s.running, pid)
+		}
+	}
+
+	handleStarted = func(c *runc.Container, p process.Process) {
+		var pid int
+		if p != nil {
+			pid = p.Pid()
+		}
+
+		s.lifecycleMu.Lock()
+		ees, exited := exits[pid]
+		delete(s.exitSubscribers, &exits)
+		exits = nil
+		if pid == 0 { // no-op
+			s.lifecycleMu.Unlock()
+		} else if exited {
+			s.lifecycleMu.Unlock()
+			for _, ee := range ees {
+				s.handleProcessExit(ee, c, p)
+			}
+		} else {
+			s.running[pid] = append(s.running[pid], containerProcess{
+				Container: c,
+				Process:   p,
+			})
+			s.lifecycleMu.Unlock()
+		}
+	}
+
+	cleanup = func() {
+		if exits != nil {
+			s.lifecycleMu.Lock()
+			defer s.lifecycleMu.Unlock()
+			delete(s.exitSubscribers, &exits)
+		}
+	}
+
+	return handleStarted, cleanup
+}
+
 // Create a new initial process and container with the underlying OCI runtime
 func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
+	handleStarted, cleanup := s.preStart(nil)
+	defer cleanup()
+
 	container, err := runc.NewContainer(ctx, s.platform, r)
 	if err != nil {
 		return nil, err
@@ -140,6 +230,12 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
 		Pid: uint32(container.Pid()),
 	})
 
+	// The following line cannot return an error as the only state in which that
+	// could happen would also cause the container.Pid() call above to
+	// nil-dereference panic.
+	proc, _ := container.Process("")
+	handleStarted(container, proc)
+
 	return &taskAPI.CreateTaskResponse{
 		Pid: uint32(container.Pid()),
 	}, nil
@@ -157,11 +253,15 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 		return nil, err
 	}
 
-	// hold the send lock so that the start events are sent before any exit events in the error case
-	s.eventSendMu.Lock()
+	var cinit *runc.Container
+	if r.ExecID == "" {
+		cinit = container
+	}
+	handleStarted, cleanup := s.preStart(cinit)
+	defer cleanup()
+
 	p, err := container.Start(ctx, r)
 	if err != nil {
-		s.eventSendMu.Unlock()
+		handleStarted(container, p)
 		return nil, errdefs.ToGRPC(err)
 	}
@@ -201,7 +301,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 			Pid: uint32(p.Pid()),
 		})
 	}
-	s.eventSendMu.Unlock()
+	handleStarted(container, p)
 	return &taskAPI.StartResponse{
 		Pid: uint32(p.Pid()),
 	}, nil
@@ -509,7 +609,29 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
 func (s *service) processExits() {
 	for e := range s.ec {
-		s.checkProcesses(e)
+		// While unlikely, it is not impossible for a container process to exit
+		// and have its PID be recycled for a new container process before we
+		// have a chance to process the first exit. As we have no way to tell
+		// for sure which of the processes the exit event corresponds to (until
+		// pidfd support is implemented) there is no way for us to handle the
+		// exit correctly in that case.
+
+		s.lifecycleMu.Lock()
+		// Inform any concurrent s.Start() calls so they can handle the exit
+		// if the PID belongs to them.
+		for subscriber := range s.exitSubscribers {
+			(*subscriber)[e.Pid] = append((*subscriber)[e.Pid], e)
+		}
+		// Handle the exit for a created/started process. If there's more than
+		// one, assume they've all exited. One of them will be the correct
+		// process.
+		cps := s.running[e.Pid]
+		delete(s.running, e.Pid)
+		s.lifecycleMu.Unlock()
+
+		for _, cp := range cps {
+			s.handleProcessExit(e, cp.Container, cp.Process)
+		}
 	}
 }
@@ -517,48 +639,28 @@ func (s *service) send(evt interface{}) {
 	s.events <- evt
 }
 
-func (s *service) sendL(evt interface{}) {
-	s.eventSendMu.Lock()
-	s.events <- evt
-	s.eventSendMu.Unlock()
-}
-
-func (s *service) checkProcesses(e runcC.Exit) {
+func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	for _, container := range s.containers {
-		if !container.HasPid(e.Pid) {
-			continue
-		}
-
-		for _, p := range container.All() {
-			if p.Pid() != e.Pid {
-				continue
-			}
-
-			if ip, ok := p.(*process.Init); ok {
-				// Ensure all children are killed
-				if runc.ShouldKillAllOnExit(s.context, container.Bundle) {
-					if err := ip.KillAll(s.context); err != nil {
-						log.L.WithError(err).WithField("id", ip.ID()).
-							Error("failed to kill init's children")
-					}
-				}
-			}
-
-			p.SetExited(e.Status)
-			s.sendL(&eventstypes.TaskExit{
-				ContainerID: container.ID,
-				ID:          p.ID(),
-				Pid:         uint32(e.Pid),
-				ExitStatus:  uint32(e.Status),
-				ExitedAt:    protobuf.ToTimestamp(p.ExitedAt()),
-			})
-			return
-		}
-		return
-	}
+	if ip, ok := p.(*process.Init); ok {
+		// Ensure all children are killed
+		if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
+			if err := ip.KillAll(s.context); err != nil {
+				log.G(s.context).WithError(err).WithField("id", ip.ID()).
+					Error("failed to kill init's children")
+			}
+		}
+	}
+
+	p.SetExited(e.Status)
+	s.send(&eventstypes.TaskExit{
+		ContainerID: c.ID,
+		ID:          p.ID(),
+		Pid:         uint32(e.Pid),
+		ExitStatus:  uint32(e.Status),
+		ExitedAt:    protobuf.ToTimestamp(p.ExitedAt()),
+	})
 }
 
 func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
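
One consequence of keying s.running by PID, noted in the processExits
comments above, is that a recycled PID can leave more than one
containerProcess under the same key; the exit is then fanned out to all
of them, and to every subscribed s.Start() call. A small self-contained
demonstration of that fan-out policy (types and names invented for
illustration, not containerd code):

package main

import "fmt"

type exit struct{ Pid, Status int }

func main() {
	// Two concurrent Start() calls hold subscriptions...
	subA := map[int][]exit{}
	subB := map[int][]exit{}
	subscribers := map[*map[int][]exit]struct{}{&subA: {}, &subB: {}}

	// ...and, via PID recycling, two tracked processes share pid 42.
	running := map[int][]string{42: {"init-old", "exec-new"}}

	e := exit{Pid: 42, Status: 0}
	for sub := range subscribers {
		(*sub)[e.Pid] = append((*sub)[e.Pid], e)
	}
	// Every process recorded under the PID is treated as exited: one of
	// them owns the event, and any others must have exited earlier for
	// the PID to have been recycled at all.
	for _, id := range running[e.Pid] {
		fmt.Printf("handling exit of %s: status=%d\n", id, e.Status)
	}
	delete(running, e.Pid)
	fmt.Println("exits buffered per subscriber:", len(subA[42]), len(subB[42]))
}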