Update to new cadvisor v0.48.1

Signed-off-by: Davanum Srinivas <davanum@gmail.com>
This commit is contained in:
Davanum Srinivas
2023-10-31 07:51:09 -04:00
parent 0294521985
commit 8b9fc325e2
36 changed files with 425 additions and 331 deletions

View File

@@ -21,7 +21,6 @@ package common
import (
"encoding/json"
"flag"
"io/ioutil"
"os"
)
@@ -48,7 +47,7 @@ type networkInterface struct {
}
func GetContainerHintsFromFile(containerHintsFile string) (ContainerHints, error) {
dat, err := ioutil.ReadFile(containerHintsFile)
dat, err := os.ReadFile(containerHintsFile)
if os.IsNotExist(err) {
return ContainerHints{}, nil
}

View File

@@ -16,7 +16,6 @@ package common
import (
"fmt"
"io/ioutil"
"math"
"os"
"path"
@@ -76,7 +75,11 @@ func getSpecInternal(cgroupPaths map[string]string, machineInfoFactory info.Mach
dir, err := os.Stat(cgroupPathDir)
if err == nil && dir.ModTime().Before(lowestTime) {
lowestTime = dir.ModTime()
} else if os.IsNotExist(err) {
// Directory does not exist, skip checking for files within.
continue
}
// The modified time of the cgroup directory sometimes changes whenever a subcontainer is created.
// eg. /docker will have creation time matching the creation of latest docker container.
// Use clone_children/events as a workaround as it isn't usually modified. It is only likely changed
@@ -235,7 +238,7 @@ func readString(dirpath string, file string) string {
cgroupFile := path.Join(dirpath, file)
// Read
out, err := ioutil.ReadFile(cgroupFile)
out, err := os.ReadFile(cgroupFile)
if err != nil {
// Ignore non-existent files
if !os.IsNotExist(err) {
@@ -437,3 +440,20 @@ func (m deviceIdentifierMap) Find(major, minor uint64, namer DeviceNamer) string
m[d] = s
return s
}
// RemoveNetMetrics strips every network-related metric kind from metrics when
// remove is true. The input set is handed back untouched when remove is false
// or when it holds none of the kinds in container.AllNetworkMetrics, so no
// copy is made unless something actually has to be dropped.
func RemoveNetMetrics(metrics container.MetricSet, remove bool) container.MetricSet {
	// Short-circuit: HasAny is only consulted when removal was requested.
	if !remove || !metrics.HasAny(container.AllNetworkMetrics) {
		return metrics
	}

	// Everything except the network metrics.
	withoutNet := metrics.Difference(container.AllNetworkMetrics)
	return withoutNet
}

View File

@@ -35,6 +35,7 @@ const (
ContainerTypeCrio
ContainerTypeContainerd
ContainerTypeMesos
ContainerTypePodman
)
// Interface for container operation handlers.

View File

@@ -59,6 +59,7 @@ const (
maxBackoffDelay = 3 * time.Second
baseBackoffDelay = 100 * time.Millisecond
connectionTimeout = 2 * time.Second
maxMsgSize = 16 * 1024 * 1024 // 16MB
)
// Client creates a containerd client
@@ -82,6 +83,7 @@ func Client(address, namespace string) (ContainerdClient, error) {
grpc.WithContextDialer(dialer.ContextDialer),
grpc.WithBlock(),
grpc.WithConnectParams(connParams),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
}
unary, stream := newNSInterceptors(namespace)
gopts = append(gopts,

View File

@@ -126,7 +126,14 @@ func newContainerdContainerHandler(
Aliases: []string{id, name},
}
libcontainerHandler := containerlibcontainer.NewHandler(cgroupManager, rootfs, int(taskPid), includedMetrics)
// Containers that don't have their own network -- this includes
// containers running in Kubernetes pods that use the network of the
// infrastructure container -- do not need their network stats to be
// reported. This stops metrics from being reported multiple times for
// each container in a pod.
metrics := common.RemoveNetMetrics(includedMetrics, cntr.Labels["io.cri-containerd.kind"] != "sandbox")
libcontainerHandler := containerlibcontainer.NewHandler(cgroupManager, rootfs, int(taskPid), metrics)
handler := &containerdContainerHandler{
machineInfoFactory: machineInfoFactory,
@@ -134,7 +141,7 @@ func newContainerdContainerHandler(
fsInfo: fsInfo,
envs: make(map[string]string),
labels: cntr.Labels,
includedMetrics: includedMetrics,
includedMetrics: metrics,
reference: containerReference,
libcontainerHandler: libcontainerHandler,
}
@@ -164,22 +171,12 @@ func (h *containerdContainerHandler) ContainerReference() (info.ContainerReferen
return h.reference, nil
}
// needNet reports whether network information applies to this container.
// containerd does not handle networking itself, so the answer is derived from
// the included metrics and from the cri-containerd kind label: it is true only
// when network usage metrics are enabled and the container is labelled as a
// "sandbox".
func (h *containerdContainerHandler) needNet() bool {
	if !h.includedMetrics.Has(container.NetworkUsageMetrics) {
		return false
	}
	// TODO: change it to exported cri-containerd constants.
	kind := h.labels["io.cri-containerd.kind"]
	return kind == "sandbox"
}
func (h *containerdContainerHandler) GetSpec() (info.ContainerSpec, error) {
// TODO: Since we don't collect disk usage stats for containerd, we set hasFilesystem
// to false. Revisit when we support disk usage stats for containerd.
hasFilesystem := false
spec, err := common.GetSpec(h.cgroupPaths, h.machineInfoFactory, h.needNet(), hasFilesystem)
hasNet := h.includedMetrics.Has(container.NetworkUsageMetrics)
spec, err := common.GetSpec(h.cgroupPaths, h.machineInfoFactory, hasNet, hasFilesystem)
spec.Labels = h.labels
spec.Envs = h.envs
spec.Image = h.image
@@ -204,13 +201,6 @@ func (h *containerdContainerHandler) GetStats() (*info.ContainerStats, error) {
if err != nil {
return stats, err
}
// Clean up stats for containers that don't have their own network - this
// includes containers running in Kubernetes pods that use the network of the
// infrastructure container. This stops metrics being reported multiple times
// for each container in a pod.
if !h.needNet() {
stats.Network = info.NetworkStats{}
}
// Get filesystem stats.
err = h.getFsStats(stats)

View File

@@ -17,8 +17,9 @@ package crio
import (
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"io"
"net"
"net/http"
"sync"
@@ -26,6 +27,8 @@ import (
"time"
)
var crioClientTimeout = flag.Duration("crio_client_timeout", time.Duration(0), "CRI-O client timeout. Default is no timeout.")
const (
CrioSocket = "/var/run/crio/crio.sock"
maxUnixSocketPathSize = len(syscall.RawSockaddrUnix{}.Path)
@@ -89,6 +92,7 @@ func Client() (CrioClient, error) {
theClient = &crioClientImpl{
client: &http.Client{
Transport: tr,
Timeout: *crioClientTimeout,
},
}
})
@@ -142,7 +146,7 @@ func (c *crioClientImpl) ContainerInfo(id string) (*ContainerInfo, error) {
// golang's http.Do doesn't return an error if non 200 response code is returned
// handle this case here, rather than failing to decode the body
if resp.StatusCode != http.StatusOK {
respBody, err := ioutil.ReadAll(resp.Body)
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("Error finding container %s: Status %d", id, resp.StatusCode)
}

View File

@@ -148,7 +148,15 @@ func newCrioContainerHandler(
Namespace: CrioNamespace,
}
libcontainerHandler := containerlibcontainer.NewHandler(cgroupManager, rootFs, cInfo.Pid, includedMetrics)
// Find out if we need network metrics reported for this container.
// Containers that don't have their own network -- this includes
// containers running in Kubernetes pods that use the network of the
// infrastructure container -- do not need their network stats to be
// reported. This stops metrics from being reported multiple times for
// each container in a pod.
metrics := common.RemoveNetMetrics(includedMetrics, cInfo.Labels["io.kubernetes.container.name"] != "POD")
libcontainerHandler := containerlibcontainer.NewHandler(cgroupManager, rootFs, cInfo.Pid, metrics)
// TODO: extract object mother method
handler := &crioContainerHandler{
@@ -161,7 +169,7 @@ func newCrioContainerHandler(
rootfsStorageDir: rootfsStorageDir,
envs: make(map[string]string),
labels: cInfo.Labels,
includedMetrics: includedMetrics,
includedMetrics: metrics,
reference: containerReference,
libcontainerHandler: libcontainerHandler,
cgroupManager: cgroupManager,
@@ -210,16 +218,10 @@ func (h *crioContainerHandler) ContainerReference() (info.ContainerReference, er
return h.reference, nil
}
// needNet reports whether network information applies to this container: it
// is true only when network usage metrics are enabled and the container's
// "io.kubernetes.container.name" label is "POD".
func (h *crioContainerHandler) needNet() bool {
	if !h.includedMetrics.Has(container.NetworkUsageMetrics) {
		return false
	}
	containerName := h.labels["io.kubernetes.container.name"]
	return containerName == "POD"
}
func (h *crioContainerHandler) GetSpec() (info.ContainerSpec, error) {
hasFilesystem := h.includedMetrics.Has(container.DiskUsageMetrics)
spec, err := common.GetSpec(h.cgroupPaths, h.machineInfoFactory, h.needNet(), hasFilesystem)
hasNet := h.includedMetrics.Has(container.NetworkUsageMetrics)
spec, err := common.GetSpec(h.cgroupPaths, h.machineInfoFactory, hasNet, hasFilesystem)
spec.Labels = h.labels
spec.Envs = h.envs
@@ -306,13 +308,7 @@ func (h *crioContainerHandler) GetStats() (*info.ContainerStats, error) {
return stats, err
}
if !h.needNet() {
// Clean up stats for containers that don't have their own network - this
// includes containers running in Kubernetes pods that use the network of the
// infrastructure container. This stops metrics being reported multiple times
// for each container in a pod.
stats.Network = info.NetworkStats{}
} else if len(stats.Network.Interfaces) == 0 {
if h.includedMetrics.Has(container.NetworkUsageMetrics) && len(stats.Network.Interfaces) == 0 {
// No network-related information indicates that the pid of the
// container is no longer valid and we need to ask crio to
// provide the pid of another container from that pod.

View File

@@ -93,6 +93,14 @@ var AllMetrics = MetricSet{
OOMMetrics: struct{}{},
}
// AllNetworkMetrics represents all network metrics that cAdvisor supports.
// The map values are empty structs, so the map is used purely as a set of
// metric kinds.
var AllNetworkMetrics = MetricSet{
NetworkUsageMetrics: struct{}{},
NetworkTcpUsageMetrics: struct{}{},
NetworkAdvancedTcpUsageMetrics: struct{}{},
NetworkUdpUsageMetrics: struct{}{},
}
// String returns mk converted to a plain string.
func (mk MetricKind) String() string {
return string(mk)
}
@@ -104,6 +112,15 @@ func (ms MetricSet) Has(mk MetricKind) bool {
return exists
}
// HasAny reports whether ms contains at least one of the metric kinds present
// in ms1. It returns false when ms1 is empty.
func (ms MetricSet) HasAny(ms1 MetricSet) bool {
	for kind := range ms1 {
		_, present := ms[kind]
		if !present {
			continue
		}
		return true
	}
	return false
}
// add inserts mk into the set, modifying ms in place.
func (ms MetricSet) add(mk MetricKind) {
ms[mk] = struct{}{}
}
@@ -199,7 +216,9 @@ func InitializePlugins(factory info.MachineInfoFactory, fsInfo fs.FsInfo, includ
for name, plugin := range plugins {
watcher, err := plugin.Register(factory, fsInfo, includedMetrics)
if err != nil {
klog.V(5).Infof("Registration of the %s container factory failed: %v", name, err)
klog.Infof("Registration of the %s container factory failed: %v", name, err)
} else {
klog.Infof("Registration of the %s container factory successfully", name)
}
if watcher != nil {
containerWatchers = append(containerWatchers, watcher)

View File

@@ -21,7 +21,6 @@ import (
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"regexp"
@@ -40,7 +39,6 @@ import (
)
var (
whitelistedUlimits = [...]string{"max_open_files"}
referencedResetInterval = flag.Uint64("referenced_reset_interval", 0,
"Reset interval for referenced bytes (container_referenced_bytes metric), number of measurement cycles after which referenced bytes are cleared, if set to 0 referenced bytes are never cleared (default: 0)")
@@ -206,54 +204,56 @@ func parseUlimit(value string) (int64, error) {
return num, nil
}
// isUlimitWhitelisted reports whether name exactly matches one of the entries
// in whitelistedUlimits.
func isUlimitWhitelisted(name string) bool {
	for i := range whitelistedUlimits {
		if whitelistedUlimits[i] == name {
			return true
		}
	}
	return false
}
func processLimitsFile(fileData string) []info.UlimitSpec {
const maxOpenFilesLinePrefix = "Max open files"
limits := strings.Split(fileData, "\n")
ulimits := make([]info.UlimitSpec, 0, len(limits))
for _, lim := range limits {
// Skip any headers/footers
if strings.HasPrefix(lim, "Max") {
// Line format: Max open files 16384 16384 files
fields := regexp.MustCompile(`[\s]{2,}`).Split(lim, -1)
name := strings.Replace(strings.ToLower(strings.TrimSpace(fields[0])), " ", "_", -1)
found := isUlimitWhitelisted(name)
if !found {
continue
}
soft := strings.TrimSpace(fields[1])
softNum, softErr := parseUlimit(soft)
hard := strings.TrimSpace(fields[2])
hardNum, hardErr := parseUlimit(hard)
// Omit metric if there were any parsing errors
if softErr == nil && hardErr == nil {
ulimitSpec := info.UlimitSpec{
Name: name,
SoftLimit: int64(softNum),
HardLimit: int64(hardNum),
}
ulimits = append(ulimits, ulimitSpec)
if strings.HasPrefix(lim, "Max open files") {
// Remove line prefix
ulimit, err := processMaxOpenFileLimitLine(
"max_open_files",
lim[len(maxOpenFilesLinePrefix):],
)
if err == nil {
ulimits = append(ulimits, ulimit)
}
}
}
return ulimits
}
// processMaxOpenFileLimitLine parses the remainder of a "Max open files" limits
// line into a UlimitSpec carrying the given name.
//
// Callers must strip the "Max open files" prefix before calling; line is
// expected to hold exactly three whitespace-separated columns, of which the
// first is the soft limit and the second is the hard limit. An error is
// returned when the column count differs or when either limit fails to parse.
func processMaxOpenFileLimitLine(name, line string) (info.UlimitSpec, error) {
	// Trim first so the error message below shows the line without padding.
	line = strings.TrimSpace(line)

	columns := strings.Fields(line)
	if len(columns) != 3 {
		return info.UlimitSpec{}, fmt.Errorf("unable to parse max open files line: %s", line)
	}

	softLimit, softErr := parseUlimit(columns[0])
	if softErr != nil {
		return info.UlimitSpec{}, softErr
	}

	hardLimit, hardErr := parseUlimit(columns[1])
	if hardErr != nil {
		return info.UlimitSpec{}, hardErr
	}

	spec := info.UlimitSpec{
		Name:      name,
		SoftLimit: softLimit,
		HardLimit: hardLimit,
	}
	return spec, nil
}
func processRootProcUlimits(rootFs string, rootPid int) []info.UlimitSpec {
filePath := path.Join(rootFs, "/proc", strconv.Itoa(rootPid), "limits")
out, err := ioutil.ReadFile(filePath)
out, err := os.ReadFile(filePath)
if err != nil {
klog.V(4).Infof("error while listing directory %q to read ulimits: %v", filePath, err)
return []info.UlimitSpec{}
@@ -264,14 +264,14 @@ func processRootProcUlimits(rootFs string, rootPid int) []info.UlimitSpec {
func processStatsFromProcs(rootFs string, cgroupPath string, rootPid int) (info.ProcessStats, error) {
var fdCount, socketCount uint64
filePath := path.Join(cgroupPath, "cgroup.procs")
out, err := ioutil.ReadFile(filePath)
out, err := os.ReadFile(filePath)
if err != nil {
return info.ProcessStats{}, fmt.Errorf("couldn't open cpu cgroup procs file %v : %v", filePath, err)
}
pids := strings.Split(string(out), "\n")
// EOL is also treated as a new line while reading "cgroup.procs" file with ioutil.ReadFile.
// EOL is also treated as a new line while reading "cgroup.procs" file with os.ReadFile.
// The last value is an empty string "". Ex: pids = ["22", "1223", ""]
// Trim the last value
if len(pids) != 0 && pids[len(pids)-1] == "" {
@@ -280,7 +280,7 @@ func processStatsFromProcs(rootFs string, cgroupPath string, rootPid int) (info.
for _, pid := range pids {
dirPath := path.Join(rootFs, "/proc", pid, "fd")
fds, err := ioutil.ReadDir(dirPath)
fds, err := os.ReadDir(dirPath)
if err != nil {
klog.V(4).Infof("error while listing directory %q to measure fd count: %v", dirPath, err)
continue
@@ -324,7 +324,7 @@ func (h *Handler) schedulerStatsFromProcs() (info.CpuSchedstat, error) {
return info.CpuSchedstat{}, fmt.Errorf("couldn't open scheduler statistics for process %d: %v", pid, err)
}
defer f.Close()
contents, err := ioutil.ReadAll(f)
contents, err := io.ReadAll(f)
if err != nil {
return info.CpuSchedstat{}, fmt.Errorf("couldn't read scheduler statistics for process %d: %v", pid, err)
}
@@ -392,7 +392,7 @@ func getReferencedKBytes(pids []int) (uint64, error) {
foundMatch := false
for _, pid := range pids {
smapsFilePath := fmt.Sprintf(smapsFilePathPattern, pid)
smapsContent, err := ioutil.ReadFile(smapsFilePath)
smapsContent, err := os.ReadFile(smapsFilePath)
if err != nil {
klog.V(5).Infof("Cannot read %s file, err: %s", smapsFilePath, err)
if os.IsNotExist(err) {
@@ -468,7 +468,7 @@ func networkStatsFromProc(rootFs string, pid int) ([]info.InterfaceStats, error)
return ifaceStats, nil
}
var ignoredDevicePrefixes = []string{"lo", "veth", "docker"}
var ignoredDevicePrefixes = []string{"lo", "veth", "docker", "nerdctl"}
func isIgnoredDevice(ifName string) bool {
for _, prefix := range ignoredDevicePrefixes {
@@ -575,7 +575,7 @@ func advancedTCPStatsFromProc(rootFs string, pid int, file1, file2 string) (info
}
func scanAdvancedTCPStats(advancedStats *info.TcpAdvancedStat, advancedTCPStatsFile string) error {
data, err := ioutil.ReadFile(advancedTCPStatsFile)
data, err := os.ReadFile(advancedTCPStatsFile)
if err != nil {
return fmt.Errorf("failure opening %s: %v", advancedTCPStatsFile, err)
}
@@ -631,7 +631,7 @@ func scanAdvancedTCPStats(advancedStats *info.TcpAdvancedStat, advancedTCPStatsF
func scanTCPStats(tcpStatsFile string) (info.TcpStat, error) {
var stats info.TcpStat
data, err := ioutil.ReadFile(tcpStatsFile)
data, err := os.ReadFile(tcpStatsFile)
if err != nil {
return stats, fmt.Errorf("failure opening %s: %v", tcpStatsFile, err)
}
@@ -802,6 +802,7 @@ func setMemoryStats(s *cgroups.Stats, ret *info.ContainerStats) {
ret.Memory.Usage = s.MemoryStats.Usage.Usage
ret.Memory.MaxUsage = s.MemoryStats.Usage.MaxUsage
ret.Memory.Failcnt = s.MemoryStats.Usage.Failcnt
ret.Memory.KernelUsage = s.MemoryStats.KernelUsage.Usage
if cgroups.IsCgroup2UnifiedMode() {
ret.Memory.Cache = s.MemoryStats.Stats["file"]

View File

@@ -18,13 +18,13 @@ package raw
import (
"fmt"
"io/ioutil"
"os"
"path"
"strings"
inotify "k8s.io/utils/inotify"
"github.com/google/cadvisor/container"
"github.com/google/cadvisor/container/common"
"github.com/google/cadvisor/container/libcontainer"
"github.com/google/cadvisor/watcher"
@@ -43,8 +43,8 @@ type rawContainerWatcher struct {
stopWatcher chan error
}
func NewRawContainerWatcher() (watcher.ContainerWatcher, error) {
cgroupSubsystems, err := libcontainer.GetCgroupSubsystems(nil)
func NewRawContainerWatcher(includedMetrics container.MetricSet) (watcher.ContainerWatcher, error) {
cgroupSubsystems, err := libcontainer.GetCgroupSubsystems(includedMetrics)
if err != nil {
return nil, fmt.Errorf("failed to get cgroup subsystems: %v", err)
}
@@ -139,7 +139,7 @@ func (w *rawContainerWatcher) watchDirectory(events chan watcher.ContainerEvent,
// TODO(vmarmol): We should re-do this once we're done to ensure directories were not added in the meantime.
// Watch subdirectories as well.
entries, err := ioutil.ReadDir(dir)
entries, err := os.ReadDir(dir)
if err != nil {
return alreadyWatching, err
}