Resource gatherer refactoring
This commit is contained in:
		@@ -51,7 +51,7 @@ type Framework struct {
 | 
				
			|||||||
	namespacesToDelete       []*api.Namespace // Some tests have more than one.
 | 
						namespacesToDelete       []*api.Namespace // Some tests have more than one.
 | 
				
			||||||
	NamespaceDeletionTimeout time.Duration
 | 
						NamespaceDeletionTimeout time.Duration
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	gatherer containerResourceGatherer
 | 
						gatherer *containerResourceGatherer
 | 
				
			||||||
	// Constraints that passed to a check which is executed after data is gathered to
 | 
						// Constraints that passed to a check which is executed after data is gathered to
 | 
				
			||||||
	// see if 99% of results are within acceptable bounds. It as to be injected in the test,
 | 
						// see if 99% of results are within acceptable bounds. It as to be injected in the test,
 | 
				
			||||||
	// as expectations vary greatly. Constraints are groupped by the container names.
 | 
						// as expectations vary greatly. Constraints are groupped by the container names.
 | 
				
			||||||
@@ -116,7 +116,12 @@ func (f *Framework) beforeEach() {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if testContext.GatherKubeSystemResourceUsageData {
 | 
						if testContext.GatherKubeSystemResourceUsageData {
 | 
				
			||||||
		f.gatherer.startGatheringData(c, resourceDataGatheringPeriodSeconds*time.Second)
 | 
							f.gatherer, err = NewResourceUsageGatherer(c)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								Logf("Error while creating NewResourceUsageGatherer: %v", err)
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								go f.gatherer.startGatheringData()
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if testContext.GatherLogsSizes {
 | 
						if testContext.GatherLogsSizes {
 | 
				
			||||||
@@ -170,7 +175,7 @@ func (f *Framework) afterEach() {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	summaries := make([]TestDataSummary, 0)
 | 
						summaries := make([]TestDataSummary, 0)
 | 
				
			||||||
	if testContext.GatherKubeSystemResourceUsageData {
 | 
						if testContext.GatherKubeSystemResourceUsageData && f.gatherer != nil {
 | 
				
			||||||
		By("Collecting resource usage data")
 | 
							By("Collecting resource usage data")
 | 
				
			||||||
		summaries = append(summaries, f.gatherer.stopAndSummarize([]int{90, 99}, f.addonResourceConstraints))
 | 
							summaries = append(summaries, f.gatherer.stopAndSummarize([]int{90, 99}, f.addonResourceConstraints))
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -34,7 +34,7 @@ import (
 | 
				
			|||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
	resourceDataGatheringPeriodSeconds = 60
 | 
						resourceDataGatheringPeriod = 60 * time.Second
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type resourceConstraint struct {
 | 
					type resourceConstraint struct {
 | 
				
			||||||
@@ -42,12 +42,6 @@ type resourceConstraint struct {
 | 
				
			|||||||
	memoryConstraint uint64
 | 
						memoryConstraint uint64
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type containerResourceGatherer struct {
 | 
					 | 
				
			||||||
	usageTimeseries map[time.Time]resourceUsagePerContainer
 | 
					 | 
				
			||||||
	stopCh          chan struct{}
 | 
					 | 
				
			||||||
	wg              sync.WaitGroup
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type SingleContainerSummary struct {
 | 
					type SingleContainerSummary struct {
 | 
				
			||||||
	Name string
 | 
						Name string
 | 
				
			||||||
	Cpu  float64
 | 
						Cpu  float64
 | 
				
			||||||
@@ -75,43 +69,184 @@ func (s *ResourceUsageSummary) PrintJSON() string {
 | 
				
			|||||||
	return prettyPrintJSON(*s)
 | 
						return prettyPrintJSON(*s)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (g *containerResourceGatherer) startGatheringData(c *client.Client, period time.Duration) {
 | 
					func computePercentiles(timeSeries []resourceUsagePerContainer, percentilesToCompute []int) map[int]resourceUsagePerContainer {
 | 
				
			||||||
	g.usageTimeseries = make(map[time.Time]resourceUsagePerContainer)
 | 
						if len(timeSeries) == 0 {
 | 
				
			||||||
	g.wg.Add(1)
 | 
							return make(map[int]resourceUsagePerContainer)
 | 
				
			||||||
	g.stopCh = make(chan struct{})
 | 
						}
 | 
				
			||||||
	go func() error {
 | 
						dataMap := make(map[string]*usageDataPerContainer)
 | 
				
			||||||
		defer utilruntime.HandleCrash()
 | 
						for i := range timeSeries {
 | 
				
			||||||
		defer g.wg.Done()
 | 
							for name, data := range timeSeries[i] {
 | 
				
			||||||
		for {
 | 
								if dataMap[name] == nil {
 | 
				
			||||||
			select {
 | 
									dataMap[name] = &usageDataPerContainer{
 | 
				
			||||||
			case <-time.After(period):
 | 
										cpuData:        make([]float64, len(timeSeries)),
 | 
				
			||||||
				now := time.Now()
 | 
										memUseData:     make([]uint64, len(timeSeries)),
 | 
				
			||||||
				data, err := g.getKubeSystemContainersResourceUsage(c)
 | 
										memWorkSetData: make([]uint64, len(timeSeries)),
 | 
				
			||||||
				if err != nil {
 | 
					 | 
				
			||||||
					Logf("Error while getting resource usage: %v", err)
 | 
					 | 
				
			||||||
					continue
 | 
					 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
				g.usageTimeseries[now] = data
 | 
								}
 | 
				
			||||||
			case <-g.stopCh:
 | 
								dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores)
 | 
				
			||||||
				Logf("Stop channel is closed. Stopping gatherer.")
 | 
								dataMap[name].memUseData = append(dataMap[name].memUseData, data.MemoryUsageInBytes)
 | 
				
			||||||
				return nil
 | 
								dataMap[name].memWorkSetData = append(dataMap[name].memWorkSetData, data.MemoryWorkingSetInBytes)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for _, v := range dataMap {
 | 
				
			||||||
 | 
							sort.Float64s(v.cpuData)
 | 
				
			||||||
 | 
							sort.Sort(uint64arr(v.memUseData))
 | 
				
			||||||
 | 
							sort.Sort(uint64arr(v.memWorkSetData))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						result := make(map[int]resourceUsagePerContainer)
 | 
				
			||||||
 | 
						for _, perc := range percentilesToCompute {
 | 
				
			||||||
 | 
							data := make(resourceUsagePerContainer)
 | 
				
			||||||
 | 
							for k, v := range dataMap {
 | 
				
			||||||
 | 
								percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1
 | 
				
			||||||
 | 
								data[k] = &containerResourceUsage{
 | 
				
			||||||
 | 
									Name:                    k,
 | 
				
			||||||
 | 
									CPUUsageInCores:         v.cpuData[percentileIndex],
 | 
				
			||||||
 | 
									MemoryUsageInBytes:      v.memUseData[percentileIndex],
 | 
				
			||||||
 | 
									MemoryWorkingSetInBytes: v.memWorkSetData[percentileIndex],
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}()
 | 
							result[perc] = data
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return result
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func leftMergeData(left, right map[int]resourceUsagePerContainer) map[int]resourceUsagePerContainer {
 | 
				
			||||||
 | 
						result := make(map[int]resourceUsagePerContainer)
 | 
				
			||||||
 | 
						for percentile, data := range left {
 | 
				
			||||||
 | 
							result[percentile] = data
 | 
				
			||||||
 | 
							if _, ok := right[percentile]; !ok {
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							for k, v := range right[percentile] {
 | 
				
			||||||
 | 
								result[percentile][k] = v
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return result
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type resourceGatherWorker struct {
 | 
				
			||||||
 | 
						c                    *client.Client
 | 
				
			||||||
 | 
						nodeName             string
 | 
				
			||||||
 | 
						wg                   *sync.WaitGroup
 | 
				
			||||||
 | 
						containerIDToNameMap map[string]string
 | 
				
			||||||
 | 
						containerIDs         []string
 | 
				
			||||||
 | 
						stopCh               chan struct{}
 | 
				
			||||||
 | 
						dataSeries           []resourceUsagePerContainer
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (w *resourceGatherWorker) singleProbe() {
 | 
				
			||||||
 | 
						data := make(resourceUsagePerContainer)
 | 
				
			||||||
 | 
						nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, 15*time.Second, func() []string { return w.containerIDs }, true)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							Logf("Error while reading data from %v: %v", w.nodeName, err)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for k, v := range nodeUsage {
 | 
				
			||||||
 | 
							data[w.containerIDToNameMap[k]] = v
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						w.dataSeries = append(w.dataSeries, data)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (w *resourceGatherWorker) gather(initialSleep time.Duration) {
 | 
				
			||||||
 | 
						defer utilruntime.HandleCrash()
 | 
				
			||||||
 | 
						defer w.wg.Done()
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-time.After(initialSleep):
 | 
				
			||||||
 | 
							w.singleProbe()
 | 
				
			||||||
 | 
							for {
 | 
				
			||||||
 | 
								select {
 | 
				
			||||||
 | 
								case <-time.After(resourceDataGatheringPeriod):
 | 
				
			||||||
 | 
									w.singleProbe()
 | 
				
			||||||
 | 
								case <-w.stopCh:
 | 
				
			||||||
 | 
									return
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						case <-w.stopCh:
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (g *containerResourceGatherer) getKubeSystemContainersResourceUsage(c *client.Client) {
 | 
				
			||||||
 | 
						delay := resourceDataGatheringPeriod / time.Duration(len(g.workers))
 | 
				
			||||||
 | 
						for i := range g.workers {
 | 
				
			||||||
 | 
							go g.workers[i].gather(delay)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						g.workerWg.Wait()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type containerResourceGatherer struct {
 | 
				
			||||||
 | 
						client               *client.Client
 | 
				
			||||||
 | 
						stopCh               chan struct{}
 | 
				
			||||||
 | 
						workers              []resourceGatherWorker
 | 
				
			||||||
 | 
						workerWg             sync.WaitGroup
 | 
				
			||||||
 | 
						containerIDToNameMap map[string]string
 | 
				
			||||||
 | 
						containerIDs         []string
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func NewResourceUsageGatherer(c *client.Client) (*containerResourceGatherer, error) {
 | 
				
			||||||
 | 
						g := containerResourceGatherer{
 | 
				
			||||||
 | 
							client:               c,
 | 
				
			||||||
 | 
							stopCh:               make(chan struct{}),
 | 
				
			||||||
 | 
							containerIDToNameMap: make(map[string]string),
 | 
				
			||||||
 | 
							containerIDs:         make([]string, 0),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pods, err := c.Pods("kube-system").List(api.ListOptions{})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							Logf("Error while listing Pods: %v", err)
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for _, pod := range pods.Items {
 | 
				
			||||||
 | 
							for _, container := range pod.Status.ContainerStatuses {
 | 
				
			||||||
 | 
								containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
 | 
				
			||||||
 | 
								g.containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
 | 
				
			||||||
 | 
								g.containerIDs = append(g.containerIDs, containerID)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						nodeList, err := c.Nodes().List(api.ListOptions{})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							Logf("Error while listing Nodes: %v", err)
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						g.workerWg.Add(len(nodeList.Items))
 | 
				
			||||||
 | 
						for _, node := range nodeList.Items {
 | 
				
			||||||
 | 
							g.workers = append(g.workers, resourceGatherWorker{
 | 
				
			||||||
 | 
								c:                    c,
 | 
				
			||||||
 | 
								nodeName:             node.Name,
 | 
				
			||||||
 | 
								wg:                   &g.workerWg,
 | 
				
			||||||
 | 
								containerIDToNameMap: g.containerIDToNameMap,
 | 
				
			||||||
 | 
								containerIDs:         g.containerIDs,
 | 
				
			||||||
 | 
								stopCh:               g.stopCh,
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return &g, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// startGatheringData blocks until stopAndSummarize is called.
 | 
				
			||||||
 | 
					func (g *containerResourceGatherer) startGatheringData() {
 | 
				
			||||||
 | 
						g.getKubeSystemContainersResourceUsage(g.client)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (g *containerResourceGatherer) stopAndSummarize(percentiles []int, constraints map[string]resourceConstraint) *ResourceUsageSummary {
 | 
					func (g *containerResourceGatherer) stopAndSummarize(percentiles []int, constraints map[string]resourceConstraint) *ResourceUsageSummary {
 | 
				
			||||||
	close(g.stopCh)
 | 
						close(g.stopCh)
 | 
				
			||||||
	Logf("Closed stop channel.")
 | 
						Logf("Closed stop channel. Waiting for %v workers", len(g.workers))
 | 
				
			||||||
	g.wg.Wait()
 | 
						g.workerWg.Wait()
 | 
				
			||||||
	Logf("Waitgroup finished.")
 | 
						Logf("Waitgroup finished.")
 | 
				
			||||||
	if len(percentiles) == 0 {
 | 
						if len(percentiles) == 0 {
 | 
				
			||||||
		Logf("Warning! Empty percentile list for stopAndPrintData.")
 | 
							Logf("Warning! Empty percentile list for stopAndPrintData.")
 | 
				
			||||||
		return &ResourceUsageSummary{}
 | 
							return &ResourceUsageSummary{}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	stats := g.computePercentiles(g.usageTimeseries, percentiles)
 | 
						data := make(map[int]resourceUsagePerContainer)
 | 
				
			||||||
 | 
						for i := range g.workers {
 | 
				
			||||||
 | 
							stats := computePercentiles(g.workers[i].dataSeries, percentiles)
 | 
				
			||||||
 | 
							data = leftMergeData(stats, data)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Workers has been stopped. We need to gather data stored in them.
 | 
				
			||||||
	sortedKeys := []string{}
 | 
						sortedKeys := []string{}
 | 
				
			||||||
	for name := range stats[percentiles[0]] {
 | 
						for name := range data[percentiles[0]] {
 | 
				
			||||||
		sortedKeys = append(sortedKeys, name)
 | 
							sortedKeys = append(sortedKeys, name)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	sort.Strings(sortedKeys)
 | 
						sort.Strings(sortedKeys)
 | 
				
			||||||
@@ -119,7 +254,7 @@ func (g *containerResourceGatherer) stopAndSummarize(percentiles []int, constrai
 | 
				
			|||||||
	summary := make(ResourceUsageSummary)
 | 
						summary := make(ResourceUsageSummary)
 | 
				
			||||||
	for _, perc := range percentiles {
 | 
						for _, perc := range percentiles {
 | 
				
			||||||
		for _, name := range sortedKeys {
 | 
							for _, name := range sortedKeys {
 | 
				
			||||||
			usage := stats[perc][name]
 | 
								usage := data[perc][name]
 | 
				
			||||||
			summary[strconv.Itoa(perc)] = append(summary[strconv.Itoa(perc)], SingleContainerSummary{
 | 
								summary[strconv.Itoa(perc)] = append(summary[strconv.Itoa(perc)], SingleContainerSummary{
 | 
				
			||||||
				Name: name,
 | 
									Name: name,
 | 
				
			||||||
				Cpu:  usage.CPUUsageInCores,
 | 
									Cpu:  usage.CPUUsageInCores,
 | 
				
			||||||
@@ -157,92 +292,3 @@ func (g *containerResourceGatherer) stopAndSummarize(percentiles []int, constrai
 | 
				
			|||||||
	Expect(violatedConstraints).To(BeEmpty())
 | 
						Expect(violatedConstraints).To(BeEmpty())
 | 
				
			||||||
	return &summary
 | 
						return &summary
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
func (g *containerResourceGatherer) computePercentiles(timeSeries map[time.Time]resourceUsagePerContainer, percentilesToCompute []int) map[int]resourceUsagePerContainer {
 | 
					 | 
				
			||||||
	if len(timeSeries) == 0 {
 | 
					 | 
				
			||||||
		return make(map[int]resourceUsagePerContainer)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	dataMap := make(map[string]*usageDataPerContainer)
 | 
					 | 
				
			||||||
	for _, singleStatistic := range timeSeries {
 | 
					 | 
				
			||||||
		for name, data := range singleStatistic {
 | 
					 | 
				
			||||||
			if dataMap[name] == nil {
 | 
					 | 
				
			||||||
				dataMap[name] = &usageDataPerContainer{
 | 
					 | 
				
			||||||
					cpuData:        make([]float64, len(timeSeries)),
 | 
					 | 
				
			||||||
					memUseData:     make([]uint64, len(timeSeries)),
 | 
					 | 
				
			||||||
					memWorkSetData: make([]uint64, len(timeSeries)),
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores)
 | 
					 | 
				
			||||||
			dataMap[name].memUseData = append(dataMap[name].memUseData, data.MemoryUsageInBytes)
 | 
					 | 
				
			||||||
			dataMap[name].memWorkSetData = append(dataMap[name].memWorkSetData, data.MemoryWorkingSetInBytes)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	for _, v := range dataMap {
 | 
					 | 
				
			||||||
		sort.Float64s(v.cpuData)
 | 
					 | 
				
			||||||
		sort.Sort(uint64arr(v.memUseData))
 | 
					 | 
				
			||||||
		sort.Sort(uint64arr(v.memWorkSetData))
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	result := make(map[int]resourceUsagePerContainer)
 | 
					 | 
				
			||||||
	for _, perc := range percentilesToCompute {
 | 
					 | 
				
			||||||
		data := make(resourceUsagePerContainer)
 | 
					 | 
				
			||||||
		for k, v := range dataMap {
 | 
					 | 
				
			||||||
			percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1
 | 
					 | 
				
			||||||
			data[k] = &containerResourceUsage{
 | 
					 | 
				
			||||||
				Name:                    k,
 | 
					 | 
				
			||||||
				CPUUsageInCores:         v.cpuData[percentileIndex],
 | 
					 | 
				
			||||||
				MemoryUsageInBytes:      v.memUseData[percentileIndex],
 | 
					 | 
				
			||||||
				MemoryWorkingSetInBytes: v.memWorkSetData[percentileIndex],
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		result[perc] = data
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return result
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (g *containerResourceGatherer) getKubeSystemContainersResourceUsage(c *client.Client) (resourceUsagePerContainer, error) {
 | 
					 | 
				
			||||||
	pods, err := c.Pods("kube-system").List(api.ListOptions{})
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return resourceUsagePerContainer{}, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	nodes, err := c.Nodes().List(api.ListOptions{})
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return resourceUsagePerContainer{}, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	containerIDToNameMap := make(map[string]string)
 | 
					 | 
				
			||||||
	containerIDs := make([]string, 0)
 | 
					 | 
				
			||||||
	for _, pod := range pods.Items {
 | 
					 | 
				
			||||||
		for _, container := range pod.Status.ContainerStatuses {
 | 
					 | 
				
			||||||
			containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
 | 
					 | 
				
			||||||
			containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
 | 
					 | 
				
			||||||
			containerIDs = append(containerIDs, containerID)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	mutex := sync.Mutex{}
 | 
					 | 
				
			||||||
	wg := sync.WaitGroup{}
 | 
					 | 
				
			||||||
	wg.Add(len(nodes.Items))
 | 
					 | 
				
			||||||
	errors := make([]error, 0)
 | 
					 | 
				
			||||||
	nameToUsageMap := make(resourceUsagePerContainer, len(containerIDToNameMap))
 | 
					 | 
				
			||||||
	for _, node := range nodes.Items {
 | 
					 | 
				
			||||||
		go func(nodeName string) {
 | 
					 | 
				
			||||||
			defer utilruntime.HandleCrash()
 | 
					 | 
				
			||||||
			defer wg.Done()
 | 
					 | 
				
			||||||
			nodeUsage, err := getOneTimeResourceUsageOnNode(c, nodeName, 15*time.Second, func() []string { return containerIDs }, true)
 | 
					 | 
				
			||||||
			mutex.Lock()
 | 
					 | 
				
			||||||
			defer mutex.Unlock()
 | 
					 | 
				
			||||||
			if err != nil {
 | 
					 | 
				
			||||||
				errors = append(errors, err)
 | 
					 | 
				
			||||||
				return
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			for k, v := range nodeUsage {
 | 
					 | 
				
			||||||
				nameToUsageMap[containerIDToNameMap[k]] = v
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}(node.Name)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	wg.Wait()
 | 
					 | 
				
			||||||
	if len(errors) != 0 {
 | 
					 | 
				
			||||||
		return resourceUsagePerContainer{}, fmt.Errorf("Errors while gathering usage data: %v", errors)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return nameToUsageMap, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user