Merge pull request #8617 from corhere/reduce-exec-lock-contention

runtime/v2/runc: handle early exits w/o big locks
commit ded713010c
Kazuyoshi Kato, 2023-06-14 15:55:07 -07:00, committed by GitHub
2 changed files with 152 additions and 50 deletions

runtime/v2/runc/container.go

@@ -362,7 +362,7 @@ func (c *Container) Start(ctx context.Context, r *task.StartRequest) (process.Process, error) {
 		return nil, err
 	}
 	if err := p.Start(ctx); err != nil {
-		return nil, err
+		return p, err
 	}
 	if c.Cgroup() == nil && p.Pid() > 0 {
 		var cg interface{}
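This one-line change makes Container.Start return the process even when p.Start fails: by the time runc reports the error, the OS-level process may already have forked (and even exited), and the caller needs the handle to route that exit. A minimal, self-contained sketch of the return-partial-value-with-error idiom this enables (all names here are hypothetical, not part of the patch):

package main

import (
	"errors"
	"fmt"
)

type proc struct{ pid int }

// start mimics Container.Start: a later setup step fails, but the OS
// process has already forked, so the handle is returned alongside the error.
func start() (*proc, error) {
	p := &proc{pid: 42}
	return p, errors.New("post-start step failed")
}

func main() {
	p, err := start()
	if err != nil && p != nil {
		// The caller can still register the process for exit handling.
		fmt.Printf("start failed, but pid %d still needs exit handling\n", p.pid)
	}
}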

runtime/v2/runc/task/service.go

@@ -72,12 +72,14 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TaskService, error) {
 	}
 	go ep.Run(ctx)
 	s := &service{
-		context:    ctx,
-		events:     make(chan interface{}, 128),
-		ec:         reaper.Default.Subscribe(),
-		ep:         ep,
-		shutdown:   sd,
-		containers: make(map[string]*runc.Container),
+		context:         ctx,
+		events:          make(chan interface{}, 128),
+		ec:              reaper.Default.Subscribe(),
+		ep:              ep,
+		shutdown:        sd,
+		containers:      make(map[string]*runc.Container),
+		running:         make(map[int][]containerProcess),
+		exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
 	}
 	go s.processExits()
 	runcC.Monitor = reaper.Default
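The two new maps initialized above carry the whole scheme: running indexes live processes by PID, and exitSubscribers is a set keyed by pointers to per-caller exit buffers (map[*map[int][]runcC.Exit]struct{}). A self-contained sketch of that pointer-keyed set, with []int standing in for []runcC.Exit:

package main

import "fmt"

func main() {
	// A set keyed by pointers to per-caller buffers. Pointer keys make
	// subscribe/unsubscribe O(1) deletes and let the reaper fan one exit
	// out to every in-flight caller.
	subscribers := make(map[*map[int][]int]struct{})

	exits := make(map[int][]int) // one caller's private exit buffer
	subscribers[&exits] = struct{}{}

	// Fan-out: record exit status 0 for pid 42 in every subscriber's buffer.
	for sub := range subscribers {
		(*sub)[42] = append((*sub)[42], 0)
	}

	delete(subscribers, &exits) // unsubscribing is just a set delete
	fmt.Println(exits[42])      // [0]
}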
@@ -100,8 +102,7 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TaskService, error) {
 
 // service is the shim implementation of a remote shim over GRPC
 type service struct {
-	mu          sync.Mutex
-	eventSendMu sync.Mutex
+	mu sync.Mutex
 
 	context  context.Context
 	events   chan interface{}
@@ -111,14 +112,103 @@ type service struct {
 
 	containers map[string]*runc.Container
 
+	lifecycleMu  sync.Mutex
+	running      map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
+	// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
+	// dereferencing the subscription pointers must only be done while holding
+	// lifecycleMu.
+	exitSubscribers map[*map[int][]runcC.Exit]struct{}
+
 	shutdown shutdown.Service
 }
 
+type containerProcess struct {
+	Container *runc.Container
+	Process   process.Process
+}
+
+// preStart prepares for starting a container process and handling its exit.
+// The container being started should be passed in as c when starting the
+// container init process for an already-created container. c should be nil when
+// creating a container or when starting an exec.
+//
+// The returned handleStarted closure records that the process has started so
+// that its exit can be handled efficiently. If the process has already exited,
+// it handles the exit immediately. handleStarted should be called after the
+// event announcing the start of the process has been published.
+//
+// The returned cleanup closure releases resources used to handle early exits.
+// It must be called before the caller of preStart returns, otherwise severe
+// memory leaks will occur.
+func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Container, process.Process), cleanup func()) {
+	exits := make(map[int][]runcC.Exit)
+
+	s.lifecycleMu.Lock()
+	defer s.lifecycleMu.Unlock()
+	s.exitSubscribers[&exits] = struct{}{}
+
+	if c != nil {
+		// Remove container init process from s.running so it will once again be
+		// treated as an early exit if it exits before handleStarted is called.
+		pid := c.Pid()
+		var newRunning []containerProcess
+		for _, cp := range s.running[pid] {
+			if cp.Container != c {
+				newRunning = append(newRunning, cp)
+			}
+		}
+		if len(newRunning) > 0 {
+			s.running[pid] = newRunning
+		} else {
+			delete(s.running, pid)
+		}
+	}
+
+	handleStarted = func(c *runc.Container, p process.Process) {
+		var pid int
+		if p != nil {
+			pid = p.Pid()
+		}
+
+		s.lifecycleMu.Lock()
+		ees, exited := exits[pid]
+		delete(s.exitSubscribers, &exits)
+		exits = nil
+		if pid == 0 { // no-op
+			s.lifecycleMu.Unlock()
+		} else if exited {
+			s.lifecycleMu.Unlock()
+			for _, ee := range ees {
+				s.handleProcessExit(ee, c, p)
+			}
+		} else {
+			s.running[pid] = append(s.running[pid], containerProcess{
+				Container: c,
+				Process:   p,
+			})
+			s.lifecycleMu.Unlock()
+		}
+	}
+
+	cleanup = func() {
+		if exits != nil {
+			s.lifecycleMu.Lock()
+			defer s.lifecycleMu.Unlock()
+			delete(s.exitSubscribers, &exits)
+		}
+	}
+
+	return handleStarted, cleanup
+}
+
 // Create a new initial process and container with the underlying OCI runtime
 func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
+	handleStarted, cleanup := s.preStart(nil)
+	defer cleanup()
+
 	container, err := runc.NewContainer(ctx, s.platform, r)
 	if err != nil {
 		return nil, err
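preStart above is the core of the change: the caller subscribes to exits before the process starts, and handleStarted later either records the process as running or replays any exit that arrived in the window. A self-contained sketch of that subscribe-then-replay ordering, collapsing the subscriber set to a single buffer (tracker and its methods are hypothetical stand-ins):

package main

import (
	"fmt"
	"sync"
)

// tracker mimics the service's lifecycle bookkeeping: exits for unknown PIDs
// are buffered so a concurrent starter can replay them once it knows its PID.
type tracker struct {
	mu      sync.Mutex
	early   map[int][]int // pid -> exit statuses seen before registration
	running map[int]bool  // pid -> registered as started
}

// exit is the reaper side: handle known PIDs, buffer unknown ones.
func (t *tracker) exit(pid, status int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.running[pid] {
		delete(t.running, pid) // normal path: the process was registered
		return
	}
	t.early[pid] = append(t.early[pid], status) // early exit: buffer it
}

// started is the handleStarted side: replay buffered exits, else register.
func (t *tracker) started(pid int) []int {
	t.mu.Lock()
	defer t.mu.Unlock()
	if ees, ok := t.early[pid]; ok {
		delete(t.early, pid)
		return ees // exits that raced ahead; the caller handles them now
	}
	t.running[pid] = true
	return nil
}

func main() {
	t := &tracker{early: map[int][]int{}, running: map[int]bool{}}
	t.exit(42, 0)              // the exit wins the race with started()
	fmt.Println(t.started(42)) // [0]: replayed instead of lost
}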
@@ -140,6 +230,12 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
 		Pid:        uint32(container.Pid()),
 	})
 
+	// The following line cannot return an error, as the only state in which
+	// that could happen would also cause the container.Pid() call above to
+	// panic with a nil dereference.
+	proc, _ := container.Process("")
+	handleStarted(container, proc)
+
 	return &taskAPI.CreateTaskResponse{
 		Pid: uint32(container.Pid()),
 	}, nil
@@ -157,11 +253,15 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
 		return nil, err
 	}
 
-	// hold the send lock so that the start events are sent before any exit events in the error case
-	s.eventSendMu.Lock()
+	var cinit *runc.Container
+	if r.ExecID == "" {
+		cinit = container
+	}
+	handleStarted, cleanup := s.preStart(cinit)
+	defer cleanup()
 	p, err := container.Start(ctx, r)
 	if err != nil {
-		s.eventSendMu.Unlock()
+		handleStarted(container, p)
 		return nil, errdefs.ToGRPC(err)
 	}
 
@@ -201,7 +301,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
 			Pid:         uint32(p.Pid()),
 		})
 	}
-	s.eventSendMu.Unlock()
+	handleStarted(container, p)
 	return &taskAPI.StartResponse{
 		Pid: uint32(p.Pid()),
 	}, nil
@@ -509,7 +609,29 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
 
 func (s *service) processExits() {
 	for e := range s.ec {
-		s.checkProcesses(e)
+		// While unlikely, it is not impossible for a container process to exit
+		// and have its PID be recycled for a new container process before we
+		// have a chance to process the first exit. As we have no way to tell
+		// for sure which of the processes the exit event corresponds to (until
+		// pidfd support is implemented) there is no way for us to handle the
+		// exit correctly in that case.
+
+		s.lifecycleMu.Lock()
+		// Inform any concurrent s.Start() calls so they can handle the exit
+		// if the PID belongs to them.
+		for subscriber := range s.exitSubscribers {
+			(*subscriber)[e.Pid] = append((*subscriber)[e.Pid], e)
+		}
+		// Handle the exit for a created/started process. If there's more than
+		// one, assume they've all exited. One of them will be the correct
+		// process.
+		cps := s.running[e.Pid]
+		delete(s.running, e.Pid)
+		s.lifecycleMu.Unlock()
+
+		for _, cp := range cps {
+			s.handleProcessExit(e, cp.Container, cp.Process)
+		}
 	}
 }
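One detail of the loop above is easy to miss: the fan-out to exitSubscribers and the lookup in s.running happen under lifecycleMu, but handleProcessExit is invoked only after the lock is released, since (as the next hunk shows) it takes s.mu and publishes events. A self-contained sketch of that snapshot-then-dispatch discipline (mu, running, and onExit are hypothetical stand-ins):

package main

import (
	"fmt"
	"sync"
)

// Stand-ins for s.lifecycleMu and s.running.
var (
	mu      sync.Mutex
	running = map[int][]string{42: {"task-a", "task-b"}}
)

// onExit snapshots the interested processes under the lock, then dispatches
// after unlocking, so exit handling (which may take other locks and publish
// events) never runs while holding the lifecycle lock.
func onExit(pid, status int) {
	mu.Lock()
	cps := running[pid]
	delete(running, pid)
	mu.Unlock()

	for _, cp := range cps {
		fmt.Printf("handling exit of pid %d (status %d) for %s\n", pid, status, cp)
	}
}

func main() { onExit(42, 0) }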
@@ -517,48 +639,28 @@ func (s *service) send(evt interface{}) {
 	s.events <- evt
 }
 
-func (s *service) sendL(evt interface{}) {
-	s.eventSendMu.Lock()
-	s.events <- evt
-	s.eventSendMu.Unlock()
-}
-
-func (s *service) checkProcesses(e runcC.Exit) {
+func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	for _, container := range s.containers {
-		if !container.HasPid(e.Pid) {
-			continue
-		}
-
-		for _, p := range container.All() {
-			if p.Pid() != e.Pid {
-				continue
-			}
-
-			if ip, ok := p.(*process.Init); ok {
-				// Ensure all children are killed
-				if runc.ShouldKillAllOnExit(s.context, container.Bundle) {
-					if err := ip.KillAll(s.context); err != nil {
-						log.L.WithError(err).WithField("id", ip.ID()).
-							Error("failed to kill init's children")
-					}
-				}
-			}
-
-			p.SetExited(e.Status)
-			s.sendL(&eventstypes.TaskExit{
-				ContainerID: container.ID,
-				ID:          p.ID(),
-				Pid:         uint32(e.Pid),
-				ExitStatus:  uint32(e.Status),
-				ExitedAt:    protobuf.ToTimestamp(p.ExitedAt()),
-			})
-			return
-		}
-		return
-	}
+	if ip, ok := p.(*process.Init); ok {
+		// Ensure all children are killed
+		if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
+			if err := ip.KillAll(s.context); err != nil {
+				log.G(s.context).WithError(err).WithField("id", ip.ID()).
+					Error("failed to kill init's children")
+			}
+		}
+	}
+
+	p.SetExited(e.Status)
+	s.send(&eventstypes.TaskExit{
+		ContainerID: c.ID,
+		ID:          p.ID(),
+		Pid:         uint32(e.Pid),
+		ExitStatus:  uint32(e.Status),
+		ExitedAt:    protobuf.ToTimestamp(p.ExitedAt()),
+	})
 }
 
 func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {