From 6bcbf88f82e814f76ede351f48b57613540af425 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 7 Feb 2019 15:40:30 -0500 Subject: [PATCH] Move runc shim code into common package Signed-off-by: Michael Crosby --- cmd/containerd-shim-runc-v1/main.go | 4 +- runtime/v2/runc/container.go | 371 +++++++++++++ runtime/v2/runc/epoll.go | 20 +- .../v2/runc/{service_linux.go => platform.go} | 45 +- runtime/v2/runc/util.go | 55 ++ runtime/v2/runc/{ => v1}/service.go | 498 +++++------------- 6 files changed, 608 insertions(+), 385 deletions(-) create mode 100644 runtime/v2/runc/container.go rename runtime/v2/runc/{service_linux.go => platform.go} (83%) create mode 100644 runtime/v2/runc/util.go rename runtime/v2/runc/{ => v1}/service.go (61%) diff --git a/cmd/containerd-shim-runc-v1/main.go b/cmd/containerd-shim-runc-v1/main.go index d339886bc..b8a8df7b0 100644 --- a/cmd/containerd-shim-runc-v1/main.go +++ b/cmd/containerd-shim-runc-v1/main.go @@ -19,10 +19,10 @@ package main import ( - "github.com/containerd/containerd/runtime/v2/runc" + "github.com/containerd/containerd/runtime/v2/runc/v1" "github.com/containerd/containerd/runtime/v2/shim" ) func main() { - shim.Run("io.containerd.runc.v1", runc.New) + shim.Run("io.containerd.runc.v1", v1.New) } diff --git a/runtime/v2/runc/container.go b/runtime/v2/runc/container.go new file mode 100644 index 000000000..950fbb96f --- /dev/null +++ b/runtime/v2/runc/container.go @@ -0,0 +1,371 @@ +// +build linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package runc + +import ( + "context" + "io/ioutil" + "path/filepath" + "sync" + + "github.com/containerd/cgroups" + "github.com/containerd/console" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/namespaces" + rproc "github.com/containerd/containerd/runtime/proc" + "github.com/containerd/containerd/runtime/v1/linux/proc" + "github.com/containerd/containerd/runtime/v2/runc/options" + "github.com/containerd/containerd/runtime/v2/task" + "github.com/containerd/typeurl" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTaskRequest) (*Container, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, errors.Wrap(err, "create namespace") + } + + var opts options.Options + if r.Options != nil { + v, err := typeurl.UnmarshalAny(r.Options) + if err != nil { + return nil, err + } + opts = *v.(*options.Options) + } + + var mounts []proc.Mount + for _, m := range r.Rootfs { + mounts = append(mounts, proc.Mount{ + Type: m.Type, + Source: m.Source, + Target: m.Target, + Options: m.Options, + }) + } + config := &proc.CreateConfig{ + ID: r.ID, + Bundle: r.Bundle, + Runtime: opts.BinaryName, + Rootfs: mounts, + Terminal: r.Terminal, + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Checkpoint: r.Checkpoint, + ParentCheckpoint: r.ParentCheckpoint, + Options: r.Options, + } + + if err := WriteRuntime(r.Bundle, opts.BinaryName); err != nil { + return nil, err + } + rootfs := filepath.Join(r.Bundle, "rootfs") + defer func() { + if err != nil { + if err2 := mount.UnmountAll(rootfs, 0); err2 != nil { + logrus.WithError(err2).Warn("failed to cleanup rootfs mount") + } + } + }() + for _, rm := range mounts { + m := &mount.Mount{ + Type: rm.Type, + Source: rm.Source, + Options: rm.Options, + } + if err := m.Mount(rootfs); err != nil { + return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m) + } + } + + process, err := newInit( + ctx, + r.Bundle, + filepath.Join(r.Bundle, "work"), + ns, + platform, + config, + &opts, + ) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + if err := process.Create(ctx, config); err != nil { + return nil, errdefs.ToGRPC(err) + } + container := &Container{ + ID: r.ID, + Bundle: r.Bundle, + process: process, + processes: make(map[string]rproc.Process), + } + pid := process.Pid() + if pid > 0 { + cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(pid)) + if err != nil { + logrus.WithError(err).Errorf("loading cgroup for %d", pid) + } + container.cgroup = cg + } + return container, nil +} + +func ReadRuntime(path string) (string, error) { + data, err := ioutil.ReadFile(filepath.Join(path, "runtime")) + if err != nil { + return "", err + } + return string(data), nil +} + +func WriteRuntime(path, runtime string) error { + return ioutil.WriteFile(filepath.Join(path, "runtime"), []byte(runtime), 0600) +} + +func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform, + r *proc.CreateConfig, options *options.Options) (*proc.Init, error) { + rootfs := filepath.Join(path, "rootfs") + runtime := proc.NewRunc(options.Root, path, namespace, options.BinaryName, options.CriuPath, options.SystemdCgroup) + p := proc.New(r.ID, runtime, rproc.Stdio{ + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Terminal: r.Terminal, + }) + p.Bundle = r.Bundle + p.Platform = platform + p.Rootfs = rootfs + p.WorkDir = workDir + p.IoUID = int(options.IoUid) + p.IoGID = int(options.IoGid) + p.NoPivotRoot = options.NoPivotRoot + p.NoNewKeyring = options.NoNewKeyring + p.CriuWorkPath = options.CriuWorkPath + if p.CriuWorkPath == "" { + // if criu work path not set, use container WorkDir + p.CriuWorkPath = p.WorkDir + } + return p, nil +} + +type Container struct { + mu sync.Mutex + + ID string + Bundle string + + cgroup cgroups.Cgroup + process rproc.Process + processes map[string]rproc.Process +} + +func (c *Container) All() (o []rproc.Process) { + c.mu.Lock() + defer c.mu.Unlock() + + for _, p := range c.processes { + o = append(o, p) + } + if c.process != nil { + o = append(o, c.process) + } + return o +} + +func (c *Container) ExecdProcesses() (o []rproc.Process) { + c.mu.Lock() + defer c.mu.Unlock() + for _, p := range c.processes { + o = append(o, p) + } + return o +} + +// Pid of the main process of a container +func (c *Container) Pid() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.process.Pid() +} + +func (c *Container) Cgroup() cgroups.Cgroup { + c.mu.Lock() + defer c.mu.Unlock() + return c.cgroup +} + +func (c *Container) CgroupSet(cg cgroups.Cgroup) { + c.mu.Lock() + c.cgroup = cg + c.mu.Unlock() +} + +func (c *Container) Process(id string) rproc.Process { + c.mu.Lock() + defer c.mu.Unlock() + if id == "" { + return c.process + } + return c.processes[id] +} + +func (c *Container) ProcessExists(id string) bool { + c.mu.Lock() + defer c.mu.Unlock() + _, ok := c.processes[id] + return ok +} + +func (c *Container) ProcessAdd(process rproc.Process) { + c.mu.Lock() + defer c.mu.Unlock() + c.processes[process.ID()] = process +} + +func (c *Container) ProcessRemove(id string) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.processes, id) +} + +func (c *Container) Start(ctx context.Context, r *task.StartRequest) (rproc.Process, error) { + p := c.Process(r.ExecID) + if p == nil { + return nil, errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID) + } + if err := p.Start(ctx); err != nil { + return nil, err + } + if c.Cgroup() == nil && p.Pid() > 0 { + cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid())) + if err != nil { + logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid()) + } + c.cgroup = cg + } + return p, nil +} + +func (c *Container) Delete(ctx context.Context, r *task.DeleteRequest) (rproc.Process, error) { + p := c.Process(r.ExecID) + if p == nil { + return nil, errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID) + } + if err := p.Delete(ctx); err != nil { + return nil, err + } + if r.ExecID != "" { + c.ProcessRemove(r.ExecID) + } + return p, nil +} + +func (c *Container) Exec(ctx context.Context, r *task.ExecProcessRequest) (rproc.Process, error) { + process, err := c.process.(*proc.Init).Exec(ctx, c.Bundle, &proc.ExecConfig{ + ID: r.ExecID, + Terminal: r.Terminal, + Stdin: r.Stdin, + Stdout: r.Stdout, + Stderr: r.Stderr, + Spec: r.Spec, + }) + if err != nil { + return nil, err + } + c.ProcessAdd(process) + return process, nil +} + +func (c *Container) Pause(ctx context.Context) error { + return c.process.(*proc.Init).Pause(ctx) +} + +func (c *Container) Resume(ctx context.Context) error { + return c.process.(*proc.Init).Resume(ctx) +} + +func (c *Container) ResizePty(ctx context.Context, r *task.ResizePtyRequest) error { + p := c.Process(r.ExecID) + if p == nil { + return errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID) + } + ws := console.WinSize{ + Width: uint16(r.Width), + Height: uint16(r.Height), + } + return p.Resize(ws) +} + +func (c *Container) Kill(ctx context.Context, r *task.KillRequest) error { + p := c.Process(r.ExecID) + if p == nil { + return errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID) + } + return p.Kill(ctx, r.Signal, r.All) +} + +func (c *Container) CloseIO(ctx context.Context, r *task.CloseIORequest) error { + p := c.Process(r.ExecID) + if p == nil { + return errors.Wrapf(errdefs.ErrNotFound, "process does not exist %s", r.ExecID) + } + if stdin := p.Stdin(); stdin != nil { + if err := stdin.Close(); err != nil { + return errors.Wrap(err, "close stdin") + } + } + return nil +} + +func (c *Container) Checkpoint(ctx context.Context, r *task.CheckpointTaskRequest) error { + p := c.Process("") + if p == nil { + return errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") + } + var opts options.CheckpointOptions + if r.Options != nil { + v, err := typeurl.UnmarshalAny(r.Options) + if err != nil { + return err + } + opts = *v.(*options.CheckpointOptions) + } + return p.(*proc.Init).Checkpoint(ctx, &proc.CheckpointConfig{ + Path: r.Path, + Exit: opts.Exit, + AllowOpenTCP: opts.OpenTcp, + AllowExternalUnixSockets: opts.ExternalUnixSockets, + AllowTerminal: opts.Terminal, + FileLocks: opts.FileLocks, + EmptyNamespaces: opts.EmptyNamespaces, + WorkDir: opts.WorkPath, + }) +} + +func (c *Container) Update(ctx context.Context, r *task.UpdateTaskRequest) error { + p := c.Process("") + if p == nil { + return errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") + } + return p.(*proc.Init).Update(ctx, r.Resources) +} diff --git a/runtime/v2/runc/epoll.go b/runtime/v2/runc/epoll.go index 6aea9b8c1..5425655ff 100644 --- a/runtime/v2/runc/epoll.go +++ b/runtime/v2/runc/epoll.go @@ -30,19 +30,22 @@ import ( "golang.org/x/sys/unix" ) -func newOOMEpoller(publisher events.Publisher) (*epoller, error) { +// NewOOMEpoller returns an epoll implementation that listens to OOM events +// from a container's cgroups. +func NewOOMEpoller(publisher events.Publisher) (*Epoller, error) { fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC) if err != nil { return nil, err } - return &epoller{ + return &Epoller{ fd: fd, publisher: publisher, set: make(map[uintptr]*item), }, nil } -type epoller struct { +// Epoller implementation for handling OOM events from a container's cgroup +type Epoller struct { mu sync.Mutex fd int @@ -55,11 +58,13 @@ type item struct { cg cgroups.Cgroup } -func (e *epoller) Close() error { +// Close the epoll fd +func (e *Epoller) Close() error { return unix.Close(e.fd) } -func (e *epoller) run(ctx context.Context) { +// Run the epoll loop +func (e *Epoller) Run(ctx context.Context) { var events [128]unix.EpollEvent for { select { @@ -81,7 +86,8 @@ func (e *epoller) run(ctx context.Context) { } } -func (e *epoller) add(id string, cg cgroups.Cgroup) error { +// Add the cgroup to the epoll monitor +func (e *Epoller) Add(id string, cg cgroups.Cgroup) error { e.mu.Lock() defer e.mu.Unlock() fd, err := cg.OOMEventFD() @@ -99,7 +105,7 @@ func (e *epoller) add(id string, cg cgroups.Cgroup) error { return unix.EpollCtl(e.fd, unix.EPOLL_CTL_ADD, int(fd), &event) } -func (e *epoller) process(ctx context.Context, fd uintptr) { +func (e *Epoller) process(ctx context.Context, fd uintptr) { flush(fd) e.mu.Lock() i, ok := e.set[fd] diff --git a/runtime/v2/runc/service_linux.go b/runtime/v2/runc/platform.go similarity index 83% rename from runtime/v2/runc/service_linux.go rename to runtime/v2/runc/platform.go index 195c23014..d38aa5469 100644 --- a/runtime/v2/runc/service_linux.go +++ b/runtime/v2/runc/platform.go @@ -1,3 +1,5 @@ +// +build linux + /* Copyright The containerd Authors. @@ -23,10 +25,30 @@ import ( "syscall" "github.com/containerd/console" + rproc "github.com/containerd/containerd/runtime/proc" "github.com/containerd/fifo" "github.com/pkg/errors" ) +var bufPool = sync.Pool{ + New: func() interface{} { + buffer := make([]byte, 32<<10) + return &buffer + }, +} + +// NewPlatform returns a linux platform for use with I/O operations +func NewPlatform() (rproc.Platform, error) { + epoller, err := console.NewEpoller() + if err != nil { + return nil, errors.Wrap(err, "failed to initialize epoller") + } + go epoller.Wait() + return &linuxPlatform{ + epoller: epoller, + }, nil +} + type linuxPlatform struct { epoller *console.Epoller } @@ -69,9 +91,9 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console cwg.Add(1) go func() { cwg.Done() - p := bufPool.Get().(*[]byte) - defer bufPool.Put(p) - io.CopyBuffer(outw, epollConsole, *p) + buf := bufPool.Get().(*[]byte) + defer bufPool.Put(buf) + io.CopyBuffer(outw, epollConsole, *buf) epollConsole.Close() outr.Close() outw.Close() @@ -94,20 +116,3 @@ func (p *linuxPlatform) ShutdownConsole(ctx context.Context, cons console.Consol func (p *linuxPlatform) Close() error { return p.epoller.Close() } - -// initialize a single epoll fd to manage our consoles. `initPlatform` should -// only be called once. -func (s *service) initPlatform() error { - if s.platform != nil { - return nil - } - epoller, err := console.NewEpoller() - if err != nil { - return errors.Wrap(err, "failed to initialize epoller") - } - s.platform = &linuxPlatform{ - epoller: epoller, - } - go epoller.Wait() - return nil -} diff --git a/runtime/v2/runc/util.go b/runtime/v2/runc/util.go new file mode 100644 index 000000000..51ca04864 --- /dev/null +++ b/runtime/v2/runc/util.go @@ -0,0 +1,55 @@ +// +build linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package runc + +import ( + "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/runtime" + "github.com/sirupsen/logrus" +) + +// GetTopic converts an event from an interface type to the specific +// event topic id +func GetTopic(e interface{}) string { + switch e.(type) { + case *events.TaskCreate: + return runtime.TaskCreateEventTopic + case *events.TaskStart: + return runtime.TaskStartEventTopic + case *events.TaskOOM: + return runtime.TaskOOMEventTopic + case *events.TaskExit: + return runtime.TaskExitEventTopic + case *events.TaskDelete: + return runtime.TaskDeleteEventTopic + case *events.TaskExecAdded: + return runtime.TaskExecAddedEventTopic + case *events.TaskExecStarted: + return runtime.TaskExecStartedEventTopic + case *events.TaskPaused: + return runtime.TaskPausedEventTopic + case *events.TaskResumed: + return runtime.TaskResumedEventTopic + case *events.TaskCheckpointed: + return runtime.TaskCheckpointedEventTopic + default: + logrus.Warnf("no topic for type %#v", e) + } + return runtime.TaskUnknownTopic +} diff --git a/runtime/v2/runc/service.go b/runtime/v2/runc/v1/service.go similarity index 61% rename from runtime/v2/runc/service.go rename to runtime/v2/runc/v1/service.go index 83c490689..fbe81033f 100644 --- a/runtime/v2/runc/service.go +++ b/runtime/v2/runc/v1/service.go @@ -16,7 +16,7 @@ limitations under the License. */ -package runc +package v1 import ( "context" @@ -30,7 +30,6 @@ import ( "time" "github.com/containerd/cgroups" - "github.com/containerd/console" eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" @@ -38,9 +37,9 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" - "github.com/containerd/containerd/runtime" rproc "github.com/containerd/containerd/runtime/proc" "github.com/containerd/containerd/runtime/v1/linux/proc" + "github.com/containerd/containerd/runtime/v2/runc" "github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/runtime/v2/shim" taskAPI "github.com/containerd/containerd/runtime/v2/task" @@ -54,33 +53,25 @@ import ( ) var ( - empty = &ptypes.Empty{} - bufPool = sync.Pool{ - New: func() interface{} { - buffer := make([]byte, 32<<10) - return &buffer - }, - } + _ = (taskAPI.TaskService)(&service{}) + empty = &ptypes.Empty{} ) -var _ = (taskAPI.TaskService)(&service{}) - // New returns a new shim service that can be used via GRPC func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) { - ep, err := newOOMEpoller(publisher) + ep, err := runc.NewOOMEpoller(publisher) if err != nil { return nil, err } ctx, cancel := context.WithCancel(ctx) - go ep.run(ctx) + go ep.Run(ctx) s := &service{ - id: id, - context: ctx, - processes: make(map[string]rproc.Process), - events: make(chan interface{}, 128), - ec: shim.Default.Subscribe(), - ep: ep, - cancel: cancel, + id: id, + context: ctx, + events: make(chan interface{}, 128), + ec: shim.Default.Subscribe(), + ep: ep, + cancel: cancel, } go s.processExits() runcC.Monitor = shim.Default @@ -97,17 +88,15 @@ type service struct { mu sync.Mutex eventSendMu sync.Mutex - context context.Context - task rproc.Process - processes map[string]rproc.Process - events chan interface{} - platform rproc.Platform - ec chan runcC.Exit - ep *epoller + context context.Context + events chan interface{} + platform rproc.Platform + ec chan runcC.Exit + ep *runc.Epoller + + id string + container *runc.Container - id string - bundle string - cg cgroups.Cgroup cancel func() } @@ -192,7 +181,7 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) if err != nil { return nil, err } - runtime, err := s.readRuntime(path) + runtime, err := runc.ReadRuntime(path) if err != nil { return nil, err } @@ -211,107 +200,18 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) }, nil } -func (s *service) readRuntime(path string) (string, error) { - data, err := ioutil.ReadFile(filepath.Join(path, "runtime")) - if err != nil { - return "", err - } - return string(data), nil -} - -func (s *service) writeRuntime(path, runtime string) error { - return ioutil.WriteFile(filepath.Join(path, "runtime"), []byte(runtime), 0600) -} - // Create a new initial process and container with the underlying OCI runtime func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) { s.mu.Lock() defer s.mu.Unlock() - ns, err := namespaces.NamespaceRequired(ctx) + container, err := runc.NewContainer(ctx, s.platform, r) if err != nil { - return nil, errors.Wrap(err, "create namespace") - } - - var opts options.Options - if r.Options != nil { - v, err := typeurl.UnmarshalAny(r.Options) - if err != nil { - return nil, err - } - opts = *v.(*options.Options) - } - - var mounts []proc.Mount - for _, m := range r.Rootfs { - mounts = append(mounts, proc.Mount{ - Type: m.Type, - Source: m.Source, - Target: m.Target, - Options: m.Options, - }) - } - config := &proc.CreateConfig{ - ID: r.ID, - Bundle: r.Bundle, - Runtime: opts.BinaryName, - Rootfs: mounts, - Terminal: r.Terminal, - Stdin: r.Stdin, - Stdout: r.Stdout, - Stderr: r.Stderr, - Checkpoint: r.Checkpoint, - ParentCheckpoint: r.ParentCheckpoint, - Options: r.Options, - } - if err := s.writeRuntime(r.Bundle, opts.BinaryName); err != nil { return nil, err } - rootfs := filepath.Join(r.Bundle, "rootfs") - defer func() { - if err != nil { - if err2 := mount.UnmountAll(rootfs, 0); err2 != nil { - logrus.WithError(err2).Warn("failed to cleanup rootfs mount") - } - } - }() - for _, rm := range mounts { - m := &mount.Mount{ - Type: rm.Type, - Source: rm.Source, - Options: rm.Options, - } - if err := m.Mount(rootfs); err != nil { - return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m) - } - } - process, err := newInit( - ctx, - r.Bundle, - filepath.Join(r.Bundle, "work"), - ns, - s.platform, - config, - &opts, - ) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - if err := process.Create(ctx, config); err != nil { - return nil, errdefs.ToGRPC(err) - } - // save the main task id and bundle to the shim for additional requests + s.id = r.ID - s.bundle = r.Bundle - pid := process.Pid() - if pid > 0 { - cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(pid)) - if err != nil { - logrus.WithError(err).Errorf("loading cgroup for %d", pid) - } - s.cg = cg - } - s.task = process + s.container = container s.send(&eventstypes.TaskCreate{ ContainerID: r.ID, @@ -324,46 +224,41 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * Terminal: r.Terminal, }, Checkpoint: r.Checkpoint, - Pid: uint32(pid), + Pid: uint32(container.Pid()), }) return &taskAPI.CreateTaskResponse{ - Pid: uint32(pid), + Pid: uint32(container.Pid()), }, nil - } // Start a process func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) { - p, err := s.getProcess(r.ExecID) + container, err := s.getContainer() if err != nil { return nil, err } // hold the send lock so that the start events are sent before any exit events in the error case s.eventSendMu.Lock() - if err := p.Start(ctx); err != nil { + p, err := container.Start(ctx, r) + if err != nil { s.eventSendMu.Unlock() - return nil, err + return nil, errdefs.ToGRPC(err) } - - // case for restore - if s.getCgroup() == nil && p.Pid() > 0 { - cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid())) - if err != nil { - logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid()) - } - s.setCgroup(cg) + if err := s.ep.Add(container.ID, container.Cgroup()); err != nil { + logrus.WithError(err).Error("add cg to OOM monitor") } - if r.ExecID != "" { + switch r.ExecID { + case "": s.send(&eventstypes.TaskExecStarted{ - ContainerID: s.id, + ContainerID: container.ID, ExecID: r.ExecID, Pid: uint32(p.Pid()), }) - } else { + default: s.send(&eventstypes.TaskStart{ - ContainerID: s.id, + ContainerID: container.ID, Pid: uint32(p.Pid()), }) } @@ -375,23 +270,16 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. // Delete the initial process and container func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) { - p, err := s.getProcess(r.ExecID) + container, err := s.getContainer() if err != nil { return nil, err } - if p == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") + p, err := container.Delete(ctx, r) + if err != nil { + return nil, errdefs.ToGRPC(err) } - if err := p.Delete(ctx); err != nil { - return nil, err - } - isTask := r.ExecID == "" - if !isTask { - s.mu.Lock() - delete(s.processes, r.ExecID) - s.mu.Unlock() - } - if isTask { + // if we deleted our init task, close the platform and send the task delete event + if r.ExecID == "" { if s.platform != nil { s.platform.Close() } @@ -411,33 +299,20 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP // Exec an additional process inside the container func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) { - s.mu.Lock() - p := s.processes[r.ExecID] - s.mu.Unlock() - if p != nil { + container, err := s.getContainer() + if err != nil { + return nil, err + } + if container.ProcessExists(r.ExecID) { return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID) } - p = s.task - if p == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") - } - process, err := p.(*proc.Init).Exec(ctx, s.bundle, &proc.ExecConfig{ - ID: r.ExecID, - Terminal: r.Terminal, - Stdin: r.Stdin, - Stdout: r.Stdout, - Stderr: r.Stderr, - Spec: r.Spec, - }) + process, err := container.Exec(ctx, r) if err != nil { return nil, errdefs.ToGRPC(err) } - s.mu.Lock() - s.processes[r.ExecID] = process - s.mu.Unlock() s.send(&eventstypes.TaskExecAdded{ - ContainerID: s.id, + ContainerID: s.container.ID, ExecID: process.ID(), }) return empty, nil @@ -445,15 +320,11 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty // ResizePty of a process func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) { - p, err := s.getProcess(r.ExecID) + container, err := s.getContainer() if err != nil { return nil, err } - ws := console.WinSize{ - Width: uint16(r.Width), - Height: uint16(r.Height), - } - if err := p.Resize(ws); err != nil { + if err := container.ResizePty(ctx, r); err != nil { return nil, errdefs.ToGRPC(err) } return empty, nil @@ -485,7 +356,7 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI. sio := p.Stdio() return &taskAPI.StateResponse{ ID: p.ID(), - Bundle: s.bundle, + Bundle: s.container.Bundle, Pid: uint32(p.Pid()), Status: status, Stdin: sio.Stdin, @@ -499,48 +370,41 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI. // Pause the container func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) { - s.mu.Lock() - p := s.task - s.mu.Unlock() - if p == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") - } - if err := p.(*proc.Init).Pause(ctx); err != nil { + container, err := s.getContainer() + if err != nil { return nil, err } + if err := container.Pause(ctx); err != nil { + return nil, errdefs.ToGRPC(err) + } s.send(&eventstypes.TaskPaused{ - p.ID(), + container.ID, }) return empty, nil } // Resume the container func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) { - s.mu.Lock() - p := s.task - s.mu.Unlock() - if p == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") - } - if err := p.(*proc.Init).Resume(ctx); err != nil { + container, err := s.getContainer() + if err != nil { return nil, err } + if err := container.Resume(ctx); err != nil { + return nil, errdefs.ToGRPC(err) + } s.send(&eventstypes.TaskResumed{ - p.ID(), + container.ID, }) return empty, nil } // Kill a process with the provided signal func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) { - p, err := s.getProcess(r.ExecID) + container, err := s.getContainer() if err != nil { return nil, err } - if p == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") - } - if err := p.Kill(ctx, r.Signal, r.All); err != nil { + if err := container.Kill(ctx, r); err != nil { return nil, errdefs.ToGRPC(err) } return empty, nil @@ -548,6 +412,10 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Emp // Pids returns all pids inside the container func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) { + container, err := s.getContainer() + if err != nil { + return nil, err + } pids, err := s.getContainerPids(ctx, r.ID) if err != nil { return nil, errdefs.ToGRPC(err) @@ -557,7 +425,7 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi pInfo := task.ProcessInfo{ Pid: pid, } - for _, p := range s.processes { + for _, p := range container.ExecdProcesses() { if p.Pid() == int(pid) { d := &options.ProcessDetails{ ExecID: p.ID(), @@ -579,54 +447,63 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi // CloseIO of a process func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) { - p, err := s.getProcess(r.ExecID) + container, err := s.getContainer() if err != nil { return nil, err } - if stdin := p.Stdin(); stdin != nil { - if err := stdin.Close(); err != nil { - return nil, errors.Wrap(err, "close stdin") - } + if err := container.CloseIO(ctx, r); err != nil { + return nil, err } return empty, nil } // Checkpoint the container func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) { - s.mu.Lock() - p := s.task - s.mu.Unlock() - if p == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") + container, err := s.getContainer() + if err != nil { + return nil, err } - var opts options.CheckpointOptions - if r.Options != nil { - v, err := typeurl.UnmarshalAny(r.Options) - if err != nil { - return nil, err - } - opts = *v.(*options.CheckpointOptions) - } - if err := p.(*proc.Init).Checkpoint(ctx, &proc.CheckpointConfig{ - Path: r.Path, - Exit: opts.Exit, - AllowOpenTCP: opts.OpenTcp, - AllowExternalUnixSockets: opts.ExternalUnixSockets, - AllowTerminal: opts.Terminal, - FileLocks: opts.FileLocks, - EmptyNamespaces: opts.EmptyNamespaces, - WorkDir: opts.WorkPath, - }); err != nil { + if err := container.Checkpoint(ctx, r); err != nil { return nil, errdefs.ToGRPC(err) } return empty, nil } +// Update a running container +func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { + container, err := s.getContainer() + if err != nil { + return nil, err + } + if err := container.Update(ctx, r); err != nil { + return nil, errdefs.ToGRPC(err) + } + return empty, nil +} + +// Wait for a process to exit +func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { + container, err := s.getContainer() + if err != nil { + return nil, err + } + p := container.Process(r.ExecID) + if p == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") + } + p.Wait() + + return &taskAPI.WaitResponse{ + ExitStatus: uint32(p.ExitStatus()), + ExitedAt: p.ExitedAt(), + }, nil +} + // Connect returns shim information such as the shim's pid func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) { var pid int - if s.task != nil { - pid = s.task.Pid() + if s.container != nil { + pid = s.container.Pid() } return &taskAPI.ConnectResponse{ ShimPid: uint32(os.Getpid()), @@ -641,7 +518,7 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt } func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) { - cg := s.getCgroup() + cg := s.container.Cgroup() if cg == nil { return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist") } @@ -658,37 +535,6 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI. }, nil } -// Update a running container -func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) { - s.mu.Lock() - p := s.task - s.mu.Unlock() - if p == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") - } - if err := p.(*proc.Init).Update(ctx, r.Resources); err != nil { - return nil, errdefs.ToGRPC(err) - } - return empty, nil -} - -// Wait for a process to exit -func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) { - p, err := s.getProcess(r.ExecID) - if err != nil { - return nil, err - } - if p == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") - } - p.Wait() - - return &taskAPI.WaitResponse{ - ExitStatus: uint32(p.ExitStatus()), - ExitedAt: p.ExitedAt(), - }, nil -} - func (s *service) processExits() { for e := range s.ec { s.checkProcesses(e) @@ -706,12 +552,14 @@ func (s *service) sendL(evt interface{}) { } func (s *service) checkProcesses(e runcC.Exit) { - shouldKillAll, err := shouldKillAllOnExit(s.bundle) + s.mu.Lock() + defer s.mu.Unlock() + shouldKillAll, err := shouldKillAllOnExit(s.container.Bundle) if err != nil { log.G(s.context).WithError(err).Error("failed to check shouldKillAll") } - for _, p := range s.allProcesses() { + for _, p := range s.container.All() { if p.Pid() == e.Pid { if shouldKillAll { if ip, ok := p.(*proc.Init); ok { @@ -754,24 +602,8 @@ func shouldKillAllOnExit(bundlePath string) (bool, error) { return true, nil } -func (s *service) allProcesses() (o []rproc.Process) { - s.mu.Lock() - defer s.mu.Unlock() - - o = make([]rproc.Process, 0, len(s.processes)+1) - for _, p := range s.processes { - o = append(o, p) - } - if s.task != nil { - o = append(o, s.task) - } - return o -} - func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) { - s.mu.Lock() - p := s.task - s.mu.Unlock() + p := s.container.Process("") if p == nil { return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created") } @@ -789,7 +621,7 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er func (s *service) forward(publisher events.Publisher) { for e := range s.events { ctx, cancel := context.WithTimeout(s.context, 5*time.Second) - err := publisher.Publish(ctx, getTopic(e), e) + err := publisher.Publish(ctx, runc.GetTopic(e), e) cancel() if err != nil { logrus.WithError(err).Error("post event") @@ -797,84 +629,38 @@ func (s *service) forward(publisher events.Publisher) { } } -func (s *service) getProcess(execID string) (rproc.Process, error) { +func (s *service) getContainer() (*runc.Container, error) { s.mu.Lock() - defer s.mu.Unlock() - if execID == "" { - return s.task, nil + container := s.container + s.mu.Unlock() + if container == nil { + return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "container not created") } - p := s.processes[execID] + return container, nil +} + +func (s *service) getProcess(execID string) (rproc.Process, error) { + container, err := s.getContainer() + if err != nil { + return nil, err + } + p := container.Process(execID) if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process does not exist %s", execID) } return p, nil } -func (s *service) getCgroup() cgroups.Cgroup { - s.mu.Lock() - defer s.mu.Unlock() - return s.cg -} - -func (s *service) setCgroup(cg cgroups.Cgroup) { - s.mu.Lock() - s.cg = cg - s.mu.Unlock() - if err := s.ep.add(s.id, cg); err != nil { - logrus.WithError(err).Error("add cg to OOM monitor") +// initialize a single epoll fd to manage our consoles. `initPlatform` should +// only be called once. +func (s *service) initPlatform() error { + if s.platform != nil { + return nil } -} - -func getTopic(e interface{}) string { - switch e.(type) { - case *eventstypes.TaskCreate: - return runtime.TaskCreateEventTopic - case *eventstypes.TaskStart: - return runtime.TaskStartEventTopic - case *eventstypes.TaskOOM: - return runtime.TaskOOMEventTopic - case *eventstypes.TaskExit: - return runtime.TaskExitEventTopic - case *eventstypes.TaskDelete: - return runtime.TaskDeleteEventTopic - case *eventstypes.TaskExecAdded: - return runtime.TaskExecAddedEventTopic - case *eventstypes.TaskExecStarted: - return runtime.TaskExecStartedEventTopic - case *eventstypes.TaskPaused: - return runtime.TaskPausedEventTopic - case *eventstypes.TaskResumed: - return runtime.TaskResumedEventTopic - case *eventstypes.TaskCheckpointed: - return runtime.TaskCheckpointedEventTopic - default: - logrus.Warnf("no topic for type %#v", e) + p, err := runc.NewPlatform() + if err != nil { + return err } - return runtime.TaskUnknownTopic -} - -func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform, r *proc.CreateConfig, options *options.Options) (*proc.Init, error) { - rootfs := filepath.Join(path, "rootfs") - runtime := proc.NewRunc(options.Root, path, namespace, options.BinaryName, options.CriuPath, options.SystemdCgroup) - p := proc.New(r.ID, runtime, rproc.Stdio{ - Stdin: r.Stdin, - Stdout: r.Stdout, - Stderr: r.Stderr, - Terminal: r.Terminal, - }) - p.Bundle = r.Bundle - p.Platform = platform - p.Rootfs = rootfs - p.WorkDir = workDir - p.IoUID = int(options.IoUid) - p.IoGID = int(options.IoGid) - p.NoPivotRoot = options.NoPivotRoot - p.NoNewKeyring = options.NoNewKeyring - p.CriuWorkPath = options.CriuWorkPath - if p.CriuWorkPath == "" { - // if criu work path not set, use container WorkDir - p.CriuWorkPath = p.WorkDir - } - - return p, nil + s.platform = p + return nil }