Merge pull request #113275 from Huang-Wei/kep-3521-B

[KEP-3521] Part 2: Core scheduling implementation
This commit is contained in:
Kubernetes Prow Robot 2022-11-07 23:18:19 -08:00 committed by GitHub
commit 95bd687a28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 797 additions and 94 deletions

View File

@ -221,6 +221,9 @@ clientConnection:
kubeconfig: '%s'
profiles:
- plugins:
preEnqueue:
enabled:
- name: foo
reserve:
enabled:
- name: foo
@ -830,6 +833,11 @@ profiles:
{
SchedulerName: "default-scheduler",
Plugins: &kubeschedulerconfig.Plugins{
PreEnqueue: kubeschedulerconfig.PluginSet{
Enabled: []kubeschedulerconfig.Plugin{
{Name: "foo"},
},
},
Reserve: kubeschedulerconfig.PluginSet{
Enabled: []kubeschedulerconfig.Plugin{
{Name: "foo"},
@ -944,6 +952,7 @@ profiles:
{
SchedulerName: "default-scheduler",
Plugins: &kubeschedulerconfig.Plugins{
PreEnqueue: defaults.PluginsV1beta3.PreEnqueue,
QueueSort: defaults.PluginsV1beta3.QueueSort,
PreFilter: defaults.PluginsV1beta3.PreFilter,
Filter: defaults.PluginsV1beta3.Filter,
@ -1065,6 +1074,7 @@ profiles:
{
SchedulerName: "default-scheduler",
Plugins: &kubeschedulerconfig.Plugins{
PreEnqueue: defaults.PluginsV1beta2.PreEnqueue,
QueueSort: defaults.PluginsV1beta2.QueueSort,
PreFilter: defaults.PluginsV1beta2.PreFilter,
Filter: defaults.PluginsV1beta2.Filter,
@ -1427,6 +1437,7 @@ profiles:
{
SchedulerName: "bar-profile",
Plugins: &kubeschedulerconfig.Plugins{
PreEnqueue: defaults.PluginsV1beta2.PreEnqueue,
QueueSort: defaults.PluginsV1beta2.QueueSort,
PreFilter: defaults.PluginsV1beta2.PreFilter,
Filter: defaults.PluginsV1beta2.Filter,

View File

@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/testing/defaults"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
)
func TestSetup(t *testing.T) {
@ -93,6 +94,7 @@ profiles:
- plugins:
multiPoint:
enabled:
- name: SchedulingGates
- name: DefaultBinder
- name: PrioritySort
- name: DefaultPreemption
@ -131,6 +133,7 @@ profiles:
- plugins:
multiPoint:
enabled:
- name: SchedulingGates
- name: DefaultBinder
- name: PrioritySort
- name: DefaultPreemption
@ -315,16 +318,21 @@ leaderElection:
wantLeaderElection *componentbaseconfig.LeaderElectionConfiguration
}{
{
name: "default config with an alpha feature enabled",
name: "default config with two alpha features enabled",
flags: []string{
"--kubeconfig", configKubeconfig,
"--feature-gates=VolumeCapacityPriority=true",
"--feature-gates=VolumeCapacityPriority=true,PodSchedulingReadiness=true",
},
wantPlugins: map[string]*config.Plugins{
"default-scheduler": defaults.ExpandedPluginsV1,
"default-scheduler": func() *config.Plugins {
plugins := defaults.ExpandedPluginsV1.DeepCopy()
plugins.PreEnqueue.Enabled = append(plugins.PreEnqueue.Enabled, config.Plugin{Name: names.SchedulingGates})
return plugins
}(),
},
restoreFeatures: map[featuregate.Feature]bool{
features.VolumeCapacityPriority: false,
features.PodSchedulingReadiness: false,
},
},
{
@ -384,6 +392,7 @@ leaderElection:
},
wantPlugins: map[string]*config.Plugins{
"default-scheduler": {
PreEnqueue: config.PluginSet{Enabled: []config.Plugin{{Name: "SchedulingGates"}}},
Bind: config.PluginSet{Enabled: []config.Plugin{{Name: "DefaultBinder"}}},
Filter: config.PluginSet{
Enabled: []config.Plugin{
@ -424,6 +433,7 @@ leaderElection:
},
wantPlugins: map[string]*config.Plugins{
"default-scheduler": {
PreEnqueue: config.PluginSet{Enabled: []config.Plugin{{Name: "SchedulingGates"}}},
Bind: config.PluginSet{Enabled: []config.Plugin{{Name: "DefaultBinder"}}},
Filter: config.PluginSet{
Enabled: []config.Plugin{
@ -515,6 +525,7 @@ leaderElection:
registryOptions: []Option{WithPlugin("Foo", newFoo)},
wantPlugins: map[string]*config.Plugins{
"default-scheduler": {
PreEnqueue: defaults.ExpandedPluginsV1.PreEnqueue,
Bind: defaults.ExpandedPluginsV1.Bind,
Filter: config.PluginSet{
Enabled: append(defaults.ExpandedPluginsV1.Filter.Enabled, config.Plugin{Name: "Foo"}),

View File

@ -53456,6 +53456,13 @@ func schema_k8sio_kube_scheduler_config_v1_Plugins(ref common.ReferenceCallback)
Description: "Plugins include multiple extension points. When specified, the list of plugins for a particular extension point are the only ones enabled. If an extension point is omitted from the config, then the default set of plugins is used for that extension point. Enabled plugins are called in the order specified here, after default plugins. If they need to be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order.",
Type: []string{"object"},
Properties: map[string]spec.Schema{
"preEnqueue": {
SchemaProps: spec.SchemaProps{
Description: "PreEnqueue is a list of plugins that should be invoked before adding pods to the scheduling queue.",
Default: map[string]interface{}{},
Ref: ref("k8s.io/kube-scheduler/config/v1.PluginSet"),
},
},
"queueSort": {
SchemaProps: spec.SchemaProps{
Description: "QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue.",
@ -54560,6 +54567,13 @@ func schema_k8sio_kube_scheduler_config_v1beta2_Plugins(ref common.ReferenceCall
Description: "Plugins include multiple extension points. When specified, the list of plugins for a particular extension point are the only ones enabled. If an extension point is omitted from the config, then the default set of plugins is used for that extension point. Enabled plugins are called in the order specified here, after default plugins. If they need to be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order.",
Type: []string{"object"},
Properties: map[string]spec.Schema{
"preEnqueue": {
SchemaProps: spec.SchemaProps{
Description: "PreEnqueue is a list of plugins that should be invoked before adding pods to the scheduling queue.",
Default: map[string]interface{}{},
Ref: ref("k8s.io/kube-scheduler/config/v1beta2.PluginSet"),
},
},
"queueSort": {
SchemaProps: spec.SchemaProps{
Description: "QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue.",
@ -55650,6 +55664,13 @@ func schema_k8sio_kube_scheduler_config_v1beta3_Plugins(ref common.ReferenceCall
Description: "Plugins include multiple extension points. When specified, the list of plugins for a particular extension point are the only ones enabled. If an extension point is omitted from the config, then the default set of plugins is used for that extension point. Enabled plugins are called in the order specified here, after default plugins. If they need to be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order.",
Type: []string{"object"},
Properties: map[string]spec.Schema{
"preEnqueue": {
SchemaProps: spec.SchemaProps{
Description: "PreEnqueue is a list of plugins that should be invoked before adding pods to the scheduling queue.",
Default: map[string]interface{}{},
Ref: ref("k8s.io/kube-scheduler/config/v1beta3.PluginSet"),
},
},
"queueSort": {
SchemaProps: spec.SchemaProps{
Description: "QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue.",

View File

@ -138,6 +138,9 @@ type KubeSchedulerProfile struct {
// Enabled plugins are called in the order specified here, after default plugins. If they need to
// be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order.
type Plugins struct {
// PreEnqueue is a list of plugins that should be invoked before adding pods to the scheduling queue.
PreEnqueue PluginSet
// QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue.
QueueSort PluginSet
@ -231,6 +234,7 @@ func (p *Plugins) Names() []string {
return nil
}
extensions := []PluginSet{
p.PreEnqueue,
p.PreFilter,
p.Filter,
p.PostFilter,

View File

@ -18,8 +18,10 @@ package v1
import (
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
v1 "k8s.io/kube-scheduler/config/v1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/utils/pointer"
)
@ -52,10 +54,17 @@ func getDefaultPlugins() *v1.Plugins {
},
},
}
applyFeatureGates(plugins)
return plugins
}
func applyFeatureGates(config *v1.Plugins) {
if utilfeature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness) {
config.MultiPoint.Enabled = append(config.MultiPoint.Enabled, v1.Plugin{Name: names.SchedulingGates})
}
}
// mergePlugins merges the custom set into the given default one, handling disabled sets.
func mergePlugins(defaultPlugins, customPlugins *v1.Plugins) *v1.Plugins {
if customPlugins == nil {
@ -63,6 +72,7 @@ func mergePlugins(defaultPlugins, customPlugins *v1.Plugins) *v1.Plugins {
}
defaultPlugins.MultiPoint = mergePluginSet(defaultPlugins.MultiPoint, customPlugins.MultiPoint)
defaultPlugins.PreEnqueue = mergePluginSet(defaultPlugins.PreEnqueue, customPlugins.PreEnqueue)
defaultPlugins.QueueSort = mergePluginSet(defaultPlugins.QueueSort, customPlugins.QueueSort)
defaultPlugins.PreFilter = mergePluginSet(defaultPlugins.PreFilter, customPlugins.PreFilter)
defaultPlugins.Filter = mergePluginSet(defaultPlugins.Filter, customPlugins.Filter)

View File

@ -24,6 +24,7 @@ import (
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
v1 "k8s.io/kube-scheduler/config/v1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/utils/pointer"
)
@ -63,6 +64,39 @@ func TestApplyFeatureGates(t *testing.T) {
},
},
},
{
name: "Feature gate PodSchedulingReadiness enabled",
features: map[featuregate.Feature]bool{
features.PodSchedulingReadiness: true,
},
wantConfig: &v1.Plugins{
MultiPoint: v1.PluginSet{
Enabled: []v1.Plugin{
{Name: names.PrioritySort},
{Name: names.NodeUnschedulable},
{Name: names.NodeName},
{Name: names.TaintToleration, Weight: pointer.Int32(3)},
{Name: names.NodeAffinity, Weight: pointer.Int32(2)},
{Name: names.NodePorts},
{Name: names.NodeResourcesFit, Weight: pointer.Int32(1)},
{Name: names.VolumeRestrictions},
{Name: names.EBSLimits},
{Name: names.GCEPDLimits},
{Name: names.NodeVolumeLimits},
{Name: names.AzureDiskLimits},
{Name: names.VolumeBinding},
{Name: names.VolumeZone},
{Name: names.PodTopologySpread, Weight: pointer.Int32(2)},
{Name: names.InterPodAffinity, Weight: pointer.Int32(2)},
{Name: names.DefaultPreemption},
{Name: names.NodeResourcesBalancedAllocation, Weight: pointer.Int32(1)},
{Name: names.ImageLocality, Weight: pointer.Int32(1)},
{Name: names.DefaultBinder},
{Name: names.SchedulingGates},
},
},
},
},
}
for _, test := range tests {

View File

@ -53,6 +53,7 @@ func pluginsNames(p *configv1.Plugins) []string {
p.Bind,
p.PostBind,
p.Permit,
p.PreEnqueue,
p.QueueSort,
}
n := sets.NewString()

View File

@ -712,6 +712,9 @@ func Convert_config_PluginSet_To_v1_PluginSet(in *config.PluginSet, out *v1.Plug
}
func autoConvert_v1_Plugins_To_config_Plugins(in *v1.Plugins, out *config.Plugins, s conversion.Scope) error {
if err := Convert_v1_PluginSet_To_config_PluginSet(&in.PreEnqueue, &out.PreEnqueue, s); err != nil {
return err
}
if err := Convert_v1_PluginSet_To_config_PluginSet(&in.QueueSort, &out.QueueSort, s); err != nil {
return err
}
@ -757,6 +760,9 @@ func Convert_v1_Plugins_To_config_Plugins(in *v1.Plugins, out *config.Plugins, s
}
func autoConvert_config_Plugins_To_v1_Plugins(in *config.Plugins, out *v1.Plugins, s conversion.Scope) error {
if err := Convert_config_PluginSet_To_v1_PluginSet(&in.PreEnqueue, &out.PreEnqueue, s); err != nil {
return err
}
if err := Convert_config_PluginSet_To_v1_PluginSet(&in.QueueSort, &out.QueueSort, s); err != nil {
return err
}

View File

@ -111,3 +111,7 @@ func convertToExternalPluginConfigArgs(out *v1beta2.KubeSchedulerConfiguration)
func Convert_config_KubeSchedulerProfile_To_v1beta2_KubeSchedulerProfile(in *config.KubeSchedulerProfile, out *v1beta2.KubeSchedulerProfile, s conversion.Scope) error {
return autoConvert_config_KubeSchedulerProfile_To_v1beta2_KubeSchedulerProfile(in, out, s)
}
func Convert_config_Plugins_To_v1beta2_Plugins(in *config.Plugins, out *v1beta2.Plugins, s conversion.Scope) error {
return autoConvert_config_Plugins_To_v1beta2_Plugins(in, out, s)
}

View File

@ -116,6 +116,9 @@ func applyFeatureGates(config *v1beta2.Plugins) {
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority) {
config.Score.Enabled = append(config.Score.Enabled, v1beta2.Plugin{Name: names.VolumeBinding, Weight: pointer.Int32(1)})
}
if utilfeature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness) {
config.PreEnqueue.Enabled = append(config.PreEnqueue.Enabled, v1beta2.Plugin{Name: names.SchedulingGates})
}
}
// mergePlugins merges the custom set into the given default one, handling disabled sets.

View File

@ -24,6 +24,7 @@ import (
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kube-scheduler/config/v1beta2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/utils/pointer"
)
@ -113,6 +114,93 @@ func TestApplyFeatureGates(t *testing.T) {
},
},
},
{
name: "Feature gate PodSchedulingReadiness enabled",
features: map[featuregate.Feature]bool{
features.PodSchedulingReadiness: true,
},
wantConfig: &v1beta2.Plugins{
PreEnqueue: v1beta2.PluginSet{
Enabled: []v1beta2.Plugin{
{Name: names.SchedulingGates},
},
},
QueueSort: v1beta2.PluginSet{
Enabled: []v1beta2.Plugin{
{Name: names.PrioritySort},
},
},
PreFilter: v1beta2.PluginSet{
Enabled: []v1beta2.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
{Name: names.NodeAffinity},
},
},
Filter: v1beta2.PluginSet{
Enabled: []v1beta2.Plugin{
{Name: names.NodeUnschedulable},
{Name: names.NodeName},
{Name: names.TaintToleration},
{Name: names.NodeAffinity},
{Name: names.NodePorts},
{Name: names.NodeResourcesFit},
{Name: names.VolumeRestrictions},
{Name: names.EBSLimits},
{Name: names.GCEPDLimits},
{Name: names.NodeVolumeLimits},
{Name: names.AzureDiskLimits},
{Name: names.VolumeBinding},
{Name: names.VolumeZone},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
},
},
PostFilter: v1beta2.PluginSet{
Enabled: []v1beta2.Plugin{
{Name: names.DefaultPreemption},
},
},
PreScore: v1beta2.PluginSet{
Enabled: []v1beta2.Plugin{
{Name: names.InterPodAffinity},
{Name: names.PodTopologySpread},
{Name: names.TaintToleration},
{Name: names.NodeAffinity},
},
},
Score: v1beta2.PluginSet{
Enabled: []v1beta2.Plugin{
{Name: names.NodeResourcesBalancedAllocation, Weight: pointer.Int32(1)},
{Name: names.ImageLocality, Weight: pointer.Int32(1)},
{Name: names.InterPodAffinity, Weight: pointer.Int32(1)},
{Name: names.NodeResourcesFit, Weight: pointer.Int32(1)},
{Name: names.NodeAffinity, Weight: pointer.Int32(1)},
{Name: names.PodTopologySpread, Weight: pointer.Int32(2)},
{Name: names.TaintToleration, Weight: pointer.Int32(1)},
},
},
Reserve: v1beta2.PluginSet{
Enabled: []v1beta2.Plugin{
{Name: names.VolumeBinding},
},
},
PreBind: v1beta2.PluginSet{
Enabled: []v1beta2.Plugin{
{Name: names.VolumeBinding},
},
},
Bind: v1beta2.PluginSet{
Enabled: []v1beta2.Plugin{
{Name: names.DefaultBinder},
},
},
},
},
}
for _, test := range tests {

View File

@ -160,11 +160,6 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*config.Plugins)(nil), (*v1beta2.Plugins)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_config_Plugins_To_v1beta2_Plugins(a.(*config.Plugins), b.(*v1beta2.Plugins), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*v1beta2.PodTopologySpreadArgs)(nil), (*config.PodTopologySpreadArgs)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1beta2_PodTopologySpreadArgs_To_config_PodTopologySpreadArgs(a.(*v1beta2.PodTopologySpreadArgs), b.(*config.PodTopologySpreadArgs), scope)
}); err != nil {
@ -235,6 +230,11 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
if err := s.AddConversionFunc((*config.Plugins)(nil), (*v1beta2.Plugins)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_config_Plugins_To_v1beta2_Plugins(a.(*config.Plugins), b.(*v1beta2.Plugins), scope)
}); err != nil {
return err
}
if err := s.AddConversionFunc((*v1beta2.KubeSchedulerConfiguration)(nil), (*config.KubeSchedulerConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1beta2_KubeSchedulerConfiguration_To_config_KubeSchedulerConfiguration(a.(*v1beta2.KubeSchedulerConfiguration), b.(*config.KubeSchedulerConfiguration), scope)
}); err != nil {
@ -716,6 +716,9 @@ func Convert_config_PluginSet_To_v1beta2_PluginSet(in *config.PluginSet, out *v1
}
func autoConvert_v1beta2_Plugins_To_config_Plugins(in *v1beta2.Plugins, out *config.Plugins, s conversion.Scope) error {
if err := Convert_v1beta2_PluginSet_To_config_PluginSet(&in.PreEnqueue, &out.PreEnqueue, s); err != nil {
return err
}
if err := Convert_v1beta2_PluginSet_To_config_PluginSet(&in.QueueSort, &out.QueueSort, s); err != nil {
return err
}
@ -761,6 +764,9 @@ func Convert_v1beta2_Plugins_To_config_Plugins(in *v1beta2.Plugins, out *config.
}
func autoConvert_config_Plugins_To_v1beta2_Plugins(in *config.Plugins, out *v1beta2.Plugins, s conversion.Scope) error {
if err := Convert_config_PluginSet_To_v1beta2_PluginSet(&in.PreEnqueue, &out.PreEnqueue, s); err != nil {
return err
}
if err := Convert_config_PluginSet_To_v1beta2_PluginSet(&in.QueueSort, &out.QueueSort, s); err != nil {
return err
}
@ -800,11 +806,6 @@ func autoConvert_config_Plugins_To_v1beta2_Plugins(in *config.Plugins, out *v1be
return nil
}
// Convert_config_Plugins_To_v1beta2_Plugins is an autogenerated conversion function.
func Convert_config_Plugins_To_v1beta2_Plugins(in *config.Plugins, out *v1beta2.Plugins, s conversion.Scope) error {
return autoConvert_config_Plugins_To_v1beta2_Plugins(in, out, s)
}
func autoConvert_v1beta2_PodTopologySpreadArgs_To_config_PodTopologySpreadArgs(in *v1beta2.PodTopologySpreadArgs, out *config.PodTopologySpreadArgs, s conversion.Scope) error {
out.DefaultConstraints = *(*[]corev1.TopologySpreadConstraint)(unsafe.Pointer(&in.DefaultConstraints))
out.DefaultingType = config.PodTopologySpreadConstraintsDefaulting(in.DefaultingType)

View File

@ -111,3 +111,8 @@ func convertToExternalPluginConfigArgs(out *v1beta3.KubeSchedulerConfiguration)
func Convert_config_KubeSchedulerProfile_To_v1beta3_KubeSchedulerProfile(in *config.KubeSchedulerProfile, out *v1beta3.KubeSchedulerProfile, s conversion.Scope) error {
return autoConvert_config_KubeSchedulerProfile_To_v1beta3_KubeSchedulerProfile(in, out, s)
}
// Convert_config_Plugins_To_v1beta3_Plugins is an autogenerated conversion function.
func Convert_config_Plugins_To_v1beta3_Plugins(in *config.Plugins, out *v1beta3.Plugins, s conversion.Scope) error {
return autoConvert_config_Plugins_To_v1beta3_Plugins(in, out, s)
}

View File

@ -18,8 +18,10 @@ package v1beta3
import (
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"k8s.io/kube-scheduler/config/v1beta3"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/utils/pointer"
)
@ -52,10 +54,17 @@ func getDefaultPlugins() *v1beta3.Plugins {
},
},
}
applyFeatureGates(plugins)
return plugins
}
func applyFeatureGates(config *v1beta3.Plugins) {
if utilfeature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness) {
config.MultiPoint.Enabled = append(config.MultiPoint.Enabled, v1beta3.Plugin{Name: names.SchedulingGates})
}
}
// mergePlugins merges the custom set into the given default one, handling disabled sets.
func mergePlugins(defaultPlugins, customPlugins *v1beta3.Plugins) *v1beta3.Plugins {
if customPlugins == nil {

View File

@ -24,6 +24,7 @@ import (
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kube-scheduler/config/v1beta3"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/utils/pointer"
)
@ -63,6 +64,39 @@ func TestApplyFeatureGates(t *testing.T) {
},
},
},
{
name: "Feature gate PodSchedulingReadiness enabled",
features: map[featuregate.Feature]bool{
features.PodSchedulingReadiness: true,
},
wantConfig: &v1beta3.Plugins{
MultiPoint: v1beta3.PluginSet{
Enabled: []v1beta3.Plugin{
{Name: names.PrioritySort},
{Name: names.NodeUnschedulable},
{Name: names.NodeName},
{Name: names.TaintToleration, Weight: pointer.Int32(3)},
{Name: names.NodeAffinity, Weight: pointer.Int32(2)},
{Name: names.NodePorts},
{Name: names.NodeResourcesFit, Weight: pointer.Int32(1)},
{Name: names.VolumeRestrictions},
{Name: names.EBSLimits},
{Name: names.GCEPDLimits},
{Name: names.NodeVolumeLimits},
{Name: names.AzureDiskLimits},
{Name: names.VolumeBinding},
{Name: names.VolumeZone},
{Name: names.PodTopologySpread, Weight: pointer.Int32(2)},
{Name: names.InterPodAffinity, Weight: pointer.Int32(2)},
{Name: names.DefaultPreemption},
{Name: names.NodeResourcesBalancedAllocation, Weight: pointer.Int32(1)},
{Name: names.ImageLocality, Weight: pointer.Int32(1)},
{Name: names.DefaultBinder},
{Name: names.SchedulingGates},
},
},
},
},
}
for _, test := range tests {

View File

@ -160,11 +160,6 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*config.Plugins)(nil), (*v1beta3.Plugins)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_config_Plugins_To_v1beta3_Plugins(a.(*config.Plugins), b.(*v1beta3.Plugins), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*v1beta3.PodTopologySpreadArgs)(nil), (*config.PodTopologySpreadArgs)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1beta3_PodTopologySpreadArgs_To_config_PodTopologySpreadArgs(a.(*v1beta3.PodTopologySpreadArgs), b.(*config.PodTopologySpreadArgs), scope)
}); err != nil {
@ -235,6 +230,11 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
if err := s.AddConversionFunc((*config.Plugins)(nil), (*v1beta3.Plugins)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_config_Plugins_To_v1beta3_Plugins(a.(*config.Plugins), b.(*v1beta3.Plugins), scope)
}); err != nil {
return err
}
if err := s.AddConversionFunc((*v1beta3.KubeSchedulerConfiguration)(nil), (*config.KubeSchedulerConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1beta3_KubeSchedulerConfiguration_To_config_KubeSchedulerConfiguration(a.(*v1beta3.KubeSchedulerConfiguration), b.(*config.KubeSchedulerConfiguration), scope)
}); err != nil {
@ -706,6 +706,9 @@ func Convert_config_PluginSet_To_v1beta3_PluginSet(in *config.PluginSet, out *v1
}
func autoConvert_v1beta3_Plugins_To_config_Plugins(in *v1beta3.Plugins, out *config.Plugins, s conversion.Scope) error {
if err := Convert_v1beta3_PluginSet_To_config_PluginSet(&in.PreEnqueue, &out.PreEnqueue, s); err != nil {
return err
}
if err := Convert_v1beta3_PluginSet_To_config_PluginSet(&in.QueueSort, &out.QueueSort, s); err != nil {
return err
}
@ -751,6 +754,9 @@ func Convert_v1beta3_Plugins_To_config_Plugins(in *v1beta3.Plugins, out *config.
}
func autoConvert_config_Plugins_To_v1beta3_Plugins(in *config.Plugins, out *v1beta3.Plugins, s conversion.Scope) error {
if err := Convert_config_PluginSet_To_v1beta3_PluginSet(&in.PreEnqueue, &out.PreEnqueue, s); err != nil {
return err
}
if err := Convert_config_PluginSet_To_v1beta3_PluginSet(&in.QueueSort, &out.QueueSort, s); err != nil {
return err
}
@ -790,11 +796,6 @@ func autoConvert_config_Plugins_To_v1beta3_Plugins(in *config.Plugins, out *v1be
return nil
}
// Convert_config_Plugins_To_v1beta3_Plugins is an autogenerated conversion function.
func Convert_config_Plugins_To_v1beta3_Plugins(in *config.Plugins, out *v1beta3.Plugins, s conversion.Scope) error {
return autoConvert_config_Plugins_To_v1beta3_Plugins(in, out, s)
}
func autoConvert_v1beta3_PodTopologySpreadArgs_To_config_PodTopologySpreadArgs(in *v1beta3.PodTopologySpreadArgs, out *config.PodTopologySpreadArgs, s conversion.Scope) error {
out.DefaultConstraints = *(*[]corev1.TopologySpreadConstraint)(unsafe.Pointer(&in.DefaultConstraints))
out.DefaultingType = config.PodTopologySpreadConstraintsDefaulting(in.DefaultingType)

View File

@ -198,6 +198,7 @@ func validatePluginConfig(path *field.Path, apiVersion string, profile *config.K
if profile.Plugins != nil {
stagesToPluginSet := map[string]config.PluginSet{
"preEnqueue": profile.Plugins.PreEnqueue,
"queueSort": profile.Plugins.QueueSort,
"preFilter": profile.Plugins.PreFilter,
"filter": profile.Plugins.Filter,

View File

@ -543,6 +543,12 @@ func TestValidateKubeSchedulerConfigurationV1beta3(t *testing.T) {
},
}
duplicatedPlugins := validConfig.DeepCopy()
duplicatedPlugins.Profiles[0].Plugins.PreEnqueue.Enabled = []config.Plugin{
{Name: "CustomPreEnqueue"},
{Name: "CustomPreEnqueue"},
}
duplicatedPluginConfig := validConfig.DeepCopy()
duplicatedPluginConfig.Profiles[0].PluginConfig = []config.PluginConfig{
{

View File

@ -394,6 +394,7 @@ func (in *PluginSet) DeepCopy() *PluginSet {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Plugins) DeepCopyInto(out *Plugins) {
*out = *in
in.PreEnqueue.DeepCopyInto(&out.PreEnqueue)
in.QueueSort.DeepCopyInto(&out.QueueSort)
in.PreFilter.DeepCopyInto(&out.PreFilter)
in.Filter.DeepCopyInto(&out.Filter)

View File

@ -323,6 +323,17 @@ type Plugin interface {
Name() string
}
// PreEnqueuePlugin is an interface that must be implemented by "PreEnqueue" plugins.
// These plugins are called prior to adding Pods to activeQ.
// Note: an preEnqueue plugin is expected to be lightweight and efficient, so it's not expected to
// involve expensive calls like accessing external endpoints; otherwise it'd block other
// Pods' enqueuing in event handlers.
type PreEnqueuePlugin interface {
Plugin
// PreEnqueue is called prior to adding Pods to activeQ.
PreEnqueue(ctx context.Context, p *v1.Pod) *Status
}
// LessFunc is the function to sort pod info
type LessFunc func(podInfo1, podInfo2 *QueuedPodInfo) bool
@ -521,6 +532,10 @@ type BindPlugin interface {
// Configured plugins are called at specified points in a scheduling context.
type Framework interface {
Handle
// PreEnqueuePlugins returns the registered preEnqueue plugins.
PreEnqueuePlugins() []PreEnqueuePlugin
// QueueSortFunc returns the function to sort pods in scheduling queue
QueueSortFunc() LessFunc

View File

@ -25,4 +25,5 @@ type Features struct {
EnableMinDomainsInPodTopologySpread bool
EnableNodeInclusionPolicyInPodTopologySpread bool
EnableMatchLabelKeysInPodTopologySpread bool
EnablePodSchedulingReadiness bool
}

View File

@ -33,6 +33,7 @@ const (
EBSLimits = "EBSLimits"
GCEPDLimits = "GCEPDLimits"
PodTopologySpread = "PodTopologySpread"
SchedulingGates = "SchedulingGates"
SelectorSpread = "SelectorSpread"
TaintToleration = "TaintToleration"
VolumeBinding = "VolumeBinding"

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
@ -50,6 +51,7 @@ func NewInTreeRegistry() runtime.Registry {
EnableMinDomainsInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MinDomainsInPodTopologySpread),
EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread),
EnableMatchLabelKeysInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MatchLabelKeysInPodTopologySpread),
EnablePodSchedulingReadiness: feature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness),
}
return runtime.Registry{
@ -74,5 +76,6 @@ func NewInTreeRegistry() runtime.Registry {
queuesort.Name: queuesort.New,
defaultbinder.Name: defaultbinder.New,
defaultpreemption.Name: runtime.FactoryAdapter(fts, defaultpreemption.New),
schedulinggates.Name: runtime.FactoryAdapter(fts, schedulinggates.New),
}
}

View File

@ -0,0 +1,67 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulinggates
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
)
// Name of the plugin used in the plugin registry and configurations.
const Name = names.SchedulingGates
// SchedulingGates checks if a Pod carries .spec.schedulingGates.
type SchedulingGates struct {
enablePodSchedulingReadiness bool
}
var _ framework.PreEnqueuePlugin = &SchedulingGates{}
var _ framework.EnqueueExtensions = &SchedulingGates{}
func (pl *SchedulingGates) Name() string {
return Name
}
func (pl *SchedulingGates) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
if !pl.enablePodSchedulingReadiness || len(p.Spec.SchedulingGates) == 0 {
return nil
}
var gates []string
for _, gate := range p.Spec.SchedulingGates {
gates = append(gates, gate.Name)
}
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("waiting for scheduling gates: %v", gates))
}
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
func (pl *SchedulingGates) EventsToRegister() []framework.ClusterEvent {
return []framework.ClusterEvent{
{Resource: framework.Pod, ActionType: framework.Update},
}
}
// New initializes a new plugin and returns it.
func New(_ runtime.Object, _ framework.Handle, fts feature.Features) (framework.Plugin, error) {
return &SchedulingGates{enablePodSchedulingReadiness: fts.EnablePodSchedulingReadiness}, nil
}

View File

@ -0,0 +1,77 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulinggates
import (
"context"
"testing"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
func TestPreEnqueue(t *testing.T) {
tests := []struct {
name string
pod *v1.Pod
enablePodSchedulingReadiness bool
want *framework.Status
}{
{
name: "pod does not carry scheduling gates, feature disabled",
pod: st.MakePod().Name("p").Obj(),
enablePodSchedulingReadiness: false,
want: nil,
},
{
name: "pod does not carry scheduling gates, feature enabled",
pod: st.MakePod().Name("p").Obj(),
enablePodSchedulingReadiness: true,
want: nil,
},
{
name: "pod carries scheduling gates, feature disabled",
pod: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(),
enablePodSchedulingReadiness: false,
want: nil,
},
{
name: "pod carries scheduling gates, feature enabled",
pod: st.MakePod().Name("p").SchedulingGates([]string{"foo", "bar"}).Obj(),
enablePodSchedulingReadiness: true,
want: framework.NewStatus(framework.UnschedulableAndUnresolvable, "waiting for scheduling gates: [foo bar]"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p, err := New(nil, nil, feature.Features{EnablePodSchedulingReadiness: tt.enablePodSchedulingReadiness})
if err != nil {
t.Fatalf("Creating plugin: %v", err)
}
got := p.(framework.PreEnqueuePlugin).PreEnqueue(context.Background(), tt.pod)
if diff := cmp.Diff(tt.want, got); diff != "" {
t.Errorf("unexpected status (-want, +got):\n%s", diff)
}
})
}
}

View File

@ -75,6 +75,7 @@ type frameworkImpl struct {
snapshotSharedLister framework.SharedLister
waitingPods *waitingPodsMap
scorePluginWeight map[string]int
preEnqueuePlugins []framework.PreEnqueuePlugin
queueSortPlugins []framework.QueueSortPlugin
preFilterPlugins []framework.PreFilterPlugin
filterPlugins []framework.FilterPlugin
@ -125,6 +126,7 @@ func (f *frameworkImpl) getExtensionPoints(plugins *config.Plugins) []extensionP
{&plugins.Bind, &f.bindPlugins},
{&plugins.PostBind, &f.postBindPlugins},
{&plugins.Permit, &f.permitPlugins},
{&plugins.PreEnqueue, &f.preEnqueuePlugins},
{&plugins.QueueSort, &f.queueSortPlugins},
}
}
@ -574,6 +576,11 @@ func updatePluginList(pluginList interface{}, pluginSet config.PluginSet, plugin
return nil
}
// EnqueuePlugins returns the registered enqueue plugins.
func (f *frameworkImpl) PreEnqueuePlugins() []framework.PreEnqueuePlugin {
return f.preEnqueuePlugins
}
// QueueSortFunc returns the function to sort pods in scheduling queue
func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
if f == nil {

View File

@ -41,6 +41,7 @@ import (
)
const (
preEnqueuePlugin = "preEnqueue-plugin"
queueSortPlugin = "no-op-queue-sort-plugin"
scoreWithNormalizePlugin1 = "score-with-normalize-plugin-1"
scoreWithNormalizePlugin2 = "score-with-normalize-plugin-2"
@ -302,6 +303,18 @@ func (pp *TestPermitPlugin) Permit(ctx context.Context, state *framework.CycleSt
return framework.NewStatus(framework.Wait), 10 * time.Second
}
var _ framework.PreEnqueuePlugin = &TestPreEnqueuePlugin{}
type TestPreEnqueuePlugin struct{}
func (pl *TestPreEnqueuePlugin) Name() string {
return preEnqueuePlugin
}
func (pl *TestPreEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
return nil
}
var _ framework.QueueSortPlugin = &TestQueueSortPlugin{}
func newQueueSortPlugin(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
@ -984,6 +997,61 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) {
}
}
func TestPreEnqueuePlugins(t *testing.T) {
tests := []struct {
name string
plugins []framework.Plugin
want []framework.PreEnqueuePlugin
}{
{
name: "no PreEnqueuePlugin registered",
},
{
name: "one PreEnqueuePlugin registered",
plugins: []framework.Plugin{
&TestPreEnqueuePlugin{},
},
want: []framework.PreEnqueuePlugin{
&TestPreEnqueuePlugin{},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
registry := Registry{}
cfgPls := &config.Plugins{}
for _, pl := range tt.plugins {
// register all plugins
tmpPl := pl
if err := registry.Register(pl.Name(),
func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
return tmpPl, nil
}); err != nil {
t.Fatalf("fail to register preEnqueue plugin (%s)", pl.Name())
}
// append plugins to filter pluginset
cfgPls.PreEnqueue.Enabled = append(
cfgPls.PreEnqueue.Enabled,
config.Plugin{Name: pl.Name()},
)
}
profile := config.KubeSchedulerProfile{Plugins: cfgPls}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(registry, profile, ctx.Done())
if err != nil {
t.Fatalf("fail to create framework: %s", err)
}
got := f.PreEnqueuePlugins()
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("PreEnqueuePlugins(): want %v, but got %v", tt.want, got)
}
})
}
}
func TestRunScorePlugins(t *testing.T) {
tests := []struct {
name string

View File

@ -102,6 +102,8 @@ type QueuedPodInfo struct {
InitialAttemptTimestamp time.Time
// If a Pod failed in a scheduling cycle, record the plugin names it failed by.
UnschedulablePlugins sets.String
// Whether the Pod is scheduling gated (by PreEnqueuePlugins) or not.
Gated bool
}
// DeepCopy returns a deep copy of the QueuedPodInfo object.
@ -331,9 +333,9 @@ func getPodAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) {
terms = affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
// }
}
return terms
}
@ -344,9 +346,9 @@ func getPodAntiAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm)
terms = affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
//if len(affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// if len(affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
// terms = append(terms, affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
//}
// }
}
return terms
}

View File

@ -27,6 +27,7 @@ limitations under the License.
package queue
import (
"context"
"fmt"
"reflect"
"sync"
@ -63,6 +64,8 @@ const (
activeQName = "Active"
backoffQName = "Backoff"
unschedulablePods = "Unschedulable"
preEnqueue = "PreEnqueue"
)
const (
@ -172,6 +175,8 @@ type PriorityQueue struct {
moveRequestCycle int64
clusterEventMap map[framework.ClusterEvent]sets.String
// preEnqueuePluginMap is keyed with profile name, valued with registered preEnqueue plugins.
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
// closed indicates that the queue is closed.
// It is mainly used to let Pop() exit its control loop while waiting for an item.
@ -187,6 +192,7 @@ type priorityQueueOptions struct {
podMaxInUnschedulablePodsDuration time.Duration
podNominator framework.PodNominator
clusterEventMap map[framework.ClusterEvent]sets.String
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
}
// Option configures a PriorityQueue
@ -234,6 +240,13 @@ func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option {
}
}
// WithPreEnqueuePluginMap sets preEnqueuePluginMap for PriorityQueue.
func WithPreEnqueuePluginMap(m map[string][]framework.PreEnqueuePlugin) Option {
return func(o *priorityQueueOptions) {
o.preEnqueuePluginMap = m
}
}
var defaultPriorityQueueOptions = priorityQueueOptions{
clock: clock.RealClock{},
podInitialBackoffDuration: DefaultPodInitialBackoffDuration,
@ -283,9 +296,10 @@ func NewPriorityQueue(
podMaxBackoffDuration: options.podMaxBackoffDuration,
podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()),
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
moveRequestCycle: -1,
clusterEventMap: options.clusterEventMap,
preEnqueuePluginMap: options.preEnqueuePluginMap,
}
pq.cond.L = &pq.lock
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
@ -300,19 +314,66 @@ func (p *PriorityQueue) Run() {
go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}
// runPreEnqueuePlugins iterates PreEnqueue function in each registered PreEnqueuePlugin.
// It returns true if all PreEnqueue function run successfully; otherwise returns false
// upon the first failure.
// Note: we need to associate the failed plugin to `pInfo`, so that the pod can be moved back
// to activeQ by related cluster event.
func (p *PriorityQueue) runPreEnqueuePlugins(ctx context.Context, pInfo *framework.QueuedPodInfo) bool {
var s *framework.Status
pod := pInfo.Pod
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preEnqueue, s.Code().String(), pod.Spec.SchedulerName).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range p.preEnqueuePluginMap[pod.Spec.SchedulerName] {
s = pl.PreEnqueue(ctx, pod)
if s.IsSuccess() {
continue
}
pInfo.UnschedulablePlugins.Insert(pl.Name())
metrics.UnschedulableReason(pl.Name(), pod.Spec.SchedulerName).Inc()
if s.Code() == framework.Error {
klog.ErrorS(s.AsError(), "Unexpected error running PreEnqueue plugin", "pod", klog.KObj(pod), "plugin", pl.Name())
} else {
klog.V(5).InfoS("Status after running PreEnqueue plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", s)
}
return false
}
return true
}
// addToActiveQ tries to add pod to active queue. It returns 2 parameters:
// 1. a boolean flag to indicate whether the pod is added successfully.
// 2. an error for the caller to act on.
func (p *PriorityQueue) addToActiveQ(pInfo *framework.QueuedPodInfo) (bool, error) {
pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
if pInfo.Gated {
// Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins.
p.unschedulablePods.addOrUpdate(pInfo)
return false, nil
}
if err := p.activeQ.Add(pInfo); err != nil {
klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod))
return false, err
}
return true, nil
}
// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in active/unschedulable/backoff queues
func (p *PriorityQueue) Add(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
pInfo := p.newQueuedPodInfo(pod)
if err := p.activeQ.Add(pInfo); err != nil {
klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pod))
if added, err := p.addToActiveQ(pInfo); !added {
return err
}
if p.unschedulablePods.get(pod) != nil {
klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
p.unschedulablePods.delete(pod)
p.unschedulablePods.delete(pInfo)
}
// Delete pod from backoffQ if it is backing off
if err := p.podBackoffQ.Delete(pInfo); err == nil {
@ -367,11 +428,10 @@ func (p *PriorityQueue) activate(pod *v1.Pod) bool {
return false
}
if err := p.activeQ.Add(pInfo); err != nil {
klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod))
if added, _ := p.addToActiveQ(pInfo); !added {
return false
}
p.unschedulablePods.delete(pod)
p.unschedulablePods.delete(pInfo)
p.podBackoffQ.Delete(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc()
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
@ -446,8 +506,9 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
if rawPodInfo == nil {
break
}
pod := rawPodInfo.(*framework.QueuedPodInfo).Pod
if p.isPodBackingoff(rawPodInfo.(*framework.QueuedPodInfo)) {
pInfo := rawPodInfo.(*framework.QueuedPodInfo)
pod := pInfo.Pod
if p.isPodBackingoff(pInfo) {
break
}
_, err := p.podBackoffQ.Pop()
@ -455,11 +516,12 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
break
}
p.activeQ.Add(rawPodInfo)
if added, _ := p.addToActiveQ(pInfo); added {
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", BackoffComplete, "queue", activeQName)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
activated = true
}
}
if activated {
p.cond.Broadcast()
@ -560,13 +622,13 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
if err := p.podBackoffQ.Add(pInfo); err != nil {
return err
}
p.unschedulablePods.delete(usPodInfo.Pod)
p.unschedulablePods.delete(usPodInfo)
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", backoffQName)
} else {
if err := p.activeQ.Add(pInfo); err != nil {
if added, err := p.addToActiveQ(pInfo); !added {
return err
}
p.unschedulablePods.delete(usPodInfo.Pod)
p.unschedulablePods.delete(usPodInfo)
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", BackoffComplete, "queue", activeQName)
p.cond.Broadcast()
}
@ -579,7 +641,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
}
// If pod is not in any of the queues, we put it in the active queue.
pInfo := p.newQueuedPodInfo(newPod)
if err := p.activeQ.Add(pInfo); err != nil {
if added, err := p.addToActiveQ(pInfo); !added {
return err
}
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
@ -594,10 +656,11 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
p.PodNominator.DeleteNominatedPodIfExists(pod)
if err := p.activeQ.Delete(newQueuedPodInfoForLookup(pod)); err != nil {
pInfo := newQueuedPodInfoForLookup(pod)
if err := p.activeQ.Delete(pInfo); err != nil {
// The item was probably not found in the activeQ.
p.podBackoffQ.Delete(newQueuedPodInfoForLookup(pod))
p.unschedulablePods.delete(pod)
p.podBackoffQ.Delete(pInfo)
p.unschedulablePods.delete(pInfo)
}
return nil
}
@ -652,16 +715,14 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.
} else {
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQName)
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc()
p.unschedulablePods.delete(pod)
p.unschedulablePods.delete(pInfo)
}
} else {
if err := p.activeQ.Add(pInfo); err != nil {
klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod))
} else {
if added, _ := p.addToActiveQ(pInfo); added {
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQName)
activated = true
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc()
p.unschedulablePods.delete(pod)
p.unschedulablePods.delete(pInfo)
}
}
}
@ -806,25 +867,33 @@ type UnschedulablePods struct {
// podInfoMap is a map key by a pod's full-name and the value is a pointer to the QueuedPodInfo.
podInfoMap map[string]*framework.QueuedPodInfo
keyFunc func(*v1.Pod) string
// metricRecorder updates the counter when elements of an unschedulablePodsMap
// get added or removed, and it does nothing if it's nil
metricRecorder metrics.MetricRecorder
// unschedulableRecorder/gatedRecorder updates the counter when elements of an unschedulablePodsMap
// get added or removed, and it does nothing if it's nil.
unschedulableRecorder, gatedRecorder metrics.MetricRecorder
}
// Add adds a pod to the unschedulable podInfoMap.
func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo) {
podID := u.keyFunc(pInfo.Pod)
if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil {
u.metricRecorder.Inc()
if _, exists := u.podInfoMap[podID]; !exists {
if pInfo.Gated && u.gatedRecorder != nil {
u.gatedRecorder.Inc()
} else if !pInfo.Gated && u.unschedulableRecorder != nil {
u.unschedulableRecorder.Inc()
}
}
u.podInfoMap[podID] = pInfo
}
// Delete deletes a pod from the unschedulable podInfoMap.
func (u *UnschedulablePods) delete(pod *v1.Pod) {
podID := u.keyFunc(pod)
if _, exists := u.podInfoMap[podID]; exists && u.metricRecorder != nil {
u.metricRecorder.Dec()
func (u *UnschedulablePods) delete(pInfo *framework.QueuedPodInfo) {
podID := u.keyFunc(pInfo.Pod)
if _, exists := u.podInfoMap[podID]; exists {
if pInfo.Gated && u.gatedRecorder != nil {
u.gatedRecorder.Dec()
} else if !pInfo.Gated && u.unschedulableRecorder != nil {
u.unschedulableRecorder.Dec()
}
}
delete(u.podInfoMap, podID)
}
@ -842,17 +911,21 @@ func (u *UnschedulablePods) get(pod *v1.Pod) *framework.QueuedPodInfo {
// Clear removes all the entries from the unschedulable podInfoMap.
func (u *UnschedulablePods) clear() {
u.podInfoMap = make(map[string]*framework.QueuedPodInfo)
if u.metricRecorder != nil {
u.metricRecorder.Clear()
if u.unschedulableRecorder != nil {
u.unschedulableRecorder.Clear()
}
if u.gatedRecorder != nil {
u.gatedRecorder.Clear()
}
}
// newUnschedulablePods initializes a new object of UnschedulablePods.
func newUnschedulablePods(metricRecorder metrics.MetricRecorder) *UnschedulablePods {
func newUnschedulablePods(unschedulableRecorder, gatedRecorder metrics.MetricRecorder) *UnschedulablePods {
return &UnschedulablePods{
podInfoMap: make(map[string]*framework.QueuedPodInfo),
keyFunc: util.GetPodFullName,
metricRecorder: metricRecorder,
unschedulableRecorder: unschedulableRecorder,
gatedRecorder: gatedRecorder,
}
}

View File

@ -446,6 +446,84 @@ func TestPriorityQueue_Activate(t *testing.T) {
}
}
type preEnqueuePlugin struct {
allowlists []string
}
func (pl *preEnqueuePlugin) Name() string {
return "preEnqueuePlugin"
}
func (pl *preEnqueuePlugin) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
for _, allowed := range pl.allowlists {
if strings.Contains(p.Name, allowed) {
return nil
}
}
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "pod name not in allowlists")
}
func TestPriorityQueue_addToActiveQ(t *testing.T) {
tests := []struct {
name string
plugins []framework.PreEnqueuePlugin
pod *v1.Pod
wantUnschedulablePods int
wantSuccess bool
}{
{
name: "no plugins registered",
pod: st.MakePod().Name("p").Obj(),
wantUnschedulablePods: 0,
wantSuccess: true,
},
{
name: "preEnqueue plugin registered, pod name not in allowlists",
plugins: []framework.PreEnqueuePlugin{&preEnqueuePlugin{}, &preEnqueuePlugin{}},
pod: st.MakePod().Name("p").Obj(),
wantUnschedulablePods: 1,
wantSuccess: false,
},
{
name: "preEnqueue plugin registered, pod failed one preEnqueue plugin",
plugins: []framework.PreEnqueuePlugin{
&preEnqueuePlugin{allowlists: []string{"foo", "bar"}},
&preEnqueuePlugin{allowlists: []string{"foo"}},
},
pod: st.MakePod().Name("bar").Obj(),
wantUnschedulablePods: 1,
wantSuccess: false,
},
{
name: "preEnqueue plugin registered, pod passed all preEnqueue plugins",
plugins: []framework.PreEnqueuePlugin{
&preEnqueuePlugin{allowlists: []string{"foo", "bar"}},
&preEnqueuePlugin{allowlists: []string{"bar"}},
},
pod: st.MakePod().Name("bar").Obj(),
wantUnschedulablePods: 0,
wantSuccess: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
m := map[string][]framework.PreEnqueuePlugin{"": tt.plugins}
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), []runtime.Object{tt.pod}, WithPreEnqueuePluginMap(m))
got, _ := q.addToActiveQ(newQueuedPodInfoForLookup(tt.pod))
if got != tt.wantSuccess {
t.Errorf("Unexpected result: want %v, but got %v", tt.wantSuccess, got)
}
if tt.wantUnschedulablePods != len(q.unschedulablePods.podInfoMap) {
t.Errorf("Unexpected unschedulablePods: want %v, but got %v", tt.wantUnschedulablePods, len(q.unschedulablePods.podInfoMap))
}
})
}
}
func BenchmarkMoveAllToActiveOrBackoffQueue(b *testing.B) {
tests := []struct {
name string
@ -949,7 +1027,7 @@ func TestUnschedulablePodsMap(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
upm := newUnschedulablePods(nil)
upm := newUnschedulablePods(nil, nil)
for _, p := range test.podsToAdd {
upm.addOrUpdate(newQueuedPodInfoForLookup(p))
}
@ -968,7 +1046,7 @@ func TestUnschedulablePodsMap(t *testing.T) {
}
}
for _, p := range test.podsToDelete {
upm.delete(p)
upm.delete(newQueuedPodInfoForLookup(p))
}
if !reflect.DeepEqual(upm.podInfoMap, test.expectedMapAfterDelete) {
t.Errorf("Unexpected map after deleting pods. Expected: %v, got: %v",
@ -1307,6 +1385,7 @@ var (
queue.activeQ.Update(pInfo)
}
addPodUnschedulablePods = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
if !pInfo.Gated {
// Update pod condition to unschedulable.
podutil.UpdatePodCondition(&pInfo.Pod.Status, &v1.PodCondition{
Type: v1.PodScheduled,
@ -1314,6 +1393,7 @@ var (
Reason: v1.PodReasonUnschedulable,
Message: "fake scheduling failure",
})
}
queue.unschedulablePods.addOrUpdate(pInfo)
}
addPodBackoffQ = func(queue *PriorityQueue, pInfo *framework.QueuedPodInfo) {
@ -1434,10 +1514,20 @@ func TestPodTimestamp(t *testing.T) {
func TestPendingPodsMetric(t *testing.T) {
timestamp := time.Now()
metrics.Register()
total := 50
pInfos := makeQueuedPodInfos(total, timestamp)
total := 60
queueableNum := 50
queueable := "queueable"
// First 50 Pods are queueable.
pInfos := makeQueuedPodInfos(queueableNum, queueable, timestamp)
// The last 10 Pods are not queueable.
gated := makeQueuedPodInfos(total-queueableNum, "fail-me", timestamp)
// Manually mark them as gated=true.
for _, pInfo := range gated {
pInfo.Gated = true
}
pInfos = append(pInfos, gated...)
totalWithDelay := 20
pInfosWithDelay := makeQueuedPodInfos(totalWithDelay, timestamp.Add(2*time.Second))
pInfosWithDelay := makeQueuedPodInfos(totalWithDelay, queueable, timestamp.Add(2*time.Second))
tests := []struct {
name string
@ -1458,10 +1548,11 @@ func TestPendingPodsMetric(t *testing.T) {
},
metricsName: "scheduler_pending_pods",
wants: `
# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods.
# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated.
# TYPE scheduler_pending_pods gauge
scheduler_pending_pods{queue="active"} 30
scheduler_pending_pods{queue="backoff"} 0
scheduler_pending_pods{queue="gated"} 10
scheduler_pending_pods{queue="unschedulable"} 20
`,
},
@ -1479,10 +1570,11 @@ scheduler_pending_pods{queue="unschedulable"} 20
},
metricsName: "scheduler_pending_pods",
wants: `
# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods.
# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated.
# TYPE scheduler_pending_pods gauge
scheduler_pending_pods{queue="active"} 15
scheduler_pending_pods{queue="backoff"} 25
scheduler_pending_pods{queue="gated"} 10
scheduler_pending_pods{queue="unschedulable"} 10
`,
},
@ -1500,10 +1592,11 @@ scheduler_pending_pods{queue="unschedulable"} 10
},
metricsName: "scheduler_pending_pods",
wants: `
# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods.
# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated.
# TYPE scheduler_pending_pods gauge
scheduler_pending_pods{queue="active"} 50
scheduler_pending_pods{queue="backoff"} 0
scheduler_pending_pods{queue="gated"} 10
scheduler_pending_pods{queue="unschedulable"} 0
`,
},
@ -1523,10 +1616,11 @@ scheduler_pending_pods{queue="unschedulable"} 0
},
metricsName: "scheduler_pending_pods",
wants: `
# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods.
# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated.
# TYPE scheduler_pending_pods gauge
scheduler_pending_pods{queue="active"} 30
scheduler_pending_pods{queue="backoff"} 20
scheduler_pending_pods{queue="gated"} 10
scheduler_pending_pods{queue="unschedulable"} 0
`,
},
@ -1540,16 +1634,17 @@ scheduler_pending_pods{queue="unschedulable"} 0
},
operands: [][]*framework.QueuedPodInfo{
pInfos[:40],
pInfos[40:],
pInfos[40:50],
{nil},
{nil},
},
metricsName: "scheduler_pending_pods",
wants: `
# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods.
# HELP scheduler_pending_pods [STABLE] Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated.
# TYPE scheduler_pending_pods gauge
scheduler_pending_pods{queue="active"} 50
scheduler_pending_pods{queue="backoff"} 0
scheduler_pending_pods{queue="gated"} 0
scheduler_pending_pods{queue="unschedulable"} 0
`,
},
@ -1559,6 +1654,7 @@ scheduler_pending_pods{queue="unschedulable"} 0
metrics.ActivePods().Set(0)
metrics.BackoffPods().Set(0)
metrics.UnschedulablePods().Set(0)
metrics.GatedPods().Set(0)
}
for _, test := range tests {
@ -1566,7 +1662,9 @@ scheduler_pending_pods{queue="unschedulable"} 0
resetMetrics()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)))
m := map[string][]framework.PreEnqueuePlugin{"": {&preEnqueuePlugin{allowlists: []string{queueable}}}}
queue := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(testingclock.NewFakeClock(timestamp)), WithPreEnqueuePluginMap(m))
for i, op := range test.operations {
for _, pInfo := range test.operands[i] {
op(queue, pInfo)
@ -1986,12 +2084,13 @@ func TestMoveAllToActiveOrBackoffQueue_PreEnqueueChecks(t *testing.T) {
}
}
func makeQueuedPodInfos(num int, timestamp time.Time) []*framework.QueuedPodInfo {
func makeQueuedPodInfos(num int, namePrefix string, timestamp time.Time) []*framework.QueuedPodInfo {
var pInfos = make([]*framework.QueuedPodInfo, 0, num)
for i := 1; i <= num; i++ {
p := &framework.QueuedPodInfo{
PodInfo: mustNewPodInfo(st.MakePod().Name(fmt.Sprintf("test-pod-%d", i)).Namespace(fmt.Sprintf("ns%d", i)).UID(fmt.Sprintf("tp-%d", i)).Obj()),
PodInfo: mustNewPodInfo(st.MakePod().Name(fmt.Sprintf("%v-%d", namePrefix, i)).Namespace(fmt.Sprintf("ns%d", i)).UID(fmt.Sprintf("tp-%d", i)).Obj()),
Timestamp: timestamp,
UnschedulablePlugins: sets.NewString(),
}
pInfos = append(pInfos, p)
}

View File

@ -56,6 +56,13 @@ func NewBackoffPodsRecorder() *PendingPodsRecorder {
}
}
// NewGatedPodsRecorder returns GatedPods in a Prometheus metric fashion
func NewGatedPodsRecorder() *PendingPodsRecorder {
return &PendingPodsRecorder{
recorder: GatedPods(),
}
}
// Inc increases a metric counter by 1, in an atomic way
func (r *PendingPodsRecorder) Inc() {
r.recorder.Inc()

View File

@ -92,7 +92,7 @@ var (
&metrics.GaugeOpts{
Subsystem: SchedulerSubsystem,
Name: "pending_pods",
Help: "Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods.",
Help: "Number of pending pods, by the queue type. 'active' means number of pods in activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number of pods in unschedulablePods that the scheduler attempted to schedule and failed; 'gated' is the number of unschedulable pods that the scheduler never attempted to schedule because they are gated.",
StabilityLevel: metrics.STABLE,
}, []string{"queue"})
// SchedulerGoroutines isn't called in some parts where goroutines start.
@ -249,6 +249,11 @@ func UnschedulablePods() metrics.GaugeMetric {
return pendingPods.With(metrics.Labels{"queue": "unschedulable"})
}
// GatedPods returns the pending pods metrics with the label gated
func GatedPods() metrics.GaugeMetric {
return pendingPods.With(metrics.Labels{"queue": "gated"})
}
// SinceInSeconds gets the time since the specified start in seconds.
func SinceInSeconds(start time.Time) float64 {
return time.Since(start).Seconds()

View File

@ -309,6 +309,10 @@ func New(client clientset.Interface,
return nil, errors.New("at least one profile is required")
}
preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)
for profileName, profile := range profiles {
preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()
}
podQueue := internalqueue.NewSchedulingQueue(
profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
informerFactory,
@ -317,6 +321,7 @@ func New(client clientset.Interface,
internalqueue.WithPodNominator(nominator),
internalqueue.WithClusterEventMap(clusterEventMap),
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
)
schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything)

View File

@ -385,6 +385,14 @@ func (p *PodWrapper) Volume(volume v1.Volume) *PodWrapper {
return p
}
// SchedulingGates sets `gates` as additional SchedulerGates of the inner pod.
func (p *PodWrapper) SchedulingGates(gates []string) *PodWrapper {
for _, gate := range gates {
p.Spec.SchedulingGates = append(p.Spec.SchedulingGates, v1.PodSchedulingGate{Name: gate})
}
return p
}
// PodAffinityKind represents different kinds of PodAffinity.
type PodAffinityKind int

View File

@ -170,6 +170,9 @@ type KubeSchedulerProfile struct {
// Enabled plugins are called in the order specified here, after default plugins. If they need to
// be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order.
type Plugins struct {
// PreEnqueue is a list of plugins that should be invoked before adding pods to the scheduling queue.
PreEnqueue PluginSet `json:"preEnqueue,omitempty"`
// QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue.
QueueSort PluginSet `json:"queueSort,omitempty"`

View File

@ -436,6 +436,7 @@ func (in *PluginSet) DeepCopy() *PluginSet {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Plugins) DeepCopyInto(out *Plugins) {
*out = *in
in.PreEnqueue.DeepCopyInto(&out.PreEnqueue)
in.QueueSort.DeepCopyInto(&out.QueueSort)
in.PreFilter.DeepCopyInto(&out.PreFilter)
in.Filter.DeepCopyInto(&out.Filter)

View File

@ -166,6 +166,9 @@ type KubeSchedulerProfile struct {
// Enabled plugins are called in the order specified here, after default plugins. If they need to
// be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order.
type Plugins struct {
// PreEnqueue is a list of plugins that should be invoked before adding pods to the scheduling queue.
PreEnqueue PluginSet `json:"preEnqueue,omitempty"`
// QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue.
QueueSort PluginSet `json:"queueSort,omitempty"`

View File

@ -441,6 +441,7 @@ func (in *PluginSet) DeepCopy() *PluginSet {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Plugins) DeepCopyInto(out *Plugins) {
*out = *in
in.PreEnqueue.DeepCopyInto(&out.PreEnqueue)
in.QueueSort.DeepCopyInto(&out.QueueSort)
in.PreFilter.DeepCopyInto(&out.PreFilter)
in.Filter.DeepCopyInto(&out.Filter)

View File

@ -159,6 +159,9 @@ type KubeSchedulerProfile struct {
// Enabled plugins are called in the order specified here, after default plugins. If they need to
// be invoked before default plugins, default plugins must be disabled and re-enabled here in desired order.
type Plugins struct {
// PreEnqueue is a list of plugins that should be invoked before adding pods to the scheduling queue.
PreEnqueue PluginSet `json:"preEnqueue,omitempty"`
// QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue.
QueueSort PluginSet `json:"queueSort,omitempty"`

View File

@ -431,6 +431,7 @@ func (in *PluginSet) DeepCopy() *PluginSet {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Plugins) DeepCopyInto(out *Plugins) {
*out = *in
in.PreEnqueue.DeepCopyInto(&out.PreEnqueue)
in.QueueSort.DeepCopyInto(&out.QueueSort)
in.PreFilter.DeepCopyInto(&out.PreFilter)
in.Filter.DeepCopyInto(&out.Filter)

View File

@ -49,7 +49,9 @@
subsystem: scheduler
help: Number of pending pods, by the queue type. 'active' means number of pods in
activeQ; 'backoff' means number of pods in backoffQ; 'unschedulable' means number
of pods in unschedulablePods.
of pods in unschedulablePods that the scheduler attempted to schedule and failed;
'gated' is the number of unschedulable pods that the scheduler never attempted
to schedule because they are gated.
type: Gauge
stabilityLevel: STABLE
labels: