linux -> runtime/linux

Signed-off-by: Evan Hazlett <ejhazlett@gmail.com>
This commit is contained in:
Evan Hazlett
2018-05-23 08:03:02 -04:00
parent 8d768689fa
commit cae94b930d
42 changed files with 182 additions and 148 deletions

155
runtime/linux/bundle.go Normal file
View File

@@ -0,0 +1,155 @@
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package linux
import (
"context"
"io/ioutil"
"os"
"path/filepath"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/runtime/linux/runctypes"
"github.com/containerd/containerd/runtime/linux/shim"
"github.com/containerd/containerd/runtime/linux/shim/client"
"github.com/pkg/errors"
)
// loadBundle loads an existing bundle from disk
func loadBundle(id, path, workdir string) *bundle {
return &bundle{
id: id,
path: path,
workDir: workdir,
}
}
// newBundle creates a new bundle on disk at the provided path for the given id
func newBundle(id, path, workDir string, spec []byte) (b *bundle, err error) {
if err := os.MkdirAll(path, 0711); err != nil {
return nil, err
}
path = filepath.Join(path, id)
defer func() {
if err != nil {
os.RemoveAll(path)
}
}()
workDir = filepath.Join(workDir, id)
if err := os.MkdirAll(workDir, 0711); err != nil {
return nil, err
}
defer func() {
if err != nil {
os.RemoveAll(workDir)
}
}()
if err := os.Mkdir(path, 0711); err != nil {
return nil, err
}
if err := os.Mkdir(filepath.Join(path, "rootfs"), 0711); err != nil {
return nil, err
}
err = ioutil.WriteFile(filepath.Join(path, configFilename), spec, 0666)
return &bundle{
id: id,
path: path,
workDir: workDir,
}, err
}
type bundle struct {
id string
path string
workDir string
}
// ShimOpt specifies shim options for initialization and connection
type ShimOpt func(*bundle, string, *runctypes.RuncOptions) (shim.Config, client.Opt)
// ShimRemote is a ShimOpt for connecting and starting a remote shim
func ShimRemote(c *Config, daemonAddress, cgroup string, exitHandler func()) ShimOpt {
return func(b *bundle, ns string, ropts *runctypes.RuncOptions) (shim.Config, client.Opt) {
config := b.shimConfig(ns, c, ropts)
return config,
client.WithStart(c.Shim, b.shimAddress(ns), daemonAddress, cgroup, c.ShimDebug, exitHandler)
}
}
// ShimLocal is a ShimOpt for using an in process shim implementation
func ShimLocal(c *Config, exchange *exchange.Exchange) ShimOpt {
return func(b *bundle, ns string, ropts *runctypes.RuncOptions) (shim.Config, client.Opt) {
return b.shimConfig(ns, c, ropts), client.WithLocal(exchange)
}
}
// ShimConnect is a ShimOpt for connecting to an existing remote shim
func ShimConnect(c *Config, onClose func()) ShimOpt {
return func(b *bundle, ns string, ropts *runctypes.RuncOptions) (shim.Config, client.Opt) {
return b.shimConfig(ns, c, ropts), client.WithConnect(b.shimAddress(ns), onClose)
}
}
// NewShimClient connects to the shim managing the bundle and tasks creating it if needed
func (b *bundle) NewShimClient(ctx context.Context, namespace string, getClientOpts ShimOpt, runcOpts *runctypes.RuncOptions) (*client.Client, error) {
cfg, opt := getClientOpts(b, namespace, runcOpts)
return client.New(ctx, cfg, opt)
}
// Delete deletes the bundle from disk
func (b *bundle) Delete() error {
err := os.RemoveAll(b.path)
if err == nil {
return os.RemoveAll(b.workDir)
}
// error removing the bundle path; still attempt removing work dir
err2 := os.RemoveAll(b.workDir)
if err2 == nil {
return err
}
return errors.Wrapf(err, "Failed to remove both bundle and workdir locations: %v", err2)
}
func (b *bundle) shimAddress(namespace string) string {
return filepath.Join(string(filepath.Separator), "containerd-shim", namespace, b.id, "shim.sock")
}
func (b *bundle) shimConfig(namespace string, c *Config, runcOptions *runctypes.RuncOptions) shim.Config {
var (
criuPath string
runtimeRoot = c.RuntimeRoot
systemdCgroup bool
)
if runcOptions != nil {
criuPath = runcOptions.CriuPath
systemdCgroup = runcOptions.SystemdCgroup
if runcOptions.RuntimeRoot != "" {
runtimeRoot = runcOptions.RuntimeRoot
}
}
return shim.Config{
Path: b.path,
WorkDir: b.workDir,
Namespace: namespace,
Criu: criuPath,
RuntimeRoot: runtimeRoot,
SystemdCgroup: systemdCgroup,
}
}

View File

@@ -0,0 +1,70 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proc
import (
"context"
"github.com/containerd/console"
google_protobuf "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
)
type deletedState struct {
}
func (s *deletedState) Pause(ctx context.Context) error {
return errors.Errorf("cannot pause a deleted process")
}
func (s *deletedState) Resume(ctx context.Context) error {
return errors.Errorf("cannot resume a deleted process")
}
func (s *deletedState) Update(context context.Context, r *google_protobuf.Any) error {
return errors.Errorf("cannot update a deleted process")
}
func (s *deletedState) Checkpoint(ctx context.Context, r *CheckpointConfig) error {
return errors.Errorf("cannot checkpoint a deleted process")
}
func (s *deletedState) Resize(ws console.WinSize) error {
return errors.Errorf("cannot resize a deleted process")
}
func (s *deletedState) Start(ctx context.Context) error {
return errors.Errorf("cannot start a deleted process")
}
func (s *deletedState) Delete(ctx context.Context) error {
return errors.Errorf("cannot delete a deleted process")
}
func (s *deletedState) Kill(ctx context.Context, sig uint32, all bool) error {
return errors.Errorf("cannot kill a deleted process")
}
func (s *deletedState) SetExited(status int) {
// no op
}
func (s *deletedState) Exec(ctx context.Context, path string, r *ExecConfig) (Process, error) {
return nil, errors.Errorf("cannot exec in a deleted state")
}

225
runtime/linux/proc/exec.go Normal file
View File

@@ -0,0 +1,225 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proc
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"syscall"
"time"
"golang.org/x/sys/unix"
"github.com/containerd/console"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
type execProcess struct {
wg sync.WaitGroup
State
mu sync.Mutex
id string
console console.Console
io runc.IO
status int
exited time.Time
pid int
closers []io.Closer
stdin io.Closer
stdio Stdio
path string
spec specs.Process
parent *Init
waitBlock chan struct{}
}
func (e *execProcess) Wait() {
<-e.waitBlock
}
func (e *execProcess) ID() string {
return e.id
}
func (e *execProcess) Pid() int {
e.mu.Lock()
defer e.mu.Unlock()
return e.pid
}
func (e *execProcess) ExitStatus() int {
e.mu.Lock()
defer e.mu.Unlock()
return e.status
}
func (e *execProcess) ExitedAt() time.Time {
e.mu.Lock()
defer e.mu.Unlock()
return e.exited
}
func (e *execProcess) setExited(status int) {
e.status = status
e.exited = time.Now()
e.parent.platform.ShutdownConsole(context.Background(), e.console)
close(e.waitBlock)
}
func (e *execProcess) delete(ctx context.Context) error {
e.wg.Wait()
if e.io != nil {
for _, c := range e.closers {
c.Close()
}
e.io.Close()
}
pidfile := filepath.Join(e.path, fmt.Sprintf("%s.pid", e.id))
// silently ignore error
os.Remove(pidfile)
return nil
}
func (e *execProcess) resize(ws console.WinSize) error {
if e.console == nil {
return nil
}
return e.console.Resize(ws)
}
func (e *execProcess) kill(ctx context.Context, sig uint32, _ bool) error {
pid := e.pid
if pid != 0 {
if err := unix.Kill(pid, syscall.Signal(sig)); err != nil {
return errors.Wrapf(checkKillError(err), "exec kill error")
}
}
return nil
}
func (e *execProcess) Stdin() io.Closer {
return e.stdin
}
func (e *execProcess) Stdio() Stdio {
return e.stdio
}
func (e *execProcess) start(ctx context.Context) (err error) {
var (
socket *runc.Socket
pidfile = filepath.Join(e.path, fmt.Sprintf("%s.pid", e.id))
)
if e.stdio.Terminal {
if socket, err = runc.NewTempConsoleSocket(); err != nil {
return errors.Wrap(err, "failed to create runc console socket")
}
defer socket.Close()
} else if e.stdio.IsNull() {
if e.io, err = runc.NewNullIO(); err != nil {
return errors.Wrap(err, "creating new NULL IO")
}
} else {
if e.io, err = runc.NewPipeIO(e.parent.IoUID, e.parent.IoGID); err != nil {
return errors.Wrap(err, "failed to create runc io pipes")
}
}
opts := &runc.ExecOpts{
PidFile: pidfile,
IO: e.io,
Detach: true,
}
if socket != nil {
opts.ConsoleSocket = socket
}
if err := e.parent.runtime.Exec(ctx, e.parent.id, e.spec, opts); err != nil {
close(e.waitBlock)
return e.parent.runtimeError(err, "OCI runtime exec failed")
}
if e.stdio.Stdin != "" {
fifoCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
sc, err := fifo.OpenFifo(fifoCtx, e.stdio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return errors.Wrapf(err, "failed to open stdin fifo %s", e.stdio.Stdin)
}
e.closers = append(e.closers, sc)
e.stdin = sc
}
var copyWaitGroup sync.WaitGroup
if socket != nil {
console, err := socket.ReceiveMaster()
if err != nil {
return errors.Wrap(err, "failed to retrieve console master")
}
if e.console, err = e.parent.platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, &copyWaitGroup); err != nil {
return errors.Wrap(err, "failed to start console copy")
}
} else if !e.stdio.IsNull() {
fifoCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
if err := copyPipes(fifoCtx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, &copyWaitGroup); err != nil {
return errors.Wrap(err, "failed to start io pipe copy")
}
}
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(opts.PidFile)
if err != nil {
return errors.Wrap(err, "failed to retrieve OCI runtime exec pid")
}
e.pid = pid
return nil
}
func (e *execProcess) Status(ctx context.Context) (string, error) {
s, err := e.parent.Status(ctx)
if err != nil {
return "", err
}
// if the container as a whole is in the pausing/paused state, so are all
// other processes inside the container, use container state here
switch s {
case "paused", "pausing":
return s, nil
}
e.mu.Lock()
defer e.mu.Unlock()
// if we don't have a pid then the exec process has just been created
if e.pid == 0 {
return "created", nil
}
// if we have a pid and it can be signaled, the process is running
if err := unix.Kill(e.pid, 0); err == nil {
return "running", nil
}
// else if we have a pid but it can nolonger be signaled, it has stopped
return "stopped", nil
}

View File

@@ -0,0 +1,188 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proc
import (
"context"
"github.com/containerd/console"
"github.com/pkg/errors"
)
type execCreatedState struct {
p *execProcess
}
func (s *execCreatedState) transition(name string) error {
switch name {
case "running":
s.p.State = &execRunningState{p: s.p}
case "stopped":
s.p.State = &execStoppedState{p: s.p}
case "deleted":
s.p.State = &deletedState{}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *execCreatedState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.resize(ws)
}
func (s *execCreatedState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.start(ctx); err != nil {
return err
}
return s.transition("running")
}
func (s *execCreatedState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.delete(ctx); err != nil {
return err
}
return s.transition("deleted")
}
func (s *execCreatedState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
}
func (s *execCreatedState) SetExited(status int) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
s.p.setExited(status)
if err := s.transition("stopped"); err != nil {
panic(err)
}
}
type execRunningState struct {
p *execProcess
}
func (s *execRunningState) transition(name string) error {
switch name {
case "stopped":
s.p.State = &execStoppedState{p: s.p}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *execRunningState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.resize(ws)
}
func (s *execRunningState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot start a running process")
}
func (s *execRunningState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot delete a running process")
}
func (s *execRunningState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
}
func (s *execRunningState) SetExited(status int) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
s.p.setExited(status)
if err := s.transition("stopped"); err != nil {
panic(err)
}
}
type execStoppedState struct {
p *execProcess
}
func (s *execStoppedState) transition(name string) error {
switch name {
case "deleted":
s.p.State = &deletedState{}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *execStoppedState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot resize a stopped container")
}
func (s *execStoppedState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot start a stopped process")
}
func (s *execStoppedState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.delete(ctx); err != nil {
return err
}
return s.transition("deleted")
}
func (s *execStoppedState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
}
func (s *execStoppedState) SetExited(status int) {
// no op
}

451
runtime/linux/proc/init.go Normal file
View File

@@ -0,0 +1,451 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proc
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
"github.com/containerd/console"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/runtime/linux/runctypes"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
"github.com/containerd/typeurl"
google_protobuf "github.com/gogo/protobuf/types"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
// InitPidFile name of the file that contains the init pid
const InitPidFile = "init.pid"
// Init represents an initial process for a container
type Init struct {
wg sync.WaitGroup
initState
// mu is used to ensure that `Start()` and `Exited()` calls return in
// the right order when invoked in separate go routines.
// This is the case within the shim implementation as it makes use of
// the reaper interface.
mu sync.Mutex
waitBlock chan struct{}
workDir string
id string
bundle string
console console.Console
platform Platform
io runc.IO
runtime *runc.Runc
status int
exited time.Time
pid int
closers []io.Closer
stdin io.Closer
stdio Stdio
rootfs string
IoUID int
IoGID int
}
// NewRunc returns a new runc instance for a process
func NewRunc(root, path, namespace, runtime, criu string, systemd bool) *runc.Runc {
if root == "" {
root = RuncRoot
}
return &runc.Runc{
Command: runtime,
Log: filepath.Join(path, "log.json"),
LogFormat: runc.JSON,
PdeathSignal: syscall.SIGKILL,
Root: filepath.Join(root, namespace),
Criu: criu,
SystemdCgroup: systemd,
}
}
// New returns a new init process
func New(context context.Context, path, workDir, runtimeRoot, namespace, criu string, systemdCgroup bool, platform Platform, r *CreateConfig) (*Init, error) {
var success bool
var options runctypes.CreateOptions
if r.Options != nil {
v, err := typeurl.UnmarshalAny(r.Options)
if err != nil {
return nil, err
}
options = *v.(*runctypes.CreateOptions)
}
rootfs := filepath.Join(path, "rootfs")
// count the number of successful mounts so we can undo
// what was actually done rather than what should have been
// done.
defer func() {
if success {
return
}
if err2 := mount.UnmountAll(rootfs, 0); err2 != nil {
log.G(context).WithError(err2).Warn("Failed to cleanup rootfs mount")
}
}()
for _, rm := range r.Rootfs {
m := &mount.Mount{
Type: rm.Type,
Source: rm.Source,
Options: rm.Options,
}
if err := m.Mount(rootfs); err != nil {
return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
}
}
runtime := NewRunc(runtimeRoot, path, namespace, r.Runtime, criu, systemdCgroup)
p := &Init{
id: r.ID,
bundle: r.Bundle,
runtime: runtime,
platform: platform,
stdio: Stdio{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
rootfs: rootfs,
workDir: workDir,
status: 0,
waitBlock: make(chan struct{}),
IoUID: int(options.IoUid),
IoGID: int(options.IoGid),
}
p.initState = &createdState{p: p}
var (
err error
socket *runc.Socket
)
if r.Terminal {
if socket, err = runc.NewTempConsoleSocket(); err != nil {
return nil, errors.Wrap(err, "failed to create OCI runtime console socket")
}
defer socket.Close()
} else if hasNoIO(r) {
if p.io, err = runc.NewNullIO(); err != nil {
return nil, errors.Wrap(err, "creating new NULL IO")
}
} else {
if p.io, err = runc.NewPipeIO(int(options.IoUid), int(options.IoGid)); err != nil {
return nil, errors.Wrap(err, "failed to create OCI runtime io pipes")
}
}
pidFile := filepath.Join(path, InitPidFile)
if r.Checkpoint != "" {
opts := &runc.RestoreOpts{
CheckpointOpts: runc.CheckpointOpts{
ImagePath: r.Checkpoint,
WorkDir: p.workDir,
ParentPath: r.ParentCheckpoint,
},
PidFile: pidFile,
IO: p.io,
NoPivot: options.NoPivotRoot,
Detach: true,
NoSubreaper: true,
}
p.initState = &createdCheckpointState{
p: p,
opts: opts,
}
success = true
return p, nil
}
opts := &runc.CreateOpts{
PidFile: pidFile,
IO: p.io,
NoPivot: options.NoPivotRoot,
NoNewKeyring: options.NoNewKeyring,
}
if socket != nil {
opts.ConsoleSocket = socket
}
if err := p.runtime.Create(context, r.ID, r.Bundle, opts); err != nil {
return nil, p.runtimeError(err, "OCI runtime create failed")
}
if r.Stdin != "" {
sc, err := fifo.OpenFifo(context, r.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return nil, errors.Wrapf(err, "failed to open stdin fifo %s", r.Stdin)
}
p.stdin = sc
p.closers = append(p.closers, sc)
}
var copyWaitGroup sync.WaitGroup
if socket != nil {
console, err := socket.ReceiveMaster()
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve console master")
}
console, err = platform.CopyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup)
if err != nil {
return nil, errors.Wrap(err, "failed to start console copy")
}
p.console = console
} else if !hasNoIO(r) {
if err := copyPipes(context, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup); err != nil {
return nil, errors.Wrap(err, "failed to start io pipe copy")
}
}
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(pidFile)
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve OCI runtime container pid")
}
p.pid = pid
success = true
return p, nil
}
// Wait for the process to exit
func (p *Init) Wait() {
<-p.waitBlock
}
// ID of the process
func (p *Init) ID() string {
return p.id
}
// Pid of the process
func (p *Init) Pid() int {
return p.pid
}
// ExitStatus of the process
func (p *Init) ExitStatus() int {
p.mu.Lock()
defer p.mu.Unlock()
return p.status
}
// ExitedAt at time when the process exited
func (p *Init) ExitedAt() time.Time {
p.mu.Lock()
defer p.mu.Unlock()
return p.exited
}
// Status of the process
func (p *Init) Status(ctx context.Context) (string, error) {
p.mu.Lock()
defer p.mu.Unlock()
c, err := p.runtime.State(ctx, p.id)
if err != nil {
if os.IsNotExist(err) {
return "stopped", nil
}
return "", p.runtimeError(err, "OCI runtime state failed")
}
return c.Status, nil
}
func (p *Init) start(context context.Context) error {
err := p.runtime.Start(context, p.id)
return p.runtimeError(err, "OCI runtime start failed")
}
func (p *Init) setExited(status int) {
p.exited = time.Now()
p.status = status
p.platform.ShutdownConsole(context.Background(), p.console)
close(p.waitBlock)
}
func (p *Init) delete(context context.Context) error {
p.KillAll(context)
p.wg.Wait()
err := p.runtime.Delete(context, p.id, nil)
// ignore errors if a runtime has already deleted the process
// but we still hold metadata and pipes
//
// this is common during a checkpoint, runc will delete the container state
// after a checkpoint and the container will no longer exist within runc
if err != nil {
if strings.Contains(err.Error(), "does not exist") {
err = nil
} else {
err = p.runtimeError(err, "failed to delete task")
}
}
if p.io != nil {
for _, c := range p.closers {
c.Close()
}
p.io.Close()
}
if err2 := mount.UnmountAll(p.rootfs, 0); err2 != nil {
log.G(context).WithError(err2).Warn("failed to cleanup rootfs mount")
if err == nil {
err = errors.Wrap(err2, "failed rootfs umount")
}
}
return err
}
func (p *Init) resize(ws console.WinSize) error {
if p.console == nil {
return nil
}
return p.console.Resize(ws)
}
func (p *Init) pause(context context.Context) error {
err := p.runtime.Pause(context, p.id)
return p.runtimeError(err, "OCI runtime pause failed")
}
func (p *Init) resume(context context.Context) error {
err := p.runtime.Resume(context, p.id)
return p.runtimeError(err, "OCI runtime resume failed")
}
func (p *Init) kill(context context.Context, signal uint32, all bool) error {
err := p.runtime.Kill(context, p.id, int(signal), &runc.KillOpts{
All: all,
})
return checkKillError(err)
}
// KillAll processes belonging to the init process
func (p *Init) KillAll(context context.Context) error {
err := p.runtime.Kill(context, p.id, int(syscall.SIGKILL), &runc.KillOpts{
All: true,
})
return p.runtimeError(err, "OCI runtime killall failed")
}
// Stdin of the process
func (p *Init) Stdin() io.Closer {
return p.stdin
}
// Runtime returns the OCI runtime configured for the init process
func (p *Init) Runtime() *runc.Runc {
return p.runtime
}
// exec returns a new exec'd process
func (p *Init) exec(context context.Context, path string, r *ExecConfig) (Process, error) {
// process exec request
var spec specs.Process
if err := json.Unmarshal(r.Spec.Value, &spec); err != nil {
return nil, err
}
spec.Terminal = r.Terminal
e := &execProcess{
id: r.ID,
path: path,
parent: p,
spec: spec,
stdio: Stdio{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Terminal: r.Terminal,
},
waitBlock: make(chan struct{}),
}
e.State = &execCreatedState{p: e}
return e, nil
}
func (p *Init) checkpoint(context context.Context, r *CheckpointConfig) error {
var options runctypes.CheckpointOptions
if r.Options != nil {
v, err := typeurl.UnmarshalAny(r.Options)
if err != nil {
return err
}
options = *v.(*runctypes.CheckpointOptions)
}
var actions []runc.CheckpointAction
if !options.Exit {
actions = append(actions, runc.LeaveRunning)
}
work := filepath.Join(p.workDir, "criu-work")
defer os.RemoveAll(work)
if err := p.runtime.Checkpoint(context, p.id, &runc.CheckpointOpts{
WorkDir: work,
ImagePath: r.Path,
AllowOpenTCP: options.OpenTcp,
AllowExternalUnixSockets: options.ExternalUnixSockets,
AllowTerminal: options.Terminal,
FileLocks: options.FileLocks,
EmptyNamespaces: options.EmptyNamespaces,
}, actions...); err != nil {
dumpLog := filepath.Join(p.bundle, "criu-dump.log")
if cerr := copyFile(dumpLog, filepath.Join(work, "dump.log")); cerr != nil {
log.G(context).Error(err)
}
return fmt.Errorf("%s path= %s", criuError(err), dumpLog)
}
return nil
}
func (p *Init) update(context context.Context, r *google_protobuf.Any) error {
var resources specs.LinuxResources
if err := json.Unmarshal(r.Value, &resources); err != nil {
return err
}
return p.runtime.Update(context, p.id, &resources)
}
// Stdio of the process
func (p *Init) Stdio() Stdio {
return p.stdio
}
func (p *Init) runtimeError(rErr error, msg string) error {
if rErr == nil {
return nil
}
rMsg, err := getLastRuntimeError(p.runtime)
switch {
case err != nil:
return errors.Wrapf(rErr, "%s: %s (%s)", msg, "unable to retrieve OCI runtime error", err.Error())
case rMsg == "":
return errors.Wrap(rErr, msg)
default:
return errors.Errorf("%s: %s", msg, rMsg)
}
}

View File

@@ -0,0 +1,544 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proc
import (
"context"
"sync"
"syscall"
"github.com/containerd/console"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
google_protobuf "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
)
type initState interface {
State
Pause(context.Context) error
Resume(context.Context) error
Update(context.Context, *google_protobuf.Any) error
Checkpoint(context.Context, *CheckpointConfig) error
Exec(context.Context, string, *ExecConfig) (Process, error)
}
type createdState struct {
p *Init
}
func (s *createdState) transition(name string) error {
switch name {
case "running":
s.p.initState = &runningState{p: s.p}
case "stopped":
s.p.initState = &stoppedState{p: s.p}
case "deleted":
s.p.initState = &deletedState{}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *createdState) Pause(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot pause task in created state")
}
func (s *createdState) Resume(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot resume task in created state")
}
func (s *createdState) Update(context context.Context, r *google_protobuf.Any) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.update(context, r)
}
func (s *createdState) Checkpoint(context context.Context, r *CheckpointConfig) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot checkpoint a task in created state")
}
func (s *createdState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.resize(ws)
}
func (s *createdState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.start(ctx); err != nil {
return err
}
return s.transition("running")
}
func (s *createdState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.delete(ctx); err != nil {
return err
}
return s.transition("deleted")
}
func (s *createdState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
}
func (s *createdState) SetExited(status int) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
s.p.setExited(status)
if err := s.transition("stopped"); err != nil {
panic(err)
}
}
func (s *createdState) Exec(ctx context.Context, path string, r *ExecConfig) (Process, error) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.exec(ctx, path, r)
}
type createdCheckpointState struct {
p *Init
opts *runc.RestoreOpts
}
func (s *createdCheckpointState) transition(name string) error {
switch name {
case "running":
s.p.initState = &runningState{p: s.p}
case "stopped":
s.p.initState = &stoppedState{p: s.p}
case "deleted":
s.p.initState = &deletedState{}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *createdCheckpointState) Pause(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot pause task in created state")
}
func (s *createdCheckpointState) Resume(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot resume task in created state")
}
func (s *createdCheckpointState) Update(context context.Context, r *google_protobuf.Any) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.update(context, r)
}
func (s *createdCheckpointState) Checkpoint(context context.Context, r *CheckpointConfig) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot checkpoint a task in created state")
}
func (s *createdCheckpointState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.resize(ws)
}
func (s *createdCheckpointState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
p := s.p
sio := p.stdio
var (
err error
socket *runc.Socket
)
if sio.Terminal {
if socket, err = runc.NewTempConsoleSocket(); err != nil {
return errors.Wrap(err, "failed to create OCI runtime console socket")
}
defer socket.Close()
s.opts.ConsoleSocket = socket
}
if _, err := s.p.runtime.Restore(ctx, p.id, p.bundle, s.opts); err != nil {
return p.runtimeError(err, "OCI runtime restore failed")
}
if sio.Stdin != "" {
sc, err := fifo.OpenFifo(ctx, sio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return errors.Wrapf(err, "failed to open stdin fifo %s", sio.Stdin)
}
p.stdin = sc
p.closers = append(p.closers, sc)
}
var copyWaitGroup sync.WaitGroup
if socket != nil {
console, err := socket.ReceiveMaster()
if err != nil {
return errors.Wrap(err, "failed to retrieve console master")
}
console, err = p.platform.CopyConsole(ctx, console, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg, &copyWaitGroup)
if err != nil {
return errors.Wrap(err, "failed to start console copy")
}
p.console = console
} else if !sio.IsNull() {
if err := copyPipes(ctx, p.io, sio.Stdin, sio.Stdout, sio.Stderr, &p.wg, &copyWaitGroup); err != nil {
return errors.Wrap(err, "failed to start io pipe copy")
}
}
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(s.opts.PidFile)
if err != nil {
return errors.Wrap(err, "failed to retrieve OCI runtime container pid")
}
p.pid = pid
return s.transition("running")
}
func (s *createdCheckpointState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.delete(ctx); err != nil {
return err
}
return s.transition("deleted")
}
func (s *createdCheckpointState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
}
func (s *createdCheckpointState) SetExited(status int) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
s.p.setExited(status)
if err := s.transition("stopped"); err != nil {
panic(err)
}
}
func (s *createdCheckpointState) Exec(ctx context.Context, path string, r *ExecConfig) (Process, error) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return nil, errors.Errorf("cannot exec in a created state")
}
type runningState struct {
p *Init
}
func (s *runningState) transition(name string) error {
switch name {
case "stopped":
s.p.initState = &stoppedState{p: s.p}
case "paused":
s.p.initState = &pausedState{p: s.p}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *runningState) Pause(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.pause(ctx); err != nil {
return err
}
return s.transition("paused")
}
func (s *runningState) Resume(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot resume a running process")
}
func (s *runningState) Update(context context.Context, r *google_protobuf.Any) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.update(context, r)
}
func (s *runningState) Checkpoint(ctx context.Context, r *CheckpointConfig) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.checkpoint(ctx, r)
}
func (s *runningState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.resize(ws)
}
func (s *runningState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot start a running process")
}
func (s *runningState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot delete a running process")
}
func (s *runningState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
}
func (s *runningState) SetExited(status int) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
s.p.setExited(status)
if err := s.transition("stopped"); err != nil {
panic(err)
}
}
func (s *runningState) Exec(ctx context.Context, path string, r *ExecConfig) (Process, error) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.exec(ctx, path, r)
}
type pausedState struct {
p *Init
}
func (s *pausedState) transition(name string) error {
switch name {
case "running":
s.p.initState = &runningState{p: s.p}
case "stopped":
s.p.initState = &stoppedState{p: s.p}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *pausedState) Pause(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot pause a paused container")
}
func (s *pausedState) Resume(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.resume(ctx); err != nil {
return err
}
return s.transition("running")
}
func (s *pausedState) Update(context context.Context, r *google_protobuf.Any) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.update(context, r)
}
func (s *pausedState) Checkpoint(ctx context.Context, r *CheckpointConfig) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.checkpoint(ctx, r)
}
func (s *pausedState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.resize(ws)
}
func (s *pausedState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot start a paused process")
}
func (s *pausedState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot delete a paused process")
}
func (s *pausedState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
}
func (s *pausedState) SetExited(status int) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
s.p.setExited(status)
if err := s.transition("stopped"); err != nil {
panic(err)
}
}
func (s *pausedState) Exec(ctx context.Context, path string, r *ExecConfig) (Process, error) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return nil, errors.Errorf("cannot exec in a paused state")
}
type stoppedState struct {
p *Init
}
func (s *stoppedState) transition(name string) error {
switch name {
case "deleted":
s.p.initState = &deletedState{}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *stoppedState) Pause(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot pause a stopped container")
}
func (s *stoppedState) Resume(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot resume a stopped container")
}
func (s *stoppedState) Update(context context.Context, r *google_protobuf.Any) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot update a stopped container")
}
func (s *stoppedState) Checkpoint(ctx context.Context, r *CheckpointConfig) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot checkpoint a stopped container")
}
func (s *stoppedState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot resize a stopped container")
}
func (s *stoppedState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot start a stopped process")
}
func (s *stoppedState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.delete(ctx); err != nil {
return err
}
return s.transition("deleted")
}
func (s *stoppedState) Kill(ctx context.Context, sig uint32, all bool) error {
return errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", s.p.id)
}
func (s *stoppedState) SetExited(status int) {
// no op
}
func (s *stoppedState) Exec(ctx context.Context, path string, r *ExecConfig) (Process, error) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return nil, errors.Errorf("cannot exec in a stopped state")
}

146
runtime/linux/proc/io.go Normal file
View File

@@ -0,0 +1,146 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proc
import (
"context"
"fmt"
"io"
"os"
"sync"
"syscall"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
)
var bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
return &buffer
},
}
func copyPipes(ctx context.Context, rio runc.IO, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) error {
var sameFile io.WriteCloser
for _, i := range []struct {
name string
dest func(wc io.WriteCloser, rc io.Closer)
}{
{
name: stdout,
dest: func(wc io.WriteCloser, rc io.Closer) {
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(wc, rio.Stdout(), *p)
wg.Done()
wc.Close()
if rc != nil {
rc.Close()
}
}()
},
}, {
name: stderr,
dest: func(wc io.WriteCloser, rc io.Closer) {
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(wc, rio.Stderr(), *p)
wg.Done()
wc.Close()
if rc != nil {
rc.Close()
}
}()
},
},
} {
ok, err := isFifo(i.name)
if err != nil {
return err
}
var (
fw io.WriteCloser
fr io.Closer
)
if ok {
if fw, err = fifo.OpenFifo(ctx, i.name, syscall.O_WRONLY, 0); err != nil {
return fmt.Errorf("containerd-shim: opening %s failed: %s", i.name, err)
}
if fr, err = fifo.OpenFifo(ctx, i.name, syscall.O_RDONLY, 0); err != nil {
return fmt.Errorf("containerd-shim: opening %s failed: %s", i.name, err)
}
} else {
if sameFile != nil {
i.dest(sameFile, nil)
continue
}
if fw, err = os.OpenFile(i.name, syscall.O_WRONLY|syscall.O_APPEND, 0); err != nil {
return fmt.Errorf("containerd-shim: opening %s failed: %s", i.name, err)
}
if stdout == stderr {
sameFile = fw
}
}
i.dest(fw, fr)
}
if stdin == "" {
rio.Stdin().Close()
return nil
}
f, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
if err != nil {
return fmt.Errorf("containerd-shim: opening %s failed: %s", stdin, err)
}
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(rio.Stdin(), f, *p)
rio.Stdin().Close()
f.Close()
}()
return nil
}
// isFifo checks if a file is a fifo
// if the file does not exist then it returns false
func isFifo(path string) (bool, error) {
stat, err := os.Stat(path)
if err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
if stat.Mode()&os.ModeNamedPipe == os.ModeNamedPipe {
return true, nil
}
return false, nil
}

View File

@@ -0,0 +1,105 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proc
import (
"context"
"io"
"sync"
"time"
"github.com/containerd/console"
"github.com/pkg/errors"
)
// RuncRoot is the path to the root runc state directory
const RuncRoot = "/run/containerd/runc"
// Stdio of a process
type Stdio struct {
Stdin string
Stdout string
Stderr string
Terminal bool
}
// IsNull returns true if the stdio is not defined
func (s Stdio) IsNull() bool {
return s.Stdin == "" && s.Stdout == "" && s.Stderr == ""
}
// Process on a linux system
type Process interface {
State
// ID returns the id for the process
ID() string
// Pid returns the pid for the process
Pid() int
// ExitStatus returns the exit status
ExitStatus() int
// ExitedAt is the time the process exited
ExitedAt() time.Time
// Stdin returns the process STDIN
Stdin() io.Closer
// Stdio returns io information for the container
Stdio() Stdio
// Status returns the process status
Status(context.Context) (string, error)
// Wait blocks until the process has exited
Wait()
}
// State of a process
type State interface {
// Resize resizes the process console
Resize(ws console.WinSize) error
// Start execution of the process
Start(context.Context) error
// Delete deletes the process and its resourcess
Delete(context.Context) error
// Kill kills the process
Kill(context.Context, uint32, bool) error
// SetExited sets the exit status for the process
SetExited(status int)
}
func stateName(v interface{}) string {
switch v.(type) {
case *runningState, *execRunningState:
return "running"
case *createdState, *execCreatedState, *createdCheckpointState:
return "created"
case *pausedState:
return "paused"
case *deletedState:
return "deleted"
case *stoppedState:
return "stopped"
}
panic(errors.Errorf("invalid state %v", v))
}
// Platform handles platform-specific behavior that may differs across
// platform implementations
type Platform interface {
CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string,
wg, cwg *sync.WaitGroup) (console.Console, error)
ShutdownConsole(ctx context.Context, console console.Console) error
Close() error
}

View File

@@ -0,0 +1,60 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proc
import (
google_protobuf "github.com/gogo/protobuf/types"
)
// Mount holds filesystem mount configuration
type Mount struct {
Type string
Source string
Target string
Options []string
}
// CreateConfig hold task creation configuration
type CreateConfig struct {
ID string
Bundle string
Runtime string
Rootfs []Mount
Terminal bool
Stdin string
Stdout string
Stderr string
Checkpoint string
ParentCheckpoint string
Options *google_protobuf.Any
}
// ExecConfig holds exec creation configuration
type ExecConfig struct {
ID string
Terminal bool
Stdin string
Stdout string
Stderr string
Spec *google_protobuf.Any
}
// CheckpointConfig holds task checkpoint configuration
type CheckpointConfig struct {
Path string
Options *google_protobuf.Any
}

104
runtime/linux/proc/utils.go Normal file
View File

@@ -0,0 +1,104 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proc
import (
"encoding/json"
"io"
"os"
"strings"
"time"
"github.com/containerd/containerd/errdefs"
runc "github.com/containerd/go-runc"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
)
// TODO(mlaventure): move to runc package?
func getLastRuntimeError(r *runc.Runc) (string, error) {
if r.Log == "" {
return "", nil
}
f, err := os.OpenFile(r.Log, os.O_RDONLY, 0400)
if err != nil {
return "", err
}
var (
errMsg string
log struct {
Level string
Msg string
Time time.Time
}
)
dec := json.NewDecoder(f)
for err = nil; err == nil; {
if err = dec.Decode(&log); err != nil && err != io.EOF {
return "", err
}
if log.Level == "error" {
errMsg = strings.TrimSpace(log.Msg)
}
}
return errMsg, nil
}
// criuError returns only the first line of the error message from criu
// it tries to add an invalid dump log location when returning the message
func criuError(err error) string {
parts := strings.Split(err.Error(), "\n")
return parts[0]
}
func copyFile(to, from string) error {
ff, err := os.Open(from)
if err != nil {
return err
}
defer ff.Close()
tt, err := os.Create(to)
if err != nil {
return err
}
defer tt.Close()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
_, err = io.CopyBuffer(tt, ff, *p)
return err
}
func checkKillError(err error) error {
if err == nil {
return nil
}
if strings.Contains(err.Error(), "os: process already finished") || err == unix.ESRCH {
return errors.Wrapf(errdefs.ErrNotFound, "process already finished")
}
return errors.Wrapf(err, "unknown error after kill")
}
func hasNoIO(r *CreateConfig) bool {
return r.Stdin == "" && r.Stdout == "" && r.Stderr == ""
}

151
runtime/linux/process.go Normal file
View File

@@ -0,0 +1,151 @@
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package linux
import (
"context"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/runtime"
shim "github.com/containerd/containerd/runtime/linux/shim/v1"
"github.com/pkg/errors"
"github.com/stevvooe/ttrpc"
)
// Process implements a linux process
type Process struct {
id string
t *Task
}
// ID of the process
func (p *Process) ID() string {
return p.id
}
// Kill sends the provided signal to the underlying process
//
// Unable to kill all processes in the task using this method on a process
func (p *Process) Kill(ctx context.Context, signal uint32, _ bool) error {
_, err := p.t.shim.Kill(ctx, &shim.KillRequest{
Signal: signal,
ID: p.id,
})
if err != nil {
return errdefs.FromGRPC(err)
}
return err
}
// State of process
func (p *Process) State(ctx context.Context) (runtime.State, error) {
// use the container status for the status of the process
response, err := p.t.shim.State(ctx, &shim.StateRequest{
ID: p.id,
})
if err != nil {
if errors.Cause(err) != ttrpc.ErrClosed {
return runtime.State{}, errdefs.FromGRPC(err)
}
// We treat ttrpc.ErrClosed as the shim being closed, but really this
// likely means that the process no longer exists. We'll have to plumb
// the connection differently if this causes problems.
return runtime.State{}, errdefs.ErrNotFound
}
var status runtime.Status
switch response.Status {
case task.StatusCreated:
status = runtime.CreatedStatus
case task.StatusRunning:
status = runtime.RunningStatus
case task.StatusStopped:
status = runtime.StoppedStatus
case task.StatusPaused:
status = runtime.PausedStatus
case task.StatusPausing:
status = runtime.PausingStatus
}
return runtime.State{
Pid: response.Pid,
Status: status,
Stdin: response.Stdin,
Stdout: response.Stdout,
Stderr: response.Stderr,
Terminal: response.Terminal,
ExitStatus: response.ExitStatus,
}, nil
}
// ResizePty changes the side of the process's PTY to the provided width and height
func (p *Process) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
_, err := p.t.shim.ResizePty(ctx, &shim.ResizePtyRequest{
ID: p.id,
Width: size.Width,
Height: size.Height,
})
if err != nil {
err = errdefs.FromGRPC(err)
}
return err
}
// CloseIO closes the provided IO pipe for the process
func (p *Process) CloseIO(ctx context.Context) error {
_, err := p.t.shim.CloseIO(ctx, &shim.CloseIORequest{
ID: p.id,
Stdin: true,
})
if err != nil {
return errdefs.FromGRPC(err)
}
return nil
}
// Start the process
func (p *Process) Start(ctx context.Context) error {
r, err := p.t.shim.Start(ctx, &shim.StartRequest{
ID: p.id,
})
if err != nil {
return errdefs.FromGRPC(err)
}
p.t.events.Publish(ctx, runtime.TaskExecStartedEventTopic, &eventstypes.TaskExecStarted{
ContainerID: p.t.id,
Pid: r.Pid,
ExecID: p.id,
})
return nil
}
// Wait on the process to exit and return the exit status and timestamp
func (p *Process) Wait(ctx context.Context) (*runtime.Exit, error) {
r, err := p.t.shim.Wait(ctx, &shim.WaitRequest{
ID: p.id,
})
if err != nil {
return nil, err
}
return &runtime.Exit{
Timestamp: r.ExitedAt,
Status: r.ExitStatus,
}, nil
}

View File

@@ -0,0 +1,183 @@
file {
name: "github.com/containerd/containerd/linux/runctypes/runc.proto"
package: "containerd.linux.runc"
dependency: "gogoproto/gogo.proto"
message_type {
name: "RuncOptions"
field {
name: "runtime"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "runtime"
}
field {
name: "runtime_root"
number: 2
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "runtimeRoot"
}
field {
name: "criu_path"
number: 3
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "criuPath"
}
field {
name: "systemd_cgroup"
number: 4
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "systemdCgroup"
}
}
message_type {
name: "CreateOptions"
field {
name: "no_pivot_root"
number: 1
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "noPivotRoot"
}
field {
name: "open_tcp"
number: 2
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "openTcp"
}
field {
name: "external_unix_sockets"
number: 3
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "externalUnixSockets"
}
field {
name: "terminal"
number: 4
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "terminal"
}
field {
name: "file_locks"
number: 5
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "fileLocks"
}
field {
name: "empty_namespaces"
number: 6
label: LABEL_REPEATED
type: TYPE_STRING
json_name: "emptyNamespaces"
}
field {
name: "cgroups_mode"
number: 7
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "cgroupsMode"
}
field {
name: "no_new_keyring"
number: 8
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "noNewKeyring"
}
field {
name: "shim_cgroup"
number: 9
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "shimCgroup"
}
field {
name: "io_uid"
number: 10
label: LABEL_OPTIONAL
type: TYPE_UINT32
json_name: "ioUid"
}
field {
name: "io_gid"
number: 11
label: LABEL_OPTIONAL
type: TYPE_UINT32
json_name: "ioGid"
}
}
message_type {
name: "CheckpointOptions"
field {
name: "exit"
number: 1
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "exit"
}
field {
name: "open_tcp"
number: 2
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "openTcp"
}
field {
name: "external_unix_sockets"
number: 3
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "externalUnixSockets"
}
field {
name: "terminal"
number: 4
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "terminal"
}
field {
name: "file_locks"
number: 5
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "fileLocks"
}
field {
name: "empty_namespaces"
number: 6
label: LABEL_REPEATED
type: TYPE_STRING
json_name: "emptyNamespaces"
}
field {
name: "cgroups_mode"
number: 7
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "cgroupsMode"
}
}
message_type {
name: "ProcessDetails"
field {
name: "exec_id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "execId"
}
}
options {
go_package: "github.com/containerd/containerd/linux/runctypes;runctypes"
}
weak_dependency: 0
syntax: "proto3"
}

View File

@@ -0,0 +1,17 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runctypes

View File

@@ -0,0 +1,183 @@
file {
name: "github.com/containerd/containerd/runtime/linux/runctypes/runc.proto"
package: "containerd.linux.runc"
dependency: "gogoproto/gogo.proto"
message_type {
name: "RuncOptions"
field {
name: "runtime"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "runtime"
}
field {
name: "runtime_root"
number: 2
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "runtimeRoot"
}
field {
name: "criu_path"
number: 3
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "criuPath"
}
field {
name: "systemd_cgroup"
number: 4
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "systemdCgroup"
}
}
message_type {
name: "CreateOptions"
field {
name: "no_pivot_root"
number: 1
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "noPivotRoot"
}
field {
name: "open_tcp"
number: 2
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "openTcp"
}
field {
name: "external_unix_sockets"
number: 3
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "externalUnixSockets"
}
field {
name: "terminal"
number: 4
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "terminal"
}
field {
name: "file_locks"
number: 5
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "fileLocks"
}
field {
name: "empty_namespaces"
number: 6
label: LABEL_REPEATED
type: TYPE_STRING
json_name: "emptyNamespaces"
}
field {
name: "cgroups_mode"
number: 7
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "cgroupsMode"
}
field {
name: "no_new_keyring"
number: 8
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "noNewKeyring"
}
field {
name: "shim_cgroup"
number: 9
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "shimCgroup"
}
field {
name: "io_uid"
number: 10
label: LABEL_OPTIONAL
type: TYPE_UINT32
json_name: "ioUid"
}
field {
name: "io_gid"
number: 11
label: LABEL_OPTIONAL
type: TYPE_UINT32
json_name: "ioGid"
}
}
message_type {
name: "CheckpointOptions"
field {
name: "exit"
number: 1
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "exit"
}
field {
name: "open_tcp"
number: 2
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "openTcp"
}
field {
name: "external_unix_sockets"
number: 3
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "externalUnixSockets"
}
field {
name: "terminal"
number: 4
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "terminal"
}
field {
name: "file_locks"
number: 5
label: LABEL_OPTIONAL
type: TYPE_BOOL
json_name: "fileLocks"
}
field {
name: "empty_namespaces"
number: 6
label: LABEL_REPEATED
type: TYPE_STRING
json_name: "emptyNamespaces"
}
field {
name: "cgroups_mode"
number: 7
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "cgroupsMode"
}
}
message_type {
name: "ProcessDetails"
field {
name: "exec_id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "execId"
}
}
options {
go_package: "github.com/containerd/containerd/runtime/linux/runctypes;runctypes"
}
weak_dependency: 0
syntax: "proto3"
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,42 @@
syntax = "proto3";
package containerd.linux.runc;
import weak "gogoproto/gogo.proto";
option go_package = "github.com/containerd/containerd/runtime/linux/runctypes;runctypes";
message RuncOptions {
string runtime = 1;
string runtime_root = 2;
string criu_path = 3;
bool systemd_cgroup = 4;
}
message CreateOptions {
bool no_pivot_root = 1;
bool open_tcp = 2;
bool external_unix_sockets = 3;
bool terminal = 4;
bool file_locks = 5;
repeated string empty_namespaces = 6;
string cgroups_mode = 7;
bool no_new_keyring = 8;
string shim_cgroup = 9;
uint32 io_uid = 10;
uint32 io_gid = 11;
}
message CheckpointOptions {
bool exit = 1;
bool open_tcp = 2;
bool external_unix_sockets = 3;
bool terminal = 4;
bool file_locks = 5;
repeated string empty_namespaces = 6;
string cgroups_mode = 7;
}
message ProcessDetails {
string exec_id = 1;
}

542
runtime/linux/runtime.go Normal file
View File

@@ -0,0 +1,542 @@
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package linux
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/boltdb/bolt"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/proc"
"github.com/containerd/containerd/runtime/linux/runctypes"
shim "github.com/containerd/containerd/runtime/linux/shim/v1"
runc "github.com/containerd/go-runc"
"github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)
var (
pluginID = fmt.Sprintf("%s.%s", plugin.RuntimePlugin, "linux")
empty = &ptypes.Empty{}
)
const (
configFilename = "config.json"
defaultRuntime = "runc"
defaultShim = "containerd-shim"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.RuntimePlugin,
ID: "linux",
InitFn: New,
Requires: []plugin.Type{
plugin.TaskMonitorPlugin,
plugin.MetadataPlugin,
},
Config: &Config{
Shim: defaultShim,
Runtime: defaultRuntime,
},
})
}
var _ = (runtime.PlatformRuntime)(&Runtime{})
// Config options for the runtime
type Config struct {
// Shim is a path or name of binary implementing the Shim GRPC API
Shim string `toml:"shim"`
// Runtime is a path or name of an OCI runtime used by the shim
Runtime string `toml:"runtime"`
// RuntimeRoot is the path that shall be used by the OCI runtime for its data
RuntimeRoot string `toml:"runtime_root"`
// NoShim calls runc directly from within the pkg
NoShim bool `toml:"no_shim"`
// Debug enable debug on the shim
ShimDebug bool `toml:"shim_debug"`
}
// New returns a configured runtime
func New(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Platforms = []ocispec.Platform{platforms.DefaultSpec()}
if err := os.MkdirAll(ic.Root, 0711); err != nil {
return nil, err
}
if err := os.MkdirAll(ic.State, 0711); err != nil {
return nil, err
}
monitor, err := ic.Get(plugin.TaskMonitorPlugin)
if err != nil {
return nil, err
}
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
cfg := ic.Config.(*Config)
r := &Runtime{
root: ic.Root,
state: ic.State,
monitor: monitor.(runtime.TaskMonitor),
tasks: runtime.NewTaskList(),
db: m.(*metadata.DB),
address: ic.Address,
events: ic.Events,
config: cfg,
}
tasks, err := r.restoreTasks(ic.Context)
if err != nil {
return nil, err
}
// TODO: need to add the tasks to the monitor
for _, t := range tasks {
if err := r.tasks.AddWithNamespace(t.namespace, t); err != nil {
return nil, err
}
}
return r, nil
}
// Runtime for a linux based system
type Runtime struct {
root string
state string
address string
monitor runtime.TaskMonitor
tasks *runtime.TaskList
db *metadata.DB
events *exchange.Exchange
config *Config
}
// ID of the runtime
func (r *Runtime) ID() string {
return pluginID
}
// Create a new task
func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, err error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
if err := identifiers.Validate(id); err != nil {
return nil, errors.Wrapf(err, "invalid task id")
}
ropts, err := r.getRuncOptions(ctx, id)
if err != nil {
return nil, err
}
bundle, err := newBundle(id,
filepath.Join(r.state, namespace),
filepath.Join(r.root, namespace),
opts.Spec.Value)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
bundle.Delete()
}
}()
shimopt := ShimLocal(r.config, r.events)
if !r.config.NoShim {
var cgroup string
if opts.Options != nil {
v, err := typeurl.UnmarshalAny(opts.Options)
if err != nil {
return nil, err
}
cgroup = v.(*runctypes.CreateOptions).ShimCgroup
}
exitHandler := func() {
log.G(ctx).WithField("id", id).Info("shim reaped")
t, err := r.tasks.Get(ctx, id)
if err != nil {
// Task was never started or was already successfully deleted
return
}
lc := t.(*Task)
// Stop the monitor
if err := r.monitor.Stop(lc); err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{
"id": id,
"namespace": namespace,
}).Warn("failed to stop monitor")
}
log.G(ctx).WithFields(logrus.Fields{
"id": id,
"namespace": namespace,
}).Warn("cleaning up after killed shim")
if err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid); err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{
"id": id,
"namespace": namespace,
}).Warn("failed to clen up after killed shim")
}
}
shimopt = ShimRemote(r.config, r.address, cgroup, exitHandler)
}
s, err := bundle.NewShimClient(ctx, namespace, shimopt, ropts)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if kerr := s.KillShim(ctx); kerr != nil {
log.G(ctx).WithError(err).Error("failed to kill shim")
}
}
}()
rt := r.config.Runtime
if ropts != nil && ropts.Runtime != "" {
rt = ropts.Runtime
}
sopts := &shim.CreateTaskRequest{
ID: id,
Bundle: bundle.path,
Runtime: rt,
Stdin: opts.IO.Stdin,
Stdout: opts.IO.Stdout,
Stderr: opts.IO.Stderr,
Terminal: opts.IO.Terminal,
Checkpoint: opts.Checkpoint,
Options: opts.Options,
}
for _, m := range opts.Rootfs {
sopts.Rootfs = append(sopts.Rootfs, &types.Mount{
Type: m.Type,
Source: m.Source,
Options: m.Options,
})
}
cr, err := s.Create(ctx, sopts)
if err != nil {
return nil, errdefs.FromGRPC(err)
}
t, err := newTask(id, namespace, int(cr.Pid), s, r.monitor, r.events,
proc.NewRunc(ropts.RuntimeRoot, sopts.Bundle, namespace, rt, ropts.CriuPath, ropts.SystemdCgroup))
if err != nil {
return nil, err
}
if err := r.tasks.Add(ctx, t); err != nil {
return nil, err
}
// after the task is created, add it to the monitor if it has a cgroup
// this can be different on a checkpoint/restore
if t.cg != nil {
if err = r.monitor.Monitor(t); err != nil {
if _, err := r.Delete(ctx, t); err != nil {
log.G(ctx).WithError(err).Error("deleting task after failed monitor")
}
return nil, err
}
}
r.events.Publish(ctx, runtime.TaskCreateEventTopic, &eventstypes.TaskCreate{
ContainerID: sopts.ID,
Bundle: sopts.Bundle,
Rootfs: sopts.Rootfs,
IO: &eventstypes.TaskIO{
Stdin: sopts.Stdin,
Stdout: sopts.Stdout,
Stderr: sopts.Stderr,
Terminal: sopts.Terminal,
},
Checkpoint: sopts.Checkpoint,
Pid: uint32(t.pid),
})
return t, nil
}
// Delete a task removing all on disk state
func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
lc, ok := c.(*Task)
if !ok {
return nil, fmt.Errorf("task cannot be cast as *linux.Task")
}
if err := r.monitor.Stop(lc); err != nil {
return nil, err
}
bundle := loadBundle(
lc.id,
filepath.Join(r.state, namespace, lc.id),
filepath.Join(r.root, namespace, lc.id),
)
rsp, err := lc.shim.Delete(ctx, empty)
if err != nil {
if cerr := r.cleanupAfterDeadShim(ctx, bundle, namespace, c.ID(), lc.pid); cerr != nil {
log.G(ctx).WithError(err).Error("unable to cleanup task")
}
return nil, errdefs.FromGRPC(err)
}
r.tasks.Delete(ctx, lc.id)
if err := lc.shim.KillShim(ctx); err != nil {
log.G(ctx).WithError(err).Error("failed to kill shim")
}
if err := bundle.Delete(); err != nil {
log.G(ctx).WithError(err).Error("failed to delete bundle")
}
r.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
ContainerID: lc.id,
ExitStatus: rsp.ExitStatus,
ExitedAt: rsp.ExitedAt,
Pid: rsp.Pid,
})
return &runtime.Exit{
Status: rsp.ExitStatus,
Timestamp: rsp.ExitedAt,
Pid: rsp.Pid,
}, nil
}
// Tasks returns all tasks known to the runtime
func (r *Runtime) Tasks(ctx context.Context) ([]runtime.Task, error) {
return r.tasks.GetAll(ctx)
}
func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) {
dir, err := ioutil.ReadDir(r.state)
if err != nil {
return nil, err
}
var o []*Task
for _, namespace := range dir {
if !namespace.IsDir() {
continue
}
name := namespace.Name()
log.G(ctx).WithField("namespace", name).Debug("loading tasks in namespace")
tasks, err := r.loadTasks(ctx, name)
if err != nil {
return nil, err
}
o = append(o, tasks...)
}
return o, nil
}
// Get a specific task by task id
func (r *Runtime) Get(ctx context.Context, id string) (runtime.Task, error) {
return r.tasks.Get(ctx, id)
}
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
dir, err := ioutil.ReadDir(filepath.Join(r.state, ns))
if err != nil {
return nil, err
}
var o []*Task
for _, path := range dir {
if !path.IsDir() {
continue
}
id := path.Name()
bundle := loadBundle(
id,
filepath.Join(r.state, ns, id),
filepath.Join(r.root, ns, id),
)
ctx = namespaces.WithNamespace(ctx, ns)
pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, proc.InitPidFile))
s, err := bundle.NewShimClient(ctx, ns, ShimConnect(r.config, func() {
err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid)
if err != nil {
log.G(ctx).WithError(err).WithField("bundle", bundle.path).
Error("cleaning up after dead shim")
}
}), nil)
if err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{
"id": id,
"namespace": ns,
}).Error("connecting to shim")
err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid)
if err != nil {
log.G(ctx).WithError(err).WithField("bundle", bundle.path).
Error("cleaning up after dead shim")
}
continue
}
ropts, err := r.getRuncOptions(ctx, id)
if err != nil {
log.G(ctx).WithError(err).WithField("id", id).
Error("get runtime options")
continue
}
t, err := newTask(id, ns, pid, s, r.monitor, r.events,
proc.NewRunc(ropts.RuntimeRoot, bundle.path, ns, ropts.Runtime, ropts.CriuPath, ropts.SystemdCgroup))
if err != nil {
log.G(ctx).WithError(err).Error("loading task type")
continue
}
o = append(o, t)
}
return o, nil
}
func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int) error {
ctx = namespaces.WithNamespace(ctx, ns)
if err := r.terminate(ctx, bundle, ns, id); err != nil {
if r.config.ShimDebug {
return errors.Wrap(err, "failed to terminate task, leaving bundle for debugging")
}
log.G(ctx).WithError(err).Warn("failed to terminate task")
}
// Notify Client
exitedAt := time.Now().UTC()
r.events.Publish(ctx, runtime.TaskExitEventTopic, &eventstypes.TaskExit{
ContainerID: id,
ID: id,
Pid: uint32(pid),
ExitStatus: 128 + uint32(unix.SIGKILL),
ExitedAt: exitedAt,
})
r.tasks.Delete(ctx, id)
if err := bundle.Delete(); err != nil {
log.G(ctx).WithError(err).Error("delete bundle")
}
r.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
ContainerID: id,
Pid: uint32(pid),
ExitStatus: 128 + uint32(unix.SIGKILL),
ExitedAt: exitedAt,
})
return nil
}
func (r *Runtime) terminate(ctx context.Context, bundle *bundle, ns, id string) error {
rt, err := r.getRuntime(ctx, ns, id)
if err != nil {
return err
}
if err := rt.Delete(ctx, id, &runc.DeleteOpts{
Force: true,
}); err != nil {
log.G(ctx).WithError(err).Warnf("delete runtime state %s", id)
}
if err := mount.Unmount(filepath.Join(bundle.path, "rootfs"), 0); err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{
"path": bundle.path,
"id": id,
}).Warnf("unmount task rootfs")
}
return nil
}
func (r *Runtime) getRuntime(ctx context.Context, ns, id string) (*runc.Runc, error) {
ropts, err := r.getRuncOptions(ctx, id)
if err != nil {
return nil, err
}
var (
cmd = r.config.Runtime
root = proc.RuncRoot
)
if ropts != nil {
if ropts.Runtime != "" {
cmd = ropts.Runtime
}
if ropts.RuntimeRoot != "" {
root = ropts.RuntimeRoot
}
}
return &runc.Runc{
Command: cmd,
LogFormat: runc.JSON,
PdeathSignal: unix.SIGKILL,
Root: filepath.Join(root, ns),
Debug: r.config.ShimDebug,
}, nil
}
func (r *Runtime) getRuncOptions(ctx context.Context, id string) (*runctypes.RuncOptions, error) {
var container containers.Container
if err := r.db.View(func(tx *bolt.Tx) error {
store := metadata.NewContainerStore(tx)
var err error
container, err = store.Get(ctx, id)
return err
}); err != nil {
return nil, err
}
if container.Runtime.Options != nil {
v, err := typeurl.UnmarshalAny(container.Runtime.Options)
if err != nil {
return nil, err
}
ropts, ok := v.(*runctypes.RuncOptions)
if !ok {
return nil, errors.New("invalid runtime options format")
}
return ropts, nil
}
return &runctypes.RuncOptions{}, nil
}

View File

@@ -0,0 +1,278 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"io"
"net"
"os"
"os/exec"
"strings"
"sync"
"syscall"
"time"
"golang.org/x/sys/unix"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stevvooe/ttrpc"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/runtime/linux/shim"
shimapi "github.com/containerd/containerd/runtime/linux/shim/v1"
"github.com/containerd/containerd/sys"
ptypes "github.com/gogo/protobuf/types"
)
var empty = &ptypes.Empty{}
// Opt is an option for a shim client configuration
type Opt func(context.Context, shim.Config) (shimapi.ShimService, io.Closer, error)
// WithStart executes a new shim process
func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHandler func()) Opt {
return func(ctx context.Context, config shim.Config) (_ shimapi.ShimService, _ io.Closer, err error) {
socket, err := newSocket(address)
if err != nil {
return nil, nil, err
}
defer socket.Close()
f, err := socket.File()
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to get fd for socket %s", address)
}
defer f.Close()
cmd, err := newCommand(binary, daemonAddress, debug, config, f)
if err != nil {
return nil, nil, err
}
if err := cmd.Start(); err != nil {
return nil, nil, errors.Wrapf(err, "failed to start shim")
}
defer func() {
if err != nil {
cmd.Process.Kill()
}
}()
go func() {
cmd.Wait()
exitHandler()
}()
log.G(ctx).WithFields(logrus.Fields{
"pid": cmd.Process.Pid,
"address": address,
"debug": debug,
}).Infof("shim %s started", binary)
// set shim in cgroup if it is provided
if cgroup != "" {
if err := setCgroup(cgroup, cmd); err != nil {
return nil, nil, err
}
log.G(ctx).WithFields(logrus.Fields{
"pid": cmd.Process.Pid,
"address": address,
}).Infof("shim placed in cgroup %s", cgroup)
}
if err = sys.SetOOMScore(cmd.Process.Pid, sys.OOMScoreMaxKillable); err != nil {
return nil, nil, errors.Wrap(err, "failed to set OOM Score on shim")
}
c, clo, err := WithConnect(address, func() {})(ctx, config)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to connect")
}
return c, clo, nil
}
}
func newCommand(binary, daemonAddress string, debug bool, config shim.Config, socket *os.File) (*exec.Cmd, error) {
selfExe, err := os.Executable()
if err != nil {
return nil, err
}
args := []string{
"-namespace", config.Namespace,
"-workdir", config.WorkDir,
"-address", daemonAddress,
"-containerd-binary", selfExe,
}
if config.Criu != "" {
args = append(args, "-criu-path", config.Criu)
}
if config.RuntimeRoot != "" {
args = append(args, "-runtime-root", config.RuntimeRoot)
}
if config.SystemdCgroup {
args = append(args, "-systemd-cgroup")
}
if debug {
args = append(args, "-debug")
}
cmd := exec.Command(binary, args...)
cmd.Dir = config.Path
// make sure the shim can be re-parented to system init
// and is cloned in a new mount namespace because the overlay/filesystems
// will be mounted by the shim
cmd.SysProcAttr = getSysProcAttr()
cmd.ExtraFiles = append(cmd.ExtraFiles, socket)
if debug {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
}
return cmd, nil
}
func newSocket(address string) (*net.UnixListener, error) {
if len(address) > 106 {
return nil, errors.Errorf("%q: unix socket path too long (> 106)", address)
}
l, err := net.Listen("unix", "\x00"+address)
if err != nil {
return nil, errors.Wrapf(err, "failed to listen to abstract unix socket %q", address)
}
return l.(*net.UnixListener), nil
}
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
return d(address, 100*time.Second)
}
func annonDialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", "\x00"+address, timeout)
}
// WithConnect connects to an existing shim
func WithConnect(address string, onClose func()) Opt {
return func(ctx context.Context, config shim.Config) (shimapi.ShimService, io.Closer, error) {
conn, err := connect(address, annonDialer)
if err != nil {
return nil, nil, err
}
client := ttrpc.NewClient(conn)
client.OnClose(onClose)
return shimapi.NewShimClient(client), conn, nil
}
}
// WithLocal uses an in process shim
func WithLocal(publisher events.Publisher) func(context.Context, shim.Config) (shimapi.ShimService, io.Closer, error) {
return func(ctx context.Context, config shim.Config) (shimapi.ShimService, io.Closer, error) {
service, err := shim.NewService(config, publisher)
if err != nil {
return nil, nil, err
}
return shim.NewLocal(service), nil, nil
}
}
// New returns a new shim client
func New(ctx context.Context, config shim.Config, opt Opt) (*Client, error) {
s, c, err := opt(ctx, config)
if err != nil {
return nil, err
}
return &Client{
ShimService: s,
c: c,
exitCh: make(chan struct{}),
}, nil
}
// Client is a shim client containing the connection to a shim
type Client struct {
shimapi.ShimService
c io.Closer
exitCh chan struct{}
exitOnce sync.Once
}
// IsAlive returns true if the shim can be contacted.
// NOTE: a negative answer doesn't mean that the process is gone.
func (c *Client) IsAlive(ctx context.Context) (bool, error) {
_, err := c.ShimInfo(ctx, empty)
if err != nil {
// TODO(stevvooe): There are some error conditions that need to be
// handle with unix sockets existence to give the right answer here.
return false, err
}
return true, nil
}
// StopShim signals the shim to exit and wait for the process to disappear
func (c *Client) StopShim(ctx context.Context) error {
return c.signalShim(ctx, unix.SIGTERM)
}
// KillShim kills the shim forcefully and wait for the process to disappear
func (c *Client) KillShim(ctx context.Context) error {
return c.signalShim(ctx, unix.SIGKILL)
}
// Close the cient connection
func (c *Client) Close() error {
if c.c == nil {
return nil
}
return c.c.Close()
}
func (c *Client) signalShim(ctx context.Context, sig syscall.Signal) error {
info, err := c.ShimInfo(ctx, empty)
if err != nil {
return err
}
pid := int(info.ShimPid)
// make sure we don't kill ourselves if we are running a local shim
if os.Getpid() == pid {
return nil
}
if err := unix.Kill(pid, sig); err != nil && err != unix.ESRCH {
return err
}
// wait for shim to die after being signaled
select {
case <-ctx.Done():
return ctx.Err()
case <-c.waitForExit(pid):
return nil
}
}
func (c *Client) waitForExit(pid int) <-chan struct{} {
c.exitOnce.Do(func() {
for {
// use kill(pid, 0) here because the shim could have been reparented
// and we are no longer able to waitpid(pid, ...) on the shim
if err := unix.Kill(pid, 0); err == unix.ESRCH {
close(c.exitCh)
return
}
time.Sleep(10 * time.Millisecond)
}
})
return c.exitCh
}

View File

@@ -0,0 +1,46 @@
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"os/exec"
"syscall"
"github.com/containerd/cgroups"
"github.com/pkg/errors"
)
func getSysProcAttr() *syscall.SysProcAttr {
return &syscall.SysProcAttr{
Setpgid: true,
}
}
func setCgroup(cgroupPath string, cmd *exec.Cmd) error {
cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(cgroupPath))
if err != nil {
return errors.Wrapf(err, "failed to load cgroup %s", cgroupPath)
}
if err := cg.Add(cgroups.Process{
Pid: cmd.Process.Pid,
}); err != nil {
return errors.Wrapf(err, "failed to join cgroup %s", cgroupPath)
}
return nil
}

View File

@@ -0,0 +1,34 @@
// +build !linux,!windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"os/exec"
"syscall"
)
func getSysProcAttr() *syscall.SysProcAttr {
return &syscall.SysProcAttr{
Setpgid: true,
}
}
func setCgroup(cgroupPath string, cmd *exec.Cmd) error {
return nil
}

107
runtime/linux/shim/local.go Normal file
View File

@@ -0,0 +1,107 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"path/filepath"
"github.com/containerd/containerd/mount"
shimapi "github.com/containerd/containerd/runtime/linux/shim/v1"
ptypes "github.com/gogo/protobuf/types"
)
// NewLocal returns a shim client implementation for issue commands to a shim
func NewLocal(s *Service) shimapi.ShimService {
return &local{
s: s,
}
}
type local struct {
s *Service
}
func (c *local) Create(ctx context.Context, in *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
return c.s.Create(ctx, in)
}
func (c *local) Start(ctx context.Context, in *shimapi.StartRequest) (*shimapi.StartResponse, error) {
return c.s.Start(ctx, in)
}
func (c *local) Delete(ctx context.Context, in *ptypes.Empty) (*shimapi.DeleteResponse, error) {
// make sure we unmount the containers rootfs for this local
if err := mount.Unmount(filepath.Join(c.s.config.Path, "rootfs"), 0); err != nil {
return nil, err
}
return c.s.Delete(ctx, in)
}
func (c *local) DeleteProcess(ctx context.Context, in *shimapi.DeleteProcessRequest) (*shimapi.DeleteResponse, error) {
return c.s.DeleteProcess(ctx, in)
}
func (c *local) Exec(ctx context.Context, in *shimapi.ExecProcessRequest) (*ptypes.Empty, error) {
return c.s.Exec(ctx, in)
}
func (c *local) ResizePty(ctx context.Context, in *shimapi.ResizePtyRequest) (*ptypes.Empty, error) {
return c.s.ResizePty(ctx, in)
}
func (c *local) State(ctx context.Context, in *shimapi.StateRequest) (*shimapi.StateResponse, error) {
return c.s.State(ctx, in)
}
func (c *local) Pause(ctx context.Context, in *ptypes.Empty) (*ptypes.Empty, error) {
return c.s.Pause(ctx, in)
}
func (c *local) Resume(ctx context.Context, in *ptypes.Empty) (*ptypes.Empty, error) {
return c.s.Resume(ctx, in)
}
func (c *local) Kill(ctx context.Context, in *shimapi.KillRequest) (*ptypes.Empty, error) {
return c.s.Kill(ctx, in)
}
func (c *local) ListPids(ctx context.Context, in *shimapi.ListPidsRequest) (*shimapi.ListPidsResponse, error) {
return c.s.ListPids(ctx, in)
}
func (c *local) CloseIO(ctx context.Context, in *shimapi.CloseIORequest) (*ptypes.Empty, error) {
return c.s.CloseIO(ctx, in)
}
func (c *local) Checkpoint(ctx context.Context, in *shimapi.CheckpointTaskRequest) (*ptypes.Empty, error) {
return c.s.Checkpoint(ctx, in)
}
func (c *local) ShimInfo(ctx context.Context, in *ptypes.Empty) (*shimapi.ShimInfoResponse, error) {
return c.s.ShimInfo(ctx, in)
}
func (c *local) Update(ctx context.Context, in *shimapi.UpdateTaskRequest) (*ptypes.Empty, error) {
return c.s.Update(ctx, in)
}
func (c *local) Wait(ctx context.Context, in *shimapi.WaitRequest) (*shimapi.WaitResponse, error) {
return c.s.Wait(ctx, in)
}

View File

@@ -0,0 +1,110 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"os/exec"
"sync"
"time"
"github.com/containerd/containerd/sys"
runc "github.com/containerd/go-runc"
"github.com/pkg/errors"
)
// ErrNoSuchProcess is returned when the process no longer exists
var ErrNoSuchProcess = errors.New("no such process")
const bufferSize = 32
// Reap should be called when the process receives an SIGCHLD. Reap will reap
// all exited processes and close their wait channels
func Reap() error {
now := time.Now()
exits, err := sys.Reap(false)
Default.Lock()
for c := range Default.subscribers {
for _, e := range exits {
c <- runc.Exit{
Timestamp: now,
Pid: e.Pid,
Status: e.Status,
}
}
}
Default.Unlock()
return err
}
// Default is the default monitor initialized for the package
var Default = &Monitor{
subscribers: make(map[chan runc.Exit]struct{}),
}
// Monitor monitors the underlying system for process status changes
type Monitor struct {
sync.Mutex
subscribers map[chan runc.Exit]struct{}
}
// Start starts the command a registers the process with the reaper
func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) {
ec := m.Subscribe()
if err := c.Start(); err != nil {
m.Unsubscribe(ec)
return nil, err
}
return ec, nil
}
// Wait blocks until a process is signal as dead.
// User should rely on the value of the exit status to determine if the
// command was successful or not.
func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) {
for e := range ec {
if e.Pid == c.Process.Pid {
// make sure we flush all IO
c.Wait()
m.Unsubscribe(ec)
return e.Status, nil
}
}
// return no such process if the ec channel is closed and no more exit
// events will be sent
return -1, ErrNoSuchProcess
}
// Subscribe to process exit changes
func (m *Monitor) Subscribe() chan runc.Exit {
c := make(chan runc.Exit, bufferSize)
m.Lock()
m.subscribers[c] = struct{}{}
m.Unlock()
return c
}
// Unsubscribe to process exit changes
func (m *Monitor) Unsubscribe(c chan runc.Exit) {
m.Lock()
delete(m.subscribers, c)
close(c)
m.Unlock()
}

View File

@@ -0,0 +1,546 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"fmt"
"os"
"sync"
"github.com/containerd/console"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/proc"
"github.com/containerd/containerd/runtime/linux/runctypes"
shimapi "github.com/containerd/containerd/runtime/linux/shim/v1"
runc "github.com/containerd/go-runc"
"github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
empty = &ptypes.Empty{}
bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
return &buffer
},
}
)
// Config contains shim specific configuration
type Config struct {
Path string
Namespace string
WorkDir string
Criu string
RuntimeRoot string
SystemdCgroup bool
}
// NewService returns a new shim service that can be used via GRPC
func NewService(config Config, publisher events.Publisher) (*Service, error) {
if config.Namespace == "" {
return nil, fmt.Errorf("shim namespace cannot be empty")
}
ctx := namespaces.WithNamespace(context.Background(), config.Namespace)
ctx = log.WithLogger(ctx, logrus.WithFields(logrus.Fields{
"namespace": config.Namespace,
"path": config.Path,
"pid": os.Getpid(),
}))
s := &Service{
config: config,
context: ctx,
processes: make(map[string]proc.Process),
events: make(chan interface{}, 128),
ec: Default.Subscribe(),
}
go s.processExits()
if err := s.initPlatform(); err != nil {
return nil, errors.Wrap(err, "failed to initialized platform behavior")
}
go s.forward(publisher)
return s, nil
}
// Service is the shim implementation of a remote shim over GRPC
type Service struct {
mu sync.Mutex
config Config
context context.Context
processes map[string]proc.Process
events chan interface{}
platform proc.Platform
ec chan runc.Exit
// Filled by Create()
id string
bundle string
}
// Create a new initial process and container with the underlying OCI runtime
func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
var mounts []proc.Mount
for _, m := range r.Rootfs {
mounts = append(mounts, proc.Mount{
Type: m.Type,
Source: m.Source,
Target: m.Target,
Options: m.Options,
})
}
process, err := proc.New(
ctx,
s.config.Path,
s.config.WorkDir,
s.config.RuntimeRoot,
s.config.Namespace,
s.config.Criu,
s.config.SystemdCgroup,
s.platform,
&proc.CreateConfig{
ID: r.ID,
Bundle: r.Bundle,
Runtime: r.Runtime,
Rootfs: mounts,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Checkpoint: r.Checkpoint,
ParentCheckpoint: r.ParentCheckpoint,
Options: r.Options,
},
)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
// save the main task id and bundle to the shim for additional requests
s.id = r.ID
s.bundle = r.Bundle
pid := process.Pid()
s.processes[r.ID] = process
return &shimapi.CreateTaskResponse{
Pid: uint32(pid),
}, nil
}
// Start a process
func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.StartResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[r.ID]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s", r.ID)
}
if err := p.Start(ctx); err != nil {
return nil, err
}
return &shimapi.StartResponse{
ID: p.ID(),
Pid: uint32(p.Pid()),
}, nil
}
// Delete the initial process and container
func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.Delete(ctx); err != nil {
return nil, err
}
delete(s.processes, s.id)
s.platform.Close()
return &shimapi.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
Pid: uint32(p.Pid()),
}, nil
}
// DeleteProcess deletes an exec'd process
func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessRequest) (*shimapi.DeleteResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
if r.ID == s.id {
return nil, status.Errorf(codes.InvalidArgument, "cannot delete init process with DeleteProcess")
}
p := s.processes[r.ID]
if p == nil {
return nil, errors.Wrapf(errdefs.ErrNotFound, "process %s", r.ID)
}
if err := p.Delete(ctx); err != nil {
return nil, err
}
delete(s.processes, r.ID)
return &shimapi.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
Pid: uint32(p.Pid()),
}, nil
}
// Exec an additional process inside the container
func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
if p := s.processes[r.ID]; p != nil {
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ID)
}
p := s.processes[s.id]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
process, err := p.(*proc.Init).Exec(ctx, s.config.Path, &proc.ExecConfig{
ID: r.ID,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
Spec: r.Spec,
})
if err != nil {
return nil, errdefs.ToGRPC(err)
}
s.processes[r.ID] = process
return empty, nil
}
// ResizePty of a process
func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
if r.ID == "" {
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "id not provided")
}
ws := console.WinSize{
Width: uint16(r.Width),
Height: uint16(r.Height),
}
p := s.processes[r.ID]
if p == nil {
return nil, errors.Errorf("process does not exist %s", r.ID)
}
if err := p.Resize(ws); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// State returns runtime state information for a process
func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[r.ID]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s", r.ID)
}
st, err := p.Status(ctx)
if err != nil {
return nil, err
}
status := task.StatusUnknown
switch st {
case "created":
status = task.StatusCreated
case "running":
status = task.StatusRunning
case "stopped":
status = task.StatusStopped
case "paused":
status = task.StatusPaused
case "pausing":
status = task.StatusPausing
}
sio := p.Stdio()
return &shimapi.StateResponse{
ID: p.ID(),
Bundle: s.bundle,
Pid: uint32(p.Pid()),
Status: status,
Stdin: sio.Stdin,
Stdout: sio.Stdout,
Stderr: sio.Stderr,
Terminal: sio.Terminal,
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
}, nil
}
// Pause the container
func (s *Service) Pause(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.(*proc.Init).Pause(ctx); err != nil {
return nil, err
}
return empty, nil
}
// Resume the container
func (s *Service) Resume(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.(*proc.Init).Resume(ctx); err != nil {
return nil, err
}
return empty, nil
}
// Kill a process with the provided signal
func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
if r.ID == "" {
p := s.processes[s.id]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.Kill(ctx, r.Signal, r.All); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
p := s.processes[r.ID]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s not found", r.ID)
}
if err := p.Kill(ctx, r.Signal, r.All); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// ListPids returns all pids inside the container
func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*shimapi.ListPidsResponse, error) {
pids, err := s.getContainerPids(ctx, r.ID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
var processes []*task.ProcessInfo
for _, pid := range pids {
pInfo := task.ProcessInfo{
Pid: pid,
}
for _, p := range s.processes {
if p.Pid() == int(pid) {
d := &runctypes.ProcessDetails{
ExecID: p.ID(),
}
a, err := typeurl.MarshalAny(d)
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal process %d info", pid)
}
pInfo.Info = a
break
}
}
processes = append(processes, &pInfo)
}
return &shimapi.ListPidsResponse{
Processes: processes,
}, nil
}
// CloseIO of a process
func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[r.ID]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process does not exist %s", r.ID)
}
if stdin := p.Stdin(); stdin != nil {
if err := stdin.Close(); err != nil {
return nil, errors.Wrap(err, "close stdin")
}
}
return empty, nil
}
// Checkpoint the container
func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.(*proc.Init).Checkpoint(ctx, &proc.CheckpointConfig{
Path: r.Path,
Options: r.Options,
}); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// ShimInfo returns shim information such as the shim's pid
func (s *Service) ShimInfo(ctx context.Context, r *ptypes.Empty) (*shimapi.ShimInfoResponse, error) {
return &shimapi.ShimInfoResponse{
ShimPid: uint32(os.Getpid()),
}, nil
}
// Update a running container
func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.(*proc.Init).Update(ctx, r.Resources); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
}
// Wait for a process to exit
func (s *Service) Wait(ctx context.Context, r *shimapi.WaitRequest) (*shimapi.WaitResponse, error) {
s.mu.Lock()
p := s.processes[r.ID]
s.mu.Unlock()
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
p.Wait()
return &shimapi.WaitResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
}, nil
}
func (s *Service) processExits() {
for e := range s.ec {
s.checkProcesses(e)
}
}
func (s *Service) checkProcesses(e runc.Exit) {
s.mu.Lock()
defer s.mu.Unlock()
for _, p := range s.processes {
if p.Pid() == e.Pid {
if ip, ok := p.(*proc.Init); ok {
// Ensure all children are killed
if err := ip.KillAll(s.context); err != nil {
log.G(s.context).WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children")
}
}
p.SetExited(e.Status)
s.events <- &eventstypes.TaskExit{
ContainerID: s.id,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: p.ExitedAt(),
}
return
}
}
}
func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
if p == nil {
return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
}
ps, err := p.(*proc.Init).Runtime().Ps(ctx, id)
if err != nil {
return nil, err
}
pids := make([]uint32, 0, len(ps))
for _, pid := range ps {
pids = append(pids, uint32(pid))
}
return pids, nil
}
func (s *Service) forward(publisher events.Publisher) {
for e := range s.events {
if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil {
log.G(s.context).WithError(err).Error("post event")
}
}
}
func getTopic(ctx context.Context, e interface{}) string {
switch e.(type) {
case *eventstypes.TaskCreate:
return runtime.TaskCreateEventTopic
case *eventstypes.TaskStart:
return runtime.TaskStartEventTopic
case *eventstypes.TaskOOM:
return runtime.TaskOOMEventTopic
case *eventstypes.TaskExit:
return runtime.TaskExitEventTopic
case *eventstypes.TaskDelete:
return runtime.TaskDeleteEventTopic
case *eventstypes.TaskExecAdded:
return runtime.TaskExecAddedEventTopic
case *eventstypes.TaskExecStarted:
return runtime.TaskExecStartedEventTopic
case *eventstypes.TaskPaused:
return runtime.TaskPausedEventTopic
case *eventstypes.TaskResumed:
return runtime.TaskResumedEventTopic
case *eventstypes.TaskCheckpointed:
return runtime.TaskCheckpointedEventTopic
default:
logrus.Warnf("no topic for type %#v", e)
}
return runtime.TaskUnknownTopic
}

View File

@@ -0,0 +1,111 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"io"
"sync"
"syscall"
"github.com/containerd/console"
"github.com/containerd/fifo"
"github.com/pkg/errors"
)
type linuxPlatform struct {
epoller *console.Epoller
}
func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
if p.epoller == nil {
return nil, errors.New("uninitialized epoller")
}
epollConsole, err := p.epoller.Add(console)
if err != nil {
return nil, err
}
if stdin != "" {
in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
if err != nil {
return nil, err
}
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(epollConsole, in, *p)
}()
}
outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
if err != nil {
return nil, err
}
outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0)
if err != nil {
return nil, err
}
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(outw, epollConsole, *p)
epollConsole.Close()
outr.Close()
outw.Close()
wg.Done()
}()
return epollConsole, nil
}
func (p *linuxPlatform) ShutdownConsole(ctx context.Context, cons console.Console) error {
if p.epoller == nil {
return errors.New("uninitialized epoller")
}
epollConsole, ok := cons.(*console.EpollConsole)
if !ok {
return errors.Errorf("expected EpollConsole, got %#v", cons)
}
return epollConsole.Shutdown(p.epoller.CloseConsole)
}
func (p *linuxPlatform) Close() error {
return p.epoller.Close()
}
// initialize a single epoll fd to manage our consoles. `initPlatform` should
// only be called once.
func (s *Service) initPlatform() error {
if s.platform != nil {
return nil
}
epoller, err := console.NewEpoller()
if err != nil {
return errors.Wrap(err, "failed to initialize epoller")
}
s.platform = &linuxPlatform{
epoller: epoller,
}
go epoller.Wait()
return nil
}

View File

@@ -0,0 +1,84 @@
// +build !windows,!linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"io"
"sync"
"syscall"
"github.com/containerd/console"
"github.com/containerd/fifo"
)
type unixPlatform struct {
}
func (p *unixPlatform) CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, wg, cwg *sync.WaitGroup) (console.Console, error) {
if stdin != "" {
in, err := fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY, 0)
if err != nil {
return nil, err
}
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(console, in, *p)
}()
}
outw, err := fifo.OpenFifo(ctx, stdout, syscall.O_WRONLY, 0)
if err != nil {
return nil, err
}
outr, err := fifo.OpenFifo(ctx, stdout, syscall.O_RDONLY, 0)
if err != nil {
return nil, err
}
wg.Add(1)
cwg.Add(1)
go func() {
cwg.Done()
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(outw, console, *p)
console.Close()
outr.Close()
outw.Close()
wg.Done()
}()
return console, nil
}
func (p *unixPlatform) ShutdownConsole(ctx context.Context, cons console.Console) error {
return nil
}
func (p *unixPlatform) Close() error {
return nil
}
func (s *Service) initPlatform() error {
s.platform = &unixPlatform{}
return nil
}

View File

@@ -0,0 +1,17 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,165 @@
syntax = "proto3";
package containerd.runtime.linux.shim.v1;
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import weak "gogoproto/gogo.proto";
import "google/protobuf/timestamp.proto";
import "github.com/containerd/containerd/api/types/mount.proto";
import "github.com/containerd/containerd/api/types/task/task.proto";
option go_package = "github.com/containerd/containerd/runtime/linux/shim/v1;shim";
// Shim service is launched for each container and is responsible for owning the IO
// for the container and its additional processes. The shim is also the parent of
// each container and allows reattaching to the IO and receiving the exit status
// for the container processes.
service Shim {
// State returns shim and task state information.
rpc State(StateRequest) returns (StateResponse);
rpc Create(CreateTaskRequest) returns (CreateTaskResponse);
rpc Start(StartRequest) returns (StartResponse);
rpc Delete(google.protobuf.Empty) returns (DeleteResponse);
rpc DeleteProcess(DeleteProcessRequest) returns (DeleteResponse);
rpc ListPids(ListPidsRequest) returns (ListPidsResponse);
rpc Pause(google.protobuf.Empty) returns (google.protobuf.Empty);
rpc Resume(google.protobuf.Empty) returns (google.protobuf.Empty);
rpc Checkpoint(CheckpointTaskRequest) returns (google.protobuf.Empty);
rpc Kill(KillRequest) returns (google.protobuf.Empty);
rpc Exec(ExecProcessRequest) returns (google.protobuf.Empty);
rpc ResizePty(ResizePtyRequest) returns (google.protobuf.Empty);
rpc CloseIO(CloseIORequest) returns (google.protobuf.Empty);
// ShimInfo returns information about the shim.
rpc ShimInfo(google.protobuf.Empty) returns (ShimInfoResponse);
rpc Update(UpdateTaskRequest) returns (google.protobuf.Empty);
rpc Wait(WaitRequest) returns (WaitResponse);
}
message CreateTaskRequest {
string id = 1;
string bundle = 2;
string runtime = 3;
repeated containerd.types.Mount rootfs = 4;
bool terminal = 5;
string stdin = 6;
string stdout = 7;
string stderr = 8;
string checkpoint = 9;
string parent_checkpoint = 10;
google.protobuf.Any options = 11;
}
message CreateTaskResponse {
uint32 pid = 1;
}
message DeleteResponse {
uint32 pid = 1;
uint32 exit_status = 2;
google.protobuf.Timestamp exited_at = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}
message DeleteProcessRequest {
string id = 1;
}
message ExecProcessRequest {
string id = 1;
bool terminal = 2;
string stdin = 3;
string stdout = 4;
string stderr = 5;
google.protobuf.Any spec = 6;
}
message ExecProcessResponse {
}
message ResizePtyRequest {
string id = 1;
uint32 width = 2;
uint32 height = 3;
}
message StateRequest {
string id = 1;
}
message StateResponse {
string id = 1;
string bundle = 2;
uint32 pid = 3;
containerd.v1.types.Status status = 4;
string stdin = 5;
string stdout = 6;
string stderr = 7;
bool terminal = 8;
uint32 exit_status = 9;
google.protobuf.Timestamp exited_at = 10 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}
message KillRequest {
string id = 1;
uint32 signal = 2;
bool all = 3;
}
message CloseIORequest {
string id = 1;
bool stdin = 2;
}
message ListPidsRequest {
string id = 1;
}
message ListPidsResponse {
repeated containerd.v1.types.ProcessInfo processes = 1;
}
message CheckpointTaskRequest {
string path = 1;
google.protobuf.Any options = 2;
}
message ShimInfoResponse {
uint32 shim_pid = 1;
}
message UpdateTaskRequest {
google.protobuf.Any resources = 1;
}
message StartRequest {
string id = 1;
}
message StartResponse {
string id = 1;
uint32 pid = 2;
}
message WaitRequest {
string id = 1;
}
message WaitResponse {
uint32 exit_status = 1;
google.protobuf.Timestamp exited_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}

346
runtime/linux/task.go Normal file
View File

@@ -0,0 +1,346 @@
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package linux
import (
"context"
"sync"
"github.com/containerd/cgroups"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/shim/client"
shim "github.com/containerd/containerd/runtime/linux/shim/v1"
runc "github.com/containerd/go-runc"
"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/stevvooe/ttrpc"
)
// Task on a linux based system
type Task struct {
mu sync.Mutex
id string
pid int
shim *client.Client
namespace string
cg cgroups.Cgroup
monitor runtime.TaskMonitor
events *exchange.Exchange
runtime *runc.Runc
}
func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime.TaskMonitor, events *exchange.Exchange, runtime *runc.Runc) (*Task, error) {
var (
err error
cg cgroups.Cgroup
)
if pid > 0 {
cg, err = cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
if err != nil && err != cgroups.ErrCgroupDeleted {
return nil, err
}
}
return &Task{
id: id,
pid: pid,
shim: shim,
namespace: namespace,
cg: cg,
monitor: monitor,
events: events,
runtime: runtime,
}, nil
}
// ID of the task
func (t *Task) ID() string {
return t.id
}
// Info returns task information about the runtime and namespace
func (t *Task) Info() runtime.TaskInfo {
return runtime.TaskInfo{
ID: t.id,
Runtime: pluginID,
Namespace: t.namespace,
}
}
// Start the task
func (t *Task) Start(ctx context.Context) error {
t.mu.Lock()
hasCgroup := t.cg != nil
t.mu.Unlock()
r, err := t.shim.Start(ctx, &shim.StartRequest{
ID: t.id,
})
if err != nil {
return errdefs.FromGRPC(err)
}
t.pid = int(r.Pid)
if !hasCgroup {
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(t.pid))
if err != nil {
return err
}
t.mu.Lock()
t.cg = cg
t.mu.Unlock()
if err := t.monitor.Monitor(t); err != nil {
return err
}
}
t.events.Publish(ctx, runtime.TaskStartEventTopic, &eventstypes.TaskStart{
ContainerID: t.id,
Pid: uint32(t.pid),
})
return nil
}
// State returns runtime information for the task
func (t *Task) State(ctx context.Context) (runtime.State, error) {
response, err := t.shim.State(ctx, &shim.StateRequest{
ID: t.id,
})
if err != nil {
if errors.Cause(err) != ttrpc.ErrClosed {
return runtime.State{}, errdefs.FromGRPC(err)
}
return runtime.State{}, errdefs.ErrNotFound
}
var status runtime.Status
switch response.Status {
case task.StatusCreated:
status = runtime.CreatedStatus
case task.StatusRunning:
status = runtime.RunningStatus
case task.StatusStopped:
status = runtime.StoppedStatus
case task.StatusPaused:
status = runtime.PausedStatus
case task.StatusPausing:
status = runtime.PausingStatus
}
return runtime.State{
Pid: response.Pid,
Status: status,
Stdin: response.Stdin,
Stdout: response.Stdout,
Stderr: response.Stderr,
Terminal: response.Terminal,
ExitStatus: response.ExitStatus,
ExitedAt: response.ExitedAt,
}, nil
}
// Pause the task and all processes
func (t *Task) Pause(ctx context.Context) error {
if _, err := t.shim.Pause(ctx, empty); err != nil {
return errdefs.FromGRPC(err)
}
t.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{
ContainerID: t.id,
})
return nil
}
// Resume the task and all processes
func (t *Task) Resume(ctx context.Context) error {
if _, err := t.shim.Resume(ctx, empty); err != nil {
return errdefs.FromGRPC(err)
}
t.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{
ContainerID: t.id,
})
return nil
}
// Kill the task using the provided signal
//
// Optionally send the signal to all processes that are a child of the task
func (t *Task) Kill(ctx context.Context, signal uint32, all bool) error {
if _, err := t.shim.Kill(ctx, &shim.KillRequest{
ID: t.id,
Signal: signal,
All: all,
}); err != nil {
return errdefs.FromGRPC(err)
}
return nil
}
// Exec creates a new process inside the task
func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) {
if err := identifiers.Validate(id); err != nil {
return nil, errors.Wrapf(err, "invalid exec id")
}
request := &shim.ExecProcessRequest{
ID: id,
Stdin: opts.IO.Stdin,
Stdout: opts.IO.Stdout,
Stderr: opts.IO.Stderr,
Terminal: opts.IO.Terminal,
Spec: opts.Spec,
}
if _, err := t.shim.Exec(ctx, request); err != nil {
return nil, errdefs.FromGRPC(err)
}
return &Process{
id: id,
t: t,
}, nil
}
// Pids returns all system level process ids running inside the task
func (t *Task) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) {
resp, err := t.shim.ListPids(ctx, &shim.ListPidsRequest{
ID: t.id,
})
if err != nil {
return nil, errdefs.FromGRPC(err)
}
var processList []runtime.ProcessInfo
for _, p := range resp.Processes {
processList = append(processList, runtime.ProcessInfo{
Pid: p.Pid,
Info: p.Info,
})
}
return processList, nil
}
// ResizePty changes the side of the task's PTY to the provided width and height
func (t *Task) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
_, err := t.shim.ResizePty(ctx, &shim.ResizePtyRequest{
ID: t.id,
Width: size.Width,
Height: size.Height,
})
if err != nil {
err = errdefs.FromGRPC(err)
}
return err
}
// CloseIO closes the provided IO on the task
func (t *Task) CloseIO(ctx context.Context) error {
_, err := t.shim.CloseIO(ctx, &shim.CloseIORequest{
ID: t.id,
Stdin: true,
})
if err != nil {
err = errdefs.FromGRPC(err)
}
return err
}
// Checkpoint creates a system level dump of the task and process information that can be later restored
func (t *Task) Checkpoint(ctx context.Context, path string, options *types.Any) error {
r := &shim.CheckpointTaskRequest{
Path: path,
Options: options,
}
if _, err := t.shim.Checkpoint(ctx, r); err != nil {
return errdefs.FromGRPC(err)
}
t.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventstypes.TaskCheckpointed{
ContainerID: t.id,
})
return nil
}
// DeleteProcess removes the provided process from the task and deletes all on disk state
func (t *Task) DeleteProcess(ctx context.Context, id string) (*runtime.Exit, error) {
r, err := t.shim.DeleteProcess(ctx, &shim.DeleteProcessRequest{
ID: id,
})
if err != nil {
return nil, errdefs.FromGRPC(err)
}
return &runtime.Exit{
Status: r.ExitStatus,
Timestamp: r.ExitedAt,
Pid: r.Pid,
}, nil
}
// Update changes runtime information of a running task
func (t *Task) Update(ctx context.Context, resources *types.Any) error {
if _, err := t.shim.Update(ctx, &shim.UpdateTaskRequest{
Resources: resources,
}); err != nil {
return errdefs.FromGRPC(err)
}
return nil
}
// Process returns a specific process inside the task by the process id
func (t *Task) Process(ctx context.Context, id string) (runtime.Process, error) {
p := &Process{
id: id,
t: t,
}
if _, err := p.State(ctx); err != nil {
return nil, err
}
return p, nil
}
// Metrics returns runtime specific system level metric information for the task
func (t *Task) Metrics(ctx context.Context) (interface{}, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.cg == nil {
return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist")
}
stats, err := t.cg.Stat(cgroups.IgnoreNotExist)
if err != nil {
return nil, err
}
return stats, nil
}
// Cgroup returns the underlying cgroup for a linux task
func (t *Task) Cgroup() (cgroups.Cgroup, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.cg == nil {
return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist")
}
return t.cg, nil
}
// Wait for the task to exit returning the status and timestamp
func (t *Task) Wait(ctx context.Context) (*runtime.Exit, error) {
r, err := t.shim.Wait(ctx, &shim.WaitRequest{
ID: t.id,
})
if err != nil {
return nil, err
}
return &runtime.Exit{
Timestamp: r.ExitedAt,
Status: r.ExitStatus,
}, nil
}