containerd/runtime/v2/runc/container.go
Cory Snider 5cd6210ad0 runtime/v2/runc: handle early exits w/o big locks
eventSendMu is causing severe lock contention when multiple processes
start and exit concurrently. Replace it with a scheme that maintains
causality between the start and exit events of a process without
relying on big locks for synchronization.

Keep track of every process for which a Task(Exec)Start event has been
published and which has not yet exited, in a map keyed by PID.
Processing an exit is then as simple as looking up which process
corresponds to the PID. If no started process is known for that PID,
the PID must belong either to a process which was started by s.Start()
and exited before that s.Start() call could add it to the map of
running processes, or to a reparented process which we don't care
about. Handle the former case by having each s.Start() call subscribe
to exit events before starting the process: it checks whether the PID
exited in the window between starting the process and publishing the
TaskStart event, and handles the exit if it did. Exit events for
reparented processes received while no s.Start() calls are in flight
are discarded immediately, and events received during an s.Start()
call are discarded when that call returns.

Co-authored-by: Laura Brehm <laurabrehm@hey.com>
Signed-off-by: Cory Snider <csnider@mirantis.com>
2023-06-09 16:53:43 -04:00
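
The bookkeeping described above lives in the shim's task service rather than
in container.go below. The following is a rough sketch of the idea only: the
names are hypothetical and the real implementation (for example its per-Start
exit subscriptions) differs in detail.

package exitsketch

import "sync"

// exitEvent and startedProcess are hypothetical stand-ins for the shim's
// real types.
type exitEvent struct{ pid, status int }

type startedProcess struct{ id string }

type exitTracker struct {
	mu       sync.Mutex
	running  map[int]startedProcess // start event published, not yet exited
	inflight int                    // s.Start() calls currently in progress
	pending  []exitEvent            // early exits held while a Start is in flight
}

// handleExit resolves an exit against the running map. An unknown PID either
// belongs to a process that an in-flight Start just launched (so the event is
// kept until that Start returns and checks it) or to a reparented process, in
// which case it is dropped.
func (t *exitTracker) handleExit(e exitEvent, publish func(startedProcess, exitEvent)) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if p, ok := t.running[e.pid]; ok {
		delete(t.running, e.pid)
		publish(p, e)
		return
	}
	if t.inflight > 0 {
		t.pending = append(t.pending, e)
		return
	}
	// No Start in flight: the exit is from a reparented process; discard it.
}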


//go:build linux

/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package runc

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sync"

	"github.com/containerd/cgroups/v3"
	"github.com/containerd/cgroups/v3/cgroup1"
	cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2"
	"github.com/containerd/console"
	"github.com/containerd/containerd/api/runtime/task/v2"
	"github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/log"
	"github.com/containerd/containerd/mount"
	"github.com/containerd/containerd/namespaces"
	"github.com/containerd/containerd/pkg/process"
	"github.com/containerd/containerd/pkg/stdio"
	"github.com/containerd/containerd/runtime/v2/runc/options"
	"github.com/containerd/typeurl/v2"
)

// NewContainer returns a new runc container
func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (_ *Container, retErr error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, fmt.Errorf("create namespace: %w", err)
	}
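
	// Decode the runtime options attached to the request; when none are
	// provided, the default options are used.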
	opts := &options.Options{}
	if r.Options.GetValue() != nil {
		v, err := typeurl.UnmarshalAny(r.Options)
		if err != nil {
			return nil, err
		}
		if v != nil {
			opts = v.(*options.Options)
		}
	}

	var pmounts []process.Mount
	for _, m := range r.Rootfs {
		pmounts = append(pmounts, process.Mount{
			Type:    m.Type,
			Source:  m.Source,
			Target:  m.Target,
			Options: m.Options,
		})
	}
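
	// The rootfs directory under the bundle only needs to exist when there
	// are rootfs mounts to apply to it.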
rootfs := ""
if len(pmounts) > 0 {
rootfs = filepath.Join(r.Bundle, "rootfs")
if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) {
return nil, err
}
}
config := &process.CreateConfig{
ID: r.ID,
Bundle: r.Bundle,
Runtime: opts.BinaryName,
Rootfs: pmounts,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Checkpoint: r.Checkpoint,
ParentCheckpoint: r.ParentCheckpoint,
Options: r.Options,
}
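
	// Persist the options and the runtime binary name in the bundle so that
	// later invocations (for example cleanup after a shim restart) can read
	// them back via ReadOptions and ReadRuntime.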
	if err := WriteOptions(r.Bundle, opts); err != nil {
		return nil, err
	}
	// For historical reasons, we write opts.BinaryName as well as the entire opts
	if err := WriteRuntime(r.Bundle, opts.BinaryName); err != nil {
		return nil, err
	}

	var mounts []mount.Mount
	for _, pm := range pmounts {
		mounts = append(mounts, mount.Mount{
			Type:    pm.Type,
			Source:  pm.Source,
			Target:  pm.Target,
			Options: pm.Options,
		})
	}
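
	// If creating the container fails below, tear the rootfs mounts back down
	// so the bundle can be cleaned up.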
	defer func() {
		if retErr != nil {
			if err := mount.UnmountMounts(mounts, rootfs, 0); err != nil {
				log.G(ctx).WithError(err).Warn("failed to cleanup rootfs mount")
			}
		}
	}()

	if err := mount.All(mounts, rootfs); err != nil {
		return nil, fmt.Errorf("failed to mount rootfs component: %w", err)
	}

	p, err := newInit(
		ctx,
		r.Bundle,
		filepath.Join(r.Bundle, "work"),
		ns,
		platform,
		config,
		opts,
		rootfs,
	)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	if err := p.Create(ctx, config); err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	container := &Container{
		ID:              r.ID,
		Bundle:          r.Bundle,
		process:         p,
		processes:       make(map[string]process.Process),
		reservedProcess: make(map[string]struct{}),
	}
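
	// Best effort: resolve the init process's cgroup (v1 or v2) by PID so it
	// is available for later operations; failures are only logged and do not
	// fail container creation.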
	pid := p.Pid()
	if pid > 0 {
		var cg interface{}
		if cgroups.Mode() == cgroups.Unified {
			g, err := cgroupsv2.PidGroupPath(pid)
			if err != nil {
				log.G(ctx).WithError(err).Errorf("loading cgroup2 for %d", pid)
				return container, nil
			}
			cg, err = cgroupsv2.Load(g)
			if err != nil {
				log.G(ctx).WithError(err).Errorf("loading cgroup2 for %d", pid)
			}
		} else {
			cg, err = cgroup1.Load(cgroup1.PidPath(pid))
			if err != nil {
				log.G(ctx).WithError(err).Errorf("loading cgroup for %d", pid)
			}
		}
		container.cgroup = cg
	}
	return container, nil
}

const optionsFilename = "options.json"

// ReadOptions reads the option information from the path.
// When the file does not exist, ReadOptions returns nil without an error.
func ReadOptions(path string) (*options.Options, error) {
	filePath := filepath.Join(path, optionsFilename)
	if _, err := os.Stat(filePath); err != nil {
		if os.IsNotExist(err) {
			return nil, nil
		}
		return nil, err
	}

	data, err := os.ReadFile(filePath)
	if err != nil {
		return nil, err
	}
	var opts options.Options
	if err := json.Unmarshal(data, &opts); err != nil {
		return nil, err
	}
	return &opts, nil
}

// WriteOptions writes the options information into the path
func WriteOptions(path string, opts *options.Options) error {
	data, err := json.Marshal(opts)
	if err != nil {
		return err
	}
	return os.WriteFile(filepath.Join(path, optionsFilename), data, 0600)
}

// ReadRuntime reads the runtime information from the path
func ReadRuntime(path string) (string, error) {
	data, err := os.ReadFile(filepath.Join(path, "runtime"))
	if err != nil {
		return "", err
	}
	return string(data), nil
}

// WriteRuntime writes the runtime information into the path
func WriteRuntime(path, runtime string) error {
	return os.WriteFile(filepath.Join(path, "runtime"), []byte(runtime), 0600)
}
func newInit(ctx context.Context, path, workDir, namespace string, platform stdio.Platform,
	r *process.CreateConfig, options *options.Options, rootfs string) (*process.Init, error) {
	runtime := process.NewRunc(options.Root, path, namespace, options.BinaryName, options.SystemdCgroup)
	p := process.New(r.ID, runtime, stdio.Stdio{
		Stdin:    r.Stdin,
		Stdout:   r.Stdout,
		Stderr:   r.Stderr,
		Terminal: r.Terminal,
	})
	p.Bundle = r.Bundle
	p.Platform = platform
	p.Rootfs = rootfs
	p.WorkDir = workDir
	p.IoUID = int(options.IoUid)
	p.IoGID = int(options.IoGid)
	p.NoPivotRoot = options.NoPivotRoot
	p.NoNewKeyring = options.NoNewKeyring
	p.CriuWorkPath = options.CriuWorkPath
	if p.CriuWorkPath == "" {
		// if criu work path not set, use container WorkDir
		p.CriuWorkPath = p.WorkDir
	}
	return p, nil
}

// Container for operating on a runc container and its processes
type Container struct {
	mu sync.Mutex

	// ID of the container
	ID string
	// Bundle path
	Bundle string
	// cgroup is either cgroups.Cgroup or *cgroupsv2.Manager
	cgroup          interface{}
	process         process.Process
	processes       map[string]process.Process
	reservedProcess map[string]struct{}
}

// All processes in the container
func (c *Container) All() (o []process.Process) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, p := range c.processes {
		o = append(o, p)
	}
	if c.process != nil {
		o = append(o, c.process)
	}
	return o
}

// ExecdProcesses added to the container
func (c *Container) ExecdProcesses() (o []process.Process) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, p := range c.processes {
		o = append(o, p)
	}
	return o
}

// Pid of the main process of a container
func (c *Container) Pid() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.process.Pid()
}

// Cgroup of the container
func (c *Container) Cgroup() interface{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.cgroup
}

// CgroupSet sets the cgroup to the container
func (c *Container) CgroupSet(cg interface{}) {
	c.mu.Lock()
	c.cgroup = cg
	c.mu.Unlock()
}

// Process returns the process by id
func (c *Container) Process(id string) (process.Process, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if id == "" {
		if c.process == nil {
			return nil, fmt.Errorf("container must be created: %w", errdefs.ErrFailedPrecondition)
		}
		return c.process, nil
	}
	p, ok := c.processes[id]
	if !ok {
		return nil, fmt.Errorf("process does not exist %s: %w", id, errdefs.ErrNotFound)
	}
	return p, nil
}

// ReserveProcess checks for the existence of an id and atomically
// reserves the process id if it does not already exist
//
// Returns true if the process id was successfully reserved and a
// cancel func to release the reservation
func (c *Container) ReserveProcess(id string) (bool, func()) {
	c.mu.Lock()
	defer c.mu.Unlock()
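	// An id is unavailable if it already belongs to a live exec'd process or
	// has been reserved by a concurrent Exec that has not yet been added.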
	if _, ok := c.processes[id]; ok {
		return false, nil
	}
	if _, ok := c.reservedProcess[id]; ok {
		return false, nil
	}
	c.reservedProcess[id] = struct{}{}
	return true, func() {
		c.mu.Lock()
		defer c.mu.Unlock()
		delete(c.reservedProcess, id)
	}
}

// ProcessAdd adds a new process to the container
func (c *Container) ProcessAdd(process process.Process) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.reservedProcess, process.ID())
	c.processes[process.ID()] = process
}

// ProcessRemove removes the process by id from the container
func (c *Container) ProcessRemove(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.processes, id)
}

// Start a container process
func (c *Container) Start(ctx context.Context, r *task.StartRequest) (process.Process, error) {
	p, err := c.Process(r.ExecID)
	if err != nil {
		return nil, err
	}
	if err := p.Start(ctx); err != nil {
		return p, err
	}
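	// The container's cgroup may not have been resolved when it was created;
	// retry now that the process is known to be running.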
	if c.Cgroup() == nil && p.Pid() > 0 {
		var cg interface{}
		if cgroups.Mode() == cgroups.Unified {
			g, err := cgroupsv2.PidGroupPath(p.Pid())
			if err != nil {
				log.G(ctx).WithError(err).Errorf("loading cgroup2 for %d", p.Pid())
			}
			cg, err = cgroupsv2.Load(g)
			if err != nil {
				log.G(ctx).WithError(err).Errorf("loading cgroup2 for %d", p.Pid())
			}
		} else {
			cg, err = cgroup1.Load(cgroup1.PidPath(p.Pid()))
			if err != nil {
				log.G(ctx).WithError(err).Errorf("loading cgroup for %d", p.Pid())
			}
		}
		c.cgroup = cg
	}
	return p, nil
}

// Delete the container or a process by id
func (c *Container) Delete(ctx context.Context, r *task.DeleteRequest) (process.Process, error) {
	p, err := c.Process(r.ExecID)
	if err != nil {
		return nil, err
	}
	if err := p.Delete(ctx); err != nil {
		return nil, err
	}
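	// Only exec'd processes are tracked in the process map, so there is
	// nothing to remove for the init process.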
if r.ExecID != "" {
c.ProcessRemove(r.ExecID)
}
return p, nil
}
// Exec an additional process
func (c *Container) Exec(ctx context.Context, r *task.ExecProcessRequest) (process.Process, error) {
process, err := c.process.(*process.Init).Exec(ctx, c.Bundle, &process.ExecConfig{
ID: r.ExecID,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Spec: r.Spec,
})
if err != nil {
return nil, err
}
c.ProcessAdd(process)
return process, nil
}
// Pause the container
func (c *Container) Pause(ctx context.Context) error {
return c.process.(*process.Init).Pause(ctx)
}
// Resume the container
func (c *Container) Resume(ctx context.Context) error {
return c.process.(*process.Init).Resume(ctx)
}
// ResizePty of a process
func (c *Container) ResizePty(ctx context.Context, r *task.ResizePtyRequest) error {
p, err := c.Process(r.ExecID)
if err != nil {
return err
}
ws := console.WinSize{
Width: uint16(r.Width),
Height: uint16(r.Height),
}
return p.Resize(ws)
}
// Kill a process
func (c *Container) Kill(ctx context.Context, r *task.KillRequest) error {
p, err := c.Process(r.ExecID)
if err != nil {
return err
}
return p.Kill(ctx, r.Signal, r.All)
}
// CloseIO of a process
func (c *Container) CloseIO(ctx context.Context, r *task.CloseIORequest) error {
p, err := c.Process(r.ExecID)
if err != nil {
return err
}
if stdin := p.Stdin(); stdin != nil {
if err := stdin.Close(); err != nil {
return fmt.Errorf("close stdin: %w", err)
}
}
return nil
}
// Checkpoint the container
func (c *Container) Checkpoint(ctx context.Context, r *task.CheckpointTaskRequest) error {
p, err := c.Process("")
if err != nil {
return err
}
var opts options.CheckpointOptions
if r.Options != nil {
if err := typeurl.UnmarshalTo(r.Options, &opts); err != nil {
return err
}
}
return p.(*process.Init).Checkpoint(ctx, &process.CheckpointConfig{
Path: r.Path,
Exit: opts.Exit,
AllowOpenTCP: opts.OpenTcp,
AllowExternalUnixSockets: opts.ExternalUnixSockets,
AllowTerminal: opts.Terminal,
FileLocks: opts.FileLocks,
EmptyNamespaces: opts.EmptyNamespaces,
WorkDir: opts.WorkPath,
})
}
// Update the resource information of a running container
func (c *Container) Update(ctx context.Context, r *task.UpdateTaskRequest) error {
p, err := c.Process("")
if err != nil {
return err
}
return p.(*process.Init).Update(ctx, r.Resources)
}
// HasPid returns true if the container owns a specific pid
func (c *Container) HasPid(pid int) bool {
if c.Pid() == pid {
return true
}
for _, p := range c.All() {
if p.Pid() == pid {
return true
}
}
return false
}