diff --git a/.travis.yml b/.travis.yml index 9a230677a..8ec36b2a6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,6 +28,7 @@ addons: env: - TRAVIS_GOOS=linux TEST_RUNTIME=io.containerd.runc.v1 TRAVIS_CGO_ENABLED=1 + - TRAVIS_GOOS=linux TEST_RUNTIME=io.containerd.runc.v2 TRAVIS_CGO_ENABLED=1 - TRAVIS_GOOS=linux TEST_RUNTIME=io.containerd.runtime.v1.linux TRAVIS_CGO_ENABLED=1 - TRAVIS_GOOS=darwin TRAVIS_CGO_ENABLED=0 diff --git a/Makefile b/Makefile index 8e6801331..4e6a0ba8a 100644 --- a/Makefile +++ b/Makefile @@ -189,6 +189,10 @@ bin/containerd-shim-runc-v1: cmd/containerd-shim-runc-v1 FORCE # set !cgo and om @echo "$(WHALE) bin/containerd-shim-runc-v1" @CGO_ENABLED=0 go build ${GO_BUILD_FLAGS} -o bin/containerd-shim-runc-v1 ${SHIM_GO_LDFLAGS} ${GO_TAGS} ./cmd/containerd-shim-runc-v1 +bin/containerd-shim-runc-v2: cmd/containerd-shim-runc-v2 FORCE # set !cgo and omit pie for a static shim build: https://github.com/golang/go/issues/17789#issuecomment-258542220 + @echo "$(WHALE) bin/containerd-shim-runc-v2" + @CGO_ENABLED=0 go build ${GO_BUILD_FLAGS} -o bin/containerd-shim-runc-v2 ${SHIM_GO_LDFLAGS} ${GO_TAGS} ./cmd/containerd-shim-runc-v2 + bin/containerd-shim-runhcs-v1: cmd/containerd-shim-runhcs-v1 FORCE # set !cgo and omit pie for a static shim build: https://github.com/golang/go/issues/17789#issuecomment-258542220 @echo "$(WHALE) bin/containerd-shim-runhcs-v1${BINARY_SUFFIX}" @CGO_ENABLED=0 go build ${GO_BUILD_FLAGS} -o bin/containerd-shim-runhcs-v1${BINARY_SUFFIX} ${SHIM_GO_LDFLAGS} ${GO_TAGS} ./cmd/containerd-shim-runhcs-v1 diff --git a/Makefile.linux b/Makefile.linux index d9dda903b..7b2700897 100644 --- a/Makefile.linux +++ b/Makefile.linux @@ -16,7 +16,7 @@ #linux specific settings WHALE="+" ONI="-" -COMMANDS += containerd-shim containerd-shim-runc-v1 +COMMANDS += containerd-shim containerd-shim-runc-v1 containerd-shim-runc-v2 # check GOOS for cross compile builds ifeq ($(GOOS),linux) diff --git a/client.go b/client.go index 3613c99d3..63e8c193a 100644 --- a/client.go +++ b/client.go @@ -24,6 +24,7 @@ import ( "net/http" "runtime" "strconv" + "strings" "sync" "time" @@ -614,3 +615,20 @@ func (c *Client) Version(ctx context.Context) (Version, error) { Revision: response.Revision, }, nil } + +// CheckRuntime returns true if the current runtime matches the expected +// runtime. Providing various parts of the runtime schema will match those +// parts of the expected runtime +func CheckRuntime(current, expected string) bool { + cp := strings.Split(current, ".") + l := len(cp) + for i, p := range strings.Split(expected, ".") { + if i > l { + return false + } + if p != cp[i] { + return false + } + } + return true +} diff --git a/client_test.go b/client_test.go index 83081a6c9..f50020fa4 100644 --- a/client_test.go +++ b/client_test.go @@ -124,6 +124,7 @@ func TestMain(m *testing.M) { log.G(ctx).WithFields(logrus.Fields{ "version": version.Version, "revision": version.Revision, + "runtime": os.Getenv("TEST_RUNTIME"), }).Info("running tests against containerd") // pull a seed image diff --git a/cmd/containerd-shim-runc-v2/main.go b/cmd/containerd-shim-runc-v2/main.go new file mode 100644 index 000000000..2ebe18ce2 --- /dev/null +++ b/cmd/containerd-shim-runc-v2/main.go @@ -0,0 +1,28 @@ +// +build linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package main + +import ( + "github.com/containerd/containerd/runtime/v2/runc/v2" + "github.com/containerd/containerd/runtime/v2/shim" +) + +func main() { + shim.Run("io.containerd.runc.v2", v2.New) +} diff --git a/cmd/containerd-stress/main.go b/cmd/containerd-stress/main.go index 6d21ce2d9..f70d6fe11 100644 --- a/cmd/containerd-stress/main.go +++ b/cmd/containerd-stress/main.go @@ -30,6 +30,7 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/plugin" metrics "github.com/docker/go-metrics" "github.com/sirupsen/logrus" "github.com/urfave/cli" @@ -146,7 +147,7 @@ func main() { cli.StringFlag{ Name: "runtime", Usage: "set the runtime to stress test", - Value: "io.containerd.runtime.v1.linux", + Value: plugin.RuntimeLinuxV1, }, } app.Before = func(context *cli.Context) error { diff --git a/cmd/ctr/commands/run/run_unix.go b/cmd/ctr/commands/run/run_unix.go index 52a3205ab..8ecf3d03a 100644 --- a/cmd/ctr/commands/run/run_unix.go +++ b/cmd/ctr/commands/run/run_unix.go @@ -147,6 +147,7 @@ func NewContainer(ctx gocontext.Context, client *containerd.Client, context *cli cOpts = append(cOpts, containerd.WithRuntime(context.String("runtime"), nil)) + opts = append(opts, oci.WithAnnotations(commands.LabelArgs(context.StringSlice("label")))) var s specs.Spec spec = containerd.WithSpec(&s, opts...) diff --git a/cmd/ctr/commands/tasks/checkpoint.go b/cmd/ctr/commands/tasks/checkpoint.go index 94309e38c..e6d1b73bf 100644 --- a/cmd/ctr/commands/tasks/checkpoint.go +++ b/cmd/ctr/commands/tasks/checkpoint.go @@ -21,6 +21,7 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/cmd/ctr/commands" + "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime/linux/runctypes" "github.com/containerd/containerd/runtime/v2/runc/options" "github.com/pkg/errors" @@ -86,7 +87,7 @@ func withCheckpointOpts(rt string, context *cli.Context) containerd.CheckpointTa workPath := context.String("work-path") switch rt { - case "io.containerd.runc.v1": + case plugin.RuntimeRuncV1, plugin.RuntimeRuncV2: if r.Options == nil { r.Options = &options.CheckpointOptions{} } @@ -101,7 +102,7 @@ func withCheckpointOpts(rt string, context *cli.Context) containerd.CheckpointTa if workPath != "" { opts.WorkPath = workPath } - case "io.containerd.runtime.v1.linux": + case plugin.RuntimeLinuxV1: if r.Options == nil { r.Options = &runctypes.CheckpointOptions{} } diff --git a/container_checkpoint_test.go b/container_checkpoint_test.go index 2e5203a5a..be66d0e15 100644 --- a/container_checkpoint_test.go +++ b/container_checkpoint_test.go @@ -32,6 +32,7 @@ import ( "github.com/containerd/containerd/cio" "github.com/containerd/containerd/oci" + "github.com/containerd/containerd/plugin" ) const ( @@ -47,7 +48,7 @@ func TestCheckpointRestorePTY(t *testing.T) { t.Fatal(err) } defer client.Close() - if client.runtime == v1runtime { + if client.runtime == plugin.RuntimeLinuxV1 { t.Skip() } @@ -173,7 +174,7 @@ func TestCheckpointRestore(t *testing.T) { t.Fatal(err) } defer client.Close() - if client.runtime == v1runtime { + if client.runtime == plugin.RuntimeLinuxV1 { t.Skip() } @@ -263,7 +264,7 @@ func TestCheckpointRestoreNewContainer(t *testing.T) { t.Fatal(err) } defer client.Close() - if client.runtime == v1runtime { + if client.runtime == plugin.RuntimeLinuxV1 { t.Skip() } @@ -353,7 +354,7 @@ func TestCheckpointLeaveRunning(t *testing.T) { t.Fatal(err) } defer client.Close() - if client.runtime == v1runtime { + if client.runtime == plugin.RuntimeLinuxV1 { t.Skip() } diff --git a/container_linux_test.go b/container_linux_test.go index 350fc64b7..3627073fa 100644 --- a/container_linux_test.go +++ b/container_linux_test.go @@ -39,6 +39,7 @@ import ( "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/oci" + "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime/linux/runctypes" "github.com/containerd/containerd/runtime/v2/runc/options" specs "github.com/opencontainers/runtime-spec/specs-go" @@ -132,7 +133,7 @@ func TestShimInCgroup(t *testing.T) { t.Fatal(err) } defer client.Close() - if client.runtime == "io.containerd.runc.v1" { + if CheckRuntime(client.runtime, "io.containerd.runc") { t.Skip() } @@ -450,7 +451,7 @@ func writeToFile(t *testing.T, filePath, message string) { func getLogDirPath(runtimeVersion, id string) string { switch runtimeVersion { case "v1": - return filepath.Join(defaultRoot, "io.containerd.runtime.v1.linux", testNamespace, id) + return filepath.Join(defaultRoot, plugin.RuntimeLinuxV1, testNamespace, id) case "v2": return filepath.Join(defaultState, "io.containerd.runtime.v2.task", testNamespace, id) default: @@ -460,7 +461,7 @@ func getLogDirPath(runtimeVersion, id string) string { func getRuntimeVersion() string { switch rt := os.Getenv("TEST_RUNTIME"); rt { - case "io.containerd.runc.v1": + case plugin.RuntimeRuncV1, plugin.RuntimeRuncV2: return "v2" default: return "v1" @@ -1188,7 +1189,7 @@ func TestContainerRuntimeOptionsv1(t *testing.T) { ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withExitStatus(7)), - WithRuntime("io.containerd.runtime.v1.linux", &runctypes.RuncOptions{Runtime: "no-runc"}), + WithRuntime(plugin.RuntimeLinuxV1, &runctypes.RuncOptions{Runtime: "no-runc"}), ) if err != nil { t.Fatal(err) @@ -1231,7 +1232,7 @@ func TestContainerRuntimeOptionsv2(t *testing.T) { ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withExitStatus(7)), - WithRuntime("io.containerd.runc.v1", &options.Options{BinaryName: "no-runc"}), + WithRuntime(plugin.RuntimeRuncV1, &options.Options{BinaryName: "no-runc"}), ) if err != nil { t.Fatal(err) @@ -1384,7 +1385,7 @@ func testUserNamespaces(t *testing.T, readonlyRootFS bool) { defer container.Delete(ctx, WithSnapshotCleanup) var copts interface{} - if client.runtime == "io.containerd.runc.v1" { + if CheckRuntime(client.runtime, "io.containerd.runc") { copts = &options.Options{ IoUid: 1000, IoGid: 1000, diff --git a/daemon_config_linux_test.go b/daemon_config_linux_test.go index 7fca228e1..e0801bab2 100644 --- a/daemon_config_linux_test.go +++ b/daemon_config_linux_test.go @@ -29,6 +29,7 @@ import ( "github.com/containerd/containerd/oci" "github.com/containerd/containerd/pkg/testutil" + "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime/v2/runc/options" srvconfig "github.com/containerd/containerd/services/server/config" ) @@ -140,7 +141,7 @@ func TestDaemonRuntimeRoot(t *testing.T) { } id := t.Name() - container, err := client.NewContainer(ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withProcessArgs("top")), WithRuntime("io.containerd.runc.v1", &options.Options{ + container, err := client.NewContainer(ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withProcessArgs("top")), WithRuntime(plugin.RuntimeRuncV1, &options.Options{ Root: runtimeRoot, })) if err != nil { diff --git a/oci/spec_opts.go b/oci/spec_opts.go index b9949f837..dcffc0787 100644 --- a/oci/spec_opts.go +++ b/oci/spec_opts.go @@ -1063,3 +1063,17 @@ func WithMemoryLimit(limit uint64) SpecOpts { return nil } } + +// WithAnnotations appends or replaces the annotations on the spec with the +// provided annotations +func WithAnnotations(annotations map[string]string) SpecOpts { + return func(_ context.Context, _ Client, _ *containers.Container, s *Spec) error { + if s.Annotations == nil { + s.Annotations = make(map[string]string) + } + for k, v := range annotations { + s.Annotations[k] = v + } + return nil + } +} diff --git a/plugin/plugin.go b/plugin/plugin.go index 4d2d486d0..5e69145ef 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -75,6 +75,15 @@ const ( GCPlugin Type = "io.containerd.gc.v1" ) +const ( + // RuntimeLinuxV1 is the legacy linux runtime + RuntimeLinuxV1 = "io.containerd.runtime.v1.linux" + // RuntimeRuncV1 is the runc runtime that supports a single container + RuntimeRuncV1 = "io.containerd.runc.v1" + // RuntimeRuncV2 is the runc runtime that supports multiple containers per shim + RuntimeRuncV2 = "io.containerd.runc.v2" +) + // Registration contains information for registering a plugin type Registration struct { // Type of the plugin diff --git a/runtime/v2/runc/container.go b/runtime/v2/runc/container.go index 950fbb96f..0606381bc 100644 --- a/runtime/v2/runc/container.go +++ b/runtime/v2/runc/container.go @@ -38,6 +38,7 @@ import ( "github.com/sirupsen/logrus" ) +// NewContainer returns a new runc container func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTaskRequest) (*Container, error) { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { @@ -130,6 +131,7 @@ func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTa return container, nil } +// ReadRuntime reads the runtime information from the path func ReadRuntime(path string) (string, error) { data, err := ioutil.ReadFile(filepath.Join(path, "runtime")) if err != nil { @@ -138,6 +140,7 @@ func ReadRuntime(path string) (string, error) { return string(data), nil } +// WriteRuntime writes the runtime information into the path func WriteRuntime(path, runtime string) error { return ioutil.WriteFile(filepath.Join(path, "runtime"), []byte(runtime), 0600) } @@ -168,10 +171,13 @@ func newInit(ctx context.Context, path, workDir, namespace string, platform rpro return p, nil } +// Container for operating on a runc container and its processes type Container struct { mu sync.Mutex - ID string + // ID of the container + ID string + // Bundle path Bundle string cgroup cgroups.Cgroup @@ -179,6 +185,7 @@ type Container struct { processes map[string]rproc.Process } +// All processes in the container func (c *Container) All() (o []rproc.Process) { c.mu.Lock() defer c.mu.Unlock() @@ -192,6 +199,7 @@ func (c *Container) All() (o []rproc.Process) { return o } +// ExecdProcesses added to the container func (c *Container) ExecdProcesses() (o []rproc.Process) { c.mu.Lock() defer c.mu.Unlock() @@ -208,27 +216,38 @@ func (c *Container) Pid() int { return c.process.Pid() } +// Cgroup of the container func (c *Container) Cgroup() cgroups.Cgroup { c.mu.Lock() defer c.mu.Unlock() return c.cgroup } +// CgroupSet sets the cgroup to the container func (c *Container) CgroupSet(cg cgroups.Cgroup) { c.mu.Lock() c.cgroup = cg c.mu.Unlock() } -func (c *Container) Process(id string) rproc.Process { +// Process returns the process by id +func (c *Container) Process(id string) (rproc.Process, error) { c.mu.Lock() defer c.mu.Unlock() if id == "" { - return c.process + if c.process == nil { + return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created") + } + return c.process, nil } - return c.processes[id] + p, ok := c.processes[id] + if !ok { + return nil, errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", id) + } + return p, nil } +// ProcessExists returns true if the process by id exists func (c *Container) ProcessExists(id string) bool { c.mu.Lock() defer c.mu.Unlock() @@ -236,22 +255,25 @@ func (c *Container) ProcessExists(id string) bool { return ok } +// ProcessAdd adds a new process to the container func (c *Container) ProcessAdd(process rproc.Process) { c.mu.Lock() defer c.mu.Unlock() c.processes[process.ID()] = process } +// ProcessRemove removes the process by id from the container func (c *Container) ProcessRemove(id string) { c.mu.Lock() defer c.mu.Unlock() delete(c.processes, id) } +// Start a container process func (c *Container) Start(ctx context.Context, r *task.StartRequest) (rproc.Process, error) { - p := c.Process(r.ExecID) - if p == nil { - return nil, errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID) + p, err := c.Process(r.ExecID) + if err != nil { + return nil, err } if err := p.Start(ctx); err != nil { return nil, err @@ -266,10 +288,11 @@ func (c *Container) Start(ctx context.Context, r *task.StartRequest) (rproc.Proc return p, nil } +// Delete the container or a process by id func (c *Container) Delete(ctx context.Context, r *task.DeleteRequest) (rproc.Process, error) { - p := c.Process(r.ExecID) - if p == nil { - return nil, errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID) + p, err := c.Process(r.ExecID) + if err != nil { + return nil, err } if err := p.Delete(ctx); err != nil { return nil, err @@ -280,6 +303,7 @@ func (c *Container) Delete(ctx context.Context, r *task.DeleteRequest) (rproc.Pr return p, nil } +// Exec an additional process func (c *Container) Exec(ctx context.Context, r *task.ExecProcessRequest) (rproc.Process, error) { process, err := c.process.(*proc.Init).Exec(ctx, c.Bundle, &proc.ExecConfig{ ID: r.ExecID, @@ -296,18 +320,21 @@ func (c *Container) Exec(ctx context.Context, r *task.ExecProcessRequest) (rproc return process, nil } +// Pause the container func (c *Container) Pause(ctx context.Context) error { return c.process.(*proc.Init).Pause(ctx) } +// Resume the container func (c *Container) Resume(ctx context.Context) error { return c.process.(*proc.Init).Resume(ctx) } +// ResizePty of a process func (c *Container) ResizePty(ctx context.Context, r *task.ResizePtyRequest) error { - p := c.Process(r.ExecID) - if p == nil { - return errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID) + p, err := c.Process(r.ExecID) + if err != nil { + return err } ws := console.WinSize{ Width: uint16(r.Width), @@ -316,18 +343,20 @@ func (c *Container) ResizePty(ctx context.Context, r *task.ResizePtyRequest) err return p.Resize(ws) } +// Kill a process func (c *Container) Kill(ctx context.Context, r *task.KillRequest) error { - p := c.Process(r.ExecID) - if p == nil { - return errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID) + p, err := c.Process(r.ExecID) + if err != nil { + return err } return p.Kill(ctx, r.Signal, r.All) } +// CloseIO of a process func (c *Container) CloseIO(ctx context.Context, r *task.CloseIORequest) error { - p := c.Process(r.ExecID) - if p == nil { - return errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID) + p, err := c.Process(r.ExecID) + if err != nil { + return err } if stdin := p.Stdin(); stdin != nil { if err := stdin.Close(); err != nil { @@ -337,10 +366,11 @@ func (c *Container) CloseIO(ctx context.Context, r *task.CloseIORequest) error { return nil } +// Checkpoint the container func (c *Container) Checkpoint(ctx context.Context, r *task.CheckpointTaskRequest) error { - p := c.Process("") - if p == nil { - return errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") + p, err := c.Process("") + if err != nil { + return err } var opts options.CheckpointOptions if r.Options != nil { @@ -362,10 +392,24 @@ func (c *Container) Checkpoint(ctx context.Context, r *task.CheckpointTaskReques }) } +// Update the resource information of a running container func (c *Container) Update(ctx context.Context, r *task.UpdateTaskRequest) error { - p := c.Process("") - if p == nil { - return errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") + p, err := c.Process("") + if err != nil { + return err } return p.(*proc.Init).Update(ctx, r.Resources) } + +// HasPid returns true if the container owns a specific pid +func (c *Container) HasPid(pid int) bool { + if c.Pid() == pid { + return true + } + for _, p := range c.All() { + if p.Pid() == pid { + return true + } + } + return false +} diff --git a/runtime/v2/runc/v1/service.go b/runtime/v2/runc/v1/service.go index fbe81033f..c69f417a2 100644 --- a/runtime/v2/runc/v1/service.go +++ b/runtime/v2/runc/v1/service.go @@ -210,7 +210,6 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * return nil, err } - s.id = r.ID s.container = container s.send(&eventstypes.TaskCreate{ @@ -251,14 +250,14 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. } switch r.ExecID { case "": - s.send(&eventstypes.TaskExecStarted{ + s.send(&eventstypes.TaskStart{ ContainerID: container.ID, - ExecID: r.ExecID, Pid: uint32(p.Pid()), }) default: - s.send(&eventstypes.TaskStart{ + s.send(&eventstypes.TaskExecStarted{ ContainerID: container.ID, + ExecID: r.ExecID, Pid: uint32(p.Pid()), }) } @@ -284,7 +283,7 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP s.platform.Close() } s.send(&eventstypes.TaskDelete{ - ContainerID: s.id, + ContainerID: container.ID, Pid: uint32(p.Pid()), ExitStatus: uint32(p.ExitStatus()), ExitedAt: p.ExitedAt(), @@ -487,9 +486,9 @@ func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.Wa if err != nil { return nil, err } - p := container.Process(r.ExecID) - if p == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") + p, err := container.Process(r.ExecID) + if err != nil { + return nil, errdefs.ToGRPC(err) } p.Wait() @@ -552,14 +551,17 @@ func (s *service) sendL(evt interface{}) { } func (s *service) checkProcesses(e runcC.Exit) { - s.mu.Lock() - defer s.mu.Unlock() - shouldKillAll, err := shouldKillAllOnExit(s.container.Bundle) + container, err := s.getContainer() + if err != nil { + return + } + + shouldKillAll, err := shouldKillAllOnExit(container.Bundle) if err != nil { log.G(s.context).WithError(err).Error("failed to check shouldKillAll") } - for _, p := range s.container.All() { + for _, p := range container.All() { if p.Pid() == e.Pid { if shouldKillAll { if ip, ok := p.(*proc.Init); ok { @@ -572,7 +574,7 @@ func (s *service) checkProcesses(e runcC.Exit) { } p.SetExited(e.Status) s.sendL(&eventstypes.TaskExit{ - ContainerID: s.id, + ContainerID: container.ID, ID: p.ID(), Pid: uint32(e.Pid), ExitStatus: uint32(e.Status), @@ -603,9 +605,9 @@ func shouldKillAllOnExit(bundlePath string) (bool, error) { } func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { - p := s.container.Process("") - if p == nil { - return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created") + p, err := s.container.Process("") + if err != nil { + return nil, errdefs.ToGRPC(err) } ps, err := p.(*proc.Init).Runtime().Ps(ctx, id) if err != nil { @@ -644,9 +646,9 @@ func (s *service) getProcess(execID string) (rproc.Process, error) { if err != nil { return nil, err } - p := container.Process(execID) - if p == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process does not exist %s", execID) + p, err := container.Process(execID) + if err != nil { + return nil, errdefs.ToGRPC(err) } return p, nil } diff --git a/runtime/v2/runc/v2/service.go b/runtime/v2/runc/v2/service.go new file mode 100644 index 000000000..7d71e9dfc --- /dev/null +++ b/runtime/v2/runc/v2/service.go @@ -0,0 +1,726 @@ +// +build linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package v2 + +import ( + "context" + "encoding/json" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "syscall" + "time" + + "github.com/containerd/cgroups" + eventstypes "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/api/types/task" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/namespaces" + rproc "github.com/containerd/containerd/runtime/proc" + "github.com/containerd/containerd/runtime/v1/linux/proc" + "github.com/containerd/containerd/runtime/v2/runc" + "github.com/containerd/containerd/runtime/v2/runc/options" + "github.com/containerd/containerd/runtime/v2/shim" + taskAPI "github.com/containerd/containerd/runtime/v2/task" + runcC "github.com/containerd/go-runc" + "github.com/containerd/typeurl" + ptypes "github.com/gogo/protobuf/types" + specs "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" +) + +var ( + _ = (taskAPI.TaskService)(&service{}) + empty = &ptypes.Empty{} +) + +// group labels specifies how the shim groups services. +// currently supports a runc.v2 specific .group label and the +// standard k8s pod label. Order matters in this list +var groupLabels = []string{ + "io.containerd.runc.v2.group", + "io.kubernetes.cri.sandbox-id", +} + +type spec struct { + Annotations map[string]string `json:"annotations,omitempty"` +} + +// New returns a new shim service that can be used via GRPC +func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) { + ep, err := runc.NewOOMEpoller(publisher) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(ctx) + go ep.Run(ctx) + s := &service{ + id: id, + context: ctx, + events: make(chan interface{}, 128), + ec: shim.Default.Subscribe(), + ep: ep, + cancel: cancel, + containers: make(map[string]*runc.Container), + } + go s.processExits() + runcC.Monitor = shim.Default + if err := s.initPlatform(); err != nil { + cancel() + return nil, errors.Wrap(err, "failed to initialized platform behavior") + } + go s.forward(publisher) + return s, nil +} + +// service is the shim implementation of a remote shim over GRPC +type service struct { + mu sync.Mutex + eventSendMu sync.Mutex + + context context.Context + events chan interface{} + platform rproc.Platform + ec chan runcC.Exit + ep *runc.Epoller + + // id only used in cleanup case + id string + + containers map[string]*runc.Container + + cancel func() +} + +func newCommand(ctx context.Context, id, containerdBinary, containerdAddress string) (*exec.Cmd, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + self, err := os.Executable() + if err != nil { + return nil, err + } + cwd, err := os.Getwd() + if err != nil { + return nil, err + } + args := []string{ + "-namespace", ns, + "-id", id, + "-address", containerdAddress, + "-publish-binary", containerdBinary, + } + cmd := exec.Command(self, args...) + cmd.Dir = cwd + cmd.Env = append(os.Environ(), "GOMAXPROCS=4") + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + return cmd, nil +} + +func readSpec() (*spec, error) { + f, err := os.Open("config.json") + if err != nil { + return nil, err + } + defer f.Close() + var s spec + if err := json.NewDecoder(f).Decode(&s); err != nil { + return nil, err + } + return &s, nil +} + +func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) { + cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress) + if err != nil { + return "", err + } + grouping := id + spec, err := readSpec() + if err != nil { + return "", err + } + for _, group := range groupLabels { + if groupID, ok := spec.Annotations[group]; ok { + grouping = groupID + break + } + } + address, err := shim.SocketAddress(ctx, grouping) + if err != nil { + return "", err + } + socket, err := shim.NewSocket(address) + if err != nil { + if strings.Contains(err.Error(), "address already in use") { + if err := shim.WriteAddress("address", address); err != nil { + return "", err + } + return address, nil + } + return "", err + } + defer socket.Close() + f, err := socket.File() + if err != nil { + return "", err + } + defer f.Close() + + cmd.ExtraFiles = append(cmd.ExtraFiles, f) + + if err := cmd.Start(); err != nil { + return "", err + } + defer func() { + if err != nil { + cmd.Process.Kill() + } + }() + // make sure to wait after start + go cmd.Wait() + if err := shim.WriteAddress("address", address); err != nil { + return "", err + } + if err := shim.SetScore(cmd.Process.Pid); err != nil { + return "", errors.Wrap(err, "failed to set OOM Score on shim") + } + return address, nil +} + +func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) { + cwd, err := os.Getwd() + if err != nil { + return nil, err + } + path := filepath.Join(filepath.Dir(cwd), s.id) + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + + runtime, err := runc.ReadRuntime(path) + if err != nil { + return nil, err + } + r := proc.NewRunc(proc.RuncRoot, path, ns, runtime, "", false) + if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{ + Force: true, + }); err != nil { + logrus.WithError(err).Warn("failed to remove runc container") + } + if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil { + logrus.WithError(err).Warn("failed to cleanup rootfs mount") + } + return &taskAPI.DeleteResponse{ + ExitedAt: time.Now(), + ExitStatus: 128 + uint32(unix.SIGKILL), + }, nil +} + +// Create a new initial process and container with the underlying OCI runtime +func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { + s.mu.Lock() + defer s.mu.Unlock() + + container, err := runc.NewContainer(ctx, s.platform, r) + if err != nil { + return nil, err + } + + s.containers[r.ID] = container + + s.send(&eventstypes.TaskCreate{ + ContainerID: r.ID, + Bundle: r.Bundle, + Rootfs: r.Rootfs, + IO: &eventstypes.TaskIO{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, + }, + Checkpoint: r.Checkpoint, + Pid: uint32(container.Pid()), + }) + + return &taskAPI.CreateTaskResponse{ + Pid: uint32(container.Pid()), + }, nil +} + +// Start a process +func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + + // hold the send lock so that the start events are sent before any exit events in the error case + s.eventSendMu.Lock() + p, err := container.Start(ctx, r) + if err != nil { + s.eventSendMu.Unlock() + return nil, errdefs.ToGRPC(err) + } + if err := s.ep.Add(container.ID, container.Cgroup()); err != nil { + logrus.WithError(err).Error("add cg to OOM monitor") + } + switch r.ExecID { + case "": + s.send(&eventstypes.TaskStart{ + ContainerID: container.ID, + Pid: uint32(p.Pid()), + }) + default: + s.send(&eventstypes.TaskExecStarted{ + ContainerID: container.ID, + ExecID: r.ExecID, + Pid: uint32(p.Pid()), + }) + } + s.eventSendMu.Unlock() + return &taskAPI.StartResponse{ + Pid: uint32(p.Pid()), + }, nil +} + +// Delete the initial process and container +func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + p, err := container.Delete(ctx, r) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + // if we deleted our init task, close the platform and send the task delete event + if r.ExecID == "" { + s.mu.Lock() + delete(s.containers, r.ID) + hasContainers := len(s.containers) > 0 + s.mu.Unlock() + if s.platform != nil && !hasContainers { + s.platform.Close() + } + s.send(&eventstypes.TaskDelete{ + ContainerID: container.ID, + Pid: uint32(p.Pid()), + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }) + } + return &taskAPI.DeleteResponse{ + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + Pid: uint32(p.Pid()), + }, nil +} + +// Exec an additional process inside the container +func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if container.ProcessExists(r.ExecID) { + return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID) + } + process, err := container.Exec(ctx, r) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + + s.send(&eventstypes.TaskExecAdded{ + ContainerID: container.ID, + ExecID: process.ID(), + }) + return empty, nil +} + +// ResizePty of a process +func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.ResizePty(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// State returns runtime state information for a process +func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + p, err := container.Process(r.ExecID) + if err != nil { + return nil, err + } + st, err := p.Status(ctx) + if err != nil { + return nil, err + } + status := task.StatusUnknown + switch st { + case "created": + status = task.StatusCreated + case "running": + status = task.StatusRunning + case "stopped": + status = task.StatusStopped + case "paused": + status = task.StatusPaused + case "pausing": + status = task.StatusPausing + } + sio := p.Stdio() + return &taskAPI.StateResponse{ + ID: p.ID(), + Bundle: container.Bundle, + Pid: uint32(p.Pid()), + Status: status, + Stdin: sio.Stdin, + Stdout: sio.Stdout, + Stderr: sio.Stderr, + Terminal: sio.Terminal, + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }, nil +} + +// Pause the container +func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Pause(ctx); err != nil { + return nil, errdefs.ToGRPC(err) + } + s.send(&eventstypes.TaskPaused{ + container.ID, + }) + return empty, nil +} + +// Resume the container +func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Resume(ctx); err != nil { + return nil, errdefs.ToGRPC(err) + } + s.send(&eventstypes.TaskResumed{ + container.ID, + }) + return empty, nil +} + +// Kill a process with the provided signal +func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Kill(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// Pids returns all pids inside the container +func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + pids, err := s.getContainerPids(ctx, r.ID) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + var processes []*task.ProcessInfo + for _, pid := range pids { + pInfo := task.ProcessInfo{ + Pid: pid, + } + for _, p := range container.ExecdProcesses() { + if p.Pid() == int(pid) { + d := &options.ProcessDetails{ + ExecID: p.ID(), + } + a, err := typeurl.MarshalAny(d) + if err != nil { + return nil, errors.Wrapf(err, "failed to marshal process %d info", pid) + } + pInfo.Info = a + break + } + } + processes = append(processes, &pInfo) + } + return &taskAPI.PidsResponse{ + Processes: processes, + }, nil +} + +// CloseIO of a process +func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.CloseIO(ctx, r); err != nil { + return nil, err + } + return empty, nil +} + +// Checkpoint the container +func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Checkpoint(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// Update a running container +func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + if err := container.Update(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// Wait for a process to exit +func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + p, err := container.Process(r.ExecID) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + p.Wait() + + return &taskAPI.WaitResponse{ + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }, nil +} + +// Connect returns shim information such as the shim's pid +func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) { + var pid int + if container, err := s.getContainer(r.ID); err == nil { + pid = container.Pid() + } + return &taskAPI.ConnectResponse{ + ShimPid: uint32(os.Getpid()), + TaskPid: uint32(pid), + }, nil +} + +func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { + s.mu.Lock() + // return out if the shim is still servicing containers + if len(s.containers) > 0 { + s.mu.Unlock() + return empty, nil + } + s.cancel() + os.Exit(0) + return empty, nil +} + +func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { + container, err := s.getContainer(r.ID) + if err != nil { + return nil, err + } + cg := container.Cgroup() + if cg == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist") + } + stats, err := cg.Stat(cgroups.IgnoreNotExist) + if err != nil { + return nil, err + } + data, err := typeurl.MarshalAny(stats) + if err != nil { + return nil, err + } + return &taskAPI.StatsResponse{ + Stats: data, + }, nil +} + +func (s *service) processExits() { + for e := range s.ec { + s.checkProcesses(e) + } +} + +func (s *service) send(evt interface{}) { + s.events <- evt +} + +func (s *service) sendL(evt interface{}) { + s.eventSendMu.Lock() + s.events <- evt + s.eventSendMu.Unlock() +} + +func (s *service) checkProcesses(e runcC.Exit) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, container := range s.containers { + if container.HasPid(e.Pid) { + shouldKillAll, err := shouldKillAllOnExit(container.Bundle) + if err != nil { + log.G(s.context).WithError(err).Error("failed to check shouldKillAll") + } + + for _, p := range container.All() { + if p.Pid() == e.Pid { + if shouldKillAll { + if ip, ok := p.(*proc.Init); ok { + // Ensure all children are killed + if err := ip.KillAll(s.context); err != nil { + logrus.WithError(err).WithField("id", ip.ID()). + Error("failed to kill init's children") + } + } + } + p.SetExited(e.Status) + s.sendL(&eventstypes.TaskExit{ + ContainerID: container.ID, + ID: p.ID(), + Pid: uint32(e.Pid), + ExitStatus: uint32(e.Status), + ExitedAt: p.ExitedAt(), + }) + return + } + } + return + } + } +} + +func shouldKillAllOnExit(bundlePath string) (bool, error) { + var bundleSpec specs.Spec + bundleConfigContents, err := ioutil.ReadFile(filepath.Join(bundlePath, "config.json")) + if err != nil { + return false, err + } + json.Unmarshal(bundleConfigContents, &bundleSpec) + + if bundleSpec.Linux != nil { + for _, ns := range bundleSpec.Linux.Namespaces { + if ns.Type == specs.PIDNamespace { + return false, nil + } + } + } + + return true, nil +} + +func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { + container, err := s.getContainer(id) + if err != nil { + return nil, err + } + p, err := container.Process("") + if err != nil { + return nil, errdefs.ToGRPC(err) + } + ps, err := p.(*proc.Init).Runtime().Ps(ctx, id) + if err != nil { + return nil, err + } + pids := make([]uint32, 0, len(ps)) + for _, pid := range ps { + pids = append(pids, uint32(pid)) + } + return pids, nil +} + +func (s *service) forward(publisher events.Publisher) { + for e := range s.events { + ctx, cancel := context.WithTimeout(s.context, 5*time.Second) + err := publisher.Publish(ctx, runc.GetTopic(e), e) + cancel() + if err != nil { + logrus.WithError(err).Error("post event") + } + } +} + +func (s *service) getContainer(id string) (*runc.Container, error) { + s.mu.Lock() + container := s.containers[id] + s.mu.Unlock() + if container == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created") + } + return container, nil +} + +// initialize a single epoll fd to manage our consoles. `initPlatform` should +// only be called once. +func (s *service) initPlatform() error { + if s.platform != nil { + return nil + } + p, err := runc.NewPlatform() + if err != nil { + return err + } + s.platform = p + return nil +} diff --git a/runtime/v2/shim.go b/runtime/v2/shim.go index 94814e754..65022390e 100644 --- a/runtime/v2/shim.go +++ b/runtime/v2/shim.go @@ -99,7 +99,9 @@ type shim struct { } func (s *shim) Connect(ctx context.Context) error { - response, err := s.task.Connect(ctx, &task.ConnectRequest{}) + response, err := s.task.Connect(ctx, &task.ConnectRequest{ + ID: s.ID(), + }) if err != nil { return err } @@ -317,6 +319,7 @@ func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) { func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any) error { request := &task.CheckpointTaskRequest{ + ID: s.ID(), Path: path, Options: options, } diff --git a/runtime/v2/shim/util.go b/runtime/v2/shim/util.go index a4e300f86..2e34444a8 100644 --- a/runtime/v2/shim/util.go +++ b/runtime/v2/shim/util.go @@ -19,6 +19,7 @@ package shim import ( "context" "fmt" + "io/ioutil" "net" "os" "os/exec" @@ -150,3 +151,22 @@ func WriteAddress(path, address string) error { } return os.Rename(tempPath, path) } + +// ErrNoAddress is returned when the address file has no content +var ErrNoAddress = errors.New("no shim address") + +// ReadAddress returns the shim's abstract socket address from the path +func ReadAddress(path string) (string, error) { + path, err := filepath.Abs(path) + if err != nil { + return "", err + } + data, err := ioutil.ReadFile(path) + if err != nil { + return "", err + } + if len(data) == 0 { + return "", ErrNoAddress + } + return string(data), nil +} diff --git a/services/tasks/local.go b/services/tasks/local.go index ee7dc1bc9..1dca4daa0 100644 --- a/services/tasks/local.go +++ b/services/tasks/local.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "os" "path/filepath" + "strings" "time" api "github.com/containerd/containerd/api/services/tasks/v1" @@ -687,8 +688,8 @@ func getCheckpointPath(runtime string, option *ptypes.Any) (string, error) { } var checkpointPath string - switch runtime { - case "io.containerd.runc.v1": + switch { + case checkRuntime(runtime, "io.containerd.runc"): v, err := typeurl.UnmarshalAny(option) if err != nil { return "", err @@ -699,7 +700,7 @@ func getCheckpointPath(runtime string, option *ptypes.Any) (string, error) { } checkpointPath = opts.ImagePath - case "io.containerd.runtime.v1.linux": + case runtime == plugin.RuntimeLinuxV1: v, err := typeurl.UnmarshalAny(option) if err != nil { return "", err @@ -721,8 +722,8 @@ func getRestorePath(runtime string, option *ptypes.Any) (string, error) { } var restorePath string - switch runtime { - case "io.containerd.runc.v1": + switch { + case checkRuntime(runtime, "io.containerd.runc"): v, err := typeurl.UnmarshalAny(option) if err != nil { return "", err @@ -732,8 +733,7 @@ func getRestorePath(runtime string, option *ptypes.Any) (string, error) { return "", fmt.Errorf("invalid task create option for %s", runtime) } restorePath = opts.CriuImagePath - - case "io.containerd.runtime.v1.linux": + case runtime == plugin.RuntimeLinuxV1: v, err := typeurl.UnmarshalAny(option) if err != nil { return "", err @@ -747,3 +747,20 @@ func getRestorePath(runtime string, option *ptypes.Any) (string, error) { return restorePath, nil } + +// checkRuntime returns true if the current runtime matches the expected +// runtime. Providing various parts of the runtime schema will match those +// parts of the expected runtime +func checkRuntime(current, expected string) bool { + cp := strings.Split(current, ".") + l := len(cp) + for i, p := range strings.Split(expected, ".") { + if i > l { + return false + } + if p != cp[i] { + return false + } + } + return true +} diff --git a/task.go b/task.go index 9deb66c7e..8d281dc10 100644 --- a/task.go +++ b/task.go @@ -642,12 +642,12 @@ func isCheckpointPathExist(runtime string, v interface{}) bool { } switch runtime { - case "io.containerd.runc.v1": + case plugin.RuntimeRuncV1, plugin.RuntimeRuncV2: if opts, ok := v.(*options.CheckpointOptions); ok && opts.ImagePath != "" { return true } - case "io.containerd.runtime.v1.linux": + case plugin.RuntimeLinuxV1: if opts, ok := v.(*runctypes.CheckpointOptions); ok && opts.ImagePath != "" { return true } diff --git a/task_opts.go b/task_opts.go index 714a152c5..4619e2c0e 100644 --- a/task_opts.go +++ b/task_opts.go @@ -34,11 +34,6 @@ import ( "github.com/pkg/errors" ) -const ( - v1runtime = "io.containerd.runtime.v1.linux" - v2runtime = "io.containerd.runc.v1" -) - // NewTaskOpts allows the caller to set options on a new task type NewTaskOpts func(context.Context, *Client, *TaskInfo) error @@ -99,23 +94,22 @@ func WithCheckpointName(name string) CheckpointTaskOpts { // WithCheckpointImagePath sets image path for checkpoint option func WithCheckpointImagePath(rt, path string) CheckpointTaskOpts { return func(r *CheckpointTaskInfo) error { - switch rt { - case v1runtime: - if r.Options == nil { - r.Options = &runctypes.CheckpointOptions{} - } - opts, ok := r.Options.(*runctypes.CheckpointOptions) - if !ok { - return errors.New("invalid v1 checkpoint options format") - } - opts.ImagePath = path - case v2runtime: + if CheckRuntime(rt, "io.containerd.runc") { if r.Options == nil { r.Options = &options.CheckpointOptions{} } opts, ok := r.Options.(*options.CheckpointOptions) if !ok { - return errors.New("invalid v2 checkpoint options format") + return errors.New("invalid v2 shim checkpoint options format") + } + opts.ImagePath = path + } else { + if r.Options == nil { + r.Options = &runctypes.CheckpointOptions{} + } + opts, ok := r.Options.(*runctypes.CheckpointOptions) + if !ok { + return errors.New("invalid v1 shim checkpoint options format") } opts.ImagePath = path } @@ -126,23 +120,22 @@ func WithCheckpointImagePath(rt, path string) CheckpointTaskOpts { // WithRestoreImagePath sets image path for create option func WithRestoreImagePath(rt, path string) NewTaskOpts { return func(ctx context.Context, c *Client, ti *TaskInfo) error { - switch rt { - case v1runtime: - if ti.Options == nil { - ti.Options = &runctypes.CreateOptions{} - } - opts, ok := ti.Options.(*runctypes.CreateOptions) - if !ok { - return errors.New("invalid v1 create options format") - } - opts.CriuImagePath = path - case v2runtime: + if CheckRuntime(rt, "io.containerd.runc") { if ti.Options == nil { ti.Options = &options.Options{} } opts, ok := ti.Options.(*options.Options) if !ok { - return errors.New("invalid v2 create options format") + return errors.New("invalid v2 shim create options format") + } + opts.CriuImagePath = path + } else { + if ti.Options == nil { + ti.Options = &runctypes.CreateOptions{} + } + opts, ok := ti.Options.(*runctypes.CreateOptions) + if !ok { + return errors.New("invalid v1 shim create options format") } opts.CriuImagePath = path }