Move scheduler fake artifacts to pkg/scheduler/testing

- move some fake artifacts from pkg/scheduler/core to pkg/scheduler/testing so it can be consumed
by core as well as plugin testings
This commit is contained in:
Wei Huang
2020-06-12 17:15:33 -07:00
parent 98f250f883
commit dd5db75840
6 changed files with 603 additions and 513 deletions

View File

@@ -18,22 +18,17 @@ package core
import (
"context"
"fmt"
"reflect"
"sort"
"testing"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
@@ -42,340 +37,29 @@ import (
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/profile"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/util"
)
type fitPredicate func(pod *v1.Pod, node *v1.Node) (bool, error)
type priorityFunc func(pod *v1.Pod, nodes []*v1.Node) (*framework.NodeScoreList, error)
type priorityConfig struct {
function priorityFunc
weight int64
}
func errorPredicateExtender(pod *v1.Pod, node *v1.Node) (bool, error) {
return false, fmt.Errorf("Some error")
}
func falsePredicateExtender(pod *v1.Pod, node *v1.Node) (bool, error) {
return false, nil
}
func truePredicateExtender(pod *v1.Pod, node *v1.Node) (bool, error) {
return true, nil
}
func machine1PredicateExtender(pod *v1.Pod, node *v1.Node) (bool, error) {
if node.Name == "machine1" {
return true, nil
}
return false, nil
}
func machine2PredicateExtender(pod *v1.Pod, node *v1.Node) (bool, error) {
if node.Name == "machine2" {
return true, nil
}
return false, nil
}
func errorPrioritizerExtender(pod *v1.Pod, nodes []*v1.Node) (*framework.NodeScoreList, error) {
return &framework.NodeScoreList{}, fmt.Errorf("Some error")
}
func machine1PrioritizerExtender(pod *v1.Pod, nodes []*v1.Node) (*framework.NodeScoreList, error) {
result := framework.NodeScoreList{}
for _, node := range nodes {
score := 1
if node.Name == "machine1" {
score = 10
}
result = append(result, framework.NodeScore{Name: node.Name, Score: int64(score)})
}
return &result, nil
}
func machine2PrioritizerExtender(pod *v1.Pod, nodes []*v1.Node) (*framework.NodeScoreList, error) {
result := framework.NodeScoreList{}
for _, node := range nodes {
score := 1
if node.Name == "machine2" {
score = 10
}
result = append(result, framework.NodeScore{Name: node.Name, Score: int64(score)})
}
return &result, nil
}
type machine2PrioritizerPlugin struct{}
func newMachine2PrioritizerPlugin() framework.PluginFactory {
return func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
return &machine2PrioritizerPlugin{}, nil
}
}
func (pl *machine2PrioritizerPlugin) Name() string {
return "Machine2Prioritizer"
}
func (pl *machine2PrioritizerPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeName string) (int64, *framework.Status) {
score := 10
if nodeName == "machine2" {
score = 100
}
return int64(score), nil
}
func (pl *machine2PrioritizerPlugin) ScoreExtensions() framework.ScoreExtensions {
return nil
}
type FakeExtender struct {
predicates []fitPredicate
prioritizers []priorityConfig
weight int64
nodeCacheCapable bool
filteredNodes []*v1.Node
unInterested bool
ignorable bool
// Cached node information for fake extender
cachedNodeNameToInfo map[string]*framework.NodeInfo
}
func (f *FakeExtender) Name() string {
return "FakeExtender"
}
func (f *FakeExtender) IsIgnorable() bool {
return f.ignorable
}
func (f *FakeExtender) SupportsPreemption() bool {
// Assume preempt verb is always defined.
return true
}
func (f *FakeExtender) ProcessPreemption(
pod *v1.Pod,
nodeNameToVictims map[string]*extenderv1.Victims,
nodeInfos framework.NodeInfoLister,
) (map[string]*extenderv1.Victims, error) {
nodeNameToVictimsCopy := map[string]*extenderv1.Victims{}
// We don't want to change the original nodeNameToVictims
for k, v := range nodeNameToVictims {
// In real world implementation, extender's user should have their own way to get node object
// by name if needed (e.g. query kube-apiserver etc).
//
// For test purpose, we just use node from parameters directly.
nodeNameToVictimsCopy[k] = v
}
for nodeName, victims := range nodeNameToVictimsCopy {
// Try to do preemption on extender side.
nodeInfo, _ := nodeInfos.Get(nodeName)
extenderVictimPods, extenderPDBViolations, fits, err := f.selectVictimsOnNodeByExtender(pod, nodeInfo.Node())
if err != nil {
return nil, err
}
// If it's unfit after extender's preemption, this node is unresolvable by preemption overall,
// let's remove it from potential preemption nodes.
if !fits {
delete(nodeNameToVictimsCopy, nodeName)
} else {
// Append new victims to original victims
nodeNameToVictimsCopy[nodeName].Pods = append(victims.Pods, extenderVictimPods...)
nodeNameToVictimsCopy[nodeName].NumPDBViolations = victims.NumPDBViolations + int64(extenderPDBViolations)
}
}
return nodeNameToVictimsCopy, nil
}
// selectVictimsOnNodeByExtender checks the given nodes->pods map with predicates on extender's side.
// Returns:
// 1. More victim pods (if any) amended by preemption phase of extender.
// 2. Number of violating victim (used to calculate PDB).
// 3. Fits or not after preemption phase on extender's side.
func (f *FakeExtender) selectVictimsOnNodeByExtender(pod *v1.Pod, node *v1.Node) ([]*v1.Pod, int, bool, error) {
// If a extender support preemption but have no cached node info, let's run filter to make sure
// default scheduler's decision still stand with given pod and node.
if !f.nodeCacheCapable {
fits, err := f.runPredicate(pod, node)
if err != nil {
return nil, 0, false, err
}
if !fits {
return nil, 0, false, nil
}
return []*v1.Pod{}, 0, true, nil
}
// Otherwise, as a extender support preemption and have cached node info, we will assume cachedNodeNameToInfo is available
// and get cached node info by given node name.
nodeInfoCopy := f.cachedNodeNameToInfo[node.GetName()].Clone()
var potentialVictims []*v1.Pod
removePod := func(rp *v1.Pod) {
nodeInfoCopy.RemovePod(rp)
}
addPod := func(ap *v1.Pod) {
nodeInfoCopy.AddPod(ap)
}
// As the first step, remove all the lower priority pods from the node and
// check if the given pod can be scheduled.
podPriority := podutil.GetPodPriority(pod)
for _, p := range nodeInfoCopy.Pods {
if podutil.GetPodPriority(p.Pod) < podPriority {
potentialVictims = append(potentialVictims, p.Pod)
removePod(p.Pod)
}
}
sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i], potentialVictims[j]) })
// If the new pod does not fit after removing all the lower priority pods,
// we are almost done and this node is not suitable for preemption.
fits, err := f.runPredicate(pod, nodeInfoCopy.Node())
if err != nil {
return nil, 0, false, err
}
if !fits {
return nil, 0, false, nil
}
var victims []*v1.Pod
// TODO(harry): handle PDBs in the future.
numViolatingVictim := 0
reprievePod := func(p *v1.Pod) bool {
addPod(p)
fits, _ := f.runPredicate(pod, nodeInfoCopy.Node())
if !fits {
removePod(p)
victims = append(victims, p)
}
return fits
}
// For now, assume all potential victims to be non-violating.
// Now we try to reprieve non-violating victims.
for _, p := range potentialVictims {
reprievePod(p)
}
return victims, numViolatingVictim, true, nil
}
// runPredicate run predicates of extender one by one for given pod and node.
// Returns: fits or not.
func (f *FakeExtender) runPredicate(pod *v1.Pod, node *v1.Node) (bool, error) {
fits := true
var err error
for _, predicate := range f.predicates {
fits, err = predicate(pod, node)
if err != nil {
return false, err
}
if !fits {
break
}
}
return fits, nil
}
func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, extenderv1.FailedNodesMap, error) {
filtered := []*v1.Node{}
failedNodesMap := extenderv1.FailedNodesMap{}
for _, node := range nodes {
fits, err := f.runPredicate(pod, node)
if err != nil {
return []*v1.Node{}, extenderv1.FailedNodesMap{}, err
}
if fits {
filtered = append(filtered, node)
} else {
failedNodesMap[node.Name] = "FakeExtender failed"
}
}
f.filteredNodes = filtered
if f.nodeCacheCapable {
return filtered, failedNodesMap, nil
}
return filtered, failedNodesMap, nil
}
func (f *FakeExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*extenderv1.HostPriorityList, int64, error) {
result := extenderv1.HostPriorityList{}
combinedScores := map[string]int64{}
for _, prioritizer := range f.prioritizers {
weight := prioritizer.weight
if weight == 0 {
continue
}
priorityFunc := prioritizer.function
prioritizedList, err := priorityFunc(pod, nodes)
if err != nil {
return &extenderv1.HostPriorityList{}, 0, err
}
for _, hostEntry := range *prioritizedList {
combinedScores[hostEntry.Name] += hostEntry.Score * weight
}
}
for host, score := range combinedScores {
result = append(result, extenderv1.HostPriority{Host: host, Score: score})
}
return &result, f.weight, nil
}
func (f *FakeExtender) Bind(binding *v1.Binding) error {
if len(f.filteredNodes) != 0 {
for _, node := range f.filteredNodes {
if node.Name == binding.Target.Name {
f.filteredNodes = nil
return nil
}
}
err := fmt.Errorf("Node %v not in filtered nodes %v", binding.Target.Name, f.filteredNodes)
f.filteredNodes = nil
return err
}
return nil
}
func (f *FakeExtender) IsBinder() bool {
return true
}
func (f *FakeExtender) IsInterested(pod *v1.Pod) bool {
return !f.unInterested
}
var _ framework.Extender = &FakeExtender{}
func TestGenericSchedulerWithExtenders(t *testing.T) {
tests := []struct {
name string
registerPlugins []st.RegisterPluginFunc
extenders []FakeExtender
extenders []st.FakeExtender
nodes []string
expectedResult ScheduleResult
expectsErr bool
}{
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
extenders: []FakeExtender{
extenders: []st.FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
Predicates: []st.FitPredicate{st.TruePredicateExtender},
},
{
predicates: []fitPredicate{errorPredicateExtender},
Predicates: []st.FitPredicate{st.ErrorPredicateExtender},
},
},
nodes: []string{"machine1", "machine2"},
@@ -384,16 +68,16 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
},
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
extenders: []FakeExtender{
extenders: []st.FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
Predicates: []st.FitPredicate{st.TruePredicateExtender},
},
{
predicates: []fitPredicate{falsePredicateExtender},
Predicates: []st.FitPredicate{st.FalsePredicateExtender},
},
},
nodes: []string{"machine1", "machine2"},
@@ -402,16 +86,16 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
},
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
extenders: []FakeExtender{
extenders: []st.FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
Predicates: []st.FitPredicate{st.TruePredicateExtender},
},
{
predicates: []fitPredicate{machine1PredicateExtender},
Predicates: []st.FitPredicate{st.Machine1PredicateExtender},
},
},
nodes: []string{"machine1", "machine2"},
@@ -424,16 +108,16 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
},
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
extenders: []FakeExtender{
extenders: []st.FakeExtender{
{
predicates: []fitPredicate{machine2PredicateExtender},
Predicates: []st.FitPredicate{st.Machine2PredicateExtender},
},
{
predicates: []fitPredicate{machine1PredicateExtender},
Predicates: []st.FitPredicate{st.Machine1PredicateExtender},
},
},
nodes: []string{"machine1", "machine2"},
@@ -442,15 +126,15 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
},
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
extenders: []FakeExtender{
extenders: []st.FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
prioritizers: []priorityConfig{{errorPrioritizerExtender, 10}},
weight: 1,
Predicates: []st.FitPredicate{st.TruePredicateExtender},
Prioritizers: []st.PriorityConfig{{Function: st.ErrorPrioritizerExtender, Weight: 10}},
Weight: 1,
},
},
nodes: []string{"machine1"},
@@ -463,20 +147,20 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
},
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
extenders: []FakeExtender{
extenders: []st.FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
prioritizers: []priorityConfig{{machine1PrioritizerExtender, 10}},
weight: 1,
Predicates: []st.FitPredicate{st.TruePredicateExtender},
Prioritizers: []st.PriorityConfig{{Function: st.Machine1PrioritizerExtender, Weight: 10}},
Weight: 1,
},
{
predicates: []fitPredicate{truePredicateExtender},
prioritizers: []priorityConfig{{machine2PrioritizerExtender, 10}},
weight: 5,
Predicates: []st.FitPredicate{st.TruePredicateExtender},
Prioritizers: []st.PriorityConfig{{Function: st.Machine2PrioritizerExtender, Weight: 10}},
Weight: 5,
},
},
nodes: []string{"machine1", "machine2"},
@@ -489,16 +173,16 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
},
{
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterScorePlugin("Machine2Prioritizer", newMachine2PrioritizerPlugin(), 20),
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
st.RegisterScorePlugin("Machine2Prioritizer", st.NewMachine2PrioritizerPlugin(), 20),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
extenders: []FakeExtender{
extenders: []st.FakeExtender{
{
predicates: []fitPredicate{truePredicateExtender},
prioritizers: []priorityConfig{{machine1PrioritizerExtender, 10}},
weight: 1,
Predicates: []st.FitPredicate{st.TruePredicateExtender},
Prioritizers: []st.PriorityConfig{{Function: st.Machine1PrioritizerExtender, Weight: 10}},
Weight: 1,
},
},
nodes: []string{"machine1", "machine2"},
@@ -518,16 +202,16 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
// because of the errors from errorPredicateExtender and/or
// errorPrioritizerExtender.
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterScorePlugin("Machine2Prioritizer", newMachine2PrioritizerPlugin(), 1),
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
st.RegisterScorePlugin("Machine2Prioritizer", st.NewMachine2PrioritizerPlugin(), 1),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
extenders: []FakeExtender{
extenders: []st.FakeExtender{
{
predicates: []fitPredicate{errorPredicateExtender},
prioritizers: []priorityConfig{{errorPrioritizerExtender, 10}},
unInterested: true,
Predicates: []st.FitPredicate{st.ErrorPredicateExtender},
Prioritizers: []st.PriorityConfig{{Function: st.ErrorPrioritizerExtender, Weight: 10}},
UnInterested: true,
},
},
nodes: []string{"machine1", "machine2"},
@@ -546,17 +230,17 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
// If scheduler did not ignore the extender, the test would fail
// because of the errors from errorPredicateExtender.
registerPlugins: []st.RegisterPluginFunc{
st.RegisterFilterPlugin("TrueFilter", NewTrueFilterPlugin),
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
extenders: []FakeExtender{
extenders: []st.FakeExtender{
{
predicates: []fitPredicate{errorPredicateExtender},
ignorable: true,
Predicates: []st.FitPredicate{st.ErrorPredicateExtender},
Ignorable: true,
},
{
predicates: []fitPredicate{machine1PredicateExtender},
Predicates: []st.FitPredicate{st.Machine1PredicateExtender},
},
},
nodes: []string{"machine1", "machine2"},