Cancel context to make sure all plugins are cancelled when each schedule finishes
This commit is contained in:
		@@ -614,7 +614,9 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 | 
				
			|||||||
	// Synchronously attempt to find a fit for the pod.
 | 
						// Synchronously attempt to find a fit for the pod.
 | 
				
			||||||
	start := time.Now()
 | 
						start := time.Now()
 | 
				
			||||||
	state := framework.NewCycleState()
 | 
						state := framework.NewCycleState()
 | 
				
			||||||
	scheduleResult, err := sched.Algorithm.Schedule(ctx, state, pod)
 | 
						schedulingCycleCtx, cancel := context.WithCancel(ctx)
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
						scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		sched.recordSchedulingFailure(podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
 | 
							sched.recordSchedulingFailure(podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
 | 
				
			||||||
		// Schedule() may have failed because the pod would not fit on any host, so we try to
 | 
							// Schedule() may have failed because the pod would not fit on any host, so we try to
 | 
				
			||||||
@@ -627,7 +629,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 | 
				
			|||||||
					" No preemption is performed.")
 | 
										" No preemption is performed.")
 | 
				
			||||||
			} else {
 | 
								} else {
 | 
				
			||||||
				preemptionStartTime := time.Now()
 | 
									preemptionStartTime := time.Now()
 | 
				
			||||||
				sched.preempt(ctx, state, fwk, pod, fitError)
 | 
									sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
 | 
				
			||||||
				metrics.PreemptionAttempts.Inc()
 | 
									metrics.PreemptionAttempts.Inc()
 | 
				
			||||||
				metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
 | 
									metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
 | 
				
			||||||
				metrics.DeprecatedSchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
 | 
									metrics.DeprecatedSchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
 | 
				
			||||||
@@ -667,7 +669,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Run "reserve" plugins.
 | 
						// Run "reserve" plugins.
 | 
				
			||||||
	if sts := fwk.RunReservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
 | 
						if sts := fwk.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
 | 
				
			||||||
		sched.recordSchedulingFailure(assumedPodInfo, sts.AsError(), SchedulerError, sts.Message())
 | 
							sched.recordSchedulingFailure(assumedPodInfo, sts.AsError(), SchedulerError, sts.Message())
 | 
				
			||||||
		metrics.PodScheduleErrors.Inc()
 | 
							metrics.PodScheduleErrors.Inc()
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
@@ -684,11 +686,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 | 
				
			|||||||
		sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err))
 | 
							sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err))
 | 
				
			||||||
		metrics.PodScheduleErrors.Inc()
 | 
							metrics.PodScheduleErrors.Inc()
 | 
				
			||||||
		// trigger un-reserve plugins to clean up state associated with the reserved Pod
 | 
							// trigger un-reserve plugins to clean up state associated with the reserved Pod
 | 
				
			||||||
		fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
 | 
							fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
 | 
						// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
 | 
							bindingCycleCtx, cancel := context.WithCancel(ctx)
 | 
				
			||||||
 | 
							defer cancel()
 | 
				
			||||||
		metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
 | 
							metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
 | 
				
			||||||
		defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()
 | 
							defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -699,13 +703,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 | 
				
			|||||||
				sched.recordSchedulingFailure(assumedPodInfo, err, "VolumeBindingFailed", err.Error())
 | 
									sched.recordSchedulingFailure(assumedPodInfo, err, "VolumeBindingFailed", err.Error())
 | 
				
			||||||
				metrics.PodScheduleErrors.Inc()
 | 
									metrics.PodScheduleErrors.Inc()
 | 
				
			||||||
				// trigger un-reserve plugins to clean up state associated with the reserved Pod
 | 
									// trigger un-reserve plugins to clean up state associated with the reserved Pod
 | 
				
			||||||
				fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
 | 
									fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 | 
				
			||||||
				return
 | 
									return
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// Run "permit" plugins.
 | 
							// Run "permit" plugins.
 | 
				
			||||||
		permitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
 | 
							permitStatus := fwk.RunPermitPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 | 
				
			||||||
		if !permitStatus.IsSuccess() {
 | 
							if !permitStatus.IsSuccess() {
 | 
				
			||||||
			var reason string
 | 
								var reason string
 | 
				
			||||||
			if permitStatus.IsUnschedulable() {
 | 
								if permitStatus.IsUnschedulable() {
 | 
				
			||||||
@@ -719,13 +723,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 | 
				
			|||||||
				klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
 | 
									klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
 | 
								// trigger un-reserve plugins to clean up state associated with the reserved Pod
 | 
				
			||||||
			fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
 | 
								fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 | 
				
			||||||
			sched.recordSchedulingFailure(assumedPodInfo, permitStatus.AsError(), reason, permitStatus.Message())
 | 
								sched.recordSchedulingFailure(assumedPodInfo, permitStatus.AsError(), reason, permitStatus.Message())
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// Run "prebind" plugins.
 | 
							// Run "prebind" plugins.
 | 
				
			||||||
		preBindStatus := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
 | 
							preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 | 
				
			||||||
		if !preBindStatus.IsSuccess() {
 | 
							if !preBindStatus.IsSuccess() {
 | 
				
			||||||
			var reason string
 | 
								var reason string
 | 
				
			||||||
			metrics.PodScheduleErrors.Inc()
 | 
								metrics.PodScheduleErrors.Inc()
 | 
				
			||||||
@@ -734,18 +738,18 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 | 
				
			|||||||
				klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
 | 
									klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
 | 
								// trigger un-reserve plugins to clean up state associated with the reserved Pod
 | 
				
			||||||
			fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
 | 
								fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 | 
				
			||||||
			sched.recordSchedulingFailure(assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message())
 | 
								sched.recordSchedulingFailure(assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message())
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		err := sched.bind(ctx, assumedPod, scheduleResult.SuggestedHost, state)
 | 
							err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state)
 | 
				
			||||||
		metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start))
 | 
							metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start))
 | 
				
			||||||
		metrics.DeprecatedE2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
 | 
							metrics.DeprecatedE2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			metrics.PodScheduleErrors.Inc()
 | 
								metrics.PodScheduleErrors.Inc()
 | 
				
			||||||
			// trigger un-reserve plugins to clean up state associated with the reserved Pod
 | 
								// trigger un-reserve plugins to clean up state associated with the reserved Pod
 | 
				
			||||||
			fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
 | 
								fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 | 
				
			||||||
			sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err))
 | 
								sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err))
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
 | 
								// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
 | 
				
			||||||
@@ -758,7 +762,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 | 
				
			|||||||
			metrics.PodSchedulingDuration.Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
 | 
								metrics.PodSchedulingDuration.Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			// Run "postbind" plugins.
 | 
								// Run "postbind" plugins.
 | 
				
			||||||
			fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
 | 
								fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -101,6 +101,7 @@ type PermitPlugin struct {
 | 
				
			|||||||
	waitAndRejectPermit bool
 | 
						waitAndRejectPermit bool
 | 
				
			||||||
	waitAndAllowPermit  bool
 | 
						waitAndAllowPermit  bool
 | 
				
			||||||
	allowPermit         bool
 | 
						allowPermit         bool
 | 
				
			||||||
 | 
						cancelled           bool
 | 
				
			||||||
	fh                  framework.FrameworkHandle
 | 
						fh                  framework.FrameworkHandle
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -395,8 +396,15 @@ func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState,
 | 
				
			|||||||
		return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0
 | 
							return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if pp.timeoutPermit {
 | 
						if pp.timeoutPermit {
 | 
				
			||||||
 | 
							go func() {
 | 
				
			||||||
 | 
								select {
 | 
				
			||||||
 | 
								case <-ctx.Done():
 | 
				
			||||||
 | 
									pp.cancelled = true
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}()
 | 
				
			||||||
		return framework.NewStatus(framework.Wait, ""), 3 * time.Second
 | 
							return framework.NewStatus(framework.Wait, ""), 3 * time.Second
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if pp.allowPermit && pod.Name != "waiting-pod" {
 | 
						if pp.allowPermit && pod.Name != "waiting-pod" {
 | 
				
			||||||
		return nil, 0
 | 
							return nil, 0
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -429,6 +437,11 @@ func (pp *PermitPlugin) allowAllPods() {
 | 
				
			|||||||
	pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Allow(pp.name) })
 | 
						pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Allow(pp.name) })
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// rejectAllPods rejects all waiting pods.
 | 
				
			||||||
 | 
					func (pp *PermitPlugin) rejectAllPods() {
 | 
				
			||||||
 | 
						pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Reject("rejectAllPods") })
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// reset used to reset permit plugin.
 | 
					// reset used to reset permit plugin.
 | 
				
			||||||
func (pp *PermitPlugin) reset() {
 | 
					func (pp *PermitPlugin) reset() {
 | 
				
			||||||
	pp.numPermitCalled = 0
 | 
						pp.numPermitCalled = 0
 | 
				
			||||||
@@ -438,6 +451,7 @@ func (pp *PermitPlugin) reset() {
 | 
				
			|||||||
	pp.waitAndRejectPermit = false
 | 
						pp.waitAndRejectPermit = false
 | 
				
			||||||
	pp.waitAndAllowPermit = false
 | 
						pp.waitAndAllowPermit = false
 | 
				
			||||||
	pp.allowPermit = false
 | 
						pp.allowPermit = false
 | 
				
			||||||
 | 
						pp.cancelled = false
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// newPermitPlugin returns a factory for permit plugin with specified PermitPlugin.
 | 
					// newPermitPlugin returns a factory for permit plugin with specified PermitPlugin.
 | 
				
			||||||
@@ -1079,18 +1093,7 @@ func TestPostBindPlugin(t *testing.T) {
 | 
				
			|||||||
func TestPermitPlugin(t *testing.T) {
 | 
					func TestPermitPlugin(t *testing.T) {
 | 
				
			||||||
	// Create a plugin registry for testing. Register only a permit plugin.
 | 
						// Create a plugin registry for testing. Register only a permit plugin.
 | 
				
			||||||
	perPlugin := &PermitPlugin{name: permitPluginName}
 | 
						perPlugin := &PermitPlugin{name: permitPluginName}
 | 
				
			||||||
	registry := framework.Registry{permitPluginName: newPermitPlugin(perPlugin)}
 | 
						registry, plugins := initRegistryAndConfig(perPlugin)
 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Setup initial permit plugin for testing.
 | 
					 | 
				
			||||||
	plugins := &schedulerconfig.Plugins{
 | 
					 | 
				
			||||||
		Permit: &schedulerconfig.PluginSet{
 | 
					 | 
				
			||||||
			Enabled: []schedulerconfig.Plugin{
 | 
					 | 
				
			||||||
				{
 | 
					 | 
				
			||||||
					Name: permitPluginName,
 | 
					 | 
				
			||||||
				},
 | 
					 | 
				
			||||||
			},
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Create the master and the scheduler with the test plugin set.
 | 
						// Create the master and the scheduler with the test plugin set.
 | 
				
			||||||
	context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
 | 
						context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
 | 
				
			||||||
@@ -1178,24 +1181,7 @@ func TestMultiplePermitPlugins(t *testing.T) {
 | 
				
			|||||||
	// Create a plugin registry for testing.
 | 
						// Create a plugin registry for testing.
 | 
				
			||||||
	perPlugin1 := &PermitPlugin{name: "permit-plugin-1"}
 | 
						perPlugin1 := &PermitPlugin{name: "permit-plugin-1"}
 | 
				
			||||||
	perPlugin2 := &PermitPlugin{name: "permit-plugin-2"}
 | 
						perPlugin2 := &PermitPlugin{name: "permit-plugin-2"}
 | 
				
			||||||
	registry := framework.Registry{
 | 
						registry, plugins := initRegistryAndConfig(perPlugin1, perPlugin2)
 | 
				
			||||||
		perPlugin1.Name(): newPermitPlugin(perPlugin1),
 | 
					 | 
				
			||||||
		perPlugin2.Name(): newPermitPlugin(perPlugin2),
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Setup initial permit plugins for testing.
 | 
					 | 
				
			||||||
	plugins := &schedulerconfig.Plugins{
 | 
					 | 
				
			||||||
		Permit: &schedulerconfig.PluginSet{
 | 
					 | 
				
			||||||
			Enabled: []schedulerconfig.Plugin{
 | 
					 | 
				
			||||||
				{
 | 
					 | 
				
			||||||
					Name: perPlugin1.Name(),
 | 
					 | 
				
			||||||
				},
 | 
					 | 
				
			||||||
				{
 | 
					 | 
				
			||||||
					Name: perPlugin2.Name(),
 | 
					 | 
				
			||||||
				},
 | 
					 | 
				
			||||||
			},
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Create the master and the scheduler with the test plugin set.
 | 
						// Create the master and the scheduler with the test plugin set.
 | 
				
			||||||
	context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "multi-permit-plugin", nil), 2,
 | 
						context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "multi-permit-plugin", nil), 2,
 | 
				
			||||||
@@ -1245,22 +1231,53 @@ func TestMultiplePermitPlugins(t *testing.T) {
 | 
				
			|||||||
	cleanupPods(context.clientSet, t, []*v1.Pod{pod})
 | 
						cleanupPods(context.clientSet, t, []*v1.Pod{pod})
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// TestPermitPluginsCancelled tests whether all permit plugins are cancelled when pod is rejected.
 | 
				
			||||||
 | 
					func TestPermitPluginsCancelled(t *testing.T) {
 | 
				
			||||||
 | 
						// Create a plugin registry for testing.
 | 
				
			||||||
 | 
						perPlugin1 := &PermitPlugin{name: "permit-plugin-1"}
 | 
				
			||||||
 | 
						perPlugin2 := &PermitPlugin{name: "permit-plugin-2"}
 | 
				
			||||||
 | 
						registry, plugins := initRegistryAndConfig(perPlugin1, perPlugin2)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Create the master and the scheduler with the test plugin set.
 | 
				
			||||||
 | 
						context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugins", nil), 2,
 | 
				
			||||||
 | 
							scheduler.WithFrameworkPlugins(plugins),
 | 
				
			||||||
 | 
							scheduler.WithFrameworkDefaultRegistry(registry))
 | 
				
			||||||
 | 
						defer cleanupTest(t, context)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Both permit plugins will return Wait for permitting
 | 
				
			||||||
 | 
						perPlugin1.timeoutPermit = true
 | 
				
			||||||
 | 
						perPlugin2.timeoutPermit = true
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Create a test pod.
 | 
				
			||||||
 | 
						podName := "test-pod"
 | 
				
			||||||
 | 
						pod, err := createPausePod(context.clientSet,
 | 
				
			||||||
 | 
							initPausePod(context.clientSet, &pausePodConfig{Name: podName, Namespace: context.ns.Name}))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Errorf("Error while creating a test pod: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var waitingPod framework.WaitingPod
 | 
				
			||||||
 | 
						// Wait until the test pod is actually waiting.
 | 
				
			||||||
 | 
						wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
 | 
				
			||||||
 | 
							waitingPod = perPlugin1.fh.GetWaitingPod(pod.UID)
 | 
				
			||||||
 | 
							return waitingPod != nil, nil
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						perPlugin1.rejectAllPods()
 | 
				
			||||||
 | 
						// Wait some time for the permit plugins to be cancelled
 | 
				
			||||||
 | 
						err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
 | 
				
			||||||
 | 
							return perPlugin1.cancelled && perPlugin2.cancelled, nil
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Errorf("Expected all permit plugins to be cancelled")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// TestCoSchedulingWithPermitPlugin tests invocation of permit plugins.
 | 
					// TestCoSchedulingWithPermitPlugin tests invocation of permit plugins.
 | 
				
			||||||
func TestCoSchedulingWithPermitPlugin(t *testing.T) {
 | 
					func TestCoSchedulingWithPermitPlugin(t *testing.T) {
 | 
				
			||||||
	// Create a plugin registry for testing. Register only a permit plugin.
 | 
						// Create a plugin registry for testing. Register only a permit plugin.
 | 
				
			||||||
	permitPlugin := &PermitPlugin{name: permitPluginName}
 | 
						permitPlugin := &PermitPlugin{name: permitPluginName}
 | 
				
			||||||
	registry := framework.Registry{permitPluginName: newPermitPlugin(permitPlugin)}
 | 
						registry, plugins := initRegistryAndConfig(permitPlugin)
 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Setup initial permit plugin for testing.
 | 
					 | 
				
			||||||
	plugins := &schedulerconfig.Plugins{
 | 
					 | 
				
			||||||
		Permit: &schedulerconfig.PluginSet{
 | 
					 | 
				
			||||||
			Enabled: []schedulerconfig.Plugin{
 | 
					 | 
				
			||||||
				{
 | 
					 | 
				
			||||||
					Name: permitPluginName,
 | 
					 | 
				
			||||||
				},
 | 
					 | 
				
			||||||
			},
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Create the master and the scheduler with the test plugin set.
 | 
						// Create the master and the scheduler with the test plugin set.
 | 
				
			||||||
	context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
 | 
						context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
 | 
				
			||||||
@@ -1432,18 +1449,7 @@ func TestPostFilterPlugin(t *testing.T) {
 | 
				
			|||||||
func TestPreemptWithPermitPlugin(t *testing.T) {
 | 
					func TestPreemptWithPermitPlugin(t *testing.T) {
 | 
				
			||||||
	// Create a plugin registry for testing. Register only a permit plugin.
 | 
						// Create a plugin registry for testing. Register only a permit plugin.
 | 
				
			||||||
	permitPlugin := &PermitPlugin{}
 | 
						permitPlugin := &PermitPlugin{}
 | 
				
			||||||
	registry := framework.Registry{permitPluginName: newPermitPlugin(permitPlugin)}
 | 
						registry, plugins := initRegistryAndConfig(permitPlugin)
 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Setup initial permit plugin for testing.
 | 
					 | 
				
			||||||
	plugins := &schedulerconfig.Plugins{
 | 
					 | 
				
			||||||
		Permit: &schedulerconfig.PluginSet{
 | 
					 | 
				
			||||||
			Enabled: []schedulerconfig.Plugin{
 | 
					 | 
				
			||||||
				{
 | 
					 | 
				
			||||||
					Name: permitPluginName,
 | 
					 | 
				
			||||||
				},
 | 
					 | 
				
			||||||
			},
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Create the master and the scheduler with the test plugin set.
 | 
						// Create the master and the scheduler with the test plugin set.
 | 
				
			||||||
	context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "preempt-with-permit-plugin", nil), 0,
 | 
						context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "preempt-with-permit-plugin", nil), 0,
 | 
				
			||||||
@@ -1522,3 +1528,22 @@ func initTestSchedulerForFrameworkTest(t *testing.T, context *testContext, nodeC
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	return c
 | 
						return c
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// initRegistryAndConfig returns registry and plugins config based on give plugins.
 | 
				
			||||||
 | 
					// TODO: refactor it to a more generic functions that accepts all kinds of Plugins as arguments
 | 
				
			||||||
 | 
					func initRegistryAndConfig(pp ...*PermitPlugin) (registry framework.Registry, plugins *schedulerconfig.Plugins) {
 | 
				
			||||||
 | 
						if len(pp) == 0 {
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						registry = framework.Registry{}
 | 
				
			||||||
 | 
						plugins = &schedulerconfig.Plugins{
 | 
				
			||||||
 | 
							Permit: &schedulerconfig.PluginSet{},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for _, p := range pp {
 | 
				
			||||||
 | 
							registry.Register(p.Name(), newPermitPlugin(p))
 | 
				
			||||||
 | 
							plugins.Permit.Enabled = append(plugins.Permit.Enabled, schedulerconfig.Plugin{Name: p.Name()})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user