MapReduce-like scheduler priority functions

This commit is contained in:
Wojciech Tyczynski
2016-08-26 16:08:40 +02:00
parent 6fcbbe8663
commit 33c710adf0
8 changed files with 327 additions and 146 deletions

View File

@@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
@@ -237,7 +238,7 @@ func PrioritizeNodes(
nodes []*api.Node,
extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
result := make(schedulerapi.HostPriorityList, 0, len(nodeNameToInfo))
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
// If no priority configs are provided, then the EqualPriority function is applied
// This is required to generate the priority list in the required format
@@ -252,6 +253,7 @@ func PrioritizeNodes(
errs []error
)
meta := priorities.PriorityMetadata(pod, nodes)
for _, priorityConfig := range priorityConfigs {
// skip the priority function if the weight is specified as 0
if priorityConfig.Weight == 0 {
@@ -262,8 +264,26 @@ func PrioritizeNodes(
go func(config algorithm.PriorityConfig) {
defer wg.Done()
weight := config.Weight
priorityFunc := config.Function
prioritizedList, err := priorityFunc(pod, nodeNameToInfo, nodes)
prioritizedList, err := func() (schedulerapi.HostPriorityList, error) {
if config.Function != nil {
return config.Function(pod, nodeNameToInfo, nodes)
}
prioritizedList := make(schedulerapi.HostPriorityList, 0, len(nodes))
for i := range nodes {
hostResult, err := config.Map(pod, meta, nodeNameToInfo[nodes[i].Name])
if err != nil {
return nil, err
}
prioritizedList = append(prioritizedList, hostResult)
}
if config.Reduce != nil {
if err := config.Reduce(prioritizedList); err != nil {
return nil, err
}
}
return prioritizedList, nil
}()
mu.Lock()
defer mu.Unlock()
@@ -277,13 +297,12 @@ func PrioritizeNodes(
}
}(priorityConfig)
}
// wait for all go routines to finish
wg.Wait()
if len(errs) != 0 {
return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
}
// wait for all go routines to finish
wg.Wait()
if len(extenders) != 0 && nodes != nil {
for _, extender := range extenders {
wg.Add(1)