
- process.Init#io could be nil - Make sure CreateTaskRequest#Options is not empty before unmarshaling Signed-off-by: Kazuyoshi Kato <katokazu@amazon.com>
499 lines
12 KiB
Go
499 lines
12 KiB
Go
// +build !windows
|
|
|
|
/*
|
|
Copyright The containerd Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package process
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/containerd/console"
|
|
"github.com/containerd/containerd/log"
|
|
"github.com/containerd/containerd/mount"
|
|
"github.com/containerd/containerd/pkg/stdio"
|
|
"github.com/containerd/fifo"
|
|
runc "github.com/containerd/go-runc"
|
|
google_protobuf "github.com/gogo/protobuf/types"
|
|
specs "github.com/opencontainers/runtime-spec/specs-go"
|
|
"github.com/pkg/errors"
|
|
"golang.org/x/sys/unix"
|
|
)
|
|
|
|
// Init represents an initial process for a container
|
|
type Init struct {
|
|
wg sync.WaitGroup
|
|
initState initState
|
|
|
|
// mu is used to ensure that `Start()` and `Exited()` calls return in
|
|
// the right order when invoked in separate go routines.
|
|
// This is the case within the shim implementation as it makes use of
|
|
// the reaper interface.
|
|
mu sync.Mutex
|
|
|
|
waitBlock chan struct{}
|
|
|
|
WorkDir string
|
|
|
|
id string
|
|
Bundle string
|
|
console console.Console
|
|
Platform stdio.Platform
|
|
io *processIO
|
|
runtime *runc.Runc
|
|
// pausing preserves the pausing state.
|
|
pausing *atomicBool
|
|
status int
|
|
exited time.Time
|
|
pid int
|
|
closers []io.Closer
|
|
stdin io.Closer
|
|
stdio stdio.Stdio
|
|
Rootfs string
|
|
IoUID int
|
|
IoGID int
|
|
NoPivotRoot bool
|
|
NoNewKeyring bool
|
|
CriuWorkPath string
|
|
}
|
|
|
|
// NewRunc returns a new runc instance for a process
|
|
func NewRunc(root, path, namespace, runtime, criu string, systemd bool) *runc.Runc {
|
|
if root == "" {
|
|
root = RuncRoot
|
|
}
|
|
return &runc.Runc{
|
|
Command: runtime,
|
|
Log: filepath.Join(path, "log.json"),
|
|
LogFormat: runc.JSON,
|
|
PdeathSignal: unix.SIGKILL,
|
|
Root: filepath.Join(root, namespace),
|
|
Criu: criu,
|
|
SystemdCgroup: systemd,
|
|
}
|
|
}
|
|
|
|
// New returns a new process
|
|
func New(id string, runtime *runc.Runc, stdio stdio.Stdio) *Init {
|
|
p := &Init{
|
|
id: id,
|
|
runtime: runtime,
|
|
pausing: new(atomicBool),
|
|
stdio: stdio,
|
|
status: 0,
|
|
waitBlock: make(chan struct{}),
|
|
}
|
|
p.initState = &createdState{p: p}
|
|
return p
|
|
}
|
|
|
|
// Create the process with the provided config
|
|
func (p *Init) Create(ctx context.Context, r *CreateConfig) error {
|
|
var (
|
|
err error
|
|
socket *runc.Socket
|
|
pio *processIO
|
|
pidFile = newPidFile(p.Bundle)
|
|
)
|
|
|
|
if r.Terminal {
|
|
if socket, err = runc.NewTempConsoleSocket(); err != nil {
|
|
return errors.Wrap(err, "failed to create OCI runtime console socket")
|
|
}
|
|
defer socket.Close()
|
|
} else {
|
|
if pio, err = createIO(ctx, p.id, p.IoUID, p.IoGID, p.stdio); err != nil {
|
|
return errors.Wrap(err, "failed to create init process I/O")
|
|
}
|
|
p.io = pio
|
|
}
|
|
if r.Checkpoint != "" {
|
|
return p.createCheckpointedState(r, pidFile)
|
|
}
|
|
opts := &runc.CreateOpts{
|
|
PidFile: pidFile.Path(),
|
|
NoPivot: p.NoPivotRoot,
|
|
NoNewKeyring: p.NoNewKeyring,
|
|
}
|
|
if p.io != nil {
|
|
opts.IO = p.io.IO()
|
|
}
|
|
if socket != nil {
|
|
opts.ConsoleSocket = socket
|
|
}
|
|
if err := p.runtime.Create(ctx, r.ID, r.Bundle, opts); err != nil {
|
|
return p.runtimeError(err, "OCI runtime create failed")
|
|
}
|
|
if r.Stdin != "" {
|
|
if err := p.openStdin(r.Stdin); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer cancel()
|
|
if socket != nil {
|
|
console, err := socket.ReceiveMaster()
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to retrieve console master")
|
|
}
|
|
console, err = p.Platform.CopyConsole(ctx, console, p.id, r.Stdin, r.Stdout, r.Stderr, &p.wg)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to start console copy")
|
|
}
|
|
p.console = console
|
|
} else {
|
|
if err := pio.Copy(ctx, &p.wg); err != nil {
|
|
return errors.Wrap(err, "failed to start io pipe copy")
|
|
}
|
|
}
|
|
pid, err := pidFile.Read()
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to retrieve OCI runtime container pid")
|
|
}
|
|
p.pid = pid
|
|
return nil
|
|
}
|
|
|
|
func (p *Init) openStdin(path string) error {
|
|
sc, err := fifo.OpenFifo(context.Background(), path, unix.O_WRONLY|unix.O_NONBLOCK, 0)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to open stdin fifo %s", path)
|
|
}
|
|
p.stdin = sc
|
|
p.closers = append(p.closers, sc)
|
|
return nil
|
|
}
|
|
|
|
func (p *Init) createCheckpointedState(r *CreateConfig, pidFile *pidFile) error {
|
|
opts := &runc.RestoreOpts{
|
|
CheckpointOpts: runc.CheckpointOpts{
|
|
ImagePath: r.Checkpoint,
|
|
WorkDir: p.CriuWorkPath,
|
|
ParentPath: r.ParentCheckpoint,
|
|
},
|
|
PidFile: pidFile.Path(),
|
|
NoPivot: p.NoPivotRoot,
|
|
Detach: true,
|
|
NoSubreaper: true,
|
|
}
|
|
|
|
if p.io != nil {
|
|
opts.IO = p.io.IO()
|
|
}
|
|
|
|
p.initState = &createdCheckpointState{
|
|
p: p,
|
|
opts: opts,
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Wait for the process to exit
|
|
func (p *Init) Wait() {
|
|
<-p.waitBlock
|
|
}
|
|
|
|
// ID of the process
|
|
func (p *Init) ID() string {
|
|
return p.id
|
|
}
|
|
|
|
// Pid of the process
|
|
func (p *Init) Pid() int {
|
|
return p.pid
|
|
}
|
|
|
|
// ExitStatus of the process
|
|
func (p *Init) ExitStatus() int {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
return p.status
|
|
}
|
|
|
|
// ExitedAt at time when the process exited
|
|
func (p *Init) ExitedAt() time.Time {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
return p.exited
|
|
}
|
|
|
|
// Status of the process
|
|
func (p *Init) Status(ctx context.Context) (string, error) {
|
|
if p.pausing.get() {
|
|
return "pausing", nil
|
|
}
|
|
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
return p.initState.Status(ctx)
|
|
}
|
|
|
|
// Start the init process
|
|
func (p *Init) Start(ctx context.Context) error {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
return p.initState.Start(ctx)
|
|
}
|
|
|
|
func (p *Init) start(ctx context.Context) error {
|
|
err := p.runtime.Start(ctx, p.id)
|
|
return p.runtimeError(err, "OCI runtime start failed")
|
|
}
|
|
|
|
// SetExited of the init process with the next status
|
|
func (p *Init) SetExited(status int) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
p.initState.SetExited(status)
|
|
}
|
|
|
|
func (p *Init) setExited(status int) {
|
|
p.exited = time.Now()
|
|
p.status = status
|
|
p.Platform.ShutdownConsole(context.Background(), p.console)
|
|
close(p.waitBlock)
|
|
}
|
|
|
|
// Delete the init process
|
|
func (p *Init) Delete(ctx context.Context) error {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
return p.initState.Delete(ctx)
|
|
}
|
|
|
|
func (p *Init) delete(ctx context.Context) error {
|
|
waitTimeout(ctx, &p.wg, 2*time.Second)
|
|
err := p.runtime.Delete(ctx, p.id, nil)
|
|
// ignore errors if a runtime has already deleted the process
|
|
// but we still hold metadata and pipes
|
|
//
|
|
// this is common during a checkpoint, runc will delete the container state
|
|
// after a checkpoint and the container will no longer exist within runc
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "does not exist") {
|
|
err = nil
|
|
} else {
|
|
err = p.runtimeError(err, "failed to delete task")
|
|
}
|
|
}
|
|
if p.io != nil {
|
|
for _, c := range p.closers {
|
|
c.Close()
|
|
}
|
|
p.io.Close()
|
|
}
|
|
if err2 := mount.UnmountAll(p.Rootfs, 0); err2 != nil {
|
|
log.G(ctx).WithError(err2).Warn("failed to cleanup rootfs mount")
|
|
if err == nil {
|
|
err = errors.Wrap(err2, "failed rootfs umount")
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Resize the init processes console
|
|
func (p *Init) Resize(ws console.WinSize) error {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
if p.console == nil {
|
|
return nil
|
|
}
|
|
return p.console.Resize(ws)
|
|
}
|
|
|
|
// Pause the init process and all its child processes
|
|
func (p *Init) Pause(ctx context.Context) error {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
return p.initState.Pause(ctx)
|
|
}
|
|
|
|
// Resume the init process and all its child processes
|
|
func (p *Init) Resume(ctx context.Context) error {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
return p.initState.Resume(ctx)
|
|
}
|
|
|
|
// Kill the init process
|
|
func (p *Init) Kill(ctx context.Context, signal uint32, all bool) error {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
return p.initState.Kill(ctx, signal, all)
|
|
}
|
|
|
|
func (p *Init) kill(ctx context.Context, signal uint32, all bool) error {
|
|
err := p.runtime.Kill(ctx, p.id, int(signal), &runc.KillOpts{
|
|
All: all,
|
|
})
|
|
return checkKillError(err)
|
|
}
|
|
|
|
// KillAll processes belonging to the init process
|
|
func (p *Init) KillAll(ctx context.Context) error {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
err := p.runtime.Kill(ctx, p.id, int(unix.SIGKILL), &runc.KillOpts{
|
|
All: true,
|
|
})
|
|
return p.runtimeError(err, "OCI runtime killall failed")
|
|
}
|
|
|
|
// Stdin of the process
|
|
func (p *Init) Stdin() io.Closer {
|
|
return p.stdin
|
|
}
|
|
|
|
// Runtime returns the OCI runtime configured for the init process
|
|
func (p *Init) Runtime() *runc.Runc {
|
|
return p.runtime
|
|
}
|
|
|
|
// Exec returns a new child process
|
|
func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (Process, error) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
return p.initState.Exec(ctx, path, r)
|
|
}
|
|
|
|
// exec returns a new exec'd process
|
|
func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (Process, error) {
|
|
// process exec request
|
|
var spec specs.Process
|
|
if err := json.Unmarshal(r.Spec.Value, &spec); err != nil {
|
|
return nil, err
|
|
}
|
|
spec.Terminal = r.Terminal
|
|
|
|
e := &execProcess{
|
|
id: r.ID,
|
|
path: path,
|
|
parent: p,
|
|
spec: spec,
|
|
stdio: stdio.Stdio{
|
|
Stdin: r.Stdin,
|
|
Stdout: r.Stdout,
|
|
Stderr: r.Stderr,
|
|
Terminal: r.Terminal,
|
|
},
|
|
waitBlock: make(chan struct{}),
|
|
}
|
|
e.execState = &execCreatedState{p: e}
|
|
return e, nil
|
|
}
|
|
|
|
// Checkpoint the init process
|
|
func (p *Init) Checkpoint(ctx context.Context, r *CheckpointConfig) error {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
return p.initState.Checkpoint(ctx, r)
|
|
}
|
|
|
|
func (p *Init) checkpoint(ctx context.Context, r *CheckpointConfig) error {
|
|
var actions []runc.CheckpointAction
|
|
if !r.Exit {
|
|
actions = append(actions, runc.LeaveRunning)
|
|
}
|
|
// keep criu work directory if criu work dir is set
|
|
work := r.WorkDir
|
|
if work == "" {
|
|
work = filepath.Join(p.WorkDir, "criu-work")
|
|
defer os.RemoveAll(work)
|
|
}
|
|
if err := p.runtime.Checkpoint(ctx, p.id, &runc.CheckpointOpts{
|
|
WorkDir: work,
|
|
ImagePath: r.Path,
|
|
AllowOpenTCP: r.AllowOpenTCP,
|
|
AllowExternalUnixSockets: r.AllowExternalUnixSockets,
|
|
AllowTerminal: r.AllowTerminal,
|
|
FileLocks: r.FileLocks,
|
|
EmptyNamespaces: r.EmptyNamespaces,
|
|
}, actions...); err != nil {
|
|
dumpLog := filepath.Join(p.Bundle, "criu-dump.log")
|
|
if cerr := copyFile(dumpLog, filepath.Join(work, "dump.log")); cerr != nil {
|
|
log.G(ctx).Error(err)
|
|
}
|
|
return fmt.Errorf("%s path= %s", criuError(err), dumpLog)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Update the processes resource configuration
|
|
func (p *Init) Update(ctx context.Context, r *google_protobuf.Any) error {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
return p.initState.Update(ctx, r)
|
|
}
|
|
|
|
func (p *Init) update(ctx context.Context, r *google_protobuf.Any) error {
|
|
var resources specs.LinuxResources
|
|
if err := json.Unmarshal(r.Value, &resources); err != nil {
|
|
return err
|
|
}
|
|
return p.runtime.Update(ctx, p.id, &resources)
|
|
}
|
|
|
|
// Stdio of the process
|
|
func (p *Init) Stdio() stdio.Stdio {
|
|
return p.stdio
|
|
}
|
|
|
|
func (p *Init) runtimeError(rErr error, msg string) error {
|
|
if rErr == nil {
|
|
return nil
|
|
}
|
|
|
|
rMsg, err := getLastRuntimeError(p.runtime)
|
|
switch {
|
|
case err != nil:
|
|
return errors.Wrapf(rErr, "%s: %s (%s)", msg, "unable to retrieve OCI runtime error", err.Error())
|
|
case rMsg == "":
|
|
return errors.Wrap(rErr, msg)
|
|
default:
|
|
return errors.Errorf("%s: %s", msg, rMsg)
|
|
}
|
|
}
|
|
|
|
func withConditionalIO(c stdio.Stdio) runc.IOOpt {
|
|
return func(o *runc.IOOption) {
|
|
o.OpenStdin = c.Stdin != ""
|
|
o.OpenStdout = c.Stdout != ""
|
|
o.OpenStderr = c.Stderr != ""
|
|
}
|
|
}
|