Merge pull request #118529 from kerthcet/feat/record-failed-plugins
Record failed plugins in Reserver Plugin
This commit is contained in:
		| @@ -1265,6 +1265,11 @@ func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *fra | |||||||
| 		ctx := klog.NewContext(ctx, logger) | 		ctx := klog.NewContext(ctx, logger) | ||||||
| 		status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName) | 		status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName) | ||||||
| 		if !status.IsSuccess() { | 		if !status.IsSuccess() { | ||||||
|  | 			if status.IsUnschedulable() { | ||||||
|  | 				logger.V(4).Info("Pod rejected by plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message()) | ||||||
|  | 				status.SetFailedPlugin(pl.Name()) | ||||||
|  | 				return status | ||||||
|  | 			} | ||||||
| 			err := status.AsError() | 			err := status.AsError() | ||||||
| 			logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod)) | 			logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod)) | ||||||
| 			return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err)) | 			return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err)) | ||||||
|   | |||||||
| @@ -2422,7 +2422,7 @@ func TestReservePlugins(t *testing.T) { | |||||||
| 					inj:  injectedResult{ReserveStatus: int(framework.Unschedulable)}, | 					inj:  injectedResult{ReserveStatus: int(framework.Unschedulable)}, | ||||||
| 				}, | 				}, | ||||||
| 			}, | 			}, | ||||||
| 			wantStatus: framework.AsStatus(fmt.Errorf(`running Reserve plugin "TestPlugin": %w`, errInjectedStatus)), | 			wantStatus: framework.NewStatus(framework.Unschedulable, injectReason).WithFailedPlugin("TestPlugin"), | ||||||
| 		}, | 		}, | ||||||
| 		{ | 		{ | ||||||
| 			name: "ErrorReservePlugin", | 			name: "ErrorReservePlugin", | ||||||
| @@ -2442,7 +2442,7 @@ func TestReservePlugins(t *testing.T) { | |||||||
| 					inj:  injectedResult{ReserveStatus: int(framework.UnschedulableAndUnresolvable)}, | 					inj:  injectedResult{ReserveStatus: int(framework.UnschedulableAndUnresolvable)}, | ||||||
| 				}, | 				}, | ||||||
| 			}, | 			}, | ||||||
| 			wantStatus: framework.AsStatus(fmt.Errorf(`running Reserve plugin "TestPlugin": %w`, errInjectedStatus)), | 			wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, injectReason).WithFailedPlugin("TestPlugin"), | ||||||
| 		}, | 		}, | ||||||
| 		{ | 		{ | ||||||
| 			name: "SuccessSuccessReservePlugins", | 			name: "SuccessSuccessReservePlugins", | ||||||
| @@ -2512,7 +2512,7 @@ func TestReservePlugins(t *testing.T) { | |||||||
| 					inj:  injectedResult{ReserveStatus: int(framework.Success)}, | 					inj:  injectedResult{ReserveStatus: int(framework.Success)}, | ||||||
| 				}, | 				}, | ||||||
| 			}, | 			}, | ||||||
| 			wantStatus: framework.AsStatus(fmt.Errorf(`running Reserve plugin "TestPlugin": %w`, errInjectedStatus)), | 			wantStatus: framework.NewStatus(framework.Unschedulable, injectReason).WithFailedPlugin("TestPlugin"), | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1458,6 +1458,17 @@ func TestFitError_Error(t *testing.T) { | |||||||
| 			}, | 			}, | ||||||
| 			wantReasonMsg: "0/1 nodes are available: 1 Node failed Permit plugin Permit-1.", | 			wantReasonMsg: "0/1 nodes are available: 1 Node failed Permit plugin Permit-1.", | ||||||
| 		}, | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name:        "failed to Reserve on node", | ||||||
|  | 			numAllNodes: 1, | ||||||
|  | 			diagnosis: Diagnosis{ | ||||||
|  | 				NodeToStatusMap: NodeToStatusMap{ | ||||||
|  | 					// There should be only one node here. | ||||||
|  | 					"node1": NewStatus(Unschedulable, "Node failed Reserve plugin Reserve-1"), | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			wantReasonMsg: "0/1 nodes are available: 1 Node failed Reserve plugin Reserve-1.", | ||||||
|  | 		}, | ||||||
| 	} | 	} | ||||||
| 	for _, tt := range tests { | 	for _, tt := range tests { | ||||||
| 		t.Run(tt.name, func(t *testing.T) { | 		t.Run(tt.name, func(t *testing.T) { | ||||||
|   | |||||||
| @@ -195,6 +195,17 @@ func (sched *Scheduler) schedulingCycle( | |||||||
| 			logger.Error(forgetErr, "Scheduler cache ForgetPod failed") | 			logger.Error(forgetErr, "Scheduler cache ForgetPod failed") | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | 		if sts.IsUnschedulable() { | ||||||
|  | 			fitErr := &framework.FitError{ | ||||||
|  | 				NumAllNodes: 1, | ||||||
|  | 				Pod:         pod, | ||||||
|  | 				Diagnosis: framework.Diagnosis{ | ||||||
|  | 					NodeToStatusMap:      framework.NodeToStatusMap{scheduleResult.SuggestedHost: sts}, | ||||||
|  | 					UnschedulablePlugins: sets.New(sts.FailedPlugin()), | ||||||
|  | 				}, | ||||||
|  | 			} | ||||||
|  | 			return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(sts.Code()).WithError(fitErr) | ||||||
|  | 		} | ||||||
| 		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, sts | 		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, sts | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -31,6 +31,50 @@ import ( | |||||||
|  |  | ||||||
| var _ framework.PermitPlugin = &PermitPlugin{} | var _ framework.PermitPlugin = &PermitPlugin{} | ||||||
| var _ framework.EnqueueExtensions = &PermitPlugin{} | var _ framework.EnqueueExtensions = &PermitPlugin{} | ||||||
|  | var _ framework.ReservePlugin = &ReservePlugin{} | ||||||
|  | var _ framework.EnqueueExtensions = &ReservePlugin{} | ||||||
|  |  | ||||||
|  | type ReservePlugin struct { | ||||||
|  | 	name               string | ||||||
|  | 	statusCode         framework.Code | ||||||
|  | 	numReserveCalled   int | ||||||
|  | 	numUnreserveCalled int | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (rp *ReservePlugin) Name() string { | ||||||
|  | 	return rp.name | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (rp *ReservePlugin) Reserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { | ||||||
|  | 	rp.numReserveCalled += 1 | ||||||
|  |  | ||||||
|  | 	if rp.statusCode == framework.Error { | ||||||
|  | 		return framework.NewStatus(framework.Error, "failed to reserve") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if rp.statusCode == framework.Unschedulable { | ||||||
|  | 		if rp.numReserveCalled <= 1 { | ||||||
|  | 			return framework.NewStatus(framework.Unschedulable, "reject to reserve") | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (rp *ReservePlugin) Unreserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) { | ||||||
|  | 	rp.numUnreserveCalled += 1 | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (rp *ReservePlugin) EventsToRegister() []framework.ClusterEventWithHint { | ||||||
|  | 	return []framework.ClusterEventWithHint{ | ||||||
|  | 		{ | ||||||
|  | 			Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, | ||||||
|  | 			QueueingHintFn: func(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint { | ||||||
|  | 				return framework.QueueImmediately | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| type PermitPlugin struct { | type PermitPlugin struct { | ||||||
| 	name            string | 	name            string | ||||||
| @@ -118,6 +162,41 @@ func TestReScheduling(t *testing.T) { | |||||||
| 			wantFirstSchedulingError: true, | 			wantFirstSchedulingError: true, | ||||||
| 			wantError:                true, | 			wantError:                true, | ||||||
| 		}, | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "Rescheduling pod rejected by Reserve Plugin", | ||||||
|  | 			plugins: []framework.Plugin{ | ||||||
|  | 				&ReservePlugin{name: "reserve", statusCode: framework.Unschedulable}, | ||||||
|  | 			}, | ||||||
|  | 			action: func() error { | ||||||
|  | 				_, err := createNode(testContext.ClientSet, st.MakeNode().Name("fake-node").Obj()) | ||||||
|  | 				return err | ||||||
|  | 			}, | ||||||
|  | 			wantScheduled: true, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "Rescheduling pod rejected by Reserve Plugin with unrelated event", | ||||||
|  | 			plugins: []framework.Plugin{ | ||||||
|  | 				&ReservePlugin{name: "reserve", statusCode: framework.Unschedulable}, | ||||||
|  | 			}, | ||||||
|  | 			action: func() error { | ||||||
|  | 				_, err := createPausePod(testContext.ClientSet, | ||||||
|  | 					initPausePod(&testutils.PausePodConfig{Name: "test-pod-2", Namespace: testContext.NS.Name})) | ||||||
|  | 				return err | ||||||
|  | 			}, | ||||||
|  | 			wantScheduled: false, | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "Rescheduling pod failed by Reserve Plugin", | ||||||
|  | 			plugins: []framework.Plugin{ | ||||||
|  | 				&ReservePlugin{name: "reserve", statusCode: framework.Error}, | ||||||
|  | 			}, | ||||||
|  | 			action: func() error { | ||||||
|  | 				_, err := createNode(testContext.ClientSet, st.MakeNode().Name("fake-node").Obj()) | ||||||
|  | 				return err | ||||||
|  | 			}, | ||||||
|  | 			wantFirstSchedulingError: true, | ||||||
|  | 			wantError:                true, | ||||||
|  | 		}, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for _, test := range tests { | 	for _, test := range tests { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot