From d0298944eb26a0be35b2c538abb564a3221368e6 Mon Sep 17 00:00:00 2001 From: Abhinandan Prativadi Date: Tue, 19 Sep 2017 11:20:03 -0700 Subject: [PATCH 1/2] Adding container metrics Signed-off-by: Abhinandan Prativadi --- pkg/server/container_stats.go | 25 ++++- pkg/server/container_stats_list.go | 150 ++++++++++++++++++++++++++++- 2 files changed, 168 insertions(+), 7 deletions(-) diff --git a/pkg/server/container_stats.go b/pkg/server/container_stats.go index efd78af17..103eb769b 100644 --- a/pkg/server/container_stats.go +++ b/pkg/server/container_stats.go @@ -17,15 +17,34 @@ limitations under the License. package server import ( - "errors" + "fmt" + tasks "github.com/containerd/containerd/api/services/tasks/v1" "golang.org/x/net/context" - "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) // ContainerStats returns stats of the container. If the container does not // exist, the call returns an error. func (c *criContainerdService) ContainerStats(ctx context.Context, in *runtime.ContainerStatsRequest) (*runtime.ContainerStatsResponse, error) { - return nil, errors.New("not implemented") + // Validate the stats request + if in.GetContainerId() == "" { + return nil, fmt.Errorf("invalid container stats request") + } + containerID := in.GetContainerId() + _, err := c.containerStore.Get(containerID) + if err != nil { + return nil, fmt.Errorf("failed to find container %q: %v", containerID, err) + } + request := &tasks.MetricsRequest{Filters: []string{"id==" + containerID}} + resp, err := c.taskService.Metrics(ctx, request) + if err != nil { + return nil, fmt.Errorf("failed to fetch metrics for tasks: %v", err) + } + + var cs runtime.ContainerStats + if err := c.getContainerMetrics(containerID, resp.Metrics[0], &cs); err != nil { + return nil, fmt.Errorf("failed to decode container metrics: %v", err) + } + return &runtime.ContainerStatsResponse{Stats: &cs}, nil } diff --git a/pkg/server/container_stats_list.go b/pkg/server/container_stats_list.go index 93b776706..f0db512bc 100644 --- a/pkg/server/container_stats_list.go +++ b/pkg/server/container_stats_list.go @@ -17,14 +17,156 @@ limitations under the License. package server import ( - "errors" + "fmt" + "github.com/containerd/cgroups" + tasks "github.com/containerd/containerd/api/services/tasks/v1" + "github.com/containerd/containerd/api/types" + "github.com/containerd/typeurl" + "github.com/golang/glog" "golang.org/x/net/context" - "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) // ListContainerStats returns stats of all running containers. -func (c *criContainerdService) ListContainerStats(ctx context.Context, in *runtime.ListContainerStatsRequest) (*runtime.ListContainerStatsResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) ListContainerStats( + ctx context.Context, + in *runtime.ListContainerStatsRequest, +) (*runtime.ListContainerStatsResponse, error) { + request, candidateContainers, err := c.buildTaskMetricsRequest(in) + if err != nil { + return nil, fmt.Errorf("failed to build metrics request: %v", err) + } + resp, err := c.taskService.Metrics(ctx, &request) + if err != nil { + return nil, fmt.Errorf("failed to fetch metrics for tasks: %v", err) + } + criStats, err := c.toCRIContainerStats(resp.Metrics, candidateContainers) + if err != nil { + return nil, fmt.Errorf("failed to convert to cri containerd stats format: %v", err) + } + return criStats, nil +} + +func (c *criContainerdService) toCRIContainerStats( + stats []*types.Metric, + candidateContainers map[string]bool, +) (*runtime.ListContainerStatsResponse, error) { + containerStats := new(runtime.ListContainerStatsResponse) + for _, stat := range stats { + var cs runtime.ContainerStats + if err := c.getContainerMetrics(stat.ID, stat, &cs); err != nil { + glog.Errorf("failed to decode container metrics: %v", err) + continue + } + delete(candidateContainers, stat.ID) + containerStats.Stats = append(containerStats.Stats, &cs) + } + // If there is a state where containers are dead at the time of query + // but not removed, then check if the writeableLayer information + // is present and attach the same. + for id := range candidateContainers { + var cs runtime.ContainerStats + if err := c.getContainerMetrics(id, nil, &cs); err != nil { + glog.Errorf("failed to decode container metrics: %v", err) + continue + } + containerStats.Stats = append(containerStats.Stats, &cs) + } + return containerStats, nil +} + +func (c *criContainerdService) getContainerMetrics( + containerID string, + stats *types.Metric, + cs *runtime.ContainerStats, +) error { + var usedBytes, inodesUsed uint64 + sn, err := c.snapshotStore.Get(containerID) + // If snapshotstore doesn't have cached snapshot information + // set WritableLayer usage to zero + if err == nil { + inodesUsed = sn.Size + usedBytes = sn.Inodes + } + cs.WritableLayer = &runtime.FilesystemUsage{ + Timestamp: sn.Timestamp, + StorageId: &runtime.StorageIdentifier{ + Uuid: c.imageFSUUID, + }, + UsedBytes: &runtime.UInt64Value{usedBytes}, + InodesUsed: &runtime.UInt64Value{inodesUsed}, + } + + // Get the container from store and extract the attributes + cnt, err := c.containerStore.Get(containerID) + if err != nil { + return fmt.Errorf("failed to find container %q in container store: %v", containerID, err) + } + cs.Attributes = &runtime.ContainerAttributes{ + Id: containerID, + Metadata: cnt.Config.GetMetadata(), + Labels: cnt.Config.GetLabels(), + Annotations: cnt.Config.GetAnnotations(), + } + + if stats != nil { + s, err := typeurl.UnmarshalAny(stats.Data) + if err != nil { + return fmt.Errorf("failed to extract container metrics: %v", err) + } + metrics := s.(*cgroups.Metrics) + cs.Cpu = &runtime.CpuUsage{ + Timestamp: stats.Timestamp.Unix(), + UsageCoreNanoSeconds: &runtime.UInt64Value{metrics.CPU.Usage.Total}, + } + cs.Memory = &runtime.MemoryUsage{ + Timestamp: stats.Timestamp.Unix(), + WorkingSetBytes: &runtime.UInt64Value{metrics.Memory.Usage.Usage}, + } + } + + return nil +} + +// buildTaskMetricsRequest constructs a tasks.MetricsRequest based on +// the information in the stats request and the containerStore +func (c *criContainerdService) buildTaskMetricsRequest( + r *runtime.ListContainerStatsRequest, +) (tasks.MetricsRequest, map[string]bool, error) { + var req tasks.MetricsRequest + if r.GetFilter == nil { + return req, nil, nil + } + + candidateContainers := make(map[string]bool) + for _, c := range c.containerStore.List() { + if r.Filter.GetId() != "" && c.ID != r.Filter.GetId() { + continue + } + if r.Filter.GetPodSandboxId() != "" && c.SandboxID != r.Filter.GetPodSandboxId() { + continue + } + if r.Filter.GetLabelSelector() != nil && !matchLabelSelector(r.Filter.GetLabelSelector(), c.Config.GetLabels()) { + continue + } + candidateContainers[c.ID] = true + } + for id := range candidateContainers { + req.Filters = append(req.Filters, "id=="+id) + } + return req, candidateContainers, nil +} + +func matchLabelSelector(selector, labels map[string]string) bool { + for k, v := range selector { + if val, ok := labels[k]; ok { + if v != val { + return false + } + } else { + return false + } + } + return true } From 853804bd8e27481b0ed0b80ab3013f595f1892cb Mon Sep 17 00:00:00 2001 From: Abhinandan Prativadi Date: Tue, 19 Sep 2017 18:53:19 -0700 Subject: [PATCH 2/2] Adding integration test for container stats Signed-off-by: Abhinandan Prativadi --- integration/container_stats_test.go | 322 ++++++++++++++++++++++++++++ integration/test_utils.go | 36 +++- 2 files changed, 356 insertions(+), 2 deletions(-) create mode 100644 integration/container_stats_test.go diff --git a/integration/container_stats_test.go b/integration/container_stats_test.go new file mode 100644 index 000000000..11991aad9 --- /dev/null +++ b/integration/container_stats_test.go @@ -0,0 +1,322 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" +) + +// Test to verify for a container ID +func TestContainerStats(t *testing.T) { + t.Logf("Create a pod config and run sandbox container") + sbConfig := PodSandboxConfig("sandbox1", "stats") + sb, err := runtimeService.RunPodSandbox(sbConfig) + require.NoError(t, err) + defer func() { + assert.NoError(t, runtimeService.StopPodSandbox(sb)) + assert.NoError(t, runtimeService.RemovePodSandbox(sb)) + }() + t.Logf("Create a container config and run container in a pod") + containerConfig := ContainerConfig( + "container1", + pauseImage, + WithTestLabels(), + WithTestAnnotations(), + ) + cn, err := runtimeService.CreateContainer(sb, containerConfig, sbConfig) + require.NoError(t, err) + defer func() { + assert.NoError(t, runtimeService.RemoveContainer(cn)) + }() + require.NoError(t, runtimeService.StartContainer(cn)) + defer func() { + assert.NoError(t, runtimeService.StopContainer(cn, 10)) + }() + + t.Logf("Fetch stats for container") + var s *runtime.ContainerStats + require.NoError(t, Eventually(func() (bool, error) { + s, err = runtimeService.ContainerStats(cn) + if err != nil { + return false, err + } + if s.GetWritableLayer().GetUsedBytes().GetValue() != 0 && + s.GetWritableLayer().GetInodesUsed().GetValue() != 0 { + return true, nil + } + return false, nil + }, time.Second, 30*time.Second)) + + t.Logf("Verify stats received for container %q", cn) + testStats(t, s, containerConfig) +} + +// Test to verify filtering without any filter +func TestContainerListStats(t *testing.T) { + t.Logf("Create a pod config and run sandbox container") + sbConfig := PodSandboxConfig("running-pod", "statsls") + sb, err := runtimeService.RunPodSandbox(sbConfig) + require.NoError(t, err) + defer func() { + assert.NoError(t, runtimeService.StopPodSandbox(sb)) + assert.NoError(t, runtimeService.RemovePodSandbox(sb)) + }() + t.Logf("Create a container config and run containers in a pod") + containerConfigMap := make(map[string]*runtime.ContainerConfig) + for i := 0; i < 3; i++ { + cName := fmt.Sprintf("container%d", i) + containerConfig := ContainerConfig( + cName, + pauseImage, + WithTestLabels(), + WithTestAnnotations(), + ) + cn, err := runtimeService.CreateContainer(sb, containerConfig, sbConfig) + require.NoError(t, err) + containerConfigMap[cn] = containerConfig + defer func() { + assert.NoError(t, runtimeService.RemoveContainer(cn)) + }() + require.NoError(t, runtimeService.StartContainer(cn)) + defer func() { + assert.NoError(t, runtimeService.StopContainer(cn, 10)) + }() + } + + t.Logf("Fetch all container stats") + var stats []*runtime.ContainerStats + require.NoError(t, Eventually(func() (bool, error) { + stats, err = runtimeService.ListContainerStats(&runtime.ContainerStatsFilter{}) + if err != nil { + return false, err + } + for _, s := range stats { + if s.GetWritableLayer().GetUsedBytes().GetValue() == 0 && + s.GetWritableLayer().GetInodesUsed().GetValue() == 0 { + return false, nil + } + } + return true, nil + }, time.Second, 30*time.Second)) + + t.Logf("Verify all container stats") + for _, s := range stats { + testStats(t, s, containerConfigMap[s.GetAttributes().GetId()]) + } +} + +// Test to verify filtering given a specific container ID +// TODO Convert the filter tests into table driven tests and unit tests +func TestContainerListStatsWithIdFilter(t *testing.T) { + t.Logf("Create a pod config and run sandbox container") + sbConfig := PodSandboxConfig("running-pod", "statsls") + sb, err := runtimeService.RunPodSandbox(sbConfig) + require.NoError(t, err) + defer func() { + assert.NoError(t, runtimeService.StopPodSandbox(sb)) + assert.NoError(t, runtimeService.RemovePodSandbox(sb)) + }() + t.Logf("Create a container config and run containers in a pod") + containerConfigMap := make(map[string]*runtime.ContainerConfig) + for i := 0; i < 3; i++ { + cName := fmt.Sprintf("container%d", i) + containerConfig := ContainerConfig( + cName, + pauseImage, + WithTestLabels(), + WithTestAnnotations(), + ) + cn, err := runtimeService.CreateContainer(sb, containerConfig, sbConfig) + containerConfigMap[cn] = containerConfig + require.NoError(t, err) + defer func() { + assert.NoError(t, runtimeService.RemoveContainer(cn)) + }() + require.NoError(t, runtimeService.StartContainer(cn)) + defer func() { + assert.NoError(t, runtimeService.StopContainer(cn, 10)) + }() + } + + t.Logf("Fetch container stats for each container with Filter") + var stats []*runtime.ContainerStats + for id := range containerConfigMap { + require.NoError(t, Eventually(func() (bool, error) { + stats, err = runtimeService.ListContainerStats( + &runtime.ContainerStatsFilter{Id: id}) + if err != nil { + return false, err + } + if len(stats) != 1 { + return false, fmt.Errorf("unexpected stats length") + } + if stats[0].GetWritableLayer().GetUsedBytes().GetValue() != 0 && + stats[0].GetWritableLayer().GetInodesUsed().GetValue() != 0 { + return true, nil + } + return false, nil + }, time.Second, 30*time.Second)) + + t.Logf("Verify container stats for %s", id) + for _, s := range stats { + require.Equal(t, s.GetAttributes().GetId(), id) + testStats(t, s, containerConfigMap[id]) + } + } +} + +// Test to verify filtering given a specific Sandbox ID. Stats for +// all the containers in a pod should be returned +func TestContainerListStatsWithSandboxIdFilter(t *testing.T) { + t.Logf("Create a pod config and run sandbox container") + sbConfig := PodSandboxConfig("running-pod", "statsls") + sb, err := runtimeService.RunPodSandbox(sbConfig) + require.NoError(t, err) + defer func() { + assert.NoError(t, runtimeService.StopPodSandbox(sb)) + assert.NoError(t, runtimeService.RemovePodSandbox(sb)) + }() + t.Logf("Create a container config and run containers in a pod") + containerConfigMap := make(map[string]*runtime.ContainerConfig) + for i := 0; i < 3; i++ { + cName := fmt.Sprintf("container%d", i) + containerConfig := ContainerConfig( + cName, + pauseImage, + WithTestLabels(), + WithTestAnnotations(), + ) + cn, err := runtimeService.CreateContainer(sb, containerConfig, sbConfig) + containerConfigMap[cn] = containerConfig + require.NoError(t, err) + defer func() { + assert.NoError(t, runtimeService.RemoveContainer(cn)) + }() + require.NoError(t, runtimeService.StartContainer(cn)) + defer func() { + assert.NoError(t, runtimeService.StopContainer(cn, 10)) + }() + } + + t.Logf("Fetch container stats for each container with Filter") + var stats []*runtime.ContainerStats + require.NoError(t, Eventually(func() (bool, error) { + stats, err = runtimeService.ListContainerStats( + &runtime.ContainerStatsFilter{PodSandboxId: sb}) + if err != nil { + return false, err + } + if len(stats) != 3 { + return false, fmt.Errorf("unexpected stats length") + } + if stats[0].GetWritableLayer().GetUsedBytes().GetValue() != 0 && + stats[0].GetWritableLayer().GetInodesUsed().GetValue() != 0 { + return true, nil + } + return false, nil + }, time.Second, 30*time.Second)) + t.Logf("Verify container stats for sandbox %q", sb) + for _, s := range stats { + testStats(t, s, containerConfigMap[s.GetAttributes().GetId()]) + } +} + +// Test to verify filtering given a specific container ID and +// sandbox ID +func TestContainerListStatsWithIdSandboxIdFilter(t *testing.T) { + t.Logf("Create a pod config and run sandbox container") + sbConfig := PodSandboxConfig("running-pod", "statsls") + sb, err := runtimeService.RunPodSandbox(sbConfig) + require.NoError(t, err) + defer func() { + assert.NoError(t, runtimeService.StopPodSandbox(sb)) + assert.NoError(t, runtimeService.RemovePodSandbox(sb)) + }() + t.Logf("Create container config and run containers in a pod") + containerConfigMap := make(map[string]*runtime.ContainerConfig) + for i := 0; i < 3; i++ { + cName := fmt.Sprintf("container%d", i) + containerConfig := ContainerConfig( + cName, + pauseImage, + WithTestLabels(), + WithTestAnnotations(), + ) + cn, err := runtimeService.CreateContainer(sb, containerConfig, sbConfig) + containerConfigMap[cn] = containerConfig + require.NoError(t, err) + defer func() { + assert.NoError(t, runtimeService.RemoveContainer(cn)) + }() + require.NoError(t, runtimeService.StartContainer(cn)) + defer func() { + assert.NoError(t, runtimeService.StopContainer(cn, 10)) + }() + } + t.Logf("Fetch container stats for sandbox ID and container ID filter") + var stats []*runtime.ContainerStats + for id, config := range containerConfigMap { + require.NoError(t, Eventually(func() (bool, error) { + stats, err = runtimeService.ListContainerStats( + &runtime.ContainerStatsFilter{Id: id, PodSandboxId: sb}) + if err != nil { + return false, err + } + if len(stats) != 1 { + return false, fmt.Errorf("Unexpected stats length") + } + if stats[0].GetWritableLayer().GetUsedBytes().GetValue() != 0 && + stats[0].GetWritableLayer().GetInodesUsed().GetValue() != 0 { + return true, nil + } + return false, nil + }, time.Second, 30*time.Second)) + t.Logf("Verify container stats for sandbox %q and container %q filter", sb, id) + for _, s := range stats { + testStats(t, s, config) + } + } +} + +// TODO make this as options to use for dead container tests +func testStats(t *testing.T, + s *runtime.ContainerStats, + config *runtime.ContainerConfig, +) { + require.NotEmpty(t, s.GetAttributes().GetId()) + require.NotEmpty(t, s.GetAttributes().GetMetadata()) + require.NotEmpty(t, s.GetAttributes().GetAnnotations()) + require.Equal(t, s.GetAttributes().GetLabels(), config.Labels) + require.Equal(t, s.GetAttributes().GetAnnotations(), config.Annotations) + require.Equal(t, s.GetAttributes().GetMetadata().Name, config.Metadata.Name) + require.NotEmpty(t, s.GetAttributes().GetLabels()) + require.NotEmpty(t, s.GetCpu().GetTimestamp()) + require.NotEmpty(t, s.GetCpu().GetUsageCoreNanoSeconds().GetValue()) + require.NotEmpty(t, s.GetMemory().GetTimestamp()) + require.NotEmpty(t, s.GetMemory().GetWorkingSetBytes().GetValue()) + require.NotEmpty(t, s.GetWritableLayer().GetTimestamp()) + require.NotEmpty(t, s.GetWritableLayer().GetStorageId().GetUuid()) + require.NotEmpty(t, s.GetWritableLayer().GetUsedBytes().GetValue()) + require.NotEmpty(t, s.GetWritableLayer().GetInodesUsed().GetValue()) + +} diff --git a/integration/test_utils.go b/integration/test_utils.go index 91cdc27fe..12783cb73 100644 --- a/integration/test_utils.go +++ b/integration/test_utils.go @@ -29,8 +29,9 @@ import ( ) const ( - sock = "/var/run/cri-containerd.sock" - timeout = 1 * time.Minute + sock = "/var/run/cri-containerd.sock" + timeout = 1 * time.Minute + pauseImage = "gcr.io/google_containers/pause:3.0" ) var ( @@ -71,6 +72,37 @@ func PodSandboxConfig(name, ns string, opts ...PodSandboxOpts) *runtime.PodSandb return config } +// ContainerOpts to set any specific attribute like labels, +// annotations, metadata etc +type ContainerOpts func(*runtime.ContainerConfig) + +func WithTestLabels() ContainerOpts { + return func(cf *runtime.ContainerConfig) { + cf.Labels = map[string]string{"key": "value"} + } +} + +func WithTestAnnotations() ContainerOpts { + return func(cf *runtime.ContainerConfig) { + cf.Annotations = map[string]string{"a.b.c": "test"} + } +} + +// ContainerConfig creates a container config given a name and image name +// and additional container config options +func ContainerConfig(name, image string, opts ...ContainerOpts) *runtime.ContainerConfig { + cConfig := &runtime.ContainerConfig{ + Metadata: &runtime.ContainerMetadata{ + Name: name, + }, + Image: &runtime.ImageSpec{Image: image}, + } + for _, opt := range opts { + opt(cConfig) + } + return cConfig +} + // CheckFunc is the function used to check a condition is true/false. type CheckFunc func() (bool, error)