Add NormalizeScore extension point for scheduler framework.

This commit is contained in:
Cong Liu
2019-07-29 11:45:01 -04:00
parent 75d5189612
commit 34373662d3
7 changed files with 685 additions and 57 deletions

View File

@@ -34,20 +34,21 @@ import (
// framework is the component responsible for initializing and running scheduler
// plugins.
type framework struct {
registry Registry
nodeInfoSnapshot *cache.NodeInfoSnapshot
waitingPods *waitingPodsMap
pluginNameToWeightMap map[string]int
queueSortPlugins []QueueSortPlugin
prefilterPlugins []PrefilterPlugin
filterPlugins []FilterPlugin
scorePlugins []ScorePlugin
reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin
bindPlugins []BindPlugin
postbindPlugins []PostbindPlugin
unreservePlugins []UnreservePlugin
permitPlugins []PermitPlugin
registry Registry
nodeInfoSnapshot *cache.NodeInfoSnapshot
waitingPods *waitingPodsMap
pluginNameToWeightMap map[string]int
queueSortPlugins []QueueSortPlugin
prefilterPlugins []PrefilterPlugin
filterPlugins []FilterPlugin
scorePlugins []ScorePlugin
scoreWithNormalizePlugins []ScoreWithNormalizePlugin
reservePlugins []ReservePlugin
prebindPlugins []PrebindPlugin
bindPlugins []BindPlugin
postbindPlugins []PostbindPlugin
unreservePlugins []UnreservePlugin
permitPlugins []PermitPlugin
}
const (
@@ -131,6 +132,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
if plugins.Score != nil {
for _, sc := range plugins.Score.Enabled {
if pg, ok := pluginsMap[sc.Name]; ok {
// First, make sure the plugin implements ScorePlugin interface.
p, ok := pg.(ScorePlugin)
if !ok {
return nil, fmt.Errorf("plugin %v does not extend score plugin", sc.Name)
@@ -139,6 +141,13 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
return nil, fmt.Errorf("score plugin %v is not configured with weight", p.Name())
}
f.scorePlugins = append(f.scorePlugins, p)
// Next, if the plugin also implements ScoreWithNormalizePlugin interface,
// add it to the normalizeScore plugin list.
np, ok := pg.(ScoreWithNormalizePlugin)
if ok {
f.scoreWithNormalizePlugins = append(f.scoreWithNormalizePlugins, np)
}
} else {
return nil, fmt.Errorf("score plugin %v does not exist", sc.Name)
}
@@ -317,14 +326,12 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
errCh := schedutil.NewErrorChannel()
workqueue.ParallelizeUntil(ctx, 16, len(nodes), func(index int) {
for _, pl := range f.scorePlugins {
// Score plugins' weight has been checked when they are initialized.
weight := f.pluginNameToWeightMap[pl.Name()]
score, status := pl.Score(pc, pod, nodes[index].Name)
if !status.IsSuccess() {
errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel)
return
}
pluginToNodeScoreMap[pl.Name()][index] = score * weight
pluginToNodeScoreMap[pl.Name()][index] = score
}
})
@@ -337,6 +344,64 @@ func (f *framework) RunScorePlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.
return pluginToNodeScoreMap, nil
}
// RunNormalizeScorePlugins runs the NormalizeScore function of Score plugins.
// It should be called after RunScorePlugins with the PluginToNodeScoreMap result.
// It then modifies the map with normalized scores. It returns a non-success Status
// if any of the NormalizeScore functions returns a non-success status.
func (f *framework) RunNormalizeScorePlugins(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScoreMap) *Status {
ctx, cancel := context.WithCancel(context.Background())
errCh := schedutil.NewErrorChannel()
workqueue.ParallelizeUntil(ctx, 16, len(f.scoreWithNormalizePlugins), func(index int) {
pl := f.scoreWithNormalizePlugins[index]
nodeScoreList, ok := scores[pl.Name()]
if !ok {
err := fmt.Errorf("normalize score plugin %v has no corresponding scores in the PluginToNodeScoreMap", pl.Name())
errCh.SendErrorWithCancel(err, cancel)
}
status := pl.NormalizeScore(pc, pod, nodeScoreList)
if !status.IsSuccess() {
err := fmt.Errorf("normalize score plugin %v failed with error %v", pl.Name(), status.Message())
errCh.SendErrorWithCancel(err, cancel)
}
})
if err := errCh.ReceiveError(); err != nil {
msg := fmt.Sprintf("error while running normalize score plugin for pod %v: %v", pod.Name, err)
klog.Error(msg)
return NewStatus(Error, msg)
}
return nil
}
// ApplyScoreWeights applies weights to the score results. It should be called after
// RunNormalizeScorePlugins.
func (f *framework) ApplyScoreWeights(pc *PluginContext, pod *v1.Pod, scores PluginToNodeScoreMap) *Status {
ctx, cancel := context.WithCancel(context.Background())
errCh := schedutil.NewErrorChannel()
workqueue.ParallelizeUntil(ctx, 16, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
// Score plugins' weight has been checked when they are initialized.
weight := f.pluginNameToWeightMap[pl.Name()]
nodeScoreList, ok := scores[pl.Name()]
if !ok {
err := fmt.Errorf("score plugin %v has no corresponding scores in the PluginToNodeScoreMap", pl.Name())
errCh.SendErrorWithCancel(err, cancel)
}
for i := range nodeScoreList {
nodeScoreList[i] = nodeScoreList[i] * weight
}
})
if err := errCh.ReceiveError(); err != nil {
msg := fmt.Sprintf("error while applying score weights for pod %v: %v", pod.Name, err)
klog.Error(msg)
return NewStatus(Error, msg)
}
return nil
}
// RunPrebindPlugins runs the set of configured prebind plugins. It returns a
// failure (bool) if any of the plugins returns an error. It also returns an
// error containing the rejection message or the error occurred in the plugin.
@@ -520,7 +585,6 @@ func pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
find(plugins.Filter)
find(plugins.PostFilter)
find(plugins.Score)
find(plugins.NormalizeScore)
find(plugins.Reserve)
find(plugins.Permit)
find(plugins.PreBind)