diff --git a/Makefile b/Makefile index bf7ee965d..aef96906b 100644 --- a/Makefile +++ b/Makefile @@ -91,7 +91,7 @@ checkprotos: protos ## check if protobufs needs to be generated again # Depends on binaries because vet will silently fail if it can't load compiled # imports -vet: binaries ## run go vet +vet: ## run go vet @echo "$(WHALE) $@" @test -z "$$(go vet ${PACKAGES} 2>&1 | grep -v 'constant [0-9]* not a string in call to Errorf' | grep -v 'unrecognized printf verb 'r'' | egrep -v '(timestamp_test.go|duration_test.go|fetch.go|exit status 1)' | tee /dev/stderr)" diff --git a/client_test.go b/client_test.go index 84a33d932..2b6b89fd5 100644 --- a/client_test.go +++ b/client_test.go @@ -66,6 +66,7 @@ func TestMain(m *testing.M) { err := ctrd.start("containerd", address, []string{ "--root", defaultRoot, + "--state", defaultState, "--log-level", "debug", }, buf, buf) if err != nil { diff --git a/client_unix_test.go b/client_unix_test.go index 2eb0a657b..27fe7fab1 100644 --- a/client_unix_test.go +++ b/client_unix_test.go @@ -8,6 +8,7 @@ import ( const ( defaultRoot = "/var/lib/containerd-test" + defaultState = "/run/containerd-test" defaultAddress = "/run/containerd-test/containerd.sock" ) diff --git a/client_windows_test.go b/client_windows_test.go index e1073ca60..115d71497 100644 --- a/client_windows_test.go +++ b/client_windows_test.go @@ -17,7 +17,8 @@ const ( var ( dockerLayerFolders []string - defaultRoot = filepath.Join(os.Getenv("programfiles"), "containerd", "root-test") + defaultRoot = filepath.Join(os.Getenv("programfiles"), "containerd", "root-test") + defaultState = filepath.Join(os.Getenv("programfiles"), "containerd", "state-test") ) func platformTestSetup(client *Client) error { diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go index eac15d617..a392cc2cf 100644 --- a/cmd/containerd-shim/main_unix.go +++ b/cmd/containerd-shim/main_unix.go @@ -9,6 +9,8 @@ import ( "os" "runtime" "strings" + "sync" + "syscall" "time" "github.com/containerd/containerd" @@ -20,6 +22,7 @@ import ( "github.com/containerd/containerd/reaper" "github.com/containerd/containerd/typeurl" "github.com/containerd/containerd/version" + google_protobuf "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/urfave/cli" @@ -101,7 +104,12 @@ func main() { if err := serve(server, socket); err != nil { return err } - return handleSignals(signals, server) + logger := logrus.WithFields(logrus.Fields{ + "pid": os.Getpid(), + "path": path, + "namespace": context.GlobalString("namespace"), + }) + return handleSignals(logger, signals, server, sv) } if err := app.Run(os.Args); err != nil { fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err) @@ -136,27 +144,41 @@ func serve(server *grpc.Server, path string) error { return nil } -func handleSignals(signals chan os.Signal, server *grpc.Server) error { - for s := range signals { - logrus.WithField("signal", s).Debug("received signal") - switch s { - case unix.SIGCHLD: - if err := reaper.Reap(); err != nil { - logrus.WithError(err).Error("reap exit status") - } - case unix.SIGTERM, unix.SIGINT: - // TODO: should we forward signals to the processes if they are still running? - // i.e. machine reboot - server.Stop() +func handleSignals(logger *logrus.Entry, signals chan os.Signal, server *grpc.Server, sv *shim.Service) error { + var ( + termOnce sync.Once + done = make(chan struct{}) + ) + + for { + select { + case <-done: return nil - case unix.SIGUSR1: - dumpStacks() + case s := <-signals: + switch s { + case unix.SIGCHLD: + if err := reaper.Reap(); err != nil { + logger.WithError(err).Error("reap exit status") + } + case unix.SIGTERM, unix.SIGINT: + go termOnce.Do(func() { + server.Stop() + // Ensure our child is dead if any + sv.Kill(context.Background(), &shimapi.KillRequest{ + Signal: uint32(syscall.SIGKILL), + All: true, + }) + sv.Delete(context.Background(), &google_protobuf.Empty{}) + close(done) + }) + case unix.SIGUSR1: + dumpStacks(logger) + } } } - return nil } -func dumpStacks() { +func dumpStacks(logger *logrus.Entry) { var ( buf []byte stackSize int @@ -168,7 +190,7 @@ func dumpStacks() { bufferLen *= 2 } buf = buf[:stackSize] - logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) + logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) } func connectEvents(address string) (eventsapi.EventsClient, error) { diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index dc724732a..f919d19be 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -66,6 +66,10 @@ func main() { Name: "root", Usage: "containerd root directory", }, + cli.StringFlag{ + Name: "state", + Usage: "containerd state directory", + }, } app.Commands = []cli.Command{ configCommand, @@ -156,6 +160,10 @@ func applyFlags(context *cli.Context, config *server.Config) error { name: "root", d: &config.Root, }, + { + name: "state", + d: &config.State, + }, { name: "address", d: &config.GRPC.Address, diff --git a/container_linux_test.go b/container_linux_test.go index 06a01e72e..d6d7550fb 100644 --- a/container_linux_test.go +++ b/container_linux_test.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "io" + "os/exec" "runtime" "strings" "sync" @@ -222,13 +223,14 @@ func TestDaemonRestart(t *testing.T) { return } - if err := ctrd.Restart(); err != nil { + var exitStatus ExitStatus + if err := ctrd.Restart(func() { + exitStatus = <-statusC + }); err != nil { t.Fatal(err) } - status := <-statusC - _, _, err = status.Result() - if err == nil { + if exitStatus.Error() == nil { t.Errorf(`first task.Wait() should have failed with "transport is closing"`) } @@ -712,3 +714,140 @@ func TestContainerKillAll(t *testing.T) { return } } + +func TestShimSigkilled(t *testing.T) { + client, err := newClient(t, address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + var ( + image Image + ctx, cancel = testContext() + id = t.Name() + ) + defer cancel() + + // redis unset its PDeathSignal making it a good candidate + image, err = client.Pull(ctx, "docker.io/library/redis:alpine", WithPullUnpack) + if err != nil { + t.Fatal(err) + } + container, err := client.NewContainer(ctx, id, WithNewSpec(WithImageConfig(image)), withNewSnapshot(id, image)) + if err != nil { + t.Fatal(err) + } + defer container.Delete(ctx, WithSnapshotCleanup) + + task, err := container.NewTask(ctx, empty()) + if err != nil { + t.Fatal(err) + } + defer task.Delete(ctx) + + statusC, err := task.Wait(ctx) + if err != nil { + t.Error(err) + } + + pid := task.Pid() + if pid <= 0 { + t.Fatalf("invalid task pid %d", pid) + } + + if err := task.Start(ctx); err != nil { + t.Fatal(err) + } + + // SIGKILL the shim + if err := exec.Command("pkill", "-KILL", "containerd-s").Run(); err != nil { + t.Fatalf("failed to kill shim: %v", err) + } + + <-statusC + + if err := unix.Kill(int(pid), 0); err != unix.ESRCH { + t.Errorf("pid %d still exists", pid) + } +} + +func TestDaemonRestartWithRunningShim(t *testing.T) { + client, err := newClient(t, address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + var ( + image Image + ctx, cancel = testContext() + id = t.Name() + ) + defer cancel() + + image, err = client.GetImage(ctx, testImage) + if err != nil { + t.Error(err) + return + } + container, err := client.NewContainer(ctx, id, WithNewSpec(WithImageConfig(image), WithProcessArgs("sleep", "100")), withNewSnapshot(id, image)) + if err != nil { + t.Fatal(err) + } + defer container.Delete(ctx, WithSnapshotCleanup) + + task, err := container.NewTask(ctx, empty()) + if err != nil { + t.Fatal(err) + } + defer task.Delete(ctx) + + statusC, err := task.Wait(ctx) + if err != nil { + t.Error(err) + } + + pid := task.Pid() + if pid <= 0 { + t.Fatalf("invalid task pid %d", pid) + } + + if err := task.Start(ctx); err != nil { + t.Fatal(err) + } + + var exitStatus ExitStatus + if err := ctrd.Restart(func() { + exitStatus = <-statusC + }); err != nil { + t.Fatal(err) + } + + if exitStatus.Error() == nil { + t.Errorf(`first task.Wait() should have failed with "transport is closing"`) + } + + waitCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + c, err := ctrd.waitForStart(waitCtx) + cancel() + if err != nil { + t.Fatal(err) + } + c.Close() + + statusC, err = task.Wait(ctx) + if err != nil { + t.Error(err) + } + + if err := task.Kill(ctx, syscall.SIGKILL); err != nil { + t.Fatal(err) + } + + <-statusC + + if err := unix.Kill(int(pid), 0); err != unix.ESRCH { + t.Errorf("pid %d still exists", pid) + } +} diff --git a/daemon_test.go b/daemon_test.go index 3bc9f6955..fe3a33932 100644 --- a/daemon_test.go +++ b/daemon_test.go @@ -6,7 +6,6 @@ import ( "os/exec" "sync" "syscall" - "time" "github.com/pkg/errors" ) @@ -82,10 +81,12 @@ func (d *daemon) Wait() error { if d.cmd == nil { return errors.New("daemon is not running") } - return d.cmd.Wait() + err := d.cmd.Wait() + d.cmd = nil + return err } -func (d *daemon) Restart() error { +func (d *daemon) Restart(stopCb func()) error { d.Lock() defer d.Unlock() if d.cmd == nil { @@ -99,7 +100,9 @@ func (d *daemon) Restart() error { d.cmd.Wait() - <-time.After(1 * time.Second) + if stopCb != nil { + stopCb() + } cmd := exec.Command(d.cmd.Path, d.cmd.Args[1:]...) cmd.Stdout = d.cmd.Stdout diff --git a/events/exchange.go b/events/exchange.go index eeb4b1fcb..93f6dfa91 100644 --- a/events/exchange.go +++ b/events/exchange.go @@ -185,11 +185,11 @@ func validateTopic(topic string) error { } if topic[0] != '/' { - return errors.Wrapf(errdefs.ErrInvalidArgument, "must start with '/'", topic) + return errors.Wrapf(errdefs.ErrInvalidArgument, "must start with '/'") } if len(topic) == 1 { - return errors.Wrapf(errdefs.ErrInvalidArgument, "must have at least one component", topic) + return errors.Wrapf(errdefs.ErrInvalidArgument, "must have at least one component") } components := strings.Split(topic[1:], "/") diff --git a/linux/bundle.go b/linux/bundle.go index b03dc8733..f05da157c 100644 --- a/linux/bundle.go +++ b/linux/bundle.go @@ -28,7 +28,7 @@ func loadBundle(path, workdir, namespace, id string, events *events.Exchange) *b } // newBundle creates a new bundle on disk at the provided path for the given id -func newBundle(path, namespace, workDir, id string, spec []byte, events *events.Exchange) (b *bundle, err error) { +func newBundle(namespace, id, path, workDir string, spec []byte, events *events.Exchange) (b *bundle, err error) { if err := os.MkdirAll(path, 0711); err != nil { return nil, err } @@ -78,8 +78,8 @@ type bundle struct { } // NewShim connects to the shim managing the bundle and tasks -func (b *bundle) NewShim(ctx context.Context, binary, grpcAddress string, remote, debug bool, createOpts runtime.CreateOpts) (*client.Client, error) { - opt := client.WithStart(binary, grpcAddress, debug) +func (b *bundle) NewShim(ctx context.Context, binary, grpcAddress string, remote, debug bool, createOpts runtime.CreateOpts, exitHandler func()) (*client.Client, error) { + opt := client.WithStart(binary, grpcAddress, debug, exitHandler) if !remote { opt = client.WithLocal(b.events) } diff --git a/linux/runtime.go b/linux/runtime.go index e8a52c960..a2e09d527 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -8,8 +8,10 @@ import ( "io/ioutil" "os" "path/filepath" + "time" "github.com/boltdb/bolt" + eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" @@ -20,10 +22,13 @@ import ( "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/reaper" "github.com/containerd/containerd/runtime" + "github.com/containerd/containerd/sys" runc "github.com/containerd/go-runc" google_protobuf "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" + "github.com/sirupsen/logrus" "golang.org/x/sys/unix" ) @@ -101,6 +106,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { if err != nil { return nil, err } + // TODO: need to add the tasks to the monitor for _, t := range tasks { if err := r.tasks.AddWithNamespace(t.namespace, t); err != nil { return nil, err @@ -138,7 +144,11 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts return nil, errors.Wrapf(err, "invalid task id") } - bundle, err := newBundle(filepath.Join(r.state, namespace), namespace, filepath.Join(r.root, namespace), id, opts.Spec.Value, r.events) + bundle, err := newBundle( + namespace, id, + filepath.Join(r.state, namespace), + filepath.Join(r.root, namespace), + opts.Spec.Value, r.events) if err != nil { return nil, err } @@ -147,7 +157,36 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts bundle.Delete() } }() - s, err := bundle.NewShim(ctx, r.shim, r.address, r.remote, r.shimDebug, opts) + s, err := bundle.NewShim(ctx, r.shim, r.address, r.remote, r.shimDebug, opts, func() { + t, err := r.tasks.Get(ctx, id) + if err != nil { + // Task was never started or was already sucessfully deleted + return + } + lc := t.(*Task) + + // Stop the monitor + if err := r.monitor.Stop(lc); err != nil { + log.G(ctx).WithError(err).WithFields(logrus.Fields{ + "id": id, + "namespace": namespace, + }).Warn("failed to stop monitor") + } + + log.G(ctx).WithFields(logrus.Fields{ + "id": id, + "namespace": namespace, + }).Warn("cleaning up after killed shim") + err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid, true) + if err == nil { + r.tasks.Delete(ctx, lc) + } else { + log.G(ctx).WithError(err).WithFields(logrus.Fields{ + "id": id, + "namespace": namespace, + }).Warn("failed to clen up after killed shim") + } + }) if err != nil { return nil, err } @@ -176,15 +215,17 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts Options: m.Options, }) } - if _, err = s.Create(ctx, sopts); err != nil { + cr, err := s.Create(ctx, sopts) + if err != nil { return nil, errdefs.FromGRPC(err) } - t := newTask(id, namespace, s) + t := newTask(id, namespace, int(cr.Pid), s) if err := r.tasks.Add(ctx, t); err != nil { return nil, err } // after the task is created, add it to the monitor if err = r.monitor.Monitor(t); err != nil { + r.tasks.Delete(ctx, t) return nil, err } return t, nil @@ -206,10 +247,10 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er if err != nil { return nil, errdefs.FromGRPC(err) } + r.tasks.Delete(ctx, lc) if err := lc.shim.KillShim(ctx); err != nil { log.G(ctx).WithError(err).Error("failed to kill shim") } - r.tasks.Delete(ctx, lc) bundle := loadBundle( filepath.Join(r.state, namespace, lc.id), @@ -268,18 +309,21 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { continue } id := path.Name() - bundle := loadBundle(filepath.Join(r.state, ns, id), - filepath.Join(r.root, ns, id), ns, id, r.events) - + bundle := loadBundle( + filepath.Join(r.state, ns, id), + filepath.Join(r.root, ns, id), + ns, id, r.events) s, err := bundle.Connect(ctx, r.remote) if err != nil { - log.G(ctx).WithError(err).Error("connecting to shim") - if err := r.terminate(ctx, bundle, ns, id); err != nil { - log.G(ctx).WithError(err).WithField("bundle", bundle.path).Error("failed to terminate task, leaving bundle for debugging") - continue - } - if err := bundle.Delete(); err != nil { - log.G(ctx).WithError(err).Error("delete bundle") + log.G(ctx).WithError(err).WithFields(logrus.Fields{ + "id": id, + "namespace": ns, + }).Error("connecting to shim") + pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, client.InitPidFile)) + err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid, false) + if err != nil { + log.G(ctx).WithError(err).WithField("bundle", bundle.path). + Error("cleaning up after dead shim") } continue } @@ -292,6 +336,45 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { return o, nil } +func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int, reap bool) error { + ctx = namespaces.WithNamespace(ctx, ns) + if err := r.terminate(ctx, bundle, ns, id); err != nil { + return errors.New("failed to terminate task, leaving bundle for debugging") + } + + if reap { + // if sub-reaper is set, reap our new child + if v, err := sys.GetSubreaper(); err == nil && v == 1 { + reaper.Default.Register(pid, &reaper.Cmd{ExitCh: make(chan struct{})}) + reaper.Default.WaitPid(pid) + reaper.Default.Delete(pid) + } + } + + // Notify Client + exitedAt := time.Now().UTC() + r.events.Publish(ctx, runtime.TaskExitEventTopic, &eventsapi.TaskExit{ + ContainerID: id, + ID: id, + Pid: uint32(pid), + ExitStatus: 128 + uint32(unix.SIGKILL), + ExitedAt: exitedAt, + }) + + if err := bundle.Delete(); err != nil { + log.G(ctx).WithError(err).Error("delete bundle") + } + + r.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventsapi.TaskDelete{ + ContainerID: id, + Pid: uint32(pid), + ExitStatus: 128 + uint32(unix.SIGKILL), + ExitedAt: exitedAt, + }) + + return nil +} + func (r *Runtime) terminate(ctx context.Context, bundle *bundle, ns, id string) error { ctx = namespaces.WithNamespace(ctx, ns) rt, err := r.getRuntime(ctx, ns, id) @@ -304,7 +387,10 @@ func (r *Runtime) terminate(ctx context.Context, bundle *bundle, ns, id string) log.G(ctx).WithError(err).Warnf("delete runtime state %s", id) } if err := unix.Unmount(filepath.Join(bundle.path, "rootfs"), 0); err != nil { - log.G(ctx).WithError(err).Warnf("unmount task rootfs %s", id) + log.G(ctx).WithError(err).WithFields(logrus.Fields{ + "path": bundle.path, + "id": id, + }).Warnf("unmount task rootfs") } return nil } @@ -319,9 +405,7 @@ func (r *Runtime) getRuntime(ctx context.Context, ns, id string) (*runc.Runc, er return nil, err } return &runc.Runc{ - // TODO: until we have a way to store/retrieve the original command - // we can only rely on runc from the default $PATH - Command: runc.DefaultCommand, + Command: r.runtime, LogFormat: runc.JSON, PdeathSignal: unix.SIGKILL, Root: filepath.Join(client.RuncRoot, ns), diff --git a/linux/shim/client.go b/linux/shim/client.go index 2b5016fc5..17b1c4e05 100644 --- a/linux/shim/client.go +++ b/linux/shim/client.go @@ -10,6 +10,8 @@ import ( "os" "os/exec" "strings" + "sync" + "syscall" "time" "golang.org/x/sys/unix" @@ -28,7 +30,7 @@ import ( type ClientOpt func(context.Context, Config) (shim.ShimClient, io.Closer, error) // WithStart executes a new shim process -func WithStart(binary, address string, debug bool) ClientOpt { +func WithStart(binary, address string, debug bool, exitHandler func()) ClientOpt { return func(ctx context.Context, config Config) (_ shim.ShimClient, _ io.Closer, err error) { socket, err := newSocket(config) if err != nil { @@ -47,9 +49,14 @@ func WithStart(binary, address string, debug bool) ClientOpt { } defer func() { if err != nil { - terminate(cmd) + cmd.Process.Kill() } }() + go func() { + reaper.Default.Wait(cmd) + reaper.Default.Delete(cmd.Process.Pid) + exitHandler() + }() log.G(ctx).WithFields(logrus.Fields{ "pid": cmd.Process.Pid, "address": config.Address, @@ -72,11 +79,6 @@ func WithStart(binary, address string, debug bool) ClientOpt { } } -func terminate(cmd *exec.Cmd) { - cmd.Process.Kill() - reaper.Default.Wait(cmd) -} - func newCommand(binary, address string, debug bool, config Config, socket *os.File) *exec.Cmd { args := []string{ "--namespace", config.Namespace, @@ -178,15 +180,20 @@ func New(ctx context.Context, config Config, opt ClientOpt) (*Client, error) { return &Client{ ShimClient: s, c: c, + exitCh: make(chan struct{}), }, nil } type Client struct { shim.ShimClient - c io.Closer + c io.Closer + exitCh chan struct{} + exitOnce sync.Once } +// IsAlive returns true if the shim can be contacted. +// NOTE: a negative answer doesn't mean that the process is gone. func (c *Client) IsAlive(ctx context.Context) (bool, error) { _, err := c.ShimInfo(ctx, empty) if err != nil { @@ -198,8 +205,24 @@ func (c *Client) IsAlive(ctx context.Context) (bool, error) { return true, nil } -// KillShim kills the shim forcefully +// StopShim signals the shim to exit and wait for the process to disappear +func (c *Client) StopShim(ctx context.Context) error { + return c.signalShim(ctx, unix.SIGTERM) +} + +// KillShim kills the shim forcefully and wait for the process to disappear func (c *Client) KillShim(ctx context.Context) error { + return c.signalShim(ctx, unix.SIGKILL) +} + +func (c *Client) Close() error { + if c.c == nil { + return nil + } + return c.c.Close() +} + +func (c *Client) signalShim(ctx context.Context, sig syscall.Signal) error { info, err := c.ShimInfo(ctx, empty) if err != nil { return err @@ -209,23 +232,29 @@ func (c *Client) KillShim(ctx context.Context) error { if os.Getpid() == pid { return nil } - if err := unix.Kill(pid, unix.SIGKILL); err != nil { + if err := unix.Kill(pid, sig); err != nil && err != unix.ESRCH { return err } - // wait for shim to die after being SIGKILL'd - for { - // use kill(pid, 0) here because the shim could have been reparented - // and we are no longer able to waitpid(pid, ...) on the shim - if err := unix.Kill(pid, 0); err != nil && err == unix.ESRCH { - return nil - } - time.Sleep(10 * time.Millisecond) + // wait for shim to die after being signaled + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.waitForExit(pid): + return nil } } -func (c *Client) Close() error { - if c.c == nil { - return nil - } - return c.c.Close() +func (c *Client) waitForExit(pid int) <-chan struct{} { + c.exitOnce.Do(func() { + for { + // use kill(pid, 0) here because the shim could have been reparented + // and we are no longer able to waitpid(pid, ...) on the shim + if err := unix.Kill(pid, 0); err == unix.ESRCH { + close(c.exitCh) + return + } + time.Sleep(10 * time.Millisecond) + } + }) + return c.exitCh } diff --git a/linux/shim/init.go b/linux/shim/init.go index 94f33718b..314375cd4 100644 --- a/linux/shim/init.go +++ b/linux/shim/init.go @@ -27,6 +27,8 @@ import ( "github.com/pkg/errors" ) +const InitPidFile = "init.pid" + type initProcess struct { sync.WaitGroup initState @@ -131,7 +133,7 @@ func newInitProcess(context context.Context, plat platform, path, namespace, wor return nil, errors.Wrap(err, "failed to create OCI runtime io pipes") } } - pidFile := filepath.Join(path, "init.pid") + pidFile := filepath.Join(path, InitPidFile) if r.Checkpoint != "" { opts := &runc.RestoreOpts{ CheckpointOpts: runc.CheckpointOpts{ diff --git a/linux/shim/service.go b/linux/shim/service.go index 41f1fdffa..a47e596e8 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -22,6 +22,7 @@ import ( "github.com/containerd/containerd/runtime" google_protobuf "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" + "github.com/sirupsen/logrus" "golang.org/x/net/context" ) @@ -35,6 +36,11 @@ func NewService(path, namespace, workDir string, publisher events.Publisher) (*S return nil, fmt.Errorf("shim namespace cannot be empty") } context := namespaces.WithNamespace(context.Background(), namespace) + context = log.WithLogger(context, logrus.WithFields(logrus.Fields{ + "namespace": namespace, + "pid": os.Getpid(), + "path": path, + })) s := &Service{ path: path, processes: make(map[string]process), @@ -58,7 +64,6 @@ type platform interface { } type Service struct { - initProcess *initProcess path string id string bundle string @@ -83,12 +88,11 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh // save the main task id and bundle to the shim for additional requests s.id = r.ID s.bundle = r.Bundle - s.initProcess = process pid := process.Pid() s.processes[r.ID] = process s.mu.Unlock() cmd := &reaper.Cmd{ - ExitCh: make(chan int, 1), + ExitCh: make(chan struct{}), } reaper.Default.Register(pid, cmd) s.events <- &eventsapi.TaskCreate{ @@ -111,8 +115,8 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh } func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.StartResponse, error) { - p, ok := s.processes[r.ID] - if !ok { + p := s.getProcess(r.ID) + if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", r.ID) } if err := p.Start(ctx); err != nil { @@ -121,12 +125,12 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi. if r.ID == s.id { s.events <- &eventsapi.TaskStart{ ContainerID: s.id, - Pid: uint32(s.initProcess.Pid()), + Pid: uint32(p.Pid()), } } else { pid := p.Pid() cmd := &reaper.Cmd{ - ExitCh: make(chan int, 1), + ExitCh: make(chan struct{}), } reaper.Default.Register(pid, cmd) go s.waitExit(p, pid, cmd) @@ -143,16 +147,15 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi. } func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimapi.DeleteResponse, error) { - if s.initProcess == nil { + p := s.getProcess(s.id) + if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - p := s.initProcess + if err := p.Delete(ctx); err != nil { return nil, err } - s.mu.Lock() - delete(s.processes, p.ID()) - s.mu.Unlock() + s.deleteProcess(p.ID()) s.events <- &eventsapi.TaskDelete{ ContainerID: s.id, ExitStatus: uint32(p.ExitStatus()), @@ -167,24 +170,17 @@ func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimap } func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessRequest) (*shimapi.DeleteResponse, error) { - if s.initProcess == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") - } - if r.ID == s.initProcess.id { + if r.ID == s.id { return nil, grpc.Errorf(codes.InvalidArgument, "cannot delete init process with DeleteProcess") } - s.mu.Lock() - p, ok := s.processes[r.ID] - s.mu.Unlock() - if !ok { - return nil, errors.Wrapf(errdefs.ErrNotFound, "process %s not found", r.ID) + p := s.getProcess(r.ID) + if p == nil { + return nil, errors.Wrapf(errdefs.ErrNotFound, "process %s", r.ID) } if err := p.Delete(ctx); err != nil { return nil, err } - s.mu.Lock() - delete(s.processes, p.ID()) - s.mu.Unlock() + s.deleteProcess(r.ID) return &shimapi.DeleteResponse{ ExitStatus: uint32(p.ExitStatus()), ExitedAt: p.ExitedAt(), @@ -193,13 +189,19 @@ func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessReq } func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*google_protobuf.Empty, error) { - if s.initProcess == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") - } s.mu.Lock() defer s.mu.Unlock() - process, err := newExecProcess(ctx, s.path, r, s.initProcess, r.ID) + if p := s.processes[r.ID]; p != nil { + return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ID) + } + + p := s.processes[s.id] + if p == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") + } + + process, err := newExecProcess(ctx, s.path, r, p.(*initProcess), r.ID) if err != nil { return nil, errdefs.ToGRPC(err) } @@ -220,10 +222,8 @@ func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (* Width: uint16(r.Width), Height: uint16(r.Height), } - s.mu.Lock() - p, ok := s.processes[r.ID] - s.mu.Unlock() - if !ok { + p := s.getProcess(r.ID) + if p == nil { return nil, errors.Errorf("process does not exist %s", r.ID) } if err := p.Resize(ws); err != nil { @@ -233,8 +233,8 @@ func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (* } func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) { - p, ok := s.processes[r.ID] - if !ok { + p := s.getProcess(r.ID) + if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s not found", r.ID) } st, err := p.Status(ctx) @@ -270,10 +270,11 @@ func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi. } func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_protobuf.Empty, error) { - if s.initProcess == nil { + p := s.getProcess(s.id) + if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - if err := s.initProcess.Pause(ctx); err != nil { + if err := p.(*initProcess).Pause(ctx); err != nil { return nil, err } s.events <- &eventsapi.TaskPaused{ @@ -283,10 +284,11 @@ func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_ } func (s *Service) Resume(ctx context.Context, r *google_protobuf.Empty) (*google_protobuf.Empty, error) { - if s.initProcess == nil { + p := s.getProcess(s.id) + if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - if err := s.initProcess.Resume(ctx); err != nil { + if err := p.(*initProcess).Resume(ctx); err != nil { return nil, err } s.events <- &eventsapi.TaskResumed{ @@ -296,17 +298,19 @@ func (s *Service) Resume(ctx context.Context, r *google_protobuf.Empty) (*google } func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*google_protobuf.Empty, error) { - if s.initProcess == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") - } if r.ID == "" { - if err := s.initProcess.Kill(ctx, r.Signal, r.All); err != nil { + p := s.getProcess(s.id) + if p == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") + } + if err := p.Kill(ctx, r.Signal, r.All); err != nil { return nil, errdefs.ToGRPC(err) } return empty, nil } - p, ok := s.processes[r.ID] - if !ok { + + p := s.getProcess(r.ID) + if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s not found", r.ID) } if err := p.Kill(ctx, r.Signal, r.All); err != nil { @@ -326,8 +330,8 @@ func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*sh } func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*google_protobuf.Empty, error) { - p, ok := s.processes[r.ID] - if !ok { + p := s.getProcess(r.ID) + if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process does not exist %s", r.ID) } if stdin := p.Stdin(); stdin != nil { @@ -339,10 +343,11 @@ func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*goog } func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) (*google_protobuf.Empty, error) { - if s.initProcess == nil { + p := s.getProcess(s.id) + if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - if err := s.initProcess.Checkpoint(ctx, r); err != nil { + if err := p.(*initProcess).Checkpoint(ctx, r); err != nil { return nil, errdefs.ToGRPC(err) } s.events <- &eventsapi.TaskCheckpointed{ @@ -358,17 +363,37 @@ func (s *Service) ShimInfo(ctx context.Context, r *google_protobuf.Empty) (*shim } func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*google_protobuf.Empty, error) { - if s.initProcess == nil { + p := s.getProcess(s.id) + if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - if err := s.initProcess.Update(ctx, r); err != nil { + if err := p.(*initProcess).Update(ctx, r); err != nil { return nil, errdefs.ToGRPC(err) } return empty, nil } +func (s *Service) addProcess(id string, p process) { + s.mu.Lock() + s.processes[id] = p + s.mu.Unlock() +} + +func (s *Service) getProcess(id string) process { + s.mu.Lock() + p := s.processes[id] + s.mu.Unlock() + return p +} + +func (s *Service) deleteProcess(id string) { + s.mu.Lock() + delete(s.processes, id) + s.mu.Unlock() +} + func (s *Service) waitExit(p process, pid int, cmd *reaper.Cmd) { - status := <-cmd.ExitCh + status, _ := reaper.Default.WaitPid(pid) p.SetExited(status) reaper.Default.Delete(pid) @@ -382,12 +407,17 @@ func (s *Service) waitExit(p process, pid int, cmd *reaper.Cmd) { } func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { - p, err := s.initProcess.runtime.Ps(ctx, id) + p := s.getProcess(s.id) + if p == nil { + return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created") + } + + ps, err := p.(*initProcess).runtime.Ps(ctx, id) if err != nil { return nil, err } - pids := make([]uint32, 0, len(p)) - for _, pid := range p { + pids := make([]uint32, 0, len(ps)) + for _, pid := range ps { pids = append(pids, uint32(pid)) } return pids, nil @@ -395,13 +425,13 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er func (s *Service) forward(publisher events.Publisher) { for e := range s.events { - if err := publisher.Publish(s.context, getTopic(e), e); err != nil { + if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil { log.G(s.context).WithError(err).Error("post event") } } } -func getTopic(e interface{}) string { +func getTopic(ctx context.Context, e interface{}) string { switch e.(type) { case *eventsapi.TaskCreate: return runtime.TaskCreateEventTopic @@ -415,12 +445,16 @@ func getTopic(e interface{}) string { return runtime.TaskDeleteEventTopic case *eventsapi.TaskExecAdded: return runtime.TaskExecAddedEventTopic + case *eventsapi.TaskExecStarted: + return runtime.TaskExecStartedEventTopic case *eventsapi.TaskPaused: return runtime.TaskPausedEventTopic case *eventsapi.TaskResumed: return runtime.TaskResumedEventTopic case *eventsapi.TaskCheckpointed: return runtime.TaskCheckpointedEventTopic + default: + log.G(ctx).Warnf("no topic for type %#v", e) } - return "?" + return runtime.TaskUnknownTopic } diff --git a/linux/task.go b/linux/task.go index 2177262a1..3a5c48bbe 100644 --- a/linux/task.go +++ b/linux/task.go @@ -15,13 +15,15 @@ import ( type Task struct { id string + pid int shim *client.Client namespace string } -func newTask(id, namespace string, shim *client.Client) *Task { +func newTask(id, namespace string, pid int, shim *client.Client) *Task { return &Task{ id: id, + pid: pid, shim: shim, namespace: namespace, } diff --git a/reaper/reaper.go b/reaper/reaper.go index db99fb495..4c8a58c75 100644 --- a/reaper/reaper.go +++ b/reaper/reaper.go @@ -4,7 +4,6 @@ package reaper import ( "bytes" - "fmt" "os/exec" "sync" @@ -12,6 +11,10 @@ import ( "github.com/pkg/errors" ) +var ( + ErrNoSuchProcess = errors.New("no such process") +) + // Reap should be called when the process receives an SIGCHLD. Reap will reap // all exited processes and close their wait channels func Reap() error { @@ -30,7 +33,8 @@ func Reap() error { // pipes are closed and finalizers are run on the process c.c.Wait() } - c.ExitCh <- e.Status + c.exitStatus = e.Status + close(c.ExitCh) } return err } @@ -68,7 +72,7 @@ func (m *Monitor) CombinedOutput(c *exec.Cmd) ([]byte, error) { func (m *Monitor) Start(c *exec.Cmd) error { rc := &Cmd{ c: c, - ExitCh: make(chan int, 1), + ExitCh: make(chan struct{}), } // start the process m.Lock() @@ -105,7 +109,8 @@ func (m *Monitor) RegisterNL(pid int, c *Cmd) { if status, ok := m.unknown[pid]; ok { delete(m.unknown, pid) m.cmds[pid] = c - c.ExitCh <- status + c.exitStatus = status + close(c.ExitCh) return } m.cmds[pid] = c @@ -116,13 +121,13 @@ func (m *Monitor) WaitPid(pid int) (int, error) { rc, ok := m.cmds[pid] m.Unlock() if !ok { - return 255, fmt.Errorf("process does not exist") + return 255, errors.Wrapf(ErrNoSuchProcess, "pid %d", pid) } - ec := <-rc.ExitCh - if ec != 0 { - return ec, errors.Errorf("exit status %d", ec) + <-rc.ExitCh + if rc.exitStatus != 0 { + return rc.exitStatus, errors.Errorf("exit status %d", rc.exitStatus) } - return ec, nil + return rc.exitStatus, nil } // Command returns the registered pid for the command created @@ -139,6 +144,7 @@ func (m *Monitor) Delete(pid int) { } type Cmd struct { - c *exec.Cmd - ExitCh chan int + c *exec.Cmd + ExitCh chan struct{} + exitStatus int } diff --git a/runtime/events.go b/runtime/events.go index 2c7a54174..fd69c0312 100644 --- a/runtime/events.go +++ b/runtime/events.go @@ -7,7 +7,9 @@ const ( TaskExitEventTopic = "/tasks/exit" TaskDeleteEventTopic = "/tasks/delete" TaskExecAddedEventTopic = "/tasks/exec-added" + TaskExecStartedEventTopic = "/tasks/exec-started" TaskPausedEventTopic = "/tasks/paused" TaskResumedEventTopic = "/tasks/resumed" TaskCheckpointedEventTopic = "/tasks/checkpointed" + TaskUnknownTopic = "/tasks/?" ) diff --git a/runtime/task_list.go b/runtime/task_list.go index e5d5c0e53..12062cef5 100644 --- a/runtime/task_list.go +++ b/runtime/task_list.go @@ -2,10 +2,10 @@ package runtime import ( "context" - "errors" "sync" "github.com/containerd/containerd/namespaces" + "github.com/pkg/errors" ) var ( @@ -75,7 +75,7 @@ func (l *TaskList) AddWithNamespace(namespace string, t Task) error { l.tasks[namespace] = make(map[string]Task) } if _, ok := l.tasks[namespace][id]; ok { - return ErrTaskAlreadyExists + return errors.Wrap(ErrTaskAlreadyExists, id) } l.tasks[namespace][id] = t return nil