Merge pull request #1739 from crosbymichael/shim-redux

Move shim process code into subpackage
This commit is contained in:
Phil Estes 2017-11-14 10:20:28 -05:00 committed by GitHub
commit 37ee054e61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 435 additions and 258 deletions

View File

@ -18,6 +18,7 @@ import (
"github.com/containerd/containerd/dialer" "github.com/containerd/containerd/dialer"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events" "github.com/containerd/containerd/events"
"github.com/containerd/containerd/linux/proc"
"github.com/containerd/containerd/linux/shim" "github.com/containerd/containerd/linux/shim"
shimapi "github.com/containerd/containerd/linux/shim/v1" shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/reaper" "github.com/containerd/containerd/reaper"
@ -56,7 +57,7 @@ func init() {
flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve") flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve")
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd") flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
flag.StringVar(&workdirFlag, "workdir", "", "path used to storge large temporary data") flag.StringVar(&workdirFlag, "workdir", "", "path used to storge large temporary data")
flag.StringVar(&runtimeRootFlag, "runtime-root", shim.RuncRoot, "root directory for the runtime") flag.StringVar(&runtimeRootFlag, "runtime-root", proc.RuncRoot, "root directory for the runtime")
flag.StringVar(&criuFlag, "criu", "", "path to criu binary") flag.StringVar(&criuFlag, "criu", "", "path to criu binary")
flag.BoolVar(&systemdCgroupFlag, "systemd-cgroup", false, "set runtime to use systemd-cgroup") flag.BoolVar(&systemdCgroupFlag, "systemd-cgroup", false, "set runtime to use systemd-cgroup")
flag.Parse() flag.Parse()

66
debug/debug.go Normal file
View File

@ -0,0 +1,66 @@
package debug
import (
"bufio"
"fmt"
"os"
"sort"
"strconv"
"strings"
)
// Smaps prints the smaps to a file
func Smaps(note, file string) error {
smaps, err := getMaps(os.Getpid())
if err != nil {
return err
}
f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
return err
}
defer f.Close()
fmt.Fprintf(f, "%s: rss %d\n", note, smaps["rss"])
fmt.Fprintf(f, "%s: pss %d\n", note, smaps["pss"])
return nil
}
func getMaps(pid int) (map[string]int, error) {
f, err := os.Open(fmt.Sprintf("/proc/%d/smaps", pid))
if err != nil {
return nil, err
}
defer f.Close()
var (
smaps = make(map[string]int)
s = bufio.NewScanner(f)
)
for s.Scan() {
var (
fields = strings.Fields(s.Text())
name = fields[0]
)
name = strings.TrimSuffix(strings.ToLower(name), ":")
if len(fields) < 2 {
continue
}
n, err := strconv.Atoi(fields[1])
if err != nil {
continue
}
smaps[name] += n
}
if err := s.Err(); err != nil {
return nil, err
}
return smaps, nil
}
func keys(smaps map[string]int) []string {
var o []string
for k := range smaps {
o = append(o, k)
}
sort.Strings(o)
return o
}

View File

@ -4,7 +4,6 @@ import (
"strings" "strings"
"github.com/pkg/errors" "github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
@ -61,7 +60,7 @@ func FromGRPC(err error) error {
var cls error // divide these into error classes, becomes the cause var cls error // divide these into error classes, becomes the cause
switch grpc.Code(err) { switch code(err) {
case codes.InvalidArgument: case codes.InvalidArgument:
cls = ErrInvalidArgument cls = ErrInvalidArgument
case codes.AlreadyExists: case codes.AlreadyExists:
@ -94,7 +93,7 @@ func FromGRPC(err error) error {
// Effectively, we just remove the string of cls from the end of err if it // Effectively, we just remove the string of cls from the end of err if it
// appears there. // appears there.
func rebaseMessage(cls error, err error) string { func rebaseMessage(cls error, err error) string {
desc := grpc.ErrorDesc(err) desc := errDesc(err)
clss := cls.Error() clss := cls.Error()
if desc == clss { if desc == clss {
return "" return ""
@ -107,3 +106,17 @@ func isGRPCError(err error) bool {
_, ok := status.FromError(err) _, ok := status.FromError(err)
return ok return ok
} }
func code(err error) codes.Code {
if s, ok := status.FromError(err); ok {
return s.Code()
}
return codes.Unknown
}
func errDesc(err error) string {
if s, ok := status.FromError(err); ok {
return s.Message()
}
return err.Error()
}

View File

@ -1,12 +1,12 @@
// +build !windows // +build !windows
package shim package proc
import ( import (
"context" "context"
"github.com/containerd/console" "github.com/containerd/console"
shimapi "github.com/containerd/containerd/linux/shim/v1" google_protobuf "github.com/gogo/protobuf/types"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -21,11 +21,11 @@ func (s *deletedState) Resume(ctx context.Context) error {
return errors.Errorf("cannot resume a deleted process") return errors.Errorf("cannot resume a deleted process")
} }
func (s *deletedState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error { func (s *deletedState) Update(context context.Context, r *google_protobuf.Any) error {
return errors.Errorf("cannot update a deleted process") return errors.Errorf("cannot update a deleted process")
} }
func (s *deletedState) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) error { func (s *deletedState) Checkpoint(ctx context.Context, r *CheckpointConfig) error {
return errors.Errorf("cannot checkpoint a deleted process") return errors.Errorf("cannot checkpoint a deleted process")
} }

View File

@ -1,10 +1,9 @@
// +build !windows // +build !windows
package shim package proc
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"io" "io"
"os" "os"
@ -16,8 +15,6 @@ import (
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
"github.com/containerd/console" "github.com/containerd/console"
"github.com/containerd/containerd/identifiers"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/fifo" "github.com/containerd/fifo"
runc "github.com/containerd/go-runc" runc "github.com/containerd/go-runc"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
@ -27,7 +24,7 @@ import (
type execProcess struct { type execProcess struct {
wg sync.WaitGroup wg sync.WaitGroup
processState State
mu sync.Mutex mu sync.Mutex
id string id string
@ -38,42 +35,14 @@ type execProcess struct {
pid int pid int
closers []io.Closer closers []io.Closer
stdin io.Closer stdin io.Closer
stdio stdio stdio Stdio
path string path string
spec specs.Process spec specs.Process
parent *initProcess parent *Init
waitBlock chan struct{} waitBlock chan struct{}
} }
func newExecProcess(context context.Context, path string, r *shimapi.ExecProcessRequest, parent *initProcess, id string) (process, error) {
if err := identifiers.Validate(id); err != nil {
return nil, errors.Wrapf(err, "invalid exec id")
}
// process exec request
var spec specs.Process
if err := json.Unmarshal(r.Spec.Value, &spec); err != nil {
return nil, err
}
spec.Terminal = r.Terminal
e := &execProcess{
id: id,
path: path,
parent: parent,
spec: spec,
stdio: stdio{
stdin: r.Stdin,
stdout: r.Stdout,
stderr: r.Stderr,
terminal: r.Terminal,
},
waitBlock: make(chan struct{}),
}
e.processState = &execCreatedState{p: e}
return e, nil
}
func (e *execProcess) Wait() { func (e *execProcess) Wait() {
<-e.waitBlock <-e.waitBlock
} }
@ -103,7 +72,7 @@ func (e *execProcess) ExitedAt() time.Time {
func (e *execProcess) setExited(status int) { func (e *execProcess) setExited(status int) {
e.status = status e.status = status
e.exited = time.Now() e.exited = time.Now()
e.parent.platform.shutdownConsole(context.Background(), e.console) e.parent.platform.ShutdownConsole(context.Background(), e.console)
close(e.waitBlock) close(e.waitBlock)
} }
@ -142,7 +111,7 @@ func (e *execProcess) Stdin() io.Closer {
return e.stdin return e.stdin
} }
func (e *execProcess) Stdio() stdio { func (e *execProcess) Stdio() Stdio {
return e.stdio return e.stdio
} }
@ -151,12 +120,12 @@ func (e *execProcess) start(ctx context.Context) (err error) {
socket *runc.Socket socket *runc.Socket
pidfile = filepath.Join(e.path, fmt.Sprintf("%s.pid", e.id)) pidfile = filepath.Join(e.path, fmt.Sprintf("%s.pid", e.id))
) )
if e.stdio.terminal { if e.stdio.Terminal {
if socket, err = runc.NewTempConsoleSocket(); err != nil { if socket, err = runc.NewTempConsoleSocket(); err != nil {
return errors.Wrap(err, "failed to create runc console socket") return errors.Wrap(err, "failed to create runc console socket")
} }
defer socket.Close() defer socket.Close()
} else if e.stdio.isNull() { } else if e.stdio.IsNull() {
if e.io, err = runc.NewNullIO(); err != nil { if e.io, err = runc.NewNullIO(); err != nil {
return errors.Wrap(err, "creating new NULL IO") return errors.Wrap(err, "creating new NULL IO")
} }
@ -176,10 +145,10 @@ func (e *execProcess) start(ctx context.Context) (err error) {
if err := e.parent.runtime.Exec(ctx, e.parent.id, e.spec, opts); err != nil { if err := e.parent.runtime.Exec(ctx, e.parent.id, e.spec, opts); err != nil {
return e.parent.runtimeError(err, "OCI runtime exec failed") return e.parent.runtimeError(err, "OCI runtime exec failed")
} }
if e.stdio.stdin != "" { if e.stdio.Stdin != "" {
sc, err := fifo.OpenFifo(ctx, e.stdio.stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) sc, err := fifo.OpenFifo(ctx, e.stdio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil { if err != nil {
return errors.Wrapf(err, "failed to open stdin fifo %s", e.stdio.stdin) return errors.Wrapf(err, "failed to open stdin fifo %s", e.stdio.Stdin)
} }
e.closers = append(e.closers, sc) e.closers = append(e.closers, sc)
e.stdin = sc e.stdin = sc
@ -190,11 +159,11 @@ func (e *execProcess) start(ctx context.Context) (err error) {
if err != nil { if err != nil {
return errors.Wrap(err, "failed to retrieve console master") return errors.Wrap(err, "failed to retrieve console master")
} }
if e.console, err = e.parent.platform.copyConsole(ctx, console, e.stdio.stdin, e.stdio.stdout, e.stdio.stderr, &e.wg, &copyWaitGroup); err != nil { if e.console, err = e.parent.platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, &copyWaitGroup); err != nil {
return errors.Wrap(err, "failed to start console copy") return errors.Wrap(err, "failed to start console copy")
} }
} else if !e.stdio.isNull() { } else if !e.stdio.IsNull() {
if err := copyPipes(ctx, e.io, e.stdio.stdin, e.stdio.stdout, e.stdio.stderr, &e.wg, &copyWaitGroup); err != nil { if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, &copyWaitGroup); err != nil {
return errors.Wrap(err, "failed to start io pipe copy") return errors.Wrap(err, "failed to start io pipe copy")
} }
} }

View File

@ -1,6 +1,6 @@
// +build !windows // +build !windows
package shim package proc
import ( import (
"context" "context"
@ -16,11 +16,11 @@ type execCreatedState struct {
func (s *execCreatedState) transition(name string) error { func (s *execCreatedState) transition(name string) error {
switch name { switch name {
case "running": case "running":
s.p.processState = &execRunningState{p: s.p} s.p.State = &execRunningState{p: s.p}
case "stopped": case "stopped":
s.p.processState = &execStoppedState{p: s.p} s.p.State = &execStoppedState{p: s.p}
case "deleted": case "deleted":
s.p.processState = &deletedState{} s.p.State = &deletedState{}
default: default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name) return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
} }
@ -77,7 +77,7 @@ type execRunningState struct {
func (s *execRunningState) transition(name string) error { func (s *execRunningState) transition(name string) error {
switch name { switch name {
case "stopped": case "stopped":
s.p.processState = &execStoppedState{p: s.p} s.p.State = &execStoppedState{p: s.p}
default: default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name) return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
} }
@ -130,7 +130,7 @@ type execStoppedState struct {
func (s *execStoppedState) transition(name string) error { func (s *execStoppedState) transition(name string) error {
switch name { switch name {
case "deleted": case "deleted":
s.p.processState = &deletedState{} s.p.State = &deletedState{}
default: default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name) return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
} }

View File

@ -1,6 +1,6 @@
// +build !windows // +build !windows
package shim package proc
import ( import (
"context" "context"
@ -15,14 +15,13 @@ import (
"time" "time"
"github.com/containerd/console" "github.com/containerd/console"
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/linux/runctypes" "github.com/containerd/containerd/linux/runctypes"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount" "github.com/containerd/containerd/mount"
"github.com/containerd/fifo" "github.com/containerd/fifo"
runc "github.com/containerd/go-runc" runc "github.com/containerd/go-runc"
"github.com/containerd/typeurl" "github.com/containerd/typeurl"
google_protobuf "github.com/gogo/protobuf/types"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -30,7 +29,8 @@ import (
// InitPidFile name of the file that contains the init pid // InitPidFile name of the file that contains the init pid
const InitPidFile = "init.pid" const InitPidFile = "init.pid"
type initProcess struct { // Init represents an initial process for a container
type Init struct {
wg sync.WaitGroup wg sync.WaitGroup
initState initState
@ -47,7 +47,7 @@ type initProcess struct {
id string id string
bundle string bundle string
console console.Console console console.Console
platform platform platform Platform
io runc.IO io runc.IO
runtime *runc.Runc runtime *runc.Runc
status int status int
@ -55,18 +55,32 @@ type initProcess struct {
pid int pid int
closers []io.Closer closers []io.Closer
stdin io.Closer stdin io.Closer
stdio stdio stdio Stdio
rootfs string rootfs string
IoUID int IoUID int
IoGID int IoGID int
} }
func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskRequest) (*initProcess, error) { // NewRunc returns a new runc instance for a process
func NewRunc(root, path, namespace, runtime, criu string, systemd bool) *runc.Runc {
if root == "" {
root = RuncRoot
}
return &runc.Runc{
Command: runtime,
Log: filepath.Join(path, "log.json"),
LogFormat: runc.JSON,
PdeathSignal: syscall.SIGKILL,
Root: filepath.Join(root, namespace),
Criu: criu,
SystemdCgroup: systemd,
}
}
// New returns a new init process
func New(context context.Context, path, workDir, runtimeRoot, namespace, criu string, systemdCgroup bool, platform Platform, r *CreateConfig) (*Init, error) {
var success bool var success bool
if err := identifiers.Validate(r.ID); err != nil {
return nil, errors.Wrapf(err, "invalid task id")
}
var options runctypes.CreateOptions var options runctypes.CreateOptions
if r.Options != nil { if r.Options != nil {
v, err := typeurl.UnmarshalAny(r.Options) v, err := typeurl.UnmarshalAny(r.Options)
@ -76,7 +90,7 @@ func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskR
options = *v.(*runctypes.CreateOptions) options = *v.(*runctypes.CreateOptions)
} }
rootfs := filepath.Join(s.config.Path, "rootfs") rootfs := filepath.Join(path, "rootfs")
// count the number of successful mounts so we can undo // count the number of successful mounts so we can undo
// what was actually done rather than what should have been // what was actually done rather than what should have been
// done. // done.
@ -98,32 +112,20 @@ func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskR
return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m) return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
} }
} }
root := s.config.RuntimeRoot runtime := NewRunc(runtimeRoot, path, namespace, r.Runtime, criu, systemdCgroup)
if root == "" { p := &Init{
root = RuncRoot
}
runtime := &runc.Runc{
Command: r.Runtime,
Log: filepath.Join(s.config.Path, "log.json"),
LogFormat: runc.JSON,
PdeathSignal: syscall.SIGKILL,
Root: filepath.Join(root, s.config.Namespace),
Criu: s.config.Criu,
SystemdCgroup: s.config.SystemdCgroup,
}
p := &initProcess{
id: r.ID, id: r.ID,
bundle: r.Bundle, bundle: r.Bundle,
runtime: runtime, runtime: runtime,
platform: s.platform, platform: platform,
stdio: stdio{ stdio: Stdio{
stdin: r.Stdin, Stdin: r.Stdin,
stdout: r.Stdout, Stdout: r.Stdout,
stderr: r.Stderr, Stderr: r.Stderr,
terminal: r.Terminal, Terminal: r.Terminal,
}, },
rootfs: rootfs, rootfs: rootfs,
workDir: s.config.WorkDir, workDir: workDir,
status: 0, status: 0,
waitBlock: make(chan struct{}), waitBlock: make(chan struct{}),
IoUID: int(options.IoUid), IoUID: int(options.IoUid),
@ -148,7 +150,7 @@ func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskR
return nil, errors.Wrap(err, "failed to create OCI runtime io pipes") return nil, errors.Wrap(err, "failed to create OCI runtime io pipes")
} }
} }
pidFile := filepath.Join(s.config.Path, InitPidFile) pidFile := filepath.Join(path, InitPidFile)
if r.Checkpoint != "" { if r.Checkpoint != "" {
opts := &runc.RestoreOpts{ opts := &runc.RestoreOpts{
CheckpointOpts: runc.CheckpointOpts{ CheckpointOpts: runc.CheckpointOpts{
@ -195,7 +197,7 @@ func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskR
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to retrieve console master") return nil, errors.Wrap(err, "failed to retrieve console master")
} }
console, err = s.platform.copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup) console, err = platform.CopyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to start console copy") return nil, errors.Wrap(err, "failed to start console copy")
} }
@ -216,31 +218,37 @@ func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskR
return p, nil return p, nil
} }
func (p *initProcess) Wait() { // Wait for the process to exit
func (p *Init) Wait() {
<-p.waitBlock <-p.waitBlock
} }
func (p *initProcess) ID() string { // ID of the process
func (p *Init) ID() string {
return p.id return p.id
} }
func (p *initProcess) Pid() int { // Pid of the process
func (p *Init) Pid() int {
return p.pid return p.pid
} }
func (p *initProcess) ExitStatus() int { // ExitStatus of the process
func (p *Init) ExitStatus() int {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
return p.status return p.status
} }
func (p *initProcess) ExitedAt() time.Time { // ExitedAt at time when the process exited
func (p *Init) ExitedAt() time.Time {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
return p.exited return p.exited
} }
func (p *initProcess) Status(ctx context.Context) (string, error) { // Status of the process
func (p *Init) Status(ctx context.Context) (string, error) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
c, err := p.runtime.State(ctx, p.id) c, err := p.runtime.State(ctx, p.id)
@ -253,20 +261,20 @@ func (p *initProcess) Status(ctx context.Context) (string, error) {
return c.Status, nil return c.Status, nil
} }
func (p *initProcess) start(context context.Context) error { func (p *Init) start(context context.Context) error {
err := p.runtime.Start(context, p.id) err := p.runtime.Start(context, p.id)
return p.runtimeError(err, "OCI runtime start failed") return p.runtimeError(err, "OCI runtime start failed")
} }
func (p *initProcess) setExited(status int) { func (p *Init) setExited(status int) {
p.exited = time.Now() p.exited = time.Now()
p.status = status p.status = status
p.platform.shutdownConsole(context.Background(), p.console) p.platform.ShutdownConsole(context.Background(), p.console)
close(p.waitBlock) close(p.waitBlock)
} }
func (p *initProcess) delete(context context.Context) error { func (p *Init) delete(context context.Context) error {
p.killAll(context) p.KillAll(context)
p.wg.Wait() p.wg.Wait()
err := p.runtime.Delete(context, p.id, nil) err := p.runtime.Delete(context, p.id, nil)
// ignore errors if a runtime has already deleted the process // ignore errors if a runtime has already deleted the process
@ -296,42 +304,75 @@ func (p *initProcess) delete(context context.Context) error {
return err return err
} }
func (p *initProcess) resize(ws console.WinSize) error { func (p *Init) resize(ws console.WinSize) error {
if p.console == nil { if p.console == nil {
return nil return nil
} }
return p.console.Resize(ws) return p.console.Resize(ws)
} }
func (p *initProcess) pause(context context.Context) error { func (p *Init) pause(context context.Context) error {
err := p.runtime.Pause(context, p.id) err := p.runtime.Pause(context, p.id)
return p.runtimeError(err, "OCI runtime pause failed") return p.runtimeError(err, "OCI runtime pause failed")
} }
func (p *initProcess) resume(context context.Context) error { func (p *Init) resume(context context.Context) error {
err := p.runtime.Resume(context, p.id) err := p.runtime.Resume(context, p.id)
return p.runtimeError(err, "OCI runtime resume failed") return p.runtimeError(err, "OCI runtime resume failed")
} }
func (p *initProcess) kill(context context.Context, signal uint32, all bool) error { func (p *Init) kill(context context.Context, signal uint32, all bool) error {
err := p.runtime.Kill(context, p.id, int(signal), &runc.KillOpts{ err := p.runtime.Kill(context, p.id, int(signal), &runc.KillOpts{
All: all, All: all,
}) })
return checkKillError(err) return checkKillError(err)
} }
func (p *initProcess) killAll(context context.Context) error { // KillAll processes belonging to the init process
func (p *Init) KillAll(context context.Context) error {
err := p.runtime.Kill(context, p.id, int(syscall.SIGKILL), &runc.KillOpts{ err := p.runtime.Kill(context, p.id, int(syscall.SIGKILL), &runc.KillOpts{
All: true, All: true,
}) })
return p.runtimeError(err, "OCI runtime killall failed") return p.runtimeError(err, "OCI runtime killall failed")
} }
func (p *initProcess) Stdin() io.Closer { // Stdin of the process
func (p *Init) Stdin() io.Closer {
return p.stdin return p.stdin
} }
func (p *initProcess) checkpoint(context context.Context, r *shimapi.CheckpointTaskRequest) error { // Runtime returns the OCI runtime configured for the init process
func (p *Init) Runtime() *runc.Runc {
return p.runtime
}
// Exec returns a new exec'd process
func (p *Init) Exec(context context.Context, path string, r *ExecConfig) (Process, error) {
// process exec request
var spec specs.Process
if err := json.Unmarshal(r.Spec.Value, &spec); err != nil {
return nil, err
}
spec.Terminal = r.Terminal
e := &execProcess{
id: r.ID,
path: path,
parent: p,
spec: spec,
stdio: Stdio{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
waitBlock: make(chan struct{}),
}
e.State = &execCreatedState{p: e}
return e, nil
}
func (p *Init) checkpoint(context context.Context, r *CheckpointConfig) error {
var options runctypes.CheckpointOptions var options runctypes.CheckpointOptions
if r.Options != nil { if r.Options != nil {
v, err := typeurl.UnmarshalAny(r.Options) v, err := typeurl.UnmarshalAny(r.Options)
@ -364,19 +405,20 @@ func (p *initProcess) checkpoint(context context.Context, r *shimapi.CheckpointT
return nil return nil
} }
func (p *initProcess) update(context context.Context, r *shimapi.UpdateTaskRequest) error { func (p *Init) update(context context.Context, r *google_protobuf.Any) error {
var resources specs.LinuxResources var resources specs.LinuxResources
if err := json.Unmarshal(r.Resources.Value, &resources); err != nil { if err := json.Unmarshal(r.Value, &resources); err != nil {
return err return err
} }
return p.runtime.Update(context, p.id, &resources) return p.runtime.Update(context, p.id, &resources)
} }
func (p *initProcess) Stdio() stdio { // Stdio of the process
func (p *Init) Stdio() Stdio {
return p.stdio return p.stdio
} }
func (p *initProcess) runtimeError(rErr error, msg string) error { func (p *Init) runtimeError(rErr error, msg string) error {
if rErr == nil { if rErr == nil {
return nil return nil
} }

View File

@ -1,6 +1,6 @@
// +build !windows // +build !windows
package shim package proc
import ( import (
"context" "context"
@ -9,23 +9,23 @@ import (
"github.com/containerd/console" "github.com/containerd/console"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/fifo" "github.com/containerd/fifo"
runc "github.com/containerd/go-runc" runc "github.com/containerd/go-runc"
google_protobuf "github.com/gogo/protobuf/types"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
type initState interface { type initState interface {
processState State
Pause(context.Context) error Pause(context.Context) error
Resume(context.Context) error Resume(context.Context) error
Update(context.Context, *shimapi.UpdateTaskRequest) error Update(context.Context, *google_protobuf.Any) error
Checkpoint(context.Context, *shimapi.CheckpointTaskRequest) error Checkpoint(context.Context, *CheckpointConfig) error
} }
type createdState struct { type createdState struct {
p *initProcess p *Init
} }
func (s *createdState) transition(name string) error { func (s *createdState) transition(name string) error {
@ -56,14 +56,14 @@ func (s *createdState) Resume(ctx context.Context) error {
return errors.Errorf("cannot resume task in created state") return errors.Errorf("cannot resume task in created state")
} }
func (s *createdState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error { func (s *createdState) Update(context context.Context, r *google_protobuf.Any) error {
s.p.mu.Lock() s.p.mu.Lock()
defer s.p.mu.Unlock() defer s.p.mu.Unlock()
return s.p.update(context, r) return s.p.update(context, r)
} }
func (s *createdState) Checkpoint(context context.Context, r *shimapi.CheckpointTaskRequest) error { func (s *createdState) Checkpoint(context context.Context, r *CheckpointConfig) error {
s.p.mu.Lock() s.p.mu.Lock()
defer s.p.mu.Unlock() defer s.p.mu.Unlock()
@ -114,7 +114,7 @@ func (s *createdState) SetExited(status int) {
} }
type createdCheckpointState struct { type createdCheckpointState struct {
p *initProcess p *Init
opts *runc.RestoreOpts opts *runc.RestoreOpts
} }
@ -146,14 +146,14 @@ func (s *createdCheckpointState) Resume(ctx context.Context) error {
return errors.Errorf("cannot resume task in created state") return errors.Errorf("cannot resume task in created state")
} }
func (s *createdCheckpointState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error { func (s *createdCheckpointState) Update(context context.Context, r *google_protobuf.Any) error {
s.p.mu.Lock() s.p.mu.Lock()
defer s.p.mu.Unlock() defer s.p.mu.Unlock()
return s.p.update(context, r) return s.p.update(context, r)
} }
func (s *createdCheckpointState) Checkpoint(context context.Context, r *shimapi.CheckpointTaskRequest) error { func (s *createdCheckpointState) Checkpoint(context context.Context, r *CheckpointConfig) error {
s.p.mu.Lock() s.p.mu.Lock()
defer s.p.mu.Unlock() defer s.p.mu.Unlock()
@ -175,17 +175,17 @@ func (s *createdCheckpointState) Start(ctx context.Context) error {
return p.runtimeError(err, "OCI runtime restore failed") return p.runtimeError(err, "OCI runtime restore failed")
} }
sio := p.stdio sio := p.stdio
if sio.stdin != "" { if sio.Stdin != "" {
sc, err := fifo.OpenFifo(ctx, sio.stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) sc, err := fifo.OpenFifo(ctx, sio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil { if err != nil {
return errors.Wrapf(err, "failed to open stdin fifo %s", sio.stdin) return errors.Wrapf(err, "failed to open stdin fifo %s", sio.Stdin)
} }
p.stdin = sc p.stdin = sc
p.closers = append(p.closers, sc) p.closers = append(p.closers, sc)
} }
var copyWaitGroup sync.WaitGroup var copyWaitGroup sync.WaitGroup
if !sio.isNull() { if !sio.IsNull() {
if err := copyPipes(ctx, p.io, sio.stdin, sio.stdout, sio.stderr, &p.wg, &copyWaitGroup); err != nil { if err := copyPipes(ctx, p.io, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg, &copyWaitGroup); err != nil {
return errors.Wrap(err, "failed to start io pipe copy") return errors.Wrap(err, "failed to start io pipe copy")
} }
} }
@ -228,7 +228,7 @@ func (s *createdCheckpointState) SetExited(status int) {
} }
type runningState struct { type runningState struct {
p *initProcess p *Init
} }
func (s *runningState) transition(name string) error { func (s *runningState) transition(name string) error {
@ -259,14 +259,14 @@ func (s *runningState) Resume(ctx context.Context) error {
return errors.Errorf("cannot resume a running process") return errors.Errorf("cannot resume a running process")
} }
func (s *runningState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error { func (s *runningState) Update(context context.Context, r *google_protobuf.Any) error {
s.p.mu.Lock() s.p.mu.Lock()
defer s.p.mu.Unlock() defer s.p.mu.Unlock()
return s.p.update(context, r) return s.p.update(context, r)
} }
func (s *runningState) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) error { func (s *runningState) Checkpoint(ctx context.Context, r *CheckpointConfig) error {
s.p.mu.Lock() s.p.mu.Lock()
defer s.p.mu.Unlock() defer s.p.mu.Unlock()
@ -313,7 +313,7 @@ func (s *runningState) SetExited(status int) {
} }
type pausedState struct { type pausedState struct {
p *initProcess p *Init
} }
func (s *pausedState) transition(name string) error { func (s *pausedState) transition(name string) error {
@ -345,14 +345,14 @@ func (s *pausedState) Resume(ctx context.Context) error {
return s.transition("running") return s.transition("running")
} }
func (s *pausedState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error { func (s *pausedState) Update(context context.Context, r *google_protobuf.Any) error {
s.p.mu.Lock() s.p.mu.Lock()
defer s.p.mu.Unlock() defer s.p.mu.Unlock()
return s.p.update(context, r) return s.p.update(context, r)
} }
func (s *pausedState) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) error { func (s *pausedState) Checkpoint(ctx context.Context, r *CheckpointConfig) error {
s.p.mu.Lock() s.p.mu.Lock()
defer s.p.mu.Unlock() defer s.p.mu.Unlock()
@ -400,7 +400,7 @@ func (s *pausedState) SetExited(status int) {
} }
type stoppedState struct { type stoppedState struct {
p *initProcess p *Init
} }
func (s *stoppedState) transition(name string) error { func (s *stoppedState) transition(name string) error {
@ -427,14 +427,14 @@ func (s *stoppedState) Resume(ctx context.Context) error {
return errors.Errorf("cannot resume a stopped container") return errors.Errorf("cannot resume a stopped container")
} }
func (s *stoppedState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error { func (s *stoppedState) Update(context context.Context, r *google_protobuf.Any) error {
s.p.mu.Lock() s.p.mu.Lock()
defer s.p.mu.Unlock() defer s.p.mu.Unlock()
return errors.Errorf("cannot update a stopped container") return errors.Errorf("cannot update a stopped container")
} }
func (s *stoppedState) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) error { func (s *stoppedState) Checkpoint(ctx context.Context, r *CheckpointConfig) error {
s.p.mu.Lock() s.p.mu.Lock()
defer s.p.mu.Unlock() defer s.p.mu.Unlock()

View File

@ -1,6 +1,6 @@
// +build !windows // +build !windows
package shim package proc
import ( import (
"context" "context"

View File

@ -1,30 +1,36 @@
// +build !windows // +build !windows
package shim package proc
import ( import (
"context" "context"
"io" "io"
"sync"
"time" "time"
"github.com/containerd/console" "github.com/containerd/console"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
type stdio struct { // RuncRoot is the path to the root runc state directory
stdin string const RuncRoot = "/run/containerd/runc"
stdout string
stderr string // Stdio of a process
terminal bool type Stdio struct {
Stdin string
Stdout string
Stderr string
Terminal bool
} }
func (s stdio) isNull() bool { // IsNull returns true if the stdio is not defined
return s.stdin == "" && s.stdout == "" && s.stderr == "" func (s Stdio) IsNull() bool {
return s.Stdin == "" && s.Stdout == "" && s.Stderr == ""
} }
type process interface { // Process on a linux system
processState type Process interface {
State
// ID returns the id for the process // ID returns the id for the process
ID() string ID() string
// Pid returns the pid for the process // Pid returns the pid for the process
@ -36,14 +42,15 @@ type process interface {
// Stdin returns the process STDIN // Stdin returns the process STDIN
Stdin() io.Closer Stdin() io.Closer
// Stdio returns io information for the container // Stdio returns io information for the container
Stdio() stdio Stdio() Stdio
// Status returns the process status // Status returns the process status
Status(context.Context) (string, error) Status(context.Context) (string, error)
// Wait blocks until the process has exited // Wait blocks until the process has exited
Wait() Wait()
} }
type processState interface { // State of a process
type State interface {
// Resize resizes the process console // Resize resizes the process console
Resize(ws console.WinSize) error Resize(ws console.WinSize) error
// Start execution of the process // Start execution of the process
@ -71,3 +78,12 @@ func stateName(v interface{}) string {
} }
panic(errors.Errorf("invalid state %v", v)) panic(errors.Errorf("invalid state %v", v))
} }
// Platform handles platform-specific behavior that may differs across
// platform implementations
type Platform interface {
CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string,
wg, cwg *sync.WaitGroup) (console.Console, error)
ShutdownConsole(ctx context.Context, console console.Console) error
Close() error
}

37
linux/proc/types.go Normal file
View File

@ -0,0 +1,37 @@
package proc
import (
containerd_types "github.com/containerd/containerd/api/types"
google_protobuf "github.com/gogo/protobuf/types"
)
// CreateConfig hold task creation configuration
type CreateConfig struct {
ID string
Bundle string
Runtime string
Rootfs []*containerd_types.Mount
Terminal bool
Stdin string
Stdout string
Stderr string
Checkpoint string
ParentCheckpoint string
Options *google_protobuf.Any
}
// ExecConfig holds exec creation configuration
type ExecConfig struct {
ID string
Terminal bool
Stdin string
Stdout string
Stderr string
Spec *google_protobuf.Any
}
// CheckpointConfig holds task checkpoint configuration
type CheckpointConfig struct {
Path string
Options *google_protobuf.Any
}

View File

@ -1,6 +1,6 @@
// +build !windows // +build !windows
package shim package proc
import ( import (
"encoding/json" "encoding/json"
@ -10,7 +10,6 @@ import (
"time" "time"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
shimapi "github.com/containerd/containerd/linux/shim/v1"
runc "github.com/containerd/go-runc" runc "github.com/containerd/go-runc"
"github.com/pkg/errors" "github.com/pkg/errors"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
@ -81,6 +80,6 @@ func checkKillError(err error) error {
return errors.Wrapf(err, "unknown error after kill") return errors.Wrapf(err, "unknown error after kill")
} }
func hasNoIO(r *shimapi.CreateTaskRequest) bool { func hasNoIO(r *CreateConfig) bool {
return r.Stdin == "" && r.Stdout == "" && r.Stderr == "" return r.Stdin == "" && r.Stdout == "" && r.Stderr == ""
} }

View File

@ -5,6 +5,7 @@ package linux
import ( import (
"context" "context"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
shim "github.com/containerd/containerd/linux/shim/v1" shim "github.com/containerd/containerd/linux/shim/v1"
@ -96,12 +97,17 @@ func (p *Process) CloseIO(ctx context.Context) error {
// Start the process // Start the process
func (p *Process) Start(ctx context.Context) error { func (p *Process) Start(ctx context.Context) error {
_, err := p.t.shim.Start(ctx, &shim.StartRequest{ r, err := p.t.shim.Start(ctx, &shim.StartRequest{
ID: p.id, ID: p.id,
}) })
if err != nil { if err != nil {
return errdefs.FromGRPC(err) return errdefs.FromGRPC(err)
} }
p.t.events.Publish(ctx, runtime.TaskExecStartedEventTopic, &eventsapi.TaskExecStarted{
ContainerID: p.t.id,
Pid: r.Pid,
ExecID: p.id,
})
return nil return nil
} }

View File

@ -17,8 +17,8 @@ import (
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/identifiers" "github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/linux/proc"
"github.com/containerd/containerd/linux/runctypes" "github.com/containerd/containerd/linux/runctypes"
client "github.com/containerd/containerd/linux/shim"
shim "github.com/containerd/containerd/linux/shim/v1" shim "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata" "github.com/containerd/containerd/metadata"
@ -242,14 +242,14 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
} }
}() }()
runtime := r.config.Runtime rt := r.config.Runtime
if ropts != nil && ropts.Runtime != "" { if ropts != nil && ropts.Runtime != "" {
runtime = ropts.Runtime rt = ropts.Runtime
} }
sopts := &shim.CreateTaskRequest{ sopts := &shim.CreateTaskRequest{
ID: id, ID: id,
Bundle: bundle.path, Bundle: bundle.path,
Runtime: runtime, Runtime: rt,
Stdin: opts.IO.Stdin, Stdin: opts.IO.Stdin,
Stdout: opts.IO.Stdout, Stdout: opts.IO.Stdout,
Stderr: opts.IO.Stderr, Stderr: opts.IO.Stderr,
@ -268,7 +268,8 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
if err != nil { if err != nil {
return nil, errdefs.FromGRPC(err) return nil, errdefs.FromGRPC(err)
} }
t, err := newTask(id, namespace, int(cr.Pid), s, r.monitor) t, err := newTask(id, namespace, int(cr.Pid), s, r.monitor, r.events,
proc.NewRunc(ropts.RuntimeRoot, sopts.Bundle, namespace, rt, ropts.CriuPath, ropts.SystemdCgroup))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -285,6 +286,20 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
return nil, err return nil, err
} }
} }
r.events.Publish(ctx, runtime.TaskCreateEventTopic, &eventsapi.TaskCreate{
ContainerID: sopts.ID,
Bundle: sopts.Bundle,
Rootfs: sopts.Rootfs,
IO: &eventsapi.TaskIO{
Stdin: sopts.Stdin,
Stdout: sopts.Stdout,
Stderr: sopts.Stderr,
Terminal: sopts.Terminal,
},
Checkpoint: sopts.Checkpoint,
Pid: uint32(t.pid),
})
return t, nil return t, nil
} }
@ -322,6 +337,12 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er
if err := bundle.Delete(); err != nil { if err := bundle.Delete(); err != nil {
log.G(ctx).WithError(err).Error("failed to delete bundle") log.G(ctx).WithError(err).Error("failed to delete bundle")
} }
r.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventsapi.TaskDelete{
ContainerID: lc.id,
ExitStatus: rsp.ExitStatus,
ExitedAt: rsp.ExitedAt,
Pid: rsp.Pid,
})
return &runtime.Exit{ return &runtime.Exit{
Status: rsp.ExitStatus, Status: rsp.ExitStatus,
Timestamp: rsp.ExitedAt, Timestamp: rsp.ExitedAt,
@ -376,7 +397,8 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
filepath.Join(r.state, ns, id), filepath.Join(r.state, ns, id),
filepath.Join(r.root, ns, id), filepath.Join(r.root, ns, id),
) )
pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, client.InitPidFile)) ctx = namespaces.WithNamespace(ctx, ns)
pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, proc.InitPidFile))
s, err := bundle.NewShimClient(ctx, ns, ShimConnect(), nil) s, err := bundle.NewShimClient(ctx, ns, ShimConnect(), nil)
if err != nil { if err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{ log.G(ctx).WithError(err).WithFields(logrus.Fields{
@ -390,8 +412,15 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
} }
continue continue
} }
ropts, err := r.getRuncOptions(ctx, id)
if err != nil {
log.G(ctx).WithError(err).WithField("id", id).
Error("get runtime options")
continue
}
t, err := newTask(id, ns, pid, s, r.monitor) t, err := newTask(id, ns, pid, s, r.monitor, r.events,
proc.NewRunc(ropts.RuntimeRoot, bundle.path, ns, ropts.Runtime, ropts.CriuPath, ropts.SystemdCgroup))
if err != nil { if err != nil {
log.G(ctx).WithError(err).Error("loading task type") log.G(ctx).WithError(err).Error("loading task type")
continue continue
@ -474,7 +503,7 @@ func (r *Runtime) getRuntime(ctx context.Context, ns, id string) (*runc.Runc, er
var ( var (
cmd = r.config.Runtime cmd = r.config.Runtime
root = client.RuncRoot root = proc.RuncRoot
) )
if ropts != nil { if ropts != nil {
if ropts.Runtime != "" { if ropts.Runtime != "" {
@ -517,5 +546,5 @@ func (r *Runtime) getRuncOptions(ctx context.Context, id string) (*runctypes.Run
return ropts, nil return ropts, nil
} }
return nil, nil return &runctypes.RuncOptions{}, nil
} }

View File

@ -15,6 +15,7 @@ import (
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events" "github.com/containerd/containerd/events"
"github.com/containerd/containerd/linux/proc"
"github.com/containerd/containerd/linux/runctypes" "github.com/containerd/containerd/linux/runctypes"
shimapi "github.com/containerd/containerd/linux/shim/v1" shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
@ -31,9 +32,6 @@ import (
var empty = &google_protobuf.Empty{} var empty = &google_protobuf.Empty{}
// RuncRoot is the path to the root runc state directory
const RuncRoot = "/run/containerd/runc"
// Config contains shim specific configuration // Config contains shim specific configuration
type Config struct { type Config struct {
Path string Path string
@ -49,16 +47,16 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) {
if config.Namespace == "" { if config.Namespace == "" {
return nil, fmt.Errorf("shim namespace cannot be empty") return nil, fmt.Errorf("shim namespace cannot be empty")
} }
context := namespaces.WithNamespace(context.Background(), config.Namespace) ctx := namespaces.WithNamespace(context.Background(), config.Namespace)
context = log.WithLogger(context, logrus.WithFields(logrus.Fields{ ctx = log.WithLogger(ctx, logrus.WithFields(logrus.Fields{
"namespace": config.Namespace, "namespace": config.Namespace,
"path": config.Path, "path": config.Path,
"pid": os.Getpid(), "pid": os.Getpid(),
})) }))
s := &Service{ s := &Service{
config: config, config: config,
context: context, context: ctx,
processes: make(map[string]process), processes: make(map[string]proc.Process),
events: make(chan interface{}, 128), events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(), ec: reaper.Default.Subscribe(),
} }
@ -70,23 +68,15 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) {
return s, nil return s, nil
} }
// platform handles platform-specific behavior that may differs across
// platform implementations
type platform interface {
copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error)
shutdownConsole(ctx context.Context, console console.Console) error
close() error
}
// Service is the shim implementation of a remote shim over GRPC // Service is the shim implementation of a remote shim over GRPC
type Service struct { type Service struct {
mu sync.Mutex mu sync.Mutex
config Config config Config
context context.Context context context.Context
processes map[string]process processes map[string]proc.Process
events chan interface{} events chan interface{}
platform platform platform proc.Platform
ec chan runc.Exit ec chan runc.Exit
// Filled by Create() // Filled by Create()
@ -98,7 +88,29 @@ type Service struct {
func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) { func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
process, err := s.newInitProcess(ctx, r) process, err := proc.New(
ctx,
s.config.Path,
s.config.WorkDir,
s.config.RuntimeRoot,
s.config.Namespace,
s.config.Criu,
s.config.SystemdCgroup,
s.platform,
&proc.CreateConfig{
ID: r.ID,
Bundle: r.Bundle,
Runtime: r.Runtime,
Rootfs: r.Rootfs,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Checkpoint: r.Checkpoint,
ParentCheckpoint: r.ParentCheckpoint,
Options: r.Options,
},
)
if err != nil { if err != nil {
return nil, errdefs.ToGRPC(err) return nil, errdefs.ToGRPC(err)
} }
@ -107,19 +119,6 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh
s.bundle = r.Bundle s.bundle = r.Bundle
pid := process.Pid() pid := process.Pid()
s.processes[r.ID] = process s.processes[r.ID] = process
s.events <- &eventsapi.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
Rootfs: r.Rootfs,
IO: &eventsapi.TaskIO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: r.Checkpoint,
Pid: uint32(pid),
}
return &shimapi.CreateTaskResponse{ return &shimapi.CreateTaskResponse{
Pid: uint32(pid), Pid: uint32(pid),
}, nil }, nil
@ -136,19 +135,6 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.
if err := p.Start(ctx); err != nil { if err := p.Start(ctx); err != nil {
return nil, err return nil, err
} }
if r.ID == s.id {
s.events <- &eventsapi.TaskStart{
ContainerID: s.id,
Pid: uint32(p.Pid()),
}
} else {
pid := p.Pid()
s.events <- &eventsapi.TaskExecStarted{
ContainerID: s.id,
ExecID: r.ID,
Pid: uint32(pid),
}
}
return &shimapi.StartResponse{ return &shimapi.StartResponse{
ID: p.ID(), ID: p.ID(),
Pid: uint32(p.Pid()), Pid: uint32(p.Pid()),
@ -168,13 +154,7 @@ func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimap
return nil, err return nil, err
} }
delete(s.processes, s.id) delete(s.processes, s.id)
s.platform.close() s.platform.Close()
s.events <- &eventsapi.TaskDelete{
ContainerID: s.id,
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
Pid: uint32(p.Pid()),
}
return &shimapi.DeleteResponse{ return &shimapi.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()), ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(), ExitedAt: p.ExitedAt(),
@ -218,16 +198,18 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*goo
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
} }
process, err := newExecProcess(ctx, s.config.Path, r, p.(*initProcess), r.ID) process, err := p.(*proc.Init).Exec(ctx, s.config.Path, &proc.ExecConfig{
ID: r.ID,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Spec: r.Spec,
})
if err != nil { if err != nil {
return nil, errdefs.ToGRPC(err) return nil, errdefs.ToGRPC(err)
} }
s.processes[r.ID] = process s.processes[r.ID] = process
s.events <- &eventsapi.TaskExecAdded{
ContainerID: s.id,
ExecID: r.ID,
}
return empty, nil return empty, nil
} }
@ -283,10 +265,10 @@ func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.
Bundle: s.bundle, Bundle: s.bundle,
Pid: uint32(p.Pid()), Pid: uint32(p.Pid()),
Status: status, Status: status,
Stdin: sio.stdin, Stdin: sio.Stdin,
Stdout: sio.stdout, Stdout: sio.Stdout,
Stderr: sio.stderr, Stderr: sio.Stderr,
Terminal: sio.terminal, Terminal: sio.Terminal,
ExitStatus: uint32(p.ExitStatus()), ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(), ExitedAt: p.ExitedAt(),
}, nil }, nil
@ -300,12 +282,9 @@ func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_
if p == nil { if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
} }
if err := p.(*initProcess).Pause(ctx); err != nil { if err := p.(*proc.Init).Pause(ctx); err != nil {
return nil, err return nil, err
} }
s.events <- &eventsapi.TaskPaused{
ContainerID: s.id,
}
return empty, nil return empty, nil
} }
@ -317,12 +296,9 @@ func (s *Service) Resume(ctx context.Context, r *google_protobuf.Empty) (*google
if p == nil { if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
} }
if err := p.(*initProcess).Resume(ctx); err != nil { if err := p.(*proc.Init).Resume(ctx); err != nil {
return nil, err return nil, err
} }
s.events <- &eventsapi.TaskResumed{
ContainerID: s.id,
}
return empty, nil return empty, nil
} }
@ -406,12 +382,12 @@ func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskReque
if p == nil { if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
} }
if err := p.(*initProcess).Checkpoint(ctx, r); err != nil { if err := p.(*proc.Init).Checkpoint(ctx, &proc.CheckpointConfig{
Path: r.Path,
Options: r.Options,
}); err != nil {
return nil, errdefs.ToGRPC(err) return nil, errdefs.ToGRPC(err)
} }
s.events <- &eventsapi.TaskCheckpointed{
ContainerID: s.id,
}
return empty, nil return empty, nil
} }
@ -430,7 +406,7 @@ func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*go
if p == nil { if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
} }
if err := p.(*initProcess).Update(ctx, r); err != nil { if err := p.(*proc.Init).Update(ctx, r.Resources); err != nil {
return nil, errdefs.ToGRPC(err) return nil, errdefs.ToGRPC(err)
} }
return empty, nil return empty, nil
@ -463,9 +439,9 @@ func (s *Service) checkProcesses(e runc.Exit) {
defer s.mu.Unlock() defer s.mu.Unlock()
for _, p := range s.processes { for _, p := range s.processes {
if p.Pid() == e.Pid { if p.Pid() == e.Pid {
if ip, ok := p.(*initProcess); ok { if ip, ok := p.(*proc.Init); ok {
// Ensure all children are killed // Ensure all children are killed
if err := ip.killAll(s.context); err != nil { if err := ip.KillAll(s.context); err != nil {
log.G(s.context).WithError(err).WithField("id", ip.ID()). log.G(s.context).WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children") Error("failed to kill init's children")
} }
@ -491,7 +467,7 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er
return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created") return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
} }
ps, err := p.(*initProcess).runtime.Ps(ctx, id) ps, err := p.(*proc.Init).Runtime().Ps(ctx, id)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,6 +1,7 @@
package shim package shim
import ( import (
"context"
"io" "io"
"sync" "sync"
"syscall" "syscall"
@ -8,14 +9,13 @@ import (
"github.com/containerd/console" "github.com/containerd/console"
"github.com/containerd/fifo" "github.com/containerd/fifo"
"github.com/pkg/errors" "github.com/pkg/errors"
"golang.org/x/net/context"
) )
type linuxPlatform struct { type linuxPlatform struct {
epoller *console.Epoller epoller *console.Epoller
} }
func (p *linuxPlatform) copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
if p.epoller == nil { if p.epoller == nil {
return nil, errors.New("uninitialized epoller") return nil, errors.New("uninitialized epoller")
} }
@ -58,7 +58,7 @@ func (p *linuxPlatform) copyConsole(ctx context.Context, console console.Console
return epollConsole, nil return epollConsole, nil
} }
func (p *linuxPlatform) shutdownConsole(ctx context.Context, cons console.Console) error { func (p *linuxPlatform) ShutdownConsole(ctx context.Context, cons console.Console) error {
if p.epoller == nil { if p.epoller == nil {
return errors.New("uninitialized epoller") return errors.New("uninitialized epoller")
} }
@ -69,7 +69,7 @@ func (p *linuxPlatform) shutdownConsole(ctx context.Context, cons console.Consol
return epollConsole.Shutdown(p.epoller.CloseConsole) return epollConsole.Shutdown(p.epoller.CloseConsole)
} }
func (p *linuxPlatform) close() error { func (p *linuxPlatform) Close() error {
return p.epoller.Close() return p.epoller.Close()
} }

View File

@ -3,19 +3,19 @@
package shim package shim
import ( import (
"context"
"io" "io"
"sync" "sync"
"syscall" "syscall"
"github.com/containerd/console" "github.com/containerd/console"
"github.com/containerd/fifo" "github.com/containerd/fifo"
"golang.org/x/net/context"
) )
type unixPlatform struct { type unixPlatform struct {
} }
func (p *unixPlatform) copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
if stdin != "" { if stdin != "" {
in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0) in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
if err != nil { if err != nil {
@ -48,11 +48,11 @@ func (p *unixPlatform) copyConsole(ctx context.Context, console console.Console,
return console, nil return console, nil
} }
func (p *unixPlatform) shutdownConsole(ctx context.Context, cons console.Console) error { func (p *unixPlatform) ShutdownConsole(ctx context.Context, cons console.Console) error {
return nil return nil
} }
func (p *unixPlatform) close() error { func (p *unixPlatform) Close() error {
return nil return nil
} }

View File

@ -9,11 +9,15 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/containerd/cgroups" "github.com/containerd/cgroups"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/linux/shim/client" "github.com/containerd/containerd/linux/shim/client"
shim "github.com/containerd/containerd/linux/shim/v1" shim "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime"
runc "github.com/containerd/go-runc"
"github.com/gogo/protobuf/types" "github.com/gogo/protobuf/types"
) )
@ -25,9 +29,11 @@ type Task struct {
namespace string namespace string
cg cgroups.Cgroup cg cgroups.Cgroup
monitor runtime.TaskMonitor monitor runtime.TaskMonitor
events *exchange.Exchange
runtime *runc.Runc
} }
func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime.TaskMonitor) (*Task, error) { func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime.TaskMonitor, events *exchange.Exchange, runtime *runc.Runc) (*Task, error) {
var ( var (
err error err error
cg cgroups.Cgroup cg cgroups.Cgroup
@ -45,6 +51,8 @@ func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime
namespace: namespace, namespace: namespace,
cg: cg, cg: cg,
monitor: monitor, monitor: monitor,
events: events,
runtime: runtime,
}, nil }, nil
} }
@ -82,6 +90,10 @@ func (t *Task) Start(ctx context.Context) error {
return err return err
} }
} }
t.events.Publish(ctx, runtime.TaskStartEventTopic, &eventsapi.TaskStart{
ContainerID: t.id,
Pid: uint32(t.pid),
})
return nil return nil
} }
@ -123,11 +135,13 @@ func (t *Task) State(ctx context.Context) (runtime.State, error) {
// Pause the task and all processes // Pause the task and all processes
func (t *Task) Pause(ctx context.Context) error { func (t *Task) Pause(ctx context.Context) error {
_, err := t.shim.Pause(ctx, empty) if _, err := t.shim.Pause(ctx, empty); err != nil {
if err != nil { return errdefs.FromGRPC(err)
err = errdefs.FromGRPC(err)
} }
return err t.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventsapi.TaskPaused{
ContainerID: t.id,
})
return nil
} }
// Resume the task and all processes // Resume the task and all processes
@ -135,6 +149,9 @@ func (t *Task) Resume(ctx context.Context) error {
if _, err := t.shim.Resume(ctx, empty); err != nil { if _, err := t.shim.Resume(ctx, empty); err != nil {
return errdefs.FromGRPC(err) return errdefs.FromGRPC(err)
} }
t.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventsapi.TaskResumed{
ContainerID: t.id,
})
return nil return nil
} }
@ -154,6 +171,9 @@ func (t *Task) Kill(ctx context.Context, signal uint32, all bool) error {
// Exec creates a new process inside the task // Exec creates a new process inside the task
func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) { func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) {
if err := identifiers.Validate(id); err != nil {
return nil, errors.Wrapf(err, "invalid exec id")
}
request := &shim.ExecProcessRequest{ request := &shim.ExecProcessRequest{
ID: id, ID: id,
Stdin: opts.IO.Stdin, Stdin: opts.IO.Stdin,
@ -223,6 +243,9 @@ func (t *Task) Checkpoint(ctx context.Context, path string, options *types.Any)
if _, err := t.shim.Checkpoint(ctx, r); err != nil { if _, err := t.shim.Checkpoint(ctx, r); err != nil {
return errdefs.FromGRPC(err) return errdefs.FromGRPC(err)
} }
t.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventsapi.TaskCheckpointed{
ContainerID: t.id,
})
return nil return nil
} }

View File

@ -15,7 +15,7 @@ import (
// ErrNoSuchProcess is returned when the process no longer exists // ErrNoSuchProcess is returned when the process no longer exists
var ErrNoSuchProcess = errors.New("no such process") var ErrNoSuchProcess = errors.New("no such process")
const bufferSize = 2048 const bufferSize = 1024
// Reap should be called when the process receives an SIGCHLD. Reap will reap // Reap should be called when the process receives an SIGCHLD. Reap will reap
// all exited processes and close their wait channels // all exited processes and close their wait channels