From 6e25898ff0a0b61db833af661f2ed7d6c6f33710 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 9 Nov 2017 11:17:53 -0500 Subject: [PATCH 1/9] Move shim process code to package Signed-off-by: Michael Crosby --- linux/{shim => proc}/deleted_state.go | 2 +- linux/{shim => proc}/exec.go | 43 ++++++------- linux/{shim => proc}/exec_state.go | 12 ++-- linux/{shim => proc}/init.go | 90 ++++++++++++++------------- linux/{shim => proc}/init_state.go | 24 +++---- linux/{shim => proc}/io.go | 2 +- linux/{shim => proc}/process.go | 42 +++++++++---- linux/{shim => proc}/utils.go | 2 +- linux/shim/service.go | 56 ++++++++--------- linux/shim/service_linux.go | 8 +-- linux/shim/service_unix.go | 6 +- 11 files changed, 155 insertions(+), 132 deletions(-) rename linux/{shim => proc}/deleted_state.go (99%) rename linux/{shim => proc}/exec.go (83%) rename linux/{shim => proc}/exec_state.go (93%) rename linux/{shim => proc}/init.go (80%) rename linux/{shim => proc}/init_state.go (97%) rename linux/{shim => proc}/io.go (99%) rename linux/{shim => proc}/process.go (61%) rename linux/{shim => proc}/utils.go (99%) diff --git a/linux/shim/deleted_state.go b/linux/proc/deleted_state.go similarity index 99% rename from linux/shim/deleted_state.go rename to linux/proc/deleted_state.go index 6d2273501..6ef2cfb3e 100644 --- a/linux/shim/deleted_state.go +++ b/linux/proc/deleted_state.go @@ -1,6 +1,6 @@ // +build !windows -package shim +package proc import ( "context" diff --git a/linux/shim/exec.go b/linux/proc/exec.go similarity index 83% rename from linux/shim/exec.go rename to linux/proc/exec.go index 0e4475e64..cf335a2d6 100644 --- a/linux/shim/exec.go +++ b/linux/proc/exec.go @@ -1,6 +1,6 @@ // +build !windows -package shim +package proc import ( "context" @@ -27,7 +27,7 @@ import ( type execProcess struct { wg sync.WaitGroup - processState + State mu sync.Mutex id string @@ -38,15 +38,16 @@ type execProcess struct { pid int closers []io.Closer stdin io.Closer - stdio stdio + stdio Stdio path string spec specs.Process - parent *initProcess + parent *Init waitBlock chan struct{} } -func newExecProcess(context context.Context, path string, r *shimapi.ExecProcessRequest, parent *initProcess, id string) (process, error) { +// NewExec returns a new exec'd process +func NewExec(context context.Context, path string, r *shimapi.ExecProcessRequest, parent *Init, id string) (Process, error) { if err := identifiers.Validate(id); err != nil { return nil, errors.Wrapf(err, "invalid exec id") } @@ -62,15 +63,15 @@ func newExecProcess(context context.Context, path string, r *shimapi.ExecProcess path: path, parent: parent, spec: spec, - stdio: stdio{ - stdin: r.Stdin, - stdout: r.Stdout, - stderr: r.Stderr, - terminal: r.Terminal, + stdio: Stdio{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, }, waitBlock: make(chan struct{}), } - e.processState = &execCreatedState{p: e} + e.State = &execCreatedState{p: e} return e, nil } @@ -103,7 +104,7 @@ func (e *execProcess) ExitedAt() time.Time { func (e *execProcess) setExited(status int) { e.status = status e.exited = time.Now() - e.parent.platform.shutdownConsole(context.Background(), e.console) + e.parent.platform.ShutdownConsole(context.Background(), e.console) close(e.waitBlock) } @@ -142,7 +143,7 @@ func (e *execProcess) Stdin() io.Closer { return e.stdin } -func (e *execProcess) Stdio() stdio { +func (e *execProcess) Stdio() Stdio { return e.stdio } @@ -151,12 +152,12 @@ func (e *execProcess) start(ctx context.Context) (err error) { socket *runc.Socket pidfile = filepath.Join(e.path, fmt.Sprintf("%s.pid", e.id)) ) - if e.stdio.terminal { + if e.stdio.Terminal { if socket, err = runc.NewTempConsoleSocket(); err != nil { return errors.Wrap(err, "failed to create runc console socket") } defer socket.Close() - } else if e.stdio.isNull() { + } else if e.stdio.IsNull() { if e.io, err = runc.NewNullIO(); err != nil { return errors.Wrap(err, "creating new NULL IO") } @@ -176,10 +177,10 @@ func (e *execProcess) start(ctx context.Context) (err error) { if err := e.parent.runtime.Exec(ctx, e.parent.id, e.spec, opts); err != nil { return e.parent.runtimeError(err, "OCI runtime exec failed") } - if e.stdio.stdin != "" { - sc, err := fifo.OpenFifo(ctx, e.stdio.stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) + if e.stdio.Stdin != "" { + sc, err := fifo.OpenFifo(ctx, e.stdio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) if err != nil { - return errors.Wrapf(err, "failed to open stdin fifo %s", e.stdio.stdin) + return errors.Wrapf(err, "failed to open stdin fifo %s", e.stdio.Stdin) } e.closers = append(e.closers, sc) e.stdin = sc @@ -190,11 +191,11 @@ func (e *execProcess) start(ctx context.Context) (err error) { if err != nil { return errors.Wrap(err, "failed to retrieve console master") } - if e.console, err = e.parent.platform.copyConsole(ctx, console, e.stdio.stdin, e.stdio.stdout, e.stdio.stderr, &e.wg, ©WaitGroup); err != nil { + if e.console, err = e.parent.platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, ©WaitGroup); err != nil { return errors.Wrap(err, "failed to start console copy") } - } else if !e.stdio.isNull() { - if err := copyPipes(ctx, e.io, e.stdio.stdin, e.stdio.stdout, e.stdio.stderr, &e.wg, ©WaitGroup); err != nil { + } else if !e.stdio.IsNull() { + if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, ©WaitGroup); err != nil { return errors.Wrap(err, "failed to start io pipe copy") } } diff --git a/linux/shim/exec_state.go b/linux/proc/exec_state.go similarity index 93% rename from linux/shim/exec_state.go rename to linux/proc/exec_state.go index 4a4aaa2b4..3c3c26582 100644 --- a/linux/shim/exec_state.go +++ b/linux/proc/exec_state.go @@ -1,6 +1,6 @@ // +build !windows -package shim +package proc import ( "context" @@ -16,11 +16,11 @@ type execCreatedState struct { func (s *execCreatedState) transition(name string) error { switch name { case "running": - s.p.processState = &execRunningState{p: s.p} + s.p.State = &execRunningState{p: s.p} case "stopped": - s.p.processState = &execStoppedState{p: s.p} + s.p.State = &execStoppedState{p: s.p} case "deleted": - s.p.processState = &deletedState{} + s.p.State = &deletedState{} default: return errors.Errorf("invalid state transition %q to %q", stateName(s), name) } @@ -77,7 +77,7 @@ type execRunningState struct { func (s *execRunningState) transition(name string) error { switch name { case "stopped": - s.p.processState = &execStoppedState{p: s.p} + s.p.State = &execStoppedState{p: s.p} default: return errors.Errorf("invalid state transition %q to %q", stateName(s), name) } @@ -130,7 +130,7 @@ type execStoppedState struct { func (s *execStoppedState) transition(name string) error { switch name { case "deleted": - s.p.processState = &deletedState{} + s.p.State = &deletedState{} default: return errors.Errorf("invalid state transition %q to %q", stateName(s), name) } diff --git a/linux/shim/init.go b/linux/proc/init.go similarity index 80% rename from linux/shim/init.go rename to linux/proc/init.go index 5b4ed45a3..cf4783708 100644 --- a/linux/shim/init.go +++ b/linux/proc/init.go @@ -1,6 +1,6 @@ // +build !windows -package shim +package proc import ( "context" @@ -30,7 +30,7 @@ import ( // InitPidFile name of the file that contains the init pid const InitPidFile = "init.pid" -type initProcess struct { +type Init struct { wg sync.WaitGroup initState @@ -47,7 +47,7 @@ type initProcess struct { id string bundle string console console.Console - platform platform + platform Platform io runc.IO runtime *runc.Runc status int @@ -55,13 +55,14 @@ type initProcess struct { pid int closers []io.Closer stdin io.Closer - stdio stdio + stdio Stdio rootfs string IoUID int IoGID int } -func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskRequest) (*initProcess, error) { +// New returns a new init process +func New(context context.Context, path, workDir, runtimeRoot, namespace, criu string, systemdCgroup bool, platform Platform, r *shimapi.CreateTaskRequest) (*Init, error) { var success bool if err := identifiers.Validate(r.ID); err != nil { @@ -76,7 +77,7 @@ func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskR options = *v.(*runctypes.CreateOptions) } - rootfs := filepath.Join(s.config.Path, "rootfs") + rootfs := filepath.Join(path, "rootfs") // count the number of successful mounts so we can undo // what was actually done rather than what should have been // done. @@ -98,32 +99,32 @@ func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskR return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m) } } - root := s.config.RuntimeRoot + root := runtimeRoot if root == "" { root = RuncRoot } runtime := &runc.Runc{ Command: r.Runtime, - Log: filepath.Join(s.config.Path, "log.json"), + Log: filepath.Join(path, "log.json"), LogFormat: runc.JSON, PdeathSignal: syscall.SIGKILL, - Root: filepath.Join(root, s.config.Namespace), - Criu: s.config.Criu, - SystemdCgroup: s.config.SystemdCgroup, + Root: filepath.Join(root, namespace), + Criu: criu, + SystemdCgroup: systemdCgroup, } - p := &initProcess{ + p := &Init{ id: r.ID, bundle: r.Bundle, runtime: runtime, - platform: s.platform, - stdio: stdio{ - stdin: r.Stdin, - stdout: r.Stdout, - stderr: r.Stderr, - terminal: r.Terminal, + platform: platform, + stdio: Stdio{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, }, rootfs: rootfs, - workDir: s.config.WorkDir, + workDir: workDir, status: 0, waitBlock: make(chan struct{}), IoUID: int(options.IoUid), @@ -148,7 +149,7 @@ func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskR return nil, errors.Wrap(err, "failed to create OCI runtime io pipes") } } - pidFile := filepath.Join(s.config.Path, InitPidFile) + pidFile := filepath.Join(path, InitPidFile) if r.Checkpoint != "" { opts := &runc.RestoreOpts{ CheckpointOpts: runc.CheckpointOpts{ @@ -195,7 +196,7 @@ func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskR if err != nil { return nil, errors.Wrap(err, "failed to retrieve console master") } - console, err = s.platform.copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, ©WaitGroup) + console, err = platform.CopyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, ©WaitGroup) if err != nil { return nil, errors.Wrap(err, "failed to start console copy") } @@ -216,31 +217,31 @@ func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskR return p, nil } -func (p *initProcess) Wait() { +func (p *Init) Wait() { <-p.waitBlock } -func (p *initProcess) ID() string { +func (p *Init) ID() string { return p.id } -func (p *initProcess) Pid() int { +func (p *Init) Pid() int { return p.pid } -func (p *initProcess) ExitStatus() int { +func (p *Init) ExitStatus() int { p.mu.Lock() defer p.mu.Unlock() return p.status } -func (p *initProcess) ExitedAt() time.Time { +func (p *Init) ExitedAt() time.Time { p.mu.Lock() defer p.mu.Unlock() return p.exited } -func (p *initProcess) Status(ctx context.Context) (string, error) { +func (p *Init) Status(ctx context.Context) (string, error) { p.mu.Lock() defer p.mu.Unlock() c, err := p.runtime.State(ctx, p.id) @@ -253,20 +254,20 @@ func (p *initProcess) Status(ctx context.Context) (string, error) { return c.Status, nil } -func (p *initProcess) start(context context.Context) error { +func (p *Init) start(context context.Context) error { err := p.runtime.Start(context, p.id) return p.runtimeError(err, "OCI runtime start failed") } -func (p *initProcess) setExited(status int) { +func (p *Init) setExited(status int) { p.exited = time.Now() p.status = status - p.platform.shutdownConsole(context.Background(), p.console) + p.platform.ShutdownConsole(context.Background(), p.console) close(p.waitBlock) } -func (p *initProcess) delete(context context.Context) error { - p.killAll(context) +func (p *Init) delete(context context.Context) error { + p.KillAll(context) p.wg.Wait() err := p.runtime.Delete(context, p.id, nil) // ignore errors if a runtime has already deleted the process @@ -296,42 +297,47 @@ func (p *initProcess) delete(context context.Context) error { return err } -func (p *initProcess) resize(ws console.WinSize) error { +func (p *Init) resize(ws console.WinSize) error { if p.console == nil { return nil } return p.console.Resize(ws) } -func (p *initProcess) pause(context context.Context) error { +func (p *Init) pause(context context.Context) error { err := p.runtime.Pause(context, p.id) return p.runtimeError(err, "OCI runtime pause failed") } -func (p *initProcess) resume(context context.Context) error { +func (p *Init) resume(context context.Context) error { err := p.runtime.Resume(context, p.id) return p.runtimeError(err, "OCI runtime resume failed") } -func (p *initProcess) kill(context context.Context, signal uint32, all bool) error { +func (p *Init) kill(context context.Context, signal uint32, all bool) error { err := p.runtime.Kill(context, p.id, int(signal), &runc.KillOpts{ All: all, }) return checkKillError(err) } -func (p *initProcess) killAll(context context.Context) error { +func (p *Init) KillAll(context context.Context) error { err := p.runtime.Kill(context, p.id, int(syscall.SIGKILL), &runc.KillOpts{ All: true, }) return p.runtimeError(err, "OCI runtime killall failed") } -func (p *initProcess) Stdin() io.Closer { +func (p *Init) Stdin() io.Closer { return p.stdin } -func (p *initProcess) checkpoint(context context.Context, r *shimapi.CheckpointTaskRequest) error { +// Runtime returns the OCI runtime configured for the init process +func (p *Init) Runtime() *runc.Runc { + return p.runtime +} + +func (p *Init) checkpoint(context context.Context, r *shimapi.CheckpointTaskRequest) error { var options runctypes.CheckpointOptions if r.Options != nil { v, err := typeurl.UnmarshalAny(r.Options) @@ -364,7 +370,7 @@ func (p *initProcess) checkpoint(context context.Context, r *shimapi.CheckpointT return nil } -func (p *initProcess) update(context context.Context, r *shimapi.UpdateTaskRequest) error { +func (p *Init) update(context context.Context, r *shimapi.UpdateTaskRequest) error { var resources specs.LinuxResources if err := json.Unmarshal(r.Resources.Value, &resources); err != nil { return err @@ -372,11 +378,11 @@ func (p *initProcess) update(context context.Context, r *shimapi.UpdateTaskReque return p.runtime.Update(context, p.id, &resources) } -func (p *initProcess) Stdio() stdio { +func (p *Init) Stdio() Stdio { return p.stdio } -func (p *initProcess) runtimeError(rErr error, msg string) error { +func (p *Init) runtimeError(rErr error, msg string) error { if rErr == nil { return nil } diff --git a/linux/shim/init_state.go b/linux/proc/init_state.go similarity index 97% rename from linux/shim/init_state.go rename to linux/proc/init_state.go index da7e15b00..9e36b786c 100644 --- a/linux/shim/init_state.go +++ b/linux/proc/init_state.go @@ -1,6 +1,6 @@ // +build !windows -package shim +package proc import ( "context" @@ -16,7 +16,7 @@ import ( ) type initState interface { - processState + State Pause(context.Context) error Resume(context.Context) error @@ -25,7 +25,7 @@ type initState interface { } type createdState struct { - p *initProcess + p *Init } func (s *createdState) transition(name string) error { @@ -114,7 +114,7 @@ func (s *createdState) SetExited(status int) { } type createdCheckpointState struct { - p *initProcess + p *Init opts *runc.RestoreOpts } @@ -175,17 +175,17 @@ func (s *createdCheckpointState) Start(ctx context.Context) error { return p.runtimeError(err, "OCI runtime restore failed") } sio := p.stdio - if sio.stdin != "" { - sc, err := fifo.OpenFifo(ctx, sio.stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) + if sio.Stdin != "" { + sc, err := fifo.OpenFifo(ctx, sio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) if err != nil { - return errors.Wrapf(err, "failed to open stdin fifo %s", sio.stdin) + return errors.Wrapf(err, "failed to open stdin fifo %s", sio.Stdin) } p.stdin = sc p.closers = append(p.closers, sc) } var copyWaitGroup sync.WaitGroup - if !sio.isNull() { - if err := copyPipes(ctx, p.io, sio.stdin, sio.stdout, sio.stderr, &p.wg, ©WaitGroup); err != nil { + if !sio.IsNull() { + if err := copyPipes(ctx, p.io, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg, ©WaitGroup); err != nil { return errors.Wrap(err, "failed to start io pipe copy") } } @@ -228,7 +228,7 @@ func (s *createdCheckpointState) SetExited(status int) { } type runningState struct { - p *initProcess + p *Init } func (s *runningState) transition(name string) error { @@ -313,7 +313,7 @@ func (s *runningState) SetExited(status int) { } type pausedState struct { - p *initProcess + p *Init } func (s *pausedState) transition(name string) error { @@ -400,7 +400,7 @@ func (s *pausedState) SetExited(status int) { } type stoppedState struct { - p *initProcess + p *Init } func (s *stoppedState) transition(name string) error { diff --git a/linux/shim/io.go b/linux/proc/io.go similarity index 99% rename from linux/shim/io.go rename to linux/proc/io.go index 49ba8e069..1907d4362 100644 --- a/linux/shim/io.go +++ b/linux/proc/io.go @@ -1,6 +1,6 @@ // +build !windows -package shim +package proc import ( "context" diff --git a/linux/shim/process.go b/linux/proc/process.go similarity index 61% rename from linux/shim/process.go rename to linux/proc/process.go index f0b469238..c0d33adbc 100644 --- a/linux/shim/process.go +++ b/linux/proc/process.go @@ -1,30 +1,36 @@ // +build !windows -package shim +package proc import ( "context" "io" + "sync" "time" "github.com/containerd/console" "github.com/pkg/errors" ) -type stdio struct { - stdin string - stdout string - stderr string - terminal bool +// RuncRoot is the path to the root runc state directory +const RuncRoot = "/run/containerd/runc" + +// Stdio of a process +type Stdio struct { + Stdin string + Stdout string + Stderr string + Terminal bool } -func (s stdio) isNull() bool { - return s.stdin == "" && s.stdout == "" && s.stderr == "" +// IsNull returns true if the stdio is not defined +func (s Stdio) IsNull() bool { + return s.Stdin == "" && s.Stdout == "" && s.Stderr == "" } -type process interface { - processState - +// Process on a linux system +type Process interface { + State // ID returns the id for the process ID() string // Pid returns the pid for the process @@ -36,14 +42,15 @@ type process interface { // Stdin returns the process STDIN Stdin() io.Closer // Stdio returns io information for the container - Stdio() stdio + Stdio() Stdio // Status returns the process status Status(context.Context) (string, error) // Wait blocks until the process has exited Wait() } -type processState interface { +// State of a process +type State interface { // Resize resizes the process console Resize(ws console.WinSize) error // Start execution of the process @@ -71,3 +78,12 @@ func stateName(v interface{}) string { } panic(errors.Errorf("invalid state %v", v)) } + +// Platform handles platform-specific behavior that may differs across +// platform implementations +type Platform interface { + CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, + wg, cwg *sync.WaitGroup) (console.Console, error) + ShutdownConsole(ctx context.Context, console console.Console) error + Close() error +} diff --git a/linux/shim/utils.go b/linux/proc/utils.go similarity index 99% rename from linux/shim/utils.go rename to linux/proc/utils.go index 317f8daee..8bedd554b 100644 --- a/linux/shim/utils.go +++ b/linux/proc/utils.go @@ -1,6 +1,6 @@ // +build !windows -package shim +package proc import ( "encoding/json" diff --git a/linux/shim/service.go b/linux/shim/service.go index 6b229769f..3822dd1cc 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -15,6 +15,7 @@ import ( "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" + "github.com/containerd/containerd/linux/proc" "github.com/containerd/containerd/linux/runctypes" shimapi "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/log" @@ -31,9 +32,6 @@ import ( var empty = &google_protobuf.Empty{} -// RuncRoot is the path to the root runc state directory -const RuncRoot = "/run/containerd/runc" - // Config contains shim specific configuration type Config struct { Path string @@ -58,7 +56,7 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) { s := &Service{ config: config, context: context, - processes: make(map[string]process), + processes: make(map[string]proc.Process), events: make(chan interface{}, 128), ec: reaper.Default.Subscribe(), } @@ -70,23 +68,15 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) { return s, nil } -// platform handles platform-specific behavior that may differs across -// platform implementations -type platform interface { - copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) - shutdownConsole(ctx context.Context, console console.Console) error - close() error -} - // Service is the shim implementation of a remote shim over GRPC type Service struct { mu sync.Mutex config Config context context.Context - processes map[string]process + processes map[string]proc.Process events chan interface{} - platform platform + platform proc.Platform ec chan runc.Exit // Filled by Create() @@ -98,7 +88,17 @@ type Service struct { func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) { s.mu.Lock() defer s.mu.Unlock() - process, err := s.newInitProcess(ctx, r) + process, err := proc.New( + ctx, + s.config.Path, + s.config.WorkDir, + s.config.RuntimeRoot, + s.config.Namespace, + s.config.Criu, + s.config.SystemdCgroup, + s.platform, + r, + ) if err != nil { return nil, errdefs.ToGRPC(err) } @@ -168,7 +168,7 @@ func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimap return nil, err } delete(s.processes, s.id) - s.platform.close() + s.platform.Close() s.events <- &eventsapi.TaskDelete{ ContainerID: s.id, ExitStatus: uint32(p.ExitStatus()), @@ -218,7 +218,7 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*goo return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - process, err := newExecProcess(ctx, s.config.Path, r, p.(*initProcess), r.ID) + process, err := proc.NewExec(ctx, s.config.Path, r, p.(*proc.Init), r.ID) if err != nil { return nil, errdefs.ToGRPC(err) } @@ -283,10 +283,10 @@ func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi. Bundle: s.bundle, Pid: uint32(p.Pid()), Status: status, - Stdin: sio.stdin, - Stdout: sio.stdout, - Stderr: sio.stderr, - Terminal: sio.terminal, + Stdin: sio.Stdin, + Stdout: sio.Stdout, + Stderr: sio.Stderr, + Terminal: sio.Terminal, ExitStatus: uint32(p.ExitStatus()), ExitedAt: p.ExitedAt(), }, nil @@ -300,7 +300,7 @@ func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_ if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - if err := p.(*initProcess).Pause(ctx); err != nil { + if err := p.(*proc.Init).Pause(ctx); err != nil { return nil, err } s.events <- &eventsapi.TaskPaused{ @@ -317,7 +317,7 @@ func (s *Service) Resume(ctx context.Context, r *google_protobuf.Empty) (*google if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - if err := p.(*initProcess).Resume(ctx); err != nil { + if err := p.(*proc.Init).Resume(ctx); err != nil { return nil, err } s.events <- &eventsapi.TaskResumed{ @@ -406,7 +406,7 @@ func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskReque if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - if err := p.(*initProcess).Checkpoint(ctx, r); err != nil { + if err := p.(*proc.Init).Checkpoint(ctx, r); err != nil { return nil, errdefs.ToGRPC(err) } s.events <- &eventsapi.TaskCheckpointed{ @@ -430,7 +430,7 @@ func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*go if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - if err := p.(*initProcess).Update(ctx, r); err != nil { + if err := p.(*proc.Init).Update(ctx, r); err != nil { return nil, errdefs.ToGRPC(err) } return empty, nil @@ -463,9 +463,9 @@ func (s *Service) checkProcesses(e runc.Exit) { defer s.mu.Unlock() for _, p := range s.processes { if p.Pid() == e.Pid { - if ip, ok := p.(*initProcess); ok { + if ip, ok := p.(*proc.Init); ok { // Ensure all children are killed - if err := ip.killAll(s.context); err != nil { + if err := ip.KillAll(s.context); err != nil { log.G(s.context).WithError(err).WithField("id", ip.ID()). Error("failed to kill init's children") } @@ -491,7 +491,7 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created") } - ps, err := p.(*initProcess).runtime.Ps(ctx, id) + ps, err := p.(*proc.Init).Runtime().Ps(ctx, id) if err != nil { return nil, err } diff --git a/linux/shim/service_linux.go b/linux/shim/service_linux.go index 1d078ba73..bbe9d188a 100644 --- a/linux/shim/service_linux.go +++ b/linux/shim/service_linux.go @@ -1,6 +1,7 @@ package shim import ( + "context" "io" "sync" "syscall" @@ -8,14 +9,13 @@ import ( "github.com/containerd/console" "github.com/containerd/fifo" "github.com/pkg/errors" - "golang.org/x/net/context" ) type linuxPlatform struct { epoller *console.Epoller } -func (p *linuxPlatform) copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { +func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { if p.epoller == nil { return nil, errors.New("uninitialized epoller") } @@ -58,7 +58,7 @@ func (p *linuxPlatform) copyConsole(ctx context.Context, console console.Console return epollConsole, nil } -func (p *linuxPlatform) shutdownConsole(ctx context.Context, cons console.Console) error { +func (p *linuxPlatform) ShutdownConsole(ctx context.Context, cons console.Console) error { if p.epoller == nil { return errors.New("uninitialized epoller") } @@ -69,7 +69,7 @@ func (p *linuxPlatform) shutdownConsole(ctx context.Context, cons console.Consol return epollConsole.Shutdown(p.epoller.CloseConsole) } -func (p *linuxPlatform) close() error { +func (p *linuxPlatform) Close() error { return p.epoller.Close() } diff --git a/linux/shim/service_unix.go b/linux/shim/service_unix.go index c00b85306..6419e15cc 100644 --- a/linux/shim/service_unix.go +++ b/linux/shim/service_unix.go @@ -15,7 +15,7 @@ import ( type unixPlatform struct { } -func (p *unixPlatform) copyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { +func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) { if stdin != "" { in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0) if err != nil { @@ -48,11 +48,11 @@ func (p *unixPlatform) copyConsole(ctx context.Context, console console.Console, return console, nil } -func (p *unixPlatform) shutdownConsole(ctx context.Context, cons console.Console) error { +func (p *unixPlatform) ShutdownConsole(ctx context.Context, cons console.Console) error { return nil } -func (p *unixPlatform) close() error { +func (p *unixPlatform) Close() error { return nil } From 1fe5a251c4fb0fb35478b8b1fd16277cb417ccb9 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 9 Nov 2017 11:20:58 -0500 Subject: [PATCH 2/9] Move Exec creation to init process Signed-off-by: Michael Crosby --- cmd/containerd-shim/main_unix.go | 3 ++- linux/proc/exec.go | 32 -------------------------------- linux/proc/init.go | 29 +++++++++++++++++++++++++++++ linux/runtime.go | 6 +++--- linux/shim/service.go | 2 +- 5 files changed, 35 insertions(+), 37 deletions(-) diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go index 6801cd04d..1f51b1aea 100644 --- a/cmd/containerd-shim/main_unix.go +++ b/cmd/containerd-shim/main_unix.go @@ -18,6 +18,7 @@ import ( "github.com/containerd/containerd/dialer" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" + "github.com/containerd/containerd/linux/proc" "github.com/containerd/containerd/linux/shim" shimapi "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/reaper" @@ -56,7 +57,7 @@ func init() { flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve") flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd") flag.StringVar(&workdirFlag, "workdir", "", "path used to storge large temporary data") - flag.StringVar(&runtimeRootFlag, "runtime-root", shim.RuncRoot, "root directory for the runtime") + flag.StringVar(&runtimeRootFlag, "runtime-root", proc.RuncRoot, "root directory for the runtime") flag.StringVar(&criuFlag, "criu", "", "path to criu binary") flag.BoolVar(&systemdCgroupFlag, "systemd-cgroup", false, "set runtime to use systemd-cgroup") flag.Parse() diff --git a/linux/proc/exec.go b/linux/proc/exec.go index cf335a2d6..8154af550 100644 --- a/linux/proc/exec.go +++ b/linux/proc/exec.go @@ -4,7 +4,6 @@ package proc import ( "context" - "encoding/json" "fmt" "io" "os" @@ -16,8 +15,6 @@ import ( "golang.org/x/sys/unix" "github.com/containerd/console" - "github.com/containerd/containerd/identifiers" - shimapi "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" specs "github.com/opencontainers/runtime-spec/specs-go" @@ -46,35 +43,6 @@ type execProcess struct { waitBlock chan struct{} } -// NewExec returns a new exec'd process -func NewExec(context context.Context, path string, r *shimapi.ExecProcessRequest, parent *Init, id string) (Process, error) { - if err := identifiers.Validate(id); err != nil { - return nil, errors.Wrapf(err, "invalid exec id") - } - // process exec request - var spec specs.Process - if err := json.Unmarshal(r.Spec.Value, &spec); err != nil { - return nil, err - } - spec.Terminal = r.Terminal - - e := &execProcess{ - id: id, - path: path, - parent: parent, - spec: spec, - stdio: Stdio{ - Stdin: r.Stdin, - Stdout: r.Stdout, - Stderr: r.Stderr, - Terminal: r.Terminal, - }, - waitBlock: make(chan struct{}), - } - e.State = &execCreatedState{p: e} - return e, nil -} - func (e *execProcess) Wait() { <-e.waitBlock } diff --git a/linux/proc/init.go b/linux/proc/init.go index cf4783708..0d88e9bc5 100644 --- a/linux/proc/init.go +++ b/linux/proc/init.go @@ -337,6 +337,35 @@ func (p *Init) Runtime() *runc.Runc { return p.runtime } +// Exec returns a new exec'd process +func (p *Init) Exec(context context.Context, path string, r *shimapi.ExecProcessRequest) (Process, error) { + if err := identifiers.Validate(r.ID); err != nil { + return nil, errors.Wrapf(err, "invalid exec id") + } + // process exec request + var spec specs.Process + if err := json.Unmarshal(r.Spec.Value, &spec); err != nil { + return nil, err + } + spec.Terminal = r.Terminal + + e := &execProcess{ + id: r.ID, + path: path, + parent: p, + spec: spec, + stdio: Stdio{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, + }, + waitBlock: make(chan struct{}), + } + e.State = &execCreatedState{p: e} + return e, nil +} + func (p *Init) checkpoint(context context.Context, r *shimapi.CheckpointTaskRequest) error { var options runctypes.CheckpointOptions if r.Options != nil { diff --git a/linux/runtime.go b/linux/runtime.go index a7fd9dd43..605a8dae2 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -17,8 +17,8 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/identifiers" + "github.com/containerd/containerd/linux/proc" "github.com/containerd/containerd/linux/runctypes" - client "github.com/containerd/containerd/linux/shim" shim "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata" @@ -376,7 +376,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { filepath.Join(r.state, ns, id), filepath.Join(r.root, ns, id), ) - pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, client.InitPidFile)) + pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, proc.InitPidFile)) s, err := bundle.NewShimClient(ctx, ns, ShimConnect(), nil) if err != nil { log.G(ctx).WithError(err).WithFields(logrus.Fields{ @@ -474,7 +474,7 @@ func (r *Runtime) getRuntime(ctx context.Context, ns, id string) (*runc.Runc, er var ( cmd = r.config.Runtime - root = client.RuncRoot + root = proc.RuncRoot ) if ropts != nil { if ropts.Runtime != "" { diff --git a/linux/shim/service.go b/linux/shim/service.go index 3822dd1cc..dd1ca62c8 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -218,7 +218,7 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*goo return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - process, err := proc.NewExec(ctx, s.config.Path, r, p.(*proc.Init), r.ID) + process, err := p.(*proc.Init).Exec(ctx, s.config.Path, r) if err != nil { return nil, errdefs.ToGRPC(err) } From a7343b0773c93d0265184e133180c43eb5d5cae9 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 9 Nov 2017 13:15:08 -0500 Subject: [PATCH 3/9] Move events from shim into linux runtime Signed-off-by: Michael Crosby --- linux/process.go | 8 +++++++- linux/runtime.go | 30 +++++++++++++++++++++++----- linux/shim/service.go | 46 ------------------------------------------- linux/task.go | 26 +++++++++++++++++++----- 4 files changed, 53 insertions(+), 57 deletions(-) diff --git a/linux/process.go b/linux/process.go index 0febff9e7..7660ee898 100644 --- a/linux/process.go +++ b/linux/process.go @@ -5,6 +5,7 @@ package linux import ( "context" + eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" shim "github.com/containerd/containerd/linux/shim/v1" @@ -96,12 +97,17 @@ func (p *Process) CloseIO(ctx context.Context) error { // Start the process func (p *Process) Start(ctx context.Context) error { - _, err := p.t.shim.Start(ctx, &shim.StartRequest{ + r, err := p.t.shim.Start(ctx, &shim.StartRequest{ ID: p.id, }) if err != nil { return errdefs.FromGRPC(err) } + p.t.events.Publish(ctx, runtime.TaskExecStartedEventTopic, &eventsapi.TaskExecStarted{ + ContainerID: p.t.id, + Pid: r.Pid, + ExecID: p.id, + }) return nil } diff --git a/linux/runtime.go b/linux/runtime.go index 605a8dae2..e5b474827 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -242,14 +242,14 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts } }() - runtime := r.config.Runtime + rt := r.config.Runtime if ropts != nil && ropts.Runtime != "" { - runtime = ropts.Runtime + rt = ropts.Runtime } sopts := &shim.CreateTaskRequest{ ID: id, Bundle: bundle.path, - Runtime: runtime, + Runtime: rt, Stdin: opts.IO.Stdin, Stdout: opts.IO.Stdout, Stderr: opts.IO.Stderr, @@ -268,7 +268,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts if err != nil { return nil, errdefs.FromGRPC(err) } - t, err := newTask(id, namespace, int(cr.Pid), s, r.monitor) + t, err := newTask(id, namespace, int(cr.Pid), s, r.monitor, r.events) if err != nil { return nil, err } @@ -285,6 +285,20 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts return nil, err } } + r.events.Publish(ctx, runtime.TaskCreateEventTopic, &eventsapi.TaskCreate{ + ContainerID: sopts.ID, + Bundle: sopts.Bundle, + Rootfs: sopts.Rootfs, + IO: &eventsapi.TaskIO{ + Stdin: sopts.Stdin, + Stdout: sopts.Stdout, + Stderr: sopts.Stderr, + Terminal: sopts.Terminal, + }, + Checkpoint: sopts.Checkpoint, + Pid: uint32(t.pid), + }) + return t, nil } @@ -322,6 +336,12 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er if err := bundle.Delete(); err != nil { log.G(ctx).WithError(err).Error("failed to delete bundle") } + r.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventsapi.TaskDelete{ + ContainerID: lc.id, + ExitStatus: rsp.ExitStatus, + ExitedAt: rsp.ExitedAt, + Pid: rsp.Pid, + }) return &runtime.Exit{ Status: rsp.ExitStatus, Timestamp: rsp.ExitedAt, @@ -391,7 +411,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { continue } - t, err := newTask(id, ns, pid, s, r.monitor) + t, err := newTask(id, ns, pid, s, r.monitor, r.events) if err != nil { log.G(ctx).WithError(err).Error("loading task type") continue diff --git a/linux/shim/service.go b/linux/shim/service.go index dd1ca62c8..25e9ac93a 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -107,19 +107,6 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh s.bundle = r.Bundle pid := process.Pid() s.processes[r.ID] = process - s.events <- &eventsapi.TaskCreate{ - ContainerID: r.ID, - Bundle: r.Bundle, - Rootfs: r.Rootfs, - IO: &eventsapi.TaskIO{ - Stdin: r.Stdin, - Stdout: r.Stdout, - Stderr: r.Stderr, - Terminal: r.Terminal, - }, - Checkpoint: r.Checkpoint, - Pid: uint32(pid), - } return &shimapi.CreateTaskResponse{ Pid: uint32(pid), }, nil @@ -136,19 +123,6 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi. if err := p.Start(ctx); err != nil { return nil, err } - if r.ID == s.id { - s.events <- &eventsapi.TaskStart{ - ContainerID: s.id, - Pid: uint32(p.Pid()), - } - } else { - pid := p.Pid() - s.events <- &eventsapi.TaskExecStarted{ - ContainerID: s.id, - ExecID: r.ID, - Pid: uint32(pid), - } - } return &shimapi.StartResponse{ ID: p.ID(), Pid: uint32(p.Pid()), @@ -169,12 +143,6 @@ func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimap } delete(s.processes, s.id) s.platform.Close() - s.events <- &eventsapi.TaskDelete{ - ContainerID: s.id, - ExitStatus: uint32(p.ExitStatus()), - ExitedAt: p.ExitedAt(), - Pid: uint32(p.Pid()), - } return &shimapi.DeleteResponse{ ExitStatus: uint32(p.ExitStatus()), ExitedAt: p.ExitedAt(), @@ -223,11 +191,6 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*goo return nil, errdefs.ToGRPC(err) } s.processes[r.ID] = process - - s.events <- &eventsapi.TaskExecAdded{ - ContainerID: s.id, - ExecID: r.ID, - } return empty, nil } @@ -303,9 +266,6 @@ func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_ if err := p.(*proc.Init).Pause(ctx); err != nil { return nil, err } - s.events <- &eventsapi.TaskPaused{ - ContainerID: s.id, - } return empty, nil } @@ -320,9 +280,6 @@ func (s *Service) Resume(ctx context.Context, r *google_protobuf.Empty) (*google if err := p.(*proc.Init).Resume(ctx); err != nil { return nil, err } - s.events <- &eventsapi.TaskResumed{ - ContainerID: s.id, - } return empty, nil } @@ -409,9 +366,6 @@ func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskReque if err := p.(*proc.Init).Checkpoint(ctx, r); err != nil { return nil, errdefs.ToGRPC(err) } - s.events <- &eventsapi.TaskCheckpointed{ - ContainerID: s.id, - } return empty, nil } diff --git a/linux/task.go b/linux/task.go index 91a5279a8..235e67166 100644 --- a/linux/task.go +++ b/linux/task.go @@ -9,8 +9,10 @@ import ( "google.golang.org/grpc" "github.com/containerd/cgroups" + eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/linux/shim/client" shim "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/runtime" @@ -25,9 +27,10 @@ type Task struct { namespace string cg cgroups.Cgroup monitor runtime.TaskMonitor + events *exchange.Exchange } -func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime.TaskMonitor) (*Task, error) { +func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime.TaskMonitor, events *exchange.Exchange) (*Task, error) { var ( err error cg cgroups.Cgroup @@ -45,6 +48,7 @@ func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime namespace: namespace, cg: cg, monitor: monitor, + events: events, }, nil } @@ -82,6 +86,10 @@ func (t *Task) Start(ctx context.Context) error { return err } } + t.events.Publish(ctx, runtime.TaskStartEventTopic, &eventsapi.TaskStart{ + ContainerID: t.id, + Pid: uint32(t.pid), + }) return nil } @@ -123,11 +131,13 @@ func (t *Task) State(ctx context.Context) (runtime.State, error) { // Pause the task and all processes func (t *Task) Pause(ctx context.Context) error { - _, err := t.shim.Pause(ctx, empty) - if err != nil { - err = errdefs.FromGRPC(err) + if _, err := t.shim.Pause(ctx, empty); err != nil { + return errdefs.FromGRPC(err) } - return err + t.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventsapi.TaskPaused{ + ContainerID: t.id, + }) + return nil } // Resume the task and all processes @@ -135,6 +145,9 @@ func (t *Task) Resume(ctx context.Context) error { if _, err := t.shim.Resume(ctx, empty); err != nil { return errdefs.FromGRPC(err) } + t.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventsapi.TaskResumed{ + ContainerID: t.id, + }) return nil } @@ -223,6 +236,9 @@ func (t *Task) Checkpoint(ctx context.Context, path string, options *types.Any) if _, err := t.shim.Checkpoint(ctx, r); err != nil { return errdefs.FromGRPC(err) } + t.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventsapi.TaskCheckpointed{ + ContainerID: t.id, + }) return nil } From 36e5548e76dfe1aa714eb826143fa831f97893ee Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 9 Nov 2017 13:45:13 -0500 Subject: [PATCH 4/9] Remove namepsaces and id imports from shim Signed-off-by: Michael Crosby --- linux/proc/init.go | 37 +++++++++++++++++-------------------- linux/shim/service.go | 7 +++---- linux/task.go | 4 ++++ reaper/reaper.go | 2 +- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/linux/proc/init.go b/linux/proc/init.go index 0d88e9bc5..212e14f63 100644 --- a/linux/proc/init.go +++ b/linux/proc/init.go @@ -15,7 +15,6 @@ import ( "time" "github.com/containerd/console" - "github.com/containerd/containerd/identifiers" "github.com/containerd/containerd/linux/runctypes" shimapi "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/log" @@ -61,13 +60,26 @@ type Init struct { IoGID int } +// NewRunc returns a new runc instance for a process +func NewRunc(root, path, namespace, runtime, criu string, systemd bool) *runc.Runc { + if root == "" { + root = RuncRoot + } + return &runc.Runc{ + Command: runtime, + Log: filepath.Join(path, "log.json"), + LogFormat: runc.JSON, + PdeathSignal: syscall.SIGKILL, + Root: filepath.Join(root, namespace), + Criu: criu, + SystemdCgroup: systemd, + } +} + // New returns a new init process func New(context context.Context, path, workDir, runtimeRoot, namespace, criu string, systemdCgroup bool, platform Platform, r *shimapi.CreateTaskRequest) (*Init, error) { var success bool - if err := identifiers.Validate(r.ID); err != nil { - return nil, errors.Wrapf(err, "invalid task id") - } var options runctypes.CreateOptions if r.Options != nil { v, err := typeurl.UnmarshalAny(r.Options) @@ -99,19 +111,7 @@ func New(context context.Context, path, workDir, runtimeRoot, namespace, criu st return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m) } } - root := runtimeRoot - if root == "" { - root = RuncRoot - } - runtime := &runc.Runc{ - Command: r.Runtime, - Log: filepath.Join(path, "log.json"), - LogFormat: runc.JSON, - PdeathSignal: syscall.SIGKILL, - Root: filepath.Join(root, namespace), - Criu: criu, - SystemdCgroup: systemdCgroup, - } + runtime := NewRunc(runtimeRoot, path, namespace, r.Runtime, criu, systemdCgroup) p := &Init{ id: r.ID, bundle: r.Bundle, @@ -339,9 +339,6 @@ func (p *Init) Runtime() *runc.Runc { // Exec returns a new exec'd process func (p *Init) Exec(context context.Context, path string, r *shimapi.ExecProcessRequest) (Process, error) { - if err := identifiers.Validate(r.ID); err != nil { - return nil, errors.Wrapf(err, "invalid exec id") - } // process exec request var spec specs.Process if err := json.Unmarshal(r.Spec.Value, &spec); err != nil { diff --git a/linux/shim/service.go b/linux/shim/service.go index 25e9ac93a..0347c0b37 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -19,7 +19,6 @@ import ( "github.com/containerd/containerd/linux/runctypes" shimapi "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/log" - "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/reaper" "github.com/containerd/containerd/runtime" runc "github.com/containerd/go-runc" @@ -47,15 +46,15 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) { if config.Namespace == "" { return nil, fmt.Errorf("shim namespace cannot be empty") } - context := namespaces.WithNamespace(context.Background(), config.Namespace) - context = log.WithLogger(context, logrus.WithFields(logrus.Fields{ + ctx := context.Background() + ctx = log.WithLogger(ctx, logrus.WithFields(logrus.Fields{ "namespace": config.Namespace, "path": config.Path, "pid": os.Getpid(), })) s := &Service{ config: config, - context: context, + context: ctx, processes: make(map[string]proc.Process), events: make(chan interface{}, 128), ec: reaper.Default.Subscribe(), diff --git a/linux/task.go b/linux/task.go index 235e67166..11b5032d3 100644 --- a/linux/task.go +++ b/linux/task.go @@ -13,6 +13,7 @@ import ( "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events/exchange" + "github.com/containerd/containerd/identifiers" "github.com/containerd/containerd/linux/shim/client" shim "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/runtime" @@ -167,6 +168,9 @@ func (t *Task) Kill(ctx context.Context, signal uint32, all bool) error { // Exec creates a new process inside the task func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) { + if err := identifiers.Validate(id); err != nil { + return nil, errors.Wrapf(err, "invalid exec id") + } request := &shim.ExecProcessRequest{ ID: id, Stdin: opts.IO.Stdin, diff --git a/reaper/reaper.go b/reaper/reaper.go index d7dfbb2aa..9127fc5a1 100644 --- a/reaper/reaper.go +++ b/reaper/reaper.go @@ -15,7 +15,7 @@ import ( // ErrNoSuchProcess is returned when the process no longer exists var ErrNoSuchProcess = errors.New("no such process") -const bufferSize = 2048 +const bufferSize = 1024 // Reap should be called when the process receives an SIGCHLD. Reap will reap // all exited processes and close their wait channels From 66a70e7fda2ef3dbc80d860c2e6455e9ef44f854 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 9 Nov 2017 15:12:25 -0500 Subject: [PATCH 5/9] Refactor runtime initialization Signed-off-by: Michael Crosby --- linux/runtime.go | 12 +++++++++--- linux/task.go | 5 ++++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/linux/runtime.go b/linux/runtime.go index e5b474827..95f9242e0 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -268,7 +268,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts if err != nil { return nil, errdefs.FromGRPC(err) } - t, err := newTask(id, namespace, int(cr.Pid), s, r.monitor, r.events) + t, err := newTask(id, namespace, int(cr.Pid), s, r.monitor, r.events, proc.NewRunc(ropts.RuntimeRoot, sopts.Bundle, namespace, rt, ropts.CriuPath, ropts.SystemdCgroup)) if err != nil { return nil, err } @@ -410,8 +410,14 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { } continue } + ropts, err := r.getRuncOptions(ctx, id) + if err != nil { + log.G(ctx).WithError(err).WithField("id", id). + Error("get runtime options") + continue + } - t, err := newTask(id, ns, pid, s, r.monitor, r.events) + t, err := newTask(id, ns, pid, s, r.monitor, r.events, proc.NewRunc(ropts.RuntimeRoot, bundle.path, ns, ropts.Runtime, ropts.CriuPath, ropts.SystemdCgroup)) if err != nil { log.G(ctx).WithError(err).Error("loading task type") continue @@ -537,5 +543,5 @@ func (r *Runtime) getRuncOptions(ctx context.Context, id string) (*runctypes.Run return ropts, nil } - return nil, nil + return &runcopts.RuncOptions{}, nil } diff --git a/linux/task.go b/linux/task.go index 11b5032d3..d708cdb84 100644 --- a/linux/task.go +++ b/linux/task.go @@ -17,6 +17,7 @@ import ( "github.com/containerd/containerd/linux/shim/client" shim "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/runtime" + runc "github.com/containerd/go-runc" "github.com/gogo/protobuf/types" ) @@ -29,9 +30,10 @@ type Task struct { cg cgroups.Cgroup monitor runtime.TaskMonitor events *exchange.Exchange + runtime *runc.Runc } -func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime.TaskMonitor, events *exchange.Exchange) (*Task, error) { +func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime.TaskMonitor, events *exchange.Exchange, runtime *runc.Runc) (*Task, error) { var ( err error cg cgroups.Cgroup @@ -50,6 +52,7 @@ func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime cg: cg, monitor: monitor, events: events, + runtime: runtime, }, nil } From 33fe5c1d22503370e715c9fc6e0b535a806e1c5f Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 9 Nov 2017 16:08:19 -0500 Subject: [PATCH 6/9] Add debug package for memory reporting Signed-off-by: Michael Crosby --- debug/debug.go | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 debug/debug.go diff --git a/debug/debug.go b/debug/debug.go new file mode 100644 index 000000000..6359f1008 --- /dev/null +++ b/debug/debug.go @@ -0,0 +1,66 @@ +package debug + +import ( + "bufio" + "fmt" + "os" + "sort" + "strconv" + "strings" +) + +// Smaps prints the smaps to a file +func Smaps(note, file string) error { + smaps, err := getMaps(os.Getpid()) + if err != nil { + return err + } + f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + return err + } + defer f.Close() + fmt.Fprintf(f, "%s: rss %d\n", note, smaps["rss"]) + fmt.Fprintf(f, "%s: pss %d\n", note, smaps["pss"]) + return nil +} + +func getMaps(pid int) (map[string]int, error) { + f, err := os.Open(fmt.Sprintf("/proc/%d/smaps", pid)) + if err != nil { + return nil, err + } + defer f.Close() + var ( + smaps = make(map[string]int) + s = bufio.NewScanner(f) + ) + for s.Scan() { + var ( + fields = strings.Fields(s.Text()) + name = fields[0] + ) + name = strings.TrimSuffix(strings.ToLower(name), ":") + if len(fields) < 2 { + continue + } + n, err := strconv.Atoi(fields[1]) + if err != nil { + continue + } + smaps[name] += n + } + if err := s.Err(); err != nil { + return nil, err + } + return smaps, nil +} + +func keys(smaps map[string]int) []string { + var o []string + for k := range smaps { + o = append(o, k) + } + sort.Strings(o) + return o +} From 9abde39bf7d070dd4f9325c1fb116eee96fd7c64 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 9 Nov 2017 16:14:28 -0500 Subject: [PATCH 7/9] Fix lint issues on init process Signed-off-by: Michael Crosby --- linux/proc/init.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/linux/proc/init.go b/linux/proc/init.go index 212e14f63..8a9aebd69 100644 --- a/linux/proc/init.go +++ b/linux/proc/init.go @@ -29,6 +29,7 @@ import ( // InitPidFile name of the file that contains the init pid const InitPidFile = "init.pid" +// Init represents an initial process for a container type Init struct { wg sync.WaitGroup initState @@ -217,30 +218,36 @@ func New(context context.Context, path, workDir, runtimeRoot, namespace, criu st return p, nil } +// Wait for the process to exit func (p *Init) Wait() { <-p.waitBlock } +// ID of the process func (p *Init) ID() string { return p.id } +// Pid of the process func (p *Init) Pid() int { return p.pid } +// ExitStatus of the process func (p *Init) ExitStatus() int { p.mu.Lock() defer p.mu.Unlock() return p.status } +// ExitedAt at time when the process exited func (p *Init) ExitedAt() time.Time { p.mu.Lock() defer p.mu.Unlock() return p.exited } +// Status of the process func (p *Init) Status(ctx context.Context) (string, error) { p.mu.Lock() defer p.mu.Unlock() @@ -321,6 +328,7 @@ func (p *Init) kill(context context.Context, signal uint32, all bool) error { return checkKillError(err) } +// KillAll processes belonging to the init process func (p *Init) KillAll(context context.Context) error { err := p.runtime.Kill(context, p.id, int(syscall.SIGKILL), &runc.KillOpts{ All: true, @@ -328,6 +336,7 @@ func (p *Init) KillAll(context context.Context) error { return p.runtimeError(err, "OCI runtime killall failed") } +// Stdin of the process func (p *Init) Stdin() io.Closer { return p.stdin } @@ -404,6 +413,7 @@ func (p *Init) update(context context.Context, r *shimapi.UpdateTaskRequest) err return p.runtime.Update(context, p.id, &resources) } +// Stdio of the process func (p *Init) Stdio() Stdio { return p.stdio } From c81788b129b66ada58ff6ed214de60a33dc24ca7 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 9 Nov 2017 16:47:55 -0500 Subject: [PATCH 8/9] Remove errdefs and shimapi types from proc package Signed-off-by: Michael Crosby --- linux/proc/deleted_state.go | 6 +++--- linux/proc/init.go | 12 ++++++------ linux/proc/init_state.go | 26 +++++++++++++------------- linux/proc/types.go | 37 +++++++++++++++++++++++++++++++++++++ linux/proc/utils.go | 3 +-- linux/shim/service.go | 30 ++++++++++++++++++++++++++---- linux/shim/service_unix.go | 2 +- 7 files changed, 87 insertions(+), 29 deletions(-) create mode 100644 linux/proc/types.go diff --git a/linux/proc/deleted_state.go b/linux/proc/deleted_state.go index 6ef2cfb3e..fb2587804 100644 --- a/linux/proc/deleted_state.go +++ b/linux/proc/deleted_state.go @@ -6,7 +6,7 @@ import ( "context" "github.com/containerd/console" - shimapi "github.com/containerd/containerd/linux/shim/v1" + google_protobuf "github.com/gogo/protobuf/types" "github.com/pkg/errors" ) @@ -21,11 +21,11 @@ func (s *deletedState) Resume(ctx context.Context) error { return errors.Errorf("cannot resume a deleted process") } -func (s *deletedState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error { +func (s *deletedState) Update(context context.Context, r *google_protobuf.Any) error { return errors.Errorf("cannot update a deleted process") } -func (s *deletedState) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) error { +func (s *deletedState) Checkpoint(ctx context.Context, r *CheckpointConfig) error { return errors.Errorf("cannot checkpoint a deleted process") } diff --git a/linux/proc/init.go b/linux/proc/init.go index 8a9aebd69..f24f92f7d 100644 --- a/linux/proc/init.go +++ b/linux/proc/init.go @@ -16,12 +16,12 @@ import ( "github.com/containerd/console" "github.com/containerd/containerd/linux/runctypes" - shimapi "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" "github.com/containerd/typeurl" + google_protobuf "github.com/gogo/protobuf/types" specs "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" ) @@ -78,7 +78,7 @@ func NewRunc(root, path, namespace, runtime, criu string, systemd bool) *runc.Ru } // New returns a new init process -func New(context context.Context, path, workDir, runtimeRoot, namespace, criu string, systemdCgroup bool, platform Platform, r *shimapi.CreateTaskRequest) (*Init, error) { +func New(context context.Context, path, workDir, runtimeRoot, namespace, criu string, systemdCgroup bool, platform Platform, r *CreateConfig) (*Init, error) { var success bool var options runctypes.CreateOptions @@ -347,7 +347,7 @@ func (p *Init) Runtime() *runc.Runc { } // Exec returns a new exec'd process -func (p *Init) Exec(context context.Context, path string, r *shimapi.ExecProcessRequest) (Process, error) { +func (p *Init) Exec(context context.Context, path string, r *ExecConfig) (Process, error) { // process exec request var spec specs.Process if err := json.Unmarshal(r.Spec.Value, &spec); err != nil { @@ -372,7 +372,7 @@ func (p *Init) Exec(context context.Context, path string, r *shimapi.ExecProcess return e, nil } -func (p *Init) checkpoint(context context.Context, r *shimapi.CheckpointTaskRequest) error { +func (p *Init) checkpoint(context context.Context, r *CheckpointConfig) error { var options runctypes.CheckpointOptions if r.Options != nil { v, err := typeurl.UnmarshalAny(r.Options) @@ -405,9 +405,9 @@ func (p *Init) checkpoint(context context.Context, r *shimapi.CheckpointTaskRequ return nil } -func (p *Init) update(context context.Context, r *shimapi.UpdateTaskRequest) error { +func (p *Init) update(context context.Context, r *google_protobuf.Any) error { var resources specs.LinuxResources - if err := json.Unmarshal(r.Resources.Value, &resources); err != nil { + if err := json.Unmarshal(r.Value, &resources); err != nil { return err } return p.runtime.Update(context, p.id, &resources) diff --git a/linux/proc/init_state.go b/linux/proc/init_state.go index 9e36b786c..b5b398ec3 100644 --- a/linux/proc/init_state.go +++ b/linux/proc/init_state.go @@ -9,9 +9,9 @@ import ( "github.com/containerd/console" "github.com/containerd/containerd/errdefs" - shimapi "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" + google_protobuf "github.com/gogo/protobuf/types" "github.com/pkg/errors" ) @@ -20,8 +20,8 @@ type initState interface { Pause(context.Context) error Resume(context.Context) error - Update(context.Context, *shimapi.UpdateTaskRequest) error - Checkpoint(context.Context, *shimapi.CheckpointTaskRequest) error + Update(context.Context, *google_protobuf.Any) error + Checkpoint(context.Context, *CheckpointConfig) error } type createdState struct { @@ -56,14 +56,14 @@ func (s *createdState) Resume(ctx context.Context) error { return errors.Errorf("cannot resume task in created state") } -func (s *createdState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error { +func (s *createdState) Update(context context.Context, r *google_protobuf.Any) error { s.p.mu.Lock() defer s.p.mu.Unlock() return s.p.update(context, r) } -func (s *createdState) Checkpoint(context context.Context, r *shimapi.CheckpointTaskRequest) error { +func (s *createdState) Checkpoint(context context.Context, r *CheckpointConfig) error { s.p.mu.Lock() defer s.p.mu.Unlock() @@ -146,14 +146,14 @@ func (s *createdCheckpointState) Resume(ctx context.Context) error { return errors.Errorf("cannot resume task in created state") } -func (s *createdCheckpointState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error { +func (s *createdCheckpointState) Update(context context.Context, r *google_protobuf.Any) error { s.p.mu.Lock() defer s.p.mu.Unlock() return s.p.update(context, r) } -func (s *createdCheckpointState) Checkpoint(context context.Context, r *shimapi.CheckpointTaskRequest) error { +func (s *createdCheckpointState) Checkpoint(context context.Context, r *CheckpointConfig) error { s.p.mu.Lock() defer s.p.mu.Unlock() @@ -259,14 +259,14 @@ func (s *runningState) Resume(ctx context.Context) error { return errors.Errorf("cannot resume a running process") } -func (s *runningState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error { +func (s *runningState) Update(context context.Context, r *google_protobuf.Any) error { s.p.mu.Lock() defer s.p.mu.Unlock() return s.p.update(context, r) } -func (s *runningState) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) error { +func (s *runningState) Checkpoint(ctx context.Context, r *CheckpointConfig) error { s.p.mu.Lock() defer s.p.mu.Unlock() @@ -345,14 +345,14 @@ func (s *pausedState) Resume(ctx context.Context) error { return s.transition("running") } -func (s *pausedState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error { +func (s *pausedState) Update(context context.Context, r *google_protobuf.Any) error { s.p.mu.Lock() defer s.p.mu.Unlock() return s.p.update(context, r) } -func (s *pausedState) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) error { +func (s *pausedState) Checkpoint(ctx context.Context, r *CheckpointConfig) error { s.p.mu.Lock() defer s.p.mu.Unlock() @@ -427,14 +427,14 @@ func (s *stoppedState) Resume(ctx context.Context) error { return errors.Errorf("cannot resume a stopped container") } -func (s *stoppedState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error { +func (s *stoppedState) Update(context context.Context, r *google_protobuf.Any) error { s.p.mu.Lock() defer s.p.mu.Unlock() return errors.Errorf("cannot update a stopped container") } -func (s *stoppedState) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) error { +func (s *stoppedState) Checkpoint(ctx context.Context, r *CheckpointConfig) error { s.p.mu.Lock() defer s.p.mu.Unlock() diff --git a/linux/proc/types.go b/linux/proc/types.go new file mode 100644 index 000000000..9055c25d1 --- /dev/null +++ b/linux/proc/types.go @@ -0,0 +1,37 @@ +package proc + +import ( + containerd_types "github.com/containerd/containerd/api/types" + google_protobuf "github.com/gogo/protobuf/types" +) + +// CreateConfig hold task creation configuration +type CreateConfig struct { + ID string + Bundle string + Runtime string + Rootfs []*containerd_types.Mount + Terminal bool + Stdin string + Stdout string + Stderr string + Checkpoint string + ParentCheckpoint string + Options *google_protobuf.Any +} + +// ExecConfig holds exec creation configuration +type ExecConfig struct { + ID string + Terminal bool + Stdin string + Stdout string + Stderr string + Spec *google_protobuf.Any +} + +// CheckpointConfig holds task checkpoint configuration +type CheckpointConfig struct { + Path string + Options *google_protobuf.Any +} diff --git a/linux/proc/utils.go b/linux/proc/utils.go index 8bedd554b..1197957b5 100644 --- a/linux/proc/utils.go +++ b/linux/proc/utils.go @@ -10,7 +10,6 @@ import ( "time" "github.com/containerd/containerd/errdefs" - shimapi "github.com/containerd/containerd/linux/shim/v1" runc "github.com/containerd/go-runc" "github.com/pkg/errors" "golang.org/x/sys/unix" @@ -81,6 +80,6 @@ func checkKillError(err error) error { return errors.Wrapf(err, "unknown error after kill") } -func hasNoIO(r *shimapi.CreateTaskRequest) bool { +func hasNoIO(r *CreateConfig) bool { return r.Stdin == "" && r.Stdout == "" && r.Stderr == "" } diff --git a/linux/shim/service.go b/linux/shim/service.go index 0347c0b37..31c054392 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -96,7 +96,19 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh s.config.Criu, s.config.SystemdCgroup, s.platform, - r, + &proc.CreateConfig{ + ID: r.ID, + Bundle: r.Bundle, + Runtime: r.Runtime, + Rootfs: r.Rootfs, + Terminal: r.Terminal, + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Checkpoint: r.Checkpoint, + ParentCheckpoint: r.ParentCheckpoint, + Options: r.Options, + }, ) if err != nil { return nil, errdefs.ToGRPC(err) @@ -185,7 +197,14 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*goo return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - process, err := p.(*proc.Init).Exec(ctx, s.config.Path, r) + process, err := p.(*proc.Init).Exec(ctx, s.config.Path, &proc.ExecConfig{ + ID: r.ID, + Terminal: r.Terminal, + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Spec: r.Spec, + }) if err != nil { return nil, errdefs.ToGRPC(err) } @@ -362,7 +381,10 @@ func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskReque if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - if err := p.(*proc.Init).Checkpoint(ctx, r); err != nil { + if err := p.(*proc.Init).Checkpoint(ctx, &proc.CheckpointConfig{ + Path: r.Path, + Options: r.Options, + }); err != nil { return nil, errdefs.ToGRPC(err) } return empty, nil @@ -383,7 +405,7 @@ func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*go if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - if err := p.(*proc.Init).Update(ctx, r); err != nil { + if err := p.(*proc.Init).Update(ctx, r.Resources); err != nil { return nil, errdefs.ToGRPC(err) } return empty, nil diff --git a/linux/shim/service_unix.go b/linux/shim/service_unix.go index 6419e15cc..d4419e56a 100644 --- a/linux/shim/service_unix.go +++ b/linux/shim/service_unix.go @@ -3,13 +3,13 @@ package shim import ( + "context" "io" "sync" "syscall" "github.com/containerd/console" "github.com/containerd/fifo" - "golang.org/x/net/context" ) type unixPlatform struct { From 8376b50b193079fe89a82cc86f4f37f44e544bab Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Fri, 10 Nov 2017 12:09:48 -0500 Subject: [PATCH 9/9] Add grpc methods to errdefs Signed-off-by: Michael Crosby --- errdefs/grpc.go | 19 ++++++++++++++++--- linux/runtime.go | 9 ++++++--- linux/shim/service.go | 3 ++- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/errdefs/grpc.go b/errdefs/grpc.go index 2aa2e11b4..6a3bbcaa1 100644 --- a/errdefs/grpc.go +++ b/errdefs/grpc.go @@ -4,7 +4,6 @@ import ( "strings" "github.com/pkg/errors" - "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -61,7 +60,7 @@ func FromGRPC(err error) error { var cls error // divide these into error classes, becomes the cause - switch grpc.Code(err) { + switch code(err) { case codes.InvalidArgument: cls = ErrInvalidArgument case codes.AlreadyExists: @@ -94,7 +93,7 @@ func FromGRPC(err error) error { // Effectively, we just remove the string of cls from the end of err if it // appears there. func rebaseMessage(cls error, err error) string { - desc := grpc.ErrorDesc(err) + desc := errDesc(err) clss := cls.Error() if desc == clss { return "" @@ -107,3 +106,17 @@ func isGRPCError(err error) bool { _, ok := status.FromError(err) return ok } + +func code(err error) codes.Code { + if s, ok := status.FromError(err); ok { + return s.Code() + } + return codes.Unknown +} + +func errDesc(err error) string { + if s, ok := status.FromError(err); ok { + return s.Message() + } + return err.Error() +} diff --git a/linux/runtime.go b/linux/runtime.go index 95f9242e0..af07a6001 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -268,7 +268,8 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts if err != nil { return nil, errdefs.FromGRPC(err) } - t, err := newTask(id, namespace, int(cr.Pid), s, r.monitor, r.events, proc.NewRunc(ropts.RuntimeRoot, sopts.Bundle, namespace, rt, ropts.CriuPath, ropts.SystemdCgroup)) + t, err := newTask(id, namespace, int(cr.Pid), s, r.monitor, r.events, + proc.NewRunc(ropts.RuntimeRoot, sopts.Bundle, namespace, rt, ropts.CriuPath, ropts.SystemdCgroup)) if err != nil { return nil, err } @@ -396,6 +397,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { filepath.Join(r.state, ns, id), filepath.Join(r.root, ns, id), ) + ctx = namespaces.WithNamespace(ctx, ns) pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, proc.InitPidFile)) s, err := bundle.NewShimClient(ctx, ns, ShimConnect(), nil) if err != nil { @@ -417,7 +419,8 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { continue } - t, err := newTask(id, ns, pid, s, r.monitor, r.events, proc.NewRunc(ropts.RuntimeRoot, bundle.path, ns, ropts.Runtime, ropts.CriuPath, ropts.SystemdCgroup)) + t, err := newTask(id, ns, pid, s, r.monitor, r.events, + proc.NewRunc(ropts.RuntimeRoot, bundle.path, ns, ropts.Runtime, ropts.CriuPath, ropts.SystemdCgroup)) if err != nil { log.G(ctx).WithError(err).Error("loading task type") continue @@ -543,5 +546,5 @@ func (r *Runtime) getRuncOptions(ctx context.Context, id string) (*runctypes.Run return ropts, nil } - return &runcopts.RuncOptions{}, nil + return &runctypes.RuncOptions{}, nil } diff --git a/linux/shim/service.go b/linux/shim/service.go index 31c054392..566811dee 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -19,6 +19,7 @@ import ( "github.com/containerd/containerd/linux/runctypes" shimapi "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/reaper" "github.com/containerd/containerd/runtime" runc "github.com/containerd/go-runc" @@ -46,7 +47,7 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) { if config.Namespace == "" { return nil, fmt.Errorf("shim namespace cannot be empty") } - ctx := context.Background() + ctx := namespaces.WithNamespace(context.Background(), config.Namespace) ctx = log.WithLogger(ctx, logrus.WithFields(logrus.Fields{ "namespace": config.Namespace, "path": config.Path,