diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go index d5b24498f..a92a0e665 100644 --- a/cmd/containerd-shim/main_unix.go +++ b/cmd/containerd-shim/main_unix.go @@ -40,6 +40,10 @@ func main() { Name: "debug", Usage: "enable debug output in logs", }, + cli.StringFlag{ + Name: "namespace,n", + Usage: "namespace that owns the task", + }, } app.Before = func(context *cli.Context) error { if context.GlobalBool("debug") { @@ -61,10 +65,11 @@ func main() { if err != nil { return err } - var ( - server = grpc.NewServer() - sv = shim.New(path) - ) + server := grpc.NewServer() + sv, err := shim.New(path, context.GlobalString("namespace")) + if err != nil { + return err + } logrus.Debug("registering grpc server") shimapi.RegisterShimServer(server, sv) if err := serve(server, "shim.sock"); err != nil { diff --git a/linux/runtime.go b/linux/runtime.go index d4cc07392..423dd2806 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -16,7 +16,9 @@ import ( "github.com/containerd/containerd/api/services/shim" "github.com/containerd/containerd/api/types/mount" "github.com/containerd/containerd/api/types/task" + shimb "github.com/containerd/containerd/linux/shim" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/plugin" runc "github.com/containerd/go-runc" @@ -87,11 +89,15 @@ type Runtime struct { } func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) (t plugin.Task, err error) { - path, err := r.newBundle(id, opts.Spec) + namespace, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err } - s, err := newShim(r.shim, path, r.remote) + path, err := r.newBundle(namespace, id, opts.Spec) + if err != nil { + return nil, err + } + s, err := newShim(r.shim, path, namespace, r.remote) if err != nil { os.RemoveAll(path) return nil, err @@ -136,6 +142,10 @@ func (r *Runtime) Create(ctx context.Context, id string, opts plugin.CreateOpts) } func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, error) { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } lc, ok := c.(*Task) if !ok { return nil, fmt.Errorf("container cannot be cast as *linux.Container") @@ -153,7 +163,7 @@ func (r *Runtime) Delete(ctx context.Context, c plugin.Task) (*plugin.Exit, erro return &plugin.Exit{ Status: rsp.ExitStatus, Timestamp: rsp.ExitedAt, - }, r.deleteBundle(lc.containerID) + }, r.deleteBundle(namespace, lc.containerID) } func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) { @@ -162,6 +172,25 @@ func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) { return nil, err } var o []plugin.Task + for _, fi := range dir { + if !fi.IsDir() { + continue + } + tasks, err := r.loadContainers(ctx, fi.Name()) + if err != nil { + return nil, err + } + o = append(o, tasks...) + } + return o, nil +} + +func (r *Runtime) loadContainers(ctx context.Context, ns string) ([]plugin.Task, error) { + dir, err := ioutil.ReadDir(filepath.Join(r.root, ns)) + if err != nil { + return nil, err + } + var o []plugin.Task for _, fi := range dir { if !fi.IsDir() { continue @@ -169,12 +198,12 @@ func (r *Runtime) Tasks(ctx context.Context) ([]plugin.Task, error) { id := fi.Name() // TODO: optimize this if it is call frequently to list all containers // i.e. dont' reconnect to the the shim's ever time - c, err := r.loadContainer(filepath.Join(r.root, id)) + c, err := r.loadContainer(ctx, filepath.Join(r.root, ns, id)) if err != nil { - log.G(ctx).WithError(err).Warnf("failed to load container %s", id) + log.G(ctx).WithError(err).Warnf("failed to load container %s/%s", ns, id) // if we fail to load the container, connect to the shim, make sure if the shim has // been killed and cleanup the resources still being held by the container - r.killContainer(ctx, id) + r.killContainer(ctx, ns, id) continue } o = append(o, c) @@ -229,8 +258,12 @@ func (r *Runtime) forward(events shim.Shim_EventsClient) { } } -func (r *Runtime) newBundle(id string, spec []byte) (string, error) { - path := filepath.Join(r.root, id) +func (r *Runtime) newBundle(namespace, id string, spec []byte) (string, error) { + path := filepath.Join(r.root, namespace) + if err := os.MkdirAll(path, 0700); err != nil { + return "", err + } + path = filepath.Join(path, id) if err := os.Mkdir(path, 0700); err != nil { return "", err } @@ -246,26 +279,27 @@ func (r *Runtime) newBundle(id string, spec []byte) (string, error) { return path, err } -func (r *Runtime) deleteBundle(id string) error { - return os.RemoveAll(filepath.Join(r.root, id)) +func (r *Runtime) deleteBundle(namespace, id string) error { + return os.RemoveAll(filepath.Join(r.root, namespace, id)) } -func (r *Runtime) loadContainer(path string) (*Task, error) { - id := filepath.Base(path) - s, err := loadShim(path, r.remote) +func (r *Runtime) loadContainer(ctx context.Context, path string) (*Task, error) { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + id := filepath.Base(path) + s, err := loadShim(path, namespace, r.remote) if err != nil { return nil, err } - if err = r.handleEvents(s); err != nil { return nil, err } - data, err := ioutil.ReadFile(filepath.Join(path, configFilename)) if err != nil { return nil, err } - return &Task{ containerID: id, shim: s, @@ -275,13 +309,14 @@ func (r *Runtime) loadContainer(path string) (*Task, error) { // killContainer is used whenever the runtime fails to connect to a shim (it died) // and needs to cleanup the container resources in the underlying runtime (runc, etc...) -func (r *Runtime) killContainer(ctx context.Context, id string) { +func (r *Runtime) killContainer(ctx context.Context, ns, id string) { log.G(ctx).Debug("terminating container after failed load") runtime := &runc.Runc{ // TODO: should we get Command provided for initial container creation? Command: r.runtime, LogFormat: runc.JSON, PdeathSignal: unix.SIGKILL, + Root: filepath.Join(shimb.RuncRoot, ns), } if err := runtime.Kill(ctx, id, int(unix.SIGKILL), &runc.KillOpts{ All: true, @@ -302,10 +337,10 @@ func (r *Runtime) killContainer(ctx context.Context, id string) { if err := runtime.Delete(ctx, id); err != nil { log.G(ctx).WithError(err).Warnf("delete container %s", id) } - // try to unmount the rootfs is it was not held by an external shim - unix.Unmount(filepath.Join(r.root, id, "rootfs"), 0) + // try to unmount the rootfs in case it was not owned by an external mount namespace + unix.Unmount(filepath.Join(r.root, ns, id, "rootfs"), 0) // remove container bundle - if err := r.deleteBundle(id); err != nil { + if err := r.deleteBundle(ns, id); err != nil { log.G(ctx).WithError(err).Warnf("delete container bundle %s", id) } } diff --git a/linux/shim.go b/linux/shim.go index 0edc6270b..b30f44509 100644 --- a/linux/shim.go +++ b/linux/shim.go @@ -22,16 +22,16 @@ import ( "github.com/pkg/errors" ) -func newShim(shimName string, path string, remote bool) (shim.ShimClient, error) { +func newShim(shimName string, path, namespace string, remote bool) (shim.ShimClient, error) { if !remote { - return localShim.Client(path) + return localShim.Client(path, namespace) } socket := filepath.Join(path, "shim.sock") l, err := sys.CreateUnixSocket(socket) if err != nil { return nil, err } - cmd := exec.Command(shimName) + cmd := exec.Command(shimName, "--namespace", namespace) cmd.Dir = path f, err := l.(*net.UnixListener).File() if err != nil { @@ -57,9 +57,9 @@ func newShim(shimName string, path string, remote bool) (shim.ShimClient, error) return connectShim(socket) } -func loadShim(path string, remote bool) (shim.ShimClient, error) { +func loadShim(path, namespace string, remote bool) (shim.ShimClient, error) { if !remote { - return localShim.Client(path) + return localShim.Client(path, namespace) } socket := filepath.Join(path, "shim.sock") return connectShim(socket) diff --git a/linux/shim/client.go b/linux/shim/client.go index d0e0f4858..fb3a373e6 100644 --- a/linux/shim/client.go +++ b/linux/shim/client.go @@ -4,6 +4,7 @@ package shim import ( "path/filepath" + "syscall" shimapi "github.com/containerd/containerd/api/services/shim" "github.com/containerd/containerd/api/types/task" @@ -15,23 +16,28 @@ import ( "google.golang.org/grpc/metadata" ) -func Client(path string) (shimapi.ShimClient, error) { +func Client(path, namespace string) (shimapi.ShimClient, error) { pid, err := runc.ReadPidFile(filepath.Join(path, "init.pid")) if err != nil { return nil, err } - cl := &client{ - s: New(path), + s, err := New(path, namespace) + if err != nil { + return nil, err } - - // used when quering container status and info + cl := &client{ + s: s, + } + // used when quering container status and info cl.s.initProcess = &initProcess{ id: filepath.Base(path), pid: pid, runc: &runc.Runc{ - Log: filepath.Join(path, "log.json"), - LogFormat: runc.JSON, + Log: filepath.Join(path, "log.json"), + LogFormat: runc.JSON, + PdeathSignal: syscall.SIGKILL, + Root: filepath.Join(RuncRoot, namespace), }, } return cl, nil diff --git a/linux/shim/init.go b/linux/shim/init.go index 60cdd57f8..85543b700 100644 --- a/linux/shim/init.go +++ b/linux/shim/init.go @@ -49,7 +49,7 @@ type initProcess struct { terminal bool } -func newInitProcess(context context.Context, path string, r *shimapi.CreateRequest) (*initProcess, error) { +func newInitProcess(context context.Context, path, namespace string, r *shimapi.CreateRequest) (*initProcess, error) { for _, rm := range r.Rootfs { m := &mount.Mount{ Type: rm.Type, @@ -65,6 +65,7 @@ func newInitProcess(context context.Context, path string, r *shimapi.CreateReque Log: filepath.Join(path, "log.json"), LogFormat: runc.JSON, PdeathSignal: syscall.SIGKILL, + Root: filepath.Join(RuncRoot, namespace), } p := &initProcess{ id: r.ID, diff --git a/linux/shim/service.go b/linux/shim/service.go index 3983f6f7a..7c964d3b1 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -20,13 +20,19 @@ import ( var empty = &google_protobuf.Empty{} +const RuncRoot = "/run/containerd/runc" + // New returns a new shim service that can be used via GRPC -func New(path string) *Service { +func New(path, namespace string) (*Service, error) { + if namespace == "" { + return nil, fmt.Errorf("shim namespace cannot be empty") + } return &Service{ path: path, processes: make(map[int]process), events: make(chan *task.Event, 4096), - } + namespace: namespace, + }, nil } type Service struct { @@ -40,10 +46,11 @@ type Service struct { eventsMu sync.Mutex deferredEvent *task.Event execID int + namespace string } func (s *Service) Create(ctx context.Context, r *shimapi.CreateRequest) (*shimapi.CreateResponse, error) { - process, err := newInitProcess(ctx, s.path, r) + process, err := newInitProcess(ctx, s.path, s.namespace, r) if err != nil { return nil, err }