diff --git a/runtime/v2/runc/epoll.go b/runtime/v2/runc/epoll.go new file mode 100644 index 000000000..e57ec7b4c --- /dev/null +++ b/runtime/v2/runc/epoll.go @@ -0,0 +1,123 @@ +// +build linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package runc + +import ( + "context" + "sync" + + "github.com/containerd/cgroups" + eventstypes "github.com/containerd/containerd/api/events" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/runtime" + "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" +) + +func newOOMEpoller(publisher events.Publisher) (*epoller, error) { + fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC) + if err != nil { + return nil, err + } + return &epoller{ + fd: fd, + publisher: publisher, + set: make(map[uintptr]*item), + }, nil +} + +type epoller struct { + mu sync.Mutex + + fd int + publisher events.Publisher + set map[uintptr]*item +} + +type item struct { + id string + cg cgroups.Cgroup +} + +func (e *epoller) Close() error { + return unix.Close(e.fd) +} + +func (e *epoller) run(ctx context.Context) { + var events [128]unix.EpollEvent + for { + n, err := unix.EpollWait(e.fd, events[:], -1) + if err != nil { + if err == unix.EINTR { + continue + } + logrus.WithError(err).Error("cgroups: epoll wait") + } + for i := 0; i < n; i++ { + e.process(ctx, uintptr(events[i].Fd)) + } + } +} + +func (e *epoller) add(id string, cg cgroups.Cgroup) error { + e.mu.Lock() + defer e.mu.Unlock() + fd, err := cg.OOMEventFD() + if err != nil { + return err + } + e.set[fd] = &item{ + id: id, + cg: cg, + } + event := unix.EpollEvent{ + Fd: int32(fd), + Events: unix.EPOLLHUP | unix.EPOLLIN | unix.EPOLLERR, + } + return unix.EpollCtl(e.fd, unix.EPOLL_CTL_ADD, int(fd), &event) +} + +func (e *epoller) process(ctx context.Context, fd uintptr) { + flush(fd) + e.mu.Lock() + i, ok := e.set[fd] + if !ok { + e.mu.Unlock() + return + } + e.mu.Unlock() + if i.cg.State() == cgroups.Deleted { + e.mu.Lock() + delete(e.set, fd) + e.mu.Unlock() + unix.Close(int(fd)) + return + } + if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &eventstypes.TaskOOM{ + ContainerID: i.id, + }); err != nil { + logrus.WithError(err).Error("publish OOM event") + } +} + +func flush(fd uintptr) error { + var buf [8]byte + _, err := unix.Read(int(fd), buf[:]) + return err +} diff --git a/runtime/v2/runc/service.go b/runtime/v2/runc/service.go index 14a8f5b8c..91a248f6f 100644 --- a/runtime/v2/runc/service.go +++ b/runtime/v2/runc/service.go @@ -64,12 +64,18 @@ var _ = (taskAPI.TaskService)(&service{}) // New returns a new shim service that can be used via GRPC func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) { + ep, err := newOOMEpoller(publisher) + if err != nil { + return nil, err + } + go ep.run(ctx) s := &service{ id: id, context: ctx, processes: make(map[string]rproc.Process), events: make(chan interface{}, 128), ec: shim.Default.Subscribe(), + ep: ep, } go s.processExits() runcC.Monitor = shim.Default @@ -90,6 +96,7 @@ type service struct { events chan interface{} platform rproc.Platform ec chan runcC.Exit + ep *epoller id string // Filled by Create() @@ -293,7 +300,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ * if err != nil { logrus.WithError(err).Errorf("loading cgroup for %d", pid) } - s.cg = cg + s.setCgroup(cg) } s.task = process return &taskAPI.CreateTaskResponse{ @@ -318,7 +325,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. if err != nil { logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid()) } - s.cg = cg + s.setCgroup(cg) } return &taskAPI.StartResponse{ Pid: uint32(p.Pid()), @@ -708,6 +715,13 @@ func (s *service) getProcess(execID string) (rproc.Process, error) { return p, nil } +func (s *service) setCgroup(cg cgroups.Cgroup) { + s.cg = cg + if err := s.ep.add(s.id, cg); err != nil { + logrus.WithError(err).Error("add cg to OOM monitor") + } +} + func getTopic(ctx context.Context, e interface{}) string { switch e.(type) { case *eventstypes.TaskCreate: