diff --git a/runtime/v2/runc/container.go b/runtime/v2/runc/container.go
index 2ed5b5407..448338561 100644
--- a/runtime/v2/runc/container.go
+++ b/runtime/v2/runc/container.go
@@ -362,7 +362,7 @@ func (c *Container) Start(ctx context.Context, r *task.StartRequest) (process.Pr
 		return nil, err
 	}
 	if err := p.Start(ctx); err != nil {
-		return nil, err
+		return p, err
 	}
 	if c.Cgroup() == nil && p.Pid() > 0 {
 		var cg interface{}
diff --git a/runtime/v2/runc/task/service.go b/runtime/v2/runc/task/service.go
index 0e0f49705..4c4aed40d 100644
--- a/runtime/v2/runc/task/service.go
+++ b/runtime/v2/runc/task/service.go
@@ -72,12 +72,14 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
 	}
 	go ep.Run(ctx)
 	s := &service{
-		context:    ctx,
-		events:     make(chan interface{}, 128),
-		ec:         reaper.Default.Subscribe(),
-		ep:         ep,
-		shutdown:   sd,
-		containers: make(map[string]*runc.Container),
+		context:         ctx,
+		events:          make(chan interface{}, 128),
+		ec:              reaper.Default.Subscribe(),
+		ep:              ep,
+		shutdown:        sd,
+		containers:      make(map[string]*runc.Container),
+		running:         make(map[int][]containerProcess),
+		exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
 	}
 	go s.processExits()
 	runcC.Monitor = reaper.Default
@@ -100,8 +102,7 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
 
 // service is the shim implementation of a remote shim over GRPC
 type service struct {
-	mu          sync.Mutex
-	eventSendMu sync.Mutex
+	mu sync.Mutex
 
 	context  context.Context
 	events   chan interface{}
@@ -111,14 +112,103 @@ type service struct {
 
 	containers map[string]*runc.Container
 
+	lifecycleMu sync.Mutex
+	running     map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
+	// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
+	// dereferencing the subscription pointers must only be done while holding
+	// lifecycleMu.
+	exitSubscribers map[*map[int][]runcC.Exit]struct{}
+
 	shutdown shutdown.Service
 }
 
+type containerProcess struct {
+	Container *runc.Container
+	Process   process.Process
+}
+
+// preStart prepares for starting a container process and handling its exit.
+// The container being started should be passed in as c when starting the
+// container init process for an already-created container. c should be nil when
+// creating a container or when starting an exec.
+//
+// The returned handleStarted closure records that the process has started so
+// that its exit can be handled efficiently. If the process has already exited,
+// it handles the exit immediately. handleStarted should be called after the
+// event announcing the start of the process has been published.
+//
+// The returned cleanup closure releases resources used to handle early exits.
+// It must be called before the caller of preStart returns, otherwise severe
+// memory leaks will occur.
+func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Container, process.Process), cleanup func()) {
+	exits := make(map[int][]runcC.Exit)
+
+	s.lifecycleMu.Lock()
+	defer s.lifecycleMu.Unlock()
+	s.exitSubscribers[&exits] = struct{}{}
+
+	if c != nil {
+		// Remove container init process from s.running so it will once again be
+		// treated as an early exit if it exits before handleStarted is called.
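+		// (The same PID may also be registered for a different container if the
+		// PID was recycled; only this container's entries are dropped.)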
+		pid := c.Pid()
+		var newRunning []containerProcess
+		for _, cp := range s.running[pid] {
+			if cp.Container != c {
+				newRunning = append(newRunning, cp)
+			}
+		}
+		if len(newRunning) > 0 {
+			s.running[pid] = newRunning
+		} else {
+			delete(s.running, pid)
+		}
+	}
+
+	handleStarted = func(c *runc.Container, p process.Process) {
+		var pid int
+		if p != nil {
+			pid = p.Pid()
+		}
+
+		s.lifecycleMu.Lock()
+		ees, exited := exits[pid]
+		delete(s.exitSubscribers, &exits)
+		exits = nil
+		if pid == 0 { // no-op
+			s.lifecycleMu.Unlock()
+		} else if exited {
+			s.lifecycleMu.Unlock()
+			for _, ee := range ees {
+				s.handleProcessExit(ee, c, p)
+			}
+		} else {
+			s.running[pid] = append(s.running[pid], containerProcess{
+				Container: c,
+				Process:   p,
+			})
+			s.lifecycleMu.Unlock()
+		}
+	}
+
+	cleanup = func() {
+		if exits != nil {
+			s.lifecycleMu.Lock()
+			defer s.lifecycleMu.Unlock()
+			delete(s.exitSubscribers, &exits)
+		}
+	}
+
+	return handleStarted, cleanup
+}
+
 // Create a new initial process and container with the underlying OCI runtime
 func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
+	handleStarted, cleanup := s.preStart(nil)
+	defer cleanup()
+
 	container, err := runc.NewContainer(ctx, s.platform, r)
 	if err != nil {
 		return nil, err
@@ -140,6 +230,12 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
 		Pid:         uint32(container.Pid()),
 	})
 
+	// The following call cannot return an error, as the only state in which it
+	// could fail would also cause the container.Pid() call above to panic with
+	// a nil dereference.
+	proc, _ := container.Process("")
+	handleStarted(container, proc)
+
 	return &taskAPI.CreateTaskResponse{
 		Pid: uint32(container.Pid()),
 	}, nil
@@ -157,11 +253,15 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 		return nil, err
 	}
 
-	// hold the send lock so that the start events are sent before any exit events in the error case
-	s.eventSendMu.Lock()
+	var cinit *runc.Container
+	if r.ExecID == "" {
+		cinit = container
+	}
+	handleStarted, cleanup := s.preStart(cinit)
+	defer cleanup()
 	p, err := container.Start(ctx, r)
 	if err != nil {
-		s.eventSendMu.Unlock()
+		handleStarted(container, p)
 		return nil, errdefs.ToGRPC(err)
 	}
 
@@ -201,7 +301,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 			Pid:         uint32(p.Pid()),
 		})
 	}
-	s.eventSendMu.Unlock()
+	handleStarted(container, p)
 	return &taskAPI.StartResponse{
 		Pid: uint32(p.Pid()),
 	}, nil
@@ -509,7 +609,29 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
 
 func (s *service) processExits() {
 	for e := range s.ec {
-		s.checkProcesses(e)
+		// While unlikely, it is not impossible for a container process to exit
+		// and have its PID be recycled for a new container process before we
+		// have a chance to process the first exit. As we have no way to tell
+		// for sure which of the processes the exit event corresponds to (until
+		// pidfd support is implemented), there is no way for us to handle the
+		// exit correctly in that case.
+
+		s.lifecycleMu.Lock()
+		// Inform any concurrent s.Start() calls so they can handle the exit
+		// if the PID belongs to them.
+		for subscriber := range s.exitSubscribers {
+			(*subscriber)[e.Pid] = append((*subscriber)[e.Pid], e)
+		}
+		// Handle the exit for a created/started process. If there's more than
+		// one, assume they've all exited. One of them will be the correct
+		// process.
+		cps := s.running[e.Pid]
+		delete(s.running, e.Pid)
+		s.lifecycleMu.Unlock()
+
+		for _, cp := range cps {
+			s.handleProcessExit(e, cp.Container, cp.Process)
+		}
 	}
 }
 
@@ -517,48 +639,28 @@ func (s *service) send(evt interface{}) {
 	s.events <- evt
 }
 
-func (s *service) sendL(evt interface{}) {
-	s.eventSendMu.Lock()
-	s.events <- evt
-	s.eventSendMu.Unlock()
-}
-
-func (s *service) checkProcesses(e runcC.Exit) {
+func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	for _, container := range s.containers {
-		if !container.HasPid(e.Pid) {
-			continue
-		}
-
-		for _, p := range container.All() {
-			if p.Pid() != e.Pid {
-				continue
+	if ip, ok := p.(*process.Init); ok {
+		// Ensure all children are killed
+		if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
+			if err := ip.KillAll(s.context); err != nil {
+				log.G(s.context).WithError(err).WithField("id", ip.ID()).
+					Error("failed to kill init's children")
 			}
-
-			if ip, ok := p.(*process.Init); ok {
-				// Ensure all children are killed
-				if runc.ShouldKillAllOnExit(s.context, container.Bundle) {
-					if err := ip.KillAll(s.context); err != nil {
-						log.L.WithError(err).WithField("id", ip.ID()).
-							Error("failed to kill init's children")
-					}
-				}
-			}
-
-			p.SetExited(e.Status)
-			s.sendL(&eventstypes.TaskExit{
-				ContainerID: container.ID,
-				ID:          p.ID(),
-				Pid:         uint32(e.Pid),
-				ExitStatus:  uint32(e.Status),
-				ExitedAt:    protobuf.ToTimestamp(p.ExitedAt()),
-			})
-			return
 		}
-		return
 	}
+
+	p.SetExited(e.Status)
+	s.send(&eventstypes.TaskExit{
+		ContainerID: c.ID,
+		ID:          p.ID(),
+		Pid:         uint32(e.Pid),
+		ExitStatus:  uint32(e.Status),
+		ExitedAt:    protobuf.ToTimestamp(p.ExitedAt()),
+	})
 }
 
 func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
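Editor's note: below is a minimal, self-contained sketch of the subscribe-then-reconcile
pattern this patch introduces (preStart / handleStarted / cleanup on one side,
processExits fan-out on the other), reduced to plain ints and strings so it runs
outside the shim. The exitTracker type and all names in it are illustrative only;
they are not part of the containerd API.

package main

import (
	"fmt"
	"sync"
)

type exit struct{ pid, status int }

type exitTracker struct {
	mu          sync.Mutex
	running     map[int][]string             // pid -> names of started processes
	subscribers map[*map[int][]exit]struct{} // early-exit buffers, keyed by pointer
}

func newExitTracker() *exitTracker {
	return &exitTracker{
		running:     make(map[int][]string),
		subscribers: make(map[*map[int][]exit]struct{}),
	}
}

// preStart mirrors service.preStart: it subscribes to exits before the process
// starts, so an exit that races ahead of the start bookkeeping is buffered
// instead of lost.
func (t *exitTracker) preStart(name string) (handleStarted func(pid int), cleanup func()) {
	exits := make(map[int][]exit)
	t.mu.Lock()
	t.subscribers[&exits] = struct{}{}
	t.mu.Unlock()

	handleStarted = func(pid int) {
		t.mu.Lock()
		early := exits[pid]
		delete(t.subscribers, &exits)
		exits = nil
		if len(early) == 0 {
			// No early exit was buffered: record the process as running.
			t.running[pid] = append(t.running[pid], name)
		}
		t.mu.Unlock()
		for _, e := range early {
			fmt.Printf("%s exited early: status %d\n", name, e.status)
		}
	}
	cleanup = func() {
		t.mu.Lock()
		if exits != nil { // handleStarted was never reached; drop the subscription
			delete(t.subscribers, &exits)
		}
		t.mu.Unlock()
	}
	return handleStarted, cleanup
}

// processExit mirrors service.processExits: fan the exit out to subscribers
// whose PID may not be recorded yet, then settle any recorded process.
func (t *exitTracker) processExit(e exit) {
	t.mu.Lock()
	for sub := range t.subscribers {
		(*sub)[e.pid] = append((*sub)[e.pid], e)
	}
	names := t.running[e.pid]
	delete(t.running, e.pid)
	t.mu.Unlock()
	for _, name := range names {
		fmt.Printf("%s exited: status %d\n", name, e.status)
	}
}

func main() {
	t := newExitTracker()
	handleStarted, cleanup := t.preStart("proc-a")
	defer cleanup()
	t.processExit(exit{pid: 42, status: 1}) // exit races ahead of handleStarted
	handleStarted(42)                       // still observed via the subscription
}

Keying subscribers by the address of each buffer map lets both closures share
one buffer and remove it in O(1), which is why the real patch stores
*map[int][]runcC.Exit rather than the map value itself.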