From 6601b406b79d9d166ffcaea1474171a0664f3210 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Tue, 2 Jul 2019 15:51:08 -0400 Subject: [PATCH] Refactor runtime code for code sharing Signed-off-by: Michael Crosby --- cmd/containerd-shim/main_unix.go | 4 +- {runtime/v2/runc => pkg/oom}/epoll.go | 6 +- .../proc => pkg/process}/deleted_state.go | 5 +- .../v1/linux/proc => pkg/process}/exec.go | 8 +-- .../linux/proc => pkg/process}/exec_state.go | 2 +- .../v1/linux/proc => pkg/process}/init.go | 20 +++---- .../linux/proc => pkg/process}/init_state.go | 15 +++-- {runtime/v1/linux/proc => pkg/process}/io.go | 8 +-- .../proc/proc.go => pkg/process/process.go | 28 +-------- .../v1/linux/proc => pkg/process}/types.go | 2 +- .../v1/linux/proc => pkg/process}/utils.go | 30 ++++++++-- pkg/stdio/platform.go | 33 +++++++++++ pkg/stdio/stdio.go | 30 ++++++++++ runtime/v1/linux/proc/process.go | 46 --------------- runtime/v1/linux/runtime.go | 10 ++-- runtime/v1/shim/service.go | 44 +++++++------- runtime/v2/runc/container.go | 58 +++++++++---------- runtime/v2/runc/platform.go | 4 +- runtime/v2/runc/v1/service.go | 19 +++--- runtime/v2/runc/v2/service.go | 17 +++--- 20 files changed, 203 insertions(+), 186 deletions(-) rename {runtime/v2/runc => pkg/oom}/epoll.go (94%) rename {runtime/v1/linux/proc => pkg/process}/deleted_state.go (95%) rename {runtime/v1/linux/proc => pkg/process}/exec.go (98%) rename {runtime/v1/linux/proc => pkg/process}/exec_state.go (99%) rename {runtime/v1/linux/proc => pkg/process}/init.go (97%) rename {runtime/v1/linux/proc => pkg/process}/init_state.go (96%) rename {runtime/v1/linux/proc => pkg/process}/io.go (98%) rename runtime/proc/proc.go => pkg/process/process.go (70%) rename {runtime/v1/linux/proc => pkg/process}/types.go (99%) rename {runtime/v1/linux/proc => pkg/process}/utils.go (83%) create mode 100644 pkg/stdio/platform.go create mode 100644 pkg/stdio/stdio.go delete mode 100644 runtime/v1/linux/proc/process.go diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go index cf1875eb9..c8af6b968 100644 --- a/cmd/containerd-shim/main_unix.go +++ b/cmd/containerd-shim/main_unix.go @@ -37,8 +37,8 @@ import ( "github.com/containerd/containerd/events" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/process" shimlog "github.com/containerd/containerd/runtime/v1" - "github.com/containerd/containerd/runtime/v1/linux/proc" "github.com/containerd/containerd/runtime/v1/shim" shimapi "github.com/containerd/containerd/runtime/v1/shim/v1" "github.com/containerd/ttrpc" @@ -73,7 +73,7 @@ func init() { flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve") flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd") flag.StringVar(&workdirFlag, "workdir", "", "path used to storge large temporary data") - flag.StringVar(&runtimeRootFlag, "runtime-root", proc.RuncRoot, "root directory for the runtime") + flag.StringVar(&runtimeRootFlag, "runtime-root", process.RuncRoot, "root directory for the runtime") flag.StringVar(&criuFlag, "criu", "", "path to criu binary") flag.BoolVar(&systemdCgroupFlag, "systemd-cgroup", false, "set runtime to use systemd-cgroup") // currently, the `containerd publish` utility is embedded in the daemon binary. diff --git a/runtime/v2/runc/epoll.go b/pkg/oom/epoll.go similarity index 94% rename from runtime/v2/runc/epoll.go rename to pkg/oom/epoll.go index 9ff19876a..667560332 100644 --- a/runtime/v2/runc/epoll.go +++ b/pkg/oom/epoll.go @@ -16,7 +16,7 @@ limitations under the License. */ -package runc +package oom import ( "context" @@ -30,9 +30,9 @@ import ( "golang.org/x/sys/unix" ) -// NewOOMEpoller returns an epoll implementation that listens to OOM events +// New returns an epoll implementation that listens to OOM events // from a container's cgroups. -func NewOOMEpoller(publisher shim.Publisher) (*Epoller, error) { +func New(publisher shim.Publisher) (*Epoller, error) { fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC) if err != nil { return nil, err diff --git a/runtime/v1/linux/proc/deleted_state.go b/pkg/process/deleted_state.go similarity index 95% rename from runtime/v1/linux/proc/deleted_state.go rename to pkg/process/deleted_state.go index fe9d7bf55..95ad138e0 100644 --- a/runtime/v1/linux/proc/deleted_state.go +++ b/pkg/process/deleted_state.go @@ -16,14 +16,13 @@ limitations under the License. */ -package proc +package process import ( "context" "github.com/containerd/console" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/runtime/proc" google_protobuf "github.com/gogo/protobuf/types" "github.com/pkg/errors" ) @@ -67,6 +66,6 @@ func (s *deletedState) SetExited(status int) { // no op } -func (s *deletedState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (s *deletedState) Exec(ctx context.Context, path string, r *ExecConfig) (Process, error) { return nil, errors.Errorf("cannot exec in a deleted state") } diff --git a/runtime/v1/linux/proc/exec.go b/pkg/process/exec.go similarity index 98% rename from runtime/v1/linux/proc/exec.go rename to pkg/process/exec.go index 91dd2cc10..4175dcd5a 100644 --- a/runtime/v1/linux/proc/exec.go +++ b/pkg/process/exec.go @@ -16,7 +16,7 @@ limitations under the License. */ -package proc +package process import ( "context" @@ -32,7 +32,7 @@ import ( "github.com/containerd/console" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/runtime/proc" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" specs "github.com/opencontainers/runtime-spec/specs-go" @@ -53,7 +53,7 @@ type execProcess struct { pid safePid closers []io.Closer stdin io.Closer - stdio proc.Stdio + stdio stdio.Stdio path string spec specs.Process @@ -161,7 +161,7 @@ func (e *execProcess) Stdin() io.Closer { return e.stdin } -func (e *execProcess) Stdio() proc.Stdio { +func (e *execProcess) Stdio() stdio.Stdio { return e.stdio } diff --git a/runtime/v1/linux/proc/exec_state.go b/pkg/process/exec_state.go similarity index 99% rename from runtime/v1/linux/proc/exec_state.go rename to pkg/process/exec_state.go index 12489501b..a8b44bb8b 100644 --- a/runtime/v1/linux/proc/exec_state.go +++ b/pkg/process/exec_state.go @@ -16,7 +16,7 @@ limitations under the License. */ -package proc +package process import ( "context" diff --git a/runtime/v1/linux/proc/init.go b/pkg/process/init.go similarity index 97% rename from runtime/v1/linux/proc/init.go rename to pkg/process/init.go index c02e8cbda..7861bdd8b 100644 --- a/runtime/v1/linux/proc/init.go +++ b/pkg/process/init.go @@ -16,7 +16,7 @@ limitations under the License. */ -package proc +package process import ( "context" @@ -33,7 +33,7 @@ import ( "github.com/containerd/console" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" - "github.com/containerd/containerd/runtime/proc" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" google_protobuf "github.com/gogo/protobuf/types" @@ -59,7 +59,7 @@ type Init struct { id string Bundle string console console.Console - Platform proc.Platform + Platform stdio.Platform io *processIO runtime *runc.Runc status int @@ -67,7 +67,7 @@ type Init struct { pid safePid closers []io.Closer stdin io.Closer - stdio proc.Stdio + stdio stdio.Stdio Rootfs string IoUID int IoGID int @@ -93,7 +93,7 @@ func NewRunc(root, path, namespace, runtime, criu string, systemd bool) *runc.Ru } // New returns a new process -func New(id string, runtime *runc.Runc, stdio proc.Stdio) *Init { +func New(id string, runtime *runc.Runc, stdio stdio.Stdio) *Init { p := &Init{ id: id, runtime: runtime, @@ -381,7 +381,7 @@ func (p *Init) Runtime() *runc.Runc { } // Exec returns a new child process -func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (Process, error) { p.mu.Lock() defer p.mu.Unlock() @@ -389,7 +389,7 @@ func (p *Init) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Proce } // exec returns a new exec'd process -func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (Process, error) { // process exec request var spec specs.Process if err := json.Unmarshal(r.Spec.Value, &spec); err != nil { @@ -402,7 +402,7 @@ func (p *Init) exec(ctx context.Context, path string, r *ExecConfig) (proc.Proce path: path, parent: p, spec: spec, - stdio: proc.Stdio{ + stdio: stdio.Stdio{ Stdin: r.Stdin, Stdout: r.Stdout, Stderr: r.Stderr, @@ -468,7 +468,7 @@ func (p *Init) update(ctx context.Context, r *google_protobuf.Any) error { } // Stdio of the process -func (p *Init) Stdio() proc.Stdio { +func (p *Init) Stdio() stdio.Stdio { return p.stdio } @@ -488,7 +488,7 @@ func (p *Init) runtimeError(rErr error, msg string) error { } } -func withConditionalIO(c proc.Stdio) runc.IOOpt { +func withConditionalIO(c stdio.Stdio) runc.IOOpt { return func(o *runc.IOOption) { o.OpenStdin = c.Stdin != "" o.OpenStdout = c.Stdout != "" diff --git a/runtime/v1/linux/proc/init_state.go b/pkg/process/init_state.go similarity index 96% rename from runtime/v1/linux/proc/init_state.go rename to pkg/process/init_state.go index 6d9d15f5e..9ec1d17be 100644 --- a/runtime/v1/linux/proc/init_state.go +++ b/pkg/process/init_state.go @@ -16,12 +16,11 @@ limitations under the License. */ -package proc +package process import ( "context" - "github.com/containerd/containerd/runtime/proc" runc "github.com/containerd/go-runc" google_protobuf "github.com/gogo/protobuf/types" "github.com/pkg/errors" @@ -35,7 +34,7 @@ type initState interface { Resume(context.Context) error Update(context.Context, *google_protobuf.Any) error Checkpoint(context.Context, *CheckpointConfig) error - Exec(context.Context, string, *ExecConfig) (proc.Process, error) + Exec(context.Context, string, *ExecConfig) (Process, error) Kill(context.Context, uint32, bool) error SetExited(int) } @@ -100,7 +99,7 @@ func (s *createdState) SetExited(status int) { } } -func (s *createdState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (s *createdState) Exec(ctx context.Context, path string, r *ExecConfig) (Process, error) { return s.p.exec(ctx, path, r) } @@ -208,7 +207,7 @@ func (s *createdCheckpointState) SetExited(status int) { } } -func (s *createdCheckpointState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (s *createdCheckpointState) Exec(ctx context.Context, path string, r *ExecConfig) (Process, error) { return nil, errors.Errorf("cannot exec in a created state") } @@ -268,7 +267,7 @@ func (s *runningState) SetExited(status int) { } } -func (s *runningState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (s *runningState) Exec(ctx context.Context, path string, r *ExecConfig) (Process, error) { return s.p.exec(ctx, path, r) } @@ -332,7 +331,7 @@ func (s *pausedState) SetExited(status int) { } } -func (s *pausedState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (s *pausedState) Exec(ctx context.Context, path string, r *ExecConfig) (Process, error) { return nil, errors.Errorf("cannot exec in a paused state") } @@ -385,6 +384,6 @@ func (s *stoppedState) SetExited(status int) { // no op } -func (s *stoppedState) Exec(ctx context.Context, path string, r *ExecConfig) (proc.Process, error) { +func (s *stoppedState) Exec(ctx context.Context, path string, r *ExecConfig) (Process, error) { return nil, errors.Errorf("cannot exec in a stopped state") } diff --git a/runtime/v1/linux/proc/io.go b/pkg/process/io.go similarity index 98% rename from runtime/v1/linux/proc/io.go rename to pkg/process/io.go index fde1872c8..169f6c8e2 100644 --- a/runtime/v1/linux/proc/io.go +++ b/pkg/process/io.go @@ -16,7 +16,7 @@ limitations under the License. */ -package proc +package process import ( "context" @@ -32,7 +32,7 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" - "github.com/containerd/containerd/runtime/proc" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" "github.com/pkg/errors" @@ -50,7 +50,7 @@ type processIO struct { uri *url.URL copy bool - stdio proc.Stdio + stdio stdio.Stdio } func (p *processIO) Close() error { @@ -76,7 +76,7 @@ func (p *processIO) Copy(ctx context.Context, wg *sync.WaitGroup) error { return nil } -func createIO(ctx context.Context, id string, ioUID, ioGID int, stdio proc.Stdio) (*processIO, error) { +func createIO(ctx context.Context, id string, ioUID, ioGID int, stdio stdio.Stdio) (*processIO, error) { pio := &processIO{ stdio: stdio, } diff --git a/runtime/proc/proc.go b/pkg/process/process.go similarity index 70% rename from runtime/proc/proc.go rename to pkg/process/process.go index 0e8d21b74..7cebb9b30 100644 --- a/runtime/proc/proc.go +++ b/pkg/process/process.go @@ -14,30 +14,17 @@ limitations under the License. */ -package proc +package process import ( "context" "io" - "sync" "time" "github.com/containerd/console" + "github.com/containerd/containerd/pkg/stdio" ) -// Stdio of a process -type Stdio struct { - Stdin string - Stdout string - Stderr string - Terminal bool -} - -// IsNull returns true if the stdio is not defined -func (s Stdio) IsNull() bool { - return s.Stdin == "" && s.Stdout == "" && s.Stderr == "" -} - // Process on a system type Process interface { // ID returns the id for the process @@ -51,7 +38,7 @@ type Process interface { // Stdin returns the process STDIN Stdin() io.Closer // Stdio returns io information for the container - Stdio() Stdio + Stdio() stdio.Stdio // Status returns the process status Status(context.Context) (string, error) // Wait blocks until the process has exited @@ -67,12 +54,3 @@ type Process interface { // SetExited sets the exit status for the process SetExited(status int) } - -// Platform handles platform-specific behavior that may differs across -// platform implementations -type Platform interface { - CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, - wg *sync.WaitGroup) (console.Console, error) - ShutdownConsole(ctx context.Context, console console.Console) error - Close() error -} diff --git a/runtime/v1/linux/proc/types.go b/pkg/process/types.go similarity index 99% rename from runtime/v1/linux/proc/types.go rename to pkg/process/types.go index 5d705c030..03477038a 100644 --- a/runtime/v1/linux/proc/types.go +++ b/pkg/process/types.go @@ -14,7 +14,7 @@ limitations under the License. */ -package proc +package process import ( google_protobuf "github.com/gogo/protobuf/types" diff --git a/runtime/v1/linux/proc/utils.go b/pkg/process/utils.go similarity index 83% rename from runtime/v1/linux/proc/utils.go rename to pkg/process/utils.go index 5abb1e977..b0ac6333c 100644 --- a/runtime/v1/linux/proc/utils.go +++ b/pkg/process/utils.go @@ -16,7 +16,7 @@ limitations under the License. */ -package proc +package process import ( "context" @@ -35,6 +35,15 @@ import ( "golang.org/x/sys/unix" ) +const ( + // RuncRoot is the path to the root runc state directory + RuncRoot = "/run/containerd/runc" + // StoppedPID is the pid assigned after a container has run and stopped + StoppedPID = -1 + // InitPidFile name of the file that contains the init pid + InitPidFile = "init.pid" +) + // safePid is a thread safe wrapper for pid. type safePid struct { sync.Mutex @@ -124,9 +133,6 @@ func checkKillError(err error) error { return errors.Wrapf(err, "unknown error after kill") } -// InitPidFile name of the file that contains the init pid -const InitPidFile = "init.pid" - func newPidFile(bundle string) *pidFile { return &pidFile{ path: filepath.Join(bundle, InitPidFile), @@ -168,3 +174,19 @@ func waitTimeout(ctx context.Context, wg *sync.WaitGroup, timeout time.Duration) return ctx.Err() } } + +func stateName(v interface{}) string { + switch v.(type) { + case *runningState, *execRunningState: + return "running" + case *createdState, *execCreatedState, *createdCheckpointState: + return "created" + case *pausedState: + return "paused" + case *deletedState: + return "deleted" + case *stoppedState: + return "stopped" + } + panic(errors.Errorf("invalid state %v", v)) +} diff --git a/pkg/stdio/platform.go b/pkg/stdio/platform.go new file mode 100644 index 000000000..6e1b27cfa --- /dev/null +++ b/pkg/stdio/platform.go @@ -0,0 +1,33 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package stdio + +import ( + "context" + "sync" + + "github.com/containerd/console" +) + +// Platform handles platform-specific behavior that may differs across +// platform implementations +type Platform interface { + CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string, + wg *sync.WaitGroup) (console.Console, error) + ShutdownConsole(ctx context.Context, console console.Console) error + Close() error +} diff --git a/pkg/stdio/stdio.go b/pkg/stdio/stdio.go new file mode 100644 index 000000000..b02e77dcd --- /dev/null +++ b/pkg/stdio/stdio.go @@ -0,0 +1,30 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package stdio + +// Stdio of a process +type Stdio struct { + Stdin string + Stdout string + Stderr string + Terminal bool +} + +// IsNull returns true if the stdio is not defined +func (s Stdio) IsNull() bool { + return s.Stdin == "" && s.Stdout == "" && s.Stderr == "" +} diff --git a/runtime/v1/linux/proc/process.go b/runtime/v1/linux/proc/process.go deleted file mode 100644 index bc156d78c..000000000 --- a/runtime/v1/linux/proc/process.go +++ /dev/null @@ -1,46 +0,0 @@ -// +build !windows - -/* - Copyright The containerd Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package proc - -import ( - "github.com/pkg/errors" -) - -// RuncRoot is the path to the root runc state directory -const ( - RuncRoot = "/run/containerd/runc" - // StoppedPID is the pid assigned after a container has run and stopped - StoppedPID = -1 -) - -func stateName(v interface{}) string { - switch v.(type) { - case *runningState, *execRunningState: - return "running" - case *createdState, *execCreatedState, *createdCheckpointState: - return "created" - case *pausedState: - return "paused" - case *deletedState: - return "deleted" - case *stoppedState: - return "stopped" - } - panic(errors.Errorf("invalid state %v", v)) -} diff --git a/runtime/v1/linux/runtime.go b/runtime/v1/linux/runtime.go index a85dcd918..0243c3986 100644 --- a/runtime/v1/linux/runtime.go +++ b/runtime/v1/linux/runtime.go @@ -37,12 +37,12 @@ import ( "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/process" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime/linux/runctypes" - "github.com/containerd/containerd/runtime/v1" - "github.com/containerd/containerd/runtime/v1/linux/proc" + v1 "github.com/containerd/containerd/runtime/v1" shim "github.com/containerd/containerd/runtime/v1/shim/v1" runc "github.com/containerd/go-runc" "github.com/containerd/typeurl" @@ -335,7 +335,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { filepath.Join(r.root, ns, id), ) ctx = namespaces.WithNamespace(ctx, ns) - pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, proc.InitPidFile)) + pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, process.InitPidFile)) shimExit := make(chan struct{}) s, err := bundle.NewShimClient(ctx, ns, ShimConnect(r.config, func() { defer close(shimExit) @@ -422,7 +422,7 @@ func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, "namespace": ns, }).Warn("cleaning up after shim dead") - pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, proc.InitPidFile)) + pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, process.InitPidFile)) ctx = namespaces.WithNamespace(ctx, ns) if err := r.terminate(ctx, bundle, ns, id); err != nil { if r.config.ShimDebug { @@ -487,7 +487,7 @@ func (r *Runtime) getRuntime(ctx context.Context, ns, id string) (*runc.Runc, er var ( cmd = r.config.Runtime - root = proc.RuncRoot + root = process.RuncRoot ) if ropts != nil { if ropts.Runtime != "" { diff --git a/runtime/v1/shim/service.go b/runtime/v1/shim/service.go index 6e87f052a..f55700135 100644 --- a/runtime/v1/shim/service.go +++ b/runtime/v1/shim/service.go @@ -35,10 +35,10 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime/linux/runctypes" - rproc "github.com/containerd/containerd/runtime/proc" - "github.com/containerd/containerd/runtime/v1/linux/proc" shimapi "github.com/containerd/containerd/runtime/v1/shim/v1" runc "github.com/containerd/go-runc" "github.com/containerd/typeurl" @@ -84,7 +84,7 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) { s := &Service{ config: config, context: ctx, - processes: make(map[string]rproc.Process), + processes: make(map[string]process.Process), events: make(chan interface{}, 128), ec: Default.Subscribe(), } @@ -102,9 +102,9 @@ type Service struct { config Config context context.Context - processes map[string]rproc.Process + processes map[string]process.Process events chan interface{} - platform rproc.Platform + platform stdio.Platform ec chan runc.Exit // Filled by Create() @@ -114,9 +114,9 @@ type Service struct { // Create a new initial process and container with the underlying OCI runtime func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ *shimapi.CreateTaskResponse, err error) { - var mounts []proc.Mount + var mounts []process.Mount for _, m := range r.Rootfs { - mounts = append(mounts, proc.Mount{ + mounts = append(mounts, process.Mount{ Type: m.Type, Source: m.Source, Target: m.Target, @@ -132,7 +132,7 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ * } } - config := &proc.CreateConfig{ + config := &process.CreateConfig{ ID: r.ID, Bundle: r.Bundle, Runtime: r.Runtime, @@ -266,7 +266,7 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*pty return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - process, err := p.(*proc.Init).Exec(ctx, s.config.Path, &proc.ExecConfig{ + process, err := p.(*process.Init).Exec(ctx, s.config.Path, &process.ExecConfig{ ID: r.ID, Terminal: r.Terminal, Stdin: r.Stdin, @@ -348,7 +348,7 @@ func (s *Service) Pause(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, er if err != nil { return nil, err } - if err := p.(*proc.Init).Pause(ctx); err != nil { + if err := p.(*process.Init).Pause(ctx); err != nil { return nil, err } return empty, nil @@ -360,7 +360,7 @@ func (s *Service) Resume(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, e if err != nil { return nil, err } - if err := p.(*proc.Init).Resume(ctx); err != nil { + if err := p.(*process.Init).Resume(ctx); err != nil { return nil, err } return empty, nil @@ -448,7 +448,7 @@ func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskReque } options = *v.(*runctypes.CheckpointOptions) } - if err := p.(*proc.Init).Checkpoint(ctx, &proc.CheckpointConfig{ + if err := p.(*process.Init).Checkpoint(ctx, &process.CheckpointConfig{ Path: r.Path, Exit: options.Exit, AllowOpenTCP: options.OpenTcp, @@ -476,7 +476,7 @@ func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*pt if err != nil { return nil, err } - if err := p.(*proc.Init).Update(ctx, r.Resources); err != nil { + if err := p.(*process.Init).Update(ctx, r.Resources); err != nil { return nil, errdefs.ToGRPC(err) } return empty, nil @@ -502,11 +502,11 @@ func (s *Service) processExits() { } } -func (s *Service) allProcesses() []rproc.Process { +func (s *Service) allProcesses() []process.Process { s.mu.Lock() defer s.mu.Unlock() - res := make([]rproc.Process, 0, len(s.processes)) + res := make([]process.Process, 0, len(s.processes)) for _, p := range s.processes { res = append(res, p) } @@ -523,7 +523,7 @@ func (s *Service) checkProcesses(e runc.Exit) { if p.Pid() == e.Pid { if shouldKillAll { - if ip, ok := p.(*proc.Init); ok { + if ip, ok := p.(*process.Init); ok { // Ensure all children are killed if err := ip.KillAll(s.context); err != nil { log.G(s.context).WithError(err).WithField("id", ip.ID()). @@ -569,7 +569,7 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er return nil, err } - ps, err := p.(*proc.Init).Runtime().Ps(ctx, id) + ps, err := p.(*process.Init).Runtime().Ps(ctx, id) if err != nil { return nil, err } @@ -589,7 +589,7 @@ func (s *Service) forward(publisher events.Publisher) { } // getInitProcess returns initial process -func (s *Service) getInitProcess() (rproc.Process, error) { +func (s *Service) getInitProcess() (process.Process, error) { s.mu.Lock() defer s.mu.Unlock() @@ -601,7 +601,7 @@ func (s *Service) getInitProcess() (rproc.Process, error) { } // getExecProcess returns exec process -func (s *Service) getExecProcess(id string) (rproc.Process, error) { +func (s *Service) getExecProcess(id string) (process.Process, error) { s.mu.Lock() defer s.mu.Unlock() @@ -640,7 +640,7 @@ func getTopic(ctx context.Context, e interface{}) string { return runtime.TaskUnknownTopic } -func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace, criu string, systemdCgroup bool, platform rproc.Platform, r *proc.CreateConfig, rootfs string) (*proc.Init, error) { +func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace, criu string, systemdCgroup bool, platform stdio.Platform, r *process.CreateConfig, rootfs string) (*process.Init, error) { var options runctypes.CreateOptions if r.Options != nil { v, err := typeurl.UnmarshalAny(r.Options) @@ -650,8 +650,8 @@ func newInit(ctx context.Context, path, workDir, runtimeRoot, namespace, criu st options = *v.(*runctypes.CreateOptions) } - runtime := proc.NewRunc(runtimeRoot, path, namespace, r.Runtime, criu, systemdCgroup) - p := proc.New(r.ID, runtime, rproc.Stdio{ + runtime := process.NewRunc(runtimeRoot, path, namespace, r.Runtime, criu, systemdCgroup) + p := process.New(r.ID, runtime, stdio.Stdio{ Stdin: r.Stdin, Stdout: r.Stdout, Stderr: r.Stderr, diff --git a/runtime/v2/runc/container.go b/runtime/v2/runc/container.go index 77066fcb1..6afa92db2 100644 --- a/runtime/v2/runc/container.go +++ b/runtime/v2/runc/container.go @@ -30,8 +30,8 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" - rproc "github.com/containerd/containerd/runtime/proc" - "github.com/containerd/containerd/runtime/v1/linux/proc" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/runtime/v2/task" "github.com/containerd/typeurl" @@ -40,7 +40,7 @@ import ( ) // NewContainer returns a new runc container -func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTaskRequest) (*Container, error) { +func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (*Container, error) { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, errors.Wrap(err, "create namespace") @@ -55,9 +55,9 @@ func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTa opts = *v.(*options.Options) } - var mounts []proc.Mount + var mounts []process.Mount for _, m := range r.Rootfs { - mounts = append(mounts, proc.Mount{ + mounts = append(mounts, process.Mount{ Type: m.Type, Source: m.Source, Target: m.Target, @@ -73,7 +73,7 @@ func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTa } } - config := &proc.CreateConfig{ + config := &process.CreateConfig{ ID: r.ID, Bundle: r.Bundle, Runtime: opts.BinaryName, @@ -108,7 +108,7 @@ func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTa } } - process, err := newInit( + p, err := newInit( ctx, r.Bundle, filepath.Join(r.Bundle, "work"), @@ -121,17 +121,17 @@ func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTa if err != nil { return nil, errdefs.ToGRPC(err) } - if err := process.Create(ctx, config); err != nil { + if err := p.Create(ctx, config); err != nil { return nil, errdefs.ToGRPC(err) } container := &Container{ ID: r.ID, Bundle: r.Bundle, - process: process, - processes: make(map[string]rproc.Process), + process: p, + processes: make(map[string]process.Process), reservedProcess: make(map[string]struct{}), } - pid := process.Pid() + pid := p.Pid() if pid > 0 { cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(pid)) if err != nil { @@ -156,10 +156,10 @@ func WriteRuntime(path, runtime string) error { return ioutil.WriteFile(filepath.Join(path, "runtime"), []byte(runtime), 0600) } -func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform, - r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) { - runtime := proc.NewRunc(options.Root, path, namespace, options.BinaryName, options.CriuPath, options.SystemdCgroup) - p := proc.New(r.ID, runtime, rproc.Stdio{ +func newInit(ctx context.Context, path, workDir, namespace string, platform stdio.Platform, + r *process.CreateConfig, options *options.Options, rootfs string) (*process.Init, error) { + runtime := process.NewRunc(options.Root, path, namespace, options.BinaryName, options.CriuPath, options.SystemdCgroup) + p := process.New(r.ID, runtime, stdio.Stdio{ Stdin: r.Stdin, Stdout: r.Stdout, Stderr: r.Stderr, @@ -191,13 +191,13 @@ type Container struct { Bundle string cgroup cgroups.Cgroup - process rproc.Process - processes map[string]rproc.Process + process process.Process + processes map[string]process.Process reservedProcess map[string]struct{} } // All processes in the container -func (c *Container) All() (o []rproc.Process) { +func (c *Container) All() (o []process.Process) { c.mu.Lock() defer c.mu.Unlock() @@ -211,7 +211,7 @@ func (c *Container) All() (o []rproc.Process) { } // ExecdProcesses added to the container -func (c *Container) ExecdProcesses() (o []rproc.Process) { +func (c *Container) ExecdProcesses() (o []process.Process) { c.mu.Lock() defer c.mu.Unlock() for _, p := range c.processes { @@ -242,7 +242,7 @@ func (c *Container) CgroupSet(cg cgroups.Cgroup) { } // Process returns the process by id -func (c *Container) Process(id string) (rproc.Process, error) { +func (c *Container) Process(id string) (process.Process, error) { c.mu.Lock() defer c.mu.Unlock() if id == "" { @@ -282,7 +282,7 @@ func (c *Container) ReserveProcess(id string) (bool, func()) { } // ProcessAdd adds a new process to the container -func (c *Container) ProcessAdd(process rproc.Process) { +func (c *Container) ProcessAdd(process process.Process) { c.mu.Lock() defer c.mu.Unlock() @@ -298,7 +298,7 @@ func (c *Container) ProcessRemove(id string) { } // Start a container process -func (c *Container) Start(ctx context.Context, r *task.StartRequest) (rproc.Process, error) { +func (c *Container) Start(ctx context.Context, r *task.StartRequest) (process.Process, error) { p, err := c.Process(r.ExecID) if err != nil { return nil, err @@ -317,7 +317,7 @@ func (c *Container) Start(ctx context.Context, r *task.StartRequest) (rproc.Proc } // Delete the container or a process by id -func (c *Container) Delete(ctx context.Context, r *task.DeleteRequest) (rproc.Process, error) { +func (c *Container) Delete(ctx context.Context, r *task.DeleteRequest) (process.Process, error) { p, err := c.Process(r.ExecID) if err != nil { return nil, err @@ -332,8 +332,8 @@ func (c *Container) Delete(ctx context.Context, r *task.DeleteRequest) (rproc.Pr } // Exec an additional process -func (c *Container) Exec(ctx context.Context, r *task.ExecProcessRequest) (rproc.Process, error) { - process, err := c.process.(*proc.Init).Exec(ctx, c.Bundle, &proc.ExecConfig{ +func (c *Container) Exec(ctx context.Context, r *task.ExecProcessRequest) (process.Process, error) { + process, err := c.process.(*process.Init).Exec(ctx, c.Bundle, &process.ExecConfig{ ID: r.ExecID, Terminal: r.Terminal, Stdin: r.Stdin, @@ -350,12 +350,12 @@ func (c *Container) Exec(ctx context.Context, r *task.ExecProcessRequest) (rproc // Pause the container func (c *Container) Pause(ctx context.Context) error { - return c.process.(*proc.Init).Pause(ctx) + return c.process.(*process.Init).Pause(ctx) } // Resume the container func (c *Container) Resume(ctx context.Context) error { - return c.process.(*proc.Init).Resume(ctx) + return c.process.(*process.Init).Resume(ctx) } // ResizePty of a process @@ -408,7 +408,7 @@ func (c *Container) Checkpoint(ctx context.Context, r *task.CheckpointTaskReques } opts = *v.(*options.CheckpointOptions) } - return p.(*proc.Init).Checkpoint(ctx, &proc.CheckpointConfig{ + return p.(*process.Init).Checkpoint(ctx, &process.CheckpointConfig{ Path: r.Path, Exit: opts.Exit, AllowOpenTCP: opts.OpenTcp, @@ -426,7 +426,7 @@ func (c *Container) Update(ctx context.Context, r *task.UpdateTaskRequest) error if err != nil { return err } - return p.(*proc.Init).Update(ctx, r.Resources) + return p.(*process.Init).Update(ctx, r.Resources) } // HasPid returns true if the container owns a specific pid diff --git a/runtime/v2/runc/platform.go b/runtime/v2/runc/platform.go index 970754dac..4478b55ef 100644 --- a/runtime/v2/runc/platform.go +++ b/runtime/v2/runc/platform.go @@ -25,7 +25,7 @@ import ( "syscall" "github.com/containerd/console" - rproc "github.com/containerd/containerd/runtime/proc" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/fifo" "github.com/pkg/errors" ) @@ -38,7 +38,7 @@ var bufPool = sync.Pool{ } // NewPlatform returns a linux platform for use with I/O operations -func NewPlatform() (rproc.Platform, error) { +func NewPlatform() (stdio.Platform, error) { epoller, err := console.NewEpoller() if err != nil { return nil, errors.Wrap(err, "failed to initialize epoller") diff --git a/runtime/v2/runc/v1/service.go b/runtime/v2/runc/v1/service.go index de379d7d7..c4f5328bf 100644 --- a/runtime/v2/runc/v1/service.go +++ b/runtime/v2/runc/v1/service.go @@ -36,8 +36,9 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" - rproc "github.com/containerd/containerd/runtime/proc" - "github.com/containerd/containerd/runtime/v1/linux/proc" + "github.com/containerd/containerd/pkg/oom" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/containerd/runtime/v2/runc" "github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/runtime/v2/shim" @@ -60,7 +61,7 @@ var ( // New returns a new shim service that can be used via GRPC func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) { - ep, err := runc.NewOOMEpoller(publisher) + ep, err := oom.New(publisher) if err != nil { return nil, err } @@ -90,9 +91,9 @@ type service struct { context context.Context events chan interface{} - platform rproc.Platform + platform stdio.Platform ec chan runcC.Exit - ep *runc.Epoller + ep *oom.Epoller id string container *runc.Container @@ -209,7 +210,7 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) if err != nil { return nil, err } - r := proc.NewRunc(proc.RuncRoot, path, ns, runtime, "", false) + r := process.NewRunc(process.RuncRoot, path, ns, runtime, "", false) if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{ Force: true, }); err != nil { @@ -590,7 +591,7 @@ func (s *service) checkProcesses(e runcC.Exit) { for _, p := range container.All() { if p.Pid() == e.Pid { if shouldKillAll { - if ip, ok := p.(*proc.Init); ok { + if ip, ok := p.(*process.Init); ok { // Ensure all children are killed if err := ip.KillAll(s.context); err != nil { logrus.WithError(err).WithField("id", ip.ID()). @@ -635,7 +636,7 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er if err != nil { return nil, errdefs.ToGRPC(err) } - ps, err := p.(*proc.Init).Runtime().Ps(ctx, id) + ps, err := p.(*process.Init).Runtime().Ps(ctx, id) if err != nil { return nil, err } @@ -670,7 +671,7 @@ func (s *service) getContainer() (*runc.Container, error) { return container, nil } -func (s *service) getProcess(execID string) (rproc.Process, error) { +func (s *service) getProcess(execID string) (process.Process, error) { container, err := s.getContainer() if err != nil { return nil, err diff --git a/runtime/v2/runc/v2/service.go b/runtime/v2/runc/v2/service.go index ace71ace1..54e82dba9 100644 --- a/runtime/v2/runc/v2/service.go +++ b/runtime/v2/runc/v2/service.go @@ -37,8 +37,9 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" - rproc "github.com/containerd/containerd/runtime/proc" - "github.com/containerd/containerd/runtime/v1/linux/proc" + "github.com/containerd/containerd/pkg/oom" + "github.com/containerd/containerd/pkg/process" + "github.com/containerd/containerd/pkg/stdio" "github.com/containerd/containerd/runtime/v2/runc" "github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/runtime/v2/shim" @@ -73,7 +74,7 @@ type spec struct { // New returns a new shim service that can be used via GRPC func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) { - ep, err := runc.NewOOMEpoller(publisher) + ep, err := oom.New(publisher) if err != nil { return nil, err } @@ -104,9 +105,9 @@ type service struct { context context.Context events chan interface{} - platform rproc.Platform + platform stdio.Platform ec chan runcC.Exit - ep *runc.Epoller + ep *oom.Epoller // id only used in cleanup case id string @@ -254,7 +255,7 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) if err != nil { return nil, err } - r := proc.NewRunc(proc.RuncRoot, path, ns, runtime, "", false) + r := process.NewRunc(process.RuncRoot, path, ns, runtime, "", false) if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{ Force: true, }); err != nil { @@ -653,7 +654,7 @@ func (s *service) checkProcesses(e runcC.Exit) { for _, p := range container.All() { if p.Pid() == e.Pid { if shouldKillAll { - if ip, ok := p.(*proc.Init); ok { + if ip, ok := p.(*process.Init); ok { // Ensure all children are killed if err := ip.KillAll(s.context); err != nil { logrus.WithError(err).WithField("id", ip.ID()). @@ -705,7 +706,7 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er if err != nil { return nil, errdefs.ToGRPC(err) } - ps, err := p.(*proc.Init).Runtime().Ps(ctx, id) + ps, err := p.(*process.Init).Runtime().Ps(ctx, id) if err != nil { return nil, err }