Implement CRI container and pods stats

See https://kep.k8s.io/2371

* Implement new CRI RPCs - `ListPodSandboxStats` and `PodSandboxStats`
  * `ListPodSandboxStats` and `PodSandboxStats` which return stats about
    pod sandbox. To obtain pod sandbox stats, underlying metrics are
    read from the pod sandbox cgroup parent.
  * Process info is obtained by calling into the underlying task
  * Network stats are taken by looking up network metrics based on the
    pod sandbox network namespace path
* Return more detailed stats for cpu and memory for existing container
  stats. These metrics use the underlying task's metrics to obtain
  stats.

Signed-off-by: David Porter <porterdavid@google.com>
This commit is contained in:
David Porter 2021-10-05 13:28:36 -07:00
parent b69bbe25ac
commit 2e6d5709e3
14 changed files with 1090 additions and 33 deletions

View File

@ -17,6 +17,8 @@
package server
import (
"time"
"github.com/containerd/containerd/api/types"
v1 "github.com/containerd/containerd/metrics/types/v1"
v2 "github.com/containerd/containerd/metrics/types/v2"
@ -25,6 +27,7 @@ import (
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
containerstore "github.com/containerd/containerd/pkg/cri/store/container"
stats "github.com/containerd/containerd/pkg/cri/store/stats"
)
func (c *criService) containerMetrics(
@ -60,45 +63,89 @@ func (c *criService) containerMetrics(
if err != nil {
return nil, errors.Wrap(err, "failed to extract container metrics")
}
switch metrics := s.(type) {
case *v1.Metrics:
if metrics.CPU != nil && metrics.CPU.Usage != nil {
cs.Cpu = &runtime.CpuUsage{
Timestamp: stats.Timestamp.UnixNano(),
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: metrics.CPU.Usage.Total},
cpuStats, err := c.cpuContainerStats(meta.ID, false /* isSandbox */, s, stats.Timestamp)
if err != nil {
return nil, errors.Wrap(err, "failed to obtain cpu stats")
}
cs.Cpu = cpuStats
memoryStats, err := c.memoryContainerStats(meta.ID, s, stats.Timestamp)
if err != nil {
return nil, errors.Wrap(err, "failed to obtain memory stats")
}
if metrics.Memory != nil && metrics.Memory.Usage != nil {
cs.Memory = &runtime.MemoryUsage{
Timestamp: stats.Timestamp.UnixNano(),
WorkingSetBytes: &runtime.UInt64Value{
Value: getWorkingSet(metrics.Memory),
},
}
}
case *v2.Metrics:
if metrics.CPU != nil {
cs.Cpu = &runtime.CpuUsage{
Timestamp: stats.Timestamp.UnixNano(),
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: metrics.CPU.UsageUsec * 1000},
}
}
if metrics.Memory != nil {
cs.Memory = &runtime.MemoryUsage{
Timestamp: stats.Timestamp.UnixNano(),
WorkingSetBytes: &runtime.UInt64Value{
Value: getWorkingSetV2(metrics.Memory),
},
}
}
default:
return &cs, errors.Errorf("unexpected metrics type: %v", metrics)
}
cs.Memory = memoryStats
}
return &cs, nil
}
func (c *criService) getUsageNanoCores(containerID string, isSandbox bool, currentUsageCoreNanoSeconds uint64, currentTimestamp time.Time) (uint64, error) {
var oldStats *stats.ContainerStats
if isSandbox {
sandbox, err := c.sandboxStore.Get(containerID)
if err != nil {
return 0, errors.Wrapf(err, "failed to get sandbox container: %s", containerID)
}
oldStats = sandbox.Stats
} else {
container, err := c.containerStore.Get(containerID)
if err != nil {
return 0, errors.Wrapf(err, "failed to get container ID: %s", containerID)
}
oldStats = container.Stats
}
if oldStats == nil {
newStats := &stats.ContainerStats{
UsageCoreNanoSeconds: currentUsageCoreNanoSeconds,
Timestamp: currentTimestamp,
}
if isSandbox {
err := c.sandboxStore.UpdateContainerStats(containerID, newStats)
if err != nil {
return 0, errors.Wrapf(err, "failed to update sandbox stats container ID: %s", containerID)
}
} else {
err := c.containerStore.UpdateContainerStats(containerID, newStats)
if err != nil {
return 0, errors.Wrapf(err, "failed to update container stats ID: %s", containerID)
}
}
return 0, nil
}
nanoSeconds := currentTimestamp.UnixNano() - oldStats.Timestamp.UnixNano()
// zero or negative interval
if nanoSeconds <= 0 {
return 0, nil
}
newUsageNanoCores := uint64(float64(currentUsageCoreNanoSeconds-oldStats.UsageCoreNanoSeconds) /
float64(nanoSeconds) * float64(time.Second/time.Nanosecond))
newStats := &stats.ContainerStats{
UsageCoreNanoSeconds: currentUsageCoreNanoSeconds,
Timestamp: currentTimestamp,
}
if isSandbox {
err := c.sandboxStore.UpdateContainerStats(containerID, newStats)
if err != nil {
return 0, errors.Wrapf(err, "failed to update sandbox container stats: %s", containerID)
}
} else {
err := c.containerStore.UpdateContainerStats(containerID, newStats)
if err != nil {
return 0, errors.Wrapf(err, "failed to update container stats ID: %s", containerID)
}
}
return newUsageNanoCores, nil
}
// getWorkingSet calculates workingset memory from cgroup memory stats.
// The caller should make sure memory is not nil.
// workingset = usage - total_inactive_file
@ -123,3 +170,109 @@ func getWorkingSetV2(memory *v2.MemoryStat) uint64 {
}
return workingSet
}
func isMemoryUnlimited(v uint64) bool {
// Size after which we consider memory to be "unlimited". This is not
// MaxInt64 due to rounding by the kernel.
// TODO: k8s or cadvisor should export this https://github.com/google/cadvisor/blob/2b6fbacac7598e0140b5bc8428e3bdd7d86cf5b9/metrics/prometheus.go#L1969-L1971
const maxMemorySize = uint64(1 << 62)
return v > maxMemorySize
}
// https://github.com/kubernetes/kubernetes/blob/b47f8263e18c7b13dba33fba23187e5e0477cdbd/pkg/kubelet/stats/helper.go#L68-L71
func getAvailableBytes(memory *v1.MemoryStat, workingSetBytes uint64) uint64 {
// memory limit - working set bytes
if !isMemoryUnlimited(memory.Usage.Limit) {
return memory.Usage.Limit - workingSetBytes
}
return 0
}
func getAvailableBytesV2(memory *v2.MemoryStat, workingSetBytes uint64) uint64 {
// memory limit (memory.max) for cgroupv2 - working set bytes
if !isMemoryUnlimited(memory.UsageLimit) {
return memory.UsageLimit - workingSetBytes
}
return 0
}
func (c *criService) cpuContainerStats(ID string, isSandbox bool, stats interface{}, timestamp time.Time) (*runtime.CpuUsage, error) {
switch metrics := stats.(type) {
case *v1.Metrics:
if metrics.CPU != nil && metrics.CPU.Usage != nil {
usageNanoCores, err := c.getUsageNanoCores(ID, isSandbox, metrics.CPU.Usage.Total, timestamp)
if err != nil {
return nil, errors.Wrapf(err, "failed to get usage nano cores, containerID: %s", ID)
}
return &runtime.CpuUsage{
Timestamp: timestamp.UnixNano(),
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: metrics.CPU.Usage.Total},
UsageNanoCores: &runtime.UInt64Value{Value: usageNanoCores},
}, nil
}
case *v2.Metrics:
if metrics.CPU != nil {
// convert to nano seconds
usageCoreNanoSeconds := metrics.CPU.UsageUsec * 1000
usageNanoCores, err := c.getUsageNanoCores(ID, isSandbox, usageCoreNanoSeconds, timestamp)
if err != nil {
return nil, errors.Wrapf(err, "failed to get usage nano cores, containerID: %s", ID)
}
return &runtime.CpuUsage{
Timestamp: timestamp.UnixNano(),
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: usageCoreNanoSeconds},
UsageNanoCores: &runtime.UInt64Value{Value: usageNanoCores},
}, nil
}
default:
return nil, errors.Errorf("unexpected metrics type: %v", metrics)
}
return nil, nil
}
func (c *criService) memoryContainerStats(ID string, stats interface{}, timestamp time.Time) (*runtime.MemoryUsage, error) {
switch metrics := stats.(type) {
case *v1.Metrics:
if metrics.Memory != nil && metrics.Memory.Usage != nil {
workingSetBytes := getWorkingSet(metrics.Memory)
return &runtime.MemoryUsage{
Timestamp: timestamp.UnixNano(),
WorkingSetBytes: &runtime.UInt64Value{
Value: workingSetBytes,
},
AvailableBytes: &runtime.UInt64Value{Value: getAvailableBytes(metrics.Memory, workingSetBytes)},
UsageBytes: &runtime.UInt64Value{Value: metrics.Memory.Usage.Usage},
RssBytes: &runtime.UInt64Value{Value: metrics.Memory.TotalRSS},
PageFaults: &runtime.UInt64Value{Value: metrics.Memory.TotalPgFault},
MajorPageFaults: &runtime.UInt64Value{Value: metrics.Memory.TotalPgMajFault},
}, nil
}
case *v2.Metrics:
if metrics.Memory != nil {
workingSetBytes := getWorkingSetV2(metrics.Memory)
return &runtime.MemoryUsage{
Timestamp: timestamp.UnixNano(),
WorkingSetBytes: &runtime.UInt64Value{
Value: workingSetBytes,
},
AvailableBytes: &runtime.UInt64Value{Value: getAvailableBytesV2(metrics.Memory, workingSetBytes)},
UsageBytes: &runtime.UInt64Value{Value: metrics.Memory.Usage},
// Use Anon memory for RSS as cAdvisor on cgroupv2
// see https://github.com/google/cadvisor/blob/a9858972e75642c2b1914c8d5428e33e6392c08a/container/libcontainer/handler.go#L799
RssBytes: &runtime.UInt64Value{Value: metrics.Memory.Anon},
PageFaults: &runtime.UInt64Value{Value: metrics.Memory.Pgfault},
MajorPageFaults: &runtime.UInt64Value{Value: metrics.Memory.Pgmajfault},
}, nil
}
default:
return nil, errors.Errorf("unexpected metrics type: %v", metrics)
}
return nil, nil
}

View File

@ -17,10 +17,15 @@
package server
import (
"math"
"testing"
"time"
v1 "github.com/containerd/cgroups/stats/v1"
v2 "github.com/containerd/cgroups/v2/stats"
containerstore "github.com/containerd/containerd/pkg/cri/store/container"
"github.com/stretchr/testify/assert"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
func TestGetWorkingSet(t *testing.T) {
@ -53,3 +58,273 @@ func TestGetWorkingSet(t *testing.T) {
})
}
}
func TestGetWorkingSetV2(t *testing.T) {
for desc, test := range map[string]struct {
memory *v2.MemoryStat
expected uint64
}{
"nil memory usage": {
memory: &v2.MemoryStat{},
expected: 0,
},
"memory usage higher than inactive_total_file": {
memory: &v2.MemoryStat{
InactiveFile: 1000,
Usage: 2000,
},
expected: 1000,
},
"memory usage lower than inactive_total_file": {
memory: &v2.MemoryStat{
InactiveFile: 2000,
Usage: 1000,
},
expected: 0,
},
} {
t.Run(desc, func(t *testing.T) {
got := getWorkingSetV2(test.memory)
assert.Equal(t, test.expected, got)
})
}
}
func TestGetAvailableBytes(t *testing.T) {
for desc, test := range map[string]struct {
memory *v1.MemoryStat
workingSetBytes uint64
expected uint64
}{
"no limit": {
memory: &v1.MemoryStat{
Usage: &v1.MemoryEntry{
Limit: math.MaxUint64, // no limit
Usage: 1000,
},
},
workingSetBytes: 500,
expected: 0,
},
"with limit": {
memory: &v1.MemoryStat{
Usage: &v1.MemoryEntry{
Limit: 5000,
Usage: 1000,
},
},
workingSetBytes: 500,
expected: 5000 - 500,
},
} {
t.Run(desc, func(t *testing.T) {
got := getAvailableBytes(test.memory, test.workingSetBytes)
assert.Equal(t, test.expected, got)
})
}
}
func TestGetAvailableBytesV2(t *testing.T) {
for desc, test := range map[string]struct {
memory *v2.MemoryStat
workingSetBytes uint64
expected uint64
}{
"no limit": {
memory: &v2.MemoryStat{
UsageLimit: math.MaxUint64, // no limit
Usage: 1000,
},
workingSetBytes: 500,
expected: 0,
},
"with limit": {
memory: &v2.MemoryStat{
UsageLimit: 5000,
Usage: 1000,
},
workingSetBytes: 500,
expected: 5000 - 500,
},
} {
t.Run(desc, func(t *testing.T) {
got := getAvailableBytesV2(test.memory, test.workingSetBytes)
assert.Equal(t, test.expected, got)
})
}
}
func TestContainerMetricsCPU(t *testing.T) {
c := newTestCRIService()
timestamp := time.Now()
secondAfterTimeStamp := timestamp.Add(time.Second)
ID := "ID"
for desc, test := range map[string]struct {
firstMetrics interface{}
secondMetrics interface{}
expectedFirst *runtime.CpuUsage
expectedSecond *runtime.CpuUsage
}{
"v1 metrics": {
firstMetrics: &v1.Metrics{
CPU: &v1.CPUStat{
Usage: &v1.CPUUsage{
Total: 50,
},
},
},
secondMetrics: &v1.Metrics{
CPU: &v1.CPUStat{
Usage: &v1.CPUUsage{
Total: 500,
},
},
},
expectedFirst: &runtime.CpuUsage{
Timestamp: timestamp.UnixNano(),
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: 50},
UsageNanoCores: &runtime.UInt64Value{Value: 0},
},
expectedSecond: &runtime.CpuUsage{
Timestamp: secondAfterTimeStamp.UnixNano(),
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: 500},
UsageNanoCores: &runtime.UInt64Value{Value: 450},
},
},
} {
t.Run(desc, func(t *testing.T) {
container, err := containerstore.NewContainer(
containerstore.Metadata{ID: ID},
)
assert.NoError(t, err)
assert.Nil(t, container.Stats)
err = c.containerStore.Add(container)
assert.NoError(t, err)
cpuUsage, err := c.cpuContainerStats(ID, false, test.firstMetrics, timestamp)
assert.NoError(t, err)
container, err = c.containerStore.Get(ID)
assert.NoError(t, err)
assert.NotNil(t, container.Stats)
assert.Equal(t, test.expectedFirst, cpuUsage)
cpuUsage, err = c.cpuContainerStats(ID, false, test.secondMetrics, secondAfterTimeStamp)
assert.NoError(t, err)
assert.Equal(t, test.expectedSecond, cpuUsage)
container, err = c.containerStore.Get(ID)
assert.NoError(t, err)
assert.NotNil(t, container.Stats)
})
}
}
func TestContainerMetricsMemory(t *testing.T) {
c := newTestCRIService()
timestamp := time.Now()
for desc, test := range map[string]struct {
metrics interface{}
expected *runtime.MemoryUsage
}{
"v1 metrics - no memory limit": {
metrics: &v1.Metrics{
Memory: &v1.MemoryStat{
Usage: &v1.MemoryEntry{
Limit: math.MaxUint64, // no limit
Usage: 1000,
},
TotalRSS: 10,
TotalPgFault: 11,
TotalPgMajFault: 12,
TotalInactiveFile: 500,
},
},
expected: &runtime.MemoryUsage{
Timestamp: timestamp.UnixNano(),
WorkingSetBytes: &runtime.UInt64Value{Value: 500},
AvailableBytes: &runtime.UInt64Value{Value: 0},
UsageBytes: &runtime.UInt64Value{Value: 1000},
RssBytes: &runtime.UInt64Value{Value: 10},
PageFaults: &runtime.UInt64Value{Value: 11},
MajorPageFaults: &runtime.UInt64Value{Value: 12},
},
},
"v1 metrics - memory limit": {
metrics: &v1.Metrics{
Memory: &v1.MemoryStat{
Usage: &v1.MemoryEntry{
Limit: 5000,
Usage: 1000,
},
TotalRSS: 10,
TotalPgFault: 11,
TotalPgMajFault: 12,
TotalInactiveFile: 500,
},
},
expected: &runtime.MemoryUsage{
Timestamp: timestamp.UnixNano(),
WorkingSetBytes: &runtime.UInt64Value{Value: 500},
AvailableBytes: &runtime.UInt64Value{Value: 4500},
UsageBytes: &runtime.UInt64Value{Value: 1000},
RssBytes: &runtime.UInt64Value{Value: 10},
PageFaults: &runtime.UInt64Value{Value: 11},
MajorPageFaults: &runtime.UInt64Value{Value: 12},
},
},
"v2 metrics - memory limit": {
metrics: &v2.Metrics{
Memory: &v2.MemoryStat{
Usage: 1000,
UsageLimit: 5000,
InactiveFile: 0,
Pgfault: 11,
Pgmajfault: 12,
},
},
expected: &runtime.MemoryUsage{
Timestamp: timestamp.UnixNano(),
WorkingSetBytes: &runtime.UInt64Value{Value: 1000},
AvailableBytes: &runtime.UInt64Value{Value: 4000},
UsageBytes: &runtime.UInt64Value{Value: 1000},
RssBytes: &runtime.UInt64Value{Value: 0},
PageFaults: &runtime.UInt64Value{Value: 11},
MajorPageFaults: &runtime.UInt64Value{Value: 12},
},
},
"v2 metrics - no memory limit": {
metrics: &v2.Metrics{
Memory: &v2.MemoryStat{
Usage: 1000,
UsageLimit: math.MaxUint64, // no limit
InactiveFile: 0,
Pgfault: 11,
Pgmajfault: 12,
},
},
expected: &runtime.MemoryUsage{
Timestamp: timestamp.UnixNano(),
WorkingSetBytes: &runtime.UInt64Value{Value: 1000},
AvailableBytes: &runtime.UInt64Value{Value: 0},
UsageBytes: &runtime.UInt64Value{Value: 1000},
RssBytes: &runtime.UInt64Value{Value: 0},
PageFaults: &runtime.UInt64Value{Value: 11},
MajorPageFaults: &runtime.UInt64Value{Value: 12},
},
},
} {
t.Run(desc, func(t *testing.T) {
got, err := c.memoryContainerStats("ID", test.metrics, timestamp)
assert.NoError(t, err)
assert.Equal(t, test.expected, got)
})
}
}

View File

@ -1198,6 +1198,59 @@ func (in *instrumentedAlphaService) ImageFsInfo(ctx context.Context, r *runtime_
return res, errdefs.ToGRPC(err)
}
func (in *instrumentedService) PodSandboxStats(ctx context.Context, r *runtime.PodSandboxStatsRequest) (res *runtime.PodSandboxStatsResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
log.G(ctx).Debugf("PodSandboxStats for %q", r.GetPodSandboxId())
defer func() {
if err != nil {
log.G(ctx).WithError(err).Errorf("PodSandboxStats for %q failed", r.GetPodSandboxId())
} else {
log.G(ctx).Debugf("PodSandboxStats for %q returns stats %+v", r.GetPodSandboxId(), res.GetStats())
}
}()
res, err = in.c.PodSandboxStats(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
}
func (in *instrumentedAlphaService) PodSandboxStats(ctx context.Context, r *runtime_alpha.PodSandboxStatsRequest) (res *runtime_alpha.PodSandboxStatsResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
log.G(ctx).Debugf("PodSandboxStats for %q", r.GetPodSandboxId())
defer func() {
if err != nil {
log.G(ctx).WithError(err).Errorf("PodSandboxStats for %q failed", r.GetPodSandboxId())
} else {
log.G(ctx).Debugf("PodSandboxStats for %q returns stats %+v", r.GetPodSandboxId(), res.GetStats())
}
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.PodSandboxStatsRequest
if err := alphaReqToV1Req(r, &v1r); err != nil {
return nil, errdefs.ToGRPC(err)
}
var v1res *runtime.PodSandboxStatsResponse
v1res, err = in.c.PodSandboxStats(ctrdutil.WithNamespace(ctx), &v1r)
if v1res != nil {
resp := &runtime_alpha.PodSandboxStatsResponse{}
perr := v1RespToAlphaResp(v1res, resp)
if perr == nil {
res = resp
} else {
// actual error has precidence on error returned vs parse error issues
if err == nil {
err = perr
} else {
// extra log entry if convert response parse error and request error
log.G(ctx).WithError(err).Errorf("PodSandboxStats for %q failed", r.GetPodSandboxId())
}
}
}
return res, errdefs.ToGRPC(err)
}
func (in *instrumentedService) ContainerStats(ctx context.Context, r *runtime.ContainerStatsRequest) (res *runtime.ContainerStatsResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
@ -1251,6 +1304,59 @@ func (in *instrumentedAlphaService) ContainerStats(ctx context.Context, r *runti
return res, errdefs.ToGRPC(err)
}
func (in *instrumentedService) ListPodSandboxStats(ctx context.Context, r *runtime.ListPodSandboxStatsRequest) (res *runtime.ListPodSandboxStatsResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
log.G(ctx).Tracef("ListPodSandboxStats with filter %+v", r.GetFilter())
defer func() {
if err != nil {
log.G(ctx).WithError(err).Error("ListPodSandboxStats failed")
} else {
log.G(ctx).Tracef("ListPodSandboxStats returns stats %+v", res.GetStats())
}
}()
res, err = in.c.ListPodSandboxStats(ctrdutil.WithNamespace(ctx), r)
return res, errdefs.ToGRPC(err)
}
func (in *instrumentedAlphaService) ListPodSandboxStats(ctx context.Context, r *runtime_alpha.ListPodSandboxStatsRequest) (res *runtime_alpha.ListPodSandboxStatsResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err
}
log.G(ctx).Tracef("ListPodSandboxStats with filter %+v", r.GetFilter())
defer func() {
if err != nil {
log.G(ctx).WithError(err).Error("ListPodSandboxStats failed")
} else {
log.G(ctx).Tracef("ListPodSandboxStats returns stats %+v", res.GetStats())
}
}()
// converts request and response for earlier CRI version to call and get response from the current version
var v1r runtime.ListPodSandboxStatsRequest
if err := alphaReqToV1Req(r, &v1r); err != nil {
return nil, errdefs.ToGRPC(err)
}
var v1res *runtime.ListPodSandboxStatsResponse
v1res, err = in.c.ListPodSandboxStats(ctrdutil.WithNamespace(ctx), &v1r)
if v1res != nil {
resp := &runtime_alpha.ListPodSandboxStatsResponse{}
perr := v1RespToAlphaResp(v1res, resp)
if perr == nil {
res = resp
} else {
// actual error has precidence on error returned vs parse error issues
if err == nil {
err = perr
} else {
// extra log entry if convert response parse error and request error
log.G(ctx).WithError(perr).Error("ListPodSandboxStats failed")
}
}
}
return res, errdefs.ToGRPC(err)
}
func (in *instrumentedService) ListContainerStats(ctx context.Context, r *runtime.ListContainerStatsRequest) (res *runtime.ListContainerStatsResponse, err error) {
if err := in.checkInitialized(); err != nil {
return nil, err

View File

@ -63,6 +63,12 @@ func (c *criService) normalizePodSandboxFilter(filter *runtime.PodSandboxFilter)
}
}
func (c *criService) normalizePodSandboxStatsFilter(filter *runtime.PodSandboxStatsFilter) {
if sb, err := c.sandboxStore.Get(filter.GetId()); err == nil {
filter.Id = sb.ID
}
}
// filterCRISandboxes filters CRISandboxes.
func (c *criService) filterCRISandboxes(sandboxes []*runtime.PodSandbox, filter *runtime.PodSandboxFilter) []*runtime.PodSandbox {
if filter == nil {

View File

@ -0,0 +1,47 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"github.com/pkg/errors"
"golang.org/x/net/context"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
func (c *criService) PodSandboxStats(
ctx context.Context,
r *runtime.PodSandboxStatsRequest,
) (*runtime.PodSandboxStatsResponse, error) {
sandbox, err := c.sandboxStore.Get(r.GetPodSandboxId())
if err != nil {
return nil, errors.Wrapf(err, "an error occurred when trying to find sandbox %s", r.GetPodSandboxId())
}
metrics, err := metricsForSandbox(sandbox)
if err != nil {
return nil, errors.Wrapf(err, "failed getting metrics for sandbox %s", r.GetPodSandboxId())
}
podSandboxStats, err := c.podSandboxStats(ctx, sandbox, metrics)
if err != nil {
return nil, errors.Wrapf(err, "failed to decode pod sandbox metrics %s", r.GetPodSandboxId())
}
return &runtime.PodSandboxStatsResponse{Stats: podSandboxStats}, nil
}

View File

@ -0,0 +1,177 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"time"
"github.com/containernetworking/plugins/pkg/ns"
"github.com/pkg/errors"
"golang.org/x/net/context"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/containerd/cgroups"
cgroupsv2 "github.com/containerd/cgroups/v2"
"github.com/vishvananda/netlink"
"github.com/containerd/containerd/log"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
)
func (c *criService) podSandboxStats(
ctx context.Context,
sandbox sandboxstore.Sandbox,
stats interface{},
) (*runtime.PodSandboxStats, error) {
meta := sandbox.Metadata
if sandbox.Status.Get().State != sandboxstore.StateReady {
return nil, errors.Errorf("failed to get pod sandbox stats since sandbox container %q is not in ready state", meta.ID)
}
var podSandboxStats runtime.PodSandboxStats
podSandboxStats.Attributes = &runtime.PodSandboxAttributes{
Id: meta.ID,
Metadata: meta.Config.GetMetadata(),
Labels: meta.Config.GetLabels(),
Annotations: meta.Config.GetAnnotations(),
}
podSandboxStats.Linux = &runtime.LinuxPodSandboxStats{}
if stats != nil {
timestamp := time.Now()
cpuStats, err := c.cpuContainerStats(meta.ID, true /* isSandbox */, stats, timestamp)
if err != nil {
return nil, errors.Wrap(err, "failed to obtain cpu stats")
}
podSandboxStats.Linux.Cpu = cpuStats
memoryStats, err := c.memoryContainerStats(meta.ID, stats, timestamp)
if err != nil {
return nil, errors.Wrap(err, "failed to obtain memory stats")
}
podSandboxStats.Linux.Memory = memoryStats
if sandbox.NetNSPath != "" {
rxBytes, rxErrors, txBytes, txErrors := getContainerNetIO(ctx, sandbox.NetNSPath)
podSandboxStats.Linux.Network = &runtime.NetworkUsage{
DefaultInterface: &runtime.NetworkInterfaceUsage{
Name: defaultIfName,
RxBytes: &runtime.UInt64Value{Value: rxBytes},
RxErrors: &runtime.UInt64Value{Value: rxErrors},
TxBytes: &runtime.UInt64Value{Value: txBytes},
TxErrors: &runtime.UInt64Value{Value: txErrors},
},
}
}
var pidCount uint64
for _, cntr := range c.containerStore.List() {
if cntr.SandboxID != sandbox.ID {
continue
}
state := cntr.Status.Get().State()
if state != runtime.ContainerState_CONTAINER_RUNNING {
continue
}
task, err := cntr.Container.Task(ctx, nil)
if err != nil {
return nil, err
}
processes, err := task.Pids(ctx)
if err != nil {
return nil, err
}
pidCount += uint64(len(processes))
}
podSandboxStats.Linux.Process = &runtime.ProcessUsage{
Timestamp: timestamp.UnixNano(),
ProcessCount: &runtime.UInt64Value{Value: pidCount},
}
listContainerStatsRequest := &runtime.ListContainerStatsRequest{Filter: &runtime.ContainerStatsFilter{PodSandboxId: meta.ID}}
resp, err := c.ListContainerStats(ctx, listContainerStatsRequest)
if err != nil {
return nil, errors.Wrap(err, "failed to obtain container stats during podSandboxStats call")
}
podSandboxStats.Linux.Containers = resp.GetStats()
}
return &podSandboxStats, nil
}
// https://github.com/cri-o/cri-o/blob/74a5cf8dffd305b311eb1c7f43a4781738c388c1/internal/oci/stats.go#L32
func getContainerNetIO(ctx context.Context, netNsPath string) (rxBytes, rxErrors, txBytes, txErrors uint64) {
ns.WithNetNSPath(netNsPath, func(_ ns.NetNS) error {
link, err := netlink.LinkByName(defaultIfName)
if err != nil {
log.G(ctx).WithError(err).Errorf("unable to retrieve network namespace stats for netNsPath: %v, interface: %v", netNsPath, defaultIfName)
return err
}
attrs := link.Attrs()
if attrs != nil && attrs.Statistics != nil {
rxBytes = attrs.Statistics.RxBytes
rxErrors = attrs.Statistics.RxErrors
txBytes = attrs.Statistics.TxBytes
txErrors = attrs.Statistics.TxErrors
}
return nil
})
return rxBytes, rxErrors, txBytes, txErrors
}
func metricsForSandbox(sandbox sandboxstore.Sandbox) (interface{}, error) {
cgroupPath := sandbox.Config.GetLinux().GetCgroupParent()
if cgroupPath == "" {
return nil, errors.Errorf("failed to get cgroup metrics for sandbox %v because cgroupPath is empty", sandbox.ID)
}
var statsx interface{}
if cgroups.Mode() == cgroups.Unified {
cg, err := cgroupsv2.LoadManager("/sys/fs/cgroup", cgroupPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to load sandbox cgroup: %v", cgroupPath)
}
stats, err := cg.Stat()
if err != nil {
return nil, errors.Wrapf(err, "failed to get stats for cgroup: %v", cgroupPath)
}
statsx = stats
} else {
control, err := cgroups.Load(cgroups.V1, cgroups.StaticPath(cgroupPath))
if err != nil {
return nil, errors.Wrapf(err, "failed to load sandbox cgroup: %v", cgroupPath)
}
stats, err := control.Stat(cgroups.IgnoreNotExist)
if err != nil {
return nil, errors.Wrapf(err, "failed to get stats for cgroup %v", cgroupPath)
}
statsx = stats
}
return statsx, nil
}

View File

@ -0,0 +1,80 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
"github.com/pkg/errors"
"golang.org/x/net/context"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
// ListPodSandboxStats returns stats of all ready sandboxes.
func (c *criService) ListPodSandboxStats(
ctx context.Context,
r *runtime.ListPodSandboxStatsRequest,
) (*runtime.ListPodSandboxStatsResponse, error) {
sandboxes := c.sandboxesForListPodSandboxStatsRequest(r)
podSandboxStats := new(runtime.ListPodSandboxStatsResponse)
for _, sandbox := range sandboxes {
metrics, err := metricsForSandbox(sandbox)
if err != nil {
return nil, errors.Wrapf(err, "failed to obtain metrics for sandbox %q", sandbox.ID)
}
sandboxStats, err := c.podSandboxStats(ctx, sandbox, metrics)
if err != nil {
return nil, errors.Wrapf(err, "failed to decode sandbox container metrics for sandbox %q", sandbox.ID)
}
podSandboxStats.Stats = append(podSandboxStats.Stats, sandboxStats)
}
return podSandboxStats, nil
}
func (c *criService) sandboxesForListPodSandboxStatsRequest(r *runtime.ListPodSandboxStatsRequest) []sandboxstore.Sandbox {
sandboxesInStore := c.sandboxStore.List()
if r.GetFilter() == nil {
return sandboxesInStore
}
c.normalizePodSandboxStatsFilter(r.GetFilter())
var sandboxes []sandboxstore.Sandbox
for _, sandbox := range sandboxesInStore {
if r.GetFilter().GetId() != "" && sandbox.ID != r.GetFilter().GetId() {
continue
}
if r.GetFilter().GetLabelSelector() != nil &&
!matchLabelSelector(r.GetFilter().GetLabelSelector(), sandbox.Config.GetLabels()) {
continue
}
// We can't obtain metrics for sandboxes that aren't in ready state
if sandbox.Status.Get().State != sandboxstore.StateReady {
continue
}
sandboxes = append(sandboxes, sandbox)
}
return sandboxes
}

View File

@ -0,0 +1,37 @@
//go:build !windows && !linux
// +build !windows,!linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"golang.org/x/net/context"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/containerd/containerd/errdefs"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
"github.com/pkg/errors"
)
func (c *criService) podSandboxStats(ctx context.Context, sandbox sandboxstore.Sandbox, stats interface{}) (*runtime.PodSandboxStats, error) {
return nil, errors.Wrap(errdefs.ErrNotImplemented, "pod sandbox stats not implemented")
}
func metricsForSandbox(sandbox sandboxstore.Sandbox) (interface{}, error) {
return nil, errors.Wrap(errdefs.ErrNotImplemented, "metrics for sandbox not implemented")
}

View File

@ -0,0 +1,34 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"golang.org/x/net/context"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/containerd/containerd/errdefs"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
"github.com/pkg/errors"
)
func (c *criService) podSandboxStats(ctx context.Context, sandbox sandboxstore.Sandbox, stats interface{}) (*runtime.PodSandboxStats, error) {
return nil, errors.Wrap(errdefs.ErrNotImplemented, "pod sandbox stats not implemented on windows")
}
func metricsForSandbox(sandbox sandboxstore.Sandbox) (interface{}, error) {
return nil, errors.Wrap(errdefs.ErrNotImplemented, "metrics for sandbox not implemented on windows")
}

View File

@ -24,6 +24,7 @@ import (
cio "github.com/containerd/containerd/pkg/cri/io"
"github.com/containerd/containerd/pkg/cri/store"
"github.com/containerd/containerd/pkg/cri/store/label"
"github.com/containerd/containerd/pkg/cri/store/stats"
"github.com/containerd/containerd/pkg/cri/store/truncindex"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
@ -46,6 +47,8 @@ type Container struct {
// IsStopSignaledWithTimeout the default is 0, and it is set to 1 after sending
// the signal once to avoid repeated sending of the signal.
IsStopSignaledWithTimeout *uint32
// Stats contains (mutable) stats for the container
Stats *stats.ContainerStats
}
// Opts sets specific information to newly created Container.
@ -166,6 +169,27 @@ func (s *Store) List() []Container {
return containers
}
func (s *Store) UpdateContainerStats(id string, newContainerStats *stats.ContainerStats) error {
s.lock.RLock()
defer s.lock.RUnlock()
id, err := s.idIndex.Get(id)
if err != nil {
if err == truncindex.ErrNotExist {
err = errdefs.ErrNotFound
}
return err
}
if _, ok := s.containers[id]; !ok {
return errdefs.ErrNotFound
}
c := s.containers[id]
c.Stats = newContainerStats
s.containers[id] = c
return nil
}
// Delete deletes the container from store with specified id.
func (s *Store) Delete(id string) {
s.lock.Lock()

View File

@ -24,6 +24,7 @@ import (
"github.com/containerd/containerd/errdefs"
cio "github.com/containerd/containerd/pkg/cri/io"
"github.com/containerd/containerd/pkg/cri/store/label"
"github.com/containerd/containerd/pkg/cri/store/stats"
"github.com/opencontainers/selinux/go-selinux"
assertlib "github.com/stretchr/testify/assert"
@ -132,6 +133,25 @@ func TestContainerStore(t *testing.T) {
Removing: true,
},
}
stats := map[string]*stats.ContainerStats{
"1": {
Timestamp: time.Now(),
UsageCoreNanoSeconds: 1,
},
"2abcd": {
Timestamp: time.Now(),
UsageCoreNanoSeconds: 2,
},
"4a333": {
Timestamp: time.Now(),
UsageCoreNanoSeconds: 3,
},
"4abcd": {
Timestamp: time.Now(),
UsageCoreNanoSeconds: 4,
},
}
assert := assertlib.New(t)
containers := map[string]Container{}
for id := range metadatas {
@ -163,12 +183,26 @@ func TestContainerStore(t *testing.T) {
got, err := s.Get(genTruncIndex(id))
assert.NoError(err)
assert.Equal(c, got)
assert.Nil(c.Stats)
}
t.Logf("should be able to list containers")
cs := s.List()
assert.Len(cs, len(containers))
t.Logf("should be able to update stats on container")
for id := range containers {
err := s.UpdateContainerStats(id, stats[id])
assert.NoError(err)
}
// Validate stats were updated
cs = s.List()
assert.Len(cs, len(containers))
for _, c := range cs {
assert.Equal(stats[c.ID], c.Stats)
}
if selinux.GetEnabled() {
t.Logf("should have reserved labels (requires -tag selinux)")
assert.Equal(map[string]bool{

View File

@ -23,6 +23,7 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/pkg/cri/store"
"github.com/containerd/containerd/pkg/cri/store/label"
"github.com/containerd/containerd/pkg/cri/store/stats"
"github.com/containerd/containerd/pkg/cri/store/truncindex"
"github.com/containerd/containerd/pkg/netns"
)
@ -42,6 +43,8 @@ type Sandbox struct {
NetNS *netns.NetNS
// StopCh is used to propagate the stop information of the sandbox.
*store.StopCh
// Stats contains (mutable) stats for the (pause) sandbox container
Stats *stats.ContainerStats
}
// NewSandbox creates an internally used sandbox type. This functions reminds
@ -121,6 +124,27 @@ func (s *Store) List() []Sandbox {
return sandboxes
}
func (s *Store) UpdateContainerStats(id string, newContainerStats *stats.ContainerStats) error {
s.lock.RLock()
defer s.lock.RUnlock()
id, err := s.idIndex.Get(id)
if err != nil {
if err == truncindex.ErrNotExist {
err = errdefs.ErrNotFound
}
return err
}
if _, ok := s.sandboxes[id]; !ok {
return errdefs.ErrNotFound
}
c := s.sandboxes[id]
c.Stats = newContainerStats
s.sandboxes[id] = c
return nil
}
// Delete deletes the sandbox with specified id.
func (s *Store) Delete(id string) {
s.lock.Lock()

View File

@ -18,9 +18,11 @@ package sandbox
import (
"testing"
"time"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/pkg/cri/store/label"
"github.com/containerd/containerd/pkg/cri/store/stats"
assertlib "github.com/stretchr/testify/assert"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
@ -109,6 +111,24 @@ func TestSandboxStore(t *testing.T) {
},
Status{State: StateUnknown},
)
stats := map[string]*stats.ContainerStats{
"1": {
Timestamp: time.Now(),
UsageCoreNanoSeconds: 1,
},
"2abcd": {
Timestamp: time.Now(),
UsageCoreNanoSeconds: 2,
},
"4a333": {
Timestamp: time.Now(),
UsageCoreNanoSeconds: 3,
},
"4abcd": {
Timestamp: time.Now(),
UsageCoreNanoSeconds: 4,
},
}
assert := assertlib.New(t)
s := NewStore(label.NewStore())
@ -136,6 +156,19 @@ func TestSandboxStore(t *testing.T) {
sbs := s.List()
assert.Len(sbs, sbNum)
t.Logf("should be able to update stats on container")
for id := range sandboxes {
err := s.UpdateContainerStats(id, stats[id])
assert.NoError(err)
}
// Validate stats were updated
sbs = s.List()
assert.Len(sbs, sbNum)
for _, sb := range sbs {
assert.Equal(stats[sb.ID], sb.Stats)
}
for testID, v := range sandboxes {
truncID := genTruncIndex(testID)

View File

@ -0,0 +1,27 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import "time"
// ContainerStats contains the information about container stats.
type ContainerStats struct {
// Timestamp of when stats were collected
Timestamp time.Time
// Cumulative CPU usage (sum across all cores) since object creation.
UsageCoreNanoSeconds uint64
}