move PodPreemptor to scheduler
This commit is contained in:
		@@ -79,9 +79,6 @@ type Config struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	Algorithm core.ScheduleAlgorithm
 | 
						Algorithm core.ScheduleAlgorithm
 | 
				
			||||||
	GetBinder func(pod *v1.Pod) Binder
 | 
						GetBinder func(pod *v1.Pod) Binder
 | 
				
			||||||
	// PodPreemptor is used to evict pods and update 'NominatedNode' field of
 | 
					 | 
				
			||||||
	// the preemptor pod.
 | 
					 | 
				
			||||||
	PodPreemptor PodPreemptor
 | 
					 | 
				
			||||||
	// Framework runs scheduler plugins at configured extension points.
 | 
						// Framework runs scheduler plugins at configured extension points.
 | 
				
			||||||
	Framework framework.Framework
 | 
						Framework framework.Framework
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -119,15 +116,6 @@ type Config struct {
 | 
				
			|||||||
	PluginConfig []config.PluginConfig
 | 
						PluginConfig []config.PluginConfig
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// PodPreemptor has methods needed to delete a pod and to update 'NominatedPod'
 | 
					 | 
				
			||||||
// field of the preemptor pod.
 | 
					 | 
				
			||||||
type PodPreemptor interface {
 | 
					 | 
				
			||||||
	GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
 | 
					 | 
				
			||||||
	DeletePod(pod *v1.Pod) error
 | 
					 | 
				
			||||||
	SetNominatedNodeName(pod *v1.Pod, nominatedNode string) error
 | 
					 | 
				
			||||||
	RemoveNominatedNodeName(pod *v1.Pod) error
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Configurator defines I/O, caching, and other functionality needed to
 | 
					// Configurator defines I/O, caching, and other functionality needed to
 | 
				
			||||||
// construct a new scheduler.
 | 
					// construct a new scheduler.
 | 
				
			||||||
type Configurator struct {
 | 
					type Configurator struct {
 | 
				
			||||||
@@ -471,7 +459,6 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
 | 
				
			|||||||
		SchedulerCache: c.schedulerCache,
 | 
							SchedulerCache: c.schedulerCache,
 | 
				
			||||||
		Algorithm:      algo,
 | 
							Algorithm:      algo,
 | 
				
			||||||
		GetBinder:      getBinderFunc(c.client, extenders),
 | 
							GetBinder:      getBinderFunc(c.client, extenders),
 | 
				
			||||||
		PodPreemptor:   &podPreemptor{c.client},
 | 
					 | 
				
			||||||
		Framework:      framework,
 | 
							Framework:      framework,
 | 
				
			||||||
		WaitForCacheSync: func() bool {
 | 
							WaitForCacheSync: func() bool {
 | 
				
			||||||
			return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
 | 
								return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
 | 
				
			||||||
@@ -719,29 +706,3 @@ func (b *binder) Bind(binding *v1.Binding) error {
 | 
				
			|||||||
	klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
 | 
						klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name)
 | 
				
			||||||
	return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
 | 
						return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
type podPreemptor struct {
 | 
					 | 
				
			||||||
	Client clientset.Interface
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (p *podPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
 | 
					 | 
				
			||||||
	return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (p *podPreemptor) DeletePod(pod *v1.Pod) error {
 | 
					 | 
				
			||||||
	return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (p *podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
 | 
					 | 
				
			||||||
	podCopy := pod.DeepCopy()
 | 
					 | 
				
			||||||
	podCopy.Status.NominatedNodeName = nominatedNodeName
 | 
					 | 
				
			||||||
	_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy)
 | 
					 | 
				
			||||||
	return err
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (p *podPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
 | 
					 | 
				
			||||||
	if len(pod.Status.NominatedNodeName) == 0 {
 | 
					 | 
				
			||||||
		return nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return p.SetNominatedNodeName(pod, "")
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
@@ -55,10 +55,21 @@ const (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// podConditionUpdater updates the condition of a pod based on the passed
 | 
					// podConditionUpdater updates the condition of a pod based on the passed
 | 
				
			||||||
// PodCondition
 | 
					// PodCondition
 | 
				
			||||||
 | 
					// TODO (ahmad-diaa): Remove type and replace it with scheduler methods
 | 
				
			||||||
type podConditionUpdater interface {
 | 
					type podConditionUpdater interface {
 | 
				
			||||||
	update(pod *v1.Pod, podCondition *v1.PodCondition) error
 | 
						update(pod *v1.Pod, podCondition *v1.PodCondition) error
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// PodPreemptor has methods needed to delete a pod and to update 'NominatedPod'
 | 
				
			||||||
 | 
					// field of the preemptor pod.
 | 
				
			||||||
 | 
					// TODO (ahmad-diaa): Remove type and replace it with scheduler methods
 | 
				
			||||||
 | 
					type podPreemptor interface {
 | 
				
			||||||
 | 
						getUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
 | 
				
			||||||
 | 
						deletePod(pod *v1.Pod) error
 | 
				
			||||||
 | 
						setNominatedNodeName(pod *v1.Pod, nominatedNode string) error
 | 
				
			||||||
 | 
						removeNominatedNodeName(pod *v1.Pod) error
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Scheduler watches for new unscheduled pods. It attempts to find
 | 
					// Scheduler watches for new unscheduled pods. It attempts to find
 | 
				
			||||||
// nodes that they fit on and writes bindings back to the api server.
 | 
					// nodes that they fit on and writes bindings back to the api server.
 | 
				
			||||||
type Scheduler struct {
 | 
					type Scheduler struct {
 | 
				
			||||||
@@ -74,7 +85,7 @@ type Scheduler struct {
 | 
				
			|||||||
	podConditionUpdater podConditionUpdater
 | 
						podConditionUpdater podConditionUpdater
 | 
				
			||||||
	// PodPreemptor is used to evict pods and update 'NominatedNode' field of
 | 
						// PodPreemptor is used to evict pods and update 'NominatedNode' field of
 | 
				
			||||||
	// the preemptor pod.
 | 
						// the preemptor pod.
 | 
				
			||||||
	PodPreemptor factory.PodPreemptor
 | 
						podPreemptor podPreemptor
 | 
				
			||||||
	// Framework runs scheduler plugins at configured extension points.
 | 
						// Framework runs scheduler plugins at configured extension points.
 | 
				
			||||||
	Framework framework.Framework
 | 
						Framework framework.Framework
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -344,6 +355,8 @@ func New(client clientset.Interface,
 | 
				
			|||||||
	// Create the scheduler.
 | 
						// Create the scheduler.
 | 
				
			||||||
	sched := NewFromConfig(config)
 | 
						sched := NewFromConfig(config)
 | 
				
			||||||
	sched.podConditionUpdater = &podConditionUpdaterImpl{client}
 | 
						sched.podConditionUpdater = &podConditionUpdaterImpl{client}
 | 
				
			||||||
 | 
						sched.podPreemptor = &podPreemptorImpl{client}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer)
 | 
						AddAllEventHandlers(sched, options.schedulerName, informerFactory, podInformer)
 | 
				
			||||||
	return sched, nil
 | 
						return sched, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -391,7 +404,6 @@ func NewFromConfig(config *factory.Config) *Scheduler {
 | 
				
			|||||||
		SchedulerCache:    config.SchedulerCache,
 | 
							SchedulerCache:    config.SchedulerCache,
 | 
				
			||||||
		Algorithm:         config.Algorithm,
 | 
							Algorithm:         config.Algorithm,
 | 
				
			||||||
		GetBinder:         config.GetBinder,
 | 
							GetBinder:         config.GetBinder,
 | 
				
			||||||
		PodPreemptor:      config.PodPreemptor,
 | 
					 | 
				
			||||||
		Framework:         config.Framework,
 | 
							Framework:         config.Framework,
 | 
				
			||||||
		NextPod:           config.NextPod,
 | 
							NextPod:           config.NextPod,
 | 
				
			||||||
		WaitForCacheSync:  config.WaitForCacheSync,
 | 
							WaitForCacheSync:  config.WaitForCacheSync,
 | 
				
			||||||
@@ -434,7 +446,7 @@ func (sched *Scheduler) recordSchedulingFailure(podInfo *framework.PodInfo, err
 | 
				
			|||||||
// If it succeeds, it adds the name of the node where preemption has happened to the pod spec.
 | 
					// If it succeeds, it adds the name of the node where preemption has happened to the pod spec.
 | 
				
			||||||
// It returns the node name and an error if any.
 | 
					// It returns the node name and an error if any.
 | 
				
			||||||
func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) {
 | 
					func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) {
 | 
				
			||||||
	preemptor, err := sched.PodPreemptor.GetUpdatedPod(preemptor)
 | 
						preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		klog.Errorf("Error getting the updated preemptor pod object: %v", err)
 | 
							klog.Errorf("Error getting the updated preemptor pod object: %v", err)
 | 
				
			||||||
		return "", err
 | 
							return "", err
 | 
				
			||||||
@@ -454,7 +466,7 @@ func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Frame
 | 
				
			|||||||
		sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
 | 
							sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// Make a call to update nominated node name of the pod on the API server.
 | 
							// Make a call to update nominated node name of the pod on the API server.
 | 
				
			||||||
		err = sched.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
 | 
							err = sched.podPreemptor.setNominatedNodeName(preemptor, nodeName)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			klog.Errorf("Error in preemption process. Cannot set 'NominatedPod' on pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
 | 
								klog.Errorf("Error in preemption process. Cannot set 'NominatedPod' on pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
 | 
				
			||||||
			sched.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
 | 
								sched.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
 | 
				
			||||||
@@ -462,7 +474,7 @@ func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Frame
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		for _, victim := range victims {
 | 
							for _, victim := range victims {
 | 
				
			||||||
			if err := sched.PodPreemptor.DeletePod(victim); err != nil {
 | 
								if err := sched.podPreemptor.deletePod(victim); err != nil {
 | 
				
			||||||
				klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
 | 
									klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
 | 
				
			||||||
				return "", err
 | 
									return "", err
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
@@ -481,7 +493,7 @@ func (sched *Scheduler) preempt(state *framework.CycleState, fwk framework.Frame
 | 
				
			|||||||
	// function of generic_scheduler.go returns the pod itself for removal of
 | 
						// function of generic_scheduler.go returns the pod itself for removal of
 | 
				
			||||||
	// the 'NominatedPod' field.
 | 
						// the 'NominatedPod' field.
 | 
				
			||||||
	for _, p := range nominatedPodsToClear {
 | 
						for _, p := range nominatedPodsToClear {
 | 
				
			||||||
		rErr := sched.PodPreemptor.RemoveNominatedNodeName(p)
 | 
							rErr := sched.podPreemptor.removeNominatedNodeName(p)
 | 
				
			||||||
		if rErr != nil {
 | 
							if rErr != nil {
 | 
				
			||||||
			klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr)
 | 
								klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr)
 | 
				
			||||||
			// We do not return as this error is not critical.
 | 
								// We do not return as this error is not critical.
 | 
				
			||||||
@@ -756,6 +768,32 @@ func (p *podConditionUpdaterImpl) update(pod *v1.Pod, condition *v1.PodCondition
 | 
				
			|||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type podPreemptorImpl struct {
 | 
				
			||||||
 | 
						Client clientset.Interface
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (p *podPreemptorImpl) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
 | 
				
			||||||
 | 
						return p.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (p *podPreemptorImpl) deletePod(pod *v1.Pod) error {
 | 
				
			||||||
 | 
						return p.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (p *podPreemptorImpl) setNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
 | 
				
			||||||
 | 
						podCopy := pod.DeepCopy()
 | 
				
			||||||
 | 
						podCopy.Status.NominatedNodeName = nominatedNodeName
 | 
				
			||||||
 | 
						_, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy)
 | 
				
			||||||
 | 
						return err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (p *podPreemptorImpl) removeNominatedNodeName(pod *v1.Pod) error {
 | 
				
			||||||
 | 
						if len(pod.Status.NominatedNodeName) == 0 {
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return p.setNominatedNodeName(pod, "")
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// nodeResourceString returns a string representation of node resources.
 | 
					// nodeResourceString returns a string representation of node resources.
 | 
				
			||||||
func nodeResourceString(n *v1.Node) string {
 | 
					func nodeResourceString(n *v1.Node) string {
 | 
				
			||||||
	if n == nil {
 | 
						if n == nil {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -78,19 +78,19 @@ func (fc fakePodConditionUpdater) update(pod *v1.Pod, podCondition *v1.PodCondit
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
type fakePodPreemptor struct{}
 | 
					type fakePodPreemptor struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (fp fakePodPreemptor) GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
 | 
					func (fp fakePodPreemptor) getUpdatedPod(pod *v1.Pod) (*v1.Pod, error) {
 | 
				
			||||||
	return pod, nil
 | 
						return pod, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (fp fakePodPreemptor) DeletePod(pod *v1.Pod) error {
 | 
					func (fp fakePodPreemptor) deletePod(pod *v1.Pod) error {
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (fp fakePodPreemptor) SetNominatedNodeName(pod *v1.Pod, nomNodeName string) error {
 | 
					func (fp fakePodPreemptor) setNominatedNodeName(pod *v1.Pod, nomNodeName string) error {
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (fp fakePodPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
 | 
					func (fp fakePodPreemptor) removeNominatedNodeName(pod *v1.Pod) error {
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -674,7 +674,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
		Recorder:            &events.FakeRecorder{},
 | 
							Recorder:            &events.FakeRecorder{},
 | 
				
			||||||
		podConditionUpdater: fakePodConditionUpdater{},
 | 
							podConditionUpdater: fakePodConditionUpdater{},
 | 
				
			||||||
		PodPreemptor:        fakePodPreemptor{},
 | 
							podPreemptor:        fakePodPreemptor{},
 | 
				
			||||||
		Framework:           emptyFramework,
 | 
							Framework:           emptyFramework,
 | 
				
			||||||
		VolumeBinder:        volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
 | 
							VolumeBinder:        volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -728,7 +728,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
		Recorder:            &events.FakeRecorder{},
 | 
							Recorder:            &events.FakeRecorder{},
 | 
				
			||||||
		podConditionUpdater: fakePodConditionUpdater{},
 | 
							podConditionUpdater: fakePodConditionUpdater{},
 | 
				
			||||||
		PodPreemptor:        fakePodPreemptor{},
 | 
							podPreemptor:        fakePodPreemptor{},
 | 
				
			||||||
		StopEverything:      stop,
 | 
							StopEverything:      stop,
 | 
				
			||||||
		Framework:           fwk,
 | 
							Framework:           fwk,
 | 
				
			||||||
		VolumeBinder:        volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
 | 
							VolumeBinder:        volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user