
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package noderesources

import (
    "context"
    "fmt"
    "strings"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/sets"

    "k8s.io/kubernetes/pkg/api/v1/resource"
    v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
    "k8s.io/kubernetes/pkg/scheduler/apis/config"
    "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
    "k8s.io/kubernetes/pkg/scheduler/framework"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
)

var _ framework.PreFilterPlugin = &Fit{}
var _ framework.FilterPlugin = &Fit{}
var _ framework.EnqueueExtensions = &Fit{}
var _ framework.PreScorePlugin = &Fit{}
var _ framework.ScorePlugin = &Fit{}

const (
    // Name is the name of the plugin used in the plugin registry and configurations.
    Name = names.NodeResourcesFit

    // preFilterStateKey is the key in CycleState to NodeResourcesFit pre-computed data.
    // Using the name of the plugin will likely help us avoid collisions with other plugins.
    preFilterStateKey = "PreFilter" + Name

    // preScoreStateKey is the key in CycleState to NodeResourcesFit pre-computed data for Scoring.
    preScoreStateKey = "PreScore" + Name
)

// nodeResourceStrategyTypeMap maps strategy to scorer implementation
var nodeResourceStrategyTypeMap = map[config.ScoringStrategyType]scorer{
    config.LeastAllocated: func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer {
        resources := args.ScoringStrategy.Resources
        return &resourceAllocationScorer{
            Name:      string(config.LeastAllocated),
            scorer:    leastResourceScorer(resources),
            resources: resources,
        }
    },
    config.MostAllocated: func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer {
        resources := args.ScoringStrategy.Resources
        return &resourceAllocationScorer{
            Name:      string(config.MostAllocated),
            scorer:    mostResourceScorer(resources),
            resources: resources,
        }
    },
    config.RequestedToCapacityRatio: func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer {
        resources := args.ScoringStrategy.Resources
        return &resourceAllocationScorer{
            Name:      string(config.RequestedToCapacityRatio),
            scorer:    requestedToCapacityRatioScorer(resources, args.ScoringStrategy.RequestedToCapacityRatio.Shape),
            resources: resources,
        }
    },
}

// Fit is a plugin that checks if a node has sufficient resources.
type Fit struct {
    ignoredResources                sets.String
    ignoredResourceGroups           sets.String
    enableInPlacePodVerticalScaling bool
    handle                          framework.Handle
    resourceAllocationScorer
}

// ScoreExtensions of the Score plugin.
func (f *Fit) ScoreExtensions() framework.ScoreExtensions {
    return nil
}

// preFilterState computed at PreFilter and used at Filter.
type preFilterState struct {
    framework.Resource
}

// Clone the prefilter state.
func (s *preFilterState) Clone() framework.StateData {
    return s
}

// preScoreState computed at PreScore and used at Score.
type preScoreState struct {
    // podRequests has the same order as the resources defined in NodeResourcesFitArgs.ScoringStrategy.Resources;
    // the same ordering is used everywhere else such a list is stored.
    podRequests []int64
}

// Clone implements the mandatory Clone interface. We don't really copy the data since
// there is no need for that.
func (s *preScoreState) Clone() framework.StateData {
    return s
}

// PreScore calculates the incoming pod's resource requests and writes them to the cycle state for use at Score.
func (f *Fit) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
    state := &preScoreState{
        podRequests: f.calculatePodResourceRequestList(pod, f.resources),
    }
    cycleState.Write(preScoreStateKey, state)
    return nil
}
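
// Editorial note (illustrative, not part of the upstream file): PreScore runs once per pod, so the
// request list is computed a single time here and reused by Score for every candidate node instead
// of being recalculated per node. calculatePodResourceRequestList (presumably a method of the
// embedded resourceAllocationScorer elsewhere in this package) returns one entry per configured
// resource, in the same order as f.resources. For example, with resources [cpu, memory] and a
// hypothetical pod requesting 500m CPU and 2Gi of memory, the cached list would look like:
//
//	podRequests := []int64{
//		500,                    // cpu, in millicores
//		2 * 1024 * 1024 * 1024, // memory, in bytes
//	}
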
func getPreScoreState(cycleState *framework.CycleState) (*preScoreState, error) {
    c, err := cycleState.Read(preScoreStateKey)
    if err != nil {
        return nil, fmt.Errorf("reading %q from cycleState: %w", preScoreStateKey, err)
    }

    s, ok := c.(*preScoreState)
    if !ok {
        return nil, fmt.Errorf("invalid PreScore state, got type %T", c)
    }
    return s, nil
}

// Name returns name of the plugin. It is used in logs, etc.
func (f *Fit) Name() string {
    return Name
}

// NewFit initializes a new plugin and returns it.
func NewFit(plArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) {
    args, ok := plArgs.(*config.NodeResourcesFitArgs)
    if !ok {
        return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", plArgs)
    }
    if err := validation.ValidateNodeResourcesFitArgs(nil, args); err != nil {
        return nil, err
    }

    if args.ScoringStrategy == nil {
        return nil, fmt.Errorf("scoring strategy not specified")
    }

    strategy := args.ScoringStrategy.Type
    scorePlugin, exists := nodeResourceStrategyTypeMap[strategy]
    if !exists {
        return nil, fmt.Errorf("scoring strategy %s is not supported", strategy)
    }

    return &Fit{
        ignoredResources:                sets.NewString(args.IgnoredResources...),
        ignoredResourceGroups:           sets.NewString(args.IgnoredResourceGroups...),
        enableInPlacePodVerticalScaling: fts.EnableInPlacePodVerticalScaling,
        handle:                          h,
        resourceAllocationScorer:        *scorePlugin(args),
    }, nil
}
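
// Illustrative usage (an editorial sketch, not upstream code): the plugin is normally instantiated
// by the scheduler framework from a KubeSchedulerConfiguration, but calling NewFit directly shows
// how the scoring strategy selects a scorer. The nil handle is acceptable here only because NewFit
// merely stores it; the concrete values below are made up for illustration.
//
//	args := &config.NodeResourcesFitArgs{
//		ScoringStrategy: &config.ScoringStrategy{
//			Type: config.LeastAllocated,
//			Resources: []config.ResourceSpec{
//				{Name: "cpu", Weight: 1},
//				{Name: "memory", Weight: 1},
//			},
//		},
//	}
//	pl, err := NewFit(args, nil, feature.Features{})
//	// pl is a *Fit whose embedded resourceAllocationScorer uses the LeastAllocated scorer.
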
// computePodResourceRequest returns a framework.Resource that covers the largest
// width in each resource dimension. Because init-containers run sequentially, we collect
// the max in each dimension iteratively. In contrast, we sum the resource vectors for
// regular containers since they run simultaneously.
//
// # The resources defined for Overhead should be added to the calculated Resource request sum
//
// Example:
//
// Pod:
//
//	InitContainers
//	  IC1:
//	    CPU: 2
//	    Memory: 1G
//	  IC2:
//	    CPU: 2
//	    Memory: 3G
//	Containers
//	  C1:
//	    CPU: 2
//	    Memory: 1G
//	  C2:
//	    CPU: 1
//	    Memory: 1G
//
// Result: CPU: 3, Memory: 3G
func computePodResourceRequest(pod *v1.Pod) *preFilterState {
    // The pod hasn't been scheduled yet, so we don't need to worry about InPlacePodVerticalScalingEnabled.
    reqs := resource.PodRequests(pod, resource.PodResourcesOptions{})
    result := &preFilterState{}
    result.SetMaxResource(reqs)
    return result
}
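
// Walking through the example above (editorial note, not upstream code): the regular containers
// sum to CPU: 3, Memory: 2G, while the init containers peak at CPU: 2, Memory: 3G; the per-dimension
// maximum of the two is CPU: 3, Memory: 3G, with any Pod Overhead added on top. Here that arithmetic
// happens inside resource.PodRequests, and SetMaxResource then copies the result into the empty
// preFilterState.
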
// PreFilter invoked at the prefilter extension point.
func (f *Fit) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
    cycleState.Write(preFilterStateKey, computePodResourceRequest(pod))
    return nil, nil
}

// PreFilterExtensions returns prefilter extensions, pod add and remove.
func (f *Fit) PreFilterExtensions() framework.PreFilterExtensions {
    return nil
}

func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
    c, err := cycleState.Read(preFilterStateKey)
    if err != nil {
        // preFilterState doesn't exist, likely PreFilter wasn't invoked.
        return nil, fmt.Errorf("error reading %q from cycleState: %w", preFilterStateKey, err)
    }

    s, ok := c.(*preFilterState)
    if !ok {
        return nil, fmt.Errorf("invalid PreFilter state, got type %T", c)
    }
    return s, nil
}

// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
func (f *Fit) EventsToRegister() []framework.ClusterEvent {
    podActionType := framework.Delete
    if f.enableInPlacePodVerticalScaling {
        // If InPlacePodVerticalScaling (KEP 1287) is enabled, then PodUpdate event should be registered
        // for this plugin since a Pod update may free up resources that make other Pods schedulable.
        podActionType |= framework.Update
    }
    return []framework.ClusterEvent{
        {Resource: framework.Pod, ActionType: podActionType},
        {Resource: framework.Node, ActionType: framework.Add | framework.Update},
    }
}

// Filter invoked at the filter extension point.
// It checks if a node has sufficient resources, such as cpu, memory, gpu, and extended resources, to run a pod.
// It returns a list of insufficient resources; if the list is empty, the node has all the resources requested by the pod.
func (f *Fit) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    s, err := getPreFilterState(cycleState)
    if err != nil {
        return framework.AsStatus(err)
    }

    insufficientResources := fitsRequest(s, nodeInfo, f.ignoredResources, f.ignoredResourceGroups)

    if len(insufficientResources) != 0 {
        // We will keep all failure reasons.
        failureReasons := make([]string, 0, len(insufficientResources))
        for i := range insufficientResources {
            failureReasons = append(failureReasons, insufficientResources[i].Reason)
        }
        return framework.NewStatus(framework.Unschedulable, failureReasons...)
    }
    return nil
}

// InsufficientResource describes what kind of resource limit is hit and caused the pod to not fit the node.
type InsufficientResource struct {
    ResourceName v1.ResourceName
    // We explicitly have a parameter for reason to avoid formatting a message on the fly
    // for common resources, which is expensive for cluster autoscaler simulations.
    Reason    string
    Requested int64
    Used      int64
    Capacity  int64
}

// Fits checks if a node has enough resources to host the pod.
func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) []InsufficientResource {
    return fitsRequest(computePodResourceRequest(pod), nodeInfo, nil, nil)
}
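
// Illustrative caller (editorial sketch, not upstream code): Fits is the exported entry point for
// out-of-scheduler consumers such as autoscaler-style simulations. A hypothetical caller could
// inspect the returned slice directly:
//
//	for _, r := range Fits(pod, nodeInfo) {
//		fmt.Printf("%s: requested %d, used %d, capacity %d (%s)\n",
//			r.ResourceName, r.Requested, r.Used, r.Capacity, r.Reason)
//	}
//
// An empty slice means the pod fits the node.
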
func fitsRequest(podRequest *preFilterState, nodeInfo *framework.NodeInfo, ignoredExtendedResources, ignoredResourceGroups sets.String) []InsufficientResource {
    insufficientResources := make([]InsufficientResource, 0, 4)

    allowedPodNumber := nodeInfo.Allocatable.AllowedPodNumber
    if len(nodeInfo.Pods)+1 > allowedPodNumber {
        insufficientResources = append(insufficientResources, InsufficientResource{
            ResourceName: v1.ResourcePods,
            Reason:       "Too many pods",
            Requested:    1,
            Used:         int64(len(nodeInfo.Pods)),
            Capacity:     int64(allowedPodNumber),
        })
    }

    if podRequest.MilliCPU == 0 &&
        podRequest.Memory == 0 &&
        podRequest.EphemeralStorage == 0 &&
        len(podRequest.ScalarResources) == 0 {
        return insufficientResources
    }

    if podRequest.MilliCPU > (nodeInfo.Allocatable.MilliCPU - nodeInfo.Requested.MilliCPU) {
        insufficientResources = append(insufficientResources, InsufficientResource{
            ResourceName: v1.ResourceCPU,
            Reason:       "Insufficient cpu",
            Requested:    podRequest.MilliCPU,
            Used:         nodeInfo.Requested.MilliCPU,
            Capacity:     nodeInfo.Allocatable.MilliCPU,
        })
    }
    if podRequest.Memory > (nodeInfo.Allocatable.Memory - nodeInfo.Requested.Memory) {
        insufficientResources = append(insufficientResources, InsufficientResource{
            ResourceName: v1.ResourceMemory,
            Reason:       "Insufficient memory",
            Requested:    podRequest.Memory,
            Used:         nodeInfo.Requested.Memory,
            Capacity:     nodeInfo.Allocatable.Memory,
        })
    }
    if podRequest.EphemeralStorage > (nodeInfo.Allocatable.EphemeralStorage - nodeInfo.Requested.EphemeralStorage) {
        insufficientResources = append(insufficientResources, InsufficientResource{
            ResourceName: v1.ResourceEphemeralStorage,
            Reason:       "Insufficient ephemeral-storage",
            Requested:    podRequest.EphemeralStorage,
            Used:         nodeInfo.Requested.EphemeralStorage,
            Capacity:     nodeInfo.Allocatable.EphemeralStorage,
        })
    }

    for rName, rQuant := range podRequest.ScalarResources {
        // Skip in case request quantity is zero
        if rQuant == 0 {
            continue
        }

        if v1helper.IsExtendedResourceName(rName) {
            // If this resource is one of the extended resources that should be ignored, we will skip checking it.
            // rName is guaranteed to have a slash due to API validation.
            var rNamePrefix string
            if ignoredResourceGroups.Len() > 0 {
                rNamePrefix = strings.Split(string(rName), "/")[0]
            }
            if ignoredExtendedResources.Has(string(rName)) || ignoredResourceGroups.Has(rNamePrefix) {
                continue
            }
        }

        if rQuant > (nodeInfo.Allocatable.ScalarResources[rName] - nodeInfo.Requested.ScalarResources[rName]) {
            insufficientResources = append(insufficientResources, InsufficientResource{
                ResourceName: rName,
                Reason:       fmt.Sprintf("Insufficient %v", rName),
                Requested:    podRequest.ScalarResources[rName],
                Used:         nodeInfo.Requested.ScalarResources[rName],
                Capacity:     nodeInfo.Allocatable.ScalarResources[rName],
            })
        }
    }

    return insufficientResources
}

// Score invoked at the Score extension point.
func (f *Fit) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
    nodeInfo, err := f.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
    if err != nil {
        return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
    }

    s, err := getPreScoreState(state)
    if err != nil {
        // The preScoreState is missing (e.g. PreScore was skipped), so fall back to
        // computing the pod's resource requests here.
        s = &preScoreState{
            podRequests: f.calculatePodResourceRequestList(pod, f.resources),
        }
    }

    return f.score(pod, nodeInfo, s.podRequests)
}
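
// Illustrative flow (editorial sketch, not upstream code): the framework drives the pieces above in
// sequence, which is why PreScore caches the request list that Score then consumes for every node.
//
//	state := framework.NewCycleState()
//	_ = f.PreScore(ctx, state, pod, nodes)              // computes podRequests once per pod
//	score, _ := f.Score(ctx, state, pod, nodes[0].Name) // reuses the cached podRequests for each node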