scheduler: change fit predicates and priority func as needed for optimization

Author: Hongchao Deng
Date: 2016-01-28 12:14:45 -08:00
Parent: 9942f6bf3e
Commit: 9236e4a0b4
17 changed files with 343 additions and 255 deletions
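
The change repeated throughout this diff is the fit-predicate signature: instead of receiving the node's full pod list as a slice (existingPods []*api.Pod) plus a node name, each predicate now takes the node name and a *schedulercache.NodeInfo handle into the scheduler's cache. A minimal hypothetical predicate written against the new shape is sketched below; it relies only on the NodeInfo.Pods() accessor that the changed functions use, and the UnderPodLimit name and maxPods constant are invented for illustration, not part of this commit.

package example

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// Old predicate shape: func(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error)
// New predicate shape: the per-node pod list comes from the cached NodeInfo.
// UnderPodLimit and maxPods are illustrative only.
const maxPods = 110

func UnderPodLimit(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
	// nodeInfo.Pods() replaces the existingPods slice that every caller used to assemble.
	if len(nodeInfo.Pods()) >= maxPods {
		return false, nil
	}
	return true, nil
}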


@@ -24,6 +24,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/unversioned"
@@ -122,9 +123,9 @@ func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
// - AWS EBS forbids any two pods mounting the same volume ID
// - Ceph RBD forbids if any two pods share at least same monitor, and match pool and image.
// TODO: migrate this into some per-volume specific code?
func NoDiskConflict(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
func NoDiskConflict(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
for _, v := range pod.Spec.Volumes {
for _, ev := range existingPods {
for _, ev := range nodeInfo.Pods() {
if isVolumeConflict(v, ev) {
return false, nil
}
@@ -198,7 +199,7 @@ func (c *MaxPDVolumeCountChecker) filterVolumes(volumes []api.Volume, namespace
return nil
}
func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
newVolumes := make(map[string]bool)
if err := c.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
return false, err
@@ -211,7 +212,7 @@ func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, existingPods []*api.Po
// count unique volumes
existingVolumes := make(map[string]bool)
for _, existingPod := range existingPods {
for _, existingPod := range nodeInfo.Pods() {
if err := c.filterVolumes(existingPod.Spec.Volumes, existingPod.Namespace, existingVolumes); err != nil {
return false, err
}
@@ -297,13 +298,13 @@ func NewVolumeZonePredicate(nodeInfo NodeInfo, pvInfo PersistentVolumeInfo, pvcI
return c.predicate
}
func (c *VolumeZoneChecker) predicate(pod *api.Pod, existingPods []*api.Pod, nodeID string) (bool, error) {
node, err := c.nodeInfo.GetNodeInfo(nodeID)
func (c *VolumeZoneChecker) predicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
node, err := c.nodeInfo.GetNodeInfo(nodeName)
if err != nil {
return false, err
}
if node == nil {
return false, fmt.Errorf("node not found: %q", nodeID)
return false, fmt.Errorf("node not found: %q", nodeName)
}
nodeConstraints := make(map[string]string)
@@ -360,7 +361,7 @@ func (c *VolumeZoneChecker) predicate(pod *api.Pod, existingPods []*api.Pod, nod
}
nodeV, _ := nodeConstraints[k]
if v != nodeV {
glog.V(2).Infof("Won't schedule pod %q onto node %q due to volume %q (mismatch on %q)", pod.Name, nodeID, pvName, k)
glog.V(2).Infof("Won't schedule pod %q onto node %q due to volume %q (mismatch on %q)", pod.Name, nodeName, pvName, k)
return false, nil
}
}
@@ -389,18 +390,6 @@ func getResourceRequest(pod *api.Pod) resourceRequest {
return result
}
func getTotalResourceRequest(pods []*api.Pod) resourceRequest {
result := resourceRequest{}
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
requests := container.Resources.Requests
result.memory += requests.Memory().Value()
result.milliCPU += requests.Cpu().MilliValue()
}
}
return result
}
func CheckPodsExceedingFreeResources(pods []*api.Pod, allocatable api.ResourceList) (fitting []*api.Pod, notFittingCPU, notFittingMemory []*api.Pod) {
totalMilliCPU := allocatable.Cpu().MilliValue()
totalMemory := allocatable.Memory().Value()
@@ -433,16 +422,17 @@ func podName(pod *api.Pod) string {
}
// PodFitsResources calculates fit based on requested, rather than used resources
func (r *ResourceFit) PodFitsResources(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
info, err := r.info.GetNodeInfo(node)
func (r *ResourceFit) PodFitsResources(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
info, err := r.info.GetNodeInfo(nodeName)
if err != nil {
return false, err
}
allocatable := info.Status.Allocatable
if int64(len(existingPods))+1 > allocatable.Pods().Value() {
return false, newInsufficientResourceError(podCountResourceName, 1,
int64(len(existingPods)), allocatable.Pods().Value())
allowedPodNumber := allocatable.Pods().Value()
if int64(len(nodeInfo.Pods()))+1 > allowedPodNumber {
return false,
newInsufficientResourceError(podCountResourceName, 1, int64(len(nodeInfo.Pods())), allowedPodNumber)
}
podRequest := getResourceRequest(pod)
@@ -450,17 +440,18 @@ func (r *ResourceFit) PodFitsResources(pod *api.Pod, existingPods []*api.Pod, no
return true, nil
}
pods := append(existingPods, pod)
_, exceedingCPU, exceedingMemory := CheckPodsExceedingFreeResources(pods, allocatable)
if len(exceedingCPU) > 0 {
return false, newInsufficientResourceError(cpuResourceName, podRequest.milliCPU,
getTotalResourceRequest(existingPods).milliCPU, allocatable.Cpu().MilliValue())
totalMilliCPU := allocatable.Cpu().MilliValue()
totalMemory := allocatable.Memory().Value()
if totalMilliCPU < podRequest.milliCPU+nodeInfo.RequestedResource().MilliCPU {
return false,
newInsufficientResourceError(cpuResourceName, podRequest.milliCPU, nodeInfo.RequestedResource().MilliCPU, totalMilliCPU)
}
if len(exceedingMemory) > 0 {
return false, newInsufficientResourceError(memoryResoureceName, podRequest.memory,
getTotalResourceRequest(existingPods).memory, allocatable.Memory().Value())
if totalMemory < podRequest.memory+nodeInfo.RequestedResource().Memory {
return false,
newInsufficientResourceError(memoryResoureceName, podRequest.memory, nodeInfo.RequestedResource().Memory, totalMemory)
}
glog.V(10).Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.", podName(pod), node, len(pods)-1, allocatable.Pods().Value())
glog.V(10).Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
podName(pod), nodeName, len(nodeInfo.Pods()), allowedPodNumber)
return true, nil
}
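
The rewrite of PodFitsResources above is where the optimization pays off: the old code appended the incoming pod to existingPods and re-summed every container's requests through the now-deleted getTotalResourceRequest / CheckPodsExceedingFreeResources path, while the new code compares the pod's own request against nodeInfo.RequestedResource(), an aggregate the scheduler cache keeps up to date. A rough sketch of that bookkeeping follows; it illustrates the caching idea only (the Resource and NodeInfo types and the AddPod hook are assumptions, not the actual schedulercache implementation), reusing the same Requests accessors the deleted getTotalResourceRequest used.

package sketch

import "k8s.io/kubernetes/pkg/api"

// Resource mirrors the MilliCPU/Memory fields read via nodeInfo.RequestedResource() above.
type Resource struct {
	MilliCPU int64
	Memory   int64
}

// NodeInfo is an illustrative stand-in for schedulercache.NodeInfo: it keeps the
// node's pods plus a running total of their requested resources.
type NodeInfo struct {
	pods              []*api.Pod
	requestedResource Resource
}

// AddPod is assumed to be called by the cache whenever a pod is assigned to the
// node, so the total is maintained incrementally instead of being recomputed
// inside every predicate call.
func (n *NodeInfo) AddPod(pod *api.Pod) {
	for _, container := range pod.Spec.Containers {
		requests := container.Resources.Requests
		n.requestedResource.MilliCPU += requests.Cpu().MilliValue()
		n.requestedResource.Memory += requests.Memory().Value()
	}
	n.pods = append(n.pods, pod)
}

func (n *NodeInfo) Pods() []*api.Pod { return n.pods }

func (n *NodeInfo) RequestedResource() Resource { return n.requestedResource }

With a running total like this, PodFitsResources only has to check podRequest.milliCPU+nodeInfo.RequestedResource().MilliCPU against the node's allocatable CPU (and likewise for memory), an O(1) comparison per node.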
@@ -548,19 +539,19 @@ type NodeSelector struct {
info NodeInfo
}
func (n *NodeSelector) PodSelectorMatches(pod *api.Pod, existingPods []*api.Pod, nodeID string) (bool, error) {
node, err := n.info.GetNodeInfo(nodeID)
func (n *NodeSelector) PodSelectorMatches(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
node, err := n.info.GetNodeInfo(nodeName)
if err != nil {
return false, err
}
return PodMatchesNodeLabels(pod, node), nil
}
func PodFitsHost(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
func PodFitsHost(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
if len(pod.Spec.NodeName) == 0 {
return true, nil
}
return pod.Spec.NodeName == node, nil
return pod.Spec.NodeName == nodeName, nil
}
type NodeLabelChecker struct {
@@ -590,9 +581,9 @@ func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) algori
// Alternately, eliminating nodes that have a certain label, regardless of value, is also useful
// A node may have a label with "retiring" as key and the date as the value
// and it may be desirable to avoid scheduling new pods on this node
func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, existingPods []*api.Pod, nodeID string) (bool, error) {
func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
var exists bool
node, err := n.info.GetNodeInfo(nodeID)
node, err := n.info.GetNodeInfo(nodeName)
if err != nil {
return false, err
}
@@ -632,7 +623,7 @@ func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister al
// - L is listed in the ServiceAffinity object that is passed into the function
// - the pod does not have any NodeSelector for L
// - some other pod from the same service is already scheduled onto a node that has value V for label L
func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, existingPods []*api.Pod, nodeID string) (bool, error) {
func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
var affinitySelector labels.Selector
// check if the pod being scheduled has the affinity labels specified in its NodeSelector
@@ -692,7 +683,7 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, existingPods []*api
affinitySelector = labels.Set(affinityLabels).AsSelector()
}
node, err := s.nodeInfo.GetNodeInfo(nodeID)
node, err := s.nodeInfo.GetNodeInfo(nodeName)
if err != nil {
return false, err
}
@@ -701,12 +692,12 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, existingPods []*api
return affinitySelector.Matches(labels.Set(node.Labels)), nil
}
func PodFitsHostPorts(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) {
func PodFitsHostPorts(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
wantPorts := getUsedPorts(pod)
if len(wantPorts) == 0 {
return true, nil
}
existingPorts := getUsedPorts(existingPods...)
existingPorts := getUsedPorts(nodeInfo.Pods()...)
for wport := range wantPorts {
if wport == 0 {
continue
@@ -730,22 +721,6 @@ func getUsedPorts(pods ...*api.Pod) map[int]bool {
return ports
}
// MapPodsToMachines obtains a list of pods and pivots that list into a map where the keys are host names
// and the values are the list of pods running on that host.
func MapPodsToMachines(lister algorithm.PodLister) (map[string][]*api.Pod, error) {
machineToPods := map[string][]*api.Pod{}
// TODO: perform more targeted query...
pods, err := lister.List(labels.Everything())
if err != nil {
return map[string][]*api.Pod{}, err
}
for _, scheduledPod := range pods {
host := scheduledPod.Spec.NodeName
machineToPods[host] = append(machineToPods[host], scheduledPod)
}
return machineToPods, nil
}
// search two arrays and return true if they have at least one common element; return false otherwise
func haveSame(a1, a2 []string) bool {
for _, val1 := range a1 {