Remove priority execution paths in favor of score plugins

Mainly affects core/generic_scheduler.go (and related tests). Removes the "prioritizers" field and related functions.
This commit is contained in:
Mike Dame
2019-12-09 16:37:23 -05:00
parent 442107b6b9
commit 255ab6d2c3
10 changed files with 240 additions and 283 deletions

View File

@@ -43,7 +43,6 @@ type testCase struct {
JSON string JSON string
featureGates map[featuregate.Feature]bool featureGates map[featuregate.Feature]bool
wantPredicates sets.String wantPredicates sets.String
wantPrioritizers sets.String
wantPlugins map[string][]config.Plugin wantPlugins map[string][]config.Plugin
wantExtenders []config.Extender wantExtenders []config.Extender
} }
@@ -99,9 +98,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
wantPredicates: sets.NewString( wantPredicates: sets.NewString(
"PodFitsPorts", "PodFitsPorts",
), ),
wantPrioritizers: sets.NewString(
"ServiceSpreadingPriority",
),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -145,7 +141,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
] ]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -198,7 +193,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
] ]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -260,7 +254,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
] ]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -325,7 +318,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
] ]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -401,7 +393,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}] }]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -488,7 +479,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}] }]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -576,7 +566,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}] }]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -668,7 +657,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}] }]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -772,7 +760,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}] }]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -878,7 +865,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}] }]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -984,7 +970,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}] }]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -1095,7 +1080,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}] }]
}`, }`,
wantPredicates: sets.NewString(), wantPredicates: sets.NewString(),
wantPrioritizers: sets.NewString(),
wantPlugins: map[string][]config.Plugin{ wantPlugins: map[string][]config.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -1171,7 +1155,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}, },
} }
registeredPredicates := sets.NewString(scheduler.ListRegisteredFitPredicates()...) registeredPredicates := sets.NewString(scheduler.ListRegisteredFitPredicates()...)
registeredPriorities := sets.NewString(scheduler.ListRegisteredPriorityFunctions()...)
seenPredicates := sets.NewString() seenPredicates := sets.NewString()
seenPriorities := sets.NewString() seenPriorities := sets.NewString()
mandatoryPredicates := sets.NewString() mandatoryPredicates := sets.NewString()
@@ -1255,14 +1238,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
t.Errorf("Got predicates %v, want %v", gotPredicates, wantPredicates) t.Errorf("Got predicates %v, want %v", gotPredicates, wantPredicates)
} }
gotPrioritizers := sets.NewString()
for _, p := range sched.Algorithm.Prioritizers() {
gotPrioritizers.Insert(p.Name)
}
if !gotPrioritizers.Equal(tc.wantPrioritizers) {
t.Errorf("Got prioritizers %v, want %v", gotPrioritizers, tc.wantPrioritizers)
}
gotPlugins := sched.Framework.ListPlugins() gotPlugins := sched.Framework.ListPlugins()
for _, p := range gotPlugins["FilterPlugin"] { for _, p := range gotPlugins["FilterPlugin"] {
seenPredicates.Insert(filterToPredicateMap[p.Name]) seenPredicates.Insert(filterToPredicateMap[p.Name])
@@ -1296,16 +1271,12 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
} }
seenPredicates = seenPredicates.Union(gotPredicates) seenPredicates = seenPredicates.Union(gotPredicates)
seenPriorities = seenPriorities.Union(gotPrioritizers)
}) })
} }
if !seenPredicates.HasAll(registeredPredicates.List()...) { if !seenPredicates.HasAll(registeredPredicates.List()...) {
t.Errorf("Registered predicates are missing from compatibility test (add to test stanza for version currently in development): %#v", registeredPredicates.Difference(seenPredicates).List()) t.Errorf("Registered predicates are missing from compatibility test (add to test stanza for version currently in development): %#v", registeredPredicates.Difference(seenPredicates).List())
} }
if !seenPriorities.HasAll(registeredPriorities.List()...) {
t.Errorf("Registered priorities are missing from compatibility test (add to test stanza for version currently in development): %#v", registeredPriorities.Difference(seenPriorities).List())
}
} }
func pluginsToStringSet(plugins []config.Plugin) sets.String { func pluginsToStringSet(plugins []config.Plugin) sets.String {

View File

@@ -28,7 +28,6 @@ go_library(
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
@@ -55,13 +54,13 @@ go_test(
"//pkg/scheduler/algorithm/priorities/util:go_default_library", "//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/extender/v1:go_default_library", "//pkg/scheduler/apis/extender/v1:go_default_library",
"//pkg/scheduler/framework/plugins/defaultpodtopologyspread:go_default_library",
"//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library", "//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library", "//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library", "//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/listers/fake:go_default_library", "//pkg/scheduler/listers/fake:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
@@ -72,7 +71,6 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library",

View File

@@ -18,7 +18,6 @@ package core
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"reflect" "reflect"
"sort" "sort"
@@ -28,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
@@ -108,16 +108,28 @@ func machine2PrioritizerExtender(pod *v1.Pod, nodes []*v1.Node) (*framework.Node
return &result, nil return &result, nil
} }
func machine2Prioritizer(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) { type machine2PrioritizerPlugin struct{}
node := nodeInfo.Node()
if node == nil { func newMachine2PrioritizerPlugin() framework.PluginFactory {
return framework.NodeScore{}, errors.New("node not found") return func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return &machine2PrioritizerPlugin{}, nil
} }
}
func (pl *machine2PrioritizerPlugin) Name() string {
return "Machine2Prioritizer"
}
func (pl *machine2PrioritizerPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeName string) (int64, *framework.Status) {
score := 10 score := 10
if node.Name == "machine2" { if nodeName == "machine2" {
score = 100 score = 100
} }
return framework.NodeScore{Name: node.Name, Score: int64(score)}, nil return int64(score), nil
}
func (pl *machine2PrioritizerPlugin) ScoreExtensions() framework.ScoreExtensions {
return nil
} }
type FakeExtender struct { type FakeExtender struct {
@@ -351,7 +363,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
registerFilterPlugin st.RegisterFilterPluginFunc registerFilterPlugin st.RegisterFilterPluginFunc
prioritizers []priorities.PriorityConfig registerScorePlugin st.RegisterScorePluginFunc
extenders []FakeExtender extenders []FakeExtender
nodes []string nodes []string
expectedResult ScheduleResult expectedResult ScheduleResult
@@ -458,7 +470,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
}, },
{ {
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
prioritizers: []priorities.PriorityConfig{{Map: machine2Prioritizer, Weight: 20}}, registerScorePlugin: st.RegisterScorePlugin("Machine2Prioritizer", newMachine2PrioritizerPlugin(), 20),
extenders: []FakeExtender{ extenders: []FakeExtender{
{ {
predicates: []fitPredicate{truePredicateExtender}, predicates: []fitPredicate{truePredicateExtender},
@@ -483,7 +495,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
// because of the errors from errorPredicateExtender and/or // because of the errors from errorPredicateExtender and/or
// errorPrioritizerExtender. // errorPrioritizerExtender.
registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), registerFilterPlugin: st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
prioritizers: []priorities.PriorityConfig{{Map: machine2Prioritizer, Weight: 1}}, registerScorePlugin: st.RegisterScorePlugin("Machine2Prioritizer", newMachine2PrioritizerPlugin(), 1),
extenders: []FakeExtender{ extenders: []FakeExtender{
{ {
predicates: []fitPredicate{errorPredicateExtender}, predicates: []fitPredicate{errorPredicateExtender},
@@ -545,9 +557,13 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
registry := framework.Registry{} registry := framework.Registry{}
plugins := &schedulerapi.Plugins{ plugins := &schedulerapi.Plugins{
Filter: &schedulerapi.PluginSet{}, Filter: &schedulerapi.PluginSet{},
Score: &schedulerapi.PluginSet{},
} }
var pluginConfigs []schedulerapi.PluginConfig var pluginConfigs []schedulerapi.PluginConfig
test.registerFilterPlugin(&registry, plugins, pluginConfigs) test.registerFilterPlugin(&registry, plugins, pluginConfigs)
if test.registerScorePlugin != nil {
test.registerScorePlugin(&registry, plugins, pluginConfigs)
}
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs) fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs)
scheduler := NewGenericScheduler( scheduler := NewGenericScheduler(
@@ -555,7 +571,6 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
queue, queue,
nil, nil,
predicates.EmptyMetadataProducer, predicates.EmptyMetadataProducer,
test.prioritizers,
priorities.EmptyMetadataProducer, priorities.EmptyMetadataProducer,
emptySnapshot, emptySnapshot,
fwk, fwk,

View File

@@ -33,7 +33,6 @@ import (
policy "k8s.io/api/policy/v1beta1" policy "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/errors"
corelisters "k8s.io/client-go/listers/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
policylisters "k8s.io/client-go/listers/policy/v1beta1" policylisters "k8s.io/client-go/listers/policy/v1beta1"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
@@ -129,9 +128,6 @@ type ScheduleAlgorithm interface {
Predicates() map[string]predicates.FitPredicate Predicates() map[string]predicates.FitPredicate
// Prioritizers returns a slice of priority config. This is exposed for // Prioritizers returns a slice of priority config. This is exposed for
// testing. // testing.
Prioritizers() []priorities.PriorityConfig
// Extenders returns a slice of extender config. This is exposed for
// testing.
Extenders() []algorithm.SchedulerExtender Extenders() []algorithm.SchedulerExtender
// GetPredicateMetadataProducer returns the predicate metadata producer. This is needed // GetPredicateMetadataProducer returns the predicate metadata producer. This is needed
// for cluster autoscaler integration. // for cluster autoscaler integration.
@@ -684,11 +680,10 @@ func (g *genericScheduler) podFitsOnNode(
return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil
} }
// prioritizeNodes prioritizes the nodes by running the individual priority functions in parallel. // prioritizeNodes prioritizes the nodes by running the score plugins,
// Each priority function is expected to set a score of 0-10 // which return a score for each node from the call to RunScorePlugins().
// 0 is the lowest priority score (least preferred node) and 10 is the highest // The scores from each plugin are added together to make the score for that node, then
// Each priority function can also have its own weight // any extenders are run as well.
// The node scores returned by the priority function are multiplied by the weights to get weighted scores
// All scores are finally combined (added) to get the total weighted scores of all nodes // All scores are finally combined (added) to get the total weighted scores of all nodes
func (g *genericScheduler) prioritizeNodes( func (g *genericScheduler) prioritizeNodes(
ctx context.Context, ctx context.Context,
@@ -699,7 +694,7 @@ func (g *genericScheduler) prioritizeNodes(
) (framework.NodeScoreList, error) { ) (framework.NodeScoreList, error) {
// If no priority configs are provided, then all nodes will have a score of one. // If no priority configs are provided, then all nodes will have a score of one.
// This is required to generate the priority list in the required format // This is required to generate the priority list in the required format
if len(g.prioritizers) == 0 && len(g.extenders) == 0 && !g.framework.HasScorePlugins() { if len(g.extenders) == 0 && !g.framework.HasScorePlugins() {
result := make(framework.NodeScoreList, 0, len(nodes)) result := make(framework.NodeScoreList, 0, len(nodes))
for i := range nodes { for i := range nodes {
result = append(result, framework.NodeScore{ result = append(result, framework.NodeScore{
@@ -710,62 +705,6 @@ func (g *genericScheduler) prioritizeNodes(
return result, nil return result, nil
} }
var (
mu = sync.Mutex{}
wg = sync.WaitGroup{}
errs []error
)
appendError := func(err error) {
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}
results := make([]framework.NodeScoreList, len(g.prioritizers))
for i := range g.prioritizers {
results[i] = make(framework.NodeScoreList, len(nodes))
}
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name]
for i := range g.prioritizers {
var err error
results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo)
if err != nil {
appendError(err)
results[i][index].Name = nodes[index].Name
}
}
})
for i := range g.prioritizers {
if g.prioritizers[i].Reduce == nil {
continue
}
wg.Add(1)
go func(index int) {
metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Inc()
defer func() {
metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Dec()
wg.Done()
}()
if err := g.prioritizers[index].Reduce(pod, meta, g.nodeInfoSnapshot, results[index]); err != nil {
appendError(err)
}
if klog.V(10) {
for _, hostPriority := range results[index] {
klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Name, g.prioritizers[index].Name, hostPriority.Score)
}
}
}(i)
}
// Wait for all computations to be finished.
wg.Wait()
if len(errs) != 0 {
return framework.NodeScoreList{}, errors.NewAggregate(errs)
}
// Run the Score plugins. // Run the Score plugins.
state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: meta}) state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: meta})
scoresMap, scoreStatus := g.framework.RunScorePlugins(ctx, state, pod, nodes) scoresMap, scoreStatus := g.framework.RunScorePlugins(ctx, state, pod, nodes)
@@ -778,16 +717,14 @@ func (g *genericScheduler) prioritizeNodes(
for i := range nodes { for i := range nodes {
result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0}) result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
for j := range g.prioritizers {
result[i].Score += results[j][i].Score * g.prioritizers[j].Weight
}
for j := range scoresMap { for j := range scoresMap {
result[i].Score += scoresMap[j][i].Score result[i].Score += scoresMap[j][i].Score
} }
} }
if len(g.extenders) != 0 && nodes != nil { if len(g.extenders) != 0 && nodes != nil {
var mu sync.Mutex
var wg sync.WaitGroup
combinedScores := make(map[string]int64, len(g.nodeInfoSnapshot.NodeInfoList)) combinedScores := make(map[string]int64, len(g.nodeInfoSnapshot.NodeInfoList))
for i := range g.extenders { for i := range g.extenders {
if !g.extenders[i].IsInterested(pod) { if !g.extenders[i].IsInterested(pod) {
@@ -1249,7 +1186,6 @@ func NewGenericScheduler(
podQueue internalqueue.SchedulingQueue, podQueue internalqueue.SchedulingQueue,
predicates map[string]predicates.FitPredicate, predicates map[string]predicates.FitPredicate,
predicateMetaProducer predicates.MetadataProducer, predicateMetaProducer predicates.MetadataProducer,
prioritizers []priorities.PriorityConfig,
priorityMetaProducer priorities.MetadataProducer, priorityMetaProducer priorities.MetadataProducer,
nodeInfoSnapshot *nodeinfosnapshot.Snapshot, nodeInfoSnapshot *nodeinfosnapshot.Snapshot,
framework framework.Framework, framework framework.Framework,
@@ -1266,7 +1202,6 @@ func NewGenericScheduler(
schedulingQueue: podQueue, schedulingQueue: podQueue,
predicates: predicates, predicates: predicates,
predicateMetaProducer: predicateMetaProducer, predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers,
priorityMetaProducer: priorityMetaProducer, priorityMetaProducer: priorityMetaProducer,
framework: framework, framework: framework,
extenders: extenders, extenders: extenders,

View File

@@ -32,7 +32,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
@@ -43,13 +42,13 @@ import (
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1" extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpodtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake" fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo" "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@@ -173,60 +172,120 @@ func NewFakeFilterPlugin(failedNodeReturnCodeMap map[string]framework.Code) fram
} }
} }
func numericMapPriority(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) { type numericMapPlugin struct{}
node := nodeInfo.Node()
score, err := strconv.Atoi(node.Name)
if err != nil {
return framework.NodeScore{}, err
}
return framework.NodeScore{ func newNumericMapPlugin() framework.PluginFactory {
Name: node.Name, return func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
Score: int64(score), return &numericMapPlugin{}, nil
}, nil }
} }
func reverseNumericReducePriority(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error { func (pl *numericMapPlugin) Name() string {
return "NumericMap"
}
func (pl *numericMapPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeName string) (int64, *framework.Status) {
score, err := strconv.Atoi(nodeName)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("Error converting nodename to int: %+v", nodeName))
}
return int64(score), nil
}
func (pl *numericMapPlugin) ScoreExtensions() framework.ScoreExtensions {
return nil
}
type reverseNumericMapPlugin struct{}
func newReverseNumericMapPlugin() framework.PluginFactory {
return func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return &reverseNumericMapPlugin{}, nil
}
}
func (pl *reverseNumericMapPlugin) Name() string {
return "ReverseNumericMap"
}
func (pl *reverseNumericMapPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeName string) (int64, *framework.Status) {
score, err := strconv.Atoi(nodeName)
if err != nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("Error converting nodename to int: %+v", nodeName))
}
return int64(score), nil
}
func (pl *reverseNumericMapPlugin) ScoreExtensions() framework.ScoreExtensions {
return pl
}
func (pl *reverseNumericMapPlugin) NormalizeScore(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeScores framework.NodeScoreList) *framework.Status {
var maxScore float64 var maxScore float64
minScore := math.MaxFloat64 minScore := math.MaxFloat64
for _, hostPriority := range result { for _, hostPriority := range nodeScores {
maxScore = math.Max(maxScore, float64(hostPriority.Score)) maxScore = math.Max(maxScore, float64(hostPriority.Score))
minScore = math.Min(minScore, float64(hostPriority.Score)) minScore = math.Min(minScore, float64(hostPriority.Score))
} }
for i, hostPriority := range result { for i, hostPriority := range nodeScores {
result[i] = framework.NodeScore{ nodeScores[i] = framework.NodeScore{
Name: hostPriority.Name, Name: hostPriority.Name,
Score: int64(maxScore + minScore - float64(hostPriority.Score)), Score: int64(maxScore + minScore - float64(hostPriority.Score)),
} }
} }
return nil return nil
} }
func trueMapPriority(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) { type trueMapPlugin struct{}
return framework.NodeScore{
Name: nodeInfo.Node().Name, func newTrueMapPlugin() framework.PluginFactory {
Score: 1, return func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
}, nil return &trueMapPlugin{}, nil
}
} }
func falseMapPriority(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) { func (pl *trueMapPlugin) Name() string {
return framework.NodeScore{}, errPrioritize return "TrueMap"
} }
func getNodeReducePriority(pod *v1.Pod, meta interface{}, sharedLister schedulerlisters.SharedLister, result framework.NodeScoreList) error { func (pl *trueMapPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) (int64, *framework.Status) {
for _, host := range result { return 1, nil
}
func (pl *trueMapPlugin) ScoreExtensions() framework.ScoreExtensions {
return pl
}
func (pl *trueMapPlugin) NormalizeScore(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeScores framework.NodeScoreList) *framework.Status {
for _, host := range nodeScores {
if host.Name == "" { if host.Name == "" {
return fmt.Errorf("unexpected empty host name") return framework.NewStatus(framework.Error, "unexpected empty host name")
} }
} }
return nil return nil
} }
// emptyPluginRegistry is a test plugin set used by the default scheduler. type falseMapPlugin struct{}
var emptyPluginRegistry = framework.Registry{}
var emptyFramework, _ = framework.NewFramework(emptyPluginRegistry, nil, []schedulerapi.PluginConfig{}) func newFalseMapPlugin() framework.PluginFactory {
return func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return &falseMapPlugin{}, nil
}
}
func (pl *falseMapPlugin) Name() string {
return "FalseMap"
}
func (pl *falseMapPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) (int64, *framework.Status) {
return 0, framework.NewStatus(framework.Error, errPrioritize.Error())
}
func (pl *falseMapPlugin) ScoreExtensions() framework.ScoreExtensions {
return nil
}
var emptySnapshot = nodeinfosnapshot.NewEmptySnapshot() var emptySnapshot = nodeinfosnapshot.NewEmptySnapshot()
func makeNodeList(nodeNames []string) []*v1.Node { func makeNodeList(nodeNames []string) []*v1.Node {
@@ -313,7 +372,7 @@ func TestGenericScheduler(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
registerFilterPlugins []st.RegisterFilterPluginFunc registerFilterPlugins []st.RegisterFilterPluginFunc
prioritizers []priorities.PriorityConfig registerScorePlugins []st.RegisterScorePluginFunc
alwaysCheckAllPredicates bool alwaysCheckAllPredicates bool
nodes []string nodes []string
pvcs []v1.PersistentVolumeClaim pvcs []v1.PersistentVolumeClaim
@@ -365,7 +424,9 @@ func TestGenericScheduler(t *testing.T) {
registerFilterPlugins: []st.RegisterFilterPluginFunc{ registerFilterPlugins: []st.RegisterFilterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
}, },
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}}, registerScorePlugins: []st.RegisterScorePluginFunc{
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
},
nodes: []string{"3", "2", "1"}, nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")}},
expectedHosts: sets.NewString("3"), expectedHosts: sets.NewString("3"),
@@ -376,7 +437,9 @@ func TestGenericScheduler(t *testing.T) {
registerFilterPlugins: []st.RegisterFilterPluginFunc{ registerFilterPlugins: []st.RegisterFilterPluginFunc{
st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin), st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin),
}, },
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}}, registerScorePlugins: []st.RegisterScorePluginFunc{
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
},
nodes: []string{"3", "2", "1"}, nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
expectedHosts: sets.NewString("2"), expectedHosts: sets.NewString("2"),
@@ -387,16 +450,9 @@ func TestGenericScheduler(t *testing.T) {
registerFilterPlugins: []st.RegisterFilterPluginFunc{ registerFilterPlugins: []st.RegisterFilterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
}, },
prioritizers: []priorities.PriorityConfig{ registerScorePlugins: []st.RegisterScorePluginFunc{
{ st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
Map: numericMapPriority, st.RegisterScorePlugin("ReverseNumericMap", newReverseNumericMapPlugin(), 2),
Weight: 1,
},
{
Map: numericMapPriority,
Reduce: reverseNumericReducePriority,
Weight: 2,
},
}, },
nodes: []string{"3", "2", "1"}, nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
@@ -409,7 +465,9 @@ func TestGenericScheduler(t *testing.T) {
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterFilterPlugin("FalseFilter", NewFalseFilterPlugin), st.RegisterFilterPlugin("FalseFilter", NewFalseFilterPlugin),
}, },
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}}, registerScorePlugins: []st.RegisterScorePluginFunc{
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
},
nodes: []string{"3", "2", "1"}, nodes: []string{"3", "2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
name: "test 7", name: "test 7",
@@ -429,6 +487,9 @@ func TestGenericScheduler(t *testing.T) {
st.RegisterFilterPlugin("NoPodsFilter", NewNoPodsFilterPlugin), st.RegisterFilterPlugin("NoPodsFilter", NewNoPodsFilterPlugin),
st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin), st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin),
}, },
registerScorePlugins: []st.RegisterScorePluginFunc{
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
},
pods: []*v1.Pod{ pods: []*v1.Pod{
{ {
ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}, ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")},
@@ -441,7 +502,6 @@ func TestGenericScheduler(t *testing.T) {
}, },
}, },
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2", UID: types.UID("2")}},
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
nodes: []string{"1", "2"}, nodes: []string{"1", "2"},
name: "test 8", name: "test 8",
wErr: &FitError{ wErr: &FitError{
@@ -530,11 +590,14 @@ func TestGenericScheduler(t *testing.T) {
registerFilterPlugins: []st.RegisterFilterPluginFunc{ registerFilterPlugins: []st.RegisterFilterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin), st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
}, },
prioritizers: []priorities.PriorityConfig{{Map: falseMapPriority, Weight: 1}, {Map: trueMapPriority, Reduce: getNodeReducePriority, Weight: 2}}, registerScorePlugins: []st.RegisterScorePluginFunc{
st.RegisterScorePlugin("FalseMap", newFalseMapPlugin(), 1),
st.RegisterScorePlugin("TrueMap", newTrueMapPlugin(), 2),
},
nodes: []string{"2", "1"}, nodes: []string{"2", "1"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2"}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2"}},
name: "test error with priority map", name: "test error with priority map",
wErr: errors.NewAggregate([]error{errPrioritize, errPrioritize}), wErr: fmt.Errorf("error while running score plugin for pod \"2\": %+v", errPrioritize),
}, },
{ {
name: "test even pods spread predicate - 2 nodes with maxskew=1", name: "test even pods spread predicate - 2 nodes with maxskew=1",
@@ -644,7 +707,9 @@ func TestGenericScheduler(t *testing.T) {
NewFakeFilterPlugin(map[string]framework.Code{"3": framework.Unschedulable}), NewFakeFilterPlugin(map[string]framework.Code{"3": framework.Unschedulable}),
), ),
}, },
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}}, registerScorePlugins: []st.RegisterScorePluginFunc{
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
},
nodes: []string{"3"}, nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil, expectedHosts: nil,
@@ -665,7 +730,9 @@ func TestGenericScheduler(t *testing.T) {
NewFakeFilterPlugin(map[string]framework.Code{"3": framework.UnschedulableAndUnresolvable}), NewFakeFilterPlugin(map[string]framework.Code{"3": framework.UnschedulableAndUnresolvable}),
), ),
}, },
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}}, registerScorePlugins: []st.RegisterScorePluginFunc{
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
},
nodes: []string{"3"}, nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil, expectedHosts: nil,
@@ -686,7 +753,9 @@ func TestGenericScheduler(t *testing.T) {
NewFakeFilterPlugin(map[string]framework.Code{"1": framework.Unschedulable}), NewFakeFilterPlugin(map[string]framework.Code{"1": framework.Unschedulable}),
), ),
}, },
prioritizers: []priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}}, registerScorePlugins: []st.RegisterScorePluginFunc{
st.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
},
nodes: []string{"1", "2"}, nodes: []string{"1", "2"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}}, pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil, expectedHosts: nil,
@@ -701,11 +770,15 @@ func TestGenericScheduler(t *testing.T) {
registry := framework.Registry{} registry := framework.Registry{}
plugins := &schedulerapi.Plugins{ plugins := &schedulerapi.Plugins{
Filter: &schedulerapi.PluginSet{}, Filter: &schedulerapi.PluginSet{},
Score: &schedulerapi.PluginSet{},
} }
var pluginConfigs []schedulerapi.PluginConfig var pluginConfigs []schedulerapi.PluginConfig
for _, f := range test.registerFilterPlugins { for _, f := range test.registerFilterPlugins {
f(&registry, plugins, pluginConfigs) f(&registry, plugins, pluginConfigs)
} }
for _, f := range test.registerScorePlugins {
f(&registry, plugins, pluginConfigs)
}
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs) fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs)
cache := internalcache.New(time.Duration(0), wait.NeverStop) cache := internalcache.New(time.Duration(0), wait.NeverStop)
@@ -730,7 +803,6 @@ func TestGenericScheduler(t *testing.T) {
internalqueue.NewSchedulingQueue(nil), internalqueue.NewSchedulingQueue(nil),
nil, nil,
predMetaProducer, predMetaProducer,
test.prioritizers,
priorities.EmptyMetadataProducer, priorities.EmptyMetadataProducer,
emptySnapshot, emptySnapshot,
fwk, fwk,
@@ -778,7 +850,6 @@ func makeScheduler(nodes []*v1.Node, fns ...st.RegisterFilterPluginFunc) *generi
internalqueue.NewSchedulingQueue(nil), internalqueue.NewSchedulingQueue(nil),
nil, nil,
algorithmpredicates.EmptyMetadataProducer, algorithmpredicates.EmptyMetadataProducer,
nil,
priorities.EmptyMetadataProducer, priorities.EmptyMetadataProducer,
emptySnapshot, emptySnapshot,
fwk, fwk,
@@ -908,7 +979,6 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
queue, queue,
nil, nil,
algorithmpredicates.EmptyMetadataProducer, algorithmpredicates.EmptyMetadataProducer,
nil,
priorities.EmptyMetadataProducer, priorities.EmptyMetadataProducer,
emptySnapshot, emptySnapshot,
fwk, fwk,
@@ -1063,22 +1133,6 @@ func TestZeroRequest(t *testing.T) {
client := clientsetfake.NewSimpleClientset() client := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0) informerFactory := informers.NewSharedInformerFactory(client, 0)
// This should match the configuration in defaultPriorities() in
// pkg/scheduler/algorithmprovider/defaults/defaults.go if you want
// to test what's actually in production.
priorityConfigs := []priorities.PriorityConfig{
{Map: priorities.LeastRequestedPriorityMap, Weight: 1},
{Map: priorities.BalancedResourceAllocationMap, Weight: 1},
}
selectorSpreadPriorityMap, selectorSpreadPriorityReduce := priorities.NewSelectorSpreadPriority(
informerFactory.Core().V1().Services().Lister(),
informerFactory.Core().V1().ReplicationControllers().Lister(),
informerFactory.Apps().V1().ReplicaSets().Lister(),
informerFactory.Apps().V1().StatefulSets().Lister(),
)
pc := priorities.PriorityConfig{Map: selectorSpreadPriorityMap, Reduce: selectorSpreadPriorityReduce, Weight: 1}
priorityConfigs = append(priorityConfigs, pc)
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes)) snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes))
metaDataProducer := priorities.NewMetadataFactory( metaDataProducer := priorities.NewMetadataFactory(
@@ -1091,15 +1145,40 @@ func TestZeroRequest(t *testing.T) {
metaData := metaDataProducer(test.pod, test.nodes, snapshot) metaData := metaDataProducer(test.pod, test.nodes, snapshot)
registry := framework.Registry{}
plugins := &schedulerapi.Plugins{
Filter: &schedulerapi.PluginSet{},
Score: &schedulerapi.PluginSet{},
}
var pluginConfigs []schedulerapi.PluginConfig
pluginRegistrations := []st.RegisterScorePluginFunc{
st.RegisterScorePlugin(noderesources.LeastAllocatedName, noderesources.NewLeastAllocated, 1),
st.RegisterScorePlugin(noderesources.BalancedAllocationName, noderesources.NewBalancedAllocation, 1),
st.RegisterScorePlugin(defaultpodtopologyspread.Name, defaultpodtopologyspread.New, 1),
}
for _, f := range pluginRegistrations {
f(&registry, plugins, pluginConfigs)
}
fwk, err := framework.NewFramework(
registry,
plugins,
pluginConfigs,
framework.WithInformerFactory(informerFactory),
framework.WithSnapshotSharedLister(snapshot),
framework.WithClientSet(client),
)
if err != nil {
t.Fatalf("error creating framework: %+v", err)
}
scheduler := NewGenericScheduler( scheduler := NewGenericScheduler(
nil, nil,
nil, nil,
nil, nil,
nil, nil,
priorityConfigs,
metaDataProducer, metaDataProducer,
emptySnapshot, emptySnapshot,
emptyFramework, fwk,
[]algorithm.SchedulerExtender{}, []algorithm.SchedulerExtender{},
nil, nil,
nil, nil,
@@ -1539,7 +1618,6 @@ func TestSelectNodesForPreemption(t *testing.T) {
internalqueue.NewSchedulingQueue(nil), internalqueue.NewSchedulingQueue(nil),
nil, nil,
factory.GetPredicateMetadata, factory.GetPredicateMetadata,
nil,
priorities.EmptyMetadataProducer, priorities.EmptyMetadataProducer,
snapshot, snapshot,
fwk, fwk,
@@ -2283,7 +2361,6 @@ func TestPreempt(t *testing.T) {
internalqueue.NewSchedulingQueue(nil), internalqueue.NewSchedulingQueue(nil),
nil, nil,
predMetaProducer, predMetaProducer,
[]priorities.PriorityConfig{{Map: numericMapPriority, Weight: 1}},
priorities.EmptyMetadataProducer, priorities.EmptyMetadataProducer,
emptySnapshot, emptySnapshot,
fwk, fwk,

View File

@@ -227,7 +227,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
return nil, err return nil, err
} }
priorityConfigs, pluginsForPriorities, pluginConfigForPriorities, err := c.getPriorityConfigs(priorityKeys) pluginsForPriorities, pluginConfigForPriorities, err := c.getPriorityConfigs(priorityKeys)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -285,7 +285,6 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
podQueue, podQueue,
predicateFuncs, predicateFuncs,
predicateMetaProducer, predicateMetaProducer,
priorityConfigs,
priorityMetaProducer, priorityMetaProducer,
c.nodeInfoSnapshot, c.nodeInfoSnapshot,
framework, framework,
@@ -327,20 +326,20 @@ func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerEx
} }
} }
// getPriorityConfigs
// getPriorityConfigs returns priorities configuration: ones that will run as priorities and ones that will run // getPriorityConfigs returns priorities configuration: ones that will run as priorities and ones that will run
// as framework plugins. Specifically, a priority will run as a framework plugin if a plugin config producer was // as framework plugins. Specifically, a priority will run as a framework plugin if a plugin config producer was
// registered for that priority. // registered for that priority.
func (c *Configurator) getPriorityConfigs(priorityKeys sets.String) ([]priorities.PriorityConfig, *schedulerapi.Plugins, []schedulerapi.PluginConfig, error) { func (c *Configurator) getPriorityConfigs(priorityKeys sets.String) (*schedulerapi.Plugins, []schedulerapi.PluginConfig, error) {
allPriorityConfigs, err := getPriorityFunctionConfigs(priorityKeys, c.algorithmFactoryArgs) allPriorityConfigs, err := getPriorityFunctionConfigs(priorityKeys, c.algorithmFactoryArgs)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, err
} }
if c.pluginConfigProducerRegistry == nil { if c.pluginConfigProducerRegistry == nil {
return allPriorityConfigs, nil, nil, nil return nil, nil, nil
} }
var priorityConfigs []priorities.PriorityConfig
var plugins schedulerapi.Plugins var plugins schedulerapi.Plugins
var pluginConfig []schedulerapi.PluginConfig var pluginConfig []schedulerapi.PluginConfig
frameworkConfigProducers := c.pluginConfigProducerRegistry.PriorityToConfigProducer frameworkConfigProducers := c.pluginConfigProducerRegistry.PriorityToConfigProducer
@@ -351,11 +350,9 @@ func (c *Configurator) getPriorityConfigs(priorityKeys sets.String) ([]prioritie
pl, pc := producer(args) pl, pc := producer(args)
plugins.Append(&pl) plugins.Append(&pl)
pluginConfig = append(pluginConfig, pc...) pluginConfig = append(pluginConfig, pc...)
} else {
priorityConfigs = append(priorityConfigs, p)
} }
} }
return priorityConfigs, &plugins, pluginConfig, nil return &plugins, pluginConfig, nil
} }
// getPredicateConfigs returns predicates configuration: ones that will run as fitPredicates and ones that will run // getPredicateConfigs returns predicates configuration: ones that will run as fitPredicates and ones that will run

View File

@@ -270,9 +270,6 @@ func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
if !foundPlugin(c.Plugins.Filter.Enabled, predicateOne) { if !foundPlugin(c.Plugins.Filter.Enabled, predicateOne) {
t.Errorf("Expected predicate PredicateOne from %q", schedulerapi.SchedulerDefaultProviderName) t.Errorf("Expected predicate PredicateOne from %q", schedulerapi.SchedulerDefaultProviderName)
} }
if len(c.Algorithm.Prioritizers()) != 1 || c.Algorithm.Prioritizers()[0].Name != "PriorityOne" {
t.Errorf("Expected priority PriorityOne from %q", schedulerapi.SchedulerDefaultProviderName)
}
} }
func foundPlugin(plugins []schedulerapi.Plugin, name string) bool { func foundPlugin(plugins []schedulerapi.Plugin, name string) bool {
@@ -316,9 +313,6 @@ func TestCreateFromConfigWithEmptyPredicatesOrPriorities(t *testing.T) {
if len(c.Algorithm.Predicates()) != 0 { if len(c.Algorithm.Predicates()) != 0 {
t.Error("Expected empty predicate sets") t.Error("Expected empty predicate sets")
} }
if len(c.Algorithm.Prioritizers()) != 0 {
t.Error("Expected empty priority sets")
}
} }
func PredicateFunc(pod *v1.Pod, meta predicates.Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) { func PredicateFunc(pod *v1.Pod, meta predicates.Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
@@ -854,15 +848,6 @@ func TestCreateWithFrameworkPlugins(t *testing.T) {
t.Errorf("unexpected predicates diff (-want, +got): %s", diff) t.Errorf("unexpected predicates diff (-want, +got): %s", diff)
} }
gotPriorities := sets.NewString()
for _, p := range c.Algorithm.Prioritizers() {
gotPriorities.Insert(p.Name)
}
wantPriorities := sets.NewString(priorityThreeName)
if diff := cmp.Diff(wantPriorities, gotPriorities); diff != "" {
t.Errorf("unexpected priorities diff (-want, +got): %s", diff)
}
// Verify the aggregated configuration. // Verify the aggregated configuration.
wantPlugins := schedulerapi.Plugins{ wantPlugins := schedulerapi.Plugins{
QueueSort: &schedulerapi.PluginSet{}, QueueSort: &schedulerapi.PluginSet{},

View File

@@ -167,9 +167,7 @@ func (es mockScheduler) Schedule(ctx context.Context, state *framework.CycleStat
func (es mockScheduler) Predicates() map[string]predicates.FitPredicate { func (es mockScheduler) Predicates() map[string]predicates.FitPredicate {
return nil return nil
} }
func (es mockScheduler) Prioritizers() []priorities.PriorityConfig {
return nil
}
func (es mockScheduler) Extenders() []algorithm.SchedulerExtender { func (es mockScheduler) Extenders() []algorithm.SchedulerExtender {
return nil return nil
} }
@@ -690,7 +688,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
internalqueue.NewSchedulingQueue(nil), internalqueue.NewSchedulingQueue(nil),
nil, nil,
predicates.EmptyMetadataProducer, predicates.EmptyMetadataProducer,
[]priorities.PriorityConfig{},
priorities.EmptyMetadataProducer, priorities.EmptyMetadataProducer,
nodeinfosnapshot.NewEmptySnapshot(), nodeinfosnapshot.NewEmptySnapshot(),
fwk, fwk,
@@ -749,7 +746,6 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
queue, queue,
nil, nil,
predicates.EmptyMetadataProducer, predicates.EmptyMetadataProducer,
[]priorities.PriorityConfig{},
priorities.EmptyMetadataProducer, priorities.EmptyMetadataProducer,
nodeinfosnapshot.NewEmptySnapshot(), nodeinfosnapshot.NewEmptySnapshot(),
fwk, fwk,
@@ -1004,7 +1000,6 @@ func TestInitPolicyFromFile(t *testing.T) {
for i, test := range []struct { for i, test := range []struct {
policy string policy string
expectedPredicates sets.String expectedPredicates sets.String
expectedPrioritizers sets.String
}{ }{
// Test json format policy file // Test json format policy file
{ {
@@ -1024,10 +1019,6 @@ func TestInitPolicyFromFile(t *testing.T) {
"PredicateOne", "PredicateOne",
"PredicateTwo", "PredicateTwo",
), ),
expectedPrioritizers: sets.NewString(
"PriorityOne",
"PriorityTwo",
),
}, },
// Test yaml format policy file // Test yaml format policy file
{ {
@@ -1046,10 +1037,6 @@ priorities:
"PredicateOne", "PredicateOne",
"PredicateTwo", "PredicateTwo",
), ),
expectedPrioritizers: sets.NewString(
"PriorityOne",
"PriorityTwo",
),
}, },
} { } {
file := fmt.Sprintf("scheduler-policy-config-file-%d", i) file := fmt.Sprintf("scheduler-policy-config-file-%d", i)
@@ -1077,8 +1064,5 @@ priorities:
if !schedPredicates.Equal(test.expectedPredicates) { if !schedPredicates.Equal(test.expectedPredicates) {
t.Errorf("Expected predicates %v, got %v", test.expectedPredicates, schedPredicates) t.Errorf("Expected predicates %v, got %v", test.expectedPredicates, schedPredicates)
} }
if !schedPrioritizers.Equal(test.expectedPrioritizers) {
t.Errorf("Expected priority functions %v, got %v", test.expectedPrioritizers, schedPrioritizers)
}
} }
} }

View File

@@ -35,3 +35,18 @@ func RegisterFilterPlugin(pluginName string, pluginNewFunc framework.PluginFacto
pluginConfigs = append(pluginConfigs, schedulerapi.PluginConfig{Name: pluginName}) pluginConfigs = append(pluginConfigs, schedulerapi.PluginConfig{Name: pluginName})
} }
} }
// RegisterScorePluginFunc is a function signature used in method RegisterScorePlugin()
// to register a Score Plugin to a given registry.
type RegisterScorePluginFunc func(reg *framework.Registry, plugins *schedulerapi.Plugins, pluginConfigs []schedulerapi.PluginConfig)
// RegisterScorePlugin returns a function to register a Score Plugin to a given registry.
func RegisterScorePlugin(pluginName string, pluginNewFunc framework.PluginFactory, weight int32) RegisterScorePluginFunc {
return func(reg *framework.Registry, plugins *schedulerapi.Plugins, pluginConfigs []schedulerapi.PluginConfig) {
reg.Register(pluginName, pluginNewFunc)
plugins.Score.Enabled = append(plugins.Score.Enabled, schedulerapi.Plugin{Name: pluginName, Weight: weight})
//lint:ignore SA4006 this value of pluginConfigs is never used.
//lint:ignore SA4010 this result of append is never used.
pluginConfigs = append(pluginConfigs, schedulerapi.PluginConfig{Name: pluginName})
}
}

View File

@@ -93,7 +93,6 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
for i, test := range []struct { for i, test := range []struct {
policy string policy string
expectedPredicates sets.String expectedPredicates sets.String
expectedPrioritizers sets.String
expectedPlugins map[string][]kubeschedulerconfig.Plugin expectedPlugins map[string][]kubeschedulerconfig.Plugin
}{ }{
{ {
@@ -113,10 +112,6 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
"PredicateOne", "PredicateOne",
"PredicateTwo", "PredicateTwo",
), ),
expectedPrioritizers: sets.NewString(
"PriorityOne",
"PriorityTwo",
),
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -129,7 +124,6 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
"kind" : "Policy", "kind" : "Policy",
"apiVersion" : "v1" "apiVersion" : "v1"
}`, }`,
expectedPrioritizers: sets.NewString(),
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -167,7 +161,6 @@ func TestSchedulerCreationFromConfigMap(t *testing.T) {
"priorities" : [] "priorities" : []
}`, }`,
expectedPredicates: sets.NewString(), expectedPredicates: sets.NewString(),
expectedPrioritizers: sets.NewString(),
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -191,10 +184,6 @@ priorities:
"PredicateOne", "PredicateOne",
"PredicateTwo", "PredicateTwo",
), ),
expectedPrioritizers: sets.NewString(
"PriorityOne",
"PriorityTwo",
),
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -206,7 +195,6 @@ priorities:
policy: `apiVersion: v1 policy: `apiVersion: v1
kind: Policy kind: Policy
`, `,
expectedPrioritizers: sets.NewString(),
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -243,7 +231,6 @@ predicates: []
priorities: [] priorities: []
`, `,
expectedPredicates: sets.NewString(), expectedPredicates: sets.NewString(),
expectedPrioritizers: sets.NewString(),
expectedPlugins: map[string][]kubeschedulerconfig.Plugin{ expectedPlugins: map[string][]kubeschedulerconfig.Plugin{
"FilterPlugin": { "FilterPlugin": {
{Name: "NodeUnschedulable"}, {Name: "NodeUnschedulable"},
@@ -294,16 +281,9 @@ priorities: []
for k := range sched.Algorithm.Predicates() { for k := range sched.Algorithm.Predicates() {
schedPredicates.Insert(k) schedPredicates.Insert(k)
} }
schedPrioritizers := sets.NewString()
for _, p := range sched.Algorithm.Prioritizers() {
schedPrioritizers.Insert(p.Name)
}
if !schedPredicates.Equal(test.expectedPredicates) { if !schedPredicates.Equal(test.expectedPredicates) {
t.Errorf("Expected predicates %v, got %v", test.expectedPredicates, schedPredicates) t.Errorf("Expected predicates %v, got %v", test.expectedPredicates, schedPredicates)
} }
if !schedPrioritizers.Equal(test.expectedPrioritizers) {
t.Errorf("Expected priority functions %v, got %v", test.expectedPrioritizers, schedPrioritizers)
}
schedPlugins := sched.Framework.ListPlugins() schedPlugins := sched.Framework.ListPlugins()
if diff := cmp.Diff(test.expectedPlugins, schedPlugins); diff != "" { if diff := cmp.Diff(test.expectedPlugins, schedPlugins); diff != "" {
t.Errorf("unexpected predicates diff (-want, +got): %s", diff) t.Errorf("unexpected predicates diff (-want, +got): %s", diff)