Merge pull request #8366 from mxpv/stats

[sbserver] Backport CRI stats patches to sandboxed CRI
This commit is contained in:
Derek McGowan 2023-04-10 13:38:30 -07:00 committed by GitHub
commit c5a43b0007
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 151 additions and 150 deletions

View File

@ -19,9 +19,11 @@ package sbserver
import ( import (
"context" "context"
"fmt" "fmt"
"time"
"github.com/containerd/containerd/api/services/tasks/v1" "github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types" "github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/pkg/cri/store/stats"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1" runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
containerstore "github.com/containerd/containerd/pkg/cri/store/container" containerstore "github.com/containerd/containerd/pkg/cri/store/container"
@ -61,11 +63,87 @@ func (c *criService) toCRIContainerStats(
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to decode container metrics for %q: %w", cntr.ID, err) return nil, fmt.Errorf("failed to decode container metrics for %q: %w", cntr.ID, err)
} }
if cs.Cpu != nil && cs.Cpu.UsageCoreNanoSeconds != nil {
// this is a calculated value and should be computed for all OSes
nanoUsage, err := c.getUsageNanoCores(cntr.Metadata.ID, false, cs.Cpu.UsageCoreNanoSeconds.Value, time.Unix(0, cs.Cpu.Timestamp))
if err != nil {
return nil, fmt.Errorf("failed to get usage nano cores, containerID: %s: %w", cntr.Metadata.ID, err)
}
cs.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoUsage}
}
containerStats.Stats = append(containerStats.Stats, cs) containerStats.Stats = append(containerStats.Stats, cs)
} }
return containerStats, nil return containerStats, nil
} }
func (c *criService) getUsageNanoCores(containerID string, isSandbox bool, currentUsageCoreNanoSeconds uint64, currentTimestamp time.Time) (uint64, error) {
var oldStats *stats.ContainerStats
if isSandbox {
sandbox, err := c.sandboxStore.Get(containerID)
if err != nil {
return 0, fmt.Errorf("failed to get sandbox container: %s: %w", containerID, err)
}
oldStats = sandbox.Stats
} else {
container, err := c.containerStore.Get(containerID)
if err != nil {
return 0, fmt.Errorf("failed to get container ID: %s: %w", containerID, err)
}
oldStats = container.Stats
}
if oldStats == nil {
newStats := &stats.ContainerStats{
UsageCoreNanoSeconds: currentUsageCoreNanoSeconds,
Timestamp: currentTimestamp,
}
if isSandbox {
err := c.sandboxStore.UpdateContainerStats(containerID, newStats)
if err != nil {
return 0, fmt.Errorf("failed to update sandbox stats container ID: %s: %w", containerID, err)
}
} else {
err := c.containerStore.UpdateContainerStats(containerID, newStats)
if err != nil {
return 0, fmt.Errorf("failed to update container stats ID: %s: %w", containerID, err)
}
}
return 0, nil
}
nanoSeconds := currentTimestamp.UnixNano() - oldStats.Timestamp.UnixNano()
// zero or negative interval
if nanoSeconds <= 0 {
return 0, nil
}
newUsageNanoCores := uint64(float64(currentUsageCoreNanoSeconds-oldStats.UsageCoreNanoSeconds) /
float64(nanoSeconds) * float64(time.Second/time.Nanosecond))
newStats := &stats.ContainerStats{
UsageCoreNanoSeconds: currentUsageCoreNanoSeconds,
Timestamp: currentTimestamp,
}
if isSandbox {
err := c.sandboxStore.UpdateContainerStats(containerID, newStats)
if err != nil {
return 0, fmt.Errorf("failed to update sandbox container stats: %s: %w", containerID, err)
}
} else {
err := c.containerStore.UpdateContainerStats(containerID, newStats)
if err != nil {
return 0, fmt.Errorf("failed to update container stats ID: %s: %w", containerID, err)
}
}
return newUsageNanoCores, nil
}
func (c *criService) normalizeContainerStatsFilter(filter *runtime.ContainerStatsFilter) { func (c *criService) normalizeContainerStatsFilter(filter *runtime.ContainerStatsFilter) {
if cntr, err := c.containerStore.Get(filter.GetId()); err == nil { if cntr, err := c.containerStore.Get(filter.GetId()); err == nil {
filter.Id = cntr.ID filter.Id = cntr.ID

View File

@ -28,7 +28,6 @@ import (
runtime "k8s.io/cri-api/pkg/apis/runtime/v1" runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
containerstore "github.com/containerd/containerd/pkg/cri/store/container" containerstore "github.com/containerd/containerd/pkg/cri/store/container"
"github.com/containerd/containerd/pkg/cri/store/stats"
) )
func (c *criService) containerMetrics( func (c *criService) containerMetrics(
@ -81,72 +80,6 @@ func (c *criService) containerMetrics(
return &cs, nil return &cs, nil
} }
func (c *criService) getUsageNanoCores(containerID string, isSandbox bool, currentUsageCoreNanoSeconds uint64, currentTimestamp time.Time) (uint64, error) {
var oldStats *stats.ContainerStats
if isSandbox {
sandbox, err := c.sandboxStore.Get(containerID)
if err != nil {
return 0, fmt.Errorf("failed to get sandbox container: %s: %w", containerID, err)
}
oldStats = sandbox.Stats
} else {
container, err := c.containerStore.Get(containerID)
if err != nil {
return 0, fmt.Errorf("failed to get container ID: %s: %w", containerID, err)
}
oldStats = container.Stats
}
if oldStats == nil {
newStats := &stats.ContainerStats{
UsageCoreNanoSeconds: currentUsageCoreNanoSeconds,
Timestamp: currentTimestamp,
}
if isSandbox {
err := c.sandboxStore.UpdateContainerStats(containerID, newStats)
if err != nil {
return 0, fmt.Errorf("failed to update sandbox stats container ID: %s: %w", containerID, err)
}
} else {
err := c.containerStore.UpdateContainerStats(containerID, newStats)
if err != nil {
return 0, fmt.Errorf("failed to update container stats ID: %s: %w", containerID, err)
}
}
return 0, nil
}
nanoSeconds := currentTimestamp.UnixNano() - oldStats.Timestamp.UnixNano()
// zero or negative interval
if nanoSeconds <= 0 {
return 0, nil
}
newUsageNanoCores := uint64(float64(currentUsageCoreNanoSeconds-oldStats.UsageCoreNanoSeconds) /
float64(nanoSeconds) * float64(time.Second/time.Nanosecond))
newStats := &stats.ContainerStats{
UsageCoreNanoSeconds: currentUsageCoreNanoSeconds,
Timestamp: currentTimestamp,
}
if isSandbox {
err := c.sandboxStore.UpdateContainerStats(containerID, newStats)
if err != nil {
return 0, fmt.Errorf("failed to update sandbox container stats: %s: %w", containerID, err)
}
} else {
err := c.containerStore.UpdateContainerStats(containerID, newStats)
if err != nil {
return 0, fmt.Errorf("failed to update container stats ID: %s: %w", containerID, err)
}
}
return newUsageNanoCores, nil
}
// getWorkingSet calculates workingset memory from cgroup memory stats. // getWorkingSet calculates workingset memory from cgroup memory stats.
// The caller should make sure memory is not nil. // The caller should make sure memory is not nil.
// workingset = usage - total_inactive_file // workingset = usage - total_inactive_file
@ -202,16 +135,9 @@ func (c *criService) cpuContainerStats(ID string, isSandbox bool, stats interfac
switch metrics := stats.(type) { switch metrics := stats.(type) {
case *v1.Metrics: case *v1.Metrics:
if metrics.CPU != nil && metrics.CPU.Usage != nil { if metrics.CPU != nil && metrics.CPU.Usage != nil {
usageNanoCores, err := c.getUsageNanoCores(ID, isSandbox, metrics.CPU.Usage.Total, timestamp)
if err != nil {
return nil, fmt.Errorf("failed to get usage nano cores, containerID: %s: %w", ID, err)
}
return &runtime.CpuUsage{ return &runtime.CpuUsage{
Timestamp: timestamp.UnixNano(), Timestamp: timestamp.UnixNano(),
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: metrics.CPU.Usage.Total}, UsageCoreNanoSeconds: &runtime.UInt64Value{Value: metrics.CPU.Usage.Total},
UsageNanoCores: &runtime.UInt64Value{Value: usageNanoCores},
}, nil }, nil
} }
case *v2.Metrics: case *v2.Metrics:
@ -219,15 +145,9 @@ func (c *criService) cpuContainerStats(ID string, isSandbox bool, stats interfac
// convert to nano seconds // convert to nano seconds
usageCoreNanoSeconds := metrics.CPU.UsageUsec * 1000 usageCoreNanoSeconds := metrics.CPU.UsageUsec * 1000
usageNanoCores, err := c.getUsageNanoCores(ID, isSandbox, usageCoreNanoSeconds, timestamp)
if err != nil {
return nil, fmt.Errorf("failed to get usage nano cores, containerID: %s: %w", ID, err)
}
return &runtime.CpuUsage{ return &runtime.CpuUsage{
Timestamp: timestamp.UnixNano(), Timestamp: timestamp.UnixNano(),
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: usageCoreNanoSeconds}, UsageCoreNanoSeconds: &runtime.UInt64Value{Value: usageCoreNanoSeconds},
UsageNanoCores: &runtime.UInt64Value{Value: usageNanoCores},
}, nil }, nil
} }
default: default:

View File

@ -23,7 +23,6 @@ import (
v1 "github.com/containerd/cgroups/v3/cgroup1/stats" v1 "github.com/containerd/cgroups/v3/cgroup1/stats"
v2 "github.com/containerd/cgroups/v3/cgroup2/stats" v2 "github.com/containerd/cgroups/v3/cgroup2/stats"
containerstore "github.com/containerd/containerd/pkg/cri/store/container"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1" runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
) )
@ -156,75 +155,6 @@ func TestGetAvailableBytesV2(t *testing.T) {
} }
} }
func TestContainerMetricsCPU(t *testing.T) {
c := newTestCRIService()
timestamp := time.Now()
secondAfterTimeStamp := timestamp.Add(time.Second)
ID := "ID"
for desc, test := range map[string]struct {
firstMetrics interface{}
secondMetrics interface{}
expectedFirst *runtime.CpuUsage
expectedSecond *runtime.CpuUsage
}{
"v1 metrics": {
firstMetrics: &v1.Metrics{
CPU: &v1.CPUStat{
Usage: &v1.CPUUsage{
Total: 50,
},
},
},
secondMetrics: &v1.Metrics{
CPU: &v1.CPUStat{
Usage: &v1.CPUUsage{
Total: 500,
},
},
},
expectedFirst: &runtime.CpuUsage{
Timestamp: timestamp.UnixNano(),
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: 50},
UsageNanoCores: &runtime.UInt64Value{Value: 0},
},
expectedSecond: &runtime.CpuUsage{
Timestamp: secondAfterTimeStamp.UnixNano(),
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: 500},
UsageNanoCores: &runtime.UInt64Value{Value: 450},
},
},
} {
t.Run(desc, func(t *testing.T) {
container, err := containerstore.NewContainer(
containerstore.Metadata{ID: ID},
)
assert.NoError(t, err)
assert.Nil(t, container.Stats)
err = c.containerStore.Add(container)
assert.NoError(t, err)
cpuUsage, err := c.cpuContainerStats(ID, false, test.firstMetrics, timestamp)
assert.NoError(t, err)
container, err = c.containerStore.Get(ID)
assert.NoError(t, err)
assert.NotNil(t, container.Stats)
assert.Equal(t, test.expectedFirst, cpuUsage)
cpuUsage, err = c.cpuContainerStats(ID, false, test.secondMetrics, secondAfterTimeStamp)
assert.NoError(t, err)
assert.Equal(t, test.expectedSecond, cpuUsage)
container, err = c.containerStore.Get(ID)
assert.NoError(t, err)
assert.NotNil(t, container.Stats)
})
}
}
func TestContainerMetricsMemory(t *testing.T) { func TestContainerMetricsMemory(t *testing.T) {
c := newTestCRIService() c := newTestCRIService()
timestamp := time.Now() timestamp := time.Now()

View File

@ -0,0 +1,73 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sbserver
import (
"testing"
"time"
containerstore "github.com/containerd/containerd/pkg/cri/store/container"
"github.com/stretchr/testify/assert"
)
func TestContainerMetricsCPUNanoCoreUsage(t *testing.T) {
c := newTestCRIService()
timestamp := time.Now()
secondAfterTimeStamp := timestamp.Add(time.Second)
ID := "ID"
for desc, test := range map[string]struct {
firstCPUValue uint64
secondCPUValue uint64
expectedNanoCoreUsageFirst uint64
expectedNanoCoreUsageSecond uint64
}{
"metrics": {
firstCPUValue: 50,
secondCPUValue: 500,
expectedNanoCoreUsageFirst: 0,
expectedNanoCoreUsageSecond: 450,
},
} {
t.Run(desc, func(t *testing.T) {
container, err := containerstore.NewContainer(
containerstore.Metadata{ID: ID},
)
assert.NoError(t, err)
assert.Nil(t, container.Stats)
err = c.containerStore.Add(container)
assert.NoError(t, err)
cpuUsage, err := c.getUsageNanoCores(ID, false, test.firstCPUValue, timestamp)
assert.NoError(t, err)
container, err = c.containerStore.Get(ID)
assert.NoError(t, err)
assert.NotNil(t, container.Stats)
assert.Equal(t, test.expectedNanoCoreUsageFirst, cpuUsage)
cpuUsage, err = c.getUsageNanoCores(ID, false, test.secondCPUValue, secondAfterTimeStamp)
assert.NoError(t, err)
assert.Equal(t, test.expectedNanoCoreUsageSecond, cpuUsage)
container, err = c.containerStore.Get(ID)
assert.NoError(t, err)
assert.NotNil(t, container.Stats)
})
}
}