Remove runtime v1

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
Maksym Pavlenko 2023-03-13 13:42:53 -07:00
parent eebd4ad531
commit ef516a1507
30 changed files with 11 additions and 6741 deletions
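With the v1 runtime and its shims gone, callers have to target the remaining v2 runc shim explicitly (or rely on it as the default). Below is a minimal client-side sketch, assuming a local daemon at the default socket and a previously pulled `docker.io/library/alpine:latest` image; the container id and snapshot name are placeholders, and the point is only `containerd.WithRuntime` with the `io.containerd.runc.v2` name this commit leaves in place:

```go
package main

import (
	"context"
	"log"

	"github.com/containerd/containerd"
	"github.com/containerd/containerd/namespaces"
	"github.com/containerd/containerd/oci"
)

func main() {
	// The v1 runtime plugin ("io.containerd.runtime.v1.linux") no longer
	// exists, so containers must use a v2 shim such as io.containerd.runc.v2.
	client, err := containerd.New("/run/containerd/containerd.sock")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	ctx := namespaces.WithNamespace(context.Background(), "default")
	image, err := client.GetImage(ctx, "docker.io/library/alpine:latest") // assumed pulled already
	if err != nil {
		log.Fatal(err)
	}

	// WithRuntime pins the container to the v2 runc shim.
	container, err := client.NewContainer(ctx, "demo",
		containerd.WithRuntime("io.containerd.runc.v2", nil),
		containerd.WithNewSnapshot("demo-rootfs", image),
		containerd.WithNewSpec(oci.WithImageConfig(image)),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer container.Delete(ctx, containerd.WithSnapshotCleanup)
	log.Println("created container", container.ID())
}
```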


@@ -1,28 +0,0 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
v1 "github.com/containerd/containerd/runtime/v2/runc/v1"
"github.com/containerd/containerd/runtime/v2/shim"
)
func main() {
shim.Run("io.containerd.runc.v1", v1.New)
}


@@ -1,333 +0,0 @@
//go:build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"io"
"net"
"os"
"os/signal"
"runtime"
"runtime/debug"
"strings"
"sync"
"syscall"
"time"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/process"
"github.com/containerd/containerd/protobuf"
"github.com/containerd/containerd/protobuf/proto"
ptypes "github.com/containerd/containerd/protobuf/types"
shimlog "github.com/containerd/containerd/runtime/v1"
"github.com/containerd/containerd/runtime/v1/shim"
shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
"github.com/containerd/containerd/sys/reaper"
"github.com/containerd/containerd/version"
"github.com/containerd/ttrpc"
"github.com/sirupsen/logrus"
exec "golang.org/x/sys/execabs"
"golang.org/x/sys/unix"
)
var (
debugFlag bool
versionFlag bool
namespaceFlag string
socketFlag string
addressFlag string
workdirFlag string
runtimeRootFlag string
criuFlag string
systemdCgroupFlag bool
containerdBinaryFlag string
bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
},
}
)
func parseFlags() {
flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs")
flag.BoolVar(&versionFlag, "v", false, "show the shim version and exit")
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
flag.StringVar(&socketFlag, "socket", "", "socket path to serve")
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
flag.StringVar(&workdirFlag, "workdir", "", "path used to store large temporary data")
flag.StringVar(&runtimeRootFlag, "runtime-root", process.RuncRoot, "root directory for the runtime")
flag.StringVar(&criuFlag, "criu", "", "path to criu binary (deprecated: do not use)")
flag.BoolVar(&systemdCgroupFlag, "systemd-cgroup", false, "set runtime to use systemd-cgroup")
// currently, the `containerd publish` utility is embedded in the daemon binary.
// The daemon invokes `containerd-shim -containerd-binary ...` with its own os.Executable() path.
flag.StringVar(&containerdBinaryFlag, "containerd-binary", "containerd", "path to containerd binary (used for `containerd publish`)")
flag.Parse()
}
func setRuntime() {
debug.SetGCPercent(40)
go func() {
for range time.Tick(30 * time.Second) {
debug.FreeOSMemory()
}
}()
if os.Getenv("GOMAXPROCS") == "" {
// If GOMAXPROCS hasn't been set, we default to a value of 2 to reduce
// the number of Go stacks present in the shim.
runtime.GOMAXPROCS(2)
}
}
func main() {
parseFlags()
if versionFlag {
fmt.Println("containerd-shim")
fmt.Println(" Version: ", version.Version)
fmt.Println(" Revision:", version.Revision)
fmt.Println(" Go version:", version.GoVersion)
fmt.Println("")
return
}
setRuntime()
if debugFlag {
logrus.SetLevel(logrus.DebugLevel)
}
stdout, stderr, err := openStdioKeepAlivePipes(workdirFlag)
if err != nil {
fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
os.Exit(1)
}
defer func() {
stdout.Close()
stderr.Close()
}()
// redirect the following output into the fifo so that containerd
// can still read the log after a restart
logrus.SetOutput(stdout)
if err := executeShim(); err != nil {
fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
os.Exit(1)
}
}
// If containerd server process dies, we need the shim to keep stdout/err reader
// FDs so that Linux does not SIGPIPE the shim process if it tries to use its end of
// these pipes.
func openStdioKeepAlivePipes(dir string) (io.ReadWriteCloser, io.ReadWriteCloser, error) {
background := context.Background()
keepStdoutAlive, err := shimlog.OpenShimStdoutLog(background, dir)
if err != nil {
return nil, nil, err
}
keepStderrAlive, err := shimlog.OpenShimStderrLog(background, dir)
if err != nil {
return nil, nil, err
}
return keepStdoutAlive, keepStderrAlive, nil
}
func executeShim() error {
// start handling signals as soon as possible so that things are properly reaped,
// even if the runtime exits before we reach the handler
signals, err := setupSignals()
if err != nil {
return err
}
dump := make(chan os.Signal, 32)
signal.Notify(dump, syscall.SIGUSR1)
path, err := os.Getwd()
if err != nil {
return err
}
server, err := newServer()
if err != nil {
return fmt.Errorf("failed creating server: %w", err)
}
sv, err := shim.NewService(
shim.Config{
Path: path,
Namespace: namespaceFlag,
WorkDir: workdirFlag,
SystemdCgroup: systemdCgroupFlag,
RuntimeRoot: runtimeRootFlag,
},
&remoteEventsPublisher{address: addressFlag},
)
if err != nil {
return err
}
logrus.Debug("registering ttrpc server")
shimapi.RegisterShimService(server, sv)
socket := socketFlag
if err := serve(context.Background(), server, socket); err != nil {
return err
}
logger := logrus.WithFields(logrus.Fields{
"pid": os.Getpid(),
"path": path,
"namespace": namespaceFlag,
})
go func() {
for range dump {
dumpStacks(logger)
}
}()
return handleSignals(logger, signals, server, sv)
}
// serve serves the ttrpc API over a unix socket at the provided path
// this function does not block
func serve(ctx context.Context, server *ttrpc.Server, path string) error {
var (
l net.Listener
err error
)
if path == "" {
f := os.NewFile(3, "socket")
l, err = net.FileListener(f)
f.Close()
path = "[inherited from parent]"
} else {
const (
abstractSocketPrefix = "\x00"
socketPathLimit = 106
)
p := strings.TrimPrefix(path, "unix://")
if len(p) == len(path) {
p = abstractSocketPrefix + p
}
if len(p) > socketPathLimit {
return fmt.Errorf("%q: unix socket path too long (> %d)", p, socketPathLimit)
}
l, err = net.Listen("unix", p)
}
if err != nil {
return err
}
logrus.WithField("socket", path).Debug("serving api on unix socket")
go func() {
defer l.Close()
if err := server.Serve(ctx, l); err != nil && !errors.Is(err, net.ErrClosed) {
logrus.WithError(err).Fatal("containerd-shim: ttrpc server failure")
}
}()
return nil
}
func handleSignals(logger *logrus.Entry, signals chan os.Signal, server *ttrpc.Server, sv *shim.Service) error {
var (
termOnce sync.Once
done = make(chan struct{})
)
for {
select {
case <-done:
return nil
case s := <-signals:
switch s {
case unix.SIGCHLD:
if err := reaper.Reap(); err != nil {
logger.WithError(err).Error("reap exit status")
}
case unix.SIGTERM, unix.SIGINT:
go termOnce.Do(func() {
ctx := context.TODO()
if err := server.Shutdown(ctx); err != nil {
logger.WithError(err).Error("failed to shutdown server")
}
// Ensure our child is dead if any
sv.Kill(ctx, &shimapi.KillRequest{
Signal: uint32(syscall.SIGKILL),
All: true,
})
sv.Delete(context.Background(), &ptypes.Empty{})
close(done)
})
case unix.SIGPIPE:
}
}
}
}
func dumpStacks(logger *logrus.Entry) {
var (
buf []byte
stackSize int
)
bufferLen := 16384
for stackSize == len(buf) {
buf = make([]byte, bufferLen)
stackSize = runtime.Stack(buf, true)
bufferLen *= 2
}
buf = buf[:stackSize]
logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
}
type remoteEventsPublisher struct {
address string
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, _ := namespaces.Namespace(ctx)
encoded, err := protobuf.MarshalAnyToProto(event)
if err != nil {
return err
}
data, err := proto.Marshal(encoded)
if err != nil {
return err
}
cmd := exec.CommandContext(ctx, containerdBinaryFlag, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
cmd.Stdin = bytes.NewReader(data)
b := bufPool.Get().(*bytes.Buffer)
defer func() {
b.Reset()
bufPool.Put(b)
}()
cmd.Stdout = b
cmd.Stderr = b
c, err := reaper.Default.Start(cmd)
if err != nil {
return err
}
status, err := reaper.Default.WaitTimeout(cmd, c, 30*time.Second)
if err != nil {
return fmt.Errorf("failed to publish event: %s: %w", b.String(), err)
}
if status != 0 {
return fmt.Errorf("failed to publish event: %s", b.String())
}
return nil
}
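For reference, the dial side has to mirror the address handling in `serve` above: a bare path names an abstract socket (leading NUL byte), while a `unix://` prefix selects a filesystem socket. A small sketch with a hypothetical `dialShim` helper, not part of the removed code:

```go
package main

import (
	"fmt"
	"net"
	"strings"
	"time"
)

// dialShim is a hypothetical helper mirroring serve(): paths without a
// "unix://" prefix are treated as abstract sockets and get the leading
// NUL byte prepended before dialing.
func dialShim(address string, timeout time.Duration) (net.Conn, error) {
	p := strings.TrimPrefix(address, "unix://")
	if len(p) == len(address) {
		// no prefix was trimmed: abstract socket namespace
		p = "\x00" + p
	}
	if len(p) > 106 {
		return nil, fmt.Errorf("%q: unix socket path too long (> 106)", p)
	}
	return net.DialTimeout("unix", p, timeout)
}
```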


@@ -1,44 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"os"
"os/signal"
"github.com/containerd/containerd/sys/reaper"
runc "github.com/containerd/go-runc"
"github.com/containerd/ttrpc"
)
// setupSignals creates a new signal handler for all signals; unlike the Linux
// variant, it does not set the shim up as a sub-reaper on this platform
func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 2048)
signal.Notify(signals)
// make sure runc is set up to use the monitor
// for waiting on processes
runc.Monitor = reaper.Default
return signals, nil
}
func newServer() (*ttrpc.Server, error) {
// for darwin, we omit the socket credentials because these syscalls are
// slightly different. since we don't have darwin support yet, this can be
// implemented later and the build can continue without issue.
return ttrpc.NewServer()
}


@@ -1,45 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"os"
"os/signal"
"github.com/containerd/containerd/sys/reaper"
runc "github.com/containerd/go-runc"
"github.com/containerd/ttrpc"
)
// setupSignals creates a new signal handler for all signals; unlike the Linux
// variant, it does not set the shim up as a sub-reaper on this platform
func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 2048)
signal.Notify(signals)
// make sure runc is set up to use the monitor
// for waiting on processes
runc.Monitor = reaper.Default
return signals, nil
}
func newServer() (*ttrpc.Server, error) {
// for freebsd, we omit the socket credentials because these syscalls are
// slightly different. since we don't have freebsd support yet, this can be
// implemented later and the build can continue without issue.
return ttrpc.NewServer()
}


@@ -1,46 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"os"
"os/signal"
"github.com/containerd/containerd/sys/reaper"
runc "github.com/containerd/go-runc"
"github.com/containerd/ttrpc"
"golang.org/x/sys/unix"
)
// setupSignals creates a new signal handler for all signals and sets the shim as a
// sub-reaper so that the container processes are reparented
func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 32)
signal.Notify(signals, unix.SIGTERM, unix.SIGINT, unix.SIGCHLD, unix.SIGPIPE)
// make sure runc is set up to use the monitor
// for waiting on processes
runc.Monitor = reaper.Default
// set the shim as the subreaper for all orphaned processes created by the container
if err := reaper.SetSubreaper(1); err != nil {
return nil, err
}
return signals, nil
}
func newServer() (*ttrpc.Server, error) {
return ttrpc.NewServer(ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()))
}
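On Linux, `reaper.SetSubreaper(1)` boils down to a `prctl(2)` call with `PR_SET_CHILD_SUBREAPER`, which makes orphaned descendants reparent to the shim instead of PID 1 so that its `SIGCHLD`-driven reaping keeps working. A rough sketch of the underlying call, assuming `golang.org/x/sys/unix`:

```go
//go:build linux

package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

// setSubreaper marks the calling process as a child subreaper so that
// orphaned descendants are reparented to it rather than to PID 1; this is
// roughly what reaper.SetSubreaper(1) does under the hood.
func setSubreaper(i int) error {
	return unix.Prctl(unix.PR_SET_CHILD_SUBREAPER, uintptr(i), 0, 0, 0)
}

func main() {
	if err := setSubreaper(1); err != nil {
		fmt.Println("prctl:", err)
	}
}
```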


@@ -19,7 +19,6 @@ package builtins
import (
_ "github.com/containerd/containerd/metrics/cgroups"
_ "github.com/containerd/containerd/metrics/cgroups/v2"
_ "github.com/containerd/containerd/runtime/v1/linux"
_ "github.com/containerd/containerd/runtime/v2/runc/options"
_ "github.com/containerd/containerd/snapshots/native/plugin"
_ "github.com/containerd/containerd/snapshots/overlay/plugin"


@@ -22,7 +22,7 @@ For each `containerd` release, we'll publish a release tarball specifically for
### Content
As shown below, the release tarball contains:
- `containerd`, `containerd-shim`, `containerd-shim-runc-v1`, `containerd-shim-runc-v2`, `ctr`: binaries for containerd.
- `containerd`, `containerd-shim-runc-v2`, `ctr`: binaries for containerd.
- `runc`: runc binary.
- `/opt/cni/bin`: binaries for [Container Network Interface](https://github.com/containernetworking/cni)
- `crictl`, `crictl.yaml`: command line tools for CRI container runtime and its config file.
@@ -45,8 +45,6 @@ usr/local/
usr/local/bin/
usr/local/bin/containerd-shim-runc-v2
usr/local/bin/ctr
usr/local/bin/containerd-shim
usr/local/bin/containerd-shim-runc-v1
usr/local/bin/crictl
usr/local/bin/critest
usr/local/bin/containerd


@@ -18,9 +18,7 @@ verify its sha256sum, and extract it under `/usr/local`:
$ tar Cxzvf /usr/local containerd-1.6.2-linux-amd64.tar.gz
bin/
bin/containerd-shim-runc-v2
bin/containerd-shim
bin/ctr
bin/containerd-shim-runc-v1
bin/containerd
bin/containerd-stress
```


@@ -18,8 +18,7 @@ package integration
import (
// Register for linux platforms
_ "github.com/containerd/containerd/plugins/sandbox" // WithInMemoryServices will fail otherwise
_ "github.com/containerd/containerd/runtime/v1/linux"
_ "github.com/containerd/containerd/plugins/sandbox" // WithInMemoryServices will fail otherwise
_ "github.com/containerd/containerd/services/sandbox" // WithInMemoryServices will fail otherwise
_ "github.com/containerd/containerd/snapshots/overlay/plugin"
)


@@ -375,16 +375,9 @@ func TestDaemonReconnectsToShimIOPipesOnRestart(t *testing.T) {
// After containerd restarts, we write some messages to the log pipes, simulating the shim writing there.
// Then we make sure these messages are present in the containerd log, proving that the server reconnected to the log pipes
runtimeVersion := getRuntimeVersion()
logDirPath := getLogDirPath(runtimeVersion, id)
logDirPath := getLogDirPath("v2", id)
switch runtimeVersion {
case "v1":
writeToFile(t, filepath.Join(logDirPath, "shim.stdout.log"), fmt.Sprintf("%s writing to stdout\n", id))
writeToFile(t, filepath.Join(logDirPath, "shim.stderr.log"), fmt.Sprintf("%s writing to stderr\n", id))
case "v2":
writeToFile(t, filepath.Join(logDirPath, "log"), fmt.Sprintf("%s writing to log\n", id))
}
writeToFile(t, filepath.Join(logDirPath, "log"), fmt.Sprintf("%s writing to log\n", id))
statusC, err := task.Wait(ctx)
if err != nil {
@@ -402,18 +395,8 @@ func TestDaemonReconnectsToShimIOPipesOnRestart(t *testing.T) {
t.Fatal(err)
}
switch runtimeVersion {
case "v1":
if !strings.Contains(string(stdioContents), fmt.Sprintf("%s writing to stdout", id)) {
t.Fatal("containerd did not connect to the shim stdout pipe")
}
if !strings.Contains(string(stdioContents), fmt.Sprintf("%s writing to stderr", id)) {
t.Fatal("containerd did not connect to the shim stderr pipe")
}
case "v2":
if !strings.Contains(string(stdioContents), fmt.Sprintf("%s writing to log", id)) {
t.Fatal("containerd did not connect to the shim log pipe")
}
if !strings.Contains(string(stdioContents), fmt.Sprintf("%s writing to log", id)) {
t.Fatal("containerd did not connect to the shim log pipe")
}
}
@@ -441,15 +424,6 @@ func getLogDirPath(runtimeVersion, id string) string {
}
}
func getRuntimeVersion() string {
switch rt := os.Getenv("TEST_RUNTIME"); rt {
case plugin.RuntimeLinuxV1:
return "v1"
default:
return "v2"
}
}
func TestContainerAttach(t *testing.T) {
t.Parallel()


@@ -30,7 +30,6 @@ import (
"testing"
"time"
v1shimcli "github.com/containerd/containerd/runtime/v1/shim/client"
v2shimcli "github.com/containerd/containerd/runtime/v2/shim"
"github.com/containerd/ttrpc"
)
@@ -47,10 +46,8 @@ func TestFailFastWhenConnectShim(t *testing.T) {
// abstract Unix domain sockets are only for Linux.
if runtime.GOOS == "linux" {
t.Run("abstract-unix-socket-v1", testFailFastWhenConnectShim(true, v1shimcli.AnonDialer))
t.Run("abstract-unix-socket-v2", testFailFastWhenConnectShim(true, v2shimcli.AnonDialer))
}
t.Run("normal-unix-socket-v1", testFailFastWhenConnectShim(false, v1shimcli.AnonDialer))
t.Run("normal-unix-socket-v2", testFailFastWhenConnectShim(false, v2shimcli.AnonDialer))
}


@@ -28,7 +28,6 @@ import (
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/v1/linux"
"github.com/docker/go-metrics"
"github.com/sirupsen/logrus"
)
@@ -55,11 +54,15 @@ type cgroupsMonitor struct {
publisher events.Publisher
}
type cgroupTask interface {
Cgroup() (cgroups.Cgroup, error)
}
func (m *cgroupsMonitor) Monitor(c runtime.Task, labels map[string]string) error {
if err := m.collector.Add(c, labels); err != nil {
return err
}
t, ok := c.(*linux.Task)
t, ok := c.(cgroupTask)
if !ok {
return nil
}
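Swapping the concrete `*linux.Task` assertion for a small local interface is what lets the metrics plugin drop its import of the v1 runtime entirely: any task type exposing `Cgroup()` is still monitored, and everything else is skipped. A self-contained sketch of the pattern, with hypothetical names (`cgroupHolder`, `v2Task`) and a plain string standing in for `cgroups.Cgroup`:

```go
package main

import (
	"errors"
	"fmt"
)

// cgroupHolder is a minimal local interface standing in for cgroupTask:
// the consumer names only the method it needs instead of a concrete type.
type cgroupHolder interface {
	Cgroup() (string, error)
}

type v2Task struct{ path string }

func (t *v2Task) Cgroup() (string, error) {
	if t.path == "" {
		return "", errors.New("cgroup does not exist")
	}
	return t.path, nil
}

func monitor(task interface{}) {
	// Tasks that do not expose a cgroup are simply skipped,
	// exactly like the `return nil` branch above.
	t, ok := task.(cgroupHolder)
	if !ok {
		return
	}
	if cg, err := t.Cgroup(); err == nil {
		fmt.Println("collecting metrics for", cg)
	}
}

func main() {
	monitor(&v2Task{path: "/sys/fs/cgroup/demo"})
	monitor(struct{}{}) // no Cgroup method: ignored
}
```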


@@ -1,248 +0,0 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package linux
import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"os"
"path/filepath"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/runtime/linux/runctypes"
"github.com/containerd/containerd/runtime/v1/shim"
"github.com/containerd/containerd/runtime/v1/shim/client"
"github.com/opencontainers/runtime-spec/specs-go"
)
// loadBundle loads an existing bundle from disk
func loadBundle(id, path, workdir string) *bundle {
return &bundle{
id: id,
path: path,
workDir: workdir,
}
}
// newBundle creates a new bundle on disk at the provided path for the given id
func newBundle(id, path, workDir string, spec []byte) (b *bundle, err error) {
if err := os.MkdirAll(path, 0711); err != nil {
return nil, err
}
path = filepath.Join(path, id)
if err := os.Mkdir(path, 0700); err != nil {
return nil, err
}
defer func() {
if err != nil {
os.RemoveAll(path)
}
}()
if err := prepareBundleDirectoryPermissions(path, spec); err != nil {
return nil, err
}
workDir = filepath.Join(workDir, id)
if err := os.MkdirAll(workDir, 0711); err != nil {
return nil, err
}
defer func() {
if err != nil {
os.RemoveAll(workDir)
}
}()
rootfs := filepath.Join(path, "rootfs")
if err := os.MkdirAll(rootfs, 0711); err != nil {
return nil, err
}
err = os.WriteFile(filepath.Join(path, configFilename), spec, 0666)
return &bundle{
id: id,
path: path,
workDir: workDir,
}, err
}
// prepareBundleDirectoryPermissions prepares the permissions of the bundle
// directory. When user namespaces are enabled, the permissions are modified
// to allow the remapped root GID to access the bundle.
func prepareBundleDirectoryPermissions(path string, spec []byte) error {
gid, err := remappedGID(spec)
if err != nil {
return err
}
if gid == 0 {
return nil
}
if err := os.Chown(path, -1, int(gid)); err != nil {
return err
}
return os.Chmod(path, 0710)
}
// ociSpecUserNS is a subset of specs.Spec used to reduce garbage during
// unmarshal.
type ociSpecUserNS struct {
Linux *linuxSpecUserNS
}
// linuxSpecUserNS is a subset of specs.Linux used to reduce garbage during
// unmarshal.
type linuxSpecUserNS struct {
GIDMappings []specs.LinuxIDMapping
}
// remappedGID reads the remapped GID 0 from the OCI spec, if it exists. If
// there is no remapping, remappedGID returns 0. If the spec cannot be parsed,
// remappedGID returns an error.
func remappedGID(spec []byte) (uint32, error) {
var ociSpec ociSpecUserNS
err := json.Unmarshal(spec, &ociSpec)
if err != nil {
return 0, err
}
if ociSpec.Linux == nil || len(ociSpec.Linux.GIDMappings) == 0 {
return 0, nil
}
for _, mapping := range ociSpec.Linux.GIDMappings {
if mapping.ContainerID == 0 {
return mapping.HostID, nil
}
}
return 0, nil
}
type bundle struct {
id string
path string
workDir string
}
// ShimOpt specifies shim options for initialization and connection
type ShimOpt func(*bundle, string, *runctypes.RuncOptions) (shim.Config, client.Opt)
// ShimRemote is a ShimOpt for connecting and starting a remote shim
func ShimRemote(c *Config, daemonAddress, cgroup string, exitHandler func()) ShimOpt {
return func(b *bundle, ns string, ropts *runctypes.RuncOptions) (shim.Config, client.Opt) {
config := b.shimConfig(ns, c, ropts)
return config,
client.WithStart(c.Shim, b.shimAddress(ns, daemonAddress), daemonAddress, cgroup, c.ShimDebug, exitHandler)
}
}
// ShimLocal is a ShimOpt for using an in process shim implementation
func ShimLocal(c *Config, exchange *exchange.Exchange) ShimOpt {
return func(b *bundle, ns string, ropts *runctypes.RuncOptions) (shim.Config, client.Opt) {
return b.shimConfig(ns, c, ropts), client.WithLocal(exchange)
}
}
// ShimConnect is a ShimOpt for connecting to an existing remote shim
func ShimConnect(c *Config, onClose func()) ShimOpt {
return func(b *bundle, ns string, ropts *runctypes.RuncOptions) (shim.Config, client.Opt) {
return b.shimConfig(ns, c, ropts), client.WithConnect(b.decideShimAddress(ns), onClose)
}
}
// NewShimClient connects to the shim managing the bundle and tasks, creating it if needed
func (b *bundle) NewShimClient(ctx context.Context, namespace string, getClientOpts ShimOpt, runcOpts *runctypes.RuncOptions) (*client.Client, error) {
cfg, opt := getClientOpts(b, namespace, runcOpts)
return client.New(ctx, cfg, opt)
}
// Delete deletes the bundle from disk
func (b *bundle) Delete() error {
address, _ := b.loadAddress()
if address != "" {
// we don't care about errors here
client.RemoveSocket(address)
}
err := atomicDelete(b.path)
if err == nil {
return atomicDelete(b.workDir)
}
// error removing the bundle path; still attempt removing work dir
err2 := atomicDelete(b.workDir)
if err2 == nil {
return err
}
return fmt.Errorf("failed to remove both bundle and workdir locations: %v: %w", err2, err)
}
func (b *bundle) legacyShimAddress(namespace string) string {
return filepath.Join(string(filepath.Separator), "containerd-shim", namespace, b.id, "shim.sock")
}
const socketRoot = "/run/containerd"
func (b *bundle) shimAddress(namespace, socketPath string) string {
d := sha256.Sum256([]byte(filepath.Join(socketPath, namespace, b.id)))
return fmt.Sprintf("unix://%s/%x", filepath.Join(socketRoot, "s"), d)
}
func (b *bundle) loadAddress() (string, error) {
addressPath := filepath.Join(b.path, "address")
data, err := os.ReadFile(addressPath)
if err != nil {
return "", err
}
return string(data), nil
}
func (b *bundle) decideShimAddress(namespace string) string {
address, err := b.loadAddress()
if err != nil {
return b.legacyShimAddress(namespace)
}
return address
}
func (b *bundle) shimConfig(namespace string, c *Config, runcOptions *runctypes.RuncOptions) shim.Config {
var (
runtimeRoot = c.RuntimeRoot
systemdCgroup bool
)
if runcOptions != nil {
systemdCgroup = runcOptions.SystemdCgroup
if runcOptions.RuntimeRoot != "" {
runtimeRoot = runcOptions.RuntimeRoot
}
}
return shim.Config{
Path: b.path,
WorkDir: b.workDir,
Namespace: namespace,
RuntimeRoot: runtimeRoot,
SystemdCgroup: systemdCgroup,
}
}
// atomicDelete renames the path to a hidden file before removal
func atomicDelete(path string) error {
// create a hidden dir for an atomic removal
atomicPath := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s", filepath.Base(path)))
if err := os.Rename(path, atomicPath); err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
return os.RemoveAll(atomicPath)
}
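The address scheme above deserves a note: hashing the daemon address, namespace, and container id gives a fixed-length socket name under `/run/containerd/s/`, which keeps the path safely below the 106-byte unix socket limit no matter how deep the bundle directory is. A standalone sketch of the same computation:

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"path/filepath"
)

// shimAddress reproduces the hashing in (*bundle).shimAddress: the daemon
// address, namespace, and container id are joined and digested, and the hex
// digest becomes a short, fixed-length socket name under /run/containerd/s.
func shimAddress(daemonAddress, namespace, id string) string {
	d := sha256.Sum256([]byte(filepath.Join(daemonAddress, namespace, id)))
	return fmt.Sprintf("unix://%s/%x", filepath.Join("/run/containerd", "s"), d)
}

func main() {
	fmt.Println(shimAddress("/run/containerd/containerd.sock", "default", "demo"))
}
```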


@@ -1,141 +0,0 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package linux
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"strconv"
"syscall"
"testing"
"github.com/containerd/containerd/oci"
"github.com/containerd/continuity/testutil"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewBundle(t *testing.T) {
testutil.RequiresRoot(t)
tests := []struct {
userns bool
}{{
userns: false,
}, {
userns: true,
}}
const usernsGID = 4200
for i, tc := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
dir := t.TempDir()
work := filepath.Join(dir, "work")
state := filepath.Join(dir, "state")
id := fmt.Sprintf("new-bundle-%d", i)
spec := oci.Spec{}
if tc.userns {
spec.Linux = &specs.Linux{
GIDMappings: []specs.LinuxIDMapping{{ContainerID: 0, HostID: usernsGID}},
}
}
specBytes, err := json.Marshal(&spec)
require.NoError(t, err, "failed to marshal spec")
b, err := newBundle(id, work, state, specBytes)
require.NoError(t, err, "newBundle should succeed")
require.NotNil(t, b, "bundle should not be nil")
fi, err := os.Stat(b.path)
assert.NoError(t, err, "should be able to stat bundle path")
if tc.userns {
assert.Equal(t, os.ModeDir|0710, fi.Mode(), "bundle path should be a directory with perm 0710")
} else {
assert.Equal(t, os.ModeDir|0700, fi.Mode(), "bundle path should be a directory with perm 0700")
}
stat, ok := fi.Sys().(*syscall.Stat_t)
require.True(t, ok, "should assert to *syscall.Stat_t")
expectedGID := uint32(0)
if tc.userns {
expectedGID = usernsGID
}
assert.Equal(t, expectedGID, stat.Gid, "gid should match")
})
}
}
func TestRemappedGID(t *testing.T) {
tests := []struct {
spec oci.Spec
gid uint32
}{{
// empty spec
spec: oci.Spec{},
gid: 0,
}, {
// empty Linux section
spec: oci.Spec{
Linux: &specs.Linux{},
},
gid: 0,
}, {
// empty ID mappings
spec: oci.Spec{
Linux: &specs.Linux{
GIDMappings: make([]specs.LinuxIDMapping, 0),
},
},
gid: 0,
}, {
// valid ID mapping
spec: oci.Spec{
Linux: &specs.Linux{
GIDMappings: []specs.LinuxIDMapping{{
ContainerID: 0,
HostID: 1000,
}},
},
},
gid: 1000,
}, {
// missing ID mapping
spec: oci.Spec{
Linux: &specs.Linux{
GIDMappings: []specs.LinuxIDMapping{{
ContainerID: 100,
HostID: 1000,
}},
},
},
gid: 0,
}}
for i, tc := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
s, err := json.Marshal(tc.spec)
require.NoError(t, err, "failed to marshal spec")
gid, err := remappedGID(s)
assert.NoError(t, err, "should unmarshal successfully")
assert.Equal(t, tc.gid, gid, "expected GID to match")
})
}
}


@@ -1,171 +0,0 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package linux
import (
"context"
"errors"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/protobuf"
"github.com/containerd/containerd/runtime"
shim "github.com/containerd/containerd/runtime/v1/shim/v1"
"github.com/containerd/ttrpc"
)
// Process implements a linux process
type Process struct {
id string
t *Task
}
// ID of the process
func (p *Process) ID() string {
return p.id
}
// Kill sends the provided signal to the underlying process
//
// This method cannot be used to kill all processes in the task; it signals only this process
func (p *Process) Kill(ctx context.Context, signal uint32, _ bool) error {
_, err := p.t.shim.Kill(ctx, &shim.KillRequest{
Signal: signal,
ID: p.id,
})
if err != nil {
return errdefs.FromGRPC(err)
}
return err
}
func statusFromProto(from task.Status) runtime.Status {
var status runtime.Status
switch from {
case task.Status_CREATED:
status = runtime.CreatedStatus
case task.Status_RUNNING:
status = runtime.RunningStatus
case task.Status_STOPPED:
status = runtime.StoppedStatus
case task.Status_PAUSED:
status = runtime.PausedStatus
case task.Status_PAUSING:
status = runtime.PausingStatus
}
return status
}
// State of process
func (p *Process) State(ctx context.Context) (runtime.State, error) {
// use the container status for the status of the process
response, err := p.t.shim.State(ctx, &shim.StateRequest{
ID: p.id,
})
if err != nil {
if !errors.Is(err, ttrpc.ErrClosed) {
return runtime.State{}, errdefs.FromGRPC(err)
}
// We treat ttrpc.ErrClosed as the shim being closed, but really this
// likely means that the process no longer exists. We'll have to plumb
// the connection differently if this causes problems.
return runtime.State{}, errdefs.ErrNotFound
}
return runtime.State{
Pid: response.Pid,
Status: statusFromProto(response.Status),
Stdin: response.Stdin,
Stdout: response.Stdout,
Stderr: response.Stderr,
Terminal: response.Terminal,
ExitStatus: response.ExitStatus,
}, nil
}
// ResizePty changes the size of the process's PTY to the provided width and height
func (p *Process) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
_, err := p.t.shim.ResizePty(ctx, &shim.ResizePtyRequest{
ID: p.id,
Width: size.Width,
Height: size.Height,
})
if err != nil {
err = errdefs.FromGRPC(err)
}
return err
}
// CloseIO closes the provided IO pipe for the process
func (p *Process) CloseIO(ctx context.Context) error {
_, err := p.t.shim.CloseIO(ctx, &shim.CloseIORequest{
ID: p.id,
Stdin: true,
})
if err != nil {
return errdefs.FromGRPC(err)
}
return nil
}
// Start the process
func (p *Process) Start(ctx context.Context) error {
r, err := p.t.shim.Start(ctx, &shim.StartRequest{
ID: p.id,
})
if err != nil {
return errdefs.FromGRPC(err)
}
p.t.events.Publish(ctx, runtime.TaskExecStartedEventTopic, &eventstypes.TaskExecStarted{
ContainerID: p.t.id,
Pid: r.Pid,
ExecID: p.id,
})
return nil
}
// Wait on the process to exit and return the exit status and timestamp
func (p *Process) Wait(ctx context.Context) (*runtime.Exit, error) {
r, err := p.t.shim.Wait(ctx, &shim.WaitRequest{
ID: p.id,
})
if err != nil {
return nil, err
}
return &runtime.Exit{
Timestamp: protobuf.FromTimestamp(r.ExitedAt),
Status: r.ExitStatus,
}, nil
}
// Delete the process and return the exit status
func (p *Process) Delete(ctx context.Context) (*runtime.Exit, error) {
r, err := p.t.shim.DeleteProcess(ctx, &shim.DeleteProcessRequest{
ID: p.id,
})
if err != nil {
return nil, errdefs.FromGRPC(err)
}
return &runtime.Exit{
Status: r.ExitStatus,
Timestamp: protobuf.FromTimestamp(r.ExitedAt),
Pid: r.Pid,
}, nil
}


@@ -1,546 +0,0 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package linux
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"time"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/cleanup"
"github.com/containerd/containerd/pkg/process"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/protobuf"
ptypes "github.com/containerd/containerd/protobuf/types"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/runctypes"
v1 "github.com/containerd/containerd/runtime/v1"
"github.com/containerd/containerd/runtime/v1/shim/v1"
"github.com/containerd/go-runc"
"github.com/containerd/typeurl/v2"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/sys/unix"
)
var (
pluginID = fmt.Sprintf("%s.%s", plugin.RuntimePlugin, "linux")
empty = &ptypes.Empty{}
)
const (
configFilename = "config.json"
defaultRuntime = "runc"
defaultShim = "containerd-shim"
// cleanupTimeout is the default timeout for cleanup operations
cleanupTimeout = 1 * time.Minute
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.RuntimePlugin,
ID: "linux",
InitFn: New,
Requires: []plugin.Type{
plugin.EventPlugin,
plugin.MetadataPlugin,
},
Config: &Config{
Shim: defaultShim,
Runtime: defaultRuntime,
},
})
}
var _ = (runtime.PlatformRuntime)(&Runtime{})
// Config options for the runtime
type Config struct {
// Shim is a path or name of binary implementing the Shim GRPC API
Shim string `toml:"shim"`
// Runtime is a path or name of an OCI runtime used by the shim
Runtime string `toml:"runtime"`
// RuntimeRoot is the path that shall be used by the OCI runtime for its data
RuntimeRoot string `toml:"runtime_root"`
// NoShim calls runc directly from within the package
NoShim bool `toml:"no_shim"`
// ShimDebug enables debug output on the shim
ShimDebug bool `toml:"shim_debug"`
}
// New returns a configured runtime
func New(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Platforms = []ocispec.Platform{platforms.DefaultSpec()}
if err := os.MkdirAll(ic.Root, 0711); err != nil {
return nil, err
}
if err := os.MkdirAll(ic.State, 0711); err != nil {
return nil, err
}
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
ep, err := ic.GetByID(plugin.EventPlugin, "exchange")
if err != nil {
return nil, err
}
cfg := ic.Config.(*Config)
r := &Runtime{
root: ic.Root,
state: ic.State,
tasks: runtime.NewNSMap[runtime.Task](),
containers: metadata.NewContainerStore(m.(*metadata.DB)),
address: ic.Address,
events: ep.(*exchange.Exchange),
config: cfg,
}
tasks, err := r.restoreTasks(ic.Context)
if err != nil {
return nil, err
}
for _, t := range tasks {
if err := r.tasks.AddWithNamespace(t.namespace, t); err != nil {
return nil, err
}
}
return r, nil
}
// Runtime for a linux based system
type Runtime struct {
root string
state string
address string
tasks *runtime.NSMap[runtime.Task]
containers containers.Store
events *exchange.Exchange
config *Config
}
// ID of the runtime
func (r *Runtime) ID() string {
return pluginID
}
// Create a new task
func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, err error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
ctx = log.WithLogger(ctx, log.G(ctx).WithError(err).WithFields(log.Fields{
"id": id,
"namespace": namespace,
}))
if err := identifiers.Validate(id); err != nil {
return nil, fmt.Errorf("invalid task id: %w", err)
}
ropts, err := r.getRuncOptions(ctx, id)
if err != nil {
return nil, err
}
bundle, err := newBundle(id,
filepath.Join(r.state, namespace),
filepath.Join(r.root, namespace),
opts.Spec.GetValue())
if err != nil {
return nil, err
}
defer func() {
if err != nil {
bundle.Delete()
}
}()
shimopt := ShimLocal(r.config, r.events)
if !r.config.NoShim {
var cgroup string
if opts.TaskOptions != nil && opts.TaskOptions.GetValue() != nil {
v, err := typeurl.UnmarshalAny(opts.TaskOptions)
if err != nil {
return nil, err
}
cgroup = v.(*runctypes.CreateOptions).ShimCgroup
}
exitHandler := func() {
log.G(ctx).WithField("id", id).Info("shim reaped")
if _, err := r.tasks.Get(ctx, id); err != nil {
// Task was never started or was already successfully deleted
return
}
if err = r.cleanupAfterDeadShim(cleanup.Background(ctx), bundle, namespace, id); err != nil {
log.G(ctx).WithError(err).Warn("failed to clean up after killed shim")
}
}
shimopt = ShimRemote(r.config, r.address, cgroup, exitHandler)
}
s, err := bundle.NewShimClient(ctx, namespace, shimopt, ropts)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
deferCtx, deferCancel := context.WithTimeout(cleanup.Background(ctx), cleanupTimeout)
defer deferCancel()
if kerr := s.KillShim(deferCtx); kerr != nil {
log.G(ctx).WithError(kerr).Error("failed to kill shim")
}
}
}()
rt := r.config.Runtime
if ropts != nil && ropts.Runtime != "" {
rt = ropts.Runtime
}
sopts := &shim.CreateTaskRequest{
ID: id,
Bundle: bundle.path,
Runtime: rt,
Stdin: opts.IO.Stdin,
Stdout: opts.IO.Stdout,
Stderr: opts.IO.Stderr,
Terminal: opts.IO.Terminal,
Checkpoint: opts.Checkpoint,
Options: protobuf.FromAny(opts.TaskOptions),
}
for _, m := range opts.Rootfs {
sopts.Rootfs = append(sopts.Rootfs, &types.Mount{
Type: m.Type,
Source: m.Source,
Target: m.Target,
Options: m.Options,
})
}
cr, err := s.Create(ctx, sopts)
if err != nil {
return nil, errdefs.FromGRPC(err)
}
t, err := newTask(id, namespace, int(cr.Pid), s, r.events, r.tasks, bundle)
if err != nil {
return nil, err
}
if err := r.tasks.Add(ctx, t); err != nil {
return nil, err
}
r.events.Publish(ctx, runtime.TaskCreateEventTopic, &eventstypes.TaskCreate{
ContainerID: sopts.ID,
Bundle: sopts.Bundle,
Rootfs: sopts.Rootfs,
IO: &eventstypes.TaskIO{
Stdin: sopts.Stdin,
Stdout: sopts.Stdout,
Stderr: sopts.Stderr,
Terminal: sopts.Terminal,
},
Checkpoint: sopts.Checkpoint,
Pid: uint32(t.pid),
})
return t, nil
}
// Tasks returns all tasks known to the runtime
func (r *Runtime) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) {
return r.tasks.GetAll(ctx, all)
}
func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) {
dir, err := os.ReadDir(r.state)
if err != nil {
return nil, err
}
var o []*Task
for _, namespace := range dir {
if !namespace.IsDir() {
continue
}
name := namespace.Name()
// skip hidden directories
if len(name) > 0 && name[0] == '.' {
continue
}
log.G(ctx).WithField("namespace", name).Debug("loading tasks in namespace")
tasks, err := r.loadTasks(ctx, name)
if err != nil {
return nil, err
}
o = append(o, tasks...)
}
return o, nil
}
// Get a specific task by task id
func (r *Runtime) Get(ctx context.Context, id string) (runtime.Task, error) {
return r.tasks.Get(ctx, id)
}
// Add a runtime task
func (r *Runtime) Add(ctx context.Context, task runtime.Task) error {
return r.tasks.Add(ctx, task)
}
// Delete a runtime task
func (r *Runtime) Delete(ctx context.Context, id string) (*runtime.Exit, error) {
task, err := r.tasks.Get(ctx, id)
if err != nil {
return nil, err
}
s := task.(*Task)
exit, err := s.Delete(ctx)
if err != nil {
return nil, err
}
r.tasks.Delete(ctx, id)
return exit, nil
}
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
dir, err := os.ReadDir(filepath.Join(r.state, ns))
if err != nil {
return nil, err
}
var o []*Task
for _, path := range dir {
if !path.IsDir() {
continue
}
id := path.Name()
// skip hidden directories
if len(id) > 0 && id[0] == '.' {
continue
}
bundle := loadBundle(
id,
filepath.Join(r.state, ns, id),
filepath.Join(r.root, ns, id),
)
ctx = namespaces.WithNamespace(ctx, ns)
ctx = log.WithLogger(ctx, log.G(ctx).WithError(err).WithFields(log.Fields{
"id": id,
"namespace": ns,
}))
pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, process.InitPidFile))
shimExit := make(chan struct{})
s, err := bundle.NewShimClient(ctx, ns, ShimConnect(r.config, func() {
defer close(shimExit)
if _, err := r.tasks.Get(ctx, id); err != nil {
// Task was never started or was already successfully deleted
return
}
if err := r.cleanupAfterDeadShim(ctx, bundle, ns, id); err != nil {
log.G(ctx).WithError(err).WithField("bundle", bundle.path).
Error("cleaning up after dead shim")
}
}), nil)
if err != nil {
log.G(ctx).WithError(err).Error("connecting to shim")
err := r.cleanupAfterDeadShim(ctx, bundle, ns, id)
if err != nil {
log.G(ctx).WithError(err).WithField("bundle", bundle.path).
Error("cleaning up after dead shim")
}
continue
}
logDirPath := filepath.Join(r.root, ns, id)
copyAndClose := func(dst io.Writer, src io.ReadWriteCloser) {
copyDone := make(chan struct{})
go func() {
io.Copy(dst, src)
close(copyDone)
}()
select {
case <-shimExit:
case <-copyDone:
}
src.Close()
}
shimStdoutLog, err := v1.OpenShimStdoutLog(ctx, logDirPath)
if err != nil {
log.G(ctx).WithError(err).WithField("logDirPath", logDirPath).
Error("opening shim stdout log pipe")
continue
}
if r.config.ShimDebug {
go copyAndClose(os.Stdout, shimStdoutLog)
} else {
go copyAndClose(io.Discard, shimStdoutLog)
}
shimStderrLog, err := v1.OpenShimStderrLog(ctx, logDirPath)
if err != nil {
log.G(ctx).WithError(err).WithField("logDirPath", logDirPath).
Error("opening shim stderr log pipe")
continue
}
if r.config.ShimDebug {
go copyAndClose(os.Stderr, shimStderrLog)
} else {
go copyAndClose(io.Discard, shimStderrLog)
}
t, err := newTask(id, ns, pid, s, r.events, r.tasks, bundle)
if err != nil {
log.G(ctx).WithError(err).Error("loading task type")
continue
}
o = append(o, t)
}
return o, nil
}
func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string) error {
log.G(ctx).Warn("cleaning up after shim dead")
pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, process.InitPidFile))
if err := r.terminate(ctx, bundle, ns, id); err != nil {
if r.config.ShimDebug {
return fmt.Errorf("failed to terminate task, leaving bundle for debugging: %w", err)
}
log.G(ctx).WithError(err).Warn("failed to terminate task")
}
// Notify Client
exitedAt := time.Now().UTC()
r.events.Publish(ctx, runtime.TaskExitEventTopic, &eventstypes.TaskExit{
ContainerID: id,
ID: id,
Pid: uint32(pid),
ExitStatus: 128 + uint32(unix.SIGKILL),
ExitedAt: protobuf.ToTimestamp(exitedAt),
})
r.tasks.Delete(ctx, id)
if err := bundle.Delete(); err != nil {
log.G(ctx).WithError(err).Error("delete bundle")
}
// kill shim
if shimPid, err := runc.ReadPidFile(filepath.Join(bundle.path, "shim.pid")); err == nil && shimPid > 0 {
unix.Kill(shimPid, unix.SIGKILL)
}
r.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
ContainerID: id,
Pid: uint32(pid),
ExitStatus: 128 + uint32(unix.SIGKILL),
ExitedAt: protobuf.ToTimestamp(exitedAt),
})
return nil
}
func (r *Runtime) terminate(ctx context.Context, bundle *bundle, ns, id string) error {
rt, err := r.getRuntime(ctx, ns, id)
if err != nil {
return err
}
if err := rt.Delete(ctx, id, &runc.DeleteOpts{
Force: true,
}); err != nil {
log.G(ctx).WithError(err).Warnf("delete runtime state %s", id)
}
if err := mount.Unmount(filepath.Join(bundle.path, "rootfs"), 0); err != nil {
log.G(ctx).WithError(err).WithFields(log.Fields{
"path": bundle.path,
"id": id,
}).Warnf("unmount task rootfs")
}
return nil
}
func (r *Runtime) getRuntime(ctx context.Context, ns, id string) (*runc.Runc, error) {
ropts, err := r.getRuncOptions(ctx, id)
if err != nil {
return nil, err
}
var (
cmd = r.config.Runtime
root = process.RuncRoot
)
if ropts != nil {
if ropts.Runtime != "" {
cmd = ropts.Runtime
}
if ropts.RuntimeRoot != "" {
root = ropts.RuntimeRoot
}
}
return &runc.Runc{
Command: cmd,
LogFormat: runc.JSON,
PdeathSignal: unix.SIGKILL,
Root: filepath.Join(root, ns),
Debug: r.config.ShimDebug,
}, nil
}
func (r *Runtime) getRuncOptions(ctx context.Context, id string) (*runctypes.RuncOptions, error) {
container, err := r.containers.Get(ctx, id)
if err != nil {
return nil, err
}
if container.Runtime.Options != nil && container.Runtime.Options.GetValue() != nil {
v, err := typeurl.UnmarshalAny(container.Runtime.Options)
if err != nil {
return nil, err
}
ropts, ok := v.(*runctypes.RuncOptions)
if !ok {
return nil, errors.New("invalid runtime options format")
}
return ropts, nil
}
return &runctypes.RuncOptions{}, nil
}


@@ -1,353 +0,0 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package linux
import (
"context"
"errors"
"fmt"
"sync"
cgroups "github.com/containerd/cgroups/v3/cgroup1"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/protobuf"
"github.com/containerd/containerd/protobuf/types"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/v1/shim/client"
"github.com/containerd/containerd/runtime/v1/shim/v1"
"github.com/containerd/ttrpc"
)
// Task on a linux based system
type Task struct {
mu sync.Mutex
id string
pid int
shim *client.Client
namespace string
cg cgroups.Cgroup
events *exchange.Exchange
tasks *runtime.NSMap[runtime.Task]
bundle *bundle
}
func newTask(id, namespace string, pid int, shim *client.Client, events *exchange.Exchange, list *runtime.NSMap[runtime.Task], bundle *bundle) (*Task, error) {
var (
err error
cg cgroups.Cgroup
)
if pid > 0 {
cg, err = cgroups.Load(cgroups.PidPath(pid))
if err != nil && err != cgroups.ErrCgroupDeleted {
return nil, err
}
}
return &Task{
id: id,
pid: pid,
shim: shim,
namespace: namespace,
cg: cg,
events: events,
tasks: list,
bundle: bundle,
}, nil
}
// ID of the task
func (t *Task) ID() string {
return t.id
}
// Namespace of the task
func (t *Task) Namespace() string {
return t.namespace
}
// PID of the task
func (t *Task) PID(_ context.Context) (uint32, error) {
return uint32(t.pid), nil
}
// Delete the task and return the exit status
func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) {
rsp, shimErr := t.shim.Delete(ctx, empty)
if shimErr != nil {
shimErr = errdefs.FromGRPC(shimErr)
if !errdefs.IsNotFound(shimErr) {
return nil, shimErr
}
}
t.tasks.Delete(ctx, t.id)
if err := t.shim.KillShim(ctx); err != nil {
log.G(ctx).WithError(err).Error("failed to kill shim")
}
if err := t.bundle.Delete(); err != nil {
log.G(ctx).WithError(err).Error("failed to delete bundle")
}
if shimErr != nil {
return nil, shimErr
}
t.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
ContainerID: t.id,
ExitStatus: rsp.ExitStatus,
ExitedAt: rsp.ExitedAt,
Pid: rsp.Pid,
})
return &runtime.Exit{
Status: rsp.ExitStatus,
Timestamp: protobuf.FromTimestamp(rsp.ExitedAt),
Pid: rsp.Pid,
}, nil
}
// Start the task
func (t *Task) Start(ctx context.Context) error {
t.mu.Lock()
hasCgroup := t.cg != nil
t.mu.Unlock()
r, err := t.shim.Start(ctx, &shim.StartRequest{
ID: t.id,
})
if err != nil {
return errdefs.FromGRPC(err)
}
t.pid = int(r.Pid)
if !hasCgroup {
cg, err := cgroups.Load(cgroups.PidPath(t.pid))
if err != nil && err != cgroups.ErrCgroupDeleted {
return err
}
t.mu.Lock()
if err == cgroups.ErrCgroupDeleted {
t.cg = nil
} else {
t.cg = cg
}
t.mu.Unlock()
}
t.events.Publish(ctx, runtime.TaskStartEventTopic, &eventstypes.TaskStart{
ContainerID: t.id,
Pid: uint32(t.pid),
})
return nil
}
// State returns runtime information for the task
func (t *Task) State(ctx context.Context) (runtime.State, error) {
response, err := t.shim.State(ctx, &shim.StateRequest{
ID: t.id,
})
if err != nil {
if !errors.Is(err, ttrpc.ErrClosed) {
return runtime.State{}, errdefs.FromGRPC(err)
}
return runtime.State{}, errdefs.ErrNotFound
}
return runtime.State{
Pid: response.Pid,
Status: statusFromProto(response.Status),
Stdin: response.Stdin,
Stdout: response.Stdout,
Stderr: response.Stderr,
Terminal: response.Terminal,
ExitStatus: response.ExitStatus,
ExitedAt: protobuf.FromTimestamp(response.ExitedAt),
}, nil
}
// Pause the task and all processes
func (t *Task) Pause(ctx context.Context) error {
if _, err := t.shim.Pause(ctx, empty); err != nil {
return errdefs.FromGRPC(err)
}
t.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{
ContainerID: t.id,
})
return nil
}
// Resume the task and all processes
func (t *Task) Resume(ctx context.Context) error {
if _, err := t.shim.Resume(ctx, empty); err != nil {
return errdefs.FromGRPC(err)
}
t.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{
ContainerID: t.id,
})
return nil
}
// Kill the task using the provided signal
//
// Optionally send the signal to all processes that are a child of the task
func (t *Task) Kill(ctx context.Context, signal uint32, all bool) error {
if _, err := t.shim.Kill(ctx, &shim.KillRequest{
ID: t.id,
Signal: signal,
All: all,
}); err != nil {
return errdefs.FromGRPC(err)
}
return nil
}
// Exec creates a new process inside the task
func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.ExecProcess, error) {
if err := identifiers.Validate(id); err != nil {
return nil, fmt.Errorf("invalid exec id: %w", err)
}
request := &shim.ExecProcessRequest{
ID: id,
Stdin: opts.IO.Stdin,
Stdout: opts.IO.Stdout,
Stderr: opts.IO.Stderr,
Terminal: opts.IO.Terminal,
Spec: opts.Spec,
}
if _, err := t.shim.Exec(ctx, request); err != nil {
return nil, errdefs.FromGRPC(err)
}
return &Process{
id: id,
t: t,
}, nil
}
// Pids returns all system level process ids running inside the task
func (t *Task) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) {
resp, err := t.shim.ListPids(ctx, &shim.ListPidsRequest{
ID: t.id,
})
if err != nil {
return nil, errdefs.FromGRPC(err)
}
var processList []runtime.ProcessInfo
for _, p := range resp.Processes {
processList = append(processList, runtime.ProcessInfo{
Pid: p.Pid,
Info: p.Info,
})
}
return processList, nil
}
// ResizePty changes the size of the task's PTY to the provided width and height
func (t *Task) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
_, err := t.shim.ResizePty(ctx, &shim.ResizePtyRequest{
ID: t.id,
Width: size.Width,
Height: size.Height,
})
if err != nil {
err = errdefs.FromGRPC(err)
}
return err
}
// CloseIO closes the provided IO on the task
func (t *Task) CloseIO(ctx context.Context) error {
_, err := t.shim.CloseIO(ctx, &shim.CloseIORequest{
ID: t.id,
Stdin: true,
})
if err != nil {
err = errdefs.FromGRPC(err)
}
return err
}
// Checkpoint creates a system level dump of the task and process information that can be later restored
func (t *Task) Checkpoint(ctx context.Context, path string, options *types.Any) error {
r := &shim.CheckpointTaskRequest{
Path: path,
Options: options,
}
if _, err := t.shim.Checkpoint(ctx, r); err != nil {
return errdefs.FromGRPC(err)
}
t.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventstypes.TaskCheckpointed{
ContainerID: t.id,
})
return nil
}
// Update changes runtime information of a running task
func (t *Task) Update(ctx context.Context, resources *types.Any, _ map[string]string) error {
if _, err := t.shim.Update(ctx, &shim.UpdateTaskRequest{
Resources: resources,
}); err != nil {
return errdefs.FromGRPC(err)
}
return nil
}
// Process returns a specific process inside the task by the process id
func (t *Task) Process(ctx context.Context, id string) (runtime.ExecProcess, error) {
p := &Process{
id: id,
t: t,
}
if _, err := p.State(ctx); err != nil {
return nil, err
}
return p, nil
}
// Stats returns runtime specific system level metric information for the task
func (t *Task) Stats(ctx context.Context) (*types.Any, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.cg == nil {
return nil, fmt.Errorf("cgroup does not exist: %w", errdefs.ErrNotFound)
}
stats, err := t.cg.Stat(cgroups.IgnoreNotExist)
if err != nil {
return nil, err
}
return protobuf.MarshalAnyToProto(stats)
}
// Cgroup returns the underlying cgroup for a linux task
func (t *Task) Cgroup() (cgroups.Cgroup, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.cg == nil {
return nil, fmt.Errorf("cgroup does not exist: %w", errdefs.ErrNotFound)
}
return t.cg, nil
}
// Wait for the task to exit returning the status and timestamp
func (t *Task) Wait(ctx context.Context) (*runtime.Exit, error) {
r, err := t.shim.Wait(ctx, &shim.WaitRequest{
ID: t.id,
})
if err != nil {
return nil, err
}
return &runtime.Exit{
Timestamp: protobuf.FromTimestamp(r.ExitedAt),
Status: r.ExitStatus,
}, nil
}


@@ -1,38 +0,0 @@
//go:build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
import (
"context"
"io"
"path/filepath"
"github.com/containerd/fifo"
"golang.org/x/sys/unix"
)
// OpenShimStdoutLog opens the shim stdout log for reading
func OpenShimStdoutLog(ctx context.Context, logDirPath string) (io.ReadWriteCloser, error) {
return fifo.OpenFifo(ctx, filepath.Join(logDirPath, "shim.stdout.log"), unix.O_RDWR|unix.O_CREAT, 0700)
}
// OpenShimStderrLog opens the shim stderr log for reading
func OpenShimStderrLog(ctx context.Context, logDirPath string) (io.ReadWriteCloser, error) {
return fifo.OpenFifo(ctx, filepath.Join(logDirPath, "shim.stderr.log"), unix.O_RDWR|unix.O_CREAT, 0700)
}
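Both helpers open the FIFO with `O_RDWR` rather than a one-way mode; for a FIFO that means `open` never blocks waiting for a peer, and the descriptor itself counts as a reader, so writers do not get `SIGPIPE` across daemon restarts (the property `openStdioKeepAlivePipes` relies on earlier in this commit). A small sketch of the pattern, using a hypothetical `/tmp/demo-shim.log` path:

```go
package main

import (
	"context"
	"fmt"

	"github.com/containerd/fifo"
	"golang.org/x/sys/unix"
)

func main() {
	ctx := context.Background()
	// O_RDWR on a FIFO: open() does not block waiting for the other end,
	// and writes buffer in the pipe instead of raising SIGPIPE while this
	// descriptor stays open.
	f, err := fifo.OpenFifo(ctx, "/tmp/demo-shim.log", unix.O_RDWR|unix.O_CREAT, 0700)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer f.Close()
	fmt.Fprintln(f, "hello from the shim")
}
```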


@@ -1,430 +0,0 @@
//go:build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
ptypes "github.com/containerd/containerd/protobuf/types"
v1 "github.com/containerd/containerd/runtime/v1"
"github.com/containerd/containerd/runtime/v1/shim"
shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
"github.com/containerd/containerd/sys"
"github.com/containerd/ttrpc"
exec "golang.org/x/sys/execabs"
"golang.org/x/sys/unix"
)
var empty = &ptypes.Empty{}
// Opt is an option for a shim client configuration
type Opt func(context.Context, shim.Config) (shimapi.ShimService, io.Closer, error)
// WithStart executes a new shim process
func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHandler func()) Opt {
return func(ctx context.Context, config shim.Config) (_ shimapi.ShimService, _ io.Closer, err error) {
socket, err := newSocket(address)
if err != nil {
if !eaddrinuse(err) {
return nil, nil, err
}
if err := RemoveSocket(address); err != nil {
return nil, nil, fmt.Errorf("remove already used socket: %w", err)
}
if socket, err = newSocket(address); err != nil {
return nil, nil, err
}
}
f, err := socket.File()
if err != nil {
return nil, nil, fmt.Errorf("failed to get fd for socket %s: %w", address, err)
}
defer f.Close()
stdoutCopy := io.Discard
stderrCopy := io.Discard
stdoutLog, err := v1.OpenShimStdoutLog(ctx, config.WorkDir)
if err != nil {
return nil, nil, fmt.Errorf("failed to create stdout log: %w", err)
}
stderrLog, err := v1.OpenShimStderrLog(ctx, config.WorkDir)
if err != nil {
return nil, nil, fmt.Errorf("failed to create stderr log: %w", err)
}
if debug {
stdoutCopy = os.Stdout
stderrCopy = os.Stderr
}
go io.Copy(stdoutCopy, stdoutLog)
go io.Copy(stderrCopy, stderrLog)
cmd, err := newCommand(binary, daemonAddress, debug, config, f, stdoutLog, stderrLog)
if err != nil {
return nil, nil, err
}
if err := cmd.Start(); err != nil {
return nil, nil, fmt.Errorf("failed to start shim: %w", err)
}
defer func() {
if err != nil {
cmd.Process.Kill()
}
}()
go func() {
cmd.Wait()
exitHandler()
if stdoutLog != nil {
stdoutLog.Close()
}
if stderrLog != nil {
stderrLog.Close()
}
socket.Close()
RemoveSocket(address)
}()
log.G(ctx).WithFields(log.Fields{
"pid": cmd.Process.Pid,
"address": address,
"debug": debug,
}).Infof("shim %s started", binary)
if err := writeFile(filepath.Join(config.Path, "address"), address); err != nil {
return nil, nil, err
}
if err := writeFile(filepath.Join(config.Path, "shim.pid"), strconv.Itoa(cmd.Process.Pid)); err != nil {
return nil, nil, err
}
// set shim in cgroup if it is provided
if cgroup != "" {
if err := setCgroup(cgroup, cmd); err != nil {
return nil, nil, err
}
log.G(ctx).WithFields(log.Fields{
"pid": cmd.Process.Pid,
"address": address,
}).Infof("shim placed in cgroup %s", cgroup)
}
if err = setupOOMScore(cmd.Process.Pid); err != nil {
return nil, nil, err
}
c, clo, err := WithConnect(address, func() {})(ctx, config)
if err != nil {
return nil, nil, fmt.Errorf("failed to connect: %w", err)
}
return c, clo, nil
}
}
func eaddrinuse(err error) bool {
cause := errors.Unwrap(err)
netErr, ok := cause.(*net.OpError)
if !ok {
return false
}
if netErr.Op != "listen" {
return false
}
syscallErr, ok := netErr.Err.(*os.SyscallError)
if !ok {
return false
}
errno, ok := syscallErr.Err.(syscall.Errno)
if !ok {
return false
}
return errno == syscall.EADDRINUSE
}
// setupOOMScore gets containerd's oom score and adds +1 to it
// to ensure a shim is killed before the daemon under memory pressure
// (unless the daemon is already at the maximum OOM score)
func setupOOMScore(shimPid int) error {
pid := os.Getpid()
score, err := sys.GetOOMScoreAdj(pid)
if err != nil {
return fmt.Errorf("get daemon OOM score: %w", err)
}
shimScore := score + 1
if err := sys.AdjustOOMScore(shimPid, shimScore); err != nil {
return fmt.Errorf("set shim OOM score: %w", err)
}
return nil
}
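For context, a rough standalone sketch of what the sys helpers wrap on Linux; the adjustOOMScore name is hypothetical and the procfs details are the kernel's, not containerd's:
package main

import (
	"fmt"
	"os"
	"strconv"
)

// oom_score_adj ranges from -1000 to 1000; processes with higher values are
// killed first under memory pressure, which is why the shim gets daemon+1.
func adjustOOMScore(pid, score int) error {
	if score > 1000 {
		score = 1000 // the kernel caps the adjustment at 1000
	}
	path := fmt.Sprintf("/proc/%d/oom_score_adj", pid)
	return os.WriteFile(path, []byte(strconv.Itoa(score)), 0644)
}

func main() {
	// Set our own adjustment to 1 (a default daemon score of 0, plus one).
	if err := adjustOOMScore(os.Getpid(), 1); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}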
func newCommand(binary, daemonAddress string, debug bool, config shim.Config, socket *os.File, stdout, stderr io.Writer) (*exec.Cmd, error) {
selfExe, err := os.Executable()
if err != nil {
return nil, err
}
args := []string{
"-namespace", config.Namespace,
"-workdir", config.WorkDir,
"-address", daemonAddress,
"-containerd-binary", selfExe,
}
if config.RuntimeRoot != "" {
args = append(args, "-runtime-root", config.RuntimeRoot)
}
if config.SystemdCgroup {
args = append(args, "-systemd-cgroup")
}
if debug {
args = append(args, "-debug")
}
cmd := exec.Command(binary, args...)
cmd.Dir = config.Path
// make sure the shim can be re-parented to system init
// and is cloned in a new mount namespace because the overlay/filesystems
// will be mounted by the shim
cmd.SysProcAttr = getSysProcAttr()
cmd.ExtraFiles = append(cmd.ExtraFiles, socket)
cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
cmd.Stdout = stdout
cmd.Stderr = stderr
return cmd, nil
}
// writeFile writes an address file atomically
func writeFile(path, address string) error {
path, err := filepath.Abs(path)
if err != nil {
return err
}
tempPath := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s", filepath.Base(path)))
f, err := os.OpenFile(tempPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, 0666)
if err != nil {
return err
}
_, err = f.WriteString(address)
f.Close()
if err != nil {
return err
}
return os.Rename(tempPath, path)
}
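The temp-file-plus-rename dance matters because the daemon may read the address file while the shim launcher is still writing it; rename(2) is atomic on the same filesystem. A simplified standalone replica of the pattern:
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeFileAtomic writes to a hidden temp file in the target directory and
// renames it into place, so readers never observe a partial file.
func writeFileAtomic(path, contents string) error {
	tmp := filepath.Join(filepath.Dir(path), "."+filepath.Base(path))
	if err := os.WriteFile(tmp, []byte(contents), 0666); err != nil {
		return err
	}
	return os.Rename(tmp, path)
}

func main() {
	path := filepath.Join(os.TempDir(), "address")
	if err := writeFileAtomic(path, "unix:///run/containerd/shim.sock"); err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	data, _ := os.ReadFile(path)
	fmt.Println(string(data))
}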
const (
abstractSocketPrefix = "\x00"
socketPathLimit = 106
)
type socket string
func (s socket) isAbstract() bool {
return !strings.HasPrefix(string(s), "unix://")
}
func (s socket) path() string {
path := strings.TrimPrefix(string(s), "unix://")
// if there was no trim performed, we assume an abstract socket
if len(path) == len(s) {
path = abstractSocketPrefix + path
}
return path
}
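The two address forms matter on Linux: an address with a unix:// prefix names a normal filesystem socket, while anything else is treated as an abstract socket, addressed with a leading NUL byte and never visible on disk. A standalone replica of the classification above:
package main

import (
	"fmt"
	"strings"
)

func socketPath(address string) (path string, abstract bool) {
	path = strings.TrimPrefix(address, "unix://")
	if len(path) == len(address) {
		// no prefix was trimmed, so treat the address as abstract
		return "\x00" + path, true
	}
	return path, false
}

func main() {
	for _, addr := range []string{"unix:///run/containerd/shim.sock", "containerd-shim-abstract"} {
		p, abstract := socketPath(addr)
		fmt.Printf("%q abstract=%v path=%q\n", addr, abstract, p)
	}
}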
func newSocket(address string) (*net.UnixListener, error) {
if len(address) > socketPathLimit {
return nil, fmt.Errorf("%q: unix socket path too long (> %d)", address, socketPathLimit)
}
var (
sock = socket(address)
path = sock.path()
)
if !sock.isAbstract() {
if err := os.MkdirAll(filepath.Dir(path), 0600); err != nil {
return nil, fmt.Errorf("%s: %w", path, err)
}
}
l, err := net.Listen("unix", path)
if err != nil {
return nil, fmt.Errorf("failed to listen to unix socket %q (abstract: %t): %w", address, sock.isAbstract(), err)
}
if err := os.Chmod(path, 0600); err != nil {
l.Close()
return nil, err
}
return l.(*net.UnixListener), nil
}
// RemoveSocket removes the socket at the specified address if
// it exists on the filesystem
func RemoveSocket(address string) error {
sock := socket(address)
if !sock.isAbstract() {
return os.Remove(sock.path())
}
return nil
}
// AnonDialer returns a dialer for a socket
//
// NOTE: It is only used for testing.
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
return anonDialer(address, timeout)
}
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
return d(address, 100*time.Second)
}
func anonDialer(address string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", socket(address).path(), timeout)
}
// WithConnect connects to an existing shim
func WithConnect(address string, onClose func()) Opt {
return func(ctx context.Context, config shim.Config) (shimapi.ShimService, io.Closer, error) {
conn, err := connect(address, anonDialer)
if err != nil {
return nil, nil, err
}
client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose))
return shimapi.NewShimClient(client), conn, nil
}
}
// WithLocal uses an in process shim
func WithLocal(publisher events.Publisher) func(context.Context, shim.Config) (shimapi.ShimService, io.Closer, error) {
return func(ctx context.Context, config shim.Config) (shimapi.ShimService, io.Closer, error) {
service, err := shim.NewService(config, publisher)
if err != nil {
return nil, nil, err
}
return shim.NewLocal(service), nil, nil
}
}
// New returns a new shim client
func New(ctx context.Context, config shim.Config, opt Opt) (*Client, error) {
s, c, err := opt(ctx, config)
if err != nil {
return nil, err
}
return &Client{
ShimService: s,
c: c,
exitCh: make(chan struct{}),
}, nil
}
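A hypothetical use of New with the connect option; the socket address is illustrative, since in the daemon it came from the bundle's address file:
package main

import (
	"context"
	"fmt"

	"github.com/containerd/containerd/runtime/v1/shim"
	"github.com/containerd/containerd/runtime/v1/shim/client"
)

func main() {
	ctx := context.Background()
	// Attach to an already-running v1 shim rather than spawning one.
	cli, err := client.New(ctx, shim.Config{Namespace: "default"}, client.WithConnect("unix:///run/containerd/shim.sock", func() {}))
	if err != nil {
		fmt.Println(err)
		return
	}
	defer cli.Close()
	alive, err := cli.IsAlive(ctx)
	fmt.Println("alive:", alive, "err:", err)
}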
// Client is a shim client containing the connection to a shim
type Client struct {
shimapi.ShimService
c io.Closer
exitCh chan struct{}
exitOnce sync.Once
}
// IsAlive returns true if the shim can be contacted.
// NOTE: a negative answer doesn't mean that the process is gone.
func (c *Client) IsAlive(ctx context.Context) (bool, error) {
_, err := c.ShimInfo(ctx, empty)
if err != nil {
// TODO(stevvooe): There are some error conditions that need to be
// handled based on unix socket existence to give the right answer here.
return false, err
}
return true, nil
}
// StopShim signals the shim to exit and waits for the process to disappear
func (c *Client) StopShim(ctx context.Context) error {
return c.signalShim(ctx, unix.SIGTERM)
}
// KillShim kills the shim forcefully and waits for the process to disappear
func (c *Client) KillShim(ctx context.Context) error {
return c.signalShim(ctx, unix.SIGKILL)
}
// Close the client connection
func (c *Client) Close() error {
if c.c == nil {
return nil
}
return c.c.Close()
}
func (c *Client) signalShim(ctx context.Context, sig syscall.Signal) error {
info, err := c.ShimInfo(ctx, empty)
if err != nil {
return err
}
pid := int(info.ShimPid)
// make sure we don't kill ourselves if we are running a local shim
if os.Getpid() == pid {
return nil
}
if err := unix.Kill(pid, sig); err != nil && err != unix.ESRCH {
return err
}
// wait for shim to die after being signaled
select {
case <-ctx.Done():
return ctx.Err()
case <-c.waitForExit(ctx, pid):
return nil
}
}
func (c *Client) waitForExit(ctx context.Context, pid int) <-chan struct{} {
go c.exitOnce.Do(func() {
defer close(c.exitCh)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
// use kill(pid, 0) here because the shim could have been reparented
// and we are no longer able to waitpid(pid, ...) on the shim
if err := unix.Kill(pid, 0); err == unix.ESRCH {
return
}
select {
case <-ticker.C:
case <-ctx.Done():
log.G(ctx).WithField("pid", pid).Warn("timed out while waiting for shim to exit")
return
}
}
})
return c.exitCh
}
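Putting StopShim, KillShim, and waitForExit together, a hypothetical shutdown helper (the stopShim name and the timeout are illustrative):
package shimutil

import (
	"context"
	"time"

	"github.com/containerd/containerd/runtime/v1/shim/client"
)

// stopShim signals SIGTERM first and falls back to SIGKILL if the shim has
// not exited within the timeout.
func stopShim(cli *client.Client) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := cli.StopShim(ctx); err != nil {
		return cli.KillShim(context.Background())
	}
	return nil
}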

View File

@ -1,42 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"fmt"
"syscall"
"github.com/containerd/cgroups/v3/cgroup1"
exec "golang.org/x/sys/execabs"
)
func getSysProcAttr() *syscall.SysProcAttr {
return &syscall.SysProcAttr{
Setpgid: true,
}
}
func setCgroup(cgroupPath string, cmd *exec.Cmd) error {
cg, err := cgroup1.Load(cgroup1.StaticPath(cgroupPath))
if err != nil {
return fmt.Errorf("failed to load cgroup %s: %w", cgroupPath, err)
}
if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil {
return fmt.Errorf("failed to join cgroup %s: %w", cgroupPath, err)
}
return nil
}

View File

@ -1,35 +0,0 @@
//go:build !linux && !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"syscall"
exec "golang.org/x/sys/execabs"
)
func getSysProcAttr() *syscall.SysProcAttr {
return &syscall.SysProcAttr{
Setpgid: true,
}
}
func setCgroup(cgroupPath string, cmd *exec.Cmd) error {
return nil
}

View File

@ -1,107 +0,0 @@
//go:build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"path/filepath"
"github.com/containerd/containerd/mount"
ptypes "github.com/containerd/containerd/protobuf/types"
shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
)
// NewLocal returns a shim client implementation for issuing commands to a shim
func NewLocal(s *Service) shimapi.ShimService {
return &local{
s: s,
}
}
type local struct {
s *Service
}
func (c *local) Create(ctx context.Context, in *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
return c.s.Create(ctx, in)
}
func (c *local) Start(ctx context.Context, in *shimapi.StartRequest) (*shimapi.StartResponse, error) {
return c.s.Start(ctx, in)
}
func (c *local) Delete(ctx context.Context, in *ptypes.Empty) (*shimapi.DeleteResponse, error) {
// make sure we unmount the container's rootfs for this local
if err := mount.Unmount(filepath.Join(c.s.config.Path, "rootfs"), 0); err != nil {
return nil, err
}
return c.s.Delete(ctx, in)
}
func (c *local) DeleteProcess(ctx context.Context, in *shimapi.DeleteProcessRequest) (*shimapi.DeleteResponse, error) {
return c.s.DeleteProcess(ctx, in)
}
func (c *local) Exec(ctx context.Context, in *shimapi.ExecProcessRequest) (*ptypes.Empty, error) {
return c.s.Exec(ctx, in)
}
func (c *local) ResizePty(ctx context.Context, in *shimapi.ResizePtyRequest) (*ptypes.Empty, error) {
return c.s.ResizePty(ctx, in)
}
func (c *local) State(ctx context.Context, in *shimapi.StateRequest) (*shimapi.StateResponse, error) {
return c.s.State(ctx, in)
}
func (c *local) Pause(ctx context.Context, in *ptypes.Empty) (*ptypes.Empty, error) {
return c.s.Pause(ctx, in)
}
func (c *local) Resume(ctx context.Context, in *ptypes.Empty) (*ptypes.Empty, error) {
return c.s.Resume(ctx, in)
}
func (c *local) Kill(ctx context.Context, in *shimapi.KillRequest) (*ptypes.Empty, error) {
return c.s.Kill(ctx, in)
}
func (c *local) ListPids(ctx context.Context, in *shimapi.ListPidsRequest) (*shimapi.ListPidsResponse, error) {
return c.s.ListPids(ctx, in)
}
func (c *local) CloseIO(ctx context.Context, in *shimapi.CloseIORequest) (*ptypes.Empty, error) {
return c.s.CloseIO(ctx, in)
}
func (c *local) Checkpoint(ctx context.Context, in *shimapi.CheckpointTaskRequest) (*ptypes.Empty, error) {
return c.s.Checkpoint(ctx, in)
}
func (c *local) ShimInfo(ctx context.Context, in *ptypes.Empty) (*shimapi.ShimInfoResponse, error) {
return c.s.ShimInfo(ctx, in)
}
func (c *local) Update(ctx context.Context, in *shimapi.UpdateTaskRequest) (*ptypes.Empty, error) {
return c.s.Update(ctx, in)
}
func (c *local) Wait(ctx context.Context, in *shimapi.WaitRequest) (*shimapi.WaitResponse, error) {
return c.s.Wait(ctx, in)
}

View File

@ -1,679 +0,0 @@
//go:build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"github.com/containerd/console"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/process"
"github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/protobuf"
ptypes "github.com/containerd/containerd/protobuf/types"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/runctypes"
shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
"github.com/containerd/containerd/sys/reaper"
runc "github.com/containerd/go-runc"
"github.com/containerd/typeurl/v2"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
empty = &ptypes.Empty{}
bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 4096)
return &buffer
},
}
)
// Config contains shim specific configuration
type Config struct {
Path string
Namespace string
WorkDir string
// Criu is the path to the criu binary used for checkpoint and restore.
//
// Deprecated: runc option --criu is now ignored (with a warning), and the
// option will be removed entirely in a future release. Users who need a non-
// standard criu binary should rely on the standard way of looking up binaries
// in $PATH.
Criu string
RuntimeRoot string
SystemdCgroup bool
}
// NewService returns a new shim service that can be used via GRPC
func NewService(config Config, publisher events.Publisher) (*Service, error) {
if config.Namespace == "" {
return nil, fmt.Errorf("shim namespace cannot be empty")
}
ctx := namespaces.WithNamespace(context.Background(), config.Namespace)
ctx = log.WithLogger(ctx, logrus.WithFields(log.Fields{
"namespace": config.Namespace,
"path": config.Path,
"pid": os.Getpid(),
}))
s := &Service{
config: config,
context: ctx,
processes: make(map[string]process.Process),
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
}
go s.processExits()
if err := s.initPlatform(); err != nil {
return nil, fmt.Errorf("failed to initialize platform behavior: %w", err)
}
go s.forward(publisher)
return s, nil
}
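For an in-process shim (no extra binary), the service can be paired with the NewLocal adapter from the file above; a hedged sketch with illustrative paths and a hypothetical helper name:
package shimutil

import (
	"github.com/containerd/containerd/events"
	"github.com/containerd/containerd/runtime/v1/shim"
	shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
)

// newLocalShim builds the service and wraps it so calls stay in-process.
func newLocalShim(publisher events.Publisher) (shimapi.ShimService, error) {
	svc, err := shim.NewService(shim.Config{
		Namespace: "default",
		Path:      "/run/containerd/bundle",
		WorkDir:   "/var/lib/containerd/workdir",
	}, publisher)
	if err != nil {
		return nil, err
	}
	return shim.NewLocal(svc), nil
}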
// Service is the shim implementation of a remote shim over GRPC
type Service struct {
mu sync.Mutex
config Config
context context.Context
processes map[string]process.Process
events chan interface{}
platform stdio.Platform
ec chan runc.Exit
// Filled by Create()
id string
bundle string
}
// Create a new initial process and container with the underlying OCI runtime
func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ *shimapi.CreateTaskResponse, err error) {
var pmounts []process.Mount
for _, m := range r.Rootfs {
pmounts = append(pmounts, process.Mount{
Type: m.Type,
Source: m.Source,
Target: m.Target,
Options: m.Options,
})
}
rootfs := ""
if len(pmounts) > 0 {
rootfs = filepath.Join(r.Bundle, "rootfs")
if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) {
return nil, err
}
}
config := &process.CreateConfig{
ID: r.ID,
Bundle: r.Bundle,
Runtime: r.Runtime,
Rootfs: pmounts,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Checkpoint: r.Checkpoint,
ParentCheckpoint: r.ParentCheckpoint,
Options: r.Options,
}
var mounts []mount.Mount
for _, pm := range pmounts {
mounts = append(mounts, mount.Mount{
Type: pm.Type,
Source: pm.Source,
Target: pm.Target,
Options: pm.Options,
})
}
defer func() {
if err != nil {
if err2 := mount.UnmountMounts(mounts, rootfs, 0); err2 != nil {
log.G(ctx).WithError(err2).Warn("Failed to cleanup rootfs mount")
}
}
}()
if err := mount.All(mounts, rootfs); err != nil {
return nil, fmt.Errorf("failed to mount rootfs component: %w", err)
}
s.mu.Lock()
defer s.mu.Unlock()
process, err := newInit(
ctx,
s.config.Path,
s.config.WorkDir,
s.config.RuntimeRoot,
s.config.Namespace,
s.config.SystemdCgroup,
s.platform,
config,
rootfs,
)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
if err := process.Create(ctx, config); err != nil {
return nil, errdefs.ToGRPC(err)
}
// save the main task id and bundle to the shim for additional requests
s.id = r.ID
s.bundle = r.Bundle
pid := process.Pid()
s.processes[r.ID] = process
return &shimapi.CreateTaskResponse{
Pid: uint32(pid),
}, nil
}
// Start a process
func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.StartResponse, error) {
p, err := s.getExecProcess(r.ID)
if err != nil {
return nil, err
}
if err := p.Start(ctx); err != nil {
return nil, err
}
return &shimapi.StartResponse{
ID: p.ID(),
Pid: uint32(p.Pid()),
}, nil
}
// Delete the initial process and container
func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteResponse, error) {
p, err := s.getInitProcess()
if err != nil {
return nil, err
}
if err := p.Delete(ctx); err != nil {
return nil, errdefs.ToGRPC(err)
}
s.mu.Lock()
delete(s.processes, s.id)
s.mu.Unlock()
s.platform.Close()
return &shimapi.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
Pid: uint32(p.Pid()),
}, nil
}
// DeleteProcess deletes an exec'd process
func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessRequest) (*shimapi.DeleteResponse, error) {
if r.ID == s.id {
return nil, status.Errorf(codes.InvalidArgument, "cannot delete init process with DeleteProcess")
}
p, err := s.getExecProcess(r.ID)
if err != nil {
return nil, err
}
if err := p.Delete(ctx); err != nil {
return nil, errdefs.ToGRPC(err)
}
s.mu.Lock()
delete(s.processes, r.ID)
s.mu.Unlock()
return &shimapi.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
Pid: uint32(p.Pid()),
}, nil
}
// Exec an additional process inside the container
func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*ptypes.Empty, error) {
s.mu.Lock()
if p := s.processes[r.ID]; p != nil {
s.mu.Unlock()
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ID)
}
p := s.processes[s.id]
s.mu.Unlock()
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
process, err := p.(*process.Init).Exec(ctx, s.config.Path, &process.ExecConfig{
ID: r.ID,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Spec: r.Spec,
})
if err != nil {
return nil, errdefs.ToGRPC(err)
}
s.mu.Lock()
s.processes[r.ID] = process
s.mu.Unlock()
return empty, nil
}
// ResizePty of a process
func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*ptypes.Empty, error) {
if r.ID == "" {
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "id not provided")
}
ws := console.WinSize{
Width: uint16(r.Width),
Height: uint16(r.Height),
}
s.mu.Lock()
p := s.processes[r.ID]
s.mu.Unlock()
if p == nil {
return nil, fmt.Errorf("process does not exist %s", r.ID)
}
if err := p.Resize(ws); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// State returns runtime state information for a process
func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) {
p, err := s.getExecProcess(r.ID)
if err != nil {
return nil, err
}
st, err := p.Status(ctx)
if err != nil {
return nil, err
}
status := task.Status_UNKNOWN
switch st {
case "created":
status = task.Status_CREATED
case "running":
status = task.Status_RUNNING
case "stopped":
status = task.Status_STOPPED
case "paused":
status = task.Status_PAUSED
case "pausing":
status = task.Status_PAUSING
}
sio := p.Stdio()
return &shimapi.StateResponse{
ID: p.ID(),
Bundle: s.bundle,
Pid: uint32(p.Pid()),
Status: status,
Stdin: sio.Stdin,
Stdout: sio.Stdout,
Stderr: sio.Stderr,
Terminal: sio.Terminal,
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
}, nil
}
// Pause the container
func (s *Service) Pause(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) {
p, err := s.getInitProcess()
if err != nil {
return nil, err
}
if err := p.(*process.Init).Pause(ctx); err != nil {
return nil, err
}
return empty, nil
}
// Resume the container
func (s *Service) Resume(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) {
p, err := s.getInitProcess()
if err != nil {
return nil, err
}
if err := p.(*process.Init).Resume(ctx); err != nil {
return nil, err
}
return empty, nil
}
// Kill a process with the provided signal
func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*ptypes.Empty, error) {
if r.ID == "" {
p, err := s.getInitProcess()
if err != nil {
return nil, err
}
if err := p.Kill(ctx, r.Signal, r.All); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
p, err := s.getExecProcess(r.ID)
if err != nil {
return nil, err
}
if err := p.Kill(ctx, r.Signal, r.All); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// ListPids returns all pids inside the container
func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*shimapi.ListPidsResponse, error) {
pids, err := s.getContainerPids(ctx, r.ID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
var processes []*task.ProcessInfo
s.mu.Lock()
defer s.mu.Unlock()
for _, pid := range pids {
pInfo := task.ProcessInfo{
Pid: pid,
}
for _, p := range s.processes {
if p.Pid() == int(pid) {
d := &runctypes.ProcessDetails{
ExecID: p.ID(),
}
a, err := typeurl.MarshalAny(d)
if err != nil {
return nil, fmt.Errorf("failed to marshal process %d info: %w", pid, err)
}
pInfo.Info = protobuf.FromAny(a)
break
}
}
processes = append(processes, &pInfo)
}
return &shimapi.ListPidsResponse{
Processes: processes,
}, nil
}
// CloseIO of a process
func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*ptypes.Empty, error) {
p, err := s.getExecProcess(r.ID)
if err != nil {
return nil, err
}
if stdin := p.Stdin(); stdin != nil {
if err := stdin.Close(); err != nil {
return nil, fmt.Errorf("close stdin: %w", err)
}
}
return empty, nil
}
// Checkpoint the container
func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) (*ptypes.Empty, error) {
p, err := s.getInitProcess()
if err != nil {
return nil, err
}
var options runctypes.CheckpointOptions
if r.Options != nil {
v, err := typeurl.UnmarshalAny(r.Options)
if err != nil {
return nil, err
}
// use value semantics so a nil r.Options yields zero-valued options
// rather than a nil pointer dereference below
options = *v.(*runctypes.CheckpointOptions)
}
if err := p.(*process.Init).Checkpoint(ctx, &process.CheckpointConfig{
Path: r.Path,
Exit: options.Exit,
AllowOpenTCP: options.OpenTcp,
AllowExternalUnixSockets: options.ExternalUnixSockets,
AllowTerminal: options.Terminal,
FileLocks: options.FileLocks,
EmptyNamespaces: options.EmptyNamespaces,
WorkDir: options.WorkPath,
}); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// ShimInfo returns shim information such as the shim's pid
func (s *Service) ShimInfo(ctx context.Context, r *ptypes.Empty) (*shimapi.ShimInfoResponse, error) {
return &shimapi.ShimInfoResponse{
ShimPid: uint32(os.Getpid()),
}, nil
}
// Update a running container
func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*ptypes.Empty, error) {
p, err := s.getInitProcess()
if err != nil {
return nil, err
}
if err := p.(*process.Init).Update(ctx, r.Resources); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Wait for a process to exit
func (s *Service) Wait(ctx context.Context, r *shimapi.WaitRequest) (*shimapi.WaitResponse, error) {
p, err := s.getExecProcess(r.ID)
if err != nil {
return nil, err
}
p.Wait()
return &shimapi.WaitResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
}, nil
}
func (s *Service) processExits() {
for e := range s.ec {
s.checkProcesses(e)
}
}
func (s *Service) checkProcesses(e runc.Exit) {
var p process.Process
s.mu.Lock()
for _, proc := range s.processes {
if proc.Pid() == e.Pid {
p = proc
break
}
}
s.mu.Unlock()
if p == nil {
log.G(s.context).Debugf("process with pid %d wasn't found", e.Pid)
return
}
if ip, ok := p.(*process.Init); ok {
// Ensure all children are killed
if shouldKillAllOnExit(s.context, s.bundle) {
if err := ip.KillAll(s.context); err != nil {
log.G(s.context).WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children")
}
}
}
p.SetExited(e.Status)
s.events <- &eventstypes.TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
}
}
func shouldKillAllOnExit(ctx context.Context, bundlePath string) bool {
var bundleSpec specs.Spec
bundleConfigContents, err := os.ReadFile(filepath.Join(bundlePath, "config.json"))
if err != nil {
log.G(ctx).WithError(err).Error("shouldKillAllOnExit: failed to read config.json")
return true
}
if err := json.Unmarshal(bundleConfigContents, &bundleSpec); err != nil {
log.G(ctx).WithError(err).Error("shouldKillAllOnExit: failed to unmarshal bundle json")
return true
}
if bundleSpec.Linux != nil {
for _, ns := range bundleSpec.Linux.Namespaces {
if ns.Type == specs.PIDNamespace && ns.Path == "" {
return false
}
}
}
return true
}
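The rule above encodes a kernel fact: when a container has its own private PID namespace, the kernel kills every process in that namespace once its PID 1 exits, so the shim only needs KillAll for shared or host PID namespaces. A standalone replica of the decision:
package main

import (
	"fmt"

	specs "github.com/opencontainers/runtime-spec/specs-go"
)

func shouldKillAll(spec *specs.Spec) bool {
	if spec.Linux != nil {
		for _, ns := range spec.Linux.Namespaces {
			// a PID namespace entry with no path means a fresh, private namespace
			if ns.Type == specs.PIDNamespace && ns.Path == "" {
				return false
			}
		}
	}
	return true
}

func main() {
	private := &specs.Spec{Linux: &specs.Linux{Namespaces: []specs.LinuxNamespace{{Type: specs.PIDNamespace}}}}
	host := &specs.Spec{Linux: &specs.Linux{}}
	fmt.Println(shouldKillAll(private), shouldKillAll(host)) // false true
}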
func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
p, err := s.getInitProcess()
if err != nil {
return nil, err
}
ps, err := p.(*process.Init).Runtime().Ps(ctx, id)
if err != nil {
return nil, err
}
pids := make([]uint32, 0, len(ps))
for _, pid := range ps {
pids = append(pids, uint32(pid))
}
return pids, nil
}
func (s *Service) forward(publisher events.Publisher) {
for e := range s.events {
if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil {
log.G(s.context).WithError(err).Error("post event")
}
}
}
// getInitProcess returns the init process
func (s *Service) getInitProcess() (process.Process, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
return p, nil
}
// getExecProcess returns the exec process matching id
func (s *Service) getExecProcess(id string) (process.Process, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[id]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s does not exist", id)
}
return p, nil
}
func getTopic(ctx context.Context, e interface{}) string {
switch e.(type) {
case *eventstypes.TaskCreate:
return runtime.TaskCreateEventTopic
case *eventstypes.TaskStart:
return runtime.TaskStartEventTopic
case *eventstypes.TaskOOM:
return runtime.TaskOOMEventTopic
case *eventstypes.TaskExit:
return runtime.TaskExitEventTopic
case *eventstypes.TaskDelete:
return runtime.TaskDeleteEventTopic
case *eventstypes.TaskExecAdded:
return runtime.TaskExecAddedEventTopic
case *eventstypes.TaskExecStarted:
return runtime.TaskExecStartedEventTopic
case *eventstypes.TaskPaused:
return runtime.TaskPausedEventTopic
case *eventstypes.TaskResumed:
return runtime.TaskResumedEventTopic
case *eventstypes.TaskCheckpointed:
return runtime.TaskCheckpointedEventTopic
default:
logrus.Warnf("no topic for type %#v", e)
}
return runtime.TaskUnknownTopic
}
func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace string, systemdCgroup bool, platform stdio.Platform, r *process.CreateConfig, rootfs string) (*process.Init, error) {
options := &runctypes.CreateOptions{}
if r.Options != nil {
v, err := typeurl.UnmarshalAny(r.Options)
if err != nil {
return nil, err
}
options = v.(*runctypes.CreateOptions)
}
runtime := process.NewRunc(runtimeRoot, path, namespace, r.Runtime, systemdCgroup)
p := process.New(r.ID, runtime, stdio.Stdio{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
})
p.Bundle = r.Bundle
p.Platform = platform
p.Rootfs = rootfs
p.WorkDir = workDir
p.IoUID = int(options.IoUid)
p.IoGID = int(options.IoGid)
p.NoPivotRoot = options.NoPivotRoot
p.NoNewKeyring = options.NoNewKeyring
p.CriuWorkPath = options.CriuWorkPath
if p.CriuWorkPath == "" {
// if criu work path not set, use container WorkDir
p.CriuWorkPath = p.WorkDir
}
return p, nil
}

View File

@ -1,193 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"errors"
"fmt"
"io"
"net/url"
"os"
"sync"
"syscall"
"github.com/containerd/console"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/process"
"github.com/containerd/fifo"
)
type linuxPlatform struct {
epoller *console.Epoller
}
func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, id, stdin, stdout, stderr string, wg *sync.WaitGroup) (cons console.Console, retErr error) {
if p.epoller == nil {
return nil, errors.New("uninitialized epoller")
}
epollConsole, err := p.epoller.Add(console)
if err != nil {
return nil, err
}
var cwg sync.WaitGroup
if stdin != "" {
in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
if err != nil {
return nil, err
}
cwg.Add(1)
go func() {
cwg.Done()
bp := bufPool.Get().(*[]byte)
defer bufPool.Put(bp)
io.CopyBuffer(epollConsole, in, *bp)
// we need to shut down epollConsole when the pipe is broken
epollConsole.Shutdown(p.epoller.CloseConsole)
epollConsole.Close()
in.Close()
}()
}
uri, err := url.Parse(stdout)
if err != nil {
return nil, fmt.Errorf("unable to parse stdout uri: %w", err)
}
switch uri.Scheme {
case "binary":
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
cmd := process.NewBinaryCmd(uri, id, ns)
// In case of unexpected errors during logging binary start, close open pipes
var filesToClose []*os.File
defer func() {
if retErr != nil {
process.CloseFiles(filesToClose...)
}
}()
// Create pipe to be used by logging binary for Stdout
outR, outW, err := os.Pipe()
if err != nil {
return nil, fmt.Errorf("failed to create stdout pipes: %w", err)
}
filesToClose = append(filesToClose, outR)
// Stderr is created for logging binary but unused when terminal is true
serrR, _, err := os.Pipe()
if err != nil {
return nil, fmt.Errorf("failed to create stderr pipes: %w", err)
}
filesToClose = append(filesToClose, serrR)
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
filesToClose = append(filesToClose, r)
cmd.ExtraFiles = append(cmd.ExtraFiles, outR, serrR, w)
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(outW, epollConsole)
outW.Close()
wg.Done()
}()
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start logging binary process: %w", err)
}
// Close our side of the pipe after start
if err := w.Close(); err != nil {
return nil, fmt.Errorf("failed to close write pipe after start: %w", err)
}
// Wait for the logging binary to be ready
b := make([]byte, 1)
if _, err := r.Read(b); err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to read from logging binary: %w", err)
}
cwg.Wait()
default:
outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
if err != nil {
return nil, err
}
outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0)
if err != nil {
return nil, err
}
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(outw, epollConsole, *p)
outw.Close()
outr.Close()
wg.Done()
}()
cwg.Wait()
}
return epollConsole, nil
}
func (p *linuxPlatform) ShutdownConsole(ctx context.Context, cons console.Console) error {
if p.epoller == nil {
return errors.New("uninitialized epoller")
}
epollConsole, ok := cons.(*console.EpollConsole)
if !ok {
return fmt.Errorf("expected EpollConsole, got %#v", cons)
}
return epollConsole.Shutdown(p.epoller.CloseConsole)
}
func (p *linuxPlatform) Close() error {
return p.epoller.Close()
}
// initialize a single epoll fd to manage our consoles. `initPlatform` should
// only be called once.
func (s *Service) initPlatform() error {
if s.platform != nil {
return nil
}
epoller, err := console.NewEpoller()
if err != nil {
return fmt.Errorf("failed to initialize epoller: %w", err)
}
s.platform = &linuxPlatform{
epoller: epoller,
}
go epoller.Wait()
return nil
}

View File

@ -1,160 +0,0 @@
//go:build !windows && !linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"fmt"
"io"
"net/url"
"os"
"sync"
"syscall"
"github.com/containerd/console"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/process"
"github.com/containerd/fifo"
)
type unixPlatform struct {
}
func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, id, stdin, stdout, stderr string, wg *sync.WaitGroup) (cons console.Console, retErr error) {
var cwg sync.WaitGroup
if stdin != "" {
in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
if err != nil {
return nil, err
}
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(console, in, *p)
}()
}
uri, err := url.Parse(stdout)
if err != nil {
return nil, fmt.Errorf("unable to parse stdout uri: %w", err)
}
switch uri.Scheme {
case "binary":
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
cmd := process.NewBinaryCmd(uri, id, ns)
// In case of unexpected errors during logging binary start, close open pipes
var filesToClose []*os.File
defer func() {
if retErr != nil {
process.CloseFiles(filesToClose...)
}
}()
// Create pipe to be used by logging binary for Stdout
outR, outW, err := os.Pipe()
if err != nil {
return nil, fmt.Errorf("failed to create stdout pipes: %w", err)
}
filesToClose = append(filesToClose, outR)
// Stderr is created for logging binary but unused when terminal is true
serrR, _, err := os.Pipe()
if err != nil {
return nil, fmt.Errorf("failed to create stderr pipes: %w", err)
}
filesToClose = append(filesToClose, serrR)
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
filesToClose = append(filesToClose, r)
cmd.ExtraFiles = append(cmd.ExtraFiles, outR, serrR, w)
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
io.Copy(outW, console)
outW.Close()
wg.Done()
}()
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start logging binary process: %w", err)
}
// Close our side of the pipe after start
if err := w.Close(); err != nil {
return nil, fmt.Errorf("failed to close write pipe after start: %w", err)
}
// Wait for the logging binary to be ready
b := make([]byte, 1)
if _, err := r.Read(b); err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to read from logging binary: %w", err)
}
cwg.Wait()
default:
outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
if err != nil {
return nil, err
}
outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0)
if err != nil {
return nil, err
}
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(outw, console, *p)
outw.Close()
outr.Close()
wg.Done()
}()
cwg.Wait()
}
return console, nil
}
func (p *unixPlatform) ShutdownConsole(ctx context.Context, cons console.Console) error {
return nil
}
func (p *unixPlatform) Close() error {
return nil
}
func (s *Service) initPlatform() error {
s.platform = &unixPlatform{}
return nil
}

View File

@ -1,17 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim

File diff suppressed because it is too large

View File

@ -1,180 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
syntax = "proto3";
package containerd.runtime.linux.shim.v1;
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
import "github.com/containerd/containerd/api/types/mount.proto";
import "github.com/containerd/containerd/api/types/task/task.proto";
option go_package = "github.com/containerd/containerd/runtime/v1/shim/v1;shim";
// Shim service is launched for each container and is responsible for owning the IO
// for the container and its additional processes. The shim is also the parent of
// each container and allows reattaching to the IO and receiving the exit status
// for the container processes.
service Shim {
// State returns shim and task state information.
rpc State(StateRequest) returns (StateResponse);
rpc Create(CreateTaskRequest) returns (CreateTaskResponse);
rpc Start(StartRequest) returns (StartResponse);
rpc Delete(google.protobuf.Empty) returns (DeleteResponse);
rpc DeleteProcess(DeleteProcessRequest) returns (DeleteResponse);
rpc ListPids(ListPidsRequest) returns (ListPidsResponse);
rpc Pause(google.protobuf.Empty) returns (google.protobuf.Empty);
rpc Resume(google.protobuf.Empty) returns (google.protobuf.Empty);
rpc Checkpoint(CheckpointTaskRequest) returns (google.protobuf.Empty);
rpc Kill(KillRequest) returns (google.protobuf.Empty);
rpc Exec(ExecProcessRequest) returns (google.protobuf.Empty);
rpc ResizePty(ResizePtyRequest) returns (google.protobuf.Empty);
rpc CloseIO(CloseIORequest) returns (google.protobuf.Empty);
// ShimInfo returns information about the shim.
rpc ShimInfo(google.protobuf.Empty) returns (ShimInfoResponse);
rpc Update(UpdateTaskRequest) returns (google.protobuf.Empty);
rpc Wait(WaitRequest) returns (WaitResponse);
}
message CreateTaskRequest {
string id = 1;
string bundle = 2;
string runtime = 3;
repeated containerd.types.Mount rootfs = 4;
bool terminal = 5;
string stdin = 6;
string stdout = 7;
string stderr = 8;
string checkpoint = 9;
string parent_checkpoint = 10;
google.protobuf.Any options = 11;
}
message CreateTaskResponse {
uint32 pid = 1;
}
message DeleteResponse {
uint32 pid = 1;
uint32 exit_status = 2;
google.protobuf.Timestamp exited_at = 3;
}
message DeleteProcessRequest {
string id = 1;
}
message ExecProcessRequest {
string id = 1;
bool terminal = 2;
string stdin = 3;
string stdout = 4;
string stderr = 5;
google.protobuf.Any spec = 6;
}
message ExecProcessResponse {
}
message ResizePtyRequest {
string id = 1;
uint32 width = 2;
uint32 height = 3;
}
message StateRequest {
string id = 1;
}
message StateResponse {
string id = 1;
string bundle = 2;
uint32 pid = 3;
containerd.v1.types.Status status = 4;
string stdin = 5;
string stdout = 6;
string stderr = 7;
bool terminal = 8;
uint32 exit_status = 9;
google.protobuf.Timestamp exited_at = 10;
}
message KillRequest {
string id = 1;
uint32 signal = 2;
bool all = 3;
}
message CloseIORequest {
string id = 1;
bool stdin = 2;
}
message ListPidsRequest {
string id = 1;
}
message ListPidsResponse {
repeated containerd.v1.types.ProcessInfo processes = 1;
}
message CheckpointTaskRequest {
string path = 1;
google.protobuf.Any options = 2;
}
message ShimInfoResponse {
uint32 shim_pid = 1;
}
message UpdateTaskRequest {
google.protobuf.Any resources = 1;
}
message StartRequest {
string id = 1;
}
message StartResponse {
string id = 1;
uint32 pid = 2;
}
message WaitRequest {
string id = 1;
}
message WaitResponse {
uint32 exit_status = 1;
google.protobuf.Timestamp exited_at = 2;
}

View File

@ -1,285 +0,0 @@
// Code generated by protoc-gen-go-ttrpc. DO NOT EDIT.
// source: github.com/containerd/containerd/runtime/v1/shim/v1/shim.proto
package shim
import (
context "context"
ttrpc "github.com/containerd/ttrpc"
emptypb "google.golang.org/protobuf/types/known/emptypb"
)
type ShimService interface {
State(context.Context, *StateRequest) (*StateResponse, error)
Create(context.Context, *CreateTaskRequest) (*CreateTaskResponse, error)
Start(context.Context, *StartRequest) (*StartResponse, error)
Delete(context.Context, *emptypb.Empty) (*DeleteResponse, error)
DeleteProcess(context.Context, *DeleteProcessRequest) (*DeleteResponse, error)
ListPids(context.Context, *ListPidsRequest) (*ListPidsResponse, error)
Pause(context.Context, *emptypb.Empty) (*emptypb.Empty, error)
Resume(context.Context, *emptypb.Empty) (*emptypb.Empty, error)
Checkpoint(context.Context, *CheckpointTaskRequest) (*emptypb.Empty, error)
Kill(context.Context, *KillRequest) (*emptypb.Empty, error)
Exec(context.Context, *ExecProcessRequest) (*emptypb.Empty, error)
ResizePty(context.Context, *ResizePtyRequest) (*emptypb.Empty, error)
CloseIO(context.Context, *CloseIORequest) (*emptypb.Empty, error)
ShimInfo(context.Context, *emptypb.Empty) (*ShimInfoResponse, error)
Update(context.Context, *UpdateTaskRequest) (*emptypb.Empty, error)
Wait(context.Context, *WaitRequest) (*WaitResponse, error)
}
func RegisterShimService(srv *ttrpc.Server, svc ShimService) {
srv.RegisterService("containerd.runtime.linux.shim.v1.Shim", &ttrpc.ServiceDesc{
Methods: map[string]ttrpc.Method{
"State": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req StateRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.State(ctx, &req)
},
"Create": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req CreateTaskRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Create(ctx, &req)
},
"Start": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req StartRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Start(ctx, &req)
},
"Delete": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req emptypb.Empty
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Delete(ctx, &req)
},
"DeleteProcess": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req DeleteProcessRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.DeleteProcess(ctx, &req)
},
"ListPids": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req ListPidsRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.ListPids(ctx, &req)
},
"Pause": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req emptypb.Empty
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Pause(ctx, &req)
},
"Resume": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req emptypb.Empty
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Resume(ctx, &req)
},
"Checkpoint": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req CheckpointTaskRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Checkpoint(ctx, &req)
},
"Kill": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req KillRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Kill(ctx, &req)
},
"Exec": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req ExecProcessRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Exec(ctx, &req)
},
"ResizePty": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req ResizePtyRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.ResizePty(ctx, &req)
},
"CloseIO": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req CloseIORequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.CloseIO(ctx, &req)
},
"ShimInfo": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req emptypb.Empty
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.ShimInfo(ctx, &req)
},
"Update": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req UpdateTaskRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Update(ctx, &req)
},
"Wait": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req WaitRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Wait(ctx, &req)
},
},
})
}
type shimClient struct {
client *ttrpc.Client
}
func NewShimClient(client *ttrpc.Client) ShimService {
return &shimClient{
client: client,
}
}
func (c *shimClient) State(ctx context.Context, req *StateRequest) (*StateResponse, error) {
var resp StateResponse
if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "State", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *shimClient) Create(ctx context.Context, req *CreateTaskRequest) (*CreateTaskResponse, error) {
var resp CreateTaskResponse
if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Create", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *shimClient) Start(ctx context.Context, req *StartRequest) (*StartResponse, error) {
var resp StartResponse
if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Start", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *shimClient) Delete(ctx context.Context, req *emptypb.Empty) (*DeleteResponse, error) {
var resp DeleteResponse
if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Delete", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *shimClient) DeleteProcess(ctx context.Context, req *DeleteProcessRequest) (*DeleteResponse, error) {
var resp DeleteResponse
if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "DeleteProcess", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *shimClient) ListPids(ctx context.Context, req *ListPidsRequest) (*ListPidsResponse, error) {
var resp ListPidsResponse
if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "ListPids", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *shimClient) Pause(ctx context.Context, req *emptypb.Empty) (*emptypb.Empty, error) {
var resp emptypb.Empty
if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Pause", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *shimClient) Resume(ctx context.Context, req *emptypb.Empty) (*emptypb.Empty, error) {
var resp emptypb.Empty
if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Resume", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *shimClient) Checkpoint(ctx context.Context, req *CheckpointTaskRequest) (*emptypb.Empty, error) {
var resp emptypb.Empty
if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Checkpoint", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *shimClient) Kill(ctx context.Context, req *KillRequest) (*emptypb.Empty, error) {
var resp emptypb.Empty
if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Kill", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *shimClient) Exec(ctx context.Context, req *ExecProcessRequest) (*emptypb.Empty, error) {
var resp emptypb.Empty
if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Exec", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *shimClient) ResizePty(ctx context.Context, req *ResizePtyRequest) (*emptypb.Empty, error) {
var resp emptypb.Empty
if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "ResizePty", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *shimClient) CloseIO(ctx context.Context, req *CloseIORequest) (*emptypb.Empty, error) {
var resp emptypb.Empty
if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "CloseIO", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *shimClient) ShimInfo(ctx context.Context, req *emptypb.Empty) (*ShimInfoResponse, error) {
var resp ShimInfoResponse
if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "ShimInfo", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *shimClient) Update(ctx context.Context, req *UpdateTaskRequest) (*emptypb.Empty, error) {
var resp emptypb.Empty
if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Update", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (c *shimClient) Wait(ctx context.Context, req *WaitRequest) (*WaitResponse, error) {
var resp WaitResponse
if err := c.client.Call(ctx, "containerd.runtime.linux.shim.v1.Shim", "Wait", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
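A hypothetical end-to-end use of the generated bindings above: dial the shim's unix socket and issue a ShimInfo call over ttrpc (the socket path is illustrative):
package main

import (
	"context"
	"fmt"
	"net"

	shim "github.com/containerd/containerd/runtime/v1/shim/v1"
	"github.com/containerd/ttrpc"
	emptypb "google.golang.org/protobuf/types/known/emptypb"
)

func main() {
	conn, err := net.Dial("unix", "/run/containerd/shim.sock")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer conn.Close()
	cli := shim.NewShimClient(ttrpc.NewClient(conn))
	info, err := cli.ShimInfo(context.Background(), &emptypb.Empty{})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("shim pid:", info.ShimPid)
}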

View File

@ -1,736 +0,0 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
goruntime "runtime"
"sync"
"syscall"
"time"
"github.com/containerd/cgroups/v3/cgroup1"
eventstypes "github.com/containerd/containerd/api/events"
taskAPI "github.com/containerd/containerd/api/runtime/task/v2"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/oom"
oomv1 "github.com/containerd/containerd/pkg/oom/v1"
"github.com/containerd/containerd/pkg/process"
"github.com/containerd/containerd/pkg/schedcore"
"github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/protobuf"
"github.com/containerd/containerd/protobuf/proto"
ptypes "github.com/containerd/containerd/protobuf/types"
"github.com/containerd/containerd/runtime/v2/runc"
"github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/containerd/runtime/v2/shim"
"github.com/containerd/containerd/sys/reaper"
runcC "github.com/containerd/go-runc"
"github.com/containerd/typeurl/v2"
"github.com/sirupsen/logrus"
exec "golang.org/x/sys/execabs"
"golang.org/x/sys/unix"
)
var (
_ = (taskAPI.TaskService)(&service{})
empty = &ptypes.Empty{}
)
// New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
ep, err := oomv1.New(publisher)
if err != nil {
return nil, err
}
go ep.Run(ctx)
s := &service{
id: id,
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
cancel: shutdown,
}
go s.processExits()
runcC.Monitor = reaper.Default
if err := s.initPlatform(); err != nil {
shutdown()
return nil, fmt.Errorf("failed to initialize platform behavior: %w", err)
}
go s.forward(ctx, publisher)
return s, nil
}
// service is the shim implementation of a remote shim served over ttrpc
type service struct {
// mu guards access to container
mu sync.Mutex
// eventSendMu serializes event publication so that exit events cannot be
// sent before the start events for the same process (see Start)
eventSendMu sync.Mutex
context context.Context
// events buffers lifecycle events until forward publishes them to containerd
events chan interface{}
platform stdio.Platform
// ec receives reaped child exits from the reaper subscription taken in New
ec chan runcC.Exit
// ep watches the container's cgroup for OOM events
ep oom.Watcher
id string
container *runc.Container
// cancel shuts the shim down; called from Shutdown
cancel func()
}
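// newCommand builds the exec.Cmd used to re-exec the current binary as the
// long-running shim process for the given id, forwarding the namespace and
// containerd address on the command line.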
func newCommand(ctx context.Context, id, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
self, err := os.Executable()
if err != nil {
return nil, err
}
cwd, err := os.Getwd()
if err != nil {
return nil, err
}
args := []string{
"-namespace", ns,
"-id", id,
"-address", containerdAddress,
}
cmd := exec.Command(self, args...)
cmd.Dir = cwd
cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
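// Setpgid detaches the shim into its own process group so that signals
// delivered to containerd's group do not take the shim down with it.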
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
return cmd, nil
}
func (s *service) StartShim(ctx context.Context, opts shim.StartOpts) (_ string, retErr error) {
cmd, err := newCommand(ctx, opts.ID, opts.Address, opts.TTRPCAddress)
if err != nil {
return "", err
}
address, err := shim.SocketAddress(ctx, opts.Address, opts.ID)
if err != nil {
return "", err
}
socket, err := shim.NewSocket(address)
if err != nil {
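// EADDRINUSE usually means a stale socket left behind by a previous shim
// with the same id; remove it and retry once before giving up.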
if !shim.SocketEaddrinuse(err) {
return "", err
}
if err := shim.RemoveSocket(address); err != nil {
return "", fmt.Errorf("remove already used socket: %w", err)
}
if socket, err = shim.NewSocket(address); err != nil {
return "", err
}
}
defer func() {
if retErr != nil {
socket.Close()
_ = shim.RemoveSocket(address)
}
}()
// write the address so the re-exec'd shim-v2 binary can pick it up if needed
if err := shim.WriteAddress("address", address); err != nil {
return "", err
}
f, err := socket.File()
if err != nil {
return "", err
}
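// Pass the listening socket to the child as its first extra file (fd 3)
// so the new shim process can serve on it immediately.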
cmd.ExtraFiles = append(cmd.ExtraFiles, f)
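// When SCHED_CORE is set, create a core-scheduling cookie for this process
// group; the LockOSThread/UnlockOSThread pair keeps cookie creation and the
// fork below on the same OS thread.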
goruntime.LockOSThread()
if os.Getenv("SCHED_CORE") != "" {
if err := schedcore.Create(schedcore.ProcessGroup); err != nil {
return "", fmt.Errorf("enable sched core support: %w", err)
}
}
if err := cmd.Start(); err != nil {
f.Close()
return "", err
}
goruntime.UnlockOSThread()
defer func() {
if retErr != nil {
cmd.Process.Kill()
}
}()
// wait on the child so it is reaped once it exits
go cmd.Wait()
if err := shim.WritePidFile("shim.pid", cmd.Process.Pid); err != nil {
return "", err
}
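// Runtime options, if any, arrive protobuf-encoded on stdin; when a shim
// cgroup is configured, move the freshly started shim process into it.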
if data, err := io.ReadAll(os.Stdin); err == nil {
if len(data) > 0 {
var pbAny ptypes.Any
if err := proto.Unmarshal(data, &pbAny); err != nil {
return "", err
}
v, err := typeurl.UnmarshalAny(&pbAny)
if err != nil {
return "", err
}
if opts, ok := v.(*options.Options); ok {
if opts.ShimCgroup != "" {
cg, err := cgroup1.Load(cgroup1.StaticPath(opts.ShimCgroup))
if err != nil {
return "", fmt.Errorf("failed to load cgroup %s: %w", opts.ShimCgroup, err)
}
if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil {
return "", fmt.Errorf("failed to join cgroup %s: %w", opts.ShimCgroup, err)
}
}
}
}
}
if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil {
return "", fmt.Errorf("failed to adjust OOM score for shim: %w", err)
}
return address, nil
}
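// Cleanup force-deletes the runc container, unmounts the rootfs under the
// bundle's working directory, and reports a SIGKILL exit status; containerd
// calls it when the shim can no longer clean up after itself.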
func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
if address, err := shim.ReadAddress("address"); err == nil {
if err = shim.RemoveSocket(address); err != nil {
return nil, err
}
}
path, err := os.Getwd()
if err != nil {
return nil, err
}
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
runtime, err := runc.ReadRuntime(path)
if err != nil {
return nil, err
}
opts, err := runc.ReadOptions(path)
if err != nil {
return nil, err
}
root := process.RuncRoot
if opts != nil && opts.Root != "" {
root = opts.Root
}
r := process.NewRunc(root, path, ns, runtime, false)
if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{
Force: true,
}); err != nil {
logrus.WithError(err).Warn("failed to remove runc container")
}
if err := mount.UnmountRecursive(filepath.Join(path, "rootfs"), 0); err != nil {
logrus.WithError(err).Warn("failed to cleanup rootfs mount")
}
pid, err := runcC.ReadPidFile(filepath.Join(path, process.InitPidFile))
if err != nil {
logrus.WithError(err).Warn("failed to read init pid file")
}
return &taskAPI.DeleteResponse{
ExitedAt: protobuf.ToTimestamp(time.Now()),
ExitStatus: 128 + uint32(unix.SIGKILL),
Pid: uint32(pid),
}, nil
}
// Create a new initial process and container with the underlying OCI runtime
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
s.mu.Lock()
defer s.mu.Unlock()
container, err := runc.NewContainer(ctx, s.platform, r)
if err != nil {
return nil, err
}
s.container = container
s.send(&eventstypes.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
Rootfs: r.Rootfs,
IO: &eventstypes.TaskIO{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
Checkpoint: r.Checkpoint,
Pid: uint32(container.Pid()),
})
return &taskAPI.CreateTaskResponse{
Pid: uint32(container.Pid()),
}, nil
}
// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
// hold the send lock so that the start events are sent before any exit events in the error case
s.eventSendMu.Lock()
p, err := container.Start(ctx, r)
if err != nil {
s.eventSendMu.Unlock()
return nil, errdefs.ToGRPC(err)
}
switch r.ExecID {
case "":
if cg, ok := container.Cgroup().(cgroup1.Cgroup); ok {
if err := s.ep.Add(container.ID, cg); err != nil {
logrus.WithError(err).Error("add cg to OOM monitor")
}
} else {
logrus.WithError(errdefs.ErrNotImplemented).Error("add cg to OOM monitor")
}
s.send(&eventstypes.TaskStart{
ContainerID: container.ID,
Pid: uint32(p.Pid()),
})
default:
s.send(&eventstypes.TaskExecStarted{
ContainerID: container.ID,
ExecID: r.ExecID,
Pid: uint32(p.Pid()),
})
}
s.eventSendMu.Unlock()
return &taskAPI.StartResponse{
Pid: uint32(p.Pid()),
}, nil
}
// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
p, err := container.Delete(ctx, r)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
// if we deleted our init task, close the platform and send the task delete event
if r.ExecID == "" {
if s.platform != nil {
s.platform.Close()
}
s.send(&eventstypes.TaskDelete{
ContainerID: container.ID,
Pid: uint32(p.Pid()),
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
})
}
return &taskAPI.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
Pid: uint32(p.Pid()),
}, nil
}
// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
ok, cancel := container.ReserveProcess(r.ExecID)
if !ok {
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
}
process, err := container.Exec(ctx, r)
if err != nil {
cancel()
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskExecAdded{
ContainerID: s.container.ID,
ExecID: process.ID(),
})
return empty, nil
}
// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
if err := container.ResizePty(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
}
st, err := p.Status(ctx)
if err != nil {
return nil, err
}
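// Map the runc status string onto the task API enum; anything unrecognized
// is reported as UNKNOWN.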
status := task.Status_UNKNOWN
switch st {
case "created":
status = task.Status_CREATED
case "running":
status = task.Status_RUNNING
case "stopped":
status = task.Status_STOPPED
case "paused":
status = task.Status_PAUSED
case "pausing":
status = task.Status_PAUSING
}
sio := p.Stdio()
return &taskAPI.StateResponse{
ID: p.ID(),
Bundle: s.container.Bundle,
Pid: uint32(p.Pid()),
Status: status,
Stdin: sio.Stdin,
Stdout: sio.Stdout,
Stderr: sio.Stderr,
Terminal: sio.Terminal,
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
}, nil
}
// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
if err := container.Pause(ctx); err != nil {
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskPaused{
ContainerID: container.ID,
})
return empty, nil
}
// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
if err := container.Resume(ctx); err != nil {
return nil, errdefs.ToGRPC(err)
}
s.send(&eventstypes.TaskResumed{
ContainerID: container.ID,
})
return empty, nil
}
// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
if err := container.Kill(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Pids returns all pids inside the container
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
pids, err := s.getContainerPids(ctx, r.ID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
var processes []*task.ProcessInfo
for _, pid := range pids {
pInfo := task.ProcessInfo{
Pid: pid,
}
for _, p := range container.ExecdProcesses() {
if p.Pid() == int(pid) {
d := &options.ProcessDetails{
ExecID: p.ID(),
}
a, err := protobuf.MarshalAnyToProto(d)
if err != nil {
return nil, fmt.Errorf("failed to marshal process %d info: %w", pid, err)
}
pInfo.Info = a
break
}
}
processes = append(processes, &pInfo)
}
return &taskAPI.PidsResponse{
Processes: processes,
}, nil
}
// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
if err := container.CloseIO(ctx, r); err != nil {
return nil, err
}
return empty, nil
}
// Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
if err := container.Checkpoint(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
if err := container.Update(ctx, r); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
p, err := container.Process(r.ExecID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
p.Wait()
return &taskAPI.WaitResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
}, nil
}
// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
var pid int
if s.container != nil {
pid = s.container.Pid()
}
return &taskAPI.ConnectResponse{
ShimPid: uint32(os.Getpid()),
TaskPid: uint32(pid),
}, nil
}
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
// make sure that temporary resources have been cleaned up before the
// service is shut down
s.cancel()
close(s.events)
return empty, nil
}
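// Stats returns cgroup v1 metrics for the container marshaled as an Any;
// cgroup v2 is rejected with ErrNotImplemented.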
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
cgx := container.Cgroup()
if cgx == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
}
cg, ok := cgx.(cgroup1.Cgroup)
if !ok {
return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "cgroup v2 not implemented for Stats")
}
if cg == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
}
stats, err := cg.Stat(cgroup1.IgnoreNotExist)
if err != nil {
return nil, err
}
data, err := typeurl.MarshalAny(stats)
if err != nil {
return nil, err
}
return &taskAPI.StatsResponse{
Stats: protobuf.FromAny(data),
}, nil
}
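// processExits consumes exits from the ec channel and reconciles each one
// against the container's processes.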
func (s *service) processExits() {
for e := range s.ec {
s.checkProcesses(e)
}
}
func (s *service) send(evt interface{}) {
s.events <- evt
}
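// sendL publishes under eventSendMu so that exit events cannot overtake
// the start events emitted while the lock is held in Start.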
func (s *service) sendL(evt interface{}) {
s.eventSendMu.Lock()
s.events <- evt
s.eventSendMu.Unlock()
}
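// checkProcesses matches a reaped exit against the container's processes;
// if the exiting process is the init and the bundle requests it, the init's
// remaining children are killed before the TaskExit event is published.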
func (s *service) checkProcesses(e runcC.Exit) {
container, err := s.getContainer()
if err != nil {
return
}
for _, p := range container.All() {
if p.Pid() == e.Pid {
if runc.ShouldKillAllOnExit(s.context, container.Bundle) {
if ip, ok := p.(*process.Init); ok {
// Ensure all children are killed
if err := ip.KillAll(s.context); err != nil {
logrus.WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children")
}
}
}
p.SetExited(e.Status)
s.sendL(&eventstypes.TaskExit{
ContainerID: container.ID,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
})
return
}
}
}
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
p, err := s.container.Process("")
if err != nil {
return nil, errdefs.ToGRPC(err)
}
ps, err := p.(*process.Init).Runtime().Ps(ctx, id)
if err != nil {
return nil, err
}
pids := make([]uint32, 0, len(ps))
for _, pid := range ps {
pids = append(pids, uint32(pid))
}
return pids, nil
}
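// forward republishes buffered events to containerd with a fresh 5-second
// timeout per publish and closes the publisher once the events channel is
// closed by Shutdown.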
func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
ns, _ := namespaces.Namespace(ctx)
ctx = namespaces.WithNamespace(context.Background(), ns)
for e := range s.events {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
err := publisher.Publish(ctx, runc.GetTopic(e), e)
cancel()
if err != nil {
logrus.WithError(err).Error("post event")
}
}
publisher.Close()
}
func (s *service) getContainer() (*runc.Container, error) {
s.mu.Lock()
container := s.container
s.mu.Unlock()
if container == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created")
}
return container, nil
}
func (s *service) getProcess(execID string) (process.Process, error) {
container, err := s.getContainer()
if err != nil {
return nil, err
}
p, err := container.Process(execID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return p, nil
}
// initialize a single epoll fd to manage our consoles. `initPlatform` should
// only be called once.
func (s *service) initPlatform() error {
if s.platform != nil {
return nil
}
p, err := runc.NewPlatform()
if err != nil {
return err
}
s.platform = p
return nil
}