
node cache don't need to get all the information about every candidate node. For huge clusters, sending the full node information of every node in the cluster over the wire each time a pod is scheduled is expensive. If the scheduler is capable of caching node information along with its capabilities, sending the node name alone is sufficient. These changes provide that optimization in a backward-compatible way:
- removed the inadvertent signature change of the Prioritize() function
- added defensive checks as suggested
- added an annotation-specific test case
- updated the comments in the scheduler types
- got rid of apiVersion, which is unused
- using *v1.NodeList as suggested
- backed out the pod annotation update related changes made in the 1st commit
- adjusted the comments in types.go and v1/types.go as suggested in the code review
404 lines
13 KiB
Go
404 lines
13 KiB
Go
/*
|
|
Copyright 2014 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package core
|
|
|
|
import (
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/golang/glog"
|
|
"k8s.io/apimachinery/pkg/util/errors"
|
|
utiltrace "k8s.io/apiserver/pkg/util/trace"
|
|
"k8s.io/client-go/util/workqueue"
|
|
"k8s.io/kubernetes/pkg/api/v1"
|
|
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
|
|
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
|
|
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
|
|
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
|
|
)
|
|
|
|
// FailedPredicateMap maps a node name to the reasons the pod was rejected on
// that node, as collected from predicates and extenders in findNodesThatFit.
type FailedPredicateMap map[string][]algorithm.PredicateFailureReason
|
|
|
|
type FitError struct {
|
|
Pod *v1.Pod
|
|
FailedPredicates FailedPredicateMap
|
|
}
|
|
|
|
// ErrNoNodesAvailable is returned by Schedule when the node lister yields an
// empty node list.
var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
|
|
|
|
// NoNodeAvailableMsg is the leading text of the message built by FitError.Error.
const NoNodeAvailableMsg = "No nodes are available that match all of the following predicates:"
|
|
|
|
// Error returns detailed information of why the pod failed to fit on each node
|
|
func (f *FitError) Error() string {
|
|
reasons := make(map[string]int)
|
|
for _, predicates := range f.FailedPredicates {
|
|
for _, pred := range predicates {
|
|
reasons[pred.GetReason()] += 1
|
|
}
|
|
}
|
|
|
|
sortReasonsHistogram := func() []string {
|
|
reasonStrings := []string{}
|
|
for k, v := range reasons {
|
|
reasonStrings = append(reasonStrings, fmt.Sprintf("%v (%v)", k, v))
|
|
}
|
|
sort.Strings(reasonStrings)
|
|
return reasonStrings
|
|
}
|
|
reasonMsg := fmt.Sprintf(NoNodeAvailableMsg+": %v.", strings.Join(sortReasonsHistogram(), ", "))
|
|
return reasonMsg
|
|
}
|
|
|
|
type genericScheduler struct {
|
|
cache schedulercache.Cache
|
|
predicates map[string]algorithm.FitPredicate
|
|
priorityMetaProducer algorithm.MetadataProducer
|
|
predicateMetaProducer algorithm.MetadataProducer
|
|
prioritizers []algorithm.PriorityConfig
|
|
extenders []algorithm.SchedulerExtender
|
|
pods algorithm.PodLister
|
|
lastNodeIndexLock sync.Mutex
|
|
lastNodeIndex uint64
|
|
|
|
cachedNodeInfoMap map[string]*schedulercache.NodeInfo
|
|
|
|
equivalenceCache *EquivalenceCache
|
|
}
|
|
|
|
// Schedule tries to schedule the given pod to one of node in the node list.
|
|
// If it succeeds, it will return the name of the node.
|
|
// If it fails, it will return a Fiterror error with reasons.
|
|
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
|
|
trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
|
|
defer trace.LogIfLong(100 * time.Millisecond)
|
|
|
|
nodes, err := nodeLister.List()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if len(nodes) == 0 {
|
|
return "", ErrNoNodesAvailable
|
|
}
|
|
|
|
// Used for all fit and priority funcs.
|
|
err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// TODO(harryz) Check if equivalenceCache is enabled and call scheduleWithEquivalenceClass here
|
|
|
|
trace.Step("Computing predicates")
|
|
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
if len(filteredNodes) == 0 {
|
|
return "", &FitError{
|
|
Pod: pod,
|
|
FailedPredicates: failedPredicateMap,
|
|
}
|
|
}
|
|
|
|
trace.Step("Prioritizing")
|
|
metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
|
|
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
trace.Step("Selecting host")
|
|
return g.selectHost(priorityList)
|
|
}
|
|
|
|
// selectHost takes a prioritized list of nodes and then picks one
|
|
// in a round-robin manner from the nodes that had the highest score.
|
|
func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
|
|
if len(priorityList) == 0 {
|
|
return "", fmt.Errorf("empty priorityList")
|
|
}
|
|
|
|
sort.Sort(sort.Reverse(priorityList))
|
|
maxScore := priorityList[0].Score
|
|
firstAfterMaxScore := sort.Search(len(priorityList), func(i int) bool { return priorityList[i].Score < maxScore })
|
|
|
|
g.lastNodeIndexLock.Lock()
|
|
ix := int(g.lastNodeIndex % uint64(firstAfterMaxScore))
|
|
g.lastNodeIndex++
|
|
g.lastNodeIndexLock.Unlock()
|
|
|
|
return priorityList[ix].Host, nil
|
|
}
|
|
|
|
// Filters the nodes to find the ones that fit based on the given predicate functions
|
|
// Each node is passed through the predicate functions to determine if it is a fit
|
|
func findNodesThatFit(
|
|
pod *v1.Pod,
|
|
nodeNameToInfo map[string]*schedulercache.NodeInfo,
|
|
nodes []*v1.Node,
|
|
predicateFuncs map[string]algorithm.FitPredicate,
|
|
extenders []algorithm.SchedulerExtender,
|
|
metadataProducer algorithm.MetadataProducer,
|
|
) ([]*v1.Node, FailedPredicateMap, error) {
|
|
var filtered []*v1.Node
|
|
failedPredicateMap := FailedPredicateMap{}
|
|
|
|
if len(predicateFuncs) == 0 {
|
|
filtered = nodes
|
|
} else {
|
|
// Create filtered list with enough space to avoid growing it
|
|
// and allow assigning.
|
|
filtered = make([]*v1.Node, len(nodes))
|
|
errs := []error{}
|
|
var predicateResultLock sync.Mutex
|
|
var filteredLen int32
|
|
|
|
// We can use the same metadata producer for all nodes.
|
|
meta := metadataProducer(pod, nodeNameToInfo)
|
|
checkNode := func(i int) {
|
|
nodeName := nodes[i].Name
|
|
fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)
|
|
if err != nil {
|
|
predicateResultLock.Lock()
|
|
errs = append(errs, err)
|
|
predicateResultLock.Unlock()
|
|
return
|
|
}
|
|
if fits {
|
|
filtered[atomic.AddInt32(&filteredLen, 1)-1] = nodes[i]
|
|
} else {
|
|
predicateResultLock.Lock()
|
|
failedPredicateMap[nodeName] = failedPredicates
|
|
predicateResultLock.Unlock()
|
|
}
|
|
}
|
|
workqueue.Parallelize(16, len(nodes), checkNode)
|
|
filtered = filtered[:filteredLen]
|
|
if len(errs) > 0 {
|
|
return []*v1.Node{}, FailedPredicateMap{}, errors.NewAggregate(errs)
|
|
}
|
|
}
|
|
|
|
if len(filtered) > 0 && len(extenders) != 0 {
|
|
for _, extender := range extenders {
|
|
filteredList, failedMap, err := extender.Filter(pod, filtered, nodeNameToInfo)
|
|
if err != nil {
|
|
return []*v1.Node{}, FailedPredicateMap{}, err
|
|
}
|
|
|
|
for failedNodeName, failedMsg := range failedMap {
|
|
if _, found := failedPredicateMap[failedNodeName]; !found {
|
|
failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
|
|
}
|
|
failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
|
|
}
|
|
filtered = filteredList
|
|
if len(filtered) == 0 {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return filtered, failedPredicateMap, nil
|
|
}
|
|
|
|
// Checks whether node with a given name and NodeInfo satisfies all predicateFuncs.
|
|
func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate) (bool, []algorithm.PredicateFailureReason, error) {
|
|
var failedPredicates []algorithm.PredicateFailureReason
|
|
for _, predicate := range predicateFuncs {
|
|
fit, reasons, err := predicate(pod, meta, info)
|
|
if err != nil {
|
|
err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err)
|
|
return false, []algorithm.PredicateFailureReason{}, err
|
|
}
|
|
if !fit {
|
|
failedPredicates = append(failedPredicates, reasons...)
|
|
}
|
|
}
|
|
return len(failedPredicates) == 0, failedPredicates, nil
|
|
}
|
|
|
|
// Prioritizes the nodes by running the individual priority functions in parallel.
|
|
// Each priority function is expected to set a score of 0-10
|
|
// 0 is the lowest priority score (least preferred node) and 10 is the highest
|
|
// Each priority function can also have its own weight
|
|
// The node scores returned by the priority function are multiplied by the weights to get weighted scores
|
|
// All scores are finally combined (added) to get the total weighted scores of all nodes
|
|
func PrioritizeNodes(
|
|
pod *v1.Pod,
|
|
nodeNameToInfo map[string]*schedulercache.NodeInfo,
|
|
meta interface{},
|
|
priorityConfigs []algorithm.PriorityConfig,
|
|
nodes []*v1.Node,
|
|
extenders []algorithm.SchedulerExtender,
|
|
) (schedulerapi.HostPriorityList, error) {
|
|
// If no priority configs are provided, then the EqualPriority function is applied
|
|
// This is required to generate the priority list in the required format
|
|
if len(priorityConfigs) == 0 && len(extenders) == 0 {
|
|
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
|
|
for i := range nodes {
|
|
hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
result = append(result, hostPriority)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
var (
|
|
mu = sync.Mutex{}
|
|
wg = sync.WaitGroup{}
|
|
errs []error
|
|
)
|
|
appendError := func(err error) {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
errs = append(errs, err)
|
|
}
|
|
|
|
results := make([]schedulerapi.HostPriorityList, 0, len(priorityConfigs))
|
|
for range priorityConfigs {
|
|
results = append(results, nil)
|
|
}
|
|
for i, priorityConfig := range priorityConfigs {
|
|
if priorityConfig.Function != nil {
|
|
// DEPRECATED
|
|
wg.Add(1)
|
|
go func(index int, config algorithm.PriorityConfig) {
|
|
defer wg.Done()
|
|
var err error
|
|
results[index], err = config.Function(pod, nodeNameToInfo, nodes)
|
|
if err != nil {
|
|
appendError(err)
|
|
}
|
|
}(i, priorityConfig)
|
|
} else {
|
|
results[i] = make(schedulerapi.HostPriorityList, len(nodes))
|
|
}
|
|
}
|
|
processNode := func(index int) {
|
|
nodeInfo := nodeNameToInfo[nodes[index].Name]
|
|
var err error
|
|
for i := range priorityConfigs {
|
|
if priorityConfigs[i].Function != nil {
|
|
continue
|
|
}
|
|
results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
|
|
if err != nil {
|
|
appendError(err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
workqueue.Parallelize(16, len(nodes), processNode)
|
|
for i, priorityConfig := range priorityConfigs {
|
|
if priorityConfig.Reduce == nil {
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func(index int, config algorithm.PriorityConfig) {
|
|
defer wg.Done()
|
|
if err := config.Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
|
|
appendError(err)
|
|
}
|
|
}(i, priorityConfig)
|
|
}
|
|
// Wait for all computations to be finished.
|
|
wg.Wait()
|
|
if len(errs) != 0 {
|
|
return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
|
|
}
|
|
|
|
// Summarize all scores.
|
|
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
|
|
// TODO: Consider parallelizing it.
|
|
for i := range nodes {
|
|
result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
|
|
for j := range priorityConfigs {
|
|
result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
|
|
}
|
|
}
|
|
|
|
if len(extenders) != 0 && nodes != nil {
|
|
combinedScores := make(map[string]int, len(nodeNameToInfo))
|
|
for _, extender := range extenders {
|
|
wg.Add(1)
|
|
go func(ext algorithm.SchedulerExtender) {
|
|
defer wg.Done()
|
|
prioritizedList, weight, err := ext.Prioritize(pod, nodes)
|
|
if err != nil {
|
|
// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
|
|
return
|
|
}
|
|
mu.Lock()
|
|
for i := range *prioritizedList {
|
|
host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
|
|
combinedScores[host] += score * weight
|
|
}
|
|
mu.Unlock()
|
|
}(extender)
|
|
}
|
|
// wait for all go routines to finish
|
|
wg.Wait()
|
|
for i := range result {
|
|
result[i].Score += combinedScores[result[i].Host]
|
|
}
|
|
}
|
|
|
|
if glog.V(10) {
|
|
for i := range result {
|
|
glog.V(10).Infof("Host %s => Score %d", result[i].Host, result[i].Score)
|
|
}
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// EqualPriority is a prioritizer function that gives an equal weight of one to all nodes
|
|
func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
|
|
node := nodeInfo.Node()
|
|
if node == nil {
|
|
return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
|
|
}
|
|
return schedulerapi.HostPriority{
|
|
Host: node.Name,
|
|
Score: 1,
|
|
}, nil
|
|
}
|
|
|
|
func NewGenericScheduler(
|
|
cache schedulercache.Cache,
|
|
predicates map[string]algorithm.FitPredicate,
|
|
predicateMetaProducer algorithm.MetadataProducer,
|
|
prioritizers []algorithm.PriorityConfig,
|
|
priorityMetaProducer algorithm.MetadataProducer,
|
|
extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm {
|
|
return &genericScheduler{
|
|
cache: cache,
|
|
predicates: predicates,
|
|
predicateMetaProducer: predicateMetaProducer,
|
|
prioritizers: prioritizers,
|
|
priorityMetaProducer: priorityMetaProducer,
|
|
extenders: extenders,
|
|
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
|
|
}
|
|
}
|