
For a given container, as long as the init process is the init process
of that PID namespace, we always receive the exits for execs before we
receive the exit for the init process.
It's important that we uphold this invariant for the outside world by
always emitting a TaskExit event for a container's exec before we emit
one for the init process, because callers rely on this ordering.
Changing it creates issues - for example, Docker deletes the container
after receiving a TaskExit for the init process, and is then unable to
handle the exec's exit once the container has been deleted
(see: https://github.com/containerd/containerd/issues/9719).
Since 5cd6210ad0, if an exec is starting at the same time that an init
exits, and the exec is an "early exit" - i.e. we haven't emitted a
TaskStart for it or added it to `s.running` by the time we receive its
exit - we notify concurrent calls to `s.Start()` of the exit and keep
processing exits. This causes us to process (and emit) the init's exit
before the exec's, which we don't want to do.
This commit introduces a map `s.pendingExecs` to keep track of the
number of pending execs keyed by container, which allows us to skip
processing exits for inits if there are pending execs, and instead
have the closure returned by `s.preStart` handle the init exit after
emitting the exec's exit.
Signed-off-by: Laura Brehm <laurabrehm@hey.com>
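
To illustrate the ordering described above, here is a minimal, self-contained sketch.
The types and names (tracker, reapInit, execStarted) are hypothetical stand-ins and are
deliberately much simpler than the real shim code, which follows below.

package main

import (
	"fmt"
	"sync"
)

type container struct{ id string }

type exit struct {
	pid    int
	status int
}

type tracker struct {
	mu           sync.Mutex            // stands in for the shim's lifecycleMu
	pendingExecs map[*container]int    // container -> number of execs started but not yet handled
	deferred     map[*container][]exit // init exits held back while execs are pending
}

// reapInit is called when the init process of c exits.
func (t *tracker) reapInit(c *container, e exit) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.pendingExecs[c] > 0 {
		// An exec is still pending: hold the init's exit so the exec's
		// TaskExit can be emitted first.
		t.deferred[c] = append(t.deferred[c], e)
		return
	}
	fmt.Printf("TaskExit (init) %s: status %d\n", c.id, e.status)
}

// execStarted plays the role of the handleStarted closure for an exec.
// execExit is non-nil when the exec already exited ("early exit").
func (t *tracker) execStarted(c *container, execExit *exit) {
	t.mu.Lock()
	t.pendingExecs[c]--
	var held []exit
	if t.pendingExecs[c] == 0 {
		held = t.deferred[c]
		delete(t.deferred, c)
		delete(t.pendingExecs, c)
	}
	t.mu.Unlock()

	if execExit != nil {
		fmt.Printf("TaskExit (exec) %s: status %d\n", c.id, execExit.status)
	}
	// Only after the exec's exit has been emitted do we emit any init
	// exits that were held back.
	for _, e := range held {
		fmt.Printf("TaskExit (init) %s: status %d\n", c.id, e.status)
	}
}

func main() {
	t := &tracker{
		pendingExecs: map[*container]int{},
		deferred:     map[*container][]exit{},
	}
	c := &container{id: "c1"}

	t.mu.Lock()
	t.pendingExecs[c]++ // Start() counts the exec as pending before starting it
	t.mu.Unlock()

	t.reapInit(c, exit{pid: 1, status: 137})    // init exits while the exec is pending: deferred
	t.execStarted(c, &exit{pid: 42, status: 0}) // exec's exit is emitted, then the held init exit
}
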
//go:build linux

/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package task

import (
	"context"
	"fmt"
	"os"
	"sync"

	"github.com/containerd/cgroups/v3"
	"github.com/containerd/cgroups/v3/cgroup1"
	cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2"
	eventstypes "github.com/containerd/containerd/v2/api/events"
	taskAPI "github.com/containerd/containerd/v2/api/runtime/task/v3"
	"github.com/containerd/containerd/v2/api/types/task"
	"github.com/containerd/containerd/v2/cmd/containerd-shim-runc-v2/process"
	"github.com/containerd/containerd/v2/cmd/containerd-shim-runc-v2/runc"
	"github.com/containerd/containerd/v2/core/runtime"
	"github.com/containerd/containerd/v2/core/runtime/v2/runc/options"
	"github.com/containerd/containerd/v2/core/runtime/v2/shim"
	"github.com/containerd/containerd/v2/pkg/namespaces"
	"github.com/containerd/containerd/v2/pkg/oom"
	oomv1 "github.com/containerd/containerd/v2/pkg/oom/v1"
	oomv2 "github.com/containerd/containerd/v2/pkg/oom/v2"
	"github.com/containerd/containerd/v2/pkg/shutdown"
	"github.com/containerd/containerd/v2/pkg/stdio"
	"github.com/containerd/containerd/v2/pkg/sys/reaper"
	"github.com/containerd/containerd/v2/pkg/userns"
	"github.com/containerd/containerd/v2/protobuf"
	ptypes "github.com/containerd/containerd/v2/protobuf/types"
	"github.com/containerd/errdefs"
	runcC "github.com/containerd/go-runc"
	"github.com/containerd/log"
	"github.com/containerd/ttrpc"
	"github.com/containerd/typeurl/v2"
)

var (
	_     = shim.TTRPCService(&service{})
	empty = &ptypes.Empty{}
)

// NewTaskService creates a new instance of a task service
func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TTRPCTaskService, error) {
	var (
		ep  oom.Watcher
		err error
	)
	if cgroups.Mode() == cgroups.Unified {
		ep, err = oomv2.New(publisher)
	} else {
		ep, err = oomv1.New(publisher)
	}
	if err != nil {
		return nil, err
	}
	go ep.Run(ctx)
	s := &service{
		context:         ctx,
		events:          make(chan interface{}, 128),
		ec:              reaper.Default.Subscribe(),
		ep:              ep,
		shutdown:        sd,
		containers:      make(map[string]*runc.Container),
		running:         make(map[int][]containerProcess),
		pendingExecs:    make(map[*runc.Container]int),
		exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
	}
	go s.processExits()
	runcC.Monitor = reaper.Default
	if err := s.initPlatform(); err != nil {
		return nil, fmt.Errorf("failed to initialize platform behavior: %w", err)
	}
	go s.forward(ctx, publisher)
	sd.RegisterCallback(func(context.Context) error {
		close(s.events)
		return nil
	})

	if address, err := shim.ReadAddress("address"); err == nil {
		sd.RegisterCallback(func(context.Context) error {
			return shim.RemoveSocket(address)
		})
	}
	return s, nil
}

// service is the shim implementation of a remote shim over GRPC
type service struct {
	mu sync.Mutex

	context  context.Context
	events   chan interface{}
	platform stdio.Platform
	ec       chan runcC.Exit
	ep       oom.Watcher

	containers map[string]*runc.Container

	lifecycleMu  sync.Mutex
	running      map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
	pendingExecs map[*runc.Container]int    // container -> num pending execs, guarded by lifecycleMu
	// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
	// dereferencing the subscription pointers must only be done while holding
	// lifecycleMu.
	exitSubscribers map[*map[int][]runcC.Exit]struct{}

	shutdown shutdown.Service
}

type containerProcess struct {
	Container *runc.Container
	Process   process.Process
}

// preStart prepares for starting a container process and handling its exit.
// The container being started should be passed in as c when starting the container
// init process for an already-created container. c should be nil when creating a
// container or when starting an exec.
//
// The returned handleStarted closure records that the process has started so
// that its exit can be handled efficiently. If the process has already exited,
// it handles the exit immediately. In addition, if the process is an exec and
// its container's init process has already exited, that exit is also processed.
// handleStarted should be called after the event announcing the start of the
// process has been published. Note that s.lifecycleMu must not be held when
// calling handleStarted.
//
// The returned cleanup closure releases resources used to handle early exits.
// It must be called before the caller of preStart returns, otherwise severe
// memory leaks will occur.
func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Container, process.Process), cleanup func()) {
	exits := make(map[int][]runcC.Exit)
	s.exitSubscribers[&exits] = struct{}{}

	if c != nil {
		// Remove container init process from s.running so it will once again be
		// treated as an early exit if it exits before handleStarted is called.
		pid := c.Pid()
		var newRunning []containerProcess
		for _, cp := range s.running[pid] {
			if cp.Container != c {
				newRunning = append(newRunning, cp)
			}
		}
		if len(newRunning) > 0 {
			s.running[pid] = newRunning
		} else {
			delete(s.running, pid)
		}
	}

	handleStarted = func(c *runc.Container, p process.Process) {
		var pid int
		if p != nil {
			pid = p.Pid()
		}

		_, init := p.(*process.Init)
		s.lifecycleMu.Lock()

		var initExits []runcC.Exit
		var initCps []containerProcess
		if !init {
			s.pendingExecs[c]--

			initPid := c.Pid()
			iExits, initExited := exits[initPid]
			if initExited && s.pendingExecs[c] == 0 {
				// c's init process has exited before handleStarted was called and
				// this is the last pending exec process start - we need to process
				// the exit for the init process after processing this exec, so:
				// - delete c from the s.pendingExecs map
				// - keep the exits for the init pid to process later (after we process
				// this exec's exits)
				// - get the necessary containerProcesses for the init process (that we
				// need to process the exits), and remove them from s.running (which we skipped
				// doing in processExits).
				delete(s.pendingExecs, c)
				initExits = iExits
				var skipped []containerProcess
				for _, initPidCp := range s.running[initPid] {
					if initPidCp.Container == c {
						initCps = append(initCps, initPidCp)
					} else {
						skipped = append(skipped, initPidCp)
					}
				}
				if len(skipped) == 0 {
					delete(s.running, initPid)
				} else {
					s.running[initPid] = skipped
				}
			}
		}

		ees, exited := exits[pid]
		delete(s.exitSubscribers, &exits)
		exits = nil
		if pid == 0 || exited {
			s.lifecycleMu.Unlock()
			for _, ee := range ees {
				s.handleProcessExit(ee, c, p)
			}
			for _, eee := range initExits {
				for _, cp := range initCps {
					s.handleProcessExit(eee, cp.Container, cp.Process)
				}
			}
		} else {
			// Process start was successful, add to `s.running`.
			s.running[pid] = append(s.running[pid], containerProcess{
				Container: c,
				Process:   p,
			})
			s.lifecycleMu.Unlock()
		}
	}

	cleanup = func() {
		if exits != nil {
			s.lifecycleMu.Lock()
			defer s.lifecycleMu.Unlock()
			delete(s.exitSubscribers, &exits)
		}
	}

	return handleStarted, cleanup
}

// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.lifecycleMu.Lock()
	handleStarted, cleanup := s.preStart(nil)
	s.lifecycleMu.Unlock()
	defer cleanup()

	container, err := runc.NewContainer(ctx, s.platform, r)
	if err != nil {
		return nil, err
	}

	s.containers[r.ID] = container

	s.send(&eventstypes.TaskCreate{
		ContainerID: r.ID,
		Bundle:      r.Bundle,
		Rootfs:      r.Rootfs,
		IO: &eventstypes.TaskIO{
			Stdin:    r.Stdin,
			Stdout:   r.Stdout,
			Stderr:   r.Stderr,
			Terminal: r.Terminal,
		},
		Checkpoint: r.Checkpoint,
		Pid:        uint32(container.Pid()),
	})

	// The following line cannot return an error as the only state in which that
	// could happen would also cause the container.Pid() call above to
	// nil-dereference panic.
	proc, _ := container.Process("")
	handleStarted(container, proc)

	return &taskAPI.CreateTaskResponse{
		Pid: uint32(container.Pid()),
	}, nil
}

func (s *service) RegisterTTRPC(server *ttrpc.Server) error {
	taskAPI.RegisterTTRPCTaskService(server, s)
	return nil
}

// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}

	var cinit *runc.Container
	s.lifecycleMu.Lock()
	if r.ExecID == "" {
		cinit = container
	} else {
		s.pendingExecs[container]++
	}
	handleStarted, cleanup := s.preStart(cinit)
	s.lifecycleMu.Unlock()
	defer cleanup()

	p, err := container.Start(ctx, r)
	if err != nil {
		handleStarted(container, p)
		return nil, errdefs.ToGRPC(err)
	}

	switch r.ExecID {
	case "":
		switch cg := container.Cgroup().(type) {
		case cgroup1.Cgroup:
			if err := s.ep.Add(container.ID, cg); err != nil {
				log.G(ctx).WithError(err).Error("add cg to OOM monitor")
			}
		case *cgroupsv2.Manager:
			allControllers, err := cg.RootControllers()
			if err != nil {
				log.G(ctx).WithError(err).Error("failed to get root controllers")
			} else {
				if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil {
					if userns.RunningInUserNS() {
						log.G(ctx).WithError(err).Debugf("failed to enable controllers (%v)", allControllers)
					} else {
						log.G(ctx).WithError(err).Errorf("failed to enable controllers (%v)", allControllers)
					}
				}
			}
			if err := s.ep.Add(container.ID, cg); err != nil {
				log.G(ctx).WithError(err).Error("add cg to OOM monitor")
			}
		}

		s.send(&eventstypes.TaskStart{
			ContainerID: container.ID,
			Pid:         uint32(p.Pid()),
		})
	default:
		s.send(&eventstypes.TaskExecStarted{
			ContainerID: container.ID,
			ExecID:      r.ExecID,
			Pid:         uint32(p.Pid()),
		})
	}
	handleStarted(container, p)
	return &taskAPI.StartResponse{
		Pid: uint32(p.Pid()),
	}, nil
}

// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Delete(ctx, r)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	// if we deleted an init task, send the task delete event
	if r.ExecID == "" {
		s.mu.Lock()
		delete(s.containers, r.ID)
		s.mu.Unlock()
		s.send(&eventstypes.TaskDelete{
			ContainerID: container.ID,
			Pid:         uint32(p.Pid()),
			ExitStatus:  uint32(p.ExitStatus()),
			ExitedAt:    protobuf.ToTimestamp(p.ExitedAt()),
		})
	}
	return &taskAPI.DeleteResponse{
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   protobuf.ToTimestamp(p.ExitedAt()),
		Pid:        uint32(p.Pid()),
	}, nil
}

// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	ok, cancel := container.ReserveProcess(r.ExecID)
	if !ok {
		return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
	}
	process, err := container.Exec(ctx, r)
	if err != nil {
		cancel()
		return nil, errdefs.ToGRPC(err)
	}

	s.send(&eventstypes.TaskExecAdded{
		ContainerID: container.ID,
		ExecID:      process.ID(),
	})
	return empty, nil
}

// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.ResizePty(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Process(r.ExecID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	st, err := p.Status(ctx)
	if err != nil {
		return nil, err
	}
	status := task.Status_UNKNOWN
	switch st {
	case "created":
		status = task.Status_CREATED
	case "running":
		status = task.Status_RUNNING
	case "stopped":
		status = task.Status_STOPPED
	case "paused":
		status = task.Status_PAUSED
	case "pausing":
		status = task.Status_PAUSING
	}
	sio := p.Stdio()
	return &taskAPI.StateResponse{
		ID:         p.ID(),
		Bundle:     container.Bundle,
		Pid:        uint32(p.Pid()),
		Status:     status,
		Stdin:      sio.Stdin,
		Stdout:     sio.Stdout,
		Stderr:     sio.Stderr,
		Terminal:   sio.Terminal,
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   protobuf.ToTimestamp(p.ExitedAt()),
	}, nil
}

// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Pause(ctx); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	s.send(&eventstypes.TaskPaused{
		ContainerID: container.ID,
	})
	return empty, nil
}

// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Resume(ctx); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	s.send(&eventstypes.TaskResumed{
		ContainerID: container.ID,
	})
	return empty, nil
}

// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Kill(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Pids returns all pids inside the container
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	pids, err := s.getContainerPids(ctx, container)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	var processes []*task.ProcessInfo
	for _, pid := range pids {
		pInfo := task.ProcessInfo{
			Pid: pid,
		}
		for _, p := range container.ExecdProcesses() {
			if p.Pid() == int(pid) {
				d := &options.ProcessDetails{
					ExecID: p.ID(),
				}
				a, err := protobuf.MarshalAnyToProto(d)
				if err != nil {
					return nil, fmt.Errorf("failed to marshal process %d info: %w", pid, err)
				}
				pInfo.Info = a
				break
			}
		}
		processes = append(processes, &pInfo)
	}
	return &taskAPI.PidsResponse{
		Processes: processes,
	}, nil
}

// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.CloseIO(ctx, r); err != nil {
		return nil, err
	}
	return empty, nil
}

// Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Checkpoint(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Update(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Process(r.ExecID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	p.Wait()

	return &taskAPI.WaitResponse{
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   protobuf.ToTimestamp(p.ExitedAt()),
	}, nil
}

// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
	var pid int
	if container, err := s.getContainer(r.ID); err == nil {
		pid = container.Pid()
	}
	return &taskAPI.ConnectResponse{
		ShimPid: uint32(os.Getpid()),
		TaskPid: uint32(pid),
	}, nil
}

func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Return early if the shim is still servicing containers.
	if len(s.containers) > 0 {
		return empty, nil
	}

	// Please make sure that temporary resources have been cleaned up or registered
	// for cleanup before calling shutdown.
	s.shutdown.Shutdown()

	return empty, nil
}

func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	cgx := container.Cgroup()
	if cgx == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
	}
	var statsx interface{}
	switch cg := cgx.(type) {
	case cgroup1.Cgroup:
		stats, err := cg.Stat(cgroup1.IgnoreNotExist)
		if err != nil {
			return nil, err
		}
		statsx = stats
	case *cgroupsv2.Manager:
		stats, err := cg.Stat()
		if err != nil {
			return nil, err
		}
		statsx = stats
	default:
		return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg)
	}
	data, err := typeurl.MarshalAny(statsx)
	if err != nil {
		return nil, err
	}
	return &taskAPI.StatsResponse{
		Stats: protobuf.FromAny(data),
	}, nil
}

func (s *service) processExits() {
	for e := range s.ec {
		// While unlikely, it is not impossible for a container process to exit
		// and have its PID be recycled for a new container process before we
		// have a chance to process the first exit. As we have no way to tell
		// for sure which of the processes the exit event corresponds to (until
		// pidfd support is implemented) there is no way for us to handle the
		// exit correctly in that case.

		s.lifecycleMu.Lock()
		// Inform any concurrent s.Start() calls so they can handle the exit
		// if the PID belongs to them.
		for subscriber := range s.exitSubscribers {
			(*subscriber)[e.Pid] = append((*subscriber)[e.Pid], e)
		}
		// Handle the exit for a created/started process. If there's more than
		// one, assume they've all exited. One of them will be the correct
		// process.
		var cps, skipped []containerProcess
		for _, cp := range s.running[e.Pid] {
			if s.pendingExecs[cp.Container] != 0 {
				// This exit relates to a container for which we have pending execs. In
				// order to ensure order between execs and the init process for a given
				// container, skip processing this exit here and let the `handleStarted`
				// closure for the pending exec publish it.
				skipped = append(skipped, cp)
			} else {
				cps = append(cps, cp)
			}
		}
		if len(skipped) > 0 {
			s.running[e.Pid] = skipped
		} else {
			delete(s.running, e.Pid)
		}
		s.lifecycleMu.Unlock()

		for _, cp := range cps {
			s.handleProcessExit(e, cp.Container, cp.Process)
		}
	}
}

func (s *service) send(evt interface{}) {
	s.events <- evt
}

// s.mu must be locked when calling handleProcessExit
func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
	if ip, ok := p.(*process.Init); ok {
		// Ensure all children are killed
		if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
			if err := ip.KillAll(s.context); err != nil {
				log.G(s.context).WithError(err).WithField("id", ip.ID()).
					Error("failed to kill init's children")
			}
		}
	}

	p.SetExited(e.Status)
	s.send(&eventstypes.TaskExit{
		ContainerID: c.ID,
		ID:          p.ID(),
		Pid:         uint32(e.Pid),
		ExitStatus:  uint32(e.Status),
		ExitedAt:    protobuf.ToTimestamp(p.ExitedAt()),
	})
}

func (s *service) getContainerPids(ctx context.Context, container *runc.Container) ([]uint32, error) {
	p, err := container.Process("")
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	ps, err := p.(*process.Init).Runtime().Ps(ctx, container.ID)
	if err != nil {
		return nil, err
	}
	pids := make([]uint32, 0, len(ps))
	for _, pid := range ps {
		pids = append(pids, uint32(pid))
	}
	return pids, nil
}

func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
	ns, _ := namespaces.Namespace(ctx)
	ctx = namespaces.WithNamespace(context.Background(), ns)
	for e := range s.events {
		err := publisher.Publish(ctx, runtime.GetTopic(e), e)
		if err != nil {
			log.G(ctx).WithError(err).Error("post event")
		}
	}
	publisher.Close()
}

func (s *service) getContainer(id string) (*runc.Container, error) {
	s.mu.Lock()
	container := s.containers[id]
	s.mu.Unlock()
	if container == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
	}
	return container, nil
}

// initialize a single epoll fd to manage our consoles. `initPlatform` should
// only be called once.
func (s *service) initPlatform() error {
	if s.platform != nil {
		return nil
	}
	p, err := runc.NewPlatform()
	if err != nil {
		return err
	}
	s.platform = p
	s.shutdown.RegisterCallback(func(context.Context) error { return s.platform.Close() })
	return nil
}