Refactor runtime code for code sharing

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby
2019-07-02 15:51:08 -04:00
parent 2aa8780ce6
commit 6601b406b7
20 changed files with 203 additions and 186 deletions

View File

@@ -30,8 +30,8 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
rproc "github.com/containerd/containerd/runtime/proc"
"github.com/containerd/containerd/runtime/v1/linux/proc"
"github.com/containerd/containerd/pkg/process"
"github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/typeurl"
@@ -40,7 +40,7 @@ import (
)
// NewContainer returns a new runc container
func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTaskRequest) (*Container, error) {
func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (*Container, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, errors.Wrap(err, "create namespace")
@@ -55,9 +55,9 @@ func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTa
opts = *v.(*options.Options)
}
var mounts []proc.Mount
var mounts []process.Mount
for _, m := range r.Rootfs {
mounts = append(mounts, proc.Mount{
mounts = append(mounts, process.Mount{
Type: m.Type,
Source: m.Source,
Target: m.Target,
@@ -73,7 +73,7 @@ func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTa
}
}
config := &proc.CreateConfig{
config := &process.CreateConfig{
ID: r.ID,
Bundle: r.Bundle,
Runtime: opts.BinaryName,
@@ -108,7 +108,7 @@ func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTa
}
}
process, err := newInit(
p, err := newInit(
ctx,
r.Bundle,
filepath.Join(r.Bundle, "work"),
@@ -121,17 +121,17 @@ func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTa
if err != nil {
return nil, errdefs.ToGRPC(err)
}
if err := process.Create(ctx, config); err != nil {
if err := p.Create(ctx, config); err != nil {
return nil, errdefs.ToGRPC(err)
}
container := &Container{
ID: r.ID,
Bundle: r.Bundle,
process: process,
processes: make(map[string]rproc.Process),
process: p,
processes: make(map[string]process.Process),
reservedProcess: make(map[string]struct{}),
}
pid := process.Pid()
pid := p.Pid()
if pid > 0 {
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
if err != nil {
@@ -156,10 +156,10 @@ func WriteRuntime(path, runtime string) error {
return ioutil.WriteFile(filepath.Join(path, "runtime"), []byte(runtime), 0600)
}
func newInit(ctx context.Context, path, workDir, namespace string, platform rproc.Platform,
r *proc.CreateConfig, options *options.Options, rootfs string) (*proc.Init, error) {
runtime := proc.NewRunc(options.Root, path, namespace, options.BinaryName, options.CriuPath, options.SystemdCgroup)
p := proc.New(r.ID, runtime, rproc.Stdio{
func newInit(ctx context.Context, path, workDir, namespace string, platform stdio.Platform,
r *process.CreateConfig, options *options.Options, rootfs string) (*process.Init, error) {
runtime := process.NewRunc(options.Root, path, namespace, options.BinaryName, options.CriuPath, options.SystemdCgroup)
p := process.New(r.ID, runtime, stdio.Stdio{
Stdin: r.Stdin,
Stdout: r.Stdout,
Stderr: r.Stderr,
@@ -191,13 +191,13 @@ type Container struct {
Bundle string
cgroup cgroups.Cgroup
process rproc.Process
processes map[string]rproc.Process
process process.Process
processes map[string]process.Process
reservedProcess map[string]struct{}
}
// All processes in the container
func (c *Container) All() (o []rproc.Process) {
func (c *Container) All() (o []process.Process) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -211,7 +211,7 @@ func (c *Container) All() (o []rproc.Process) {
}
// ExecdProcesses added to the container
func (c *Container) ExecdProcesses() (o []rproc.Process) {
func (c *Container) ExecdProcesses() (o []process.Process) {
c.mu.Lock()
defer c.mu.Unlock()
for _, p := range c.processes {
@@ -242,7 +242,7 @@ func (c *Container) CgroupSet(cg cgroups.Cgroup) {
}
// Process returns the process by id
func (c *Container) Process(id string) (rproc.Process, error) {
func (c *Container) Process(id string) (process.Process, error) {
c.mu.Lock()
defer c.mu.Unlock()
if id == "" {
@@ -282,7 +282,7 @@ func (c *Container) ReserveProcess(id string) (bool, func()) {
}
// ProcessAdd adds a new process to the container
func (c *Container) ProcessAdd(process rproc.Process) {
func (c *Container) ProcessAdd(process process.Process) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -298,7 +298,7 @@ func (c *Container) ProcessRemove(id string) {
}
// Start a container process
func (c *Container) Start(ctx context.Context, r *task.StartRequest) (rproc.Process, error) {
func (c *Container) Start(ctx context.Context, r *task.StartRequest) (process.Process, error) {
p, err := c.Process(r.ExecID)
if err != nil {
return nil, err
@@ -317,7 +317,7 @@ func (c *Container) Start(ctx context.Context, r *task.StartRequest) (rproc.Proc
}
// Delete the container or a process by id
func (c *Container) Delete(ctx context.Context, r *task.DeleteRequest) (rproc.Process, error) {
func (c *Container) Delete(ctx context.Context, r *task.DeleteRequest) (process.Process, error) {
p, err := c.Process(r.ExecID)
if err != nil {
return nil, err
@@ -332,8 +332,8 @@ func (c *Container) Delete(ctx context.Context, r *task.DeleteRequest) (rproc.Pr
}
// Exec an additional process
func (c *Container) Exec(ctx context.Context, r *task.ExecProcessRequest) (rproc.Process, error) {
process, err := c.process.(*proc.Init).Exec(ctx, c.Bundle, &proc.ExecConfig{
func (c *Container) Exec(ctx context.Context, r *task.ExecProcessRequest) (process.Process, error) {
process, err := c.process.(*process.Init).Exec(ctx, c.Bundle, &process.ExecConfig{
ID: r.ExecID,
Terminal: r.Terminal,
Stdin: r.Stdin,
@@ -350,12 +350,12 @@ func (c *Container) Exec(ctx context.Context, r *task.ExecProcessRequest) (rproc
// Pause the container
func (c *Container) Pause(ctx context.Context) error {
return c.process.(*proc.Init).Pause(ctx)
return c.process.(*process.Init).Pause(ctx)
}
// Resume the container
func (c *Container) Resume(ctx context.Context) error {
return c.process.(*proc.Init).Resume(ctx)
return c.process.(*process.Init).Resume(ctx)
}
// ResizePty of a process
@@ -408,7 +408,7 @@ func (c *Container) Checkpoint(ctx context.Context, r *task.CheckpointTaskReques
}
opts = *v.(*options.CheckpointOptions)
}
return p.(*proc.Init).Checkpoint(ctx, &proc.CheckpointConfig{
return p.(*process.Init).Checkpoint(ctx, &process.CheckpointConfig{
Path: r.Path,
Exit: opts.Exit,
AllowOpenTCP: opts.OpenTcp,
@@ -426,7 +426,7 @@ func (c *Container) Update(ctx context.Context, r *task.UpdateTaskRequest) error
if err != nil {
return err
}
return p.(*proc.Init).Update(ctx, r.Resources)
return p.(*process.Init).Update(ctx, r.Resources)
}
// HasPid returns true if the container owns a specific pid

View File

@@ -1,135 +0,0 @@
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runc
import (
"context"
"sync"
"github.com/containerd/cgroups"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/v2/shim"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)
// NewOOMEpoller returns an epoll implementation that listens to OOM events
// from a container's cgroups.
func NewOOMEpoller(publisher shim.Publisher) (*Epoller, error) {
fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
if err != nil {
return nil, err
}
return &Epoller{
fd: fd,
publisher: publisher,
set: make(map[uintptr]*item),
}, nil
}
// Epoller implementation for handling OOM events from a container's cgroup
type Epoller struct {
mu sync.Mutex
fd int
publisher shim.Publisher
set map[uintptr]*item
}
type item struct {
id string
cg cgroups.Cgroup
}
// Close the epoll fd
func (e *Epoller) Close() error {
return unix.Close(e.fd)
}
// Run the epoll loop
func (e *Epoller) Run(ctx context.Context) {
var events [128]unix.EpollEvent
for {
select {
case <-ctx.Done():
e.Close()
return
default:
n, err := unix.EpollWait(e.fd, events[:], -1)
if err != nil {
if err == unix.EINTR {
continue
}
logrus.WithError(err).Error("cgroups: epoll wait")
}
for i := 0; i < n; i++ {
e.process(ctx, uintptr(events[i].Fd))
}
}
}
}
// Add the cgroup to the epoll monitor
func (e *Epoller) Add(id string, cg cgroups.Cgroup) error {
e.mu.Lock()
defer e.mu.Unlock()
fd, err := cg.OOMEventFD()
if err != nil {
return err
}
e.set[fd] = &item{
id: id,
cg: cg,
}
event := unix.EpollEvent{
Fd: int32(fd),
Events: unix.EPOLLHUP | unix.EPOLLIN | unix.EPOLLERR,
}
return unix.EpollCtl(e.fd, unix.EPOLL_CTL_ADD, int(fd), &event)
}
func (e *Epoller) process(ctx context.Context, fd uintptr) {
flush(fd)
e.mu.Lock()
i, ok := e.set[fd]
if !ok {
e.mu.Unlock()
return
}
e.mu.Unlock()
if i.cg.State() == cgroups.Deleted {
e.mu.Lock()
delete(e.set, fd)
e.mu.Unlock()
unix.Close(int(fd))
return
}
if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &eventstypes.TaskOOM{
ContainerID: i.id,
}); err != nil {
logrus.WithError(err).Error("publish OOM event")
}
}
func flush(fd uintptr) error {
var buf [8]byte
_, err := unix.Read(int(fd), buf[:])
return err
}

View File

@@ -25,7 +25,7 @@ import (
"syscall"
"github.com/containerd/console"
rproc "github.com/containerd/containerd/runtime/proc"
"github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/fifo"
"github.com/pkg/errors"
)
@@ -38,7 +38,7 @@ var bufPool = sync.Pool{
}
// NewPlatform returns a linux platform for use with I/O operations
func NewPlatform() (rproc.Platform, error) {
func NewPlatform() (stdio.Platform, error) {
epoller, err := console.NewEpoller()
if err != nil {
return nil, errors.Wrap(err, "failed to initialize epoller")

View File

@@ -36,8 +36,9 @@ import (
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
rproc "github.com/containerd/containerd/runtime/proc"
"github.com/containerd/containerd/runtime/v1/linux/proc"
"github.com/containerd/containerd/pkg/oom"
"github.com/containerd/containerd/pkg/process"
"github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/runtime/v2/runc"
"github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/containerd/runtime/v2/shim"
@@ -60,7 +61,7 @@ var (
// New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
ep, err := runc.NewOOMEpoller(publisher)
ep, err := oom.New(publisher)
if err != nil {
return nil, err
}
@@ -90,9 +91,9 @@ type service struct {
context context.Context
events chan interface{}
platform rproc.Platform
platform stdio.Platform
ec chan runcC.Exit
ep *runc.Epoller
ep *oom.Epoller
id string
container *runc.Container
@@ -209,7 +210,7 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error)
if err != nil {
return nil, err
}
r := proc.NewRunc(proc.RuncRoot, path, ns, runtime, "", false)
r := process.NewRunc(process.RuncRoot, path, ns, runtime, "", false)
if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{
Force: true,
}); err != nil {
@@ -590,7 +591,7 @@ func (s *service) checkProcesses(e runcC.Exit) {
for _, p := range container.All() {
if p.Pid() == e.Pid {
if shouldKillAll {
if ip, ok := p.(*proc.Init); ok {
if ip, ok := p.(*process.Init); ok {
// Ensure all children are killed
if err := ip.KillAll(s.context); err != nil {
logrus.WithError(err).WithField("id", ip.ID()).
@@ -635,7 +636,7 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
if err != nil {
return nil, errdefs.ToGRPC(err)
}
ps, err := p.(*proc.Init).Runtime().Ps(ctx, id)
ps, err := p.(*process.Init).Runtime().Ps(ctx, id)
if err != nil {
return nil, err
}
@@ -670,7 +671,7 @@ func (s *service) getContainer() (*runc.Container, error) {
return container, nil
}
func (s *service) getProcess(execID string) (rproc.Process, error) {
func (s *service) getProcess(execID string) (process.Process, error) {
container, err := s.getContainer()
if err != nil {
return nil, err

View File

@@ -37,8 +37,9 @@ import (
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
rproc "github.com/containerd/containerd/runtime/proc"
"github.com/containerd/containerd/runtime/v1/linux/proc"
"github.com/containerd/containerd/pkg/oom"
"github.com/containerd/containerd/pkg/process"
"github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/runtime/v2/runc"
"github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/containerd/runtime/v2/shim"
@@ -73,7 +74,7 @@ type spec struct {
// New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
ep, err := runc.NewOOMEpoller(publisher)
ep, err := oom.New(publisher)
if err != nil {
return nil, err
}
@@ -104,9 +105,9 @@ type service struct {
context context.Context
events chan interface{}
platform rproc.Platform
platform stdio.Platform
ec chan runcC.Exit
ep *runc.Epoller
ep *oom.Epoller
// id only used in cleanup case
id string
@@ -254,7 +255,7 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error)
if err != nil {
return nil, err
}
r := proc.NewRunc(proc.RuncRoot, path, ns, runtime, "", false)
r := process.NewRunc(process.RuncRoot, path, ns, runtime, "", false)
if err := r.Delete(ctx, s.id, &runcC.DeleteOpts{
Force: true,
}); err != nil {
@@ -653,7 +654,7 @@ func (s *service) checkProcesses(e runcC.Exit) {
for _, p := range container.All() {
if p.Pid() == e.Pid {
if shouldKillAll {
if ip, ok := p.(*proc.Init); ok {
if ip, ok := p.(*process.Init); ok {
// Ensure all children are killed
if err := ip.KillAll(s.context); err != nil {
logrus.WithError(err).WithField("id", ip.ID()).
@@ -705,7 +706,7 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
if err != nil {
return nil, errdefs.ToGRPC(err)
}
ps, err := p.(*proc.Init).Runtime().Ps(ctx, id)
ps, err := p.(*process.Init).Runtime().Ps(ctx, id)
if err != nil {
return nil, err
}