Revert "enhancement(scheduler): share waitingPods among profiles"
This reverts commit 227c1915db.
			
			
This commit is contained in:
		@@ -364,7 +364,6 @@ func TestPostFilter(t *testing.T) {
 | 
				
			|||||||
				frameworkruntime.WithExtenders(extenders),
 | 
									frameworkruntime.WithExtenders(extenders),
 | 
				
			||||||
				frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)),
 | 
									frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.pods, tt.nodes)),
 | 
				
			||||||
				frameworkruntime.WithLogger(logger),
 | 
									frameworkruntime.WithLogger(logger),
 | 
				
			||||||
				frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
 | 
					 | 
				
			||||||
			)
 | 
								)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				t.Fatal(err)
 | 
									t.Fatal(err)
 | 
				
			||||||
@@ -1703,8 +1702,6 @@ func TestPreempt(t *testing.T) {
 | 
				
			|||||||
			ctx, cancel := context.WithCancel(ctx)
 | 
								ctx, cancel := context.WithCancel(ctx)
 | 
				
			||||||
			defer cancel()
 | 
								defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			waitingPods := frameworkruntime.NewWaitingPodsMap()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			cache := internalcache.New(ctx, time.Duration(0))
 | 
								cache := internalcache.New(ctx, time.Duration(0))
 | 
				
			||||||
			for _, pod := range test.pods {
 | 
								for _, pod := range test.pods {
 | 
				
			||||||
				cache.AddPod(logger, pod)
 | 
									cache.AddPod(logger, pod)
 | 
				
			||||||
@@ -1748,7 +1745,6 @@ func TestPreempt(t *testing.T) {
 | 
				
			|||||||
				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
 | 
									frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
 | 
				
			||||||
				frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
 | 
									frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
 | 
				
			||||||
				frameworkruntime.WithInformerFactory(informerFactory),
 | 
									frameworkruntime.WithInformerFactory(informerFactory),
 | 
				
			||||||
				frameworkruntime.WithWaitingPods(waitingPods),
 | 
					 | 
				
			||||||
				frameworkruntime.WithLogger(logger),
 | 
									frameworkruntime.WithLogger(logger),
 | 
				
			||||||
			)
 | 
								)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -132,7 +132,6 @@ type frameworkOptions struct {
 | 
				
			|||||||
	extenders              []framework.Extender
 | 
						extenders              []framework.Extender
 | 
				
			||||||
	captureProfile         CaptureProfile
 | 
						captureProfile         CaptureProfile
 | 
				
			||||||
	parallelizer           parallelize.Parallelizer
 | 
						parallelizer           parallelize.Parallelizer
 | 
				
			||||||
	waitingPods            *waitingPodsMap
 | 
					 | 
				
			||||||
	logger                 *klog.Logger
 | 
						logger                 *klog.Logger
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -222,13 +221,6 @@ func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// WithWaitingPods sets waitingPods for the scheduling frameworkImpl.
 | 
					 | 
				
			||||||
func WithWaitingPods(wp *waitingPodsMap) Option {
 | 
					 | 
				
			||||||
	return func(o *frameworkOptions) {
 | 
					 | 
				
			||||||
		o.waitingPods = wp
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// WithLogger overrides the default logger from k8s.io/klog.
 | 
					// WithLogger overrides the default logger from k8s.io/klog.
 | 
				
			||||||
func WithLogger(logger klog.Logger) Option {
 | 
					func WithLogger(logger klog.Logger) Option {
 | 
				
			||||||
	return func(o *frameworkOptions) {
 | 
						return func(o *frameworkOptions) {
 | 
				
			||||||
@@ -262,7 +254,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
 | 
				
			|||||||
		registry:             r,
 | 
							registry:             r,
 | 
				
			||||||
		snapshotSharedLister: options.snapshotSharedLister,
 | 
							snapshotSharedLister: options.snapshotSharedLister,
 | 
				
			||||||
		scorePluginWeight:    make(map[string]int),
 | 
							scorePluginWeight:    make(map[string]int),
 | 
				
			||||||
		waitingPods:          options.waitingPods,
 | 
							waitingPods:          newWaitingPodsMap(),
 | 
				
			||||||
		clientSet:            options.clientSet,
 | 
							clientSet:            options.clientSet,
 | 
				
			||||||
		kubeConfig:           options.kubeConfig,
 | 
							kubeConfig:           options.kubeConfig,
 | 
				
			||||||
		eventRecorder:        options.eventRecorder,
 | 
							eventRecorder:        options.eventRecorder,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -2808,9 +2808,7 @@ func TestPermitPlugins(t *testing.T) {
 | 
				
			|||||||
			profile := config.KubeSchedulerProfile{Plugins: configPlugins}
 | 
								profile := config.KubeSchedulerProfile{Plugins: configPlugins}
 | 
				
			||||||
			ctx, cancel := context.WithCancel(context.Background())
 | 
								ctx, cancel := context.WithCancel(context.Background())
 | 
				
			||||||
			defer cancel()
 | 
								defer cancel()
 | 
				
			||||||
			f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile,
 | 
								f, err := newFrameworkWithQueueSortAndBind(ctx, registry, profile)
 | 
				
			||||||
				WithWaitingPods(NewWaitingPodsMap()),
 | 
					 | 
				
			||||||
			)
 | 
					 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				t.Fatalf("fail to create framework: %s", err)
 | 
									t.Fatalf("fail to create framework: %s", err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
@@ -2992,10 +2990,7 @@ func TestRecordingMetrics(t *testing.T) {
 | 
				
			|||||||
				SchedulerName:            testProfileName,
 | 
									SchedulerName:            testProfileName,
 | 
				
			||||||
				Plugins:                  plugins,
 | 
									Plugins:                  plugins,
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
 | 
								f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile, withMetricsRecorder(recorder))
 | 
				
			||||||
				withMetricsRecorder(recorder),
 | 
					 | 
				
			||||||
				WithWaitingPods(NewWaitingPodsMap()),
 | 
					 | 
				
			||||||
			)
 | 
					 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				cancel()
 | 
									cancel()
 | 
				
			||||||
				t.Fatalf("Failed to create framework for testing: %v", err)
 | 
									t.Fatalf("Failed to create framework for testing: %v", err)
 | 
				
			||||||
@@ -3165,9 +3160,7 @@ func TestPermitWaitDurationMetric(t *testing.T) {
 | 
				
			|||||||
			profile := config.KubeSchedulerProfile{Plugins: plugins}
 | 
								profile := config.KubeSchedulerProfile{Plugins: plugins}
 | 
				
			||||||
			ctx, cancel := context.WithCancel(context.Background())
 | 
								ctx, cancel := context.WithCancel(context.Background())
 | 
				
			||||||
			defer cancel()
 | 
								defer cancel()
 | 
				
			||||||
			f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
 | 
								f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile)
 | 
				
			||||||
				WithWaitingPods(NewWaitingPodsMap()),
 | 
					 | 
				
			||||||
			)
 | 
					 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				t.Fatalf("Failed to create framework for testing: %v", err)
 | 
									t.Fatalf("Failed to create framework for testing: %v", err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
@@ -3223,9 +3216,7 @@ func TestWaitOnPermit(t *testing.T) {
 | 
				
			|||||||
			profile := config.KubeSchedulerProfile{Plugins: plugins}
 | 
								profile := config.KubeSchedulerProfile{Plugins: plugins}
 | 
				
			||||||
			ctx, cancel := context.WithCancel(context.Background())
 | 
								ctx, cancel := context.WithCancel(context.Background())
 | 
				
			||||||
			defer cancel()
 | 
								defer cancel()
 | 
				
			||||||
			f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile,
 | 
								f, err := newFrameworkWithQueueSortAndBind(ctx, r, profile)
 | 
				
			||||||
				WithWaitingPods(NewWaitingPodsMap()),
 | 
					 | 
				
			||||||
			)
 | 
					 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				t.Fatalf("Failed to create framework for testing: %v", err)
 | 
									t.Fatalf("Failed to create framework for testing: %v", err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -32,8 +32,8 @@ type waitingPodsMap struct {
 | 
				
			|||||||
	mu   sync.RWMutex
 | 
						mu   sync.RWMutex
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewWaitingPodsMap returns a new waitingPodsMap.
 | 
					// newWaitingPodsMap returns a new waitingPodsMap.
 | 
				
			||||||
func NewWaitingPodsMap() *waitingPodsMap {
 | 
					func newWaitingPodsMap() *waitingPodsMap {
 | 
				
			||||||
	return &waitingPodsMap{
 | 
						return &waitingPodsMap{
 | 
				
			||||||
		pods: make(map[types.UID]*waitingPod),
 | 
							pods: make(map[types.UID]*waitingPod),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -773,9 +773,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
 | 
				
			|||||||
				registerPluginFuncs,
 | 
									registerPluginFuncs,
 | 
				
			||||||
				testSchedulerName,
 | 
									testSchedulerName,
 | 
				
			||||||
				frameworkruntime.WithClientSet(client),
 | 
									frameworkruntime.WithClientSet(client),
 | 
				
			||||||
				frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
 | 
									frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)))
 | 
				
			||||||
				frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
 | 
					 | 
				
			||||||
			)
 | 
					 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				t.Fatal(err)
 | 
									t.Fatal(err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
@@ -3525,7 +3523,6 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
 | 
				
			|||||||
		informerFactory = informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0)
 | 
							informerFactory = informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
 | 
						schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
 | 
				
			||||||
	waitingPods := frameworkruntime.NewWaitingPodsMap()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	fwk, _ := tf.NewFramework(
 | 
						fwk, _ := tf.NewFramework(
 | 
				
			||||||
		ctx,
 | 
							ctx,
 | 
				
			||||||
@@ -3535,7 +3532,6 @@ func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clien
 | 
				
			|||||||
		frameworkruntime.WithEventRecorder(recorder),
 | 
							frameworkruntime.WithEventRecorder(recorder),
 | 
				
			||||||
		frameworkruntime.WithInformerFactory(informerFactory),
 | 
							frameworkruntime.WithInformerFactory(informerFactory),
 | 
				
			||||||
		frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
 | 
							frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
 | 
				
			||||||
		frameworkruntime.WithWaitingPods(waitingPods),
 | 
					 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	errChan := make(chan error, 1)
 | 
						errChan := make(chan error, 1)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -292,8 +292,6 @@ func New(ctx context.Context,
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	snapshot := internalcache.NewEmptySnapshot()
 | 
						snapshot := internalcache.NewEmptySnapshot()
 | 
				
			||||||
	metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
 | 
						metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
 | 
				
			||||||
	// waitingPods holds all the pods that are in the scheduler and waiting in the permit stage
 | 
					 | 
				
			||||||
	waitingPods := frameworkruntime.NewWaitingPodsMap()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
 | 
						profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
 | 
				
			||||||
		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
 | 
							frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
 | 
				
			||||||
@@ -305,7 +303,6 @@ func New(ctx context.Context,
 | 
				
			|||||||
		frameworkruntime.WithParallelism(int(options.parallelism)),
 | 
							frameworkruntime.WithParallelism(int(options.parallelism)),
 | 
				
			||||||
		frameworkruntime.WithExtenders(extenders),
 | 
							frameworkruntime.WithExtenders(extenders),
 | 
				
			||||||
		frameworkruntime.WithMetricsRecorder(metricsRecorder),
 | 
							frameworkruntime.WithMetricsRecorder(metricsRecorder),
 | 
				
			||||||
		frameworkruntime.WithWaitingPods(waitingPods),
 | 
					 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, fmt.Errorf("initializing profiles: %v", err)
 | 
							return nil, fmt.Errorf("initializing profiles: %v", err)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -21,16 +21,13 @@ import (
 | 
				
			|||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"sort"
 | 
						"sort"
 | 
				
			||||||
	"strings"
 | 
						"strings"
 | 
				
			||||||
	"sync"
 | 
					 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/google/go-cmp/cmp"
 | 
						"github.com/google/go-cmp/cmp"
 | 
				
			||||||
	v1 "k8s.io/api/core/v1"
 | 
						v1 "k8s.io/api/core/v1"
 | 
				
			||||||
	eventsv1 "k8s.io/api/events/v1"
 | 
					 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
					 | 
				
			||||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
						utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
				
			||||||
	"k8s.io/client-go/informers"
 | 
						"k8s.io/client-go/informers"
 | 
				
			||||||
	"k8s.io/client-go/kubernetes"
 | 
						"k8s.io/client-go/kubernetes"
 | 
				
			||||||
@@ -440,14 +437,12 @@ func initScheduler(ctx context.Context, cache internalcache.Cache, queue interna
 | 
				
			|||||||
		tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 | 
							tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
 | 
						eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
 | 
				
			||||||
	waitingPods := frameworkruntime.NewWaitingPodsMap()
 | 
					 | 
				
			||||||
	fwk, err := tf.NewFramework(ctx,
 | 
						fwk, err := tf.NewFramework(ctx,
 | 
				
			||||||
		registerPluginFuncs,
 | 
							registerPluginFuncs,
 | 
				
			||||||
		testSchedulerName,
 | 
							testSchedulerName,
 | 
				
			||||||
		frameworkruntime.WithClientSet(client),
 | 
							frameworkruntime.WithClientSet(client),
 | 
				
			||||||
		frameworkruntime.WithInformerFactory(informerFactory),
 | 
							frameworkruntime.WithInformerFactory(informerFactory),
 | 
				
			||||||
		frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
 | 
							frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
 | 
				
			||||||
		frameworkruntime.WithWaitingPods(waitingPods),
 | 
					 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, nil, err
 | 
							return nil, nil, err
 | 
				
			||||||
@@ -596,7 +591,6 @@ const (
 | 
				
			|||||||
	queueSort                      = "no-op-queue-sort-plugin"
 | 
						queueSort                      = "no-op-queue-sort-plugin"
 | 
				
			||||||
	fakeBind                       = "bind-plugin"
 | 
						fakeBind                       = "bind-plugin"
 | 
				
			||||||
	emptyEventExtensions           = "emptyEventExtensions"
 | 
						emptyEventExtensions           = "emptyEventExtensions"
 | 
				
			||||||
	fakePermit                     = "fakePermit"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func Test_buildQueueingHintMap(t *testing.T) {
 | 
					func Test_buildQueueingHintMap(t *testing.T) {
 | 
				
			||||||
@@ -911,160 +905,6 @@ func newFramework(ctx context.Context, r frameworkruntime.Registry, profile sche
 | 
				
			|||||||
	)
 | 
						)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestFrameworkHandler_IterateOverWaitingPods(t *testing.T) {
 | 
					 | 
				
			||||||
	const (
 | 
					 | 
				
			||||||
		testSchedulerProfile1 = "test-scheduler-profile-1"
 | 
					 | 
				
			||||||
		testSchedulerProfile2 = "test-scheduler-profile-2"
 | 
					 | 
				
			||||||
		testSchedulerProfile3 = "test-scheduler-profile-3"
 | 
					 | 
				
			||||||
	)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	nodes := []runtime.Object{
 | 
					 | 
				
			||||||
		st.MakeNode().Name("node1").UID("node1").Obj(),
 | 
					 | 
				
			||||||
		st.MakeNode().Name("node2").UID("node2").Obj(),
 | 
					 | 
				
			||||||
		st.MakeNode().Name("node3").UID("node3").Obj(),
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	cases := []struct {
 | 
					 | 
				
			||||||
		name                        string
 | 
					 | 
				
			||||||
		profiles                    []schedulerapi.KubeSchedulerProfile
 | 
					 | 
				
			||||||
		waitSchedulingPods          []*v1.Pod
 | 
					 | 
				
			||||||
		expectPodNamesInWaitingPods []string
 | 
					 | 
				
			||||||
	}{
 | 
					 | 
				
			||||||
		{
 | 
					 | 
				
			||||||
			name: "pods with same profile are waiting on permit stage",
 | 
					 | 
				
			||||||
			profiles: []schedulerapi.KubeSchedulerProfile{
 | 
					 | 
				
			||||||
				{
 | 
					 | 
				
			||||||
					SchedulerName: testSchedulerProfile1,
 | 
					 | 
				
			||||||
					Plugins: &schedulerapi.Plugins{
 | 
					 | 
				
			||||||
						QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
 | 
					 | 
				
			||||||
						Permit:    schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
 | 
					 | 
				
			||||||
						Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
 | 
					 | 
				
			||||||
					},
 | 
					 | 
				
			||||||
				},
 | 
					 | 
				
			||||||
			},
 | 
					 | 
				
			||||||
			waitSchedulingPods: []*v1.Pod{
 | 
					 | 
				
			||||||
				st.MakePod().Name("pod1").UID("pod1").SchedulerName(testSchedulerProfile1).Obj(),
 | 
					 | 
				
			||||||
				st.MakePod().Name("pod2").UID("pod2").SchedulerName(testSchedulerProfile1).Obj(),
 | 
					 | 
				
			||||||
				st.MakePod().Name("pod3").UID("pod3").SchedulerName(testSchedulerProfile1).Obj(),
 | 
					 | 
				
			||||||
			},
 | 
					 | 
				
			||||||
			expectPodNamesInWaitingPods: []string{"pod1", "pod2", "pod3"},
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
		{
 | 
					 | 
				
			||||||
			name: "pods with different profiles are waiting on permit stage",
 | 
					 | 
				
			||||||
			profiles: []schedulerapi.KubeSchedulerProfile{
 | 
					 | 
				
			||||||
				{
 | 
					 | 
				
			||||||
					SchedulerName: testSchedulerProfile1,
 | 
					 | 
				
			||||||
					Plugins: &schedulerapi.Plugins{
 | 
					 | 
				
			||||||
						QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
 | 
					 | 
				
			||||||
						Permit:    schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
 | 
					 | 
				
			||||||
						Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
 | 
					 | 
				
			||||||
					},
 | 
					 | 
				
			||||||
				},
 | 
					 | 
				
			||||||
				{
 | 
					 | 
				
			||||||
					SchedulerName: testSchedulerProfile2,
 | 
					 | 
				
			||||||
					Plugins: &schedulerapi.Plugins{
 | 
					 | 
				
			||||||
						QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
 | 
					 | 
				
			||||||
						Permit:    schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
 | 
					 | 
				
			||||||
						Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
 | 
					 | 
				
			||||||
					},
 | 
					 | 
				
			||||||
				},
 | 
					 | 
				
			||||||
				{
 | 
					 | 
				
			||||||
					SchedulerName: testSchedulerProfile3,
 | 
					 | 
				
			||||||
					Plugins: &schedulerapi.Plugins{
 | 
					 | 
				
			||||||
						QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
 | 
					 | 
				
			||||||
						Permit:    schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: fakePermit}}},
 | 
					 | 
				
			||||||
						Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
 | 
					 | 
				
			||||||
					},
 | 
					 | 
				
			||||||
				},
 | 
					 | 
				
			||||||
			},
 | 
					 | 
				
			||||||
			waitSchedulingPods: []*v1.Pod{
 | 
					 | 
				
			||||||
				st.MakePod().Name("pod1").UID("pod1").SchedulerName(testSchedulerProfile1).Obj(),
 | 
					 | 
				
			||||||
				st.MakePod().Name("pod2").UID("pod2").SchedulerName(testSchedulerProfile1).Obj(),
 | 
					 | 
				
			||||||
				st.MakePod().Name("pod3").UID("pod3").SchedulerName(testSchedulerProfile2).Obj(),
 | 
					 | 
				
			||||||
				st.MakePod().Name("pod4").UID("pod4").SchedulerName(testSchedulerProfile3).Obj(),
 | 
					 | 
				
			||||||
			},
 | 
					 | 
				
			||||||
			expectPodNamesInWaitingPods: []string{"pod1", "pod2", "pod3", "pod4"},
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for _, tc := range cases {
 | 
					 | 
				
			||||||
		t.Run(tc.name, func(t *testing.T) {
 | 
					 | 
				
			||||||
			// Set up scheduler for the 3 nodes.
 | 
					 | 
				
			||||||
			objs := append([]runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, nodes...)
 | 
					 | 
				
			||||||
			fakeClient := fake.NewSimpleClientset(objs...)
 | 
					 | 
				
			||||||
			informerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
 | 
					 | 
				
			||||||
			eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: fakeClient.EventsV1()})
 | 
					 | 
				
			||||||
			eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, fakePermit)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			outOfTreeRegistry := frameworkruntime.Registry{
 | 
					 | 
				
			||||||
				fakePermit: newFakePermitPlugin(eventRecorder),
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			_, ctx := ktesting.NewTestContext(t)
 | 
					 | 
				
			||||||
			ctx, cancel := context.WithCancel(ctx)
 | 
					 | 
				
			||||||
			defer cancel()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			scheduler, err := New(
 | 
					 | 
				
			||||||
				ctx,
 | 
					 | 
				
			||||||
				fakeClient,
 | 
					 | 
				
			||||||
				informerFactory,
 | 
					 | 
				
			||||||
				nil,
 | 
					 | 
				
			||||||
				profile.NewRecorderFactory(eventBroadcaster),
 | 
					 | 
				
			||||||
				WithProfiles(tc.profiles...),
 | 
					 | 
				
			||||||
				WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
 | 
					 | 
				
			||||||
			)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			if err != nil {
 | 
					 | 
				
			||||||
				t.Fatalf("Failed to create scheduler: %v", err)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			var wg sync.WaitGroup
 | 
					 | 
				
			||||||
			waitSchedulingPodNumber := len(tc.waitSchedulingPods)
 | 
					 | 
				
			||||||
			wg.Add(waitSchedulingPodNumber)
 | 
					 | 
				
			||||||
			stopFn, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
 | 
					 | 
				
			||||||
				e, ok := obj.(*eventsv1.Event)
 | 
					 | 
				
			||||||
				if !ok || (e.Reason != podWaitingReason) {
 | 
					 | 
				
			||||||
					return
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				wg.Done()
 | 
					 | 
				
			||||||
			})
 | 
					 | 
				
			||||||
			if err != nil {
 | 
					 | 
				
			||||||
				t.Fatal(err)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			defer stopFn()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			// Run scheduler.
 | 
					 | 
				
			||||||
			informerFactory.Start(ctx.Done())
 | 
					 | 
				
			||||||
			informerFactory.WaitForCacheSync(ctx.Done())
 | 
					 | 
				
			||||||
			go scheduler.Run(ctx)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			// Send pods to be scheduled.
 | 
					 | 
				
			||||||
			for _, p := range tc.waitSchedulingPods {
 | 
					 | 
				
			||||||
				_, err = fakeClient.CoreV1().Pods("").Create(ctx, p, metav1.CreateOptions{})
 | 
					 | 
				
			||||||
				if err != nil {
 | 
					 | 
				
			||||||
					t.Fatal(err)
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			// Wait all pods in waitSchedulingPods to be scheduled.
 | 
					 | 
				
			||||||
			wg.Wait()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			// Ensure that all waitingPods in scheduler can be obtained from any profiles.
 | 
					 | 
				
			||||||
			for _, fwk := range scheduler.Profiles {
 | 
					 | 
				
			||||||
				actualPodNamesInWaitingPods := sets.NewString()
 | 
					 | 
				
			||||||
				fwk.IterateOverWaitingPods(func(pod framework.WaitingPod) {
 | 
					 | 
				
			||||||
					actualPodNamesInWaitingPods.Insert(pod.GetPod().Name)
 | 
					 | 
				
			||||||
				})
 | 
					 | 
				
			||||||
				// Validate the name of pods in waitingPods matches expectations.
 | 
					 | 
				
			||||||
				if actualPodNamesInWaitingPods.Len() != len(tc.expectPodNamesInWaitingPods) ||
 | 
					 | 
				
			||||||
					!actualPodNamesInWaitingPods.HasAll(tc.expectPodNamesInWaitingPods...) {
 | 
					 | 
				
			||||||
					t.Fatalf("Unexpected waitingPods in scheduler profile %s, expect: %#v, got: %#v", fwk.ProfileName(), tc.expectPodNamesInWaitingPods, actualPodNamesInWaitingPods.List())
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		})
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
var _ framework.QueueSortPlugin = &fakeQueueSortPlugin{}
 | 
					var _ framework.QueueSortPlugin = &fakeQueueSortPlugin{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// fakeQueueSortPlugin is a no-op implementation for QueueSort extension point.
 | 
					// fakeQueueSortPlugin is a no-op implementation for QueueSort extension point.
 | 
				
			||||||
@@ -1164,36 +1004,3 @@ func (*emptyEventsToRegisterPlugin) Filter(_ context.Context, _ *framework.Cycle
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (*emptyEventsToRegisterPlugin) EventsToRegister() []framework.ClusterEventWithHint { return nil }
 | 
					func (*emptyEventsToRegisterPlugin) EventsToRegister() []framework.ClusterEventWithHint { return nil }
 | 
				
			||||||
 | 
					 | 
				
			||||||
// fakePermitPlugin only implements PermitPlugin interface.
 | 
					 | 
				
			||||||
type fakePermitPlugin struct {
 | 
					 | 
				
			||||||
	eventRecorder events.EventRecorder
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func newFakePermitPlugin(eventRecorder events.EventRecorder) frameworkruntime.PluginFactory {
 | 
					 | 
				
			||||||
	return func(ctx context.Context, configuration runtime.Object, f framework.Handle) (framework.Plugin, error) {
 | 
					 | 
				
			||||||
		pl := &fakePermitPlugin{
 | 
					 | 
				
			||||||
			eventRecorder: eventRecorder,
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return pl, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (f fakePermitPlugin) Name() string {
 | 
					 | 
				
			||||||
	return fakePermit
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
const (
 | 
					 | 
				
			||||||
	podWaitingReason = "podWaiting"
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (f fakePermitPlugin) Permit(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
 | 
					 | 
				
			||||||
	defer func() {
 | 
					 | 
				
			||||||
		// Send event with podWaiting reason to broadcast this pod is already waiting in the permit stage.
 | 
					 | 
				
			||||||
		f.eventRecorder.Eventf(p, nil, v1.EventTypeWarning, podWaitingReason, "", "")
 | 
					 | 
				
			||||||
	}()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return framework.NewStatus(framework.Wait), 100 * time.Second
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
var _ framework.PermitPlugin = &fakePermitPlugin{}
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user