diff --git a/.travis.yml b/.travis.yml
index 9a230677a..8ec36b2a6 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -28,6 +28,7 @@ addons:
 env:
   - TRAVIS_GOOS=linux TEST_RUNTIME=io.containerd.runc.v1 TRAVIS_CGO_ENABLED=1
+  - TRAVIS_GOOS=linux TEST_RUNTIME=io.containerd.runc.v2 TRAVIS_CGO_ENABLED=1
   - TRAVIS_GOOS=linux TEST_RUNTIME=io.containerd.runtime.v1.linux TRAVIS_CGO_ENABLED=1
   - TRAVIS_GOOS=darwin TRAVIS_CGO_ENABLED=0
diff --git a/Makefile b/Makefile
index 8e6801331..4e6a0ba8a 100644
--- a/Makefile
+++ b/Makefile
@@ -189,6 +189,10 @@ bin/containerd-shim-runc-v1: cmd/containerd-shim-runc-v1 FORCE # set !cgo and omit pie for a static shim build: https://github.com/golang/go/issues/17789#issuecomment-258542220
 	@echo "$(WHALE) bin/containerd-shim-runc-v1"
 	@CGO_ENABLED=0 go build ${GO_BUILD_FLAGS} -o bin/containerd-shim-runc-v1 ${SHIM_GO_LDFLAGS} ${GO_TAGS} ./cmd/containerd-shim-runc-v1
+bin/containerd-shim-runc-v2: cmd/containerd-shim-runc-v2 FORCE # set !cgo and omit pie for a static shim build: https://github.com/golang/go/issues/17789#issuecomment-258542220
+	@echo "$(WHALE) bin/containerd-shim-runc-v2"
+	@CGO_ENABLED=0 go build ${GO_BUILD_FLAGS} -o bin/containerd-shim-runc-v2 ${SHIM_GO_LDFLAGS} ${GO_TAGS} ./cmd/containerd-shim-runc-v2
+
 bin/containerd-shim-runhcs-v1: cmd/containerd-shim-runhcs-v1 FORCE # set !cgo and omit pie for a static shim build: https://github.com/golang/go/issues/17789#issuecomment-258542220
 	@echo "$(WHALE) bin/containerd-shim-runhcs-v1${BINARY_SUFFIX}"
 	@CGO_ENABLED=0 go build ${GO_BUILD_FLAGS} -o bin/containerd-shim-runhcs-v1${BINARY_SUFFIX} ${SHIM_GO_LDFLAGS} ${GO_TAGS} ./cmd/containerd-shim-runhcs-v1
diff --git a/Makefile.linux b/Makefile.linux
index d9dda903b..7b2700897 100644
--- a/Makefile.linux
+++ b/Makefile.linux
@@ -16,7 +16,7 @@
 #linux specific settings
 WHALE="+"
 ONI="-"
-COMMANDS += containerd-shim containerd-shim-runc-v1
+COMMANDS += containerd-shim containerd-shim-runc-v1 containerd-shim-runc-v2
 
 # check GOOS for cross compile builds
 ifeq ($(GOOS),linux)
diff --git a/client.go b/client.go
index 3613c99d3..63e8c193a 100644
--- a/client.go
+++ b/client.go
@@ -24,6 +24,7 @@ import (
 	"net/http"
 	"runtime"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 
@@ -614,3 +615,20 @@ func (c *Client) Version(ctx context.Context) (Version, error) {
 		Revision: response.Revision,
 	}, nil
 }
+
+// CheckRuntime returns true if the current runtime matches the expected
+// runtime. Providing various parts of the runtime schema will match those
+// parts of the expected runtime.
+func CheckRuntime(current, expected string) bool {
+	cp := strings.Split(current, ".")
+	l := len(cp)
+	for i, p := range strings.Split(expected, ".") {
+		if i >= l {
+			return false
+		}
+		if p != cp[i] {
+			return false
+		}
+	}
+	return true
+}
diff --git a/client_test.go b/client_test.go
index 83081a6c9..f50020fa4 100644
--- a/client_test.go
+++ b/client_test.go
@@ -124,6 +124,7 @@ func TestMain(m *testing.M) {
 	log.G(ctx).WithFields(logrus.Fields{
 		"version":  version.Version,
 		"revision": version.Revision,
+		"runtime":  os.Getenv("TEST_RUNTIME"),
 	}).Info("running tests against containerd")
 
 	// pull a seed image
diff --git a/cmd/containerd-shim-runc-v1/main.go b/cmd/containerd-shim-runc-v1/main.go
index d339886bc..b8a8df7b0 100644
--- a/cmd/containerd-shim-runc-v1/main.go
+++ b/cmd/containerd-shim-runc-v1/main.go
@@ -19,10 +19,10 @@
 package main
 
 import (
-	"github.com/containerd/containerd/runtime/v2/runc"
+	"github.com/containerd/containerd/runtime/v2/runc/v1"
 	"github.com/containerd/containerd/runtime/v2/shim"
 )
 
 func main() {
-	shim.Run("io.containerd.runc.v1", runc.New)
+	shim.Run("io.containerd.runc.v1", v1.New)
 }
diff --git a/cmd/containerd-shim-runc-v2/main.go b/cmd/containerd-shim-runc-v2/main.go
new file mode 100644
index 000000000..2ebe18ce2
--- /dev/null
+++ b/cmd/containerd-shim-runc-v2/main.go
@@ -0,0 +1,28 @@
+// +build linux
+
+/*
+   Copyright The containerd Authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package main
+
+import (
+	"github.com/containerd/containerd/runtime/v2/runc/v2"
+	"github.com/containerd/containerd/runtime/v2/shim"
+)
+
+func main() {
+	shim.Run("io.containerd.runc.v2", v2.New)
+}
diff --git a/cmd/containerd-stress/main.go b/cmd/containerd-stress/main.go
index 6d21ce2d9..f70d6fe11 100644
--- a/cmd/containerd-stress/main.go
+++ b/cmd/containerd-stress/main.go
@@ -30,6 +30,7 @@ import (
 	"github.com/containerd/containerd"
 	"github.com/containerd/containerd/namespaces"
+	"github.com/containerd/containerd/plugin"
 	metrics "github.com/docker/go-metrics"
 	"github.com/sirupsen/logrus"
 	"github.com/urfave/cli"
@@ -146,7 +147,7 @@ func main() {
 		cli.StringFlag{
 			Name:  "runtime",
 			Usage: "set the runtime to stress test",
-			Value: "io.containerd.runtime.v1.linux",
+			Value: plugin.RuntimeLinuxV1,
 		},
 	}
 	app.Before = func(context *cli.Context) error {
diff --git a/cmd/ctr/commands/run/run_unix.go b/cmd/ctr/commands/run/run_unix.go
index 52a3205ab..8ecf3d03a 100644
--- a/cmd/ctr/commands/run/run_unix.go
+++ b/cmd/ctr/commands/run/run_unix.go
@@ -147,6 +147,7 @@ func NewContainer(ctx gocontext.Context, client *containerd.Client, context *cli
 		cOpts = append(cOpts, containerd.WithRuntime(context.String("runtime"), nil))
 
+		opts = append(opts, oci.WithAnnotations(commands.LabelArgs(context.StringSlice("label"))))
 		var s specs.Spec
 		spec = containerd.WithSpec(&s, opts...)
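A note on the new `CheckRuntime` helper above: it does a dot-separated prefix match, so callers can test for a runtime *family* (e.g. "io.containerd.runc") rather than an exact runtime name. Below is a minimal, self-contained sketch of how a caller might use it; the function body is copied from the hunk above, and the `main` wrapper is illustrative only, not part of this change:

```go
package main

import (
	"fmt"
	"strings"
)

// CheckRuntime as added in client.go above: compares the dot-separated
// parts of current against expected, so a prefix such as
// "io.containerd.runc" matches both runc.v1 and runc.v2.
func CheckRuntime(current, expected string) bool {
	cp := strings.Split(current, ".")
	l := len(cp)
	for i, p := range strings.Split(expected, ".") {
		if i >= l {
			return false
		}
		if p != cp[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(CheckRuntime("io.containerd.runc.v2", "io.containerd.runc"))          // true
	fmt.Println(CheckRuntime("io.containerd.runtime.v1.linux", "io.containerd.runc")) // false
}
```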
diff --git a/cmd/ctr/commands/tasks/checkpoint.go b/cmd/ctr/commands/tasks/checkpoint.go
index 94309e38c..e6d1b73bf 100644
--- a/cmd/ctr/commands/tasks/checkpoint.go
+++ b/cmd/ctr/commands/tasks/checkpoint.go
@@ -21,6 +21,7 @@ import (
 	"github.com/containerd/containerd"
 	"github.com/containerd/containerd/cmd/ctr/commands"
+	"github.com/containerd/containerd/plugin"
 	"github.com/containerd/containerd/runtime/linux/runctypes"
 	"github.com/containerd/containerd/runtime/v2/runc/options"
 	"github.com/pkg/errors"
@@ -86,7 +87,7 @@ func withCheckpointOpts(rt string, context *cli.Context) containerd.CheckpointTa
 	workPath := context.String("work-path")
 
 	switch rt {
-	case "io.containerd.runc.v1":
+	case plugin.RuntimeRuncV1, plugin.RuntimeRuncV2:
 		if r.Options == nil {
 			r.Options = &options.CheckpointOptions{}
 		}
@@ -101,7 +102,7 @@ func withCheckpointOpts(rt string, context *cli.Context) containerd.CheckpointTa
 		if workPath != "" {
 			opts.WorkPath = workPath
 		}
-	case "io.containerd.runtime.v1.linux":
+	case plugin.RuntimeLinuxV1:
 		if r.Options == nil {
 			r.Options = &runctypes.CheckpointOptions{}
 		}
diff --git a/container_checkpoint_test.go b/container_checkpoint_test.go
index 2e5203a5a..be66d0e15 100644
--- a/container_checkpoint_test.go
+++ b/container_checkpoint_test.go
@@ -32,6 +32,7 @@ import (
 	"github.com/containerd/containerd/cio"
 	"github.com/containerd/containerd/oci"
+	"github.com/containerd/containerd/plugin"
 )
 
 const (
@@ -47,7 +48,7 @@ func TestCheckpointRestorePTY(t *testing.T) {
 		t.Fatal(err)
 	}
 	defer client.Close()
-	if client.runtime == v1runtime {
+	if client.runtime == plugin.RuntimeLinuxV1 {
 		t.Skip()
 	}
@@ -173,7 +174,7 @@ func TestCheckpointRestore(t *testing.T) {
 		t.Fatal(err)
 	}
 	defer client.Close()
-	if client.runtime == v1runtime {
+	if client.runtime == plugin.RuntimeLinuxV1 {
 		t.Skip()
 	}
@@ -263,7 +264,7 @@ func TestCheckpointRestoreNewContainer(t *testing.T) {
 		t.Fatal(err)
 	}
 	defer client.Close()
-	if client.runtime == v1runtime {
+	if client.runtime == plugin.RuntimeLinuxV1 {
 		t.Skip()
 	}
@@ -353,7 +354,7 @@ func TestCheckpointLeaveRunning(t *testing.T) {
 		t.Fatal(err)
 	}
 	defer client.Close()
-	if client.runtime == v1runtime {
+	if client.runtime == plugin.RuntimeLinuxV1 {
 		t.Skip()
 	}
diff --git a/container_linux_test.go b/container_linux_test.go
index 350fc64b7..3627073fa 100644
--- a/container_linux_test.go
+++ b/container_linux_test.go
@@ -39,6 +39,7 @@ import (
 	"github.com/containerd/containerd/containers"
 	"github.com/containerd/containerd/errdefs"
 	"github.com/containerd/containerd/oci"
+	"github.com/containerd/containerd/plugin"
 	"github.com/containerd/containerd/runtime/linux/runctypes"
 	"github.com/containerd/containerd/runtime/v2/runc/options"
 	specs "github.com/opencontainers/runtime-spec/specs-go"
@@ -132,7 +133,7 @@ func TestShimInCgroup(t *testing.T) {
 		t.Fatal(err)
 	}
 	defer client.Close()
-	if client.runtime == "io.containerd.runc.v1" {
+	if CheckRuntime(client.runtime, "io.containerd.runc") {
 		t.Skip()
 	}
@@ -450,7 +451,7 @@ func writeToFile(t *testing.T, filePath, message string) {
 
 func getLogDirPath(runtimeVersion, id string) string {
 	switch runtimeVersion {
 	case "v1":
-		return filepath.Join(defaultRoot, "io.containerd.runtime.v1.linux", testNamespace, id)
+		return filepath.Join(defaultRoot, plugin.RuntimeLinuxV1, testNamespace, id)
 	case "v2":
 		return filepath.Join(defaultState, "io.containerd.runtime.v2.task", testNamespace, id)
 	default:
@@ -460,7 +461,7 @@ func getLogDirPath(runtimeVersion, id string) string {
 
 func getRuntimeVersion() string {
 	switch rt := os.Getenv("TEST_RUNTIME"); rt {
-	case "io.containerd.runc.v1":
+	case plugin.RuntimeRuncV1, plugin.RuntimeRuncV2:
 		return "v2"
 	default:
 		return "v1"
@@ -1188,7 +1189,7 @@ func TestContainerRuntimeOptionsv1(t *testing.T) {
 		ctx, id,
 		WithNewSnapshot(id, image),
 		WithNewSpec(oci.WithImageConfig(image), withExitStatus(7)),
-		WithRuntime("io.containerd.runtime.v1.linux", &runctypes.RuncOptions{Runtime: "no-runc"}),
+		WithRuntime(plugin.RuntimeLinuxV1, &runctypes.RuncOptions{Runtime: "no-runc"}),
 	)
 	if err != nil {
 		t.Fatal(err)
@@ -1231,7 +1232,7 @@ func TestContainerRuntimeOptionsv2(t *testing.T) {
 		ctx, id,
 		WithNewSnapshot(id, image),
 		WithNewSpec(oci.WithImageConfig(image), withExitStatus(7)),
-		WithRuntime("io.containerd.runc.v1", &options.Options{BinaryName: "no-runc"}),
+		WithRuntime(plugin.RuntimeRuncV1, &options.Options{BinaryName: "no-runc"}),
 	)
 	if err != nil {
 		t.Fatal(err)
@@ -1384,7 +1385,7 @@ func testUserNamespaces(t *testing.T, readonlyRootFS bool) {
 	defer container.Delete(ctx, WithSnapshotCleanup)
 
 	var copts interface{}
-	if client.runtime == "io.containerd.runc.v1" {
+	if CheckRuntime(client.runtime, "io.containerd.runc") {
 		copts = &options.Options{
 			IoUid: 1000,
 			IoGid: 1000,
diff --git a/daemon_config_linux_test.go b/daemon_config_linux_test.go
index 7fca228e1..e0801bab2 100644
--- a/daemon_config_linux_test.go
+++ b/daemon_config_linux_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/containerd/containerd/oci"
 	"github.com/containerd/containerd/pkg/testutil"
+	"github.com/containerd/containerd/plugin"
 	"github.com/containerd/containerd/runtime/v2/runc/options"
 	srvconfig "github.com/containerd/containerd/services/server/config"
 )
@@ -140,7 +141,7 @@ func TestDaemonRuntimeRoot(t *testing.T) {
 	}
 
 	id := t.Name()
-	container, err := client.NewContainer(ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withProcessArgs("top")), WithRuntime("io.containerd.runc.v1", &options.Options{
+	container, err := client.NewContainer(ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withProcessArgs("top")), WithRuntime(plugin.RuntimeRuncV1, &options.Options{
 		Root: runtimeRoot,
 	}))
 	if err != nil {
diff --git a/oci/spec_opts.go b/oci/spec_opts.go
index b9949f837..dcffc0787 100644
--- a/oci/spec_opts.go
+++ b/oci/spec_opts.go
@@ -1063,3 +1063,17 @@ func WithMemoryLimit(limit uint64) SpecOpts {
 		return nil
 	}
 }
+
+// WithAnnotations appends or replaces the annotations on the spec with the
+// provided annotations
+func WithAnnotations(annotations map[string]string) SpecOpts {
+	return func(_ context.Context, _ Client, _ *containers.Container, s *Spec) error {
+		if s.Annotations == nil {
+			s.Annotations = make(map[string]string)
+		}
+		for k, v := range annotations {
+			s.Annotations[k] = v
+		}
+		return nil
+	}
+}
diff --git a/plugin/plugin.go b/plugin/plugin.go
index 4d2d486d0..5e69145ef 100644
--- a/plugin/plugin.go
+++ b/plugin/plugin.go
@@ -75,6 +75,15 @@ const (
 	GCPlugin Type = "io.containerd.gc.v1"
 )
 
+const (
+	// RuntimeLinuxV1 is the legacy linux runtime
+	RuntimeLinuxV1 = "io.containerd.runtime.v1.linux"
+	// RuntimeRuncV1 is the runc runtime that supports a single container
+	RuntimeRuncV1 = "io.containerd.runc.v1"
+	// RuntimeRuncV2 is the runc runtime that supports multiple containers per shim
+	RuntimeRuncV2 = "io.containerd.runc.v2"
+)
+
 // Registration contains information for registering a plugin
 type Registration struct {
 	// Type of the plugin
diff --git a/runtime/v2/runc/container.go b/runtime/v2/runc/container.go
new file mode 100644
index 000000000..0606381bc
--- /dev/null
+++ b/runtime/v2/runc/container.go
@@ -0,0 +1,415 @@
+// +build linux
+
+/*
+   Copyright The containerd Authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package runc
+
+import (
+	"context"
+	"io/ioutil"
+	"path/filepath"
+	"sync"
+
+	"github.com/containerd/cgroups"
+	"github.com/containerd/console"
+	"github.com/containerd/containerd/errdefs"
+	"github.com/containerd/containerd/mount"
+	"github.com/containerd/containerd/namespaces"
+	rproc "github.com/containerd/containerd/runtime/proc"
+	"github.com/containerd/containerd/runtime/v1/linux/proc"
+	"github.com/containerd/containerd/runtime/v2/runc/options"
+	"github.com/containerd/containerd/runtime/v2/task"
+	"github.com/containerd/typeurl"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+)
+
+// NewContainer returns a new runc container
+func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTaskRequest) (_ *Container, err error) {
+	ns, err := namespaces.NamespaceRequired(ctx)
+	if err != nil {
+		return nil, errors.Wrap(err, "create namespace")
+	}
+
+	var opts options.Options
+	if r.Options != nil {
+		v, err := typeurl.UnmarshalAny(r.Options)
+		if err != nil {
+			return nil, err
+		}
+		opts = *v.(*options.Options)
+	}
+
+	var mounts []proc.Mount
+	for _, m := range r.Rootfs {
+		mounts = append(mounts, proc.Mount{
+			Type:    m.Type,
+			Source:  m.Source,
+			Target:  m.Target,
+			Options: m.Options,
+		})
+	}
+	config := &proc.CreateConfig{
+		ID:               r.ID,
+		Bundle:           r.Bundle,
+		Runtime:          opts.BinaryName,
+		Rootfs:           mounts,
+		Terminal:         r.Terminal,
+		Stdin:            r.Stdin,
+		Stdout:           r.Stdout,
+		Stderr:           r.Stderr,
+		Checkpoint:       r.Checkpoint,
+		ParentCheckpoint: r.ParentCheckpoint,
+		Options:          r.Options,
+	}
+
+	if err := WriteRuntime(r.Bundle, opts.BinaryName); err != nil {
+		return nil, err
+	}
+	rootfs := filepath.Join(r.Bundle, "rootfs")
+	defer func() {
+		if err != nil {
+			if err2 := mount.UnmountAll(rootfs, 0); err2 != nil {
+				logrus.WithError(err2).Warn("failed to cleanup rootfs mount")
+			}
+		}
+	}()
+	for _, rm := range mounts {
+		m := &mount.Mount{
+			Type:    rm.Type,
+			Source:  rm.Source,
+			Options: rm.Options,
+		}
+		if err := m.Mount(rootfs); err != nil {
+			return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
+		}
+	}
+
+	process, err := newInit(
+		ctx,
+		r.Bundle,
+		filepath.Join(r.Bundle, "work"),
+		ns,
+		platform,
+		config,
+		&opts,
+	)
+	if err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
+	if err := process.Create(ctx, config); err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
+	container := &Container{
+		ID:        r.ID,
+		Bundle:    r.Bundle,
+		process:   process,
+		processes: make(map[string]rproc.Process),
+	}
+	pid := process.Pid()
+	if pid > 0 {
+		cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
+		if err != nil {
+			logrus.WithError(err).Errorf("loading cgroup for %d", pid)
+		}
+		container.cgroup = cg
+	}
+	return container, nil
+}
+
+// ReadRuntime reads the runtime information from the path
+func ReadRuntime(path string) (string, error) {
+	data, err := ioutil.ReadFile(filepath.Join(path, "runtime"))
+	if err != nil {
+		return "", err
+	}
+	return string(data), nil
+}
+
+// WriteRuntime writes the runtime information into the path
+func WriteRuntime(path, runtime string) error {
+	return ioutil.WriteFile(filepath.Join(path, "runtime"), []byte(runtime), 0600)
+}
+
+func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform,
+	r *proc.CreateConfig, options *options.Options) (*proc.Init, error) {
+	rootfs := filepath.Join(path, "rootfs")
+	runtime := proc.NewRunc(options.Root, path, namespace, options.BinaryName, options.CriuPath, options.SystemdCgroup)
+	p := proc.New(r.ID, runtime, rproc.Stdio{
+		Stdin:    r.Stdin,
+		Stdout:   r.Stdout,
+		Stderr:   r.Stderr,
+		Terminal: r.Terminal,
+	})
+	p.Bundle = r.Bundle
+	p.Platform = platform
+	p.Rootfs = rootfs
+	p.WorkDir = workDir
+	p.IoUID = int(options.IoUid)
+	p.IoGID = int(options.IoGid)
+	p.NoPivotRoot = options.NoPivotRoot
+	p.NoNewKeyring = options.NoNewKeyring
+	p.CriuWorkPath = options.CriuWorkPath
+	if p.CriuWorkPath == "" {
+		// if criu work path not set, use container WorkDir
+		p.CriuWorkPath = p.WorkDir
+	}
+	return p, nil
+}
+
+// Container for operating on a runc container and its processes
+type Container struct {
+	mu sync.Mutex
+
+	// ID of the container
+	ID string
+	// Bundle path
+	Bundle string
+
+	cgroup    cgroups.Cgroup
+	process   rproc.Process
+	processes map[string]rproc.Process
+}
+
+// All processes in the container
+func (c *Container) All() (o []rproc.Process) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for _, p := range c.processes {
+		o = append(o, p)
+	}
+	if c.process != nil {
+		o = append(o, c.process)
+	}
+	return o
+}
+
+// ExecdProcesses added to the container
+func (c *Container) ExecdProcesses() (o []rproc.Process) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	for _, p := range c.processes {
+		o = append(o, p)
+	}
+	return o
+}
+
+// Pid of the main process of a container
+func (c *Container) Pid() int {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.process.Pid()
+}
+
+// Cgroup of the container
+func (c *Container) Cgroup() cgroups.Cgroup {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.cgroup
+}
+
+// CgroupSet sets the cgroup to the container
+func (c *Container) CgroupSet(cg cgroups.Cgroup) {
+	c.mu.Lock()
+	c.cgroup = cg
+	c.mu.Unlock()
+}
+
+// Process returns the process by id
+func (c *Container) Process(id string) (rproc.Process, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if id == "" {
+		if c.process == nil {
+			return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
+		}
+		return c.process, nil
+	}
+	p, ok := c.processes[id]
+	if !ok {
+		return nil, errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", id)
+	}
+	return p, nil
+}
+
+// ProcessExists returns true if the process by id exists
+func (c *Container) ProcessExists(id string) bool {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	_, ok := c.processes[id]
+	return ok
+}
+
+// ProcessAdd adds a new process to the container
+func (c *Container) ProcessAdd(process rproc.Process) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.processes[process.ID()] = process
+}
+
+// ProcessRemove removes the process by id from the container
+func (c *Container) ProcessRemove(id string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	delete(c.processes, id)
+}
+
+// Start a container process
+func (c *Container) Start(ctx context.Context, r *task.StartRequest) (rproc.Process, error) {
+	p, err := c.Process(r.ExecID)
+	if err != nil {
+		return nil, err
+	}
+	if err := p.Start(ctx); err != nil {
+		return nil, err
+	}
+	if c.Cgroup() == nil && p.Pid() > 0 {
+		cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
+		if err != nil {
+			logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
+		}
+		c.cgroup = cg
+	}
+	return p, nil
+}
+
+// Delete the container or a process by id
+func (c *Container) Delete(ctx context.Context, r *task.DeleteRequest) (rproc.Process, error) {
+	p, err := c.Process(r.ExecID)
+	if err != nil {
+		return nil, err
+	}
+	if err := p.Delete(ctx); err != nil {
+		return nil, err
+	}
+	if r.ExecID != "" {
+		c.ProcessRemove(r.ExecID)
+	}
+	return p, nil
+}
+
+// Exec an additional process
+func (c *Container) Exec(ctx context.Context, r *task.ExecProcessRequest) (rproc.Process, error) {
+	process, err := c.process.(*proc.Init).Exec(ctx, c.Bundle, &proc.ExecConfig{
+		ID:       r.ExecID,
+		Terminal: r.Terminal,
+		Stdin:    r.Stdin,
+		Stdout:   r.Stdout,
+		Stderr:   r.Stderr,
+		Spec:     r.Spec,
+	})
+	if err != nil {
+		return nil, err
+	}
+	c.ProcessAdd(process)
+	return process, nil
+}
+
+// Pause the container
+func (c *Container) Pause(ctx context.Context) error {
+	return c.process.(*proc.Init).Pause(ctx)
+}
+
+// Resume the container
+func (c *Container) Resume(ctx context.Context) error {
+	return c.process.(*proc.Init).Resume(ctx)
+}
+
+// ResizePty of a process
+func (c *Container) ResizePty(ctx context.Context, r *task.ResizePtyRequest) error {
+	p, err := c.Process(r.ExecID)
+	if err != nil {
+		return err
+	}
+	ws := console.WinSize{
+		Width:  uint16(r.Width),
+		Height: uint16(r.Height),
+	}
+	return p.Resize(ws)
+}
+
+// Kill a process
+func (c *Container) Kill(ctx context.Context, r *task.KillRequest) error {
+	p, err := c.Process(r.ExecID)
+	if err != nil {
+		return err
+	}
+	return p.Kill(ctx, r.Signal, r.All)
+}
+
+// CloseIO of a process
+func (c *Container) CloseIO(ctx context.Context, r *task.CloseIORequest) error {
+	p, err := c.Process(r.ExecID)
+	if err != nil {
+		return err
+	}
+	if stdin := p.Stdin(); stdin != nil {
+		if err := stdin.Close(); err != nil {
+			return errors.Wrap(err, "close stdin")
+		}
+	}
+	return nil
+}
+
+// Checkpoint the container
+func (c *Container) Checkpoint(ctx context.Context, r *task.CheckpointTaskRequest) error {
+	p, err := c.Process("")
+	if err != nil {
+		return err
+	}
+	var opts options.CheckpointOptions
+	if r.Options != nil {
+		v, err := typeurl.UnmarshalAny(r.Options)
+		if err != nil {
+			return err
+		}
+		opts = *v.(*options.CheckpointOptions)
+	}
+	return p.(*proc.Init).Checkpoint(ctx, &proc.CheckpointConfig{
+		Path:                     r.Path,
+		Exit:                     opts.Exit,
+		AllowOpenTCP:             opts.OpenTcp,
+		AllowExternalUnixSockets: opts.ExternalUnixSockets,
+		AllowTerminal:            opts.Terminal,
+		FileLocks:                opts.FileLocks,
+		EmptyNamespaces:          opts.EmptyNamespaces,
+		WorkDir:                  opts.WorkPath,
+	})
+}
+
+// Update the resource information of a running container
+func (c *Container) Update(ctx context.Context, r *task.UpdateTaskRequest) error {
+	p, err := c.Process("")
+	if err != nil {
+		return err
+	}
+	return p.(*proc.Init).Update(ctx, r.Resources)
+}
+
+// HasPid returns true if the container owns a specific pid
+func (c *Container) HasPid(pid int) bool {
+	if c.Pid() == pid {
+		return true
+	}
+	for _, p := range c.All() {
+		if p.Pid() == pid {
+			return true
+		}
+	}
+	return false
+}
diff --git a/runtime/v2/runc/epoll.go b/runtime/v2/runc/epoll.go
index 6aea9b8c1..5425655ff 100644
--- a/runtime/v2/runc/epoll.go
+++ b/runtime/v2/runc/epoll.go
@@ -30,19 +30,22 @@ import (
 	"golang.org/x/sys/unix"
 )
 
-func newOOMEpoller(publisher events.Publisher) (*epoller, error) {
+// NewOOMEpoller returns an epoll implementation that listens to OOM events
+// from a container's cgroups.
+func NewOOMEpoller(publisher events.Publisher) (*Epoller, error) {
 	fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
 	if err != nil {
 		return nil, err
 	}
-	return &epoller{
+	return &Epoller{
 		fd:        fd,
 		publisher: publisher,
 		set:       make(map[uintptr]*item),
 	}, nil
 }
 
-type epoller struct {
+// Epoller implementation for handling OOM events from a container's cgroup
+type Epoller struct {
 	mu sync.Mutex
 
 	fd        int
@@ -55,11 +58,13 @@ type item struct {
 	cg cgroups.Cgroup
 }
 
-func (e *epoller) Close() error {
+// Close the epoll fd
+func (e *Epoller) Close() error {
 	return unix.Close(e.fd)
 }
 
-func (e *epoller) run(ctx context.Context) {
+// Run the epoll loop
+func (e *Epoller) Run(ctx context.Context) {
 	var events [128]unix.EpollEvent
 	for {
 		select {
@@ -81,7 +86,8 @@ func (e *epoller) run(ctx context.Context) {
 	}
 }
 
-func (e *epoller) add(id string, cg cgroups.Cgroup) error {
+// Add the cgroup to the epoll monitor
+func (e *Epoller) Add(id string, cg cgroups.Cgroup) error {
 	e.mu.Lock()
 	defer e.mu.Unlock()
 	fd, err := cg.OOMEventFD()
@@ -99,7 +105,7 @@ func (e *epoller) add(id string, cg cgroups.Cgroup) error {
 	return unix.EpollCtl(e.fd, unix.EPOLL_CTL_ADD, int(fd), &event)
 }
 
-func (e *epoller) process(ctx context.Context, fd uintptr) {
+func (e *Epoller) process(ctx context.Context, fd uintptr) {
 	flush(fd)
 	e.mu.Lock()
 	i, ok := e.set[fd]
diff --git a/runtime/v2/runc/service_linux.go b/runtime/v2/runc/platform.go
similarity index 83%
rename from runtime/v2/runc/service_linux.go
rename to runtime/v2/runc/platform.go
index 195c23014..d38aa5469 100644
--- a/runtime/v2/runc/service_linux.go
+++ b/runtime/v2/runc/platform.go
@@ -1,3 +1,5 @@
+// +build linux
+
 /*
    Copyright The containerd Authors.
@@ -23,10 +25,30 @@ import (
 	"syscall"
 
 	"github.com/containerd/console"
+	rproc "github.com/containerd/containerd/runtime/proc"
 	"github.com/containerd/fifo"
 	"github.com/pkg/errors"
 )
 
+var bufPool = sync.Pool{
+	New: func() interface{} {
+		buffer := make([]byte, 32<<10)
+		return &buffer
+	},
+}
+
+// NewPlatform returns a linux platform for use with I/O operations
+func NewPlatform() (rproc.Platform, error) {
+	epoller, err := console.NewEpoller()
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to initialize epoller")
+	}
+	go epoller.Wait()
+	return &linuxPlatform{
+		epoller: epoller,
+	}, nil
+}
+
 type linuxPlatform struct {
 	epoller *console.Epoller
 }
@@ -69,9 +91,9 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
 		cwg.Add(1)
 		go func() {
 			cwg.Done()
-			p := bufPool.Get().(*[]byte)
-			defer bufPool.Put(p)
-			io.CopyBuffer(outw, epollConsole, *p)
+			buf := bufPool.Get().(*[]byte)
+			defer bufPool.Put(buf)
+			io.CopyBuffer(outw, epollConsole, *buf)
 			epollConsole.Close()
 			outr.Close()
 			outw.Close()
@@ -94,20 +116,3 @@ func (p *linuxPlatform) ShutdownConsole(ctx context.Context, cons console.Consol
 func (p *linuxPlatform) Close() error {
 	return p.epoller.Close()
 }
-
-// initialize a single epoll fd to manage our consoles. `initPlatform` should
-// only be called once.
-func (s *service) initPlatform() error {
-	if s.platform != nil {
-		return nil
-	}
-	epoller, err := console.NewEpoller()
-	if err != nil {
-		return errors.Wrap(err, "failed to initialize epoller")
-	}
-	s.platform = &linuxPlatform{
-		epoller: epoller,
-	}
-	go epoller.Wait()
-	return nil
-}
diff --git a/runtime/v2/runc/util.go b/runtime/v2/runc/util.go
new file mode 100644
index 000000000..51ca04864
--- /dev/null
+++ b/runtime/v2/runc/util.go
@@ -0,0 +1,55 @@
+// +build linux
+
+/*
+   Copyright The containerd Authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package runc
+
+import (
+	"github.com/containerd/containerd/api/events"
+	"github.com/containerd/containerd/runtime"
+	"github.com/sirupsen/logrus"
+)
+
+// GetTopic converts an event from an interface type to the specific
+// event topic id
+func GetTopic(e interface{}) string {
+	switch e.(type) {
+	case *events.TaskCreate:
+		return runtime.TaskCreateEventTopic
+	case *events.TaskStart:
+		return runtime.TaskStartEventTopic
+	case *events.TaskOOM:
+		return runtime.TaskOOMEventTopic
+	case *events.TaskExit:
+		return runtime.TaskExitEventTopic
+	case *events.TaskDelete:
+		return runtime.TaskDeleteEventTopic
+	case *events.TaskExecAdded:
+		return runtime.TaskExecAddedEventTopic
+	case *events.TaskExecStarted:
+		return runtime.TaskExecStartedEventTopic
+	case *events.TaskPaused:
+		return runtime.TaskPausedEventTopic
+	case *events.TaskResumed:
+		return runtime.TaskResumedEventTopic
+	case *events.TaskCheckpointed:
+		return runtime.TaskCheckpointedEventTopic
+	default:
+		logrus.Warnf("no topic for type %#v", e)
+	}
+	return runtime.TaskUnknownTopic
+}
diff --git a/runtime/v2/runc/service.go b/runtime/v2/runc/v1/service.go
similarity index 59%
rename from runtime/v2/runc/service.go
rename to runtime/v2/runc/v1/service.go
index 83c490689..c69f417a2 100644
--- a/runtime/v2/runc/service.go
+++ b/runtime/v2/runc/v1/service.go
@@ -16,7 +16,7 @@ limitations under the License.
 */
 
-package runc
+package v1
 
 import (
 	"context"
@@ -30,7 +30,6 @@ import (
 	"time"
 
 	"github.com/containerd/cgroups"
-	"github.com/containerd/console"
 	eventstypes "github.com/containerd/containerd/api/events"
 	"github.com/containerd/containerd/api/types/task"
 	"github.com/containerd/containerd/errdefs"
@@ -38,9 +37,9 @@ import (
 	"github.com/containerd/containerd/log"
 	"github.com/containerd/containerd/mount"
 	"github.com/containerd/containerd/namespaces"
-	"github.com/containerd/containerd/runtime"
 	rproc "github.com/containerd/containerd/runtime/proc"
 	"github.com/containerd/containerd/runtime/v1/linux/proc"
+	"github.com/containerd/containerd/runtime/v2/runc"
 	"github.com/containerd/containerd/runtime/v2/runc/options"
 	"github.com/containerd/containerd/runtime/v2/shim"
 	taskAPI "github.com/containerd/containerd/runtime/v2/task"
@@ -54,33 +53,25 @@ import (
 )
 
 var (
-	empty   = &ptypes.Empty{}
-	bufPool = sync.Pool{
-		New: func() interface{} {
-			buffer := make([]byte, 32<<10)
-			return &buffer
-		},
-	}
+	_     = (taskAPI.TaskService)(&service{})
+	empty = &ptypes.Empty{}
 )
 
-var _ = (taskAPI.TaskService)(&service{})
-
 // New returns a new shim service that can be used via GRPC
 func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
-	ep, err := newOOMEpoller(publisher)
+	ep, err := runc.NewOOMEpoller(publisher)
 	if err != nil {
 		return nil, err
 	}
 	ctx, cancel := context.WithCancel(ctx)
-	go ep.run(ctx)
+	go ep.Run(ctx)
 	s := &service{
-		id:        id,
-		context:   ctx,
-		processes: make(map[string]rproc.Process),
-		events:    make(chan interface{}, 128),
-		ec:        shim.Default.Subscribe(),
-		ep:        ep,
-		cancel:    cancel,
+		id:      id,
+		context: ctx,
+		events:  make(chan interface{}, 128),
+		ec:      shim.Default.Subscribe(),
+		ep:      ep,
+		cancel:  cancel,
 	}
 	go s.processExits()
 	runcC.Monitor = shim.Default
@@ -97,17 +88,15 @@ type service struct {
 	mu          sync.Mutex
 	eventSendMu sync.Mutex
 
-	context   context.Context
-	task      rproc.Process
-	processes map[string]rproc.Process
-	events    chan interface{}
-	platform  rproc.Platform
-	ec        chan runcC.Exit
-	ep        *epoller
+	context  context.Context
+	events   chan interface{}
+	platform rproc.Platform
+	ec       chan runcC.Exit
+	ep       *runc.Epoller
+
+	id        string
+	container *runc.Container
 
-	id     string
-	bundle string
-	cg     cgroups.Cgroup
 	cancel func()
 }
@@ -192,7 +181,7 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error)
 	if err != nil {
 		return nil, err
 	}
-	runtime, err := s.readRuntime(path)
+	runtime, err := runc.ReadRuntime(path)
 	if err != nil {
 		return nil, err
 	}
@@ -211,107 +200,17 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error)
 	}, nil
 }
 
-func (s *service) readRuntime(path string) (string, error) {
-	data, err := ioutil.ReadFile(filepath.Join(path, "runtime"))
-	if err != nil {
-		return "", err
-	}
-	return string(data), nil
-}
-
-func (s *service) writeRuntime(path, runtime string) error {
-	return ioutil.WriteFile(filepath.Join(path, "runtime"), []byte(runtime), 0600)
-}
-
 // Create a new initial process and container with the underlying OCI runtime
 func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	ns, err := namespaces.NamespaceRequired(ctx)
+	container, err := runc.NewContainer(ctx, s.platform, r)
 	if err != nil {
-		return nil, errors.Wrap(err, "create namespace")
-	}
-
-	var opts options.Options
-	if r.Options != nil {
-		v, err := typeurl.UnmarshalAny(r.Options)
-		if err != nil {
-			return nil, err
-		}
-		opts = *v.(*options.Options)
-	}
-
-	var mounts []proc.Mount
-	for _, m := range r.Rootfs {
-		mounts = append(mounts, proc.Mount{
-			Type:    m.Type,
-			Source:  m.Source,
-			Target:  m.Target,
-			Options: m.Options,
-		})
-	}
-	config := &proc.CreateConfig{
-		ID:               r.ID,
-		Bundle:           r.Bundle,
-		Runtime:          opts.BinaryName,
-		Rootfs:           mounts,
-		Terminal:         r.Terminal,
-		Stdin:            r.Stdin,
-		Stdout:           r.Stdout,
-		Stderr:           r.Stderr,
-		Checkpoint:       r.Checkpoint,
-		ParentCheckpoint: r.ParentCheckpoint,
-		Options:          r.Options,
-	}
-	if err := s.writeRuntime(r.Bundle, opts.BinaryName); err != nil {
 		return nil, err
 	}
-	rootfs := filepath.Join(r.Bundle, "rootfs")
-	defer func() {
-		if err != nil {
-			if err2 := mount.UnmountAll(rootfs, 0); err2 != nil {
-				logrus.WithError(err2).Warn("failed to cleanup rootfs mount")
-			}
-		}
-	}()
-	for _, rm := range mounts {
-		m := &mount.Mount{
-			Type:    rm.Type,
-			Source:  rm.Source,
-			Options: rm.Options,
-		}
-		if err := m.Mount(rootfs); err != nil {
-			return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
-		}
-	}
-	process, err := newInit(
-		ctx,
-		r.Bundle,
-		filepath.Join(r.Bundle, "work"),
-		ns,
-		s.platform,
-		config,
-		&opts,
-	)
-	if err != nil {
-		return nil, errdefs.ToGRPC(err)
-	}
-	if err := process.Create(ctx, config); err != nil {
-		return nil, errdefs.ToGRPC(err)
-	}
-	// save the main task id and bundle to the shim for additional requests
-	s.id = r.ID
-	s.bundle = r.Bundle
-	pid := process.Pid()
-	if pid > 0 {
-		cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
-		if err != nil {
-			logrus.WithError(err).Errorf("loading cgroup for %d", pid)
-		}
-		s.cg = cg
-	}
-	s.task = process
+
+	s.container = container
 
 	s.send(&eventstypes.TaskCreate{
 		ContainerID: r.ID,
@@ -324,46 +223,41 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
 			Terminal: r.Terminal,
 		},
 		Checkpoint: r.Checkpoint,
-		Pid:        uint32(pid),
+		Pid:        uint32(container.Pid()),
 	})
 
 	return &taskAPI.CreateTaskResponse{
-		Pid: uint32(pid),
+		Pid: uint32(container.Pid()),
 	}, nil
-
 }
 
 // Start a process
 func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
-	p, err := s.getProcess(r.ExecID)
+	container, err := s.getContainer()
 	if err != nil {
 		return nil, err
 	}
 
 	// hold the send lock so that the start events are sent before any exit events in the error case
 	s.eventSendMu.Lock()
-	if err := p.Start(ctx); err != nil {
+	p, err := container.Start(ctx, r)
+	if err != nil {
 		s.eventSendMu.Unlock()
-		return nil, err
+		return nil, errdefs.ToGRPC(err)
 	}
-
-	// case for restore
-	if s.getCgroup() == nil && p.Pid() > 0 {
-		cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
-		if err != nil {
-			logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
-		}
-		s.setCgroup(cg)
+	if err := s.ep.Add(container.ID, container.Cgroup()); err != nil {
+		logrus.WithError(err).Error("add cg to OOM monitor")
 	}
-	if r.ExecID != "" {
-		s.send(&eventstypes.TaskExecStarted{
-			ContainerID: s.id,
-			ExecID:      r.ExecID,
+	switch r.ExecID {
+	case "":
+		s.send(&eventstypes.TaskStart{
+			ContainerID: container.ID,
 			Pid:         uint32(p.Pid()),
 		})
-	} else {
-		s.send(&eventstypes.TaskStart{
-			ContainerID: s.id,
+	default:
+		s.send(&eventstypes.TaskExecStarted{
+			ContainerID: container.ID,
+			ExecID:      r.ExecID,
 			Pid:         uint32(p.Pid()),
 		})
 	}
@@ -375,28 +269,21 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
 
 // Delete the initial process and container
 func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
-	p, err := s.getProcess(r.ExecID)
+	container, err := s.getContainer()
 	if err != nil {
 		return nil, err
 	}
-	if p == nil {
-		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
+	p, err := container.Delete(ctx, r)
+	if err != nil {
+		return nil, errdefs.ToGRPC(err)
 	}
-	if err := p.Delete(ctx); err != nil {
-		return nil, err
-	}
-	isTask := r.ExecID == ""
-	if !isTask {
-		s.mu.Lock()
-		delete(s.processes, r.ExecID)
-		s.mu.Unlock()
-	}
-	if isTask {
+	// if we deleted our init task, close the platform and send the task delete event
+	if r.ExecID == "" {
 		if s.platform != nil {
 			s.platform.Close()
 		}
 		s.send(&eventstypes.TaskDelete{
-			ContainerID: s.id,
+			ContainerID: container.ID,
 			Pid:         uint32(p.Pid()),
 			ExitStatus:  uint32(p.ExitStatus()),
 			ExitedAt:    p.ExitedAt(),
@@ -411,33 +298,20 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
 
 // Exec an additional process inside the container
 func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
-	s.mu.Lock()
-	p := s.processes[r.ExecID]
-	s.mu.Unlock()
-	if p != nil {
+	container, err := s.getContainer()
+	if err != nil {
+		return nil, err
+	}
+	if container.ProcessExists(r.ExecID) {
 		return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
 	}
-	p = s.task
-	if p == nil {
-		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
-	}
-	process, err := p.(*proc.Init).Exec(ctx, s.bundle, &proc.ExecConfig{
-		ID:       r.ExecID,
-		Terminal: r.Terminal,
-		Stdin:    r.Stdin,
-		Stdout:   r.Stdout,
-		Stderr:   r.Stderr,
-		Spec:     r.Spec,
-	})
+	process, err := container.Exec(ctx, r)
 	if err != nil {
 		return nil, errdefs.ToGRPC(err)
 	}
-	s.mu.Lock()
-	s.processes[r.ExecID] = process
-	s.mu.Unlock()
 
 	s.send(&eventstypes.TaskExecAdded{
-		ContainerID: s.id,
+		ContainerID: s.container.ID,
 		ExecID:      process.ID(),
 	})
 	return empty, nil
@@ -445,15 +319,11 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
 
 // ResizePty of a process
 func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
-	p, err := s.getProcess(r.ExecID)
+	container, err := s.getContainer()
 	if err != nil {
 		return nil, err
 	}
-	ws := console.WinSize{
-		Width:  uint16(r.Width),
-		Height: uint16(r.Height),
-	}
-	if err := p.Resize(ws); err != nil {
+	if err := container.ResizePty(ctx, r); err != nil {
 		return nil, errdefs.ToGRPC(err)
 	}
 	return empty, nil
@@ -485,7 +355,7 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
 	sio := p.Stdio()
 	return &taskAPI.StateResponse{
 		ID:     p.ID(),
-		Bundle: s.bundle,
+		Bundle: s.container.Bundle,
 		Pid:    uint32(p.Pid()),
 		Status: status,
 		Stdin:  sio.Stdin,
@@ -499,48 +369,41 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
 // Pause the container
 func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
-	s.mu.Lock()
-	p := s.task
-	s.mu.Unlock()
-	if p == nil {
-		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
-	}
-	if err := p.(*proc.Init).Pause(ctx); err != nil {
+	container, err := s.getContainer()
+	if err != nil {
 		return nil, err
 	}
+	if err := container.Pause(ctx); err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
 	s.send(&eventstypes.TaskPaused{
-		p.ID(),
+		container.ID,
 	})
 	return empty, nil
 }
 
 // Resume the container
 func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
-	s.mu.Lock()
-	p := s.task
-	s.mu.Unlock()
-	if p == nil {
-		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
-	}
-	if err := p.(*proc.Init).Resume(ctx); err != nil {
+	container, err := s.getContainer()
+	if err != nil {
 		return nil, err
 	}
+	if err := container.Resume(ctx); err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
 	s.send(&eventstypes.TaskResumed{
-		p.ID(),
+		container.ID,
 	})
 	return empty, nil
 }
 
 // Kill a process with the provided signal
 func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
-	p, err := s.getProcess(r.ExecID)
+	container, err := s.getContainer()
 	if err != nil {
 		return nil, err
 	}
-	if p == nil {
-		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
-	}
-	if err := p.Kill(ctx, r.Signal, r.All); err != nil {
+	if err := container.Kill(ctx, r); err != nil {
 		return nil, errdefs.ToGRPC(err)
 	}
 	return empty, nil
@@ -548,6 +411,10 @@
 
 // Pids returns all pids inside the container
 func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
+	container, err := s.getContainer()
+	if err != nil {
+		return nil, err
+	}
 	pids, err := s.getContainerPids(ctx, r.ID)
 	if err != nil {
 		return nil, errdefs.ToGRPC(err)
@@ -557,7 +424,7 @@
 		pInfo := task.ProcessInfo{
 			Pid: pid,
 		}
-		for _, p := range s.processes {
+		for _, p := range container.ExecdProcesses() {
 			if p.Pid() == int(pid) {
 				d := &options.ProcessDetails{
 					ExecID: p.ID(),
@@ -579,54 +446,63 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
 
 // CloseIO of a process
 func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
-	p, err := s.getProcess(r.ExecID)
+	container, err := s.getContainer()
 	if err != nil {
 		return nil, err
 	}
-	if stdin := p.Stdin(); stdin != nil {
-		if err := stdin.Close(); err != nil {
-			return nil, errors.Wrap(err, "close stdin")
-		}
+	if err := container.CloseIO(ctx, r); err != nil {
+		return nil, err
 	}
 	return empty, nil
 }
 
 // Checkpoint the container
 func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
-	s.mu.Lock()
-	p := s.task
-	s.mu.Unlock()
-	if p == nil {
-		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
+	container, err := s.getContainer()
+	if err != nil {
+		return nil, err
 	}
-	var opts options.CheckpointOptions
-	if r.Options != nil {
-		v, err := typeurl.UnmarshalAny(r.Options)
-		if err != nil {
-			return nil, err
-		}
-		opts = *v.(*options.CheckpointOptions)
-	}
-	if err := p.(*proc.Init).Checkpoint(ctx, &proc.CheckpointConfig{
-		Path:                     r.Path,
-		Exit:                     opts.Exit,
-		AllowOpenTCP:             opts.OpenTcp,
-		AllowExternalUnixSockets: opts.ExternalUnixSockets,
-		AllowTerminal:            opts.Terminal,
-		FileLocks:                opts.FileLocks,
-		EmptyNamespaces:          opts.EmptyNamespaces,
-		WorkDir:                  opts.WorkPath,
-	}); err != nil {
+	if err := container.Checkpoint(ctx, r); err != nil {
 		return nil, errdefs.ToGRPC(err)
 	}
 	return empty, nil
 }
 
+// Update a running container
+func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
+	container, err := s.getContainer()
+	if err != nil {
+		return nil, err
+	}
+	if err := container.Update(ctx, r); err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
+	return empty, nil
+}
+
+// Wait for a process to exit
+func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
+	container, err := s.getContainer()
+	if err != nil {
+		return nil, err
+	}
+	p, err := container.Process(r.ExecID)
+	if err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
+	p.Wait()
+
+	return &taskAPI.WaitResponse{
+		ExitStatus: uint32(p.ExitStatus()),
+		ExitedAt:   p.ExitedAt(),
+	}, nil
+}
+
 // Connect returns shim information such as the shim's pid
 func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
 	var pid int
-	if s.task != nil {
-		pid = s.task.Pid()
+	if s.container != nil {
+		pid = s.container.Pid()
 	}
 	return &taskAPI.ConnectResponse{
 		ShimPid: uint32(os.Getpid()),
@@ -641,7 +517,7 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
 }
 
 func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
-	cg := s.getCgroup()
+	cg := s.container.Cgroup()
 	if cg == nil {
 		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
 	}
@@ -658,37 +534,6 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
 	}, nil
 }
 
-// Update a running container
-func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
-	s.mu.Lock()
-	p := s.task
-	s.mu.Unlock()
-	if p == nil {
-		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
-	}
-	if err := p.(*proc.Init).Update(ctx, r.Resources); err != nil {
-		return nil, errdefs.ToGRPC(err)
-	}
-	return empty, nil
-}
-
-// Wait for a process to exit
-func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
-	p, err := s.getProcess(r.ExecID)
-	if err != nil {
-		return nil, err
-	}
-	if p == nil {
-		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
-	}
-	p.Wait()
-
-	return &taskAPI.WaitResponse{
-		ExitStatus: uint32(p.ExitStatus()),
-		ExitedAt:   p.ExitedAt(),
-	}, nil
-}
-
 func (s *service) processExits() {
 	for e := range s.ec {
 		s.checkProcesses(e)
 	}
@@ -706,12 +551,17 @@ func (s *service) sendL(evt interface{}) {
 }
 
 func (s *service) checkProcesses(e runcC.Exit) {
-	shouldKillAll, err := shouldKillAllOnExit(s.bundle)
+	container, err := s.getContainer()
+	if err != nil {
+		return
+	}
+
+	shouldKillAll, err := shouldKillAllOnExit(container.Bundle)
 	if err != nil {
 		log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
 	}
 
-	for _, p := range s.allProcesses() {
+	for _, p := range container.All() {
 		if p.Pid() == e.Pid {
 			if shouldKillAll {
 				if ip, ok := p.(*proc.Init); ok {
@@ -724,7 +574,7 @@ func (s *service) checkProcesses(e runcC.Exit) {
 			}
 			p.SetExited(e.Status)
 			s.sendL(&eventstypes.TaskExit{
-				ContainerID: s.id,
+				ContainerID: container.ID,
 				ID:          p.ID(),
 				Pid:         uint32(e.Pid),
 				ExitStatus:  uint32(e.Status),
@@ -754,26 +604,10 @@ func shouldKillAllOnExit(bundlePath string) (bool, error) {
 	return true, nil
 }
 
-func (s *service) allProcesses() (o []rproc.Process) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	o = make([]rproc.Process, 0, len(s.processes)+1)
-	for _, p := range s.processes {
-		o = append(o, p)
-	}
-	if s.task != nil {
-		o = append(o, s.task)
-	}
-	return o
-}
-
 func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
-	s.mu.Lock()
-	p := s.task
-	s.mu.Unlock()
-	if p == nil {
-		return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
+	p, err := s.container.Process("")
+	if err != nil {
+		return nil, errdefs.ToGRPC(err)
 	}
 	ps, err := p.(*proc.Init).Runtime().Ps(ctx, id)
 	if err != nil {
@@ -789,7 +623,7 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
 func (s *service) forward(publisher events.Publisher) {
 	for e := range s.events {
 		ctx, cancel := context.WithTimeout(s.context, 5*time.Second)
-		err := publisher.Publish(ctx, getTopic(e), e)
+		err := publisher.Publish(ctx, runc.GetTopic(e), e)
 		cancel()
 		if err != nil {
 			logrus.WithError(err).Error("post event")
@@ -797,84 +631,38 @@ func (s *service) forward(publisher events.Publisher) {
 	}
 }
 
-func (s *service) getProcess(execID string) (rproc.Process, error) {
+func (s *service) getContainer() (*runc.Container, error) {
 	s.mu.Lock()
-	defer s.mu.Unlock()
-	if execID == "" {
-		return s.task, nil
-	}
-	p := s.processes[execID]
-	if p == nil {
-		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process does not exist %s", execID)
-	}
-	return p, nil
-}
-
-func (s *service) getCgroup() cgroups.Cgroup {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	return s.cg
-}
-
-func (s *service) setCgroup(cg cgroups.Cgroup) {
-	s.mu.Lock()
-	s.cg = cg
+	container := s.container
 	s.mu.Unlock()
-	if err := s.ep.add(s.id, cg); err != nil {
-		logrus.WithError(err).Error("add cg to OOM monitor")
+	if container == nil {
+		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
 	}
+	return container, nil
 }
 
-func getTopic(e interface{}) string {
-	switch e.(type) {
-	case *eventstypes.TaskCreate:
-		return runtime.TaskCreateEventTopic
-	case *eventstypes.TaskStart:
-		return runtime.TaskStartEventTopic
-	case *eventstypes.TaskOOM:
-		return runtime.TaskOOMEventTopic
-	case *eventstypes.TaskExit:
-		return runtime.TaskExitEventTopic
-	case *eventstypes.TaskDelete:
-		return runtime.TaskDeleteEventTopic
-	case *eventstypes.TaskExecAdded:
-		return runtime.TaskExecAddedEventTopic
-	case *eventstypes.TaskExecStarted:
-		return runtime.TaskExecStartedEventTopic
-	case *eventstypes.TaskPaused:
-		return runtime.TaskPausedEventTopic
-	case *eventstypes.TaskResumed:
-		return runtime.TaskResumedEventTopic
-	case *eventstypes.TaskCheckpointed:
-		return runtime.TaskCheckpointedEventTopic
-	default:
-		logrus.Warnf("no topic for type %#v", e)
+func (s *service) getProcess(execID string) (rproc.Process, error) {
+	container, err := s.getContainer()
+	if err != nil {
+		return nil, err
 	}
-	return runtime.TaskUnknownTopic
-}
-
-func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform, r *proc.CreateConfig, options *options.Options) (*proc.Init, error) {
-	rootfs := filepath.Join(path, "rootfs")
-	runtime := proc.NewRunc(options.Root, path, namespace, options.BinaryName, options.CriuPath, options.SystemdCgroup)
-	p := proc.New(r.ID, runtime, rproc.Stdio{
-		Stdin:    r.Stdin,
-		Stdout:   r.Stdout,
-		Stderr:   r.Stderr,
-		Terminal: r.Terminal,
-	})
-	p.Bundle = r.Bundle
-	p.Platform = platform
-	p.Rootfs = rootfs
-	p.WorkDir = workDir
-	p.IoUID = int(options.IoUid)
-	p.IoGID = int(options.IoGid)
-	p.NoPivotRoot = options.NoPivotRoot
-	p.NoNewKeyring = options.NoNewKeyring
-	p.CriuWorkPath = options.CriuWorkPath
-	if p.CriuWorkPath == "" {
-		// if criu work path not set, use container WorkDir
-		p.CriuWorkPath = p.WorkDir
+	p, err := container.Process(execID)
+	if err != nil {
+		return nil, errdefs.ToGRPC(err)
 	}
 	return p, nil
 }
+
+// initialize a single epoll fd to manage our consoles. `initPlatform` should
+// only be called once.
+func (s *service) initPlatform() error {
+	if s.platform != nil {
+		return nil
+	}
+	p, err := runc.NewPlatform()
+	if err != nil {
+		return err
+	}
+	s.platform = p
+	return nil
+}
diff --git a/runtime/v2/runc/v2/service.go b/runtime/v2/runc/v2/service.go
new file mode 100644
index 000000000..7d71e9dfc
--- /dev/null
+++ b/runtime/v2/runc/v2/service.go
@@ -0,0 +1,726 @@
+// +build linux
+
+/*
+   Copyright The containerd Authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package v2
+
+import (
+	"context"
+	"encoding/json"
+	"io/ioutil"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/containerd/cgroups"
+	eventstypes "github.com/containerd/containerd/api/events"
+	"github.com/containerd/containerd/api/types/task"
+	"github.com/containerd/containerd/errdefs"
+	"github.com/containerd/containerd/events"
+	"github.com/containerd/containerd/log"
+	"github.com/containerd/containerd/mount"
+	"github.com/containerd/containerd/namespaces"
+	rproc "github.com/containerd/containerd/runtime/proc"
+	"github.com/containerd/containerd/runtime/v1/linux/proc"
+	"github.com/containerd/containerd/runtime/v2/runc"
+	"github.com/containerd/containerd/runtime/v2/runc/options"
+	"github.com/containerd/containerd/runtime/v2/shim"
+	taskAPI "github.com/containerd/containerd/runtime/v2/task"
+	runcC "github.com/containerd/go-runc"
+	"github.com/containerd/typeurl"
+	ptypes "github.com/gogo/protobuf/types"
+	specs "github.com/opencontainers/runtime-spec/specs-go"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+	"golang.org/x/sys/unix"
+)
+
+var (
+	_     = (taskAPI.TaskService)(&service{})
+	empty = &ptypes.Empty{}
+)
+
+// groupLabels specifies how the shim groups services.
+// currently supports a runc.v2 specific .group label and the
+// standard k8s pod label. Order matters in this list.
+var groupLabels = []string{
+	"io.containerd.runc.v2.group",
+	"io.kubernetes.cri.sandbox-id",
+}
+
+type spec struct {
+	Annotations map[string]string `json:"annotations,omitempty"`
+}
+
+// New returns a new shim service that can be used via GRPC
+func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
+	ep, err := runc.NewOOMEpoller(publisher)
+	if err != nil {
+		return nil, err
+	}
+	ctx, cancel := context.WithCancel(ctx)
+	go ep.Run(ctx)
+	s := &service{
+		id:         id,
+		context:    ctx,
+		events:     make(chan interface{}, 128),
+		ec:         shim.Default.Subscribe(),
+		ep:         ep,
+		cancel:     cancel,
+		containers: make(map[string]*runc.Container),
+	}
+	go s.processExits()
+	runcC.Monitor = shim.Default
+	if err := s.initPlatform(); err != nil {
+		cancel()
+		return nil, errors.Wrap(err, "failed to initialize platform behavior")
+	}
+	go s.forward(publisher)
+	return s, nil
+}
+
+// service is the shim implementation of a remote shim over GRPC
+type service struct {
+	mu          sync.Mutex
+	eventSendMu sync.Mutex
+
+	context  context.Context
+	events   chan interface{}
+	platform rproc.Platform
+	ec       chan runcC.Exit
+	ep       *runc.Epoller
+
+	// id only used in cleanup case
+	id string
+
+	containers map[string]*runc.Container
+
+	cancel func()
+}
+
+func newCommand(ctx context.Context, id, containerdBinary, containerdAddress string) (*exec.Cmd, error) {
+	ns, err := namespaces.NamespaceRequired(ctx)
+	if err != nil {
+		return nil, err
+	}
+	self, err := os.Executable()
+	if err != nil {
+		return nil, err
+	}
+	cwd, err := os.Getwd()
+	if err != nil {
+		return nil, err
+	}
+	args := []string{
+		"-namespace", ns,
+		"-id", id,
+		"-address", containerdAddress,
+		"-publish-binary", containerdBinary,
+	}
+	cmd := exec.Command(self, args...)
+ cmd.Dir = cwd + cmd.Env = append(os.Environ(), "GOMAXPROCS=4") + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + return cmd, nil +} + +func readSpec() (*spec, error) { + f, err := os.Open("config.json") + if err != nil { + return nil, err + } + defer f.Close() + var s spec + if err := json.NewDecoder(f).Decode(&s); err != nil { + return nil, err + } + return &s, nil +} + +func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) { + cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress) + if err != nil { + return "", err + } + grouping := id + spec, err := readSpec() + if err != nil { + return "", err + } + for _, group := range groupLabels { + if groupID, ok := spec.Annotations[group]; ok { + grouping = groupID + break + } + } + address, err := shim.SocketAddress(ctx, grouping) + if err != nil { + return "", err + } + socket, err := shim.NewSocket(address) + if err != nil { + if strings.Contains(err.Error(), "address already in use") { + if err := shim.WriteAddress("address", address); err != nil { + return "", err + } + return address, nil + } + return "", err + } + defer socket.Close() + f, err := socket.File() + if err != nil { + return "", err + } + defer f.Close() + + cmd.ExtraFiles = append(cmd.ExtraFiles, f) + + if err := cmd.Start(); err != nil { + return "", err + } + defer func() { + if err != nil { + cmd.Process.Kill() + } + }() + // make sure to wait after start + go cmd.Wait() + if err := shim.WriteAddress("address", address); err != nil { + return "", err + } + if err := shim.SetScore(cmd.Process.Pid); err != nil { + return "", errors.Wrap(err, "failed to set OOM Score on shim") + } + return address, nil +} + +func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { + cwd, err := os.Getwd() + if err != nil { + return nil, err + } + path := filepath.Join(filepath.Dir(cwd), s.id) + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + + runtime, err := runc.ReadRuntime(path) + if err != nil { + return nil, err + } + r := proc.NewRunc(proc.RuncRoot, path, ns, runtime, "", false) + if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{ + Force: true, + }); err != nil { + logrus.WithError(err).Warn("failed to remove runc container") + } + if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil { + logrus.WithError(err).Warn("failed to cleanup rootfs mount") + } + return &taskAPI.DeleteResponse{ + ExitedAt: time.Now(), + ExitStatus: 128 + uint32(unix.SIGKILL), + }, nil +} + +// Create a new initial process and container with the underlying OCI runtime +func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { + s.mu.Lock() + defer s.mu.Unlock() + + container, err := runc.NewContainer(ctx, s.platform, r) + if err != nil { + return nil, err + } + + s.containers[r.ID] = container + + s.send(&eventstypes.TaskCreate{ + ContainerID: r.ID, + Bundle: r.Bundle, + Rootfs: r.Rootfs, + IO: &eventstypes.TaskIO{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, + }, + Checkpoint: r.Checkpoint, + Pid: uint32(container.Pid()), + }) + + return &taskAPI.CreateTaskResponse{ + Pid: uint32(container.Pid()), + }, nil +} + +// Start a process +func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + + // hold the send lock so that 
+
+func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
+	cwd, err := os.Getwd()
+	if err != nil {
+		return nil, err
+	}
+	path := filepath.Join(filepath.Dir(cwd), s.id)
+	ns, err := namespaces.NamespaceRequired(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	runtime, err := runc.ReadRuntime(path)
+	if err != nil {
+		return nil, err
+	}
+	r := proc.NewRunc(proc.RuncRoot, path, ns, runtime, "", false)
+	if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{
+		Force: true,
+	}); err != nil {
+		logrus.WithError(err).Warn("failed to remove runc container")
+	}
+	if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil {
+		logrus.WithError(err).Warn("failed to cleanup rootfs mount")
+	}
+	return &taskAPI.DeleteResponse{
+		ExitedAt:   time.Now(),
+		ExitStatus: 128 + uint32(unix.SIGKILL),
+	}, nil
+}
+
+// Create a new initial process and container with the underlying OCI runtime
+func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	container, err := runc.NewContainer(ctx, s.platform, r)
+	if err != nil {
+		return nil, err
+	}
+
+	s.containers[r.ID] = container
+
+	s.send(&eventstypes.TaskCreate{
+		ContainerID: r.ID,
+		Bundle:      r.Bundle,
+		Rootfs:      r.Rootfs,
+		IO: &eventstypes.TaskIO{
+			Stdin:    r.Stdin,
+			Stdout:   r.Stdout,
+			Stderr:   r.Stderr,
+			Terminal: r.Terminal,
+		},
+		Checkpoint: r.Checkpoint,
+		Pid:        uint32(container.Pid()),
+	})
+
+	return &taskAPI.CreateTaskResponse{
+		Pid: uint32(container.Pid()),
+	}, nil
+}
+
+// Start a process
+func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
+	container, err := s.getContainer(r.ID)
+	if err != nil {
+		return nil, err
+	}
+
+	// hold the send lock so that the start events are sent before any exit
+	// events in the error case
+	s.eventSendMu.Lock()
+	p, err := container.Start(ctx, r)
+	if err != nil {
+		s.eventSendMu.Unlock()
+		return nil, errdefs.ToGRPC(err)
+	}
+	if err := s.ep.Add(container.ID, container.Cgroup()); err != nil {
+		logrus.WithError(err).Error("add cg to OOM monitor")
+	}
+	switch r.ExecID {
+	case "":
+		s.send(&eventstypes.TaskStart{
+			ContainerID: container.ID,
+			Pid:         uint32(p.Pid()),
+		})
+	default:
+		s.send(&eventstypes.TaskExecStarted{
+			ContainerID: container.ID,
+			ExecID:      r.ExecID,
+			Pid:         uint32(p.Pid()),
+		})
+	}
+	s.eventSendMu.Unlock()
+	return &taskAPI.StartResponse{
+		Pid: uint32(p.Pid()),
+	}, nil
+}
+
+// Delete the initial process and container
+func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
+	container, err := s.getContainer(r.ID)
+	if err != nil {
+		return nil, err
+	}
+	p, err := container.Delete(ctx, r)
+	if err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
+	// if we deleted our init task, close the platform and send the task delete event
+	if r.ExecID == "" {
+		s.mu.Lock()
+		delete(s.containers, r.ID)
+		hasContainers := len(s.containers) > 0
+		s.mu.Unlock()
+		if s.platform != nil && !hasContainers {
+			s.platform.Close()
+		}
+		s.send(&eventstypes.TaskDelete{
+			ContainerID: container.ID,
+			Pid:         uint32(p.Pid()),
+			ExitStatus:  uint32(p.ExitStatus()),
+			ExitedAt:    p.ExitedAt(),
+		})
+	}
+	return &taskAPI.DeleteResponse{
+		ExitStatus: uint32(p.ExitStatus()),
+		ExitedAt:   p.ExitedAt(),
+		Pid:        uint32(p.Pid()),
+	}, nil
+}
+
+// Exec an additional process inside the container
+func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
+	container, err := s.getContainer(r.ID)
+	if err != nil {
+		return nil, err
+	}
+	if container.ProcessExists(r.ExecID) {
+		return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
+	}
+	process, err := container.Exec(ctx, r)
+	if err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
+
+	s.send(&eventstypes.TaskExecAdded{
+		ContainerID: container.ID,
+		ExecID:      process.ID(),
+	})
+	return empty, nil
+}
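For context, `Exec` above only registers the process; the shim expects a follow-up `Start` carrying the same `ExecID`. A hedged client-side sketch using containerd's Go client (the exec ID and process spec are placeholders):

```go
package main

import (
	"context"

	"github.com/containerd/containerd"
	"github.com/containerd/containerd/cio"
	specs "github.com/opencontainers/runtime-spec/specs-go"
)

// execInTask sketches the two-step exec flow this service implements:
// Task.Exec registers the process with the shim, Process.Start launches it
// (arriving at the shim as Start with a non-empty ExecID).
func execInTask(ctx context.Context, t containerd.Task, pspec *specs.Process) error {
	process, err := t.Exec(ctx, "exec-1", pspec, cio.NewCreator(cio.WithStdio))
	if err != nil {
		return err // the shim rejects duplicate exec IDs with AlreadyExists
	}
	return process.Start(ctx)
}
```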
+
+// ResizePty of a process
+func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
+	container, err := s.getContainer(r.ID)
+	if err != nil {
+		return nil, err
+	}
+	if err := container.ResizePty(ctx, r); err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
+	return empty, nil
+}
+
+// State returns runtime state information for a process
+func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
+	container, err := s.getContainer(r.ID)
+	if err != nil {
+		return nil, err
+	}
+	p, err := container.Process(r.ExecID)
+	if err != nil {
+		return nil, err
+	}
+	st, err := p.Status(ctx)
+	if err != nil {
+		return nil, err
+	}
+	status := task.StatusUnknown
+	switch st {
+	case "created":
+		status = task.StatusCreated
+	case "running":
+		status = task.StatusRunning
+	case "stopped":
+		status = task.StatusStopped
+	case "paused":
+		status = task.StatusPaused
+	case "pausing":
+		status = task.StatusPausing
+	}
+	sio := p.Stdio()
+	return &taskAPI.StateResponse{
+		ID:         p.ID(),
+		Bundle:     container.Bundle,
+		Pid:        uint32(p.Pid()),
+		Status:     status,
+		Stdin:      sio.Stdin,
+		Stdout:     sio.Stdout,
+		Stderr:     sio.Stderr,
+		Terminal:   sio.Terminal,
+		ExitStatus: uint32(p.ExitStatus()),
+		ExitedAt:   p.ExitedAt(),
+	}, nil
+}
+
+// Pause the container
+func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
+	container, err := s.getContainer(r.ID)
+	if err != nil {
+		return nil, err
+	}
+	if err := container.Pause(ctx); err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
+	s.send(&eventstypes.TaskPaused{
+		ContainerID: container.ID,
+	})
+	return empty, nil
+}
+
+// Resume the container
+func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
+	container, err := s.getContainer(r.ID)
+	if err != nil {
+		return nil, err
+	}
+	if err := container.Resume(ctx); err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
+	s.send(&eventstypes.TaskResumed{
+		ContainerID: container.ID,
+	})
+	return empty, nil
+}
+
+// Kill a process with the provided signal
+func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
+	container, err := s.getContainer(r.ID)
+	if err != nil {
+		return nil, err
+	}
+	if err := container.Kill(ctx, r); err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
+	return empty, nil
+}
+
+// Pids returns all pids inside the container
+func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
+	container, err := s.getContainer(r.ID)
+	if err != nil {
+		return nil, err
+	}
+	pids, err := s.getContainerPids(ctx, r.ID)
+	if err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
+	var processes []*task.ProcessInfo
+	for _, pid := range pids {
+		pInfo := task.ProcessInfo{
+			Pid: pid,
+		}
+		for _, p := range container.ExecdProcesses() {
+			if p.Pid() == int(pid) {
+				d := &options.ProcessDetails{
+					ExecID: p.ID(),
+				}
+				a, err := typeurl.MarshalAny(d)
+				if err != nil {
+					return nil, errors.Wrapf(err, "failed to marshal process %d info", pid)
+				}
+				pInfo.Info = a
+				break
+			}
+		}
+		processes = append(processes, &pInfo)
+	}
+	return &taskAPI.PidsResponse{
+		Processes: processes,
+	}, nil
+}
+
+// CloseIO of a process
+func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
+	container, err := s.getContainer(r.ID)
+	if err != nil {
+		return nil, err
+	}
+	if err := container.CloseIO(ctx, r); err != nil {
+		return nil, err
+	}
+	return empty, nil
+}
+
+// Checkpoint the container
+func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
+	container, err := s.getContainer(r.ID)
+	if err != nil {
+		return nil, err
+	}
+	if err := container.Checkpoint(ctx, r); err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
+	return empty, nil
+}
+
+// Update a running container
+func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
+	container, err := s.getContainer(r.ID)
+	if err != nil {
+		return nil, err
+	}
+	if err := container.Update(ctx, r); err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
+	return empty, nil
+}
+
+// Wait for a process to exit
+func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
+	container, err := s.getContainer(r.ID)
+	if err != nil {
+		return nil, err
+	}
+	p, err := container.Process(r.ExecID)
+	if err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
+	p.Wait()
+
+	return &taskAPI.WaitResponse{
+		ExitStatus: uint32(p.ExitStatus()),
+		ExitedAt:   p.ExitedAt(),
+	}, nil
+}
+
+// Connect returns shim information such as the shim's pid
+func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
+	var pid int
+	if container, err := s.getContainer(r.ID); err == nil {
+		pid = container.Pid()
+	}
+	return &taskAPI.ConnectResponse{
+		ShimPid: uint32(os.Getpid()),
+		TaskPid: uint32(pid),
+	}, nil
+}
+
+func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
+	s.mu.Lock()
+	// return early if the shim is still servicing containers
+	if len(s.containers) > 0 {
+		s.mu.Unlock()
+		return empty, nil
+	}
+	s.cancel()
+	os.Exit(0)
+	return empty, nil
+}
+
+func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
+	container, err := s.getContainer(r.ID)
+	if err != nil {
+		return nil, err
+	}
+	cg := container.Cgroup()
+	if cg == nil {
+		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
+	}
+	stats, err := cg.Stat(cgroups.IgnoreNotExist)
+	if err != nil {
+		return nil, err
+	}
+	data, err := typeurl.MarshalAny(stats)
+	if err != nil {
+		return nil, err
+	}
+	return &taskAPI.StatsResponse{
+		Stats: data,
+	}, nil
+}
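`Stats` returns the cgroup metrics packed in an `Any`, so a consumer has to unpack it with `typeurl`. A sketch of the decoding side, assuming the v1 cgroup `Metrics` type this shim marshals (an assumption about the concrete type, not spelled out in the patch):

```go
package main

import (
	"fmt"

	"github.com/containerd/cgroups"
	taskAPI "github.com/containerd/containerd/runtime/v2/task"
	"github.com/containerd/typeurl"
)

// decodeStats unpacks the Any-encoded cgroup metrics from a StatsResponse.
func decodeStats(resp *taskAPI.StatsResponse) (*cgroups.Metrics, error) {
	v, err := typeurl.UnmarshalAny(resp.Stats)
	if err != nil {
		return nil, err
	}
	m, ok := v.(*cgroups.Metrics) // assumed concrete type from cg.Stat
	if !ok {
		return nil, fmt.Errorf("unexpected stats type %T", v)
	}
	return m, nil
}
```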
+
+func (s *service) processExits() {
+	for e := range s.ec {
+		s.checkProcesses(e)
+	}
+}
+
+func (s *service) send(evt interface{}) {
+	s.events <- evt
+}
+
+func (s *service) sendL(evt interface{}) {
+	s.eventSendMu.Lock()
+	s.events <- evt
+	s.eventSendMu.Unlock()
+}
+
+func (s *service) checkProcesses(e runcC.Exit) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	for _, container := range s.containers {
+		if container.HasPid(e.Pid) {
+			shouldKillAll, err := shouldKillAllOnExit(container.Bundle)
+			if err != nil {
+				log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
+			}
+
+			for _, p := range container.All() {
+				if p.Pid() == e.Pid {
+					if shouldKillAll {
+						if ip, ok := p.(*proc.Init); ok {
+							// Ensure all children are killed
+							if err := ip.KillAll(s.context); err != nil {
+								logrus.WithError(err).WithField("id", ip.ID()).
+									Error("failed to kill init's children")
+							}
+						}
+					}
+					p.SetExited(e.Status)
+					s.sendL(&eventstypes.TaskExit{
+						ContainerID: container.ID,
+						ID:          p.ID(),
+						Pid:         uint32(e.Pid),
+						ExitStatus:  uint32(e.Status),
+						ExitedAt:    p.ExitedAt(),
+					})
+					return
+				}
+			}
+			return
+		}
+	}
+}
+
+func shouldKillAllOnExit(bundlePath string) (bool, error) {
+	var bundleSpec specs.Spec
+	bundleConfigContents, err := ioutil.ReadFile(filepath.Join(bundlePath, "config.json"))
+	if err != nil {
+		return false, err
+	}
+	if err := json.Unmarshal(bundleConfigContents, &bundleSpec); err != nil {
+		return false, err
+	}
+
+	if bundleSpec.Linux != nil {
+		for _, ns := range bundleSpec.Linux.Namespaces {
+			if ns.Type == specs.PIDNamespace {
+				return false, nil
+			}
+		}
+	}
+
+	return true, nil
+}
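The PID-namespace check above decides whether the shim must sweep up descendants itself. Two illustrative specs (a sketch; `specs` is the OCI runtime-spec package):

```go
package main

import (
	specs "github.com/opencontainers/runtime-spec/specs-go"
)

// killAllExamples shows the two cases shouldKillAllOnExit distinguishes.
func killAllExamples() (specs.Spec, specs.Spec) {
	// Private PID namespace: the kernel tears the whole tree down with
	// init, so the shim skips KillAll (shouldKillAllOnExit -> false).
	withPidNS := specs.Spec{
		Linux: &specs.Linux{
			Namespaces: []specs.LinuxNamespace{{Type: specs.PIDNamespace}},
		},
	}
	// Shared/host PID namespace: children can outlive init, so the shim
	// must sweep them with KillAll (shouldKillAllOnExit -> true).
	hostPID := specs.Spec{Linux: &specs.Linux{}}
	return withPidNS, hostPID
}
```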
+
+func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
+	container, err := s.getContainer(id)
+	if err != nil {
+		return nil, err
+	}
+	p, err := container.Process("")
+	if err != nil {
+		return nil, errdefs.ToGRPC(err)
+	}
+	ps, err := p.(*proc.Init).Runtime().Ps(ctx, id)
+	if err != nil {
+		return nil, err
+	}
+	pids := make([]uint32, 0, len(ps))
+	for _, pid := range ps {
+		pids = append(pids, uint32(pid))
+	}
+	return pids, nil
+}
+
+func (s *service) forward(publisher events.Publisher) {
+	for e := range s.events {
+		ctx, cancel := context.WithTimeout(s.context, 5*time.Second)
+		err := publisher.Publish(ctx, runc.GetTopic(e), e)
+		cancel()
+		if err != nil {
+			logrus.WithError(err).Error("post event")
+		}
+	}
+}
+
+func (s *service) getContainer(id string) (*runc.Container, error) {
+	s.mu.Lock()
+	container := s.containers[id]
+	s.mu.Unlock()
+	if container == nil {
+		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
+	}
+	return container, nil
+}
+
+// initialize a single epoll fd to manage our consoles. `initPlatform` should
+// only be called once.
+func (s *service) initPlatform() error {
+	if s.platform != nil {
+		return nil
+	}
+	p, err := runc.NewPlatform()
+	if err != nil {
+		return err
+	}
+	s.platform = p
+	return nil
+}
diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go
index 94814e754..65022390e 100644
--- a/runtime/v2/shim.go
+++ b/runtime/v2/shim.go
@@ -99,7 +99,9 @@ type shim struct {
 }
 
 func (s *shim) Connect(ctx context.Context) error {
-	response, err := s.task.Connect(ctx, &task.ConnectRequest{})
+	response, err := s.task.Connect(ctx, &task.ConnectRequest{
+		ID: s.ID(),
+	})
 	if err != nil {
 		return err
 	}
@@ -317,6 +319,7 @@ func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) {
 
 func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any) error {
 	request := &task.CheckpointTaskRequest{
+		ID:      s.ID(),
 		Path:    path,
 		Options: options,
 	}
diff --git a/runtime/v2/shim/util.go b/runtime/v2/shim/util.go
index a4e300f86..2e34444a8 100644
--- a/runtime/v2/shim/util.go
+++ b/runtime/v2/shim/util.go
@@ -19,6 +19,7 @@ package shim
 import (
 	"context"
 	"fmt"
+	"io/ioutil"
 	"net"
 	"os"
 	"os/exec"
@@ -150,3 +151,22 @@ func WriteAddress(path, address string) error {
 	}
 	return os.Rename(tempPath, path)
 }
+
+// ErrNoAddress is returned when the address file has no content
+var ErrNoAddress = errors.New("no shim address")
+
+// ReadAddress returns the shim's abstract socket address from the path
+func ReadAddress(path string) (string, error) {
+	path, err := filepath.Abs(path)
+	if err != nil {
+		return "", err
+	}
+	data, err := ioutil.ReadFile(path)
+	if err != nil {
+		return "", err
+	}
+	if len(data) == 0 {
+		return "", ErrNoAddress
+	}
+	return string(data), nil
+}
diff --git a/services/tasks/local.go b/services/tasks/local.go
index ee7dc1bc9..1dca4daa0 100644
--- a/services/tasks/local.go
+++ b/services/tasks/local.go
@@ -24,6 +24,7 @@ import (
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"strings"
 	"time"
 
 	api "github.com/containerd/containerd/api/services/tasks/v1"
@@ -687,8 +688,8 @@ func getCheckpointPath(runtime string, option *ptypes.Any) (string, error) {
 	}
 
 	var checkpointPath string
-	switch runtime {
-	case "io.containerd.runc.v1":
+	switch {
+	case checkRuntime(runtime, "io.containerd.runc"):
 		v, err := typeurl.UnmarshalAny(option)
 		if err != nil {
 			return "", err
@@ -699,7 +700,7 @@
 		}
 		checkpointPath = opts.ImagePath
 
-	case "io.containerd.runtime.v1.linux":
+	case runtime == plugin.RuntimeLinuxV1:
 		v, err := typeurl.UnmarshalAny(option)
 		if err != nil {
 			return "", err
@@ -721,8 +722,8 @@ func getRestorePath(runtime string, option *ptypes.Any) (string, error) {
 	}
 
 	var restorePath string
-	switch runtime {
-	case "io.containerd.runc.v1":
+	switch {
+	case checkRuntime(runtime, "io.containerd.runc"):
 		v, err := typeurl.UnmarshalAny(option)
 		if err != nil {
 			return "", err
@@ -732,8 +733,7 @@
 			return "", fmt.Errorf("invalid task create option for %s", runtime)
 		}
 		restorePath = opts.CriuImagePath
-
-	case "io.containerd.runtime.v1.linux":
+	case runtime == plugin.RuntimeLinuxV1:
 		v, err := typeurl.UnmarshalAny(option)
 		if err != nil {
 			return "", err
@@ -747,3 +747,20 @@
 
 	return restorePath, nil
 }
+
+// checkRuntime returns true if the current runtime matches the expected
+// runtime. The expected runtime may name only a prefix of the schema
+// (e.g. "io.containerd.runc"), in which case only those parts are compared.
+func checkRuntime(current, expected string) bool {
+	cp := strings.Split(current, ".")
+	l := len(cp)
+	for i, p := range strings.Split(expected, ".") {
+		// anything beyond the parts current actually has cannot match
+		if i >= l {
+			return false
+		}
+		if p != cp[i] {
+			return false
+		}
+	}
+	return true
+}
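With the bounds check corrected to `i >= l` (the original `i > l` could index past the end of `cp`), the matching semantics come out as follows; a test sketch, not part of the patch:

```go
package tasks

import "testing"

// TestCheckRuntime sketches checkRuntime's prefix-matching semantics.
func TestCheckRuntime(t *testing.T) {
	for _, tc := range []struct {
		current, expected string
		want              bool
	}{
		{"io.containerd.runc.v1", "io.containerd.runc", true},
		{"io.containerd.runc.v2", "io.containerd.runc", true},
		{"io.containerd.runtime.v1.linux", "io.containerd.runc", false},
		{"io.containerd.runc", "io.containerd.runc.v2", false}, // current has fewer parts than expected
	} {
		if got := checkRuntime(tc.current, tc.expected); got != tc.want {
			t.Errorf("checkRuntime(%q, %q) = %v, want %v", tc.current, tc.expected, got, tc.want)
		}
	}
}
```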
diff --git a/task.go b/task.go
index 9deb66c7e..8d281dc10 100644
--- a/task.go
+++ b/task.go
@@ -642,12 +642,12 @@ func isCheckpointPathExist(runtime string, v interface{}) bool {
 	}
 
 	switch runtime {
-	case "io.containerd.runc.v1":
+	case plugin.RuntimeRuncV1, plugin.RuntimeRuncV2:
 		if opts, ok := v.(*options.CheckpointOptions); ok && opts.ImagePath != "" {
 			return true
 		}
 
-	case "io.containerd.runtime.v1.linux":
+	case plugin.RuntimeLinuxV1:
 		if opts, ok := v.(*runctypes.CheckpointOptions); ok && opts.ImagePath != "" {
 			return true
 		}
diff --git a/task_opts.go b/task_opts.go
index 714a152c5..4619e2c0e 100644
--- a/task_opts.go
+++ b/task_opts.go
@@ -34,11 +34,6 @@ import (
 	"github.com/pkg/errors"
 )
 
-const (
-	v1runtime = "io.containerd.runtime.v1.linux"
-	v2runtime = "io.containerd.runc.v1"
-)
-
 // NewTaskOpts allows the caller to set options on a new task
 type NewTaskOpts func(context.Context, *Client, *TaskInfo) error
 
@@ -99,23 +94,22 @@ func WithCheckpointName(name string) CheckpointTaskOpts {
 // WithCheckpointImagePath sets image path for checkpoint option
 func WithCheckpointImagePath(rt, path string) CheckpointTaskOpts {
 	return func(r *CheckpointTaskInfo) error {
-		switch rt {
-		case v1runtime:
-			if r.Options == nil {
-				r.Options = &runctypes.CheckpointOptions{}
-			}
-			opts, ok := r.Options.(*runctypes.CheckpointOptions)
-			if !ok {
-				return errors.New("invalid v1 checkpoint options format")
-			}
-			opts.ImagePath = path
-		case v2runtime:
+		if CheckRuntime(rt, "io.containerd.runc") {
 			if r.Options == nil {
 				r.Options = &options.CheckpointOptions{}
 			}
 			opts, ok := r.Options.(*options.CheckpointOptions)
 			if !ok {
-				return errors.New("invalid v2 checkpoint options format")
+				return errors.New("invalid v2 shim checkpoint options format")
+			}
+			opts.ImagePath = path
+		} else {
+			if r.Options == nil {
+				r.Options = &runctypes.CheckpointOptions{}
+			}
+			opts, ok := r.Options.(*runctypes.CheckpointOptions)
+			if !ok {
+				return errors.New("invalid v1 shim checkpoint options format")
 			}
 			opts.ImagePath = path
 		}
@@ -126,23 +120,22 @@ func WithCheckpointImagePath(rt, path string) CheckpointTaskOpts {
 // WithRestoreImagePath sets image path for create option
 func WithRestoreImagePath(rt, path string) NewTaskOpts {
 	return func(ctx context.Context, c *Client, ti *TaskInfo) error {
-		switch rt {
-		case v1runtime:
-			if ti.Options == nil {
-				ti.Options = &runctypes.CreateOptions{}
-			}
-			opts, ok := ti.Options.(*runctypes.CreateOptions)
-			if !ok {
-				return errors.New("invalid v1 create options format")
-			}
-			opts.CriuImagePath = path
-		case v2runtime:
+		if CheckRuntime(rt, "io.containerd.runc") {
 			if ti.Options == nil {
 				ti.Options = &options.Options{}
 			}
 			opts, ok := ti.Options.(*options.Options)
 			if !ok {
-				return errors.New("invalid v2 create options format")
+				return errors.New("invalid v2 shim create options format")
+			}
+			opts.CriuImagePath = path
+		} else {
+			if ti.Options == nil {
+				ti.Options = &runctypes.CreateOptions{}
+			}
+			opts, ok := ti.Options.(*runctypes.CreateOptions)
+			if !ok {
+				return errors.New("invalid v1 shim create options format")
 			}
 			opts.CriuImagePath = path
 		}
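Finally, a hedged sketch of how a client threads these reworked image-path options through a checkpoint/restore cycle; the runtime name comes from the container's metadata, and the image path is a placeholder:

```go
package main

import (
	"context"

	"github.com/containerd/containerd"
	"github.com/containerd/containerd/cio"
)

// checkpointAndRestore sketches the client-side use of the new options: any
// runtime matching the io.containerd.runc prefix (v1 or v2 shim) gets the v2
// option types, everything else falls back to the legacy runctypes.
func checkpointAndRestore(ctx context.Context, container containerd.Container, task containerd.Task) error {
	info, err := container.Info(ctx)
	if err != nil {
		return err
	}
	const imagePath = "/tmp/checkpoint" // placeholder path
	if _, err := task.Checkpoint(ctx, containerd.WithCheckpointImagePath(info.Runtime.Name, imagePath)); err != nil {
		return err
	}
	if _, err := task.Delete(ctx); err != nil {
		return err
	}
	_, err = container.NewTask(ctx, cio.NewCreator(cio.WithStdio),
		containerd.WithRestoreImagePath(info.Runtime.Name, imagePath))
	return err
}
```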