Move scheduler plugin set and configuration defaulting to component config

This commit is contained in:
Abdullah Gharaibeh
2021-06-10 08:45:49 -04:00
parent 21ee533508
commit 265ef1741f
37 changed files with 3200 additions and 3382 deletions

View File

@@ -25,7 +25,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
@@ -34,10 +33,7 @@ import (
"k8s.io/client-go/tools/events"
"k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2"
"k8s.io/kube-scheduler/config/v1beta1"
"k8s.io/kube-scheduler/config/v1beta2"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
"k8s.io/kubernetes/pkg/scheduler/metrics"
@@ -72,8 +68,6 @@ var allClusterEvents = []framework.ClusterEvent{
{Resource: framework.StorageClass, ActionType: framework.All},
}
var configDecoder = scheme.Codecs.UniversalDecoder()
// frameworkImpl is the component responsible for initializing and running scheduler
// plugins.
type frameworkImpl struct {
@@ -116,7 +110,7 @@ type frameworkImpl struct {
// frameworkImpl.
type extensionPoint struct {
// the set of plugins to be configured at this extension point.
plugins config.PluginSet
plugins *config.PluginSet
// a pointer to the slice storing plugins implementations that will run at this
// extension point.
slicePtr interface{}
@@ -124,17 +118,17 @@ type extensionPoint struct {
func (f *frameworkImpl) getExtensionPoints(plugins *config.Plugins) []extensionPoint {
return []extensionPoint{
{plugins.PreFilter, &f.preFilterPlugins},
{plugins.Filter, &f.filterPlugins},
{plugins.PostFilter, &f.postFilterPlugins},
{plugins.Reserve, &f.reservePlugins},
{plugins.PreScore, &f.preScorePlugins},
{plugins.Score, &f.scorePlugins},
{plugins.PreBind, &f.preBindPlugins},
{plugins.Bind, &f.bindPlugins},
{plugins.PostBind, &f.postBindPlugins},
{plugins.Permit, &f.permitPlugins},
{plugins.QueueSort, &f.queueSortPlugins},
{&plugins.PreFilter, &f.preFilterPlugins},
{&plugins.Filter, &f.filterPlugins},
{&plugins.PostFilter, &f.postFilterPlugins},
{&plugins.Reserve, &f.reservePlugins},
{&plugins.PreScore, &f.preScorePlugins},
{&plugins.Score, &f.scorePlugins},
{&plugins.PreBind, &f.preBindPlugins},
{&plugins.Bind, &f.bindPlugins},
{&plugins.PostBind, &f.postBindPlugins},
{&plugins.Permit, &f.permitPlugins},
{&plugins.QueueSort, &f.queueSortPlugins},
}
}
@@ -335,10 +329,7 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Opti
continue
}
args, err := getPluginArgsOrDefault(options.componentConfigVersion, pluginConfig, name)
if err != nil {
return nil, fmt.Errorf("getting args for Plugin %q: %w", name, err)
}
args := pluginConfig[name]
if args != nil {
outputProfile.PluginConfig = append(outputProfile.PluginConfig, config.PluginConfig{
Name: name,
@@ -356,7 +347,7 @@ func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Opti
}
for _, e := range f.getExtensionPoints(profile.Plugins) {
if err := updatePluginList(e.slicePtr, e.plugins, pluginsMap); err != nil {
if err := updatePluginList(e.slicePtr, *e.plugins, pluginsMap); err != nil {
return nil, err
}
}
@@ -424,34 +415,6 @@ func registerClusterEvents(name string, eventToPlugins map[framework.ClusterEven
}
}
// getPluginArgsOrDefault returns a configuration provided by the user or builds
// a default from the scheme. Returns `nil, nil` if the plugin does not have a
// defined arg types, such as in-tree plugins that don't require configuration
// or out-of-tree plugins.
func getPluginArgsOrDefault(componentConfigVersion string, pluginConfig map[string]runtime.Object, name string) (runtime.Object, error) {
res, ok := pluginConfig[name]
if ok {
return res, nil
}
// Use defaults from latest config API version.
var gvk schema.GroupVersionKind
switch componentConfigVersion {
case v1beta1.SchemeGroupVersion.String():
gvk = v1beta1.SchemeGroupVersion.WithKind(name + "Args")
case v1beta2.SchemeGroupVersion.String():
gvk = v1beta2.SchemeGroupVersion.WithKind(name + "Args")
default:
// default to v1beta2 (latest API)
gvk = v1beta2.SchemeGroupVersion.WithKind(name + "Args")
}
obj, _, err := configDecoder.Decode(nil, &gvk, nil)
if runtime.IsNotRegisteredError(err) {
// This plugin is out-of-tree or doesn't require configuration.
return nil, nil
}
return obj, err
}
func updatePluginList(pluginList interface{}, pluginSet config.PluginSet, pluginsMap map[string]framework.Plugin) error {
plugins := reflect.ValueOf(pluginList).Elem()
pluginType := plugins.Type().Elem()
@@ -1153,10 +1116,10 @@ func (f *frameworkImpl) HasScorePlugins() bool {
// ListPlugins returns a map of extension point name to plugin names configured at each extension
// point. Returns nil if no plugins where configured.
func (f *frameworkImpl) ListPlugins() map[string][]config.Plugin {
m := make(map[string][]config.Plugin)
func (f *frameworkImpl) ListPlugins() *config.Plugins {
m := config.Plugins{}
for _, e := range f.getExtensionPoints(&config.Plugins{}) {
for _, e := range f.getExtensionPoints(&m) {
plugins := reflect.ValueOf(e.slicePtr).Elem()
extName := plugins.Type().Elem().Name()
var cfgs []config.Plugin
@@ -1170,13 +1133,10 @@ func (f *frameworkImpl) ListPlugins() map[string][]config.Plugin {
cfgs = append(cfgs, p)
}
if len(cfgs) > 0 {
m[extName] = cfgs
e.plugins.Enabled = cfgs
}
}
if len(m) > 0 {
return m
}
return nil
return &m
}
// ClientSet returns a kubernetes clientset.
@@ -1206,7 +1166,7 @@ func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) map[string]config
return pgMap
}
find := func(pgs config.PluginSet) {
find := func(pgs *config.PluginSet) {
for _, pg := range pgs.Enabled {
pgMap[pg.Name] = pg
}