change framework.RunScorePlugins to return slice organized by node

This commit is contained in:
sanposhiho
2022-03-04 02:00:00 +09:00
committed by Kensei Nakada
parent 6820a383be
commit cbf1ea5e68
4 changed files with 167 additions and 47 deletions

View File

@@ -893,16 +893,17 @@ func (f *frameworkImpl) runPreScorePlugin(ctx context.Context, pl framework.PreS
return status
}
// RunScorePlugins runs the set of configured scoring plugins. It returns a list that
// stores for each scoring plugin name the corresponding NodeScoreList(s).
// RunScorePlugins runs the set of configured scoring plugins.
// It returns a list that stores scores from each plugin and total score for each Node.
// It also returns *Status, which is set to non-success if any of the plugins returns
// a non-success status.
func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ps framework.PluginToNodeScores, status *framework.Status) {
func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ns []framework.NodePluginScores, status *framework.Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(score, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
pluginToNodeScores := make(framework.PluginToNodeScores, len(f.scorePlugins))
allNodePluginScores := make([]framework.NodePluginScores, len(nodes))
pluginToNodeScores := make(map[string]framework.NodeScoreList, len(f.scorePlugins))
for _, pl := range f.scorePlugins {
pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes))
}
@@ -933,10 +934,10 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
// Run NormalizeScore method for each ScorePlugin in parallel.
f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
nodeScoreList := pluginToNodeScores[pl.Name()]
if pl.ScoreExtensions() == nil {
return
}
nodeScoreList := pluginToNodeScores[pl.Name()]
status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)
if !status.IsSuccess() {
err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
@@ -948,28 +949,38 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.Cy
return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err))
}
// Apply score defaultWeights for each ScorePlugin in parallel.
f.Parallelizer().Until(ctx, len(f.scorePlugins), func(index int) {
pl := f.scorePlugins[index]
// Score plugins' weight has been checked when they are initialized.
weight := f.scorePluginWeight[pl.Name()]
nodeScoreList := pluginToNodeScores[pl.Name()]
// Apply score weight for each ScorePlugin in parallel,
// and then, build allNodePluginScores.
f.Parallelizer().Until(ctx, len(nodes), func(index int) {
nodePluginScores := framework.NodePluginScores{
Name: nodes[index].Name,
Scores: make([]framework.PluginScore, len(f.scorePlugins)),
}
for i, nodeScore := range nodeScoreList {
// return error if score plugin returns invalid score.
if nodeScore.Score > framework.MaxNodeScore || nodeScore.Score < framework.MinNodeScore {
err := fmt.Errorf("plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), nodeScore.Score, framework.MinNodeScore, framework.MaxNodeScore)
for i, pl := range f.scorePlugins {
weight := f.scorePluginWeight[pl.Name()]
nodeScoreList := pluginToNodeScores[pl.Name()]
score := nodeScoreList[index].Score
if score > framework.MaxNodeScore || score < framework.MinNodeScore {
err := fmt.Errorf("plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), score, framework.MinNodeScore, framework.MaxNodeScore)
errCh.SendErrorWithCancel(err, cancel)
return
}
nodeScoreList[i].Score = nodeScore.Score * int64(weight)
weightedScore := score * int64(weight)
nodePluginScores.Scores[i] = framework.PluginScore{
Name: pl.Name(),
Score: weightedScore,
}
nodePluginScores.TotalScore += weightedScore
}
allNodePluginScores[index] = nodePluginScores
}, score)
if err := errCh.ReceiveError(); err != nil {
return nil, framework.AsStatus(fmt.Errorf("applying score defaultWeights on Score plugins: %w", err))
}
return pluginToNodeScores, nil
return allNodePluginScores, nil
}
func (f *frameworkImpl) runScorePlugin(ctx context.Context, pl framework.ScorePlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {