support cgroup2

* only shim v2 runc v2 ("io.containerd.runc.v2") is supported
* only PID metrics is implemented. Others should be implemented in separate PRs.
* lots of code duplication in v1 metrics and v2 metrics. Dedupe should be separate PR.

Signed-off-by: Akihiro Suda <akihiro.suda.cz@hco.ntt.co.jp>
This commit is contained in:
Akihiro Suda
2019-11-05 13:31:48 +09:00
parent f01665aa02
commit 8f870c233f
69 changed files with 10619 additions and 160 deletions

View File

@@ -20,11 +20,22 @@ import (
"context"
"github.com/containerd/cgroups"
cgroupsv2 "github.com/containerd/cgroups/v2"
"github.com/containerd/containerd/namespaces"
)
// WithNamespaceCgroupDeletion removes the cgroup directory that was created for the namespace
func WithNamespaceCgroupDeletion(ctx context.Context, i *namespaces.DeleteInfo) error {
if cgroups.Mode() == cgroups.Unified {
cg, err := cgroupsv2.LoadManager("/sys/fs/cgroup", i.Name)
if err != nil {
if err == cgroupsv2.ErrCgroupDeleted {
return nil
}
return err
}
return cg.Delete()
}
cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(i.Name))
if err != nil {
if err == cgroups.ErrCgroupDeleted {

View File

@@ -26,6 +26,7 @@ import (
"sync"
"github.com/containerd/cgroups"
cgroupsv2 "github.com/containerd/cgroups/v2"
"github.com/containerd/console"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount"
@@ -133,9 +134,22 @@ func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTa
}
pid := p.Pid()
if pid > 0 {
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
if err != nil {
logrus.WithError(err).Errorf("loading cgroup for %d", pid)
var cg interface{}
if cgroups.Mode() == cgroups.Unified {
g, err := cgroupsv2.PidGroupPath(pid)
if err != nil {
logrus.WithError(err).Errorf("loading cgroup2 for %d", pid)
return container, nil
}
cg, err = cgroupsv2.LoadManager("/sys/fs/cgroup", g)
if err != nil {
logrus.WithError(err).Errorf("loading cgroup2 for %d", pid)
}
} else {
cg, err = cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
if err != nil {
logrus.WithError(err).Errorf("loading cgroup for %d", pid)
}
}
container.cgroup = cg
}
@@ -190,7 +204,8 @@ type Container struct {
// Bundle path
Bundle string
cgroup cgroups.Cgroup
// cgroup is either cgroups.Cgroup or *cgroupsv2.Manager
cgroup interface{}
process process.Process
processes map[string]process.Process
reservedProcess map[string]struct{}
@@ -228,14 +243,14 @@ func (c *Container) Pid() int {
}
// Cgroup of the container
func (c *Container) Cgroup() cgroups.Cgroup {
func (c *Container) Cgroup() interface{} {
c.mu.Lock()
defer c.mu.Unlock()
return c.cgroup
}
// CgroupSet sets the cgroup to the container
func (c *Container) CgroupSet(cg cgroups.Cgroup) {
func (c *Container) CgroupSet(cg interface{}) {
c.mu.Lock()
c.cgroup = cg
c.mu.Unlock()
@@ -307,9 +322,21 @@ func (c *Container) Start(ctx context.Context, r *task.StartRequest) (process.Pr
return nil, err
}
if c.Cgroup() == nil && p.Pid() > 0 {
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
if err != nil {
logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
var cg interface{}
if cgroups.Mode() == cgroups.Unified {
g, err := cgroupsv2.PidGroupPath(p.Pid())
if err != nil {
logrus.WithError(err).Errorf("loading cgroup2 for %d", p.Pid())
}
cg, err = cgroupsv2.LoadManager("/sys/fs/cgroup", g)
if err != nil {
logrus.WithError(err).Errorf("loading cgroup2 for %d", p.Pid())
}
} else {
cg, err = cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
if err != nil {
logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
}
}
c.cgroup = cg
}

View File

@@ -271,8 +271,12 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
s.eventSendMu.Unlock()
return nil, errdefs.ToGRPC(err)
}
if err := s.ep.Add(container.ID, container.Cgroup()); err != nil {
logrus.WithError(err).Error("add cg to OOM monitor")
if cg, ok := container.Cgroup().(cgroups.Cgroup); ok {
if err := s.ep.Add(container.ID, cg); err != nil {
logrus.WithError(err).Error("add cg to OOM monitor")
}
} else {
logrus.WithError(errdefs.ErrNotImplemented).Error("add cg to OOM monitor")
}
switch r.ExecID {
case "":
@@ -545,7 +549,14 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt
}
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
cg := s.container.Cgroup()
cgx := s.container.Cgroup()
if cgx == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
}
cg, ok := cgx.(cgroups.Cgroup)
if !ok {
return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "cgroup v2 not implemented for Stats")
}
if cg == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
}

View File

@@ -31,6 +31,7 @@ import (
"time"
"github.com/containerd/cgroups"
cgroupsv2 "github.com/containerd/cgroups/v2"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
@@ -222,14 +223,27 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container
}
if opts, ok := v.(*options.Options); ok {
if opts.ShimCgroup != "" {
cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup))
if err != nil {
return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup)
}
if err := cg.Add(cgroups.Process{
Pid: cmd.Process.Pid,
}); err != nil {
return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup)
if cgroups.Mode() == cgroups.Unified {
if err := cgroupsv2.VerifyGroupPath(opts.ShimCgroup); err != nil {
return "", errors.Wrapf(err, "failed to verify cgroup path %q", opts.ShimCgroup)
}
cg, err := cgroupsv2.LoadManager("/sys/fs/cgroup", opts.ShimCgroup)
if err != nil {
return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup)
}
if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil {
return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup)
}
} else {
cg, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(opts.ShimCgroup))
if err != nil {
return "", errors.Wrapf(err, "failed to load cgroup %s", opts.ShimCgroup)
}
if err := cg.Add(cgroups.Process{
Pid: cmd.Process.Pid,
}); err != nil {
return "", errors.Wrapf(err, "failed to join cgroup %s", opts.ShimCgroup)
}
}
}
}
@@ -316,9 +330,18 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
s.eventSendMu.Unlock()
return nil, errdefs.ToGRPC(err)
}
if err := s.ep.Add(container.ID, container.Cgroup()); err != nil {
logrus.WithError(err).Error("add cg to OOM monitor")
switch cg := container.Cgroup().(type) {
case cgroups.Cgroup:
if err := s.ep.Add(container.ID, cg); err != nil {
logrus.WithError(err).Error("add cg to OOM monitor")
}
case *cgroupsv2.Manager:
// TODO: enable controllers for statting
// OOM monitor is not implemented yet
logrus.WithError(errdefs.ErrNotImplemented).Warn("add cg to OOM monitor")
}
switch r.ExecID {
case "":
s.send(&eventstypes.TaskStart{
@@ -608,15 +631,28 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
if err != nil {
return nil, err
}
cg := container.Cgroup()
if cg == nil {
cgx := container.Cgroup()
if cgx == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
}
stats, err := cg.Stat(cgroups.IgnoreNotExist)
if err != nil {
return nil, err
var statsx interface{}
switch cg := cgx.(type) {
case cgroups.Cgroup:
stats, err := cg.Stat(cgroups.IgnoreNotExist)
if err != nil {
return nil, err
}
statsx = stats
case *cgroupsv2.Manager:
stats, err := cg.Stat()
if err != nil {
return nil, err
}
statsx = stats
default:
return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "unsupported cgroup type %T", cg)
}
data, err := typeurl.MarshalAny(stats)
data, err := typeurl.MarshalAny(statsx)
if err != nil {
return nil, err
}