Merge pull request #1002 from mlaventure/runtime-error-bubble

Bubble up linux runtime errors
This commit is contained in:
Michael Crosby 2017-06-14 12:25:25 -07:00 committed by GitHub
commit f5c587c1f7
7 changed files with 137 additions and 47 deletions

View File

@ -5,7 +5,6 @@ package linux
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
@ -15,6 +14,8 @@ import (
"sync"
"time"
"google.golang.org/grpc"
"github.com/containerd/containerd/api/services/shim"
"github.com/containerd/containerd/api/types/mount"
"github.com/containerd/containerd/api/types/task"
@ -23,6 +24,7 @@ import (
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin"
runc "github.com/containerd/go-runc"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
)
@ -181,7 +183,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts)
if err != nil {
return nil, err
}
s, err := newShim(r.shim, path, namespace, r.remote)
s, err := newShim(ctx, r.shim, path, namespace, r.remote)
if err != nil {
os.RemoveAll(path)
return nil, err
@ -215,7 +217,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts)
}
if _, err = s.Create(ctx, sopts); err != nil {
os.RemoveAll(path)
return nil, err
return nil, errors.New(grpc.ErrorDesc(err))
}
c := newTask(id, namespace, opts.Spec, s)
if err := r.tasks.add(ctx, c); err != nil {
@ -244,7 +246,7 @@ func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, erro
}
rsp, err := lc.shim.Delete(ctx, &shim.DeleteRequest{})
if err != nil {
return nil, err
return nil, errors.New(grpc.ErrorDesc(err))
}
lc.shim.Exit(ctx, &shim.ExitRequest{})
r.tasks.delete(ctx, lc)

View File

@ -3,9 +3,8 @@
package linux
import (
"context"
"fmt"
"io/ioutil"
"log"
"net"
"os/exec"
"path/filepath"
@ -13,16 +12,16 @@ import (
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"github.com/containerd/containerd/api/services/shim"
localShim "github.com/containerd/containerd/linux/shim"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/reaper"
"github.com/containerd/containerd/sys"
"github.com/pkg/errors"
)
func newShim(shimName string, path, namespace string, remote bool) (shim.ShimClient, error) {
func newShim(ctx context.Context, shimName string, path, namespace string, remote bool) (shim.ShimClient, error) {
if !remote {
return localShim.Client(path, namespace)
}
@ -51,8 +50,16 @@ func newShim(shimName string, path, namespace string, remote bool) (shim.ShimCli
if err := reaper.Default.Start(cmd); err != nil {
return nil, errors.Wrapf(err, "failed to start shim")
}
if err := sys.SetOOMScore(cmd.Process.Pid, sys.OOMScoreMaxKillable); err != nil {
return nil, err
defer func() {
if err != nil {
cmd.Process.Kill()
reaper.Default.Wait(cmd)
} else {
log.G(ctx).WithField("socket", socket).Infof("new shim started")
}
}()
if err = sys.SetOOMScore(cmd.Process.Pid, sys.OOMScoreMaxKillable); err != nil {
return nil, errors.Wrap(err, "failed to set OOM Score on shim")
}
return connectShim(socket)
}
@ -67,7 +74,6 @@ func loadShim(path, namespace string, remote bool) (shim.ShimClient, error) {
func connectShim(socket string) (shim.ShimClient, error) {
// reset the logger for grpc to log to dev/null so that it does not mess with our stdio
grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags))
dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithTimeout(100 * time.Second)}
dialOpts = append(dialOpts,
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {

View File

@ -20,6 +20,7 @@ import (
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
type execProcess struct {
@ -59,13 +60,13 @@ func newExecProcess(context context.Context, path string, r *shimapi.ExecRequest
)
if r.Terminal {
if socket, err = runc.NewConsoleSocket(filepath.Join(path, "pty.sock")); err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to create runc console socket")
}
defer os.Remove(socket.Path())
} else {
// TODO: get uid/gid
if io, err = runc.NewPipeIO(0, 0); err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to create runc io pipes")
}
e.io = io
}
@ -85,12 +86,12 @@ func newExecProcess(context context.Context, path string, r *shimapi.ExecRequest
spec.Terminal = r.Terminal
if err := parent.runc.Exec(context, parent.id, spec, opts); err != nil {
return nil, err
return nil, parent.runcError(err, "runc exec failed")
}
if r.Stdin != "" {
sc, err := fifo.OpenFifo(context, r.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed to open stdin fifo %s", r.Stdin)
}
e.closers = append(e.closers, sc)
e.stdin = sc
@ -99,21 +100,21 @@ func newExecProcess(context context.Context, path string, r *shimapi.ExecRequest
if socket != nil {
console, err := socket.ReceiveMaster()
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to retrieve console master")
}
e.console = console
if err := copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup, &copyWaitGroup); err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to start console copy")
}
} else {
if err := copyPipes(context, io, r.Stdin, r.Stdout, r.Stderr, &e.WaitGroup, &copyWaitGroup); err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to start io pipe copy")
}
}
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(opts.PidFile)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to retrieve runc exec pid")
}
e.pid = pid
return e, nil

View File

@ -4,6 +4,7 @@ package shim
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
@ -21,6 +22,7 @@ import (
"github.com/containerd/containerd/mount"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
"github.com/pkg/errors"
)
type initProcess struct {
@ -57,7 +59,7 @@ func newInitProcess(context context.Context, path, namespace string, r *shimapi.
Options: rm.Options,
}
if err := m.Mount(filepath.Join(path, "rootfs")); err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
}
}
runtime := &runc.Runc{
@ -83,13 +85,13 @@ func newInitProcess(context context.Context, path, namespace string, r *shimapi.
)
if r.Terminal {
if socket, err = runc.NewConsoleSocket(filepath.Join(path, "pty.sock")); err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to create runc console socket")
}
defer os.Remove(socket.Path())
} else {
// TODO: get uid/gid
if io, err = runc.NewPipeIO(0, 0); err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to create runc io pipes")
}
p.io = io
}
@ -108,7 +110,7 @@ func newInitProcess(context context.Context, path, namespace string, r *shimapi.
NoSubreaper: true,
}
if _, err := p.runc.Restore(context, r.ID, r.Bundle, opts); err != nil {
return nil, err
return nil, p.runcError(err, "runc restore failed")
}
} else {
opts := &runc.CreateOpts{
@ -120,13 +122,13 @@ func newInitProcess(context context.Context, path, namespace string, r *shimapi.
opts.ConsoleSocket = socket
}
if err := p.runc.Create(context, r.ID, r.Bundle, opts); err != nil {
return nil, err
return nil, p.runcError(err, "runc create failed")
}
}
if r.Stdin != "" {
sc, err := fifo.OpenFifo(context, r.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed to open stdin fifo %s", r.Stdin)
}
p.stdin = sc
p.closers = append(p.closers, sc)
@ -135,22 +137,22 @@ func newInitProcess(context context.Context, path, namespace string, r *shimapi.
if socket != nil {
console, err := socket.ReceiveMaster()
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to retrieve console master")
}
p.console = console
if err := copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup, &copyWaitGroup); err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to start console copy")
}
} else {
if err := copyPipes(context, io, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup, &copyWaitGroup); err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to start io pipe copy")
}
}
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(pidFile)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to retrieve runc container pid")
}
p.pid = pid
return p, nil
@ -172,7 +174,7 @@ func (p *initProcess) ExitedAt() time.Time {
func (p *initProcess) ContainerStatus(ctx context.Context) (string, error) {
c, err := p.runc.State(ctx, p.id)
if err != nil {
return "", err
return "", p.runcError(err, "runc state failed")
}
return c.Status, nil
}
@ -180,7 +182,8 @@ func (p *initProcess) ContainerStatus(ctx context.Context) (string, error) {
func (p *initProcess) Start(context context.Context) error {
p.mu.Lock()
defer p.mu.Unlock()
return p.runc.Start(context, p.id)
err := p.runc.Start(context, p.id)
return p.runcError(err, "runc start failed")
}
func (p *initProcess) Exited(status int) {
@ -207,7 +210,7 @@ func (p *initProcess) Delete(context context.Context) error {
}
p.io.Close()
}
return err
return p.runcError(err, "runc delete failed")
}
func (p *initProcess) Resize(ws console.WinSize) error {
@ -218,23 +221,27 @@ func (p *initProcess) Resize(ws console.WinSize) error {
}
func (p *initProcess) Pause(context context.Context) error {
return p.runc.Pause(context, p.id)
err := p.runc.Pause(context, p.id)
return p.runcError(err, "runc pause failed")
}
func (p *initProcess) Resume(context context.Context) error {
return p.runc.Resume(context, p.id)
err := p.runc.Resume(context, p.id)
return p.runcError(err, "runc resume failed")
}
func (p *initProcess) Kill(context context.Context, signal uint32, all bool) error {
return p.runc.Kill(context, p.id, int(signal), &runc.KillOpts{
err := p.runc.Kill(context, p.id, int(signal), &runc.KillOpts{
All: all,
})
return p.runcError(err, "runc kill failed")
}
func (p *initProcess) killAll(context context.Context) error {
return p.runc.Kill(context, p.id, int(syscall.SIGKILL), &runc.KillOpts{
err := p.runc.Kill(context, p.id, int(syscall.SIGKILL), &runc.KillOpts{
All: true,
})
return p.runcError(err, "runc killall failed")
}
func (p *initProcess) Signal(sig int) error {
@ -270,6 +277,55 @@ func (p *initProcess) Checkpoint(context context.Context, r *shimapi.CheckpointR
return nil
}
// TODO(mlaventure): move to runc package?
func getLastRuncError(r *runc.Runc) (string, error) {
if r.Log == "" {
return "", nil
}
f, err := os.OpenFile(r.Log, os.O_RDONLY, 0400)
if err != nil {
return "", err
}
var (
errMsg string
log struct {
Level string
Msg string
Time time.Time
}
)
dec := json.NewDecoder(f)
for err = nil; err == nil; {
if err = dec.Decode(&log); err != nil && err != io.EOF {
return "", err
}
if log.Level == "error" {
errMsg = strings.TrimSpace(log.Msg)
}
}
return errMsg, nil
}
func (p *initProcess) runcError(rErr error, msg string) error {
if rErr == nil {
return nil
}
rMsg, err := getLastRuncError(p.runc)
switch {
case err != nil:
return errors.Wrapf(err, "%s: %s (%s)", msg, "unable to retrieve runc error", err.Error())
case rMsg == "":
return errors.Wrap(err, msg)
default:
return errors.Errorf("%s: %s", msg, rMsg)
}
}
// criuError returns only the first line of the error message from criu
// it tries to add an invalid dump log location when returning the message
func criuError(err error) string {

View File

@ -5,11 +5,14 @@ package linux
import (
"context"
"google.golang.org/grpc"
"github.com/containerd/containerd/api/services/shim"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/plugin"
protobuf "github.com/gogo/protobuf/types"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
type Task struct {
@ -40,13 +43,16 @@ func (c *Task) Info() plugin.TaskInfo {
func (c *Task) Start(ctx context.Context) error {
_, err := c.shim.Start(ctx, &shim.StartRequest{})
if err != nil {
err = errors.New(grpc.ErrorDesc(err))
}
return err
}
func (c *Task) State(ctx context.Context) (plugin.State, error) {
response, err := c.shim.State(ctx, &shim.StateRequest{})
if err != nil {
return plugin.State{}, err
return plugin.State{}, errors.New(grpc.ErrorDesc(err))
}
var status plugin.Status
switch response.Status {
@ -72,11 +78,17 @@ func (c *Task) State(ctx context.Context) (plugin.State, error) {
func (c *Task) Pause(ctx context.Context) error {
_, err := c.shim.Pause(ctx, &shim.PauseRequest{})
if err != nil {
err = errors.New(grpc.ErrorDesc(err))
}
return err
}
func (c *Task) Resume(ctx context.Context) error {
_, err := c.shim.Resume(ctx, &shim.ResumeRequest{})
if err != nil {
err = errors.New(grpc.ErrorDesc(err))
}
return err
}
@ -86,6 +98,9 @@ func (c *Task) Kill(ctx context.Context, signal uint32, pid uint32, all bool) er
Pid: pid,
All: all,
})
if err != nil {
err = errors.New(grpc.ErrorDesc(err))
}
return err
}
@ -102,7 +117,7 @@ func (c *Task) Exec(ctx context.Context, opts plugin.ExecOpts) (plugin.Process,
}
resp, err := c.shim.Exec(ctx, request)
if err != nil {
return nil, err
return nil, errors.New(grpc.ErrorDesc(err))
}
return &Process{
@ -115,9 +130,8 @@ func (c *Task) Processes(ctx context.Context) ([]uint32, error) {
resp, err := c.shim.Processes(ctx, &shim.ProcessesRequest{
ID: c.containerID,
})
if err != nil {
return nil, err
return nil, errors.New(grpc.ErrorDesc(err))
}
pids := make([]uint32, 0, len(resp.Processes))
@ -135,6 +149,9 @@ func (c *Task) Pty(ctx context.Context, pid uint32, size plugin.ConsoleSize) err
Width: size.Width,
Height: size.Height,
})
if err != nil {
err = errors.New(grpc.ErrorDesc(err))
}
return err
}
@ -142,6 +159,9 @@ func (c *Task) CloseStdin(ctx context.Context, pid uint32) error {
_, err := c.shim.CloseStdin(ctx, &shim.CloseStdinRequest{
Pid: pid,
})
if err != nil {
err = errors.New(grpc.ErrorDesc(err))
}
return err
}
@ -155,6 +175,9 @@ func (c *Task) Checkpoint(ctx context.Context, opts plugin.CheckpointOpts) error
EmptyNamespaces: opts.EmptyNamespaces,
CheckpointPath: opts.Path,
})
if err != nil {
err = errors.New(grpc.ErrorDesc(err))
}
return err
}
@ -163,7 +186,7 @@ func (c *Task) DeleteProcess(ctx context.Context, pid uint32) (*plugin.Exit, err
Pid: pid,
})
if err != nil {
return nil, err
return nil, errors.New(grpc.ErrorDesc(err))
}
return &plugin.Exit{
Status: r.ExitStatus,
@ -181,6 +204,9 @@ func (p *Process) Kill(ctx context.Context, signal uint32, _ bool) error {
Signal: signal,
Pid: uint32(p.pid),
})
if err != nil {
err = errors.New(grpc.ErrorDesc(err))
}
return err
}

View File

@ -1,14 +1,12 @@
package log
import (
"context"
"io/ioutil"
"log"
"google.golang.org/grpc/grpclog"
)
func init() {
ctx := WithModule(context.Background(), "grpc")
// completely replace the grpc logger with the logrus logger.
grpclog.SetLogger(G(ctx))
grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags))
}

View File

@ -23,6 +23,7 @@ import (
protobuf "github.com/gogo/protobuf/types"
google_protobuf "github.com/golang/protobuf/ptypes/empty"
specs "github.com/opencontainers/image-spec/specs-go"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@ -129,7 +130,7 @@ func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.Create
}
c, err := runtime.Create(ctx, r.ContainerID, opts)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "runtime create failed")
}
state, err := c.State(ctx)
if err != nil {