Merge pull request #967 from crosbymichael/task-ns

Namespace tasks via runc --root
This commit is contained in:
Kenfe-Mickaël Laventure 2017-06-07 07:31:14 -07:00 committed by GitHub
commit 9c0897d524
6 changed files with 94 additions and 40 deletions

View File

@ -40,6 +40,10 @@ func main() {
Name: "debug",
Usage: "enable debug output in logs",
},
cli.StringFlag{
Name: "namespace,n",
Usage: "namespace that owns the task",
},
}
app.Before = func(context *cli.Context) error {
if context.GlobalBool("debug") {
@ -61,10 +65,11 @@ func main() {
if err != nil {
return err
}
var (
server = grpc.NewServer()
sv = shim.New(path)
)
server := grpc.NewServer()
sv, err := shim.New(path, context.GlobalString("namespace"))
if err != nil {
return err
}
logrus.Debug("registering grpc server")
shimapi.RegisterShimServer(server, sv)
if err := serve(server, "shim.sock"); err != nil {

View File

@ -16,7 +16,9 @@ import (
"github.com/containerd/containerd/api/services/shim"
"github.com/containerd/containerd/api/types/mount"
"github.com/containerd/containerd/api/types/task"
shimb "github.com/containerd/containerd/linux/shim"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin"
runc "github.com/containerd/go-runc"
@ -87,11 +89,15 @@ type Runtime struct {
}
func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) (t plugin.Task, err error) {
path, err := r.newBundle(id, opts.Spec)
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
s, err := newShim(r.shim, path, r.remote)
path, err := r.newBundle(namespace, id, opts.Spec)
if err != nil {
return nil, err
}
s, err := newShim(r.shim, path, namespace, r.remote)
if err != nil {
os.RemoveAll(path)
return nil, err
@ -136,6 +142,10 @@ func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts)
}
func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
lc, ok := c.(*Task)
if !ok {
return nil, fmt.Errorf("container cannot be cast as *linux.Container")
@ -153,7 +163,7 @@ func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, erro
return &plugin.Exit{
Status: rsp.ExitStatus,
Timestamp: rsp.ExitedAt,
}, r.deleteBundle(lc.containerID)
}, r.deleteBundle(namespace, lc.containerID)
}
func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) {
@ -162,6 +172,25 @@ func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) {
return nil, err
}
var o []plugin.Task
for _, fi := range dir {
if !fi.IsDir() {
continue
}
tasks, err := r.loadContainers(ctx, fi.Name())
if err != nil {
return nil, err
}
o = append(o, tasks...)
}
return o, nil
}
func (r *Runtime) loadContainers(ctx context.Context, ns string) ([]plugin.Task, error) {
dir, err := ioutil.ReadDir(filepath.Join(r.root, ns))
if err != nil {
return nil, err
}
var o []plugin.Task
for _, fi := range dir {
if !fi.IsDir() {
continue
@ -169,12 +198,12 @@ func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) {
id := fi.Name()
// TODO: optimize this if it is call frequently to list all containers
// i.e. dont' reconnect to the the shim's ever time
c, err := r.loadContainer(filepath.Join(r.root, id))
c, err := r.loadContainer(ctx, filepath.Join(r.root, ns, id))
if err != nil {
log.G(ctx).WithError(err).Warnf("failed to load container %s", id)
log.G(ctx).WithError(err).Warnf("failed to load container %s/%s", ns, id)
// if we fail to load the container, connect to the shim, make sure if the shim has
// been killed and cleanup the resources still being held by the container
r.killContainer(ctx, id)
r.killContainer(ctx, ns, id)
continue
}
o = append(o, c)
@ -229,8 +258,12 @@ func (r *Runtime) forward(events shim.Shim_EventsClient) {
}
}
func (r *Runtime) newBundle(id string, spec []byte) (string, error) {
path := filepath.Join(r.root, id)
func (r *Runtime) newBundle(namespace, id string, spec []byte) (string, error) {
path := filepath.Join(r.root, namespace)
if err := os.MkdirAll(path, 0700); err != nil {
return "", err
}
path = filepath.Join(path, id)
if err := os.Mkdir(path, 0700); err != nil {
return "", err
}
@ -246,26 +279,27 @@ func (r *Runtime) newBundle(id string, spec []byte) (string, error) {
return path, err
}
func (r *Runtime) deleteBundle(id string) error {
return os.RemoveAll(filepath.Join(r.root, id))
func (r *Runtime) deleteBundle(namespace, id string) error {
return os.RemoveAll(filepath.Join(r.root, namespace, id))
}
func (r *Runtime) loadContainer(path string) (*Task, error) {
id := filepath.Base(path)
s, err := loadShim(path, r.remote)
func (r *Runtime) loadContainer(ctx context.Context, path string) (*Task, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
id := filepath.Base(path)
s, err := loadShim(path, namespace, r.remote)
if err != nil {
return nil, err
}
if err = r.handleEvents(s); err != nil {
return nil, err
}
data, err := ioutil.ReadFile(filepath.Join(path, configFilename))
if err != nil {
return nil, err
}
return &Task{
containerID: id,
shim: s,
@ -275,13 +309,14 @@ func (r *Runtime) loadContainer(path string) (*Task, error) {
// killContainer is used whenever the runtime fails to connect to a shim (it died)
// and needs to cleanup the container resources in the underlying runtime (runc, etc...)
func (r *Runtime) killContainer(ctx context.Context, id string) {
func (r *Runtime) killContainer(ctx context.Context, ns, id string) {
log.G(ctx).Debug("terminating container after failed load")
runtime := &runc.Runc{
// TODO: should we get Command provided for initial container creation?
Command: r.runtime,
LogFormat: runc.JSON,
PdeathSignal: unix.SIGKILL,
Root: filepath.Join(shimb.RuncRoot, ns),
}
if err := runtime.Kill(ctx, id, int(unix.SIGKILL), &runc.KillOpts{
All: true,
@ -302,10 +337,10 @@ func (r *Runtime) killContainer(ctx context.Context, id string) {
if err := runtime.Delete(ctx, id); err != nil {
log.G(ctx).WithError(err).Warnf("delete container %s", id)
}
// try to unmount the rootfs is it was not held by an external shim
unix.Unmount(filepath.Join(r.root, id, "rootfs"), 0)
// try to unmount the rootfs in case it was not owned by an external mount namespace
unix.Unmount(filepath.Join(r.root, ns, id, "rootfs"), 0)
// remove container bundle
if err := r.deleteBundle(id); err != nil {
if err := r.deleteBundle(ns, id); err != nil {
log.G(ctx).WithError(err).Warnf("delete container bundle %s", id)
}
}

View File

@ -22,16 +22,16 @@ import (
"github.com/pkg/errors"
)
func newShim(shimName string, path string, remote bool) (shim.ShimClient, error) {
func newShim(shimName string, path, namespace string, remote bool) (shim.ShimClient, error) {
if !remote {
return localShim.Client(path)
return localShim.Client(path, namespace)
}
socket := filepath.Join(path, "shim.sock")
l, err := sys.CreateUnixSocket(socket)
if err != nil {
return nil, err
}
cmd := exec.Command(shimName)
cmd := exec.Command(shimName, "--namespace", namespace)
cmd.Dir = path
f, err := l.(*net.UnixListener).File()
if err != nil {
@ -57,9 +57,9 @@ func newShim(shimName string, path string, remote bool) (shim.ShimClient, error)
return connectShim(socket)
}
func loadShim(path string, remote bool) (shim.ShimClient, error) {
func loadShim(path, namespace string, remote bool) (shim.ShimClient, error) {
if !remote {
return localShim.Client(path)
return localShim.Client(path, namespace)
}
socket := filepath.Join(path, "shim.sock")
return connectShim(socket)

View File

@ -4,6 +4,7 @@ package shim
import (
"path/filepath"
"syscall"
shimapi "github.com/containerd/containerd/api/services/shim"
"github.com/containerd/containerd/api/types/task"
@ -15,23 +16,28 @@ import (
"google.golang.org/grpc/metadata"
)
func Client(path string) (shimapi.ShimClient, error) {
func Client(path, namespace string) (shimapi.ShimClient, error) {
pid, err := runc.ReadPidFile(filepath.Join(path, "init.pid"))
if err != nil {
return nil, err
}
cl := &client{
s: New(path),
s, err := New(path, namespace)
if err != nil {
return nil, err
}
// used when quering container status and info
cl := &client{
s: s,
}
// used when quering container status and info
cl.s.initProcess = &initProcess{
id: filepath.Base(path),
pid: pid,
runc: &runc.Runc{
Log: filepath.Join(path, "log.json"),
LogFormat: runc.JSON,
Log: filepath.Join(path, "log.json"),
LogFormat: runc.JSON,
PdeathSignal: syscall.SIGKILL,
Root: filepath.Join(RuncRoot, namespace),
},
}
return cl, nil

View File

@ -49,7 +49,7 @@ type initProcess struct {
terminal bool
}
func newInitProcess(context context.Context, path string, r *shimapi.CreateRequest) (*initProcess, error) {
func newInitProcess(context context.Context, path, namespace string, r *shimapi.CreateRequest) (*initProcess, error) {
for _, rm := range r.Rootfs {
m := &mount.Mount{
Type: rm.Type,
@ -65,6 +65,7 @@ func newInitProcess(context context.Context, path string, r *shimapi.CreateReque
Log: filepath.Join(path, "log.json"),
LogFormat: runc.JSON,
PdeathSignal: syscall.SIGKILL,
Root: filepath.Join(RuncRoot, namespace),
}
p := &initProcess{
id: r.ID,

View File

@ -20,13 +20,19 @@ import (
var empty = &google_protobuf.Empty{}
const RuncRoot = "/run/containerd/runc"
// New returns a new shim service that can be used via GRPC
func New(path string) *Service {
func New(path, namespace string) (*Service, error) {
if namespace == "" {
return nil, fmt.Errorf("shim namespace cannot be empty")
}
return &Service{
path: path,
processes: make(map[int]process),
events: make(chan *task.Event, 4096),
}
namespace: namespace,
}, nil
}
type Service struct {
@ -40,10 +46,11 @@ type Service struct {
eventsMu sync.Mutex
deferredEvent *task.Event
execID int
namespace string
}
func (s *Service) Create(ctx context.Context, r *shimapi.CreateRequest) (*shimapi.CreateResponse, error) {
process, err := newInitProcess(ctx, s.path, r)
process, err := newInitProcess(ctx, s.path, s.namespace, r)
if err != nil {
return nil, err
}