diff --git a/pkg/server/container_stats.go b/pkg/server/container_stats.go index efd78af17..103eb769b 100644 --- a/pkg/server/container_stats.go +++ b/pkg/server/container_stats.go @@ -17,15 +17,34 @@ limitations under the License. package server import ( - "errors" + "fmt" + tasks "github.com/containerd/containerd/api/services/tasks/v1" "golang.org/x/net/context" - "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) // ContainerStats returns stats of the container. If the container does not // exist, the call returns an error. func (c *criContainerdService) ContainerStats(ctx context.Context, in *runtime.ContainerStatsRequest) (*runtime.ContainerStatsResponse, error) { - return nil, errors.New("not implemented") + // Validate the stats request + if in.GetContainerId() == "" { + return nil, fmt.Errorf("invalid container stats request") + } + containerID := in.GetContainerId() + _, err := c.containerStore.Get(containerID) + if err != nil { + return nil, fmt.Errorf("failed to find container %q: %v", containerID, err) + } + request := &tasks.MetricsRequest{Filters: []string{"id==" + containerID}} + resp, err := c.taskService.Metrics(ctx, request) + if err != nil { + return nil, fmt.Errorf("failed to fetch metrics for tasks: %v", err) + } + + var cs runtime.ContainerStats + if err := c.getContainerMetrics(containerID, resp.Metrics[0], &cs); err != nil { + return nil, fmt.Errorf("failed to decode container metrics: %v", err) + } + return &runtime.ContainerStatsResponse{Stats: &cs}, nil } diff --git a/pkg/server/container_stats_list.go b/pkg/server/container_stats_list.go index 93b776706..f0db512bc 100644 --- a/pkg/server/container_stats_list.go +++ b/pkg/server/container_stats_list.go @@ -17,14 +17,156 @@ limitations under the License. package server import ( - "errors" + "fmt" + "github.com/containerd/cgroups" + tasks "github.com/containerd/containerd/api/services/tasks/v1" + "github.com/containerd/containerd/api/types" + "github.com/containerd/typeurl" + "github.com/golang/glog" "golang.org/x/net/context" - "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) // ListContainerStats returns stats of all running containers. -func (c *criContainerdService) ListContainerStats(ctx context.Context, in *runtime.ListContainerStatsRequest) (*runtime.ListContainerStatsResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) ListContainerStats( + ctx context.Context, + in *runtime.ListContainerStatsRequest, +) (*runtime.ListContainerStatsResponse, error) { + request, candidateContainers, err := c.buildTaskMetricsRequest(in) + if err != nil { + return nil, fmt.Errorf("failed to build metrics request: %v", err) + } + resp, err := c.taskService.Metrics(ctx, &request) + if err != nil { + return nil, fmt.Errorf("failed to fetch metrics for tasks: %v", err) + } + criStats, err := c.toCRIContainerStats(resp.Metrics, candidateContainers) + if err != nil { + return nil, fmt.Errorf("failed to convert to cri containerd stats format: %v", err) + } + return criStats, nil +} + +func (c *criContainerdService) toCRIContainerStats( + stats []*types.Metric, + candidateContainers map[string]bool, +) (*runtime.ListContainerStatsResponse, error) { + containerStats := new(runtime.ListContainerStatsResponse) + for _, stat := range stats { + var cs runtime.ContainerStats + if err := c.getContainerMetrics(stat.ID, stat, &cs); err != nil { + glog.Errorf("failed to decode container metrics: %v", err) + continue + } + delete(candidateContainers, stat.ID) + containerStats.Stats = append(containerStats.Stats, &cs) + } + // If there is a state where containers are dead at the time of query + // but not removed, then check if the writeableLayer information + // is present and attach the same. + for id := range candidateContainers { + var cs runtime.ContainerStats + if err := c.getContainerMetrics(id, nil, &cs); err != nil { + glog.Errorf("failed to decode container metrics: %v", err) + continue + } + containerStats.Stats = append(containerStats.Stats, &cs) + } + return containerStats, nil +} + +func (c *criContainerdService) getContainerMetrics( + containerID string, + stats *types.Metric, + cs *runtime.ContainerStats, +) error { + var usedBytes, inodesUsed uint64 + sn, err := c.snapshotStore.Get(containerID) + // If snapshotstore doesn't have cached snapshot information + // set WritableLayer usage to zero + if err == nil { + inodesUsed = sn.Size + usedBytes = sn.Inodes + } + cs.WritableLayer = &runtime.FilesystemUsage{ + Timestamp: sn.Timestamp, + StorageId: &runtime.StorageIdentifier{ + Uuid: c.imageFSUUID, + }, + UsedBytes: &runtime.UInt64Value{usedBytes}, + InodesUsed: &runtime.UInt64Value{inodesUsed}, + } + + // Get the container from store and extract the attributes + cnt, err := c.containerStore.Get(containerID) + if err != nil { + return fmt.Errorf("failed to find container %q in container store: %v", containerID, err) + } + cs.Attributes = &runtime.ContainerAttributes{ + Id: containerID, + Metadata: cnt.Config.GetMetadata(), + Labels: cnt.Config.GetLabels(), + Annotations: cnt.Config.GetAnnotations(), + } + + if stats != nil { + s, err := typeurl.UnmarshalAny(stats.Data) + if err != nil { + return fmt.Errorf("failed to extract container metrics: %v", err) + } + metrics := s.(*cgroups.Metrics) + cs.Cpu = &runtime.CpuUsage{ + Timestamp: stats.Timestamp.Unix(), + UsageCoreNanoSeconds: &runtime.UInt64Value{metrics.CPU.Usage.Total}, + } + cs.Memory = &runtime.MemoryUsage{ + Timestamp: stats.Timestamp.Unix(), + WorkingSetBytes: &runtime.UInt64Value{metrics.Memory.Usage.Usage}, + } + } + + return nil +} + +// buildTaskMetricsRequest constructs a tasks.MetricsRequest based on +// the information in the stats request and the containerStore +func (c *criContainerdService) buildTaskMetricsRequest( + r *runtime.ListContainerStatsRequest, +) (tasks.MetricsRequest, map[string]bool, error) { + var req tasks.MetricsRequest + if r.GetFilter == nil { + return req, nil, nil + } + + candidateContainers := make(map[string]bool) + for _, c := range c.containerStore.List() { + if r.Filter.GetId() != "" && c.ID != r.Filter.GetId() { + continue + } + if r.Filter.GetPodSandboxId() != "" && c.SandboxID != r.Filter.GetPodSandboxId() { + continue + } + if r.Filter.GetLabelSelector() != nil && !matchLabelSelector(r.Filter.GetLabelSelector(), c.Config.GetLabels()) { + continue + } + candidateContainers[c.ID] = true + } + for id := range candidateContainers { + req.Filters = append(req.Filters, "id=="+id) + } + return req, candidateContainers, nil +} + +func matchLabelSelector(selector, labels map[string]string) bool { + for k, v := range selector { + if val, ok := labels[k]; ok { + if v != val { + return false + } + } else { + return false + } + } + return true }