Add oom events to shim
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
		
							
								
								
									
										123
									
								
								runtime/v2/runc/epoll.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										123
									
								
								runtime/v2/runc/epoll.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,123 @@
 | 
				
			|||||||
 | 
					// +build linux
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
 | 
					   Copyright The containerd Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					   Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					   you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					   You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					       http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					   Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					   distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					   See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					   limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package runc
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/containerd/cgroups"
 | 
				
			||||||
 | 
						eventstypes "github.com/containerd/containerd/api/events"
 | 
				
			||||||
 | 
						"github.com/containerd/containerd/events"
 | 
				
			||||||
 | 
						"github.com/containerd/containerd/runtime"
 | 
				
			||||||
 | 
						"github.com/sirupsen/logrus"
 | 
				
			||||||
 | 
						"golang.org/x/sys/unix"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func newOOMEpoller(publisher events.Publisher) (*epoller, error) {
 | 
				
			||||||
 | 
						fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return &epoller{
 | 
				
			||||||
 | 
							fd:        fd,
 | 
				
			||||||
 | 
							publisher: publisher,
 | 
				
			||||||
 | 
							set:       make(map[uintptr]*item),
 | 
				
			||||||
 | 
						}, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type epoller struct {
 | 
				
			||||||
 | 
						mu sync.Mutex
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						fd        int
 | 
				
			||||||
 | 
						publisher events.Publisher
 | 
				
			||||||
 | 
						set       map[uintptr]*item
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type item struct {
 | 
				
			||||||
 | 
						id string
 | 
				
			||||||
 | 
						cg cgroups.Cgroup
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (e *epoller) Close() error {
 | 
				
			||||||
 | 
						return unix.Close(e.fd)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (e *epoller) run(ctx context.Context) {
 | 
				
			||||||
 | 
						var events [128]unix.EpollEvent
 | 
				
			||||||
 | 
						for {
 | 
				
			||||||
 | 
							n, err := unix.EpollWait(e.fd, events[:], -1)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								if err == unix.EINTR {
 | 
				
			||||||
 | 
									continue
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								logrus.WithError(err).Error("cgroups: epoll wait")
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							for i := 0; i < n; i++ {
 | 
				
			||||||
 | 
								e.process(ctx, uintptr(events[i].Fd))
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (e *epoller) add(id string, cg cgroups.Cgroup) error {
 | 
				
			||||||
 | 
						e.mu.Lock()
 | 
				
			||||||
 | 
						defer e.mu.Unlock()
 | 
				
			||||||
 | 
						fd, err := cg.OOMEventFD()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						e.set[fd] = &item{
 | 
				
			||||||
 | 
							id: id,
 | 
				
			||||||
 | 
							cg: cg,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						event := unix.EpollEvent{
 | 
				
			||||||
 | 
							Fd:     int32(fd),
 | 
				
			||||||
 | 
							Events: unix.EPOLLHUP | unix.EPOLLIN | unix.EPOLLERR,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return unix.EpollCtl(e.fd, unix.EPOLL_CTL_ADD, int(fd), &event)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (e *epoller) process(ctx context.Context, fd uintptr) {
 | 
				
			||||||
 | 
						flush(fd)
 | 
				
			||||||
 | 
						e.mu.Lock()
 | 
				
			||||||
 | 
						i, ok := e.set[fd]
 | 
				
			||||||
 | 
						if !ok {
 | 
				
			||||||
 | 
							e.mu.Unlock()
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						e.mu.Unlock()
 | 
				
			||||||
 | 
						if i.cg.State() == cgroups.Deleted {
 | 
				
			||||||
 | 
							e.mu.Lock()
 | 
				
			||||||
 | 
							delete(e.set, fd)
 | 
				
			||||||
 | 
							e.mu.Unlock()
 | 
				
			||||||
 | 
							unix.Close(int(fd))
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if err := e.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &eventstypes.TaskOOM{
 | 
				
			||||||
 | 
							ContainerID: i.id,
 | 
				
			||||||
 | 
						}); err != nil {
 | 
				
			||||||
 | 
							logrus.WithError(err).Error("publish OOM event")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func flush(fd uintptr) error {
 | 
				
			||||||
 | 
						var buf [8]byte
 | 
				
			||||||
 | 
						_, err := unix.Read(int(fd), buf[:])
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -64,12 +64,18 @@ var _ = (taskAPI.TaskService)(&service{})
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// New returns a new shim service that can be used via GRPC
 | 
					// New returns a new shim service that can be used via GRPC
 | 
				
			||||||
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
 | 
					func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
 | 
				
			||||||
 | 
						ep, err := newOOMEpoller(publisher)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						go ep.run(ctx)
 | 
				
			||||||
	s := &service{
 | 
						s := &service{
 | 
				
			||||||
		id:        id,
 | 
							id:        id,
 | 
				
			||||||
		context:   ctx,
 | 
							context:   ctx,
 | 
				
			||||||
		processes: make(map[string]rproc.Process),
 | 
							processes: make(map[string]rproc.Process),
 | 
				
			||||||
		events:    make(chan interface{}, 128),
 | 
							events:    make(chan interface{}, 128),
 | 
				
			||||||
		ec:        shim.Default.Subscribe(),
 | 
							ec:        shim.Default.Subscribe(),
 | 
				
			||||||
 | 
							ep:        ep,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	go s.processExits()
 | 
						go s.processExits()
 | 
				
			||||||
	runcC.Monitor = shim.Default
 | 
						runcC.Monitor = shim.Default
 | 
				
			||||||
@@ -90,6 +96,7 @@ type service struct {
 | 
				
			|||||||
	events    chan interface{}
 | 
						events    chan interface{}
 | 
				
			||||||
	platform  rproc.Platform
 | 
						platform  rproc.Platform
 | 
				
			||||||
	ec        chan runcC.Exit
 | 
						ec        chan runcC.Exit
 | 
				
			||||||
 | 
						ep        *epoller
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	id string
 | 
						id string
 | 
				
			||||||
	// Filled by Create()
 | 
						// Filled by Create()
 | 
				
			||||||
@@ -293,7 +300,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
 | 
				
			|||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			logrus.WithError(err).Errorf("loading cgroup for %d", pid)
 | 
								logrus.WithError(err).Errorf("loading cgroup for %d", pid)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		s.cg = cg
 | 
							s.setCgroup(cg)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	s.task = process
 | 
						s.task = process
 | 
				
			||||||
	return &taskAPI.CreateTaskResponse{
 | 
						return &taskAPI.CreateTaskResponse{
 | 
				
			||||||
@@ -318,7 +325,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 | 
				
			|||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
 | 
								logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		s.cg = cg
 | 
							s.setCgroup(cg)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return &taskAPI.StartResponse{
 | 
						return &taskAPI.StartResponse{
 | 
				
			||||||
		Pid: uint32(p.Pid()),
 | 
							Pid: uint32(p.Pid()),
 | 
				
			||||||
@@ -708,6 +715,13 @@ func (s *service) getProcess(execID string) (rproc.Process, error) {
 | 
				
			|||||||
	return p, nil
 | 
						return p, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (s *service) setCgroup(cg cgroups.Cgroup) {
 | 
				
			||||||
 | 
						s.cg = cg
 | 
				
			||||||
 | 
						if err := s.ep.add(s.id, cg); err != nil {
 | 
				
			||||||
 | 
							logrus.WithError(err).Error("add cg to OOM monitor")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func getTopic(ctx context.Context, e interface{}) string {
 | 
					func getTopic(ctx context.Context, e interface{}) string {
 | 
				
			||||||
	switch e.(type) {
 | 
						switch e.(type) {
 | 
				
			||||||
	case *eventstypes.TaskCreate:
 | 
						case *eventstypes.TaskCreate:
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user