248 lines
9.5 KiB
Go
248 lines
9.5 KiB
Go
/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package scheduler
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
v1 "k8s.io/api/core/v1"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/client-go/informers"
|
|
clientset "k8s.io/client-go/kubernetes"
|
|
corelisters "k8s.io/client-go/listers/core/v1"
|
|
restclient "k8s.io/client-go/rest"
|
|
"k8s.io/klog/v2"
|
|
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
|
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
|
|
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
|
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
|
cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
|
|
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
|
"k8s.io/kubernetes/pkg/scheduler/profile"
|
|
)
|
|
|
|
// Binder knows how to write a binding.
|
|
type Binder interface {
|
|
Bind(binding *v1.Binding) error
|
|
}
|
|
|
|
// Configurator defines I/O, caching, and other functionality needed to
|
|
// construct a new scheduler.
|
|
type Configurator struct {
|
|
client clientset.Interface
|
|
kubeConfig *restclient.Config
|
|
|
|
recorderFactory profile.RecorderFactory
|
|
|
|
informerFactory informers.SharedInformerFactory
|
|
|
|
// Close this to stop all reflectors
|
|
StopEverything <-chan struct{}
|
|
|
|
schedulerCache internalcache.Cache
|
|
|
|
componentConfigVersion string
|
|
|
|
// Always check all predicates even if the middle of one predicate fails.
|
|
alwaysCheckAllPredicates bool
|
|
|
|
// percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle.
|
|
percentageOfNodesToScore int32
|
|
|
|
podInitialBackoffSeconds int64
|
|
|
|
podMaxBackoffSeconds int64
|
|
|
|
podMaxUnschedulableQDuration time.Duration
|
|
|
|
profiles []schedulerapi.KubeSchedulerProfile
|
|
registry frameworkruntime.Registry
|
|
nodeInfoSnapshot *internalcache.Snapshot
|
|
extenders []schedulerapi.Extender
|
|
frameworkCapturer FrameworkCapturer
|
|
parallellism int32
|
|
// A "cluster event" -> "plugin names" map.
|
|
clusterEventMap map[framework.ClusterEvent]sets.String
|
|
}
|
|
|
|
// create a scheduler from a set of registered plugins.
|
|
func (c *Configurator) create() (*Scheduler, error) {
|
|
var extenders []framework.Extender
|
|
var ignoredExtendedResources []string
|
|
if len(c.extenders) != 0 {
|
|
var ignorableExtenders []framework.Extender
|
|
for ii := range c.extenders {
|
|
klog.V(2).InfoS("Creating extender", "extender", c.extenders[ii])
|
|
extender, err := NewHTTPExtender(&c.extenders[ii])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !extender.IsIgnorable() {
|
|
extenders = append(extenders, extender)
|
|
} else {
|
|
ignorableExtenders = append(ignorableExtenders, extender)
|
|
}
|
|
for _, r := range c.extenders[ii].ManagedResources {
|
|
if r.IgnoredByScheduler {
|
|
ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
|
|
}
|
|
}
|
|
}
|
|
// place ignorable extenders to the tail of extenders
|
|
extenders = append(extenders, ignorableExtenders...)
|
|
}
|
|
|
|
// If there are any extended resources found from the Extenders, append them to the pluginConfig for each profile.
|
|
// This should only have an effect on ComponentConfig, where it is possible to configure Extenders and
|
|
// plugin args (and in which case the extender ignored resources take precedence).
|
|
// For earlier versions, using both policy and custom plugin config is disallowed, so this should be the only
|
|
// plugin config for this plugin.
|
|
if len(ignoredExtendedResources) > 0 {
|
|
for i := range c.profiles {
|
|
prof := &c.profiles[i]
|
|
var found = false
|
|
for k := range prof.PluginConfig {
|
|
if prof.PluginConfig[k].Name == noderesources.Name {
|
|
// Update the existing args
|
|
pc := &prof.PluginConfig[k]
|
|
args, ok := pc.Args.(*schedulerapi.NodeResourcesFitArgs)
|
|
if !ok {
|
|
return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", pc.Args)
|
|
}
|
|
args.IgnoredResources = ignoredExtendedResources
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
return nil, fmt.Errorf("can't find NodeResourcesFitArgs in plugin config")
|
|
}
|
|
}
|
|
}
|
|
|
|
// The nominator will be passed all the way to framework instantiation.
|
|
nominator := internalqueue.NewPodNominator(c.informerFactory.Core().V1().Pods().Lister())
|
|
profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory,
|
|
frameworkruntime.WithComponentConfigVersion(c.componentConfigVersion),
|
|
frameworkruntime.WithClientSet(c.client),
|
|
frameworkruntime.WithKubeConfig(c.kubeConfig),
|
|
frameworkruntime.WithInformerFactory(c.informerFactory),
|
|
frameworkruntime.WithSnapshotSharedLister(c.nodeInfoSnapshot),
|
|
frameworkruntime.WithRunAllFilters(c.alwaysCheckAllPredicates),
|
|
frameworkruntime.WithPodNominator(nominator),
|
|
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(c.frameworkCapturer)),
|
|
frameworkruntime.WithClusterEventMap(c.clusterEventMap),
|
|
frameworkruntime.WithParallelism(int(c.parallellism)),
|
|
frameworkruntime.WithExtenders(extenders),
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("initializing profiles: %v", err)
|
|
}
|
|
if len(profiles) == 0 {
|
|
return nil, errors.New("at least one profile is required")
|
|
}
|
|
// Profiles are required to have equivalent queue sort plugins.
|
|
lessFn := profiles[c.profiles[0].SchedulerName].QueueSortFunc()
|
|
podQueue := internalqueue.NewSchedulingQueue(
|
|
lessFn,
|
|
c.informerFactory,
|
|
internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
|
|
internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
|
|
internalqueue.WithPodNominator(nominator),
|
|
internalqueue.WithClusterEventMap(c.clusterEventMap),
|
|
internalqueue.WithPodMaxUnschedulableQDuration(c.podMaxUnschedulableQDuration),
|
|
)
|
|
|
|
// Setup cache debugger.
|
|
debugger := cachedebugger.New(
|
|
c.informerFactory.Core().V1().Nodes().Lister(),
|
|
c.informerFactory.Core().V1().Pods().Lister(),
|
|
c.schedulerCache,
|
|
podQueue,
|
|
)
|
|
debugger.ListenForSignal(c.StopEverything)
|
|
|
|
sched := newScheduler(
|
|
c.schedulerCache,
|
|
extenders,
|
|
internalqueue.MakeNextPodFunc(podQueue),
|
|
MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
|
|
c.StopEverything,
|
|
podQueue,
|
|
profiles,
|
|
c.client,
|
|
c.nodeInfoSnapshot,
|
|
c.percentageOfNodesToScore)
|
|
return sched, nil
|
|
}
|
|
|
|
// MakeDefaultErrorFunc construct a function to handle pod scheduler error
|
|
func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
|
|
return func(podInfo *framework.QueuedPodInfo, err error) {
|
|
pod := podInfo.Pod
|
|
if err == ErrNoNodesAvailable {
|
|
klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
|
|
} else if fitError, ok := err.(*framework.FitError); ok {
|
|
// Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
|
|
podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
|
|
klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err)
|
|
} else if apierrors.IsNotFound(err) {
|
|
klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err)
|
|
if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" {
|
|
nodeName := errStatus.Status().Details.Name
|
|
// when node is not found, We do not remove the node right away. Trying again to get
|
|
// the node and if the node is still not found, then remove it from the scheduler cache.
|
|
_, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
|
|
if err != nil && apierrors.IsNotFound(err) {
|
|
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
|
|
if err := schedulerCache.RemoveNode(&node); err != nil {
|
|
klog.V(4).InfoS("Node is not found; failed to remove it from the cache", "node", node.Name)
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
klog.ErrorS(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod))
|
|
}
|
|
|
|
// Check if the Pod exists in informer cache.
|
|
cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name)
|
|
if err != nil {
|
|
klog.InfoS("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", err)
|
|
return
|
|
}
|
|
|
|
// In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
|
|
// It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
|
|
if len(cachedPod.Spec.NodeName) != 0 {
|
|
klog.InfoS("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
|
|
return
|
|
}
|
|
|
|
// As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here.
|
|
podInfo.PodInfo = framework.NewPodInfo(cachedPod.DeepCopy())
|
|
if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil {
|
|
klog.ErrorS(err, "Error occurred")
|
|
}
|
|
}
|
|
}
|