
When the shim returns a plain error for a process that does not exist, the server is unable to recognise its GRPC status code and assumes UnknownError. This is awkward for containerd client users, as they cannot tell the actual reason for the error. When the shim returns a NotFound GRPC error instead, the server translates it properly and clients receive a NotFound error rather than Unknown.

Please note that we (CF Garden) would like to have the eventual fix backported to 1.4 as well.

Co-authored-by: Danail Branekov <danailster@gmail.com>
Signed-off-by: Danail Branekov <danailster@gmail.com>
Signed-off-by: Georgi Sabev <georgethebeatle@gmail.com>
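For illustration, the change described above amounts to wrapping the failure in containerd's errdefs sentinel errors before it crosses the GRPC boundary, so that errdefs.ToGRPC can map it to a NotFound status and the client can map it back. The sketch below shows that pattern only; it is not the actual diff, and the lookupProcess helper is hypothetical. The shim's task service, where such errors originate, follows.

package main

import (
	"fmt"

	"github.com/containerd/containerd/errdefs"
	"github.com/pkg/errors"
)

// lookupProcess is a hypothetical stand-in for the shim's process lookup.
func lookupProcess(execID string) error {
	// Returning a plain error here would surface to clients as codes.Unknown:
	//   return fmt.Errorf("process %s not found", execID)
	// Wrapping errdefs.ErrNotFound lets errdefs.ToGRPC translate it to codes.NotFound.
	return errors.Wrapf(errdefs.ErrNotFound, "process %s", execID)
}

func main() {
	err := lookupProcess("exec-1")
	// The shim converts the error at the GRPC/ttrpc boundary, and the containerd
	// client converts it back, so callers can test for the concrete reason.
	grpcErr := errdefs.ToGRPC(err)
	fmt.Println(errdefs.IsNotFound(errdefs.FromGRPC(grpcErr))) // true
}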
// +build linux

/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package v2

import (
	"context"
	"encoding/json"
	"io/ioutil"
	"os"
	"os/exec"
	"path/filepath"
	"sync"
	"syscall"
	"time"

	"github.com/containerd/cgroups"
	cgroupsv2 "github.com/containerd/cgroups/v2"
	eventstypes "github.com/containerd/containerd/api/events"
	"github.com/containerd/containerd/api/types/task"
	"github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/mount"
	"github.com/containerd/containerd/namespaces"
	"github.com/containerd/containerd/pkg/oom"
	oomv1 "github.com/containerd/containerd/pkg/oom/v1"
	oomv2 "github.com/containerd/containerd/pkg/oom/v2"
	"github.com/containerd/containerd/pkg/process"
	"github.com/containerd/containerd/pkg/stdio"
	"github.com/containerd/containerd/runtime/v2/runc"
	"github.com/containerd/containerd/runtime/v2/runc/options"
	"github.com/containerd/containerd/runtime/v2/shim"
	taskAPI "github.com/containerd/containerd/runtime/v2/task"
	"github.com/containerd/containerd/sys"
	"github.com/containerd/containerd/sys/reaper"
	runcC "github.com/containerd/go-runc"
	"github.com/containerd/typeurl"
	"github.com/gogo/protobuf/proto"
	ptypes "github.com/gogo/protobuf/types"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/sys/unix"
)

var (
	_     = (taskAPI.TaskService)(&service{})
	empty = &ptypes.Empty{}
)

// group labels specifies how the shim groups services.
// currently supports a runc.v2 specific .group label and the
// standard k8s pod label. Order matters in this list
var groupLabels = []string{
	"io.containerd.runc.v2.group",
	"io.kubernetes.cri.sandbox-id",
}

type spec struct {
	Annotations map[string]string `json:"annotations,omitempty"`
}

// New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
	var (
		ep  oom.Watcher
		err error
	)
	if cgroups.Mode() == cgroups.Unified {
		ep, err = oomv2.New(publisher)
	} else {
		ep, err = oomv1.New(publisher)
	}
	if err != nil {
		return nil, err
	}
	go ep.Run(ctx)
	s := &service{
		id:         id,
		context:    ctx,
		events:     make(chan interface{}, 128),
		ec:         reaper.Default.Subscribe(),
		ep:         ep,
		cancel:     shutdown,
		containers: make(map[string]*runc.Container),
	}
	go s.processExits()
	runcC.Monitor = reaper.Default
	if err := s.initPlatform(); err != nil {
		shutdown()
		return nil, errors.Wrap(err, "failed to initialized platform behavior")
	}
	go s.forward(ctx, publisher)

	if address, err := shim.ReadAddress("address"); err == nil {
		s.shimAddress = address
	}
	return s, nil
}

// service is the shim implementation of a remote shim over GRPC
type service struct {
	mu          sync.Mutex
	eventSendMu sync.Mutex

	context  context.Context
	events   chan interface{}
	platform stdio.Platform
	ec       chan runcC.Exit
	ep       oom.Watcher

	// id only used in cleanup case
	id string

	containers map[string]*runc.Container

	shimAddress string
	cancel      func()
}

func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, err
	}
	self, err := os.Executable()
	if err != nil {
		return nil, err
	}
	cwd, err := os.Getwd()
	if err != nil {
		return nil, err
	}
	args := []string{
		"-namespace", ns,
		"-id", id,
		"-address", containerdAddress,
	}
	cmd := exec.Command(self, args...)
	cmd.Dir = cwd
	cmd.Env = append(os.Environ(), "GOMAXPROCS=4")
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Setpgid: true,
	}
	return cmd, nil
}

func readSpec() (*spec, error) {
	f, err := os.Open("config.json")
	if err != nil {
		return nil, err
	}
	defer f.Close()
	var s spec
	if err := json.NewDecoder(f).Decode(&s); err != nil {
		return nil, err
	}
	return &s, nil
}

func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (_ string, retErr error) {
	cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress, containerdTTRPCAddress)
	if err != nil {
		return "", err
	}
	grouping := id
	spec, err := readSpec()
	if err != nil {
		return "", err
	}
	for _, group := range groupLabels {
		if groupID, ok := spec.Annotations[group]; ok {
			grouping = groupID
			break
		}
	}
	address, err := shim.SocketAddress(ctx, containerdAddress, grouping)
	if err != nil {
		return "", err
	}

	socket, err := shim.NewSocket(address)
	if err != nil {
		// the only time where this would happen is if there is a bug and the socket
		// was not cleaned up in the cleanup method of the shim or we are using the
		// grouping functionality where the new process should be run with the same
		// shim as an existing container
		if !shim.SocketEaddrinuse(err) {
			return "", errors.Wrap(err, "create new shim socket")
		}
		if shim.CanConnect(address) {
			if err := shim.WriteAddress("address", address); err != nil {
				return "", errors.Wrap(err, "write existing socket for shim")
			}
			return address, nil
		}
		if err := shim.RemoveSocket(address); err != nil {
			return "", errors.Wrap(err, "remove pre-existing socket")
		}
		if socket, err = shim.NewSocket(address); err != nil {
			return "", errors.Wrap(err, "try create new shim socket 2x")
		}
	}
	defer func() {
		if retErr != nil {
			socket.Close()
			_ = shim.RemoveSocket(address)
		}
	}()
	f, err := socket.File()
	if err != nil {
		return "", err
	}

	cmd.ExtraFiles = append(cmd.ExtraFiles, f)

	if err := cmd.Start(); err != nil {
		f.Close()
		return "", err
	}
	defer func() {
		if retErr != nil {
			cmd.Process.Kill()
		}
	}()
	// make sure to wait after start
	go cmd.Wait()
	if err := shim.WriteAddress("address", address); err != nil {
		return "", err
	}
	if data, err := ioutil.ReadAll(os.Stdin); err == nil {
		if len(data) > 0 {
			var any ptypes.Any
			if err := proto.Unmarshal(data, &any); err != nil {
				return "", err
			}
			v, err := typeurl.UnmarshalAny(&any)
			if err != nil {
				return "", err
			}
			if opts, ok := v.(*options.Options); ok {
				if opts.ShimCgroup != "" {
					if cgroups.Mode() == cgroups.Unified {
						if err := cgroupsv2.VerifyGroupPath(opts.ShimCgroup); err != nil {
							return "", errors.Wrapf(err, "failed to verify cgroup path %q", opts.ShimCgroup)
						}
						cg, err := cgroupsv2.LoadManager("/sys/fs/cgroup", opts.ShimCgroup)
						if err != nil {
							return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup)
						}
						if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil {
							return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup)
						}
					} else {
						cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup))
						if err != nil {
							return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup)
						}
						if err := cg.Add(cgroups.Process{
							Pid: cmd.Process.Pid,
						}); err != nil {
							return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup)
						}
					}
				}
			}
		}
	}
	if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil {
		return "", errors.Wrap(err, "failed to adjust OOM score for shim")
	}
	return address, nil
}

func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
	cwd, err := os.Getwd()
	if err != nil {
		return nil, err
	}
	path := filepath.Join(filepath.Dir(cwd), s.id)
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, err
	}
	runtime, err := runc.ReadRuntime(path)
	if err != nil {
		return nil, err
	}
	opts, err := runc.ReadOptions(path)
	if err != nil {
		return nil, err
	}
	root := process.RuncRoot
	if opts != nil && opts.Root != "" {
		root = opts.Root
	}

	r := process.NewRunc(root, path, ns, runtime, "", false)
	if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{
		Force: true,
	}); err != nil {
		logrus.WithError(err).Warn("failed to remove runc container")
	}
	if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil {
		logrus.WithError(err).Warn("failed to cleanup rootfs mount")
	}
	return &taskAPI.DeleteResponse{
		ExitedAt:   time.Now(),
		ExitStatus: 128 + uint32(unix.SIGKILL),
	}, nil
}

// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	container, err := runc.NewContainer(ctx, s.platform, r)
	if err != nil {
		return nil, err
	}

	s.containers[r.ID] = container

	s.send(&eventstypes.TaskCreate{
		ContainerID: r.ID,
		Bundle:      r.Bundle,
		Rootfs:      r.Rootfs,
		IO: &eventstypes.TaskIO{
			Stdin:    r.Stdin,
			Stdout:   r.Stdout,
			Stderr:   r.Stderr,
			Terminal: r.Terminal,
		},
		Checkpoint: r.Checkpoint,
		Pid:        uint32(container.Pid()),
	})

	return &taskAPI.CreateTaskResponse{
		Pid: uint32(container.Pid()),
	}, nil
}

// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}

	// hold the send lock so that the start events are sent before any exit events in the error case
	s.eventSendMu.Lock()
	p, err := container.Start(ctx, r)
	if err != nil {
		s.eventSendMu.Unlock()
		return nil, errdefs.ToGRPC(err)
	}

	switch r.ExecID {
	case "":
		switch cg := container.Cgroup().(type) {
		case cgroups.Cgroup:
			if err := s.ep.Add(container.ID, cg); err != nil {
				logrus.WithError(err).Error("add cg to OOM monitor")
			}
		case *cgroupsv2.Manager:
			allControllers, err := cg.RootControllers()
			if err != nil {
				logrus.WithError(err).Error("failed to get root controllers")
			} else {
				if err := cg.ToggleControllers(allControllers, cgroupsv2.Enable); err != nil {
					if sys.RunningInUserNS() {
						logrus.WithError(err).Debugf("failed to enable controllers (%v)", allControllers)
					} else {
						logrus.WithError(err).Errorf("failed to enable controllers (%v)", allControllers)
					}
				}
			}
			if err := s.ep.Add(container.ID, cg); err != nil {
				logrus.WithError(err).Error("add cg to OOM monitor")
			}
		}

		s.send(&eventstypes.TaskStart{
			ContainerID: container.ID,
			Pid:         uint32(p.Pid()),
		})
	default:
		s.send(&eventstypes.TaskExecStarted{
			ContainerID: container.ID,
			ExecID:      r.ExecID,
			Pid:         uint32(p.Pid()),
		})
	}
	s.eventSendMu.Unlock()
	return &taskAPI.StartResponse{
		Pid: uint32(p.Pid()),
	}, nil
}

// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Delete(ctx, r)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	// if we deleted an init task, send the task delete event
	if r.ExecID == "" {
		s.mu.Lock()
		delete(s.containers, r.ID)
		s.mu.Unlock()
		s.send(&eventstypes.TaskDelete{
			ContainerID: container.ID,
			Pid:         uint32(p.Pid()),
			ExitStatus:  uint32(p.ExitStatus()),
			ExitedAt:    p.ExitedAt(),
		})
	}
	return &taskAPI.DeleteResponse{
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
		Pid:        uint32(p.Pid()),
	}, nil
}

// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	ok, cancel := container.ReserveProcess(r.ExecID)
	if !ok {
		return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
	}
	process, err := container.Exec(ctx, r)
	if err != nil {
		cancel()
		return nil, errdefs.ToGRPC(err)
	}

	s.send(&eventstypes.TaskExecAdded{
		ContainerID: container.ID,
		ExecID:      process.ID(),
	})
	return empty, nil
}

// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.ResizePty(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Process(r.ExecID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	st, err := p.Status(ctx)
	if err != nil {
		return nil, err
	}
	status := task.StatusUnknown
	switch st {
	case "created":
		status = task.StatusCreated
	case "running":
		status = task.StatusRunning
	case "stopped":
		status = task.StatusStopped
	case "paused":
		status = task.StatusPaused
	case "pausing":
		status = task.StatusPausing
	}
	sio := p.Stdio()
	return &taskAPI.StateResponse{
		ID:         p.ID(),
		Bundle:     container.Bundle,
		Pid:        uint32(p.Pid()),
		Status:     status,
		Stdin:      sio.Stdin,
		Stdout:     sio.Stdout,
		Stderr:     sio.Stderr,
		Terminal:   sio.Terminal,
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
	}, nil
}

// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Pause(ctx); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	s.send(&eventstypes.TaskPaused{
		ContainerID: container.ID,
	})
	return empty, nil
}

// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Resume(ctx); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	s.send(&eventstypes.TaskResumed{
		ContainerID: container.ID,
	})
	return empty, nil
}

// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Kill(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Pids returns all pids inside the container
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	pids, err := s.getContainerPids(ctx, r.ID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	var processes []*task.ProcessInfo
	for _, pid := range pids {
		pInfo := task.ProcessInfo{
			Pid: pid,
		}
		for _, p := range container.ExecdProcesses() {
			if p.Pid() == int(pid) {
				d := &options.ProcessDetails{
					ExecID: p.ID(),
				}
				a, err := typeurl.MarshalAny(d)
				if err != nil {
					return nil, errors.Wrapf(err, "failed to marshal process %d info", pid)
				}
				pInfo.Info = a
				break
			}
		}
		processes = append(processes, &pInfo)
	}
	return &taskAPI.PidsResponse{
		Processes: processes,
	}, nil
}

// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.CloseIO(ctx, r); err != nil {
		return nil, err
	}
	return empty, nil
}

// Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Checkpoint(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Update(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Process(r.ExecID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	p.Wait()

	return &taskAPI.WaitResponse{
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
	}, nil
}

// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
	var pid int
	if container, err := s.getContainer(r.ID); err == nil {
		pid = container.Pid()
	}
	return &taskAPI.ConnectResponse{
		ShimPid: uint32(os.Getpid()),
		TaskPid: uint32(pid),
	}, nil
}

func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
	s.mu.Lock()
	// return out if the shim is still servicing containers
	if len(s.containers) > 0 {
		s.mu.Unlock()
		return empty, nil
	}
	s.cancel()
	close(s.events)

	if s.platform != nil {
		s.platform.Close()
	}
	if s.shimAddress != "" {
		_ = shim.RemoveSocket(s.shimAddress)
	}
	return empty, nil
}

func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	cgx := container.Cgroup()
	if cgx == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
	}
	var statsx interface{}
	switch cg := cgx.(type) {
	case cgroups.Cgroup:
		stats, err := cg.Stat(cgroups.IgnoreNotExist)
		if err != nil {
			return nil, err
		}
		statsx = stats
	case *cgroupsv2.Manager:
		stats, err := cg.Stat()
		if err != nil {
			return nil, err
		}
		statsx = stats
	default:
		return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg)
	}
	data, err := typeurl.MarshalAny(statsx)
	if err != nil {
		return nil, err
	}
	return &taskAPI.StatsResponse{
		Stats: data,
	}, nil
}

func (s *service) processExits() {
	for e := range s.ec {
		s.checkProcesses(e)
	}
}

func (s *service) send(evt interface{}) {
	s.events <- evt
}

func (s *service) sendL(evt interface{}) {
	s.eventSendMu.Lock()
	s.events <- evt
	s.eventSendMu.Unlock()
}

func (s *service) checkProcesses(e runcC.Exit) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for _, container := range s.containers {
		if !container.HasPid(e.Pid) {
			continue
		}

		for _, p := range container.All() {
			if p.Pid() != e.Pid {
				continue
			}

			if ip, ok := p.(*process.Init); ok {
				// Ensure all children are killed
				if runc.ShouldKillAllOnExit(s.context, container.Bundle) {
					if err := ip.KillAll(s.context); err != nil {
						logrus.WithError(err).WithField("id", ip.ID()).
							Error("failed to kill init's children")
					}
				}
			}

			p.SetExited(e.Status)
			s.sendL(&eventstypes.TaskExit{
				ContainerID: container.ID,
				ID:          p.ID(),
				Pid:         uint32(e.Pid),
				ExitStatus:  uint32(e.Status),
				ExitedAt:    p.ExitedAt(),
			})
			return
		}
		return
	}
}

func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
	container, err := s.getContainer(id)
	if err != nil {
		return nil, err
	}
	p, err := container.Process("")
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	ps, err := p.(*process.Init).Runtime().Ps(ctx, id)
	if err != nil {
		return nil, err
	}
	pids := make([]uint32, 0, len(ps))
	for _, pid := range ps {
		pids = append(pids, uint32(pid))
	}
	return pids, nil
}

func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
	ns, _ := namespaces.Namespace(ctx)
	ctx = namespaces.WithNamespace(context.Background(), ns)
	for e := range s.events {
		err := publisher.Publish(ctx, runc.GetTopic(e), e)
		if err != nil {
			logrus.WithError(err).Error("post event")
		}
	}
	publisher.Close()
}

func (s *service) getContainer(id string) (*runc.Container, error) {
	s.mu.Lock()
	container := s.containers[id]
	s.mu.Unlock()
	if container == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
	}
	return container, nil
}

// initialize a single epoll fd to manage our consoles. `initPlatform` should
// only be called once.
func (s *service) initPlatform() error {
	if s.platform != nil {
		return nil
	}
	p, err := runc.NewPlatform()
	if err != nil {
		return err
	}
	s.platform = p
	return nil
}