Support multiple scheduling profiles in a single scheduler

Signed-off-by: Aldo Culquicondor <acondor@google.com>
This commit is contained in:
Aldo Culquicondor 2020-02-13 11:08:46 -05:00
parent fe9073b8c1
commit c048858471
32 changed files with 1028 additions and 535 deletions

View File

@ -17,6 +17,7 @@ go_library(
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/profile:go_default_library",
"//pkg/util/configz:go_default_library",
"//pkg/util/flag:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -53,7 +53,6 @@ type Config struct {
CoreBroadcaster record.EventBroadcaster
EventClient v1beta1.EventsGetter
Recorder events.EventRecorder
Broadcaster events.EventBroadcaster
// LeaderElection is optional.

View File

@ -19,7 +19,6 @@ package app
import (
"context"
"errors"
"fmt"
"io"
"net/http"
@ -59,6 +58,7 @@ import (
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/pkg/util/configz"
utilflag "k8s.io/kubernetes/pkg/util/flag"
)
@ -172,34 +172,19 @@ func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTre
}
}
if len(cc.ComponentConfig.Profiles) != 1 {
// TODO(#85737): Support more than one profile.
return errors.New("multiple scheduling profiles are unsupported")
}
profile := cc.ComponentConfig.Profiles[0]
// Prepare event clients.
if _, err := cc.Client.Discovery().ServerResourcesForGroupVersion(eventsv1beta1.SchemeGroupVersion.String()); err == nil {
cc.Broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: cc.EventClient.Events("")})
cc.Recorder = cc.Broadcaster.NewRecorder(scheme.Scheme, profile.SchedulerName)
} else {
recorder := cc.CoreBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: profile.SchedulerName})
cc.Recorder = record.NewEventRecorderAdapter(recorder)
}
recorderFactory := getRecorderFactory(&cc)
// Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.PodInformer,
cc.Recorder,
recorderFactory,
ctx.Done(),
scheduler.WithName(profile.SchedulerName),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithFrameworkPlugins(profile.Plugins),
scheduler.WithFrameworkPluginConfig(profile.PluginConfig),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
)
@ -336,6 +321,17 @@ func newHealthzHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, s
return pathRecorderMux
}
func getRecorderFactory(cc *schedulerserverconfig.CompletedConfig) profile.RecorderFactory {
if _, err := cc.Client.Discovery().ServerResourcesForGroupVersion(eventsv1beta1.SchemeGroupVersion.String()); err == nil {
cc.Broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: cc.EventClient.Events("")})
return profile.NewRecorderFactory(cc.Broadcaster)
}
return func(name string) events.EventRecorder {
r := cc.CoreBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: name})
return record.NewEventRecorderAdapter(r)
}
}
// WithPlugin creates an Option based on plugin name and factory. This function is used to register out-of-tree plugins.
func WithPlugin(name string, factory framework.PluginFactory) Option {
return func(registry framework.Registry) error {

View File

@ -27,6 +27,7 @@ go_library(
"//pkg/scheduler/internal/cache/debugger:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/profile:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
@ -45,7 +46,6 @@ go_library(
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
@ -81,6 +81,7 @@ go_test(
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/profile:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@ -128,6 +129,7 @@ filegroup(
"//pkg/scheduler/listers:all-srcs",
"//pkg/scheduler/metrics:all-srcs",
"//pkg/scheduler/nodeinfo:all-srcs",
"//pkg/scheduler/profile:all-srcs",
"//pkg/scheduler/testing:all-srcs",
"//pkg/scheduler/util:all-srcs",
"//pkg/scheduler/volumebinder:all-srcs",

View File

@ -14,12 +14,14 @@ go_test(
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/config/scheme:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/profile:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
"//staging/src/k8s.io/component-base/featuregate:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",

View File

@ -20,6 +20,8 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
"k8s.io/client-go/tools/events"
"k8s.io/kubernetes/pkg/scheduler/profile"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -1376,12 +1378,13 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
},
}
informerFactory := informers.NewSharedInformerFactory(client, 0)
recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")}))
sched, err := scheduler.New(
client,
informerFactory,
informerFactory.Core().V1().Pods(),
nil,
recorderFactory,
make(chan struct{}),
scheduler.WithAlgorithmSource(algorithmSrc),
)
@ -1390,7 +1393,8 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
t.Fatalf("Error constructing: %v", err)
}
gotPlugins := sched.Framework.ListPlugins()
defProf := sched.Profiles["default-scheduler"]
gotPlugins := defProf.Framework.ListPlugins()
if diff := cmp.Diff(tc.wantPlugins, gotPlugins); diff != "" {
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
}
@ -1538,12 +1542,13 @@ func TestAlgorithmProviderCompatibility(t *testing.T) {
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")}))
sched, err := scheduler.New(
client,
informerFactory,
informerFactory.Core().V1().Pods(),
nil,
recorderFactory,
make(chan struct{}),
opts...,
)
@ -1552,7 +1557,8 @@ func TestAlgorithmProviderCompatibility(t *testing.T) {
t.Fatalf("Error constructing: %v", err)
}
gotPlugins := sched.Framework.ListPlugins()
defProf := sched.Profiles["default-scheduler"]
gotPlugins := defProf.ListPlugins()
if diff := cmp.Diff(tc.wantPlugins, gotPlugins); diff != "" {
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
}

View File

@ -18,8 +18,8 @@ go_library(
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/profile:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -66,6 +66,7 @@ go_test(
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/listers/fake:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/profile:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -42,6 +42,7 @@ import (
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/profile"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/util"
)
@ -589,21 +590,22 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
if err != nil {
t.Fatal(err)
}
prof := &profile.Profile{
Framework: fwk,
}
scheduler := NewGenericScheduler(
cache,
queue,
emptySnapshot,
fwk,
extenders,
nil,
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
false,
schedulerapi.DefaultPercentageOfNodesToScore,
false)
podIgnored := &v1.Pod{}
result, err := scheduler.Schedule(context.Background(), framework.NewCycleState(), podIgnored)
result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), podIgnored)
if test.expectsErr {
if err == nil {
t.Errorf("Unexpected non-error, result %+v", result)

View File

@ -44,8 +44,8 @@ import (
"k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
utiltrace "k8s.io/utils/trace"
)
@ -102,22 +102,15 @@ func (f *FitError) Error() string {
// onto machines.
// TODO: Rename this type.
type ScheduleAlgorithm interface {
Schedule(context.Context, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
Schedule(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
// Preempt receives scheduling errors for a pod and tries to create room for
// the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, a
// list of pods whose nominated node name should be removed, and error if any.
Preempt(context.Context, *framework.CycleState, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
// Prioritizers returns a slice of priority config. This is exposed for
// testing.
Extenders() []SchedulerExtender
// Snapshot snapshots scheduler cache and node infos. This is needed
// for cluster autoscaler integration.
// TODO(#85691): remove this once CA migrates to creating a Framework instead of a full scheduler.
Snapshot() error
// Framework returns the scheduler framework instance. This is needed for cluster autoscaler integration.
// TODO(#85691): remove this once CA migrates to creating a Framework instead of a full scheduler.
Framework() framework.Framework
}
// ScheduleResult represents the result of one pod scheduled. It will contain
@ -134,10 +127,8 @@ type ScheduleResult struct {
type genericScheduler struct {
cache internalcache.Cache
schedulingQueue internalqueue.SchedulingQueue
framework framework.Framework
extenders []SchedulerExtender
nodeInfoSnapshot *internalcache.Snapshot
volumeBinder *volumebinder.VolumeBinder
pvcLister corelisters.PersistentVolumeClaimLister
pdbLister policylisters.PodDisruptionBudgetLister
disablePreemption bool
@ -146,23 +137,17 @@ type genericScheduler struct {
nextStartNodeIndex int
}
// Snapshot snapshots scheduler cache and node infos for all fit and priority
// snapshot snapshots scheduler cache and node infos for all fit and priority
// functions.
func (g *genericScheduler) Snapshot() error {
func (g *genericScheduler) snapshot() error {
// Used for all fit and priority funcs.
return g.cache.UpdateSnapshot(g.nodeInfoSnapshot)
}
// Framework returns the framework instance.
func (g *genericScheduler) Framework() framework.Framework {
// Used for all fit and priority funcs.
return g.framework
}
// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
defer trace.LogIfLong(100 * time.Millisecond)
@ -171,7 +156,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS
}
trace.Step("Basic checks done")
if err := g.Snapshot(); err != nil {
if err := g.snapshot(); err != nil {
return result, err
}
trace.Step("Snapshotting scheduler cache and node infos done")
@ -181,14 +166,14 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS
}
// Run "prefilter" plugins.
preFilterStatus := g.framework.RunPreFilterPlugins(ctx, state, pod)
preFilterStatus := prof.RunPreFilterPlugins(ctx, state, pod)
if !preFilterStatus.IsSuccess() {
return result, preFilterStatus.AsError()
}
trace.Step("Running prefilter plugins done")
startPredicateEvalTime := time.Now()
filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, state, pod)
filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
if err != nil {
return result, err
}
@ -203,7 +188,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS
}
// Run "prescore" plugins.
prescoreStatus := g.framework.RunPreScorePlugins(ctx, state, pod, filteredNodes)
prescoreStatus := prof.RunPreScorePlugins(ctx, state, pod, filteredNodes)
if !prescoreStatus.IsSuccess() {
return result, prescoreStatus.AsError()
}
@ -223,7 +208,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, state *framework.CycleS
}, nil
}
priorityList, err := g.prioritizeNodes(ctx, state, pod, filteredNodes)
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, filteredNodes)
if err != nil {
return result, err
}
@ -282,7 +267,7 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
// other pods with the same priority. The nominated pod prevents other pods from
// using the nominated resources and the nominated pod could take a long time
// before it is retried after many other pending pods.
func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
// Scheduler may return various types of errors. Consider preemption only if
// the error is of type FitError.
fitError, ok := scheduleErr.(*FitError)
@ -313,7 +298,7 @@ func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleSt
return nil, nil, nil, err
}
}
nodeToVictims, err := g.selectNodesForPreemption(ctx, state, pod, potentialNodes, pdbs)
nodeToVictims, err := g.selectNodesForPreemption(ctx, prof, state, pod, potentialNodes, pdbs)
if err != nil {
return nil, nil, nil, err
}
@ -426,9 +411,9 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
// Filters the nodes to find the ones that fit the pod based on the framework
// filter plugins and filter extenders.
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
filteredNodesStatuses := make(framework.NodeToStatusMap)
filtered, err := g.findNodesThatPassFilters(ctx, state, pod, filteredNodesStatuses)
filtered, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
if err != nil {
return nil, nil, err
}
@ -441,7 +426,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, state *frame
}
// findNodesThatPassFilters finds the nodes that fit the filter plugins.
func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
return nil, err
@ -453,7 +438,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, state *
// and allow assigning.
filtered := make([]*v1.Node, numNodesToFind)
if !g.framework.HasFilterPlugins() {
if !prof.HasFilterPlugins() {
for i := range filtered {
filtered[i] = allNodes[i].Node()
}
@ -469,7 +454,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, state *
// We check the nodes starting from where we left off in the previous scheduling cycle,
// this is to make sure all nodes have the same chance of being examined across pods.
nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
fits, status, err := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo)
fits, status, err := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo)
if err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
@ -547,8 +532,7 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v
// addNominatedPods adds pods with equal or greater priority which are nominated
// to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
// 3) augmented nodeInfo.
func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, state *framework.CycleState,
nodeInfo *schedulernodeinfo.NodeInfo) (bool, *framework.CycleState, *schedulernodeinfo.NodeInfo, error) {
func (g *genericScheduler) addNominatedPods(ctx context.Context, prof *profile.Profile, pod *v1.Pod, state *framework.CycleState, nodeInfo *schedulernodeinfo.NodeInfo) (bool, *framework.CycleState, *schedulernodeinfo.NodeInfo, error) {
if g.schedulingQueue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen only in tests.
return false, state, nodeInfo, nil
@ -563,7 +547,7 @@ func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, st
for _, p := range nominatedPods {
if podutil.GetPodPriority(p) >= podutil.GetPodPriority(pod) && p.UID != pod.UID {
nodeInfoOut.AddPod(p)
status := g.framework.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
status := prof.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
if !status.IsSuccess() {
return false, state, nodeInfo, status.AsError()
}
@ -585,6 +569,7 @@ func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, st
// NodeInfo before calling this function.
func (g *genericScheduler) podPassesFiltersOnNode(
ctx context.Context,
prof *profile.Profile,
state *framework.CycleState,
pod *v1.Pod,
info *schedulernodeinfo.NodeInfo,
@ -615,7 +600,7 @@ func (g *genericScheduler) podPassesFiltersOnNode(
nodeInfoToUse := info
if i == 0 {
var err error
podsAdded, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, pod, state, info)
podsAdded, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, prof, pod, state, info)
if err != nil {
return false, nil, err
}
@ -623,7 +608,7 @@ func (g *genericScheduler) podPassesFiltersOnNode(
break
}
statusMap := g.framework.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
statusMap := prof.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
status = statusMap.Merge()
if !status.IsSuccess() && !status.IsUnschedulable() {
return false, status, status.AsError()
@ -640,13 +625,14 @@ func (g *genericScheduler) podPassesFiltersOnNode(
// All scores are finally combined (added) to get the total weighted scores of all nodes
func (g *genericScheduler) prioritizeNodes(
ctx context.Context,
prof *profile.Profile,
state *framework.CycleState,
pod *v1.Pod,
nodes []*v1.Node,
) (framework.NodeScoreList, error) {
// If no priority configs are provided, then all nodes will have a score of one.
// This is required to generate the priority list in the required format
if len(g.extenders) == 0 && !g.framework.HasScorePlugins() {
if len(g.extenders) == 0 && !prof.HasScorePlugins() {
result := make(framework.NodeScoreList, 0, len(nodes))
for i := range nodes {
result = append(result, framework.NodeScore{
@ -658,7 +644,7 @@ func (g *genericScheduler) prioritizeNodes(
}
// Run the Score plugins.
scoresMap, scoreStatus := g.framework.RunScorePlugins(ctx, state, pod, nodes)
scoresMap, scoreStatus := prof.RunScorePlugins(ctx, state, pod, nodes)
if !scoreStatus.IsSuccess() {
return framework.NodeScoreList{}, scoreStatus.AsError()
}
@ -863,6 +849,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*extenderv1.Victims) *
// preemption in parallel.
func (g *genericScheduler) selectNodesForPreemption(
ctx context.Context,
prof *profile.Profile,
state *framework.CycleState,
pod *v1.Pod,
potentialNodes []*schedulernodeinfo.NodeInfo,
@ -874,7 +861,7 @@ func (g *genericScheduler) selectNodesForPreemption(
checkNode := func(i int) {
nodeInfoCopy := potentialNodes[i].Clone()
stateCopy := state.Clone()
pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs)
pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, prof, stateCopy, pod, nodeInfoCopy, pdbs)
if fits {
resultLock.Lock()
victims := extenderv1.Victims{
@ -952,6 +939,7 @@ func filterPodsWithPDBViolation(pods []*v1.Pod, pdbs []*policy.PodDisruptionBudg
// these predicates can be satisfied by removing more pods from the node.
func (g *genericScheduler) selectVictimsOnNode(
ctx context.Context,
prof *profile.Profile,
state *framework.CycleState,
pod *v1.Pod,
nodeInfo *schedulernodeinfo.NodeInfo,
@ -963,7 +951,7 @@ func (g *genericScheduler) selectVictimsOnNode(
if err := nodeInfo.RemovePod(rp); err != nil {
return err
}
status := g.framework.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
status := prof.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
if !status.IsSuccess() {
return status.AsError()
}
@ -971,7 +959,7 @@ func (g *genericScheduler) selectVictimsOnNode(
}
addPod := func(ap *v1.Pod) error {
nodeInfo.AddPod(ap)
status := g.framework.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
status := prof.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
if !status.IsSuccess() {
return status.AsError()
}
@ -994,7 +982,7 @@ func (g *genericScheduler) selectVictimsOnNode(
// inter-pod affinity to one or more victims, but we have decided not to
// support this case for performance reasons. Having affinity to lower
// priority pods is not a recommended configuration anyway.
if fits, _, err := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo); !fits {
if fits, _, err := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo); !fits {
if err != nil {
klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}
@ -1012,7 +1000,7 @@ func (g *genericScheduler) selectVictimsOnNode(
if err := addPod(p); err != nil {
return false, err
}
fits, _, _ := g.podPassesFiltersOnNode(ctx, state, pod, nodeInfo)
fits, _, _ := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo)
if !fits {
if err := removePod(p); err != nil {
return false, err
@ -1114,9 +1102,7 @@ func NewGenericScheduler(
cache internalcache.Cache,
podQueue internalqueue.SchedulingQueue,
nodeInfoSnapshot *internalcache.Snapshot,
framework framework.Framework,
extenders []SchedulerExtender,
volumeBinder *volumebinder.VolumeBinder,
pvcLister corelisters.PersistentVolumeClaimLister,
pdbLister policylisters.PodDisruptionBudgetLister,
disablePreemption bool,
@ -1125,10 +1111,8 @@ func NewGenericScheduler(
return &genericScheduler{
cache: cache,
schedulingQueue: podQueue,
framework: framework,
extenders: extenders,
nodeInfoSnapshot: nodeInfoSnapshot,
volumeBinder: volumeBinder,
pvcLister: pvcLister,
pdbLister: pdbLister,
disablePreemption: disablePreemption,

View File

@ -59,6 +59,7 @@ import (
fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/profile"
st "k8s.io/kubernetes/pkg/scheduler/testing"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
@ -804,6 +805,7 @@ func TestGenericScheduler(t *testing.T) {
if err != nil {
t.Fatal(err)
}
prof := &profile.Profile{Framework: fwk}
var pvcs []v1.PersistentVolumeClaim
pvcs = append(pvcs, test.pvcs...)
@ -813,15 +815,13 @@ func TestGenericScheduler(t *testing.T) {
cache,
internalqueue.NewSchedulingQueue(nil),
snapshot,
fwk,
[]SchedulerExtender{},
nil,
pvcLister,
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
false,
schedulerapi.DefaultPercentageOfNodesToScore,
false)
result, err := scheduler.Schedule(context.Background(), framework.NewCycleState(), test.pod)
result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), test.pod)
if !reflect.DeepEqual(err, test.wErr) {
t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
}
@ -836,35 +836,46 @@ func TestGenericScheduler(t *testing.T) {
}
// makeScheduler makes a simple genericScheduler for testing.
func makeScheduler(nodes []*v1.Node, fns ...st.RegisterPluginFunc) *genericScheduler {
func makeScheduler(nodes []*v1.Node) *genericScheduler {
cache := internalcache.New(time.Duration(0), wait.NeverStop)
for _, n := range nodes {
cache.AddNode(n)
}
fwk, _ := st.NewFramework(fns)
s := NewGenericScheduler(
cache,
internalqueue.NewSchedulingQueue(nil),
emptySnapshot,
fwk,
nil, nil, nil, nil, false,
nil, nil, nil, false,
schedulerapi.DefaultPercentageOfNodesToScore, false)
cache.UpdateSnapshot(s.(*genericScheduler).nodeInfoSnapshot)
return s.(*genericScheduler)
}
func makeProfile(fns ...st.RegisterPluginFunc) (*profile.Profile, error) {
fwk, err := st.NewFramework(fns)
if err != nil {
return nil, err
}
return &profile.Profile{
Framework: fwk,
}, nil
}
func TestFindFitAllError(t *testing.T) {
nodes := makeNodeList([]string{"3", "2", "1"})
scheduler := makeScheduler(
nodes,
scheduler := makeScheduler(nodes)
prof, err := makeProfile(
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
if err != nil {
t.Fatal(err)
}
_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), framework.NewCycleState(), &v1.Pod{})
_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), &v1.Pod{})
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -890,16 +901,19 @@ func TestFindFitAllError(t *testing.T) {
func TestFindFitSomeError(t *testing.T) {
nodes := makeNodeList([]string{"3", "2", "1"})
scheduler := makeScheduler(
nodes,
scheduler := makeScheduler(nodes)
prof, err := makeProfile(
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterFilterPlugin("MatchFilter", NewMatchFilterPlugin),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
if err != nil {
t.Fatal(err)
}
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), framework.NewCycleState(), pod)
_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -947,11 +961,6 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
for _, test := range tests {
nodes := makeNodeList([]string{"1"})
cache := internalcache.New(time.Duration(0), wait.NeverStop)
for _, n := range nodes {
cache.AddNode(n)
}
plugin := fakeFilterPlugin{}
registerFakeFilterFunc := st.RegisterFilterPlugin(
"FakeFilter",
@ -964,23 +973,18 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
registerFakeFilterFunc,
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}
fwk, err := st.NewFramework(registerPlugins)
prof, err := makeProfile(registerPlugins...)
if err != nil {
t.Fatal(err)
}
queue := internalqueue.NewSchedulingQueue(nil)
scheduler := NewGenericScheduler(
cache,
queue,
emptySnapshot,
fwk,
nil, nil, nil, nil, false,
schedulerapi.DefaultPercentageOfNodesToScore, false).(*genericScheduler)
cache.UpdateSnapshot(scheduler.nodeInfoSnapshot)
queue.UpdateNominatedPodForNode(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("nominated")}, Spec: v1.PodSpec{Priority: &midPriority}}, "1")
scheduler := makeScheduler(nodes)
if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil {
t.Fatal(err)
}
scheduler.schedulingQueue.UpdateNominatedPodForNode(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}, "1")
_, _, err = scheduler.findNodesThatFitPod(context.Background(), framework.NewCycleState(), test.pod)
_, _, err = scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), test.pod)
if err != nil {
t.Errorf("unexpected error: %v", err)
@ -1127,16 +1131,15 @@ func TestZeroRequest(t *testing.T) {
if err != nil {
t.Fatalf("error creating framework: %+v", err)
}
prof := &profile.Profile{Framework: fwk}
scheduler := NewGenericScheduler(
nil,
nil,
emptySnapshot,
fwk,
[]SchedulerExtender{},
nil,
nil,
nil,
false,
schedulerapi.DefaultPercentageOfNodesToScore,
false).(*genericScheduler)
@ -1144,17 +1147,12 @@ func TestZeroRequest(t *testing.T) {
ctx := context.Background()
state := framework.NewCycleState()
_, _, err = scheduler.findNodesThatFitPod(ctx, state, test.pod)
_, _, err = scheduler.findNodesThatFitPod(ctx, prof, state, test.pod)
if err != nil {
t.Fatalf("error filtering nodes: %+v", err)
}
scheduler.framework.RunPreScorePlugins(ctx, state, test.pod, test.nodes)
list, err := scheduler.prioritizeNodes(
ctx,
state,
test.pod,
test.nodes,
)
prof.RunPreScorePlugins(ctx, state, test.pod, test.nodes)
list, err := scheduler.prioritizeNodes(ctx, prof, state, test.pod, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -1613,15 +1611,14 @@ func TestSelectNodesForPreemption(t *testing.T) {
if err != nil {
t.Fatal(err)
}
prof := &profile.Profile{Framework: fwk}
scheduler := NewGenericScheduler(
nil,
internalqueue.NewSchedulingQueue(nil),
snapshot,
fwk,
[]SchedulerExtender{},
nil,
nil,
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
false,
schedulerapi.DefaultPercentageOfNodesToScore,
@ -1632,7 +1629,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
preFilterStatus := fwk.RunPreFilterPlugins(context.Background(), state, test.pod)
preFilterStatus := prof.RunPreFilterPlugins(context.Background(), state, test.pod)
if !preFilterStatus.IsSuccess() {
t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)
}
@ -1640,7 +1637,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
if err != nil {
t.Fatal(err)
}
nodeToPods, err := g.selectNodesForPreemption(context.Background(), state, test.pod, nodeInfos, test.pdbs)
nodeToPods, err := g.selectNodesForPreemption(context.Background(), prof, state, test.pod, nodeInfos, test.pdbs)
if err != nil {
t.Error(err)
}
@ -1903,9 +1900,9 @@ func TestPickOneNodeForPreemption(t *testing.T) {
if err != nil {
t.Fatal(err)
}
prof := &profile.Profile{Framework: fwk}
g := &genericScheduler{
framework: fwk,
nodeInfoSnapshot: snapshot,
}
assignDefaultStartTime(test.pods)
@ -1916,11 +1913,11 @@ func TestPickOneNodeForPreemption(t *testing.T) {
}
state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
preFilterStatus := fwk.RunPreFilterPlugins(context.Background(), state, test.pod)
preFilterStatus := prof.RunPreFilterPlugins(context.Background(), state, test.pod)
if !preFilterStatus.IsSuccess() {
t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)
}
candidateNodes, _ := g.selectNodesForPreemption(context.Background(), state, test.pod, nodeInfos, nil)
candidateNodes, _ := g.selectNodesForPreemption(context.Background(), prof, state, test.pod, nodeInfos, nil)
node := pickOneNodeForPreemption(candidateNodes)
found := false
for _, nodeName := range test.expected {
@ -2411,14 +2408,13 @@ func TestPreempt(t *testing.T) {
if err != nil {
t.Fatal(err)
}
prof := &profile.Profile{Framework: fwk}
scheduler := NewGenericScheduler(
cache,
internalqueue.NewSchedulingQueue(nil),
snapshot,
fwk,
extenders,
nil,
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
false,
@ -2435,7 +2431,7 @@ func TestPreempt(t *testing.T) {
if test.failedNodeToStatusMap != nil {
failedNodeToStatusMap = test.failedNodeToStatusMap
}
node, victims, _, err := scheduler.Preempt(context.Background(), state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap}))
node, victims, _, err := scheduler.Preempt(context.Background(), prof, state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap}))
if err != nil {
t.Errorf("unexpected error in preemption: %v", err)
}
@ -2465,7 +2461,7 @@ func TestPreempt(t *testing.T) {
test.pod.Status.NominatedNodeName = node.Name
}
// Call preempt again and make sure it doesn't preempt any more pods.
node, victims, _, err = scheduler.Preempt(context.Background(), state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap}))
node, victims, _, err = scheduler.Preempt(context.Background(), prof, state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap}))
if err != nil {
t.Errorf("unexpected error in preemption: %v", err)
}
@ -2547,19 +2543,22 @@ func TestFairEvaluationForNodes(t *testing.T) {
nodeNames = append(nodeNames, strconv.Itoa(i))
}
nodes := makeNodeList(nodeNames)
g := makeScheduler(
nodes,
g := makeScheduler(nodes)
prof, err := makeProfile(
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
if err != nil {
t.Fatal(err)
}
// To make numAllNodes % nodesToFind != 0
g.percentageOfNodesToScore = 30
nodesToFind := int(g.numFeasibleNodesToFind(int32(numAllNodes)))
// Iterating over all nodes more than twice
for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ {
nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), framework.NewCycleState(), &v1.Pod{})
nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), &v1.Pod{})
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -21,6 +21,7 @@ import (
"reflect"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/profile"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
@ -209,7 +210,14 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
// Volume binder only wants to keep unassigned pods
sched.VolumeBinder.DeletePodBindings(pod)
}
sched.Framework.RejectWaitingPod(pod.UID)
prof, err := sched.profileForPod(pod)
if err != nil {
// This shouldn't happen, because we only accept for scheduling the pods
// which specify a scheduler name that matches one of the profiles.
klog.Error(err)
return
}
prof.Framework.RejectWaitingPod(pod.UID)
}
func (sched *Scheduler) addPodToCache(obj interface{}) {
@ -286,8 +294,8 @@ func assignedPod(pod *v1.Pod) bool {
}
// responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
func responsibleForPod(pod *v1.Pod, schedulerName string) bool {
return schedulerName == pod.Spec.SchedulerName
func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool {
return profiles.HandlesSchedulerName(pod.Spec.SchedulerName)
}
// skipPodUpdate checks whether the specified pod update should be ignored.
@ -337,11 +345,10 @@ func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
return true
}
// AddAllEventHandlers is a helper function used in tests and in Scheduler
// addAllEventHandlers is a helper function used in tests and in Scheduler
// to add event handlers for various informers.
func AddAllEventHandlers(
func addAllEventHandlers(
sched *Scheduler,
schedulerName string,
informerFactory informers.SharedInformerFactory,
podInformer coreinformers.PodInformer,
) {
@ -376,10 +383,10 @@ func AddAllEventHandlers(
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return !assignedPod(t) && responsibleForPod(t, schedulerName)
return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return !assignedPod(pod) && responsibleForPod(pod, schedulerName)
return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles)
}
utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
return false

View File

@ -18,12 +18,13 @@ package scheduler
import (
"context"
"errors"
"fmt"
"sort"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
@ -51,6 +52,7 @@ import (
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
@ -69,6 +71,8 @@ type Binder interface {
type Configurator struct {
client clientset.Interface
recorderFactory profile.RecorderFactory
informerFactory informers.SharedInformerFactory
podInformer coreinformers.PodInformer
@ -98,31 +102,37 @@ type Configurator struct {
enableNonPreempting bool
// framework configuration arguments.
profiles []schedulerapi.KubeSchedulerProfile
registry framework.Registry
plugins *schedulerapi.Plugins
pluginConfig []schedulerapi.PluginConfig
nodeInfoSnapshot *internalcache.Snapshot
}
// create a scheduler from a set of registered plugins.
func (c *Configurator) create(extenders []core.SchedulerExtender) (*Scheduler, error) {
framework, err := framework.NewFramework(
func (c *Configurator) buildFramework(p schedulerapi.KubeSchedulerProfile) (framework.Framework, error) {
return framework.NewFramework(
c.registry,
c.plugins,
c.pluginConfig,
p.Plugins,
p.PluginConfig,
framework.WithClientSet(c.client),
framework.WithInformerFactory(c.informerFactory),
framework.WithSnapshotSharedLister(c.nodeInfoSnapshot),
framework.WithRunAllFilters(c.alwaysCheckAllPredicates),
framework.WithVolumeBinder(c.volumeBinder),
)
if err != nil {
return nil, fmt.Errorf("initializing the scheduling framework: %v", err)
}
}
// create a scheduler from a set of registered plugins.
func (c *Configurator) create(extenders []core.SchedulerExtender) (*Scheduler, error) {
profiles, err := profile.NewMap(c.profiles, c.buildFramework, c.recorderFactory)
if err != nil {
return nil, fmt.Errorf("initializing profiles: %v", err)
}
if len(profiles) == 0 {
return nil, errors.New("at least one profile is required")
}
// Profiles are required to have equivalent queue sort plugins.
lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()
podQueue := internalqueue.NewSchedulingQueue(
framework,
lessFn,
internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
)
@ -140,9 +150,7 @@ func (c *Configurator) create(extenders []core.SchedulerExtender) (*Scheduler, e
c.schedulerCache,
podQueue,
c.nodeInfoSnapshot,
framework,
extenders,
c.volumeBinder,
c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
GetPodDisruptionBudgetLister(c.informerFactory),
c.disablePreemption,
@ -153,7 +161,7 @@ func (c *Configurator) create(extenders []core.SchedulerExtender) (*Scheduler, e
return &Scheduler{
SchedulerCache: c.schedulerCache,
Algorithm: algo,
Framework: framework,
Profiles: profiles,
NextPod: internalqueue.MakeNextPodFunc(podQueue),
Error: MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache),
StopEverything: c.StopEverything,
@ -171,9 +179,13 @@ func (c *Configurator) createFromProvider(providerName string) (*Scheduler, erro
return nil, fmt.Errorf("algorithm provider %q is not registered", providerName)
}
// Combine the provided plugins with the ones from component config.
defaultPlugins.Apply(c.plugins)
c.plugins = defaultPlugins
for i := range c.profiles {
prof := &c.profiles[i]
plugins := &schedulerapi.Plugins{}
plugins.Append(defaultPlugins)
plugins.Apply(prof.Plugins)
prof.Plugins = plugins
}
return c.create([]core.SchedulerExtender{})
}
@ -271,10 +283,10 @@ func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler,
}
// Combine all framework configurations. If this results in any duplication, framework
// instantiation should fail.
var defaultPlugins schedulerapi.Plugins
var defPlugins schedulerapi.Plugins
// "PrioritySort" and "DefaultBinder" were neither predicates nor priorities
// before. We add them by default.
defaultPlugins.Append(&schedulerapi.Plugins{
defPlugins.Append(&schedulerapi.Plugins{
QueueSort: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{{Name: queuesort.Name}},
},
@ -282,16 +294,23 @@ func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler,
Enabled: []schedulerapi.Plugin{{Name: defaultbinder.Name}},
},
})
defaultPlugins.Append(pluginsForPredicates)
defaultPlugins.Append(pluginsForPriorities)
defaultPlugins.Apply(c.plugins)
c.plugins = &defaultPlugins
defPlugins.Append(pluginsForPredicates)
defPlugins.Append(pluginsForPriorities)
var defPluginConfig []schedulerapi.PluginConfig
defPluginConfig = append(defPluginConfig, pluginConfigForPredicates...)
defPluginConfig = append(defPluginConfig, pluginConfigForPriorities...)
for i := range c.profiles {
prof := &c.profiles[i]
plugins := &schedulerapi.Plugins{}
plugins.Append(&defPlugins)
plugins.Apply(prof.Plugins)
prof.Plugins = plugins
var pluginConfig []schedulerapi.PluginConfig
pluginConfig = append(pluginConfig, pluginConfigForPredicates...)
pluginConfig = append(pluginConfig, pluginConfigForPriorities...)
pluginConfig = append(pluginConfig, c.pluginConfig...)
c.pluginConfig = pluginConfig
var pluginConfig []schedulerapi.PluginConfig
pluginConfig = append(pluginConfig, defPluginConfig...)
pluginConfig = append(pluginConfig, prof.PluginConfig...)
prof.PluginConfig = pluginConfig
}
return c.create(extenders)
}
@ -397,14 +416,14 @@ func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.Sch
} else {
if _, ok := err.(*core.FitError); ok {
klog.V(2).Infof("Unable to schedule %v/%v: no fit: %v; waiting", pod.Namespace, pod.Name, err)
} else if errors.IsNotFound(err) {
} else if apierrors.IsNotFound(err) {
klog.V(2).Infof("Unable to schedule %v/%v: possibly due to node not found: %v; waiting", pod.Namespace, pod.Name, err)
if errStatus, ok := err.(errors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
nodeName := errStatus.Status().Details.Name
// when node is not found, We do not remove the node right away. Trying again to get
// the node and if the node is still not found, then remove it from the scheduler cache.
_, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil && errors.IsNotFound(err) {
if err != nil && apierrors.IsNotFound(err) {
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
if err := schedulerCache.RemoveNode(&node); err != nil {
klog.V(4).Infof("Node %q is not found; failed to remove it from the cache.", node.Name)
@ -442,7 +461,7 @@ func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.Sch
}
break
}
if errors.IsNotFound(err) {
if apierrors.IsNotFound(err) {
klog.Warningf("A pod %v no longer exists", podID)
return
}

View File

@ -35,6 +35,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
apitesting "k8s.io/kubernetes/pkg/api/testing"
kubefeatures "k8s.io/kubernetes/pkg/features"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
@ -51,6 +52,7 @@ import (
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/profile"
)
const (
@ -58,6 +60,7 @@ const (
bindTimeoutSeconds = 600
podInitialBackoffDurationSeconds = 1
podMaxBackoffDurationSeconds = 10
testSchedulerName = "test-scheduler"
)
func TestCreate(t *testing.T) {
@ -65,7 +68,9 @@ func TestCreate(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
factory := newConfigFactory(client, stopCh)
factory.createFromProvider(schedulerapi.SchedulerDefaultProviderName)
if _, err := factory.createFromProvider(schedulerapi.SchedulerDefaultProviderName); err != nil {
t.Error(err)
}
}
// Test configures a scheduler from a policies defined in a file
@ -106,12 +111,14 @@ func TestCreateFromConfig(t *testing.T) {
if err != nil {
t.Fatalf("createFromConfig failed: %v", err)
}
queueSortPls := sched.Framework.ListPlugins()["QueueSortPlugin"]
// createFromConfig is the old codepath where we only have one profile.
prof := sched.Profiles[testSchedulerName]
queueSortPls := prof.ListPlugins()["QueueSortPlugin"]
wantQueuePls := []schedulerapi.Plugin{{Name: queuesort.Name}}
if diff := cmp.Diff(wantQueuePls, queueSortPls); diff != "" {
t.Errorf("Unexpected QueueSort plugins (-want, +got): %s", diff)
}
bindPls := sched.Framework.ListPlugins()["BindPlugin"]
bindPls := prof.ListPlugins()["BindPlugin"]
wantBindPls := []schedulerapi.Plugin{{Name: defaultbinder.Name}}
if diff := cmp.Diff(wantBindPls, bindPls); diff != "" {
t.Errorf("Unexpected Bind plugins (-want, +got): %s", diff)
@ -119,16 +126,16 @@ func TestCreateFromConfig(t *testing.T) {
// Verify that node label predicate/priority are converted to framework plugins.
wantArgs := `{"Name":"NodeLabel","Args":{"presentLabels":["zone"],"absentLabels":["foo"],"presentLabelsPreference":["l1"],"absentLabelsPreference":["l2"]}}`
verifyPluginConvertion(t, nodelabel.Name, []string{"FilterPlugin", "ScorePlugin"}, sched, factory, 6, wantArgs)
verifyPluginConvertion(t, nodelabel.Name, []string{"FilterPlugin", "ScorePlugin"}, prof, &factory.profiles[0], 6, wantArgs)
// Verify that service affinity custom predicate/priority is converted to framework plugin.
wantArgs = `{"Name":"ServiceAffinity","Args":{"labels":["zone","foo"],"antiAffinityLabelsPreference":["rack","zone"]}}`
verifyPluginConvertion(t, serviceaffinity.Name, []string{"FilterPlugin", "ScorePlugin"}, sched, factory, 6, wantArgs)
verifyPluginConvertion(t, serviceaffinity.Name, []string{"FilterPlugin", "ScorePlugin"}, prof, &factory.profiles[0], 6, wantArgs)
// TODO(#87703): Verify all plugin configs.
}
func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string, sched *Scheduler, configurator *Configurator, wantWeight int32, wantArgs string) {
func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string, prof *profile.Profile, cfg *schedulerapi.KubeSchedulerProfile, wantWeight int32, wantArgs string) {
for _, extensionPoint := range extentionPoints {
plugin, ok := findPlugin(name, extensionPoint, sched)
plugin, ok := findPlugin(name, extensionPoint, prof)
if !ok {
t.Fatalf("%q plugin does not exist in framework.", name)
}
@ -138,7 +145,7 @@ func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string,
}
}
// Verify that the policy config is converted to plugin config.
pluginConfig := findPluginConfig(name, configurator)
pluginConfig := findPluginConfig(name, cfg)
encoding, err := json.Marshal(pluginConfig)
if err != nil {
t.Errorf("Failed to marshal %+v: %v", pluginConfig, err)
@ -149,8 +156,8 @@ func verifyPluginConvertion(t *testing.T, name string, extentionPoints []string,
}
}
func findPlugin(name, extensionPoint string, sched *Scheduler) (schedulerapi.Plugin, bool) {
for _, pl := range sched.Framework.ListPlugins()[extensionPoint] {
func findPlugin(name, extensionPoint string, prof *profile.Profile) (schedulerapi.Plugin, bool) {
for _, pl := range prof.ListPlugins()[extensionPoint] {
if pl.Name == name {
return pl, true
}
@ -158,8 +165,8 @@ func findPlugin(name, extensionPoint string, sched *Scheduler) (schedulerapi.Plu
return schedulerapi.Plugin{}, false
}
func findPluginConfig(name string, configurator *Configurator) schedulerapi.PluginConfig {
for _, c := range configurator.pluginConfig {
func findPluginConfig(name string, prof *schedulerapi.KubeSchedulerProfile) schedulerapi.PluginConfig {
for _, c := range prof.PluginConfig {
if c.Name == name {
return c
}
@ -196,10 +203,12 @@ func TestCreateFromConfigWithHardPodAffinitySymmetricWeight(t *testing.T) {
if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), configData, &policy); err != nil {
t.Fatalf("Invalid configuration: %v", err)
}
factory.createFromConfig(policy)
if _, err := factory.createFromConfig(policy); err != nil {
t.Fatal(err)
}
// TODO(#87703): Verify that the entire pluginConfig is correct.
foundAffinityCfg := false
for _, cfg := range factory.pluginConfig {
for _, cfg := range factory.profiles[0].PluginConfig {
if cfg.Name == interpodaffinity.Name {
foundAffinityCfg = true
wantArgs := runtime.Unknown{Raw: []byte(`{"hardPodAffinityWeight":10}`)}
@ -228,9 +237,12 @@ func TestCreateFromEmptyConfig(t *testing.T) {
t.Errorf("Invalid configuration: %v", err)
}
factory.createFromConfig(policy)
if len(factory.pluginConfig) != 0 {
t.Errorf("got plugin config %s, want none", factory.pluginConfig)
if _, err := factory.createFromConfig(policy); err != nil {
t.Fatal(err)
}
prof := factory.profiles[0]
if len(prof.PluginConfig) != 0 {
t.Errorf("got plugin config %s, want none", prof.PluginConfig)
}
}
@ -256,7 +268,7 @@ func TestCreateFromConfigWithUnspecifiedPredicatesOrPriorities(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create scheduler from configuration: %v", err)
}
if _, exist := findPlugin("NodeResourcesFit", "FilterPlugin", sched); !exist {
if _, exist := findPlugin("NodeResourcesFit", "FilterPlugin", sched.Profiles[testSchedulerName]); !exist {
t.Errorf("Expected plugin NodeResourcesFit")
}
}
@ -396,6 +408,7 @@ func newConfigFactoryWithFrameworkRegistry(
registry framework.Registry) *Configurator {
informerFactory := informers.NewSharedInformerFactory(client, 0)
snapshot := internalcache.NewEmptySnapshot()
recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")}))
return &Configurator{
client: client,
informerFactory: informerFactory,
@ -408,9 +421,11 @@ func newConfigFactoryWithFrameworkRegistry(
StopEverything: stopCh,
enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.NonPreemptingPriority),
registry: registry,
plugins: nil,
pluginConfig: []schedulerapi.PluginConfig{},
nodeInfoSnapshot: snapshot,
profiles: []schedulerapi.KubeSchedulerProfile{
{SchedulerName: testSchedulerName},
},
recorderFactory: recorderFactory,
nodeInfoSnapshot: snapshot,
}
}

View File

@ -28,19 +28,14 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/plugins:go_default_library",
"//pkg/scheduler/framework/plugins/queuesort:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
],
)

View File

@ -99,8 +99,8 @@ type SchedulingQueue interface {
}
// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
func NewSchedulingQueue(fwk framework.Framework, opts ...Option) SchedulingQueue {
return NewPriorityQueue(fwk, opts...)
func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue {
return NewPriorityQueue(lessFn, opts...)
}
// NominatedNodeName returns nominated node name of a Pod.
@ -200,7 +200,7 @@ func newPodInfoNoTimestamp(pod *v1.Pod) *framework.PodInfo {
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
fwk framework.Framework,
lessFn framework.LessFunc,
opts ...Option,
) *PriorityQueue {
options := defaultPriorityQueueOptions
@ -211,7 +211,7 @@ func NewPriorityQueue(
comp := func(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*framework.PodInfo)
pInfo2 := podInfo2.(*framework.PodInfo)
return fwk.QueueSortFunc()(pInfo1, pInfo2)
return lessFn(pInfo1, pInfo2)
}
pq := &PriorityQueue{

View File

@ -28,15 +28,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/component-base/metrics/testutil"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
)
@ -122,7 +117,7 @@ func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
}
func TestPriorityQueue_Add(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework())
q := createAndRunPriorityQueue(newDefaultQueueSort())
if err := q.Add(&medPriorityPod); err != nil {
t.Errorf("add failed: %v", err)
}
@ -158,25 +153,13 @@ func TestPriorityQueue_Add(t *testing.T) {
}
}
func newDefaultFramework() framework.Framework {
plugins := algorithmprovider.NewRegistry()[schedulerapi.SchedulerDefaultProviderName]
fakeClient := fake.NewSimpleClientset()
fwk, err := framework.NewFramework(
frameworkplugins.NewInTreeRegistry(),
plugins,
nil,
framework.WithClientSet(fakeClient),
framework.WithInformerFactory(informers.NewSharedInformerFactory(fakeClient, 0)),
framework.WithSnapshotSharedLister(cache.NewEmptySnapshot()),
)
if err != nil {
panic(err)
}
return fwk
func newDefaultQueueSort() framework.LessFunc {
sort := &queuesort.PrioritySort{}
return sort.Less
}
func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework())
q := createAndRunPriorityQueue(newDefaultQueueSort())
if err := q.Add(&medPriorityPod); err != nil {
t.Errorf("add failed: %v", err)
}
@ -192,7 +175,7 @@ func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
}
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework())
q := createAndRunPriorityQueue(newDefaultQueueSort())
q.Add(&highPriNominatedPod)
q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&highPriNominatedPod), q.SchedulingCycle()) // Must not add anything.
q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle())
@ -224,7 +207,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
// Pods in and before current scheduling cycle will be put back to activeQueue
// if we were trying to schedule them when we received move request.
func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(time.Now())))
q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(time.Now())))
totalNum := 10
expectedPods := make([]v1.Pod, 0, totalNum)
for i := 0; i < totalNum; i++ {
@ -291,7 +274,7 @@ func TestPriorityQueue_AddUnschedulableIfNotPresent_Backoff(t *testing.T) {
}
func TestPriorityQueue_Pop(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework())
q := createAndRunPriorityQueue(newDefaultQueueSort())
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
@ -308,7 +291,7 @@ func TestPriorityQueue_Pop(t *testing.T) {
}
func TestPriorityQueue_Update(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework())
q := createAndRunPriorityQueue(newDefaultQueueSort())
q.Update(nil, &highPriorityPod)
q.lock.RLock()
if _, exists, _ := q.activeQ.Get(newPodInfoNoTimestamp(&highPriorityPod)); !exists {
@ -364,7 +347,7 @@ func TestPriorityQueue_Update(t *testing.T) {
}
func TestPriorityQueue_Delete(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework())
q := createAndRunPriorityQueue(newDefaultQueueSort())
q.Update(&highPriorityPod, &highPriNominatedPod)
q.Add(&unschedulablePod)
if err := q.Delete(&highPriNominatedPod); err != nil {
@ -390,7 +373,7 @@ func TestPriorityQueue_Delete(t *testing.T) {
}
func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework())
q := createAndRunPriorityQueue(newDefaultQueueSort())
q.Add(&medPriorityPod)
q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle())
q.AddUnschedulableIfNotPresent(q.newPodInfo(&highPriorityPod), q.SchedulingCycle())
@ -442,7 +425,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
}
c := clock.NewFakeClock(time.Now())
q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c))
q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c))
q.Add(&medPriorityPod)
// Add a couple of pods to the unschedulableQ.
q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle())
@ -468,7 +451,7 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
}
func TestPriorityQueue_NominatedPodsForNode(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework())
q := createAndRunPriorityQueue(newDefaultQueueSort())
q.Add(&medPriorityPod)
q.Add(&unschedulablePod)
q.Add(&highPriorityPod)
@ -493,7 +476,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
return pendingSet
}
q := createAndRunPriorityQueue(newDefaultFramework())
q := createAndRunPriorityQueue(newDefaultQueueSort())
q.Add(&medPriorityPod)
q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle())
q.AddUnschedulableIfNotPresent(q.newPodInfo(&highPriorityPod), q.SchedulingCycle())
@ -510,7 +493,7 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
}
func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework())
q := createAndRunPriorityQueue(newDefaultQueueSort())
if err := q.Add(&medPriorityPod); err != nil {
t.Errorf("add failed: %v", err)
}
@ -580,7 +563,7 @@ func TestPriorityQueue_UpdateNominatedPodForNode(t *testing.T) {
func TestPriorityQueue_NewWithOptions(t *testing.T) {
q := createAndRunPriorityQueue(
newDefaultFramework(),
newDefaultQueueSort(),
WithPodInitialBackoffDuration(2*time.Second),
WithPodMaxBackoffDuration(20*time.Second),
)
@ -752,7 +735,7 @@ func TestSchedulingQueue_Close(t *testing.T) {
}{
{
name: "PriorityQueue close",
q: createAndRunPriorityQueue(newDefaultFramework()),
q: createAndRunPriorityQueue(newDefaultQueueSort()),
expectedErr: fmt.Errorf(queueClosed),
},
}
@ -781,7 +764,7 @@ func TestSchedulingQueue_Close(t *testing.T) {
// ensures that an unschedulable pod does not block head of the queue when there
// are frequent events that move pods to the active queue.
func TestRecentlyTriedPodsGoBack(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework())
q := createAndRunPriorityQueue(newDefaultQueueSort())
// Add a few pods to priority queue.
for i := 0; i < 5; i++ {
p := v1.Pod{
@ -836,7 +819,7 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
// are frequent events that move pods to the active queue.
func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
c := clock.NewFakeClock(time.Now())
q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c))
q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c))
// Add an unschedulable pod to a priority queue.
// This makes a situation that the pod was tried to schedule
@ -927,7 +910,7 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// TestHighPriorityBackoff tests that a high priority pod does not block
// other pods if it is unschedulable
func TestHighPriorityBackoff(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework())
q := createAndRunPriorityQueue(newDefaultQueueSort())
midPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -991,7 +974,7 @@ func TestHighPriorityBackoff(t *testing.T) {
// activeQ after one minutes if it is in unschedulableQ
func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
c := clock.NewFakeClock(time.Now())
q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c))
q := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c))
midPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-midpod",
@ -1182,7 +1165,7 @@ func TestPodTimestamp(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
queue := createAndRunPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(timestamp)))
queue := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
var podInfoList []*framework.PodInfo
for i, op := range test.operations {
@ -1341,7 +1324,7 @@ scheduler_pending_pods{queue="unschedulable"} 0
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
resetMetrics()
queue := createAndRunPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(timestamp)))
queue := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
for i, op := range test.operations {
for _, pInfo := range test.operands[i] {
op(queue, pInfo)
@ -1370,7 +1353,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
// Case 1: A pod is created and scheduled after 1 attempt. The queue operations are
// Add -> Pop.
c := clock.NewFakeClock(timestamp)
queue := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c))
queue := createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c))
queue.Add(pod)
pInfo, err := queue.Pop()
if err != nil {
@ -1381,7 +1364,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
// Case 2: A pod is created and scheduled after 2 attempts. The queue operations are
// Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Pop.
c = clock.NewFakeClock(timestamp)
queue = createAndRunPriorityQueue(newDefaultFramework(), WithClock(c))
queue = createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c))
queue.Add(pod)
pInfo, err = queue.Pop()
if err != nil {
@ -1401,7 +1384,7 @@ func TestPerPodSchedulingMetrics(t *testing.T) {
// Case 3: Similar to case 2, but before the second pop, call update, the queue operations are
// Add -> Pop -> AddUnschedulableIfNotPresent -> flushUnschedulableQLeftover -> Update -> Pop.
c = clock.NewFakeClock(timestamp)
queue = createAndRunPriorityQueue(newDefaultFramework(), WithClock(c))
queue = createAndRunPriorityQueue(newDefaultQueueSort(), WithClock(c))
queue.Add(pod)
pInfo, err = queue.Pop()
if err != nil {
@ -1499,7 +1482,7 @@ func TestIncomingPodsMetrics(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
metrics.SchedulerQueueIncomingPods.Reset()
queue := NewPriorityQueue(newDefaultFramework(), WithClock(clock.NewFakeClock(timestamp)))
queue := NewPriorityQueue(newDefaultQueueSort(), WithClock(clock.NewFakeClock(timestamp)))
queue.Close()
queue.Run()
for _, op := range test.operations {
@ -1525,15 +1508,15 @@ func checkPerPodSchedulingMetrics(name string, t *testing.T, pInfo *framework.Po
}
}
func createAndRunPriorityQueue(fwk framework.Framework, opts ...Option) *PriorityQueue {
q := NewPriorityQueue(fwk, opts...)
func createAndRunPriorityQueue(lessFn framework.LessFunc, opts ...Option) *PriorityQueue {
q := NewPriorityQueue(lessFn, opts...)
q.Run()
return q
}
func TestBackOffFlow(t *testing.T) {
cl := clock.NewFakeClock(time.Now())
q := NewPriorityQueue(newDefaultFramework(), WithClock(cl))
q := NewPriorityQueue(newDefaultQueueSort(), WithClock(cl))
steps := []struct {
wantBackoff time.Duration
}{

View File

@ -0,0 +1,45 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["profile.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/profile",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["profile_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/events/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
],
)

View File

@ -0,0 +1,129 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package profile holds the definition of a scheduling Profile.
package profile
import (
"errors"
"fmt"
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/events"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
// RecorderFactory builds an EventRecorder for a given scheduler name.
type RecorderFactory func(string) events.EventRecorder
// FrameworkFactory builds a Framework for a given profile configuration.
type FrameworkFactory func(config.KubeSchedulerProfile) (framework.Framework, error)
// Profile is a scheduling profile.
type Profile struct {
framework.Framework
Recorder events.EventRecorder
}
// NewProfile builds a Profile for the given configuration.
func NewProfile(cfg config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory) (*Profile, error) {
f, err := frameworkFact(cfg)
if err != nil {
return nil, err
}
r := recorderFact(cfg.SchedulerName)
return &Profile{
Framework: f,
Recorder: r,
}, nil
}
// Map holds profiles indexed by scheduler name.
type Map map[string]*Profile
// NewMap builds the profiles given by the configuration, indexed by name.
func NewMap(cfgs []config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory) (Map, error) {
m := make(Map)
v := cfgValidator{m: m}
for _, cfg := range cfgs {
if err := v.validate(cfg); err != nil {
return nil, err
}
p, err := NewProfile(cfg, frameworkFact, recorderFact)
if err != nil {
return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err)
}
m[cfg.SchedulerName] = p
}
return m, nil
}
// HandlesSchedulerName returns whether a profile handles the given scheduler name.
func (m Map) HandlesSchedulerName(name string) bool {
_, ok := m[name]
return ok
}
// NewRecorderFactory returns a RecorderFactory for the broadcaster.
func NewRecorderFactory(b events.EventBroadcaster) RecorderFactory {
return func(name string) events.EventRecorder {
return b.NewRecorder(scheme.Scheme, name)
}
}
type cfgValidator struct {
m Map
queueSort string
queueSortArgs runtime.Unknown
}
func (v *cfgValidator) validate(cfg config.KubeSchedulerProfile) error {
if len(cfg.SchedulerName) == 0 {
return errors.New("scheduler name is needed")
}
if cfg.Plugins == nil {
return fmt.Errorf("plugins required for profile with scheduler name %q", cfg.SchedulerName)
}
if v.m[cfg.SchedulerName] != nil {
return fmt.Errorf("duplicate profile with scheduler name %q", cfg.SchedulerName)
}
if cfg.Plugins.QueueSort == nil || len(cfg.Plugins.QueueSort.Enabled) != 1 {
return fmt.Errorf("one queue sort plugin required for profile with scheduler name %q", cfg.SchedulerName)
}
queueSort := cfg.Plugins.QueueSort.Enabled[0].Name
var queueSortArgs runtime.Unknown
for _, plCfg := range cfg.PluginConfig {
if plCfg.Name == queueSort {
queueSortArgs = plCfg.Args
}
}
if len(v.queueSort) == 0 {
v.queueSort = queueSort
v.queueSortArgs = queueSortArgs
return nil
}
if v.queueSort != queueSort {
return fmt.Errorf("different queue sort plugins for profile %q: %q, first: %q", cfg.SchedulerName, queueSort, v.queueSort)
}
if !cmp.Equal(v.queueSortArgs, queueSortArgs) {
return fmt.Errorf("different queue sort plugin args for profile %q: %s", cfg.SchedulerName, queueSortArgs.Raw)
}
return nil
}

View File

@ -0,0 +1,327 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package profile
import (
"context"
"fmt"
"strings"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/api/events/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/events"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
var fakeRegistry = framework.Registry{
"QueueSort": newFakePlugin,
"Bind1": newFakePlugin,
"Bind2": newFakePlugin,
"Another": newFakePlugin,
}
func TestNewProfile(t *testing.T) {
cases := []struct {
name string
cfg config.KubeSchedulerProfile
wantErr string
}{
{
name: "valid",
cfg: config.KubeSchedulerProfile{
SchedulerName: "valid-profile",
Plugins: &config.Plugins{
QueueSort: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "QueueSort"},
},
},
Bind: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "Bind1"},
},
},
},
PluginConfig: []config.PluginConfig{
{
Name: "QueueSort",
Args: runtime.Unknown{Raw: []byte("{}")},
},
},
},
},
{
name: "invalid framework configuration",
cfg: config.KubeSchedulerProfile{
SchedulerName: "invalid-profile",
Plugins: &config.Plugins{
QueueSort: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "QueueSort"},
},
},
},
},
wantErr: "at least one bind plugin is needed",
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
c := fake.NewSimpleClientset()
b := events.NewBroadcaster(&events.EventSinkImpl{Interface: c.EventsV1beta1().Events("")})
p, err := NewProfile(tc.cfg, fakeFrameworkFactory, NewRecorderFactory(b))
if err := checkErr(err, tc.wantErr); err != nil {
t.Fatal(err)
}
if len(tc.wantErr) != 0 {
return
}
called := make(chan struct{})
var ctrl string
stopFn := b.StartEventWatcher(func(obj runtime.Object) {
e, _ := obj.(*v1beta1.Event)
ctrl = e.ReportingController
close(called)
})
p.Recorder.Eventf(&v1.Pod{}, nil, v1.EventTypeNormal, "", "", "")
<-called
stopFn()
if ctrl != tc.cfg.SchedulerName {
t.Errorf("got controller name %q in event, want %q", ctrl, tc.cfg.SchedulerName)
}
})
}
}
func TestNewMap(t *testing.T) {
cases := []struct {
name string
cfgs []config.KubeSchedulerProfile
wantErr string
}{
{
name: "valid",
cfgs: []config.KubeSchedulerProfile{
{
SchedulerName: "profile-1",
Plugins: &config.Plugins{
QueueSort: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "QueueSort"},
},
},
Bind: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "Bind1"},
},
},
},
},
{
SchedulerName: "profile-2",
Plugins: &config.Plugins{
QueueSort: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "QueueSort"},
},
},
Bind: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "Bind2"},
},
},
},
PluginConfig: []config.PluginConfig{
{
Name: "Bind2",
Args: runtime.Unknown{Raw: []byte("{}")},
},
},
},
},
},
{
name: "different queue sort",
cfgs: []config.KubeSchedulerProfile{
{
SchedulerName: "profile-1",
Plugins: &config.Plugins{
QueueSort: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "QueueSort"},
},
},
Bind: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "Bind1"},
},
},
},
},
{
SchedulerName: "profile-2",
Plugins: &config.Plugins{
QueueSort: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "Another"},
},
},
Bind: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "Bind2"},
},
},
},
},
},
wantErr: "different queue sort plugins",
},
{
name: "different queue sort args",
cfgs: []config.KubeSchedulerProfile{
{
SchedulerName: "profile-1",
Plugins: &config.Plugins{
QueueSort: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "QueueSort"},
},
},
Bind: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "Bind1"},
},
},
},
PluginConfig: []config.PluginConfig{
{
Name: "QueueSort",
Args: runtime.Unknown{Raw: []byte("{}")},
},
},
},
{
SchedulerName: "profile-2",
Plugins: &config.Plugins{
QueueSort: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "QueueSort"},
},
},
Bind: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "Bind2"},
},
},
},
},
},
wantErr: "different queue sort plugin args",
},
{
name: "duplicate scheduler name",
cfgs: []config.KubeSchedulerProfile{
{
SchedulerName: "profile-1",
Plugins: &config.Plugins{
QueueSort: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "QueueSort"},
},
},
Bind: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "Bind1"},
},
},
},
},
{
SchedulerName: "profile-1",
Plugins: &config.Plugins{
QueueSort: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "QueueSort"},
},
},
Bind: &config.PluginSet{
Enabled: []config.Plugin{
{Name: "Bind2"},
},
},
},
},
},
wantErr: "duplicate profile",
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
m, err := NewMap(tc.cfgs, fakeFrameworkFactory, nilRecorderFactory)
if err := checkErr(err, tc.wantErr); err != nil {
t.Fatal(err)
}
if len(tc.wantErr) != 0 {
return
}
if len(m) != len(tc.cfgs) {
t.Errorf("got %d profiles, want %d", len(m), len(tc.cfgs))
}
})
}
}
type fakePlugin struct{}
func (p *fakePlugin) Name() string {
return ""
}
func (p *fakePlugin) Less(*framework.PodInfo, *framework.PodInfo) bool {
return false
}
func (p *fakePlugin) Bind(context.Context, *framework.CycleState, *v1.Pod, string) *framework.Status {
return nil
}
func newFakePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return &fakePlugin{}, nil
}
func fakeFrameworkFactory(cfg config.KubeSchedulerProfile) (framework.Framework, error) {
return framework.NewFramework(fakeRegistry, cfg.Plugins, cfg.PluginConfig)
}
func nilRecorderFactory(_ string) events.EventRecorder {
return nil
}
func checkErr(err error, wantErr string) error {
if len(wantErr) == 0 {
return err
}
if err == nil || !strings.Contains(err.Error(), wantErr) {
return fmt.Errorf("got error %q, want %q", err, wantErr)
}
return nil
}

View File

@ -33,7 +33,6 @@ import (
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/klog"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
kubefeatures "k8s.io/kubernetes/pkg/features"
@ -45,6 +44,7 @@ import (
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
@ -89,8 +89,6 @@ type Scheduler struct {
// PodPreemptor is used to evict pods and update 'NominatedNode' field of
// the preemptor pod.
podPreemptor podPreemptor
// Framework runs scheduler plugins at configured extension points.
Framework framework.Framework
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
@ -102,9 +100,6 @@ type Scheduler struct {
// question, and the error
Error func(*framework.PodInfo, error)
// Recorder is the EventRecorder to use
Recorder events.EventRecorder
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
@ -117,6 +112,9 @@ type Scheduler struct {
// SchedulingQueue holds pods to be scheduled
SchedulingQueue internalqueue.SchedulingQueue
// Profiles are the scheduling profiles.
Profiles profile.Map
scheduledPodsHasSynced func() bool
}
@ -126,7 +124,6 @@ func (sched *Scheduler) Cache() internalcache.Cache {
}
type schedulerOptions struct {
schedulerName string
schedulerAlgorithmSource schedulerapi.SchedulerAlgorithmSource
disablePreemption bool
percentageOfNodesToScore int32
@ -135,18 +132,17 @@ type schedulerOptions struct {
podMaxBackoffSeconds int64
// Contains out-of-tree plugins to be merged with the in-tree registry.
frameworkOutOfTreeRegistry framework.Registry
// Plugins and PluginConfig set from ComponentConfig.
frameworkPlugins *schedulerapi.Plugins
frameworkPluginConfig []schedulerapi.PluginConfig
profiles []schedulerapi.KubeSchedulerProfile
}
// Option configures a Scheduler
type Option func(*schedulerOptions)
// WithName sets schedulerName for Scheduler, the default schedulerName is default-scheduler
func WithName(schedulerName string) Option {
// WithProfiles sets profiles for Scheduler. By default, there is one profile
// with the name "default-scheduler".
func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option {
return func(o *schedulerOptions) {
o.schedulerName = schedulerName
o.profiles = p
}
}
@ -186,20 +182,6 @@ func WithFrameworkOutOfTreeRegistry(registry framework.Registry) Option {
}
}
// WithFrameworkPlugins sets the plugins that the framework should be configured with.
func WithFrameworkPlugins(plugins *schedulerapi.Plugins) Option {
return func(o *schedulerOptions) {
o.frameworkPlugins = plugins
}
}
// WithFrameworkPluginConfig sets the PluginConfig slice that the framework should be configured with.
func WithFrameworkPluginConfig(pluginConfig []schedulerapi.PluginConfig) Option {
return func(o *schedulerOptions) {
o.frameworkPluginConfig = pluginConfig
}
}
// WithPodInitialBackoffSeconds sets podInitialBackoffSeconds for Scheduler, the default value is 1
func WithPodInitialBackoffSeconds(podInitialBackoffSeconds int64) Option {
return func(o *schedulerOptions) {
@ -215,7 +197,10 @@ func WithPodMaxBackoffSeconds(podMaxBackoffSeconds int64) Option {
}
var defaultSchedulerOptions = schedulerOptions{
schedulerName: v1.DefaultSchedulerName,
profiles: []schedulerapi.KubeSchedulerProfile{
// Profiles' default plugins are set from the algorithm provider.
{SchedulerName: v1.DefaultSchedulerName},
},
schedulerAlgorithmSource: schedulerapi.SchedulerAlgorithmSource{
Provider: defaultAlgorithmSourceProviderName(),
},
@ -230,7 +215,7 @@ var defaultSchedulerOptions = schedulerOptions{
func New(client clientset.Interface,
informerFactory informers.SharedInformerFactory,
podInformer coreinformers.PodInformer,
recorder events.EventRecorder,
recorderFactory profile.RecorderFactory,
stopCh <-chan struct{},
opts ...Option) (*Scheduler, error) {
@ -264,6 +249,7 @@ func New(client clientset.Interface,
configurator := &Configurator{
client: client,
recorderFactory: recorderFactory,
informerFactory: informerFactory,
podInformer: podInformer,
volumeBinder: volumeBinder,
@ -275,9 +261,8 @@ func New(client clientset.Interface,
podInitialBackoffSeconds: options.podInitialBackoffSeconds,
podMaxBackoffSeconds: options.podMaxBackoffSeconds,
enableNonPreempting: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.NonPreemptingPriority),
profiles: append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
registry: registry,
plugins: options.frameworkPlugins,
pluginConfig: options.frameworkPluginConfig,
nodeInfoSnapshot: snapshot,
}
@ -315,14 +300,13 @@ func New(client clientset.Interface,
return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
// Additional tweaks to the config produced by the configurator.
sched.Recorder = recorder
sched.DisablePreemption = options.disablePreemption
sched.StopEverything = stopEverything
sched.podConditionUpdater = &podConditionUpdaterImpl{client}
sched.podPreemptor = &podPreemptorImpl{client}
sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced
AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer)
addAllEventHandlers(sched, informerFactory, podInformer)
return sched, nil
}
@ -375,10 +359,10 @@ func (sched *Scheduler) Run(ctx context.Context) {
// recordFailedSchedulingEvent records an event for the pod that indicates the
// pod has failed to schedule.
// NOTE: This function modifies "pod". "pod" should be copied before being passed.
func (sched *Scheduler) recordSchedulingFailure(podInfo *framework.PodInfo, err error, reason string, message string) {
func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *framework.PodInfo, err error, reason string, message string) {
sched.Error(podInfo, err)
pod := podInfo.Pod
sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message)
prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message)
if err := sched.podConditionUpdater.update(pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
@ -392,14 +376,14 @@ func (sched *Scheduler) recordSchedulingFailure(podInfo *framework.PodInfo, err
// preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible.
// If it succeeds, it adds the name of the node where preemption has happened to the pod spec.
// It returns the node name and an error if any.
func (sched *Scheduler) preempt(ctx context.Context, state *framework.CycleState, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) {
func (sched *Scheduler) preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, preemptor *v1.Pod, scheduleErr error) (string, error) {
preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor)
if err != nil {
klog.Errorf("Error getting the updated preemptor pod object: %v", err)
return "", err
}
node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, state, preemptor, scheduleErr)
node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, prof, state, preemptor, scheduleErr)
if err != nil {
klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
return "", err
@ -426,10 +410,10 @@ func (sched *Scheduler) preempt(ctx context.Context, state *framework.CycleState
return "", err
}
// If the victim is a WaitingPod, send a reject message to the PermitPlugin
if waitingPod := fwk.GetWaitingPod(victim.UID); waitingPod != nil {
if waitingPod := prof.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject("preempted")
}
sched.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
prof.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
}
metrics.PreemptionVictims.Observe(float64(len(victims)))
@ -496,17 +480,17 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
// bind binds a pod to a given node defined in a binding object.
// The precedence for binding is: (1) extenders and (2) framework plugins.
// We expect this to run asynchronously, so we handle binding metrics internally.
func (sched *Scheduler) bind(ctx context.Context, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
func (sched *Scheduler) bind(ctx context.Context, prof *profile.Profile, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
start := time.Now()
defer func() {
sched.finishBinding(assumed, targetNode, start, err)
sched.finishBinding(prof, assumed, targetNode, start, err)
}()
bound, err := sched.extendersBinding(assumed, targetNode)
if bound {
return err
}
bindStatus := sched.Framework.RunBindPlugins(ctx, state, assumed, targetNode)
bindStatus := prof.RunBindPlugins(ctx, state, assumed, targetNode)
if bindStatus.IsSuccess() {
return nil
}
@ -530,7 +514,7 @@ func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error)
return false, nil
}
func (sched *Scheduler) finishBinding(assumed *v1.Pod, targetNode string, start time.Time, err error) {
func (sched *Scheduler) finishBinding(prof *profile.Profile, assumed *v1.Pod, targetNode string, start time.Time, err error) {
if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil {
klog.Errorf("scheduler cache FinishBinding failed: %v", finErr)
}
@ -544,20 +528,25 @@ func (sched *Scheduler) finishBinding(assumed *v1.Pod, targetNode string, start
metrics.BindingLatency.Observe(metrics.SinceInSeconds(start))
metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(start))
sched.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
prof.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
}
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
fwk := sched.Framework
podInfo := sched.NextPod()
// pod could be nil when schedulerQueue is closed
if podInfo == nil || podInfo.Pod == nil {
return
}
pod := podInfo.Pod
if sched.skipPodSchedule(pod) {
prof, err := sched.profileForPod(pod)
if err != nil {
// This shouldn't happen, because we only accept for scheduling the pods
// which specify a scheduler name that matches one of the profiles.
klog.Error(err)
return
}
if sched.skipPodSchedule(prof, pod) {
return
}
@ -569,9 +558,9 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
if err != nil {
sched.recordSchedulingFailure(podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
sched.recordSchedulingFailure(prof, podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
// Schedule() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
@ -582,7 +571,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
" No preemption is performed.")
} else {
preemptionStartTime := time.Now()
sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
sched.preempt(schedulingCycleCtx, prof, state, pod, fitError)
metrics.PreemptionAttempts.Inc()
metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
@ -612,15 +601,15 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// This function modifies 'assumedPod' if volume binding is required.
allBound, err := sched.VolumeBinder.Binder.AssumePodVolumes(assumedPod, scheduleResult.SuggestedHost)
if err != nil {
sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError,
sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError,
fmt.Sprintf("AssumePodVolumes failed: %v", err))
metrics.PodScheduleErrors.Inc()
return
}
// Run "reserve" plugins.
if sts := fwk.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
sched.recordSchedulingFailure(assumedPodInfo, sts.AsError(), SchedulerError, sts.Message())
if sts := prof.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, sts.Message())
metrics.PodScheduleErrors.Inc()
return
}
@ -633,15 +622,15 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// This relies on the fact that Error will check if the pod has been bound
// to a node and if so will not add it back to the unscheduled pods queue
// (otherwise this would cause an infinite loop).
sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err))
sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err))
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
prof.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return
}
// Run "permit" plugins.
runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
var reason string
if runPermitStatus.IsUnschedulable() {
@ -655,8 +644,8 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// One of the plugins returned status different than success or wait.
fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, runPermitStatus.AsError(), reason, runPermitStatus.Message())
prof.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, runPermitStatus.Message())
return
}
@ -667,7 +656,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()
waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
if !waitOnPermitStatus.IsSuccess() {
var reason string
if waitOnPermitStatus.IsUnschedulable() {
@ -681,8 +670,8 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, waitOnPermitStatus.AsError(), reason, waitOnPermitStatus.Message())
prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, waitOnPermitStatus.Message())
return
}
@ -690,16 +679,16 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
if !allBound {
err := sched.bindVolumes(assumedPod)
if err != nil {
sched.recordSchedulingFailure(assumedPodInfo, err, "VolumeBindingFailed", err.Error())
sched.recordSchedulingFailure(prof, assumedPodInfo, err, "VolumeBindingFailed", err.Error())
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return
}
}
// Run "prebind" plugins.
preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !preBindStatus.IsSuccess() {
var reason string
metrics.PodScheduleErrors.Inc()
@ -708,18 +697,18 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message())
prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message())
return
}
err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state)
err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start))
if err != nil {
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err))
prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err))
} else {
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
if klog.V(2) {
@ -731,16 +720,24 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
metrics.PodSchedulingDuration.Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
// Run "postbind" plugins.
fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
}
}()
}
func (sched *Scheduler) profileForPod(pod *v1.Pod) (*profile.Profile, error) {
prof, ok := sched.Profiles[pod.Spec.SchedulerName]
if !ok {
return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
}
return prof, nil
}
// skipPodSchedule returns true if we could skip scheduling the pod for specified cases.
func (sched *Scheduler) skipPodSchedule(pod *v1.Pod) bool {
func (sched *Scheduler) skipPodSchedule(prof *profile.Profile, pod *v1.Pod) bool {
// Case 1: pod is being deleted.
if pod.DeletionTimestamp != nil {
sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
return true
}

View File

@ -56,6 +56,7 @@ import (
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/profile"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
@ -92,7 +93,8 @@ func podWithID(id, desiredHost string) *v1.Pod {
SelfLink: fmt.Sprintf("/api/v1/%s/%s", string(v1.ResourcePods), id),
},
Spec: v1.PodSpec{
NodeName: desiredHost,
NodeName: desiredHost,
SchedulerName: testSchedulerName,
},
}
}
@ -107,7 +109,8 @@ func deletingPod(id string) *v1.Pod {
SelfLink: fmt.Sprintf("/api/v1/%s/%s", string(v1.ResourcePods), id),
},
Spec: v1.PodSpec{
NodeName: "",
NodeName: "",
SchedulerName: testSchedulerName,
},
}
}
@ -133,24 +136,16 @@ type mockScheduler struct {
err error
}
func (es mockScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) {
func (es mockScheduler) Schedule(ctx context.Context, profile *profile.Profile, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) {
return es.result, es.err
}
func (es mockScheduler) Extenders() []core.SchedulerExtender {
return nil
}
func (es mockScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
func (es mockScheduler) Preempt(ctx context.Context, i *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
return nil, nil, nil, nil
}
func (es mockScheduler) Snapshot() error {
return nil
}
func (es mockScheduler) Framework() framework.Framework {
return nil
}
func TestSchedulerCreation(t *testing.T) {
client := clientsetfake.NewSimpleClientset()
@ -163,7 +158,7 @@ func TestSchedulerCreation(t *testing.T) {
_, err := New(client,
informerFactory,
NewPodInformer(client, 0),
eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
profile.NewRecorderFactory(eventBroadcaster),
stopCh,
WithPodInitialBackoffSeconds(1),
WithPodMaxBackoffSeconds(10),
@ -187,7 +182,7 @@ func TestSchedulerCreation(t *testing.T) {
_, err = New(client,
informerFactory,
NewPodInformer(client, 0),
eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
profile.NewRecorderFactory(eventBroadcaster),
stopCh,
WithPodInitialBackoffSeconds(1),
WithPodMaxBackoffSeconds(10),
@ -308,8 +303,12 @@ func TestSchedulerScheduleOne(t *testing.T) {
NextPod: func() *framework.PodInfo {
return &framework.PodInfo{Pod: item.sendPod}
},
Framework: fwk,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
Profiles: profile.Map{
testSchedulerName: &profile.Profile{
Framework: fwk,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName),
},
},
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
}
called := make(chan struct{})
@ -598,7 +597,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
// queuedPodStore: pods queued before processing.
// scache: scheduler cache that might contain assumed pods.
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, recorder events.EventRecorder, fakeVolumeBinder *volumebinder.VolumeBinder, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fakeVolumeBinder *volumebinder.VolumeBinder, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
if fakeVolumeBinder == nil {
// Create default volume binder if it didn't set.
fakeVolumeBinder = volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true})
@ -615,13 +614,22 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
})
fwk, _ := st.NewFramework(fns, framework.WithClientSet(client), framework.WithVolumeBinder(fakeVolumeBinder))
prof := &profile.Profile{
Framework: fwk,
Recorder: &events.FakeRecorder{},
}
if broadcaster != nil {
prof.Recorder = broadcaster.NewRecorder(scheme.Scheme, testSchedulerName)
}
profiles := profile.Map{
testSchedulerName: prof,
}
algo := core.NewGenericScheduler(
scache,
internalqueue.NewSchedulingQueue(nil),
internalcache.NewEmptySnapshot(),
fwk,
[]core.SchedulerExtender{},
nil,
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
false,
@ -639,17 +647,12 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
Error: func(p *framework.PodInfo, err error) {
errChan <- err
},
Recorder: &events.FakeRecorder{},
Profiles: profiles,
podConditionUpdater: fakePodConditionUpdater{},
podPreemptor: fakePodPreemptor{},
Framework: fwk,
VolumeBinder: fakeVolumeBinder,
}
if recorder != nil {
sched.Recorder = recorder
}
return sched, bindingChan, errChan
}
@ -667,13 +670,12 @@ func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBi
client := clientsetfake.NewSimpleClientset(&testNode, &testPVC)
informerFactory := informers.NewSharedInformerFactory(client, 0)
recorder := broadcaster.NewRecorder(scheme.Scheme, "scheduler")
fns := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterFilterPlugin(volumebinding.Name, volumebinding.New),
}
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, recorder, fakeVolumeBinder, fns...)
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, fakeVolumeBinder, fns...)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
s.VolumeBinder = fakeVolumeBinder
@ -987,6 +989,10 @@ func TestSchedulerBinding(t *testing.T) {
if err != nil {
t.Fatal(err)
}
prof := &profile.Profile{
Framework: fwk,
Recorder: &events.FakeRecorder{},
}
stop := make(chan struct{})
defer close(stop)
scache := internalcache.New(100*time.Millisecond, stop)
@ -994,22 +1000,18 @@ func TestSchedulerBinding(t *testing.T) {
scache,
nil,
nil,
fwk,
test.extenders,
nil,
nil,
nil,
false,
0,
false,
)
sched := Scheduler{
Algorithm: algo,
Framework: fwk,
Recorder: &events.FakeRecorder{},
SchedulerCache: scache,
}
err = sched.bind(context.Background(), pod, "node", nil)
err = sched.bind(context.Background(), prof, pod, "node", nil)
if err != nil {
t.Error(err)
}

View File

@ -14,12 +14,12 @@ go_test(
],
tags = ["integration"],
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/daemon:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/profile:go_default_library",
"//pkg/util/labels:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -40,12 +40,12 @@ import (
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/pkg/api/legacyscheme"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/profile"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
"k8s.io/kubernetes/test/integration/framework"
)
@ -92,10 +92,7 @@ func setupScheduler(
cs,
informerFactory,
informerFactory.Core().V1().Pods(),
eventBroadcaster.NewRecorder(
legacyscheme.Scheme,
v1.DefaultSchedulerName,
),
profile.NewRecorderFactory(eventBroadcaster),
ctx.Done(),
)
if err != nil {

View File

@ -23,7 +23,6 @@ go_test(
embed = [":go_default_library"],
tags = ["integration"],
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/apis/scheduling:go_default_library",
"//pkg/controller/nodelifecycle:go_default_library",
@ -34,6 +33,7 @@ go_test(
"//pkg/scheduler/framework/plugins/defaultbinder:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/profile:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//plugin/pkg/admission/defaulttolerationseconds:go_default_library",
"//plugin/pkg/admission/podtolerationrestriction:go_default_library",
@ -85,13 +85,13 @@ go_library(
srcs = ["util.go"],
importpath = "k8s.io/kubernetes/test/integration/scheduler",
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/controller/disruption:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/apis/config/scheme:go_default_library",
"//pkg/scheduler/apis/config/v1:go_default_library",
"//pkg/scheduler/profile:go_default_library",
"//pkg/util/taints:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",

View File

@ -470,11 +470,12 @@ func TestPreFilterPlugin(t *testing.T) {
registry := framework.Registry{prefilterPluginName: newPlugin(preFilterPlugin)}
// Setup initial prefilter plugin for testing.
plugins := &schedulerconfig.Plugins{
PreFilter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: prefilterPluginName,
prof := schedulerconfig.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedulerconfig.Plugins{
PreFilter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{Name: prefilterPluginName},
},
},
},
@ -482,7 +483,7 @@ func TestPreFilterPlugin(t *testing.T) {
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "prefilter-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, testCtx)
@ -541,19 +542,19 @@ func TestScorePlugin(t *testing.T) {
scorePluginName: newPlugin(scorePlugin),
}
// Setup initial score plugin for testing.
plugins := &schedulerconfig.Plugins{
Score: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: scorePluginName,
prof := schedulerconfig.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedulerconfig.Plugins{
Score: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{Name: scorePluginName},
},
},
},
}
testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "score-plugin", nil), 10,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, testCtx)
@ -601,17 +602,18 @@ func TestNormalizeScorePlugin(t *testing.T) {
}
// Setup initial score plugin for testing.
plugins := &schedulerconfig.Plugins{
Score: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: scoreWithNormalizePluginName,
prof := schedulerconfig.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedulerconfig.Plugins{
Score: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{Name: scoreWithNormalizePluginName},
},
},
},
}
testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "score-plugin", nil), 10,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, testCtx)
@ -644,11 +646,14 @@ func TestReservePlugin(t *testing.T) {
registry := framework.Registry{reservePluginName: newPlugin(reservePlugin)}
// Setup initial reserve plugin for testing.
plugins := &schedulerconfig.Plugins{
Reserve: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: reservePluginName,
prof := schedulerconfig.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedulerconfig.Plugins{
Reserve: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: reservePluginName,
},
},
},
},
@ -656,7 +661,7 @@ func TestReservePlugin(t *testing.T) {
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "reserve-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, testCtx)
@ -696,11 +701,14 @@ func TestPrebindPlugin(t *testing.T) {
registry := framework.Registry{preBindPluginName: newPlugin(preBindPlugin)}
// Setup initial prebind plugin for testing.
plugins := &schedulerconfig.Plugins{
PreBind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: preBindPluginName,
prof := schedulerconfig.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedulerconfig.Plugins{
PreBind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: preBindPluginName,
},
},
},
},
@ -708,7 +716,7 @@ func TestPrebindPlugin(t *testing.T) {
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "prebind-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, testCtx)
@ -772,18 +780,21 @@ func TestUnreservePlugin(t *testing.T) {
}
// Setup initial unreserve and prebind plugin for testing.
plugins := &schedulerconfig.Plugins{
Unreserve: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: unreservePluginName,
prof := schedulerconfig.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedulerconfig.Plugins{
Unreserve: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: unreservePluginName,
},
},
},
},
PreBind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: preBindPluginName,
PreBind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: preBindPluginName,
},
},
},
},
@ -791,7 +802,7 @@ func TestUnreservePlugin(t *testing.T) {
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "unreserve-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, testCtx)
@ -867,23 +878,26 @@ func TestBindPlugin(t *testing.T) {
}
// Setup initial unreserve and bind plugins for testing.
plugins := &schedulerconfig.Plugins{
Unreserve: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{{Name: unreservePlugin.Name()}},
},
Bind: &schedulerconfig.PluginSet{
// Put DefaultBinder last.
Enabled: []schedulerconfig.Plugin{{Name: bindPlugin1.Name()}, {Name: bindPlugin2.Name()}, {Name: defaultbinder.Name}},
Disabled: []schedulerconfig.Plugin{{Name: defaultbinder.Name}},
},
PostBind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{{Name: postBindPlugin.Name()}},
prof := schedulerconfig.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedulerconfig.Plugins{
Unreserve: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{{Name: unreservePlugin.Name()}},
},
Bind: &schedulerconfig.PluginSet{
// Put DefaultBinder last.
Enabled: []schedulerconfig.Plugin{{Name: bindPlugin1.Name()}, {Name: bindPlugin2.Name()}, {Name: defaultbinder.Name}},
Disabled: []schedulerconfig.Plugin{{Name: defaultbinder.Name}},
},
PostBind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{{Name: postBindPlugin.Name()}},
},
},
}
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerWithOptions(t, testContext, false, nil, time.Second,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, testCtx)
@ -1024,18 +1038,21 @@ func TestPostBindPlugin(t *testing.T) {
}
// Setup initial prebind and postbind plugin for testing.
plugins := &schedulerconfig.Plugins{
PreBind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: preBindPluginName,
prof := schedulerconfig.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedulerconfig.Plugins{
PreBind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: preBindPluginName,
},
},
},
},
PostBind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: postBindPluginName,
PostBind: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: postBindPluginName,
},
},
},
},
@ -1043,7 +1060,7 @@ func TestPostBindPlugin(t *testing.T) {
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "postbind-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, testCtx)
@ -1095,11 +1112,11 @@ func TestPostBindPlugin(t *testing.T) {
func TestPermitPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
perPlugin := &PermitPlugin{name: permitPluginName}
registry, plugins := initRegistryAndConfig(perPlugin)
registry, prof := initRegistryAndConfig(perPlugin)
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, testCtx)
@ -1183,11 +1200,11 @@ func TestMultiplePermitPlugins(t *testing.T) {
// Create a plugin registry for testing.
perPlugin1 := &PermitPlugin{name: "permit-plugin-1"}
perPlugin2 := &PermitPlugin{name: "permit-plugin-2"}
registry, plugins := initRegistryAndConfig(perPlugin1, perPlugin2)
registry, prof := initRegistryAndConfig(perPlugin1, perPlugin2)
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "multi-permit-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, testCtx)
@ -1238,11 +1255,11 @@ func TestPermitPluginsCancelled(t *testing.T) {
// Create a plugin registry for testing.
perPlugin1 := &PermitPlugin{name: "permit-plugin-1"}
perPlugin2 := &PermitPlugin{name: "permit-plugin-2"}
registry, plugins := initRegistryAndConfig(perPlugin1, perPlugin2)
registry, prof := initRegistryAndConfig(perPlugin1, perPlugin2)
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugins", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, testCtx)
@ -1279,11 +1296,11 @@ func TestPermitPluginsCancelled(t *testing.T) {
func TestCoSchedulingWithPermitPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
permitPlugin := &PermitPlugin{name: permitPluginName}
registry, plugins := initRegistryAndConfig(permitPlugin)
registry, prof := initRegistryAndConfig(permitPlugin)
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, testCtx)
@ -1352,11 +1369,14 @@ func TestFilterPlugin(t *testing.T) {
registry := framework.Registry{filterPluginName: newPlugin(filterPlugin)}
// Setup initial filter plugin for testing.
plugins := &schedulerconfig.Plugins{
Filter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: filterPluginName,
prof := schedulerconfig.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedulerconfig.Plugins{
Filter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: filterPluginName,
},
},
},
},
@ -1364,7 +1384,7 @@ func TestFilterPlugin(t *testing.T) {
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "filter-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, testCtx)
@ -1403,11 +1423,14 @@ func TestPreScorePlugin(t *testing.T) {
registry := framework.Registry{preScorePluginName: newPlugin(preScorePlugin)}
// Setup initial pre-score plugin for testing.
plugins := &schedulerconfig.Plugins{
PreScore: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: preScorePluginName,
prof := schedulerconfig.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedulerconfig.Plugins{
PreScore: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: preScorePluginName,
},
},
},
},
@ -1415,7 +1438,7 @@ func TestPreScorePlugin(t *testing.T) {
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "pre-score-plugin", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, testCtx)
@ -1451,11 +1474,11 @@ func TestPreScorePlugin(t *testing.T) {
func TestPreemptWithPermitPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
permitPlugin := &PermitPlugin{}
registry, plugins := initRegistryAndConfig(permitPlugin)
registry, prof := initRegistryAndConfig(permitPlugin)
// Create the master and the scheduler with the test plugin set.
testCtx := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "preempt-with-permit-plugin", nil), 0,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, testCtx)
@ -1532,19 +1555,23 @@ func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testContext, nodeC
// initRegistryAndConfig returns registry and plugins config based on give plugins.
// TODO: refactor it to a more generic functions that accepts all kinds of Plugins as arguments
func initRegistryAndConfig(pp ...*PermitPlugin) (registry framework.Registry, plugins *schedulerconfig.Plugins) {
func initRegistryAndConfig(pp ...*PermitPlugin) (registry framework.Registry, prof schedulerconfig.KubeSchedulerProfile) {
if len(pp) == 0 {
return
}
registry = framework.Registry{}
plugins = &schedulerconfig.Plugins{
Permit: &schedulerconfig.PluginSet{},
}
var plugins []schedulerconfig.Plugin
for _, p := range pp {
registry.Register(p.Name(), newPermitPlugin(p))
plugins.Permit.Enabled = append(plugins.Permit.Enabled, schedulerconfig.Plugin{Name: p.Name()})
plugins = append(plugins, schedulerconfig.Plugin{Name: p.Name()})
}
prof.SchedulerName = v1.DefaultSchedulerName
prof.Plugins = &schedulerconfig.Plugins{
Permit: &schedulerconfig.PluginSet{
Enabled: plugins,
},
}
return
}

View File

@ -128,18 +128,17 @@ func TestPreemption(t *testing.T) {
if err != nil {
t.Fatalf("Error registering a filter: %v", err)
}
plugins := &schedulerconfig.Plugins{
Filter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: filterPluginName,
prof := schedulerconfig.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedulerconfig.Plugins{
Filter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{Name: filterPluginName},
},
},
},
PreFilter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: filterPluginName,
PreFilter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{Name: filterPluginName},
},
},
},
@ -147,7 +146,7 @@ func TestPreemption(t *testing.T) {
testCtx := initTestSchedulerWithOptions(t,
initTestMaster(t, "preemptiom", nil),
false, nil, time.Second,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
defer cleanupTest(t, testCtx)

View File

@ -37,9 +37,9 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/scheduler"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/test/integration/framework"
)
@ -270,9 +270,8 @@ priorities: []
sched, err := scheduler.New(clientSet,
informerFactory,
scheduler.NewPodInformer(clientSet, 0),
eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.DefaultSchedulerName),
profile.NewRecorderFactory(eventBroadcaster),
nil,
scheduler.WithName(v1.DefaultSchedulerName),
scheduler.WithAlgorithmSource(kubeschedulerconfig.SchedulerAlgorithmSource{
Policy: &kubeschedulerconfig.SchedulerPolicySource{
ConfigMap: &kubeschedulerconfig.SchedulerPolicyConfigMapSource{
@ -287,7 +286,7 @@ priorities: []
t.Fatalf("couldn't make scheduler config for test %d: %v", i, err)
}
schedPlugins := sched.Framework.ListPlugins()
schedPlugins := sched.Profiles[v1.DefaultSchedulerName].ListPlugins()
if diff := cmp.Diff(test.expectedPlugins, schedPlugins); diff != "" {
t.Errorf("unexpected plugins diff (-want, +got): %s", diff)
}
@ -317,9 +316,8 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) {
_, err := scheduler.New(clientSet,
informerFactory,
scheduler.NewPodInformer(clientSet, 0),
eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.DefaultSchedulerName),
profile.NewRecorderFactory(eventBroadcaster),
nil,
scheduler.WithName(v1.DefaultSchedulerName),
scheduler.WithAlgorithmSource(kubeschedulerconfig.SchedulerAlgorithmSource{
Policy: &kubeschedulerconfig.SchedulerPolicySource{
ConfigMap: &kubeschedulerconfig.SchedulerPolicyConfigMapSource{
@ -461,7 +459,7 @@ func TestUnschedulableNodes(t *testing.T) {
}
}
func TestMultiScheduler(t *testing.T) {
func TestMultipleSchedulers(t *testing.T) {
// This integration tests the multi-scheduler feature in the following way:
// 1. create a default scheduler
// 2. create a node
@ -538,7 +536,8 @@ func TestMultiScheduler(t *testing.T) {
}
// 5. create and start a scheduler with name "foo-scheduler"
testCtx = initTestSchedulerWithOptions(t, testCtx, true, nil, time.Second, scheduler.WithName(fooScheduler))
fooProf := kubeschedulerconfig.KubeSchedulerProfile{SchedulerName: fooScheduler}
testCtx = initTestSchedulerWithOptions(t, testCtx, true, nil, time.Second, scheduler.WithProfiles(fooProf))
// 6. **check point-2**:
// - testPodWithAnnotationFitsFoo should be scheduled

View File

@ -46,13 +46,13 @@ import (
"k8s.io/client-go/scale"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/kubernetes/pkg/api/legacyscheme"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/disruption"
"k8s.io/kubernetes/pkg/scheduler"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
schedulerapiv1 "k8s.io/kubernetes/pkg/scheduler/apis/config/v1"
"k8s.io/kubernetes/pkg/scheduler/profile"
taintutils "k8s.io/kubernetes/pkg/util/taints"
"k8s.io/kubernetes/test/integration/framework"
imageutils "k8s.io/kubernetes/test/utils/image"
@ -174,10 +174,6 @@ func initTestSchedulerWithOptions(
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
Interface: testCtx.clientSet.EventsV1beta1().Events(""),
})
recorder := eventBroadcaster.NewRecorder(
legacyscheme.Scheme,
v1.DefaultSchedulerName,
)
if policy != nil {
opts = append(opts, scheduler.WithAlgorithmSource(createAlgorithmSourceFromPolicy(policy, testCtx.clientSet)))
}
@ -186,7 +182,7 @@ func initTestSchedulerWithOptions(
testCtx.clientSet,
testCtx.informerFactory,
podInformer,
recorder,
profile.NewRecorderFactory(eventBroadcaster),
testCtx.ctx.Done(),
opts...,
)

View File

@ -13,9 +13,9 @@ go_library(
],
importpath = "k8s.io/kubernetes/test/integration/util",
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/controller/volume/persistentvolume/util:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/profile:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",

View File

@ -29,9 +29,9 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/api/legacyscheme"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/test/integration/framework"
)
@ -69,12 +69,12 @@ func StartScheduler(clientSet clientset.Interface) (*scheduler.Scheduler, corein
evtBroadcaster.StartRecordingToSink(ctx.Done())
recorder := evtBroadcaster.NewRecorder(
legacyscheme.Scheme,
v1.DefaultSchedulerName,
)
sched, err := createScheduler(clientSet, informerFactory, podInformer, recorder, ctx.Done())
sched, err := scheduler.New(
clientSet,
informerFactory,
podInformer,
profile.NewRecorderFactory(evtBroadcaster),
ctx.Done())
if err != nil {
klog.Fatalf("Error creating scheduler: %v", err)
}
@ -131,20 +131,3 @@ func StartFakePVController(clientSet clientset.Interface) ShutdownFunc {
informerFactory.Start(ctx.Done())
return ShutdownFunc(cancel)
}
// createScheduler create a scheduler with given informer factory and default name.
func createScheduler(
clientSet clientset.Interface,
informerFactory informers.SharedInformerFactory,
podInformer coreinformers.PodInformer,
recorder events.EventRecorder,
stopCh <-chan struct{},
) (*scheduler.Scheduler, error) {
return scheduler.New(
clientSet,
informerFactory,
podInformer,
recorder,
stopCh,
)
}

View File

@ -54,9 +54,9 @@ go_library(
srcs = ["util.go"],
importpath = "k8s.io/kubernetes/test/integration/volumescheduling",
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/profile:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
@ -64,7 +64,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",

View File

@ -30,13 +30,12 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
"k8s.io/kubernetes/pkg/api/legacyscheme"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/test/integration/framework"
)
@ -108,14 +107,14 @@ func initTestSchedulerWithOptions(
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
Interface: testCtx.clientSet.EventsV1beta1().Events(""),
})
recorder := eventBroadcaster.NewRecorder(
legacyscheme.Scheme,
v1.DefaultSchedulerName,
)
var err error
testCtx.scheduler, err = createSchedulerWithPodInformer(
testCtx.clientSet, podInformer, testCtx.informerFactory, recorder, testCtx.ctx.Done())
testCtx.scheduler, err = scheduler.New(
testCtx.clientSet,
testCtx.informerFactory,
podInformer,
profile.NewRecorderFactory(eventBroadcaster),
testCtx.ctx.Done())
if err != nil {
t.Fatalf("Couldn't create scheduler: %v", err)
@ -130,23 +129,6 @@ func initTestSchedulerWithOptions(
return testCtx
}
// createSchedulerWithPodInformer creates a new scheduler.
func createSchedulerWithPodInformer(
clientSet clientset.Interface,
podInformer coreinformers.PodInformer,
informerFactory informers.SharedInformerFactory,
recorder events.EventRecorder,
stopCh <-chan struct{},
) (*scheduler.Scheduler, error) {
return scheduler.New(
clientSet,
informerFactory,
podInformer,
recorder,
stopCh,
)
}
// cleanupTest deletes the scheduler and the test namespace. It should be called
// at the end of a test.
func cleanupTest(t *testing.T, testCtx *testContext) {