Merge pull request #124001 from kerthcet/fix/multi-prifile
Revert: enhancement(scheduler): share waitingPods among profiles
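The substance of the revert is visible in the scheduler.New and NewFramework hunks below: the single waitingPodsMap that scheduler.New built and injected into every profile via WithWaitingPods is gone, and NewFramework goes back to calling the now-unexported newWaitingPodsMap() itself. A minimal runnable sketch of the two wirings follows; the types here are hypothetical simplified stand-ins, not the actual kubernetes/kubernetes source.

// Sketch: shared vs. per-profile waiting-pod pools (hypothetical types).
package main

import "fmt"

type waitingPodsMap struct{ pods map[string]string }

func newWaitingPodsMap() *waitingPodsMap {
	return &waitingPodsMap{pods: make(map[string]string)}
}

type framework struct{ waitingPods *waitingPodsMap }

// Before the revert: one map, created by the scheduler, injected everywhere.
func buildShared(profiles []string) []*framework {
	shared := newWaitingPodsMap()
	fws := make([]*framework, 0, len(profiles))
	for range profiles {
		fws = append(fws, &framework{waitingPods: shared})
	}
	return fws
}

// After the revert: every framework constructs its own private map.
func buildPerProfile(profiles []string) []*framework {
	fws := make([]*framework, 0, len(profiles))
	for range profiles {
		fws = append(fws, &framework{waitingPods: newWaitingPodsMap()})
	}
	return fws
}

func main() {
	shared := buildShared([]string{"p1", "p2"})
	fmt.Println(shared[0].waitingPods == shared[1].waitingPods) // true: one pool
	private := buildPerProfile([]string{"p1", "p2"})
	fmt.Println(private[0].waitingPods == private[1].waitingPods) // false: per-profile pools
}

With the shared wiring, IterateOverWaitingPods on any one profile's framework could observe pods waiting in every profile (the deleted TestFrameworkHandler_IterateOverWaitingPods below asserted exactly that); after the revert, each framework again sees only its own waiting pods.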
@@ -364,7 +364,6 @@ func TestPostFilter(t *testing.T) {
 				frameworkruntime.WithExtenders(extenders),
 				frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)),
 				frameworkruntime.WithLogger(logger),
-				frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
 			)
 			if err != nil {
 				t.Fatal(err)
@@ -1703,8 +1702,6 @@ func TestPreempt(t *testing.T) {
 			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 
-			waitingPods := frameworkruntime.NewWaitingPodsMap()
-
 			cache := internalcache.New(ctx, time.Duration(0))
 			for _, pod := range test.pods {
 				cache.AddPod(logger, pod)
@@ -1748,7 +1745,6 @@ func TestPreempt(t *testing.T) {
 				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
 				frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
 				frameworkruntime.WithInformerFactory(informerFactory),
-				frameworkruntime.WithWaitingPods(waitingPods),
 				frameworkruntime.WithLogger(logger),
 			)
 			if err != nil {

@@ -132,7 +132,6 @@ type frameworkOptions struct {
 	extenders              []framework.Extender
 	captureProfile         CaptureProfile
 	parallelizer           parallelize.Parallelizer
-	waitingPods            *waitingPodsMap
 	logger                 *klog.Logger
 }
 
@@ -222,13 +221,6 @@ func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option {
 	}
 }
 
-// WithWaitingPods sets waitingPods for the scheduling frameworkImpl.
-func WithWaitingPods(wp *waitingPodsMap) Option {
-	return func(o *frameworkOptions) {
-		o.waitingPods = wp
-	}
-}
-
 // WithLogger overrides the default logger from k8s.io/klog.
 func WithLogger(logger klog.Logger) Option {
 	return func(o *frameworkOptions) {
@@ -262,7 +254,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
 		registry:             r,
 		snapshotSharedLister: options.snapshotSharedLister,
 		scorePluginWeight:    make(map[string]int),
-		waitingPods:          options.waitingPods,
+		waitingPods:          newWaitingPodsMap(),
 		clientSet:            options.clientSet,
 		kubeConfig:           options.kubeConfig,
 		eventRecorder:        options.eventRecorder,

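WithWaitingPods, removed above, followed the same functional-options pattern as the surviving WithLogger and WithMetricsRecorder: an Option is a closure that mutates a private options struct before the framework is built. A small sketch of the pattern, with hypothetical simplified names (options, New, and WithParallelism are stand-ins, not the real frameworkOptions API):

// Functional-options pattern in miniature (hypothetical types; the real
// frameworkOptions carries many more fields).
package main

import "fmt"

type options struct {
	parallelism int
	logger      string
}

// Option mutates options before the object is built, like runtime.Option.
type Option func(*options)

func WithParallelism(p int) Option {
	return func(o *options) { o.parallelism = p }
}

func New(opts ...Option) *options {
	o := &options{parallelism: 16, logger: "klog"} // defaults
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", *New(WithParallelism(4))) // {parallelism:4 logger:klog}
}

Deleting an option is API-clean under this pattern: callers that never passed WithWaitingPods are unaffected, which is why most hunks in this commit only shrink call sites.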
@@ -2808,9 +2808,7 @@ func TestPermitPlugins(t *testing.T) {
 			profile := config.KubeSchedulerProfile{Plugins: configPlugins}
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
-			f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile,
-				WithWaitingPods(NewWaitingPodsMap()),
-			)
+			f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
 			if err != nil {
 				t.Fatalf("fail to create framework: %s", err)
 			}
@@ -2992,10 +2990,7 @@ func TestRecordingMetrics(t *testing.T) {
 				SchedulerName:            testProfileName,
 				Plugins:                  plugins,
 			}
-			f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
-				withMetricsRecorder(recorder),
-				WithWaitingPods(NewWaitingPodsMap()),
-			)
+			f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, withMetricsRecorder(recorder))
 			if err != nil {
 				cancel()
 				t.Fatalf("Failed to create framework for testing: %v", err)
@@ -3165,9 +3160,7 @@ func TestPermitWaitDurationMetric(t *testing.T) {
 			profile := config.KubeSchedulerProfile{Plugins: plugins}
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
-			f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
-				WithWaitingPods(NewWaitingPodsMap()),
-			)
+			f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile)
 			if err != nil {
 				t.Fatalf("Failed to create framework for testing: %v", err)
 			}
@@ -3223,9 +3216,7 @@ func TestWaitOnPermit(t *testing.T) {
 			profile := config.KubeSchedulerProfile{Plugins: plugins}
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
-			f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
-				WithWaitingPods(NewWaitingPodsMap()),
-			)
+			f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile)
 			if err != nil {
 				t.Fatalf("Failed to create framework for testing: %v", err)
 			}

@@ -32,8 +32,8 @@ type waitingPodsMap struct {
 	mu   sync.RWMutex
 }
 
-// NewWaitingPodsMap returns a new waitingPodsMap.
-func NewWaitingPodsMap() *waitingPodsMap {
+// newWaitingPodsMap returns a new waitingPodsMap.
+func newWaitingPodsMap() *waitingPodsMap {
 	return &waitingPodsMap{
 		pods: make(map[types.UID]*waitingPod),
 	}

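For context on the type whose constructor just went private: waitingPodsMap is a map of waiting pods guarded by a sync.RWMutex, keyed by pod UID. A rough, self-contained sketch of the same shape (hypothetical simplified types; the real map stores *waitingPod keyed by types.UID and also supports removal and lookup):

// RWMutex-guarded map in the style of waitingPodsMap (simplified sketch).
package main

import (
	"fmt"
	"sync"
)

type uid string

type podsMap struct {
	pods map[uid]string
	mu   sync.RWMutex
}

func newPodsMap() *podsMap {
	return &podsMap{pods: make(map[uid]string)}
}

func (m *podsMap) add(id uid, name string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.pods[id] = name
}

// iterate takes only the read lock, so concurrent readers do not block
// each other while a pod is being added or removed elsewhere.
func (m *podsMap) iterate(fn func(name string)) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for _, n := range m.pods {
		fn(n)
	}
}

func main() {
	m := newPodsMap()
	m.add("uid-1", "pod1")
	m.iterate(func(n string) { fmt.Println(n) })
}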
@@ -773,9 +773,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
 				registerPluginFuncs,
 				testSchedulerName,
 				frameworkruntime.WithClientSet(client),
-				frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
-				frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
-			)
+				frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)))
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -3525,7 +3523,6 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
 		informerFactory = informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0)
 	}
 	schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
-	waitingPods := frameworkruntime.NewWaitingPodsMap()
 
 	fwk, _ := tf.NewFramework(
 		ctx,
@@ -3535,7 +3532,6 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
 		frameworkruntime.WithEventRecorder(recorder),
 		frameworkruntime.WithInformerFactory(informerFactory),
 		frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
-		frameworkruntime.WithWaitingPods(waitingPods),
 	)
 
 	errChan := make(chan error, 1)

@@ -292,8 +292,6 @@ func New(ctx context.Context,
 
 	snapshot := internalcache.NewEmptySnapshot()
 	metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
-	// waitingPods holds all the pods that are in the scheduler and waiting in the permit stage
-	waitingPods := frameworkruntime.NewWaitingPodsMap()
 
 	profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
 		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
@@ -305,7 +303,6 @@ func New(ctx context.Context,
 		frameworkruntime.WithParallelism(int(options.parallelism)),
 		frameworkruntime.WithExtenders(extenders),
 		frameworkruntime.WithMetricsRecorder(metricsRecorder),
-		frameworkruntime.WithWaitingPods(waitingPods),
 	)
 	if err != nil {
 		return nil, fmt.Errorf("initializing profiles: %v", err)

@@ -21,17 +21,13 @@ import (
 	"fmt"
 	"sort"
 	"strings"
-	"sync"
 	"testing"
 	"time"
 
 	"github.com/google/go-cmp/cmp"
 	v1 "k8s.io/api/core/v1"
-	eventsv1 "k8s.io/api/events/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/apimachinery/pkg/util/wait"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
@@ -441,14 +437,12 @@ func initScheduler(ctx context.Context, cache internalcache.Cache, queue interna
 		tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 	}
 	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
-	waitingPods := frameworkruntime.NewWaitingPodsMap()
 	fwk, err := tf.NewFramework(ctx,
 		registerPluginFuncs,
 		testSchedulerName,
 		frameworkruntime.WithClientSet(client),
 		frameworkruntime.WithInformerFactory(informerFactory),
 		frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
-		frameworkruntime.WithWaitingPods(waitingPods),
 	)
 	if err != nil {
 		return nil, nil, err
@@ -597,7 +591,6 @@ const (
 	queueSort                      = "no-op-queue-sort-plugin"
 	fakeBind                       = "bind-plugin"
 	emptyEventExtensions           = "emptyEventExtensions"
-	fakePermit                     = "fakePermit"
 )
 
 func Test_buildQueueingHintMap(t *testing.T) {
@@ -912,168 +905,6 @@ func newFramework(ctx context.Context, r frameworkruntime.Registry, profile sche
 	)
 }
 
-func TestFrameworkHandler_IterateOverWaitingPods(t *testing.T) {
-	const (
-		testSchedulerProfile1 = "test-scheduler-profile-1"
-		testSchedulerProfile2 = "test-scheduler-profile-2"
-		testSchedulerProfile3 = "test-scheduler-profile-3"
-	)
-
-	nodes := []runtime.Object{
-		st.MakeNode().Name("node1").UID("node1").Obj(),
-		st.MakeNode().Name("node2").UID("node2").Obj(),
-		st.MakeNode().Name("node3").UID("node3").Obj(),
-	}
-
-	cases := []struct {
-		name                        string
-		profiles                    []schedulerapi.KubeSchedulerProfile
-		waitSchedulingPods          []*v1.Pod
-		expectPodNamesInWaitingPods []string
-	}{
-		{
-			name: "pods with same profile are waiting on permit stage",
-			profiles: []schedulerapi.KubeSchedulerProfile{
-				{
-					SchedulerName: testSchedulerProfile1,
-					Plugins: &schedulerapi.Plugins{
-						QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
-						Permit:    schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
-						Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
-					},
-				},
-			},
-			waitSchedulingPods: []*v1.Pod{
-				st.MakePod().Name("pod1").UID("pod1").SchedulerName(testSchedulerProfile1).Obj(),
-				st.MakePod().Name("pod2").UID("pod2").SchedulerName(testSchedulerProfile1).Obj(),
-				st.MakePod().Name("pod3").UID("pod3").SchedulerName(testSchedulerProfile1).Obj(),
-			},
-			expectPodNamesInWaitingPods: []string{"pod1", "pod2", "pod3"},
-		},
-		{
-			name: "pods with different profiles are waiting on permit stage",
-			profiles: []schedulerapi.KubeSchedulerProfile{
-				{
-					SchedulerName: testSchedulerProfile1,
-					Plugins: &schedulerapi.Plugins{
-						QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
-						Permit:    schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
-						Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
-					},
-				},
-				{
-					SchedulerName: testSchedulerProfile2,
-					Plugins: &schedulerapi.Plugins{
-						QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
-						Permit:    schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
-						Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
-					},
-				},
-				{
-					SchedulerName: testSchedulerProfile3,
-					Plugins: &schedulerapi.Plugins{
-						QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
-						Permit:    schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
-						Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
-					},
-				},
-			},
-			waitSchedulingPods: []*v1.Pod{
-				st.MakePod().Name("pod1").UID("pod1").SchedulerName(testSchedulerProfile1).Obj(),
-				st.MakePod().Name("pod2").UID("pod2").SchedulerName(testSchedulerProfile1).Obj(),
-				st.MakePod().Name("pod3").UID("pod3").SchedulerName(testSchedulerProfile2).Obj(),
-				st.MakePod().Name("pod4").UID("pod4").SchedulerName(testSchedulerProfile3).Obj(),
-			},
-			expectPodNamesInWaitingPods: []string{"pod1", "pod2", "pod3", "pod4"},
-		},
-	}
-
-	for _, tc := range cases {
-		t.Run(tc.name, func(t *testing.T) {
-			// Set up scheduler for the 3 nodes.
-			objs := append([]runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, nodes...)
-			fakeClient := fake.NewSimpleClientset(objs...)
-			informerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
-			eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: fakeClient.EventsV1()})
-			defer eventBroadcaster.Shutdown()
-			eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, fakePermit)
-
-			outOfTreeRegistry := frameworkruntime.Registry{
-				fakePermit: newFakePermitPlugin(eventRecorder),
-			}
-
-			_, ctx := ktesting.NewTestContext(t)
-			// timeout equals to the permit plugin waiting time.
-			ctx, cancel := context.WithTimeout(ctx, 100*time.Second)
-			defer cancel()
-
-			scheduler, err := New(
-				ctx,
-				fakeClient,
-				informerFactory,
-				nil,
-				profile.NewRecorderFactory(eventBroadcaster),
-				WithProfiles(tc.profiles...),
-				WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
-			)
-
-			if err != nil {
-				t.Fatalf("Failed to create scheduler: %v", err)
-			}
-
-			var wg sync.WaitGroup
-			waitSchedulingPodNumber := len(tc.waitSchedulingPods)
-			wg.Add(waitSchedulingPodNumber)
-			stopFn, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
-				e, ok := obj.(*eventsv1.Event)
-				if !ok || (e.Reason != podWaitingReason) {
-					return
-				}
-				wg.Done()
-			})
-			if err != nil {
-				t.Fatal(err)
-			}
-			defer stopFn()
-
-			// Run scheduler.
-			informerFactory.Start(ctx.Done())
-			informerFactory.WaitForCacheSync(ctx.Done())
-			go scheduler.Run(ctx)
-
-			// Send pods to be scheduled.
-			for _, p := range tc.waitSchedulingPods {
-				_, err = fakeClient.CoreV1().Pods("").Create(ctx, p, metav1.CreateOptions{})
-				if err != nil {
-					t.Fatal(err)
-				}
-			}
-
-			// Wait all pods in waitSchedulingPods to be scheduled.
-			wg.Wait()
-
-			// When permit plugin emits the event, pod may not been added to the waitingPods pool yet, so we use pollUntil here.
-			if err := wait.PollUntilContextCancel(ctx, 100*time.Microsecond, true, func(context.Context) (done bool, err error) {
-				// Ensure that all waitingPods in scheduler can be obtained from any profiles.
-				for _, fwk := range scheduler.Profiles {
-					actualPodNamesInWaitingPods := sets.NewString()
-					fwk.IterateOverWaitingPods(func(pod framework.WaitingPod) {
-						actualPodNamesInWaitingPods.Insert(pod.GetPod().Name)
-					})
-					// Validate the name of pods in waitingPods matches expectations.
-					if actualPodNamesInWaitingPods.Len() != len(tc.expectPodNamesInWaitingPods) ||
-						!actualPodNamesInWaitingPods.HasAll(tc.expectPodNamesInWaitingPods...) {
-						return false, fmt.Errorf("Unexpected waitingPods in scheduler profile %s, expect: %#v, got: %#v", fwk.ProfileName(), tc.expectPodNamesInWaitingPods, actualPodNamesInWaitingPods.List())
-					}
-				}
-				return true, nil
-			}); err != nil {
-				t.Fatal("got unexpected result")
-			}
-		})
-	}
-}
-
 var _ framework.QueueSortPlugin = &fakeQueueSortPlugin{}
 
 // fakeQueueSortPlugin is a no-op implementation for QueueSort extension point.
@@ -1173,36 +1004,3 @@ func (*emptyEventsToRegisterPlugin) Filter(_ context.Context, _ *framework.Cycle
 }
 
 func (*emptyEventsToRegisterPlugin) EventsToRegister() []framework.ClusterEventWithHint { return nil }
-
-// fakePermitPlugin only implements PermitPlugin interface.
-type fakePermitPlugin struct {
-	eventRecorder events.EventRecorder
-}
-
-func newFakePermitPlugin(eventRecorder events.EventRecorder) frameworkruntime.PluginFactory {
-	return func(ctx context.Context, configuration runtime.Object, f framework.Handle) (framework.Plugin, error) {
-		pl := &fakePermitPlugin{
-			eventRecorder: eventRecorder,
-		}
-		return pl, nil
-	}
-}
-
-func (f fakePermitPlugin) Name() string {
-	return fakePermit
-}
-
-const (
-	podWaitingReason = "podWaiting"
-)
-
-func (f fakePermitPlugin) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
-	defer func() {
-		// Send event with podWaiting reason to broadcast this pod is already waiting in the permit stage.
-		f.eventRecorder.Eventf(p, nil, v1.EventTypeWarning, podWaitingReason, "", "")
-	}()
-
-	return framework.NewStatus(framework.Wait), 100 * time.Second
-}
-
-var _ framework.PermitPlugin = &fakePermitPlugin{}
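The deleted fakePermitPlugin relied on the Permit contract: returning a framework.Wait status plus a timeout parks the pod in the framework's waiting-pod pool until something allows or rejects it, or the timeout expires. A hypothetical, self-contained sketch of that contract follows; verdict, fw, and waitingPod are simplified stand-ins, not the real framework API, and the timeout-rejects behavior is modeled on the upstream scheduler.

// Permit/Wait contract in miniature (hypothetical simplified types).
package main

import (
	"fmt"
	"sync"
	"time"
)

type verdict int

const (
	allow verdict = iota
	wait
)

type waitingPod struct {
	decided chan bool // true = allowed, false = rejected
}

type fw struct {
	mu      sync.Mutex
	waiting map[string]*waitingPod
}

// runPermit parks the pod when the plugin answers wait; the timeout rejects it.
func (f *fw) runPermit(pod string, v verdict, timeout time.Duration) bool {
	if v != wait {
		return true
	}
	wp := &waitingPod{decided: make(chan bool, 1)}
	f.mu.Lock()
	f.waiting[pod] = wp
	f.mu.Unlock()
	defer func() {
		f.mu.Lock()
		delete(f.waiting, pod)
		f.mu.Unlock()
	}()
	select {
	case ok := <-wp.decided:
		return ok
	case <-time.After(timeout):
		return false
	}
}

// allowPod is what another component would call to release a waiting pod.
func (f *fw) allowPod(pod string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if wp, ok := f.waiting[pod]; ok {
		wp.decided <- true
	}
}

func main() {
	f := &fw{waiting: make(map[string]*waitingPod)}
	go func() {
		time.Sleep(10 * time.Millisecond)
		f.allowPod("pod1")
	}()
	fmt.Println(f.runPermit("pod1", wait, time.Second)) // true: allowed while waiting
}

After this revert, a pod parked this way is visible only through the framework of the profile that scheduled it.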