diff --git a/linux/runtime.go b/linux/runtime.go
index 6f6567aaa..f1764ca71 100644
--- a/linux/runtime.go
+++ b/linux/runtime.go
@@ -251,7 +251,9 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
 	}
 	// after the task is created, add it to the monitor
 	if err = r.monitor.Monitor(t); err != nil {
-		r.tasks.Delete(ctx, t)
+		if _, err := r.Delete(ctx, t); err != nil {
+			log.G(ctx).WithError(err).Error("deleting task after failed monitor")
+		}
 		return nil, err
 	}
 	return t, nil
@@ -269,6 +271,7 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er
 	if err := r.monitor.Stop(lc); err != nil {
 		return nil, err
 	}
+
 	rsp, err := lc.shim.Delete(ctx, empty)
 	if err != nil {
 		return nil, errdefs.FromGRPC(err)
diff --git a/metrics/cgroups/cgroups.go b/metrics/cgroups/cgroups.go
index 5ef16c0c3..2803c1303 100644
--- a/metrics/cgroups/cgroups.go
+++ b/metrics/cgroups/cgroups.go
@@ -7,9 +7,11 @@ import (
 	eventsapi "github.com/containerd/containerd/api/services/events/v1"
 	"github.com/containerd/containerd/events"
 	"github.com/containerd/containerd/log"
+	"github.com/containerd/containerd/namespaces"
 	"github.com/containerd/containerd/plugin"
 	"github.com/containerd/containerd/runtime"
 	metrics "github.com/docker/go-metrics"
+	"github.com/pkg/errors"
 	"golang.org/x/net/context"
 )
 
@@ -54,7 +56,7 @@ func (m *cgroupsMonitor) Monitor(c runtime.Task) error {
 	}
 	cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(int(state.Pid)))
 	if err != nil {
-		return err
+		return errors.Wrapf(err, "load cgroup for %d", state.Pid)
 	}
 	if err := m.collector.Add(info.ID, info.Namespace, cg); err != nil {
 		return err
@@ -68,8 +70,9 @@ func (m *cgroupsMonitor) Stop(c runtime.Task) error {
 	return nil
 }
 
-func (m *cgroupsMonitor) trigger(id string, cg cgroups.Cgroup) {
-	if err := m.publisher.Publish(m.context, runtime.TaskOOMEventTopic, &eventsapi.TaskOOM{
+func (m *cgroupsMonitor) trigger(id, namespace string, cg cgroups.Cgroup) {
+	ctx := namespaces.WithNamespace(m.context, namespace)
+	if err := m.publisher.Publish(ctx, runtime.TaskOOMEventTopic, &eventsapi.TaskOOM{
 		ContainerID: id,
 	}); err != nil {
 		log.G(m.context).WithError(err).Error("post OOM event")
diff --git a/metrics/cgroups/metrics.go b/metrics/cgroups/metrics.go
index e3d0f5922..e47696467 100644
--- a/metrics/cgroups/metrics.go
+++ b/metrics/cgroups/metrics.go
@@ -21,7 +21,7 @@ var (
 
 // Trigger will be called when an event happens and provides the cgroup
 // where the event originated from
-type Trigger func(string, cgroups.Cgroup)
+type Trigger func(string, string, cgroups.Cgroup)
 
 // New registers the Collector with the provided namespace and returns it so
 // that cgroups can be added for collection
diff --git a/metrics/cgroups/oom.go b/metrics/cgroups/oom.go
index 47d979f58..39989d52d 100644
--- a/metrics/cgroups/oom.go
+++ b/metrics/cgroups/oom.go
@@ -4,11 +4,13 @@ package cgroups
 
 import (
 	"sync"
+	"sync/atomic"
 
 	"golang.org/x/sys/unix"
 
 	"github.com/containerd/cgroups"
 	metrics "github.com/docker/go-metrics"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
 )
 
@@ -18,10 +20,11 @@ func NewOOMCollector(ns *metrics.Namespace) (*OOMCollector, error) {
 		return nil, err
 	}
 	c := &OOMCollector{
-		fd:        fd,
-		memoryOOM: ns.NewLabeledGauge("memory_oom", "The number of times a container received an oom event", metrics.Total, "container_id", "namespace"),
-		set:       make(map[uintptr]*oom),
+		fd:   fd,
+		desc: ns.NewDesc("memory_oom", "The number of times a container has received an oom event", metrics.Total, "container_id", "namespace"),
+		set:  make(map[uintptr]*oom),
 	}
+	ns.Add(c)
 	go c.start()
 	return c, nil
 }
@@ -29,9 +32,9 @@ func NewOOMCollector(ns *metrics.Namespace) (*OOMCollector, error) {
 type OOMCollector struct {
 	mu sync.Mutex
 
-	memoryOOM metrics.LabeledGauge
-	fd        int
-	set       map[uintptr]*oom
+	desc *prometheus.Desc
+	fd   int
+	set  map[uintptr]*oom
 }
 
 type oom struct {
@@ -39,6 +42,7 @@ type oom struct {
 	namespace string
 	c         cgroups.Cgroup
 	triggers  []Trigger
+	count     int64
 }
 
 func (o *OOMCollector) Add(id, namespace string, cg cgroups.Cgroup, triggers ...Trigger) error {
@@ -54,16 +58,24 @@ func (o *OOMCollector) Add(id, namespace string, cg cgroups.Cgroup, triggers ...
 		triggers:  triggers,
 		namespace: namespace,
 	}
-	// set the gauge's default value
-	o.memoryOOM.WithValues(id, namespace).Set(0)
 	event := unix.EpollEvent{
 		Fd:     int32(fd),
 		Events: unix.EPOLLHUP | unix.EPOLLIN | unix.EPOLLERR,
 	}
-	if err := unix.EpollCtl(o.fd, unix.EPOLL_CTL_ADD, int(fd), &event); err != nil {
-		return err
+	return unix.EpollCtl(o.fd, unix.EPOLL_CTL_ADD, int(fd), &event)
+}
+
+func (o *OOMCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- o.desc
+}
+
+func (o *OOMCollector) Collect(ch chan<- prometheus.Metric) {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	for _, t := range o.set {
+		c := atomic.LoadInt64(&t.count)
+		ch <- prometheus.MustNewConstMetric(o.desc, prometheus.CounterValue, float64(c), t.id, t.namespace)
 	}
-	return nil
 }
 
 // Close closes the epoll fd
@@ -107,14 +119,14 @@ func (o *OOMCollector) process(fd uintptr, event uint32) {
 		unix.Close(int(fd))
 		return
 	}
-	o.memoryOOM.WithValues(info.id, info.namespace).Inc(1)
+	atomic.AddInt64(&info.count, 1)
 	for _, t := range info.triggers {
-		t(info.id, info.c)
+		t(info.id, info.namespace, info.c)
 	}
 }
 
 func flush(fd uintptr) error {
-	buf := make([]byte, 8)
-	_, err := unix.Read(int(fd), buf)
+	var buf [8]byte
+	_, err := unix.Read(int(fd), buf[:])
 	return err
 }