Merge pull request #33834 from wojtek-t/scheduler_map_reduce_6
Automatic merge from submit-queue Migrate EqualPriority in scheduler to map-reduce-like framework Ref #24246
This commit is contained in:
@@ -39,19 +39,16 @@ const maxPriority float32 = 10
|
||||
const zoneWeighting = 2.0 / 3.0
|
||||
|
||||
type SelectorSpread struct {
|
||||
podLister algorithm.PodLister
|
||||
serviceLister algorithm.ServiceLister
|
||||
controllerLister algorithm.ControllerLister
|
||||
replicaSetLister algorithm.ReplicaSetLister
|
||||
}
|
||||
|
||||
func NewSelectorSpreadPriority(
|
||||
podLister algorithm.PodLister,
|
||||
serviceLister algorithm.ServiceLister,
|
||||
controllerLister algorithm.ControllerLister,
|
||||
replicaSetLister algorithm.ReplicaSetLister) algorithm.PriorityFunction {
|
||||
selectorSpread := &SelectorSpread{
|
||||
podLister: podLister,
|
||||
serviceLister: serviceLister,
|
||||
controllerLister: controllerLister,
|
||||
replicaSetLister: replicaSetLister,
|
||||
@@ -59,31 +56,41 @@ func NewSelectorSpreadPriority(
|
||||
return selectorSpread.CalculateSpreadPriority
|
||||
}
|
||||
|
||||
// CalculateSpreadPriority spreads pods across hosts and zones, considering pods belonging to the same service or replication controller.
|
||||
// When a pod is scheduled, it looks for services or RCs that match the pod, then finds existing pods that match those selectors.
|
||||
// It favors nodes that have fewer existing matching pods.
|
||||
// i.e. it pushes the scheduler towards a node where there's the smallest number of
|
||||
// pods which match the same service selectors or RC selectors as the pod being scheduled.
|
||||
// Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
|
||||
func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
|
||||
// Returns selectors of services, RCs and RSs matching the given pod.
|
||||
func getSelectors(pod *api.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister) []labels.Selector {
|
||||
selectors := make([]labels.Selector, 0, 3)
|
||||
if services, err := s.serviceLister.GetPodServices(pod); err == nil {
|
||||
if services, err := sl.GetPodServices(pod); err == nil {
|
||||
for _, service := range services {
|
||||
selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
|
||||
}
|
||||
}
|
||||
if rcs, err := s.controllerLister.GetPodControllers(pod); err == nil {
|
||||
if rcs, err := cl.GetPodControllers(pod); err == nil {
|
||||
for _, rc := range rcs {
|
||||
selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector))
|
||||
}
|
||||
}
|
||||
if rss, err := s.replicaSetLister.GetPodReplicaSets(pod); err == nil {
|
||||
if rss, err := rsl.GetPodReplicaSets(pod); err == nil {
|
||||
for _, rs := range rss {
|
||||
if selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
|
||||
selectors = append(selectors, selector)
|
||||
}
|
||||
}
|
||||
}
|
||||
return selectors
|
||||
}
|
||||
|
||||
func (s *SelectorSpread) getSelectors(pod *api.Pod) []labels.Selector {
|
||||
return getSelectors(pod, s.serviceLister, s.controllerLister, s.replicaSetLister)
|
||||
}
|
||||
|
||||
// CalculateSpreadPriority spreads pods across hosts and zones, considering pods belonging to the same service or replication controller.
|
||||
// When a pod is scheduled, it looks for services, RCs or RSs that match the pod, then finds existing pods that match those selectors.
|
||||
// It favors nodes that have fewer existing matching pods.
|
||||
// i.e. it pushes the scheduler towards a node where there's the smallest number of
|
||||
// pods which match the same service, RC or RS selectors as the pod being scheduled.
|
||||
// Where zone information is included on the nodes, it favors nodes in zones with fewer existing matching pods.
|
||||
func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
|
||||
selectors := s.getSelectors(pod)
|
||||
|
||||
// Count similar pods by node
|
||||
countsByNodeName := make(map[string]float32, len(nodes))
|
||||
|
@@ -286,7 +286,6 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil)
|
||||
selectorSpread := SelectorSpread{
|
||||
podLister: algorithm.FakePodLister(test.pods),
|
||||
serviceLister: algorithm.FakeServiceLister(test.services),
|
||||
controllerLister: algorithm.FakeControllerLister(test.rcs),
|
||||
replicaSetLister: algorithm.FakeReplicaSetLister(test.rss),
|
||||
@@ -494,7 +493,6 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil)
|
||||
selectorSpread := SelectorSpread{
|
||||
podLister: algorithm.FakePodLister(test.pods),
|
||||
serviceLister: algorithm.FakeServiceLister(test.services),
|
||||
controllerLister: algorithm.FakeControllerLister(test.rcs),
|
||||
replicaSetLister: algorithm.FakeReplicaSetLister(test.rss),
|
||||
|
Reference in New Issue
Block a user