
Since the filter status is missing for the preemption phase, there is no way to tell why preemption failed, and those reasons can differ from the status reported by the main scheduling process (the first failed plugin hides the other failures in the chain). This change provides verbose information based on the node statuses generated during pod preemption; that information helps us diagnose issues that occur during pod preemption. Signed-off-by: Dave Chen <dave.chen@arm.com>
1387 lines
48 KiB
Go
1387 lines
48 KiB
Go
/*
|
|
Copyright 2014 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package scheduler
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"os"
|
|
"path"
|
|
"reflect"
|
|
"regexp"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/google/go-cmp/cmp"
|
|
v1 "k8s.io/api/core/v1"
|
|
eventsv1 "k8s.io/api/events/v1"
|
|
"k8s.io/apimachinery/pkg/api/resource"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/informers"
|
|
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
|
"k8s.io/client-go/kubernetes/scheme"
|
|
clienttesting "k8s.io/client-go/testing"
|
|
clientcache "k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/tools/events"
|
|
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
|
|
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
|
"k8s.io/kubernetes/pkg/scheduler/core"
|
|
"k8s.io/kubernetes/pkg/scheduler/framework"
|
|
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
|
|
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
|
|
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
|
|
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
|
|
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
|
|
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
|
|
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
|
|
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
|
|
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
|
"k8s.io/kubernetes/pkg/scheduler/profile"
|
|
st "k8s.io/kubernetes/pkg/scheduler/testing"
|
|
)
|
|
|
|
func podWithID(id, desiredHost string) *v1.Pod {
|
|
return &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: id,
|
|
UID: types.UID(id),
|
|
},
|
|
Spec: v1.PodSpec{
|
|
NodeName: desiredHost,
|
|
SchedulerName: testSchedulerName,
|
|
},
|
|
}
|
|
}
|
|
|
|
func deletingPod(id string) *v1.Pod {
|
|
deletionTimestamp := metav1.Now()
|
|
return &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: id,
|
|
UID: types.UID(id),
|
|
DeletionTimestamp: &deletionTimestamp,
|
|
},
|
|
Spec: v1.PodSpec{
|
|
NodeName: "",
|
|
SchedulerName: testSchedulerName,
|
|
},
|
|
}
|
|
}
|
|
|
|
func podWithPort(id, desiredHost string, port int) *v1.Pod {
|
|
pod := podWithID(id, desiredHost)
|
|
pod.Spec.Containers = []v1.Container{
|
|
{Name: "ctr", Ports: []v1.ContainerPort{{HostPort: int32(port)}}},
|
|
}
|
|
return pod
|
|
}
|
|
|
|
func podWithResources(id, desiredHost string, limits v1.ResourceList, requests v1.ResourceList) *v1.Pod {
|
|
pod := podWithID(id, desiredHost)
|
|
pod.Spec.Containers = []v1.Container{
|
|
{Name: "ctr", Resources: v1.ResourceRequirements{Limits: limits, Requests: requests}},
|
|
}
|
|
return pod
|
|
}
|
|
|
|
// mockScheduler is a stub core.ScheduleAlgorithm whose Schedule method
// returns a canned result and error, ignoring its inputs.
type mockScheduler struct {
	result core.ScheduleResult // returned verbatim by Schedule
	err    error               // returned verbatim by Schedule
}
|
|
|
|
// Schedule implements core.ScheduleAlgorithm by returning the canned
// result and error; all arguments are ignored.
func (es mockScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) {
	return es.result, es.err
}
|
|
|
|
// Extenders implements core.ScheduleAlgorithm; the mock has no extenders.
func (es mockScheduler) Extenders() []framework.Extender {
	return nil
}
|
|
|
|
// TestSchedulerCreation verifies that New() honors profile and
// out-of-tree registry options, and rejects invalid configurations
// (duplicate plugin name, duplicate profile name).
func TestSchedulerCreation(t *testing.T) {
	// Registering a plugin under defaultbinder.Name collides with the
	// in-tree DefaultBinder and must be rejected by New.
	invalidRegistry := map[string]frameworkruntime.PluginFactory{
		defaultbinder.Name: defaultbinder.New,
	}
	validRegistry := map[string]frameworkruntime.PluginFactory{
		"Foo": defaultbinder.New,
	}
	cases := []struct {
		name         string
		opts         []Option
		wantErr      string   // substring expected in New's error; empty means success expected
		wantProfiles []string // sorted profile names expected on success
	}{
		{
			name:         "default scheduler",
			wantProfiles: []string{"default-scheduler"},
		},
		{
			name:         "valid out-of-tree registry",
			opts:         []Option{WithFrameworkOutOfTreeRegistry(validRegistry)},
			wantProfiles: []string{"default-scheduler"},
		},
		{
			name:         "repeated plugin name in out-of-tree plugin",
			opts:         []Option{WithFrameworkOutOfTreeRegistry(invalidRegistry)},
			wantProfiles: []string{"default-scheduler"},
			wantErr:      "a plugin named DefaultBinder already exists",
		},
		{
			name: "multiple profiles",
			opts: []Option{WithProfiles(
				schedulerapi.KubeSchedulerProfile{SchedulerName: "foo"},
				schedulerapi.KubeSchedulerProfile{SchedulerName: "bar"},
			)},
			wantProfiles: []string{"bar", "foo"},
		},
		{
			name: "Repeated profiles",
			opts: []Option{WithProfiles(
				schedulerapi.KubeSchedulerProfile{SchedulerName: "foo"},
				schedulerapi.KubeSchedulerProfile{SchedulerName: "bar"},
				schedulerapi.KubeSchedulerProfile{SchedulerName: "foo"},
			)},
			wantErr: "duplicate profile with scheduler name \"foo\"",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			client := clientsetfake.NewSimpleClientset()
			informerFactory := informers.NewSharedInformerFactory(client, 0)

			eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})

			stopCh := make(chan struct{})
			defer close(stopCh)
			s, err := New(client,
				informerFactory,
				profile.NewRecorderFactory(eventBroadcaster),
				stopCh,
				tc.opts...,
			)

			// Error cases: only assert on the error, not on profiles.
			if len(tc.wantErr) != 0 {
				if err == nil || !strings.Contains(err.Error(), tc.wantErr) {
					t.Errorf("got error %q, want %q", err, tc.wantErr)
				}
				return
			}
			if err != nil {
				t.Fatalf("Failed to create scheduler: %v", err)
			}
			// Collect and sort profile names for a deterministic comparison.
			profiles := make([]string, 0, len(s.Profiles))
			for name := range s.Profiles {
				profiles = append(profiles, name)
			}
			sort.Strings(profiles)
			if diff := cmp.Diff(tc.wantProfiles, profiles); diff != "" {
				t.Errorf("unexpected profiles (-want, +got):\n%s", diff)
			}
		})
	}
}
|
|
|
|
// TestSchedulerScheduleOne drives scheduleOne() end to end with a mocked
// scheduling algorithm and a fake cache, checking for each case: which pod
// was assumed/forgotten, the error handed to the Error callback, the
// binding sent to the API server, and the event reason emitted.
func TestSchedulerScheduleOne(t *testing.T) {
	testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
	client := clientsetfake.NewSimpleClientset(&testNode)
	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
	errS := errors.New("scheduler")
	errB := errors.New("binder")
	preBindErr := errors.New("on PreBind")

	table := []struct {
		name                string
		injectBindError     error // error the fake binding reactor returns
		sendPod             *v1.Pod
		algo                core.ScheduleAlgorithm
		registerPluginFuncs []st.RegisterPluginFunc
		expectErrorPod      *v1.Pod
		expectForgetPod     *v1.Pod
		expectAssumedPod    *v1.Pod
		expectError         error
		expectBind          *v1.Binding
		eventReason         string
	}{
		{
			name:    "error reserve pod",
			sendPod: podWithID("foo", ""),
			algo:    mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
			registerPluginFuncs: []st.RegisterPluginFunc{
				st.RegisterReservePlugin("FakeReserve", st.NewFakeReservePlugin(framework.NewStatus(framework.Error, "reserve error"))),
			},
			expectErrorPod:   podWithID("foo", testNode.Name),
			expectForgetPod:  podWithID("foo", testNode.Name),
			expectAssumedPod: podWithID("foo", testNode.Name),
			expectError:      fmt.Errorf(`running Reserve plugin "FakeReserve": %w`, errors.New("reserve error")),
			eventReason:      "FailedScheduling",
		},
		{
			name:    "error permit pod",
			sendPod: podWithID("foo", ""),
			algo:    mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
			registerPluginFuncs: []st.RegisterPluginFunc{
				st.RegisterPermitPlugin("FakePermit", st.NewFakePermitPlugin(framework.NewStatus(framework.Error, "permit error"), time.Minute)),
			},
			expectErrorPod:   podWithID("foo", testNode.Name),
			expectForgetPod:  podWithID("foo", testNode.Name),
			expectAssumedPod: podWithID("foo", testNode.Name),
			expectError:      fmt.Errorf(`running Permit plugin "FakePermit": %w`, errors.New("permit error")),
			eventReason:      "FailedScheduling",
		},
		{
			name:    "error prebind pod",
			sendPod: podWithID("foo", ""),
			algo:    mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
			registerPluginFuncs: []st.RegisterPluginFunc{
				st.RegisterPreBindPlugin("FakePreBind", st.NewFakePreBindPlugin(framework.AsStatus(preBindErr))),
			},
			expectErrorPod:   podWithID("foo", testNode.Name),
			expectForgetPod:  podWithID("foo", testNode.Name),
			expectAssumedPod: podWithID("foo", testNode.Name),
			expectError:      fmt.Errorf(`running PreBind plugin "FakePreBind": %w`, preBindErr),
			eventReason:      "FailedScheduling",
		},
		{
			name:             "bind assumed pod scheduled",
			sendPod:          podWithID("foo", ""),
			algo:             mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
			expectBind:       &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
			expectAssumedPod: podWithID("foo", testNode.Name),
			eventReason:      "Scheduled",
		},
		{
			name:           "error pod failed scheduling",
			sendPod:        podWithID("foo", ""),
			algo:           mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, errS},
			expectError:    errS,
			expectErrorPod: podWithID("foo", ""),
			eventReason:    "FailedScheduling",
		},
		{
			name:             "error bind forget pod failed scheduling",
			sendPod:          podWithID("foo", ""),
			algo:             mockScheduler{core.ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
			expectBind:       &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
			expectAssumedPod: podWithID("foo", testNode.Name),
			injectBindError:  errB,
			expectError:      fmt.Errorf(`binding rejected: %w`, fmt.Errorf("running Bind plugin %q: %w", "DefaultBinder", errors.New("binder"))),
			expectErrorPod:   podWithID("foo", testNode.Name),
			expectForgetPod:  podWithID("foo", testNode.Name),
			eventReason:      "FailedScheduling",
		},
		{
			name:        "deleting pod",
			sendPod:     deletingPod("foo"),
			algo:        mockScheduler{core.ScheduleResult{}, nil},
			eventReason: "FailedScheduling",
		},
	}

	stop := make(chan struct{})
	defer close(stop)
	informerFactory := informers.NewSharedInformerFactory(client, 0)

	informerFactory.Start(stop)
	informerFactory.WaitForCacheSync(stop)

	for _, item := range table {
		t.Run(item.name, func(t *testing.T) {
			var gotError error
			var gotPod *v1.Pod
			var gotForgetPod *v1.Pod
			var gotAssumedPod *v1.Pod
			var gotBinding *v1.Binding
			// Fake cache that records which pods were assumed/forgotten.
			sCache := &fakecache.Cache{
				ForgetFunc: func(pod *v1.Pod) {
					gotForgetPod = pod
				},
				AssumeFunc: func(pod *v1.Pod) {
					gotAssumedPod = pod
				},
				IsAssumedPodFunc: func(pod *v1.Pod) bool {
					if pod == nil || gotAssumedPod == nil {
						return false
					}
					return pod.UID == gotAssumedPod.UID
				},
			}
			client := clientsetfake.NewSimpleClientset(item.sendPod)
			// Intercept binding creates to capture the binding and to
			// optionally inject a bind failure.
			client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
				if action.GetSubresource() != "binding" {
					return false, nil, nil
				}
				gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
				return true, gotBinding, item.injectBindError
			})
			registerPluginFuncs := append(item.registerPluginFuncs,
				st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
				st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
			)
			fwk, err := st.NewFramework(registerPluginFuncs,
				frameworkruntime.WithClientSet(client),
				frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
				frameworkruntime.WithProfileName(testSchedulerName))
			if err != nil {
				t.Fatal(err)
			}

			s := &Scheduler{
				SchedulerCache: sCache,
				Algorithm:      item.algo,
				client:         client,
				Error: func(p *framework.QueuedPodInfo, err error) {
					gotPod = p.Pod
					gotError = err
				},
				NextPod: func() *framework.QueuedPodInfo {
					return &framework.QueuedPodInfo{Pod: item.sendPod}
				},
				Profiles: profile.Map{
					testSchedulerName: fwk,
				},
			}
			// Watch for exactly one event and verify its reason; `called`
			// also serves as the synchronization point before asserting.
			called := make(chan struct{})
			stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
				e, _ := obj.(*eventsv1.Event)
				if e.Reason != item.eventReason {
					t.Errorf("got event %v, want %v", e.Reason, item.eventReason)
				}
				close(called)
			})
			s.scheduleOne(context.Background())
			<-called
			if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) {
				t.Errorf("assumed pod: wanted %v, got %v", e, a)
			}
			if e, a := item.expectErrorPod, gotPod; !reflect.DeepEqual(e, a) {
				t.Errorf("error pod: wanted %v, got %v", e, a)
			}
			if e, a := item.expectForgetPod, gotForgetPod; !reflect.DeepEqual(e, a) {
				t.Errorf("forget pod: wanted %v, got %v", e, a)
			}
			if e, a := item.expectError, gotError; !reflect.DeepEqual(e, a) {
				t.Errorf("error: wanted %v, got %v", e, a)
			}
			if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" {
				t.Errorf("got binding diff (-want, +got): %s", diff)
			}
			stopFunc()
		})
	}
}
|
|
|
|
// fakeNodeSelectorArgs is the configuration for fakeNodeSelector, decoded
// from the plugin's runtime args.
type fakeNodeSelectorArgs struct {
	NodeName string `json:"nodeName"` // the only node the filter admits
}
|
|
|
|
// fakeNodeSelector is a test Filter plugin that admits exactly one node,
// configured via fakeNodeSelectorArgs.
type fakeNodeSelector struct {
	fakeNodeSelectorArgs
}
|
|
|
|
func newFakeNodeSelector(args runtime.Object, _ framework.Handle) (framework.Plugin, error) {
|
|
pl := &fakeNodeSelector{}
|
|
if err := frameworkruntime.DecodeInto(args, &pl.fakeNodeSelectorArgs); err != nil {
|
|
return nil, err
|
|
}
|
|
return pl, nil
|
|
}
|
|
|
|
// Name returns the plugin name used in scheduler configuration.
func (s *fakeNodeSelector) Name() string {
	return "FakeNodeSelector"
}
|
|
|
|
func (s *fakeNodeSelector) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
|
|
if nodeInfo.Node().Name != s.NodeName {
|
|
return framework.NewStatus(framework.UnschedulableAndUnresolvable)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TestSchedulerMultipleProfilesScheduling runs one scheduler with two
// profiles whose filters each admit a single node, then verifies that pods
// are bound to the node matching their profile and that "Scheduled" events
// are reported by the matching controller.
func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
	nodes := []runtime.Object{
		st.MakeNode().Name("machine1").UID("machine1").Obj(),
		st.MakeNode().Name("machine2").UID("machine2").Obj(),
		st.MakeNode().Name("machine3").UID("machine3").Obj(),
	}
	pods := []*v1.Pod{
		st.MakePod().Name("pod1").UID("pod1").SchedulerName("match-machine3").Obj(),
		st.MakePod().Name("pod2").UID("pod2").SchedulerName("match-machine2").Obj(),
		st.MakePod().Name("pod3").UID("pod3").SchedulerName("match-machine2").Obj(),
		st.MakePod().Name("pod4").UID("pod4").SchedulerName("match-machine3").Obj(),
	}
	wantBindings := map[string]string{
		"pod1": "machine3",
		"pod2": "machine2",
		"pod3": "machine2",
		"pod4": "machine3",
	}
	wantControllers := map[string]string{
		"pod1": "match-machine3",
		"pod2": "match-machine2",
		"pod3": "match-machine2",
		"pod4": "match-machine3",
	}

	// Set up scheduler for the 3 nodes.
	// We use a fake filter that only allows one particular node. We create two
	// profiles, each with a different node in the filter configuration.
	client := clientsetfake.NewSimpleClientset(nodes...)
	broadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	informerFactory := informers.NewSharedInformerFactory(client, 0)
	sched, err := New(client,
		informerFactory,
		profile.NewRecorderFactory(broadcaster),
		ctx.Done(),
		WithProfiles(
			schedulerapi.KubeSchedulerProfile{SchedulerName: "match-machine2",
				Plugins: &schedulerapi.Plugins{
					Filter: &schedulerapi.PluginSet{
						Enabled:  []schedulerapi.Plugin{{Name: "FakeNodeSelector"}},
						Disabled: []schedulerapi.Plugin{{Name: "*"}},
					}},
				PluginConfig: []schedulerapi.PluginConfig{
					{Name: "FakeNodeSelector",
						Args: &runtime.Unknown{Raw: []byte(`{"nodeName":"machine2"}`)},
					},
				},
			},
			schedulerapi.KubeSchedulerProfile{
				SchedulerName: "match-machine3",
				Plugins: &schedulerapi.Plugins{
					Filter: &schedulerapi.PluginSet{
						Enabled:  []schedulerapi.Plugin{{Name: "FakeNodeSelector"}},
						Disabled: []schedulerapi.Plugin{{Name: "*"}},
					}},
				PluginConfig: []schedulerapi.PluginConfig{
					{Name: "FakeNodeSelector",
						Args: &runtime.Unknown{Raw: []byte(`{"nodeName":"machine3"}`)},
					},
				},
			},
		),
		WithFrameworkOutOfTreeRegistry(frameworkruntime.Registry{
			"FakeNodeSelector": newFakeNodeSelector,
		}),
	)
	if err != nil {
		t.Fatal(err)
	}

	// Capture the bindings and events' controllers.
	var wg sync.WaitGroup
	wg.Add(2 * len(pods)) // one binding + one "Scheduled" event per pod
	bindings := make(map[string]string)
	client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
		if action.GetSubresource() != "binding" {
			return false, nil, nil
		}
		binding := action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
		bindings[binding.Name] = binding.Target.Name
		wg.Done()
		return true, binding, nil
	})
	controllers := make(map[string]string)
	stopFn := broadcaster.StartEventWatcher(func(obj runtime.Object) {
		e, ok := obj.(*eventsv1.Event)
		if !ok || e.Reason != "Scheduled" {
			return
		}
		controllers[e.Regarding.Name] = e.ReportingController
		wg.Done()
	})
	defer stopFn()

	// Run scheduler.
	informerFactory.Start(ctx.Done())
	informerFactory.WaitForCacheSync(ctx.Done())
	go sched.Run(ctx)

	// Send pods to be scheduled.
	for _, p := range pods {
		_, err := client.CoreV1().Pods("").Create(ctx, p, metav1.CreateOptions{})
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Wait()

	// Verify correct bindings and reporting controllers.
	if diff := cmp.Diff(wantBindings, bindings); diff != "" {
		t.Errorf("pods were scheduled incorrectly (-want, +got):\n%s", diff)
	}
	if diff := cmp.Diff(wantControllers, controllers); diff != "" {
		t.Errorf("events were reported with wrong controllers (-want, +got):\n%s", diff)
	}
}
|
|
|
|
func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
|
|
stop := make(chan struct{})
|
|
defer close(stop)
|
|
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
|
|
scache := internalcache.New(100*time.Millisecond, stop)
|
|
pod := podWithPort("pod.Name", "", 8080)
|
|
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
|
|
scache.AddNode(&node)
|
|
client := clientsetfake.NewSimpleClientset(&node)
|
|
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
|
|
|
fns := []st.RegisterPluginFunc{
|
|
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
st.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"),
|
|
}
|
|
scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, pod, &node, fns...)
|
|
|
|
waitPodExpireChan := make(chan struct{})
|
|
timeout := make(chan struct{})
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-timeout:
|
|
return
|
|
default:
|
|
}
|
|
pods, err := scache.PodCount()
|
|
if err != nil {
|
|
errChan <- fmt.Errorf("cache.List failed: %v", err)
|
|
return
|
|
}
|
|
if pods == 0 {
|
|
close(waitPodExpireChan)
|
|
return
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
}()
|
|
// waiting for the assumed pod to expire
|
|
select {
|
|
case err := <-errChan:
|
|
t.Fatal(err)
|
|
case <-waitPodExpireChan:
|
|
case <-time.After(wait.ForeverTestTimeout):
|
|
close(timeout)
|
|
t.Fatalf("timeout timeout in waiting pod expire after %v", wait.ForeverTestTimeout)
|
|
}
|
|
|
|
// We use conflicted pod ports to incur fit predicate failure if first pod not removed.
|
|
secondPod := podWithPort("bar", "", 8080)
|
|
queuedPodStore.Add(secondPod)
|
|
scheduler.scheduleOne(context.Background())
|
|
select {
|
|
case b := <-bindingChan:
|
|
expectBinding := &v1.Binding{
|
|
ObjectMeta: metav1.ObjectMeta{Name: "bar", UID: types.UID("bar")},
|
|
Target: v1.ObjectReference{Kind: "Node", Name: node.Name},
|
|
}
|
|
if !reflect.DeepEqual(expectBinding, b) {
|
|
t.Errorf("binding want=%v, get=%v", expectBinding, b)
|
|
}
|
|
case <-time.After(wait.ForeverTestTimeout):
|
|
t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout)
|
|
}
|
|
}
|
|
|
|
// TestSchedulerNoPhantomPodAfterDelete verifies that after an assumed pod
// is deleted (Add followed by Remove on the cache), a second pod requesting
// the same host port becomes schedulable; while the first pod is still
// assumed, the second pod must fail with a port-conflict FitError.
func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
	stop := make(chan struct{})
	defer close(stop)
	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
	// Long TTL so the assumed pod does NOT expire on its own during the test.
	scache := internalcache.New(10*time.Minute, stop)
	firstPod := podWithPort("pod.Name", "", 8080)
	node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
	scache.AddNode(&node)
	client := clientsetfake.NewSimpleClientset(&node)
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	fns := []st.RegisterPluginFunc{
		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
		st.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"),
	}
	scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, firstPod, &node, fns...)

	// We use conflicted pod ports to incur fit predicate failure.
	secondPod := podWithPort("bar", "", 8080)
	queuedPodStore.Add(secondPod)
	// queuedPodStore: [bar:8080]
	// cache: [(assumed)foo:8080]

	scheduler.scheduleOne(context.Background())
	select {
	case err := <-errChan:
		// The second pod must be rejected with an Unschedulable status
		// carrying the nodeports reason for the only node.
		expectErr := &framework.FitError{
			Pod:         secondPod,
			NumAllNodes: 1,
			FilteredNodesStatuses: framework.NodeToStatusMap{
				node.Name: framework.NewStatus(
					framework.Unschedulable,
					nodeports.ErrReason,
				),
			},
		}
		if !reflect.DeepEqual(expectErr, err) {
			t.Errorf("err want=%v, get=%v", expectErr, err)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout in fitting after %v", wait.ForeverTestTimeout)
	}

	// We mimic the workflow of cache behavior when a pod is removed by user.
	// Note: if the schedulernodeinfo timeout would be super short, the first pod would expire
	// and would be removed itself (without any explicit actions on schedulernodeinfo). Even in that case,
	// explicitly AddPod will as well correct the behavior.
	firstPod.Spec.NodeName = node.Name
	if err := scache.AddPod(firstPod); err != nil {
		t.Fatalf("err: %v", err)
	}
	if err := scache.RemovePod(firstPod); err != nil {
		t.Fatalf("err: %v", err)
	}

	queuedPodStore.Add(secondPod)
	scheduler.scheduleOne(context.Background())
	select {
	case b := <-bindingChan:
		expectBinding := &v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Name: "bar", UID: types.UID("bar")},
			Target:     v1.ObjectReference{Kind: "Node", Name: node.Name},
		}
		if !reflect.DeepEqual(expectBinding, b) {
			t.Errorf("binding want=%v, get=%v", expectBinding, b)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout)
	}
}
|
|
|
|
// setupTestSchedulerWithOnePodOnNode builds a test scheduler, schedules
// the given pod once, and asserts it was bound to node before returning
// the scheduler plus its binding and error channels.
//
// queuedPodStore: pods queued before processing.
// cache: scheduler cache that might contain assumed pods.
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache,
	informerFactory informers.SharedInformerFactory, stop chan struct{}, pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {

	scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, fns...)

	informerFactory.Start(stop)
	informerFactory.WaitForCacheSync(stop)

	queuedPodStore.Add(pod)
	// queuedPodStore: [foo:8080]
	// cache: []

	scheduler.scheduleOne(context.Background())
	// queuedPodStore: []
	// cache: [(assumed)foo:8080]

	// The pod must have been bound to node; fail the test otherwise.
	select {
	case b := <-bindingChan:
		expectBinding := &v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Name: pod.Name, UID: types.UID(pod.Name)},
			Target:     v1.ObjectReference{Kind: "Node", Name: node.Name},
		}
		if !reflect.DeepEqual(expectBinding, b) {
			t.Errorf("binding want=%v, get=%v", expectBinding, b)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
	}
	return scheduler, bindingChan, errChan
}
|
|
|
|
func TestSchedulerFailedSchedulingReasons(t *testing.T) {
|
|
stop := make(chan struct{})
|
|
defer close(stop)
|
|
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
|
|
scache := internalcache.New(10*time.Minute, stop)
|
|
|
|
// Design the baseline for the pods, and we will make nodes that dont fit it later.
|
|
var cpu = int64(4)
|
|
var mem = int64(500)
|
|
podWithTooBigResourceRequests := podWithResources("bar", "", v1.ResourceList{
|
|
v1.ResourceCPU: *(resource.NewQuantity(cpu, resource.DecimalSI)),
|
|
v1.ResourceMemory: *(resource.NewQuantity(mem, resource.DecimalSI)),
|
|
}, v1.ResourceList{
|
|
v1.ResourceCPU: *(resource.NewQuantity(cpu, resource.DecimalSI)),
|
|
v1.ResourceMemory: *(resource.NewQuantity(mem, resource.DecimalSI)),
|
|
})
|
|
|
|
// create several nodes which cannot schedule the above pod
|
|
var nodes []*v1.Node
|
|
var objects []runtime.Object
|
|
for i := 0; i < 100; i++ {
|
|
uid := fmt.Sprintf("machine%v", i)
|
|
node := v1.Node{
|
|
ObjectMeta: metav1.ObjectMeta{Name: uid, UID: types.UID(uid)},
|
|
Status: v1.NodeStatus{
|
|
Capacity: v1.ResourceList{
|
|
v1.ResourceCPU: *(resource.NewQuantity(cpu/2, resource.DecimalSI)),
|
|
v1.ResourceMemory: *(resource.NewQuantity(mem/5, resource.DecimalSI)),
|
|
v1.ResourcePods: *(resource.NewQuantity(10, resource.DecimalSI)),
|
|
},
|
|
Allocatable: v1.ResourceList{
|
|
v1.ResourceCPU: *(resource.NewQuantity(cpu/2, resource.DecimalSI)),
|
|
v1.ResourceMemory: *(resource.NewQuantity(mem/5, resource.DecimalSI)),
|
|
v1.ResourcePods: *(resource.NewQuantity(10, resource.DecimalSI)),
|
|
}},
|
|
}
|
|
scache.AddNode(&node)
|
|
nodes = append(nodes, &node)
|
|
objects = append(objects, &node)
|
|
}
|
|
client := clientsetfake.NewSimpleClientset(objects...)
|
|
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
|
|
|
// Create expected failure reasons for all the nodes. Hopefully they will get rolled up into a non-spammy summary.
|
|
failedNodeStatues := framework.NodeToStatusMap{}
|
|
for _, node := range nodes {
|
|
failedNodeStatues[node.Name] = framework.NewStatus(
|
|
framework.Unschedulable,
|
|
fmt.Sprintf("Insufficient %v", v1.ResourceCPU),
|
|
fmt.Sprintf("Insufficient %v", v1.ResourceMemory),
|
|
)
|
|
}
|
|
fns := []st.RegisterPluginFunc{
|
|
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
st.RegisterPluginAsExtensions(noderesources.FitName, noderesources.NewFit, "Filter", "PreFilter"),
|
|
}
|
|
scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, nil, fns...)
|
|
|
|
informerFactory.Start(stop)
|
|
informerFactory.WaitForCacheSync(stop)
|
|
|
|
queuedPodStore.Add(podWithTooBigResourceRequests)
|
|
scheduler.scheduleOne(context.Background())
|
|
select {
|
|
case err := <-errChan:
|
|
expectErr := &framework.FitError{
|
|
Pod: podWithTooBigResourceRequests,
|
|
NumAllNodes: len(nodes),
|
|
FilteredNodesStatuses: failedNodeStatues,
|
|
}
|
|
if len(fmt.Sprint(expectErr)) > 150 {
|
|
t.Errorf("message is too spammy ! %v ", len(fmt.Sprint(expectErr)))
|
|
}
|
|
if !reflect.DeepEqual(expectErr, err) {
|
|
t.Errorf("\n err \nWANT=%+v,\nGOT=%+v", expectErr, err)
|
|
}
|
|
case <-time.After(wait.ForeverTestTimeout):
|
|
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
|
|
}
|
|
}
|
|
|
|
// queuedPodStore: pods queued before processing.
|
|
// scache: scheduler cache that might contain assumed pods.
|
|
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
|
|
bindingChan := make(chan *v1.Binding, 1)
|
|
client := clientsetfake.NewSimpleClientset()
|
|
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
|
|
var b *v1.Binding
|
|
if action.GetSubresource() == "binding" {
|
|
b := action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
|
|
bindingChan <- b
|
|
}
|
|
return true, b, nil
|
|
})
|
|
|
|
var recorder events.EventRecorder
|
|
if broadcaster != nil {
|
|
recorder = broadcaster.NewRecorder(scheme.Scheme, testSchedulerName)
|
|
} else {
|
|
recorder = &events.FakeRecorder{}
|
|
}
|
|
|
|
fwk, _ := st.NewFramework(
|
|
fns,
|
|
frameworkruntime.WithClientSet(client),
|
|
frameworkruntime.WithEventRecorder(recorder),
|
|
frameworkruntime.WithInformerFactory(informerFactory),
|
|
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
|
|
frameworkruntime.WithProfileName(testSchedulerName),
|
|
)
|
|
|
|
algo := core.NewGenericScheduler(
|
|
scache,
|
|
internalcache.NewEmptySnapshot(),
|
|
[]framework.Extender{},
|
|
schedulerapi.DefaultPercentageOfNodesToScore,
|
|
)
|
|
|
|
errChan := make(chan error, 1)
|
|
sched := &Scheduler{
|
|
SchedulerCache: scache,
|
|
Algorithm: algo,
|
|
NextPod: func() *framework.QueuedPodInfo {
|
|
return &framework.QueuedPodInfo{Pod: clientcache.Pop(queuedPodStore).(*v1.Pod)}
|
|
},
|
|
Error: func(p *framework.QueuedPodInfo, err error) {
|
|
errChan <- err
|
|
},
|
|
Profiles: profile.Map{
|
|
testSchedulerName: fwk,
|
|
},
|
|
client: client,
|
|
}
|
|
|
|
return sched, bindingChan, errChan
|
|
}
|
|
|
|
func setupTestSchedulerWithVolumeBinding(volumeBinder scheduling.SchedulerVolumeBinder, stop <-chan struct{}, broadcaster events.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) {
|
|
testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
|
|
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
|
|
pod := podWithID("foo", "")
|
|
pod.Namespace = "foo-ns"
|
|
pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{Name: "testVol",
|
|
VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "testPVC"}}})
|
|
queuedPodStore.Add(pod)
|
|
scache := internalcache.New(10*time.Minute, stop)
|
|
scache.AddNode(&testNode)
|
|
testPVC := v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "testPVC", Namespace: pod.Namespace, UID: types.UID("testPVC")}}
|
|
client := clientsetfake.NewSimpleClientset(&testNode, &testPVC)
|
|
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
|
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
|
|
pvcInformer.Informer().GetStore().Add(&testPVC)
|
|
|
|
fns := []st.RegisterPluginFunc{
|
|
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
st.RegisterPluginAsExtensions(volumebinding.Name, func(plArgs runtime.Object, handle framework.Handle) (framework.Plugin, error) {
|
|
return &volumebinding.VolumeBinding{Binder: volumeBinder, PVCLister: pvcInformer.Lister()}, nil
|
|
}, "PreFilter", "Filter", "Reserve", "PreBind"),
|
|
}
|
|
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, fns...)
|
|
informerFactory.Start(stop)
|
|
informerFactory.WaitForCacheSync(stop)
|
|
return s, bindingChan, errChan
|
|
}
|
|
|
|
// makePredicateError returns the scheduling-failure error produced by a
// single-node cluster that rejected the pod for the given reason.
//
// This is a workaround because golint complains that errors cannot
// end with punctuation. However, the real predicate error message does
// end with a period.
func makePredicateError(failReason string) error {
	// Build the message first and wrap it with errors.New: passing a
	// non-constant string as the fmt.Errorf format is flagged by go vet and
	// would misinterpret any '%' that ever appeared in failReason.
	s := fmt.Sprintf("0/1 nodes are available: %v.", failReason)
	return errors.New(s)
}
|
|
|
|
// TestSchedulerWithVolumeBinding runs scheduleOne end to end against a fake
// volume binder and checks, per case: the emitted event reason, the resulting
// binding or error, and which binder hooks (assume/bind) were invoked.
func TestSchedulerWithVolumeBinding(t *testing.T) {
	findErr := fmt.Errorf("find err")
	assumeErr := fmt.Errorf("assume err")
	bindErr := fmt.Errorf("bind err")
	client := clientsetfake.NewSimpleClientset()

	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})

	// This can be small because we wait for pod to finish scheduling first
	chanTimeout := 2 * time.Second

	table := []struct {
		name               string
		expectError        error                               // expected scheduling error (nil for success)
		expectPodBind      *v1.Binding                         // expected binding object (nil when scheduling fails)
		expectAssumeCalled bool                                // whether the fake binder's Assume must run
		expectBindCalled   bool                                // whether the fake binder's Bind must run
		eventReason        string                              // expected event reason ("Scheduled" or "FailedScheduling")
		volumeBinderConfig *scheduling.FakeVolumeBinderConfig  // controls the fake binder's behavior
	}{
		{
			name: "all bound",
			volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
				AllBound: true,
			},
			expectAssumeCalled: true,
			expectPodBind:      &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}},
			eventReason:        "Scheduled",
		},
		{
			name: "bound/invalid pv affinity",
			volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
				AllBound:    true,
				FindReasons: scheduling.ConflictReasons{scheduling.ErrReasonNodeConflict},
			},
			eventReason: "FailedScheduling",
			expectError: makePredicateError("1 node(s) had volume node affinity conflict"),
		},
		{
			name: "unbound/no matches",
			volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
				FindReasons: scheduling.ConflictReasons{scheduling.ErrReasonBindConflict},
			},
			eventReason: "FailedScheduling",
			expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind"),
		},
		{
			name: "bound and unbound unsatisfied",
			volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
				FindReasons: scheduling.ConflictReasons{scheduling.ErrReasonBindConflict, scheduling.ErrReasonNodeConflict},
			},
			eventReason: "FailedScheduling",
			expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind, 1 node(s) had volume node affinity conflict"),
		},
		{
			name:               "unbound/found matches/bind succeeds",
			volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{},
			expectAssumeCalled: true,
			expectBindCalled:   true,
			expectPodBind:      &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}},
			eventReason:        "Scheduled",
		},
		{
			name: "predicate error",
			volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
				FindErr: findErr,
			},
			eventReason: "FailedScheduling",
			expectError: fmt.Errorf("running %q filter plugin for pod %q: %v", volumebinding.Name, "foo", findErr),
		},
		{
			name: "assume error",
			volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
				AssumeErr: assumeErr,
			},
			expectAssumeCalled: true,
			eventReason:        "FailedScheduling",
			expectError:        fmt.Errorf("running Reserve plugin %q: %w", volumebinding.Name, assumeErr),
		},
		{
			name: "bind error",
			volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
				BindErr: bindErr,
			},
			expectAssumeCalled: true,
			expectBindCalled:   true,
			eventReason:        "FailedScheduling",
			expectError:        fmt.Errorf("running PreBind plugin %q: %w", volumebinding.Name, bindErr),
		},
	}

	for _, item := range table {
		t.Run(item.name, func(t *testing.T) {
			stop := make(chan struct{})
			fakeVolumeBinder := scheduling.NewFakeVolumeBinder(item.volumeBinderConfig)
			s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(fakeVolumeBinder, stop, eventBroadcaster)
			eventChan := make(chan struct{})
			// Watch for the scheduling event; eventChan is closed once the
			// first event arrives.
			stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
				// NOTE(review): the assertion result is unchecked; if obj is
				// ever not an *eventsv1.Event, e is nil and e.Reason below
				// panics — confirm the broadcaster only delivers Events.
				e, _ := obj.(*eventsv1.Event)
				// The inner e shadows the event variable; the RHS e.Reason is
				// evaluated before the shadowing takes effect.
				if e, a := item.eventReason, e.Reason; e != a {
					t.Errorf("expected %v, got %v", e, a)
				}
				close(eventChan)
			})
			s.scheduleOne(context.Background())
			// Wait for pod to succeed or fail scheduling
			select {
			case <-eventChan:
			case <-time.After(wait.ForeverTestTimeout):
				t.Fatalf("scheduling timeout after %v", wait.ForeverTestTimeout)
			}
			stopFunc()
			// Wait for scheduling to return an error or succeed binding.
			var (
				gotErr  error
				gotBind *v1.Binding
			)
			select {
			case gotErr = <-errChan:
			case gotBind = <-bindingChan:
			case <-time.After(chanTimeout):
				t.Fatalf("did not receive pod binding or error after %v", chanTimeout)
			}
			// Compare by error string: the expected errors are constructed
			// independently, so identity comparison would not work.
			if item.expectError != nil {
				if gotErr == nil || item.expectError.Error() != gotErr.Error() {
					t.Errorf("err \nWANT=%+v,\nGOT=%+v", item.expectError, gotErr)
				}
			} else if gotErr != nil {
				t.Errorf("err \nWANT=%+v,\nGOT=%+v", item.expectError, gotErr)
			}
			if !cmp.Equal(item.expectPodBind, gotBind) {
				t.Errorf("err \nWANT=%+v,\nGOT=%+v", item.expectPodBind, gotBind)
			}

			// Verify which fake-binder hooks were exercised.
			if item.expectAssumeCalled != fakeVolumeBinder.AssumeCalled {
				t.Errorf("expectedAssumeCall %v", item.expectAssumeCalled)
			}

			if item.expectBindCalled != fakeVolumeBinder.BindCalled {
				t.Errorf("expectedBindCall %v", item.expectBindCalled)
			}

			close(stop)
		})
	}
}
|
|
|
|
func TestInitPolicyFromFile(t *testing.T) {
|
|
dir, err := ioutil.TempDir(os.TempDir(), "policy")
|
|
if err != nil {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
defer os.RemoveAll(dir)
|
|
|
|
for i, test := range []struct {
|
|
policy string
|
|
expectedPredicates sets.String
|
|
}{
|
|
// Test json format policy file
|
|
{
|
|
policy: `{
|
|
"kind" : "Policy",
|
|
"apiVersion" : "v1",
|
|
"predicates" : [
|
|
{"name" : "PredicateOne"},
|
|
{"name" : "PredicateTwo"}
|
|
],
|
|
"priorities" : [
|
|
{"name" : "PriorityOne", "weight" : 1},
|
|
{"name" : "PriorityTwo", "weight" : 5}
|
|
]
|
|
}`,
|
|
expectedPredicates: sets.NewString(
|
|
"PredicateOne",
|
|
"PredicateTwo",
|
|
),
|
|
},
|
|
// Test yaml format policy file
|
|
{
|
|
policy: `apiVersion: v1
|
|
kind: Policy
|
|
predicates:
|
|
- name: PredicateOne
|
|
- name: PredicateTwo
|
|
priorities:
|
|
- name: PriorityOne
|
|
weight: 1
|
|
- name: PriorityTwo
|
|
weight: 5
|
|
`,
|
|
expectedPredicates: sets.NewString(
|
|
"PredicateOne",
|
|
"PredicateTwo",
|
|
),
|
|
},
|
|
} {
|
|
file := fmt.Sprintf("scheduler-policy-config-file-%d", i)
|
|
fullPath := path.Join(dir, file)
|
|
|
|
if err := ioutil.WriteFile(fullPath, []byte(test.policy), 0644); err != nil {
|
|
t.Fatalf("Failed writing a policy config file: %v", err)
|
|
}
|
|
|
|
policy := &schedulerapi.Policy{}
|
|
|
|
if err := initPolicyFromFile(fullPath, policy); err != nil {
|
|
t.Fatalf("Failed writing a policy config file: %v", err)
|
|
}
|
|
|
|
// Verify that the policy is initialized correctly.
|
|
schedPredicates := sets.NewString()
|
|
for _, p := range policy.Predicates {
|
|
schedPredicates.Insert(p.Name)
|
|
}
|
|
schedPrioritizers := sets.NewString()
|
|
for _, p := range policy.Priorities {
|
|
schedPrioritizers.Insert(p.Name)
|
|
}
|
|
if !schedPredicates.Equal(test.expectedPredicates) {
|
|
t.Errorf("Expected predicates %v, got %v", test.expectedPredicates, schedPredicates)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestSchedulerBinding(t *testing.T) {
|
|
table := []struct {
|
|
podName string
|
|
extenders []framework.Extender
|
|
wantBinderID int
|
|
name string
|
|
}{
|
|
{
|
|
name: "the extender is not a binder",
|
|
podName: "pod0",
|
|
extenders: []framework.Extender{
|
|
&fakeExtender{isBinder: false, interestedPodName: "pod0"},
|
|
},
|
|
wantBinderID: -1, // default binding.
|
|
},
|
|
{
|
|
name: "one of the extenders is a binder and interested in pod",
|
|
podName: "pod0",
|
|
extenders: []framework.Extender{
|
|
&fakeExtender{isBinder: false, interestedPodName: "pod0"},
|
|
&fakeExtender{isBinder: true, interestedPodName: "pod0"},
|
|
},
|
|
wantBinderID: 1,
|
|
},
|
|
{
|
|
name: "one of the extenders is a binder, but not interested in pod",
|
|
podName: "pod1",
|
|
extenders: []framework.Extender{
|
|
&fakeExtender{isBinder: false, interestedPodName: "pod1"},
|
|
&fakeExtender{isBinder: true, interestedPodName: "pod0"},
|
|
},
|
|
wantBinderID: -1, // default binding.
|
|
},
|
|
}
|
|
|
|
for _, test := range table {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
pod := &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: test.podName,
|
|
},
|
|
}
|
|
defaultBound := false
|
|
client := clientsetfake.NewSimpleClientset(pod)
|
|
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
|
|
if action.GetSubresource() == "binding" {
|
|
defaultBound = true
|
|
}
|
|
return false, nil, nil
|
|
})
|
|
fwk, err := st.NewFramework([]st.RegisterPluginFunc{
|
|
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
}, frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
stop := make(chan struct{})
|
|
defer close(stop)
|
|
scache := internalcache.New(100*time.Millisecond, stop)
|
|
algo := core.NewGenericScheduler(
|
|
scache,
|
|
nil,
|
|
test.extenders,
|
|
0,
|
|
)
|
|
sched := Scheduler{
|
|
Algorithm: algo,
|
|
SchedulerCache: scache,
|
|
}
|
|
err = sched.bind(context.Background(), fwk, pod, "node", nil)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
|
|
// Checking default binding.
|
|
if wantBound := test.wantBinderID == -1; defaultBound != wantBound {
|
|
t.Errorf("got bound with default binding: %v, want %v", defaultBound, wantBound)
|
|
}
|
|
|
|
// Checking extenders binding.
|
|
for i, ext := range test.extenders {
|
|
wantBound := i == test.wantBinderID
|
|
if gotBound := ext.(*fakeExtender).gotBind; gotBound != wantBound {
|
|
t.Errorf("got bound with extender #%d: %v, want %v", i, gotBound, wantBound)
|
|
}
|
|
}
|
|
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestUpdatePod verifies that updatePod only issues a PATCH on the pod's
// status when the new condition or nominated node name actually differs from
// the current state, and that the emitted patch payload has the expected
// strategic-merge shape.
func TestUpdatePod(t *testing.T) {
	tests := []struct {
		name                     string
		currentPodConditions     []v1.PodCondition // conditions already on the pod
		newPodCondition          *v1.PodCondition  // condition updatePod should apply
		currentNominatedNodeName string            // nominated node already on the pod status
		newNominatedNodeName     string            // nominated node updatePod should set
		expectedPatchRequests    int               // how many PATCH calls must reach the client
		expectedPatchDataPattern string            // regexp the patch body must match (when a patch is expected)
	}{
		{
			name:                 "Should make patch request to add pod condition when there are none currently",
			currentPodConditions: []v1.PodCondition{},
			newPodCondition: &v1.PodCondition{
				Type:               "newType",
				Status:             "newStatus",
				LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
				LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 1, 1, 1, 1, time.UTC)),
				Reason:             "newReason",
				Message:            "newMessage",
			},
			expectedPatchRequests:    1,
			expectedPatchDataPattern: `{"status":{"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","lastTransitionTime":".*","message":"newMessage","reason":"newReason","status":"newStatus","type":"newType"}]}}`,
		},
		{
			name: "Should make patch request to add a new pod condition when there is already one with another type",
			currentPodConditions: []v1.PodCondition{
				{
					Type:               "someOtherType",
					Status:             "someOtherTypeStatus",
					LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 11, 0, 0, 0, 0, time.UTC)),
					LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 10, 0, 0, 0, 0, time.UTC)),
					Reason:             "someOtherTypeReason",
					Message:            "someOtherTypeMessage",
				},
			},
			newPodCondition: &v1.PodCondition{
				Type:               "newType",
				Status:             "newStatus",
				LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
				LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 1, 1, 1, 1, time.UTC)),
				Reason:             "newReason",
				Message:            "newMessage",
			},
			expectedPatchRequests:    1,
			expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"someOtherType"},{"type":"newType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","lastTransitionTime":".*","message":"newMessage","reason":"newReason","status":"newStatus","type":"newType"}]}}`,
		},
		{
			name: "Should make patch request to update an existing pod condition",
			currentPodConditions: []v1.PodCondition{
				{
					Type:               "currentType",
					Status:             "currentStatus",
					LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
					LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
					Reason:             "currentReason",
					Message:            "currentMessage",
				},
			},
			newPodCondition: &v1.PodCondition{
				Type:               "currentType",
				Status:             "newStatus",
				LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
				LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 1, 1, 1, 1, time.UTC)),
				Reason:             "newReason",
				Message:            "newMessage",
			},
			expectedPatchRequests:    1,
			expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"currentType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","lastTransitionTime":".*","message":"newMessage","reason":"newReason","status":"newStatus","type":"currentType"}]}}`,
		},
		{
			// Same condition type with the same status: the transition time
			// must not be touched by the patch.
			name: "Should make patch request to update an existing pod condition, but the transition time should remain unchanged because the status is the same",
			currentPodConditions: []v1.PodCondition{
				{
					Type:               "currentType",
					Status:             "currentStatus",
					LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
					LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
					Reason:             "currentReason",
					Message:            "currentMessage",
				},
			},
			newPodCondition: &v1.PodCondition{
				Type:               "currentType",
				Status:             "currentStatus",
				LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
				LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
				Reason:             "newReason",
				Message:            "newMessage",
			},
			expectedPatchRequests:    1,
			expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"currentType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","message":"newMessage","reason":"newReason","type":"currentType"}]}}`,
		},
		{
			// Identical condition and no new nominated node name: no patch
			// at all should be issued.
			name: "Should not make patch request if pod condition already exists and is identical and nominated node name is not set",
			currentPodConditions: []v1.PodCondition{
				{
					Type:               "currentType",
					Status:             "currentStatus",
					LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
					LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
					Reason:             "currentReason",
					Message:            "currentMessage",
				},
			},
			newPodCondition: &v1.PodCondition{
				Type:               "currentType",
				Status:             "currentStatus",
				LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
				LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
				Reason:             "currentReason",
				Message:            "currentMessage",
			},
			currentNominatedNodeName: "node1",
			expectedPatchRequests:    0,
		},
		{
			// Identical condition but a new nominated node name: a patch for
			// just the nominatedNodeName field is expected.
			name: "Should make patch request if pod condition already exists and is identical but nominated node name is set and different",
			currentPodConditions: []v1.PodCondition{
				{
					Type:               "currentType",
					Status:             "currentStatus",
					LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
					LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
					Reason:             "currentReason",
					Message:            "currentMessage",
				},
			},
			newPodCondition: &v1.PodCondition{
				Type:               "currentType",
				Status:             "currentStatus",
				LastProbeTime:      metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
				LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
				Reason:             "currentReason",
				Message:            "currentMessage",
			},
			newNominatedNodeName:     "node1",
			expectedPatchRequests:    1,
			expectedPatchDataPattern: `{"status":{"nominatedNodeName":"node1"}}`,
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			// Count patch requests and capture the last patch body.
			actualPatchRequests := 0
			var actualPatchData string
			cs := &clientsetfake.Clientset{}
			cs.AddReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
				actualPatchRequests++
				patch := action.(clienttesting.PatchAction)
				actualPatchData = string(patch.GetPatch())
				// For this test, we don't care about the result of the patched pod, just that we got the expected
				// patch request, so just returning &v1.Pod{} here is OK because scheduler doesn't use the response.
				return true, &v1.Pod{}, nil
			})

			pod := &v1.Pod{
				ObjectMeta: metav1.ObjectMeta{Name: "foo"},
				Status: v1.PodStatus{
					Conditions:        test.currentPodConditions,
					NominatedNodeName: test.currentNominatedNodeName,
				},
			}

			if err := updatePod(cs, pod, test.newPodCondition, test.newNominatedNodeName); err != nil {
				t.Fatalf("Error calling update: %v", err)
			}

			if actualPatchRequests != test.expectedPatchRequests {
				t.Fatalf("Actual patch requests (%d) does not equal expected patch requests (%d), actual patch data: %v", actualPatchRequests, test.expectedPatchRequests, actualPatchData)
			}

			regex, err := regexp.Compile(test.expectedPatchDataPattern)
			if err != nil {
				t.Fatalf("Error compiling regexp for %v: %v", test.expectedPatchDataPattern, err)
			}

			if test.expectedPatchRequests > 0 && !regex.MatchString(actualPatchData) {
				t.Fatalf("Patch data mismatch: Actual was %v, but expected to match regexp %v", actualPatchData, test.expectedPatchDataPattern)
			}
		})
	}
}
|