Move runc shim code into common package

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2019-02-07 15:40:30 -05:00
parent 31438b61f9
commit 6bcbf88f82
6 changed files with 608 additions and 385 deletions

View File

@ -19,10 +19,10 @@
package main
import (
"github.com/containerd/containerd/runtime/v2/runc"
"github.com/containerd/containerd/runtime/v2/runc/v1"
"github.com/containerd/containerd/runtime/v2/shim"
)
func main() {
shim.Run("io.containerd.runc.v1", runc.New)
shim.Run("io.containerd.runc.v1", v1.New)
}

View File

@ -0,0 +1,371 @@
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runc
import (
"context"
"io/ioutil"
"path/filepath"
"sync"
"github.com/containerd/cgroups"
"github.com/containerd/console"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
rproc "github.com/containerd/containerd/runtime/proc"
"github.com/containerd/containerd/runtime/v1/linux/proc"
"github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/typeurl"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTaskRequest) (*Container, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, errors.Wrap(err, "create namespace")
}
var opts options.Options
if r.Options != nil {
v, err := typeurl.UnmarshalAny(r.Options)
if err != nil {
return nil, err
}
opts = *v.(*options.Options)
}
var mounts []proc.Mount
for _, m := range r.Rootfs {
mounts = append(mounts, proc.Mount{
Type: m.Type,
Source: m.Source,
Target: m.Target,
Options: m.Options,
})
}
config := &proc.CreateConfig{
ID: r.ID,
Bundle: r.Bundle,
Runtime: opts.BinaryName,
Rootfs: mounts,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Checkpoint: r.Checkpoint,
ParentCheckpoint: r.ParentCheckpoint,
Options: r.Options,
}
if err := WriteRuntime(r.Bundle, opts.BinaryName); err != nil {
return nil, err
}
rootfs := filepath.Join(r.Bundle, "rootfs")
defer func() {
if err != nil {
if err2 := mount.UnmountAll(rootfs, 0); err2 != nil {
logrus.WithError(err2).Warn("failed to cleanup rootfs mount")
}
}
}()
for _, rm := range mounts {
m := &mount.Mount{
Type: rm.Type,
Source: rm.Source,
Options: rm.Options,
}
if err := m.Mount(rootfs); err != nil {
return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
}
}
process, err := newInit(
ctx,
r.Bundle,
filepath.Join(r.Bundle, "work"),
ns,
platform,
config,
&opts,
)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
if err := process.Create(ctx, config); err != nil {
return nil, errdefs.ToGRPC(err)
}
container := &Container{
ID: r.ID,
Bundle: r.Bundle,
process: process,
processes: make(map[string]rproc.Process),
}
pid := process.Pid()
if pid > 0 {
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
if err != nil {
logrus.WithError(err).Errorf("loading cgroup for %d", pid)
}
container.cgroup = cg
}
return container, nil
}
func ReadRuntime(path string) (string, error) {
data, err := ioutil.ReadFile(filepath.Join(path, "runtime"))
if err != nil {
return "", err
}
return string(data), nil
}
func WriteRuntime(path, runtime string) error {
return ioutil.WriteFile(filepath.Join(path, "runtime"), []byte(runtime), 0600)
}
func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform,
r *proc.CreateConfig, options *options.Options) (*proc.Init, error) {
rootfs := filepath.Join(path, "rootfs")
runtime := proc.NewRunc(options.Root, path, namespace, options.BinaryName, options.CriuPath, options.SystemdCgroup)
p := proc.New(r.ID, runtime, rproc.Stdio{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
})
p.Bundle = r.Bundle
p.Platform = platform
p.Rootfs = rootfs
p.WorkDir = workDir
p.IoUID = int(options.IoUid)
p.IoGID = int(options.IoGid)
p.NoPivotRoot = options.NoPivotRoot
p.NoNewKeyring = options.NoNewKeyring
p.CriuWorkPath = options.CriuWorkPath
if p.CriuWorkPath == "" {
// if criu work path not set, use container WorkDir
p.CriuWorkPath = p.WorkDir
}
return p, nil
}
type Container struct {
mu sync.Mutex
ID string
Bundle string
cgroup cgroups.Cgroup
process rproc.Process
processes map[string]rproc.Process
}
func (c *Container) All() (o []rproc.Process) {
c.mu.Lock()
defer c.mu.Unlock()
for _, p := range c.processes {
o = append(o, p)
}
if c.process != nil {
o = append(o, c.process)
}
return o
}
func (c *Container) ExecdProcesses() (o []rproc.Process) {
c.mu.Lock()
defer c.mu.Unlock()
for _, p := range c.processes {
o = append(o, p)
}
return o
}
// Pid of the main process of a container
func (c *Container) Pid() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.process.Pid()
}
func (c *Container) Cgroup() cgroups.Cgroup {
c.mu.Lock()
defer c.mu.Unlock()
return c.cgroup
}
func (c *Container) CgroupSet(cg cgroups.Cgroup) {
c.mu.Lock()
c.cgroup = cg
c.mu.Unlock()
}
func (c *Container) Process(id string) rproc.Process {
c.mu.Lock()
defer c.mu.Unlock()
if id == "" {
return c.process
}
return c.processes[id]
}
func (c *Container) ProcessExists(id string) bool {
c.mu.Lock()
defer c.mu.Unlock()
_, ok := c.processes[id]
return ok
}
func (c *Container) ProcessAdd(process rproc.Process) {
c.mu.Lock()
defer c.mu.Unlock()
c.processes[process.ID()] = process
}
func (c *Container) ProcessRemove(id string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.processes, id)
}
func (c *Container) Start(ctx context.Context, r *task.StartRequest) (rproc.Process, error) {
p := c.Process(r.ExecID)
if p == nil {
return nil, errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID)
}
if err := p.Start(ctx); err != nil {
return nil, err
}
if c.Cgroup() == nil && p.Pid() > 0 {
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
if err != nil {
logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
}
c.cgroup = cg
}
return p, nil
}
func (c *Container) Delete(ctx context.Context, r *task.DeleteRequest) (rproc.Process, error) {
p := c.Process(r.ExecID)
if p == nil {
return nil, errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID)
}
if err := p.Delete(ctx); err != nil {
return nil, err
}
if r.ExecID != "" {
c.ProcessRemove(r.ExecID)
}
return p, nil
}
func (c *Container) Exec(ctx context.Context, r *task.ExecProcessRequest) (rproc.Process, error) {
process, err := c.process.(*proc.Init).Exec(ctx, c.Bundle, &proc.ExecConfig{
ID: r.ExecID,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Spec: r.Spec,
})
if err != nil {
return nil, err
}
c.ProcessAdd(process)
return process, nil
}
func (c *Container) Pause(ctx context.Context) error {
return c.process.(*proc.Init).Pause(ctx)
}
func (c *Container) Resume(ctx context.Context) error {
return c.process.(*proc.Init).Resume(ctx)
}
func (c *Container) ResizePty(ctx context.Context, r *task.ResizePtyRequest) error {
p := c.Process(r.ExecID)
if p == nil {
return errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID)
}
ws := console.WinSize{
Width: uint16(r.Width),
Height: uint16(r.Height),
}
return p.Resize(ws)
}
func (c *Container) Kill(ctx context.Context, r *task.KillRequest) error {
p := c.Process(r.ExecID)
if p == nil {
return errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID)
}
return p.Kill(ctx, r.Signal, r.All)
}
func (c *Container) CloseIO(ctx context.Context, r *task.CloseIORequest) error {
p := c.Process(r.ExecID)
if p == nil {
return errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID)
}
if stdin := p.Stdin(); stdin != nil {
if err := stdin.Close(); err != nil {
return errors.Wrap(err, "close stdin")
}
}
return nil
}
func (c *Container) Checkpoint(ctx context.Context, r *task.CheckpointTaskRequest) error {
p := c.Process("")
if p == nil {
return errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
var opts options.CheckpointOptions
if r.Options != nil {
v, err := typeurl.UnmarshalAny(r.Options)
if err != nil {
return err
}
opts = *v.(*options.CheckpointOptions)
}
return p.(*proc.Init).Checkpoint(ctx, &proc.CheckpointConfig{
Path: r.Path,
Exit: opts.Exit,
AllowOpenTCP: opts.OpenTcp,
AllowExternalUnixSockets: opts.ExternalUnixSockets,
AllowTerminal: opts.Terminal,
FileLocks: opts.FileLocks,
EmptyNamespaces: opts.EmptyNamespaces,
WorkDir: opts.WorkPath,
})
}
func (c *Container) Update(ctx context.Context, r *task.UpdateTaskRequest) error {
p := c.Process("")
if p == nil {
return errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
return p.(*proc.Init).Update(ctx, r.Resources)
}

View File

@ -30,19 +30,22 @@ import (
"golang.org/x/sys/unix"
)
func newOOMEpoller(publisher events.Publisher) (*epoller, error) {
// NewOOMEpoller returns an epoll implementation that listens to OOM events
// from a container's cgroups.
func NewOOMEpoller(publisher events.Publisher) (*Epoller, error) {
fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
if err != nil {
return nil, err
}
return &epoller{
return &Epoller{
fd: fd,
publisher: publisher,
set: make(map[uintptr]*item),
}, nil
}
type epoller struct {
// Epoller implementation for handling OOM events from a container's cgroup
type Epoller struct {
mu sync.Mutex
fd int
@ -55,11 +58,13 @@ type item struct {
cg cgroups.Cgroup
}
func (e *epoller) Close() error {
// Close the epoll fd
func (e *Epoller) Close() error {
return unix.Close(e.fd)
}
func (e *epoller) run(ctx context.Context) {
// Run the epoll loop
func (e *Epoller) Run(ctx context.Context) {
var events [128]unix.EpollEvent
for {
select {
@ -81,7 +86,8 @@ func (e *epoller) run(ctx context.Context) {
}
}
func (e *epoller) add(id string, cg cgroups.Cgroup) error {
// Add the cgroup to the epoll monitor
func (e *Epoller) Add(id string, cg cgroups.Cgroup) error {
e.mu.Lock()
defer e.mu.Unlock()
fd, err := cg.OOMEventFD()
@ -99,7 +105,7 @@ func (e *epoller) add(id string, cg cgroups.Cgroup) error {
return unix.EpollCtl(e.fd, unix.EPOLL_CTL_ADD, int(fd), &event)
}
func (e *epoller) process(ctx context.Context, fd uintptr) {
func (e *Epoller) process(ctx context.Context, fd uintptr) {
flush(fd)
e.mu.Lock()
i, ok := e.set[fd]

View File

@ -1,3 +1,5 @@
// +build linux
/*
Copyright The containerd Authors.
@ -23,10 +25,30 @@ import (
"syscall"
"github.com/containerd/console"
rproc "github.com/containerd/containerd/runtime/proc"
"github.com/containerd/fifo"
"github.com/pkg/errors"
)
var bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
return &buffer
},
}
// NewPlatform returns a linux platform for use with I/O operations
func NewPlatform() (rproc.Platform, error) {
epoller, err := console.NewEpoller()
if err != nil {
return nil, errors.Wrap(err, "failed to initialize epoller")
}
go epoller.Wait()
return &linuxPlatform{
epoller: epoller,
}, nil
}
type linuxPlatform struct {
epoller *console.Epoller
}
@ -69,9 +91,9 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(outw, epollConsole, *p)
buf := bufPool.Get().(*[]byte)
defer bufPool.Put(buf)
io.CopyBuffer(outw, epollConsole, *buf)
epollConsole.Close()
outr.Close()
outw.Close()
@ -94,20 +116,3 @@ func (p *linuxPlatform) ShutdownConsole(ctx context.Context, cons console.Consol
func (p *linuxPlatform) Close() error {
return p.epoller.Close()
}
// initialize a single epoll fd to manage our consoles. `initPlatform` should
// only be called once.
func (s *service) initPlatform() error {
if s.platform != nil {
return nil
}
epoller, err := console.NewEpoller()
if err != nil {
return errors.Wrap(err, "failed to initialize epoller")
}
s.platform = &linuxPlatform{
epoller: epoller,
}
go epoller.Wait()
return nil
}

55
runtime/v2/runc/util.go Normal file
View File

@ -0,0 +1,55 @@
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runc
import (
"github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/runtime"
"github.com/sirupsen/logrus"
)
// GetTopic converts an event from an interface type to the specific
// event topic id
func GetTopic(e interface{}) string {
switch e.(type) {
case *events.TaskCreate:
return runtime.TaskCreateEventTopic
case *events.TaskStart:
return runtime.TaskStartEventTopic
case *events.TaskOOM:
return runtime.TaskOOMEventTopic
case *events.TaskExit:
return runtime.TaskExitEventTopic
case *events.TaskDelete:
return runtime.TaskDeleteEventTopic
case *events.TaskExecAdded:
return runtime.TaskExecAddedEventTopic
case *events.TaskExecStarted:
return runtime.TaskExecStartedEventTopic
case *events.TaskPaused:
return runtime.TaskPausedEventTopic
case *events.TaskResumed:
return runtime.TaskResumedEventTopic
case *events.TaskCheckpointed:
return runtime.TaskCheckpointedEventTopic
default:
logrus.Warnf("no topic for type %#v", e)
}
return runtime.TaskUnknownTopic
}

View File

@ -16,7 +16,7 @@
limitations under the License.
*/
package runc
package v1
import (
"context"
@ -30,7 +30,6 @@ import (
"time"
"github.com/containerd/cgroups"
"github.com/containerd/console"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
@ -38,9 +37,9 @@ import (
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/runtime"
rproc "github.com/containerd/containerd/runtime/proc"
"github.com/containerd/containerd/runtime/v1/linux/proc"
"github.com/containerd/containerd/runtime/v2/runc"
"github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/containerd/runtime/v2/shim"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
@ -54,29 +53,21 @@ import (
)
var (
_ = (taskAPI.TaskService)(&service{})
empty = &ptypes.Empty{}
bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
return &buffer
},
}
)
var _ = (taskAPI.TaskService)(&service{})
// New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
ep, err := newOOMEpoller(publisher)
ep, err := runc.NewOOMEpoller(publisher)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
go ep.run(ctx)
go ep.Run(ctx)
s := &service{
id: id,
context: ctx,
processes: make(map[string]rproc.Process),
events: make(chan interface{}, 128),
ec: shim.Default.Subscribe(),
ep: ep,
@ -98,16 +89,14 @@ type service struct {
eventSendMu sync.Mutex
context context.Context
task rproc.Process
processes map[string]rproc.Process
events chan interface{}
platform rproc.Platform
ec chan runcC.Exit
ep *epoller
ep *runc.Epoller
id string
bundle string
cg cgroups.Cgroup
container *runc.Container
cancel func()
}
@ -192,7 +181,7 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error)
if err != nil {
return nil, err
}
runtime, err := s.readRuntime(path)
runtime, err := runc.ReadRuntime(path)
if err != nil {
return nil, err
}
@ -211,107 +200,18 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error)
}, nil
}
func (s *service) readRuntime(path string) (string, error) {
data, err := ioutil.ReadFile(filepath.Join(path, "runtime"))
if err != nil {
return "", err
}
return string(data), nil
}
func (s *service) writeRuntime(path, runtime string) error {
return ioutil.WriteFile(filepath.Join(path, "runtime"), []byte(runtime), 0600)
}
// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
s.mu.Lock()
defer s.mu.Unlock()
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, errors.Wrap(err, "create namespace")
}
var opts options.Options
if r.Options != nil {
v, err := typeurl.UnmarshalAny(r.Options)
container, err := runc.NewContainer(ctx, s.platform, r)
if err != nil {
return nil, err
}
opts = *v.(*options.Options)
}
var mounts []proc.Mount
for _, m := range r.Rootfs {
mounts = append(mounts, proc.Mount{
Type: m.Type,
Source: m.Source,
Target: m.Target,
Options: m.Options,
})
}
config := &proc.CreateConfig{
ID: r.ID,
Bundle: r.Bundle,
Runtime: opts.BinaryName,
Rootfs: mounts,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Checkpoint: r.Checkpoint,
ParentCheckpoint: r.ParentCheckpoint,
Options: r.Options,
}
if err := s.writeRuntime(r.Bundle, opts.BinaryName); err != nil {
return nil, err
}
rootfs := filepath.Join(r.Bundle, "rootfs")
defer func() {
if err != nil {
if err2 := mount.UnmountAll(rootfs, 0); err2 != nil {
logrus.WithError(err2).Warn("failed to cleanup rootfs mount")
}
}
}()
for _, rm := range mounts {
m := &mount.Mount{
Type: rm.Type,
Source: rm.Source,
Options: rm.Options,
}
if err := m.Mount(rootfs); err != nil {
return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
}
}
process, err := newInit(
ctx,
r.Bundle,
filepath.Join(r.Bundle, "work"),
ns,
s.platform,
config,
&opts,
)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
if err := process.Create(ctx, config); err != nil {
return nil, errdefs.ToGRPC(err)
}
// save the main task id and bundle to the shim for additional requests
s.id = r.ID
s.bundle = r.Bundle
pid := process.Pid()
if pid > 0 {
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
if err != nil {
logrus.WithError(err).Errorf("loading cgroup for %d", pid)
}
s.cg = cg
}
s.task = process
s.container = container
s.send(&eventstypes.TaskCreate{
ContainerID: r.ID,
@ -324,46 +224,41 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
Terminal: r.Terminal,
},
Checkpoint: r.Checkpoint,
Pid: uint32(pid),
Pid: uint32(container.Pid()),
})
return &taskAPI.CreateTaskResponse{
Pid: uint32(pid),
Pid: uint32(container.Pid()),
}, nil
}
// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
p, err := s.getProcess(r.ExecID)
container, err := s.getContainer()
if err != nil {
return nil, err
}
// hold the send lock so that the start events are sent before any exit events in the error case
s.eventSendMu.Lock()
if err := p.Start(ctx); err != nil {
s.eventSendMu.Unlock()
return nil, err
}
// case for restore
if s.getCgroup() == nil && p.Pid() > 0 {
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
p, err := container.Start(ctx, r)
if err != nil {
logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
s.eventSendMu.Unlock()
return nil, errdefs.ToGRPC(err)
}
s.setCgroup(cg)
if err := s.ep.Add(container.ID, container.Cgroup()); err != nil {
logrus.WithError(err).Error("add cg to OOM monitor")
}
if r.ExecID != "" {
switch r.ExecID {
case "":
s.send(&eventstypes.TaskExecStarted{
ContainerID: s.id,
ContainerID: container.ID,
ExecID: r.ExecID,
Pid: uint32(p.Pid()),
})
} else {
default:
s.send(&eventstypes.TaskStart{
ContainerID: s.id,
ContainerID: container.ID,
Pid: uint32(p.Pid()),
})
}
@ -375,23 +270,16 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
p, err := s.getProcess(r.ExecID)
container, err := s.getContainer()
if err != nil {
return nil, err
}
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
p, err := container.Delete(ctx, r)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
if err := p.Delete(ctx); err != nil {
return nil, err
}
isTask := r.ExecID == ""
if !isTask {
s.mu.Lock()
delete(s.processes, r.ExecID)
s.mu.Unlock()
}
if isTask {
// if we deleted our init task, close the platform and send the task delete event
if r.ExecID == "" {
if s.platform != nil {
s.platform.Close()
}
@ -411,33 +299,20 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
s.mu.Lock()
p := s.processes[r.ExecID]
s.mu.Unlock()
if p != nil {
container, err := s.getContainer()
if err != nil {
return nil, err
}
if container.ProcessExists(r.ExecID) {
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
}
p = s.task
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
process, err := p.(*proc.Init).Exec(ctx, s.bundle, &proc.ExecConfig{
ID: r.ExecID,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Spec: r.Spec,
})
process, err := container.Exec(ctx, r)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
s.mu.Lock()
s.processes[r.ExecID] = process
s.mu.Unlock()
s.send(&eventstypes.TaskExecAdded{
ContainerID: s.id,
ContainerID: s.container.ID,
ExecID: process.ID(),
})
return empty, nil
@ -445,15 +320,11 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
p, err := s.getProcess(r.ExecID)
container, err := s.getContainer()
if err != nil {
return nil, err
}
ws := console.WinSize{
Width: uint16(r.Width),
Height: uint16(r.Height),
}
if err := p.Resize(ws); err != nil {
if err := container.ResizePty(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
@ -485,7 +356,7 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
sio := p.Stdio()
return &taskAPI.StateResponse{
ID: p.ID(),
Bundle: s.bundle,
Bundle: s.container.Bundle,
Pid: uint32(p.Pid()),
Status: status,
Stdin: sio.Stdin,
@ -499,48 +370,41 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
s.mu.Lock()
p := s.task
s.mu.Unlock()
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.(*proc.Init).Pause(ctx); err != nil {
container, err := s.getContainer()
if err != nil {
return nil, err
}
if err := container.Pause(ctx); err != nil {
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskPaused{
p.ID(),
container.ID,
})
return empty, nil
}
// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
s.mu.Lock()
p := s.task
s.mu.Unlock()
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.(*proc.Init).Resume(ctx); err != nil {
container, err := s.getContainer()
if err != nil {
return nil, err
}
if err := container.Resume(ctx); err != nil {
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskResumed{
p.ID(),
container.ID,
})
return empty, nil
}
// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
p, err := s.getProcess(r.ExecID)
container, err := s.getContainer()
if err != nil {
return nil, err
}
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.Kill(ctx, r.Signal, r.All); err != nil {
if err := container.Kill(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
@ -548,6 +412,10 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Emp
// Pids returns all pids inside the container
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
pids, err := s.getContainerPids(ctx, r.ID)
if err != nil {
return nil, errdefs.ToGRPC(err)
@ -557,7 +425,7 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
pInfo := task.ProcessInfo{
Pid: pid,
}
for _, p := range s.processes {
for _, p := range container.ExecdProcesses() {
if p.Pid() == int(pid) {
d := &options.ProcessDetails{
ExecID: p.ID(),
@ -579,54 +447,63 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
p, err := s.getProcess(r.ExecID)
container, err := s.getContainer()
if err != nil {
return nil, err
}
if stdin := p.Stdin(); stdin != nil {
if err := stdin.Close(); err != nil {
return nil, errors.Wrap(err, "close stdin")
}
if err := container.CloseIO(ctx, r); err != nil {
return nil, err
}
return empty, nil
}
// Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
s.mu.Lock()
p := s.task
s.mu.Unlock()
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
var opts options.CheckpointOptions
if r.Options != nil {
v, err := typeurl.UnmarshalAny(r.Options)
container, err := s.getContainer()
if err != nil {
return nil, err
}
opts = *v.(*options.CheckpointOptions)
}
if err := p.(*proc.Init).Checkpoint(ctx, &proc.CheckpointConfig{
Path: r.Path,
Exit: opts.Exit,
AllowOpenTCP: opts.OpenTcp,
AllowExternalUnixSockets: opts.ExternalUnixSockets,
AllowTerminal: opts.Terminal,
FileLocks: opts.FileLocks,
EmptyNamespaces: opts.EmptyNamespaces,
WorkDir: opts.WorkPath,
}); err != nil {
if err := container.Checkpoint(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
if err := container.Update(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
p := container.Process(r.ExecID)
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
p.Wait()
return &taskAPI.WaitResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
}, nil
}
// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
var pid int
if s.task != nil {
pid = s.task.Pid()
if s.container != nil {
pid = s.container.Pid()
}
return &taskAPI.ConnectResponse{
ShimPid: uint32(os.Getpid()),
@ -641,7 +518,7 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt
}
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
cg := s.getCgroup()
cg := s.container.Cgroup()
if cg == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
}
@ -658,37 +535,6 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
}, nil
}
// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
s.mu.Lock()
p := s.task
s.mu.Unlock()
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.(*proc.Init).Update(ctx, r.Resources); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
}
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
p.Wait()
return &taskAPI.WaitResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
}, nil
}
func (s *service) processExits() {
for e := range s.ec {
s.checkProcesses(e)
@ -706,12 +552,14 @@ func (s *service) sendL(evt interface{}) {
}
func (s *service) checkProcesses(e runcC.Exit) {
shouldKillAll, err := shouldKillAllOnExit(s.bundle)
s.mu.Lock()
defer s.mu.Unlock()
shouldKillAll, err := shouldKillAllOnExit(s.container.Bundle)
if err != nil {
log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
}
for _, p := range s.allProcesses() {
for _, p := range s.container.All() {
if p.Pid() == e.Pid {
if shouldKillAll {
if ip, ok := p.(*proc.Init); ok {
@ -754,24 +602,8 @@ func shouldKillAllOnExit(bundlePath string) (bool, error) {
return true, nil
}
func (s *service) allProcesses() (o []rproc.Process) {
s.mu.Lock()
defer s.mu.Unlock()
o = make([]rproc.Process, 0, len(s.processes)+1)
for _, p := range s.processes {
o = append(o, p)
}
if s.task != nil {
o = append(o, s.task)
}
return o
}
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
s.mu.Lock()
p := s.task
s.mu.Unlock()
p := s.container.Process("")
if p == nil {
return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
}
@ -789,7 +621,7 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
func (s *service) forward(publisher events.Publisher) {
for e := range s.events {
ctx, cancel := context.WithTimeout(s.context, 5*time.Second)
err := publisher.Publish(ctx, getTopic(e), e)
err := publisher.Publish(ctx, runc.GetTopic(e), e)
cancel()
if err != nil {
logrus.WithError(err).Error("post event")
@ -797,84 +629,38 @@ func (s *service) forward(publisher events.Publisher) {
}
}
func (s *service) getProcess(execID string) (rproc.Process, error) {
func (s *service) getContainer() (*runc.Container, error) {
s.mu.Lock()
defer s.mu.Unlock()
if execID == "" {
return s.task, nil
container := s.container
s.mu.Unlock()
if container == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
}
p := s.processes[execID]
return container, nil
}
func (s *service) getProcess(execID string) (rproc.Process, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
p := container.Process(execID)
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process does not exist %s", execID)
}
return p, nil
}
func (s *service) getCgroup() cgroups.Cgroup {
s.mu.Lock()
defer s.mu.Unlock()
return s.cg
}
func (s *service) setCgroup(cg cgroups.Cgroup) {
s.mu.Lock()
s.cg = cg
s.mu.Unlock()
if err := s.ep.add(s.id, cg); err != nil {
logrus.WithError(err).Error("add cg to OOM monitor")
// initialize a single epoll fd to manage our consoles. `initPlatform` should
// only be called once.
func (s *service) initPlatform() error {
if s.platform != nil {
return nil
}
}
func getTopic(e interface{}) string {
switch e.(type) {
case *eventstypes.TaskCreate:
return runtime.TaskCreateEventTopic
case *eventstypes.TaskStart:
return runtime.TaskStartEventTopic
case *eventstypes.TaskOOM:
return runtime.TaskOOMEventTopic
case *eventstypes.TaskExit:
return runtime.TaskExitEventTopic
case *eventstypes.TaskDelete:
return runtime.TaskDeleteEventTopic
case *eventstypes.TaskExecAdded:
return runtime.TaskExecAddedEventTopic
case *eventstypes.TaskExecStarted:
return runtime.TaskExecStartedEventTopic
case *eventstypes.TaskPaused:
return runtime.TaskPausedEventTopic
case *eventstypes.TaskResumed:
return runtime.TaskResumedEventTopic
case *eventstypes.TaskCheckpointed:
return runtime.TaskCheckpointedEventTopic
default:
logrus.Warnf("no topic for type %#v", e)
p, err := runc.NewPlatform()
if err != nil {
return err
}
return runtime.TaskUnknownTopic
}
func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform, r *proc.CreateConfig, options *options.Options) (*proc.Init, error) {
rootfs := filepath.Join(path, "rootfs")
runtime := proc.NewRunc(options.Root, path, namespace, options.BinaryName, options.CriuPath, options.SystemdCgroup)
p := proc.New(r.ID, runtime, rproc.Stdio{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
})
p.Bundle = r.Bundle
p.Platform = platform
p.Rootfs = rootfs
p.WorkDir = workDir
p.IoUID = int(options.IoUid)
p.IoGID = int(options.IoGid)
p.NoPivotRoot = options.NoPivotRoot
p.NoNewKeyring = options.NoNewKeyring
p.CriuWorkPath = options.CriuWorkPath
if p.CriuWorkPath == "" {
// if criu work path not set, use container WorkDir
p.CriuWorkPath = p.WorkDir
}
return p, nil
s.platform = p
return nil
}