[cri] Handle Windows pod transitions gracefully

When the pods are transitioning there are several
cases where containers might not be in valid state.
There were several cases where the stats where
failing hard but we should just continue on as
they are transient and will be picked up again
when kubelet queries for the stats again.

Signed-off-by: James Sturtevant <jstur@microsoft.com>

Signed-off-by: Mark Rossetti <marosset@microsoft.com>
This commit is contained in:
James Sturtevant 2023-06-09 17:09:59 -07:00 committed by Mark Rossetti
parent ad9d1a82f1
commit f914edf4f6
No known key found for this signature in database
GPG Key ID: 3188D8FC849D8762
8 changed files with 150 additions and 43 deletions

View File

@ -24,6 +24,7 @@ import (
"github.com/containerd/cgroups/v3"
"github.com/containerd/cgroups/v3/cgroup1"
cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
"github.com/containernetworking/plugins/pkg/ns"
@ -37,7 +38,7 @@ func (c *criService) podSandboxStats(
meta := sandbox.Metadata
if sandbox.Status.Get().State != sandboxstore.StateReady {
return nil, fmt.Errorf("failed to get pod sandbox stats since sandbox container %q is not in ready state", meta.ID)
return nil, fmt.Errorf("failed to get pod sandbox stats since sandbox container %q is not in ready state: %w", meta.ID, errdefs.ErrUnavailable)
}
stats, err := metricsForSandbox(sandbox)

View File

@ -20,7 +20,10 @@ import (
"context"
"fmt"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
"github.com/hashicorp/go-multierror"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
@ -31,16 +34,21 @@ func (c *criService) ListPodSandboxStats(
) (*runtime.ListPodSandboxStatsResponse, error) {
sandboxes := c.sandboxesForListPodSandboxStatsRequest(r)
var errs *multierror.Error
podSandboxStats := new(runtime.ListPodSandboxStatsResponse)
for _, sandbox := range sandboxes {
sandboxStats, err := c.podSandboxStats(ctx, sandbox)
if err != nil {
return nil, fmt.Errorf("failed to decode sandbox container metrics for sandbox %q: %w", sandbox.ID, err)
}
switch {
case errdefs.IsUnavailable(err):
log.G(ctx).WithField("podsandboxid", sandbox.ID).Debugf("failed to get pod sandbox stats, this is likely a transient error: %v", err)
case err != nil:
errs = multierror.Append(errs, fmt.Errorf("failed to decode sandbox container metrics for sandbox %q: %w", sandbox.ID, err))
default:
podSandboxStats.Stats = append(podSandboxStats.Stats, sandboxStats)
}
}
return podSandboxStats, nil
return podSandboxStats, errs.ErrorOrNil()
}
func (c *criService) sandboxesForListPodSandboxStatsRequest(r *runtime.ListPodSandboxStatsRequest) []sandboxstore.Sandbox {

View File

@ -26,6 +26,7 @@ import (
"github.com/Microsoft/hcsshim/hcn"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
containerstore "github.com/containerd/containerd/pkg/cri/store/container"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
@ -36,12 +37,11 @@ import (
func (c *criService) podSandboxStats(
ctx context.Context,
sandbox sandboxstore.Sandbox,
) (*runtime.PodSandboxStats, error) {
sandbox sandboxstore.Sandbox) (*runtime.PodSandboxStats, error) {
meta := sandbox.Metadata
if sandbox.Status.Get().State != sandboxstore.StateReady {
return nil, fmt.Errorf("failed to get pod sandbox stats since sandbox container %q is not in ready state", meta.ID)
return nil, fmt.Errorf("failed to get pod sandbox stats since sandbox container %q is not in ready state: %w", meta.ID, errdefs.ErrUnavailable)
}
timestamp := time.Now()
@ -62,7 +62,7 @@ func (c *criService) podSandboxStats(
statsMap, err := convertMetricsToWindowsStats(metrics, sandbox)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to convert stats: %w", err)
}
podCPU, containerStats, err := c.toPodSandboxStats(sandbox, statsMap, containers, timestamp)
@ -72,12 +72,11 @@ func (c *criService) podSandboxStats(
podSandboxStats.Windows.Cpu = podCPU.Cpu
podSandboxStats.Windows.Memory = podCPU.Memory
podSandboxStats.Windows.Containers = containerStats
podSandboxStats.Windows.Network = windowsNetworkUsage(ctx, sandbox, timestamp)
pidCount, err := c.getSandboxPidCount(ctx, sandbox)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get pid count: %w", err)
}
podSandboxStats.Windows.Process = &runtime.WindowsProcessUsage{
@ -131,11 +130,13 @@ func (c *criService) toPodSandboxStats(sandbox sandboxstore.Sandbox, statsMap ma
if cntr.Status.Get().State() != runtime.ContainerState_CONTAINER_RUNNING {
// containers that are just created, in a failed state or exited (init containers) will not have stats
log.L.Warnf("failed to get container stats since container %q is not in running state", cntr.ID)
continue
}
if containerMetric == nil {
return nil, nil, fmt.Errorf("failed to find metrics for container with id %s: %w", cntr.ID, err)
log.L.Warnf("no metrics found for container %q", cntr.ID)
continue
}
containerStats, err := c.convertToCRIStats(containerMetric)
@ -144,7 +145,7 @@ func (c *criService) toPodSandboxStats(sandbox sandboxstore.Sandbox, statsMap ma
}
// Calculate NanoCores for container
if containerStats.Cpu.UsageCoreNanoSeconds != nil {
if containerStats.Cpu != nil && containerStats.Cpu.UsageCoreNanoSeconds != nil {
nanoCoreUsage := getUsageNanoCores(containerStats.Cpu.UsageCoreNanoSeconds.Value, cntr.Stats, containerStats.Cpu.Timestamp)
containerStats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoCoreUsage}
}
@ -180,7 +181,7 @@ func (c *criService) toPodSandboxStats(sandbox sandboxstore.Sandbox, statsMap ma
}
// Calculate NanoCores for pod after adding containers cpu including the pods cpu
if podRuntimeStats.Cpu.UsageCoreNanoSeconds != nil {
if podRuntimeStats.Cpu != nil && podRuntimeStats.Cpu.UsageCoreNanoSeconds != nil {
nanoCoreUsage := getUsageNanoCores(podRuntimeStats.Cpu.UsageCoreNanoSeconds.Value, sandbox.Stats, podRuntimeStats.Cpu.Timestamp)
podRuntimeStats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoCoreUsage}
}
@ -391,10 +392,16 @@ func (c *criService) getSandboxPidCount(ctx context.Context, sandbox sandboxstor
// get process count inside PodSandbox for Windows
task, err := sandbox.Container.Task(ctx, nil)
if err != nil {
if errdefs.IsNotFound(err) {
return 0, nil
}
return 0, err
}
processes, err := task.Pids(ctx)
if err != nil {
if errdefs.IsNotFound(err) {
return 0, nil
}
return 0, err
}
pidCount += uint64(len(processes))
@ -416,6 +423,9 @@ func (c *criService) getSandboxPidCount(ctx context.Context, sandbox sandboxstor
processes, err := task.Pids(ctx)
if err != nil {
if errdefs.IsNotFound(err) {
continue
}
return 0, err
}
pidCount += uint64(len(processes))

View File

@ -93,7 +93,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
metrics map[string]*wstats.Statistics
sandbox sandboxstore.Sandbox
containers []containerstore.Container
expectedPodStats expectedStats
expectedPodStats *expectedStats
expectedContainerStats []expectedStats
expectError bool
}{
@ -102,7 +102,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
metrics: map[string]*wstats.Statistics{},
sandbox: sandboxstore.Sandbox{},
containers: []containerstore.Container{},
expectedPodStats: expectedStats{},
expectedPodStats: &expectedStats{},
expectedContainerStats: []expectedStats{},
expectError: true,
},
@ -120,7 +120,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
containers: []containerstore.Container{
newContainer("c1", running, nil),
},
expectedPodStats: expectedStats{
expectedPodStats: &expectedStats{
UsageCoreNanoSeconds: 400,
UsageNanoCores: 0,
WorkingSetBytes: 40,
@ -152,7 +152,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
newContainer("c1", running, nil),
newContainer("i1", running, nil),
},
expectedPodStats: expectedStats{
expectedPodStats: &expectedStats{
UsageCoreNanoSeconds: 600,
UsageNanoCores: 0,
WorkingSetBytes: 60,
@ -186,7 +186,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
newContainer("c1", running, nil),
newContainer("i1", exitedValid, nil),
},
expectedPodStats: expectedStats{
expectedPodStats: &expectedStats{
UsageCoreNanoSeconds: 400,
UsageNanoCores: 0,
WorkingSetBytes: 40,
@ -215,7 +215,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
newContainer("c1", running, nil),
newContainer("i1", exitedInvalid, nil),
},
expectedPodStats: expectedStats{
expectedPodStats: &expectedStats{
UsageCoreNanoSeconds: 400,
UsageNanoCores: 0,
WorkingSetBytes: 40,
@ -246,7 +246,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
UsageCoreNanoSeconds: 200,
}),
},
expectedPodStats: expectedStats{
expectedPodStats: &expectedStats{
UsageCoreNanoSeconds: 800,
UsageNanoCores: 400,
WorkingSetBytes: 40,
@ -275,7 +275,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
UsageCoreNanoSeconds: 200,
}),
},
expectedPodStats: expectedStats{
expectedPodStats: &expectedStats{
UsageCoreNanoSeconds: 400,
UsageNanoCores: 200,
WorkingSetBytes: 20,
@ -304,7 +304,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
UsageCoreNanoSeconds: 200,
}),
},
expectedPodStats: expectedStats{
expectedPodStats: &expectedStats{
UsageCoreNanoSeconds: 400,
UsageNanoCores: 200,
WorkingSetBytes: 20,
@ -318,6 +318,32 @@ func Test_criService_podSandboxStats(t *testing.T) {
},
expectError: false,
},
{
desc: "pod sandbox with a container that has no cpu shouldn't error",
metrics: map[string]*wstats.Statistics{
"c1": {},
"s1": {},
},
sandbox: sandboxPod("s1", initialStatsTimestamp, 200),
containers: []containerstore.Container{
newContainer("c1", running, &stats.ContainerStats{
Timestamp: initialStatsTimestamp,
UsageCoreNanoSeconds: 200,
}),
},
expectedPodStats: nil,
expectedContainerStats: []expectedStats{},
expectError: false,
},
{
desc: "pod sandbox with no stats in metric mapp will fail",
metrics: map[string]*wstats.Statistics{},
sandbox: sandboxPod("s1", initialStatsTimestamp, 200),
containers: []containerstore.Container{},
expectedPodStats: nil,
expectedContainerStats: []expectedStats{},
expectError: true,
},
} {
test := test
t.Run(test.desc, func(t *testing.T) {
@ -327,6 +353,13 @@ func Test_criService_podSandboxStats(t *testing.T) {
return
}
assert.Nil(t, err)
if test.expectedPodStats == nil {
assert.Nil(t, actualPodStats.Cpu)
assert.Nil(t, actualPodStats.Memory)
return
}
assert.Equal(t, test.expectedPodStats.UsageCoreNanoSeconds, actualPodStats.Cpu.UsageCoreNanoSeconds.Value)
assert.Equal(t, test.expectedPodStats.UsageNanoCores, actualPodStats.Cpu.UsageNanoCores.Value)

View File

@ -24,6 +24,7 @@ import (
"github.com/containerd/cgroups/v3"
"github.com/containerd/cgroups/v3/cgroup1"
cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
"github.com/containernetworking/plugins/pkg/ns"
@ -37,7 +38,7 @@ func (c *criService) podSandboxStats(
meta := sandbox.Metadata
if sandbox.Status.Get().State != sandboxstore.StateReady {
return nil, fmt.Errorf("failed to get pod sandbox stats since sandbox container %q is not in ready state", meta.ID)
return nil, fmt.Errorf("failed to get pod sandbox stats since sandbox container %q is not in ready state: %w", meta.ID, errdefs.ErrUnavailable)
}
stats, err := metricsForSandbox(sandbox)

View File

@ -20,8 +20,12 @@ import (
"context"
"fmt"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
"github.com/hashicorp/go-multierror"
)
// ListPodSandboxStats returns stats of all ready sandboxes.
@ -31,16 +35,21 @@ func (c *criService) ListPodSandboxStats(
) (*runtime.ListPodSandboxStatsResponse, error) {
sandboxes := c.sandboxesForListPodSandboxStatsRequest(r)
var errs *multierror.Error
podSandboxStats := new(runtime.ListPodSandboxStatsResponse)
for _, sandbox := range sandboxes {
sandboxStats, err := c.podSandboxStats(ctx, sandbox)
if err != nil {
return nil, fmt.Errorf("failed to decode sandbox container metrics for sandbox %q: %w", sandbox.ID, err)
}
switch {
case errdefs.IsUnavailable(err):
log.G(ctx).WithField("podsandboxid", sandbox.ID).Debugf("failed to get pod sandbox stats, this is likely a transient error: %v", err)
case err != nil:
errs = multierror.Append(errs, fmt.Errorf("failed to decode sandbox container metrics for sandbox %q: %w", sandbox.ID, err))
default:
podSandboxStats.Stats = append(podSandboxStats.Stats, sandboxStats)
}
}
return podSandboxStats, nil
return podSandboxStats, errs.ErrorOrNil()
}
func (c *criService) sandboxesForListPodSandboxStatsRequest(r *runtime.ListPodSandboxStatsRequest) []sandboxstore.Sandbox {

View File

@ -26,6 +26,7 @@ import (
"github.com/Microsoft/hcsshim/hcn"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
containerstore "github.com/containerd/containerd/pkg/cri/store/container"
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
@ -40,7 +41,7 @@ func (c *criService) podSandboxStats(
meta := sandbox.Metadata
if sandbox.Status.Get().State != sandboxstore.StateReady {
return nil, fmt.Errorf("failed to get pod sandbox stats since sandbox container %q is not in ready state", meta.ID)
return nil, fmt.Errorf("failed to get pod sandbox stats since sandbox container %q is not in ready state: %w", meta.ID, errdefs.ErrUnavailable)
}
timestamp := time.Now()
@ -61,7 +62,7 @@ func (c *criService) podSandboxStats(
statsMap, err := convertMetricsToWindowsStats(metrics, sandbox)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to convert stats: %w", err)
}
podCPU, containerStats, err := c.toPodSandboxStats(sandbox, statsMap, containers, timestamp)
@ -75,7 +76,7 @@ func (c *criService) podSandboxStats(
pidCount, err := c.getSandboxPidCount(ctx, sandbox)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get pid count: %w", err)
}
podSandboxStats.Windows.Process = &runtime.WindowsProcessUsage{
@ -129,11 +130,13 @@ func (c *criService) toPodSandboxStats(sandbox sandboxstore.Sandbox, statsMap ma
if cntr.Status.Get().State() != runtime.ContainerState_CONTAINER_RUNNING {
// containers that are just created, in a failed state or exited (init containers) will not have stats
log.L.Warnf("failed to get container stats since container %q is not in running state", cntr.ID)
continue
}
if containerMetric == nil {
return nil, nil, fmt.Errorf("failed to find metrics for container with id %s: %w", cntr.ID, err)
log.L.Warnf("no metrics found for container %q", cntr.ID)
continue
}
containerStats, err := c.convertToCRIStats(containerMetric)
@ -142,7 +145,7 @@ func (c *criService) toPodSandboxStats(sandbox sandboxstore.Sandbox, statsMap ma
}
// Calculate NanoCores for container
if containerStats.Cpu.UsageCoreNanoSeconds != nil {
if containerStats.Cpu != nil && containerStats.Cpu.UsageCoreNanoSeconds != nil {
nanoCoreUsage := getUsageNanoCores(containerStats.Cpu.UsageCoreNanoSeconds.Value, cntr.Stats, containerStats.Cpu.Timestamp)
containerStats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoCoreUsage}
}
@ -178,7 +181,7 @@ func (c *criService) toPodSandboxStats(sandbox sandboxstore.Sandbox, statsMap ma
}
// Calculate NanoCores for pod after adding containers cpu including the pods cpu
if podRuntimeStats.Cpu.UsageCoreNanoSeconds != nil {
if podRuntimeStats.Cpu != nil && podRuntimeStats.Cpu.UsageCoreNanoSeconds != nil {
nanoCoreUsage := getUsageNanoCores(podRuntimeStats.Cpu.UsageCoreNanoSeconds.Value, sandbox.Stats, podRuntimeStats.Cpu.Timestamp)
podRuntimeStats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoCoreUsage}
}
@ -389,10 +392,16 @@ func (c *criService) getSandboxPidCount(ctx context.Context, sandbox sandboxstor
// get process count inside PodSandbox for Windows
task, err := sandbox.Container.Task(ctx, nil)
if err != nil {
if errdefs.IsNotFound(err) {
return 0, nil
}
return 0, err
}
processes, err := task.Pids(ctx)
if err != nil {
if errdefs.IsNotFound(err) {
return 0, nil
}
return 0, err
}
pidCount += uint64(len(processes))
@ -414,6 +423,9 @@ func (c *criService) getSandboxPidCount(ctx context.Context, sandbox sandboxstor
processes, err := task.Pids(ctx)
if err != nil {
if errdefs.IsNotFound(err) {
continue
}
return 0, err
}
pidCount += uint64(len(processes))

View File

@ -93,7 +93,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
metrics map[string]*wstats.Statistics
sandbox sandboxstore.Sandbox
containers []containerstore.Container
expectedPodStats expectedStats
expectedPodStats *expectedStats
expectedContainerStats []expectedStats
expectError bool
}{
@ -102,7 +102,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
metrics: map[string]*wstats.Statistics{},
sandbox: sandboxstore.Sandbox{},
containers: []containerstore.Container{},
expectedPodStats: expectedStats{},
expectedPodStats: &expectedStats{},
expectedContainerStats: []expectedStats{},
expectError: true,
},
@ -120,7 +120,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
containers: []containerstore.Container{
newContainer("c1", running, nil),
},
expectedPodStats: expectedStats{
expectedPodStats: &expectedStats{
UsageCoreNanoSeconds: 400,
UsageNanoCores: 0,
WorkingSetBytes: 40,
@ -152,7 +152,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
newContainer("c1", running, nil),
newContainer("i1", running, nil),
},
expectedPodStats: expectedStats{
expectedPodStats: &expectedStats{
UsageCoreNanoSeconds: 600,
UsageNanoCores: 0,
WorkingSetBytes: 60,
@ -186,7 +186,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
newContainer("c1", running, nil),
newContainer("i1", exitedValid, nil),
},
expectedPodStats: expectedStats{
expectedPodStats: &expectedStats{
UsageCoreNanoSeconds: 400,
UsageNanoCores: 0,
WorkingSetBytes: 40,
@ -215,7 +215,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
newContainer("c1", running, nil),
newContainer("i1", exitedInvalid, nil),
},
expectedPodStats: expectedStats{
expectedPodStats: &expectedStats{
UsageCoreNanoSeconds: 400,
UsageNanoCores: 0,
WorkingSetBytes: 40,
@ -246,7 +246,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
UsageCoreNanoSeconds: 200,
}),
},
expectedPodStats: expectedStats{
expectedPodStats: &expectedStats{
UsageCoreNanoSeconds: 800,
UsageNanoCores: 400,
WorkingSetBytes: 40,
@ -275,7 +275,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
UsageCoreNanoSeconds: 200,
}),
},
expectedPodStats: expectedStats{
expectedPodStats: &expectedStats{
UsageCoreNanoSeconds: 400,
UsageNanoCores: 200,
WorkingSetBytes: 20,
@ -304,7 +304,7 @@ func Test_criService_podSandboxStats(t *testing.T) {
UsageCoreNanoSeconds: 200,
}),
},
expectedPodStats: expectedStats{
expectedPodStats: &expectedStats{
UsageCoreNanoSeconds: 400,
UsageNanoCores: 200,
WorkingSetBytes: 20,
@ -318,6 +318,32 @@ func Test_criService_podSandboxStats(t *testing.T) {
},
expectError: false,
},
{
desc: "pod sandbox with a container that has no cpu shouldn't error",
metrics: map[string]*wstats.Statistics{
"c1": {},
"s1": {},
},
sandbox: sandboxPod("s1", initialStatsTimestamp, 200),
containers: []containerstore.Container{
newContainer("c1", running, &stats.ContainerStats{
Timestamp: initialStatsTimestamp,
UsageCoreNanoSeconds: 200,
}),
},
expectedPodStats: nil,
expectedContainerStats: []expectedStats{},
expectError: false,
},
{
desc: "pod sandbox with no stats in metric mapp will fail",
metrics: map[string]*wstats.Statistics{},
sandbox: sandboxPod("s1", initialStatsTimestamp, 200),
containers: []containerstore.Container{},
expectedPodStats: nil,
expectedContainerStats: []expectedStats{},
expectError: true,
},
} {
test := test
t.Run(test.desc, func(t *testing.T) {
@ -327,6 +353,13 @@ func Test_criService_podSandboxStats(t *testing.T) {
return
}
assert.Nil(t, err)
if test.expectedPodStats == nil {
assert.Nil(t, actualPodStats.Cpu)
assert.Nil(t, actualPodStats.Memory)
return
}
assert.Equal(t, test.expectedPodStats.UsageCoreNanoSeconds, actualPodStats.Cpu.UsageCoreNanoSeconds.Value)
assert.Equal(t, test.expectedPodStats.UsageNanoCores, actualPodStats.Cpu.UsageNanoCores.Value)