Add score plugin for NodeResourcesFit

This commit is contained in:
yuzhiquan
2021-05-07 18:15:18 +08:00
committed by Abdullah Gharaibeh
parent 06dfe683ce
commit deb14b995a
32 changed files with 1107 additions and 537 deletions

View File

@@ -20,6 +20,7 @@ import (
"fmt"
"sort"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
@@ -383,16 +384,31 @@ func NewLegacyRegistry() *LegacyRegistry {
plugins.Score = appendToPluginSet(plugins.Score, nodepreferavoidpods.Name, &args.Weight)
})
registry.registerPriorityConfigProducer(MostRequestedPriority,
func(args ConfigProducerArgs, plugins *config.Plugins, _ *[]config.PluginConfig) {
func(args ConfigProducerArgs, plugins *config.Plugins, pluginConfig *[]config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.MostAllocatedName, &args.Weight)
*pluginConfig = append(*pluginConfig,
config.PluginConfig{Name: noderesources.MostAllocatedName, Args: &config.NodeResourcesMostAllocatedArgs{
Resources: []config.ResourceSpec{
{Name: string(v1.ResourceCPU), Weight: 1},
{Name: string(v1.ResourceMemory), Weight: 1},
},
}})
})
registry.registerPriorityConfigProducer(BalancedResourceAllocation,
func(args ConfigProducerArgs, plugins *config.Plugins, _ *[]config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.BalancedAllocationName, &args.Weight)
})
registry.registerPriorityConfigProducer(LeastRequestedPriority,
func(args ConfigProducerArgs, plugins *config.Plugins, _ *[]config.PluginConfig) {
func(args ConfigProducerArgs, plugins *config.Plugins, pluginConfig *[]config.PluginConfig) {
plugins.Score = appendToPluginSet(plugins.Score, noderesources.LeastAllocatedName, &args.Weight)
*pluginConfig = append(*pluginConfig,
config.PluginConfig{Name: noderesources.LeastAllocatedName, Args: &config.NodeResourcesLeastAllocatedArgs{
Resources: []config.ResourceSpec{
{Name: string(v1.ResourceCPU), Weight: 1},
{Name: string(v1.ResourceMemory), Weight: 1},
},
}})
})
registry.registerPriorityConfigProducer(noderesources.RequestedToCapacityRatioName,
func(args ConfigProducerArgs, plugins *config.Plugins, pluginConfig *[]config.PluginConfig) {

View File

@@ -20,6 +20,7 @@ import (
"testing"
"github.com/google/go-cmp/cmp"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/featuregate"
@@ -145,6 +146,15 @@ func TestAppendPriorityConfigs(t *testing.T) {
DefaultingType: config.SystemDefaulting,
},
},
{
Name: noderesources.LeastAllocatedName,
Args: &config.NodeResourcesLeastAllocatedArgs{
Resources: []config.ResourceSpec{
{Name: string(v1.ResourceCPU), Weight: 1},
{Name: string(v1.ResourceMemory), Weight: 1},
},
},
},
},
},
{

View File

@@ -35,6 +35,7 @@ import (
var _ framework.PreFilterPlugin = &Fit{}
var _ framework.FilterPlugin = &Fit{}
var _ framework.EnqueueExtensions = &Fit{}
var _ framework.ScorePlugin = &Fit{}
const (
// FitName is the name of the plugin used in the plugin registry and configurations.
@@ -45,11 +46,46 @@ const (
preFilterStateKey = "PreFilter" + FitName
)
// nodeResourceStrategyTypeMap maps strategy to scorer implementation
var nodeResourceStrategyTypeMap = map[config.ScoringStrategyType]scorer{
config.LeastAllocated: func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer {
resToWeightMap := resourcesToWeightMap(args.ScoringStrategy.Resources)
return &resourceAllocationScorer{
Name: LeastAllocatedName,
scorer: leastResourceScorer(resToWeightMap),
resourceToWeightMap: resToWeightMap,
}
},
config.MostAllocated: func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer {
resToWeightMap := resourcesToWeightMap(args.ScoringStrategy.Resources)
return &resourceAllocationScorer{
Name: MostAllocatedName,
scorer: mostResourceScorer(resToWeightMap),
resourceToWeightMap: resToWeightMap,
}
},
config.RequestedToCapacityRatio: func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer {
resToWeightMap := resourcesToWeightMap(args.ScoringStrategy.Resources)
return &resourceAllocationScorer{
Name: RequestedToCapacityRatioName,
scorer: requestedToCapacityRatioScorer(resToWeightMap, args.ScoringStrategy.RequestedToCapacityRatio.Shape),
resourceToWeightMap: resToWeightMap,
}
},
}
// Fit is a plugin that checks if a node has sufficient resources.
type Fit struct {
ignoredResources sets.String
ignoredResourceGroups sets.String
enablePodOverhead bool
handle framework.Handle
resourceAllocationScorer
}
// ScoreExtensions of the Score plugin.
func (f *Fit) ScoreExtensions() framework.ScoreExtensions {
return nil
}
// preFilterState computed at PreFilter and used at Filter.
@@ -68,7 +104,7 @@ func (f *Fit) Name() string {
}
// NewFit initializes a new plugin and returns it.
func NewFit(plArgs runtime.Object, _ framework.Handle, fts feature.Features) (framework.Plugin, error) {
func NewFit(plArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) {
args, ok := plArgs.(*config.NodeResourcesFitArgs)
if !ok {
return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", plArgs)
@@ -76,10 +112,23 @@ func NewFit(plArgs runtime.Object, _ framework.Handle, fts feature.Features) (fr
if err := validation.ValidateNodeResourcesFitArgs(nil, args); err != nil {
return nil, err
}
if args.ScoringStrategy == nil {
return nil, fmt.Errorf("scoring strategy not specified")
}
strategy := args.ScoringStrategy.Type
scorePlugin, ok := nodeResourceStrategyTypeMap[strategy]
if !ok {
return nil, fmt.Errorf("scoring strategy %s is not supported", strategy)
}
return &Fit{
ignoredResources: sets.NewString(args.IgnoredResources...),
ignoredResourceGroups: sets.NewString(args.IgnoredResourceGroups...),
enablePodOverhead: fts.EnablePodOverhead,
ignoredResources: sets.NewString(args.IgnoredResources...),
ignoredResourceGroups: sets.NewString(args.IgnoredResourceGroups...),
enablePodOverhead: fts.EnablePodOverhead,
handle: h,
resourceAllocationScorer: *scorePlugin(args),
}, nil
}
@@ -278,3 +327,13 @@ func fitsRequest(podRequest *preFilterState, nodeInfo *framework.NodeInfo, ignor
return insufficientResources
}
// Score invoked at the Score extension point.
func (f *Fit) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := f.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
}
return f.score(pod, nodeInfo)
}

View File

@@ -30,6 +30,8 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
var (
@@ -105,6 +107,14 @@ func getErrReason(rn v1.ResourceName) string {
return fmt.Sprintf("Insufficient %v", rn)
}
var defaultScoringStrategy = &config.ScoringStrategy{
Type: config.LeastAllocated,
Resources: []config.ResourceSpec{
{Name: "cpu", Weight: 1},
{Name: "memory", Weight: 1},
},
}
func TestEnoughRequests(t *testing.T) {
enoughPodsTests := []struct {
pod *v1.Pod
@@ -414,6 +424,10 @@ func TestEnoughRequests(t *testing.T) {
node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 5, 20, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 5, 20, 5)}}
test.nodeInfo.SetNode(&node)
if test.args.ScoringStrategy == nil {
test.args.ScoringStrategy = defaultScoringStrategy
}
p, err := NewFit(&test.args, nil, plfeature.Features{EnablePodOverhead: true})
if err != nil {
t.Fatal(err)
@@ -442,7 +456,7 @@ func TestPreFilterDisabled(t *testing.T) {
nodeInfo := framework.NewNodeInfo()
node := v1.Node{}
nodeInfo.SetNode(&node)
p, err := NewFit(&config.NodeResourcesFitArgs{}, nil, plfeature.Features{EnablePodOverhead: true})
p, err := NewFit(&config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{EnablePodOverhead: true})
if err != nil {
t.Fatal(err)
}
@@ -492,7 +506,7 @@ func TestNotEnoughRequests(t *testing.T) {
node := v1.Node{Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 1, 0, 0, 0)}}
test.nodeInfo.SetNode(&node)
p, err := NewFit(&config.NodeResourcesFitArgs{}, nil, plfeature.Features{EnablePodOverhead: true})
p, err := NewFit(&config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{EnablePodOverhead: true})
if err != nil {
t.Fatal(err)
}
@@ -564,7 +578,7 @@ func TestStorageRequests(t *testing.T) {
node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 5, 20, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 5, 20, 5)}}
test.nodeInfo.SetNode(&node)
p, err := NewFit(&config.NodeResourcesFitArgs{}, nil, plfeature.Features{EnablePodOverhead: true})
p, err := NewFit(&config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{EnablePodOverhead: true})
if err != nil {
t.Fatal(err)
}
@@ -582,3 +596,119 @@ func TestStorageRequests(t *testing.T) {
}
}
func TestFitScore(t *testing.T) {
type test struct {
name string
requestedPod *v1.Pod
nodes []*v1.Node
scheduledPods []*v1.Pod
expectedPriorities framework.NodeScoreList
nodeResourcesFitArgs config.NodeResourcesFitArgs
}
tests := []test{
{
name: "test case for ScoringStrategy RequestedToCapacityRatio case1",
requestedPod: makePod("", 3000, 5000),
nodes: []*v1.Node{makeNode("node1", 4000, 10000), makeNode("node2", 6000, 10000)},
scheduledPods: []*v1.Pod{makePod("node1", 2000, 4000), makePod("node2", 1000, 2000)},
expectedPriorities: []framework.NodeScore{{Name: "node1", Score: 10}, {Name: "node2", Score: 32}},
nodeResourcesFitArgs: config.NodeResourcesFitArgs{
ScoringStrategy: &config.ScoringStrategy{
Type: config.RequestedToCapacityRatio,
Resources: []config.ResourceSpec{
{Name: "memory", Weight: 1},
{Name: "cpu", Weight: 1},
},
RequestedToCapacityRatio: &config.RequestedToCapacityRatioParam{
Shape: []config.UtilizationShapePoint{
{Utilization: 0, Score: 10},
{Utilization: 100, Score: 0},
},
},
},
},
},
{
name: "test case for ScoringStrategy RequestedToCapacityRatio case2",
requestedPod: makePod("", 3000, 5000),
nodes: []*v1.Node{makeNode("node1", 4000, 10000), makeNode("node2", 6000, 10000)},
scheduledPods: []*v1.Pod{makePod("node1", 2000, 4000), makePod("node2", 1000, 2000)},
expectedPriorities: []framework.NodeScore{{Name: "node1", Score: 95}, {Name: "node2", Score: 68}},
nodeResourcesFitArgs: config.NodeResourcesFitArgs{
ScoringStrategy: &config.ScoringStrategy{
Type: config.RequestedToCapacityRatio,
Resources: []config.ResourceSpec{
{Name: "memory", Weight: 1},
{Name: "cpu", Weight: 1},
},
RequestedToCapacityRatio: &config.RequestedToCapacityRatioParam{
Shape: []config.UtilizationShapePoint{
{Utilization: 0, Score: 0},
{Utilization: 100, Score: 10},
},
},
},
},
},
{
name: "test case for ScoringStrategy MostAllocated",
requestedPod: makePod("", 1000, 2000),
nodes: []*v1.Node{makeNode("node1", 4000, 10000), makeNode("node2", 6000, 10000)},
scheduledPods: []*v1.Pod{makePod("node1", 2000, 4000), makePod("node2", 1000, 2000)},
expectedPriorities: []framework.NodeScore{{Name: "node1", Score: 67}, {Name: "node2", Score: 36}},
nodeResourcesFitArgs: config.NodeResourcesFitArgs{
ScoringStrategy: &config.ScoringStrategy{
Type: config.MostAllocated,
Resources: []config.ResourceSpec{
{Name: "memory", Weight: 1},
{Name: "cpu", Weight: 1},
},
},
},
},
{
name: "test case for ScoringStrategy LeastAllocated",
requestedPod: makePod("", 1000, 2000),
nodes: []*v1.Node{makeNode("node1", 4000, 10000), makeNode("node2", 6000, 10000)},
scheduledPods: []*v1.Pod{makePod("node1", 2000, 4000), makePod("node2", 1000, 2000)},
expectedPriorities: []framework.NodeScore{{Name: "node1", Score: 32}, {Name: "node2", Score: 63}},
nodeResourcesFitArgs: config.NodeResourcesFitArgs{
ScoringStrategy: &config.ScoringStrategy{
Type: config.LeastAllocated,
Resources: []config.ResourceSpec{
{Name: "memory", Weight: 1},
{Name: "cpu", Weight: 1},
},
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.scheduledPods, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, runtime.WithSnapshotSharedLister(snapshot))
args := test.nodeResourcesFitArgs
p, err := NewFit(&args, fh, plfeature.Features{EnablePodOverhead: true})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
var gotPriorities framework.NodeScoreList
for _, n := range test.nodes {
score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.requestedPod, n.Name)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
gotPriorities = append(gotPriorities, framework.NodeScore{Name: n.Name, Score: score})
}
if !reflect.DeepEqual(test.expectedPriorities, gotPriorities) {
t.Errorf("expected:\n\t%+v,\ngot:\n\t%+v", test.expectedPriorities, gotPriorities)
}
})
}
}

View File

@@ -43,35 +43,18 @@ func NewRequestedToCapacityRatio(plArgs runtime.Object, handle framework.Handle,
if err != nil {
return nil, err
}
if err := validation.ValidateRequestedToCapacityRatioArgs(nil, &args); err != nil {
return nil, err
}
shape := make([]helper.FunctionShapePoint, 0, len(args.Shape))
for _, point := range args.Shape {
shape = append(shape, helper.FunctionShapePoint{
Utilization: int64(point.Utilization),
// MaxCustomPriorityScore may diverge from the max score used in the scheduler and defined by MaxNodeScore,
// therefore we need to scale the score returned by requested to capacity ratio to the score range
// used by the scheduler.
Score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore),
})
}
resourceToWeightMap := make(resourceToWeightMap)
for _, resource := range args.Resources {
resourceToWeightMap[v1.ResourceName(resource.Name)] = resource.Weight
if resource.Weight == 0 {
// Apply the default weight.
resourceToWeightMap[v1.ResourceName(resource.Name)] = 1
}
}
resourceToWeightMap := resourcesToWeightMap(args.Resources)
return &RequestedToCapacityRatio{
handle: handle,
resourceAllocationScorer: resourceAllocationScorer{
Name: RequestedToCapacityRatioName,
scorer: buildRequestedToCapacityRatioScorerFunction(shape, resourceToWeightMap),
scorer: requestedToCapacityRatioScorer(resourceToWeightMap, args.Shape),
resourceToWeightMap: resourceToWeightMap,
enablePodOverhead: fts.EnablePodOverhead,
},
@@ -138,3 +121,18 @@ func buildRequestedToCapacityRatioScorerFunction(scoringFunctionShape helper.Fun
return int64(math.Round(float64(nodeScore) / float64(weightSum)))
}
}
func requestedToCapacityRatioScorer(weightMap resourceToWeightMap, shape []config.UtilizationShapePoint) func(resourceToValueMap, resourceToValueMap) int64 {
shapes := make([]helper.FunctionShapePoint, 0, len(shape))
for _, point := range shape {
shapes = append(shapes, helper.FunctionShapePoint{
Utilization: int64(point.Utilization),
// MaxCustomPriorityScore may diverge from the max score used in the scheduler and defined by MaxNodeScore,
// therefore we need to scale the score returned by requested to capacity ratio to the score range
// used by the scheduler.
Score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore),
})
}
return buildRequestedToCapacityRatioScorerFunction(shapes, weightMap)
}

View File

@@ -19,6 +19,7 @@ package noderesources
import (
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
@@ -26,6 +27,9 @@ import (
// resourceToWeightMap contains resource name and weight.
type resourceToWeightMap map[v1.ResourceName]int64
// scorer is decorator for resourceAllocationScorer
type scorer func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer
// defaultRequestedRatioResources is used to set default requestToWeight map for CPU and memory
var defaultRequestedRatioResources = resourceToWeightMap{v1.ResourceMemory: 1, v1.ResourceCPU: 1}
@@ -124,3 +128,12 @@ func calculatePodResourceRequest(pod *v1.Pod, resource v1.ResourceName, enablePo
return podRequest
}
// resourcesToWeightMap make weightmap from resources spec
func resourcesToWeightMap(resources []config.ResourceSpec) resourceToWeightMap {
resourceToWeightMap := make(resourceToWeightMap)
for _, resource := range resources {
resourceToWeightMap[v1.ResourceName(resource.Name)] = resource.Weight
}
return resourceToWeightMap
}