diff --git a/pkg/kubelet/stats/cri_stats_provider.go b/pkg/kubelet/stats/cri_stats_provider.go index 1573e87af6e..10fe4d55b33 100644 --- a/pkg/kubelet/stats/cri_stats_provider.go +++ b/pkg/kubelet/stats/cri_stats_provider.go @@ -38,6 +38,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/server/stats" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/utils/clock" ) var ( @@ -67,7 +68,11 @@ type criStatsProvider struct { imageService internalapi.ImageManagerService // hostStatsProvider is used to get the status of the host filesystem consumed by pods. hostStatsProvider HostStatsProvider - hcsshimInterface interface{} + //lint:ignore U1000 We can't import hcsshim due to Build constraints in hcsshim + // windowsNetworkStatsProvider is used by kubelet to gather networking stats on Windows + windowsNetworkStatsProvider interface{} + // clock is used report current time + clock clock.Clock // cpuUsageCache caches the cpu usage for containers. cpuUsageCache map[string]*cpuUsageRecord @@ -96,6 +101,7 @@ func newCRIStatsProvider( cpuUsageCache: make(map[string]*cpuUsageRecord), disableAcceleratorUsageMetrics: disableAcceleratorUsageMetrics, podAndContainerStatsFromCRI: podAndContainerStatsFromCRI, + clock: clock.RealClock{}, } } diff --git a/pkg/kubelet/stats/cri_stats_provider_windows.go b/pkg/kubelet/stats/cri_stats_provider_windows.go index f024d0fc895..a32ffe9e981 100644 --- a/pkg/kubelet/stats/cri_stats_provider_windows.go +++ b/pkg/kubelet/stats/cri_stats_provider_windows.go @@ -20,135 +20,98 @@ limitations under the License. package stats import ( - "fmt" "time" - "github.com/Microsoft/hcsshim" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" + + "github.com/Microsoft/hcsshim" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" ) -type hcsShimInterface interface { - GetContainers(q hcsshim.ComputeSystemQuery) ([]hcsshim.ContainerProperties, error) - GetHNSEndpointByID(endpointID string) (*hcsshim.HNSEndpoint, error) - OpenContainer(id string) (hcsshim.Container, error) +// windowsNetworkStatsProvider creates an interface that allows for testing the logic without needing to create a container +type windowsNetworkStatsProvider interface { + HNSListEndpointRequest() ([]hcsshim.HNSEndpoint, error) + GetHNSEndpointStats(endpointName string) (*hcsshim.HNSEndpointStats, error) } -type windowshim struct{} +// networkStats exposes the required functionality for hcsshim in this scenario +type networkStats struct{} -func (s windowshim) GetContainers(q hcsshim.ComputeSystemQuery) ([]hcsshim.ContainerProperties, error) { - return hcsshim.GetContainers(q) +func (s networkStats) HNSListEndpointRequest() ([]hcsshim.HNSEndpoint, error) { + return hcsshim.HNSListEndpointRequest() } -func (s windowshim) GetHNSEndpointByID(endpointID string) (*hcsshim.HNSEndpoint, error) { - return hcsshim.GetHNSEndpointByID(endpointID) -} - -func (s windowshim) OpenContainer(id string) (hcsshim.Container, error) { - return hcsshim.OpenContainer(id) +func (s networkStats) GetHNSEndpointStats(endpointName string) (*hcsshim.HNSEndpointStats, error) { + return hcsshim.GetHNSEndpointStats(endpointName) } // listContainerNetworkStats returns the network stats of all the running containers. func (p *criStatsProvider) listContainerNetworkStats() (map[string]*statsapi.NetworkStats, error) { - shim := newHcsShim(p) - containers, err := shim.GetContainers(hcsshim.ComputeSystemQuery{ - Types: []string{"Container"}, - }) + networkStatsProvider := newNetworkStatsProvider(p) + + endpoints, err := networkStatsProvider.HNSListEndpointRequest() if err != nil { + klog.ErrorS(err, "Failed to fetch current HNS endpoints") return nil, err } - stats := make(map[string]*statsapi.NetworkStats) - for _, c := range containers { - cstats, err := fetchContainerStats(shim, c) + networkStats := make(map[string]*statsapi.NetworkStats) + for _, endpoint := range endpoints { + endpointStats, err := networkStatsProvider.GetHNSEndpointStats(endpoint.Id) if err != nil { - klog.V(4).InfoS("Failed to fetch statistics for container, continue to get stats for other containers", "containerID", c.ID, "err", err) + klog.V(2).InfoS("Failed to fetch statistics for endpoint, continue to get stats for other endpoints", "endpointId", endpoint.Id, "containers", endpoint.SharedContainers) continue } - if len(cstats.Network) > 0 { - stats[c.ID] = hcsStatsToNetworkStats(shim, cstats.Timestamp, cstats.Network) - } - } - return stats, nil -} - -func newHcsShim(p *criStatsProvider) hcsShimInterface { - var shim hcsShimInterface - if p.hcsshimInterface == nil { - shim = windowshim{} - } else { - shim = p.hcsshimInterface.(hcsShimInterface) - } - return shim -} - -func fetchContainerStats(hcsshimInterface hcsShimInterface, c hcsshim.ContainerProperties) (stats hcsshim.Statistics, err error) { - var ( - container hcsshim.Container - ) - container, err = hcsshimInterface.OpenContainer(c.ID) - if err != nil { - return - } - defer func() { - if closeErr := container.Close(); closeErr != nil { - if err != nil { - err = fmt.Errorf("failed to close container after error %v; close error: %v", err, closeErr) - } else { - err = closeErr + // only add the interface for each container if not already in the list + for _, cId := range endpoint.SharedContainers { + networkStat, found := networkStats[cId] + if found && networkStat.Name != endpoint.Name { + iStat := hcsStatToInterfaceStat(endpointStats, endpoint.Name) + networkStat.Interfaces = append(networkStat.Interfaces, iStat) + continue } + networkStats[cId] = hcsStatsToNetworkStats(p.clock.Now(), endpointStats, endpoint.Name) } - }() + } - return container.Statistics() + return networkStats, nil } // hcsStatsToNetworkStats converts hcsshim.Statistics.Network to statsapi.NetworkStats -func hcsStatsToNetworkStats(hcsshimInterface hcsShimInterface, timestamp time.Time, hcsStats []hcsshim.NetworkStats) *statsapi.NetworkStats { +func hcsStatsToNetworkStats(timestamp time.Time, hcsStats *hcsshim.HNSEndpointStats, endpointName string) *statsapi.NetworkStats { result := &statsapi.NetworkStats{ Time: metav1.NewTime(timestamp), Interfaces: make([]statsapi.InterfaceStats, 0), } - adapters := sets.NewString() - for _, stat := range hcsStats { - iStat, err := hcsStatsToInterfaceStats(hcsshimInterface, stat) - if err != nil { - klog.InfoS("Failed to get HNS endpoint, continue to get stats for other endpoints", "endpointID", stat.EndpointId, "err", err) - continue - } + iStat := hcsStatToInterfaceStat(hcsStats, endpointName) - // Only count each adapter once. - if adapters.Has(iStat.Name) { - continue - } - - result.Interfaces = append(result.Interfaces, *iStat) - adapters.Insert(iStat.Name) - } - - // TODO(feiskyer): add support of multiple interfaces for getting default interface. - if len(result.Interfaces) > 0 { - result.InterfaceStats = result.Interfaces[0] - } + // TODO: add support of multiple interfaces for getting default interface. + result.Interfaces = append(result.Interfaces, iStat) + result.InterfaceStats = iStat return result } -// hcsStatsToInterfaceStats converts hcsshim.NetworkStats to statsapi.InterfaceStats. -func hcsStatsToInterfaceStats(hcsshimInterface hcsShimInterface, stat hcsshim.NetworkStats) (*statsapi.InterfaceStats, error) { - endpoint, err := hcsshimInterface.GetHNSEndpointByID(stat.EndpointId) - if err != nil { - return nil, err +func hcsStatToInterfaceStat(hcsStats *hcsshim.HNSEndpointStats, endpointName string) statsapi.InterfaceStats { + iStat := statsapi.InterfaceStats{ + Name: endpointName, + RxBytes: &hcsStats.BytesReceived, + TxBytes: &hcsStats.BytesSent, } - - return &statsapi.InterfaceStats{ - Name: endpoint.Name, - RxBytes: &stat.BytesReceived, - TxBytes: &stat.BytesSent, - }, nil + return iStat +} + +// newNetworkStatsProvider uses the real windows hcsshim if not provided otherwise if the interface is provided +// by the cristatsprovider in testing scenarios it uses that one +func newNetworkStatsProvider(p *criStatsProvider) windowsNetworkStatsProvider { + var statsProvider windowsNetworkStatsProvider + if p.windowsNetworkStatsProvider == nil { + statsProvider = networkStats{} + } else { + statsProvider = p.windowsNetworkStatsProvider.(windowsNetworkStatsProvider) + } + return statsProvider } diff --git a/pkg/kubelet/stats/cri_stats_provider_windows_test.go b/pkg/kubelet/stats/cri_stats_provider_windows_test.go index bf26302d2de..6bce71c3bb8 100644 --- a/pkg/kubelet/stats/cri_stats_provider_windows_test.go +++ b/pkg/kubelet/stats/cri_stats_provider_windows_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package stats import ( @@ -6,156 +22,108 @@ import ( "time" "github.com/Microsoft/hcsshim" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" + testingclock "k8s.io/utils/clock/testing" ) - - -type fakeConatiner struct { - stat hcsshim.Statistics +type fakeNetworkStatsProvider struct { + containers []containerStats } -func (f fakeConatiner) Start() error { - return nil +type containerStats struct { + container hcsshim.ContainerProperties + hcsStats []hcsshim.NetworkStats } -func (f fakeConatiner) Shutdown() error { - return nil -} - -func (f fakeConatiner) Terminate() error { - return nil -} - -func (f fakeConatiner) Wait() error { - return nil -} - -func (f fakeConatiner) WaitTimeout(duration time.Duration) error { - return nil -} - -func (f fakeConatiner) Pause() error { - return nil -} - -func (f fakeConatiner) Resume() error { - return nil -} - -func (f fakeConatiner) HasPendingUpdates() (bool, error) { - return false, nil -} - -func (f fakeConatiner) Statistics() (hcsshim.Statistics, error) { - return f.stat, nil -} - -func (f fakeConatiner) ProcessList() ([]hcsshim.ProcessListItem, error) { - return []hcsshim.ProcessListItem{}, nil -} - -func (f fakeConatiner) MappedVirtualDisks() (map[int]hcsshim.MappedVirtualDiskController, error) { - return map[int]hcsshim.MappedVirtualDiskController{}, nil -} - -func (f fakeConatiner) CreateProcess(c *hcsshim.ProcessConfig) (hcsshim.Process, error) { - return nil, nil -} - -func (f fakeConatiner) OpenProcess(pid int) (hcsshim.Process, error) { - return nil, nil -} - -func (f fakeConatiner) Close() error { - return nil -} - -func (f fakeConatiner) Modify(config *hcsshim.ResourceModificationRequestResponse) error { - return nil -} - -func (s fakehcsshim) GetContainers(q hcsshim.ComputeSystemQuery) ([]hcsshim.ContainerProperties, error) { - cp := []hcsshim.ContainerProperties{} - for _, c := range s.containers { - cp = append(cp, c.container) - } - - return cp, nil -} - -func (s fakehcsshim) GetHNSEndpointByID(endpointID string) (*hcsshim.HNSEndpoint, error) { - e := hcsshim.HNSEndpoint{ - Name: endpointID, - } - return &e, nil -} - -func (s fakehcsshim) OpenContainer(id string) (hcsshim.Container, error) { - fc := fakeConatiner{} +func (s fakeNetworkStatsProvider) GetHNSEndpointStats(endpointName string) (*hcsshim.HNSEndpointStats, error) { + eps := hcsshim.HNSEndpointStats{} for _, c := range s.containers { - if c.container.ID == id { - for _, s := range c.hcsStats{ - fc.stat.Network = append(fc.stat.Network, s) + for _, stat := range c.hcsStats { + if endpointName == stat.InstanceId { + eps = hcsshim.HNSEndpointStats{ + EndpointID: stat.EndpointId, + BytesSent: stat.BytesSent, + BytesReceived: stat.BytesReceived, + PacketsReceived: stat.PacketsReceived, + PacketsSent: stat.PacketsSent, + } } } } - return fc, nil + return &eps, nil } -type fakehcsshim struct { - containers []containerStats -} +func (s fakeNetworkStatsProvider) HNSListEndpointRequest() ([]hcsshim.HNSEndpoint, error) { + uniqueEndpoints := map[string]*hcsshim.HNSEndpoint{} -type containerStats struct { - container hcsshim.ContainerProperties - hcsStats []hcsshim.NetworkStats -} + for _, c := range s.containers { + for _, stat := range c.hcsStats { + e, found := uniqueEndpoints[stat.EndpointId] + if found { + // add the container + e.SharedContainers = append(e.SharedContainers, c.container.ID) + continue + } + uniqueEndpoints[stat.EndpointId] = &hcsshim.HNSEndpoint{ + Name: stat.EndpointId, + Id: stat.EndpointId, + SharedContainers: []string{c.container.ID}, + } + } + } + + eps := []hcsshim.HNSEndpoint{} + for _, ep := range uniqueEndpoints { + eps = append(eps, *ep) + } + + return eps, nil +} func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) { + fakeClock := testingclock.NewFakeClock(time.Time{}) tests := []struct { name string - fields fakehcsshim + fields fakeNetworkStatsProvider want map[string]*statsapi.NetworkStats wantErr bool }{ { name: "basic example", - fields: fakehcsshim{ + fields: fakeNetworkStatsProvider{ containers: []containerStats{ { container: hcsshim.ContainerProperties{ - ID: "c1", + ID: "c1", }, hcsStats: []hcsshim.NetworkStats{ - { - BytesReceived: 1, - BytesSent: 10, - EndpointId: "test", - InstanceId: "test", + { + BytesReceived: 1, + BytesSent: 10, + EndpointId: "test", + InstanceId: "test", }, }, }, { container: hcsshim.ContainerProperties{ - ID: "c2", + ID: "c2", }, hcsStats: []hcsshim.NetworkStats{ { - BytesReceived: 2, - BytesSent: 20, - EndpointId: "test2", - InstanceId: "test2", - }, - }, + BytesReceived: 2, + BytesSent: 20, + EndpointId: "test2", + InstanceId: "test2", + }, + }, }, }, }, want: map[string]*statsapi.NetworkStats{ - "c1": &statsapi.NetworkStats{ - Time: v1.Time{}, + "c1": { + Time: v1.NewTime(fakeClock.Now()), InterfaceStats: statsapi.InterfaceStats{ Name: "test", RxBytes: toP(1), @@ -170,7 +138,7 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) { }, }, }, - "c2": &statsapi.NetworkStats{ + "c2": { Time: v1.Time{}, InterfaceStats: statsapi.InterfaceStats{ Name: "test2", @@ -190,49 +158,49 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) { }, { name: "multiple containers same endpoint", - fields: fakehcsshim{ + fields: fakeNetworkStatsProvider{ containers: []containerStats{ { container: hcsshim.ContainerProperties{ - ID: "c1", + ID: "c1", }, hcsStats: []hcsshim.NetworkStats{ - { - BytesReceived: 1, - BytesSent: 10, - EndpointId: "test", - InstanceId: "test", + { + BytesReceived: 1, + BytesSent: 10, + EndpointId: "test", + InstanceId: "test", }, }, }, { container: hcsshim.ContainerProperties{ - ID: "c2", + ID: "c2", }, hcsStats: []hcsshim.NetworkStats{ { - BytesReceived: 2, - BytesSent: 20, - EndpointId: "test2", - InstanceId: "test2", + BytesReceived: 2, + BytesSent: 20, + EndpointId: "test2", + InstanceId: "test2", }, }, }, { container: hcsshim.ContainerProperties{ - ID: "c3", + ID: "c3", }, hcsStats: []hcsshim.NetworkStats{ { - BytesReceived: 3, - BytesSent: 30, - EndpointId: "test2", - InstanceId: "test3", - }, - }, + BytesReceived: 3, + BytesSent: 30, + EndpointId: "test2", + InstanceId: "test3", + }, + }, }, }, }, want: map[string]*statsapi.NetworkStats{ - "c1": &statsapi.NetworkStats{ - Time: v1.Time{}, + "c1": { + Time: v1.NewTime(fakeClock.Now()), InterfaceStats: statsapi.InterfaceStats{ Name: "test", RxBytes: toP(1), @@ -247,7 +215,7 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) { }, }, }, - "c2": &statsapi.NetworkStats{ + "c2": { Time: v1.Time{}, InterfaceStats: statsapi.InterfaceStats{ Name: "test2", @@ -262,18 +230,18 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) { }, }, }, - "c3": &statsapi.NetworkStats{ + "c3": { Time: v1.Time{}, InterfaceStats: statsapi.InterfaceStats{ Name: "test2", - RxBytes: toP(3), - TxBytes: toP(30), + RxBytes: toP(2), + TxBytes: toP(20), }, Interfaces: []statsapi.InterfaceStats{ { Name: "test2", - RxBytes: toP(3), - TxBytes: toP(30), + RxBytes: toP(2), + TxBytes: toP(20), }, }, }, @@ -282,19 +250,19 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) { }, { name: "multiple stats instances of same interface only picks up first", - fields: fakehcsshim{ + fields: fakeNetworkStatsProvider{ containers: []containerStats{ { container: hcsshim.ContainerProperties{ ID: "c1", }, hcsStats: []hcsshim.NetworkStats{ - { - BytesReceived: 1, - BytesSent: 10, - EndpointId: "test", - InstanceId: "test", - }, - { + { + BytesReceived: 1, + BytesSent: 10, + EndpointId: "test", + InstanceId: "test", + }, + { BytesReceived: 3, BytesSent: 30, EndpointId: "test", @@ -307,18 +275,18 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) { ID: "c2", }, hcsStats: []hcsshim.NetworkStats{ { - BytesReceived: 2, - BytesSent: 20, - EndpointId: "test2", - InstanceId: "test2", + BytesReceived: 2, + BytesSent: 20, + EndpointId: "test2", + InstanceId: "test2", + }, + }, }, - }, - }, }, }, want: map[string]*statsapi.NetworkStats{ - "c1": &statsapi.NetworkStats{ - Time: v1.Time{}, + "c1": { + Time: v1.NewTime(fakeClock.Now()), InterfaceStats: statsapi.InterfaceStats{ Name: "test", RxBytes: toP(1), @@ -333,7 +301,7 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) { }, }, }, - "c2": &statsapi.NetworkStats{ + "c2": { Time: v1.Time{}, InterfaceStats: statsapi.InterfaceStats{ Name: "test2", @@ -353,19 +321,19 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) { }, { name: "multiple endpoints per container", - fields: fakehcsshim{ + fields: fakeNetworkStatsProvider{ containers: []containerStats{ { container: hcsshim.ContainerProperties{ ID: "c1", }, hcsStats: []hcsshim.NetworkStats{ - { - BytesReceived: 1, - BytesSent: 10, - EndpointId: "test", - InstanceId: "test", - }, - { + { + BytesReceived: 1, + BytesSent: 10, + EndpointId: "test", + InstanceId: "test", + }, + { BytesReceived: 3, BytesSent: 30, EndpointId: "test3", @@ -377,19 +345,19 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) { container: hcsshim.ContainerProperties{ ID: "c2", }, hcsStats: []hcsshim.NetworkStats{ - { - BytesReceived: 2, - BytesSent: 20, - EndpointId: "test2", - InstanceId: "test2", + { + BytesReceived: 2, + BytesSent: 20, + EndpointId: "test2", + InstanceId: "test2", + }, + }, }, - }, - }, }, }, want: map[string]*statsapi.NetworkStats{ - "c1": &statsapi.NetworkStats{ - Time: v1.Time{}, + "c1": { + Time: v1.NewTime(fakeClock.Now()), InterfaceStats: statsapi.InterfaceStats{ Name: "test", RxBytes: toP(1), @@ -410,7 +378,7 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) { }, }, }, - "c2": &statsapi.NetworkStats{ + "c2": { Time: v1.Time{}, InterfaceStats: statsapi.InterfaceStats{ Name: "test2", @@ -432,9 +400,10 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { p := &criStatsProvider{ - hcsshimInterface: fakehcsshim{ - containers: tt.fields.containers, + windowsNetworkStatsProvider: fakeNetworkStatsProvider{ + containers: tt.fields.containers, }, + clock: fakeClock, } got, err := p.listContainerNetworkStats() if (err != nil) != tt.wantErr {