Merge pull request #83389 from ahmad-diaa/move-PodPreemptor-to-sched
Move PodPreemptor to Scheduler
This commit is contained in:
		| @@ -79,9 +79,6 @@ type Config struct { | |||||||
|  |  | ||||||
| 	Algorithm core.ScheduleAlgorithm | 	Algorithm core.ScheduleAlgorithm | ||||||
| 	GetBinder func(pod *v1.Pod) Binder | 	GetBinder func(pod *v1.Pod) Binder | ||||||
| 	// PodPreemptor is used to evict pods and update 'NominatedNode' field of |  | ||||||
| 	// the preemptor pod. |  | ||||||
| 	PodPreemptor PodPreemptor |  | ||||||
| 	// Framework runs scheduler plugins at configured extension points. | 	// Framework runs scheduler plugins at configured extension points. | ||||||
| 	Framework framework.Framework | 	Framework framework.Framework | ||||||
|  |  | ||||||
| @@ -119,15 +116,6 @@ type Config struct { | |||||||
| 	PluginConfig []config.PluginConfig | 	PluginConfig []config.PluginConfig | ||||||
| } | } | ||||||
|  |  | ||||||
| // PodPreemptor has methods needed to delete a pod and to update 'NominatedPod' |  | ||||||
| // field of the preemptor pod. |  | ||||||
| type PodPreemptor interface { |  | ||||||
| 	GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) |  | ||||||
| 	DeletePod(pod *v1.Pod) error |  | ||||||
| 	SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error |  | ||||||
| 	RemoveNominatedNodeName(pod *v1.Pod) error |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Configurator defines I/O, caching, and other functionality needed to | // Configurator defines I/O, caching, and other functionality needed to | ||||||
| // construct a new scheduler. | // construct a new scheduler. | ||||||
| type Configurator struct { | type Configurator struct { | ||||||
| @@ -471,7 +459,6 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e | |||||||
| 		SchedulerCache: c.schedulerCache, | 		SchedulerCache: c.schedulerCache, | ||||||
| 		Algorithm:      algo, | 		Algorithm:      algo, | ||||||
| 		GetBinder:      getBinderFunc(c.client, extenders), | 		GetBinder:      getBinderFunc(c.client, extenders), | ||||||
| 		PodPreemptor:   &podPreemptor{c.client}, |  | ||||||
| 		Framework:      framework, | 		Framework:      framework, | ||||||
| 		WaitForCacheSync: func() bool { | 		WaitForCacheSync: func() bool { | ||||||
| 			return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced) | 			return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced) | ||||||
| @@ -719,29 +706,3 @@ func (b *binder) Bind(binding *v1.Binding) error { | |||||||
| 	klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name) | 	klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name) | ||||||
| 	return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding) | 	return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding) | ||||||
| } | } | ||||||
|  |  | ||||||
| type podPreemptor struct { |  | ||||||
| 	Client clientset.Interface |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (p *podPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) { |  | ||||||
| 	return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (p *podPreemptor) DeletePod(pod *v1.Pod) error { |  | ||||||
| 	return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{}) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error { |  | ||||||
| 	podCopy := pod.DeepCopy() |  | ||||||
| 	podCopy.Status.NominatedNodeName = nominatedNodeName |  | ||||||
| 	_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy) |  | ||||||
| 	return err |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error { |  | ||||||
| 	if len(pod.Status.NominatedNodeName) == 0 { |  | ||||||
| 		return nil |  | ||||||
| 	} |  | ||||||
| 	return p.SetNominatedNodeName(pod, "") |  | ||||||
| } |  | ||||||
|   | |||||||
| @@ -55,10 +55,21 @@ const ( | |||||||
|  |  | ||||||
| // podConditionUpdater updates the condition of a pod based on the passed | // podConditionUpdater updates the condition of a pod based on the passed | ||||||
| // PodCondition | // PodCondition | ||||||
|  | // TODO (ahmad-diaa): Remove type and replace it with scheduler methods | ||||||
| type podConditionUpdater interface { | type podConditionUpdater interface { | ||||||
| 	update(pod *v1.Pod, podCondition *v1.PodCondition) error | 	update(pod *v1.Pod, podCondition *v1.PodCondition) error | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // podPreemptor has methods needed to delete a pod and to update the | ||||||
|  | // 'NominatedNodeName' field of the preemptor pod. | ||||||
|  | // TODO (ahmad-diaa): Remove type and replace it with scheduler methods | ||||||
|  | type podPreemptor interface { | ||||||
|  | 	getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) | ||||||
|  | 	deletePod(pod *v1.Pod) error | ||||||
|  | 	setNominatedNodeName(pod *v1.Pod, nominatedNode string) error | ||||||
|  | 	removeNominatedNodeName(pod *v1.Pod) error | ||||||
|  | } | ||||||
|  |  | ||||||
| // Scheduler watches for new unscheduled pods. It attempts to find | // Scheduler watches for new unscheduled pods. It attempts to find | ||||||
| // nodes that they fit on and writes bindings back to the api server. | // nodes that they fit on and writes bindings back to the api server. | ||||||
| type Scheduler struct { | type Scheduler struct { | ||||||
| @@ -74,7 +85,7 @@ type Scheduler struct { | |||||||
| 	podConditionUpdater podConditionUpdater | 	podConditionUpdater podConditionUpdater | ||||||
| 	// PodPreemptor is used to evict pods and update 'NominatedNode' field of | 	// PodPreemptor is used to evict pods and update 'NominatedNode' field of | ||||||
| 	// the preemptor pod. | 	// the preemptor pod. | ||||||
| 	PodPreemptor factory.PodPreemptor | 	podPreemptor podPreemptor | ||||||
| 	// Framework runs scheduler plugins at configured extension points. | 	// Framework runs scheduler plugins at configured extension points. | ||||||
| 	Framework framework.Framework | 	Framework framework.Framework | ||||||
|  |  | ||||||
| @@ -344,6 +355,8 @@ func New(client clientset.Interface, | |||||||
| 	// Create the scheduler. | 	// Create the scheduler. | ||||||
| 	sched := NewFromConfig(config) | 	sched := NewFromConfig(config) | ||||||
| 	sched.podConditionUpdater = &podConditionUpdaterImpl{client} | 	sched.podConditionUpdater = &podConditionUpdaterImpl{client} | ||||||
|  | 	sched.podPreemptor = &podPreemptorImpl{client} | ||||||
|  |  | ||||||
| 	AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer) | 	AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer) | ||||||
| 	return sched, nil | 	return sched, nil | ||||||
| } | } | ||||||
| @@ -391,7 +404,6 @@ func NewFromConfig(config *factory.Config) *Scheduler { | |||||||
| 		SchedulerCache:    config.SchedulerCache, | 		SchedulerCache:    config.SchedulerCache, | ||||||
| 		Algorithm:         config.Algorithm, | 		Algorithm:         config.Algorithm, | ||||||
| 		GetBinder:         config.GetBinder, | 		GetBinder:         config.GetBinder, | ||||||
| 		PodPreemptor:      config.PodPreemptor, |  | ||||||
| 		Framework:         config.Framework, | 		Framework:         config.Framework, | ||||||
| 		NextPod:           config.NextPod, | 		NextPod:           config.NextPod, | ||||||
| 		WaitForCacheSync:  config.WaitForCacheSync, | 		WaitForCacheSync:  config.WaitForCacheSync, | ||||||
| @@ -434,7 +446,7 @@ func (sched *Scheduler) recordSchedulingFailure(podInfo *framework.PodInfo, err | |||||||
| // If it succeeds, it records the name of the node where preemption has happened in the pod's status ('NominatedNodeName'). | // If it succeeds, it records the name of the node where preemption has happened in the pod's status ('NominatedNodeName'). | ||||||
| // It returns the node name and an error if any. | // It returns the node name and an error if any. | ||||||
| func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) { | func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) { | ||||||
| 	preemptor, err := sched.PodPreemptor.GetUpdatedPod(preemptor) | 	preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		klog.Errorf("Error getting the updated preemptor pod object: %v", err) | 		klog.Errorf("Error getting the updated preemptor pod object: %v", err) | ||||||
| 		return "", err | 		return "", err | ||||||
| @@ -454,7 +466,7 @@ func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Frame | |||||||
| 		sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName) | 		sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName) | ||||||
|  |  | ||||||
| 		// Make a call to update nominated node name of the pod on the API server. | 		// Make a call to update nominated node name of the pod on the API server. | ||||||
| 		err = sched.PodPreemptor.SetNominatedNodeName(preemptor, nodeName) | 		err = sched.podPreemptor.setNominatedNodeName(preemptor, nodeName) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			klog.Errorf("Error in preemption process. Cannot set 'NominatedPod' on pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err) | 			klog.Errorf("Error in preemption process. Cannot set 'NominatedPod' on pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err) | ||||||
| 			sched.SchedulingQueue.DeleteNominatedPodIfExists(preemptor) | 			sched.SchedulingQueue.DeleteNominatedPodIfExists(preemptor) | ||||||
| @@ -462,7 +474,7 @@ func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Frame | |||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		for _, victim := range victims { | 		for _, victim := range victims { | ||||||
| 			if err := sched.PodPreemptor.DeletePod(victim); err != nil { | 			if err := sched.podPreemptor.deletePod(victim); err != nil { | ||||||
| 				klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err) | 				klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err) | ||||||
| 				return "", err | 				return "", err | ||||||
| 			} | 			} | ||||||
| @@ -481,7 +493,7 @@ func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Frame | |||||||
| 	// function of generic_scheduler.go returns the pod itself for removal of | 	// function of generic_scheduler.go returns the pod itself for removal of | ||||||
| 	// the 'NominatedPod' field. | 	// the 'NominatedPod' field. | ||||||
| 	for _, p := range nominatedPodsToClear { | 	for _, p := range nominatedPodsToClear { | ||||||
| 		rErr := sched.PodPreemptor.RemoveNominatedNodeName(p) | 		rErr := sched.podPreemptor.removeNominatedNodeName(p) | ||||||
| 		if rErr != nil { | 		if rErr != nil { | ||||||
| 			klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr) | 			klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr) | ||||||
| 			// We do not return as this error is not critical. | 			// We do not return as this error is not critical. | ||||||
| @@ -756,6 +768,32 @@ func (p *podConditionUpdaterImpl) update(pod *v1.Pod, condition *v1.PodCondition | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | type podPreemptorImpl struct { | ||||||
|  | 	Client clientset.Interface | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (p *podPreemptorImpl) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) { | ||||||
|  | 	return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (p *podPreemptorImpl) deletePod(pod *v1.Pod) error { | ||||||
|  | 	return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{}) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (p *podPreemptorImpl) setNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error { | ||||||
|  | 	podCopy := pod.DeepCopy() | ||||||
|  | 	podCopy.Status.NominatedNodeName = nominatedNodeName | ||||||
|  | 	_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy) | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (p *podPreemptorImpl) removeNominatedNodeName(pod *v1.Pod) error { | ||||||
|  | 	if len(pod.Status.NominatedNodeName) == 0 { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 	return p.setNominatedNodeName(pod, "") | ||||||
|  | } | ||||||
|  |  | ||||||
| // nodeResourceString returns a string representation of node resources. | // nodeResourceString returns a string representation of node resources. | ||||||
| func nodeResourceString(n *v1.Node) string { | func nodeResourceString(n *v1.Node) string { | ||||||
| 	if n == nil { | 	if n == nil { | ||||||
|   | |||||||
| @@ -78,19 +78,19 @@ func (fc fakePodConditionUpdater) update(pod *v1.Pod, podCondition *v1.PodCondit | |||||||
|  |  | ||||||
| type fakePodPreemptor struct{} | type fakePodPreemptor struct{} | ||||||
|  |  | ||||||
| func (fp fakePodPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) { | func (fp fakePodPreemptor) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) { | ||||||
| 	return pod, nil | 	return pod, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (fp fakePodPreemptor) DeletePod(pod *v1.Pod) error { | func (fp fakePodPreemptor) deletePod(pod *v1.Pod) error { | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (fp fakePodPreemptor) SetNominatedNodeName(pod *v1.Pod, nomNodeName string) error { | func (fp fakePodPreemptor) setNominatedNodeName(pod *v1.Pod, nomNodeName string) error { | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (fp fakePodPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error { | func (fp fakePodPreemptor) removeNominatedNodeName(pod *v1.Pod) error { | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -674,7 +674,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C | |||||||
| 		}, | 		}, | ||||||
| 		Recorder:            &events.FakeRecorder{}, | 		Recorder:            &events.FakeRecorder{}, | ||||||
| 		podConditionUpdater: fakePodConditionUpdater{}, | 		podConditionUpdater: fakePodConditionUpdater{}, | ||||||
| 		PodPreemptor:        fakePodPreemptor{}, | 		podPreemptor:        fakePodPreemptor{}, | ||||||
| 		Framework:           emptyFramework, | 		Framework:           emptyFramework, | ||||||
| 		VolumeBinder:        volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), | 		VolumeBinder:        volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), | ||||||
| 	} | 	} | ||||||
| @@ -728,7 +728,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc | |||||||
| 		}, | 		}, | ||||||
| 		Recorder:            &events.FakeRecorder{}, | 		Recorder:            &events.FakeRecorder{}, | ||||||
| 		podConditionUpdater: fakePodConditionUpdater{}, | 		podConditionUpdater: fakePodConditionUpdater{}, | ||||||
| 		PodPreemptor:        fakePodPreemptor{}, | 		podPreemptor:        fakePodPreemptor{}, | ||||||
| 		StopEverything:      stop, | 		StopEverything:      stop, | ||||||
| 		Framework:           fwk, | 		Framework:           fwk, | ||||||
| 		VolumeBinder:        volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), | 		VolumeBinder:        volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot