Get networks stats directly

This commit is contained in:
James Sturtevant 2021-08-09 17:20:39 -07:00
parent c39945c116
commit ab2e58c416
3 changed files with 204 additions and 266 deletions

View File

@ -38,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/server/stats"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/utils/clock"
) )
var ( var (
@ -67,7 +68,11 @@ type criStatsProvider struct {
imageService internalapi.ImageManagerService imageService internalapi.ImageManagerService
// hostStatsProvider is used to get the status of the host filesystem consumed by pods. // hostStatsProvider is used to get the status of the host filesystem consumed by pods.
hostStatsProvider HostStatsProvider hostStatsProvider HostStatsProvider
hcsshimInterface interface{} //lint:ignore U1000 We can't import hcsshim due to Build constraints in hcsshim
// windowsNetworkStatsProvider is used by kubelet to gather networking stats on Windows
windowsNetworkStatsProvider interface{}
// clock is used report current time
clock clock.Clock
// cpuUsageCache caches the cpu usage for containers. // cpuUsageCache caches the cpu usage for containers.
cpuUsageCache map[string]*cpuUsageRecord cpuUsageCache map[string]*cpuUsageRecord
@ -96,6 +101,7 @@ func newCRIStatsProvider(
cpuUsageCache: make(map[string]*cpuUsageRecord), cpuUsageCache: make(map[string]*cpuUsageRecord),
disableAcceleratorUsageMetrics: disableAcceleratorUsageMetrics, disableAcceleratorUsageMetrics: disableAcceleratorUsageMetrics,
podAndContainerStatsFromCRI: podAndContainerStatsFromCRI, podAndContainerStatsFromCRI: podAndContainerStatsFromCRI,
clock: clock.RealClock{},
} }
} }

View File

@ -20,135 +20,98 @@ limitations under the License.
package stats package stats
import ( import (
"fmt"
"time" "time"
"github.com/Microsoft/hcsshim"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"github.com/Microsoft/hcsshim"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
) )
type hcsShimInterface interface { // windowsNetworkStatsProvider creates an interface that allows for testing the logic without needing to create a container
GetContainers(q hcsshim.ComputeSystemQuery) ([]hcsshim.ContainerProperties, error) type windowsNetworkStatsProvider interface {
GetHNSEndpointByID(endpointID string) (*hcsshim.HNSEndpoint, error) HNSListEndpointRequest() ([]hcsshim.HNSEndpoint, error)
OpenContainer(id string) (hcsshim.Container, error) GetHNSEndpointStats(endpointName string) (*hcsshim.HNSEndpointStats, error)
} }
type windowshim struct{} // networkStats exposes the required functionality for hcsshim in this scenario
type networkStats struct{}
func (s windowshim) GetContainers(q hcsshim.ComputeSystemQuery) ([]hcsshim.ContainerProperties, error) { func (s networkStats) HNSListEndpointRequest() ([]hcsshim.HNSEndpoint, error) {
return hcsshim.GetContainers(q) return hcsshim.HNSListEndpointRequest()
} }
func (s windowshim) GetHNSEndpointByID(endpointID string) (*hcsshim.HNSEndpoint, error) { func (s networkStats) GetHNSEndpointStats(endpointName string) (*hcsshim.HNSEndpointStats, error) {
return hcsshim.GetHNSEndpointByID(endpointID) return hcsshim.GetHNSEndpointStats(endpointName)
}
func (s windowshim) OpenContainer(id string) (hcsshim.Container, error) {
return hcsshim.OpenContainer(id)
} }
// listContainerNetworkStats returns the network stats of all the running containers. // listContainerNetworkStats returns the network stats of all the running containers.
func (p *criStatsProvider) listContainerNetworkStats() (map[string]*statsapi.NetworkStats, error) { func (p *criStatsProvider) listContainerNetworkStats() (map[string]*statsapi.NetworkStats, error) {
shim := newHcsShim(p) networkStatsProvider := newNetworkStatsProvider(p)
containers, err := shim.GetContainers(hcsshim.ComputeSystemQuery{
Types: []string{"Container"}, endpoints, err := networkStatsProvider.HNSListEndpointRequest()
})
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to fetch current HNS endpoints")
return nil, err return nil, err
} }
stats := make(map[string]*statsapi.NetworkStats) networkStats := make(map[string]*statsapi.NetworkStats)
for _, c := range containers { for _, endpoint := range endpoints {
cstats, err := fetchContainerStats(shim, c) endpointStats, err := networkStatsProvider.GetHNSEndpointStats(endpoint.Id)
if err != nil { if err != nil {
klog.V(4).InfoS("Failed to fetch statistics for container, continue to get stats for other containers", "containerID", c.ID, "err", err) klog.V(2).InfoS("Failed to fetch statistics for endpoint, continue to get stats for other endpoints", "endpointId", endpoint.Id, "containers", endpoint.SharedContainers)
continue continue
} }
if len(cstats.Network) > 0 {
stats[c.ID] = hcsStatsToNetworkStats(shim, cstats.Timestamp, cstats.Network) // only add the interface for each container if not already in the list
for _, cId := range endpoint.SharedContainers {
networkStat, found := networkStats[cId]
if found && networkStat.Name != endpoint.Name {
iStat := hcsStatToInterfaceStat(endpointStats, endpoint.Name)
networkStat.Interfaces = append(networkStat.Interfaces, iStat)
continue
}
networkStats[cId] = hcsStatsToNetworkStats(p.clock.Now(), endpointStats, endpoint.Name)
} }
} }
return stats, nil return networkStats, nil
}
func newHcsShim(p *criStatsProvider) hcsShimInterface {
var shim hcsShimInterface
if p.hcsshimInterface == nil {
shim = windowshim{}
} else {
shim = p.hcsshimInterface.(hcsShimInterface)
}
return shim
}
func fetchContainerStats(hcsshimInterface hcsShimInterface, c hcsshim.ContainerProperties) (stats hcsshim.Statistics, err error) {
var (
container hcsshim.Container
)
container, err = hcsshimInterface.OpenContainer(c.ID)
if err != nil {
return
}
defer func() {
if closeErr := container.Close(); closeErr != nil {
if err != nil {
err = fmt.Errorf("failed to close container after error %v; close error: %v", err, closeErr)
} else {
err = closeErr
}
}
}()
return container.Statistics()
} }
// hcsStatsToNetworkStats converts hcsshim.Statistics.Network to statsapi.NetworkStats // hcsStatsToNetworkStats converts hcsshim.Statistics.Network to statsapi.NetworkStats
func hcsStatsToNetworkStats(hcsshimInterface hcsShimInterface, timestamp time.Time, hcsStats []hcsshim.NetworkStats) *statsapi.NetworkStats { func hcsStatsToNetworkStats(timestamp time.Time, hcsStats *hcsshim.HNSEndpointStats, endpointName string) *statsapi.NetworkStats {
result := &statsapi.NetworkStats{ result := &statsapi.NetworkStats{
Time: metav1.NewTime(timestamp), Time: metav1.NewTime(timestamp),
Interfaces: make([]statsapi.InterfaceStats, 0), Interfaces: make([]statsapi.InterfaceStats, 0),
} }
adapters := sets.NewString() iStat := hcsStatToInterfaceStat(hcsStats, endpointName)
for _, stat := range hcsStats {
iStat, err := hcsStatsToInterfaceStats(hcsshimInterface, stat)
if err != nil {
klog.InfoS("Failed to get HNS endpoint, continue to get stats for other endpoints", "endpointID", stat.EndpointId, "err", err)
continue
}
// Only count each adapter once. // TODO: add support of multiple interfaces for getting default interface.
if adapters.Has(iStat.Name) { result.Interfaces = append(result.Interfaces, iStat)
continue result.InterfaceStats = iStat
}
result.Interfaces = append(result.Interfaces, *iStat)
adapters.Insert(iStat.Name)
}
// TODO(feiskyer): add support of multiple interfaces for getting default interface.
if len(result.Interfaces) > 0 {
result.InterfaceStats = result.Interfaces[0]
}
return result return result
} }
// hcsStatsToInterfaceStats converts hcsshim.NetworkStats to statsapi.InterfaceStats. func hcsStatToInterfaceStat(hcsStats *hcsshim.HNSEndpointStats, endpointName string) statsapi.InterfaceStats {
func hcsStatsToInterfaceStats(hcsshimInterface hcsShimInterface, stat hcsshim.NetworkStats) (*statsapi.InterfaceStats, error) { iStat := statsapi.InterfaceStats{
endpoint, err := hcsshimInterface.GetHNSEndpointByID(stat.EndpointId) Name: endpointName,
if err != nil { RxBytes: &hcsStats.BytesReceived,
return nil, err TxBytes: &hcsStats.BytesSent,
}
return iStat
} }
return &statsapi.InterfaceStats{ // newNetworkStatsProvider uses the real windows hcsshim if not provided otherwise if the interface is provided
Name: endpoint.Name, // by the cristatsprovider in testing scenarios it uses that one
RxBytes: &stat.BytesReceived, func newNetworkStatsProvider(p *criStatsProvider) windowsNetworkStatsProvider {
TxBytes: &stat.BytesSent, var statsProvider windowsNetworkStatsProvider
}, nil if p.windowsNetworkStatsProvider == nil {
statsProvider = networkStats{}
} else {
statsProvider = p.windowsNetworkStatsProvider.(windowsNetworkStatsProvider)
}
return statsProvider
} }

View File

@ -1,3 +1,19 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats package stats
import ( import (
@ -6,107 +22,12 @@ import (
"time" "time"
"github.com/Microsoft/hcsshim" "github.com/Microsoft/hcsshim"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
testingclock "k8s.io/utils/clock/testing"
) )
type fakeNetworkStatsProvider struct {
type fakeConatiner struct {
stat hcsshim.Statistics
}
func (f fakeConatiner) Start() error {
return nil
}
func (f fakeConatiner) Shutdown() error {
return nil
}
func (f fakeConatiner) Terminate() error {
return nil
}
func (f fakeConatiner) Wait() error {
return nil
}
func (f fakeConatiner) WaitTimeout(duration time.Duration) error {
return nil
}
func (f fakeConatiner) Pause() error {
return nil
}
func (f fakeConatiner) Resume() error {
return nil
}
func (f fakeConatiner) HasPendingUpdates() (bool, error) {
return false, nil
}
func (f fakeConatiner) Statistics() (hcsshim.Statistics, error) {
return f.stat, nil
}
func (f fakeConatiner) ProcessList() ([]hcsshim.ProcessListItem, error) {
return []hcsshim.ProcessListItem{}, nil
}
func (f fakeConatiner) MappedVirtualDisks() (map[int]hcsshim.MappedVirtualDiskController, error) {
return map[int]hcsshim.MappedVirtualDiskController{}, nil
}
func (f fakeConatiner) CreateProcess(c *hcsshim.ProcessConfig) (hcsshim.Process, error) {
return nil, nil
}
func (f fakeConatiner) OpenProcess(pid int) (hcsshim.Process, error) {
return nil, nil
}
func (f fakeConatiner) Close() error {
return nil
}
func (f fakeConatiner) Modify(config *hcsshim.ResourceModificationRequestResponse) error {
return nil
}
func (s fakehcsshim) GetContainers(q hcsshim.ComputeSystemQuery) ([]hcsshim.ContainerProperties, error) {
cp := []hcsshim.ContainerProperties{}
for _, c := range s.containers {
cp = append(cp, c.container)
}
return cp, nil
}
func (s fakehcsshim) GetHNSEndpointByID(endpointID string) (*hcsshim.HNSEndpoint, error) {
e := hcsshim.HNSEndpoint{
Name: endpointID,
}
return &e, nil
}
func (s fakehcsshim) OpenContainer(id string) (hcsshim.Container, error) {
fc := fakeConatiner{}
for _, c := range s.containers {
if c.container.ID == id {
for _, s := range c.hcsStats{
fc.stat.Network = append(fc.stat.Network, s)
}
}
}
return fc, nil
}
type fakehcsshim struct {
containers []containerStats containers []containerStats
} }
@ -115,17 +36,64 @@ type containerStats struct {
hcsStats []hcsshim.NetworkStats hcsStats []hcsshim.NetworkStats
} }
func (s fakeNetworkStatsProvider) GetHNSEndpointStats(endpointName string) (*hcsshim.HNSEndpointStats, error) {
eps := hcsshim.HNSEndpointStats{}
for _, c := range s.containers {
for _, stat := range c.hcsStats {
if endpointName == stat.InstanceId {
eps = hcsshim.HNSEndpointStats{
EndpointID: stat.EndpointId,
BytesSent: stat.BytesSent,
BytesReceived: stat.BytesReceived,
PacketsReceived: stat.PacketsReceived,
PacketsSent: stat.PacketsSent,
}
}
}
}
return &eps, nil
}
func (s fakeNetworkStatsProvider) HNSListEndpointRequest() ([]hcsshim.HNSEndpoint, error) {
uniqueEndpoints := map[string]*hcsshim.HNSEndpoint{}
for _, c := range s.containers {
for _, stat := range c.hcsStats {
e, found := uniqueEndpoints[stat.EndpointId]
if found {
// add the container
e.SharedContainers = append(e.SharedContainers, c.container.ID)
continue
}
uniqueEndpoints[stat.EndpointId] = &hcsshim.HNSEndpoint{
Name: stat.EndpointId,
Id: stat.EndpointId,
SharedContainers: []string{c.container.ID},
}
}
}
eps := []hcsshim.HNSEndpoint{}
for _, ep := range uniqueEndpoints {
eps = append(eps, *ep)
}
return eps, nil
}
func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) { func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Time{})
tests := []struct { tests := []struct {
name string name string
fields fakehcsshim fields fakeNetworkStatsProvider
want map[string]*statsapi.NetworkStats want map[string]*statsapi.NetworkStats
wantErr bool wantErr bool
}{ }{
{ {
name: "basic example", name: "basic example",
fields: fakehcsshim{ fields: fakeNetworkStatsProvider{
containers: []containerStats{ containers: []containerStats{
{ {
container: hcsshim.ContainerProperties{ container: hcsshim.ContainerProperties{
@ -154,8 +122,8 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) {
}, },
}, },
want: map[string]*statsapi.NetworkStats{ want: map[string]*statsapi.NetworkStats{
"c1": &statsapi.NetworkStats{ "c1": {
Time: v1.Time{}, Time: v1.NewTime(fakeClock.Now()),
InterfaceStats: statsapi.InterfaceStats{ InterfaceStats: statsapi.InterfaceStats{
Name: "test", Name: "test",
RxBytes: toP(1), RxBytes: toP(1),
@ -170,7 +138,7 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) {
}, },
}, },
}, },
"c2": &statsapi.NetworkStats{ "c2": {
Time: v1.Time{}, Time: v1.Time{},
InterfaceStats: statsapi.InterfaceStats{ InterfaceStats: statsapi.InterfaceStats{
Name: "test2", Name: "test2",
@ -190,7 +158,7 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) {
}, },
{ {
name: "multiple containers same endpoint", name: "multiple containers same endpoint",
fields: fakehcsshim{ fields: fakeNetworkStatsProvider{
containers: []containerStats{ containers: []containerStats{
{ {
container: hcsshim.ContainerProperties{ container: hcsshim.ContainerProperties{
@ -231,8 +199,8 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) {
}, },
}, },
want: map[string]*statsapi.NetworkStats{ want: map[string]*statsapi.NetworkStats{
"c1": &statsapi.NetworkStats{ "c1": {
Time: v1.Time{}, Time: v1.NewTime(fakeClock.Now()),
InterfaceStats: statsapi.InterfaceStats{ InterfaceStats: statsapi.InterfaceStats{
Name: "test", Name: "test",
RxBytes: toP(1), RxBytes: toP(1),
@ -247,7 +215,7 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) {
}, },
}, },
}, },
"c2": &statsapi.NetworkStats{ "c2": {
Time: v1.Time{}, Time: v1.Time{},
InterfaceStats: statsapi.InterfaceStats{ InterfaceStats: statsapi.InterfaceStats{
Name: "test2", Name: "test2",
@ -262,18 +230,18 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) {
}, },
}, },
}, },
"c3": &statsapi.NetworkStats{ "c3": {
Time: v1.Time{}, Time: v1.Time{},
InterfaceStats: statsapi.InterfaceStats{ InterfaceStats: statsapi.InterfaceStats{
Name: "test2", Name: "test2",
RxBytes: toP(3), RxBytes: toP(2),
TxBytes: toP(30), TxBytes: toP(20),
}, },
Interfaces: []statsapi.InterfaceStats{ Interfaces: []statsapi.InterfaceStats{
{ {
Name: "test2", Name: "test2",
RxBytes: toP(3), RxBytes: toP(2),
TxBytes: toP(30), TxBytes: toP(20),
}, },
}, },
}, },
@ -282,7 +250,7 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) {
}, },
{ {
name: "multiple stats instances of same interface only picks up first", name: "multiple stats instances of same interface only picks up first",
fields: fakehcsshim{ fields: fakeNetworkStatsProvider{
containers: []containerStats{ containers: []containerStats{
{ {
container: hcsshim.ContainerProperties{ container: hcsshim.ContainerProperties{
@ -317,8 +285,8 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) {
}, },
}, },
want: map[string]*statsapi.NetworkStats{ want: map[string]*statsapi.NetworkStats{
"c1": &statsapi.NetworkStats{ "c1": {
Time: v1.Time{}, Time: v1.NewTime(fakeClock.Now()),
InterfaceStats: statsapi.InterfaceStats{ InterfaceStats: statsapi.InterfaceStats{
Name: "test", Name: "test",
RxBytes: toP(1), RxBytes: toP(1),
@ -333,7 +301,7 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) {
}, },
}, },
}, },
"c2": &statsapi.NetworkStats{ "c2": {
Time: v1.Time{}, Time: v1.Time{},
InterfaceStats: statsapi.InterfaceStats{ InterfaceStats: statsapi.InterfaceStats{
Name: "test2", Name: "test2",
@ -353,7 +321,7 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) {
}, },
{ {
name: "multiple endpoints per container", name: "multiple endpoints per container",
fields: fakehcsshim{ fields: fakeNetworkStatsProvider{
containers: []containerStats{ containers: []containerStats{
{ {
container: hcsshim.ContainerProperties{ container: hcsshim.ContainerProperties{
@ -388,8 +356,8 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) {
}, },
}, },
want: map[string]*statsapi.NetworkStats{ want: map[string]*statsapi.NetworkStats{
"c1": &statsapi.NetworkStats{ "c1": {
Time: v1.Time{}, Time: v1.NewTime(fakeClock.Now()),
InterfaceStats: statsapi.InterfaceStats{ InterfaceStats: statsapi.InterfaceStats{
Name: "test", Name: "test",
RxBytes: toP(1), RxBytes: toP(1),
@ -410,7 +378,7 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) {
}, },
}, },
}, },
"c2": &statsapi.NetworkStats{ "c2": {
Time: v1.Time{}, Time: v1.Time{},
InterfaceStats: statsapi.InterfaceStats{ InterfaceStats: statsapi.InterfaceStats{
Name: "test2", Name: "test2",
@ -432,9 +400,10 @@ func Test_criStatsProvider_listContainerNetworkStats(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
p := &criStatsProvider{ p := &criStatsProvider{
hcsshimInterface: fakehcsshim{ windowsNetworkStatsProvider: fakeNetworkStatsProvider{
containers: tt.fields.containers, containers: tt.fields.containers,
}, },
clock: fakeClock,
} }
got, err := p.listContainerNetworkStats() got, err := p.listContainerNetworkStats()
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {