Split scheduler framework implementation into new runtime package
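
This moves the framework's concrete implementation (the framework struct, plugin Registry, waiting-pods map, and metrics recorder) out of pkg/scheduler/framework/v1alpha1 into a new pkg/scheduler/framework/runtime package. v1alpha1 keeps the interfaces and shared types, and the implementation now refers to them through an imported framework alias. Callers construct the framework via the runtime package instead; a minimal sketch of the caller-side change (the helper function is illustrative, not part of this commit):

	package example

	import (
		clientset "k8s.io/client-go/kubernetes"
		frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
		framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	)

	// newHandle mirrors what the updated tests below do: construction comes
	// from the runtime package, while the returned value still satisfies the
	// framework.Framework interface defined in v1alpha1.
	func newHandle(client clientset.Interface) (framework.Framework, error) {
		// Before this commit: framework.NewFramework(nil, nil, nil, framework.WithClientSet(client))
		return frameworkruntime.NewFramework(nil, nil, nil, frameworkruntime.WithClientSet(client))
	}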
@@ -10,6 +10,7 @@ filegroup(
     srcs = [
         ":package-srcs",
         "//pkg/scheduler/framework/plugins:all-srcs",
+        "//pkg/scheduler/framework/runtime:all-srcs",
         "//pkg/scheduler/framework/v1alpha1:all-srcs",
     ],
     tags = ["automanaged"],
@@ -29,7 +29,7 @@ go_library(
         "//pkg/scheduler/framework/plugins/volumebinding:go_default_library",
         "//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library",
         "//pkg/scheduler/framework/plugins/volumezone:go_default_library",
-        "//pkg/scheduler/framework/v1alpha1:go_default_library",
+        "//pkg/scheduler/framework/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//vendor/k8s.io/klog/v2:go_default_library",
     ],
@@ -19,7 +19,7 @@ go_test(
     srcs = ["default_binder_test.go"],
     embed = [":go_default_library"],
     deps = [
-        "//pkg/scheduler/framework/v1alpha1:go_default_library",
+        "//pkg/scheduler/framework/runtime:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
@@ -27,7 +27,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/kubernetes/fake"
 	clienttesting "k8s.io/client-go/testing"
-	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
+	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 )

 func TestDefaultBinder(t *testing.T) {
@@ -66,7 +66,7 @@ func TestDefaultBinder(t *testing.T) {
 		return true, gotBinding, nil
 	})

-	fh, err := framework.NewFramework(nil, nil, nil, framework.WithClientSet(client))
+	fh, err := frameworkruntime.NewFramework(nil, nil, nil, frameworkruntime.WithClientSet(client))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -23,6 +23,7 @@ go_test(
     ],
     embed = [":go_default_library"],
     deps = [
+        "//pkg/scheduler/framework/runtime:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
         "//pkg/scheduler/internal/parallelize:go_default_library",
@@ -23,6 +23,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
@@ -67,7 +68,7 @@ func BenchmarkTestSelectorSpreadPriority(b *testing.B) {
 				b.Errorf("error waiting for informer cache sync")
 			}
 		}
-		fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot), framework.WithInformerFactory(informerFactory))
+		fh, _ := runtime.NewFramework(nil, nil, nil, runtime.WithSnapshotSharedLister(snapshot), runtime.WithInformerFactory(informerFactory))
 		plugin := &DefaultPodTopologySpread{handle: fh}
 		b.ResetTimer()
@@ -29,6 +29,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/informers"
 	clientsetfake "k8s.io/client-go/kubernetes/fake"
+	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
 )
@@ -375,7 +376,7 @@ func TestDefaultPodTopologySpreadScore(t *testing.T) {
 			if err != nil {
 				t.Errorf("error creating informerFactory: %+v", err)
 			}
-			fh, err := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot), framework.WithInformerFactory(informerFactory))
+			fh, err := frameworkruntime.NewFramework(nil, nil, nil, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory))
 			if err != nil {
 				t.Errorf("error creating new framework handle: %+v", err)
 			}
@@ -629,7 +630,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
 			if err != nil {
 				t.Errorf("error creating informerFactory: %+v", err)
 			}
-			fh, err := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot), framework.WithInformerFactory(informerFactory))
+			fh, err := frameworkruntime.NewFramework(nil, nil, nil, frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithInformerFactory(informerFactory))
 			if err != nil {
 				t.Errorf("error creating new framework handle: %+v", err)
 			}
@@ -17,6 +17,7 @@ go_test(
     srcs = ["image_locality_test.go"],
     embed = [":go_default_library"],
     deps = [
+        "//pkg/scheduler/framework/runtime:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -25,6 +25,7 @@ import (

 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
 )
@@ -334,7 +335,7 @@ func TestImageLocalityPriority(t *testing.T) {

 			state := framework.NewCycleState()

-			fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
+			fh, _ := runtime.NewFramework(nil, nil, nil, runtime.WithSnapshotSharedLister(snapshot))

 			p, _ := New(nil, fh)
 			var gotList framework.NodeScoreList
@@ -30,6 +30,7 @@ go_test(
     embed = [":go_default_library"],
     deps = [
         "//pkg/scheduler/apis/config:go_default_library",
+        "//pkg/scheduler/framework/runtime:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -24,6 +24,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
+	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
 )
@@ -625,7 +626,7 @@ func TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			state := framework.NewCycleState()
 			snapshot := cache.NewSnapshot(test.pods, test.nodes)
-			fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
+			fh, _ := runtime.NewFramework(nil, nil, nil, runtime.WithSnapshotSharedLister(snapshot))

 			args := &config.InterPodAffinityArgs{HardPodAffinityWeight: test.hardPodAffinityWeight}
 			p, err := New(args, fh)
@@ -35,6 +35,7 @@ go_test(
     embed = [":go_default_library"],
     deps = [
         "//pkg/apis/core:go_default_library",
+        "//pkg/scheduler/framework/runtime:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -24,6 +24,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	api "k8s.io/kubernetes/pkg/apis/core"
+	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
 )
@@ -848,7 +849,7 @@ func TestNodeAffinityPriority(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			state := framework.NewCycleState()

-			fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes)))
+			fh, _ := runtime.NewFramework(nil, nil, nil, runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes)))
 			p, _ := New(nil, fh)
 			var gotList framework.NodeScoreList
 			for _, n := range test.nodes {
@@ -21,6 +21,7 @@ go_test(
     embed = [":go_default_library"],
     deps = [
         "//pkg/scheduler/apis/config:go_default_library",
+        "//pkg/scheduler/framework/runtime:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -23,6 +23,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
+	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
 )
@@ -236,7 +237,7 @@ func TestNodeLabelScore(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			state := framework.NewCycleState()
 			node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: map[string]string{"foo": "", "bar": ""}}}
-			fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(cache.NewSnapshot(nil, []*v1.Node{node})))
+			fh, _ := runtime.NewFramework(nil, nil, nil, runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, []*v1.Node{node})))
 			p, err := New(&test.args, fh)
 			if err != nil {
 				t.Fatalf("Failed to create plugin: %+v", err)
@@ -270,7 +271,7 @@ func TestNodeLabelFilterWithoutNode(t *testing.T) {

 func TestNodeLabelScoreWithoutNode(t *testing.T) {
 	t.Run("node does not exist", func(t *testing.T) {
-		fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(cache.NewEmptySnapshot()))
+		fh, _ := runtime.NewFramework(nil, nil, nil, runtime.WithSnapshotSharedLister(cache.NewEmptySnapshot()))
 		p, err := New(&config.NodeLabelArgs{}, fh)
 		if err != nil {
 			t.Fatalf("Failed to create plugin: %+v", err)
@@ -19,6 +19,7 @@ go_test(
     srcs = ["node_prefer_avoid_pods_test.go"],
     embed = [":go_default_library"],
     deps = [
+        "//pkg/scheduler/framework/runtime:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -23,6 +23,7 @@ import (

 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
 )
@@ -143,7 +144,7 @@ func TestNodePreferAvoidPods(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			state := framework.NewCycleState()
-			fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes)))
+			fh, _ := runtime.NewFramework(nil, nil, nil, runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes)))
 			p, _ := New(nil, fh)
 			var gotList framework.NodeScoreList
 			for _, n := range test.nodes {
@@ -58,6 +58,7 @@ go_test(
         "//pkg/apis/core/v1/helper:go_default_library",
         "//pkg/features:go_default_library",
         "//pkg/scheduler/apis/config:go_default_library",
+        "//pkg/scheduler/framework/runtime:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -27,6 +27,7 @@ import (
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	"k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
 )
@@ -388,7 +389,7 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) {
 					info.TransientInfo.TransNodeInfo.RequestedVolumes = len(test.pod.Spec.Volumes)
 				}
 			}
-			fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
+			fh, _ := runtime.NewFramework(nil, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
 			p, _ := NewBalancedAllocation(nil, fh)

 			for i := range test.nodes {
@@ -25,6 +25,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
+	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
 )
@@ -286,7 +287,7 @@ func TestNodeResourcesLeastAllocated(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			snapshot := cache.NewSnapshot(test.pods, test.nodes)
-			fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
+			fh, _ := runtime.NewFramework(nil, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
 			p, err := NewLeastAllocated(&test.args, fh)

 			if len(test.wantErr) != 0 {
@@ -25,6 +25,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
+	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
 )
@@ -246,7 +247,7 @@ func TestNodeResourcesMostAllocated(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			snapshot := cache.NewSnapshot(test.pods, test.nodes)
-			fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
+			fh, _ := runtime.NewFramework(nil, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
 			p, err := NewMostAllocated(&test.args, fh)

 			if len(test.wantErr) != 0 {
@@ -25,6 +25,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
+	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
 )
@@ -66,7 +67,7 @@ func TestRequestedToCapacityRatio(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			state := framework.NewCycleState()
 			snapshot := cache.NewSnapshot(test.scheduledPods, test.nodes)
-			fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
+			fh, _ := runtime.NewFramework(nil, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
 			args := config.RequestedToCapacityRatioArgs{
 				Shape: []config.UtilizationShapePoint{
 					{Utilization: 0, Score: 10},
@@ -318,7 +319,7 @@ func TestResourceBinPackingSingleExtended(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			state := framework.NewCycleState()
 			snapshot := cache.NewSnapshot(test.pods, test.nodes)
-			fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
+			fh, _ := runtime.NewFramework(nil, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
 			args := config.RequestedToCapacityRatioArgs{
 				Shape: []config.UtilizationShapePoint{
 					{Utilization: 0, Score: 0},
@@ -561,7 +562,7 @@ func TestResourceBinPackingMultipleExtended(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			state := framework.NewCycleState()
 			snapshot := cache.NewSnapshot(test.pods, test.nodes)
-			fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
+			fh, _ := runtime.NewFramework(nil, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
 			args := config.RequestedToCapacityRatioArgs{
 				Shape: []config.UtilizationShapePoint{
 					{Utilization: 0, Score: 0},
@@ -36,14 +36,14 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"
-	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
+	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 )

 // NewInTreeRegistry builds the registry with all the in-tree plugins.
 // A scheduler that runs out of tree plugins can register additional plugins
 // through the WithFrameworkOutOfTreeRegistry option.
-func NewInTreeRegistry() framework.Registry {
-	return framework.Registry{
+func NewInTreeRegistry() runtime.Registry {
+	return runtime.Registry{
 		defaultpodtopologyspread.Name: defaultpodtopologyspread.New,
 		imagelocality.Name:            imagelocality.New,
 		tainttoleration.Name:          tainttoleration.New,
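
With NewInTreeRegistry returning a runtime.Registry, a scheduler embedding out-of-tree plugins extends the same factory map before handing it to the framework. A sketch assuming Registry keeps its name-to-PluginFactory map form with an Add method (the plugin name and factory below are hypothetical):

	package example

	import (
		"k8s.io/kubernetes/pkg/scheduler/framework/plugins"
		"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	)

	// buildRegistry starts from the in-tree registry and registers one
	// hypothetical out-of-tree plugin factory on top of it.
	func buildRegistry(myFactory runtime.PluginFactory) (runtime.Registry, error) {
		r := plugins.NewInTreeRegistry()
		if err := r.Add("my-out-of-tree-plugin", myFactory); err != nil {
			return nil, err
		}
		return r, nil
	}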
@@ -33,6 +33,7 @@ go_test(
     srcs = ["taint_toleration_test.go"],
     embed = [":go_default_library"],
     deps = [
+        "//pkg/scheduler/framework/runtime:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/internal/cache:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
@@ -23,6 +23,7 @@ import (

 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/internal/cache"
 )
@@ -229,7 +230,7 @@ func TestTaintTolerationScore(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			state := framework.NewCycleState()
 			snapshot := cache.NewSnapshot(nil, test.nodes)
-			fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
+			fh, _ := runtime.NewFramework(nil, nil, nil, runtime.WithSnapshotSharedLister(snapshot))

 			p, _ := New(nil, fh)
 			status := p.(framework.PreScorePlugin).PreScore(context.Background(), state, test.pod, test.nodes)
@@ -39,6 +39,7 @@ go_test(
         "//pkg/controller/volume/persistentvolume/util:go_default_library",
         "//pkg/controller/volume/scheduling:go_default_library",
         "//pkg/scheduler/apis/config:go_default_library",
+        "//pkg/scheduler/framework/runtime:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/api/storage/v1:go_default_library",
@@ -29,6 +29,7 @@ import (
 	pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
 	"k8s.io/kubernetes/pkg/controller/volume/scheduling"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
+	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
 	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/utils/pointer"
 )
@@ -231,11 +232,11 @@ func TestVolumeBinding(t *testing.T) {
 	ctx := context.Background()
 	client := fake.NewSimpleClientset()
 	informerFactory := informers.NewSharedInformerFactory(client, 0)
-	opts := []framework.Option{
-		framework.WithClientSet(client),
-		framework.WithInformerFactory(informerFactory),
+	opts := []runtime.Option{
+		runtime.WithClientSet(client),
+		runtime.WithInformerFactory(informerFactory),
 	}
-	fh, err := framework.NewFramework(nil, nil, nil, opts...)
+	fh, err := runtime.NewFramework(nil, nil, nil, opts...)
 	if err != nil {
 		t.Fatal(err)
 	}
pkg/scheduler/framework/runtime/BUILD (new file, 67 lines)
@@ -0,0 +1,67 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "go_default_library",
+    srcs = [
+        "framework.go",
+        "metrics_recorder.go",
+        "registry.go",
+        "waiting_pods_map.go",
+    ],
+    importpath = "k8s.io/kubernetes/pkg/scheduler/framework/runtime",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/scheduler/apis/config:go_default_library",
+        "//pkg/scheduler/apis/config/scheme:go_default_library",
+        "//pkg/scheduler/framework/v1alpha1:go_default_library",
+        "//pkg/scheduler/internal/parallelize:go_default_library",
+        "//pkg/scheduler/metrics:go_default_library",
+        "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//staging/src/k8s.io/client-go/informers:go_default_library",
+        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
+        "//staging/src/k8s.io/client-go/tools/events:go_default_library",
+        "//staging/src/k8s.io/component-base/metrics:go_default_library",
+        "//staging/src/k8s.io/kube-scheduler/config/v1beta1:go_default_library",
+        "//vendor/k8s.io/klog/v2:go_default_library",
+        "//vendor/sigs.k8s.io/yaml:go_default_library",
+    ],
+)
+
+go_test(
+    name = "go_default_test",
+    srcs = [
+        "framework_test.go",
+        "registry_test.go",
+    ],
+    embed = [":go_default_library"],
+    deps = [
+        "//pkg/scheduler/apis/config:go_default_library",
+        "//pkg/scheduler/framework/v1alpha1:go_default_library",
+        "//pkg/scheduler/metrics:go_default_library",
+        "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
+        "//vendor/github.com/google/go-cmp/cmp:go_default_library",
+        "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
+        "//vendor/github.com/prometheus/client_model/go:go_default_library",
+    ],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:public"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+    visibility = ["//visibility:public"],
+)
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package v1alpha1
+package runtime

 import (
 	"context"
@@ -33,6 +33,7 @@ import (
 	"k8s.io/kube-scheduler/config/v1beta1"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
+	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
 	"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
 	"k8s.io/kubernetes/pkg/scheduler/metrics"
 )
@@ -59,25 +60,25 @@ const (

 var configDecoder = scheme.Codecs.UniversalDecoder()

-// framework is the component responsible for initializing and running scheduler
+// frameworkImpl is the component responsible for initializing and running scheduler
 // plugins.
-type framework struct {
+type frameworkImpl struct {
 	registry              Registry
-	snapshotSharedLister  SharedLister
+	snapshotSharedLister  framework.SharedLister
 	waitingPods           *waitingPodsMap
 	pluginNameToWeightMap map[string]int
-	queueSortPlugins      []QueueSortPlugin
-	preFilterPlugins      []PreFilterPlugin
-	filterPlugins         []FilterPlugin
-	postFilterPlugins     []PostFilterPlugin
-	preScorePlugins       []PreScorePlugin
-	scorePlugins          []ScorePlugin
-	reservePlugins        []ReservePlugin
-	preBindPlugins        []PreBindPlugin
-	bindPlugins           []BindPlugin
-	postBindPlugins       []PostBindPlugin
-	unreservePlugins      []UnreservePlugin
-	permitPlugins         []PermitPlugin
+	queueSortPlugins      []framework.QueueSortPlugin
+	preFilterPlugins      []framework.PreFilterPlugin
+	filterPlugins         []framework.FilterPlugin
+	postFilterPlugins     []framework.PostFilterPlugin
+	preScorePlugins       []framework.PreScorePlugin
+	scorePlugins          []framework.ScorePlugin
+	reservePlugins        []framework.ReservePlugin
+	preBindPlugins        []framework.PreBindPlugin
+	bindPlugins           []framework.BindPlugin
+	postBindPlugins       []framework.PostBindPlugin
+	unreservePlugins      []framework.UnreservePlugin
+	permitPlugins         []framework.PermitPlugin

 	clientSet     clientset.Interface
 	eventRecorder events.EventRecorder
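
The rename to frameworkImpl, together with the framework.-qualified field types, makes the layering explicit: the runtime package implements interfaces that remain defined in v1alpha1, so the dependency points in one direction only. The compile-time assertion that appears further down in this file,

	var _ framework.Framework = &frameworkImpl{}

keeps the two packages in sync: the build breaks if frameworkImpl ever stops satisfying the v1alpha1 Framework interface.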
@@ -85,7 +86,7 @@ type framework struct {

 	metricsRecorder *metricsRecorder

-	preemptHandle PreemptHandle
+	preemptHandle framework.PreemptHandle

 	// Indicates that RunFilterPlugins should accumulate all failed statuses and not return
 	// after the first failure.
@@ -94,7 +95,7 @@ type framework struct {

 // extensionPoint encapsulates desired and applied set of plugins at a specific extension
 // point. This is used to simplify iterating over all extension points supported by the
-// framework.
+// frameworkImpl.
 type extensionPoint struct {
 	// the set of plugins to be configured at this extension point.
 	plugins *config.PluginSet
@@ -103,7 +104,7 @@ type extensionPoint struct {
 	slicePtr interface{}
 }

-func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint {
+func (f *frameworkImpl) getExtensionPoints(plugins *config.Plugins) []extensionPoint {
 	return []extensionPoint{
 		{plugins.PreFilter, &f.preFilterPlugins},
 		{plugins.Filter, &f.filterPlugins},
@@ -124,31 +125,31 @@ type frameworkOptions struct {
 	clientSet            clientset.Interface
 	eventRecorder        events.EventRecorder
 	informerFactory      informers.SharedInformerFactory
-	snapshotSharedLister SharedLister
+	snapshotSharedLister framework.SharedLister
 	metricsRecorder      *metricsRecorder
-	podNominator         PodNominator
-	extenders            []Extender
+	podNominator         framework.PodNominator
+	extenders            []framework.Extender
 	runAllFilters        bool
 }

-// Option for the framework.
+// Option for the frameworkImpl.
 type Option func(*frameworkOptions)

-// WithClientSet sets clientSet for the scheduling framework.
+// WithClientSet sets clientSet for the scheduling frameworkImpl.
 func WithClientSet(clientSet clientset.Interface) Option {
 	return func(o *frameworkOptions) {
 		o.clientSet = clientSet
 	}
 }

-// WithEventRecorder sets clientSet for the scheduling framework.
+// WithEventRecorder sets clientSet for the scheduling frameworkImpl.
 func WithEventRecorder(recorder events.EventRecorder) Option {
 	return func(o *frameworkOptions) {
 		o.eventRecorder = recorder
 	}
 }

-// WithInformerFactory sets informer factory for the scheduling framework.
+// WithInformerFactory sets informer factory for the scheduling frameworkImpl.
 func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option {
 	return func(o *frameworkOptions) {
 		o.informerFactory = informerFactory
@@ -156,7 +157,7 @@ func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option
 }

 // WithSnapshotSharedLister sets the SharedLister of the snapshot.
-func WithSnapshotSharedLister(snapshotSharedLister SharedLister) Option {
+func WithSnapshotSharedLister(snapshotSharedLister framework.SharedLister) Option {
 	return func(o *frameworkOptions) {
 		o.snapshotSharedLister = snapshotSharedLister
 	}
@@ -177,15 +178,15 @@ func withMetricsRecorder(recorder *metricsRecorder) Option {
 	}
 }

-// WithPodNominator sets podNominator for the scheduling framework.
-func WithPodNominator(nominator PodNominator) Option {
+// WithPodNominator sets podNominator for the scheduling frameworkImpl.
+func WithPodNominator(nominator framework.PodNominator) Option {
 	return func(o *frameworkOptions) {
 		o.podNominator = nominator
 	}
 }

-// WithExtenders sets extenders for the scheduling framework.
-func WithExtenders(extenders []Extender) Option {
+// WithExtenders sets extenders for the scheduling frameworkImpl.
+func WithExtenders(extenders []framework.Extender) Option {
 	return func(o *frameworkOptions) {
 		o.extenders = extenders
 	}
@@ -195,30 +196,30 @@ var defaultFrameworkOptions = frameworkOptions{
 	metricsRecorder: newMetricsRecorder(1000, time.Second),
 }

-// TODO(#91029): move this to framework runtime package.
-var _ PreemptHandle = &preemptHandle{}
+// TODO(#91029): move this to frameworkImpl runtime package.
+var _ framework.PreemptHandle = &preemptHandle{}

 type preemptHandle struct {
-	extenders []Extender
-	PodNominator
-	PluginsRunner
+	extenders []framework.Extender
+	framework.PodNominator
+	framework.PluginsRunner
 }

 // Extenders returns the registered extenders.
-func (ph *preemptHandle) Extenders() []Extender {
+func (ph *preemptHandle) Extenders() []framework.Extender {
 	return ph.extenders
 }

-var _ Framework = &framework{}
+var _ framework.Framework = &frameworkImpl{}

 // NewFramework initializes plugins given the configuration and the registry.
-func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfig, opts ...Option) (Framework, error) {
+func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfig, opts ...Option) (framework.Framework, error) {
 	options := defaultFrameworkOptions
 	for _, opt := range opts {
 		opt(&options)
 	}

-	f := &framework{
+	f := &frameworkImpl{
 		registry:              r,
 		snapshotSharedLister:  options.snapshotSharedLister,
 		pluginNameToWeightMap: make(map[string]int),
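
NewFramework keeps its parameter list; only the return type changes, to the interface from v1alpha1. A sketch of post-split construction with a registry, profile config, and options (all arguments caller-supplied; illustrative only):

	package example

	import (
		"k8s.io/client-go/informers"
		clientset "k8s.io/client-go/kubernetes"
		"k8s.io/kubernetes/pkg/scheduler/apis/config"
		"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
		framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	)

	// buildFramework wires a Registry of plugin factories plus plugin
	// configuration and functional options into a framework.Framework.
	func buildFramework(
		r runtime.Registry,
		plugins *config.Plugins,
		args []config.PluginConfig,
		client clientset.Interface,
		inf informers.SharedInformerFactory,
	) (framework.Framework, error) {
		return runtime.NewFramework(r, plugins, args,
			runtime.WithClientSet(client),
			runtime.WithInformerFactory(inf),
		)
	}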
@@ -250,7 +251,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
 		pluginConfig[name] = args[i].Args
 	}

-	pluginsMap := make(map[string]Plugin)
+	pluginsMap := make(map[string]framework.Plugin)
 	var totalPriority int64
 	for name, factory := range r {
 		// initialize only needed plugins.
@@ -275,10 +276,10 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
 			f.pluginNameToWeightMap[name] = 1
 		}
 		// Checks totalPriority against MaxTotalScore to avoid overflow
-		if int64(f.pluginNameToWeightMap[name])*MaxNodeScore > MaxTotalScore-totalPriority {
+		if int64(f.pluginNameToWeightMap[name])*framework.MaxNodeScore > framework.MaxTotalScore-totalPriority {
 			return nil, fmt.Errorf("total score of Score plugins could overflow")
 		}
-		totalPriority += int64(f.pluginNameToWeightMap[name]) * MaxNodeScore
+		totalPriority += int64(f.pluginNameToWeightMap[name]) * framework.MaxNodeScore
 	}

 	for _, e := range f.getExtensionPoints(plugins) {
@@ -327,7 +328,7 @@ func getPluginArgsOrDefault(pluginConfig map[string]runtime.Object, name string)
 	return obj, err
 }

-func updatePluginList(pluginList interface{}, pluginSet *config.PluginSet, pluginsMap map[string]Plugin) error {
+func updatePluginList(pluginList interface{}, pluginSet *config.PluginSet, pluginsMap map[string]framework.Plugin) error {
 	if pluginSet == nil {
 		return nil
 	}
@@ -358,15 +359,15 @@ func updatePluginList(pluginList interface{}, pluginSet *config.PluginSet, plugi
 }

 // QueueSortFunc returns the function to sort pods in scheduling queue
-func (f *framework) QueueSortFunc() LessFunc {
+func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
 	if f == nil {
-		// If framework is nil, simply keep their order unchanged.
+		// If frameworkImpl is nil, simply keep their order unchanged.
 		// NOTE: this is primarily for tests.
-		return func(_, _ *QueuedPodInfo) bool { return false }
+		return func(_, _ *framework.QueuedPodInfo) bool { return false }
 	}

 	if len(f.queueSortPlugins) == 0 {
-		panic("No QueueSort plugin is registered in the framework.")
+		panic("No QueueSort plugin is registered in the frameworkImpl.")
 	}

 	// Only one QueueSort plugin can be enabled.
@@ -377,7 +378,7 @@ func (f *framework) QueueSortFunc() LessFunc {
 // *Status and its code is set to non-success if any of the plugins returns
 // anything but Success. If a non-success status is returned, then the scheduling
 // cycle is aborted.
-func (f *framework) RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (status *Status) {
+func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (status *framework.Status) {
 	startTime := time.Now()
 	defer func() {
 		metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
@@ -388,18 +389,18 @@ func (f *framework) RunPreFilterPlugins(ctx context.Context, state *CycleState,
 			if status.IsUnschedulable() {
 				msg := fmt.Sprintf("rejected by %q at prefilter: %v", pl.Name(), status.Message())
 				klog.V(4).Infof(msg)
-				return NewStatus(status.Code(), msg)
+				return framework.NewStatus(status.Code(), msg)
 			}
 			msg := fmt.Sprintf("error while running %q prefilter plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
 			klog.Error(msg)
-			return NewStatus(Error, msg)
+			return framework.NewStatus(framework.Error, msg)
 		}
 	}

 	return nil
 }

-func (f *framework) runPreFilterPlugin(ctx context.Context, pl PreFilterPlugin, state *CycleState, pod *v1.Pod) *Status {
+func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, pod *v1.Pod) *framework.Status {
 	if !state.ShouldRecordPluginMetrics() {
 		return pl.PreFilter(ctx, state, pod)
 	}
@@ -412,13 +413,13 @@ func (f *framework) runPreFilterPlugin(ctx context.Context, pl PreFilterPlugin,
 // RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
 // PreFilter plugins. It returns directly if any of the plugins return any
 // status other than Success.
-func (f *framework) RunPreFilterExtensionAddPod(
+func (f *frameworkImpl) RunPreFilterExtensionAddPod(
 	ctx context.Context,
-	state *CycleState,
+	state *framework.CycleState,
 	podToSchedule *v1.Pod,
 	podToAdd *v1.Pod,
-	nodeInfo *NodeInfo,
-) (status *Status) {
+	nodeInfo *framework.NodeInfo,
+) (status *framework.Status) {
 	for _, pl := range f.preFilterPlugins {
 		if pl.PreFilterExtensions() == nil {
 			continue
@@ -428,14 +429,14 @@ func (f *framework) RunPreFilterExtensionAddPod(
 			msg := fmt.Sprintf("error while running AddPod for plugin %q while scheduling pod %q: %v",
 				pl.Name(), podToSchedule.Name, status.Message())
 			klog.Error(msg)
-			return NewStatus(Error, msg)
+			return framework.NewStatus(framework.Error, msg)
 		}
 	}

 	return nil
 }

-func (f *framework) runPreFilterExtensionAddPod(ctx context.Context, pl PreFilterPlugin, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *NodeInfo) *Status {
+func (f *frameworkImpl) runPreFilterExtensionAddPod(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
 	if !state.ShouldRecordPluginMetrics() {
 		return pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podToAdd, nodeInfo)
 	}
@@ -448,13 +449,13 @@ func (f *framework) runPreFilterExtensionAddPod(ctx context.Context, pl PreFilte
 // RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured
 // PreFilter plugins. It returns directly if any of the plugins return any
 // status other than Success.
-func (f *framework) RunPreFilterExtensionRemovePod(
+func (f *frameworkImpl) RunPreFilterExtensionRemovePod(
 	ctx context.Context,
-	state *CycleState,
+	state *framework.CycleState,
 	podToSchedule *v1.Pod,
 	podToRemove *v1.Pod,
-	nodeInfo *NodeInfo,
-) (status *Status) {
+	nodeInfo *framework.NodeInfo,
+) (status *framework.Status) {
 	for _, pl := range f.preFilterPlugins {
 		if pl.PreFilterExtensions() == nil {
 			continue
@@ -464,14 +465,14 @@ func (f *framework) RunPreFilterExtensionRemovePod(
 			msg := fmt.Sprintf("error while running RemovePod for plugin %q while scheduling pod %q: %v",
 				pl.Name(), podToSchedule.Name, status.Message())
 			klog.Error(msg)
-			return NewStatus(Error, msg)
+			return framework.NewStatus(framework.Error, msg)
 		}
 	}

 	return nil
 }

-func (f *framework) runPreFilterExtensionRemovePod(ctx context.Context, pl PreFilterPlugin, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *NodeInfo) *Status {
+func (f *frameworkImpl) runPreFilterExtensionRemovePod(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
 	if !state.ShouldRecordPluginMetrics() {
 		return pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podToAdd, nodeInfo)
 	}
@@ -485,21 +486,21 @@ func (f *framework) runPreFilterExtensionRemovePod(ctx context.Context, pl PreFi
 // the given node. If any of these plugins doesn't return "Success", the
 // given node is not suitable for running pod.
 // Meanwhile, the failure message and status are set for the given node.
-func (f *framework) RunFilterPlugins(
+func (f *frameworkImpl) RunFilterPlugins(
 	ctx context.Context,
-	state *CycleState,
+	state *framework.CycleState,
 	pod *v1.Pod,
-	nodeInfo *NodeInfo,
-) PluginToStatus {
-	statuses := make(PluginToStatus)
+	nodeInfo *framework.NodeInfo,
+) framework.PluginToStatus {
+	statuses := make(framework.PluginToStatus)
 	for _, pl := range f.filterPlugins {
 		pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
 		if !pluginStatus.IsSuccess() {
 			if !pluginStatus.IsUnschedulable() {
 				// Filter plugins are not supposed to return any status other than
 				// Success or Unschedulable.
-				errStatus := NewStatus(Error, fmt.Sprintf("running %q filter plugin for pod %q: %v", pl.Name(), pod.Name, pluginStatus.Message()))
-				return map[string]*Status{pl.Name(): errStatus}
+				errStatus := framework.NewStatus(framework.Error, fmt.Sprintf("running %q filter plugin for pod %q: %v", pl.Name(), pod.Name, pluginStatus.Message()))
+				return map[string]*framework.Status{pl.Name(): errStatus}
 			}
 			statuses[pl.Name()] = pluginStatus
 			if !f.runAllFilters {
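
From here on, every status the implementation constructs goes through the imported package: framework.NewStatus paired with framework.Error, framework.Skip, or framework.Wait. A tiny illustrative helper showing the pattern (not part of the commit):

	package example

	import framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"

	// errStatus wraps a message the way frameworkImpl's Run*Plugins methods
	// do after the split: the code and constructor come from the framework
	// package rather than package-local names.
	func errStatus(msg string) *framework.Status {
		return framework.NewStatus(framework.Error, msg)
	}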
@@ -512,7 +513,7 @@ func (f *framework) RunFilterPlugins(
 	return statuses
 }

-func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status {
+func (f *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.FilterPlugin, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
 	if !state.ShouldRecordPluginMetrics() {
 		return pl.Filter(ctx, state, pod, nodeInfo)
 	}
@@ -524,22 +525,22 @@ func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state

 // RunPostFilterPlugins runs the set of configured PostFilter plugins until the first
 // Success or Error is met, otherwise continues to execute all plugins.
-func (f *framework) RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status) {
-	statuses := make(PluginToStatus)
+func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
+	statuses := make(framework.PluginToStatus)
 	for _, pl := range f.postFilterPlugins {
 		r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap)
 		if s.IsSuccess() {
 			return r, s
 		} else if !s.IsUnschedulable() {
 			// Any status other than Success or Unschedulable is Error.
-			return nil, NewStatus(Error, s.Message())
+			return nil, framework.NewStatus(framework.Error, s.Message())
 		}
 		statuses[pl.Name()] = s
 	}
 	return nil, statuses.Merge()
 }

-func (f *framework) runPostFilterPlugin(ctx context.Context, pl PostFilterPlugin, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status) {
+func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.PostFilterPlugin, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
 	if !state.ShouldRecordPluginMetrics() {
 		return pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
 	}
@@ -551,12 +552,12 @@ func (f *framework) runPostFilterPlugin(ctx context.Context, pl PostFilterPlugin

 // RunPreScorePlugins runs the set of configured pre-score plugins. If any
 // of these plugins returns any status other than "Success", the given pod is rejected.
-func (f *framework) RunPreScorePlugins(
+func (f *frameworkImpl) RunPreScorePlugins(
 	ctx context.Context,
-	state *CycleState,
+	state *framework.CycleState,
 	pod *v1.Pod,
 	nodes []*v1.Node,
-) (status *Status) {
+) (status *framework.Status) {
 	startTime := time.Now()
 	defer func() {
 		metrics.FrameworkExtensionPointDuration.WithLabelValues(preScore, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
@@ -566,14 +567,14 @@ func (f *framework) RunPreScorePlugins(
 		if !status.IsSuccess() {
 			msg := fmt.Sprintf("error while running %q prescore plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
 			klog.Error(msg)
-			return NewStatus(Error, msg)
+			return framework.NewStatus(framework.Error, msg)
 		}
 	}

 	return nil
 }

-func (f *framework) runPreScorePlugin(ctx context.Context, pl PreScorePlugin, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status {
+func (f *frameworkImpl) runPreScorePlugin(ctx context.Context, pl framework.PreScorePlugin, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
 	if !state.ShouldRecordPluginMetrics() {
 		return pl.PreScore(ctx, state, pod, nodes)
 	}
@@ -587,14 +588,14 @@ func (f *framework) runPreScorePlugin(ctx context.Context, pl PreScorePlugin, st
 // stores for each scoring plugin name the corresponding NodeScoreList(s).
 // It also returns *Status, which is set to non-success if any of the plugins returns
 // a non-success status.
-func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) (ps PluginToNodeScores, status *Status) {
+func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ps framework.PluginToNodeScores, status *framework.Status) {
 	startTime := time.Now()
 	defer func() {
 		metrics.FrameworkExtensionPointDuration.WithLabelValues(score, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
 	}()
-	pluginToNodeScores := make(PluginToNodeScores, len(f.scorePlugins))
+	pluginToNodeScores := make(framework.PluginToNodeScores, len(f.scorePlugins))
 	for _, pl := range f.scorePlugins {
-		pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes))
+		pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes))
 	}
 	ctx, cancel := context.WithCancel(ctx)
 	errCh := parallelize.NewErrorChannel()
@@ -608,7 +609,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
 			errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel)
 			return
 		}
-		pluginToNodeScores[pl.Name()][index] = NodeScore{
+		pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
 			Name:  nodeName,
 			Score: int64(s),
 		}
@@ -617,7 +618,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
 	if err := errCh.ReceiveError(); err != nil {
 		msg := fmt.Sprintf("error while running score plugin for pod %q: %v", pod.Name, err)
 		klog.Error(msg)
-		return nil, NewStatus(Error, msg)
+		return nil, framework.NewStatus(framework.Error, msg)
 	}

 	// Run NormalizeScore method for each ScorePlugin in parallel.
@@ -637,7 +638,7 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
 	if err := errCh.ReceiveError(); err != nil {
 		msg := fmt.Sprintf("error while running normalize score plugin for pod %q: %v", pod.Name, err)
 		klog.Error(msg)
-		return nil, NewStatus(Error, msg)
+		return nil, framework.NewStatus(framework.Error, msg)
 	}

 	// Apply score defaultWeights for each ScorePlugin in parallel.
@@ -649,8 +650,8 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod

 		for i, nodeScore := range nodeScoreList {
 			// return error if score plugin returns invalid score.
-			if nodeScore.Score > int64(MaxNodeScore) || nodeScore.Score < int64(MinNodeScore) {
-				err := fmt.Errorf("score plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), nodeScore.Score, MinNodeScore, MaxNodeScore)
+			if nodeScore.Score > int64(framework.MaxNodeScore) || nodeScore.Score < int64(framework.MinNodeScore) {
+				err := fmt.Errorf("score plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), nodeScore.Score, framework.MinNodeScore, framework.MaxNodeScore)
 				errCh.SendErrorWithCancel(err, cancel)
 				return
 			}
@@ -660,13 +661,13 @@ func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod
 	if err := errCh.ReceiveError(); err != nil {
 		msg := fmt.Sprintf("error while applying score defaultWeights for pod %q: %v", pod.Name, err)
 		klog.Error(msg)
-		return nil, NewStatus(Error, msg)
+		return nil, framework.NewStatus(framework.Error, msg)
 	}

 	return pluginToNodeScores, nil
 }

-func (f *framework) runScorePlugin(ctx context.Context, pl ScorePlugin, state *CycleState, pod *v1.Pod, nodeName string) (int64, *Status) {
+func (f *frameworkImpl) runScorePlugin(ctx context.Context, pl framework.ScorePlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
 	if !state.ShouldRecordPluginMetrics() {
 		return pl.Score(ctx, state, pod, nodeName)
 	}
@@ -676,7 +677,7 @@ func (f *framework) runScorePlugin(ctx context.Context, pl ScorePlugin, state *C
 	return s, status
 }

-func (f *framework) runScoreExtension(ctx context.Context, pl ScorePlugin, state *CycleState, pod *v1.Pod, nodeScoreList NodeScoreList) *Status {
+func (f *frameworkImpl) runScoreExtension(ctx context.Context, pl framework.ScorePlugin, state *framework.CycleState, pod *v1.Pod, nodeScoreList framework.NodeScoreList) *framework.Status {
 	if !state.ShouldRecordPluginMetrics() {
 		return pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
 	}
@@ -689,7 +690,7 @@ func (f *framework) runScoreExtension(ctx context.Context, pl ScorePlugin, state
 // RunPreBindPlugins runs the set of configured prebind plugins. It returns a
 // failure (bool) if any of the plugins returns an error. It also returns an
 // error containing the rejection message or the error occurred in the plugin.
-func (f *framework) RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
+func (f *frameworkImpl) RunPreBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
 	startTime := time.Now()
 	defer func() {
 		metrics.FrameworkExtensionPointDuration.WithLabelValues(preBind, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
@@ -699,13 +700,13 @@ func (f *framework) RunPreBindPlugins(ctx context.Context, state *CycleState, po
 		if !status.IsSuccess() {
 			msg := fmt.Sprintf("error while running %q prebind plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
 			klog.Error(msg)
-			return NewStatus(Error, msg)
+			return framework.NewStatus(framework.Error, msg)
 		}
 	}
 	return nil
 }

-func (f *framework) runPreBindPlugin(ctx context.Context, pl PreBindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status {
+func (f *frameworkImpl) runPreBindPlugin(ctx context.Context, pl framework.PreBindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
 	if !state.ShouldRecordPluginMetrics() {
 		return pl.PreBind(ctx, state, pod, nodeName)
 	}
@@ -716,30 +717,30 @@ func (f *framework) runPreBindPlugin(ctx context.Context, pl PreBindPlugin, stat
 }

 // RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status.
-func (f *framework) RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
+func (f *frameworkImpl) RunBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
 	startTime := time.Now()
 	defer func() {
 		metrics.FrameworkExtensionPointDuration.WithLabelValues(bind, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
 	}()
 	if len(f.bindPlugins) == 0 {
-		return NewStatus(Skip, "")
+		return framework.NewStatus(framework.Skip, "")
 	}
 	for _, bp := range f.bindPlugins {
 		status = f.runBindPlugin(ctx, bp, state, pod, nodeName)
-		if status != nil && status.Code() == Skip {
+		if status != nil && status.Code() == framework.Skip {
 			continue
 		}
 		if !status.IsSuccess() {
 			msg := fmt.Sprintf("plugin %q failed to bind pod \"%v/%v\": %v", bp.Name(), pod.Namespace, pod.Name, status.Message())
 			klog.Error(msg)
-			return NewStatus(Error, msg)
+			return framework.NewStatus(framework.Error, msg)
 		}
 		return status
 	}
 	return status
 }

-func (f *framework) runBindPlugin(ctx context.Context, bp BindPlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status {
+func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp framework.BindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
 	if !state.ShouldRecordPluginMetrics() {
 		return bp.Bind(ctx, state, pod, nodeName)
 	}
@@ -750,17 +751,17 @@ func (f *framework) runBindPlugin(ctx context.Context, bp BindPlugin, state *Cyc
 }

 // RunPostBindPlugins runs the set of configured postbind plugins.
-func (f *framework) RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) {
+func (f *frameworkImpl) RunPostBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
 	startTime := time.Now()
 	defer func() {
-		metrics.FrameworkExtensionPointDuration.WithLabelValues(postBind, Success.String()).Observe(metrics.SinceInSeconds(startTime))
+		metrics.FrameworkExtensionPointDuration.WithLabelValues(postBind, framework.Success.String()).Observe(metrics.SinceInSeconds(startTime))
 	}()
 	for _, pl := range f.postBindPlugins {
 		f.runPostBindPlugin(ctx, pl, state, pod, nodeName)
 	}
 }

-func (f *framework) runPostBindPlugin(ctx context.Context, pl PostBindPlugin, state *CycleState, pod *v1.Pod, nodeName string) {
+func (f *frameworkImpl) runPostBindPlugin(ctx context.Context, pl framework.PostBindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) {
 	if !state.ShouldRecordPluginMetrics() {
 		pl.PostBind(ctx, state, pod, nodeName)
 		return
@@ -773,7 +774,7 @@ func (f *framework) runPostBindPlugin(ctx context.Context, pl PostBindPlugin, st
|
||||
// RunReservePlugins runs the set of configured reserve plugins. If any of these
|
||||
// plugins returns an error, it does not continue running the remaining ones and
|
||||
// returns the error. In such case, pod will not be scheduled.
|
||||
func (f *framework) RunReservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
|
||||
func (f *frameworkImpl) RunReservePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
metrics.FrameworkExtensionPointDuration.WithLabelValues(reserve, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
|
||||
@@ -783,13 +784,13 @@ func (f *framework) RunReservePlugins(ctx context.Context, state *CycleState, po
|
||||
if !status.IsSuccess() {
|
||||
msg := fmt.Sprintf("error while running %q reserve plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
|
||||
klog.Error(msg)
|
||||
return NewStatus(Error, msg)
|
||||
return framework.NewStatus(framework.Error, msg)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}

func (f *framework) runReservePlugin(ctx context.Context, pl ReservePlugin, state *CycleState, pod *v1.Pod, nodeName string) *Status {
func (f *frameworkImpl) runReservePlugin(ctx context.Context, pl framework.ReservePlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
	if !state.ShouldRecordPluginMetrics() {
		return pl.Reserve(ctx, state, pod, nodeName)
	}
@@ -800,17 +801,17 @@ func (f *framework) runReservePlugin(ctx context.Context, pl ReservePlugin, stat
}

// RunUnreservePlugins runs the set of configured unreserve plugins.
func (f *framework) RunUnreservePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) {
func (f *frameworkImpl) RunUnreservePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
	startTime := time.Now()
	defer func() {
		metrics.FrameworkExtensionPointDuration.WithLabelValues(unreserve, Success.String()).Observe(metrics.SinceInSeconds(startTime))
		metrics.FrameworkExtensionPointDuration.WithLabelValues(unreserve, framework.Success.String()).Observe(metrics.SinceInSeconds(startTime))
	}()
	for _, pl := range f.unreservePlugins {
		f.runUnreservePlugin(ctx, pl, state, pod, nodeName)
	}
}

func (f *framework) runUnreservePlugin(ctx context.Context, pl UnreservePlugin, state *CycleState, pod *v1.Pod, nodeName string) {
func (f *frameworkImpl) runUnreservePlugin(ctx context.Context, pl framework.UnreservePlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) {
	if !state.ShouldRecordPluginMetrics() {
		pl.Unreserve(ctx, state, pod, nodeName)
		return
@@ -826,46 +827,46 @@ func (f *framework) runUnreservePlugin(ctx context.Context, pl UnreservePlugin,
// plugins returns "Wait", then this function will create and add a waiting pod
// to the map of currently waiting pods and return a status with the "Wait" code.
// The pod will remain a waiting pod for the minimum duration returned by the permit plugins.
func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
	startTime := time.Now()
	defer func() {
		metrics.FrameworkExtensionPointDuration.WithLabelValues(permit, status.Code().String()).Observe(metrics.SinceInSeconds(startTime))
	}()
	pluginsWaitTime := make(map[string]time.Duration)
	statusCode := Success
	statusCode := framework.Success
	for _, pl := range f.permitPlugins {
		status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)
		if !status.IsSuccess() {
			if status.IsUnschedulable() {
				msg := fmt.Sprintf("rejected pod %q by permit plugin %q: %v", pod.Name, pl.Name(), status.Message())
				klog.V(4).Infof(msg)
				return NewStatus(status.Code(), msg)
				return framework.NewStatus(status.Code(), msg)
			}
			if status.Code() == Wait {
			if status.Code() == framework.Wait {
				// Not allowed to be greater than maxTimeout.
				if timeout > maxTimeout {
					timeout = maxTimeout
				}
				pluginsWaitTime[pl.Name()] = timeout
				statusCode = Wait
				statusCode = framework.Wait
			} else {
				msg := fmt.Sprintf("error while running %q permit plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
				klog.Error(msg)
				return NewStatus(Error, msg)
				return framework.NewStatus(framework.Error, msg)
			}
		}
	}
	if statusCode == Wait {
	if statusCode == framework.Wait {
		waitingPod := newWaitingPod(pod, pluginsWaitTime)
		f.waitingPods.add(waitingPod)
		msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
		klog.V(4).Infof(msg)
		return NewStatus(Wait, msg)
		return framework.NewStatus(framework.Wait, msg)
	}
	return nil
}
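A runnable sketch of the Wait aggregation above: each plugin's requested timeout is clamped to maxTimeout, and a single Wait verdict makes the overall outcome Wait. The permitResult type and aggregatePermits function are illustrative, not scheduler types:

package main

import (
	"fmt"
	"time"
)

// permitResult is an illustrative stand-in for a permit plugin's verdict.
type permitResult struct {
	plugin  string
	wait    bool          // plugin asked to wait
	timeout time.Duration // requested wait duration
}

// aggregatePermits mirrors RunPermitPlugins' Wait handling: timeouts are
// clamped to maxTimeout and any Wait verdict makes the overall outcome Wait.
func aggregatePermits(results []permitResult, maxTimeout time.Duration) (bool, map[string]time.Duration) {
	waitTimes := make(map[string]time.Duration)
	overallWait := false
	for _, r := range results {
		if !r.wait {
			continue
		}
		t := r.timeout
		if t > maxTimeout {
			t = maxTimeout // not allowed to exceed maxTimeout
		}
		waitTimes[r.plugin] = t
		overallWait = true
	}
	return overallWait, waitTimes
}

func main() {
	wait, times := aggregatePermits([]permitResult{
		{plugin: "gang", wait: true, timeout: 10 * time.Minute},
		{plugin: "quota", wait: false},
	}, 30*time.Second)
	fmt.Println(wait, times) // true map[gang:30s]
}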

func (f *framework) runPermitPlugin(ctx context.Context, pl PermitPlugin, state *CycleState, pod *v1.Pod, nodeName string) (*Status, time.Duration) {
func (f *frameworkImpl) runPermitPlugin(ctx context.Context, pl framework.PermitPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
	if !state.ShouldRecordPluginMetrics() {
		return pl.Permit(ctx, state, pod, nodeName)
	}
@@ -876,7 +877,7 @@ func (f *framework) runPermitPlugin(ctx context.Context, pl PermitPlugin, state
}

// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
func (f *framework) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *Status) {
func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *framework.Status) {
	waitingPod := f.waitingPods.get(pod.UID)
	if waitingPod == nil {
		return nil
@@ -892,11 +893,11 @@ func (f *framework) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *Stat
		if s.IsUnschedulable() {
			msg := fmt.Sprintf("pod %q rejected while waiting on permit: %v", pod.Name, s.Message())
			klog.V(4).Infof(msg)
			return NewStatus(s.Code(), msg)
			return framework.NewStatus(s.Code(), msg)
		}
		msg := fmt.Sprintf("error received while waiting on permit for pod %q: %v", pod.Name, s.Message())
		klog.Error(msg)
		return NewStatus(Error, msg)
		return framework.NewStatus(framework.Error, msg)
	}
	return nil
}
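WaitOnPermit blocks on the waiting pod's buffered status channel until some permit plugin calls Allow or Reject. A self-contained sketch of that handshake, with an illustrative string channel and goroutine standing in for chan *framework.Status and a real permit plugin:

package main

import (
	"fmt"
	"time"
)

func main() {
	verdict := make(chan string, 1) // buffer of 1: a verdict sent before the receive is not lost

	// A permit plugin allows the pod a moment later (e.g. a gang-scheduling
	// quorum is reached). The send is non-blocking, as in Allow/Reject.
	go func() {
		time.Sleep(50 * time.Millisecond)
		select {
		case verdict <- "allowed":
		default:
		}
	}()

	fmt.Println("waiting on permit...")
	fmt.Println("verdict:", <-verdict) // blocks until a verdict arrives, like WaitOnPermit
}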
@@ -905,17 +906,17 @@ func (f *framework) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *Stat
// snapshot. The snapshot is taken at the beginning of a scheduling cycle and remains
// unchanged until a pod finishes "Reserve". There is no guarantee that the information
// remains unchanged after "Reserve".
func (f *framework) SnapshotSharedLister() SharedLister {
func (f *frameworkImpl) SnapshotSharedLister() framework.SharedLister {
	return f.snapshotSharedLister
}

// IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.
func (f *framework) IterateOverWaitingPods(callback func(WaitingPod)) {
func (f *frameworkImpl) IterateOverWaitingPods(callback func(framework.WaitingPod)) {
	f.waitingPods.iterate(callback)
}

// GetWaitingPod returns a reference to a WaitingPod given its UID.
func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
func (f *frameworkImpl) GetWaitingPod(uid types.UID) framework.WaitingPod {
	if wp := f.waitingPods.get(uid); wp != nil {
		return wp
	}
@@ -923,7 +924,7 @@ func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
}

// RejectWaitingPod rejects a WaitingPod given its UID.
func (f *framework) RejectWaitingPod(uid types.UID) {
func (f *frameworkImpl) RejectWaitingPod(uid types.UID) {
	waitingPod := f.waitingPods.get(uid)
	if waitingPod != nil {
		waitingPod.Reject("removed")
@@ -931,18 +932,18 @@ func (f *framework) RejectWaitingPod(uid types.UID) {
}

// HasFilterPlugins returns true if at least one filter plugin is defined.
func (f *framework) HasFilterPlugins() bool {
func (f *frameworkImpl) HasFilterPlugins() bool {
	return len(f.filterPlugins) > 0
}

// HasScorePlugins returns true if at least one score plugin is defined.
func (f *framework) HasScorePlugins() bool {
func (f *frameworkImpl) HasScorePlugins() bool {
	return len(f.scorePlugins) > 0
}

// ListPlugins returns a map of extension point name to plugin names configured at each extension
// point. Returns nil if no plugins were configured.
func (f *framework) ListPlugins() map[string][]config.Plugin {
func (f *frameworkImpl) ListPlugins() map[string][]config.Plugin {
	m := make(map[string][]config.Plugin)

	for _, e := range f.getExtensionPoints(&config.Plugins{}) {
@@ -950,7 +951,7 @@ func (f *framework) ListPlugins() map[string][]config.Plugin {
		extName := plugins.Type().Elem().Name()
		var cfgs []config.Plugin
		for i := 0; i < plugins.Len(); i++ {
			name := plugins.Index(i).Interface().(Plugin).Name()
			name := plugins.Index(i).Interface().(framework.Plugin).Name()
			p := config.Plugin{Name: name}
			if extName == "ScorePlugin" {
				// Weights apply only to score plugins.
@@ -969,21 +970,21 @@ func (f *framework) ListPlugins() map[string][]config.Plugin {
}
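ListPlugins walks the per-extension-point slices with reflect and type-asserts each element back to the plugin interface to recover its name. A stand-alone sketch of the same reflect pattern; the registry, alpha, and beta types are illustrative (and the real code derives the extension-point name from the slice's element type rather than the field name):

package main

import (
	"fmt"
	"reflect"
)

// plugin is an illustrative stand-in for framework.Plugin.
type plugin interface{ Name() string }

type alpha struct{}
type beta struct{}

func (alpha) Name() string { return "alpha" }
func (beta) Name() string  { return "beta" }

// registry groups plugins by extension point, like the slices ListPlugins walks.
type registry struct {
	FilterPlugins []plugin
	ScorePlugins  []plugin
}

func main() {
	r := registry{
		FilterPlugins: []plugin{alpha{}},
		ScorePlugins:  []plugin{alpha{}, beta{}},
	}
	// Iterate slice-typed fields and type-assert each element back to the
	// interface, the same shape as the loop in ListPlugins.
	v := reflect.ValueOf(r)
	for i := 0; i < v.NumField(); i++ {
		field := v.Field(i)
		extName := v.Type().Field(i).Name
		for j := 0; j < field.Len(); j++ {
			name := field.Index(j).Interface().(plugin).Name()
			fmt.Printf("%s: %s\n", extName, name)
		}
	}
}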

// ClientSet returns a kubernetes clientset.
func (f *framework) ClientSet() clientset.Interface {
func (f *frameworkImpl) ClientSet() clientset.Interface {
	return f.clientSet
}

// EventRecorder returns an event recorder.
func (f *framework) EventRecorder() events.EventRecorder {
func (f *frameworkImpl) EventRecorder() events.EventRecorder {
	return f.eventRecorder
}

// SharedInformerFactory returns a shared informer factory.
func (f *framework) SharedInformerFactory() informers.SharedInformerFactory {
func (f *frameworkImpl) SharedInformerFactory() informers.SharedInformerFactory {
	return f.informerFactory
}

func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin {
	pgMap := make(map[string]config.Plugin)

	if plugins == nil {
@@ -1005,6 +1006,6 @@ func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plu
}

// PreemptHandle returns the internal preemptHandle object.
func (f *framework) PreemptHandle() PreemptHandle {
func (f *frameworkImpl) PreemptHandle() framework.PreemptHandle {
	return f.preemptHandle
}
File diff suppressed because it is too large
@@ -14,12 +14,13 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1
package runtime

import (
	"time"

	k8smetrics "k8s.io/component-base/metrics"
	"k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
)

@@ -62,7 +63,7 @@ func newMetricsRecorder(bufferSize int, interval time.Duration) *metricsRecorder

// observePluginDurationAsync observes the plugin_execution_duration_seconds metric.
// The metric will be flushed to Prometheus asynchronously.
func (r *metricsRecorder) observePluginDurationAsync(extensionPoint, pluginName string, status *Status, value float64) {
func (r *metricsRecorder) observePluginDurationAsync(extensionPoint, pluginName string, status *v1alpha1.Status, value float64) {
	newMetric := &frameworkMetric{
		metric:      metrics.PluginExecutionDuration,
		labelValues: []string{pluginName, extensionPoint, status.Code().String()},
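observePluginDurationAsync hands each observation to a buffer so the scheduling hot path never waits on Prometheus. A minimal producer/flusher sketch of that split, assuming (as the method's "async" contract suggests) that a full buffer drops the sample rather than blocking; the sample type and channel names are illustrative:

package main

import "fmt"

// sample is an illustrative stand-in for frameworkMetric: a pre-labelled
// observation queued for asynchronous flushing.
type sample struct {
	plugin, extensionPoint string
	seconds                float64
}

func main() {
	// A buffered channel decouples the hot scheduling path from metric
	// flushing, the same idea behind the recorder's internal buffer.
	bufferCh := make(chan sample, 100)
	done := make(chan struct{})

	// Background flusher goroutine, analogous to the recorder's run loop
	// (the real one flushes to Prometheus on an interval).
	go func() {
		for s := range bufferCh {
			fmt.Printf("observe %s/%s: %gs\n", s.plugin, s.extensionPoint, s.seconds)
		}
		close(done)
	}()

	// On the hot path the send is non-blocking: if the buffer is full the
	// observation is dropped rather than stalling the scheduling cycle.
	select {
	case bufferCh <- sample{plugin: "noop", extensionPoint: "Bind", seconds: 0.002}:
	default:
	}

	close(bufferCh)
	<-done
}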
@@ -14,18 +14,19 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1
package runtime

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/json"
	"k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	"sigs.k8s.io/yaml"
)

// PluginFactory is a function that builds a plugin.
type PluginFactory = func(configuration runtime.Object, f FrameworkHandle) (Plugin, error)
type PluginFactory = func(configuration runtime.Object, f v1alpha1.FrameworkHandle) (v1alpha1.Plugin, error)

// DecodeInto decodes configuration whose type is *runtime.Unknown to the interface into.
func DecodeInto(obj runtime.Object, into interface{}) error {
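PluginFactory is the shape plugins register under a name; the framework later looks the factory up and invokes it with the profile's configuration. A toy registry illustrating that lookup-and-build flow; for brevity, this pluginFactory takes a bare interface{} where the real signature takes runtime.Object and a FrameworkHandle:

package main

import "fmt"

// Plugin is an illustrative stand-in for the framework's Plugin interface.
type Plugin interface{ Name() string }

type noop struct{}

func (noop) Name() string { return "noop" }

// pluginFactory mirrors the shape of PluginFactory: given optional
// configuration, build a plugin instance.
type pluginFactory = func(config interface{}) (Plugin, error)

func main() {
	// A registry maps plugin names to factories; the framework consults it
	// when instantiating the plugins named in a scheduler profile.
	registry := map[string]pluginFactory{
		"noop": func(_ interface{}) (Plugin, error) { return noop{}, nil },
	}

	factory := registry["noop"]
	p, err := factory(nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(p.Name()) // "noop"
}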
@@ -14,13 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1
package runtime

import (
	"reflect"
	"testing"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

func TestDecodeInto(t *testing.T) {
@@ -102,7 +103,7 @@ func (p *mockNoopPlugin) Name() string {
}

func NewMockNoopPluginFactory() PluginFactory {
	return func(_ runtime.Object, _ FrameworkHandle) (Plugin, error) {
	return func(_ runtime.Object, _ v1alpha1.FrameworkHandle) (v1alpha1.Plugin, error) {
		return &mockNoopPlugin{}, nil
	}
}
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1
package runtime

import (
	"fmt"
@@ -23,6 +23,7 @@ import (

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// waitingPodsMap is a thread-safe map used to maintain pods waiting in the permit phase.
@@ -60,7 +61,7 @@ func (m *waitingPodsMap) get(uid types.UID) *waitingPod {
}

// iterate acquires a read lock and iterates over the WaitingPods map.
func (m *waitingPodsMap) iterate(callback func(WaitingPod)) {
func (m *waitingPodsMap) iterate(callback func(v1alpha1.WaitingPod)) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for _, v := range m.pods {
@@ -72,11 +73,11 @@ func (m *waitingPodsMap) iterate(callback func(WaitingPod)) {
type waitingPod struct {
	pod            *v1.Pod
	pendingPlugins map[string]*time.Timer
	s              chan *Status
	s              chan *v1alpha1.Status
	mu             sync.RWMutex
}
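The iterate method shown above takes only the read lock, so many readers can scan the map concurrently while writers serialize behind the exclusive lock. A compact stand-alone sketch of the same sync.RWMutex pattern; safeMap and its methods are illustrative:

package main

import (
	"fmt"
	"sync"
)

// safeMap mirrors waitingPodsMap's locking discipline: RLock for reads,
// Lock for writes.
type safeMap struct {
	mu   sync.RWMutex
	pods map[string]string
}

func (m *safeMap) add(uid, name string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.pods[uid] = name
}

func (m *safeMap) iterate(callback func(name string)) {
	m.mu.RLock() // shared lock: concurrent iterations are allowed
	defer m.mu.RUnlock()
	for _, v := range m.pods {
		callback(v)
	}
}

func main() {
	m := &safeMap{pods: make(map[string]string)}
	m.add("uid-1", "nginx")
	m.iterate(func(name string) { fmt.Println("waiting:", name) })
}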

var _ WaitingPod = &waitingPod{}
var _ v1alpha1.WaitingPod = &waitingPod{}

// newWaitingPod returns a new waitingPod instance.
func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
@@ -86,7 +87,7 @@ func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *wa
		// by using non-blocking send to this channel. This channel has a buffer of size 1
		// to ensure that non-blocking send will not be ignored - possible situation when
		// receiving from this channel happens after non-blocking send.
		s: make(chan *Status, 1),
		s: make(chan *v1alpha1.Status, 1),
	}

	wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))
@@ -142,7 +143,7 @@ func (w *waitingPod) Allow(pluginName string) {
	// The select clause works as a non-blocking send.
	// If there is no receiver, it's a no-op (default case).
	select {
	case w.s <- NewStatus(Success, ""):
	case w.s <- v1alpha1.NewStatus(v1alpha1.Success, ""):
	default:
	}
}
@@ -158,7 +159,7 @@ func (w *waitingPod) Reject(msg string) {
	// The select clause works as a non-blocking send.
	// If there is no receiver, it's a no-op (default case).
	select {
	case w.s <- NewStatus(Unschedulable, msg):
	case w.s <- v1alpha1.NewStatus(v1alpha1.Unschedulable, msg):
	default:
	}
}
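Allow and Reject both use a non-blocking send into the size-1 buffered channel, so the first verdict is kept and any later verdict is silently dropped, even if the reader has not arrived yet. A runnable demonstration of that property, with a string channel standing in for the status channel:

package main

import "fmt"

func main() {
	s := make(chan string, 1)

	send := func(verdict string) {
		select {
		case s <- verdict: // succeeds only while the buffer is empty
		default: // a verdict is already pending; drop this one
		}
	}

	send("allowed")  // e.g. Allow: fills the buffer
	send("rejected") // e.g. Reject: dropped, buffer already full

	fmt.Println(<-s) // prints "allowed": the first verdict wins
}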
@@ -5,13 +5,9 @@ go_library(
    srcs = [
        "cycle_state.go",
        "extender.go",
        "framework.go",
        "interface.go",
        "listers.go",
        "metrics_recorder.go",
        "registry.go",
        "types.go",
        "waiting_pods_map.go",
    ],
    importpath = "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1",
    visibility = ["//visibility:public"],
@@ -19,27 +15,19 @@ go_library(
        "//pkg/apis/core/v1/helper:go_default_library",
        "//pkg/features:go_default_library",
        "//pkg/scheduler/apis/config:go_default_library",
        "//pkg/scheduler/apis/config/scheme:go_default_library",
        "//pkg/scheduler/internal/parallelize:go_default_library",
        "//pkg/scheduler/metrics:go_default_library",
        "//pkg/scheduler/util:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
        "//staging/src/k8s.io/client-go/informers:go_default_library",
        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
        "//staging/src/k8s.io/client-go/tools/events:go_default_library",
        "//staging/src/k8s.io/component-base/metrics:go_default_library",
        "//staging/src/k8s.io/kube-scheduler/config/v1beta1:go_default_library",
        "//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
        "//vendor/k8s.io/klog/v2:go_default_library",
        "//vendor/sigs.k8s.io/yaml:go_default_library",
    ],
)

@@ -64,22 +52,14 @@ go_test(
    name = "go_default_test",
    srcs = [
        "cycle_state_test.go",
        "framework_test.go",
        "interface_test.go",
        "registry_test.go",
        "types_test.go",
    ],
    embed = [":go_default_library"],
    deps = [
        "//pkg/scheduler/apis/config:go_default_library",
        "//pkg/scheduler/metrics:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
        "//vendor/github.com/google/go-cmp/cmp:go_default_library",
        "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
        "//vendor/github.com/prometheus/client_model/go:go_default_library",
    ],
)