Updating dependency github.com/google/cadvisor to version 6a8d614

Signed-off-by: Davanum Srinivas <davanum@gmail.com>
This commit is contained in:
Davanum Srinivas
2020-05-14 17:29:52 -04:00
parent 449810c785
commit 082578c22f
109 changed files with 3417 additions and 1312 deletions

View File

@@ -22,6 +22,8 @@ go_library(
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/github.com/google/cadvisor/info/v2:go_default_library",
"//vendor/github.com/google/cadvisor/machine:go_default_library",
"//vendor/github.com/google/cadvisor/nvm:go_default_library",
"//vendor/github.com/google/cadvisor/perf:go_default_library",
"//vendor/github.com/google/cadvisor/stats:go_default_library",
"//vendor/github.com/google/cadvisor/summary:go_default_library",
"//vendor/github.com/google/cadvisor/utils/cpuload:go_default_library",
@@ -30,7 +32,7 @@ go_library(
"//vendor/github.com/google/cadvisor/version:go_default_library",
"//vendor/github.com/google/cadvisor/watcher:go_default_library",
"//vendor/github.com/opencontainers/runc/libcontainer/cgroups:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/clock:go_default_library",
],
)

View File

@@ -39,7 +39,7 @@ import (
"github.com/google/cadvisor/utils/cpuload"
units "github.com/docker/go-units"
"k8s.io/klog"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)
@@ -91,6 +91,9 @@ type containerData struct {
// nvidiaCollector updates stats for Nvidia GPUs attached to the container.
nvidiaCollector stats.Collector
// perfCollector updates stats for perf_event cgroup controller.
perfCollector stats.Collector
}
// jitter returns a time.Duration between duration and duration + maxFactor * duration,
@@ -104,23 +107,24 @@ func jitter(duration time.Duration, maxFactor float64) time.Duration {
return wait
}
func (c *containerData) Start() error {
go c.housekeeping()
func (cd *containerData) Start() error {
go cd.housekeeping()
return nil
}
func (c *containerData) Stop() error {
err := c.memoryCache.RemoveContainer(c.info.Name)
func (cd *containerData) Stop() error {
err := cd.memoryCache.RemoveContainer(cd.info.Name)
if err != nil {
return err
}
close(c.stop)
close(cd.stop)
cd.perfCollector.Destroy()
return nil
}
func (c *containerData) allowErrorLogging() bool {
if c.clock.Since(c.lastErrorTime) > time.Minute {
c.lastErrorTime = c.clock.Now()
func (cd *containerData) allowErrorLogging() bool {
if cd.clock.Since(cd.lastErrorTime) > time.Minute {
cd.lastErrorTime = cd.clock.Now()
return true
}
return false
@@ -130,22 +134,22 @@ func (c *containerData) allowErrorLogging() bool {
// It is designed to be used in conjunction with periodic housekeeping, and will cause the timer for
// periodic housekeeping to reset. This should be used sparingly, as calling OnDemandHousekeeping frequently
// can have serious performance costs.
func (c *containerData) OnDemandHousekeeping(maxAge time.Duration) {
if c.clock.Since(c.statsLastUpdatedTime) > maxAge {
func (cd *containerData) OnDemandHousekeeping(maxAge time.Duration) {
if cd.clock.Since(cd.statsLastUpdatedTime) > maxAge {
housekeepingFinishedChan := make(chan struct{})
c.onDemandChan <- housekeepingFinishedChan
cd.onDemandChan <- housekeepingFinishedChan
select {
case <-c.stop:
case <-cd.stop:
case <-housekeepingFinishedChan:
}
}
}
// notifyOnDemand notifies all calls to OnDemandHousekeeping that housekeeping is finished
func (c *containerData) notifyOnDemand() {
func (cd *containerData) notifyOnDemand() {
for {
select {
case finishedChan := <-c.onDemandChan:
case finishedChan := <-cd.onDemandChan:
close(finishedChan)
default:
return
@@ -153,35 +157,42 @@ func (c *containerData) notifyOnDemand() {
}
}
func (c *containerData) GetInfo(shouldUpdateSubcontainers bool) (*containerInfo, error) {
func (cd *containerData) GetInfo(shouldUpdateSubcontainers bool) (*containerInfo, error) {
// Get spec and subcontainers.
if c.clock.Since(c.infoLastUpdatedTime) > 5*time.Second {
err := c.updateSpec()
if cd.clock.Since(cd.infoLastUpdatedTime) > 5*time.Second {
err := cd.updateSpec()
if err != nil {
return nil, err
}
if shouldUpdateSubcontainers {
err = c.updateSubcontainers()
err = cd.updateSubcontainers()
if err != nil {
return nil, err
}
}
c.infoLastUpdatedTime = c.clock.Now()
cd.infoLastUpdatedTime = cd.clock.Now()
}
// Make a copy of the info for the user.
c.lock.Lock()
defer c.lock.Unlock()
return &c.info, nil
cd.lock.Lock()
defer cd.lock.Unlock()
cInfo := containerInfo{
Subcontainers: cd.info.Subcontainers,
Spec: cd.info.Spec,
}
cInfo.Id = cd.info.Id
cInfo.Name = cd.info.Name
cInfo.Aliases = cd.info.Aliases
cInfo.Namespace = cd.info.Namespace
return &cInfo, nil
}
func (c *containerData) DerivedStats() (v2.DerivedStats, error) {
if c.summaryReader == nil {
return v2.DerivedStats{}, fmt.Errorf("derived stats not enabled for container %q", c.info.Name)
func (cd *containerData) DerivedStats() (v2.DerivedStats, error) {
if cd.summaryReader == nil {
return v2.DerivedStats{}, fmt.Errorf("derived stats not enabled for container %q", cd.info.Name)
}
return c.summaryReader.DerivedStats()
return cd.summaryReader.DerivedStats()
}
func (c *containerData) getCgroupPath(cgroups string) (string, error) {
func (cd *containerData) getCgroupPath(cgroups string) (string, error) {
if cgroups == "-" {
return "/", nil
}
@@ -199,8 +210,8 @@ func (c *containerData) getCgroupPath(cgroups string) (string, error) {
// Returns contents of a file inside the container root.
// Takes in a path relative to container root.
func (c *containerData) ReadFile(filepath string, inHostNamespace bool) ([]byte, error) {
pids, err := c.getContainerPids(inHostNamespace)
func (cd *containerData) ReadFile(filepath string, inHostNamespace bool) ([]byte, error) {
pids, err := cd.getContainerPids(inHostNamespace)
if err != nil {
return nil, err
}
@@ -218,11 +229,11 @@ func (c *containerData) ReadFile(filepath string, inHostNamespace bool) ([]byte,
}
}
// No process paths could be found. Declare config non-existent.
return nil, fmt.Errorf("file %q does not exist.", filepath)
return nil, fmt.Errorf("file %q does not exist", filepath)
}
// Return output for ps command in host /proc with specified format
func (c *containerData) getPsOutput(inHostNamespace bool, format string) ([]byte, error) {
func (cd *containerData) getPsOutput(inHostNamespace bool, format string) ([]byte, error) {
args := []string{}
command := "ps"
if !inHostNamespace {
@@ -239,9 +250,9 @@ func (c *containerData) getPsOutput(inHostNamespace bool, format string) ([]byte
// Get pids of processes in this container.
// A slightly lighter-weight call than GetProcessList if other details are not required.
func (c *containerData) getContainerPids(inHostNamespace bool) ([]string, error) {
func (cd *containerData) getContainerPids(inHostNamespace bool) ([]string, error) {
format := "pid,cgroup"
out, err := c.getPsOutput(inHostNamespace, format)
out, err := cd.getPsOutput(inHostNamespace, format)
if err != nil {
return nil, err
}
@@ -257,26 +268,26 @@ func (c *containerData) getContainerPids(inHostNamespace bool) ([]string, error)
return nil, fmt.Errorf("expected at least %d fields, found %d: output: %q", expectedFields, len(fields), line)
}
pid := fields[0]
cgroup, err := c.getCgroupPath(fields[1])
cgroup, err := cd.getCgroupPath(fields[1])
if err != nil {
return nil, fmt.Errorf("could not parse cgroup path from %q: %v", fields[1], err)
}
if c.info.Name == cgroup {
if cd.info.Name == cgroup {
pids = append(pids, pid)
}
}
return pids, nil
}
func (c *containerData) GetProcessList(cadvisorContainer string, inHostNamespace bool) ([]v2.ProcessInfo, error) {
func (cd *containerData) GetProcessList(cadvisorContainer string, inHostNamespace bool) ([]v2.ProcessInfo, error) {
// report all processes for root.
isRoot := c.info.Name == "/"
isRoot := cd.info.Name == "/"
rootfs := "/"
if !inHostNamespace {
rootfs = "/rootfs"
}
format := "user,pid,ppid,stime,pcpu,pmem,rss,vsz,stat,time,comm,cgroup"
out, err := c.getPsOutput(inHostNamespace, format)
out, err := cd.getPsOutput(inHostNamespace, format)
if err != nil {
return nil, err
}
@@ -299,7 +310,7 @@ func (c *containerData) GetProcessList(cadvisorContainer string, inHostNamespace
if err != nil {
return nil, fmt.Errorf("invalid ppid %q: %v", fields[2], err)
}
percentCpu, err := strconv.ParseFloat(fields[4], 32)
percentCPU, err := strconv.ParseFloat(fields[4], 32)
if err != nil {
return nil, fmt.Errorf("invalid cpu percent %q: %v", fields[4], err)
}
@@ -319,7 +330,7 @@ func (c *containerData) GetProcessList(cadvisorContainer string, inHostNamespace
}
// convert to bytes
vs *= 1024
cgroup, err := c.getCgroupPath(fields[11])
cgroup, err := cd.getCgroupPath(fields[11])
if err != nil {
return nil, fmt.Errorf("could not parse cgroup path from %q: %v", fields[11], err)
}
@@ -342,13 +353,13 @@ func (c *containerData) GetProcessList(cadvisorContainer string, inHostNamespace
}
fdCount = len(fds)
if isRoot || c.info.Name == cgroup {
if isRoot || cd.info.Name == cgroup {
processes = append(processes, v2.ProcessInfo{
User: fields[0],
Pid: pid,
Ppid: ppid,
StartTime: fields[3],
PercentCpu: float32(percentCpu),
PercentCpu: float32(percentCPU),
PercentMemory: float32(percentMem),
RSS: rss,
VirtualSize: vs,
@@ -387,6 +398,8 @@ func newContainerData(containerName string, memoryCache *memory.InMemoryCache, h
collectorManager: collectorManager,
onDemandChan: make(chan chan struct{}, 100),
clock: clock,
perfCollector: &stats.NoopCollector{},
nvidiaCollector: &stats.NoopCollector{},
}
cont.info.ContainerReference = ref
@@ -409,52 +422,52 @@ func newContainerData(containerName string, memoryCache *memory.InMemoryCache, h
cont.summaryReader, err = summary.New(cont.info.Spec)
if err != nil {
cont.summaryReader = nil
klog.Warningf("Failed to create summary reader for %q: %v", ref.Name, err)
klog.V(5).Infof("Failed to create summary reader for %q: %v", ref.Name, err)
}
return cont, nil
}
// Determine when the next housekeeping should occur.
func (self *containerData) nextHousekeepingInterval() time.Duration {
if self.allowDynamicHousekeeping {
func (cd *containerData) nextHousekeepingInterval() time.Duration {
if cd.allowDynamicHousekeeping {
var empty time.Time
stats, err := self.memoryCache.RecentStats(self.info.Name, empty, empty, 2)
stats, err := cd.memoryCache.RecentStats(cd.info.Name, empty, empty, 2)
if err != nil {
if self.allowErrorLogging() {
klog.Warningf("Failed to get RecentStats(%q) while determining the next housekeeping: %v", self.info.Name, err)
if cd.allowErrorLogging() {
klog.Warningf("Failed to get RecentStats(%q) while determining the next housekeeping: %v", cd.info.Name, err)
}
} else if len(stats) == 2 {
// TODO(vishnuk): Use no processes as a signal.
// Raise the interval if usage hasn't changed in the last housekeeping.
if stats[0].StatsEq(stats[1]) && (self.housekeepingInterval < self.maxHousekeepingInterval) {
self.housekeepingInterval *= 2
if self.housekeepingInterval > self.maxHousekeepingInterval {
self.housekeepingInterval = self.maxHousekeepingInterval
if stats[0].StatsEq(stats[1]) && (cd.housekeepingInterval < cd.maxHousekeepingInterval) {
cd.housekeepingInterval *= 2
if cd.housekeepingInterval > cd.maxHousekeepingInterval {
cd.housekeepingInterval = cd.maxHousekeepingInterval
}
} else if self.housekeepingInterval != *HousekeepingInterval {
} else if cd.housekeepingInterval != *HousekeepingInterval {
// Lower interval back to the baseline.
self.housekeepingInterval = *HousekeepingInterval
cd.housekeepingInterval = *HousekeepingInterval
}
}
}
return jitter(self.housekeepingInterval, 1.0)
return jitter(cd.housekeepingInterval, 1.0)
}
// TODO(vmarmol): Implement stats collecting as a custom collector.
func (c *containerData) housekeeping() {
// Start any background goroutines - must be cleaned up in c.handler.Cleanup().
c.handler.Start()
defer c.handler.Cleanup()
func (cd *containerData) housekeeping() {
// Start any background goroutines - must be cleaned up in cd.handler.Cleanup().
cd.handler.Start()
defer cd.handler.Cleanup()
// Initialize cpuload reader - must be cleaned up in c.loadReader.Stop()
if c.loadReader != nil {
err := c.loadReader.Start()
// Initialize cpuload reader - must be cleaned up in cd.loadReader.Stop()
if cd.loadReader != nil {
err := cd.loadReader.Start()
if err != nil {
klog.Warningf("Could not start cpu load stat collector for %q: %s", c.info.Name, err)
klog.Warningf("Could not start cpu load stat collector for %q: %s", cd.info.Name, err)
}
defer c.loadReader.Stop()
defer cd.loadReader.Stop()
}
// Long housekeeping is either 100ms or half of the housekeeping interval.
@@ -464,11 +477,11 @@ func (c *containerData) housekeeping() {
}
// Housekeep every second.
klog.V(3).Infof("Start housekeeping for container %q\n", c.info.Name)
houseKeepingTimer := c.clock.NewTimer(0 * time.Second)
klog.V(3).Infof("Start housekeeping for container %q\n", cd.info.Name)
houseKeepingTimer := cd.clock.NewTimer(0 * time.Second)
defer houseKeepingTimer.Stop()
for {
if !c.housekeepingTick(houseKeepingTimer.C(), longHousekeeping) {
if !cd.housekeepingTick(houseKeepingTimer.C(), longHousekeeping) {
return
}
// Stop and drain the timer so that it is safe to reset it
@@ -479,74 +492,74 @@ func (c *containerData) housekeeping() {
}
}
// Log usage if asked to do so.
if c.logUsage {
if cd.logUsage {
const numSamples = 60
var empty time.Time
stats, err := c.memoryCache.RecentStats(c.info.Name, empty, empty, numSamples)
stats, err := cd.memoryCache.RecentStats(cd.info.Name, empty, empty, numSamples)
if err != nil {
if c.allowErrorLogging() {
klog.Warningf("[%s] Failed to get recent stats for logging usage: %v", c.info.Name, err)
if cd.allowErrorLogging() {
klog.Warningf("[%s] Failed to get recent stats for logging usage: %v", cd.info.Name, err)
}
} else if len(stats) < numSamples {
// Ignore, not enough stats yet.
} else {
usageCpuNs := uint64(0)
usageCPUNs := uint64(0)
for i := range stats {
if i > 0 {
usageCpuNs += (stats[i].Cpu.Usage.Total - stats[i-1].Cpu.Usage.Total)
usageCPUNs += (stats[i].Cpu.Usage.Total - stats[i-1].Cpu.Usage.Total)
}
}
usageMemory := stats[numSamples-1].Memory.Usage
instantUsageInCores := float64(stats[numSamples-1].Cpu.Usage.Total-stats[numSamples-2].Cpu.Usage.Total) / float64(stats[numSamples-1].Timestamp.Sub(stats[numSamples-2].Timestamp).Nanoseconds())
usageInCores := float64(usageCpuNs) / float64(stats[numSamples-1].Timestamp.Sub(stats[0].Timestamp).Nanoseconds())
usageInCores := float64(usageCPUNs) / float64(stats[numSamples-1].Timestamp.Sub(stats[0].Timestamp).Nanoseconds())
usageInHuman := units.HumanSize(float64(usageMemory))
// Don't set verbosity since this is already protected by the logUsage flag.
klog.Infof("[%s] %.3f cores (average: %.3f cores), %s of memory", c.info.Name, instantUsageInCores, usageInCores, usageInHuman)
klog.Infof("[%s] %.3f cores (average: %.3f cores), %s of memory", cd.info.Name, instantUsageInCores, usageInCores, usageInHuman)
}
}
houseKeepingTimer.Reset(c.nextHousekeepingInterval())
houseKeepingTimer.Reset(cd.nextHousekeepingInterval())
}
}
func (c *containerData) housekeepingTick(timer <-chan time.Time, longHousekeeping time.Duration) bool {
func (cd *containerData) housekeepingTick(timer <-chan time.Time, longHousekeeping time.Duration) bool {
select {
case <-c.stop:
case <-cd.stop:
// Stop housekeeping when signaled.
return false
case finishedChan := <-c.onDemandChan:
case finishedChan := <-cd.onDemandChan:
// notify the calling function once housekeeping has completed
defer close(finishedChan)
case <-timer:
}
start := c.clock.Now()
err := c.updateStats()
start := cd.clock.Now()
err := cd.updateStats()
if err != nil {
if c.allowErrorLogging() {
klog.Warningf("Failed to update stats for container \"%s\": %s", c.info.Name, err)
if cd.allowErrorLogging() {
klog.Warningf("Failed to update stats for container \"%s\": %s", cd.info.Name, err)
}
}
// Log if housekeeping took too long.
duration := c.clock.Since(start)
duration := cd.clock.Since(start)
if duration >= longHousekeeping {
klog.V(3).Infof("[%s] Housekeeping took %s", c.info.Name, duration)
klog.V(3).Infof("[%s] Housekeeping took %s", cd.info.Name, duration)
}
c.notifyOnDemand()
c.statsLastUpdatedTime = c.clock.Now()
cd.notifyOnDemand()
cd.statsLastUpdatedTime = cd.clock.Now()
return true
}
func (c *containerData) updateSpec() error {
spec, err := c.handler.GetSpec()
func (cd *containerData) updateSpec() error {
spec, err := cd.handler.GetSpec()
if err != nil {
// Ignore errors if the container is dead.
if !c.handler.Exists() {
if !cd.handler.Exists() {
return nil
}
return err
}
customMetrics, err := c.collectorManager.GetSpec()
customMetrics, err := cd.collectorManager.GetSpec()
if err != nil {
return err
}
@@ -554,28 +567,28 @@ func (c *containerData) updateSpec() error {
spec.HasCustomMetrics = true
spec.CustomMetrics = customMetrics
}
c.lock.Lock()
defer c.lock.Unlock()
c.info.Spec = spec
cd.lock.Lock()
defer cd.lock.Unlock()
cd.info.Spec = spec
return nil
}
// Calculate new smoothed load average using the new sample of runnable threads.
// The decay used ensures that the load will stabilize on a new constant value within
// 10 seconds.
func (c *containerData) updateLoad(newLoad uint64) {
if c.loadAvg < 0 {
c.loadAvg = float64(newLoad) // initialize to the first seen sample for faster stabilization.
func (cd *containerData) updateLoad(newLoad uint64) {
if cd.loadAvg < 0 {
cd.loadAvg = float64(newLoad) // initialize to the first seen sample for faster stabilization.
} else {
c.loadAvg = c.loadAvg*c.loadDecay + float64(newLoad)*(1.0-c.loadDecay)
cd.loadAvg = cd.loadAvg*cd.loadDecay + float64(newLoad)*(1.0-cd.loadDecay)
}
}
func (c *containerData) updateStats() error {
stats, statsErr := c.handler.GetStats()
func (cd *containerData) updateStats() error {
stats, statsErr := cd.handler.GetStats()
if statsErr != nil {
// Ignore errors if the container is dead.
if !c.handler.Exists() {
if !cd.handler.Exists() {
return nil
}
@@ -585,32 +598,32 @@ func (c *containerData) updateStats() error {
if stats == nil {
return statsErr
}
if c.loadReader != nil {
if cd.loadReader != nil {
// TODO(vmarmol): Cache this path.
path, err := c.handler.GetCgroupPath("cpu")
path, err := cd.handler.GetCgroupPath("cpu")
if err == nil {
loadStats, err := c.loadReader.GetCpuLoad(c.info.Name, path)
loadStats, err := cd.loadReader.GetCpuLoad(cd.info.Name, path)
if err != nil {
return fmt.Errorf("failed to get load stat for %q - path %q, error %s", c.info.Name, path, err)
return fmt.Errorf("failed to get load stat for %q - path %q, error %s", cd.info.Name, path, err)
}
stats.TaskStats = loadStats
c.updateLoad(loadStats.NrRunning)
cd.updateLoad(loadStats.NrRunning)
// convert to 'milliLoad' to avoid floats and preserve precision.
stats.Cpu.LoadAverage = int32(c.loadAvg * 1000)
stats.Cpu.LoadAverage = int32(cd.loadAvg * 1000)
}
}
if c.summaryReader != nil {
err := c.summaryReader.AddSample(*stats)
if cd.summaryReader != nil {
err := cd.summaryReader.AddSample(*stats)
if err != nil {
// Ignore summary errors for now.
klog.V(2).Infof("Failed to add summary stats for %q: %v", c.info.Name, err)
klog.V(2).Infof("Failed to add summary stats for %q: %v", cd.info.Name, err)
}
}
var customStatsErr error
cm := c.collectorManager.(*collector.GenericCollectorManager)
cm := cd.collectorManager.(*collector.GenericCollectorManager)
if len(cm.Collectors) > 0 {
if cm.NextCollectionTime.Before(c.clock.Now()) {
customStats, err := c.updateCustomStats()
if cm.NextCollectionTime.Before(cd.clock.Now()) {
customStats, err := cd.updateCustomStats()
if customStats != nil {
stats.CustomMetrics = customStats
}
@@ -621,15 +634,17 @@ func (c *containerData) updateStats() error {
}
var nvidiaStatsErr error
if c.nvidiaCollector != nil {
if cd.nvidiaCollector != nil {
// This updates the Accelerators field of the stats struct
nvidiaStatsErr = c.nvidiaCollector.UpdateStats(stats)
nvidiaStatsErr = cd.nvidiaCollector.UpdateStats(stats)
}
ref, err := c.handler.ContainerReference()
perfStatsErr := cd.perfCollector.UpdateStats(stats)
ref, err := cd.handler.ContainerReference()
if err != nil {
// Ignore errors if the container is dead.
if !c.handler.Exists() {
if !cd.handler.Exists() {
return nil
}
return err
@@ -639,7 +654,7 @@ func (c *containerData) updateStats() error {
ContainerReference: ref,
}
err = c.memoryCache.AddStats(&cInfo, stats)
err = cd.memoryCache.AddStats(&cInfo, stats)
if err != nil {
return err
}
@@ -647,15 +662,20 @@ func (c *containerData) updateStats() error {
return statsErr
}
if nvidiaStatsErr != nil {
klog.Errorf("error occurred while collecting nvidia stats for container %s: %s", cInfo.Name, err)
return nvidiaStatsErr
}
if perfStatsErr != nil {
klog.Errorf("error occurred while collecting perf stats for container %s: %s", cInfo.Name, err)
return perfStatsErr
}
return customStatsErr
}
func (c *containerData) updateCustomStats() (map[string][]info.MetricVal, error) {
_, customStats, customStatsErr := c.collectorManager.Collect()
func (cd *containerData) updateCustomStats() (map[string][]info.MetricVal, error) {
_, customStats, customStatsErr := cd.collectorManager.Collect()
if customStatsErr != nil {
if !c.handler.Exists() {
if !cd.handler.Exists() {
return customStats, nil
}
customStatsErr = fmt.Errorf("%v, continuing to push custom stats", customStatsErr)
@@ -663,19 +683,19 @@ func (c *containerData) updateCustomStats() (map[string][]info.MetricVal, error)
return customStats, customStatsErr
}
func (c *containerData) updateSubcontainers() error {
func (cd *containerData) updateSubcontainers() error {
var subcontainers info.ContainerReferenceSlice
subcontainers, err := c.handler.ListContainers(container.ListSelf)
subcontainers, err := cd.handler.ListContainers(container.ListSelf)
if err != nil {
// Ignore errors if the container is dead.
if !c.handler.Exists() {
if !cd.handler.Exists() {
return nil
}
return err
}
sort.Sort(subcontainers)
c.lock.Lock()
defer c.lock.Unlock()
c.info.Subcontainers = subcontainers
cd.lock.Lock()
defer cd.lock.Unlock()
cd.info.Subcontainers = subcontainers
return nil
}

View File

@@ -37,6 +37,8 @@ import (
info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/info/v2"
"github.com/google/cadvisor/machine"
"github.com/google/cadvisor/nvm"
"github.com/google/cadvisor/perf"
"github.com/google/cadvisor/stats"
"github.com/google/cadvisor/utils/oomparser"
"github.com/google/cadvisor/utils/sysfs"
@@ -44,7 +46,7 @@ import (
"github.com/google/cadvisor/watcher"
"github.com/opencontainers/runc/libcontainer/cgroups"
"k8s.io/klog"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)
@@ -121,7 +123,7 @@ type Manager interface {
// Get past events that have been detected and that fit the request.
GetPastEvents(request *events.Request) ([]*info.Event, error)
CloseEventChannel(watch_id int)
CloseEventChannel(watchID int)
// Get status information about docker.
DockerInfo() (info.DockerStatus, error)
@@ -133,8 +135,14 @@ type Manager interface {
DebugInfo() map[string][]string
}
// HouskeepingConfig is the housekeeping configuration for the manager.
// New dereferences both fields unconditionally, so both pointers must be non-nil.
// NOTE(review): the exported name is misspelled ("Houskeeping"); it is left as-is
// here because renaming it would change the package's public API.
type HouskeepingConfig = struct {
// Interval is copied into the manager's maxHousekeepingInterval by New.
Interval *time.Duration
// AllowDynamic is copied into the manager's allowDynamicHousekeeping by New.
AllowDynamic *bool
}
// New takes a memory storage and returns a new manager.
func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, maxHousekeepingInterval time.Duration, allowDynamicHousekeeping bool, includedMetricsSet container.MetricSet, collectorHttpClient *http.Client, rawContainerCgroupPathPrefixWhiteList []string) (Manager, error) {
func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, houskeepingConfig HouskeepingConfig, includedMetricsSet container.MetricSet, collectorHTTPClient *http.Client, rawContainerCgroupPathPrefixWhiteList []string, perfEventsFile string) (Manager, error) {
if memoryCache == nil {
return nil, fmt.Errorf("manager requires memory storage")
}
@@ -176,12 +184,12 @@ func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, maxHousekeepingIn
cadvisorContainer: selfContainer,
inHostNamespace: inHostNamespace,
startupTime: time.Now(),
maxHousekeepingInterval: maxHousekeepingInterval,
allowDynamicHousekeeping: allowDynamicHousekeeping,
maxHousekeepingInterval: *houskeepingConfig.Interval,
allowDynamicHousekeeping: *houskeepingConfig.AllowDynamic,
includedMetrics: includedMetricsSet,
containerWatchers: []watcher.ContainerWatcher{},
eventsChannel: eventsChannel,
collectorHttpClient: collectorHttpClient,
collectorHTTPClient: collectorHTTPClient,
nvidiaManager: accelerators.NewNvidiaManager(),
rawContainerCgroupPathPrefixWhiteList: rawContainerCgroupPathPrefixWhiteList,
}
@@ -193,6 +201,11 @@ func New(memoryCache *memory.InMemoryCache, sysfs sysfs.SysFs, maxHousekeepingIn
newManager.machineInfo = *machineInfo
klog.V(1).Infof("Machine: %+v", newManager.machineInfo)
newManager.perfManager, err = perf.NewManager(perfEventsFile, machineInfo.NumCores)
if err != nil {
return nil, err
}
versionInfo, err := getVersionInfo()
if err != nil {
return nil, err
@@ -230,17 +243,18 @@ type manager struct {
includedMetrics container.MetricSet
containerWatchers []watcher.ContainerWatcher
eventsChannel chan watcher.ContainerEvent
collectorHttpClient *http.Client
collectorHTTPClient *http.Client
nvidiaManager stats.Manager
perfManager stats.Manager
// Whitelist of raw container cgroup path prefixes.
rawContainerCgroupPathPrefixWhiteList []string
}
// Start the container manager.
func (self *manager) Start() error {
self.containerWatchers = container.InitializePlugins(self, self.fsInfo, self.includedMetrics)
func (m *manager) Start() error {
m.containerWatchers = container.InitializePlugins(m, m.fsInfo, m.includedMetrics)
err := raw.Register(self, self.fsInfo, self.includedMetrics, self.rawContainerCgroupPathPrefixWhiteList)
err := raw.Register(m, m.fsInfo, m.includedMetrics, m.rawContainerCgroupPathPrefixWhiteList)
if err != nil {
klog.Errorf("Registration of the raw container factory failed: %v", err)
}
@@ -249,10 +263,10 @@ func (self *manager) Start() error {
if err != nil {
return err
}
self.containerWatchers = append(self.containerWatchers, rawWatcher)
m.containerWatchers = append(m.containerWatchers, rawWatcher)
// Watch for OOMs.
err = self.watchForNewOoms()
err = m.watchForNewOoms()
if err != nil {
klog.Warningf("Could not configure a source for OOM detection, disabling OOM events: %v", err)
}
@@ -262,16 +276,13 @@ func (self *manager) Start() error {
return nil
}
// Setup collection of nvidia GPU metrics if any of them are attached to the machine.
self.nvidiaManager.Setup()
// Create root and then recover all containers.
err = self.createContainer("/", watcher.Raw)
err = m.createContainer("/", watcher.Raw)
if err != nil {
return err
}
klog.V(2).Infof("Starting recovery of all containers")
err = self.detectSubcontainers("/")
err = m.detectSubcontainers("/")
if err != nil {
return err
}
@@ -279,54 +290,63 @@ func (self *manager) Start() error {
// Watch for new container.
quitWatcher := make(chan error)
err = self.watchForNewContainers(quitWatcher)
err = m.watchForNewContainers(quitWatcher)
if err != nil {
return err
}
self.quitChannels = append(self.quitChannels, quitWatcher)
m.quitChannels = append(m.quitChannels, quitWatcher)
// Look for new containers in the main housekeeping thread.
quitGlobalHousekeeping := make(chan error)
self.quitChannels = append(self.quitChannels, quitGlobalHousekeeping)
go self.globalHousekeeping(quitGlobalHousekeeping)
m.quitChannels = append(m.quitChannels, quitGlobalHousekeeping)
go m.globalHousekeeping(quitGlobalHousekeeping)
quitUpdateMachineInfo := make(chan error)
self.quitChannels = append(self.quitChannels, quitUpdateMachineInfo)
go self.updateMachineInfo(quitUpdateMachineInfo)
m.quitChannels = append(m.quitChannels, quitUpdateMachineInfo)
go m.updateMachineInfo(quitUpdateMachineInfo)
return nil
}
func (self *manager) Stop() error {
defer self.nvidiaManager.Destroy()
func (m *manager) Stop() error {
defer m.nvidiaManager.Destroy()
defer m.destroyPerfCollectors()
// Stop and wait on all quit channels.
for i, c := range self.quitChannels {
for i, c := range m.quitChannels {
// Send the exit signal and wait on the thread to exit (by closing the channel).
c <- nil
err := <-c
if err != nil {
// Remove the channels that quit successfully.
self.quitChannels = self.quitChannels[i:]
m.quitChannels = m.quitChannels[i:]
return err
}
}
self.quitChannels = make([]chan error, 0, 2)
m.quitChannels = make([]chan error, 0, 2)
nvm.Finalize()
perf.Finalize()
return nil
}
func (self *manager) updateMachineInfo(quit chan error) {
// destroyPerfCollectors calls Destroy on the perf collector of every
// container currently held in m.containers.
func (m *manager) destroyPerfCollectors() {
	for _, cont := range m.containers {
		cont.perfCollector.Destroy()
	}
}
func (m *manager) updateMachineInfo(quit chan error) {
ticker := time.NewTicker(*updateMachineInfoInterval)
for {
select {
case <-ticker.C:
info, err := machine.Info(self.sysFs, self.fsInfo, self.inHostNamespace)
info, err := machine.Info(m.sysFs, m.fsInfo, m.inHostNamespace)
if err != nil {
klog.Errorf("Could not get machine info: %v", err)
break
}
self.machineMu.Lock()
self.machineInfo = *info
self.machineMu.Unlock()
m.machineMu.Lock()
m.machineInfo = *info
m.machineMu.Unlock()
klog.V(5).Infof("Update machine info: %+v", *info)
case <-quit:
ticker.Stop()
@@ -336,21 +356,21 @@ func (self *manager) updateMachineInfo(quit chan error) {
}
}
func (self *manager) globalHousekeeping(quit chan error) {
func (m *manager) globalHousekeeping(quit chan error) {
// Long housekeeping is either 100ms or half of the housekeeping interval.
longHousekeeping := 100 * time.Millisecond
if *globalHousekeepingInterval/2 < longHousekeeping {
longHousekeeping = *globalHousekeepingInterval / 2
}
ticker := time.Tick(*globalHousekeepingInterval)
ticker := time.NewTicker(*globalHousekeepingInterval)
for {
select {
case t := <-ticker:
case t := <-ticker.C:
start := time.Now()
// Check for new containers.
err := self.detectSubcontainers("/")
err := m.detectSubcontainers("/")
if err != nil {
klog.Errorf("Failed to detect containers: %s", err)
}
@@ -369,15 +389,15 @@ func (self *manager) globalHousekeeping(quit chan error) {
}
}
func (self *manager) getContainerData(containerName string) (*containerData, error) {
func (m *manager) getContainerData(containerName string) (*containerData, error) {
var cont *containerData
var ok bool
func() {
self.containersLock.RLock()
defer self.containersLock.RUnlock()
m.containersLock.RLock()
defer m.containersLock.RUnlock()
// Ensure we have the container.
cont, ok = self.containers[namespacedContainerName{
cont, ok = m.containers[namespacedContainerName{
Name: containerName,
}]
}()
@@ -387,8 +407,8 @@ func (self *manager) getContainerData(containerName string) (*containerData, err
return cont, nil
}
func (self *manager) GetDerivedStats(containerName string, options v2.RequestOptions) (map[string]v2.DerivedStats, error) {
conts, err := self.getRequestedContainers(containerName, options)
func (m *manager) GetDerivedStats(containerName string, options v2.RequestOptions) (map[string]v2.DerivedStats, error) {
conts, err := m.getRequestedContainers(containerName, options)
if err != nil {
return nil, err
}
@@ -404,8 +424,8 @@ func (self *manager) GetDerivedStats(containerName string, options v2.RequestOpt
return stats, errs.OrNil()
}
func (self *manager) GetContainerSpec(containerName string, options v2.RequestOptions) (map[string]v2.ContainerSpec, error) {
conts, err := self.getRequestedContainers(containerName, options)
func (m *manager) GetContainerSpec(containerName string, options v2.RequestOptions) (map[string]v2.ContainerSpec, error) {
conts, err := m.getRequestedContainers(containerName, options)
if err != nil {
return nil, err
}
@@ -416,43 +436,43 @@ func (self *manager) GetContainerSpec(containerName string, options v2.RequestOp
if err != nil {
errs.append(name, "GetInfo", err)
}
spec := self.getV2Spec(cinfo)
spec := m.getV2Spec(cinfo)
specs[name] = spec
}
return specs, errs.OrNil()
}
// getV2Spec builds the v2 container spec for cinfo. It takes the adjusted v1
// spec from getAdjustedSpec and converts it with v2.ContainerSpecFromV1,
// passing along the container's aliases and namespace.
func (self *manager) getV2Spec(cinfo *containerInfo) v2.ContainerSpec {
spec := self.getAdjustedSpec(cinfo)
func (m *manager) getV2Spec(cinfo *containerInfo) v2.ContainerSpec {
spec := m.getAdjustedSpec(cinfo)
return v2.ContainerSpecFromV1(&spec, cinfo.Aliases, cinfo.Namespace)
}
// getAdjustedSpec returns cinfo.Spec by value. When the spec has memory
// information and its memory limit is 0, the limit is replaced by the
// machine's memory capacity.
func (self *manager) getAdjustedSpec(cinfo *containerInfo) info.ContainerSpec {
func (m *manager) getAdjustedSpec(cinfo *containerInfo) info.ContainerSpec {
spec := cinfo.Spec
// Set default value to an actual value
if spec.HasMemory {
// A Memory.Limit of 0 means there is no limit.
if spec.Memory.Limit == 0 {
// machineInfo is read while holding machineMu's read lock.
self.machineMu.RLock()
spec.Memory.Limit = uint64(self.machineInfo.MemoryCapacity)
self.machineMu.RUnlock()
m.machineMu.RLock()
spec.Memory.Limit = uint64(m.machineInfo.MemoryCapacity)
m.machineMu.RUnlock()
}
}
return spec
}
// GetContainerInfo looks up the container named containerName with
// getContainerData and converts it to an info.ContainerInfo for the given
// query. A lookup error is returned unchanged with a nil result.
func (self *manager) GetContainerInfo(containerName string, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
cont, err := self.getContainerData(containerName)
func (m *manager) GetContainerInfo(containerName string, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
cont, err := m.getContainerData(containerName)
if err != nil {
return nil, err
}
return self.containerDataToContainerInfo(cont, query)
return m.containerDataToContainerInfo(cont, query)
}
func (self *manager) GetContainerInfoV2(containerName string, options v2.RequestOptions) (map[string]v2.ContainerInfo, error) {
containers, err := self.getRequestedContainers(containerName, options)
func (m *manager) GetContainerInfoV2(containerName string, options v2.RequestOptions) (map[string]v2.ContainerInfo, error) {
containers, err := m.getRequestedContainers(containerName, options)
if err != nil {
return nil, err
}
@@ -469,9 +489,9 @@ func (self *manager) GetContainerInfoV2(containerName string, options v2.Request
infos[name] = result
continue
}
result.Spec = self.getV2Spec(cinfo)
result.Spec = m.getV2Spec(cinfo)
stats, err := self.memoryCache.RecentStats(name, nilTime, nilTime, options.Count)
stats, err := m.memoryCache.RecentStats(name, nilTime, nilTime, options.Count)
if err != nil {
errs.append(name, "RecentStats", err)
infos[name] = result
@@ -485,14 +505,14 @@ func (self *manager) GetContainerInfoV2(containerName string, options v2.Request
return infos, errs.OrNil()
}
func (self *manager) containerDataToContainerInfo(cont *containerData, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
func (m *manager) containerDataToContainerInfo(cont *containerData, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
// Get the info from the container.
cinfo, err := cont.GetInfo(true)
if err != nil {
return nil, err
}
stats, err := self.memoryCache.RecentStats(cinfo.Name, query.Start, query.End, query.NumStats)
stats, err := m.memoryCache.RecentStats(cinfo.Name, query.Start, query.End, query.NumStats)
if err != nil {
return nil, err
}
@@ -501,55 +521,55 @@ func (self *manager) containerDataToContainerInfo(cont *containerData, query *in
ret := &info.ContainerInfo{
ContainerReference: cinfo.ContainerReference,
Subcontainers: cinfo.Subcontainers,
Spec: self.getAdjustedSpec(cinfo),
Spec: m.getAdjustedSpec(cinfo),
Stats: stats,
}
return ret, nil
}
// getContainer returns the containerData stored under containerName (with no
// namespace set in the key). The lookup runs under containersLock's read lock.
// It returns an "unknown container" error when there is no such entry.
func (self *manager) getContainer(containerName string) (*containerData, error) {
self.containersLock.RLock()
defer self.containersLock.RUnlock()
cont, ok := self.containers[namespacedContainerName{Name: containerName}]
func (m *manager) getContainer(containerName string) (*containerData, error) {
m.containersLock.RLock()
defer m.containersLock.RUnlock()
cont, ok := m.containers[namespacedContainerName{Name: containerName}]
if !ok {
return nil, fmt.Errorf("unknown container %q", containerName)
}
return cont, nil
}
// getSubcontainers returns, keyed by container name, every tracked container
// whose name equals containerName or starts with matchedName. The scan runs
// under containersLock's read lock. Because the result is keyed by info.Name,
// entries that share a name collapse into a single map entry.
func (self *manager) getSubcontainers(containerName string) map[string]*containerData {
self.containersLock.RLock()
defer self.containersLock.RUnlock()
containersMap := make(map[string]*containerData, len(self.containers))
func (m *manager) getSubcontainers(containerName string) map[string]*containerData {
m.containersLock.RLock()
defer m.containersLock.RUnlock()
containersMap := make(map[string]*containerData, len(m.containers))
// Get all the unique subcontainers of the specified container
// NOTE(review): path.Join cleans its result, so for any containerName other
// than "/" the trailing slash is dropped and matchedName equals containerName.
// The prefix test below would then also match siblings such as "/foobar" for
// "/foo" - confirm whether that is intended.
matchedName := path.Join(containerName, "/")
for i := range self.containers {
name := self.containers[i].info.Name
for i := range m.containers {
name := m.containers[i].info.Name
if name == containerName || strings.HasPrefix(name, matchedName) {
containersMap[self.containers[i].info.Name] = self.containers[i]
containersMap[m.containers[i].info.Name] = m.containers[i]
}
}
return containersMap
}
// SubcontainersInfo collects the containers returned by getSubcontainers for
// containerName into a slice and converts them to info.ContainerInfo values
// for the given query via containerDataSliceToContainerInfoSlice.
func (self *manager) SubcontainersInfo(containerName string, query *info.ContainerInfoRequest) ([]*info.ContainerInfo, error) {
containersMap := self.getSubcontainers(containerName)
func (m *manager) SubcontainersInfo(containerName string, query *info.ContainerInfoRequest) ([]*info.ContainerInfo, error) {
containersMap := m.getSubcontainers(containerName)
// Flatten the map into a slice; map iteration order is not deterministic.
containers := make([]*containerData, 0, len(containersMap))
for _, cont := range containersMap {
containers = append(containers, cont)
}
return self.containerDataSliceToContainerInfoSlice(containers, query)
return m.containerDataSliceToContainerInfoSlice(containers, query)
}
func (self *manager) getAllDockerContainers() map[string]*containerData {
self.containersLock.RLock()
defer self.containersLock.RUnlock()
containers := make(map[string]*containerData, len(self.containers))
func (m *manager) getAllDockerContainers() map[string]*containerData {
m.containersLock.RLock()
defer m.containersLock.RUnlock()
containers := make(map[string]*containerData, len(m.containers))
// Get containers in the Docker namespace.
for name, cont := range self.containers {
for name, cont := range m.containers {
if name.Namespace == docker.DockerNamespace {
containers[cont.info.Name] = cont
}
@@ -557,12 +577,12 @@ func (self *manager) getAllDockerContainers() map[string]*containerData {
return containers
}
func (self *manager) AllDockerContainers(query *info.ContainerInfoRequest) (map[string]info.ContainerInfo, error) {
containers := self.getAllDockerContainers()
func (m *manager) AllDockerContainers(query *info.ContainerInfoRequest) (map[string]info.ContainerInfo, error) {
containers := m.getAllDockerContainers()
output := make(map[string]info.ContainerInfo, len(containers))
for name, cont := range containers {
inf, err := self.containerDataToContainerInfo(cont, query)
inf, err := m.containerDataToContainerInfo(cont, query)
if err != nil {
// Ignore the error because of race condition and return best-effort result.
if err == memory.ErrDataNotFound {
@@ -576,19 +596,19 @@ func (self *manager) AllDockerContainers(query *info.ContainerInfoRequest) (map[
return output, nil
}
func (self *manager) getDockerContainer(containerName string) (*containerData, error) {
self.containersLock.RLock()
defer self.containersLock.RUnlock()
func (m *manager) getDockerContainer(containerName string) (*containerData, error) {
m.containersLock.RLock()
defer m.containersLock.RUnlock()
// Check for the container in the Docker container namespace.
cont, ok := self.containers[namespacedContainerName{
cont, ok := m.containers[namespacedContainerName{
Namespace: docker.DockerNamespace,
Name: containerName,
}]
// Look for container by short prefix name if no exact match found.
if !ok {
for contName, c := range self.containers {
for contName, c := range m.containers {
if contName.Namespace == docker.DockerNamespace && strings.HasPrefix(contName.Name, containerName) {
if cont == nil {
cont = c
@@ -606,20 +626,20 @@ func (self *manager) getDockerContainer(containerName string) (*containerData, e
return cont, nil
}
// DockerContainer looks up a Docker container with getDockerContainer and
// returns its info.ContainerInfo for the given query by value. If the lookup
// or the conversion fails, it returns a zero-value info.ContainerInfo together
// with the error.
func (self *manager) DockerContainer(containerName string, query *info.ContainerInfoRequest) (info.ContainerInfo, error) {
container, err := self.getDockerContainer(containerName)
func (m *manager) DockerContainer(containerName string, query *info.ContainerInfoRequest) (info.ContainerInfo, error) {
container, err := m.getDockerContainer(containerName)
if err != nil {
return info.ContainerInfo{}, err
}
inf, err := self.containerDataToContainerInfo(container, query)
inf, err := m.containerDataToContainerInfo(container, query)
if err != nil {
return info.ContainerInfo{}, err
}
return *inf, nil
}
func (self *manager) containerDataSliceToContainerInfoSlice(containers []*containerData, query *info.ContainerInfoRequest) ([]*info.ContainerInfo, error) {
func (m *manager) containerDataSliceToContainerInfoSlice(containers []*containerData, query *info.ContainerInfoRequest) ([]*info.ContainerInfo, error) {
if len(containers) == 0 {
return nil, fmt.Errorf("no containers found")
}
@@ -627,7 +647,7 @@ func (self *manager) containerDataSliceToContainerInfoSlice(containers []*contai
// Get the info for each container.
output := make([]*info.ContainerInfo, 0, len(containers))
for i := range containers {
cinfo, err := self.containerDataToContainerInfo(containers[i], query)
cinfo, err := m.containerDataToContainerInfo(containers[i], query)
if err != nil {
// Skip containers with errors, we try to degrade gracefully.
continue
@@ -638,8 +658,8 @@ func (self *manager) containerDataSliceToContainerInfoSlice(containers []*contai
return output, nil
}
func (self *manager) GetRequestedContainersInfo(containerName string, options v2.RequestOptions) (map[string]*info.ContainerInfo, error) {
containers, err := self.getRequestedContainers(containerName, options)
func (m *manager) GetRequestedContainersInfo(containerName string, options v2.RequestOptions) (map[string]*info.ContainerInfo, error) {
containers, err := m.getRequestedContainers(containerName, options)
if err != nil {
return nil, err
}
@@ -649,7 +669,7 @@ func (self *manager) GetRequestedContainersInfo(containerName string, options v2
NumStats: options.Count,
}
for name, data := range containers {
info, err := self.containerDataToContainerInfo(data, &query)
info, err := m.containerDataToContainerInfo(data, &query)
if err != nil {
errs.append(name, "containerDataToContainerInfo", err)
}
@@ -658,26 +678,26 @@ func (self *manager) GetRequestedContainersInfo(containerName string, options v2
return containersMap, errs.OrNil()
}
func (self *manager) getRequestedContainers(containerName string, options v2.RequestOptions) (map[string]*containerData, error) {
func (m *manager) getRequestedContainers(containerName string, options v2.RequestOptions) (map[string]*containerData, error) {
containersMap := make(map[string]*containerData)
switch options.IdType {
case v2.TypeName:
if options.Recursive == false {
cont, err := self.getContainer(containerName)
if !options.Recursive {
cont, err := m.getContainer(containerName)
if err != nil {
return containersMap, err
}
containersMap[cont.info.Name] = cont
} else {
containersMap = self.getSubcontainers(containerName)
containersMap = m.getSubcontainers(containerName)
if len(containersMap) == 0 {
return containersMap, fmt.Errorf("unknown container: %q", containerName)
}
}
case v2.TypeDocker:
if options.Recursive == false {
if !options.Recursive {
containerName = strings.TrimPrefix(containerName, "/")
cont, err := self.getDockerContainer(containerName)
cont, err := m.getDockerContainer(containerName)
if err != nil {
return containersMap, err
}
@@ -686,7 +706,7 @@ func (self *manager) getRequestedContainers(containerName string, options v2.Req
if containerName != "/" {
return containersMap, fmt.Errorf("invalid request for docker container %q with subcontainers", containerName)
}
containersMap = self.getAllDockerContainers()
containersMap = m.getAllDockerContainers()
}
default:
return containersMap, fmt.Errorf("invalid request type %q", options.IdType)
@@ -706,32 +726,32 @@ func (self *manager) getRequestedContainers(containerName string, options v2.Req
return containersMap, nil
}
// GetDirFsInfo resolves the device for dir with fsInfo.GetDirFsDevice and
// returns the filesystem info that getFsInfoByDeviceName reports for that
// device. A failed device lookup is returned as an error that names dir.
func (self *manager) GetDirFsInfo(dir string) (v2.FsInfo, error) {
device, err := self.fsInfo.GetDirFsDevice(dir)
func (m *manager) GetDirFsInfo(dir string) (v2.FsInfo, error) {
device, err := m.fsInfo.GetDirFsDevice(dir)
if err != nil {
return v2.FsInfo{}, fmt.Errorf("failed to get device for dir %q: %v", dir, err)
}
return self.getFsInfoByDeviceName(device.Device)
return m.getFsInfoByDeviceName(device.Device)
}
// GetFsInfoByFsUUID resolves the device for the filesystem UUID with
// fsInfo.GetDeviceInfoByFsUUID and returns the filesystem info that
// getFsInfoByDeviceName reports for that device. A lookup error is returned
// unchanged with a zero-value v2.FsInfo.
func (self *manager) GetFsInfoByFsUUID(uuid string) (v2.FsInfo, error) {
device, err := self.fsInfo.GetDeviceInfoByFsUUID(uuid)
func (m *manager) GetFsInfoByFsUUID(uuid string) (v2.FsInfo, error) {
device, err := m.fsInfo.GetDeviceInfoByFsUUID(uuid)
if err != nil {
return v2.FsInfo{}, err
}
return self.getFsInfoByDeviceName(device.Device)
return m.getFsInfoByDeviceName(device.Device)
}
func (self *manager) GetFsInfo(label string) ([]v2.FsInfo, error) {
func (m *manager) GetFsInfo(label string) ([]v2.FsInfo, error) {
var empty time.Time
// Get latest data from filesystems hanging off root container.
stats, err := self.memoryCache.RecentStats("/", empty, empty, 1)
stats, err := m.memoryCache.RecentStats("/", empty, empty, 1)
if err != nil {
return nil, err
}
dev := ""
if len(label) != 0 {
dev, err = self.fsInfo.GetDeviceForLabel(label)
dev, err = m.fsInfo.GetDeviceForLabel(label)
if err != nil {
return nil, err
}
@@ -742,11 +762,11 @@ func (self *manager) GetFsInfo(label string) ([]v2.FsInfo, error) {
if len(label) != 0 && fs.Device != dev {
continue
}
mountpoint, err := self.fsInfo.GetMountpointForDevice(fs.Device)
mountpoint, err := m.fsInfo.GetMountpointForDevice(fs.Device)
if err != nil {
return nil, err
}
labels, err := self.fsInfo.GetLabelsForDevice(fs.Device)
labels, err := m.fsInfo.GetLabelsForDevice(fs.Device)
if err != nil {
return nil, err
}
@@ -772,8 +792,7 @@ func (self *manager) GetFsInfo(label string) ([]v2.FsInfo, error) {
// GetMachineInfo returns the machine information held by the manager. The
// value is read while holding machineMu's read lock; the error result is
// always nil.
func (m *manager) GetMachineInfo() (*info.MachineInfo, error) {
m.machineMu.RLock()
defer m.machineMu.RUnlock()
// Copy and return the MachineInfo.
return &m.machineInfo, nil
return m.machineInfo.Clone(), nil
}
func (m *manager) GetVersionInfo() (*info.VersionInfo, error) {
@@ -785,18 +804,15 @@ func (m *manager) GetVersionInfo() (*info.VersionInfo, error) {
}
// Exists reports whether m.containers has an entry for containerName (looked
// up with no namespace set in the key). The lookup is done while holding
// containersLock.
func (m *manager) Exists(containerName string) bool {
m.containersLock.Lock()
defer m.containersLock.Unlock()
m.containersLock.RLock()
defer m.containersLock.RUnlock()
namespacedName := namespacedContainerName{
Name: containerName,
}
_, ok := m.containers[namespacedName]
if ok {
return true
}
return false
return ok
}
func (m *manager) GetProcessList(containerName string, options v2.RequestOptions) ([]v2.ProcessInfo, error) {
@@ -831,7 +847,7 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c
klog.V(4).Infof("Got config from %q: %q", v, configFile)
if strings.HasPrefix(k, "prometheus") || strings.HasPrefix(k, "Prometheus") {
newCollector, err := collector.NewPrometheusCollector(k, configFile, *applicationMetricsCountLimit, cont.handler, m.collectorHttpClient)
newCollector, err := collector.NewPrometheusCollector(k, configFile, *applicationMetricsCountLimit, cont.handler, m.collectorHTTPClient)
if err != nil {
return fmt.Errorf("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err)
}
@@ -840,7 +856,7 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c
return fmt.Errorf("failed to register collector for container %q, config %q: %v", cont.info.Name, k, err)
}
} else {
newCollector, err := collector.NewCollector(k, configFile, *applicationMetricsCountLimit, cont.handler, m.collectorHttpClient)
newCollector, err := collector.NewCollector(k, configFile, *applicationMetricsCountLimit, cont.handler, m.collectorHTTPClient)
if err != nil {
return fmt.Errorf("failed to create collector for container %q, config %q: %v", cont.info.Name, k, err)
}
@@ -853,35 +869,6 @@ func (m *manager) registerCollectors(collectorConfigs map[string]string, cont *c
return nil
}
// overrideContainer replaces the existing containerData/handler registered
// for containerName and then creates a new one from watchSource, all while
// holding containersLock.
// createContainer can't be used for this because it just returns if the given
// containerName already has a handler.
// Ex: the rkt handler will want to take priority over the raw handler, but the
// raw handler might be created first.
// Only a raw handler may be overridden: for any other handler type this
// function returns nil without changing anything.
func (m *manager) overrideContainer(containerName string, watchSource watcher.ContainerWatchSource) error {
m.containersLock.Lock()
defer m.containersLock.Unlock()
namespacedName := namespacedContainerName{
Name: containerName,
}
if _, ok := m.containers[namespacedName]; ok {
containerData := m.containers[namespacedName]
// Leave non-raw handlers in place.
if containerData.handler.Type() != container.ContainerTypeRaw {
return nil
}
// Tear down the raw handler before creating its replacement below.
err := m.destroyContainerLocked(containerName)
if err != nil {
return fmt.Errorf("overrideContainer: failed to destroy containerData/handler for %v: %v", containerName, err)
}
}
return m.createContainerLocked(containerName, watchSource)
}
// Create a container.
func (m *manager) createContainer(containerName string, watchSource watcher.ContainerWatchSource) error {
m.containersLock.Lock()
@@ -926,7 +913,17 @@ func (m *manager) createContainerLocked(containerName string, watchSource watche
} else {
cont.nvidiaCollector, err = m.nvidiaManager.GetCollector(devicesCgroupPath)
if err != nil {
klog.V(4).Infof("GPU metrics may be unavailable/incomplete for container %q: %v", cont.info.Name, err)
klog.V(4).Infof("GPU metrics may be unavailable/incomplete for container %s: %s", cont.info.Name, err)
}
}
perfCgroupPath, err := handler.GetCgroupPath("perf_event")
if err != nil {
klog.Warningf("Error getting perf_event cgroup path: %q", err)
} else {
cont.perfCollector, err = m.perfManager.GetCollector(perfCgroupPath)
if err != nil {
klog.Infof("perf_event metrics will not be available for container %s: %s", cont.info.Name, err)
}
}
}
@@ -1100,16 +1097,16 @@ func (m *manager) detectSubcontainers(containerName string) error {
}
// Watches for new containers started in the system. Runs forever unless there is a setup error.
func (self *manager) watchForNewContainers(quit chan error) error {
for _, watcher := range self.containerWatchers {
err := watcher.Start(self.eventsChannel)
func (m *manager) watchForNewContainers(quit chan error) error {
for _, watcher := range m.containerWatchers {
err := watcher.Start(m.eventsChannel)
if err != nil {
return err
}
}
// There is a race between starting the watch and new container creation so we do a detection before we read new containers.
err := self.detectSubcontainers("/")
err := m.detectSubcontainers("/")
if err != nil {
return err
}
@@ -1118,15 +1115,15 @@ func (self *manager) watchForNewContainers(quit chan error) error {
go func() {
for {
select {
case event := <-self.eventsChannel:
case event := <-m.eventsChannel:
switch {
case event.EventType == watcher.ContainerAdd:
switch event.WatchSource {
default:
err = self.createContainer(event.Name, event.WatchSource)
err = m.createContainer(event.Name, event.WatchSource)
}
case event.EventType == watcher.ContainerDelete:
err = self.destroyContainer(event.Name)
err = m.destroyContainer(event.Name)
}
if err != nil {
klog.Warningf("Failed to process watch event %+v: %v", event, err)
@@ -1135,7 +1132,7 @@ func (self *manager) watchForNewContainers(quit chan error) error {
var errs partialFailure
// Stop processing events if asked to quit.
for i, watcher := range self.containerWatchers {
for i, watcher := range m.containerWatchers {
err := watcher.Stop()
if err != nil {
errs.append(fmt.Sprintf("watcher %d", i), "Stop", err)
@@ -1155,7 +1152,7 @@ func (self *manager) watchForNewContainers(quit chan error) error {
return nil
}
func (self *manager) watchForNewOoms() error {
func (m *manager) watchForNewOoms() error {
klog.V(2).Infof("Started watching for new ooms in manager")
outStream := make(chan *oomparser.OomInstance, 10)
oomLog, err := oomparser.New()
@@ -1172,7 +1169,7 @@ func (self *manager) watchForNewOoms() error {
Timestamp: oomInstance.TimeOfDeath,
EventType: info.EventOom,
}
err := self.eventHandler.AddEvent(newEvent)
err := m.eventHandler.AddEvent(newEvent)
if err != nil {
klog.Errorf("failed to add OOM event for %q: %v", oomInstance.ContainerName, err)
}
@@ -1189,7 +1186,7 @@ func (self *manager) watchForNewOoms() error {
},
},
}
err = self.eventHandler.AddEvent(newEvent)
err = m.eventHandler.AddEvent(newEvent)
if err != nil {
klog.Errorf("failed to add OOM kill event for %q: %v", oomInstance.ContainerName, err)
}
@@ -1199,18 +1196,18 @@ func (self *manager) watchForNewOoms() error {
}
// WatchForEvents forwards request to the event handler's WatchEvents and
// returns the resulting event channel. It can be called by the API, which
// then takes the events returned on that channel.
func (self *manager) WatchForEvents(request *events.Request) (*events.EventChannel, error) {
return self.eventHandler.WatchEvents(request)
func (m *manager) WatchForEvents(request *events.Request) (*events.EventChannel, error) {
return m.eventHandler.WatchEvents(request)
}
// GetPastEvents forwards request to the event handler's GetEvents. It can be
// called by the API to return all events satisfying the request.
func (self *manager) GetPastEvents(request *events.Request) ([]*info.Event, error) {
return self.eventHandler.GetEvents(request)
func (m *manager) GetPastEvents(request *events.Request) ([]*info.Event, error) {
return m.eventHandler.GetEvents(request)
}
// CloseEventChannel asks the event handler to stop the watch with the given
// ID. It is called by the API when a client is no longer listening to the
// channel.
func (self *manager) CloseEventChannel(watch_id int) {
self.eventHandler.StopWatch(watch_id)
func (m *manager) CloseEventChannel(watchID int) {
m.eventHandler.StopWatch(watchID)
}
// Parses the events StoragePolicy from the flags.
@@ -1303,12 +1300,12 @@ func (m *manager) DebugInfo() map[string][]string {
return debugInfo
}
func (self *manager) getFsInfoByDeviceName(deviceName string) (v2.FsInfo, error) {
mountPoint, err := self.fsInfo.GetMountpointForDevice(deviceName)
func (m *manager) getFsInfoByDeviceName(deviceName string) (v2.FsInfo, error) {
mountPoint, err := m.fsInfo.GetMountpointForDevice(deviceName)
if err != nil {
return v2.FsInfo{}, fmt.Errorf("failed to get mount point for device %q: %v", deviceName, err)
}
infos, err := self.GetFsInfo("")
infos, err := m.GetFsInfo("")
if err != nil {
return v2.FsInfo{}, err
}
@@ -1322,22 +1319,22 @@ func (self *manager) getFsInfoByDeviceName(deviceName string) (v2.FsInfo, error)
func getVersionInfo() (*info.VersionInfo, error) {
kernel_version := machine.KernelVersion()
container_os := machine.ContainerOsVersion()
docker_version, err := docker.VersionString()
kernelVersion := machine.KernelVersion()
osVersion := machine.ContainerOsVersion()
dockerVersion, err := docker.VersionString()
if err != nil {
return nil, err
}
docker_api_version, err := docker.APIVersionString()
dockerAPIVersion, err := docker.APIVersionString()
if err != nil {
return nil, err
}
return &info.VersionInfo{
KernelVersion: kernel_version,
ContainerOsVersion: container_os,
DockerVersion: docker_version,
DockerAPIVersion: docker_api_version,
KernelVersion: kernelVersion,
ContainerOsVersion: osVersion,
DockerVersion: dockerVersion,
DockerAPIVersion: dockerAPIVersion,
CadvisorVersion: version.Info["version"],
CadvisorRevision: version.Info["revision"],
}, nil