Define algorithm providers in terms of plugins.

This commit is contained in:
Abdullah Gharaibeh
2019-12-27 14:04:21 -05:00
parent 8743d98431
commit a6b7b0d95e
32 changed files with 1245 additions and 1741 deletions

View File

@@ -15,6 +15,7 @@ go_library(
"//pkg/client/leaderelectionconfig:go_default_library", "//pkg/client/leaderelectionconfig:go_default_library",
"//pkg/master/ports:go_default_library", "//pkg/master/ports:go_default_library",
"//pkg/scheduler:go_default_library", "//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/config/scheme:go_default_library", "//pkg/scheduler/apis/config/scheme:go_default_library",
"//pkg/scheduler/apis/config/v1alpha1:go_default_library", "//pkg/scheduler/apis/config/v1alpha1:go_default_library",

View File

@@ -22,7 +22,7 @@ import (
"github.com/spf13/pflag" "github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/scheduler" "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
) )
@@ -44,7 +44,7 @@ func (o *DeprecatedOptions) AddFlags(fs *pflag.FlagSet, cfg *kubeschedulerconfig
return return
} }
fs.StringVar(&o.AlgorithmProvider, "algorithm-provider", o.AlgorithmProvider, "DEPRECATED: the scheduling algorithm provider to use, one of: "+scheduler.ListAlgorithmProviders()) fs.StringVar(&o.AlgorithmProvider, "algorithm-provider", o.AlgorithmProvider, "DEPRECATED: the scheduling algorithm provider to use, one of: "+algorithmprovider.ListAlgorithmProviders())
fs.StringVar(&o.PolicyConfigFile, "policy-config-file", o.PolicyConfigFile, "DEPRECATED: file with scheduler policy configuration. This file is used if policy ConfigMap is not provided or --use-legacy-policy-config=true") fs.StringVar(&o.PolicyConfigFile, "policy-config-file", o.PolicyConfigFile, "DEPRECATED: file with scheduler policy configuration. This file is used if policy ConfigMap is not provided or --use-legacy-policy-config=true")
usage := fmt.Sprintf("DEPRECATED: name of the ConfigMap object that contains scheduler's policy configuration. It must exist in the system namespace before scheduler initialization if --use-legacy-policy-config=false. The config must be provided as the value of an element in 'Data' map with the key='%v'", kubeschedulerconfig.SchedulerPolicyConfigMapKey) usage := fmt.Sprintf("DEPRECATED: name of the ConfigMap object that contains scheduler's policy configuration. It must exist in the system namespace before scheduler initialization if --use-legacy-policy-config=false. The config must be provided as the value of an element in 'Data' map with the key='%v'", kubeschedulerconfig.SchedulerPolicyConfigMapKey)
fs.StringVar(&o.PolicyConfigMapName, "policy-configmap", o.PolicyConfigMapName, usage) fs.StringVar(&o.PolicyConfigMapName, "policy-configmap", o.PolicyConfigMapName, usage)

View File

@@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"algorithm_factory.go",
"eventhandlers.go", "eventhandlers.go",
"factory.go", "factory.go",
"scheduler.go", "scheduler.go",
@@ -16,20 +15,18 @@ go_library(
"//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library", "//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/config/scheme:go_default_library", "//pkg/scheduler/apis/config/scheme:go_default_library",
"//pkg/scheduler/apis/config/validation:go_default_library", "//pkg/scheduler/apis/config/validation:go_default_library",
"//pkg/scheduler/core:go_default_library", "//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/framework/plugins:go_default_library", "//pkg/scheduler/framework/plugins:go_default_library",
"//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library", "//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library",
"//pkg/scheduler/framework/plugins/nodelabel:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library", "//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//pkg/scheduler/framework/plugins/serviceaffinity:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/cache/debugger:go_default_library", "//pkg/scheduler/internal/cache/debugger:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library", "//pkg/scheduler/volumebinder:go_default_library",
@@ -59,7 +56,6 @@ go_library(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = [
"algorithm_factory_test.go",
"eventhandlers_test.go", "eventhandlers_test.go",
"factory_test.go", "factory_test.go",
"scheduler_test.go", "scheduler_test.go",
@@ -110,7 +106,6 @@ go_test(
"//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library", "//staging/src/k8s.io/client-go/tools/events:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
], ],
) )

View File

@@ -396,9 +396,3 @@ func podToleratesNodeTaints(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, f
} }
return false, []PredicateFailureReason{ErrTaintsTolerationsNotMatch}, nil return false, []PredicateFailureReason{ErrTaintsTolerationsNotMatch}, nil
} }
// EvenPodsSpreadPredicate is the legacy function using old path of metadata.
// DEPRECATED
func EvenPodsSpreadPredicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
return false, nil, fmt.Errorf("this function should never be called")
}

View File

@@ -27,11 +27,10 @@ import (
// MetadataFactory is a factory to produce PriorityMetadata. // MetadataFactory is a factory to produce PriorityMetadata.
type MetadataFactory struct { type MetadataFactory struct {
serviceLister corelisters.ServiceLister serviceLister corelisters.ServiceLister
controllerLister corelisters.ReplicationControllerLister controllerLister corelisters.ReplicationControllerLister
replicaSetLister appslisters.ReplicaSetLister replicaSetLister appslisters.ReplicaSetLister
statefulSetLister appslisters.StatefulSetLister statefulSetLister appslisters.StatefulSetLister
hardPodAffinityWeight int32
} }
// NewMetadataFactory creates a MetadataFactory. // NewMetadataFactory creates a MetadataFactory.
@@ -40,14 +39,12 @@ func NewMetadataFactory(
controllerLister corelisters.ReplicationControllerLister, controllerLister corelisters.ReplicationControllerLister,
replicaSetLister appslisters.ReplicaSetLister, replicaSetLister appslisters.ReplicaSetLister,
statefulSetLister appslisters.StatefulSetLister, statefulSetLister appslisters.StatefulSetLister,
hardPodAffinityWeight int32,
) MetadataProducer { ) MetadataProducer {
factory := &MetadataFactory{ factory := &MetadataFactory{
serviceLister: serviceLister, serviceLister: serviceLister,
controllerLister: controllerLister, controllerLister: controllerLister,
replicaSetLister: replicaSetLister, replicaSetLister: replicaSetLister,
statefulSetLister: statefulSetLister, statefulSetLister: statefulSetLister,
hardPodAffinityWeight: hardPodAffinityWeight,
} }
return factory.PriorityMetadata return factory.PriorityMetadata
} }

View File

@@ -155,7 +155,6 @@ func TestPriorityMetadata(t *testing.T) {
informerFactory.Core().V1().ReplicationControllers().Lister(), informerFactory.Core().V1().ReplicationControllers().Lister(),
informerFactory.Apps().V1().ReplicaSets().Lister(), informerFactory.Apps().V1().ReplicaSets().Lister(),
informerFactory.Apps().V1().StatefulSets().Lister(), informerFactory.Apps().V1().StatefulSets().Lister(),
1,
) )
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {

View File

@@ -352,7 +352,6 @@ func TestSelectorSpreadPriority(t *testing.T) {
fakelisters.ControllerLister(test.rcs), fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss), fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss), fakelisters.StatefulSetLister(test.sss),
1,
) )
metaData := metaDataProducer(test.pod, nodes, snapshot) metaData := metaDataProducer(test.pod, nodes, snapshot)
@@ -590,7 +589,6 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
fakelisters.ControllerLister(test.rcs), fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss), fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss), fakelisters.StatefulSetLister(test.sss),
1,
) )
metaData := metaDataProducer(test.pod, nodes, snapshot) metaData := metaDataProducer(test.pod, nodes, snapshot)
list, err := runMapReducePriority(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, metaData, test.pod, snapshot, makeLabeledNodeList(labeledNodes)) list, err := runMapReducePriority(selectorSpread.CalculateSpreadPriorityMap, selectorSpread.CalculateSpreadPriorityReduce, metaData, test.pod, snapshot, makeLabeledNodeList(labeledNodes))

View File

@@ -1,460 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"fmt"
"regexp"
"sort"
"strings"
"sync"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
"k8s.io/klog"
)
// AlgorithmFactoryArgs are passed to all factory functions.
type AlgorithmFactoryArgs struct {
SharedLister schedulerlisters.SharedLister
InformerFactory informers.SharedInformerFactory
VolumeBinder *volumebinder.VolumeBinder
HardPodAffinitySymmetricWeight int32
}
// PriorityMetadataProducerFactory produces MetadataProducer from the given args.
type PriorityMetadataProducerFactory func(AlgorithmFactoryArgs) priorities.MetadataProducer
var (
algorithmRegistry = &AlgorithmRegistry{
// predicate keys supported for backward compatibility with v1.Policy.
predicateKeys: sets.NewString(
"PodFitsPorts", // This exists for compatibility reasons.
predicates.PodFitsHostPortsPred,
predicates.PodFitsResourcesPred,
predicates.HostNamePred,
predicates.MatchNodeSelectorPred,
predicates.NoVolumeZoneConflictPred,
predicates.MaxEBSVolumeCountPred,
predicates.MaxGCEPDVolumeCountPred,
predicates.MaxAzureDiskVolumeCountPred,
predicates.MaxCSIVolumeCountPred,
predicates.MaxCinderVolumeCountPred,
predicates.MatchInterPodAffinityPred,
predicates.NoDiskConflictPred,
predicates.GeneralPred,
predicates.PodToleratesNodeTaintsPred,
predicates.CheckNodeUnschedulablePred,
predicates.CheckVolumeBindingPred,
),
// priority keys to weights, this exist for backward compatibility with v1.Policy.
priorityKeys: map[string]int64{
priorities.LeastRequestedPriority: 1,
priorities.BalancedResourceAllocation: 1,
priorities.MostRequestedPriority: 1,
priorities.ImageLocalityPriority: 1,
priorities.NodeAffinityPriority: 1,
priorities.SelectorSpreadPriority: 1,
priorities.ServiceSpreadingPriority: 1,
priorities.TaintTolerationPriority: 1,
priorities.InterPodAffinityPriority: 1,
priorities.NodePreferAvoidPodsPriority: 10000,
},
// MandatoryPredicates the set of keys for predicates that the scheduler will
// be configured with all the time.
mandatoryPredicateKeys: sets.NewString(
predicates.PodToleratesNodeTaintsPred,
predicates.CheckNodeUnschedulablePred,
),
algorithmProviders: make(map[string]AlgorithmProviderConfig),
}
// Registered metadata producers
priorityMetadataProducerFactory PriorityMetadataProducerFactory
schedulerFactoryMutex sync.RWMutex
)
// AlgorithmProviderConfig is used to store the configuration of algorithm providers.
type AlgorithmProviderConfig struct {
PredicateKeys sets.String
PriorityKeys sets.String
}
// AlgorithmRegistry is used to store current state of registered predicates and priorities.
type AlgorithmRegistry struct {
predicateKeys sets.String
priorityKeys map[string]int64
mandatoryPredicateKeys sets.String
algorithmProviders map[string]AlgorithmProviderConfig
}
// RegisteredPredicatesAndPrioritiesSnapshot returns a snapshot of current registered predicates and priorities.
func RegisteredPredicatesAndPrioritiesSnapshot() *AlgorithmRegistry {
schedulerFactoryMutex.RLock()
defer schedulerFactoryMutex.RUnlock()
copy := AlgorithmRegistry{
predicateKeys: sets.NewString(),
mandatoryPredicateKeys: sets.NewString(),
priorityKeys: make(map[string]int64),
algorithmProviders: make(map[string]AlgorithmProviderConfig),
}
for k := range algorithmRegistry.predicateKeys {
copy.predicateKeys.Insert(k)
}
for k := range algorithmRegistry.mandatoryPredicateKeys {
copy.mandatoryPredicateKeys.Insert(k)
}
for k, v := range algorithmRegistry.priorityKeys {
copy.priorityKeys[k] = v
}
for provider, config := range algorithmRegistry.algorithmProviders {
copyPredKeys, copyPrioKeys := sets.NewString(), sets.NewString()
for k := range config.PredicateKeys {
copyPredKeys.Insert(k)
}
for k := range config.PriorityKeys {
copyPrioKeys.Insert(k)
}
copy.algorithmProviders[provider] = AlgorithmProviderConfig{
PredicateKeys: copyPredKeys,
PriorityKeys: copyPrioKeys,
}
}
return &copy
}
// ApplyPredicatesAndPriorities sets state of predicates and priorities to `s`.
func ApplyPredicatesAndPriorities(s *AlgorithmRegistry) {
schedulerFactoryMutex.Lock()
algorithmRegistry = s
schedulerFactoryMutex.Unlock()
}
// RegisterPredicate registers a fit predicate with the algorithm
// registry. Returns the name with which the predicate was registered.
// TODO(Huang-Wei): remove this.
func RegisterPredicate(name string) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
algorithmRegistry.predicateKeys.Insert(name)
return name
}
// RegisterMandatoryPredicate registers a mandatory predicate.
func RegisterMandatoryPredicate(name string) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
algorithmRegistry.predicateKeys.Insert(name)
algorithmRegistry.mandatoryPredicateKeys.Insert(name)
return name
}
// AddPredicateToAlgorithmProviders adds a predicate key to all algorithm providers.
func AddPredicateToAlgorithmProviders(key string) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
for _, provider := range algorithmRegistry.algorithmProviders {
provider.PredicateKeys.Insert(key)
}
}
// AddPriorityToAlgorithmProviders adds a priority key to all algorithm providers.
func AddPriorityToAlgorithmProviders(key string) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
for _, provider := range algorithmRegistry.algorithmProviders {
provider.PriorityKeys.Insert(key)
}
}
// RegisterCustomPredicate registers a custom fit predicate with the algorithm registry.
// Returns the name, with which the predicate was registered.
func RegisterCustomPredicate(policy schedulerapi.PredicatePolicy, pluginArgs *plugins.ConfigProducerArgs) string {
var ok bool
var predicate string
validatePredicateOrDie(policy)
// generate the predicate function, if a custom type is requested
if policy.Argument != nil {
if policy.Argument.ServiceAffinity != nil {
// We use the ServiceAffinity predicate name for all ServiceAffinity custom predicates.
// It may get called multiple times but we essentially only register one instance of ServiceAffinity predicate.
// This name is then used to find the registered plugin and run the plugin instead of the predicate.
predicate = predicates.CheckServiceAffinityPred
// map LabelsPresence policy to ConfigProducerArgs that's used to configure the ServiceAffinity plugin.
if pluginArgs.ServiceAffinityArgs == nil {
pluginArgs.ServiceAffinityArgs = &serviceaffinity.Args{}
}
pluginArgs.ServiceAffinityArgs.AffinityLabels = append(pluginArgs.ServiceAffinityArgs.AffinityLabels, policy.Argument.ServiceAffinity.Labels...)
} else if policy.Argument.LabelsPresence != nil {
// We use the CheckNodeLabelPresencePred predicate name for all kNodeLabel custom predicates.
// It may get called multiple times but we essentially only register one instance of NodeLabel predicate.
// This name is then used to find the registered plugin and run the plugin instead of the predicate.
predicate = predicates.CheckNodeLabelPresencePred
// Map LabelPresence policy to ConfigProducerArgs that's used to configure the NodeLabel plugin.
if pluginArgs.NodeLabelArgs == nil {
pluginArgs.NodeLabelArgs = &nodelabel.Args{}
}
if policy.Argument.LabelsPresence.Presence {
pluginArgs.NodeLabelArgs.PresentLabels = append(pluginArgs.NodeLabelArgs.PresentLabels, policy.Argument.LabelsPresence.Labels...)
} else {
pluginArgs.NodeLabelArgs.AbsentLabels = append(pluginArgs.NodeLabelArgs.AbsentLabels, policy.Argument.LabelsPresence.Labels...)
}
}
} else if _, ok = algorithmRegistry.predicateKeys[policy.Name]; ok {
// checking to see if a pre-defined predicate is requested
klog.V(2).Infof("Predicate type %s already registered, reusing.", policy.Name)
return policy.Name
}
if len(predicate) == 0 {
klog.Fatalf("Invalid configuration: Predicate type not found for %s", policy.Name)
}
return predicate
}
// RegisterPriorityMetadataProducerFactory registers a PriorityMetadataProducerFactory.
func RegisterPriorityMetadataProducerFactory(f PriorityMetadataProducerFactory) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
priorityMetadataProducerFactory = f
}
// RegisterPriority registers a priority function with the algorithm registry. Returns the name,
// with which the function was registered.
func RegisterPriority(name string, weight int64) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
algorithmRegistry.priorityKeys[name] = weight
return name
}
// RegisterCustomPriority registers a custom priority with the algorithm registry.
// Returns the name, with which the priority function was registered.
func RegisterCustomPriority(policy schedulerapi.PriorityPolicy, configProducerArgs *plugins.ConfigProducerArgs) string {
var priority string
var weight int64
validatePriorityOrDie(policy)
// generate the priority function, if a custom priority is requested
if policy.Argument != nil {
if policy.Argument.ServiceAntiAffinity != nil {
// We use the ServiceAffinity plugin name for all ServiceAffinity custom priorities.
// It may get called multiple times but we essentially only register one instance of
// ServiceAffinity priority.
// This name is then used to find the registered plugin and run the plugin instead of the priority.
priority = serviceaffinity.Name
if configProducerArgs.ServiceAffinityArgs == nil {
configProducerArgs.ServiceAffinityArgs = &serviceaffinity.Args{}
}
configProducerArgs.ServiceAffinityArgs.AntiAffinityLabelsPreference = append(configProducerArgs.ServiceAffinityArgs.AntiAffinityLabelsPreference, policy.Argument.ServiceAntiAffinity.Label)
weight = policy.Weight
schedulerFactoryMutex.RLock()
if existingWeight, ok := algorithmRegistry.priorityKeys[priority]; ok {
// If there are n ServiceAffinity priorities in the policy, the weight for the corresponding
// score plugin is n*(weight of each priority).
weight += existingWeight
}
schedulerFactoryMutex.RUnlock()
} else if policy.Argument.LabelPreference != nil {
// We use the NodeLabel plugin name for all NodeLabel custom priorities.
// It may get called multiple times but we essentially only register one instance of NodeLabel priority.
// This name is then used to find the registered plugin and run the plugin instead of the priority.
priority = nodelabel.Name
if configProducerArgs.NodeLabelArgs == nil {
configProducerArgs.NodeLabelArgs = &nodelabel.Args{}
}
if policy.Argument.LabelPreference.Presence {
configProducerArgs.NodeLabelArgs.PresentLabelsPreference = append(configProducerArgs.NodeLabelArgs.PresentLabelsPreference, policy.Argument.LabelPreference.Label)
} else {
configProducerArgs.NodeLabelArgs.AbsentLabelsPreference = append(configProducerArgs.NodeLabelArgs.AbsentLabelsPreference, policy.Argument.LabelPreference.Label)
}
weight = policy.Weight
schedulerFactoryMutex.RLock()
if existingWeight, ok := algorithmRegistry.priorityKeys[priority]; ok {
// If there are n NodeLabel priority configured in the policy, the weight for the corresponding
// priority is n*(weight of each priority in policy).
weight += existingWeight
}
schedulerFactoryMutex.RUnlock()
} else if policy.Argument.RequestedToCapacityRatioArguments != nil {
scoringFunctionShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(policy.Argument.RequestedToCapacityRatioArguments)
configProducerArgs.RequestedToCapacityRatioArgs = &noderesources.RequestedToCapacityRatioArgs{
FunctionShape: scoringFunctionShape,
ResourceToWeightMap: resources,
}
// We do not allow specifying the name for custom plugins, see #83472
priority = noderesources.RequestedToCapacityRatioName
weight = policy.Weight
}
} else if _, ok := algorithmRegistry.priorityKeys[policy.Name]; ok {
klog.V(2).Infof("Priority type %s already registered, reusing.", policy.Name)
// set/update the weight based on the policy
priority = policy.Name
weight = policy.Weight
}
if len(priority) == 0 {
klog.Fatalf("Invalid configuration: Priority type not found for %s", policy.Name)
}
return RegisterPriority(priority, weight)
}
func buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(arguments *schedulerapi.RequestedToCapacityRatioArguments) (noderesources.FunctionShape, noderesources.ResourceToWeightMap) {
n := len(arguments.Shape)
points := make([]noderesources.FunctionShapePoint, 0, n)
for _, point := range arguments.Shape {
points = append(points, noderesources.FunctionShapePoint{
Utilization: int64(point.Utilization),
// MaxCustomPriorityScore may diverge from the max score used in the scheduler and defined by MaxNodeScore,
// therefore we need to scale the score returned by requested to capacity ratio to the score range
// used by the scheduler.
Score: int64(point.Score) * (framework.MaxNodeScore / schedulerapi.MaxCustomPriorityScore),
})
}
shape, err := noderesources.NewFunctionShape(points)
if err != nil {
klog.Fatalf("invalid RequestedToCapacityRatioPriority arguments: %s", err.Error())
}
resourceToWeightMap := make(noderesources.ResourceToWeightMap, 0)
if len(arguments.Resources) == 0 {
resourceToWeightMap = noderesources.DefaultRequestedRatioResources
return shape, resourceToWeightMap
}
for _, resource := range arguments.Resources {
resourceToWeightMap[v1.ResourceName(resource.Name)] = resource.Weight
if resource.Weight == 0 {
resourceToWeightMap[v1.ResourceName(resource.Name)] = 1
}
}
return shape, resourceToWeightMap
}
// RegisterAlgorithmProvider registers a new algorithm provider with the algorithm registry.
func RegisterAlgorithmProvider(name string, predicateKeys, priorityKeys sets.String) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
algorithmRegistry.algorithmProviders[name] = AlgorithmProviderConfig{
PredicateKeys: predicateKeys,
PriorityKeys: priorityKeys,
}
return name
}
// GetAlgorithmProvider should not be used to modify providers. It is publicly visible for testing.
func GetAlgorithmProvider(name string) (*AlgorithmProviderConfig, error) {
schedulerFactoryMutex.RLock()
defer schedulerFactoryMutex.RUnlock()
provider, ok := algorithmRegistry.algorithmProviders[name]
if !ok {
return nil, fmt.Errorf("provider %q is not registered", name)
}
return &provider, nil
}
func getPriorityMetadataProducer(args AlgorithmFactoryArgs) (priorities.MetadataProducer, error) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
if priorityMetadataProducerFactory == nil {
return priorities.EmptyMetadataProducer, nil
}
return priorityMetadataProducerFactory(args), nil
}
var validName = regexp.MustCompile("^[a-zA-Z0-9]([-a-zA-Z0-9]*[a-zA-Z0-9])$")
func validateAlgorithmNameOrDie(name string) {
if !validName.MatchString(name) {
klog.Fatalf("Algorithm name %v does not match the name validation regexp \"%v\".", name, validName)
}
}
func validatePredicateOrDie(predicate schedulerapi.PredicatePolicy) {
if predicate.Argument != nil {
numArgs := 0
if predicate.Argument.ServiceAffinity != nil {
numArgs++
}
if predicate.Argument.LabelsPresence != nil {
numArgs++
}
if numArgs != 1 {
klog.Fatalf("Exactly 1 predicate argument is required, numArgs: %v, Predicate: %s", numArgs, predicate.Name)
}
}
}
func validatePriorityOrDie(priority schedulerapi.PriorityPolicy) {
if priority.Argument != nil {
numArgs := 0
if priority.Argument.ServiceAntiAffinity != nil {
numArgs++
}
if priority.Argument.LabelPreference != nil {
numArgs++
}
if priority.Argument.RequestedToCapacityRatioArguments != nil {
numArgs++
}
if numArgs != 1 {
klog.Fatalf("Exactly 1 priority argument is required, numArgs: %v, Priority: %s", numArgs, priority.Name)
}
}
}
// ListAlgorithmProviders is called when listing all available algorithm providers in `kube-scheduler --help`
func ListAlgorithmProviders() string {
var availableAlgorithmProviders []string
for name := range algorithmRegistry.algorithmProviders {
availableAlgorithmProviders = append(availableAlgorithmProviders, name)
}
sort.Strings(availableAlgorithmProviders)
return strings.Join(availableAlgorithmProviders, " | ")
}

View File

@@ -1,100 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
)
func TestAlgorithmNameValidation(t *testing.T) {
algorithmNamesShouldValidate := []string{
"1SomeAlgo1rithm",
"someAlgor-ithm1",
}
algorithmNamesShouldNotValidate := []string{
"-SomeAlgorithm",
"SomeAlgorithm-",
"Some,Alg:orithm",
}
for _, name := range algorithmNamesShouldValidate {
t.Run(name, func(t *testing.T) {
if !validName.MatchString(name) {
t.Errorf("should be a valid algorithm name but is not valid.")
}
})
}
for _, name := range algorithmNamesShouldNotValidate {
t.Run(name, func(t *testing.T) {
if validName.MatchString(name) {
t.Errorf("should be an invalid algorithm name but is valid.")
}
})
}
}
func TestBuildScoringFunctionShapeFromRequestedToCapacityRatioArguments(t *testing.T) {
arguments := schedulerapi.RequestedToCapacityRatioArguments{
Shape: []schedulerapi.UtilizationShapePoint{
{Utilization: 10, Score: 1},
{Utilization: 30, Score: 5},
{Utilization: 70, Score: 2},
},
Resources: []schedulerapi.ResourceSpec{
{Name: string(v1.ResourceCPU)},
{Name: string(v1.ResourceMemory)},
},
}
builtShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(&arguments)
expectedShape, _ := noderesources.NewFunctionShape([]noderesources.FunctionShapePoint{
{Utilization: 10, Score: 10},
{Utilization: 30, Score: 50},
{Utilization: 70, Score: 20},
})
expectedResources := noderesources.ResourceToWeightMap{
v1.ResourceCPU: 1,
v1.ResourceMemory: 1,
}
assert.Equal(t, expectedShape, builtShape)
assert.Equal(t, expectedResources, resources)
}
func TestBuildScoringFunctionShapeFromRequestedToCapacityRatioArgumentsNilResourceToWeightMap(t *testing.T) {
arguments := schedulerapi.RequestedToCapacityRatioArguments{
Shape: []schedulerapi.UtilizationShapePoint{
{Utilization: 10, Score: 1},
{Utilization: 30, Score: 5},
{Utilization: 70, Score: 2},
},
}
builtShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(&arguments)
expectedShape, _ := noderesources.NewFunctionShape([]noderesources.FunctionShapePoint{
{Utilization: 10, Score: 10},
{Utilization: 30, Score: 50},
{Utilization: 70, Score: 20},
})
expectedResources := noderesources.ResourceToWeightMap{
v1.ResourceCPU: 1,
v1.ResourceMemory: 1,
}
assert.Equal(t, expectedShape, builtShape)
assert.Equal(t, expectedResources, resources)
}

View File

@@ -8,14 +8,10 @@ load(
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = ["registry.go"],
"plugins.go",
"registry.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/algorithmprovider", importpath = "k8s.io/kubernetes/pkg/scheduler/algorithmprovider",
deps = [ deps = [
"//pkg/features:go_default_library", "//pkg/features:go_default_library",
"//pkg/scheduler/algorithmprovider/defaults:go_default_library",
"//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/plugins/defaultpodtopologyspread:go_default_library", "//pkg/scheduler/framework/plugins/defaultpodtopologyspread:go_default_library",
"//pkg/scheduler/framework/plugins/imagelocality:go_default_library", "//pkg/scheduler/framework/plugins/imagelocality:go_default_library",
@@ -40,20 +36,29 @@ go_library(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = ["registry_test.go"],
"plugins_test.go",
"registry_test.go",
],
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
"//pkg/scheduler:go_default_library", "//pkg/features:go_default_library",
"//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/plugins:go_default_library", "//pkg/scheduler/framework/plugins/defaultpodtopologyspread:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/framework/plugins/imagelocality:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//pkg/scheduler/framework/plugins/interpodaffinity:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library", "//pkg/scheduler/framework/plugins/nodeaffinity:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library", "//pkg/scheduler/framework/plugins/nodename:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//pkg/scheduler/framework/plugins/nodeports:go_default_library",
"//pkg/scheduler/framework/plugins/nodepreferavoidpods:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//pkg/scheduler/framework/plugins/nodeunschedulable:go_default_library",
"//pkg/scheduler/framework/plugins/nodevolumelimits:go_default_library",
"//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
"//pkg/scheduler/framework/plugins/tainttoleration:go_default_library",
"//pkg/scheduler/framework/plugins/volumebinding:go_default_library",
"//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library",
"//pkg/scheduler/framework/plugins/volumezone:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library", "//vendor/github.com/google/go-cmp/cmp:go_default_library",
], ],
) )

View File

@@ -1,44 +1,10 @@
package(default_visibility = ["//visibility:public"]) load("@io_bazel_rules_go//go:def.bzl", "go_library")
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = ["defaults.go"],
"defaults.go",
"register_priorities.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults", importpath = "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults",
deps = [ visibility = ["//visibility:public"],
"//pkg/features:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["defaults_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
],
) )
filegroup( filegroup(
@@ -52,4 +18,5 @@ filegroup(
name = "all-srcs", name = "all-srcs",
srcs = [":package-srcs"], srcs = [":package-srcs"],
tags = ["automanaged"], tags = ["automanaged"],
visibility = ["//visibility:public"],
) )

View File

@@ -16,104 +16,4 @@ limitations under the License.
package defaults package defaults
import ( // TODO(ahg-g): remove this pkg.
"k8s.io/klog"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
)
const (
// ClusterAutoscalerProvider defines the default autoscaler provider
ClusterAutoscalerProvider = "ClusterAutoscalerProvider"
)
func init() {
registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
}
func defaultPredicates() sets.String {
return sets.NewString(
predicates.NoVolumeZoneConflictPred,
predicates.MaxEBSVolumeCountPred,
predicates.MaxGCEPDVolumeCountPred,
predicates.MaxAzureDiskVolumeCountPred,
predicates.MaxCSIVolumeCountPred,
predicates.MatchInterPodAffinityPred,
predicates.NoDiskConflictPred,
predicates.GeneralPred,
predicates.PodToleratesNodeTaintsPred,
predicates.CheckVolumeBindingPred,
predicates.CheckNodeUnschedulablePred,
)
}
// ApplyFeatureGates applies algorithm by feature gates.
// The returned function is used to restore the state of registered predicates/priorities
// when this function is called, and should be called in tests which may modify the value
// of a feature gate temporarily.
// TODO(Huang-Wei): refactor this function to have a clean way to disable/enable plugins.
func ApplyFeatureGates() (restore func()) {
snapshot := scheduler.RegisteredPredicatesAndPrioritiesSnapshot()
// Only register EvenPodsSpread predicate & priority if the feature is enabled
if utilfeature.DefaultFeatureGate.Enabled(features.EvenPodsSpread) {
klog.Infof("Registering EvenPodsSpread predicate and priority function")
// register predicate
scheduler.AddPredicateToAlgorithmProviders(predicates.EvenPodsSpreadPred)
scheduler.RegisterPredicate(predicates.EvenPodsSpreadPred)
// register priority
scheduler.AddPriorityToAlgorithmProviders(priorities.EvenPodsSpreadPriority)
scheduler.RegisterPriority(priorities.EvenPodsSpreadPriority, 1)
}
// Prioritizes nodes that satisfy pod's resource limits
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) {
klog.Infof("Registering resourcelimits priority function")
scheduler.RegisterPriority(priorities.ResourceLimitsPriority, 1)
// Register the priority function to specific provider too.
scheduler.AddPriorityToAlgorithmProviders(priorities.ResourceLimitsPriority)
}
restore = func() {
scheduler.ApplyPredicatesAndPriorities(snapshot)
}
return
}
func registerAlgorithmProvider(predSet, priSet sets.String) {
// Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
// by specifying flag.
scheduler.RegisterAlgorithmProvider(schedulerapi.SchedulerDefaultProviderName, predSet, priSet)
// Cluster autoscaler friendly scheduling algorithm.
scheduler.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
copyAndReplace(priSet, priorities.LeastRequestedPriority, priorities.MostRequestedPriority))
}
func defaultPriorities() sets.String {
return sets.NewString(
priorities.SelectorSpreadPriority,
priorities.InterPodAffinityPriority,
priorities.LeastRequestedPriority,
priorities.BalancedResourceAllocation,
priorities.NodePreferAvoidPodsPriority,
priorities.NodeAffinityPriority,
priorities.TaintTolerationPriority,
priorities.ImageLocalityPriority,
)
}
func copyAndReplace(set sets.String, replaceWhat, replaceWith string) sets.String {
result := sets.NewString(set.List()...)
if result.Has(replaceWhat) {
result.Delete(replaceWhat)
result.Insert(replaceWith)
}
return result
}

View File

@@ -1,258 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package defaults
import (
"testing"
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
)
func TestCopyAndReplace(t *testing.T) {
testCases := []struct {
set sets.String
replaceWhat string
replaceWith string
expected sets.String
}{
{
set: sets.String{"A": sets.Empty{}, "B": sets.Empty{}},
replaceWhat: "A",
replaceWith: "C",
expected: sets.String{"B": sets.Empty{}, "C": sets.Empty{}},
},
{
set: sets.String{"A": sets.Empty{}, "B": sets.Empty{}},
replaceWhat: "D",
replaceWith: "C",
expected: sets.String{"A": sets.Empty{}, "B": sets.Empty{}},
},
}
for _, testCase := range testCases {
result := copyAndReplace(testCase.set, testCase.replaceWhat, testCase.replaceWith)
if !result.Equal(testCase.expected) {
t.Errorf("expected %v got %v", testCase.expected, result)
}
}
}
func TestDefaultPriorities(t *testing.T) {
result := sets.NewString(
priorities.SelectorSpreadPriority,
priorities.InterPodAffinityPriority,
priorities.LeastRequestedPriority,
priorities.BalancedResourceAllocation,
priorities.NodePreferAvoidPodsPriority,
priorities.NodeAffinityPriority,
priorities.TaintTolerationPriority,
priorities.ImageLocalityPriority,
)
if expected := defaultPriorities(); !result.Equal(expected) {
t.Errorf("expected %v got %v", expected, result)
}
}
func TestDefaultPredicates(t *testing.T) {
result := sets.NewString(
predicates.NoVolumeZoneConflictPred,
predicates.MaxEBSVolumeCountPred,
predicates.MaxGCEPDVolumeCountPred,
predicates.MaxAzureDiskVolumeCountPred,
predicates.MaxCSIVolumeCountPred,
predicates.MatchInterPodAffinityPred,
predicates.NoDiskConflictPred,
predicates.GeneralPred,
predicates.PodToleratesNodeTaintsPred,
predicates.CheckVolumeBindingPred,
predicates.CheckNodeUnschedulablePred,
)
if expected := defaultPredicates(); !result.Equal(expected) {
t.Errorf("expected %v got %v", expected, result)
}
}
func TestCompatibility(t *testing.T) {
// Add serialized versions of scheduler config that exercise available options to ensure compatibility between releases
testcases := []struct {
name string
provider string
wantPlugins map[string][]config.Plugin
}{
{
name: "No Provider specified",
wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
{Name: "NodeResourcesFit"},
{Name: "NodeName"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},
{Name: "InterPodAffinity", Weight: 1},
{Name: "NodeResourcesLeastAllocated", Weight: 1},
{Name: "NodeAffinity", Weight: 1},
{Name: "NodePreferAvoidPods", Weight: 10000},
{Name: "DefaultPodTopologySpread", Weight: 1},
{Name: "TaintToleration", Weight: 1},
},
},
},
{
name: "DefaultProvider",
provider: config.SchedulerDefaultProviderName,
wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
{Name: "NodeResourcesFit"},
{Name: "NodeName"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},
{Name: "InterPodAffinity", Weight: 1},
{Name: "NodeResourcesLeastAllocated", Weight: 1},
{Name: "NodeAffinity", Weight: 1},
{Name: "NodePreferAvoidPods", Weight: 10000},
{Name: "DefaultPodTopologySpread", Weight: 1},
{Name: "TaintToleration", Weight: 1},
},
},
},
{
name: "ClusterAutoscalerProvider",
provider: ClusterAutoscalerProvider,
wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
{Name: "NodeResourcesFit"},
{Name: "NodeName"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},
{Name: "InterPodAffinity", Weight: 1},
{Name: "NodeResourcesMostAllocated", Weight: 1},
{Name: "NodeAffinity", Weight: 1},
{Name: "NodePreferAvoidPods", Weight: 10000},
{Name: "DefaultPodTopologySpread", Weight: 1},
{Name: "TaintToleration", Weight: 1},
},
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
var opts []scheduler.Option
if len(tc.provider) != 0 {
opts = append(opts, scheduler.WithAlgorithmSource(config.SchedulerAlgorithmSource{
Provider: &tc.provider,
}))
}
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
sched, err := scheduler.New(
client,
informerFactory,
informerFactory.Core().V1().Pods(),
nil,
make(chan struct{}),
opts...,
)
if err != nil {
t.Fatalf("Error constructing: %v", err)
}
gotPlugins := sched.Framework.ListPlugins()
if diff := cmp.Diff(tc.wantPlugins, gotPlugins); diff != "" {
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
}
})
}
}

View File

@@ -1,34 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package defaults
import (
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
)
func init() {
// Register functions that extract metadata used by priorities computations.
scheduler.RegisterPriorityMetadataProducerFactory(
func(args scheduler.AlgorithmFactoryArgs) priorities.MetadataProducer {
serviceLister := args.InformerFactory.Core().V1().Services().Lister()
controllerLister := args.InformerFactory.Core().V1().ReplicationControllers().Lister()
replicaSetLister := args.InformerFactory.Apps().V1().ReplicaSets().Lister()
statefulSetLister := args.InformerFactory.Apps().V1().StatefulSets().Lister()
return priorities.NewMetadataFactory(serviceLister, controllerLister, replicaSetLister, statefulSetLister, args.HardPodAffinitySymmetricWeight)
})
}

View File

@@ -1,26 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package algorithmprovider
import (
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"
)
// ApplyFeatureGates applies algorithm by feature gates.
func ApplyFeatureGates() func() {
return defaults.ApplyFeatureGates()
}

View File

@@ -1,60 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package algorithmprovider
import (
"testing"
"k8s.io/kubernetes/pkg/scheduler"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
)
var (
algorithmProviderNames = []string{
schedulerapi.SchedulerDefaultProviderName,
}
)
func TestApplyFeatureGates(t *testing.T) {
for _, pn := range algorithmProviderNames {
t.Run(pn, func(t *testing.T) {
p, err := scheduler.GetAlgorithmProvider(pn)
if err != nil {
t.Fatalf("Error retrieving provider: %v", err)
}
if !p.PredicateKeys.Has("PodToleratesNodeTaints") {
t.Fatalf("Failed to find predicate: 'PodToleratesNodeTaints'")
}
})
}
defer ApplyFeatureGates()()
for _, pn := range algorithmProviderNames {
t.Run(pn, func(t *testing.T) {
p, err := scheduler.GetAlgorithmProvider(pn)
if err != nil {
t.Fatalf("Error retrieving '%s' provider: %v", pn, err)
}
if !p.PredicateKeys.Has("PodToleratesNodeTaints") {
t.Fatalf("Failed to find predicate: 'PodToleratesNodeTaints'")
}
})
}
}

View File

@@ -18,6 +18,8 @@ package algorithmprovider
import ( import (
"fmt" "fmt"
"sort"
"strings"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -67,6 +69,17 @@ func NewRegistry(hardPodAffinityWeight int64) Registry {
} }
} }
// ListAlgorithmProviders lists registered algorithm providers.
func ListAlgorithmProviders() string {
r := NewRegistry(1)
var providers []string
for k := range r {
providers = append(providers, k)
}
sort.Strings(providers)
return strings.Join(providers, " | ")
}
func getDefaultConfig(hardPodAffinityWeight int64) *Config { func getDefaultConfig(hardPodAffinityWeight int64) *Config {
return &Config{ return &Config{
FrameworkPlugins: &schedulerapi.Plugins{ FrameworkPlugins: &schedulerapi.Plugins{
@@ -148,6 +161,7 @@ func applyFeatureGates(config *Config) {
f := schedulerapi.Plugin{Name: podtopologyspread.Name} f := schedulerapi.Plugin{Name: podtopologyspread.Name}
config.FrameworkPlugins.PreFilter.Enabled = append(config.FrameworkPlugins.PreFilter.Enabled, f) config.FrameworkPlugins.PreFilter.Enabled = append(config.FrameworkPlugins.PreFilter.Enabled, f)
config.FrameworkPlugins.Filter.Enabled = append(config.FrameworkPlugins.Filter.Enabled, f) config.FrameworkPlugins.Filter.Enabled = append(config.FrameworkPlugins.Filter.Enabled, f)
config.FrameworkPlugins.PostFilter.Enabled = append(config.FrameworkPlugins.PostFilter.Enabled, f)
s := schedulerapi.Plugin{Name: podtopologyspread.Name, Weight: 1} s := schedulerapi.Plugin{Name: podtopologyspread.Name, Weight: 1}
config.FrameworkPlugins.Score.Enabled = append(config.FrameworkPlugins.Score.Enabled, s) config.FrameworkPlugins.Score.Enabled = append(config.FrameworkPlugins.Score.Enabled, s)
} }
@@ -155,6 +169,13 @@ func applyFeatureGates(config *Config) {
// Prioritizes nodes that satisfy pod's resource limits // Prioritizes nodes that satisfy pod's resource limits
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) { if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) {
klog.Infof("Registering resourcelimits priority function") klog.Infof("Registering resourcelimits priority function")
// TODO(ahg-g): append to config.FrameworkPlugins.Score.Enabled when available. s := schedulerapi.Plugin{Name: noderesources.ResourceLimitsName, Weight: 1}
config.FrameworkPlugins.Score.Enabled = append(config.FrameworkPlugins.Score.Enabled, s)
} }
} }
// ApplyFeatureGates applies algorithm by feature gates.
// TODO(ahg-g): DEPRECATED, remove.
func ApplyFeatureGates() func() {
return func() {}
}

View File

@@ -17,83 +17,163 @@ limitations under the License.
package algorithmprovider package algorithmprovider
import ( import (
"fmt"
"testing" "testing"
"time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/scheduler/apis/config" featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins" "k8s.io/kubernetes/pkg/features"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpodtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/volumebinder" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodepreferavoidpods"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"
) )
func TestCompatibility(t *testing.T) { func TestClusterAutoscalerProvider(t *testing.T) {
testcases := []struct { hardPodAffinityWeight := int64(1)
name string wantConfig := &Config{
provider string FrameworkPlugins: &schedulerapi.Plugins{
wantPlugins map[string][]config.Plugin PreFilter: &schedulerapi.PluginSet{
}{ Enabled: []schedulerapi.Plugin{
{ {Name: noderesources.FitName},
name: "DefaultProvider", {Name: nodeports.Name},
provider: config.SchedulerDefaultProviderName, {Name: interpodaffinity.Name},
},
},
Filter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: nodeunschedulable.Name},
{Name: noderesources.FitName},
{Name: nodename.Name},
{Name: nodeports.Name},
{Name: nodeaffinity.Name},
{Name: volumerestrictions.Name},
{Name: tainttoleration.Name},
{Name: nodevolumelimits.EBSName},
{Name: nodevolumelimits.GCEPDName},
{Name: nodevolumelimits.CSIName},
{Name: nodevolumelimits.AzureDiskName},
{Name: volumebinding.Name},
{Name: volumezone.Name},
{Name: interpodaffinity.Name},
},
},
PostFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: interpodaffinity.Name},
{Name: tainttoleration.Name},
},
},
Score: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: noderesources.BalancedAllocationName, Weight: 1},
{Name: imagelocality.Name, Weight: 1},
{Name: interpodaffinity.Name, Weight: 1},
{Name: noderesources.MostAllocatedName, Weight: 1},
{Name: nodeaffinity.Name, Weight: 1},
{Name: nodepreferavoidpods.Name, Weight: 10000},
{Name: defaultpodtopologyspread.Name, Weight: 1},
{Name: tainttoleration.Name, Weight: 1},
},
},
}, },
{ FrameworkPluginConfig: []schedulerapi.PluginConfig{
name: "ClusterAutoscalerProvider", {
provider: ClusterAutoscalerProvider, Name: interpodaffinity.Name,
Args: runtime.Unknown{Raw: []byte(fmt.Sprintf(`{"hardPodAffinityWeight":%d}`, hardPodAffinityWeight))},
},
}, },
} }
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
sched, err := scheduler.New( r := NewRegistry(hardPodAffinityWeight)
client, gotConfig := r[ClusterAutoscalerProvider]
informerFactory, if diff := cmp.Diff(wantConfig, gotConfig); diff != "" {
informerFactory.Core().V1().Pods(), t.Errorf("unexpected config diff (-want, +got): %s", diff)
nil, }
make(chan struct{}), }
scheduler.WithAlgorithmSource(config.SchedulerAlgorithmSource{
Provider: &tc.provider, func TestApplyFeatureGates(t *testing.T) {
})) hardPodAffinityWeight := int64(1)
if err != nil { wantConfig := &Config{
t.Fatalf("Error constructing: %v", err) FrameworkPlugins: &schedulerapi.Plugins{
} PreFilter: &schedulerapi.PluginSet{
gotPlugins := sched.Framework.ListPlugins() Enabled: []schedulerapi.Plugin{
{Name: noderesources.FitName},
volumeBinder := volumebinder.NewVolumeBinder( {Name: nodeports.Name},
client, {Name: interpodaffinity.Name},
informerFactory.Core().V1().Nodes(), {Name: podtopologyspread.Name},
informerFactory.Storage().V1().CSINodes(), },
informerFactory.Core().V1().PersistentVolumeClaims(), },
informerFactory.Core().V1().PersistentVolumes(), Filter: &schedulerapi.PluginSet{
informerFactory.Storage().V1().StorageClasses(), Enabled: []schedulerapi.Plugin{
time.Second, {Name: nodeunschedulable.Name},
) {Name: noderesources.FitName},
providerRegistry := NewRegistry(1) {Name: nodename.Name},
config := providerRegistry[tc.provider] {Name: nodeports.Name},
fwk, err := framework.NewFramework( {Name: nodeaffinity.Name},
plugins.NewInTreeRegistry(&plugins.RegistryArgs{ {Name: volumerestrictions.Name},
VolumeBinder: volumeBinder, {Name: tainttoleration.Name},
}), {Name: nodevolumelimits.EBSName},
config.FrameworkPlugins, {Name: nodevolumelimits.GCEPDName},
config.FrameworkPluginConfig, {Name: nodevolumelimits.CSIName},
framework.WithClientSet(client), {Name: nodevolumelimits.AzureDiskName},
framework.WithInformerFactory(informerFactory), {Name: volumebinding.Name},
framework.WithSnapshotSharedLister(nodeinfosnapshot.NewEmptySnapshot()), {Name: volumezone.Name},
) {Name: interpodaffinity.Name},
if err != nil { {Name: podtopologyspread.Name},
t.Fatalf("error initializing the scheduling framework: %v", err) },
} },
wantPlugins := fwk.ListPlugins() PostFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
if diff := cmp.Diff(wantPlugins, gotPlugins); diff != "" { {Name: interpodaffinity.Name},
t.Errorf("unexpected plugins diff (-want, +got): %s", diff) {Name: tainttoleration.Name},
} {Name: podtopologyspread.Name},
}) },
},
Score: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: noderesources.BalancedAllocationName, Weight: 1},
{Name: imagelocality.Name, Weight: 1},
{Name: interpodaffinity.Name, Weight: 1},
{Name: noderesources.LeastAllocatedName, Weight: 1},
{Name: nodeaffinity.Name, Weight: 1},
{Name: nodepreferavoidpods.Name, Weight: 10000},
{Name: defaultpodtopologyspread.Name, Weight: 1},
{Name: tainttoleration.Name, Weight: 1},
{Name: podtopologyspread.Name, Weight: 1},
{Name: noderesources.ResourceLimitsName, Weight: 1},
},
},
},
FrameworkPluginConfig: []schedulerapi.PluginConfig{
{
Name: interpodaffinity.Name,
Args: runtime.Unknown{Raw: []byte(fmt.Sprintf(`{"hardPodAffinityWeight":%d}`, hardPodAffinityWeight))},
},
},
}
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ResourceLimitsPriorityFunction, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EvenPodsSpread, true)()
r := NewRegistry(hardPodAffinityWeight)
gotConfig := r[schedulerapi.SchedulerDefaultProviderName]
if diff := cmp.Diff(wantConfig, gotConfig); diff != "" {
t.Errorf("unexpected config diff (-want, +got): %s", diff)
} }
} }

View File

@@ -11,7 +11,6 @@ go_test(
"//pkg/features:go_default_library", "//pkg/features:go_default_library",
"//pkg/scheduler:go_default_library", "//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library", "//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/algorithmprovider/defaults:go_default_library",
"//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/config/scheme:go_default_library", "//pkg/scheduler/apis/config/scheme:go_default_library",
"//pkg/scheduler/core:go_default_library", "//pkg/scheduler/core:go_default_library",

View File

@@ -32,7 +32,6 @@ import (
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler" "k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider" "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/core"
) )
@@ -1312,7 +1311,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
for feature, value := range tc.featureGates { for feature, value := range tc.featureGates {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, value)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, value)()
} }
defer algorithmprovider.ApplyFeatureGates()()
policyConfigMap := v1.ConfigMap{ policyConfigMap := v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceSystem, Name: "scheduler-custom-policy-config"}, ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceSystem, Name: "scheduler-custom-policy-config"},
Data: map[string]string{config.SchedulerPolicyConfigMapKey: tc.JSON}, Data: map[string]string{config.SchedulerPolicyConfigMapKey: tc.JSON},
@@ -1363,3 +1362,167 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
}) })
} }
} }
func TestAlgorithmProviderCompatibility(t *testing.T) {
// Add serialized versions of scheduler config that exercise available options to ensure compatibility between releases
testcases := []struct {
name string
provider string
wantPlugins map[string][]config.Plugin
}{
{
name: "No Provider specified",
wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
{Name: "NodeResourcesFit"},
{Name: "NodeName"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},
{Name: "InterPodAffinity", Weight: 1},
{Name: "NodeResourcesLeastAllocated", Weight: 1},
{Name: "NodeAffinity", Weight: 1},
{Name: "NodePreferAvoidPods", Weight: 10000},
{Name: "DefaultPodTopologySpread", Weight: 1},
{Name: "TaintToleration", Weight: 1},
},
},
},
{
name: "DefaultProvider",
provider: config.SchedulerDefaultProviderName,
wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
{Name: "NodeResourcesFit"},
{Name: "NodeName"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},
{Name: "InterPodAffinity", Weight: 1},
{Name: "NodeResourcesLeastAllocated", Weight: 1},
{Name: "NodeAffinity", Weight: 1},
{Name: "NodePreferAvoidPods", Weight: 10000},
{Name: "DefaultPodTopologySpread", Weight: 1},
{Name: "TaintToleration", Weight: 1},
},
},
},
{
name: "ClusterAutoscalerProvider",
provider: algorithmprovider.ClusterAutoscalerProvider,
wantPlugins: map[string][]config.Plugin{
"PreFilterPlugin": {
{Name: "NodeResourcesFit"},
{Name: "NodePorts"},
{Name: "InterPodAffinity"},
},
"FilterPlugin": {
{Name: "NodeUnschedulable"},
{Name: "NodeResourcesFit"},
{Name: "NodeName"},
{Name: "NodePorts"},
{Name: "NodeAffinity"},
{Name: "VolumeRestrictions"},
{Name: "TaintToleration"},
{Name: "EBSLimits"},
{Name: "GCEPDLimits"},
{Name: "NodeVolumeLimits"},
{Name: "AzureDiskLimits"},
{Name: "VolumeBinding"},
{Name: "VolumeZone"},
{Name: "InterPodAffinity"},
},
"PostFilterPlugin": {
{Name: "InterPodAffinity"},
{Name: "TaintToleration"},
},
"ScorePlugin": {
{Name: "NodeResourcesBalancedAllocation", Weight: 1},
{Name: "ImageLocality", Weight: 1},
{Name: "InterPodAffinity", Weight: 1},
{Name: "NodeResourcesMostAllocated", Weight: 1},
{Name: "NodeAffinity", Weight: 1},
{Name: "NodePreferAvoidPods", Weight: 10000},
{Name: "DefaultPodTopologySpread", Weight: 1},
{Name: "TaintToleration", Weight: 1},
},
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
var opts []scheduler.Option
if len(tc.provider) != 0 {
opts = append(opts, scheduler.WithAlgorithmSource(config.SchedulerAlgorithmSource{
Provider: &tc.provider,
}))
}
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
sched, err := scheduler.New(
client,
informerFactory,
informerFactory.Core().V1().Pods(),
nil,
make(chan struct{}),
opts...,
)
if err != nil {
t.Fatalf("Error constructing: %v", err)
}
gotPlugins := sched.Framework.ListPlugins()
if diff := cmp.Diff(tc.wantPlugins, gotPlugins); diff != "" {
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
}
})
}
}

View File

@@ -108,6 +108,10 @@ func ValidatePolicy(policy config.Policy) error {
if binders > 1 { if binders > 1 {
validationErrors = append(validationErrors, fmt.Errorf("Only one extender can implement bind, found %v", binders)) validationErrors = append(validationErrors, fmt.Errorf("Only one extender can implement bind, found %v", binders))
} }
if policy.HardPodAffinitySymmetricWeight < 0 || policy.HardPodAffinitySymmetricWeight > 100 {
validationErrors = append(validationErrors, field.Invalid(field.NewPath("hardPodAffinitySymmetricWeight"), policy.HardPodAffinitySymmetricWeight, "not in valid range [0-100]"))
}
return utilerrors.NewAggregate(validationErrors) return utilerrors.NewAggregate(validationErrors)
} }

View File

@@ -262,6 +262,20 @@ func TestValidatePolicy(t *testing.T) {
}, },
expected: errors.New("ServiceAntiAffinity priority \"customPriority2\" has a different weight with \"customPriority1\""), expected: errors.New("ServiceAntiAffinity priority \"customPriority2\" has a different weight with \"customPriority1\""),
}, },
{
name: "invalid hardPodAffinitySymmetricWeight, above the range",
policy: config.Policy{
HardPodAffinitySymmetricWeight: 101,
},
expected: errors.New("hardPodAffinitySymmetricWeight: Invalid value: 101: not in valid range [0-100]"),
},
{
name: "invalid hardPodAffinitySymmetricWeight, below the range",
policy: config.Policy{
HardPodAffinitySymmetricWeight: -1,
},
expected: errors.New("hardPodAffinitySymmetricWeight: Invalid value: -1: not in valid range [0-100]"),
},
} }
for _, test := range tests { for _, test := range tests {

View File

@@ -1117,7 +1117,6 @@ func TestZeroRequest(t *testing.T) {
informerFactory.Core().V1().ReplicationControllers().Lister(), informerFactory.Core().V1().ReplicationControllers().Lister(),
informerFactory.Apps().V1().ReplicaSets().Lister(), informerFactory.Apps().V1().ReplicaSets().Lister(),
informerFactory.Apps().V1().StatefulSets().Lister(), informerFactory.Apps().V1().StatefulSets().Lister(),
1,
) )
metadata := metadataProducer(test.pod, test.nodes, snapshot) metadata := metadataProducer(test.pod, test.nodes, snapshot)

View File

@@ -40,6 +40,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation" "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
"k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/core"
@@ -104,14 +105,10 @@ type Configurator struct {
enableNonPreempting bool enableNonPreempting bool
// framework configuration arguments. // framework configuration arguments.
registry framework.Registry registry framework.Registry
plugins *schedulerapi.Plugins plugins *schedulerapi.Plugins
pluginConfig []schedulerapi.PluginConfig pluginConfig []schedulerapi.PluginConfig
pluginConfigProducerRegistry *plugins.ConfigProducerRegistry nodeInfoSnapshot *nodeinfosnapshot.Snapshot
nodeInfoSnapshot *nodeinfosnapshot.Snapshot
algorithmFactoryArgs AlgorithmFactoryArgs
configProducerArgs *plugins.ConfigProducerArgs
} }
// Create creates a scheduler with the default algorithm provider. // Create creates a scheduler with the default algorithm provider.
@@ -122,138 +119,29 @@ func (c *Configurator) Create() (*Scheduler, error) {
// CreateFromProvider creates a scheduler from the name of a registered algorithm provider. // CreateFromProvider creates a scheduler from the name of a registered algorithm provider.
func (c *Configurator) CreateFromProvider(providerName string) (*Scheduler, error) { func (c *Configurator) CreateFromProvider(providerName string) (*Scheduler, error) {
klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName) klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
provider, err := GetAlgorithmProvider(providerName) r := algorithmprovider.NewRegistry(int64(c.hardPodAffinitySymmetricWeight))
if err != nil { provider, exist := r[providerName]
return nil, err if !exist {
} return nil, fmt.Errorf("algorithm provider %q is not registered", providerName)
return c.CreateFromKeys(provider.PredicateKeys, provider.PriorityKeys, []algorithm.SchedulerExtender{})
}
// CreateFromConfig creates a scheduler from the configuration file
func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Scheduler, error) {
klog.V(2).Infof("Creating scheduler from configuration: %v", policy)
// validate the policy configuration
if err := validation.ValidatePolicy(policy); err != nil {
return nil, err
} }
predicateKeys := sets.NewString() // Combine the provided plugins with the ones from component config.
if policy.Predicates == nil { // TODO(#86162): address disabled plugins.
klog.V(2).Infof("Using predicates from algorithm provider '%v'", schedulerapi.SchedulerDefaultProviderName)
provider, err := GetAlgorithmProvider(schedulerapi.SchedulerDefaultProviderName)
if err != nil {
return nil, err
}
predicateKeys = provider.PredicateKeys
} else {
for _, predicate := range policy.Predicates {
klog.V(2).Infof("Registering predicate: %s", predicate.Name)
predicateKeys.Insert(RegisterCustomPredicate(predicate, c.configProducerArgs))
}
}
priorityKeys := sets.NewString()
if policy.Priorities == nil {
klog.V(2).Infof("Using priorities from algorithm provider '%v'", schedulerapi.SchedulerDefaultProviderName)
provider, err := GetAlgorithmProvider(schedulerapi.SchedulerDefaultProviderName)
if err != nil {
return nil, err
}
priorityKeys = provider.PriorityKeys
} else {
for _, priority := range policy.Priorities {
if priority.Name == priorities.EqualPriority {
klog.V(2).Infof("Skip registering priority: %s", priority.Name)
continue
}
klog.V(2).Infof("Registering priority: %s", priority.Name)
priorityKeys.Insert(RegisterCustomPriority(priority, c.configProducerArgs))
}
}
var extenders []algorithm.SchedulerExtender
if len(policy.Extenders) != 0 {
var ignorableExtenders []algorithm.SchedulerExtender
var ignoredExtendedResources []string
for ii := range policy.Extenders {
klog.V(2).Infof("Creating extender with config %+v", policy.Extenders[ii])
extender, err := core.NewHTTPExtender(&policy.Extenders[ii])
if err != nil {
return nil, err
}
if !extender.IsIgnorable() {
extenders = append(extenders, extender)
} else {
ignorableExtenders = append(ignorableExtenders, extender)
}
for _, r := range policy.Extenders[ii].ManagedResources {
if r.IgnoredByScheduler {
ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
}
}
}
c.configProducerArgs.NodeResourcesFitArgs = &noderesources.FitArgs{
IgnoredResources: ignoredExtendedResources,
}
// place ignorable extenders to the tail of extenders
extenders = append(extenders, ignorableExtenders...)
}
// Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value.
// Give it higher precedence than scheduler CLI configuration when it is provided.
if policy.HardPodAffinitySymmetricWeight != 0 {
c.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
}
// When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
// predicates even after one or more of them fails.
if policy.AlwaysCheckAllPredicates {
c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
}
return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
}
// CreateFromKeys creates a scheduler from a set of registered fit predicate keys and priority keys.
func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Scheduler, error) {
klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
if c.hardPodAffinitySymmetricWeight < 1 || c.hardPodAffinitySymmetricWeight > 100 {
return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.hardPodAffinitySymmetricWeight)
}
c.configProducerArgs.InterPodAffinityArgs = &interpodaffinity.Args{
HardPodAffinityWeight: c.hardPodAffinitySymmetricWeight,
}
pluginsForPredicates, pluginConfigForPredicates, err := c.getPredicateConfigs(predicateKeys)
if err != nil {
return nil, err
}
pluginsForPriorities, pluginConfigForPriorities, err := c.getPriorityConfigs(priorityKeys)
if err != nil {
return nil, err
}
priorityMetaProducer, err := getPriorityMetadataProducer(c.algorithmFactoryArgs)
if err != nil {
return nil, err
}
// Combine all framework configurations. If this results in any duplication, framework
// instantiation should fail.
var plugins schedulerapi.Plugins var plugins schedulerapi.Plugins
plugins.Append(pluginsForPredicates) plugins.Append(provider.FrameworkPlugins)
plugins.Append(pluginsForPriorities)
plugins.Append(c.plugins) plugins.Append(c.plugins)
var pluginConfig []schedulerapi.PluginConfig var pluginConfig []schedulerapi.PluginConfig
pluginConfig = append(pluginConfig, pluginConfigForPredicates...) pluginConfig = append(pluginConfig, provider.FrameworkPluginConfig...)
pluginConfig = append(pluginConfig, pluginConfigForPriorities...)
pluginConfig = append(pluginConfig, c.pluginConfig...) pluginConfig = append(pluginConfig, c.pluginConfig...)
return c.CreateFromPlugins(&plugins, pluginConfig, []algorithm.SchedulerExtender{})
}
// CreateFromPlugins creates a scheduler from a set of registered plugins.
func (c *Configurator) CreateFromPlugins(plugins *schedulerapi.Plugins, pluginConfig []schedulerapi.PluginConfig, extenders []algorithm.SchedulerExtender) (*Scheduler, error) {
framework, err := framework.NewFramework( framework, err := framework.NewFramework(
c.registry, c.registry,
&plugins, plugins,
pluginConfig, pluginConfig,
framework.WithClientSet(c.client), framework.WithClientSet(c.client),
framework.WithInformerFactory(c.informerFactory), framework.WithInformerFactory(c.informerFactory),
@@ -283,7 +171,12 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
c.schedulerCache, c.schedulerCache,
podQueue, podQueue,
nil, nil,
priorityMetaProducer, priorities.NewMetadataFactory(
c.informerFactory.Core().V1().Services().Lister(),
c.informerFactory.Core().V1().ReplicationControllers().Lister(),
c.informerFactory.Apps().V1().ReplicaSets().Lister(),
c.informerFactory.Apps().V1().StatefulSets().Lister(),
),
c.nodeInfoSnapshot, c.nodeInfoSnapshot,
framework, framework,
extenders, extenders,
@@ -310,40 +203,132 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
}, nil }, nil
} }
// getBinderFunc returns a func which returns an extender that supports bind or a default binder based on the given pod. // CreateFromConfig creates a scheduler from the configuration file
func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder { func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Scheduler, error) {
defaultBinder := &binder{client} lr := plugins.NewLegacyRegistry()
return func(pod *v1.Pod) Binder { args := &plugins.ConfigProducerArgs{}
for _, extender := range extenders {
if extender.IsBinder() && extender.IsInterested(pod) { klog.V(2).Infof("Creating scheduler from configuration: %v", policy)
return extender
// validate the policy configuration
if err := validation.ValidatePolicy(policy); err != nil {
return nil, err
}
predicateKeys := sets.NewString()
if policy.Predicates == nil {
klog.V(2).Infof("Using predicates from algorithm provider '%v'", schedulerapi.SchedulerDefaultProviderName)
predicateKeys = lr.DefaultPredicates
} else {
for _, predicate := range policy.Predicates {
klog.V(2).Infof("Registering predicate: %s", predicate.Name)
predicateKeys.Insert(lr.RegisterCustomPredicate(predicate, args))
}
}
priorityKeys := sets.NewString()
if policy.Priorities == nil {
klog.V(2).Infof("Using priorities from algorithm provider '%v'", schedulerapi.SchedulerDefaultProviderName)
priorityKeys = lr.DefaultPriorities
} else {
for _, priority := range policy.Priorities {
if priority.Name == priorities.EqualPriority {
klog.V(2).Infof("Skip registering priority: %s", priority.Name)
continue
}
klog.V(2).Infof("Registering priority: %s", priority.Name)
priorityKeys.Insert(lr.RegisterCustomPriority(priority, args))
}
}
var extenders []algorithm.SchedulerExtender
if len(policy.Extenders) != 0 {
var ignorableExtenders []algorithm.SchedulerExtender
var ignoredExtendedResources []string
for ii := range policy.Extenders {
klog.V(2).Infof("Creating extender with config %+v", policy.Extenders[ii])
extender, err := core.NewHTTPExtender(&policy.Extenders[ii])
if err != nil {
return nil, err
}
if !extender.IsIgnorable() {
extenders = append(extenders, extender)
} else {
ignorableExtenders = append(ignorableExtenders, extender)
}
for _, r := range policy.Extenders[ii].ManagedResources {
if r.IgnoredByScheduler {
ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
}
} }
} }
return defaultBinder args.NodeResourcesFitArgs = &noderesources.FitArgs{
IgnoredResources: ignoredExtendedResources,
}
// place ignorable extenders to the tail of extenders
extenders = append(extenders, ignorableExtenders...)
} }
// Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value.
// Give it higher precedence than scheduler CLI configuration when it is provided.
if policy.HardPodAffinitySymmetricWeight != 0 {
c.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
}
// When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
// predicates even after one or more of them fails.
if policy.AlwaysCheckAllPredicates {
c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
}
klog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v'", predicateKeys, priorityKeys)
if c.hardPodAffinitySymmetricWeight < 1 || c.hardPodAffinitySymmetricWeight > 100 {
return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", c.hardPodAffinitySymmetricWeight)
}
args.InterPodAffinityArgs = &interpodaffinity.Args{
HardPodAffinityWeight: c.hardPodAffinitySymmetricWeight,
}
pluginsForPredicates, pluginConfigForPredicates, err := getPredicateConfigs(predicateKeys, lr, args)
if err != nil {
return nil, err
}
pluginsForPriorities, pluginConfigForPriorities, err := getPriorityConfigs(priorityKeys, lr, args)
if err != nil {
return nil, err
}
// Combine all framework configurations. If this results in any duplication, framework
// instantiation should fail.
var plugins schedulerapi.Plugins
plugins.Append(pluginsForPredicates)
plugins.Append(pluginsForPriorities)
plugins.Append(c.plugins)
var pluginConfig []schedulerapi.PluginConfig
pluginConfig = append(pluginConfig, pluginConfigForPredicates...)
pluginConfig = append(pluginConfig, pluginConfigForPriorities...)
pluginConfig = append(pluginConfig, c.pluginConfig...)
return c.CreateFromPlugins(&plugins, pluginConfig, extenders)
} }
// getPriorityConfigs // getPriorityConfigs
// getPriorityConfigs returns priorities configuration: ones that will run as priorities and ones that will run // getPriorityConfigs returns priorities configuration: ones that will run as priorities and ones that will run
// as framework plugins. Specifically, a priority will run as a framework plugin if a plugin config producer was // as framework plugins. Specifically, a priority will run as a framework plugin if a plugin config producer was
// registered for that priority. // registered for that priority.
func (c *Configurator) getPriorityConfigs(keys sets.String) (*schedulerapi.Plugins, []schedulerapi.PluginConfig, error) { func getPriorityConfigs(keys sets.String, lr *plugins.LegacyRegistry, args *plugins.ConfigProducerArgs) (*schedulerapi.Plugins, []schedulerapi.PluginConfig, error) {
if c.pluginConfigProducerRegistry == nil {
return nil, nil, nil
}
var plugins schedulerapi.Plugins var plugins schedulerapi.Plugins
var pluginConfig []schedulerapi.PluginConfig var pluginConfig []schedulerapi.PluginConfig
frameworkConfigProducers := c.pluginConfigProducerRegistry.PriorityToConfigProducer
for _, p := range keys.List() { for _, p := range keys.List() {
weight, exist := algorithmRegistry.priorityKeys[p] weight, exist := lr.Priorities[p]
if !exist { if !exist {
return nil, nil, fmt.Errorf("priority key %q is not registered", p) return nil, nil, fmt.Errorf("priority key %q is not registered", p)
} }
if producer, exist := frameworkConfigProducers[p]; exist { if producer, exist := lr.ConfigProducerRegistry.PriorityToConfigProducer[p]; exist {
args := *c.configProducerArgs a := *args
args.Weight = int32(weight) a.Weight = int32(weight)
pl, pc := producer(args) pl, pc := producer(a)
plugins.Append(&pl) plugins.Append(&pl)
pluginConfig = append(pluginConfig, pc...) pluginConfig = append(pluginConfig, pc...)
} }
@@ -356,19 +341,13 @@ func (c *Configurator) getPriorityConfigs(keys sets.String) (*schedulerapi.Plugi
// registered for that predicate. // registered for that predicate.
// Note that the framework executes plugins according to their order in the Plugins list, and so predicates run as plugins // Note that the framework executes plugins according to their order in the Plugins list, and so predicates run as plugins
// are added to the Plugins list according to the order specified in predicates.Ordering(). // are added to the Plugins list according to the order specified in predicates.Ordering().
func (c *Configurator) getPredicateConfigs(keys sets.String) (*schedulerapi.Plugins, []schedulerapi.PluginConfig, error) { func getPredicateConfigs(keys sets.String, lr *plugins.LegacyRegistry, args *plugins.ConfigProducerArgs) (*schedulerapi.Plugins, []schedulerapi.PluginConfig, error) {
if c.pluginConfigProducerRegistry == nil { allPredicates := keys.Union(lr.MandatoryPredicates)
// No config producer registry available, so predicates can't be translated to plugins.
return nil, nil, fmt.Errorf("No config producer registry available, can't producer plugins configs for provided predicate keys")
}
allPredicates := keys.Union(algorithmRegistry.mandatoryPredicateKeys)
if allPredicates.Has("PodFitsPorts") { if allPredicates.Has("PodFitsPorts") {
// For compatibility reasons, "PodFitsPorts" as a key is still supported. // For compatibility reasons, "PodFitsPorts" as a key is still supported.
allPredicates.Delete("PodFitsPorts") allPredicates.Delete("PodFitsPorts")
allPredicates.Insert(predicates.PodFitsHostPortsPred) allPredicates.Insert(predicates.PodFitsHostPortsPred)
} }
frameworkConfigProducers := c.pluginConfigProducerRegistry.PredicateToConfigProducer
// Create the framework plugin configurations, and place them in the order // Create the framework plugin configurations, and place them in the order
// that the corresponding predicates were supposed to run. // that the corresponding predicates were supposed to run.
@@ -377,11 +356,11 @@ func (c *Configurator) getPredicateConfigs(keys sets.String) (*schedulerapi.Plug
for _, predicateKey := range predicates.Ordering() { for _, predicateKey := range predicates.Ordering() {
if allPredicates.Has(predicateKey) { if allPredicates.Has(predicateKey) {
producer, exist := frameworkConfigProducers[predicateKey] producer, exist := lr.ConfigProducerRegistry.PredicateToConfigProducer[predicateKey]
if !exist { if !exist {
return nil, nil, fmt.Errorf("no framework config producer registered for %q", predicateKey) return nil, nil, fmt.Errorf("no framework config producer registered for %q", predicateKey)
} }
p, pc := producer(*c.configProducerArgs) p, pc := producer(*args)
plugins.Append(&p) plugins.Append(&p)
pluginConfig = append(pluginConfig, pc...) pluginConfig = append(pluginConfig, pc...)
allPredicates.Delete(predicateKey) allPredicates.Delete(predicateKey)
@@ -390,11 +369,11 @@ func (c *Configurator) getPredicateConfigs(keys sets.String) (*schedulerapi.Plug
// Third, add the rest in no specific order. // Third, add the rest in no specific order.
for predicateKey := range allPredicates { for predicateKey := range allPredicates {
producer, exist := frameworkConfigProducers[predicateKey] producer, exist := lr.ConfigProducerRegistry.PredicateToConfigProducer[predicateKey]
if !exist { if !exist {
return nil, nil, fmt.Errorf("no framework config producer registered for %q", predicateKey) return nil, nil, fmt.Errorf("no framework config producer registered for %q", predicateKey)
} }
p, pc := producer(*c.configProducerArgs) p, pc := producer(*args)
plugins.Append(&p) plugins.Append(&p)
pluginConfig = append(pluginConfig, pc...) pluginConfig = append(pluginConfig, pc...)
} }
@@ -402,6 +381,19 @@ func (c *Configurator) getPredicateConfigs(keys sets.String) (*schedulerapi.Plug
return &plugins, pluginConfig, nil return &plugins, pluginConfig, nil
} }
// getBinderFunc returns a func which returns an extender that supports bind or a default binder based on the given pod.
func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder {
defaultBinder := &binder{client}
return func(pod *v1.Pod) Binder {
for _, extender := range extenders {
if extender.IsBinder() && extender.IsInterested(pod) {
return extender
}
}
return defaultBinder
}
}
type podInformer struct { type podInformer struct {
informer cache.SharedIndexInformer informer cache.SharedIndexInformer
} }

View File

@@ -29,7 +29,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
@@ -215,15 +214,12 @@ func TestCreateFromEmptyConfig(t *testing.T) {
// Test configures a scheduler from a policy that does not specify any // Test configures a scheduler from a policy that does not specify any
// predicate/priority. // predicate/priority.
// The predicate/priority from DefaultProvider will be used. // The predicate/priority from DefaultProvider will be used.
// TODO(Huang-Wei): refactor (or remove) this test along with eliminating 'RegisterFitPredicate()'.
func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) { func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset()
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh) factory := newConfigFactory(client, v1.DefaultHardPodAffinitySymmetricWeight, stopCh)
RegisterAlgorithmProvider(schedulerapi.SchedulerDefaultProviderName, sets.NewString("PodFitsResources"), sets.NewString("NodeAffinityPriority"))
configData := []byte(`{ configData := []byte(`{
"kind" : "Policy", "kind" : "Policy",
"apiVersion" : "v1" "apiVersion" : "v1"
@@ -435,55 +431,9 @@ func testBind(binding *v1.Binding, t *testing.T) {
} }
} }
func TestInvalidHardPodAffinitySymmetricWeight(t *testing.T) {
client := fake.NewSimpleClientset()
// factory of "default-scheduler"
stopCh := make(chan struct{})
factory := newConfigFactory(client, -1, stopCh)
defer close(stopCh)
_, err := factory.Create()
if err == nil {
t.Errorf("expected err: invalid hardPodAffinitySymmetricWeight, got nothing")
}
}
func TestInvalidFactoryArgs(t *testing.T) {
client := fake.NewSimpleClientset()
testCases := []struct {
name string
hardPodAffinitySymmetricWeight int32
expectErr string
}{
{
name: "symmetric weight below range",
hardPodAffinitySymmetricWeight: -1,
expectErr: "invalid hardPodAffinitySymmetricWeight: -1, must be in the range [0-100]",
},
{
name: "symmetric weight above range",
hardPodAffinitySymmetricWeight: 101,
expectErr: "invalid hardPodAffinitySymmetricWeight: 101, must be in the range [0-100]",
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
stopCh := make(chan struct{})
factory := newConfigFactory(client, test.hardPodAffinitySymmetricWeight, stopCh)
defer close(stopCh)
_, err := factory.Create()
if err == nil {
t.Errorf("expected err: %s, got nothing", test.expectErr)
}
})
}
}
func newConfigFactoryWithFrameworkRegistry( func newConfigFactoryWithFrameworkRegistry(
client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}, client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{},
registry framework.Registry, pluginConfigProducerRegistry *frameworkplugins.ConfigProducerRegistry) *Configurator { registry framework.Registry) *Configurator {
informerFactory := informers.NewSharedInformerFactory(client, 0) informerFactory := informers.NewSharedInformerFactory(client, 0)
snapshot := nodeinfosnapshot.NewEmptySnapshot() snapshot := nodeinfosnapshot.NewEmptySnapshot()
return &Configurator{ return &Configurator{
@@ -501,21 +451,14 @@ func newConfigFactoryWithFrameworkRegistry(
registry: registry, registry: registry,
plugins: nil, plugins: nil,
pluginConfig: []schedulerapi.PluginConfig{}, pluginConfig: []schedulerapi.PluginConfig{},
pluginConfigProducerRegistry: pluginConfigProducerRegistry,
nodeInfoSnapshot: snapshot, nodeInfoSnapshot: snapshot,
algorithmFactoryArgs: AlgorithmFactoryArgs{
SharedLister: snapshot,
InformerFactory: informerFactory,
HardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
},
configProducerArgs: &frameworkplugins.ConfigProducerArgs{},
} }
} }
func newConfigFactory( func newConfigFactory(
client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}) *Configurator { client clientset.Interface, hardPodAffinitySymmetricWeight int32, stopCh <-chan struct{}) *Configurator {
return newConfigFactoryWithFrameworkRegistry(client, hardPodAffinitySymmetricWeight, stopCh, return newConfigFactoryWithFrameworkRegistry(client, hardPodAffinitySymmetricWeight, stopCh,
frameworkplugins.NewInTreeRegistry(&frameworkplugins.RegistryArgs{}), frameworkplugins.NewConfigProducerRegistry()) frameworkplugins.NewInTreeRegistry(&frameworkplugins.RegistryArgs{}))
} }
type fakeExtender struct { type fakeExtender struct {

View File

@@ -2,10 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = ["registry.go"], srcs = [
"legacy_registry.go",
"registry.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins", importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins",
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library", "//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config:go_default_library",
@@ -28,7 +32,10 @@ go_library(
"//pkg/scheduler/framework/plugins/volumezone:go_default_library", "//pkg/scheduler/framework/plugins/volumezone:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library", "//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",
], ],
) )
@@ -71,10 +78,14 @@ filegroup(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = ["registry_test.go"], srcs = ["legacy_registry_test.go"],
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library", "//vendor/github.com/google/go-cmp/cmp:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
], ],
) )

View File

@@ -357,7 +357,6 @@ func TestDefaultPodTopologySpreadScore(t *testing.T) {
fakelisters.ControllerLister(test.rcs), fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss), fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss), fakelisters.StatefulSetLister(test.sss),
1,
) )
metaData := metaDataProducer(test.pod, nodes, snapshot) metaData := metaDataProducer(test.pod, nodes, snapshot)
@@ -615,7 +614,6 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
fakelisters.ControllerLister(test.rcs), fakelisters.ControllerLister(test.rcs),
fakelisters.ReplicaSetLister(test.rss), fakelisters.ReplicaSetLister(test.rss),
fakelisters.StatefulSetLister(test.sss), fakelisters.StatefulSetLister(test.sss),
1,
) )
metaData := metaDataProducer(test.pod, nodes, snapshot) metaData := metaDataProducer(test.pod, nodes, snapshot)

View File

@@ -0,0 +1,615 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugins
import (
"encoding/json"
"fmt"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpodtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodelabel"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodepreferavoidpods"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
// LegacyRegistry is used to store current state of registered predicates and priorities.
type LegacyRegistry struct {
Predicates sets.String
Priorities map[string]int64
MandatoryPredicates sets.String
DefaultPredicates sets.String
DefaultPriorities sets.String
ConfigProducerRegistry *ConfigProducerRegistry
}
// ConfigProducerArgs contains arguments that are passed to the producer.
// As we add more predicates/priorities to framework plugins mappings, more arguments
// may be added here.
type ConfigProducerArgs struct {
// Weight used for priority functions.
Weight int32
// NodeLabelArgs is the args for the NodeLabel plugin.
NodeLabelArgs *nodelabel.Args
// RequestedToCapacityRatioArgs is the args for the RequestedToCapacityRatio plugin.
RequestedToCapacityRatioArgs *noderesources.RequestedToCapacityRatioArgs
// ServiceAffinityArgs is the args for the ServiceAffinity plugin.
ServiceAffinityArgs *serviceaffinity.Args
// NodeResourcesFitArgs is the args for the NodeResources fit filter.
NodeResourcesFitArgs *noderesources.FitArgs
// InterPodAffinityArgs is the args for InterPodAffinity plugin
InterPodAffinityArgs *interpodaffinity.Args
}
// ConfigProducer produces a framework's configuration.
type ConfigProducer func(args ConfigProducerArgs) (config.Plugins, []config.PluginConfig)
// ConfigProducerRegistry tracks mappings from predicates/priorities to framework config producers.
type ConfigProducerRegistry struct {
// maps that associate predicates/priorities with framework plugin configurations.
PredicateToConfigProducer map[string]ConfigProducer
PriorityToConfigProducer map[string]ConfigProducer
}
// newConfigProducerRegistry creates a new producer registry.
func newConfigProducerRegistry() *ConfigProducerRegistry {
registry := &ConfigProducerRegistry{
PredicateToConfigProducer: make(map[string]ConfigProducer),
PriorityToConfigProducer: make(map[string]ConfigProducer),
}
// Register Predicates.
registry.RegisterPredicate(predicates.GeneralPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
// GeneralPredicate is a combination of predicates.
plugins.Filter = appendToPluginSet(plugins.Filter, noderesources.FitName, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, noderesources.FitName, nil)
pluginConfig = append(pluginConfig, makePluginConfig(noderesources.FitName, args.NodeResourcesFitArgs))
plugins.Filter = appendToPluginSet(plugins.Filter, nodename.Name, nil)
plugins.Filter = appendToPluginSet(plugins.Filter, nodeports.Name, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, nodeports.Name, nil)
plugins.Filter = appendToPluginSet(plugins.Filter, nodeaffinity.Name, nil)
return
})
registry.RegisterPredicate(predicates.PodToleratesNodeTaintsPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, tainttoleration.Name, nil)
return
})
registry.RegisterPredicate(predicates.PodFitsResourcesPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, noderesources.FitName, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, noderesources.FitName, nil)
pluginConfig = append(pluginConfig, makePluginConfig(noderesources.FitName, args.NodeResourcesFitArgs))
return
})
registry.RegisterPredicate(predicates.HostNamePred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodename.Name, nil)
return
})
registry.RegisterPredicate(predicates.PodFitsHostPortsPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodeports.Name, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, nodeports.Name, nil)
return
})
registry.RegisterPredicate(predicates.MatchNodeSelectorPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodeaffinity.Name, nil)
return
})
registry.RegisterPredicate(predicates.CheckNodeUnschedulablePred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodeunschedulable.Name, nil)
return
})
registry.RegisterPredicate(predicates.CheckVolumeBindingPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, volumebinding.Name, nil)
return
})
registry.RegisterPredicate(predicates.NoDiskConflictPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, volumerestrictions.Name, nil)
return
})
registry.RegisterPredicate(predicates.NoVolumeZoneConflictPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, volumezone.Name, nil)
return
})
registry.RegisterPredicate(predicates.MaxCSIVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.CSIName, nil)
return
})
registry.RegisterPredicate(predicates.MaxEBSVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.EBSName, nil)
return
})
registry.RegisterPredicate(predicates.MaxGCEPDVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.GCEPDName, nil)
return
})
registry.RegisterPredicate(predicates.MaxAzureDiskVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.AzureDiskName, nil)
return
})
registry.RegisterPredicate(predicates.MaxCinderVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.CinderName, nil)
return
})
registry.RegisterPredicate(predicates.MatchInterPodAffinityPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, interpodaffinity.Name, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, interpodaffinity.Name, nil)
return
})
registry.RegisterPredicate(predicates.EvenPodsSpreadPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, podtopologyspread.Name, nil)
plugins.Filter = appendToPluginSet(plugins.Filter, podtopologyspread.Name, nil)
return
})
registry.RegisterPredicate(predicates.CheckNodeLabelPresencePred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodelabel.Name, nil)
pluginConfig = append(pluginConfig, makePluginConfig(nodelabel.Name, args.NodeLabelArgs))
return
})
registry.RegisterPredicate(predicates.CheckServiceAffinityPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, serviceaffinity.Name, nil)
pluginConfig = append(pluginConfig, makePluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs))
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, serviceaffinity.Name, nil)
return
})
// Register Priorities.
registry.RegisterPriority(priorities.SelectorSpreadPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, defaultpodtopologyspread.Name, &args.Weight)
return
})
registry.RegisterPriority(priorities.TaintTolerationPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, tainttoleration.Name, nil)
plugins.Score = appendToPluginSet(plugins.Score, tainttoleration.Name, &args.Weight)
return
})
registry.RegisterPriority(priorities.NodeAffinityPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, nodeaffinity.Name, &args.Weight)
return
})
registry.RegisterPriority(priorities.ImageLocalityPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, imagelocality.Name, &args.Weight)
return
})
registry.RegisterPriority(priorities.InterPodAffinityPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, interpodaffinity.Name, nil)
plugins.Score = appendToPluginSet(plugins.Score, interpodaffinity.Name, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(interpodaffinity.Name, args.InterPodAffinityArgs))
return
})
registry.RegisterPriority(priorities.NodePreferAvoidPodsPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, nodepreferavoidpods.Name, &args.Weight)
return
})
registry.RegisterPriority(priorities.MostRequestedPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.MostAllocatedName, &args.Weight)
return
})
registry.RegisterPriority(priorities.BalancedResourceAllocation,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.BalancedAllocationName, &args.Weight)
return
})
registry.RegisterPriority(priorities.LeastRequestedPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.LeastAllocatedName, &args.Weight)
return
})
registry.RegisterPriority(priorities.EvenPodsSpreadPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, podtopologyspread.Name, nil)
plugins.Score = appendToPluginSet(plugins.Score, podtopologyspread.Name, &args.Weight)
return
})
registry.RegisterPriority(noderesources.RequestedToCapacityRatioName,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.RequestedToCapacityRatioName, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(noderesources.RequestedToCapacityRatioName, args.RequestedToCapacityRatioArgs))
return
})
registry.RegisterPriority(nodelabel.Name,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, nodelabel.Name, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(nodelabel.Name, args.NodeLabelArgs))
return
})
registry.RegisterPriority(serviceaffinity.Name,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, serviceaffinity.Name, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs))
return
})
registry.RegisterPriority(priorities.ResourceLimitsPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, noderesources.ResourceLimitsName, nil)
plugins.Score = appendToPluginSet(plugins.Score, noderesources.ResourceLimitsName, &args.Weight)
return
})
return registry
}
func registerProducer(name string, producer ConfigProducer, producersMap map[string]ConfigProducer) error {
if _, exist := producersMap[name]; exist {
return fmt.Errorf("already registered %q", name)
}
producersMap[name] = producer
return nil
}
// RegisterPredicate registers a config producer for a predicate.
func (f *ConfigProducerRegistry) RegisterPredicate(name string, producer ConfigProducer) error {
return registerProducer(name, producer, f.PredicateToConfigProducer)
}
// RegisterPriority registers a framework config producer for a priority.
func (f *ConfigProducerRegistry) RegisterPriority(name string, producer ConfigProducer) error {
return registerProducer(name, producer, f.PriorityToConfigProducer)
}
func appendToPluginSet(set *config.PluginSet, name string, weight *int32) *config.PluginSet {
if set == nil {
set = &config.PluginSet{}
}
cfg := config.Plugin{Name: name}
if weight != nil {
cfg.Weight = *weight
}
set.Enabled = append(set.Enabled, cfg)
return set
}
func makePluginConfig(pluginName string, args interface{}) config.PluginConfig {
encoding, err := json.Marshal(args)
if err != nil {
klog.Fatal(fmt.Errorf("Failed to marshal %+v: %v", args, err))
return config.PluginConfig{}
}
config := config.PluginConfig{
Name: pluginName,
Args: runtime.Unknown{Raw: encoding},
}
return config
}
// NewLegacyRegistry returns a legacy algorithm registry of predicates and priorities.
func NewLegacyRegistry() *LegacyRegistry {
registry := &LegacyRegistry{
// predicate keys supported for backward compatibility with v1.Policy.
Predicates: sets.NewString(
"PodFitsPorts", // This exists for compatibility reasons.
predicates.PodFitsHostPortsPred,
predicates.PodFitsResourcesPred,
predicates.HostNamePred,
predicates.MatchNodeSelectorPred,
predicates.NoVolumeZoneConflictPred,
predicates.MaxEBSVolumeCountPred,
predicates.MaxGCEPDVolumeCountPred,
predicates.MaxAzureDiskVolumeCountPred,
predicates.MaxCSIVolumeCountPred,
predicates.MaxCinderVolumeCountPred,
predicates.MatchInterPodAffinityPred,
predicates.NoDiskConflictPred,
predicates.GeneralPred,
predicates.PodToleratesNodeTaintsPred,
predicates.CheckNodeUnschedulablePred,
predicates.CheckVolumeBindingPred,
),
// priority keys to weights, this exist for backward compatibility with v1.Policy.
Priorities: map[string]int64{
priorities.LeastRequestedPriority: 1,
priorities.BalancedResourceAllocation: 1,
priorities.MostRequestedPriority: 1,
priorities.ImageLocalityPriority: 1,
priorities.NodeAffinityPriority: 1,
priorities.SelectorSpreadPriority: 1,
priorities.ServiceSpreadingPriority: 1,
priorities.TaintTolerationPriority: 1,
priorities.InterPodAffinityPriority: 1,
priorities.NodePreferAvoidPodsPriority: 10000,
},
// MandatoryPredicates the set of keys for predicates that the scheduler will
// be configured with all the time.
MandatoryPredicates: sets.NewString(
predicates.PodToleratesNodeTaintsPred,
predicates.CheckNodeUnschedulablePred,
),
DefaultPredicates: sets.NewString(
predicates.NoVolumeZoneConflictPred,
predicates.MaxEBSVolumeCountPred,
predicates.MaxGCEPDVolumeCountPred,
predicates.MaxAzureDiskVolumeCountPred,
predicates.MaxCSIVolumeCountPred,
predicates.MatchInterPodAffinityPred,
predicates.NoDiskConflictPred,
predicates.GeneralPred,
predicates.PodToleratesNodeTaintsPred,
predicates.CheckVolumeBindingPred,
predicates.CheckNodeUnschedulablePred,
),
DefaultPriorities: sets.NewString(
priorities.SelectorSpreadPriority,
priorities.InterPodAffinityPriority,
priorities.LeastRequestedPriority,
priorities.BalancedResourceAllocation,
priorities.NodePreferAvoidPodsPriority,
priorities.NodeAffinityPriority,
priorities.TaintTolerationPriority,
priorities.ImageLocalityPriority,
),
ConfigProducerRegistry: newConfigProducerRegistry(),
}
// The following two features are the last ones to be supported as predicate/priority.
// Once they graduate to GA, there will be no more checking for featue gates here.
// Only register EvenPodsSpread predicate & priority if the feature is enabled
if utilfeature.DefaultFeatureGate.Enabled(features.EvenPodsSpread) {
klog.Infof("Registering EvenPodsSpread predicate and priority function")
registry.Predicates.Insert(predicates.EvenPodsSpreadPred)
registry.DefaultPredicates.Insert(predicates.EvenPodsSpreadPred)
registry.Priorities[priorities.EvenPodsSpreadPriority] = 1
registry.DefaultPriorities.Insert(priorities.EvenPodsSpreadPriority)
}
// Prioritizes nodes that satisfy pod's resource limits
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) {
klog.Infof("Registering resourcelimits priority function")
registry.Priorities[priorities.ResourceLimitsPriority] = 1
registry.DefaultPriorities.Insert(priorities.ResourceLimitsPriority)
}
return registry
}
// RegisterCustomPredicate registers a custom fit predicate with the algorithm registry.
// Returns the name, with which the predicate was registered.
func (a *LegacyRegistry) RegisterCustomPredicate(policy config.PredicatePolicy, pluginArgs *ConfigProducerArgs) string {
var ok bool
var predicate string
validatePredicateOrDie(policy)
// generate the predicate function, if a custom type is requested
if policy.Argument != nil {
if policy.Argument.ServiceAffinity != nil {
// We use the ServiceAffinity predicate name for all ServiceAffinity custom predicates.
// It may get called multiple times but we essentially only register one instance of ServiceAffinity predicate.
// This name is then used to find the registered plugin and run the plugin instead of the predicate.
predicate = predicates.CheckServiceAffinityPred
// map LabelsPresence policy to ConfigProducerArgs that's used to configure the ServiceAffinity plugin.
if pluginArgs.ServiceAffinityArgs == nil {
pluginArgs.ServiceAffinityArgs = &serviceaffinity.Args{}
}
pluginArgs.ServiceAffinityArgs.AffinityLabels = append(pluginArgs.ServiceAffinityArgs.AffinityLabels, policy.Argument.ServiceAffinity.Labels...)
} else if policy.Argument.LabelsPresence != nil {
// We use the CheckNodeLabelPresencePred predicate name for all kNodeLabel custom predicates.
// It may get called multiple times but we essentially only register one instance of NodeLabel predicate.
// This name is then used to find the registered plugin and run the plugin instead of the predicate.
predicate = predicates.CheckNodeLabelPresencePred
// Map LabelPresence policy to ConfigProducerArgs that's used to configure the NodeLabel plugin.
if pluginArgs.NodeLabelArgs == nil {
pluginArgs.NodeLabelArgs = &nodelabel.Args{}
}
if policy.Argument.LabelsPresence.Presence {
pluginArgs.NodeLabelArgs.PresentLabels = append(pluginArgs.NodeLabelArgs.PresentLabels, policy.Argument.LabelsPresence.Labels...)
} else {
pluginArgs.NodeLabelArgs.AbsentLabels = append(pluginArgs.NodeLabelArgs.AbsentLabels, policy.Argument.LabelsPresence.Labels...)
}
}
} else if _, ok = a.Predicates[policy.Name]; ok {
// checking to see if a pre-defined predicate is requested
klog.V(2).Infof("Predicate type %s already registered, reusing.", policy.Name)
return policy.Name
}
if len(predicate) == 0 {
klog.Fatalf("Invalid configuration: Predicate type not found for %s", policy.Name)
}
return predicate
}
// RegisterCustomPriority registers a custom priority with the algorithm registry.
// Returns the name, with which the priority function was registered.
func (a *LegacyRegistry) RegisterCustomPriority(policy config.PriorityPolicy, configProducerArgs *ConfigProducerArgs) string {
var priority string
var weight int64
validatePriorityOrDie(policy)
// generate the priority function, if a custom priority is requested
if policy.Argument != nil {
if policy.Argument.ServiceAntiAffinity != nil {
// We use the ServiceAffinity plugin name for all ServiceAffinity custom priorities.
// It may get called multiple times but we essentially only register one instance of
// ServiceAffinity priority.
// This name is then used to find the registered plugin and run the plugin instead of the priority.
priority = serviceaffinity.Name
if configProducerArgs.ServiceAffinityArgs == nil {
configProducerArgs.ServiceAffinityArgs = &serviceaffinity.Args{}
}
configProducerArgs.ServiceAffinityArgs.AntiAffinityLabelsPreference = append(configProducerArgs.ServiceAffinityArgs.AntiAffinityLabelsPreference, policy.Argument.ServiceAntiAffinity.Label)
weight = policy.Weight
if existingWeight, ok := a.Priorities[priority]; ok {
// If there are n ServiceAffinity priorities in the policy, the weight for the corresponding
// score plugin is n*(weight of each priority).
weight += existingWeight
}
} else if policy.Argument.LabelPreference != nil {
// We use the NodeLabel plugin name for all NodeLabel custom priorities.
// It may get called multiple times but we essentially only register one instance of NodeLabel priority.
// This name is then used to find the registered plugin and run the plugin instead of the priority.
priority = nodelabel.Name
if configProducerArgs.NodeLabelArgs == nil {
configProducerArgs.NodeLabelArgs = &nodelabel.Args{}
}
if policy.Argument.LabelPreference.Presence {
configProducerArgs.NodeLabelArgs.PresentLabelsPreference = append(configProducerArgs.NodeLabelArgs.PresentLabelsPreference, policy.Argument.LabelPreference.Label)
} else {
configProducerArgs.NodeLabelArgs.AbsentLabelsPreference = append(configProducerArgs.NodeLabelArgs.AbsentLabelsPreference, policy.Argument.LabelPreference.Label)
}
weight = policy.Weight
if existingWeight, ok := a.Priorities[priority]; ok {
// If there are n NodeLabel priority configured in the policy, the weight for the corresponding
// priority is n*(weight of each priority in policy).
weight += existingWeight
}
} else if policy.Argument.RequestedToCapacityRatioArguments != nil {
scoringFunctionShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(policy.Argument.RequestedToCapacityRatioArguments)
configProducerArgs.RequestedToCapacityRatioArgs = &noderesources.RequestedToCapacityRatioArgs{
FunctionShape: scoringFunctionShape,
ResourceToWeightMap: resources,
}
// We do not allow specifying the name for custom plugins, see #83472
priority = noderesources.RequestedToCapacityRatioName
weight = policy.Weight
}
} else if _, ok := a.Priorities[policy.Name]; ok {
klog.V(2).Infof("Priority type %s already registered, reusing.", policy.Name)
// set/update the weight based on the policy
priority = policy.Name
weight = policy.Weight
}
if len(priority) == 0 {
klog.Fatalf("Invalid configuration: Priority type not found for %s", policy.Name)
}
a.Priorities[priority] = weight
return priority
}
func buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(arguments *config.RequestedToCapacityRatioArguments) (noderesources.FunctionShape, noderesources.ResourceToWeightMap) {
n := len(arguments.Shape)
points := make([]noderesources.FunctionShapePoint, 0, n)
for _, point := range arguments.Shape {
points = append(points, noderesources.FunctionShapePoint{
Utilization: int64(point.Utilization),
// MaxCustomPriorityScore may diverge from the max score used in the scheduler and defined by MaxNodeScore,
// therefore we need to scale the score returned by requested to capacity ratio to the score range
// used by the scheduler.
Score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore),
})
}
shape, err := noderesources.NewFunctionShape(points)
if err != nil {
klog.Fatalf("invalid RequestedToCapacityRatioPriority arguments: %s", err.Error())
}
resourceToWeightMap := make(noderesources.ResourceToWeightMap, 0)
if len(arguments.Resources) == 0 {
resourceToWeightMap = noderesources.DefaultRequestedRatioResources
return shape, resourceToWeightMap
}
for _, resource := range arguments.Resources {
resourceToWeightMap[v1.ResourceName(resource.Name)] = resource.Weight
if resource.Weight == 0 {
resourceToWeightMap[v1.ResourceName(resource.Name)] = 1
}
}
return shape, resourceToWeightMap
}
func validatePredicateOrDie(predicate config.PredicatePolicy) {
if predicate.Argument != nil {
numArgs := 0
if predicate.Argument.ServiceAffinity != nil {
numArgs++
}
if predicate.Argument.LabelsPresence != nil {
numArgs++
}
if numArgs != 1 {
klog.Fatalf("Exactly 1 predicate argument is required, numArgs: %v, Predicate: %s", numArgs, predicate.Name)
}
}
}
func validatePriorityOrDie(priority config.PriorityPolicy) {
if priority.Argument != nil {
numArgs := 0
if priority.Argument.ServiceAntiAffinity != nil {
numArgs++
}
if priority.Argument.LabelPreference != nil {
numArgs++
}
if priority.Argument.RequestedToCapacityRatioArguments != nil {
numArgs++
}
if numArgs != 1 {
klog.Fatalf("Exactly 1 priority argument is required, numArgs: %v, Priority: %s", numArgs, priority.Name)
}
}
}

View File

@@ -1,5 +1,5 @@
/* /*
Copyright 2019 The Kubernetes Authors. Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
@@ -22,9 +22,61 @@ import (
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
) )
func TestBuildScoringFunctionShapeFromRequestedToCapacityRatioArguments(t *testing.T) {
arguments := schedulerapi.RequestedToCapacityRatioArguments{
Shape: []schedulerapi.UtilizationShapePoint{
{Utilization: 10, Score: 1},
{Utilization: 30, Score: 5},
{Utilization: 70, Score: 2},
},
Resources: []schedulerapi.ResourceSpec{
{Name: string(v1.ResourceCPU)},
{Name: string(v1.ResourceMemory)},
},
}
builtShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(&arguments)
expectedShape, _ := noderesources.NewFunctionShape([]noderesources.FunctionShapePoint{
{Utilization: 10, Score: 10},
{Utilization: 30, Score: 50},
{Utilization: 70, Score: 20},
})
expectedResources := noderesources.ResourceToWeightMap{
v1.ResourceCPU: 1,
v1.ResourceMemory: 1,
}
assert.Equal(t, expectedShape, builtShape)
assert.Equal(t, expectedResources, resources)
}
func TestBuildScoringFunctionShapeFromRequestedToCapacityRatioArgumentsNilResourceToWeightMap(t *testing.T) {
arguments := schedulerapi.RequestedToCapacityRatioArguments{
Shape: []schedulerapi.UtilizationShapePoint{
{Utilization: 10, Score: 1},
{Utilization: 30, Score: 5},
{Utilization: 70, Score: 2},
},
}
builtShape, resources := buildScoringFunctionShapeFromRequestedToCapacityRatioArguments(&arguments)
expectedShape, _ := noderesources.NewFunctionShape([]noderesources.FunctionShapePoint{
{Utilization: 10, Score: 10},
{Utilization: 30, Score: 50},
{Utilization: 70, Score: 20},
})
expectedResources := noderesources.ResourceToWeightMap{
v1.ResourceCPU: 1,
v1.ResourceMemory: 1,
}
assert.Equal(t, expectedShape, builtShape)
assert.Equal(t, expectedResources, resources)
}
func produceConfig(keys []string, producersMap map[string]ConfigProducer, args ConfigProducerArgs) (*config.Plugins, []config.PluginConfig, error) { func produceConfig(keys []string, producersMap map[string]ConfigProducer, args ConfigProducerArgs) (*config.Plugins, []config.PluginConfig, error) {
var plugins config.Plugins var plugins config.Plugins
var pluginConfig []config.PluginConfig var pluginConfig []config.PluginConfig
@@ -41,7 +93,7 @@ func produceConfig(keys []string, producersMap map[string]ConfigProducer, args C
} }
func TestRegisterConfigProducers(t *testing.T) { func TestRegisterConfigProducers(t *testing.T) {
registry := NewConfigProducerRegistry() registry := newConfigProducerRegistry()
testPredicateName1 := "testPredicate1" testPredicateName1 := "testPredicate1"
testFilterName1 := "testFilter1" testFilterName1 := "testFilter1"
registry.RegisterPredicate(testPredicateName1, registry.RegisterPredicate(testPredicateName1,

View File

@@ -17,14 +17,7 @@ limitations under the License.
package plugins package plugins
import ( import (
"encoding/json"
"fmt"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpodtopologyspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpodtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
@@ -86,275 +79,3 @@ func NewInTreeRegistry(args *RegistryArgs) framework.Registry {
serviceaffinity.Name: serviceaffinity.New, serviceaffinity.Name: serviceaffinity.New,
} }
} }
// ConfigProducerArgs contains arguments that are passed to the producer.
// As we add more predicates/priorities to framework plugins mappings, more arguments
// may be added here.
type ConfigProducerArgs struct {
// Weight used for priority functions.
Weight int32
// NodeLabelArgs is the args for the NodeLabel plugin.
NodeLabelArgs *nodelabel.Args
// RequestedToCapacityRatioArgs is the args for the RequestedToCapacityRatio plugin.
RequestedToCapacityRatioArgs *noderesources.RequestedToCapacityRatioArgs
// ServiceAffinityArgs is the args for the ServiceAffinity plugin.
ServiceAffinityArgs *serviceaffinity.Args
// NodeResourcesFitArgs is the args for the NodeResources fit filter.
NodeResourcesFitArgs *noderesources.FitArgs
// InterPodAffinityArgs is the args for InterPodAffinity plugin
InterPodAffinityArgs *interpodaffinity.Args
}
// ConfigProducer produces a framework's configuration.
type ConfigProducer func(args ConfigProducerArgs) (config.Plugins, []config.PluginConfig)
// ConfigProducerRegistry tracks mappings from predicates/priorities to framework config producers.
type ConfigProducerRegistry struct {
// maps that associate predicates/priorities with framework plugin configurations.
PredicateToConfigProducer map[string]ConfigProducer
PriorityToConfigProducer map[string]ConfigProducer
}
// NewConfigProducerRegistry creates a new producer registry.
func NewConfigProducerRegistry() *ConfigProducerRegistry {
registry := &ConfigProducerRegistry{
PredicateToConfigProducer: make(map[string]ConfigProducer),
PriorityToConfigProducer: make(map[string]ConfigProducer),
}
// Register Predicates.
registry.RegisterPredicate(predicates.GeneralPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
// GeneralPredicate is a combination of predicates.
plugins.Filter = appendToPluginSet(plugins.Filter, noderesources.FitName, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, noderesources.FitName, nil)
pluginConfig = append(pluginConfig, makePluginConfig(noderesources.FitName, args.NodeResourcesFitArgs))
plugins.Filter = appendToPluginSet(plugins.Filter, nodename.Name, nil)
plugins.Filter = appendToPluginSet(plugins.Filter, nodeports.Name, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, nodeports.Name, nil)
plugins.Filter = appendToPluginSet(plugins.Filter, nodeaffinity.Name, nil)
return
})
registry.RegisterPredicate(predicates.PodToleratesNodeTaintsPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, tainttoleration.Name, nil)
return
})
registry.RegisterPredicate(predicates.PodFitsResourcesPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, noderesources.FitName, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, noderesources.FitName, nil)
pluginConfig = append(pluginConfig, makePluginConfig(noderesources.FitName, args.NodeResourcesFitArgs))
return
})
registry.RegisterPredicate(predicates.HostNamePred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodename.Name, nil)
return
})
registry.RegisterPredicate(predicates.PodFitsHostPortsPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodeports.Name, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, nodeports.Name, nil)
return
})
registry.RegisterPredicate(predicates.MatchNodeSelectorPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodeaffinity.Name, nil)
return
})
registry.RegisterPredicate(predicates.CheckNodeUnschedulablePred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodeunschedulable.Name, nil)
return
})
registry.RegisterPredicate(predicates.CheckVolumeBindingPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, volumebinding.Name, nil)
return
})
registry.RegisterPredicate(predicates.NoDiskConflictPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, volumerestrictions.Name, nil)
return
})
registry.RegisterPredicate(predicates.NoVolumeZoneConflictPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, volumezone.Name, nil)
return
})
registry.RegisterPredicate(predicates.MaxCSIVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.CSIName, nil)
return
})
registry.RegisterPredicate(predicates.MaxEBSVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.EBSName, nil)
return
})
registry.RegisterPredicate(predicates.MaxGCEPDVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.GCEPDName, nil)
return
})
registry.RegisterPredicate(predicates.MaxAzureDiskVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.AzureDiskName, nil)
return
})
registry.RegisterPredicate(predicates.MaxCinderVolumeCountPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.CinderName, nil)
return
})
registry.RegisterPredicate(predicates.MatchInterPodAffinityPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, interpodaffinity.Name, nil)
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, interpodaffinity.Name, nil)
return
})
registry.RegisterPredicate(predicates.EvenPodsSpreadPred,
func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, podtopologyspread.Name, nil)
plugins.Filter = appendToPluginSet(plugins.Filter, podtopologyspread.Name, nil)
return
})
registry.RegisterPredicate(predicates.CheckNodeLabelPresencePred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, nodelabel.Name, nil)
pluginConfig = append(pluginConfig, makePluginConfig(nodelabel.Name, args.NodeLabelArgs))
return
})
registry.RegisterPredicate(predicates.CheckServiceAffinityPred,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, serviceaffinity.Name, nil)
pluginConfig = append(pluginConfig, makePluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs))
plugins.PreFilter = appendToPluginSet(plugins.PreFilter, serviceaffinity.Name, nil)
return
})
// Register Priorities.
registry.RegisterPriority(priorities.SelectorSpreadPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, defaultpodtopologyspread.Name, &args.Weight)
return
})
registry.RegisterPriority(priorities.TaintTolerationPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, tainttoleration.Name, nil)
plugins.Score = appendToPluginSet(plugins.Score, tainttoleration.Name, &args.Weight)
return
})
registry.RegisterPriority(priorities.NodeAffinityPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, nodeaffinity.Name, &args.Weight)
return
})
registry.RegisterPriority(priorities.ImageLocalityPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, imagelocality.Name, &args.Weight)
return
})
registry.RegisterPriority(priorities.InterPodAffinityPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, interpodaffinity.Name, nil)
plugins.Score = appendToPluginSet(plugins.Score, interpodaffinity.Name, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(interpodaffinity.Name, args.InterPodAffinityArgs))
return
})
registry.RegisterPriority(priorities.NodePreferAvoidPodsPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, nodepreferavoidpods.Name, &args.Weight)
return
})
registry.RegisterPriority(priorities.MostRequestedPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.MostAllocatedName, &args.Weight)
return
})
registry.RegisterPriority(priorities.BalancedResourceAllocation,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.BalancedAllocationName, &args.Weight)
return
})
registry.RegisterPriority(priorities.LeastRequestedPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.LeastAllocatedName, &args.Weight)
return
})
registry.RegisterPriority(priorities.EvenPodsSpreadPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, podtopologyspread.Name, nil)
plugins.Score = appendToPluginSet(plugins.Score, podtopologyspread.Name, &args.Weight)
return
})
registry.RegisterPriority(noderesources.RequestedToCapacityRatioName,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.RequestedToCapacityRatioName, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(noderesources.RequestedToCapacityRatioName, args.RequestedToCapacityRatioArgs))
return
})
registry.RegisterPriority(nodelabel.Name,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, nodelabel.Name, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(nodelabel.Name, args.NodeLabelArgs))
return
})
registry.RegisterPriority(serviceaffinity.Name,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, serviceaffinity.Name, &args.Weight)
pluginConfig = append(pluginConfig, makePluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs))
return
})
registry.RegisterPriority(priorities.ResourceLimitsPriority,
func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
plugins.PostFilter = appendToPluginSet(plugins.PostFilter, noderesources.ResourceLimitsName, nil)
plugins.Score = appendToPluginSet(plugins.Score, noderesources.ResourceLimitsName, &args.Weight)
return
})
return registry
}
func registerProducer(name string, producer ConfigProducer, producersMap map[string]ConfigProducer) error {
if _, exist := producersMap[name]; exist {
return fmt.Errorf("already registered %q", name)
}
producersMap[name] = producer
return nil
}
// RegisterPredicate registers a config producer for a predicate.
func (f *ConfigProducerRegistry) RegisterPredicate(name string, producer ConfigProducer) error {
return registerProducer(name, producer, f.PredicateToConfigProducer)
}
// RegisterPriority registers a framework config producer for a priority.
func (f *ConfigProducerRegistry) RegisterPriority(name string, producer ConfigProducer) error {
return registerProducer(name, producer, f.PriorityToConfigProducer)
}
func appendToPluginSet(set *config.PluginSet, name string, weight *int32) *config.PluginSet {
if set == nil {
set = &config.PluginSet{}
}
cfg := config.Plugin{Name: name}
if weight != nil {
cfg.Weight = *weight
}
set.Enabled = append(set.Enabled, cfg)
return set
}
func makePluginConfig(pluginName string, args interface{}) config.PluginConfig {
encoding, err := json.Marshal(args)
if err != nil {
klog.Fatal(fmt.Errorf("Failed to marshal %+v: %v", args, err))
return config.PluginConfig{}
}
config := config.PluginConfig{
Name: pluginName,
Args: runtime.Unknown{Raw: encoding},
}
return config
}

View File

@@ -403,8 +403,7 @@ func TestServiceAffinityScore(t *testing.T) {
fakelisters.ServiceLister(test.services), fakelisters.ServiceLister(test.services),
fakelisters.ControllerLister(rcs), fakelisters.ControllerLister(rcs),
fakelisters.ReplicaSetLister(rss), fakelisters.ReplicaSetLister(rss),
fakelisters.StatefulSetLister(sss), fakelisters.StatefulSetLister(sss))
1)
metaData := metaDataProducer(test.pod, nodes, snapshot) metaData := metaDataProducer(test.pod, nodes, snapshot)
state := framework.NewCycleState() state := framework.NewCycleState()
state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: metaData}) state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: metaData})

View File

@@ -24,8 +24,6 @@ import (
"os" "os"
"time" "time"
"k8s.io/klog"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
@@ -36,6 +34,7 @@ import (
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
"k8s.io/klog"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
kubefeatures "k8s.io/kubernetes/pkg/features" kubefeatures "k8s.io/kubernetes/pkg/features"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -123,7 +122,7 @@ type Scheduler struct {
scheduledPodsHasSynced func() bool scheduledPodsHasSynced func() bool
// The final configuration of the framework. // The final configuration of the framework.
Plugins schedulerapi.Plugins Plugins *schedulerapi.Plugins
PluginConfig []schedulerapi.PluginConfig PluginConfig []schedulerapi.PluginConfig
} }
@@ -144,10 +143,9 @@ type schedulerOptions struct {
// Contains all in-tree plugins. // Contains all in-tree plugins.
frameworkInTreeRegistry framework.Registry frameworkInTreeRegistry framework.Registry
// This registry contains out of tree plugins to be merged with the in-tree registry. // This registry contains out of tree plugins to be merged with the in-tree registry.
frameworkOutOfTreeRegistry framework.Registry frameworkOutOfTreeRegistry framework.Registry
frameworkConfigProducerRegistry *frameworkplugins.ConfigProducerRegistry frameworkPlugins *schedulerapi.Plugins
frameworkPlugins *schedulerapi.Plugins frameworkPluginConfig []schedulerapi.PluginConfig
frameworkPluginConfig []schedulerapi.PluginConfig
} }
// Option configures a Scheduler // Option configures a Scheduler
@@ -210,13 +208,6 @@ func WithFrameworkOutOfTreeRegistry(registry framework.Registry) Option {
} }
} }
// WithFrameworkConfigProducerRegistry sets the framework plugin producer registry.
func WithFrameworkConfigProducerRegistry(registry *frameworkplugins.ConfigProducerRegistry) Option {
return func(o *schedulerOptions) {
o.frameworkConfigProducerRegistry = registry
}
}
// WithFrameworkPlugins sets the plugins that the framework should be configured with. // WithFrameworkPlugins sets the plugins that the framework should be configured with.
func WithFrameworkPlugins(plugins *schedulerapi.Plugins) Option { func WithFrameworkPlugins(plugins *schedulerapi.Plugins) Option {
return func(o *schedulerOptions) { return func(o *schedulerOptions) {
@@ -250,22 +241,14 @@ var defaultSchedulerOptions = schedulerOptions{
schedulerAlgorithmSource: schedulerapi.SchedulerAlgorithmSource{ schedulerAlgorithmSource: schedulerapi.SchedulerAlgorithmSource{
Provider: defaultAlgorithmSourceProviderName(), Provider: defaultAlgorithmSourceProviderName(),
}, },
hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
disablePreemption: false, disablePreemption: false,
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds: BindTimeoutSeconds, bindTimeoutSeconds: BindTimeoutSeconds,
podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()), podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()), podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
frameworkConfigProducerRegistry: frameworkplugins.NewConfigProducerRegistry(), frameworkPlugins: nil,
// The plugins and pluginConfig options are currently nil because we currently don't have frameworkPluginConfig: nil,
// "default" plugins. All plugins that we run through the framework currently come from two
// sources: 1) specified in component config, in which case those two options should be
// set using their corresponding With* functions, 2) predicate/priority-mapped plugins, which
// pluginConfigProducerRegistry contains a mapping for and produces their configurations.
// TODO(ahg-g) Once predicates and priorities are migrated to natively run as plugins, the
// below two parameters will be populated accordingly.
frameworkPlugins: nil,
frameworkPluginConfig: nil,
} }
// New returns a Scheduler // New returns a Scheduler
@@ -326,15 +309,7 @@ func New(client clientset.Interface,
registry: registry, registry: registry,
plugins: options.frameworkPlugins, plugins: options.frameworkPlugins,
pluginConfig: options.frameworkPluginConfig, pluginConfig: options.frameworkPluginConfig,
pluginConfigProducerRegistry: options.frameworkConfigProducerRegistry,
nodeInfoSnapshot: snapshot, nodeInfoSnapshot: snapshot,
algorithmFactoryArgs: AlgorithmFactoryArgs{
SharedLister: snapshot,
InformerFactory: informerFactory,
VolumeBinder: volumeBinder,
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
},
configProducerArgs: &frameworkplugins.ConfigProducerArgs{},
} }
var sched *Scheduler var sched *Scheduler