Move runc shim implementation to cmd

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
Author: Maksym Pavlenko
Date:   2023-11-14 10:13:32 -08:00
Parent: 7deb68fbf4
Commit: 7d65a45639

10 changed files with 9 additions and 9 deletions

@@ -1,512 +0,0 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runc
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"github.com/containerd/cgroups/v3"
"github.com/containerd/cgroups/v3/cgroup1"
cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2"
"github.com/containerd/console"
"github.com/containerd/containerd/v2/api/runtime/task/v3"
"github.com/containerd/containerd/v2/errdefs"
"github.com/containerd/containerd/v2/mount"
"github.com/containerd/containerd/v2/namespaces"
"github.com/containerd/containerd/v2/pkg/process"
"github.com/containerd/containerd/v2/pkg/stdio"
"github.com/containerd/containerd/v2/runtime/v2/runc/options"
"github.com/containerd/log"
"github.com/containerd/typeurl/v2"
)
// NewContainer returns a new runc container
func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (_ *Container, retErr error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, fmt.Errorf("create namespace: %w", err)
}
opts := &options.Options{}
if r.Options.GetValue() != nil {
v, err := typeurl.UnmarshalAny(r.Options)
if err != nil {
return nil, err
}
if v != nil {
opts = v.(*options.Options)
}
}
var pmounts []process.Mount
for _, m := range r.Rootfs {
pmounts = append(pmounts, process.Mount{
Type: m.Type,
Source: m.Source,
Target: m.Target,
Options: m.Options,
})
}
rootfs := ""
if len(pmounts) > 0 {
rootfs = filepath.Join(r.Bundle, "rootfs")
if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) {
return nil, err
}
}
config := &process.CreateConfig{
ID: r.ID,
Bundle: r.Bundle,
Runtime: opts.BinaryName,
Rootfs: pmounts,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Checkpoint: r.Checkpoint,
ParentCheckpoint: r.ParentCheckpoint,
Options: r.Options,
}
if err := WriteOptions(r.Bundle, opts); err != nil {
return nil, err
}
// For historical reasons, we write opts.BinaryName as well as the entire opts.
if err := WriteRuntime(r.Bundle, opts.BinaryName); err != nil {
return nil, err
}
var mounts []mount.Mount
for _, pm := range pmounts {
mounts = append(mounts, mount.Mount{
Type: pm.Type,
Source: pm.Source,
Target: pm.Target,
Options: pm.Options,
})
}
defer func() {
if retErr != nil {
if err := mount.UnmountMounts(mounts, rootfs, 0); err != nil {
log.G(ctx).WithError(err).Warn("failed to cleanup rootfs mount")
}
}
}()
if err := mount.All(mounts, rootfs); err != nil {
return nil, fmt.Errorf("failed to mount rootfs component: %w", err)
}
p, err := newInit(
ctx,
r.Bundle,
filepath.Join(r.Bundle, "work"),
ns,
platform,
config,
opts,
rootfs,
)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
if err := p.Create(ctx, config); err != nil {
return nil, errdefs.ToGRPC(err)
}
container := &Container{
ID: r.ID,
Bundle: r.Bundle,
process: p,
processes: make(map[string]process.Process),
reservedProcess: make(map[string]struct{}),
}
pid := p.Pid()
if pid > 0 {
var cg interface{}
if cgroups.Mode() == cgroups.Unified {
g, err := cgroupsv2.PidGroupPath(pid)
if err != nil {
log.G(ctx).WithError(err).Errorf("loading cgroup2 for %d", pid)
return container, nil
}
cg, err = cgroupsv2.Load(g)
if err != nil {
log.G(ctx).WithError(err).Errorf("loading cgroup2 for %d", pid)
}
} else {
cg, err = cgroup1.Load(cgroup1.PidPath(pid))
if err != nil {
log.G(ctx).WithError(err).Errorf("loading cgroup for %d", pid)
}
}
container.cgroup = cg
}
return container, nil
}
const optionsFilename = "options.json"
// ReadOptions reads the option information from the path.
// When the file does not exist, ReadOptions returns nil without an error.
func ReadOptions(path string) (*options.Options, error) {
filePath := filepath.Join(path, optionsFilename)
if _, err := os.Stat(filePath); err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}
data, err := os.ReadFile(filePath)
if err != nil {
return nil, err
}
var opts options.Options
if err := json.Unmarshal(data, &opts); err != nil {
return nil, err
}
return &opts, nil
}
// WriteOptions writes the options information into the path
func WriteOptions(path string, opts *options.Options) error {
data, err := json.Marshal(opts)
if err != nil {
return err
}
return os.WriteFile(filepath.Join(path, optionsFilename), data, 0600)
}
// ReadRuntime reads the runtime information from the path
func ReadRuntime(path string) (string, error) {
data, err := os.ReadFile(filepath.Join(path, "runtime"))
if err != nil {
return "", err
}
return string(data), nil
}
// WriteRuntime writes the runtime information into the path
func WriteRuntime(path, runtime string) error {
return os.WriteFile(filepath.Join(path, "runtime"), []byte(runtime), 0600)
}
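// Illustrative usage sketch (the bundle variable here is hypothetical): the shim
// persists both files at create time and the manager's Stop reads them back from
// the bundle path, which is the only state it has left at that point.
//
//	opts := &options.Options{BinaryName: "runc"}
//	if err := WriteOptions(bundle, opts); err != nil { /* handle error */ }
//	if err := WriteRuntime(bundle, opts.BinaryName); err != nil { /* handle error */ }
//	// ReadOptions returns (nil, nil) when options.json does not exist.
//	restored, _ := ReadOptions(bundle)
//	runtimeName, _ := ReadRuntime(bundle)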
func newInit(ctx context.Context, path, workDir, namespace string, platform stdio.Platform,
r *process.CreateConfig, options *options.Options, rootfs string) (*process.Init, error) {
runtime := process.NewRunc(options.Root, path, namespace, options.BinaryName, options.SystemdCgroup)
p := process.New(r.ID, runtime, stdio.Stdio{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
})
p.Bundle = r.Bundle
p.Platform = platform
p.Rootfs = rootfs
p.WorkDir = workDir
p.IoUID = int(options.IoUid)
p.IoGID = int(options.IoGid)
p.NoPivotRoot = options.NoPivotRoot
p.NoNewKeyring = options.NoNewKeyring
p.CriuWorkPath = options.CriuWorkPath
if p.CriuWorkPath == "" {
// if the CRIU work path is not set, use the container's WorkDir
p.CriuWorkPath = p.WorkDir
}
return p, nil
}
// Container for operating on a runc container and its processes
type Container struct {
mu sync.Mutex
// ID of the container
ID string
// Bundle path
Bundle string
// cgroup is either cgroups.Cgroup or *cgroupsv2.Manager
cgroup interface{}
process process.Process
processes map[string]process.Process
reservedProcess map[string]struct{}
}
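// The cgroup field is consumed with a type switch; a minimal sketch (mirroring
// the task service's Stats handler) of how callers distinguish the two cases:
//
//	switch cg := c.Cgroup().(type) {
//	case cgroup1.Cgroup:
//		// legacy (v1) hierarchy
//	case *cgroupsv2.Manager:
//		// unified (v2) hierarchy
//	}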
// All processes in the container
func (c *Container) All() (o []process.Process) {
c.mu.Lock()
defer c.mu.Unlock()
for _, p := range c.processes {
o = append(o, p)
}
if c.process != nil {
o = append(o, c.process)
}
return o
}
// ExecdProcesses added to the container
func (c *Container) ExecdProcesses() (o []process.Process) {
c.mu.Lock()
defer c.mu.Unlock()
for _, p := range c.processes {
o = append(o, p)
}
return o
}
// Pid of the main process of a container
func (c *Container) Pid() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.process.Pid()
}
// Cgroup of the container
func (c *Container) Cgroup() interface{} {
c.mu.Lock()
defer c.mu.Unlock()
return c.cgroup
}
// CgroupSet sets the cgroup to the container
func (c *Container) CgroupSet(cg interface{}) {
c.mu.Lock()
c.cgroup = cg
c.mu.Unlock()
}
// Process returns the process by id
func (c *Container) Process(id string) (process.Process, error) {
c.mu.Lock()
defer c.mu.Unlock()
if id == "" {
if c.process == nil {
return nil, fmt.Errorf("container must be created: %w", errdefs.ErrFailedPrecondition)
}
return c.process, nil
}
p, ok := c.processes[id]
if !ok {
return nil, fmt.Errorf("process does not exist %s: %w", id, errdefs.ErrNotFound)
}
return p, nil
}
// ReserveProcess checks for the existence of an id and atomically
// reserves the process id if it does not already exist
//
// Returns true if the process id was successfully reserved and a
// cancel func to release the reservation
func (c *Container) ReserveProcess(id string) (bool, func()) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.processes[id]; ok {
return false, nil
}
if _, ok := c.reservedProcess[id]; ok {
return false, nil
}
c.reservedProcess[id] = struct{}{}
return true, func() {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.reservedProcess, id)
}
}
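// Usage sketch (mirroring the Exec path in the task service, with execID and r
// standing in for the request fields): reserve the exec ID before creating the
// process and release the reservation if creation fails.
//
//	ok, cancel := c.ReserveProcess(execID)
//	if !ok {
//		return errdefs.ErrAlreadyExists
//	}
//	p, err := c.Exec(ctx, r)
//	if err != nil {
//		cancel() // release the reservation so the ID can be retried
//		return err
//	}
//	// on success, Exec has already promoted the reservation via ProcessAdd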
// ProcessAdd adds a new process to the container
func (c *Container) ProcessAdd(process process.Process) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.reservedProcess, process.ID())
c.processes[process.ID()] = process
}
// ProcessRemove removes the process by id from the container
func (c *Container) ProcessRemove(id string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.processes, id)
}
// Start a container process
func (c *Container) Start(ctx context.Context, r *task.StartRequest) (process.Process, error) {
p, err := c.Process(r.ExecID)
if err != nil {
return nil, err
}
if err := p.Start(ctx); err != nil {
return p, err
}
if c.Cgroup() == nil && p.Pid() > 0 {
var cg interface{}
if cgroups.Mode() == cgroups.Unified {
g, err := cgroupsv2.PidGroupPath(p.Pid())
if err != nil {
log.G(ctx).WithError(err).Errorf("loading cgroup2 for %d", p.Pid())
}
cg, err = cgroupsv2.Load(g)
if err != nil {
log.G(ctx).WithError(err).Errorf("loading cgroup2 for %d", p.Pid())
}
} else {
cg, err = cgroup1.Load(cgroup1.PidPath(p.Pid()))
if err != nil {
log.G(ctx).WithError(err).Errorf("loading cgroup for %d", p.Pid())
}
}
c.cgroup = cg
}
return p, nil
}
// Delete the container or a process by id
func (c *Container) Delete(ctx context.Context, r *task.DeleteRequest) (process.Process, error) {
p, err := c.Process(r.ExecID)
if err != nil {
return nil, err
}
if err := p.Delete(ctx); err != nil {
return nil, err
}
if r.ExecID != "" {
c.ProcessRemove(r.ExecID)
}
return p, nil
}
// Exec an additional process
func (c *Container) Exec(ctx context.Context, r *task.ExecProcessRequest) (process.Process, error) {
process, err := c.process.(*process.Init).Exec(ctx, c.Bundle, &process.ExecConfig{
ID: r.ExecID,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Spec: r.Spec,
})
if err != nil {
return nil, err
}
c.ProcessAdd(process)
return process, nil
}
// Pause the container
func (c *Container) Pause(ctx context.Context) error {
return c.process.(*process.Init).Pause(ctx)
}
// Resume the container
func (c *Container) Resume(ctx context.Context) error {
return c.process.(*process.Init).Resume(ctx)
}
// ResizePty of a process
func (c *Container) ResizePty(ctx context.Context, r *task.ResizePtyRequest) error {
p, err := c.Process(r.ExecID)
if err != nil {
return err
}
ws := console.WinSize{
Width: uint16(r.Width),
Height: uint16(r.Height),
}
return p.Resize(ws)
}
// Kill a process
func (c *Container) Kill(ctx context.Context, r *task.KillRequest) error {
p, err := c.Process(r.ExecID)
if err != nil {
return err
}
return p.Kill(ctx, r.Signal, r.All)
}
// CloseIO of a process
func (c *Container) CloseIO(ctx context.Context, r *task.CloseIORequest) error {
p, err := c.Process(r.ExecID)
if err != nil {
return err
}
if stdin := p.Stdin(); stdin != nil {
if err := stdin.Close(); err != nil {
return fmt.Errorf("close stdin: %w", err)
}
}
return nil
}
// Checkpoint the container
func (c *Container) Checkpoint(ctx context.Context, r *task.CheckpointTaskRequest) error {
p, err := c.Process("")
if err != nil {
return err
}
var opts options.CheckpointOptions
if r.Options != nil {
if err := typeurl.UnmarshalTo(r.Options, &opts); err != nil {
return err
}
}
return p.(*process.Init).Checkpoint(ctx, &process.CheckpointConfig{
Path: r.Path,
Exit: opts.Exit,
AllowOpenTCP: opts.OpenTcp,
AllowExternalUnixSockets: opts.ExternalUnixSockets,
AllowTerminal: opts.Terminal,
FileLocks: opts.FileLocks,
EmptyNamespaces: opts.EmptyNamespaces,
WorkDir: opts.WorkPath,
})
}
// Update the resource information of a running container
func (c *Container) Update(ctx context.Context, r *task.UpdateTaskRequest) error {
p, err := c.Process("")
if err != nil {
return err
}
return p.(*process.Init).Update(ctx, r.Resources)
}
// HasPid returns true if the container owns a specific pid
func (c *Container) HasPid(pid int) bool {
if c.Pid() == pid {
return true
}
for _, p := range c.All() {
if p.Pid() == pid {
return true
}
}
return false
}

@@ -1,274 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package manager
import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"path/filepath"
goruntime "runtime"
"syscall"
"time"
"github.com/containerd/cgroups/v3"
"github.com/containerd/cgroups/v3/cgroup1"
cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2"
"github.com/containerd/containerd/v2/mount"
"github.com/containerd/containerd/v2/namespaces"
"github.com/containerd/containerd/v2/oci"
"github.com/containerd/containerd/v2/pkg/process"
"github.com/containerd/containerd/v2/pkg/schedcore"
"github.com/containerd/containerd/v2/runtime/v2/runc"
"github.com/containerd/containerd/v2/runtime/v2/runc/options"
"github.com/containerd/containerd/v2/runtime/v2/shim"
runcC "github.com/containerd/go-runc"
"github.com/containerd/log"
"golang.org/x/sys/unix"
)
// NewShimManager returns an implementation of the shim manager
// using runc
func NewShimManager(name string) shim.Manager {
return &manager{
name: name,
}
}
// groupLabels specifies how the shim groups services.
// It currently supports the runc.v2-specific .group label and the
// standard k8s pod label. Order matters in this list.
var groupLabels = []string{
"io.containerd.runc.v2.group",
"io.kubernetes.cri.sandbox-id",
}
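// For example, two containers whose specs both carry the annotation
// "io.kubernetes.cri.sandbox-id": "<sandbox-id>" resolve to the same grouping
// value in Start below and therefore share one shim socket and shim process.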
// spec is a shallow version of [oci.Spec] containing only the
// fields we need for the hook. We use a shallow struct to reduce
// the overhead of unmarshaling.
type spec struct {
// Annotations contains arbitrary metadata for the container.
Annotations map[string]string `json:"annotations,omitempty"`
}
type manager struct {
name string
}
func newCommand(ctx context.Context, id, containerdAddress, containerdTTRPCAddress string, debug bool) (*exec.Cmd, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
self, err := os.Executable()
if err != nil {
return nil, err
}
cwd, err := os.Getwd()
if err != nil {
return nil, err
}
args := []string{
"-namespace", ns,
"-id", id,
"-address", containerdAddress,
}
if debug {
args = append(args, "-debug")
}
cmd := exec.Command(self, args...)
cmd.Dir = cwd
cmd.Env = append(os.Environ(), "GOMAXPROCS=4")
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
return cmd, nil
}
func readSpec() (*spec, error) {
f, err := os.Open(oci.ConfigFilename)
if err != nil {
return nil, err
}
defer f.Close()
var s spec
if err := json.NewDecoder(f).Decode(&s); err != nil {
return nil, err
}
return &s, nil
}
func (m manager) Name() string {
return m.name
}
func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ shim.BootstrapParams, retErr error) {
var params shim.BootstrapParams
params.Version = 3
params.Protocol = "ttrpc"
cmd, err := newCommand(ctx, id, opts.Address, opts.TTRPCAddress, opts.Debug)
if err != nil {
return params, err
}
grouping := id
spec, err := readSpec()
if err != nil {
return params, err
}
for _, group := range groupLabels {
if groupID, ok := spec.Annotations[group]; ok {
grouping = groupID
break
}
}
address, err := shim.SocketAddress(ctx, opts.Address, grouping)
if err != nil {
return params, err
}
socket, err := shim.NewSocket(address)
if err != nil {
// The only time this would happen is if there is a bug and the socket
// was not cleaned up in the shim's cleanup method, or if we are using the
// grouping functionality and the new process should run with the same
// shim as an existing container.
if !shim.SocketEaddrinuse(err) {
return params, fmt.Errorf("create new shim socket: %w", err)
}
if shim.CanConnect(address) {
params.Address = address
return params, nil
}
if err := shim.RemoveSocket(address); err != nil {
return params, fmt.Errorf("remove pre-existing socket: %w", err)
}
if socket, err = shim.NewSocket(address); err != nil {
return params, fmt.Errorf("try create new shim socket 2x: %w", err)
}
}
defer func() {
if retErr != nil {
socket.Close()
_ = shim.RemoveSocket(address)
}
}()
f, err := socket.File()
if err != nil {
return params, err
}
cmd.ExtraFiles = append(cmd.ExtraFiles, f)
goruntime.LockOSThread()
if os.Getenv("SCHED_CORE") != "" {
if err := schedcore.Create(schedcore.ProcessGroup); err != nil {
return params, fmt.Errorf("enable sched core support: %w", err)
}
}
if err := cmd.Start(); err != nil {
f.Close()
return params, err
}
goruntime.UnlockOSThread()
defer func() {
if retErr != nil {
cmd.Process.Kill()
}
}()
// make sure to wait after start
go cmd.Wait()
if opts, err := shim.ReadRuntimeOptions[*options.Options](os.Stdin); err == nil {
if opts.ShimCgroup != "" {
if cgroups.Mode() == cgroups.Unified {
cg, err := cgroupsv2.Load(opts.ShimCgroup)
if err != nil {
return params, fmt.Errorf("failed to load cgroup %s: %w", opts.ShimCgroup, err)
}
if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil {
return params, fmt.Errorf("failed to join cgroup %s: %w", opts.ShimCgroup, err)
}
} else {
cg, err := cgroup1.Load(cgroup1.StaticPath(opts.ShimCgroup))
if err != nil {
return params, fmt.Errorf("failed to load cgroup %s: %w", opts.ShimCgroup, err)
}
if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil {
return params, fmt.Errorf("failed to join cgroup %s: %w", opts.ShimCgroup, err)
}
}
}
}
if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil {
return params, fmt.Errorf("failed to adjust OOM score for shim: %w", err)
}
params.Address = address
return params, nil
}
func (manager) Stop(ctx context.Context, id string) (shim.StopStatus, error) {
cwd, err := os.Getwd()
if err != nil {
return shim.StopStatus{}, err
}
path := filepath.Join(filepath.Dir(cwd), id)
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return shim.StopStatus{}, err
}
runtime, err := runc.ReadRuntime(path)
if err != nil {
return shim.StopStatus{}, err
}
opts, err := runc.ReadOptions(path)
if err != nil {
return shim.StopStatus{}, err
}
root := process.RuncRoot
if opts != nil && opts.Root != "" {
root = opts.Root
}
r := process.NewRunc(root, path, ns, runtime, false)
if err := r.Delete(ctx, id, &runcC.DeleteOpts{
Force: true,
}); err != nil {
log.G(ctx).WithError(err).Warn("failed to remove runc container")
}
if err := mount.UnmountRecursive(filepath.Join(path, "rootfs"), 0); err != nil {
log.G(ctx).WithError(err).Warn("failed to cleanup rootfs mount")
}
pid, err := runcC.ReadPidFile(filepath.Join(path, process.InitPidFile))
if err != nil {
log.G(ctx).WithError(err).Warn("failed to read init pid file")
}
return shim.StopStatus{
ExitedAt: time.Now(),
ExitStatus: 128 + int(unix.SIGKILL), // 137: the conventional status for a SIGKILL-terminated process
Pid: pid,
}, nil
}

@@ -1,122 +0,0 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pause
import (
"context"
"runtime"
api "github.com/containerd/containerd/v2/api/runtime/sandbox/v1"
"github.com/containerd/containerd/v2/api/types"
"github.com/containerd/containerd/v2/pkg/shutdown"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/runtime/v2/shim"
"github.com/containerd/log"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
"github.com/containerd/ttrpc"
)
func init() {
registry.Register(&plugin.Registration{
Type: plugins.TTRPCPlugin,
ID: "pause",
Requires: []plugin.Type{
plugins.InternalPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
ss, err := ic.GetByID(plugins.InternalPlugin, "shutdown")
if err != nil {
return nil, err
}
return &pauseService{
shutdown: ss.(shutdown.Service),
}, nil
},
})
}
var (
_ = shim.TTRPCService(&pauseService{})
_ = api.TTRPCSandboxService(&pauseService{})
)
// pauseService is an extension for the task v2 runtime to support Pod "pause" containers via the sandbox API.
type pauseService struct {
shutdown shutdown.Service
}
func (p *pauseService) RegisterTTRPC(server *ttrpc.Server) error {
api.RegisterTTRPCSandboxService(server, p)
return nil
}
func (p *pauseService) CreateSandbox(ctx context.Context, req *api.CreateSandboxRequest) (*api.CreateSandboxResponse, error) {
log.G(ctx).Debugf("create sandbox request: %+v", req)
return &api.CreateSandboxResponse{}, nil
}
func (p *pauseService) StartSandbox(ctx context.Context, req *api.StartSandboxRequest) (*api.StartSandboxResponse, error) {
log.G(ctx).Debugf("start sandbox request: %+v", req)
return &api.StartSandboxResponse{}, nil
}
func (p *pauseService) Platform(ctx context.Context, req *api.PlatformRequest) (*api.PlatformResponse, error) {
log.G(ctx).Debugf("platform request: %+v", req)
platform := types.Platform{
OS: runtime.GOOS,
Architecture: runtime.GOARCH,
}
return &api.PlatformResponse{Platform: &platform}, nil
}
func (p *pauseService) StopSandbox(ctx context.Context, req *api.StopSandboxRequest) (*api.StopSandboxResponse, error) {
log.G(ctx).Debugf("stop sandbox request: %+v", req)
p.shutdown.Shutdown()
return &api.StopSandboxResponse{}, nil
}
func (p *pauseService) WaitSandbox(ctx context.Context, req *api.WaitSandboxRequest) (*api.WaitSandboxResponse, error) {
log.G(ctx).Debugf("wait sandbox request: %+v", req)
return &api.WaitSandboxResponse{
ExitStatus: 0,
}, nil
}
func (p *pauseService) SandboxStatus(ctx context.Context, req *api.SandboxStatusRequest) (*api.SandboxStatusResponse, error) {
log.G(ctx).Debugf("sandbox status request: %+v", req)
return &api.SandboxStatusResponse{}, nil
}
func (p *pauseService) PingSandbox(ctx context.Context, req *api.PingRequest) (*api.PingResponse, error) {
return &api.PingResponse{}, nil
}
func (p *pauseService) ShutdownSandbox(ctx context.Context, req *api.ShutdownSandboxRequest) (*api.ShutdownSandboxResponse, error) {
log.G(ctx).Debugf("shutdown sandbox request: %+v", req)
return &api.ShutdownSandboxResponse{}, nil
}
func (p *pauseService) SandboxMetrics(ctx context.Context, req *api.SandboxMetricsRequest) (*api.SandboxMetricsResponse, error) {
log.G(ctx).Debugf("sandbox metrics request: %+v", req)
return &api.SandboxMetricsResponse{}, nil
}

@@ -1,202 +0,0 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runc
import (
"context"
"errors"
"fmt"
"io"
"net/url"
"os"
"sync"
"syscall"
"github.com/containerd/console"
"github.com/containerd/containerd/v2/namespaces"
"github.com/containerd/containerd/v2/pkg/process"
"github.com/containerd/containerd/v2/pkg/stdio"
"github.com/containerd/fifo"
)
var bufPool = sync.Pool{
New: func() interface{} {
// setting to 4096 to align with PIPE_BUF
// http://man7.org/linux/man-pages/man7/pipe.7.html
buffer := make([]byte, 4096)
return &buffer
},
}
// NewPlatform returns a linux platform for use with I/O operations
func NewPlatform() (stdio.Platform, error) {
epoller, err := console.NewEpoller()
if err != nil {
return nil, fmt.Errorf("failed to initialize epoller: %w", err)
}
go epoller.Wait()
return &linuxPlatform{
epoller: epoller,
}, nil
}
type linuxPlatform struct {
epoller *console.Epoller
}
func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, id, stdin, stdout, stderr string, wg *sync.WaitGroup) (cons console.Console, retErr error) {
if p.epoller == nil {
return nil, errors.New("uninitialized epoller")
}
epollConsole, err := p.epoller.Add(console)
if err != nil {
return nil, err
}
var cwg sync.WaitGroup
if stdin != "" {
in, err := fifo.OpenFifo(context.Background(), stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return nil, err
}
cwg.Add(1)
go func() {
cwg.Done()
bp := bufPool.Get().(*[]byte)
defer bufPool.Put(bp)
io.CopyBuffer(epollConsole, in, *bp)
// we need to shut down epollConsole when the pipe is broken
epollConsole.Shutdown(p.epoller.CloseConsole)
epollConsole.Close()
in.Close()
}()
}
uri, err := url.Parse(stdout)
if err != nil {
return nil, fmt.Errorf("unable to parse stdout uri: %w", err)
}
switch uri.Scheme {
case "binary":
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
cmd := process.NewBinaryCmd(uri, id, ns)
// In case of unexpected errors during logging binary start, close open pipes
var filesToClose []*os.File
defer func() {
if retErr != nil {
process.CloseFiles(filesToClose...)
}
}()
// Create pipe to be used by logging binary for Stdout
outR, outW, err := os.Pipe()
if err != nil {
return nil, fmt.Errorf("failed to create stdout pipes: %w", err)
}
filesToClose = append(filesToClose, outR)
// Stderr is created for the logging binary but is unused when terminal is true
serrR, _, err := os.Pipe()
if err != nil {
return nil, fmt.Errorf("failed to create stderr pipes: %w", err)
}
filesToClose = append(filesToClose, serrR)
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
filesToClose = append(filesToClose, r)
cmd.ExtraFiles = append(cmd.ExtraFiles, outR, serrR, w)
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(outW, epollConsole)
outW.Close()
wg.Done()
}()
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start logging binary process: %w", err)
}
// Close our side of the pipe after start
if err := w.Close(); err != nil {
return nil, fmt.Errorf("failed to close write pipe after start: %w", err)
}
// Wait for the logging binary to be ready
b := make([]byte, 1)
if _, err := r.Read(b); err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to read from logging binary: %w", err)
}
cwg.Wait()
default:
outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
if err != nil {
return nil, err
}
outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0)
if err != nil {
return nil, err
}
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
buf := bufPool.Get().(*[]byte)
defer bufPool.Put(buf)
io.CopyBuffer(outw, epollConsole, *buf)
outw.Close()
outr.Close()
wg.Done()
}()
cwg.Wait()
}
return epollConsole, nil
}
func (p *linuxPlatform) ShutdownConsole(ctx context.Context, cons console.Console) error {
if p.epoller == nil {
return errors.New("uninitialized epoller")
}
epollConsole, ok := cons.(*console.EpollConsole)
if !ok {
return fmt.Errorf("expected EpollConsole, got %#v", cons)
}
return epollConsole.Shutdown(p.epoller.CloseConsole)
}
func (p *linuxPlatform) Close() error {
return p.epoller.Close()
}

@@ -1,49 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugin
import (
"github.com/containerd/containerd/v2/pkg/shutdown"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/runtime/v2/runc/task"
"github.com/containerd/containerd/v2/runtime/v2/shim"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
)
func init() {
registry.Register(&plugin.Registration{
Type: plugins.TTRPCPlugin,
ID: "task",
Requires: []plugin.Type{
plugins.EventPlugin,
plugins.InternalPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
pp, err := ic.GetByID(plugins.EventPlugin, "publisher")
if err != nil {
return nil, err
}
ss, err := ic.GetByID(plugins.InternalPlugin, "shutdown")
if err != nil {
return nil, err
}
return task.NewTaskService(ic.Context, pp.(shim.Publisher), ss.(shutdown.Service))
},
})
}

@@ -1,727 +0,0 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package task
import (
"context"
"fmt"
"os"
"sync"
"github.com/containerd/cgroups/v3"
"github.com/containerd/cgroups/v3/cgroup1"
cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2"
eventstypes "github.com/containerd/containerd/v2/api/events"
taskAPI "github.com/containerd/containerd/v2/api/runtime/task/v3"
"github.com/containerd/containerd/v2/api/types/task"
"github.com/containerd/containerd/v2/errdefs"
"github.com/containerd/containerd/v2/namespaces"
"github.com/containerd/containerd/v2/pkg/oom"
oomv1 "github.com/containerd/containerd/v2/pkg/oom/v1"
oomv2 "github.com/containerd/containerd/v2/pkg/oom/v2"
"github.com/containerd/containerd/v2/pkg/process"
"github.com/containerd/containerd/v2/pkg/shutdown"
"github.com/containerd/containerd/v2/pkg/stdio"
"github.com/containerd/containerd/v2/pkg/userns"
"github.com/containerd/containerd/v2/protobuf"
ptypes "github.com/containerd/containerd/v2/protobuf/types"
"github.com/containerd/containerd/v2/runtime"
"github.com/containerd/containerd/v2/runtime/v2/runc"
"github.com/containerd/containerd/v2/runtime/v2/runc/options"
"github.com/containerd/containerd/v2/runtime/v2/shim"
"github.com/containerd/containerd/v2/sys/reaper"
runcC "github.com/containerd/go-runc"
"github.com/containerd/log"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl/v2"
)
var (
_ = shim.TTRPCService(&service{})
empty = &ptypes.Empty{}
)
// NewTaskService creates a new instance of a task service
func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TTRPCTaskService, error) {
var (
ep oom.Watcher
err error
)
if cgroups.Mode() == cgroups.Unified {
ep, err = oomv2.New(publisher)
} else {
ep, err = oomv1.New(publisher)
}
if err != nil {
return nil, err
}
go ep.Run(ctx)
s := &service{
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
shutdown: sd,
containers: make(map[string]*runc.Container),
running: make(map[int][]containerProcess),
exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
}
go s.processExits()
runcC.Monitor = reaper.Default
if err := s.initPlatform(); err != nil {
return nil, fmt.Errorf("failed to initialized platform behavior: %w", err)
}
go s.forward(ctx, publisher)
sd.RegisterCallback(func(context.Context) error {
close(s.events)
return nil
})
if address, err := shim.ReadAddress("address"); err == nil {
sd.RegisterCallback(func(context.Context) error {
return shim.RemoveSocket(address)
})
}
return s, nil
}
// service is the shim implementation of a remote shim over GRPC
type service struct {
mu sync.Mutex
context context.Context
events chan interface{}
platform stdio.Platform
ec chan runcC.Exit
ep oom.Watcher
containers map[string]*runc.Container
lifecycleMu sync.Mutex
running map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
// dereferencing the subscription pointers must only be done while holding
// lifecycleMu.
exitSubscribers map[*map[int][]runcC.Exit]struct{}
shutdown shutdown.Service
}
type containerProcess struct {
Container *runc.Container
Process process.Process
}
// preStart prepares for starting a container process and handling its exit.
// The container being started should be passed in as c when starting the
// container init process for an already-created container. c should be nil when
// creating a container or when starting an exec.
//
// The returned handleStarted closure records that the process has started so
// that its exit can be handled efficiently. If the process has already exited,
// it handles the exit immediately. handleStarted should be called after the
// event announcing the start of the process has been published.
// Note that handleStarted needs to be aware of whether s.mu is already held
// when it is called. If s.mu has been held, we don't need to lock it when
// calling handleProcessExit.
//
// The returned cleanup closure releases resources used to handle early exits.
// It must be called before the caller of preStart returns, otherwise severe
// memory leaks will occur.
func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Container, process.Process, bool), cleanup func()) {
exits := make(map[int][]runcC.Exit)
s.lifecycleMu.Lock()
defer s.lifecycleMu.Unlock()
s.exitSubscribers[&exits] = struct{}{}
if c != nil {
// Remove container init process from s.running so it will once again be
// treated as an early exit if it exits before handleStarted is called.
pid := c.Pid()
var newRunning []containerProcess
for _, cp := range s.running[pid] {
if cp.Container != c {
newRunning = append(newRunning, cp)
}
}
if len(newRunning) > 0 {
s.running[pid] = newRunning
} else {
delete(s.running, pid)
}
}
handleStarted = func(c *runc.Container, p process.Process, muLocked bool) {
var pid int
if p != nil {
pid = p.Pid()
}
s.lifecycleMu.Lock()
ees, exited := exits[pid]
delete(s.exitSubscribers, &exits)
exits = nil
if pid == 0 { // no-op
s.lifecycleMu.Unlock()
} else if exited {
s.lifecycleMu.Unlock()
for _, ee := range ees {
if muLocked {
s.handleProcessExit(ee, c, p)
} else {
s.mu.Lock()
s.handleProcessExit(ee, c, p)
s.mu.Unlock()
}
}
} else {
s.running[pid] = append(s.running[pid], containerProcess{
Container: c,
Process: p,
})
s.lifecycleMu.Unlock()
}
}
cleanup = func() {
if exits != nil {
s.lifecycleMu.Lock()
defer s.lifecycleMu.Unlock()
delete(s.exitSubscribers, &exits)
}
}
return handleStarted, cleanup
}
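// A minimal sketch of how Create and Start below pair the two closures; the
// boolean passed to handleStarted reflects whether s.mu is held at the call site.
//
//	handleStarted, cleanup := s.preStart(nil)
//	defer cleanup()
//	// ... create the container / start the process and publish its event ...
//	handleStarted(container, p, true) // true: s.mu is already held here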
// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
s.mu.Lock()
defer s.mu.Unlock()
handleStarted, cleanup := s.preStart(nil)
defer cleanup()
container, err := runc.NewContainer(ctx, s.platform, r)
if err != nil {
return nil, err
}
s.containers[r.ID] = container
s.send(&eventstypes.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
Rootfs: r.Rootfs,
IO: &eventstypes.TaskIO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: r.Checkpoint,
Pid: uint32(container.Pid()),
})
// The following line cannot return an error, as the only state in which that
// could happen would also cause the container.Pid() call above to
// panic on a nil dereference.
proc, _ := container.Process("")
handleStarted(container, proc, true)
return &taskAPI.CreateTaskResponse{
Pid: uint32(container.Pid()),
}, nil
}
func (s *service) RegisterTTRPC(server *ttrpc.Server) error {
taskAPI.RegisterTTRPCTaskService(server, s)
return nil
}
// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
var cinit *runc.Container
if r.ExecID == "" {
cinit = container
}
handleStarted, cleanup := s.preStart(cinit)
defer cleanup()
p, err := container.Start(ctx, r)
if err != nil {
handleStarted(container, p, false)
return nil, errdefs.ToGRPC(err)
}
switch r.ExecID {
case "":
switch cg := container.Cgroup().(type) {
case cgroup1.Cgroup:
if err := s.ep.Add(container.ID, cg); err != nil {
log.G(ctx).WithError(err).Error("add cg to OOM monitor")
}
case *cgroupsv2.Manager:
allControllers, err := cg.RootControllers()
if err != nil {
log.G(ctx).WithError(err).Error("failed to get root controllers")
} else {
if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil {
if userns.RunningInUserNS() {
log.G(ctx).WithError(err).Debugf("failed to enable controllers (%v)", allControllers)
} else {
log.G(ctx).WithError(err).Errorf("failed to enable controllers (%v)", allControllers)
}
}
}
if err := s.ep.Add(container.ID, cg); err != nil {
log.G(ctx).WithError(err).Error("add cg to OOM monitor")
}
}
s.send(&eventstypes.TaskStart{
ContainerID: container.ID,
Pid: uint32(p.Pid()),
})
default:
s.send(&eventstypes.TaskExecStarted{
ContainerID: container.ID,
ExecID: r.ExecID,
Pid: uint32(p.Pid()),
})
}
handleStarted(container, p, false)
return &taskAPI.StartResponse{
Pid: uint32(p.Pid()),
}, nil
}
// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
p, err := container.Delete(ctx, r)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
// if we deleted an init task, send the task delete event
if r.ExecID == "" {
s.mu.Lock()
delete(s.containers, r.ID)
s.mu.Unlock()
s.send(&eventstypes.TaskDelete{
ContainerID: container.ID,
Pid: uint32(p.Pid()),
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
})
}
return &taskAPI.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
Pid: uint32(p.Pid()),
}, nil
}
// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
ok, cancel := container.ReserveProcess(r.ExecID)
if !ok {
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
}
process, err := container.Exec(ctx, r)
if err != nil {
cancel()
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskExecAdded{
ContainerID: container.ID,
ExecID: process.ID(),
})
return empty, nil
}
// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.ResizePty(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
p, err := container.Process(r.ExecID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
st, err := p.Status(ctx)
if err != nil {
return nil, err
}
status := task.Status_UNKNOWN
switch st {
case "created":
status = task.Status_CREATED
case "running":
status = task.Status_RUNNING
case "stopped":
status = task.Status_STOPPED
case "paused":
status = task.Status_PAUSED
case "pausing":
status = task.Status_PAUSING
}
sio := p.Stdio()
return &taskAPI.StateResponse{
ID: p.ID(),
Bundle: container.Bundle,
Pid: uint32(p.Pid()),
Status: status,
Stdin: sio.Stdin,
Stdout: sio.Stdout,
Stderr: sio.Stderr,
Terminal: sio.Terminal,
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
}, nil
}
// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.Pause(ctx); err != nil {
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskPaused{
ContainerID: container.ID,
})
return empty, nil
}
// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.Resume(ctx); err != nil {
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskResumed{
ContainerID: container.ID,
})
return empty, nil
}
// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.Kill(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Pids returns all pids inside the container
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
pids, err := s.getContainerPids(ctx, container)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
var processes []*task.ProcessInfo
for _, pid := range pids {
pInfo := task.ProcessInfo{
Pid: pid,
}
for _, p := range container.ExecdProcesses() {
if p.Pid() == int(pid) {
d := &options.ProcessDetails{
ExecID: p.ID(),
}
a, err := protobuf.MarshalAnyToProto(d)
if err != nil {
return nil, fmt.Errorf("failed to marshal process %d info: %w", pid, err)
}
pInfo.Info = a
break
}
}
processes = append(processes, &pInfo)
}
return &taskAPI.PidsResponse{
Processes: processes,
}, nil
}
// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.CloseIO(ctx, r); err != nil {
return nil, err
}
return empty, nil
}
// Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.Checkpoint(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
if err := container.Update(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
p, err := container.Process(r.ExecID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
p.Wait()
return &taskAPI.WaitResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
}, nil
}
// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
var pid int
if container, err := s.getContainer(r.ID); err == nil {
pid = container.Pid()
}
return &taskAPI.ConnectResponse{
ShimPid: uint32(os.Getpid()),
TaskPid: uint32(pid),
}, nil
}
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
// return early if the shim is still servicing containers
if len(s.containers) > 0 {
return empty, nil
}
// please make sure that temporary resources have been cleaned up or registered
// for cleanup before calling shutdown
s.shutdown.Shutdown()
return empty, nil
}
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
container, err := s.getContainer(r.ID)
if err != nil {
return nil, err
}
cgx := container.Cgroup()
if cgx == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
}
var statsx interface{}
switch cg := cgx.(type) {
case cgroup1.Cgroup:
stats, err := cg.Stat(cgroup1.IgnoreNotExist)
if err != nil {
return nil, err
}
statsx = stats
case *cgroupsv2.Manager:
stats, err := cg.Stat()
if err != nil {
return nil, err
}
statsx = stats
default:
return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg)
}
data, err := typeurl.MarshalAny(statsx)
if err != nil {
return nil, err
}
return &taskAPI.StatsResponse{
Stats: protobuf.FromAny(data),
}, nil
}
func (s *service) processExits() {
for e := range s.ec {
// While unlikely, it is not impossible for a container process to exit
// and have its PID be recycled for a new container process before we
// have a chance to process the first exit. As we have no way to tell
// for sure which of the processes the exit event corresponds to (until
// pidfd support is implemented) there is no way for us to handle the
// exit correctly in that case.
s.lifecycleMu.Lock()
// Inform any concurrent s.Start() calls so they can handle the exit
// if the PID belongs to them.
for subscriber := range s.exitSubscribers {
(*subscriber)[e.Pid] = append((*subscriber)[e.Pid], e)
}
// Handle the exit for a created/started process. If there's more than
// one, assume they've all exited. One of them will be the correct
// process.
cps := s.running[e.Pid]
delete(s.running, e.Pid)
s.lifecycleMu.Unlock()
for _, cp := range cps {
s.mu.Lock()
s.handleProcessExit(e, cp.Container, cp.Process)
s.mu.Unlock()
}
}
}
func (s *service) send(evt interface{}) {
s.events <- evt
}
// s.mu must be locked when calling handleProcessExit
func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
if ip, ok := p.(*process.Init); ok {
// Ensure all children are killed
if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
if err := ip.KillAll(s.context); err != nil {
log.G(s.context).WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children")
}
}
}
p.SetExited(e.Status)
s.send(&eventstypes.TaskExit{
ContainerID: c.ID,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
})
}
func (s *service) getContainerPids(ctx context.Context, container *runc.Container) ([]uint32, error) {
p, err := container.Process("")
if err != nil {
return nil, errdefs.ToGRPC(err)
}
ps, err := p.(*process.Init).Runtime().Ps(ctx, container.ID)
if err != nil {
return nil, err
}
pids := make([]uint32, 0, len(ps))
for _, pid := range ps {
pids = append(pids, uint32(pid))
}
return pids, nil
}
func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
ns, _ := namespaces.Namespace(ctx)
ctx = namespaces.WithNamespace(context.Background(), ns)
for e := range s.events {
err := publisher.Publish(ctx, runtime.GetTopic(e), e)
if err != nil {
log.G(ctx).WithError(err).Error("post event")
}
}
publisher.Close()
}
func (s *service) getContainer(id string) (*runc.Container, error) {
s.mu.Lock()
container := s.containers[id]
s.mu.Unlock()
if container == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
}
return container, nil
}
// initialize a single epoll fd to manage our consoles. `initPlatform` should
// only be called once.
func (s *service) initPlatform() error {
if s.platform != nil {
return nil
}
p, err := runc.NewPlatform()
if err != nil {
return err
}
s.platform = p
s.shutdown.RegisterCallback(func(context.Context) error { return s.platform.Close() })
return nil
}

@@ -1,47 +0,0 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runc
import (
"context"
"path/filepath"
"github.com/containerd/containerd/v2/oci"
"github.com/containerd/log"
"github.com/opencontainers/runtime-spec/specs-go"
)
// ShouldKillAllOnExit reads the bundle's OCI spec and returns true if
// there is an error reading the spec or if the container does not have its own
// private PID namespace (in which case its remaining processes must be killed explicitly)
func ShouldKillAllOnExit(ctx context.Context, bundlePath string) bool {
spec, err := oci.ReadSpec(filepath.Join(bundlePath, oci.ConfigFilename))
if err != nil {
log.G(ctx).WithError(err).Error("shouldKillAllOnExit: failed to read config.json")
return true
}
if spec.Linux != nil {
for _, ns := range spec.Linux.Namespaces {
if ns.Type == specs.PIDNamespace && ns.Path == "" {
return false
}
}
}
return true
}
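// For example, a config.json whose Linux namespaces include
//
//	{ "type": "pid" }
//
// (a new PID namespace with no path) makes this return false: when the container's
// init process exits, the kernel kills every remaining process in that namespace,
// so no explicit KillAll is required. Containers sharing the host PID namespace
// return true and have their remaining processes killed by the shim.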