Add runc.v2 multi-shim

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby
2019-02-08 15:01:15 -05:00
parent 6bcbf88f82
commit 84a24711e8
22 changed files with 984 additions and 98 deletions

View File

@@ -38,6 +38,7 @@ import (
"github.com/sirupsen/logrus"
)
// NewContainer returns a new runc container
func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTaskRequest) (*Container, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
@@ -130,6 +131,7 @@ func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTa
return container, nil
}
// ReadRuntime reads the runtime information from the path
func ReadRuntime(path string) (string, error) {
data, err := ioutil.ReadFile(filepath.Join(path, "runtime"))
if err != nil {
@@ -138,6 +140,7 @@ func ReadRuntime(path string) (string, error) {
return string(data), nil
}
// WriteRuntime stores the runtime name in a file called "runtime" located
// directly under path. The file is written with mode 0600 and any error from
// the write is returned unchanged.
func WriteRuntime(path, runtime string) error {
	target := filepath.Join(path, "runtime")
	contents := []byte(runtime)
	return ioutil.WriteFile(target, contents, 0600)
}
@@ -168,10 +171,13 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform rpro
return p, nil
}
// Container for operating on a runc container and its processes
type Container struct {
mu sync.Mutex
ID string
// ID of the container
ID string
// Bundle path
Bundle string
cgroup cgroups.Cgroup
@@ -179,6 +185,7 @@ type Container struct {
processes map[string]rproc.Process
}
// All processes in the container
func (c *Container) All() (o []rproc.Process) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -192,6 +199,7 @@ func (c *Container) All() (o []rproc.Process) {
return o
}
// ExecdProcesses added to the container
func (c *Container) ExecdProcesses() (o []rproc.Process) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -208,27 +216,38 @@ func (c *Container) Pid() int {
return c.process.Pid()
}
// Cgroup returns the cgroup currently recorded on the container. The field is
// read while holding the container mutex.
func (c *Container) Cgroup() cgroups.Cgroup {
	c.mu.Lock()
	current := c.cgroup
	c.mu.Unlock()
	return current
}
// CgroupSet records cg as the container's cgroup. The assignment happens
// while holding the container mutex.
func (c *Container) CgroupSet(cg cgroups.Cgroup) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.cgroup = cg
}
func (c *Container) Process(id string) rproc.Process {
// Process returns the process by id
func (c *Container) Process(id string) (rproc.Process, error) {
c.mu.Lock()
defer c.mu.Unlock()
if id == "" {
return c.process
if c.process == nil {
return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
}
return c.process, nil
}
return c.processes[id]
p, ok := c.processes[id]
if !ok {
return nil, errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", id)
}
return p, nil
}
// ProcessExists returns true if the process by id exists
func (c *Container) ProcessExists(id string) bool {
c.mu.Lock()
defer c.mu.Unlock()
@@ -236,22 +255,25 @@ func (c *Container) ProcessExists(id string) bool {
return ok
}
// ProcessAdd registers process in the container's process table, keyed by the
// value of process.ID(). An existing entry with the same id is replaced.
func (c *Container) ProcessAdd(process rproc.Process) {
	c.mu.Lock()
	defer c.mu.Unlock()

	key := process.ID()
	c.processes[key] = process
}
// ProcessRemove drops the entry for id from the container's process table.
// Removing an id that is not present is a no-op.
func (c *Container) ProcessRemove(id string) {
	c.mu.Lock()
	delete(c.processes, id)
	c.mu.Unlock()
}
// Start a container process
func (c *Container) Start(ctx context.Context, r *task.StartRequest) (rproc.Process, error) {
p := c.Process(r.ExecID)
if p == nil {
return nil, errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID)
p, err := c.Process(r.ExecID)
if err != nil {
return nil, err
}
if err := p.Start(ctx); err != nil {
return nil, err
@@ -266,10 +288,11 @@ func (c *Container) Start(ctx context.Context, r *task.StartRequest) (rproc.Proc
return p, nil
}
// Delete the container or a process by id
func (c *Container) Delete(ctx context.Context, r *task.DeleteRequest) (rproc.Process, error) {
p := c.Process(r.ExecID)
if p == nil {
return nil, errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID)
p, err := c.Process(r.ExecID)
if err != nil {
return nil, err
}
if err := p.Delete(ctx); err != nil {
return nil, err
@@ -280,6 +303,7 @@ func (c *Container) Delete(ctx context.Context, r *task.DeleteRequest) (rproc.Pr
return p, nil
}
// Exec an additional process
func (c *Container) Exec(ctx context.Context, r *task.ExecProcessRequest) (rproc.Process, error) {
process, err := c.process.(*proc.Init).Exec(ctx, c.Bundle, &proc.ExecConfig{
ID: r.ExecID,
@@ -296,18 +320,21 @@ func (c *Container) Exec(ctx context.Context, r *task.ExecProcessRequest) (rproc
return process, nil
}
// Pause pauses the container by delegating to its init process. The stored
// process is asserted to be a *proc.Init without a check, so the call panics
// if it has any other type.
func (c *Container) Pause(ctx context.Context) error {
	initProcess := c.process.(*proc.Init)
	return initProcess.Pause(ctx)
}
// Resume resumes the container by delegating to its init process. The stored
// process is asserted to be a *proc.Init without a check, so the call panics
// if it has any other type.
func (c *Container) Resume(ctx context.Context) error {
	initProcess := c.process.(*proc.Init)
	return initProcess.Resume(ctx)
}
// ResizePty of a process
func (c *Container) ResizePty(ctx context.Context, r *task.ResizePtyRequest) error {
p := c.Process(r.ExecID)
if p == nil {
return errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID)
p, err := c.Process(r.ExecID)
if err != nil {
return err
}
ws := console.WinSize{
Width: uint16(r.Width),
@@ -316,18 +343,20 @@ func (c *Container) ResizePty(ctx context.Context, r *task.ResizePtyRequest) err
return p.Resize(ws)
}
// Kill a process
func (c *Container) Kill(ctx context.Context, r *task.KillRequest) error {
p := c.Process(r.ExecID)
if p == nil {
return errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID)
p, err := c.Process(r.ExecID)
if err != nil {
return err
}
return p.Kill(ctx, r.Signal, r.All)
}
// CloseIO of a process
func (c *Container) CloseIO(ctx context.Context, r *task.CloseIORequest) error {
p := c.Process(r.ExecID)
if p == nil {
return errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID)
p, err := c.Process(r.ExecID)
if err != nil {
return err
}
if stdin := p.Stdin(); stdin != nil {
if err := stdin.Close(); err != nil {
@@ -337,10 +366,11 @@ func (c *Container) CloseIO(ctx context.Context, r *task.CloseIORequest) error {
return nil
}
// Checkpoint the container
func (c *Container) Checkpoint(ctx context.Context, r *task.CheckpointTaskRequest) error {
p := c.Process("")
if p == nil {
return errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
p, err := c.Process("")
if err != nil {
return err
}
var opts options.CheckpointOptions
if r.Options != nil {
@@ -362,10 +392,24 @@ func (c *Container) Checkpoint(ctx context.Context, r *task.CheckpointTaskReques
})
}
// Update the resource information of a running container
func (c *Container) Update(ctx context.Context, r *task.UpdateTaskRequest) error {
p := c.Process("")
if p == nil {
return errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
p, err := c.Process("")
if err != nil {
return err
}
return p.(*proc.Init).Update(ctx, r.Resources)
}
// HasPid reports whether pid belongs to the container: either it is the pid
// returned by c.Pid() or it is the pid of one of the processes returned by
// c.All().
func (c *Container) HasPid(pid int) bool {
	owned := c.Pid() == pid
	if !owned {
		for _, candidate := range c.All() {
			if candidate.Pid() == pid {
				owned = true
				break
			}
		}
	}
	return owned
}

View File

@@ -210,7 +210,6 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
return nil, err
}
s.id = r.ID
s.container = container
s.send(&eventstypes.TaskCreate{
@@ -251,14 +250,14 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
}
switch r.ExecID {
case "":
s.send(&eventstypes.TaskExecStarted{
s.send(&eventstypes.TaskStart{
ContainerID: container.ID,
ExecID: r.ExecID,
Pid: uint32(p.Pid()),
})
default:
s.send(&eventstypes.TaskStart{
s.send(&eventstypes.TaskExecStarted{
ContainerID: container.ID,
ExecID: r.ExecID,
Pid: uint32(p.Pid()),
})
}
@@ -284,7 +283,7 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
s.platform.Close()
}
s.send(&eventstypes.TaskDelete{
ContainerID: s.id,
ContainerID: container.ID,
Pid: uint32(p.Pid()),
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
@@ -487,9 +486,9 @@ func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.Wa
if err != nil {
return nil, err
}
p := container.Process(r.ExecID)
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
p, err := container.Process(r.ExecID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
p.Wait()
@@ -552,14 +551,17 @@ func (s *service) sendL(evt interface{}) {
}
func (s *service) checkProcesses(e runcC.Exit) {
s.mu.Lock()
defer s.mu.Unlock()
shouldKillAll, err := shouldKillAllOnExit(s.container.Bundle)
container, err := s.getContainer()
if err != nil {
return
}
shouldKillAll, err := shouldKillAllOnExit(container.Bundle)
if err != nil {
log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
}
for _, p := range s.container.All() {
for _, p := range container.All() {
if p.Pid() == e.Pid {
if shouldKillAll {
if ip, ok := p.(*proc.Init); ok {
@@ -572,7 +574,7 @@ func (s *service) checkProcesses(e runcC.Exit) {
}
p.SetExited(e.Status)
s.sendL(&eventstypes.TaskExit{
ContainerID: s.id,
ContainerID: container.ID,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
@@ -603,9 +605,9 @@ func shouldKillAllOnExit(bundlePath string) (bool, error) {
}
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
p := s.container.Process("")
if p == nil {
return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
p, err := s.container.Process("")
if err != nil {
return nil, errdefs.ToGRPC(err)
}
ps, err := p.(*proc.Init).Runtime().Ps(ctx, id)
if err != nil {
@@ -644,9 +646,9 @@ func (s *service) getProcess(execID string) (rproc.Process, error) {
if err != nil {
return nil, err
}
p := container.Process(execID)
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process does not exist %s", execID)
p, err := container.Process(execID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return p, nil
}

View File

@@ -0,0 +1,726 @@
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v2
import (
"context"
"encoding/json"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
"github.com/containerd/cgroups"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
rproc "github.com/containerd/containerd/runtime/proc"
"github.com/containerd/containerd/runtime/v1/linux/proc"
"github.com/containerd/containerd/runtime/v2/runc"
"github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/containerd/runtime/v2/shim"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
runcC "github.com/containerd/go-runc"
"github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)
var (
// compile-time check that *service satisfies taskAPI.TaskService
_ = (taskAPI.TaskService)(&service{})
// empty is the shared empty response returned by handlers that carry no payload
empty = &ptypes.Empty{}
)
// groupLabels lists the spec annotations StartShim checks, in order, to
// decide how containers are grouped onto a shim. It currently holds a
// runc.v2 specific group label and the standard k8s pod label. Order matters
// in this list: the first annotation present on the spec is used.
var groupLabels = []string{
"io.containerd.runc.v2.group",
"io.kubernetes.cri.sandbox-id",
}
// spec is the subset of config.json that readSpec decodes; only the
// annotations are read.
type spec struct {
// Annotations holds the "annotations" object of config.json.
Annotations map[string]string `json:"annotations,omitempty"`
}
// New returns a new shim service that can be used via GRPC.
//
// It creates the OOM epoller for publisher, derives a cancellable context
// that is stored on the service, subscribes to exit events from shim.Default,
// and starts the background goroutines that run the epoller, process exits
// and forward queued events to publisher. If platform initialization fails
// the derived context is cancelled and a wrapped error is returned.
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
ep, err := runc.NewOOMEpoller(publisher)
if err != nil {
return nil, err
}
// the epoller runs until the service context is cancelled
ctx, cancel := context.WithCancel(ctx)
go ep.Run(ctx)
s := &service{
id: id,
context: ctx,
// buffered so that senders do not block until 128 events are pending
events: make(chan interface{}, 128),
ec: shim.Default.Subscribe(),
ep: ep,
cancel: cancel,
containers: make(map[string]*runc.Container),
}
// consume exits from the subscription created above
go s.processExits()
runcC.Monitor = shim.Default
if err := s.initPlatform(); err != nil {
cancel()
return nil, errors.Wrap(err, "failed to initialized platform behavior")
}
// publish queued events in the background
go s.forward(publisher)
return s, nil
}
// service is the shim implementation of a remote shim over GRPC
type service struct {
// mu guards the containers map
mu sync.Mutex
// eventSendMu is held by Start and sendL around event sends so that start
// events are queued before exit events
eventSendMu sync.Mutex
// context is the cancellable context created in New
context context.Context
// events queues events until forward publishes them
events chan interface{}
// platform is passed to runc.NewContainer and closed by Delete once no
// containers remain
platform rproc.Platform
// ec receives process exits from the shim.Default subscription
ec chan runcC.Exit
// ep is the OOM epoller that container cgroups are added to on Start
ep *runc.Epoller
// id only used in cleanup case
id string
// containers maps container id to the container served by this shim
containers map[string]*runc.Container
// cancel cancels context; it is called by Shutdown and on a failed New
cancel func()
}
// newCommand builds, but does not start, the command that re-executes the
// current binary with the -namespace, -id, -address and -publish-binary
// flags. The namespace is taken from ctx and must be present. The command
// runs in the current working directory, inherits the environment with
// GOMAXPROCS=4 appended, and is placed in its own process group.
func newCommand(ctx context.Context, id, containerdBinary, containerdAddress string) (*exec.Cmd, error) {
	namespace, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, err
	}
	binary, err := os.Executable()
	if err != nil {
		return nil, err
	}
	workDir, err := os.Getwd()
	if err != nil {
		return nil, err
	}

	shimCmd := exec.Command(
		binary,
		"-namespace", namespace,
		"-id", id,
		"-address", containerdAddress,
		"-publish-binary", containerdBinary,
	)
	shimCmd.Dir = workDir
	shimCmd.Env = append(os.Environ(), "GOMAXPROCS=4")
	shimCmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	return shimCmd, nil
}
// readSpec decodes config.json from the current working directory into a
// spec. Errors from opening or decoding the file are returned unchanged.
func readSpec() (*spec, error) {
	file, err := os.Open("config.json")
	if err != nil {
		return nil, err
	}
	defer file.Close()

	parsed := new(spec)
	if err := json.NewDecoder(file).Decode(parsed); err != nil {
		return nil, err
	}
	return parsed, nil
}
// StartShim starts a shim process for id, or reuses one that is already
// listening for the container's group, and returns the shim's socket address.
//
// The group defaults to id and is overridden by the first annotation from
// groupLabels found in config.json in the working directory. When creating
// the socket fails with "address already in use", no new process is started:
// the address is written to the "address" file and returned.
//
// Otherwise the socket's file is passed to a freshly started shim process,
// the address is written to the "address" file and the shim's OOM score is
// set. If any step after the process has been started fails, the process is
// killed before the error is returned.
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (_ string, retErr error) {
	cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress)
	if err != nil {
		return "", err
	}
	grouping := id
	bundleSpec, err := readSpec()
	if err != nil {
		return "", err
	}
	for _, group := range groupLabels {
		if groupID, ok := bundleSpec.Annotations[group]; ok {
			grouping = groupID
			break
		}
	}
	address, err := shim.SocketAddress(ctx, grouping)
	if err != nil {
		return "", err
	}
	socket, err := shim.NewSocket(address)
	if err != nil {
		// the socket for this group is already bound: point this container
		// at the existing address instead of starting another process
		if strings.Contains(err.Error(), "address already in use") {
			if err := shim.WriteAddress("address", address); err != nil {
				return "", err
			}
			return address, nil
		}
		return "", err
	}
	defer socket.Close()
	f, err := socket.File()
	if err != nil {
		return "", err
	}
	defer f.Close()

	cmd.ExtraFiles = append(cmd.ExtraFiles, f)

	if err := cmd.Start(); err != nil {
		return "", err
	}
	// The failures below are reported through shadowed err variables, so the
	// cleanup must look at the named result to see them.
	defer func() {
		if retErr != nil {
			cmd.Process.Kill()
		}
	}()
	// make sure to wait after start
	go cmd.Wait()
	if err := shim.WriteAddress("address", address); err != nil {
		return "", err
	}
	if err := shim.SetScore(cmd.Process.Pid); err != nil {
		return "", errors.Wrap(err, "failed to set OOM Score on shim")
	}
	return address, nil
}
// Cleanup force-deletes the runc container named after the shim id and
// unmounts its rootfs. The bundle path is the sibling of the current working
// directory named s.id. Failures of the delete and unmount steps are only
// logged as warnings. The response always reports the current time and an
// exit status of 128+SIGKILL.
func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
	workDir, err := os.Getwd()
	if err != nil {
		return nil, err
	}
	bundle := filepath.Join(filepath.Dir(workDir), s.id)

	namespace, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, err
	}
	runtimeName, err := runc.ReadRuntime(bundle)
	if err != nil {
		return nil, err
	}

	rt := proc.NewRunc(proc.RuncRoot, bundle, namespace, runtimeName, "", false)
	deleteOpts := &runcC.DeleteOpts{Force: true}
	if err := rt.Delete(ctx, s.id, deleteOpts); err != nil {
		logrus.WithError(err).Warn("failed to remove runc container")
	}

	rootfs := filepath.Join(bundle, "rootfs")
	if err := mount.UnmountAll(rootfs, 0); err != nil {
		logrus.WithError(err).Warn("failed to cleanup rootfs mount")
	}

	response := &taskAPI.DeleteResponse{
		ExitedAt:   time.Now(),
		ExitStatus: 128 + uint32(unix.SIGKILL),
	}
	return response, nil
}
// Create builds a new container (and its initial process) with the underlying
// OCI runtime, registers it under r.ID and queues a TaskCreate event. The
// service mutex is held for the whole call.
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	created, err := runc.NewContainer(ctx, s.platform, r)
	if err != nil {
		return nil, err
	}
	s.containers[r.ID] = created

	stdio := &eventstypes.TaskIO{
		Stdin:    r.Stdin,
		Stdout:   r.Stdout,
		Stderr:   r.Stderr,
		Terminal: r.Terminal,
	}
	s.send(&eventstypes.TaskCreate{
		ContainerID: r.ID,
		Bundle:      r.Bundle,
		Rootfs:      r.Rootfs,
		IO:          stdio,
		Checkpoint:  r.Checkpoint,
		Pid:         uint32(created.Pid()),
	})

	response := &taskAPI.CreateTaskResponse{
		Pid: uint32(created.Pid()),
	}
	return response, nil
}
// Start starts the init process (empty ExecID) or an exec process of the
// container r.ID, adds the container's cgroup to the OOM monitor and queues
// the matching TaskStart or TaskExecStarted event.
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}

	// hold the send lock so that the start events are sent before any exit events in the error case
	s.eventSendMu.Lock()
	started, err := container.Start(ctx, r)
	if err != nil {
		s.eventSendMu.Unlock()
		return nil, errdefs.ToGRPC(err)
	}
	// a failure to register with the OOM monitor is logged, not returned
	if addErr := s.ep.Add(container.ID, container.Cgroup()); addErr != nil {
		logrus.WithError(addErr).Error("add cg to OOM monitor")
	}
	if r.ExecID == "" {
		s.send(&eventstypes.TaskStart{
			ContainerID: container.ID,
			Pid:         uint32(started.Pid()),
		})
	} else {
		s.send(&eventstypes.TaskExecStarted{
			ContainerID: container.ID,
			ExecID:      r.ExecID,
			Pid:         uint32(started.Pid()),
		})
	}
	s.eventSendMu.Unlock()

	response := &taskAPI.StartResponse{
		Pid: uint32(started.Pid()),
	}
	return response, nil
}
// Delete removes a process from the container r.ID. When the init process is
// deleted (empty ExecID) the container is unregistered, the platform is
// closed if no containers remain, and a TaskDelete event is queued.
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	deleted, err := container.Delete(ctx, r)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	// if we deleted our init task, close the platform and send the task delete event
	if r.ExecID == "" {
		s.mu.Lock()
		delete(s.containers, r.ID)
		remaining := len(s.containers)
		s.mu.Unlock()

		if remaining == 0 && s.platform != nil {
			s.platform.Close()
		}
		s.send(&eventstypes.TaskDelete{
			ContainerID: container.ID,
			Pid:         uint32(deleted.Pid()),
			ExitStatus:  uint32(deleted.ExitStatus()),
			ExitedAt:    deleted.ExitedAt(),
		})
	}

	response := &taskAPI.DeleteResponse{
		ExitStatus: uint32(deleted.ExitStatus()),
		ExitedAt:   deleted.ExitedAt(),
		Pid:        uint32(deleted.Pid()),
	}
	return response, nil
}
// Exec registers an additional process inside the container r.ID and queues a
// TaskExecAdded event. It fails with an already-exists error when a process
// with r.ExecID is already present.
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if container.ProcessExists(r.ExecID) {
		return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
	}

	added, err := container.Exec(ctx, r)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	event := &eventstypes.TaskExecAdded{
		ContainerID: container.ID,
		ExecID:      added.ID(),
	}
	s.send(event)
	return empty, nil
}
// ResizePty resizes the pty of a process in the container r.ID. Errors from
// the container are converted to GRPC errors.
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	err = container.ResizePty(ctx, r)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}
// State returns runtime state information for the process r.ExecID of the
// container r.ID (the init process when ExecID is empty).
//
// A failed process lookup is converted with errdefs.ToGRPC so that callers
// receive the proper GRPC status code, in line with Wait. The status string
// reported by the process is mapped to the task status enum; unrecognized
// strings map to task.StatusUnknown.
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Process(r.ExecID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	st, err := p.Status(ctx)
	if err != nil {
		return nil, err
	}
	status := task.StatusUnknown
	switch st {
	case "created":
		status = task.StatusCreated
	case "running":
		status = task.StatusRunning
	case "stopped":
		status = task.StatusStopped
	case "paused":
		status = task.StatusPaused
	case "pausing":
		status = task.StatusPausing
	}
	sio := p.Stdio()
	return &taskAPI.StateResponse{
		ID:         p.ID(),
		Bundle:     container.Bundle,
		Pid:        uint32(p.Pid()),
		Status:     status,
		Stdin:      sio.Stdin,
		Stdout:     sio.Stdout,
		Stderr:     sio.Stderr,
		Terminal:   sio.Terminal,
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
	}, nil
}
// Pause pauses the container r.ID and queues a TaskPaused event. Errors from
// the container are converted to GRPC errors.
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Pause(ctx); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	// keyed field so the literal stays valid if the event struct gains fields
	s.send(&eventstypes.TaskPaused{
		ContainerID: container.ID,
	})
	return empty, nil
}
// Resume resumes the container r.ID and queues a TaskResumed event. Errors
// from the container are converted to GRPC errors.
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Resume(ctx); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	// keyed field so the literal stays valid if the event struct gains fields
	s.send(&eventstypes.TaskResumed{
		ContainerID: container.ID,
	})
	return empty, nil
}
// Kill delivers the signal described by r to a process of the container r.ID.
// Errors from the container are converted to GRPC errors.
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	err = container.Kill(ctx, r)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}
// Pids returns all pids inside the container r.ID. Pids that belong to a
// process started through Exec additionally carry the exec id, marshalled as
// options.ProcessDetails, in their Info field.
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	pids, err := s.getContainerPids(ctx, r.ID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	// Take one snapshot of the exec'd processes instead of locking and
	// copying the process table again for every pid.
	execd := container.ExecdProcesses()
	var processes []*task.ProcessInfo
	for _, pid := range pids {
		pInfo := task.ProcessInfo{
			Pid: pid,
		}
		for _, p := range execd {
			if p.Pid() == int(pid) {
				d := &options.ProcessDetails{
					ExecID: p.ID(),
				}
				a, err := typeurl.MarshalAny(d)
				if err != nil {
					return nil, errors.Wrapf(err, "failed to marshal process %d info", pid)
				}
				pInfo.Info = a
				break
			}
		}
		processes = append(processes, &pInfo)
	}
	return &taskAPI.PidsResponse{
		Processes: processes,
	}, nil
}
// CloseIO closes the IO of a process in the container r.ID. Errors from the
// container are converted with errdefs.ToGRPC, as the other handlers do, so
// that a missing process is reported with the proper GRPC status code.
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.CloseIO(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}
// Checkpoint checkpoints the container r.ID. Errors from the container are
// converted to GRPC errors.
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	err = container.Checkpoint(ctx, r)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}
// Update applies the request r to the running container r.ID. Errors from
// the container are converted to GRPC errors.
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	err = container.Update(ctx, r)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}
// Wait blocks until the process r.ExecID of the container r.ID has exited and
// then reports its exit status and exit time.
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	target, err := container.Process(r.ExecID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	target.Wait()

	response := &taskAPI.WaitResponse{
		ExitStatus: uint32(target.ExitStatus()),
		ExitedAt:   target.ExitedAt(),
	}
	return response, nil
}
// Connect returns shim information: the pid of the shim process and the pid
// of the container r.ID. The task pid is reported as 0 when the container is
// not known to this shim.
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
	var taskPid int
	container, err := s.getContainer(r.ID)
	if err == nil {
		taskPid = container.Pid()
	}

	response := &taskAPI.ConnectResponse{
		ShimPid: uint32(os.Getpid()),
		TaskPid: uint32(taskPid),
	}
	return response, nil
}
// Shutdown cancels the service context and exits the shim process with status
// 0, but only when no containers are registered. While the shim is still
// servicing containers the request is a no-op.
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
	s.mu.Lock()
	if len(s.containers) == 0 {
		// nothing left to serve: the process exits here with the lock held
		s.cancel()
		os.Exit(0)
	}
	// return out if the shim is still servicing containers
	s.mu.Unlock()
	return empty, nil
}
// Stats returns the cgroup statistics of the container r.ID, marshalled into
// an Any. It fails with a not-found error when the container has no cgroup.
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}

	group := container.Cgroup()
	if group == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
	}

	metrics, err := group.Stat(cgroups.IgnoreNotExist)
	if err != nil {
		return nil, err
	}
	encoded, err := typeurl.MarshalAny(metrics)
	if err != nil {
		return nil, err
	}

	response := &taskAPI.StatsResponse{
		Stats: encoded,
	}
	return response, nil
}
// processExits hands every exit received on s.ec to checkProcesses. It
// returns once the channel is closed.
func (s *service) processExits() {
	for {
		exit, open := <-s.ec
		if !open {
			return
		}
		s.checkProcesses(exit)
	}
}
// send queues evt on the events channel that forward drains. It does not take
// eventSendMu and blocks while the channel buffer is full.
func (s *service) send(evt interface{}) {
s.events <- evt
}
// sendL queues evt on the events channel while holding eventSendMu, so the
// send is ordered with respect to the events Start sends under the same lock.
func (s *service) sendL(evt interface{}) {
	s.eventSendMu.Lock()
	defer s.eventSendMu.Unlock()

	s.events <- evt
}
// checkProcesses handles a single process exit. It finds the first container
// that owns e.Pid, marks the matching process as exited and queues a TaskExit
// event for it. When the exited process is an init process and
// shouldKillAllOnExit reports true for the bundle, the init's children are
// killed first. The service mutex is held for the whole call.
func (s *service) checkProcesses(e runcC.Exit) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for _, container := range s.containers {
		if !container.HasPid(e.Pid) {
			continue
		}

		killAll, err := shouldKillAllOnExit(container.Bundle)
		if err != nil {
			log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
		}

		for _, p := range container.All() {
			if p.Pid() != e.Pid {
				continue
			}
			if ip, isInit := p.(*proc.Init); isInit && killAll {
				// Ensure all children are killed
				if err := ip.KillAll(s.context); err != nil {
					logrus.WithError(err).WithField("id", ip.ID()).
						Error("failed to kill init's children")
				}
			}
			p.SetExited(e.Status)
			s.sendL(&eventstypes.TaskExit{
				ContainerID: container.ID,
				ID:          p.ID(),
				Pid:         uint32(e.Pid),
				ExitStatus:  uint32(e.Status),
				ExitedAt:    p.ExitedAt(),
			})
			return
		}
		// the owning container was found, no other container is inspected
		return
	}
}
// shouldKillAllOnExit reports whether all remaining processes of a container
// should be killed when its init process exits. It reads config.json from
// bundlePath and returns false when the spec declares a PID namespace, and
// true otherwise.
//
// When config.json cannot be read or cannot be decoded, it returns false
// together with the error; a decode failure is no longer silently ignored.
func shouldKillAllOnExit(bundlePath string) (bool, error) {
	var bundleSpec specs.Spec
	bundleConfigContents, err := ioutil.ReadFile(filepath.Join(bundlePath, "config.json"))
	if err != nil {
		return false, err
	}
	// report a malformed spec the same way as an unreadable one
	if err := json.Unmarshal(bundleConfigContents, &bundleSpec); err != nil {
		return false, err
	}
	if bundleSpec.Linux != nil {
		for _, ns := range bundleSpec.Linux.Namespaces {
			if ns.Type == specs.PIDNamespace {
				return false, nil
			}
		}
	}
	return true, nil
}
// getContainerPids asks the runtime of the container's init process for the
// pids running inside the container id and returns them as uint32 values.
// The init process is asserted to be a *proc.Init without a check.
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
	container, err := s.getContainer(id)
	if err != nil {
		return nil, err
	}
	initProcess, err := container.Process("")
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	listed, err := initProcess.(*proc.Init).Runtime().Ps(ctx, id)
	if err != nil {
		return nil, err
	}

	pids := make([]uint32, len(listed))
	for i, pid := range listed {
		pids[i] = uint32(pid)
	}
	return pids, nil
}
// forward publishes every event queued on s.events through publisher, using
// the topic reported by runc.GetTopic. Each publish gets a 5 second timeout
// derived from the service context; failures are logged and the loop
// continues. It returns once the events channel is closed.
func (s *service) forward(publisher events.Publisher) {
	for evt := range s.events {
		publishCtx, release := context.WithTimeout(s.context, 5*time.Second)
		publishErr := publisher.Publish(publishCtx, runc.GetTopic(evt), evt)
		release()
		if publishErr != nil {
			logrus.WithError(publishErr).Error("post event")
		}
	}
}
// getContainer looks up the container registered under id while holding the
// service mutex. It returns a GRPC not-found error when there is none.
func (s *service) getContainer(id string) (*runc.Container, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if found := s.containers[id]; found != nil {
		return found, nil
	}
	return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
}
// initPlatform creates the platform (a single epoll fd used to manage our
// consoles) and stores it on the service. It should only be called once; a
// call made when a platform is already set does nothing.
func (s *service) initPlatform() error {
	if s.platform == nil {
		created, err := runc.NewPlatform()
		if err != nil {
			return err
		}
		s.platform = created
	}
	return nil
}

View File

@@ -99,7 +99,9 @@ type shim struct {
}
func (s *shim) Connect(ctx context.Context) error {
response, err := s.task.Connect(ctx, &task.ConnectRequest{})
response, err := s.task.Connect(ctx, &task.ConnectRequest{
ID: s.ID(),
})
if err != nil {
return err
}
@@ -317,6 +319,7 @@ func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) {
func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any) error {
request := &task.CheckpointTaskRequest{
ID: s.ID(),
Path: path,
Options: options,
}

View File

@@ -19,6 +19,7 @@ package shim
import (
"context"
"fmt"
"io/ioutil"
"net"
"os"
"os/exec"
@@ -150,3 +151,22 @@ func WriteAddress(path, address string) error {
}
return os.Rename(tempPath, path)
}
// ErrNoAddress is returned when the address file has no content
var ErrNoAddress = errors.New("no shim address")

// ReadAddress returns the shim's abstract socket address stored in the file
// at path. The path is made absolute before it is read. ErrNoAddress is
// returned when the file exists but is empty; other failures are returned
// unchanged.
func ReadAddress(path string) (string, error) {
	absolute, err := filepath.Abs(path)
	if err != nil {
		return "", err
	}
	contents, err := ioutil.ReadFile(absolute)
	switch {
	case err != nil:
		return "", err
	case len(contents) == 0:
		return "", ErrNoAddress
	}
	return string(contents), nil
}