diff --git a/linux/runtime.go b/linux/runtime.go index 4b4267152..81ad9837d 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -5,7 +5,6 @@ package linux import ( "bytes" "context" - "errors" "fmt" "io" "io/ioutil" @@ -15,6 +14,8 @@ import ( "sync" "time" + "google.golang.org/grpc" + "github.com/containerd/containerd/api/services/shim" "github.com/containerd/containerd/api/types/mount" "github.com/containerd/containerd/api/types/task" @@ -23,6 +24,7 @@ import ( "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/plugin" runc "github.com/containerd/go-runc" + "github.com/pkg/errors" "golang.org/x/sys/unix" ) @@ -181,7 +183,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) if err != nil { return nil, err } - s, err := newShim(r.shim, path, namespace, r.remote) + s, err := newShim(ctx, r.shim, path, namespace, r.remote) if err != nil { os.RemoveAll(path) return nil, err @@ -215,7 +217,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) } if _, err = s.Create(ctx, sopts); err != nil { os.RemoveAll(path) - return nil, err + return nil, errors.New(grpc.ErrorDesc(err)) } c := newTask(id, namespace, opts.Spec, s) if err := r.tasks.add(ctx, c); err != nil { @@ -244,7 +246,7 @@ func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, erro } rsp, err := lc.shim.Delete(ctx, &shim.DeleteRequest{}) if err != nil { - return nil, err + return nil, errors.New(grpc.ErrorDesc(err)) } lc.shim.Exit(ctx, &shim.ExitRequest{}) r.tasks.delete(ctx, lc) diff --git a/linux/shim.go b/linux/shim.go index b30f44509..bc5b99fd8 100644 --- a/linux/shim.go +++ b/linux/shim.go @@ -3,9 +3,8 @@ package linux import ( + "context" "fmt" - "io/ioutil" - "log" "net" "os/exec" "path/filepath" @@ -13,16 +12,16 @@ import ( "time" "google.golang.org/grpc" - "google.golang.org/grpc/grpclog" "github.com/containerd/containerd/api/services/shim" localShim "github.com/containerd/containerd/linux/shim" + "github.com/containerd/containerd/log" "github.com/containerd/containerd/reaper" "github.com/containerd/containerd/sys" "github.com/pkg/errors" ) -func newShim(shimName string, path, namespace string, remote bool) (shim.ShimClient, error) { +func newShim(ctx context.Context, shimName string, path, namespace string, remote bool) (shim.ShimClient, error) { if !remote { return localShim.Client(path, namespace) } @@ -51,8 +50,16 @@ func newShim(shimName string, path, namespace string, remote bool) (shim.ShimCli if err := reaper.Default.Start(cmd); err != nil { return nil, errors.Wrapf(err, "failed to start shim") } - if err := sys.SetOOMScore(cmd.Process.Pid, sys.OOMScoreMaxKillable); err != nil { - return nil, err + defer func() { + if err != nil { + cmd.Process.Kill() + reaper.Default.Wait(cmd) + } else { + log.G(ctx).WithField("socket", socket).Infof("new shim started") + } + }() + if err = sys.SetOOMScore(cmd.Process.Pid, sys.OOMScoreMaxKillable); err != nil { + return nil, errors.Wrap(err, "failed to set OOM Score on shim") } return connectShim(socket) } @@ -67,7 +74,6 @@ func loadShim(path, namespace string, remote bool) (shim.ShimClient, error) { func connectShim(socket string) (shim.ShimClient, error) { // reset the logger for grpc to log to dev/null so that it does not mess with our stdio - grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags)) dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithTimeout(100 * time.Second)} dialOpts = append(dialOpts, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { diff --git a/linux/shim/exec.go b/linux/shim/exec.go index 39e2b3aeb..2b790b233 100644 --- a/linux/shim/exec.go +++ b/linux/shim/exec.go @@ -20,6 +20,7 @@ import ( "github.com/containerd/fifo" runc "github.com/containerd/go-runc" specs "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" ) type execProcess struct { @@ -59,13 +60,13 @@ func newExecProcess(context context.Context, path string, r *shimapi.ExecRequest ) if r.Terminal { if socket, err = runc.NewConsoleSocket(filepath.Join(path, "pty.sock")); err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to create runc console socket") } defer os.Remove(socket.Path()) } else { // TODO: get uid/gid if io, err = runc.NewPipeIO(0, 0); err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to create runc io pipes") } e.io = io } @@ -85,12 +86,12 @@ func newExecProcess(context context.Context, path string, r *shimapi.ExecRequest spec.Terminal = r.Terminal if err := parent.runc.Exec(context, parent.id, spec, opts); err != nil { - return nil, err + return nil, parent.runcError(err, "runc exec failed") } if r.Stdin != "" { sc, err := fifo.OpenFifo(context, r.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to open stdin fifo %s", r.Stdin) } e.closers = append(e.closers, sc) e.stdin = sc @@ -99,21 +100,21 @@ func newExecProcess(context context.Context, path string, r *shimapi.ExecRequest if socket != nil { console, err := socket.ReceiveMaster() if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to retrieve console master") } e.console = console if err := copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup, ©WaitGroup); err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to start console copy") } } else { if err := copyPipes(context, io, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup, ©WaitGroup); err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to start io pipe copy") } } copyWaitGroup.Wait() pid, err := runc.ReadPidFile(opts.PidFile) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to retrieve runc exec pid") } e.pid = pid return e, nil diff --git a/linux/shim/init.go b/linux/shim/init.go index 6748e687b..11249c071 100644 --- a/linux/shim/init.go +++ b/linux/shim/init.go @@ -4,6 +4,7 @@ package shim import ( "context" + "encoding/json" "fmt" "io" "os" @@ -21,6 +22,7 @@ import ( "github.com/containerd/containerd/mount" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" + "github.com/pkg/errors" ) type initProcess struct { @@ -57,7 +59,7 @@ func newInitProcess(context context.Context, path, namespace string, r *shimapi. Options: rm.Options, } if err := m.Mount(filepath.Join(path, "rootfs")); err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m) } } runtime := &runc.Runc{ @@ -83,13 +85,13 @@ func newInitProcess(context context.Context, path, namespace string, r *shimapi. ) if r.Terminal { if socket, err = runc.NewConsoleSocket(filepath.Join(path, "pty.sock")); err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to create runc console socket") } defer os.Remove(socket.Path()) } else { // TODO: get uid/gid if io, err = runc.NewPipeIO(0, 0); err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to create runc io pipes") } p.io = io } @@ -108,7 +110,7 @@ func newInitProcess(context context.Context, path, namespace string, r *shimapi. NoSubreaper: true, } if _, err := p.runc.Restore(context, r.ID, r.Bundle, opts); err != nil { - return nil, err + return nil, p.runcError(err, "runc restore failed") } } else { opts := &runc.CreateOpts{ @@ -120,13 +122,13 @@ func newInitProcess(context context.Context, path, namespace string, r *shimapi. opts.ConsoleSocket = socket } if err := p.runc.Create(context, r.ID, r.Bundle, opts); err != nil { - return nil, err + return nil, p.runcError(err, "runc create failed") } } if r.Stdin != "" { sc, err := fifo.OpenFifo(context, r.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to open stdin fifo %s", r.Stdin) } p.stdin = sc p.closers = append(p.closers, sc) @@ -135,22 +137,22 @@ func newInitProcess(context context.Context, path, namespace string, r *shimapi. if socket != nil { console, err := socket.ReceiveMaster() if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to retrieve console master") } p.console = console if err := copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup, ©WaitGroup); err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to start console copy") } } else { if err := copyPipes(context, io, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup, ©WaitGroup); err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to start io pipe copy") } } copyWaitGroup.Wait() pid, err := runc.ReadPidFile(pidFile) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to retrieve runc container pid") } p.pid = pid return p, nil @@ -172,7 +174,7 @@ func (p *initProcess) ExitedAt() time.Time { func (p *initProcess) ContainerStatus(ctx context.Context) (string, error) { c, err := p.runc.State(ctx, p.id) if err != nil { - return "", err + return "", p.runcError(err, "runc state failed") } return c.Status, nil } @@ -180,7 +182,8 @@ func (p *initProcess) ContainerStatus(ctx context.Context) (string, error) { func (p *initProcess) Start(context context.Context) error { p.mu.Lock() defer p.mu.Unlock() - return p.runc.Start(context, p.id) + err := p.runc.Start(context, p.id) + return p.runcError(err, "runc start failed") } func (p *initProcess) Exited(status int) { @@ -207,7 +210,7 @@ func (p *initProcess) Delete(context context.Context) error { } p.io.Close() } - return err + return p.runcError(err, "runc delete failed") } func (p *initProcess) Resize(ws console.WinSize) error { @@ -218,23 +221,27 @@ func (p *initProcess) Resize(ws console.WinSize) error { } func (p *initProcess) Pause(context context.Context) error { - return p.runc.Pause(context, p.id) + err := p.runc.Pause(context, p.id) + return p.runcError(err, "runc pause failed") } func (p *initProcess) Resume(context context.Context) error { - return p.runc.Resume(context, p.id) + err := p.runc.Resume(context, p.id) + return p.runcError(err, "runc resume failed") } func (p *initProcess) Kill(context context.Context, signal uint32, all bool) error { - return p.runc.Kill(context, p.id, int(signal), &runc.KillOpts{ + err := p.runc.Kill(context, p.id, int(signal), &runc.KillOpts{ All: all, }) + return p.runcError(err, "runc kill failed") } func (p *initProcess) killAll(context context.Context) error { - return p.runc.Kill(context, p.id, int(syscall.SIGKILL), &runc.KillOpts{ + err := p.runc.Kill(context, p.id, int(syscall.SIGKILL), &runc.KillOpts{ All: true, }) + return p.runcError(err, "runc killall failed") } func (p *initProcess) Signal(sig int) error { @@ -270,6 +277,55 @@ func (p *initProcess) Checkpoint(context context.Context, r *shimapi.CheckpointR return nil } +// TODO(mlaventure): move to runc package? +func getLastRuncError(r *runc.Runc) (string, error) { + if r.Log == "" { + return "", nil + } + + f, err := os.OpenFile(r.Log, os.O_RDONLY, 0400) + if err != nil { + return "", err + } + + var ( + errMsg string + log struct { + Level string + Msg string + Time time.Time + } + ) + + dec := json.NewDecoder(f) + for err = nil; err == nil; { + if err = dec.Decode(&log); err != nil && err != io.EOF { + return "", err + } + if log.Level == "error" { + errMsg = strings.TrimSpace(log.Msg) + } + } + + return errMsg, nil +} + +func (p *initProcess) runcError(rErr error, msg string) error { + if rErr == nil { + return nil + } + + rMsg, err := getLastRuncError(p.runc) + switch { + case err != nil: + return errors.Wrapf(err, "%s: %s (%s)", msg, "unable to retrieve runc error", err.Error()) + case rMsg == "": + return errors.Wrap(err, msg) + default: + return errors.Errorf("%s: %s", msg, rMsg) + } +} + // criuError returns only the first line of the error message from criu // it tries to add an invalid dump log location when returning the message func criuError(err error) string { diff --git a/linux/task.go b/linux/task.go index 07036ee47..b2ed95641 100644 --- a/linux/task.go +++ b/linux/task.go @@ -5,11 +5,14 @@ package linux import ( "context" + "google.golang.org/grpc" + "github.com/containerd/containerd/api/services/shim" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/plugin" protobuf "github.com/gogo/protobuf/types" specs "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" ) type Task struct { @@ -40,13 +43,16 @@ func (c *Task) Info() plugin.TaskInfo { func (c *Task) Start(ctx context.Context) error { _, err := c.shim.Start(ctx, &shim.StartRequest{}) + if err != nil { + err = errors.New(grpc.ErrorDesc(err)) + } return err } func (c *Task) State(ctx context.Context) (plugin.State, error) { response, err := c.shim.State(ctx, &shim.StateRequest{}) if err != nil { - return plugin.State{}, err + return plugin.State{}, errors.New(grpc.ErrorDesc(err)) } var status plugin.Status switch response.Status { @@ -72,11 +78,17 @@ func (c *Task) State(ctx context.Context) (plugin.State, error) { func (c *Task) Pause(ctx context.Context) error { _, err := c.shim.Pause(ctx, &shim.PauseRequest{}) + if err != nil { + err = errors.New(grpc.ErrorDesc(err)) + } return err } func (c *Task) Resume(ctx context.Context) error { _, err := c.shim.Resume(ctx, &shim.ResumeRequest{}) + if err != nil { + err = errors.New(grpc.ErrorDesc(err)) + } return err } @@ -86,6 +98,9 @@ func (c *Task) Kill(ctx context.Context, signal uint32, pid uint32, all bool) er Pid: pid, All: all, }) + if err != nil { + err = errors.New(grpc.ErrorDesc(err)) + } return err } @@ -102,7 +117,7 @@ func (c *Task) Exec(ctx context.Context, opts plugin.ExecOpts) (plugin.Process, } resp, err := c.shim.Exec(ctx, request) if err != nil { - return nil, err + return nil, errors.New(grpc.ErrorDesc(err)) } return &Process{ @@ -115,9 +130,8 @@ func (c *Task) Processes(ctx context.Context) ([]uint32, error) { resp, err := c.shim.Processes(ctx, &shim.ProcessesRequest{ ID: c.containerID, }) - if err != nil { - return nil, err + return nil, errors.New(grpc.ErrorDesc(err)) } pids := make([]uint32, 0, len(resp.Processes)) @@ -135,6 +149,9 @@ func (c *Task) Pty(ctx context.Context, pid uint32, size plugin.ConsoleSize) err Width: size.Width, Height: size.Height, }) + if err != nil { + err = errors.New(grpc.ErrorDesc(err)) + } return err } @@ -142,6 +159,9 @@ func (c *Task) CloseStdin(ctx context.Context, pid uint32) error { _, err := c.shim.CloseStdin(ctx, &shim.CloseStdinRequest{ Pid: pid, }) + if err != nil { + err = errors.New(grpc.ErrorDesc(err)) + } return err } @@ -155,6 +175,9 @@ func (c *Task) Checkpoint(ctx context.Context, opts plugin.CheckpointOpts) error EmptyNamespaces: opts.EmptyNamespaces, CheckpointPath: opts.Path, }) + if err != nil { + err = errors.New(grpc.ErrorDesc(err)) + } return err } @@ -163,7 +186,7 @@ func (c *Task) DeleteProcess(ctx context.Context, pid uint32) (*plugin.Exit, err Pid: pid, }) if err != nil { - return nil, err + return nil, errors.New(grpc.ErrorDesc(err)) } return &plugin.Exit{ Status: r.ExitStatus, @@ -181,6 +204,9 @@ func (p *Process) Kill(ctx context.Context, signal uint32, _ bool) error { Signal: signal, Pid: uint32(p.pid), }) + if err != nil { + err = errors.New(grpc.ErrorDesc(err)) + } return err } diff --git a/log/grpc.go b/log/grpc.go index c209f7541..cb2c92182 100644 --- a/log/grpc.go +++ b/log/grpc.go @@ -1,14 +1,12 @@ package log import ( - "context" + "io/ioutil" + "log" "google.golang.org/grpc/grpclog" ) func init() { - ctx := WithModule(context.Background(), "grpc") - - // completely replace the grpc logger with the logrus logger. - grpclog.SetLogger(G(ctx)) + grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags)) } diff --git a/services/execution/service.go b/services/execution/service.go index b6d944c0b..a688e08f1 100644 --- a/services/execution/service.go +++ b/services/execution/service.go @@ -23,6 +23,7 @@ import ( protobuf "github.com/gogo/protobuf/types" google_protobuf "github.com/golang/protobuf/ptypes/empty" specs "github.com/opencontainers/image-spec/specs-go" + "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -129,7 +130,7 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create } c, err := runtime.Create(ctx, r.ContainerID, opts) if err != nil { - return nil, err + return nil, errors.Wrap(err, "runtime create failed") } state, err := c.State(ctx) if err != nil {