scheduler: make Profile an interface.
This commit is contained in:
		| @@ -1348,7 +1348,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { | ||||
| 			} | ||||
|  | ||||
| 			defProf := sched.Profiles["default-scheduler"] | ||||
| 			gotPlugins := defProf.Framework.ListPlugins() | ||||
| 			gotPlugins := defProf.ListPlugins() | ||||
| 			if diff := cmp.Diff(tc.wantPlugins, gotPlugins); diff != "" { | ||||
| 				t.Errorf("unexpected plugins diff (-want, +got): %s", diff) | ||||
| 			} | ||||
|   | ||||
| @@ -15,7 +15,6 @@ go_library( | ||||
|         "//pkg/scheduler/internal/cache:go_default_library", | ||||
|         "//pkg/scheduler/internal/parallelize:go_default_library", | ||||
|         "//pkg/scheduler/metrics:go_default_library", | ||||
|         "//pkg/scheduler/profile:go_default_library", | ||||
|         "//pkg/scheduler/util:go_default_library", | ||||
|         "//staging/src/k8s.io/api/core/v1:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", | ||||
| @@ -48,7 +47,6 @@ go_test( | ||||
|         "//pkg/scheduler/framework/runtime:go_default_library", | ||||
|         "//pkg/scheduler/internal/cache:go_default_library", | ||||
|         "//pkg/scheduler/internal/queue:go_default_library", | ||||
|         "//pkg/scheduler/profile:go_default_library", | ||||
|         "//pkg/scheduler/testing:go_default_library", | ||||
|         "//pkg/scheduler/util:go_default_library", | ||||
|         "//staging/src/k8s.io/api/core/v1:go_default_library", | ||||
|   | ||||
| @@ -36,7 +36,6 @@ import ( | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework/runtime" | ||||
| 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" | ||||
| 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/profile" | ||||
| 	st "k8s.io/kubernetes/pkg/scheduler/testing" | ||||
| ) | ||||
|  | ||||
| @@ -278,9 +277,6 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { | ||||
| 			if err != nil { | ||||
| 				t.Fatal(err) | ||||
| 			} | ||||
| 			prof := &profile.Profile{ | ||||
| 				Framework: fwk, | ||||
| 			} | ||||
|  | ||||
| 			scheduler := NewGenericScheduler( | ||||
| 				cache, | ||||
| @@ -288,7 +284,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { | ||||
| 				extenders, | ||||
| 				schedulerapi.DefaultPercentageOfNodesToScore) | ||||
| 			podIgnored := &v1.Pod{} | ||||
| 			result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), podIgnored) | ||||
| 			result, err := scheduler.Schedule(context.Background(), fwk, framework.NewCycleState(), podIgnored) | ||||
| 			if test.expectsErr { | ||||
| 				if err == nil { | ||||
| 					t.Errorf("Unexpected non-error, result %+v", result) | ||||
|   | ||||
| @@ -36,7 +36,6 @@ import ( | ||||
| 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/metrics" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/profile" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/util" | ||||
| 	utiltrace "k8s.io/utils/trace" | ||||
| ) | ||||
| @@ -94,7 +93,7 @@ func (f *FitError) Error() string { | ||||
| // onto machines. | ||||
| // TODO: Rename this type. | ||||
| type ScheduleAlgorithm interface { | ||||
| 	Schedule(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error) | ||||
| 	Schedule(context.Context, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error) | ||||
| 	// Extenders returns a slice of extender config. This is exposed for | ||||
| 	// testing. | ||||
| 	Extenders() []framework.Extender | ||||
| @@ -129,7 +128,7 @@ func (g *genericScheduler) snapshot() error { | ||||
| // Schedule tries to schedule the given pod to one of the nodes in the node list. | ||||
| // If it succeeds, it will return the name of the node. | ||||
| // If it fails, it will return a FitError error with reasons. | ||||
| func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { | ||||
| func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { | ||||
| 	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name}) | ||||
| 	defer trace.LogIfLong(100 * time.Millisecond) | ||||
|  | ||||
| @@ -142,7 +141,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, | ||||
| 		return result, ErrNoNodesAvailable | ||||
| 	} | ||||
|  | ||||
| 	feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod) | ||||
| 	feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, fwk, state, pod) | ||||
| 	if err != nil { | ||||
| 		return result, err | ||||
| 	} | ||||
| @@ -165,7 +164,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, | ||||
| 		}, nil | ||||
| 	} | ||||
|  | ||||
| 	priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes) | ||||
| 	priorityList, err := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes) | ||||
| 	if err != nil { | ||||
| 		return result, err | ||||
| 	} | ||||
| @@ -235,11 +234,11 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i | ||||
|  | ||||
| // Filters the nodes to find the ones that fit the pod based on the framework | ||||
| // filter plugins and filter extenders. | ||||
| func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) { | ||||
| func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) { | ||||
| 	filteredNodesStatuses := make(framework.NodeToStatusMap) | ||||
|  | ||||
| 	// Run "prefilter" plugins. | ||||
| 	s := prof.RunPreFilterPlugins(ctx, state, pod) | ||||
| 	s := fwk.RunPreFilterPlugins(ctx, state, pod) | ||||
| 	if !s.IsSuccess() { | ||||
| 		if !s.IsUnschedulable() { | ||||
| 			return nil, nil, s.AsError() | ||||
| @@ -256,7 +255,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profil | ||||
| 		return nil, filteredNodesStatuses, nil | ||||
| 	} | ||||
|  | ||||
| 	feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses) | ||||
| 	feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, filteredNodesStatuses) | ||||
| 	if err != nil { | ||||
| 		return nil, nil, err | ||||
| 	} | ||||
| @@ -269,7 +268,7 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profil | ||||
| } | ||||
|  | ||||
| // findNodesThatPassFilters finds the nodes that fit the filter plugins. | ||||
| func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) { | ||||
| func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) { | ||||
| 	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| @@ -281,7 +280,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p | ||||
| 	// and allow assigning. | ||||
| 	feasibleNodes := make([]*v1.Node, numNodesToFind) | ||||
|  | ||||
| 	if !prof.HasFilterPlugins() { | ||||
| 	if !fwk.HasFilterPlugins() { | ||||
| 		length := len(allNodes) | ||||
| 		for i := range feasibleNodes { | ||||
| 			feasibleNodes[i] = allNodes[(g.nextStartNodeIndex+i)%length].Node() | ||||
| @@ -298,7 +297,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p | ||||
| 		// We check the nodes starting from where we left off in the previous scheduling cycle, | ||||
| 		// this is to make sure all nodes have the same chance of being examined across pods. | ||||
| 		nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)] | ||||
| 		fits, status, err := PodPassesFiltersOnNode(ctx, prof.PreemptHandle(), state, pod, nodeInfo) | ||||
| 		fits, status, err := PodPassesFiltersOnNode(ctx, fwk.PreemptHandle(), state, pod, nodeInfo) | ||||
| 		if err != nil { | ||||
| 			errCh.SendErrorWithCancel(err, cancel) | ||||
| 			return | ||||
| @@ -326,7 +325,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p | ||||
| 		// We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins | ||||
| 		// function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle. | ||||
| 		// Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod. | ||||
| 		metrics.FrameworkExtensionPointDuration.WithLabelValues(runtime.Filter, statusCode.String(), prof.Name).Observe(metrics.SinceInSeconds(beginCheckNode)) | ||||
| 		metrics.FrameworkExtensionPointDuration.WithLabelValues(runtime.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode)) | ||||
| 	}() | ||||
|  | ||||
| 	// Stops searching for more nodes once the configured number of feasible nodes | ||||
| @@ -470,14 +469,14 @@ func PodPassesFiltersOnNode( | ||||
| // All scores are finally combined (added) to get the total weighted scores of all nodes | ||||
| func (g *genericScheduler) prioritizeNodes( | ||||
| 	ctx context.Context, | ||||
| 	prof *profile.Profile, | ||||
| 	fwk framework.Framework, | ||||
| 	state *framework.CycleState, | ||||
| 	pod *v1.Pod, | ||||
| 	nodes []*v1.Node, | ||||
| ) (framework.NodeScoreList, error) { | ||||
| 	// If no priority configs are provided, then all nodes will have a score of one. | ||||
| 	// This is required to generate the priority list in the required format | ||||
| 	if len(g.extenders) == 0 && !prof.HasScorePlugins() { | ||||
| 	if len(g.extenders) == 0 && !fwk.HasScorePlugins() { | ||||
| 		result := make(framework.NodeScoreList, 0, len(nodes)) | ||||
| 		for i := range nodes { | ||||
| 			result = append(result, framework.NodeScore{ | ||||
| @@ -489,13 +488,13 @@ func (g *genericScheduler) prioritizeNodes( | ||||
| 	} | ||||
|  | ||||
| 	// Run PreScore plugins. | ||||
| 	preScoreStatus := prof.RunPreScorePlugins(ctx, state, pod, nodes) | ||||
| 	preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes) | ||||
| 	if !preScoreStatus.IsSuccess() { | ||||
| 		return nil, preScoreStatus.AsError() | ||||
| 	} | ||||
|  | ||||
| 	// Run the Score plugins. | ||||
| 	scoresMap, scoreStatus := prof.RunScorePlugins(ctx, state, pod, nodes) | ||||
| 	scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes) | ||||
| 	if !scoreStatus.IsSuccess() { | ||||
| 		return nil, scoreStatus.AsError() | ||||
| 	} | ||||
|   | ||||
| @@ -47,7 +47,6 @@ import ( | ||||
| 	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" | ||||
| 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" | ||||
| 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/profile" | ||||
| 	st "k8s.io/kubernetes/pkg/scheduler/testing" | ||||
| 	schedutil "k8s.io/kubernetes/pkg/scheduler/util" | ||||
| ) | ||||
| @@ -759,9 +758,6 @@ func TestGenericScheduler(t *testing.T) { | ||||
| 			if err != nil { | ||||
| 				t.Fatal(err) | ||||
| 			} | ||||
| 			prof := &profile.Profile{ | ||||
| 				Framework: fwk, | ||||
| 			} | ||||
|  | ||||
| 			scheduler := NewGenericScheduler( | ||||
| 				cache, | ||||
| @@ -772,7 +768,7 @@ func TestGenericScheduler(t *testing.T) { | ||||
| 			informerFactory.Start(ctx.Done()) | ||||
| 			informerFactory.WaitForCacheSync(ctx.Done()) | ||||
|  | ||||
| 			result, err := scheduler.Schedule(ctx, prof, framework.NewCycleState(), test.pod) | ||||
| 			result, err := scheduler.Schedule(ctx, fwk, framework.NewCycleState(), test.pod) | ||||
| 			if err != test.wErr && !strings.Contains(err.Error(), test.wErr.Error()) { | ||||
| 				t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr) | ||||
| 			} | ||||
| @@ -817,9 +813,8 @@ func TestFindFitAllError(t *testing.T) { | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	prof := &profile.Profile{Framework: fwk} | ||||
|  | ||||
| 	_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), &v1.Pod{}) | ||||
| 	_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{}) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| @@ -858,10 +853,9 @@ func TestFindFitSomeError(t *testing.T) { | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	prof := &profile.Profile{Framework: fwk} | ||||
|  | ||||
| 	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}} | ||||
| 	_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), pod) | ||||
| 	_, nodeToStatusMap, err := scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), pod) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| @@ -928,7 +922,6 @@ func TestFindFitPredicateCallCounts(t *testing.T) { | ||||
| 		if err != nil { | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 		prof := &profile.Profile{Framework: fwk} | ||||
|  | ||||
| 		scheduler := makeScheduler(nodes) | ||||
| 		if err := scheduler.cache.UpdateSnapshot(scheduler.nodeInfoSnapshot); err != nil { | ||||
| @@ -936,7 +929,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) { | ||||
| 		} | ||||
| 		fwk.PreemptHandle().AddNominatedPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "nominated"}, Spec: v1.PodSpec{Priority: &midPriority}}, "1") | ||||
|  | ||||
| 		_, _, err = scheduler.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), test.pod) | ||||
| 		_, _, err = scheduler.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), test.pod) | ||||
|  | ||||
| 		if err != nil { | ||||
| 			t.Errorf("unexpected error: %v", err) | ||||
| @@ -1084,7 +1077,6 @@ func TestZeroRequest(t *testing.T) { | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("error creating framework: %+v", err) | ||||
| 			} | ||||
| 			prof := &profile.Profile{Framework: fwk} | ||||
|  | ||||
| 			scheduler := NewGenericScheduler( | ||||
| 				nil, | ||||
| @@ -1095,12 +1087,12 @@ func TestZeroRequest(t *testing.T) { | ||||
|  | ||||
| 			ctx := context.Background() | ||||
| 			state := framework.NewCycleState() | ||||
| 			_, _, err = scheduler.findNodesThatFitPod(ctx, prof, state, test.pod) | ||||
| 			_, _, err = scheduler.findNodesThatFitPod(ctx, fwk, state, test.pod) | ||||
| 			if err != nil { | ||||
| 				t.Fatalf("error filtering nodes: %+v", err) | ||||
| 			} | ||||
| 			prof.RunPreScorePlugins(ctx, state, test.pod, test.nodes) | ||||
| 			list, err := scheduler.prioritizeNodes(ctx, prof, state, test.pod, test.nodes) | ||||
| 			fwk.RunPreScorePlugins(ctx, state, test.pod, test.nodes) | ||||
| 			list, err := scheduler.prioritizeNodes(ctx, fwk, state, test.pod, test.nodes) | ||||
| 			if err != nil { | ||||
| 				t.Errorf("unexpected error: %v", err) | ||||
| 			} | ||||
| @@ -1187,14 +1179,14 @@ func TestFairEvaluationForNodes(t *testing.T) { | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	prof := &profile.Profile{Framework: fwk} | ||||
|  | ||||
| 	// To make numAllNodes % nodesToFind != 0 | ||||
| 	g.percentageOfNodesToScore = 30 | ||||
| 	nodesToFind := int(g.numFeasibleNodesToFind(int32(numAllNodes))) | ||||
|  | ||||
| 	// Iterating over all nodes more than twice | ||||
| 	for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ { | ||||
| 		nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), prof, framework.NewCycleState(), &v1.Pod{}) | ||||
| 		nodesThatFit, _, err := g.findNodesThatFitPod(context.Background(), fwk, framework.NewCycleState(), &v1.Pod{}) | ||||
| 		if err != nil { | ||||
| 			t.Errorf("unexpected error: %v", err) | ||||
| 		} | ||||
|   | ||||
| @@ -205,14 +205,14 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) { | ||||
| 	if err := sched.SchedulingQueue.Delete(pod); err != nil { | ||||
| 		utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err)) | ||||
| 	} | ||||
| 	prof, err := sched.profileForPod(pod) | ||||
| 	fwk, err := sched.frameworkForPod(pod) | ||||
| 	if err != nil { | ||||
| 		// This shouldn't happen, because we only accept for scheduling the pods | ||||
| 		// which specify a scheduler name that matches one of the profiles. | ||||
| 		klog.Error(err) | ||||
| 		return | ||||
| 	} | ||||
| 	prof.Framework.RejectWaitingPod(pod.UID) | ||||
| 	fwk.RejectWaitingPod(pod.UID) | ||||
| } | ||||
|  | ||||
| func (sched *Scheduler) addPodToCache(obj interface{}) { | ||||
|   | ||||
| @@ -158,7 +158,7 @@ func (c *Configurator) create() (*Scheduler, error) { | ||||
| 		return nil, errors.New("at least one profile is required") | ||||
| 	} | ||||
| 	// Profiles are required to have equivalent queue sort plugins. | ||||
| 	lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc() | ||||
| 	lessFn := profiles[c.profiles[0].SchedulerName].QueueSortFunc() | ||||
| 	podQueue := internalqueue.NewSchedulingQueue( | ||||
| 		lessFn, | ||||
| 		internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second), | ||||
|   | ||||
| @@ -504,6 +504,9 @@ type Framework interface { | ||||
|  | ||||
| 	// ListPlugins returns a map of extension point name to list of configured Plugins. | ||||
| 	ListPlugins() map[string][]config.Plugin | ||||
|  | ||||
| 	// ProfileName returns the profile name associated to this framework. | ||||
| 	ProfileName() string | ||||
| } | ||||
|  | ||||
| // Handle provides data and some tools that plugins can use. It is | ||||
|   | ||||
| @@ -1027,3 +1027,8 @@ func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) map[string]config | ||||
| func (f *frameworkImpl) PreemptHandle() framework.PreemptHandle { | ||||
| 	return f.preemptHandle | ||||
| } | ||||
|  | ||||
| // ProfileName returns the profile name associated to this framework. | ||||
| func (f *frameworkImpl) ProfileName() string { | ||||
| 	return f.profileName | ||||
| } | ||||
|   | ||||
| @@ -39,9 +39,7 @@ go_test( | ||||
|         "//pkg/scheduler/framework:go_default_library", | ||||
|         "//pkg/scheduler/framework/runtime:go_default_library", | ||||
|         "//staging/src/k8s.io/api/core/v1:go_default_library", | ||||
|         "//staging/src/k8s.io/api/events/v1:go_default_library", | ||||
|         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", | ||||
|         "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", | ||||
|         "//staging/src/k8s.io/client-go/tools/events:go_default_library", | ||||
|     ], | ||||
| ) | ||||
|   | ||||
| @@ -36,33 +36,22 @@ type RecorderFactory func(string) events.EventRecorder | ||||
| // FrameworkFactory builds a Framework for a given profile configuration. | ||||
| type FrameworkFactory func(config.KubeSchedulerProfile, ...frameworkruntime.Option) (framework.Framework, error) | ||||
|  | ||||
| // Profile is a scheduling profile. | ||||
| type Profile struct { | ||||
| 	framework.Framework | ||||
| 	Recorder events.EventRecorder | ||||
| 	Name     string | ||||
| } | ||||
|  | ||||
| // NewProfile builds a Profile for the given configuration. | ||||
| func NewProfile(cfg config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory, | ||||
| 	opts ...frameworkruntime.Option) (*Profile, error) { | ||||
| // newProfile builds a Profile for the given configuration. | ||||
| func newProfile(cfg config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory, | ||||
| 	opts ...frameworkruntime.Option) (framework.Framework, error) { | ||||
| 	recorder := recorderFact(cfg.SchedulerName) | ||||
| 	opts = append(opts, frameworkruntime.WithEventRecorder(recorder), frameworkruntime.WithProfileName(cfg.SchedulerName)) | ||||
| 	fwk, err := frameworkFact(cfg, opts...) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &Profile{ | ||||
| 		Name:      cfg.SchedulerName, | ||||
| 		Framework: fwk, | ||||
| 		Recorder:  recorder, | ||||
| 	}, nil | ||||
| 	return fwk, nil | ||||
| } | ||||
|  | ||||
| // Map holds profiles indexed by scheduler name. | ||||
| type Map map[string]*Profile | ||||
| // Map holds frameworks indexed by scheduler name. | ||||
| type Map map[string]framework.Framework | ||||
|  | ||||
| // NewMap builds the profiles given by the configuration, indexed by name. | ||||
| // NewMap builds the frameworks given by the configuration, indexed by name. | ||||
| func NewMap(cfgs []config.KubeSchedulerProfile, frameworkFact FrameworkFactory, recorderFact RecorderFactory, | ||||
| 	opts ...frameworkruntime.Option) (Map, error) { | ||||
| 	m := make(Map) | ||||
| @@ -72,7 +61,7 @@ func NewMap(cfgs []config.KubeSchedulerProfile, frameworkFact FrameworkFactory, | ||||
| 		if err := v.validate(cfg); err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		p, err := NewProfile(cfg, frameworkFact, recorderFact, opts...) | ||||
| 		p, err := newProfile(cfg, frameworkFact, recorderFact, opts...) | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err) | ||||
| 		} | ||||
|   | ||||
| @@ -23,9 +23,7 @@ import ( | ||||
| 	"testing" | ||||
|  | ||||
| 	v1 "k8s.io/api/core/v1" | ||||
| 	eventsv1 "k8s.io/api/events/v1" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/client-go/kubernetes/fake" | ||||
| 	"k8s.io/client-go/tools/events" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/apis/config" | ||||
| 	"k8s.io/kubernetes/pkg/scheduler/framework" | ||||
| @@ -39,94 +37,6 @@ var fakeRegistry = frameworkruntime.Registry{ | ||||
| 	"Another":   newFakePlugin, | ||||
| } | ||||
|  | ||||
| func TestNewProfile(t *testing.T) { | ||||
| 	cases := []struct { | ||||
| 		name    string | ||||
| 		cfg     config.KubeSchedulerProfile | ||||
| 		wantErr string | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name: "valid", | ||||
| 			cfg: config.KubeSchedulerProfile{ | ||||
| 				SchedulerName: "valid-profile", | ||||
| 				Plugins: &config.Plugins{ | ||||
| 					QueueSort: &config.PluginSet{ | ||||
| 						Enabled: []config.Plugin{ | ||||
| 							{Name: "QueueSort"}, | ||||
| 						}, | ||||
| 					}, | ||||
| 					Bind: &config.PluginSet{ | ||||
| 						Enabled: []config.Plugin{ | ||||
| 							{Name: "Bind1"}, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 				PluginConfig: []config.PluginConfig{ | ||||
| 					{ | ||||
| 						Name: "QueueSort", | ||||
| 						Args: &runtime.Unknown{Raw: []byte("{}")}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "invalid framework configuration", | ||||
| 			cfg: config.KubeSchedulerProfile{ | ||||
| 				SchedulerName: "invalid-profile", | ||||
| 				Plugins: &config.Plugins{ | ||||
| 					QueueSort: &config.PluginSet{ | ||||
| 						Enabled: []config.Plugin{ | ||||
| 							{Name: "QueueSort"}, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			wantErr: "at least one bind plugin is needed", | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "one queue sort plugin required for profile", | ||||
| 			cfg: config.KubeSchedulerProfile{ | ||||
| 				SchedulerName: "profile-1", | ||||
| 				Plugins: &config.Plugins{ | ||||
| 					Bind: &config.PluginSet{ | ||||
| 						Enabled: []config.Plugin{ | ||||
| 							{Name: "Bind1"}, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			wantErr: "no queue sort plugin is enabled", | ||||
| 		}, | ||||
| 	} | ||||
| 	for _, tc := range cases { | ||||
| 		t.Run(tc.name, func(t *testing.T) { | ||||
| 			c := fake.NewSimpleClientset() | ||||
| 			b := events.NewBroadcaster(&events.EventSinkImpl{Interface: c.EventsV1()}) | ||||
| 			p, err := NewProfile(tc.cfg, fakeFrameworkFactory, NewRecorderFactory(b)) | ||||
| 			if err := checkErr(err, tc.wantErr); err != nil { | ||||
| 				t.Fatal(err) | ||||
| 			} | ||||
| 			if len(tc.wantErr) != 0 { | ||||
| 				return | ||||
| 			} | ||||
|  | ||||
| 			called := make(chan struct{}) | ||||
| 			var ctrl string | ||||
| 			stopFn := b.StartEventWatcher(func(obj runtime.Object) { | ||||
| 				e, _ := obj.(*eventsv1.Event) | ||||
| 				ctrl = e.ReportingController | ||||
| 				close(called) | ||||
| 			}) | ||||
| 			p.Recorder.Eventf(&v1.Pod{}, nil, v1.EventTypeNormal, "", "", "") | ||||
| 			<-called | ||||
| 			stopFn() | ||||
| 			if ctrl != tc.cfg.SchedulerName { | ||||
| 				t.Errorf("got controller name %q in event, want %q", ctrl, tc.cfg.SchedulerName) | ||||
| 			} | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestNewMap(t *testing.T) { | ||||
| 	cases := []struct { | ||||
| 		name    string | ||||
| @@ -317,6 +227,22 @@ func TestNewMap(t *testing.T) { | ||||
| 			}, | ||||
| 			wantErr: "plugins required for profile", | ||||
| 		}, | ||||
| 		{ | ||||
| 			name: "invalid framework configuration", | ||||
| 			cfgs: []config.KubeSchedulerProfile{ | ||||
| 				{ | ||||
| 					SchedulerName: "invalid-profile", | ||||
| 					Plugins: &config.Plugins{ | ||||
| 						QueueSort: &config.PluginSet{ | ||||
| 							Enabled: []config.Plugin{ | ||||
| 								{Name: "QueueSort"}, | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			wantErr: "at least one bind plugin is needed", | ||||
| 		}, | ||||
| 	} | ||||
| 	for _, tc := range cases { | ||||
| 		t.Run(tc.name, func(t *testing.T) { | ||||
|   | ||||
| @@ -307,7 +307,7 @@ func (sched *Scheduler) Run(ctx context.Context) { | ||||
|  | ||||
| // recordSchedulingFailure records an event for the pod that indicates the | ||||
| // pod has failed to schedule. Also, update the pod condition and nominated node name if set. | ||||
| func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) { | ||||
| func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) { | ||||
| 	sched.Error(podInfo, err) | ||||
|  | ||||
| 	// Update the scheduling queue with the nominated pod information. Without | ||||
| @@ -319,7 +319,7 @@ func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo * | ||||
| 	} | ||||
|  | ||||
| 	pod := podInfo.Pod | ||||
| 	prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", err.Error()) | ||||
| 	fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", err.Error()) | ||||
| 	if err := updatePod(sched.client, pod, &v1.PodCondition{ | ||||
| 		Type:    v1.PodScheduled, | ||||
| 		Status:  v1.ConditionFalse, | ||||
| @@ -369,17 +369,17 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { | ||||
| // bind binds a pod to a given node defined in a binding object. | ||||
| // The precedence for binding is: (1) extenders and (2) framework plugins. | ||||
| // We expect this to run asynchronously, so we handle binding metrics internally. | ||||
| func (sched *Scheduler) bind(ctx context.Context, prof *profile.Profile, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) { | ||||
| func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) { | ||||
| 	start := time.Now() | ||||
| 	defer func() { | ||||
| 		sched.finishBinding(prof, assumed, targetNode, start, err) | ||||
| 		sched.finishBinding(fwk, assumed, targetNode, start, err) | ||||
| 	}() | ||||
|  | ||||
| 	bound, err := sched.extendersBinding(assumed, targetNode) | ||||
| 	if bound { | ||||
| 		return err | ||||
| 	} | ||||
| 	bindStatus := prof.RunBindPlugins(ctx, state, assumed, targetNode) | ||||
| 	bindStatus := fwk.RunBindPlugins(ctx, state, assumed, targetNode) | ||||
| 	if bindStatus.IsSuccess() { | ||||
| 		return nil | ||||
| 	} | ||||
| @@ -403,7 +403,7 @@ func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error) | ||||
| 	return false, nil | ||||
| } | ||||
|  | ||||
| func (sched *Scheduler) finishBinding(prof *profile.Profile, assumed *v1.Pod, targetNode string, start time.Time, err error) { | ||||
| func (sched *Scheduler) finishBinding(fwk framework.Framework, assumed *v1.Pod, targetNode string, start time.Time, err error) { | ||||
| 	if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil { | ||||
| 		klog.Errorf("scheduler cache FinishBinding failed: %v", finErr) | ||||
| 	} | ||||
| @@ -413,7 +413,7 @@ func (sched *Scheduler) finishBinding(prof *profile.Profile, assumed *v1.Pod, ta | ||||
| 	} | ||||
|  | ||||
| 	metrics.DeprecatedBindingLatency.Observe(metrics.SinceInSeconds(start)) | ||||
| 	prof.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode) | ||||
| 	fwk.EventRecorder().Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode) | ||||
| } | ||||
|  | ||||
| // scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting. | ||||
| @@ -424,14 +424,14 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { | ||||
| 		return | ||||
| 	} | ||||
| 	pod := podInfo.Pod | ||||
| 	prof, err := sched.profileForPod(pod) | ||||
| 	fwk, err := sched.frameworkForPod(pod) | ||||
| 	if err != nil { | ||||
| 		// This shouldn't happen, because we only accept for scheduling the pods | ||||
| 		// which specify a scheduler name that matches one of the profiles. | ||||
| 		klog.Error(err) | ||||
| 		return | ||||
| 	} | ||||
| 	if sched.skipPodSchedule(prof, pod) { | ||||
| 	if sched.skipPodSchedule(fwk, pod) { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| @@ -443,7 +443,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { | ||||
| 	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent) | ||||
| 	schedulingCycleCtx, cancel := context.WithCancel(ctx) | ||||
| 	defer cancel() | ||||
| 	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod) | ||||
| 	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, fwk, state, pod) | ||||
| 	if err != nil { | ||||
| 		// Schedule() may have failed because the pod would not fit on any host, so we try to | ||||
| 		// preempt, with the expectation that the next time the pod is tried for scheduling it | ||||
| @@ -451,11 +451,11 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { | ||||
| 		// into the resources that were preempted, but this is harmless. | ||||
| 		nominatedNode := "" | ||||
| 		if fitError, ok := err.(*core.FitError); ok { | ||||
| 			if !prof.HasPostFilterPlugins() { | ||||
| 			if !fwk.HasPostFilterPlugins() { | ||||
| 				klog.V(3).Infof("No PostFilter plugins are registered, so no preemption will be performed.") | ||||
| 			} else { | ||||
| 				// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle. | ||||
| 				result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses) | ||||
| 				result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses) | ||||
| 				if status.Code() == framework.Error { | ||||
| 					klog.Errorf("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status) | ||||
| 				} else { | ||||
| @@ -468,15 +468,15 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { | ||||
| 			// Pod did not fit anywhere, so it is counted as a failure. If preemption | ||||
| 			// succeeds, the pod should get counted as a success the next time we try to | ||||
| 			// schedule it. (hopefully) | ||||
| 			metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start)) | ||||
| 			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start)) | ||||
| 		} else if err == core.ErrNoNodesAvailable { | ||||
| 			// No nodes available is counted as unschedulable rather than an error. | ||||
| 			metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start)) | ||||
| 			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start)) | ||||
| 		} else { | ||||
| 			klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod)) | ||||
| 			metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) | ||||
| 			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) | ||||
| 		} | ||||
| 		sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode) | ||||
| 		sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatedNode) | ||||
| 		return | ||||
| 	} | ||||
| 	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)) | ||||
| @@ -487,45 +487,45 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { | ||||
| 	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost | ||||
| 	err = sched.assume(assumedPod, scheduleResult.SuggestedHost) | ||||
| 	if err != nil { | ||||
| 		metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) | ||||
| 		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) | ||||
| 		// This is most probably result of a BUG in retrying logic. | ||||
| 		// We report an error here so that pod scheduling can be retried. | ||||
| 		// This relies on the fact that Error will check if the pod has been bound | ||||
| 		// to a node and if so will not add it back to the unscheduled pods queue | ||||
| 		// (otherwise this would cause an infinite loop). | ||||
| 		sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "") | ||||
| 		sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, "") | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// Run the Reserve method of reserve plugins. | ||||
| 	if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { | ||||
| 		metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) | ||||
| 	if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { | ||||
| 		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) | ||||
| 		// trigger un-reserve to clean up state associated with the reserved Pod | ||||
| 		prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) | ||||
| 		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) | ||||
| 		if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil { | ||||
| 			klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) | ||||
| 		} | ||||
| 		sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "") | ||||
| 		sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, "") | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// Run "permit" plugins. | ||||
| 	runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) | ||||
| 	runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) | ||||
| 	if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() { | ||||
| 		var reason string | ||||
| 		if runPermitStatus.IsUnschedulable() { | ||||
| 			metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start)) | ||||
| 			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start)) | ||||
| 			reason = v1.PodReasonUnschedulable | ||||
| 		} else { | ||||
| 			metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) | ||||
| 			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) | ||||
| 			reason = SchedulerError | ||||
| 		} | ||||
| 		// One of the plugins returned status different than success or wait. | ||||
| 		prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) | ||||
| 		fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) | ||||
| 		if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil { | ||||
| 			klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) | ||||
| 		} | ||||
| 		sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, "") | ||||
| 		sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, "") | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| @@ -536,58 +536,58 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { | ||||
| 		metrics.SchedulerGoroutines.WithLabelValues("binding").Inc() | ||||
| 		defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec() | ||||
|  | ||||
| 		waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod) | ||||
| 		waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod) | ||||
| 		if !waitOnPermitStatus.IsSuccess() { | ||||
| 			var reason string | ||||
| 			if waitOnPermitStatus.IsUnschedulable() { | ||||
| 				metrics.PodUnschedulable(prof.Name, metrics.SinceInSeconds(start)) | ||||
| 				metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start)) | ||||
| 				reason = v1.PodReasonUnschedulable | ||||
| 			} else { | ||||
| 				metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) | ||||
| 				metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) | ||||
| 				reason = SchedulerError | ||||
| 			} | ||||
| 			// trigger un-reserve plugins to clean up state associated with the reserved Pod | ||||
| 			prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) | ||||
| 			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) | ||||
| 			if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil { | ||||
| 				klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) | ||||
| 			} | ||||
| 			sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "") | ||||
| 			sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "") | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		// Run "prebind" plugins. | ||||
| 		preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) | ||||
| 		preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) | ||||
| 		if !preBindStatus.IsSuccess() { | ||||
| 			metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) | ||||
| 			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) | ||||
| 			// trigger un-reserve plugins to clean up state associated with the reserved Pod | ||||
| 			prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) | ||||
| 			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) | ||||
| 			if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil { | ||||
| 				klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) | ||||
| 			} | ||||
| 			sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "") | ||||
| 			sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "") | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state) | ||||
| 		err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state) | ||||
| 		if err != nil { | ||||
| 			metrics.PodScheduleError(prof.Name, metrics.SinceInSeconds(start)) | ||||
| 			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) | ||||
| 			// trigger un-reserve plugins to clean up state associated with the reserved Pod | ||||
| 			prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) | ||||
| 			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) | ||||
| 			if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil { | ||||
| 				klog.Errorf("scheduler cache ForgetPod failed: %v", err) | ||||
| 			} | ||||
| 			sched.recordSchedulingFailure(prof, assumedPodInfo, fmt.Errorf("Binding rejected: %v", err), SchedulerError, "") | ||||
| 			sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("Binding rejected: %v", err), SchedulerError, "") | ||||
| 		} else { | ||||
| 			// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2. | ||||
| 			if klog.V(2).Enabled() { | ||||
| 				klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes) | ||||
| 			} | ||||
| 			metrics.PodScheduled(prof.Name, metrics.SinceInSeconds(start)) | ||||
| 			metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start)) | ||||
| 			metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts)) | ||||
| 			metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp)) | ||||
|  | ||||
| 			// Run "postbind" plugins. | ||||
| 			prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) | ||||
| 			fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) | ||||
| 		} | ||||
| 	}() | ||||
| } | ||||
| @@ -601,19 +601,19 @@ func getAttemptsLabel(p *framework.QueuedPodInfo) string { | ||||
| 	return strconv.Itoa(p.Attempts) | ||||
| } | ||||
|  | ||||
| func (sched *Scheduler) profileForPod(pod *v1.Pod) (*profile.Profile, error) { | ||||
| 	prof, ok := sched.Profiles[pod.Spec.SchedulerName] | ||||
| func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) { | ||||
| 	fwk, ok := sched.Profiles[pod.Spec.SchedulerName] | ||||
| 	if !ok { | ||||
| 		return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName) | ||||
| 	} | ||||
| 	return prof, nil | ||||
| 	return fwk, nil | ||||
| } | ||||
|  | ||||
| // skipPodSchedule returns true if we could skip scheduling the pod for specified cases. | ||||
| func (sched *Scheduler) skipPodSchedule(prof *profile.Profile, pod *v1.Pod) bool { | ||||
| func (sched *Scheduler) skipPodSchedule(fwk framework.Framework, pod *v1.Pod) bool { | ||||
| 	// Case 1: pod is being deleted. | ||||
| 	if pod.DeletionTimestamp != nil { | ||||
| 		prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) | ||||
| 		fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) | ||||
| 		klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) | ||||
| 		return true | ||||
| 	} | ||||
|   | ||||
| @@ -112,7 +112,7 @@ type mockScheduler struct { | ||||
| 	err    error | ||||
| } | ||||
|  | ||||
| func (es mockScheduler) Schedule(ctx context.Context, profile *profile.Profile, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) { | ||||
| func (es mockScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) { | ||||
| 	return es.result, es.err | ||||
| } | ||||
|  | ||||
| @@ -340,7 +340,10 @@ func TestSchedulerScheduleOne(t *testing.T) { | ||||
| 				st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), | ||||
| 				st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), | ||||
| 			) | ||||
| 			fwk, err := st.NewFramework(registerPluginFuncs, frameworkruntime.WithClientSet(client)) | ||||
| 			fwk, err := st.NewFramework(registerPluginFuncs, | ||||
| 				frameworkruntime.WithClientSet(client), | ||||
| 				frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)), | ||||
| 				frameworkruntime.WithProfileName(testSchedulerName)) | ||||
| 			if err != nil { | ||||
| 				t.Fatal(err) | ||||
| 			} | ||||
| @@ -357,11 +360,7 @@ func TestSchedulerScheduleOne(t *testing.T) { | ||||
| 					return &framework.QueuedPodInfo{Pod: item.sendPod} | ||||
| 				}, | ||||
| 				Profiles: profile.Map{ | ||||
| 					testSchedulerName: &profile.Profile{ | ||||
| 						Framework: fwk, | ||||
| 						Recorder:  eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName), | ||||
| 						Name:      testSchedulerName, | ||||
| 					}, | ||||
| 					testSchedulerName: fwk, | ||||
| 				}, | ||||
| 			} | ||||
| 			called := make(chan struct{}) | ||||
| @@ -808,23 +807,21 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C | ||||
| 		return true, b, nil | ||||
| 	}) | ||||
|  | ||||
| 	var recorder events.EventRecorder | ||||
| 	if broadcaster != nil { | ||||
| 		recorder = broadcaster.NewRecorder(scheme.Scheme, testSchedulerName) | ||||
| 	} else { | ||||
| 		recorder = &events.FakeRecorder{} | ||||
| 	} | ||||
|  | ||||
| 	fwk, _ := st.NewFramework( | ||||
| 		fns, | ||||
| 		frameworkruntime.WithClientSet(client), | ||||
| 		frameworkruntime.WithEventRecorder(recorder), | ||||
| 		frameworkruntime.WithInformerFactory(informerFactory), | ||||
| 		frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()), | ||||
| 		frameworkruntime.WithProfileName(testSchedulerName), | ||||
| 	) | ||||
| 	prof := &profile.Profile{ | ||||
| 		Framework: fwk, | ||||
| 		Recorder:  &events.FakeRecorder{}, | ||||
| 		Name:      testSchedulerName, | ||||
| 	} | ||||
| 	if broadcaster != nil { | ||||
| 		prof.Recorder = broadcaster.NewRecorder(scheme.Scheme, testSchedulerName) | ||||
| 	} | ||||
| 	profiles := profile.Map{ | ||||
| 		testSchedulerName: prof, | ||||
| 	} | ||||
|  | ||||
| 	algo := core.NewGenericScheduler( | ||||
| 		scache, | ||||
| @@ -843,8 +840,10 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C | ||||
| 		Error: func(p *framework.QueuedPodInfo, err error) { | ||||
| 			errChan <- err | ||||
| 		}, | ||||
| 		Profiles: profiles, | ||||
| 		client:   client, | ||||
| 		Profiles: profile.Map{ | ||||
| 			testSchedulerName: fwk, | ||||
| 		}, | ||||
| 		client: client, | ||||
| 	} | ||||
|  | ||||
| 	return sched, bindingChan, errChan | ||||
| @@ -1164,14 +1163,10 @@ func TestSchedulerBinding(t *testing.T) { | ||||
| 			fwk, err := st.NewFramework([]st.RegisterPluginFunc{ | ||||
| 				st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), | ||||
| 				st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), | ||||
| 			}, frameworkruntime.WithClientSet(client)) | ||||
| 			}, frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{})) | ||||
| 			if err != nil { | ||||
| 				t.Fatal(err) | ||||
| 			} | ||||
| 			prof := &profile.Profile{ | ||||
| 				Framework: fwk, | ||||
| 				Recorder:  &events.FakeRecorder{}, | ||||
| 			} | ||||
| 			stop := make(chan struct{}) | ||||
| 			defer close(stop) | ||||
| 			scache := internalcache.New(100*time.Millisecond, stop) | ||||
| @@ -1185,7 +1180,7 @@ func TestSchedulerBinding(t *testing.T) { | ||||
| 				Algorithm:      algo, | ||||
| 				SchedulerCache: scache, | ||||
| 			} | ||||
| 			err = sched.bind(context.Background(), prof, pod, "node", nil) | ||||
| 			err = sched.bind(context.Background(), fwk, pod, "node", nil) | ||||
| 			if err != nil { | ||||
| 				t.Error(err) | ||||
| 			} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 tangwz
					tangwz