/* Copyright 2014 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package scheduler import ( "context" "fmt" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" ) func buildExtenders(extenders []schedulerapi.Extender, profiles []schedulerapi.KubeSchedulerProfile) ([]framework.Extender, error) { var fExtenders []framework.Extender if len(extenders) == 0 { return nil, nil } var ignoredExtendedResources []string var ignorableExtenders []framework.Extender for i := range extenders { klog.V(2).InfoS("Creating extender", "extender", extenders[i]) extender, err := NewHTTPExtender(&extenders[i]) if err != nil { return nil, err } if !extender.IsIgnorable() { fExtenders = append(fExtenders, extender) } else { ignorableExtenders = append(ignorableExtenders, extender) } for _, r := range extenders[i].ManagedResources { if r.IgnoredByScheduler { ignoredExtendedResources = append(ignoredExtendedResources, r.Name) } } } // place ignorable extenders to the tail of extenders fExtenders = append(fExtenders, ignorableExtenders...) // If there are any extended resources found from the Extenders, append them to the pluginConfig for each profile. // This should only have an effect on ComponentConfig, where it is possible to configure Extenders and // plugin args (and in which case the extender ignored resources take precedence). if len(ignoredExtendedResources) == 0 { return fExtenders, nil } for i := range profiles { prof := &profiles[i] var found = false for k := range prof.PluginConfig { if prof.PluginConfig[k].Name == noderesources.Name { // Update the existing args pc := &prof.PluginConfig[k] args, ok := pc.Args.(*schedulerapi.NodeResourcesFitArgs) if !ok { return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", pc.Args) } args.IgnoredResources = ignoredExtendedResources found = true break } } if !found { return nil, fmt.Errorf("can't find NodeResourcesFitArgs in plugin config") } } return fExtenders, nil } // MakeDefaultErrorFunc construct a function to handle pod scheduler error func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) { return func(podInfo *framework.QueuedPodInfo, err error) { pod := podInfo.Pod if err == ErrNoNodesAvailable { klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod)) } else if fitError, ok := err.(*framework.FitError); ok { // Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently. podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err) } else if apierrors.IsNotFound(err) { klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err) if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" { nodeName := errStatus.Status().Details.Name // when node is not found, We do not remove the node right away. Trying again to get // the node and if the node is still not found, then remove it from the scheduler cache. _, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) if err != nil && apierrors.IsNotFound(err) { node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} if err := schedulerCache.RemoveNode(&node); err != nil { klog.V(4).InfoS("Node is not found; failed to remove it from the cache", "node", node.Name) } } } } else { klog.ErrorS(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod)) } // Check if the Pod exists in informer cache. cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name) if err != nil { klog.InfoS("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", err) return } // In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler. // It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version. if len(cachedPod.Spec.NodeName) != 0 { klog.InfoS("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName) return } // As is from SharedInformer, we need to do a DeepCopy() here. podInfo.PodInfo = framework.NewPodInfo(cachedPod.DeepCopy()) if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil { klog.ErrorS(err, "Error occurred") } } }