Add MultiPoint scheduler plugin config field
This commit is contained in:
@@ -289,22 +289,6 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Opti
|
||||
return f, nil
|
||||
}
|
||||
|
||||
var totalPriority int64
|
||||
for _, e := range profile.Plugins.Score.Enabled {
|
||||
// a weight of zero is not permitted, plugins can be disabled explicitly
|
||||
// when configured.
|
||||
f.scorePluginWeight[e.Name] = int(e.Weight)
|
||||
if f.scorePluginWeight[e.Name] == 0 {
|
||||
f.scorePluginWeight[e.Name] = 1
|
||||
}
|
||||
|
||||
// Checks totalPriority against MaxTotalScore to avoid overflow
|
||||
if int64(f.scorePluginWeight[e.Name])*framework.MaxNodeScore > framework.MaxTotalScore-totalPriority {
|
||||
return nil, fmt.Errorf("total score of Score plugins could overflow")
|
||||
}
|
||||
totalPriority += int64(f.scorePluginWeight[e.Name]) * framework.MaxNodeScore
|
||||
}
|
||||
|
||||
// get needed plugins from config
|
||||
pg := f.pluginsNeeded(profile.Plugins)
|
||||
|
||||
@@ -346,12 +330,28 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Opti
|
||||
fillEventToPluginMap(p, options.clusterEventMap)
|
||||
}
|
||||
|
||||
// initialize plugins per individual extension points
|
||||
for _, e := range f.getExtensionPoints(profile.Plugins) {
|
||||
if err := updatePluginList(e.slicePtr, *e.plugins, pluginsMap); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// initialize multiPoint plugins to their expanded extension points
|
||||
if len(profile.Plugins.MultiPoint.Enabled) > 0 {
|
||||
if err := f.expandMultiPointPlugins(profile, pluginsMap); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if len(f.queueSortPlugins) != 1 {
|
||||
return nil, fmt.Errorf("one queue sort plugin required for profile with scheduler name %q", profile.SchedulerName)
|
||||
}
|
||||
|
||||
if err := getScoreWeights(f, pluginsMap, append(profile.Plugins.Score.Enabled, profile.Plugins.MultiPoint.Enabled...)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Verifying the score weights again since Plugin.Name() could return a different
|
||||
// value from the one used in the configuration.
|
||||
for _, scorePlugin := range f.scorePlugins {
|
||||
@@ -384,6 +384,106 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Opti
|
||||
return f, nil
|
||||
}
|
||||
|
||||
// getScoreWeights makes sure that, between MultiPoint-Score plugin weights and individual Score
|
||||
// plugin weights there is not an overflow of MaxTotalScore.
|
||||
func getScoreWeights(f *frameworkImpl, pluginsMap map[string]framework.Plugin, plugins []config.Plugin) error {
|
||||
var totalPriority int64
|
||||
scorePlugins := reflect.ValueOf(&f.scorePlugins).Elem()
|
||||
pluginType := scorePlugins.Type().Elem()
|
||||
for _, e := range plugins {
|
||||
pg := pluginsMap[e.Name]
|
||||
if !reflect.TypeOf(pg).Implements(pluginType) {
|
||||
continue
|
||||
}
|
||||
|
||||
// We append MultiPoint plugins to the list of Score plugins. So if this plugin has already been
|
||||
// encountered, let the individual Score weight take precedence.
|
||||
if _, ok := f.scorePluginWeight[e.Name]; ok {
|
||||
continue
|
||||
}
|
||||
// a weight of zero is not permitted, plugins can be disabled explicitly
|
||||
// when configured.
|
||||
f.scorePluginWeight[e.Name] = int(e.Weight)
|
||||
if f.scorePluginWeight[e.Name] == 0 {
|
||||
f.scorePluginWeight[e.Name] = 1
|
||||
}
|
||||
|
||||
// Checks totalPriority against MaxTotalScore to avoid overflow
|
||||
if int64(f.scorePluginWeight[e.Name])*framework.MaxNodeScore > framework.MaxTotalScore-totalPriority {
|
||||
return fmt.Errorf("total score of Score plugins could overflow")
|
||||
}
|
||||
totalPriority += int64(f.scorePluginWeight[e.Name]) * framework.MaxNodeScore
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *frameworkImpl) expandMultiPointPlugins(profile *config.KubeSchedulerProfile, pluginsMap map[string]framework.Plugin) error {
|
||||
// initialize MultiPoint plugins
|
||||
for _, e := range f.getExtensionPoints(profile.Plugins) {
|
||||
plugins := reflect.ValueOf(e.slicePtr).Elem()
|
||||
pluginType := plugins.Type().Elem()
|
||||
// build enabledSet of plugins already registered via normal extension points
|
||||
// to check double registration
|
||||
enabledSet := sets.NewString()
|
||||
for _, plugin := range e.plugins.Enabled {
|
||||
enabledSet.Insert(plugin.Name)
|
||||
}
|
||||
|
||||
disabledSet := sets.NewString()
|
||||
for _, disabledPlugin := range e.plugins.Disabled {
|
||||
disabledSet.Insert(disabledPlugin.Name)
|
||||
}
|
||||
if disabledSet.Has("*") {
|
||||
klog.V(4).InfoS("all plugins disabled for extension point, skipping MultiPoint expansion", "extension", pluginType)
|
||||
continue
|
||||
}
|
||||
|
||||
// track plugins enabled via multipoint separately from those enabled by specific extensions,
|
||||
// so that we can distinguish between double-registration and explicit overrides
|
||||
multiPointEnabled := sets.NewString()
|
||||
|
||||
for _, ep := range profile.Plugins.MultiPoint.Enabled {
|
||||
pg, ok := pluginsMap[ep.Name]
|
||||
if !ok {
|
||||
return fmt.Errorf("%s %q does not exist", pluginType.Name(), ep.Name)
|
||||
}
|
||||
|
||||
// if this plugin doesn't implement the type for the current extension we're trying to expand, skip
|
||||
if !reflect.TypeOf(pg).Implements(pluginType) {
|
||||
continue
|
||||
}
|
||||
|
||||
// a plugin that's enabled via MultiPoint can still be disabled for specific extension points
|
||||
if disabledSet.Has(ep.Name) {
|
||||
klog.V(4).InfoS("plugin disabled for extension point", "plugin", ep.Name, "extension", pluginType)
|
||||
continue
|
||||
}
|
||||
|
||||
// if this plugin has already been enabled by the specific extension point,
|
||||
// the user intent is to override the default plugin or make some other explicit setting.
|
||||
// Either way, discard the MultiPoint value for this plugin.
|
||||
// This maintains expected behavior for overriding default plugins (see https://github.com/kubernetes/kubernetes/pull/99582)
|
||||
if enabledSet.Has(ep.Name) {
|
||||
klog.InfoS("MultiPoint plugin is explicitly re-configured; overriding", "plugin", ep.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
// if this plugin is already registered via MultiPoint, then this is
|
||||
// a double registration and an error in the config.
|
||||
if multiPointEnabled.Has(ep.Name) {
|
||||
return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.Name())
|
||||
}
|
||||
|
||||
// we only need to update the multipoint set, since we already have the specific extension set from above
|
||||
multiPointEnabled.Insert(ep.Name)
|
||||
|
||||
newPlugins := reflect.Append(plugins, reflect.ValueOf(pg))
|
||||
plugins.Set(newPlugins)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func fillEventToPluginMap(p framework.Plugin, eventToPlugins map[framework.ClusterEvent]sets.String) {
|
||||
ext, ok := p.(framework.EnqueueExtensions)
|
||||
if !ok {
|
||||
@@ -1174,6 +1274,9 @@ func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) map[string]config
|
||||
for _, e := range f.getExtensionPoints(plugins) {
|
||||
find(e.plugins)
|
||||
}
|
||||
|
||||
// Parse MultiPoint separately since they are not returned by f.getExtensionPoints()
|
||||
find(&plugins.MultiPoint)
|
||||
return pgMap
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user