Move shim process code to package

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby
2017-11-09 11:17:53 -05:00
parent 92ca22c997
commit 6e25898ff0
11 changed files with 155 additions and 132 deletions

View File

@@ -1,50 +0,0 @@
// +build !windows
package shim
import (
"context"
"github.com/containerd/console"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/pkg/errors"
)
type deletedState struct {
}
func (s *deletedState) Pause(ctx context.Context) error {
return errors.Errorf("cannot pause a deleted process")
}
func (s *deletedState) Resume(ctx context.Context) error {
return errors.Errorf("cannot resume a deleted process")
}
func (s *deletedState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error {
return errors.Errorf("cannot update a deleted process")
}
func (s *deletedState) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) error {
return errors.Errorf("cannot checkpoint a deleted process")
}
func (s *deletedState) Resize(ws console.WinSize) error {
return errors.Errorf("cannot resize a deleted process")
}
func (s *deletedState) Start(ctx context.Context) error {
return errors.Errorf("cannot start a deleted process")
}
func (s *deletedState) Delete(ctx context.Context) error {
return errors.Errorf("cannot delete a deleted process")
}
func (s *deletedState) Kill(ctx context.Context, sig uint32, all bool) error {
return errors.Errorf("cannot kill a deleted process")
}
func (s *deletedState) SetExited(status int) {
// no op
}

View File

@@ -1,233 +0,0 @@
// +build !windows
package shim
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"syscall"
"time"
"golang.org/x/sys/unix"
"github.com/containerd/console"
"github.com/containerd/containerd/identifiers"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
type execProcess struct {
wg sync.WaitGroup
processState
mu sync.Mutex
id string
console console.Console
io runc.IO
status int
exited time.Time
pid int
closers []io.Closer
stdin io.Closer
stdio stdio
path string
spec specs.Process
parent *initProcess
waitBlock chan struct{}
}
func newExecProcess(context context.Context, path string, r *shimapi.ExecProcessRequest, parent *initProcess, id string) (process, error) {
if err := identifiers.Validate(id); err != nil {
return nil, errors.Wrapf(err, "invalid exec id")
}
// process exec request
var spec specs.Process
if err := json.Unmarshal(r.Spec.Value, &spec); err != nil {
return nil, err
}
spec.Terminal = r.Terminal
e := &execProcess{
id: id,
path: path,
parent: parent,
spec: spec,
stdio: stdio{
stdin: r.Stdin,
stdout: r.Stdout,
stderr: r.Stderr,
terminal: r.Terminal,
},
waitBlock: make(chan struct{}),
}
e.processState = &execCreatedState{p: e}
return e, nil
}
func (e *execProcess) Wait() {
<-e.waitBlock
}
func (e *execProcess) ID() string {
return e.id
}
func (e *execProcess) Pid() int {
e.mu.Lock()
defer e.mu.Unlock()
return e.pid
}
func (e *execProcess) ExitStatus() int {
e.mu.Lock()
defer e.mu.Unlock()
return e.status
}
func (e *execProcess) ExitedAt() time.Time {
e.mu.Lock()
defer e.mu.Unlock()
return e.exited
}
func (e *execProcess) setExited(status int) {
e.status = status
e.exited = time.Now()
e.parent.platform.shutdownConsole(context.Background(), e.console)
close(e.waitBlock)
}
func (e *execProcess) delete(ctx context.Context) error {
e.wg.Wait()
if e.io != nil {
for _, c := range e.closers {
c.Close()
}
e.io.Close()
}
pidfile := filepath.Join(e.path, fmt.Sprintf("%s.pid", e.id))
// silently ignore error
os.Remove(pidfile)
return nil
}
func (e *execProcess) resize(ws console.WinSize) error {
if e.console == nil {
return nil
}
return e.console.Resize(ws)
}
func (e *execProcess) kill(ctx context.Context, sig uint32, _ bool) error {
pid := e.pid
if pid != 0 {
if err := unix.Kill(pid, syscall.Signal(sig)); err != nil {
return errors.Wrapf(checkKillError(err), "exec kill error")
}
}
return nil
}
func (e *execProcess) Stdin() io.Closer {
return e.stdin
}
func (e *execProcess) Stdio() stdio {
return e.stdio
}
func (e *execProcess) start(ctx context.Context) (err error) {
var (
socket *runc.Socket
pidfile = filepath.Join(e.path, fmt.Sprintf("%s.pid", e.id))
)
if e.stdio.terminal {
if socket, err = runc.NewTempConsoleSocket(); err != nil {
return errors.Wrap(err, "failed to create runc console socket")
}
defer socket.Close()
} else if e.stdio.isNull() {
if e.io, err = runc.NewNullIO(); err != nil {
return errors.Wrap(err, "creating new NULL IO")
}
} else {
if e.io, err = runc.NewPipeIO(e.parent.IoUID, e.parent.IoGID); err != nil {
return errors.Wrap(err, "failed to create runc io pipes")
}
}
opts := &runc.ExecOpts{
PidFile: pidfile,
IO: e.io,
Detach: true,
}
if socket != nil {
opts.ConsoleSocket = socket
}
if err := e.parent.runtime.Exec(ctx, e.parent.id, e.spec, opts); err != nil {
return e.parent.runtimeError(err, "OCI runtime exec failed")
}
if e.stdio.stdin != "" {
sc, err := fifo.OpenFifo(ctx, e.stdio.stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return errors.Wrapf(err, "failed to open stdin fifo %s", e.stdio.stdin)
}
e.closers = append(e.closers, sc)
e.stdin = sc
}
var copyWaitGroup sync.WaitGroup
if socket != nil {
console, err := socket.ReceiveMaster()
if err != nil {
return errors.Wrap(err, "failed to retrieve console master")
}
if e.console, err = e.parent.platform.copyConsole(ctx, console, e.stdio.stdin, e.stdio.stdout, e.stdio.stderr, &e.wg, &copyWaitGroup); err != nil {
return errors.Wrap(err, "failed to start console copy")
}
} else if !e.stdio.isNull() {
if err := copyPipes(ctx, e.io, e.stdio.stdin, e.stdio.stdout, e.stdio.stderr, &e.wg, &copyWaitGroup); err != nil {
return errors.Wrap(err, "failed to start io pipe copy")
}
}
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(opts.PidFile)
if err != nil {
return errors.Wrap(err, "failed to retrieve OCI runtime exec pid")
}
e.pid = pid
return nil
}
func (e *execProcess) Status(ctx context.Context) (string, error) {
s, err := e.parent.Status(ctx)
if err != nil {
return "", err
}
// if the container as a whole is in the pausing/paused state, so are all
// other processes inside the container, use container state here
switch s {
case "paused", "pausing":
return s, nil
}
e.mu.Lock()
defer e.mu.Unlock()
// if we don't have a pid then the exec process has just been created
if e.pid == 0 {
return "created", nil
}
// if we have a pid and it can be signaled, the process is running
if err := unix.Kill(e.pid, 0); err == nil {
return "running", nil
}
// else if we have a pid but it can nolonger be signaled, it has stopped
return "stopped", nil
}

View File

@@ -1,172 +0,0 @@
// +build !windows
package shim
import (
"context"
"github.com/containerd/console"
"github.com/pkg/errors"
)
type execCreatedState struct {
p *execProcess
}
func (s *execCreatedState) transition(name string) error {
switch name {
case "running":
s.p.processState = &execRunningState{p: s.p}
case "stopped":
s.p.processState = &execStoppedState{p: s.p}
case "deleted":
s.p.processState = &deletedState{}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *execCreatedState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.resize(ws)
}
func (s *execCreatedState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.start(ctx); err != nil {
return err
}
return s.transition("running")
}
func (s *execCreatedState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.delete(ctx); err != nil {
return err
}
return s.transition("deleted")
}
func (s *execCreatedState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
}
func (s *execCreatedState) SetExited(status int) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
s.p.setExited(status)
if err := s.transition("stopped"); err != nil {
panic(err)
}
}
type execRunningState struct {
p *execProcess
}
func (s *execRunningState) transition(name string) error {
switch name {
case "stopped":
s.p.processState = &execStoppedState{p: s.p}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *execRunningState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.resize(ws)
}
func (s *execRunningState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot start a running process")
}
func (s *execRunningState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot delete a running process")
}
func (s *execRunningState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
}
func (s *execRunningState) SetExited(status int) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
s.p.setExited(status)
if err := s.transition("stopped"); err != nil {
panic(err)
}
}
type execStoppedState struct {
p *execProcess
}
func (s *execStoppedState) transition(name string) error {
switch name {
case "deleted":
s.p.processState = &deletedState{}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *execStoppedState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot resize a stopped container")
}
func (s *execStoppedState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot start a stopped process")
}
func (s *execStoppedState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.delete(ctx); err != nil {
return err
}
return s.transition("deleted")
}
func (s *execStoppedState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
}
func (s *execStoppedState) SetExited(status int) {
// no op
}

View File

@@ -1,393 +0,0 @@
// +build !windows
package shim
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
"github.com/containerd/console"
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/linux/runctypes"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
"github.com/containerd/typeurl"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
// InitPidFile name of the file that contains the init pid
const InitPidFile = "init.pid"
type initProcess struct {
wg sync.WaitGroup
initState
// mu is used to ensure that `Start()` and `Exited()` calls return in
// the right order when invoked in separate go routines.
// This is the case within the shim implementation as it makes use of
// the reaper interface.
mu sync.Mutex
waitBlock chan struct{}
workDir string
id string
bundle string
console console.Console
platform platform
io runc.IO
runtime *runc.Runc
status int
exited time.Time
pid int
closers []io.Closer
stdin io.Closer
stdio stdio
rootfs string
IoUID int
IoGID int
}
func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskRequest) (*initProcess, error) {
var success bool
if err := identifiers.Validate(r.ID); err != nil {
return nil, errors.Wrapf(err, "invalid task id")
}
var options runctypes.CreateOptions
if r.Options != nil {
v, err := typeurl.UnmarshalAny(r.Options)
if err != nil {
return nil, err
}
options = *v.(*runctypes.CreateOptions)
}
rootfs := filepath.Join(s.config.Path, "rootfs")
// count the number of successful mounts so we can undo
// what was actually done rather than what should have been
// done.
defer func() {
if success {
return
}
if err2 := mount.UnmountAll(rootfs, 0); err2 != nil {
log.G(context).WithError(err2).Warn("Failed to cleanup rootfs mount")
}
}()
for _, rm := range r.Rootfs {
m := &mount.Mount{
Type: rm.Type,
Source: rm.Source,
Options: rm.Options,
}
if err := m.Mount(rootfs); err != nil {
return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
}
}
root := s.config.RuntimeRoot
if root == "" {
root = RuncRoot
}
runtime := &runc.Runc{
Command: r.Runtime,
Log: filepath.Join(s.config.Path, "log.json"),
LogFormat: runc.JSON,
PdeathSignal: syscall.SIGKILL,
Root: filepath.Join(root, s.config.Namespace),
Criu: s.config.Criu,
SystemdCgroup: s.config.SystemdCgroup,
}
p := &initProcess{
id: r.ID,
bundle: r.Bundle,
runtime: runtime,
platform: s.platform,
stdio: stdio{
stdin: r.Stdin,
stdout: r.Stdout,
stderr: r.Stderr,
terminal: r.Terminal,
},
rootfs: rootfs,
workDir: s.config.WorkDir,
status: 0,
waitBlock: make(chan struct{}),
IoUID: int(options.IoUid),
IoGID: int(options.IoGid),
}
p.initState = &createdState{p: p}
var (
err error
socket *runc.Socket
)
if r.Terminal {
if socket, err = runc.NewTempConsoleSocket(); err != nil {
return nil, errors.Wrap(err, "failed to create OCI runtime console socket")
}
defer socket.Close()
} else if hasNoIO(r) {
if p.io, err = runc.NewNullIO(); err != nil {
return nil, errors.Wrap(err, "creating new NULL IO")
}
} else {
if p.io, err = runc.NewPipeIO(int(options.IoUid), int(options.IoGid)); err != nil {
return nil, errors.Wrap(err, "failed to create OCI runtime io pipes")
}
}
pidFile := filepath.Join(s.config.Path, InitPidFile)
if r.Checkpoint != "" {
opts := &runc.RestoreOpts{
CheckpointOpts: runc.CheckpointOpts{
ImagePath: r.Checkpoint,
WorkDir: p.workDir,
ParentPath: r.ParentCheckpoint,
},
PidFile: pidFile,
IO: p.io,
NoPivot: options.NoPivotRoot,
Detach: true,
NoSubreaper: true,
}
p.initState = &createdCheckpointState{
p: p,
opts: opts,
}
success = true
return p, nil
}
opts := &runc.CreateOpts{
PidFile: pidFile,
IO: p.io,
NoPivot: options.NoPivotRoot,
NoNewKeyring: options.NoNewKeyring,
}
if socket != nil {
opts.ConsoleSocket = socket
}
if err := p.runtime.Create(context, r.ID, r.Bundle, opts); err != nil {
return nil, p.runtimeError(err, "OCI runtime create failed")
}
if r.Stdin != "" {
sc, err := fifo.OpenFifo(context, r.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return nil, errors.Wrapf(err, "failed to open stdin fifo %s", r.Stdin)
}
p.stdin = sc
p.closers = append(p.closers, sc)
}
var copyWaitGroup sync.WaitGroup
if socket != nil {
console, err := socket.ReceiveMaster()
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve console master")
}
console, err = s.platform.copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup)
if err != nil {
return nil, errors.Wrap(err, "failed to start console copy")
}
p.console = console
} else if !hasNoIO(r) {
if err := copyPipes(context, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup); err != nil {
return nil, errors.Wrap(err, "failed to start io pipe copy")
}
}
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(pidFile)
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve OCI runtime container pid")
}
p.pid = pid
success = true
return p, nil
}
func (p *initProcess) Wait() {
<-p.waitBlock
}
func (p *initProcess) ID() string {
return p.id
}
func (p *initProcess) Pid() int {
return p.pid
}
func (p *initProcess) ExitStatus() int {
p.mu.Lock()
defer p.mu.Unlock()
return p.status
}
func (p *initProcess) ExitedAt() time.Time {
p.mu.Lock()
defer p.mu.Unlock()
return p.exited
}
func (p *initProcess) Status(ctx context.Context) (string, error) {
p.mu.Lock()
defer p.mu.Unlock()
c, err := p.runtime.State(ctx, p.id)
if err != nil {
if os.IsNotExist(err) {
return "stopped", nil
}
return "", p.runtimeError(err, "OCI runtime state failed")
}
return c.Status, nil
}
func (p *initProcess) start(context context.Context) error {
err := p.runtime.Start(context, p.id)
return p.runtimeError(err, "OCI runtime start failed")
}
func (p *initProcess) setExited(status int) {
p.exited = time.Now()
p.status = status
p.platform.shutdownConsole(context.Background(), p.console)
close(p.waitBlock)
}
func (p *initProcess) delete(context context.Context) error {
p.killAll(context)
p.wg.Wait()
err := p.runtime.Delete(context, p.id, nil)
// ignore errors if a runtime has already deleted the process
// but we still hold metadata and pipes
//
// this is common during a checkpoint, runc will delete the container state
// after a checkpoint and the container will no longer exist within runc
if err != nil {
if strings.Contains(err.Error(), "does not exist") {
err = nil
} else {
err = p.runtimeError(err, "failed to delete task")
}
}
if p.io != nil {
for _, c := range p.closers {
c.Close()
}
p.io.Close()
}
if err2 := mount.UnmountAll(p.rootfs, 0); err2 != nil {
log.G(context).WithError(err2).Warn("failed to cleanup rootfs mount")
if err == nil {
err = errors.Wrap(err2, "failed rootfs umount")
}
}
return err
}
func (p *initProcess) resize(ws console.WinSize) error {
if p.console == nil {
return nil
}
return p.console.Resize(ws)
}
func (p *initProcess) pause(context context.Context) error {
err := p.runtime.Pause(context, p.id)
return p.runtimeError(err, "OCI runtime pause failed")
}
func (p *initProcess) resume(context context.Context) error {
err := p.runtime.Resume(context, p.id)
return p.runtimeError(err, "OCI runtime resume failed")
}
func (p *initProcess) kill(context context.Context, signal uint32, all bool) error {
err := p.runtime.Kill(context, p.id, int(signal), &runc.KillOpts{
All: all,
})
return checkKillError(err)
}
func (p *initProcess) killAll(context context.Context) error {
err := p.runtime.Kill(context, p.id, int(syscall.SIGKILL), &runc.KillOpts{
All: true,
})
return p.runtimeError(err, "OCI runtime killall failed")
}
func (p *initProcess) Stdin() io.Closer {
return p.stdin
}
func (p *initProcess) checkpoint(context context.Context, r *shimapi.CheckpointTaskRequest) error {
var options runctypes.CheckpointOptions
if r.Options != nil {
v, err := typeurl.UnmarshalAny(r.Options)
if err != nil {
return err
}
options = *v.(*runctypes.CheckpointOptions)
}
var actions []runc.CheckpointAction
if !options.Exit {
actions = append(actions, runc.LeaveRunning)
}
work := filepath.Join(p.workDir, "criu-work")
defer os.RemoveAll(work)
if err := p.runtime.Checkpoint(context, p.id, &runc.CheckpointOpts{
WorkDir: work,
ImagePath: r.Path,
AllowOpenTCP: options.OpenTcp,
AllowExternalUnixSockets: options.ExternalUnixSockets,
AllowTerminal: options.Terminal,
FileLocks: options.FileLocks,
EmptyNamespaces: options.EmptyNamespaces,
}, actions...); err != nil {
dumpLog := filepath.Join(p.bundle, "criu-dump.log")
if cerr := copyFile(dumpLog, filepath.Join(work, "dump.log")); cerr != nil {
log.G(context).Error(err)
}
return fmt.Errorf("%s path= %s", criuError(err), dumpLog)
}
return nil
}
func (p *initProcess) update(context context.Context, r *shimapi.UpdateTaskRequest) error {
var resources specs.LinuxResources
if err := json.Unmarshal(r.Resources.Value, &resources); err != nil {
return err
}
return p.runtime.Update(context, p.id, &resources)
}
func (p *initProcess) Stdio() stdio {
return p.stdio
}
func (p *initProcess) runtimeError(rErr error, msg string) error {
if rErr == nil {
return nil
}
rMsg, err := getLastRuntimeError(p.runtime)
switch {
case err != nil:
return errors.Wrapf(rErr, "%s: %s (%s)", msg, "unable to retrieve OCI runtime error", err.Error())
case rMsg == "":
return errors.Wrap(rErr, msg)
default:
return errors.Errorf("%s: %s", msg, rMsg)
}
}

View File

@@ -1,473 +0,0 @@
// +build !windows
package shim
import (
"context"
"sync"
"syscall"
"github.com/containerd/console"
"github.com/containerd/containerd/errdefs"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
"github.com/pkg/errors"
)
type initState interface {
processState
Pause(context.Context) error
Resume(context.Context) error
Update(context.Context, *shimapi.UpdateTaskRequest) error
Checkpoint(context.Context, *shimapi.CheckpointTaskRequest) error
}
type createdState struct {
p *initProcess
}
func (s *createdState) transition(name string) error {
switch name {
case "running":
s.p.initState = &runningState{p: s.p}
case "stopped":
s.p.initState = &stoppedState{p: s.p}
case "deleted":
s.p.initState = &deletedState{}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *createdState) Pause(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot pause task in created state")
}
func (s *createdState) Resume(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot resume task in created state")
}
func (s *createdState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.update(context, r)
}
func (s *createdState) Checkpoint(context context.Context, r *shimapi.CheckpointTaskRequest) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot checkpoint a task in created state")
}
func (s *createdState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.resize(ws)
}
func (s *createdState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.start(ctx); err != nil {
return err
}
return s.transition("running")
}
func (s *createdState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.delete(ctx); err != nil {
return err
}
return s.transition("deleted")
}
func (s *createdState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
}
func (s *createdState) SetExited(status int) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
s.p.setExited(status)
if err := s.transition("stopped"); err != nil {
panic(err)
}
}
type createdCheckpointState struct {
p *initProcess
opts *runc.RestoreOpts
}
func (s *createdCheckpointState) transition(name string) error {
switch name {
case "running":
s.p.initState = &runningState{p: s.p}
case "stopped":
s.p.initState = &stoppedState{p: s.p}
case "deleted":
s.p.initState = &deletedState{}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *createdCheckpointState) Pause(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot pause task in created state")
}
func (s *createdCheckpointState) Resume(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot resume task in created state")
}
func (s *createdCheckpointState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.update(context, r)
}
func (s *createdCheckpointState) Checkpoint(context context.Context, r *shimapi.CheckpointTaskRequest) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot checkpoint a task in created state")
}
func (s *createdCheckpointState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.resize(ws)
}
func (s *createdCheckpointState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
p := s.p
if _, err := s.p.runtime.Restore(ctx, p.id, p.bundle, s.opts); err != nil {
return p.runtimeError(err, "OCI runtime restore failed")
}
sio := p.stdio
if sio.stdin != "" {
sc, err := fifo.OpenFifo(ctx, sio.stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return errors.Wrapf(err, "failed to open stdin fifo %s", sio.stdin)
}
p.stdin = sc
p.closers = append(p.closers, sc)
}
var copyWaitGroup sync.WaitGroup
if !sio.isNull() {
if err := copyPipes(ctx, p.io, sio.stdin, sio.stdout, sio.stderr, &p.wg, &copyWaitGroup); err != nil {
return errors.Wrap(err, "failed to start io pipe copy")
}
}
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(s.opts.PidFile)
if err != nil {
return errors.Wrap(err, "failed to retrieve OCI runtime container pid")
}
p.pid = pid
return s.transition("running")
}
func (s *createdCheckpointState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.delete(ctx); err != nil {
return err
}
return s.transition("deleted")
}
func (s *createdCheckpointState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
}
func (s *createdCheckpointState) SetExited(status int) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
s.p.setExited(status)
if err := s.transition("stopped"); err != nil {
panic(err)
}
}
type runningState struct {
p *initProcess
}
func (s *runningState) transition(name string) error {
switch name {
case "stopped":
s.p.initState = &stoppedState{p: s.p}
case "paused":
s.p.initState = &pausedState{p: s.p}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *runningState) Pause(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.pause(ctx); err != nil {
return err
}
return s.transition("paused")
}
func (s *runningState) Resume(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot resume a running process")
}
func (s *runningState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.update(context, r)
}
func (s *runningState) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.checkpoint(ctx, r)
}
func (s *runningState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.resize(ws)
}
func (s *runningState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot start a running process")
}
func (s *runningState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot delete a running process")
}
func (s *runningState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
}
func (s *runningState) SetExited(status int) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
s.p.setExited(status)
if err := s.transition("stopped"); err != nil {
panic(err)
}
}
type pausedState struct {
p *initProcess
}
func (s *pausedState) transition(name string) error {
switch name {
case "running":
s.p.initState = &runningState{p: s.p}
case "stopped":
s.p.initState = &stoppedState{p: s.p}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *pausedState) Pause(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot pause a paused container")
}
func (s *pausedState) Resume(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.resume(ctx); err != nil {
return err
}
return s.transition("running")
}
func (s *pausedState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.update(context, r)
}
func (s *pausedState) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.checkpoint(ctx, r)
}
func (s *pausedState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.resize(ws)
}
func (s *pausedState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot start a paused process")
}
func (s *pausedState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot delete a paused process")
}
func (s *pausedState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
}
func (s *pausedState) SetExited(status int) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
s.p.setExited(status)
if err := s.transition("stopped"); err != nil {
panic(err)
}
}
type stoppedState struct {
p *initProcess
}
func (s *stoppedState) transition(name string) error {
switch name {
case "deleted":
s.p.initState = &deletedState{}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *stoppedState) Pause(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot pause a stopped container")
}
func (s *stoppedState) Resume(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot resume a stopped container")
}
func (s *stoppedState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot update a stopped container")
}
func (s *stoppedState) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot checkpoint a stopped container")
}
func (s *stoppedState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot resize a stopped container")
}
func (s *stoppedState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot start a stopped process")
}
func (s *stoppedState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.delete(ctx); err != nil {
return err
}
return s.transition("deleted")
}
func (s *stoppedState) Kill(ctx context.Context, sig uint32, all bool) error {
return errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", s.p.id)
}
func (s *stoppedState) SetExited(status int) {
// no op
}

View File

@@ -1,101 +0,0 @@
// +build !windows
package shim
import (
"context"
"fmt"
"io"
"sync"
"syscall"
"github.com/containerd/console"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
)
func copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error {
if stdin != "" {
in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
if err != nil {
return err
}
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(console, in)
}()
}
outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
if err != nil {
return err
}
outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0)
if err != nil {
return err
}
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(outw, console)
console.Close()
outr.Close()
outw.Close()
wg.Done()
}()
return nil
}
func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error {
for name, dest := range map[string]func(wc io.WriteCloser, rc io.Closer){
stdout: func(wc io.WriteCloser, rc io.Closer) {
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(wc, rio.Stdout())
wg.Done()
wc.Close()
rc.Close()
}()
},
stderr: func(wc io.WriteCloser, rc io.Closer) {
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(wc, rio.Stderr())
wg.Done()
wc.Close()
rc.Close()
}()
},
} {
fw, err := fifo.OpenFifo(ctx, name, syscall.O_WRONLY, 0)
if err != nil {
return fmt.Errorf("containerd-shim: opening %s failed: %s", name, err)
}
fr, err := fifo.OpenFifo(ctx, name, syscall.O_RDONLY, 0)
if err != nil {
return fmt.Errorf("containerd-shim: opening %s failed: %s", name, err)
}
dest(fw, fr)
}
if stdin == "" {
rio.Stdin().Close()
return nil
}
f, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
if err != nil {
return fmt.Errorf("containerd-shim: opening %s failed: %s", stdin, err)
}
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(rio.Stdin(), f)
rio.Stdin().Close()
f.Close()
}()
return nil
}

View File

@@ -1,73 +0,0 @@
// +build !windows
package shim
import (
"context"
"io"
"time"
"github.com/containerd/console"
"github.com/pkg/errors"
)
type stdio struct {
stdin string
stdout string
stderr string
terminal bool
}
func (s stdio) isNull() bool {
return s.stdin == "" && s.stdout == "" && s.stderr == ""
}
type process interface {
processState
// ID returns the id for the process
ID() string
// Pid returns the pid for the process
Pid() int
// ExitStatus returns the exit status
ExitStatus() int
// ExitedAt is the time the process exited
ExitedAt() time.Time
// Stdin returns the process STDIN
Stdin() io.Closer
// Stdio returns io information for the container
Stdio() stdio
// Status returns the process status
Status(context.Context) (string, error)
// Wait blocks until the process has exited
Wait()
}
type processState interface {
// Resize resizes the process console
Resize(ws console.WinSize) error
// Start execution of the process
Start(context.Context) error
// Delete deletes the process and its resourcess
Delete(context.Context) error
// Kill kills the process
Kill(context.Context, uint32, bool) error
// SetExited sets the exit status for the process
SetExited(status int)
}
func stateName(v interface{}) string {
switch v.(type) {
case *runningState, *execRunningState:
return "running"
case *createdState, *execCreatedState, *createdCheckpointState:
return "created"
case *pausedState:
return "paused"
case *deletedState:
return "deleted"
case *stoppedState:
return "stopped"
}
panic(errors.Errorf("invalid state %v", v))
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/linux/proc"
"github.com/containerd/containerd/linux/runctypes"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/log"
@@ -31,9 +32,6 @@ import (
var empty = &google_protobuf.Empty{}
// RuncRoot is the path to the root runc state directory
const RuncRoot = "/run/containerd/runc"
// Config contains shim specific configuration
type Config struct {
Path string
@@ -58,7 +56,7 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) {
s := &Service{
config: config,
context: context,
processes: make(map[string]process),
processes: make(map[string]proc.Process),
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
}
@@ -70,23 +68,15 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) {
return s, nil
}
// platform handles platform-specific behavior that may differs across
// platform implementations
type platform interface {
copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error)
shutdownConsole(ctx context.Context, console console.Console) error
close() error
}
// Service is the shim implementation of a remote shim over GRPC
type Service struct {
mu sync.Mutex
config Config
context context.Context
processes map[string]process
processes map[string]proc.Process
events chan interface{}
platform platform
platform proc.Platform
ec chan runc.Exit
// Filled by Create()
@@ -98,7 +88,17 @@ type Service struct {
func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
process, err := s.newInitProcess(ctx, r)
process, err := proc.New(
ctx,
s.config.Path,
s.config.WorkDir,
s.config.RuntimeRoot,
s.config.Namespace,
s.config.Criu,
s.config.SystemdCgroup,
s.platform,
r,
)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
@@ -168,7 +168,7 @@ func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimap
return nil, err
}
delete(s.processes, s.id)
s.platform.close()
s.platform.Close()
s.events <- &eventsapi.TaskDelete{
ContainerID: s.id,
ExitStatus: uint32(p.ExitStatus()),
@@ -218,7 +218,7 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*goo
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
process, err := newExecProcess(ctx, s.config.Path, r, p.(*initProcess), r.ID)
process, err := proc.NewExec(ctx, s.config.Path, r, p.(*proc.Init), r.ID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
@@ -283,10 +283,10 @@ func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.
Bundle: s.bundle,
Pid: uint32(p.Pid()),
Status: status,
Stdin: sio.stdin,
Stdout: sio.stdout,
Stderr: sio.stderr,
Terminal: sio.terminal,
Stdin: sio.Stdin,
Stdout: sio.Stdout,
Stderr: sio.Stderr,
Terminal: sio.Terminal,
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
}, nil
@@ -300,7 +300,7 @@ func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.(*initProcess).Pause(ctx); err != nil {
if err := p.(*proc.Init).Pause(ctx); err != nil {
return nil, err
}
s.events <- &eventsapi.TaskPaused{
@@ -317,7 +317,7 @@ func (s *Service) Resume(ctx context.Context, r *google_protobuf.Empty) (*google
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.(*initProcess).Resume(ctx); err != nil {
if err := p.(*proc.Init).Resume(ctx); err != nil {
return nil, err
}
s.events <- &eventsapi.TaskResumed{
@@ -406,7 +406,7 @@ func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskReque
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.(*initProcess).Checkpoint(ctx, r); err != nil {
if err := p.(*proc.Init).Checkpoint(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
s.events <- &eventsapi.TaskCheckpointed{
@@ -430,7 +430,7 @@ func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*go
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.(*initProcess).Update(ctx, r); err != nil {
if err := p.(*proc.Init).Update(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
@@ -463,9 +463,9 @@ func (s *Service) checkProcesses(e runc.Exit) {
defer s.mu.Unlock()
for _, p := range s.processes {
if p.Pid() == e.Pid {
if ip, ok := p.(*initProcess); ok {
if ip, ok := p.(*proc.Init); ok {
// Ensure all children are killed
if err := ip.killAll(s.context); err != nil {
if err := ip.KillAll(s.context); err != nil {
log.G(s.context).WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children")
}
@@ -491,7 +491,7 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er
return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
}
ps, err := p.(*initProcess).runtime.Ps(ctx, id)
ps, err := p.(*proc.Init).Runtime().Ps(ctx, id)
if err != nil {
return nil, err
}

View File

@@ -1,6 +1,7 @@
package shim
import (
"context"
"io"
"sync"
"syscall"
@@ -8,14 +9,13 @@ import (
"github.com/containerd/console"
"github.com/containerd/fifo"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
type linuxPlatform struct {
epoller *console.Epoller
}
func (p *linuxPlatform) copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
if p.epoller == nil {
return nil, errors.New("uninitialized epoller")
}
@@ -58,7 +58,7 @@ func (p *linuxPlatform) copyConsole(ctx context.Context, console console.Console
return epollConsole, nil
}
func (p *linuxPlatform) shutdownConsole(ctx context.Context, cons console.Console) error {
func (p *linuxPlatform) ShutdownConsole(ctx context.Context, cons console.Console) error {
if p.epoller == nil {
return errors.New("uninitialized epoller")
}
@@ -69,7 +69,7 @@ func (p *linuxPlatform) shutdownConsole(ctx context.Context, cons console.Consol
return epollConsole.Shutdown(p.epoller.CloseConsole)
}
func (p *linuxPlatform) close() error {
func (p *linuxPlatform) Close() error {
return p.epoller.Close()
}

View File

@@ -15,7 +15,7 @@ import (
type unixPlatform struct {
}
func (p *unixPlatform) copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
if stdin != "" {
in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
if err != nil {
@@ -48,11 +48,11 @@ func (p *unixPlatform) copyConsole(ctx context.Context, console console.Console,
return console, nil
}
func (p *unixPlatform) shutdownConsole(ctx context.Context, cons console.Console) error {
func (p *unixPlatform) ShutdownConsole(ctx context.Context, cons console.Console) error {
return nil
}
func (p *unixPlatform) close() error {
func (p *unixPlatform) Close() error {
return nil
}

View File

@@ -1,86 +0,0 @@
// +build !windows
package shim
import (
"encoding/json"
"io"
"os"
"strings"
"time"
"github.com/containerd/containerd/errdefs"
shimapi "github.com/containerd/containerd/linux/shim/v1"
runc "github.com/containerd/go-runc"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
)
// TODO(mlaventure): move to runc package?
func getLastRuntimeError(r *runc.Runc) (string, error) {
if r.Log == "" {
return "", nil
}
f, err := os.OpenFile(r.Log, os.O_RDONLY, 0400)
if err != nil {
return "", err
}
var (
errMsg string
log struct {
Level string
Msg string
Time time.Time
}
)
dec := json.NewDecoder(f)
for err = nil; err == nil; {
if err = dec.Decode(&log); err != nil && err != io.EOF {
return "", err
}
if log.Level == "error" {
errMsg = strings.TrimSpace(log.Msg)
}
}
return errMsg, nil
}
// criuError returns only the first line of the error message from criu
// it tries to add an invalid dump log location when returning the message
func criuError(err error) string {
parts := strings.Split(err.Error(), "\n")
return parts[0]
}
func copyFile(to, from string) error {
ff, err := os.Open(from)
if err != nil {
return err
}
defer ff.Close()
tt, err := os.Create(to)
if err != nil {
return err
}
defer tt.Close()
_, err = io.Copy(tt, ff)
return err
}
func checkKillError(err error) error {
if err == nil {
return nil
}
if strings.Contains(err.Error(), "os: process already finished") || err == unix.ESRCH {
return errors.Wrapf(errdefs.ErrNotFound, "process already finished")
}
return errors.Wrapf(err, "unknown error after kill")
}
func hasNoIO(r *shimapi.CreateTaskRequest) bool {
return r.Stdin == "" && r.Stdout == "" && r.Stderr == ""
}