Merge pull request #3004 from crosbymichael/multi-shim

io.containerd.runc.v2 with Container Groups

commit 07697638be
.travis.yml
@@ -28,6 +28,7 @@ addons:
 env:
   - TRAVIS_GOOS=linux TEST_RUNTIME=io.containerd.runc.v1 TRAVIS_CGO_ENABLED=1
+  - TRAVIS_GOOS=linux TEST_RUNTIME=io.containerd.runc.v2 TRAVIS_CGO_ENABLED=1
   - TRAVIS_GOOS=linux TEST_RUNTIME=io.containerd.runtime.v1.linux TRAVIS_CGO_ENABLED=1
   - TRAVIS_GOOS=darwin TRAVIS_CGO_ENABLED=0
Makefile | 4
@@ -189,6 +189,10 @@ bin/containerd-shim-runc-v1: cmd/containerd-shim-runc-v1 FORCE # set !cgo and omit pie for a static shim build: https://github.com/golang/go/issues/17789#issuecomment-258542220
     @echo "$(WHALE) bin/containerd-shim-runc-v1"
     @CGO_ENABLED=0 go build ${GO_BUILD_FLAGS} -o bin/containerd-shim-runc-v1 ${SHIM_GO_LDFLAGS} ${GO_TAGS} ./cmd/containerd-shim-runc-v1
 
+bin/containerd-shim-runc-v2: cmd/containerd-shim-runc-v2 FORCE # set !cgo and omit pie for a static shim build: https://github.com/golang/go/issues/17789#issuecomment-258542220
+    @echo "$(WHALE) bin/containerd-shim-runc-v2"
+    @CGO_ENABLED=0 go build ${GO_BUILD_FLAGS} -o bin/containerd-shim-runc-v2 ${SHIM_GO_LDFLAGS} ${GO_TAGS} ./cmd/containerd-shim-runc-v2
+
 bin/containerd-shim-runhcs-v1: cmd/containerd-shim-runhcs-v1 FORCE # set !cgo and omit pie for a static shim build: https://github.com/golang/go/issues/17789#issuecomment-258542220
     @echo "$(WHALE) bin/containerd-shim-runhcs-v1${BINARY_SUFFIX}"
     @CGO_ENABLED=0 go build ${GO_BUILD_FLAGS} -o bin/containerd-shim-runhcs-v1${BINARY_SUFFIX} ${SHIM_GO_LDFLAGS} ${GO_TAGS} ./cmd/containerd-shim-runhcs-v1
Makefile.linux
@@ -16,7 +16,7 @@
 #linux specific settings
 WHALE="+"
 ONI="-"
-COMMANDS += containerd-shim containerd-shim-runc-v1
+COMMANDS += containerd-shim containerd-shim-runc-v1 containerd-shim-runc-v2
 
 # check GOOS for cross compile builds
 ifeq ($(GOOS),linux)
client.go | 18
@@ -24,6 +24,7 @@ import (
     "net/http"
     "runtime"
     "strconv"
+    "strings"
     "sync"
     "time"
 
@@ -614,3 +615,20 @@ func (c *Client) Version(ctx context.Context) (Version, error) {
         Revision: response.Revision,
     }, nil
 }
+
+// CheckRuntime returns true if the current runtime matches the expected
+// runtime. Providing various parts of the runtime schema will match those
+// parts of the expected runtime
+func CheckRuntime(current, expected string) bool {
+    cp := strings.Split(current, ".")
+    l := len(cp)
+    for i, p := range strings.Split(expected, ".") {
+        if i >= l { // must be >=, or cp[i] panics when expected has more parts than current
+            return false
+        }
+        if p != cp[i] {
+            return false
+        }
+    }
+    return true
+}
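For illustration, the match walks the dotted runtime name part by part, so a shorter expected value acts as a prefix filter (hypothetical usage, not part of the commit):

```go
containerd.CheckRuntime("io.containerd.runc.v2", "io.containerd.runc")          // true
containerd.CheckRuntime("io.containerd.runc.v1", "io.containerd.runc")          // true
containerd.CheckRuntime("io.containerd.runtime.v1.linux", "io.containerd.runc") // false
```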
client_test.go
@@ -124,6 +124,7 @@ func TestMain(m *testing.M) {
     log.G(ctx).WithFields(logrus.Fields{
         "version":  version.Version,
         "revision": version.Revision,
+        "runtime":  os.Getenv("TEST_RUNTIME"),
     }).Info("running tests against containerd")
 
     // pull a seed image
cmd/containerd-shim-runc-v1/main.go
@@ -19,10 +19,10 @@
 package main
 
 import (
-    "github.com/containerd/containerd/runtime/v2/runc"
+    "github.com/containerd/containerd/runtime/v2/runc/v1"
     "github.com/containerd/containerd/runtime/v2/shim"
 )
 
 func main() {
-    shim.Run("io.containerd.runc.v1", runc.New)
+    shim.Run("io.containerd.runc.v1", v1.New)
 }
cmd/containerd-shim-runc-v2/main.go | 28 (new file)

// +build linux

/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package main

import (
    "github.com/containerd/containerd/runtime/v2/runc/v2"
    "github.com/containerd/containerd/runtime/v2/shim"
)

func main() {
    shim.Run("io.containerd.runc.v2", v2.New)
}
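The whole binary is this thin: shim.Run parses the standard shim command line and serves the task service returned by the constructor. A third-party shim would follow the same pattern (a sketch with a hypothetical runtime id and package; myshim.New stands in for any func matching the constructor signature used above):

```go
package main

import (
    "github.com/containerd/containerd/runtime/v2/shim"

    myshim "example.com/myshim" // hypothetical package exporting New
)

func main() {
    // "io.containerd.example.v1" is a placeholder runtime id.
    shim.Run("io.containerd.example.v1", myshim.New)
}
```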
cmd/containerd-stress/main.go
@@ -30,6 +30,7 @@ import (
 
     "github.com/containerd/containerd"
     "github.com/containerd/containerd/namespaces"
+    "github.com/containerd/containerd/plugin"
     metrics "github.com/docker/go-metrics"
     "github.com/sirupsen/logrus"
     "github.com/urfave/cli"
@@ -146,7 +147,7 @@ func main() {
         cli.StringFlag{
             Name:  "runtime",
             Usage: "set the runtime to stress test",
-            Value: "io.containerd.runtime.v1.linux",
+            Value: plugin.RuntimeLinuxV1,
         },
     }
     app.Before = func(context *cli.Context) error {
cmd/ctr/commands/run/run_unix.go
@@ -147,6 +147,7 @@ func NewContainer(ctx gocontext.Context, client *containerd.Client, context *cli.Context)
 
     cOpts = append(cOpts, containerd.WithRuntime(context.String("runtime"), nil))
 
+    opts = append(opts, oci.WithAnnotations(commands.LabelArgs(context.StringSlice("label"))))
     var s specs.Spec
     spec = containerd.WithSpec(&s, opts...)
 
cmd/ctr/commands/tasks/checkpoint.go
@@ -21,6 +21,7 @@ import (
 
     "github.com/containerd/containerd"
     "github.com/containerd/containerd/cmd/ctr/commands"
+    "github.com/containerd/containerd/plugin"
     "github.com/containerd/containerd/runtime/linux/runctypes"
     "github.com/containerd/containerd/runtime/v2/runc/options"
     "github.com/pkg/errors"
@@ -86,7 +87,7 @@ func withCheckpointOpts(rt string, context *cli.Context) containerd.CheckpointTaskOpts {
     workPath := context.String("work-path")
 
     switch rt {
-    case "io.containerd.runc.v1":
+    case plugin.RuntimeRuncV1, plugin.RuntimeRuncV2:
         if r.Options == nil {
             r.Options = &options.CheckpointOptions{}
         }
@@ -101,7 +102,7 @@ func withCheckpointOpts(rt string, context *cli.Context) containerd.CheckpointTaskOpts {
         if workPath != "" {
             opts.WorkPath = workPath
         }
-    case "io.containerd.runtime.v1.linux":
+    case plugin.RuntimeLinuxV1:
         if r.Options == nil {
             r.Options = &runctypes.CheckpointOptions{}
         }
container_checkpoint_test.go
@@ -32,6 +32,7 @@ import (
 
     "github.com/containerd/containerd/cio"
     "github.com/containerd/containerd/oci"
+    "github.com/containerd/containerd/plugin"
 )
 
 const (
@@ -47,7 +48,7 @@ func TestCheckpointRestorePTY(t *testing.T) {
         t.Fatal(err)
     }
     defer client.Close()
-    if client.runtime == v1runtime {
+    if client.runtime == plugin.RuntimeLinuxV1 {
         t.Skip()
     }
 
@@ -173,7 +174,7 @@ func TestCheckpointRestore(t *testing.T) {
         t.Fatal(err)
     }
     defer client.Close()
-    if client.runtime == v1runtime {
+    if client.runtime == plugin.RuntimeLinuxV1 {
         t.Skip()
     }
 
@@ -263,7 +264,7 @@ func TestCheckpointRestoreNewContainer(t *testing.T) {
         t.Fatal(err)
     }
     defer client.Close()
-    if client.runtime == v1runtime {
+    if client.runtime == plugin.RuntimeLinuxV1 {
         t.Skip()
     }
 
@@ -353,7 +354,7 @@ func TestCheckpointLeaveRunning(t *testing.T) {
         t.Fatal(err)
     }
     defer client.Close()
-    if client.runtime == v1runtime {
+    if client.runtime == plugin.RuntimeLinuxV1 {
         t.Skip()
     }
 
container_linux_test.go
@@ -39,6 +39,7 @@ import (
     "github.com/containerd/containerd/containers"
     "github.com/containerd/containerd/errdefs"
     "github.com/containerd/containerd/oci"
+    "github.com/containerd/containerd/plugin"
     "github.com/containerd/containerd/runtime/linux/runctypes"
     "github.com/containerd/containerd/runtime/v2/runc/options"
     specs "github.com/opencontainers/runtime-spec/specs-go"
@@ -132,7 +133,7 @@ func TestShimInCgroup(t *testing.T) {
         t.Fatal(err)
     }
     defer client.Close()
-    if client.runtime == "io.containerd.runc.v1" {
+    if CheckRuntime(client.runtime, "io.containerd.runc") {
         t.Skip()
     }
 
@@ -450,7 +451,7 @@ func writeToFile(t *testing.T, filePath, message string) {
 func getLogDirPath(runtimeVersion, id string) string {
     switch runtimeVersion {
     case "v1":
-        return filepath.Join(defaultRoot, "io.containerd.runtime.v1.linux", testNamespace, id)
+        return filepath.Join(defaultRoot, plugin.RuntimeLinuxV1, testNamespace, id)
     case "v2":
         return filepath.Join(defaultState, "io.containerd.runtime.v2.task", testNamespace, id)
     default:
@@ -460,7 +461,7 @@ func getLogDirPath(runtimeVersion, id string) string {
 
 func getRuntimeVersion() string {
     switch rt := os.Getenv("TEST_RUNTIME"); rt {
-    case "io.containerd.runc.v1":
+    case plugin.RuntimeRuncV1, plugin.RuntimeRuncV2:
         return "v2"
     default:
         return "v1"
@@ -1188,7 +1189,7 @@ func TestContainerRuntimeOptionsv1(t *testing.T) {
         ctx, id,
         WithNewSnapshot(id, image),
         WithNewSpec(oci.WithImageConfig(image), withExitStatus(7)),
-        WithRuntime("io.containerd.runtime.v1.linux", &runctypes.RuncOptions{Runtime: "no-runc"}),
+        WithRuntime(plugin.RuntimeLinuxV1, &runctypes.RuncOptions{Runtime: "no-runc"}),
     )
     if err != nil {
         t.Fatal(err)
@@ -1231,7 +1232,7 @@ func TestContainerRuntimeOptionsv2(t *testing.T) {
         ctx, id,
         WithNewSnapshot(id, image),
         WithNewSpec(oci.WithImageConfig(image), withExitStatus(7)),
-        WithRuntime("io.containerd.runc.v1", &options.Options{BinaryName: "no-runc"}),
+        WithRuntime(plugin.RuntimeRuncV1, &options.Options{BinaryName: "no-runc"}),
     )
     if err != nil {
         t.Fatal(err)
@@ -1384,7 +1385,7 @@ func testUserNamespaces(t *testing.T, readonlyRootFS bool) {
     defer container.Delete(ctx, WithSnapshotCleanup)
 
     var copts interface{}
-    if client.runtime == "io.containerd.runc.v1" {
+    if CheckRuntime(client.runtime, "io.containerd.runc") {
         copts = &options.Options{
             IoUid: 1000,
             IoGid: 1000,
daemon_config_linux_test.go
@@ -29,6 +29,7 @@ import (
 
     "github.com/containerd/containerd/oci"
     "github.com/containerd/containerd/pkg/testutil"
+    "github.com/containerd/containerd/plugin"
    "github.com/containerd/containerd/runtime/v2/runc/options"
     srvconfig "github.com/containerd/containerd/services/server/config"
 )
@@ -140,7 +141,7 @@ func TestDaemonRuntimeRoot(t *testing.T) {
     }
 
     id := t.Name()
-    container, err := client.NewContainer(ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withProcessArgs("top")), WithRuntime("io.containerd.runc.v1", &options.Options{
+    container, err := client.NewContainer(ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withProcessArgs("top")), WithRuntime(plugin.RuntimeRuncV1, &options.Options{
         Root: runtimeRoot,
     }))
     if err != nil {
oci/spec_opts.go
@@ -1063,3 +1063,17 @@ func WithMemoryLimit(limit uint64) SpecOpts {
         return nil
     }
 }
+
+// WithAnnotations appends or replaces the annotations on the spec with the
+// provided annotations
+func WithAnnotations(annotations map[string]string) SpecOpts {
+    return func(_ context.Context, _ Client, _ *containers.Container, s *Spec) error {
+        if s.Annotations == nil {
+            s.Annotations = make(map[string]string)
+        }
+        for k, v := range annotations {
+            s.Annotations[k] = v
+        }
+        return nil
+    }
+}
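Because a SpecOpts is just a function, the append-or-replace behavior is easy to see by applying the opt twice (illustrative only, not part of the commit; nil is fine for the client and container parameters since this opt ignores them):

```go
var s oci.Spec
_ = oci.WithAnnotations(map[string]string{"a": "1", "b": "2"})(ctx, nil, nil, &s)
_ = oci.WithAnnotations(map[string]string{"b": "3"})(ctx, nil, nil, &s)
// s.Annotations == map[string]string{"a": "1", "b": "3"}:
// existing keys are replaced, new keys are appended.
```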
plugin/plugin.go
@@ -75,6 +75,15 @@ const (
     GCPlugin Type = "io.containerd.gc.v1"
 )
 
+const (
+    // RuntimeLinuxV1 is the legacy linux runtime
+    RuntimeLinuxV1 = "io.containerd.runtime.v1.linux"
+    // RuntimeRuncV1 is the runc runtime that supports a single container
+    RuntimeRuncV1 = "io.containerd.runc.v1"
+    // RuntimeRuncV2 is the runc runtime that supports multiple containers per shim
+    RuntimeRuncV2 = "io.containerd.runc.v2"
+)
+
 // Registration contains information for registering a plugin
 type Registration struct {
     // Type of the plugin
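These constants replace the scattered string literals seen in the tests above. A client selects the new multi-container shim the same way it selected the old runtimes (a hypothetical fragment, assuming a connected *containerd.Client and a pulled image; with the runc v1/v2 shims, runtime options come from runtime/v2/runc/options, while the legacy RuntimeLinuxV1 takes runtime/linux/runctypes):

```go
container, err := client.NewContainer(ctx, "example",
    containerd.WithNewSnapshot("example-snap", image),
    containerd.WithNewSpec(oci.WithImageConfig(image)),
    // io.containerd.runc.v2: one shim process can serve many containers
    containerd.WithRuntime(plugin.RuntimeRuncV2, nil),
)
```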
runtime/v2/runc/container.go | 415 (new file)

// +build linux

/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package runc

import (
    "context"
    "io/ioutil"
    "path/filepath"
    "sync"

    "github.com/containerd/cgroups"
    "github.com/containerd/console"
    "github.com/containerd/containerd/errdefs"
    "github.com/containerd/containerd/mount"
    "github.com/containerd/containerd/namespaces"
    rproc "github.com/containerd/containerd/runtime/proc"
    "github.com/containerd/containerd/runtime/v1/linux/proc"
    "github.com/containerd/containerd/runtime/v2/runc/options"
    "github.com/containerd/containerd/runtime/v2/task"
    "github.com/containerd/typeurl"
    "github.com/pkg/errors"
    "github.com/sirupsen/logrus"
)

// NewContainer returns a new runc container
func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTaskRequest) (*Container, error) {
    ns, err := namespaces.NamespaceRequired(ctx)
    if err != nil {
        return nil, errors.Wrap(err, "create namespace")
    }

    var opts options.Options
    if r.Options != nil {
        v, err := typeurl.UnmarshalAny(r.Options)
        if err != nil {
            return nil, err
        }
        opts = *v.(*options.Options)
    }

    var mounts []proc.Mount
    for _, m := range r.Rootfs {
        mounts = append(mounts, proc.Mount{
            Type:    m.Type,
            Source:  m.Source,
            Target:  m.Target,
            Options: m.Options,
        })
    }
    config := &proc.CreateConfig{
        ID:               r.ID,
        Bundle:           r.Bundle,
        Runtime:          opts.BinaryName,
        Rootfs:           mounts,
        Terminal:         r.Terminal,
        Stdin:            r.Stdin,
        Stdout:           r.Stdout,
        Stderr:           r.Stderr,
        Checkpoint:       r.Checkpoint,
        ParentCheckpoint: r.ParentCheckpoint,
        Options:          r.Options,
    }

    if err := WriteRuntime(r.Bundle, opts.BinaryName); err != nil {
        return nil, err
    }
    rootfs := filepath.Join(r.Bundle, "rootfs")
    defer func() {
        // if a later step fails, unmount whatever was mounted into the rootfs;
        // this relies on the shared err variable, so the calls below assign to
        // it instead of shadowing it
        if err != nil {
            if err2 := mount.UnmountAll(rootfs, 0); err2 != nil {
                logrus.WithError(err2).Warn("failed to cleanup rootfs mount")
            }
        }
    }()
    for _, rm := range mounts {
        m := &mount.Mount{
            Type:    rm.Type,
            Source:  rm.Source,
            Options: rm.Options,
        }
        if err = m.Mount(rootfs); err != nil {
            return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
        }
    }

    process, err := newInit(
        ctx,
        r.Bundle,
        filepath.Join(r.Bundle, "work"),
        ns,
        platform,
        config,
        &opts,
    )
    if err != nil {
        return nil, errdefs.ToGRPC(err)
    }
    if err = process.Create(ctx, config); err != nil {
        return nil, errdefs.ToGRPC(err)
    }
    container := &Container{
        ID:        r.ID,
        Bundle:    r.Bundle,
        process:   process,
        processes: make(map[string]rproc.Process),
    }
    pid := process.Pid()
    if pid > 0 {
        cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
        if err != nil {
            logrus.WithError(err).Errorf("loading cgroup for %d", pid)
        }
        container.cgroup = cg
    }
    return container, nil
}

// ReadRuntime reads the runtime information from the path
func ReadRuntime(path string) (string, error) {
    data, err := ioutil.ReadFile(filepath.Join(path, "runtime"))
    if err != nil {
        return "", err
    }
    return string(data), nil
}

// WriteRuntime writes the runtime information into the path
func WriteRuntime(path, runtime string) error {
    return ioutil.WriteFile(filepath.Join(path, "runtime"), []byte(runtime), 0600)
}

func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform,
    r *proc.CreateConfig, options *options.Options) (*proc.Init, error) {
    rootfs := filepath.Join(path, "rootfs")
    runtime := proc.NewRunc(options.Root, path, namespace, options.BinaryName, options.CriuPath, options.SystemdCgroup)
    p := proc.New(r.ID, runtime, rproc.Stdio{
        Stdin:    r.Stdin,
        Stdout:   r.Stdout,
        Stderr:   r.Stderr,
        Terminal: r.Terminal,
    })
    p.Bundle = r.Bundle
    p.Platform = platform
    p.Rootfs = rootfs
    p.WorkDir = workDir
    p.IoUID = int(options.IoUid)
    p.IoGID = int(options.IoGid)
    p.NoPivotRoot = options.NoPivotRoot
    p.NoNewKeyring = options.NoNewKeyring
    p.CriuWorkPath = options.CriuWorkPath
    if p.CriuWorkPath == "" {
        // if criu work path not set, use container WorkDir
        p.CriuWorkPath = p.WorkDir
    }
    return p, nil
}

// Container for operating on a runc container and its processes
type Container struct {
    mu sync.Mutex

    // ID of the container
    ID string
    // Bundle path
    Bundle string

    cgroup    cgroups.Cgroup
    process   rproc.Process
    processes map[string]rproc.Process
}

// All processes in the container
func (c *Container) All() (o []rproc.Process) {
    c.mu.Lock()
    defer c.mu.Unlock()

    for _, p := range c.processes {
        o = append(o, p)
    }
    if c.process != nil {
        o = append(o, c.process)
    }
    return o
}

// ExecdProcesses added to the container
func (c *Container) ExecdProcesses() (o []rproc.Process) {
    c.mu.Lock()
    defer c.mu.Unlock()
    for _, p := range c.processes {
        o = append(o, p)
    }
    return o
}

// Pid of the main process of a container
func (c *Container) Pid() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.process.Pid()
}

// Cgroup of the container
func (c *Container) Cgroup() cgroups.Cgroup {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.cgroup
}

// CgroupSet sets the cgroup to the container
func (c *Container) CgroupSet(cg cgroups.Cgroup) {
    c.mu.Lock()
    c.cgroup = cg
    c.mu.Unlock()
}

// Process returns the process by id
func (c *Container) Process(id string) (rproc.Process, error) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if id == "" {
        if c.process == nil {
            return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
        }
        return c.process, nil
    }
    p, ok := c.processes[id]
    if !ok {
        return nil, errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", id)
    }
    return p, nil
}

// ProcessExists returns true if the process by id exists
func (c *Container) ProcessExists(id string) bool {
    c.mu.Lock()
    defer c.mu.Unlock()
    _, ok := c.processes[id]
    return ok
}

// ProcessAdd adds a new process to the container
func (c *Container) ProcessAdd(process rproc.Process) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.processes[process.ID()] = process
}

// ProcessRemove removes the process by id from the container
func (c *Container) ProcessRemove(id string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.processes, id)
}

// Start a container process
func (c *Container) Start(ctx context.Context, r *task.StartRequest) (rproc.Process, error) {
    p, err := c.Process(r.ExecID)
    if err != nil {
        return nil, err
    }
    if err := p.Start(ctx); err != nil {
        return nil, err
    }
    if c.Cgroup() == nil && p.Pid() > 0 {
        cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
        if err != nil {
            logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
        }
        c.cgroup = cg
    }
    return p, nil
}

// Delete the container or a process by id
func (c *Container) Delete(ctx context.Context, r *task.DeleteRequest) (rproc.Process, error) {
    p, err := c.Process(r.ExecID)
    if err != nil {
        return nil, err
    }
    if err := p.Delete(ctx); err != nil {
        return nil, err
    }
    if r.ExecID != "" {
        c.ProcessRemove(r.ExecID)
    }
    return p, nil
}

// Exec an additional process
func (c *Container) Exec(ctx context.Context, r *task.ExecProcessRequest) (rproc.Process, error) {
    process, err := c.process.(*proc.Init).Exec(ctx, c.Bundle, &proc.ExecConfig{
        ID:       r.ExecID,
        Terminal: r.Terminal,
        Stdin:    r.Stdin,
        Stdout:   r.Stdout,
        Stderr:   r.Stderr,
        Spec:     r.Spec,
    })
    if err != nil {
        return nil, err
    }
    c.ProcessAdd(process)
    return process, nil
}

// Pause the container
func (c *Container) Pause(ctx context.Context) error {
    return c.process.(*proc.Init).Pause(ctx)
}

// Resume the container
func (c *Container) Resume(ctx context.Context) error {
    return c.process.(*proc.Init).Resume(ctx)
}

// ResizePty of a process
func (c *Container) ResizePty(ctx context.Context, r *task.ResizePtyRequest) error {
    p, err := c.Process(r.ExecID)
    if err != nil {
        return err
    }
    ws := console.WinSize{
        Width:  uint16(r.Width),
        Height: uint16(r.Height),
    }
    return p.Resize(ws)
}

// Kill a process
func (c *Container) Kill(ctx context.Context, r *task.KillRequest) error {
    p, err := c.Process(r.ExecID)
    if err != nil {
        return err
    }
    return p.Kill(ctx, r.Signal, r.All)
}

// CloseIO of a process
func (c *Container) CloseIO(ctx context.Context, r *task.CloseIORequest) error {
    p, err := c.Process(r.ExecID)
    if err != nil {
        return err
    }
    if stdin := p.Stdin(); stdin != nil {
        if err := stdin.Close(); err != nil {
            return errors.Wrap(err, "close stdin")
        }
    }
    return nil
}

// Checkpoint the container
func (c *Container) Checkpoint(ctx context.Context, r *task.CheckpointTaskRequest) error {
    p, err := c.Process("")
    if err != nil {
        return err
    }
    var opts options.CheckpointOptions
    if r.Options != nil {
        v, err := typeurl.UnmarshalAny(r.Options)
        if err != nil {
            return err
        }
        opts = *v.(*options.CheckpointOptions)
    }
    return p.(*proc.Init).Checkpoint(ctx, &proc.CheckpointConfig{
        Path:                     r.Path,
        Exit:                     opts.Exit,
        AllowOpenTCP:             opts.OpenTcp,
        AllowExternalUnixSockets: opts.ExternalUnixSockets,
        AllowTerminal:            opts.Terminal,
        FileLocks:                opts.FileLocks,
        EmptyNamespaces:          opts.EmptyNamespaces,
        WorkDir:                  opts.WorkPath,
    })
}

// Update the resource information of a running container
func (c *Container) Update(ctx context.Context, r *task.UpdateTaskRequest) error {
    p, err := c.Process("")
    if err != nil {
        return err
    }
    return p.(*proc.Init).Update(ctx, r.Resources)
}

// HasPid returns true if the container owns a specific pid
func (c *Container) HasPid(pid int) bool {
    if c.Pid() == pid {
        return true
    }
    for _, p := range c.All() {
        if p.Pid() == pid {
            return true
        }
    }
    return false
}
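WriteRuntime and ReadRuntime persist the configured runc binary name in the bundle so that Cleanup, which runs in a fresh shim process, can find it again (see the runc.ReadRuntime call in the service diff below). A small round-trip sketch with a hypothetical bundle path:

```go
// Inside the shim the bundle path comes from r.Bundle.
func persistRuntime(bundle string) (string, error) {
    if err := runc.WriteRuntime(bundle, "runc"); err != nil {
        return "", err
    }
    // A later shim invocation (e.g. Cleanup) re-reads the name from disk.
    return runc.ReadRuntime(bundle)
}
```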
runtime/v2/runc/epoll.go
@@ -30,19 +30,22 @@
     "golang.org/x/sys/unix"
 )
 
-func newOOMEpoller(publisher events.Publisher) (*epoller, error) {
+// NewOOMEpoller returns an epoll implementation that listens to OOM events
+// from a container's cgroups.
+func NewOOMEpoller(publisher events.Publisher) (*Epoller, error) {
     fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
     if err != nil {
         return nil, err
     }
-    return &epoller{
+    return &Epoller{
         fd:        fd,
         publisher: publisher,
         set:       make(map[uintptr]*item),
     }, nil
 }
 
-type epoller struct {
+// Epoller implementation for handling OOM events from a container's cgroup
+type Epoller struct {
     mu sync.Mutex
 
     fd int
@@ -55,11 +58,13 @@ type item struct {
     cg cgroups.Cgroup
 }
 
-func (e *epoller) Close() error {
+// Close the epoll fd
+func (e *Epoller) Close() error {
     return unix.Close(e.fd)
 }
 
-func (e *epoller) run(ctx context.Context) {
+// Run the epoll loop
+func (e *Epoller) Run(ctx context.Context) {
     var events [128]unix.EpollEvent
     for {
         select {
@@ -81,7 +86,8 @@ func (e *Epoller) Run(ctx context.Context) {
     }
 }
 
-func (e *epoller) add(id string, cg cgroups.Cgroup) error {
+// Add the cgroup to the epoll monitor
+func (e *Epoller) Add(id string, cg cgroups.Cgroup) error {
     e.mu.Lock()
     defer e.mu.Unlock()
     fd, err := cg.OOMEventFD()
@@ -99,7 +105,7 @@ func (e *Epoller) Add(id string, cg cgroups.Cgroup) error {
     return unix.EpollCtl(e.fd, unix.EPOLL_CTL_ADD, int(fd), &event)
 }
 
-func (e *epoller) process(ctx context.Context, fd uintptr) {
+func (e *Epoller) process(ctx context.Context, fd uintptr) {
     flush(fd)
     e.mu.Lock()
     i, ok := e.set[fd]
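The epoller is exported here because both shim services now share it; the wiring mirrors what New and Start do in the service diffs below (a sketch, where publisher is whatever events.Publisher the shim was started with):

```go
// Create the epoller, run its loop, and register a container's cgroup.
func monitorOOM(ctx context.Context, publisher events.Publisher, c *runc.Container) error {
    ep, err := runc.NewOOMEpoller(publisher)
    if err != nil {
        return err
    }
    go ep.Run(ctx) // polls the OOM eventfds until ctx is cancelled
    // after the container starts, register its cgroup for OOM notifications
    return ep.Add(c.ID, c.Cgroup())
}
```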
runtime/v2/runc/platform.go
@@ -1,3 +1,5 @@
+// +build linux
+
 /*
    Copyright The containerd Authors.
 
@@ -23,10 +25,30 @@ import (
     "syscall"
 
     "github.com/containerd/console"
+    rproc "github.com/containerd/containerd/runtime/proc"
     "github.com/containerd/fifo"
     "github.com/pkg/errors"
 )
 
+var bufPool = sync.Pool{
+    New: func() interface{} {
+        buffer := make([]byte, 32<<10)
+        return &buffer
+    },
+}
+
+// NewPlatform returns a linux platform for use with I/O operations
+func NewPlatform() (rproc.Platform, error) {
+    epoller, err := console.NewEpoller()
+    if err != nil {
+        return nil, errors.Wrap(err, "failed to initialize epoller")
+    }
+    go epoller.Wait()
+    return &linuxPlatform{
+        epoller: epoller,
+    }, nil
+}
+
 type linuxPlatform struct {
     epoller *console.Epoller
 }
@@ -69,9 +91,9 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
     cwg.Add(1)
     go func() {
         cwg.Done()
-        p := bufPool.Get().(*[]byte)
-        defer bufPool.Put(p)
-        io.CopyBuffer(outw, epollConsole, *p)
+        buf := bufPool.Get().(*[]byte)
+        defer bufPool.Put(buf)
+        io.CopyBuffer(outw, epollConsole, *buf)
         epollConsole.Close()
         outr.Close()
         outw.Close()
@@ -94,20 +116,3 @@ func (p *linuxPlatform) ShutdownConsole(ctx context.Context, cons console.Console
 func (p *linuxPlatform) Close() error {
     return p.epoller.Close()
 }
-
-// initialize a single epoll fd to manage our consoles. `initPlatform` should
-// only be called once.
-func (s *service) initPlatform() error {
-    if s.platform != nil {
-        return nil
-    }
-    epoller, err := console.NewEpoller()
-    if err != nil {
-        return errors.Wrap(err, "failed to initialize epoller")
-    }
-    s.platform = &linuxPlatform{
-        epoller: epoller,
-    }
-    go epoller.Wait()
-    return nil
-}
runtime/v2/runc/util.go | 55 (new file)

// +build linux

/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package runc

import (
    "github.com/containerd/containerd/api/events"
    "github.com/containerd/containerd/runtime"
    "github.com/sirupsen/logrus"
)

// GetTopic converts an event from an interface type to the specific
// event topic id
func GetTopic(e interface{}) string {
    switch e.(type) {
    case *events.TaskCreate:
        return runtime.TaskCreateEventTopic
    case *events.TaskStart:
        return runtime.TaskStartEventTopic
    case *events.TaskOOM:
        return runtime.TaskOOMEventTopic
    case *events.TaskExit:
        return runtime.TaskExitEventTopic
    case *events.TaskDelete:
        return runtime.TaskDeleteEventTopic
    case *events.TaskExecAdded:
        return runtime.TaskExecAddedEventTopic
    case *events.TaskExecStarted:
        return runtime.TaskExecStartedEventTopic
    case *events.TaskPaused:
        return runtime.TaskPausedEventTopic
    case *events.TaskResumed:
        return runtime.TaskResumedEventTopic
    case *events.TaskCheckpointed:
        return runtime.TaskCheckpointedEventTopic
    default:
        logrus.Warnf("no topic for type %#v", e)
    }
    return runtime.TaskUnknownTopic
}
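Both shim versions use this shared mapping when forwarding events to containerd, as in the v1 service's forwarder shown below; roughly (publisher being the shim's events.Publisher):

```go
e := &events.TaskOOM{ContainerID: "example"}
topic := runc.GetTopic(e) // runtime.TaskOOMEventTopic, i.e. "/tasks/oom"
err := publisher.Publish(ctx, topic, e)
```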
runtime/v2/runc/v1/service.go
@@ -16,7 +16,7 @@
 limitations under the License.
 */
 
-package runc
+package v1
 
 import (
     "context"
@@ -30,7 +30,6 @@ import (
     "time"
 
     "github.com/containerd/cgroups"
-    "github.com/containerd/console"
     eventstypes "github.com/containerd/containerd/api/events"
     "github.com/containerd/containerd/api/types/task"
     "github.com/containerd/containerd/errdefs"
@@ -38,9 +37,9 @@ import (
     "github.com/containerd/containerd/log"
     "github.com/containerd/containerd/mount"
     "github.com/containerd/containerd/namespaces"
-    "github.com/containerd/containerd/runtime"
     rproc "github.com/containerd/containerd/runtime/proc"
     "github.com/containerd/containerd/runtime/v1/linux/proc"
+    "github.com/containerd/containerd/runtime/v2/runc"
     "github.com/containerd/containerd/runtime/v2/runc/options"
     "github.com/containerd/containerd/runtime/v2/shim"
     taskAPI "github.com/containerd/containerd/runtime/v2/task"
@@ -54,33 +53,25 @@ import (
 )
 
 var (
-    empty   = &ptypes.Empty{}
-    bufPool = sync.Pool{
-        New: func() interface{} {
-            buffer := make([]byte, 32<<10)
-            return &buffer
-        },
-    }
+    _     = (taskAPI.TaskService)(&service{})
+    empty = &ptypes.Empty{}
 )
 
-var _ = (taskAPI.TaskService)(&service{})
-
 // New returns a new shim service that can be used via GRPC
 func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
-    ep, err := newOOMEpoller(publisher)
+    ep, err := runc.NewOOMEpoller(publisher)
     if err != nil {
         return nil, err
     }
     ctx, cancel := context.WithCancel(ctx)
-    go ep.run(ctx)
+    go ep.Run(ctx)
     s := &service{
-        id:        id,
-        context:   ctx,
-        processes: make(map[string]rproc.Process),
-        events:    make(chan interface{}, 128),
-        ec:        shim.Default.Subscribe(),
-        ep:        ep,
-        cancel:    cancel,
+        id:      id,
+        context: ctx,
+        events:  make(chan interface{}, 128),
+        ec:      shim.Default.Subscribe(),
+        ep:      ep,
+        cancel:  cancel,
     }
     go s.processExits()
     runcC.Monitor = shim.Default
@@ -97,17 +88,15 @@ type service struct {
     mu          sync.Mutex
     eventSendMu sync.Mutex
 
-    context   context.Context
-    task      rproc.Process
-    processes map[string]rproc.Process
-    events    chan interface{}
-    platform  rproc.Platform
-    ec        chan runcC.Exit
-    ep        *epoller
+    context  context.Context
+    events   chan interface{}
+    platform rproc.Platform
+    ec       chan runcC.Exit
+    ep       *runc.Epoller
+
+    id        string
+    container *runc.Container
 
-    id     string
-    bundle string
-    cg     cgroups.Cgroup
     cancel func()
 }
 
@@ -192,7 +181,7 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
     if err != nil {
         return nil, err
     }
-    runtime, err := s.readRuntime(path)
+    runtime, err := runc.ReadRuntime(path)
     if err != nil {
         return nil, err
     }
@@ -211,107 +200,17 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
     }, nil
 }
 
-func (s *service) readRuntime(path string) (string, error) {
-    data, err := ioutil.ReadFile(filepath.Join(path, "runtime"))
-    if err != nil {
-        return "", err
-    }
-    return string(data), nil
-}
-
-func (s *service) writeRuntime(path, runtime string) error {
-    return ioutil.WriteFile(filepath.Join(path, "runtime"), []byte(runtime), 0600)
-}
-
 // Create a new initial process and container with the underlying OCI runtime
 func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
     s.mu.Lock()
     defer s.mu.Unlock()
 
-    ns, err := namespaces.NamespaceRequired(ctx)
+    container, err := runc.NewContainer(ctx, s.platform, r)
     if err != nil {
-        return nil, errors.Wrap(err, "create namespace")
-    }
-
-    var opts options.Options
-    if r.Options != nil {
-        v, err := typeurl.UnmarshalAny(r.Options)
-        if err != nil {
-            return nil, err
-        }
-        opts = *v.(*options.Options)
-    }
-
-    var mounts []proc.Mount
-    for _, m := range r.Rootfs {
-        mounts = append(mounts, proc.Mount{
-            Type:    m.Type,
-            Source:  m.Source,
-            Target:  m.Target,
-            Options: m.Options,
-        })
-    }
-    config := &proc.CreateConfig{
-        ID:               r.ID,
-        Bundle:           r.Bundle,
-        Runtime:          opts.BinaryName,
-        Rootfs:           mounts,
-        Terminal:         r.Terminal,
-        Stdin:            r.Stdin,
-        Stdout:           r.Stdout,
-        Stderr:           r.Stderr,
-        Checkpoint:       r.Checkpoint,
-        ParentCheckpoint: r.ParentCheckpoint,
-        Options:          r.Options,
-    }
-    if err := s.writeRuntime(r.Bundle, opts.BinaryName); err != nil {
         return nil, err
     }
-    rootfs := filepath.Join(r.Bundle, "rootfs")
-    defer func() {
-        if err != nil {
-            if err2 := mount.UnmountAll(rootfs, 0); err2 != nil {
-                logrus.WithError(err2).Warn("failed to cleanup rootfs mount")
-            }
-        }
-    }()
-    for _, rm := range mounts {
-        m := &mount.Mount{
-            Type:    rm.Type,
-            Source:  rm.Source,
-            Options: rm.Options,
-        }
-        if err := m.Mount(rootfs); err != nil {
-            return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
-        }
-    }
-    process, err := newInit(
-        ctx,
-        r.Bundle,
-        filepath.Join(r.Bundle, "work"),
-        ns,
-        s.platform,
-        config,
-        &opts,
-    )
-    if err != nil {
-        return nil, errdefs.ToGRPC(err)
-    }
-    if err := process.Create(ctx, config); err != nil {
-        return nil, errdefs.ToGRPC(err)
-    }
-    // save the main task id and bundle to the shim for additional requests
-    s.id = r.ID
-    s.bundle = r.Bundle
-    pid := process.Pid()
-    if pid > 0 {
-        cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
-        if err != nil {
-            logrus.WithError(err).Errorf("loading cgroup for %d", pid)
-        }
-        s.cg = cg
-    }
-    s.task = process
+
+    s.container = container
 
     s.send(&eventstypes.TaskCreate{
         ContainerID: r.ID,
@@ -324,46 +223,41 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
             Terminal: r.Terminal,
         },
         Checkpoint: r.Checkpoint,
-        Pid:        uint32(pid),
+        Pid:        uint32(container.Pid()),
     })
 
     return &taskAPI.CreateTaskResponse{
-        Pid: uint32(pid),
+        Pid: uint32(container.Pid()),
     }, nil
-
 }
 
 // Start a process
 func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
-    p, err := s.getProcess(r.ExecID)
+    container, err := s.getContainer()
     if err != nil {
         return nil, err
     }
 
     // hold the send lock so that the start events are sent before any exit events in the error case
     s.eventSendMu.Lock()
-    if err := p.Start(ctx); err != nil {
+    p, err := container.Start(ctx, r)
+    if err != nil {
         s.eventSendMu.Unlock()
-        return nil, err
+        return nil, errdefs.ToGRPC(err)
     }
-
-    // case for restore
-    if s.getCgroup() == nil && p.Pid() > 0 {
-        cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
-        if err != nil {
-            logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
-        }
-        s.setCgroup(cg)
+    if err := s.ep.Add(container.ID, container.Cgroup()); err != nil {
+        logrus.WithError(err).Error("add cg to OOM monitor")
     }
-    if r.ExecID != "" {
-        s.send(&eventstypes.TaskExecStarted{
-            ContainerID: s.id,
-            ExecID:      r.ExecID,
+    switch r.ExecID {
+    case "":
+        s.send(&eventstypes.TaskStart{
+            ContainerID: container.ID,
             Pid:         uint32(p.Pid()),
         })
-    } else {
-        s.send(&eventstypes.TaskStart{
-            ContainerID: s.id,
+    default:
+        s.send(&eventstypes.TaskExecStarted{
+            ContainerID: container.ID,
+            ExecID:      r.ExecID,
             Pid:         uint32(p.Pid()),
         })
     }
@@ -375,28 +269,21 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
 
 // Delete the initial process and container
 func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
-    p, err := s.getProcess(r.ExecID)
+    container, err := s.getContainer()
     if err != nil {
         return nil, err
     }
-    if p == nil {
-        return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
+    p, err := container.Delete(ctx, r)
+    if err != nil {
+        return nil, errdefs.ToGRPC(err)
     }
-    if err := p.Delete(ctx); err != nil {
-        return nil, err
-    }
-    isTask := r.ExecID == ""
-    if !isTask {
-        s.mu.Lock()
-        delete(s.processes, r.ExecID)
-        s.mu.Unlock()
-    }
-    if isTask {
+    // if we deleted our init task, close the platform and send the task delete event
+    if r.ExecID == "" {
         if s.platform != nil {
             s.platform.Close()
         }
         s.send(&eventstypes.TaskDelete{
-            ContainerID: s.id,
+            ContainerID: container.ID,
             Pid:         uint32(p.Pid()),
             ExitStatus:  uint32(p.ExitStatus()),
             ExitedAt:    p.ExitedAt(),
@@ -411,33 +298,20 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
 
 // Exec an additional process inside the container
 func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
-    s.mu.Lock()
-    p := s.processes[r.ExecID]
-    s.mu.Unlock()
-    if p != nil {
+    container, err := s.getContainer()
+    if err != nil {
+        return nil, err
+    }
+    if container.ProcessExists(r.ExecID) {
         return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
     }
-    p = s.task
-    if p == nil {
-        return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
-    }
-    process, err := p.(*proc.Init).Exec(ctx, s.bundle, &proc.ExecConfig{
-        ID:       r.ExecID,
-        Terminal: r.Terminal,
-        Stdin:    r.Stdin,
-        Stdout:   r.Stdout,
-        Stderr:   r.Stderr,
-        Spec:     r.Spec,
-    })
+    process, err := container.Exec(ctx, r)
     if err != nil {
         return nil, errdefs.ToGRPC(err)
     }
-    s.mu.Lock()
-    s.processes[r.ExecID] = process
-    s.mu.Unlock()
 
     s.send(&eventstypes.TaskExecAdded{
-        ContainerID: s.id,
+        ContainerID: s.container.ID,
         ExecID:      process.ID(),
     })
     return empty, nil
@@ -445,15 +319,11 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
 
 // ResizePty of a process
 func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
-    p, err := s.getProcess(r.ExecID)
+    container, err := s.getContainer()
     if err != nil {
         return nil, err
     }
-    ws := console.WinSize{
-        Width:  uint16(r.Width),
-        Height: uint16(r.Height),
-    }
-    if err := p.Resize(ws); err != nil {
+    if err := container.ResizePty(ctx, r); err != nil {
        return nil, errdefs.ToGRPC(err)
     }
     return empty, nil
@@ -485,7 +355,7 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
     sio := p.Stdio()
     return &taskAPI.StateResponse{
         ID:     p.ID(),
-        Bundle: s.bundle,
+        Bundle: s.container.Bundle,
         Pid:    uint32(p.Pid()),
         Status: status,
         Stdin:  sio.Stdin,
@@ -499,48 +369,41 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
 
 // Pause the container
 func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
-    s.mu.Lock()
-    p := s.task
-    s.mu.Unlock()
-    if p == nil {
-        return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
-    }
-    if err := p.(*proc.Init).Pause(ctx); err != nil {
+    container, err := s.getContainer()
+    if err != nil {
         return nil, err
     }
+    if err := container.Pause(ctx); err != nil {
+        return nil, errdefs.ToGRPC(err)
+    }
     s.send(&eventstypes.TaskPaused{
-        p.ID(),
+        container.ID,
     })
     return empty, nil
 }
 
 // Resume the container
 func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
-    s.mu.Lock()
-    p := s.task
-    s.mu.Unlock()
-    if p == nil {
-        return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
-    }
-    if err := p.(*proc.Init).Resume(ctx); err != nil {
+    container, err := s.getContainer()
+    if err != nil {
        return nil, err
     }
+    if err := container.Resume(ctx); err != nil {
+        return nil, errdefs.ToGRPC(err)
+    }
     s.send(&eventstypes.TaskResumed{
-        p.ID(),
+        container.ID,
     })
     return empty, nil
 }
 
 // Kill a process with the provided signal
 func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
-    p, err := s.getProcess(r.ExecID)
+    container, err := s.getContainer()
     if err != nil {
         return nil, err
     }
-    if p == nil {
-        return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
-    }
-    if err := p.Kill(ctx, r.Signal, r.All); err != nil {
+    if err := container.Kill(ctx, r); err != nil {
        return nil, errdefs.ToGRPC(err)
     }
     return empty, nil
@@ -548,6 +411,10 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
 
 // Pids returns all pids inside the container
 func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
+    container, err := s.getContainer()
+    if err != nil {
+        return nil, err
+    }
     pids, err := s.getContainerPids(ctx, r.ID)
     if err != nil {
         return nil, errdefs.ToGRPC(err)
@@ -557,7 +424,7 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
         pInfo := task.ProcessInfo{
             Pid: pid,
         }
-        for _, p := range s.processes {
+        for _, p := range container.ExecdProcesses() {
             if p.Pid() == int(pid) {
                 d := &options.ProcessDetails{
                     ExecID: p.ID(),
@@ -579,54 +446,63 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
 
 // CloseIO of a process
 func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
-    p, err := s.getProcess(r.ExecID)
+    container, err := s.getContainer()
     if err != nil {
         return nil, err
     }
-    if stdin := p.Stdin(); stdin != nil {
-        if err := stdin.Close(); err != nil {
-            return nil, errors.Wrap(err, "close stdin")
-        }
+    if err := container.CloseIO(ctx, r); err != nil {
+        return nil, err
     }
     return empty, nil
 }
 
 // Checkpoint the container
 func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
-    s.mu.Lock()
-    p := s.task
-    s.mu.Unlock()
-    if p == nil {
-        return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
+    container, err := s.getContainer()
+    if err != nil {
+        return nil, err
     }
-    var opts options.CheckpointOptions
-    if r.Options != nil {
-        v, err := typeurl.UnmarshalAny(r.Options)
-        if err != nil {
-            return nil, err
-        }
-        opts = *v.(*options.CheckpointOptions)
-    }
-    if err := p.(*proc.Init).Checkpoint(ctx, &proc.CheckpointConfig{
-        Path:                     r.Path,
-        Exit:                     opts.Exit,
-        AllowOpenTCP:             opts.OpenTcp,
-        AllowExternalUnixSockets: opts.ExternalUnixSockets,
-        AllowTerminal:            opts.Terminal,
-        FileLocks:                opts.FileLocks,
-        EmptyNamespaces:          opts.EmptyNamespaces,
-        WorkDir:                  opts.WorkPath,
-    }); err != nil {
+    if err := container.Checkpoint(ctx, r); err != nil {
        return nil, errdefs.ToGRPC(err)
     }
     return empty, nil
 }
 
+// Update a running container
+func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
+    container, err := s.getContainer()
+    if err != nil {
+        return nil, err
+    }
+    if err := container.Update(ctx, r); err != nil {
+        return nil, errdefs.ToGRPC(err)
+    }
+    return empty, nil
+}
+
+// Wait for a process to exit
+func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
+    container, err := s.getContainer()
+    if err != nil {
+        return nil, err
+    }
+    p, err := container.Process(r.ExecID)
+    if err != nil {
+        return nil, errdefs.ToGRPC(err)
+    }
+    p.Wait()
+
+    return &taskAPI.WaitResponse{
+        ExitStatus: uint32(p.ExitStatus()),
+        ExitedAt:   p.ExitedAt(),
+    }, nil
+}
+
 // Connect returns shim information such as the shim's pid
 func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
     var pid int
-    if s.task != nil {
-        pid = s.task.Pid()
+    if s.container != nil {
+        pid = s.container.Pid()
     }
     return &taskAPI.ConnectResponse{
         ShimPid: uint32(os.Getpid()),
@@ -641,7 +517,7 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
 }
 
 func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
-    cg := s.getCgroup()
+    cg := s.container.Cgroup()
     if cg == nil {
         return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
     }
@@ -658,37 +534,6 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
     }, nil
 }
 
-// Update a running container
-func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
-    s.mu.Lock()
-    p := s.task
-    s.mu.Unlock()
-    if p == nil {
-        return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
-    }
-    if err := p.(*proc.Init).Update(ctx, r.Resources); err != nil {
-        return nil, errdefs.ToGRPC(err)
-    }
-    return empty, nil
-}
-
-// Wait for a process to exit
-func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
-    p, err := s.getProcess(r.ExecID)
-    if err != nil {
-        return nil, err
-    }
-    if p == nil {
-        return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
-    }
-    p.Wait()
-
-    return &taskAPI.WaitResponse{
-        ExitStatus: uint32(p.ExitStatus()),
-        ExitedAt:   p.ExitedAt(),
-    }, nil
-}
-
 func (s *service) processExits() {
     for e := range s.ec {
         s.checkProcesses(e)
@@ -706,12 +551,17 @@ func (s *service) sendL(evt interface{}) {
 }
 
 func (s *service) checkProcesses(e runcC.Exit) {
-    shouldKillAll, err := shouldKillAllOnExit(s.bundle)
+    container, err := s.getContainer()
+    if err != nil {
+        return
+    }
+
+    shouldKillAll, err := shouldKillAllOnExit(container.Bundle)
     if err != nil {
         log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
     }
 
-    for _, p := range s.allProcesses() {
+    for _, p := range container.All() {
         if p.Pid() == e.Pid {
             if shouldKillAll {
                 if ip, ok := p.(*proc.Init); ok {
@@ -724,7 +574,7 @@ func (s *service) checkProcesses(e runcC.Exit) {
             }
             p.SetExited(e.Status)
             s.sendL(&eventstypes.TaskExit{
-                ContainerID: s.id,
+                ContainerID: container.ID,
                 ID:          p.ID(),
                 Pid:         uint32(e.Pid),
                 ExitStatus:  uint32(e.Status),
@@ -754,26 +604,10 @@ func shouldKillAllOnExit(bundlePath string) (bool, error) {
     return true, nil
 }
 
-func (s *service) allProcesses() (o []rproc.Process) {
-    s.mu.Lock()
-    defer s.mu.Unlock()
-
-    o = make([]rproc.Process, 0, len(s.processes)+1)
-    for _, p := range s.processes {
-        o = append(o, p)
-    }
-    if s.task != nil {
-        o = append(o, s.task)
-    }
-    return o
-}
-
 func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
-    s.mu.Lock()
-    p := s.task
-    s.mu.Unlock()
-    if p == nil {
-        return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
+    p, err := s.container.Process("")
+    if err != nil {
+        return nil, errdefs.ToGRPC(err)
     }
     ps, err := p.(*proc.Init).Runtime().Ps(ctx, id)
     if err != nil {
@@ -789,7 +623,7 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
 func (s *service) forward(publisher events.Publisher) {
     for e := range s.events {
         ctx, cancel := context.WithTimeout(s.context, 5*time.Second)
-        err := publisher.Publish(ctx, getTopic(e), e)
+        err := publisher.Publish(ctx, runc.GetTopic(e), e)
         cancel()
         if err != nil {
             logrus.WithError(err).Error("post event")
@@ -797,84 +631,38 @@ func (s *service) forward(publisher events.Publisher) {
     }
 }
 
-func (s *service) getProcess(execID string) (rproc.Process, error) {
+func (s *service) getContainer() (*runc.Container, error) {
     s.mu.Lock()
-    defer s.mu.Unlock()
-    if execID == "" {
-        return s.task, nil
-    }
-    p := s.processes[execID]
-    if p == nil {
-        return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process does not exist %s", execID)
-    }
-    return p, nil
-}
-
-func (s *service) getCgroup() cgroups.Cgroup {
-    s.mu.Lock()
-    defer s.mu.Unlock()
-    return s.cg
-}
-
-func (s *service) setCgroup(cg cgroups.Cgroup) {
-    s.mu.Lock()
-    s.cg = cg
+    container := s.container
     s.mu.Unlock()
-    if err := s.ep.add(s.id, cg); err != nil {
-        logrus.WithError(err).Error("add cg to OOM monitor")
+    if container == nil {
+        return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
     }
+    return container, nil
 }
 
-func getTopic(e interface{}) string {
-    switch e.(type) {
-    case *eventstypes.TaskCreate:
-        return runtime.TaskCreateEventTopic
-    case *eventstypes.TaskStart:
-        return runtime.TaskStartEventTopic
-    case *eventstypes.TaskOOM:
-        return runtime.TaskOOMEventTopic
-    case *eventstypes.TaskExit:
-        return runtime.TaskExitEventTopic
-    case *eventstypes.TaskDelete:
-        return runtime.TaskDeleteEventTopic
-    case *eventstypes.TaskExecAdded:
-        return runtime.TaskExecAddedEventTopic
-    case *eventstypes.TaskExecStarted:
-        return runtime.TaskExecStartedEventTopic
-    case *eventstypes.TaskPaused:
-        return runtime.TaskPausedEventTopic
-    case *eventstypes.TaskResumed:
-        return runtime.TaskResumedEventTopic
-    case *eventstypes.TaskCheckpointed:
-        return runtime.TaskCheckpointedEventTopic
-    default:
-        logrus.Warnf("no topic for type %#v", e)
-    }
-    return runtime.TaskUnknownTopic
-}
-
-func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform, r *proc.CreateConfig, options *options.Options) (*proc.Init, error) {
-    rootfs := filepath.Join(path, "rootfs")
-    runtime := proc.NewRunc(options.Root, path, namespace, options.BinaryName, options.CriuPath, options.SystemdCgroup)
-    p := proc.New(r.ID, runtime, rproc.Stdio{
-        Stdin:    r.Stdin,
-        Stdout:   r.Stdout,
-        Stderr:   r.Stderr,
-        Terminal: r.Terminal,
-    })
-    p.Bundle = r.Bundle
-    p.Platform = platform
-    p.Rootfs = rootfs
-    p.WorkDir = workDir
-    p.IoUID = int(options.IoUid)
-    p.IoGID = int(options.IoGid)
-    p.NoPivotRoot = options.NoPivotRoot
-    p.NoNewKeyring = options.NoNewKeyring
-    p.CriuWorkPath = options.CriuWorkPath
-    if p.CriuWorkPath == "" {
-        // if criu work path not set, use container WorkDir
-        p.CriuWorkPath = p.WorkDir
+func (s *service) getProcess(execID string) (rproc.Process, error) {
+    container, err := s.getContainer()
+    if err != nil {
+        return nil, err
     }
-
+    p, err := container.Process(execID)
+    if err != nil {
+        return nil, errdefs.ToGRPC(err)
+    }
     return p, nil
 }
 
+// initialize a single epoll fd to manage our consoles. `initPlatform` should
+// only be called once.
+func (s *service) initPlatform() error {
+    if s.platform != nil {
+        return nil
+    }
+    p, err := runc.NewPlatform()
+    if err != nil {
+        return err
+    }
+    s.platform = p
+    return nil
+}
726
runtime/v2/runc/v2/service.go
Normal file
@ -0,0 +1,726 @@
// +build linux

/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package v2

import (
	"context"
	"encoding/json"
	"io/ioutil"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/containerd/cgroups"
	eventstypes "github.com/containerd/containerd/api/events"
	"github.com/containerd/containerd/api/types/task"
	"github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/events"
	"github.com/containerd/containerd/log"
	"github.com/containerd/containerd/mount"
	"github.com/containerd/containerd/namespaces"
	rproc "github.com/containerd/containerd/runtime/proc"
	"github.com/containerd/containerd/runtime/v1/linux/proc"
	"github.com/containerd/containerd/runtime/v2/runc"
	"github.com/containerd/containerd/runtime/v2/runc/options"
	"github.com/containerd/containerd/runtime/v2/shim"
	taskAPI "github.com/containerd/containerd/runtime/v2/task"
	runcC "github.com/containerd/go-runc"
	"github.com/containerd/typeurl"
	ptypes "github.com/gogo/protobuf/types"
	specs "github.com/opencontainers/runtime-spec/specs-go"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/sys/unix"
)

var (
	_     = (taskAPI.TaskService)(&service{})
	empty = &ptypes.Empty{}
)

// groupLabels specifies how the shim groups services.
// It currently supports a runc.v2-specific .group label and the
// standard k8s pod label. Order matters in this list.
var groupLabels = []string{
	"io.containerd.runc.v2.group",
	"io.kubernetes.cri.sandbox-id",
}
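To make the precedence concrete: the lookup StartShim performs below amounts to a first-match scan over this slice, falling back to the container ID when no group annotation is present. pickGrouping is not part of this change; it is an illustrative distillation of that loop:

	func pickGrouping(annotations map[string]string, id string) string {
		// the first matching group label wins; otherwise every
		// container gets its own shim, keyed by its own ID
		for _, label := range groupLabels {
			if g, ok := annotations[label]; ok {
				return g
			}
		}
		return id
	}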

type spec struct {
	Annotations map[string]string `json:"annotations,omitempty"`
}

// New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
	ep, err := runc.NewOOMEpoller(publisher)
	if err != nil {
		return nil, err
	}
	ctx, cancel := context.WithCancel(ctx)
	go ep.Run(ctx)
	s := &service{
		id:         id,
		context:    ctx,
		events:     make(chan interface{}, 128),
		ec:         shim.Default.Subscribe(),
		ep:         ep,
		cancel:     cancel,
		containers: make(map[string]*runc.Container),
	}
	go s.processExits()
	runcC.Monitor = shim.Default
	if err := s.initPlatform(); err != nil {
		cancel()
		return nil, errors.Wrap(err, "failed to initialize platform behavior")
	}
	go s.forward(publisher)
	return s, nil
}

// service is the shim implementation of a remote shim over GRPC
type service struct {
	mu          sync.Mutex
	eventSendMu sync.Mutex

	context  context.Context
	events   chan interface{}
	platform rproc.Platform
	ec       chan runcC.Exit
	ep       *runc.Epoller

	// id only used in cleanup case
	id string

	containers map[string]*runc.Container

	cancel func()
}

func newCommand(ctx context.Context, id, containerdBinary, containerdAddress string) (*exec.Cmd, error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, err
	}
	self, err := os.Executable()
	if err != nil {
		return nil, err
	}
	cwd, err := os.Getwd()
	if err != nil {
		return nil, err
	}
	args := []string{
		"-namespace", ns,
		"-id", id,
		"-address", containerdAddress,
		"-publish-binary", containerdBinary,
	}
	cmd := exec.Command(self, args...)
	cmd.Dir = cwd
	cmd.Env = append(os.Environ(), "GOMAXPROCS=4")
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Setpgid: true,
	}
	return cmd, nil
}

func readSpec() (*spec, error) {
	f, err := os.Open("config.json")
	if err != nil {
		return nil, err
	}
	defer f.Close()
	var s spec
	if err := json.NewDecoder(f).Decode(&s); err != nil {
		return nil, err
	}
	return &s, nil
}
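readSpec decodes only the annotations of the bundle's config.json, which is all the grouping decision needs. For a CRI-created container the relevant fragment looks roughly like this (sandbox ID hypothetical), and decodes as sketched:

	var s spec
	data := []byte(`{"annotations": {"io.kubernetes.cri.sandbox-id": "4f1e2a9c"}}`)
	if err := json.Unmarshal(data, &s); err != nil {
		// handle the error
	}
	// s.Annotations["io.kubernetes.cri.sandbox-id"] == "4f1e2a9c"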

func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) {
	cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress)
	if err != nil {
		return "", err
	}
	grouping := id
	spec, err := readSpec()
	if err != nil {
		return "", err
	}
	for _, group := range groupLabels {
		if groupID, ok := spec.Annotations[group]; ok {
			grouping = groupID
			break
		}
	}
	address, err := shim.SocketAddress(ctx, grouping)
	if err != nil {
		return "", err
	}
	socket, err := shim.NewSocket(address)
	if err != nil {
		if strings.Contains(err.Error(), "address already in use") {
			if err := shim.WriteAddress("address", address); err != nil {
				return "", err
			}
			return address, nil
		}
		return "", err
	}
	defer socket.Close()
	f, err := socket.File()
	if err != nil {
		return "", err
	}
	defer f.Close()

	cmd.ExtraFiles = append(cmd.ExtraFiles, f)

	if err := cmd.Start(); err != nil {
		return "", err
	}
	defer func() {
		if err != nil {
			cmd.Process.Kill()
		}
	}()
	// make sure to wait after start
	go cmd.Wait()
	if err := shim.WriteAddress("address", address); err != nil {
		return "", err
	}
	if err := shim.SetScore(cmd.Process.Pid); err != nil {
		return "", errors.Wrap(err, "failed to set OOM Score on shim")
	}
	return address, nil
}
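The "address already in use" branch is what collapses a group onto one shim: the socket address is a pure function of (namespace, grouping), so the second container in a group fails to bind and simply records the address of the shim that is already running. The exact address scheme lives in the shim package; this sketch only conveys the determinism StartShim relies on (hash construction illustrative):

	// socketAddress sketches the property StartShim depends on: equal
	// (namespace, grouping) pairs always yield equal addresses.
	func socketAddress(ns, grouping string) string {
		d := sha256.Sum256([]byte(filepath.Join(ns, grouping)))
		return fmt.Sprintf("unix://containerd-shim-%x.sock", d)
	}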

func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
	cwd, err := os.Getwd()
	if err != nil {
		return nil, err
	}
	path := filepath.Join(filepath.Dir(cwd), s.id)
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return nil, err
	}

	runtime, err := runc.ReadRuntime(path)
	if err != nil {
		return nil, err
	}
	r := proc.NewRunc(proc.RuncRoot, path, ns, runtime, "", false)
	if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{
		Force: true,
	}); err != nil {
		logrus.WithError(err).Warn("failed to remove runc container")
	}
	if err := mount.UnmountAll(filepath.Join(path, "rootfs"), 0); err != nil {
		logrus.WithError(err).Warn("failed to cleanup rootfs mount")
	}
	return &taskAPI.DeleteResponse{
		ExitedAt:   time.Now(),
		ExitStatus: 128 + uint32(unix.SIGKILL),
	}, nil
}

// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	container, err := runc.NewContainer(ctx, s.platform, r)
	if err != nil {
		return nil, err
	}

	s.containers[r.ID] = container

	s.send(&eventstypes.TaskCreate{
		ContainerID: r.ID,
		Bundle:      r.Bundle,
		Rootfs:      r.Rootfs,
		IO: &eventstypes.TaskIO{
			Stdin:    r.Stdin,
			Stdout:   r.Stdout,
			Stderr:   r.Stderr,
			Terminal: r.Terminal,
		},
		Checkpoint: r.Checkpoint,
		Pid:        uint32(container.Pid()),
	})

	return &taskAPI.CreateTaskResponse{
		Pid: uint32(container.Pid()),
	}, nil
}

// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}

	// hold the send lock so that the start events are sent before any exit events in the error case
	s.eventSendMu.Lock()
	p, err := container.Start(ctx, r)
	if err != nil {
		s.eventSendMu.Unlock()
		return nil, errdefs.ToGRPC(err)
	}
	if err := s.ep.Add(container.ID, container.Cgroup()); err != nil {
		logrus.WithError(err).Error("add cg to OOM monitor")
	}
	switch r.ExecID {
	case "":
		s.send(&eventstypes.TaskStart{
			ContainerID: container.ID,
			Pid:         uint32(p.Pid()),
		})
	default:
		s.send(&eventstypes.TaskExecStarted{
			ContainerID: container.ID,
			ExecID:      r.ExecID,
			Pid:         uint32(p.Pid()),
		})
	}
	s.eventSendMu.Unlock()
	return &taskAPI.StartResponse{
		Pid: uint32(p.Pid()),
	}, nil
}

// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Delete(ctx, r)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	// if we deleted our init task, close the platform and send the task delete event
	if r.ExecID == "" {
		s.mu.Lock()
		delete(s.containers, r.ID)
		hasContainers := len(s.containers) > 0
		s.mu.Unlock()
		if s.platform != nil && !hasContainers {
			s.platform.Close()
		}
		s.send(&eventstypes.TaskDelete{
			ContainerID: container.ID,
			Pid:         uint32(p.Pid()),
			ExitStatus:  uint32(p.ExitStatus()),
			ExitedAt:    p.ExitedAt(),
		})
	}
	return &taskAPI.DeleteResponse{
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
		Pid:        uint32(p.Pid()),
	}, nil
}

// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if container.ProcessExists(r.ExecID) {
		return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
	}
	process, err := container.Exec(ctx, r)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}

	s.send(&eventstypes.TaskExecAdded{
		ContainerID: container.ID,
		ExecID:      process.ID(),
	})
	return empty, nil
}

// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.ResizePty(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Process(r.ExecID)
	if err != nil {
		return nil, err
	}
	st, err := p.Status(ctx)
	if err != nil {
		return nil, err
	}
	status := task.StatusUnknown
	switch st {
	case "created":
		status = task.StatusCreated
	case "running":
		status = task.StatusRunning
	case "stopped":
		status = task.StatusStopped
	case "paused":
		status = task.StatusPaused
	case "pausing":
		status = task.StatusPausing
	}
	sio := p.Stdio()
	return &taskAPI.StateResponse{
		ID:         p.ID(),
		Bundle:     container.Bundle,
		Pid:        uint32(p.Pid()),
		Status:     status,
		Stdin:      sio.Stdin,
		Stdout:     sio.Stdout,
		Stderr:     sio.Stderr,
		Terminal:   sio.Terminal,
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
	}, nil
}

// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Pause(ctx); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	s.send(&eventstypes.TaskPaused{
		ContainerID: container.ID,
	})
	return empty, nil
}

// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Resume(ctx); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	s.send(&eventstypes.TaskResumed{
		ContainerID: container.ID,
	})
	return empty, nil
}

// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Kill(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Pids returns all pids inside the container
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	pids, err := s.getContainerPids(ctx, r.ID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	var processes []*task.ProcessInfo
	for _, pid := range pids {
		pInfo := task.ProcessInfo{
			Pid: pid,
		}
		for _, p := range container.ExecdProcesses() {
			if p.Pid() == int(pid) {
				d := &options.ProcessDetails{
					ExecID: p.ID(),
				}
				a, err := typeurl.MarshalAny(d)
				if err != nil {
					return nil, errors.Wrapf(err, "failed to marshal process %d info", pid)
				}
				pInfo.Info = a
				break
			}
		}
		processes = append(processes, &pInfo)
	}
	return &taskAPI.PidsResponse{
		Processes: processes,
	}, nil
}
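Only exec'd processes get a ProcessDetails payload; the init process is left with a nil Info. On the client side the Any can be unpacked again, along these lines (execIDOf is an illustrative helper, not part of this change):

	// execIDOf recovers the exec ID Pids attached to a process entry,
	// returning "" for the init process.
	func execIDOf(pi *task.ProcessInfo) (string, error) {
		if pi.Info == nil {
			return "", nil // init carries no details
		}
		v, err := typeurl.UnmarshalAny(pi.Info)
		if err != nil {
			return "", err
		}
		d, ok := v.(*options.ProcessDetails)
		if !ok {
			return "", errors.Errorf("unexpected info type %T", v)
		}
		return d.ExecID, nil
	}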

// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.CloseIO(ctx, r); err != nil {
		return nil, err
	}
	return empty, nil
}

// Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Checkpoint(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	if err := container.Update(ctx, r); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	return empty, nil
}

// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	p, err := container.Process(r.ExecID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	p.Wait()

	return &taskAPI.WaitResponse{
		ExitStatus: uint32(p.ExitStatus()),
		ExitedAt:   p.ExitedAt(),
	}, nil
}

// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
	var pid int
	if container, err := s.getContainer(r.ID); err == nil {
		pid = container.Pid()
	}
	return &taskAPI.ConnectResponse{
		ShimPid: uint32(os.Getpid()),
		TaskPid: uint32(pid),
	}, nil
}

func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
	s.mu.Lock()
	// don't shut down if the shim is still servicing containers
	if len(s.containers) > 0 {
		s.mu.Unlock()
		return empty, nil
	}
	s.cancel()
	os.Exit(0)
	return empty, nil
}

func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
	container, err := s.getContainer(r.ID)
	if err != nil {
		return nil, err
	}
	cg := container.Cgroup()
	if cg == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
	}
	stats, err := cg.Stat(cgroups.IgnoreNotExist)
	if err != nil {
		return nil, err
	}
	data, err := typeurl.MarshalAny(stats)
	if err != nil {
		return nil, err
	}
	return &taskAPI.StatsResponse{
		Stats: data,
	}, nil
}
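Stats marshals the cgroup metrics into a typeurl Any so the shim API stays runtime-agnostic. A caller reverses it roughly like this (cgroupMetrics is an illustrative helper; the concrete metrics type comes from github.com/containerd/cgroups):

	// cgroupMetrics unpacks the Any returned in StatsResponse.Stats back
	// into the metrics struct marshaled above.
	func cgroupMetrics(resp *taskAPI.StatsResponse) (*cgroups.Metrics, error) {
		v, err := typeurl.UnmarshalAny(resp.Stats)
		if err != nil {
			return nil, err
		}
		m, ok := v.(*cgroups.Metrics)
		if !ok {
			return nil, errors.Errorf("unexpected stats type %T", v)
		}
		return m, nil
	}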

func (s *service) processExits() {
	for e := range s.ec {
		s.checkProcesses(e)
	}
}

func (s *service) send(evt interface{}) {
	s.events <- evt
}

func (s *service) sendL(evt interface{}) {
	s.eventSendMu.Lock()
	s.events <- evt
	s.eventSendMu.Unlock()
}

func (s *service) checkProcesses(e runcC.Exit) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for _, container := range s.containers {
		if container.HasPid(e.Pid) {
			shouldKillAll, err := shouldKillAllOnExit(container.Bundle)
			if err != nil {
				log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
			}

			for _, p := range container.All() {
				if p.Pid() == e.Pid {
					if shouldKillAll {
						if ip, ok := p.(*proc.Init); ok {
							// Ensure all children are killed
							if err := ip.KillAll(s.context); err != nil {
								logrus.WithError(err).WithField("id", ip.ID()).
									Error("failed to kill init's children")
							}
						}
					}
					p.SetExited(e.Status)
					s.sendL(&eventstypes.TaskExit{
						ContainerID: container.ID,
						ID:          p.ID(),
						Pid:         uint32(e.Pid),
						ExitStatus:  uint32(e.Status),
						ExitedAt:    p.ExitedAt(),
					})
					return
				}
			}
			return
		}
	}
}

func shouldKillAllOnExit(bundlePath string) (bool, error) {
	var bundleSpec specs.Spec
	bundleConfigContents, err := ioutil.ReadFile(filepath.Join(bundlePath, "config.json"))
	if err != nil {
		return false, err
	}
	if err := json.Unmarshal(bundleConfigContents, &bundleSpec); err != nil {
		return false, err
	}

	if bundleSpec.Linux != nil {
		for _, ns := range bundleSpec.Linux.Namespaces {
			if ns.Type == specs.PIDNamespace {
				return false, nil
			}
		}
	}

	return true, nil
}
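The rule here: if the bundle's spec declares a private PID namespace, the kernel already reaps every process in that namespace when init dies, so the shim skips the explicit KillAll; only containers sharing the host PID namespace need it. A spec for which shouldKillAllOnExit reports false looks like this fragment (illustrative):

	// a private PID namespace: the kernel tears the process tree down
	// with init, so no explicit kill-all is needed
	pidNS := specs.Spec{
		Linux: &specs.Linux{
			Namespaces: []specs.LinuxNamespace{
				{Type: specs.PIDNamespace},
			},
		},
	}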

func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
	container, err := s.getContainer(id)
	if err != nil {
		return nil, err
	}
	p, err := container.Process("")
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	ps, err := p.(*proc.Init).Runtime().Ps(ctx, id)
	if err != nil {
		return nil, err
	}
	pids := make([]uint32, 0, len(ps))
	for _, pid := range ps {
		pids = append(pids, uint32(pid))
	}
	return pids, nil
}

func (s *service) forward(publisher events.Publisher) {
	for e := range s.events {
		ctx, cancel := context.WithTimeout(s.context, 5*time.Second)
		err := publisher.Publish(ctx, runc.GetTopic(e), e)
		cancel()
		if err != nil {
			logrus.WithError(err).Error("post event")
		}
	}
}

func (s *service) getContainer(id string) (*runc.Container, error) {
	s.mu.Lock()
	container := s.containers[id]
	s.mu.Unlock()
	if container == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
	}
	return container, nil
}

// initialize a single epoll fd to manage our consoles. `initPlatform` should
// only be called once.
func (s *service) initPlatform() error {
	if s.platform != nil {
		return nil
	}
	p, err := runc.NewPlatform()
	if err != nil {
		return err
	}
	s.platform = p
	return nil
}
@ -99,7 +99,9 @@ type shim struct {
}

func (s *shim) Connect(ctx context.Context) error {
	response, err := s.task.Connect(ctx, &task.ConnectRequest{})
	response, err := s.task.Connect(ctx, &task.ConnectRequest{
		ID: s.ID(),
	})
	if err != nil {
		return err
	}
@ -317,6 +319,7 @@ func (s *shim) Wait(ctx context.Context) (*runtime.Exit, error) {

func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any) error {
	request := &task.CheckpointTaskRequest{
		ID:      s.ID(),
		Path:    path,
		Options: options,
	}

@ -19,6 +19,7 @@ package shim
import (
	"context"
	"fmt"
	"io/ioutil"
	"net"
	"os"
	"os/exec"
@ -150,3 +151,22 @@ func WriteAddress(path, address string) error {
	}
	return os.Rename(tempPath, path)
}

// ErrNoAddress is returned when the address file has no content
var ErrNoAddress = errors.New("no shim address")

// ReadAddress returns the shim's abstract socket address from the path
func ReadAddress(path string) (string, error) {
	path, err := filepath.Abs(path)
	if err != nil {
		return "", err
	}
	data, err := ioutil.ReadFile(path)
	if err != nil {
		return "", err
	}
	if len(data) == 0 {
		return "", ErrNoAddress
	}
	return string(data), nil
}
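WriteAddress and ReadAddress form the bootstrap handshake: the starting shim persists the socket address it bound (or discovered) into an "address" file next to the bundle, and later invocations read it back instead of spawning a new shim. A round trip looks like this (the address variable is assumed from the surrounding code):

	if err := shim.WriteAddress("address", address); err != nil {
		return err
	}
	addr, err := shim.ReadAddress("address")
	if err != nil {
		return err // an empty file surfaces as shim.ErrNoAddress
	}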

@ -24,6 +24,7 @@ import (
	"io/ioutil"
	"os"
	"path/filepath"
	"strings"
	"time"

	api "github.com/containerd/containerd/api/services/tasks/v1"
@ -687,8 +688,8 @@ func getCheckpointPath(runtime string, option *ptypes.Any) (string, error) {
	}

	var checkpointPath string
	switch runtime {
	case "io.containerd.runc.v1":
	switch {
	case checkRuntime(runtime, "io.containerd.runc"):
		v, err := typeurl.UnmarshalAny(option)
		if err != nil {
			return "", err
@ -699,7 +700,7 @@ func getCheckpointPath(runtime string, option *ptypes.Any) (string, error) {
		}
		checkpointPath = opts.ImagePath

	case "io.containerd.runtime.v1.linux":
	case runtime == plugin.RuntimeLinuxV1:
		v, err := typeurl.UnmarshalAny(option)
		if err != nil {
			return "", err
@ -721,8 +722,8 @@ func getRestorePath(runtime string, option *ptypes.Any) (string, error) {
	}

	var restorePath string
	switch runtime {
	case "io.containerd.runc.v1":
	switch {
	case checkRuntime(runtime, "io.containerd.runc"):
		v, err := typeurl.UnmarshalAny(option)
		if err != nil {
			return "", err
@ -732,8 +733,7 @@ func getRestorePath(runtime string, option *ptypes.Any) (string, error) {
			return "", fmt.Errorf("invalid task create option for %s", runtime)
		}
		restorePath = opts.CriuImagePath

	case "io.containerd.runtime.v1.linux":
	case runtime == plugin.RuntimeLinuxV1:
		v, err := typeurl.UnmarshalAny(option)
		if err != nil {
			return "", err
@ -747,3 +747,20 @@

	return restorePath, nil
}

// checkRuntime returns true if the current runtime matches the expected
// runtime. Providing various parts of the runtime schema will match those
// parts of the expected runtime
func checkRuntime(current, expected string) bool {
	cp := strings.Split(current, ".")
	l := len(cp)
	for i, p := range strings.Split(expected, ".") {
		if i >= l {
			return false
		}
		if p != cp[i] {
			return false
		}
	}
	return true
}
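checkRuntime does prefix matching on the dot-separated runtime name, which is what lets one branch serve both runc shims (and any future io.containerd.runc.vN). Illustrative behavior:

	fmt.Println(checkRuntime("io.containerd.runc.v1", "io.containerd.runc"))          // true
	fmt.Println(checkRuntime("io.containerd.runc.v2", "io.containerd.runc"))          // true
	fmt.Println(checkRuntime("io.containerd.runtime.v1.linux", "io.containerd.runc")) // false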

4
task.go
@ -642,12 +642,12 @@ func isCheckpointPathExist(runtime string, v interface{}) bool {
	}

	switch runtime {
	case "io.containerd.runc.v1":
	case plugin.RuntimeRuncV1, plugin.RuntimeRuncV2:
		if opts, ok := v.(*options.CheckpointOptions); ok && opts.ImagePath != "" {
			return true
		}

	case "io.containerd.runtime.v1.linux":
	case plugin.RuntimeLinuxV1:
		if opts, ok := v.(*runctypes.CheckpointOptions); ok && opts.ImagePath != "" {
			return true
		}

51
task_opts.go
@ -34,11 +34,6 @@ import (
	"github.com/pkg/errors"
)

const (
	v1runtime = "io.containerd.runtime.v1.linux"
	v2runtime = "io.containerd.runc.v1"
)

// NewTaskOpts allows the caller to set options on a new task
type NewTaskOpts func(context.Context, *Client, *TaskInfo) error

@ -99,23 +94,22 @@ func WithCheckpointName(name string) CheckpointTaskOpts {
// WithCheckpointImagePath sets image path for checkpoint option
func WithCheckpointImagePath(rt, path string) CheckpointTaskOpts {
	return func(r *CheckpointTaskInfo) error {
		switch rt {
		case v1runtime:
			if r.Options == nil {
				r.Options = &runctypes.CheckpointOptions{}
			}
			opts, ok := r.Options.(*runctypes.CheckpointOptions)
			if !ok {
				return errors.New("invalid v1 checkpoint options format")
			}
			opts.ImagePath = path
		case v2runtime:
		if CheckRuntime(rt, "io.containerd.runc") {
			if r.Options == nil {
				r.Options = &options.CheckpointOptions{}
			}
			opts, ok := r.Options.(*options.CheckpointOptions)
			if !ok {
				return errors.New("invalid v2 checkpoint options format")
				return errors.New("invalid v2 shim checkpoint options format")
			}
			opts.ImagePath = path
		} else {
			if r.Options == nil {
				r.Options = &runctypes.CheckpointOptions{}
			}
			opts, ok := r.Options.(*runctypes.CheckpointOptions)
			if !ok {
				return errors.New("invalid v1 shim checkpoint options format")
			}
			opts.ImagePath = path
		}
@ -126,23 +120,22 @@ func WithCheckpointImagePath(rt, path string) CheckpointTaskOpts {
// WithRestoreImagePath sets image path for create option
func WithRestoreImagePath(rt, path string) NewTaskOpts {
	return func(ctx context.Context, c *Client, ti *TaskInfo) error {
		switch rt {
		case v1runtime:
			if ti.Options == nil {
				ti.Options = &runctypes.CreateOptions{}
			}
			opts, ok := ti.Options.(*runctypes.CreateOptions)
			if !ok {
				return errors.New("invalid v1 create options format")
			}
			opts.CriuImagePath = path
		case v2runtime:
		if CheckRuntime(rt, "io.containerd.runc") {
			if ti.Options == nil {
				ti.Options = &options.Options{}
			}
			opts, ok := ti.Options.(*options.Options)
			if !ok {
				return errors.New("invalid v2 create options format")
				return errors.New("invalid v2 shim create options format")
			}
			opts.CriuImagePath = path
		} else {
			if ti.Options == nil {
				ti.Options = &runctypes.CreateOptions{}
			}
			opts, ok := ti.Options.(*runctypes.CreateOptions)
			if !ok {
				return errors.New("invalid v1 shim create options format")
			}
			opts.CriuImagePath = path
		}

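With the exact-match constants gone, callers keep passing the full runtime string and the helper now selects the option type by prefix, so the same call works for either runc shim. Illustrative client usage (checkpoint path hypothetical):

	task, err := container.NewTask(ctx, cio.NewCreator(cio.WithStdio),
		containerd.WithRestoreImagePath("io.containerd.runc.v2", "/var/lib/checkpoints/web-1"),
	)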