Merge pull request #87948 from notpad/feature/pod_backoff

Remove PodBackoffMap
This commit is contained in:
Kubernetes Prow Robot
2020-02-14 13:11:28 -08:00
committed by GitHub
5 changed files with 120 additions and 345 deletions

View File

@@ -4,7 +4,6 @@ go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"events.go", "events.go",
"pod_backoff.go",
"scheduling_queue.go", "scheduling_queue.go",
], ],
importpath = "k8s.io/kubernetes/pkg/scheduler/internal/queue", importpath = "k8s.io/kubernetes/pkg/scheduler/internal/queue",
@@ -25,10 +24,7 @@ go_library(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = ["scheduling_queue_test.go"],
"pod_backoff_test.go",
"scheduling_queue_test.go",
],
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
"//pkg/api/v1/pod:go_default_library", "//pkg/api/v1/pod:go_default_library",

View File

@@ -1,119 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"sync"
"time"
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/scheduler/util"
)
// PodBackoffMap is a structure that stores backoff related information for pods
type PodBackoffMap struct {
// lock for performing actions on this PodBackoffMap
lock sync.RWMutex
clock util.Clock
// initial backoff duration
initialDuration time.Duration
// maximal backoff duration
maxDuration time.Duration
// map for pod -> number of attempts for this pod
podAttempts map[ktypes.NamespacedName]int
// map for pod -> lastUpdateTime pod of this pod
podLastUpdateTime map[ktypes.NamespacedName]time.Time
}
// NewPodBackoffMap creates a PodBackoffMap with initial duration and max duration.
func NewPodBackoffMap(initialDuration, maxDuration time.Duration, clock util.Clock) *PodBackoffMap {
return &PodBackoffMap{
clock: clock,
initialDuration: initialDuration,
maxDuration: maxDuration,
podAttempts: make(map[ktypes.NamespacedName]int),
podLastUpdateTime: make(map[ktypes.NamespacedName]time.Time),
}
}
// GetBackoffTime returns the time that nsPod completes backoff
func (pbm *PodBackoffMap) GetBackoffTime(nsPod ktypes.NamespacedName) (time.Time, bool) {
pbm.lock.RLock()
defer pbm.lock.RUnlock()
if _, found := pbm.podAttempts[nsPod]; found == false {
return time.Time{}, false
}
lastUpdateTime := pbm.podLastUpdateTime[nsPod]
backoffDuration := pbm.calculateBackoffDuration(nsPod)
backoffTime := lastUpdateTime.Add(backoffDuration)
return backoffTime, true
}
// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (pbm *PodBackoffMap) calculateBackoffDuration(nsPod ktypes.NamespacedName) time.Duration {
backoffDuration := pbm.initialDuration
if _, found := pbm.podAttempts[nsPod]; found {
for i := 1; i < pbm.podAttempts[nsPod]; i++ {
backoffDuration = backoffDuration * 2
if backoffDuration > pbm.maxDuration {
return pbm.maxDuration
}
}
}
return backoffDuration
}
// clearPodBackoff removes all tracking information for nsPod.
// Lock is supposed to be acquired by caller.
func (pbm *PodBackoffMap) clearPodBackoff(nsPod ktypes.NamespacedName) {
delete(pbm.podAttempts, nsPod)
delete(pbm.podLastUpdateTime, nsPod)
}
// ClearPodBackoff is the thread safe version of clearPodBackoff
func (pbm *PodBackoffMap) ClearPodBackoff(nsPod ktypes.NamespacedName) {
pbm.lock.Lock()
pbm.clearPodBackoff(nsPod)
pbm.lock.Unlock()
}
// CleanupPodsCompletesBackingoff execute garbage collection on the pod backoff,
// i.e, it will remove a pod from the PodBackoffMap if
// lastUpdateTime + maxDuration >> timestamp
// We should wait longer than the maxDuration so that the pod gets a chance to
// (1) move to the active queue and (2) get an schedule attempt.
func (pbm *PodBackoffMap) CleanupPodsCompletesBackingoff() {
pbm.lock.Lock()
defer pbm.lock.Unlock()
for pod, value := range pbm.podLastUpdateTime {
// Here we assume that maxDuration should be enough for a pod to move up the
// active queue and get an schedule attempt.
if value.Add(2 * pbm.maxDuration).Before(pbm.clock.Now()) {
pbm.clearPodBackoff(pod)
}
}
}
// BackoffPod updates the lastUpdateTime for an nsPod,
// and increases its numberOfAttempts by 1
func (pbm *PodBackoffMap) BackoffPod(nsPod ktypes.NamespacedName) {
pbm.lock.Lock()
pbm.podLastUpdateTime[nsPod] = pbm.clock.Now()
pbm.podAttempts[nsPod]++
pbm.lock.Unlock()
}

View File

@@ -1,99 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"fmt"
"testing"
"time"
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
)
func TestBackoffPod(t *testing.T) {
timestamp := time.Now()
bpm := NewPodBackoffMap(1*time.Second, 10*time.Second, clock.NewFakeClock(timestamp))
tests := []struct {
podID ktypes.NamespacedName
expectedDuration time.Duration
advanceClock time.Duration
}{
{
podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"},
expectedDuration: 1 * time.Second,
},
{
podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"},
expectedDuration: 2 * time.Second,
},
{
podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"},
expectedDuration: 4 * time.Second,
},
{
podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"},
expectedDuration: 8 * time.Second,
},
{
podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"},
expectedDuration: 10 * time.Second,
},
{
podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"},
expectedDuration: 10 * time.Second,
},
{
podID: ktypes.NamespacedName{Namespace: "default", Name: "bar"},
expectedDuration: 1 * time.Second,
},
}
for i, test := range tests {
t.Run(fmt.Sprintf("step %d", i), func(t *testing.T) {
bpm.BackoffPod(test.podID)
backoff, ok := bpm.GetBackoffTime(test.podID)
if !ok {
t.Errorf("%v should be backed off", test.podID)
}
duration := backoff.Sub(timestamp)
if duration != test.expectedDuration {
t.Errorf("expected: %s, got %s for pod %s", test.expectedDuration.String(), duration.String(), test.podID)
}
})
}
}
func TestClearPodBackoff(t *testing.T) {
bpm := NewPodBackoffMap(1*time.Second, 60*time.Second, clock.NewFakeClock(time.Now()))
// Clear backoff on an not existed pod
bpm.clearPodBackoff(ktypes.NamespacedName{Namespace: "ns", Name: "not-existed"})
// Backoff twice for pod foo
podID := ktypes.NamespacedName{Namespace: "ns", Name: "foo"}
bpm.BackoffPod(podID)
bpm.BackoffPod(podID)
if duration := bpm.calculateBackoffDuration(podID); duration != 2*time.Second {
t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, duration.String())
}
// Clear backoff for pod foo
bpm.clearPodBackoff(podID)
// Backoff once for pod foo
bpm.BackoffPod(podID)
if duration := bpm.calculateBackoffDuration(podID); duration != 1*time.Second {
t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, duration.String())
}
}

View File

@@ -118,8 +118,11 @@ func NominatedNodeName(pod *v1.Pod) string {
type PriorityQueue struct { type PriorityQueue struct {
stop chan struct{} stop chan struct{}
clock util.Clock clock util.Clock
// podBackoff tracks backoff for pods attempting to be rescheduled
podBackoff *PodBackoffMap // pod initial backoff duration.
podInitialBackoffDuration time.Duration
// pod maximum backoff duration.
podMaxBackoffDuration time.Duration
lock sync.RWMutex lock sync.RWMutex
cond sync.Cond cond sync.Cond
@@ -212,13 +215,14 @@ func NewPriorityQueue(
} }
pq := &PriorityQueue{ pq := &PriorityQueue{
clock: options.clock, clock: options.clock,
stop: make(chan struct{}), stop: make(chan struct{}),
podBackoff: NewPodBackoffMap(options.podInitialBackoffDuration, options.podMaxBackoffDuration, options.clock), podInitialBackoffDuration: options.podInitialBackoffDuration,
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), podMaxBackoffDuration: options.podMaxBackoffDuration,
unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()), activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
nominatedPods: newNominatedPodMap(), unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
moveRequestCycle: -1, nominatedPods: newNominatedPodMap(),
moveRequestCycle: -1,
} }
pq.cond.L = &pq.lock pq.cond.L = &pq.lock
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
@@ -265,33 +269,13 @@ func nsNameForPod(pod *v1.Pod) ktypes.NamespacedName {
} }
} }
// clearPodBackoff clears all backoff state for a pod (resets expiry) // isPodBackingoff returns true if a pod is still waiting for its backoff timer.
func (p *PriorityQueue) clearPodBackoff(pod *v1.Pod) {
p.podBackoff.ClearPodBackoff(nsNameForPod(pod))
}
// isPodBackingOff returns true if a pod is still waiting for its backoff timer.
// If this returns true, the pod should not be re-tried. // If this returns true, the pod should not be re-tried.
func (p *PriorityQueue) isPodBackingOff(pod *v1.Pod) bool { func (p *PriorityQueue) isPodBackingoff(podInfo *framework.PodInfo) bool {
boTime, exists := p.podBackoff.GetBackoffTime(nsNameForPod(pod)) boTime := p.getBackoffTime(podInfo)
if !exists {
return false
}
return boTime.After(p.clock.Now()) return boTime.After(p.clock.Now())
} }
// backoffPod checks if pod is currently undergoing backoff. If it is not it updates the backoff
// timeout otherwise it does nothing.
func (p *PriorityQueue) backoffPod(pod *v1.Pod) {
p.podBackoff.CleanupPodsCompletesBackingoff()
podID := nsNameForPod(pod)
boTime, found := p.podBackoff.GetBackoffTime(podID)
if !found || boTime.Before(p.clock.Now()) {
p.podBackoff.BackoffPod(podID)
}
}
// SchedulingCycle returns current scheduling cycle. // SchedulingCycle returns current scheduling cycle.
func (p *PriorityQueue) SchedulingCycle() int64 { func (p *PriorityQueue) SchedulingCycle() int64 {
p.lock.RLock() p.lock.RLock()
@@ -320,9 +304,6 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.PodInfo, p
return fmt.Errorf("pod %v is already present in the backoff queue", nsNameForPod(pod)) return fmt.Errorf("pod %v is already present in the backoff queue", nsNameForPod(pod))
} }
// Every unschedulable pod is subject to backoff timers.
p.backoffPod(pod)
// If a move request has been received, move it to the BackoffQ, otherwise move // If a move request has been received, move it to the BackoffQ, otherwise move
// it to unschedulableQ. // it to unschedulableQ.
if p.moveRequestCycle >= podSchedulingCycle { if p.moveRequestCycle >= podSchedulingCycle {
@@ -350,16 +331,7 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
return return
} }
pod := rawPodInfo.(*framework.PodInfo).Pod pod := rawPodInfo.(*framework.PodInfo).Pod
boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod)) boTime := p.getBackoffTime(rawPodInfo.(*framework.PodInfo))
if !found {
klog.Errorf("Unable to find backoff value for pod %v in backoff queue", nsNameForPod(pod))
p.podBackoffQ.Pop()
p.activeQ.Add(rawPodInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
defer p.cond.Broadcast()
continue
}
if boTime.After(p.clock.Now()) { if boTime.After(p.clock.Now()) {
return return
} }
@@ -465,8 +437,6 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil { if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil {
p.nominatedPods.update(oldPod, newPod) p.nominatedPods.update(oldPod, newPod)
if isPodUpdated(oldPod, newPod) { if isPodUpdated(oldPod, newPod) {
// If the pod is updated reset backoff
p.clearPodBackoff(newPod)
p.unschedulableQ.delete(usPodInfo.Pod) p.unschedulableQ.delete(usPodInfo.Pod)
err := p.activeQ.Add(updatePod(usPodInfo, newPod)) err := p.activeQ.Add(updatePod(usPodInfo, newPod))
if err == nil { if err == nil {
@@ -495,7 +465,6 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.nominatedPods.delete(pod) p.nominatedPods.delete(pod)
err := p.activeQ.Delete(newPodInfoNoTimestamp(pod)) err := p.activeQ.Delete(newPodInfoNoTimestamp(pod))
if err != nil { // The item was probably not found in the activeQ. if err != nil { // The item was probably not found in the activeQ.
p.clearPodBackoff(pod)
p.podBackoffQ.Delete(newPodInfoNoTimestamp(pod)) p.podBackoffQ.Delete(newPodInfoNoTimestamp(pod))
p.unschedulableQ.delete(pod) p.unschedulableQ.delete(pod)
} }
@@ -538,7 +507,7 @@ func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) {
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.PodInfo, event string) { func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.PodInfo, event string) {
for _, pInfo := range podInfoList { for _, pInfo := range podInfoList {
pod := pInfo.Pod pod := pInfo.Pod
if p.isPodBackingOff(pod) { if p.isPodBackingoff(pInfo) {
if err := p.podBackoffQ.Add(pInfo); err != nil { if err := p.podBackoffQ.Add(pInfo); err != nil {
klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err) klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
} else { } else {
@@ -640,8 +609,8 @@ func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool { func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*framework.PodInfo) pInfo1 := podInfo1.(*framework.PodInfo)
pInfo2 := podInfo2.(*framework.PodInfo) pInfo2 := podInfo2.(*framework.PodInfo)
bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo1.Pod)) bo1 := p.getBackoffTime(pInfo1)
bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo2.Pod)) bo2 := p.getBackoffTime(pInfo2)
return bo1.Before(bo2) return bo1.Before(bo2)
} }
@@ -662,6 +631,26 @@ func (p *PriorityQueue) newPodInfo(pod *v1.Pod) *framework.PodInfo {
} }
} }
// getBackoffTime returns the time that podInfo completes backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.PodInfo) time.Time {
duration := p.calculateBackoffDuration(podInfo)
backoffTime := podInfo.Timestamp.Add(duration)
return backoffTime
}
// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.PodInfo) time.Duration {
duration := p.podInitialBackoffDuration
for i := 1; i < podInfo.Attempts; i++ {
duration = duration * 2
if duration > p.podMaxBackoffDuration {
return p.podMaxBackoffDuration
}
}
return duration
}
func updatePod(oldPodInfo interface{}, newPod *v1.Pod) *framework.PodInfo { func updatePod(oldPodInfo interface{}, newPod *v1.Pod) *framework.PodInfo {
pInfo := oldPodInfo.(*framework.PodInfo) pInfo := oldPodInfo.(*framework.PodInfo)
pInfo.Pod = newPod pInfo.Pod = newPod

View File

@@ -111,12 +111,6 @@ var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1.
}, },
} }
func addOrUpdateUnschedulablePod(p *PriorityQueue, podInfo *framework.PodInfo) {
p.lock.Lock()
defer p.lock.Unlock()
p.unschedulableQ.addOrUpdate(podInfo)
}
func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod { func getUnschedulablePod(p *PriorityQueue, pod *v1.Pod) *v1.Pod {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
@@ -347,6 +341,18 @@ func TestPriorityQueue_Update(t *testing.T) {
if p, err := q.Pop(); err != nil || p.Pod != &highPriNominatedPod { if p, err := q.Pop(); err != nil || p.Pod != &highPriNominatedPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name)
} }
// Updating a pod that is in unschedulableQ in a way that it may
// become schedulable should add the pod to the activeQ.
q.AddUnschedulableIfNotPresent(q.newPodInfo(&medPriorityPod), q.SchedulingCycle())
if len(q.unschedulableQ.podInfoMap) != 1 {
t.Error("Expected unschedulableQ to be 1.")
}
updatedPod := medPriorityPod.DeepCopy()
updatedPod.ClusterName = "test"
q.Update(&medPriorityPod, updatedPod)
if p, err := q.Pop(); err != nil || p.Pod != updatedPod {
t.Errorf("Expected: %v after Pop, but got: %v", updatedPod.Name, p.Pod.Name)
}
} }
func TestPriorityQueue_Delete(t *testing.T) { func TestPriorityQueue_Delete(t *testing.T) {
@@ -373,14 +379,17 @@ func TestPriorityQueue_Delete(t *testing.T) {
} }
} }
func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) { func TestPriorityQueue_MoveAllToActiveOrBackoffQueue(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework()) q := createAndRunPriorityQueue(newDefaultFramework())
q.Add(&medPriorityPod) q.Add(&medPriorityPod)
addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod)) q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle())
addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriorityPod)) q.AddUnschedulableIfNotPresent(q.newPodInfo(&highPriorityPod), q.SchedulingCycle())
q.MoveAllToActiveOrBackoffQueue("test") q.MoveAllToActiveOrBackoffQueue("test")
if q.activeQ.Len() != 3 { if q.activeQ.Len() != 1 {
t.Error("Expected all items to be in activeQ.") t.Error("Expected 1 item to be in activeQ")
}
if q.podBackoffQ.Len() != 2 {
t.Error("Expected 2 items to be in podBackoffQ")
} }
} }
@@ -420,11 +429,15 @@ func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
Spec: v1.PodSpec{NodeName: "machine1"}, Spec: v1.PodSpec{NodeName: "machine1"},
} }
q := createAndRunPriorityQueue(newDefaultFramework()) c := clock.NewFakeClock(time.Now())
q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c))
q.Add(&medPriorityPod) q.Add(&medPriorityPod)
// Add a couple of pods to the unschedulableQ. // Add a couple of pods to the unschedulableQ.
addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod)) q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle())
addOrUpdateUnschedulablePod(q, q.newPodInfo(affinityPod)) q.AddUnschedulableIfNotPresent(q.newPodInfo(affinityPod), q.SchedulingCycle())
// Move clock to make the unschedulable pods complete backoff.
c.Step(DefaultPodInitialBackoffDuration + time.Second)
// Simulate addition of an assigned pod. The pod has matching labels for // Simulate addition of an assigned pod. The pod has matching labels for
// affinityPod. So, affinityPod should go to activeQ. // affinityPod. So, affinityPod should go to activeQ.
q.AssignedPodAdded(&labelPod) q.AssignedPodAdded(&labelPod)
@@ -468,8 +481,9 @@ func TestPriorityQueue_PendingPods(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework()) q := createAndRunPriorityQueue(newDefaultFramework())
q.Add(&medPriorityPod) q.Add(&medPriorityPod)
addOrUpdateUnschedulablePod(q, q.newPodInfo(&unschedulablePod)) q.AddUnschedulableIfNotPresent(q.newPodInfo(&unschedulablePod), q.SchedulingCycle())
addOrUpdateUnschedulablePod(q, q.newPodInfo(&highPriorityPod)) q.AddUnschedulableIfNotPresent(q.newPodInfo(&highPriorityPod), q.SchedulingCycle())
expectedSet := makeSet([]*v1.Pod{&medPriorityPod, &unschedulablePod, &highPriorityPod}) expectedSet := makeSet([]*v1.Pod{&medPriorityPod, &unschedulablePod, &highPriorityPod})
if !reflect.DeepEqual(expectedSet, makeSet(q.PendingPods())) { if !reflect.DeepEqual(expectedSet, makeSet(q.PendingPods())) {
t.Error("Unexpected list of pending Pods.") t.Error("Unexpected list of pending Pods.")
@@ -557,12 +571,12 @@ func TestPriorityQueue_NewWithOptions(t *testing.T) {
WithPodMaxBackoffDuration(20*time.Second), WithPodMaxBackoffDuration(20*time.Second),
) )
if q.podBackoff.initialDuration != 2*time.Second { if q.podInitialBackoffDuration != 2*time.Second {
t.Errorf("Unexpected pod backoff initial duration. Expected: %v, got: %v", 2*time.Second, q.podBackoff.initialDuration) t.Errorf("Unexpected pod backoff initial duration. Expected: %v, got: %v", 2*time.Second, q.podInitialBackoffDuration)
} }
if q.podBackoff.maxDuration != 20*time.Second { if q.podMaxBackoffDuration != 20*time.Second {
t.Errorf("Unexpected pod backoff max duration. Expected: %v, got: %v", 2*time.Second, q.podBackoff.maxDuration) t.Errorf("Unexpected pod backoff max duration. Expected: %v, got: %v", 2*time.Second, q.podMaxBackoffDuration)
} }
} }
@@ -807,7 +821,8 @@ func TestRecentlyTriedPodsGoBack(t *testing.T) {
// This behavior ensures that an unschedulable pod does not block head of the queue when there // This behavior ensures that an unschedulable pod does not block head of the queue when there
// are frequent events that move pods to the active queue. // are frequent events that move pods to the active queue.
func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) { func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework()) c := clock.NewFakeClock(time.Now())
q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c))
// Add an unschedulable pod to a priority queue. // Add an unschedulable pod to a priority queue.
// This makes a situation that the pod was tried to schedule // This makes a situation that the pod was tried to schedule
@@ -836,8 +851,8 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// Put in the unschedulable queue // Put in the unschedulable queue
q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle())
// Clear its backoff to simulate backoff its expiration // Move clock to make the unschedulable pods complete backoff.
q.clearPodBackoff(&unschedulablePod) c.Step(DefaultPodInitialBackoffDuration + time.Second)
// Move all unschedulable pods to the active queue. // Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue("test") q.MoveAllToActiveOrBackoffQueue("test")
@@ -879,8 +894,8 @@ func TestPodFailedSchedulingMultipleTimesDoesNotBlockNewerPod(t *testing.T) {
// And then, put unschedulable pod to the unschedulable queue // And then, put unschedulable pod to the unschedulable queue
q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle()) q.AddUnschedulableIfNotPresent(newPodInfoNoTimestamp(&unschedulablePod), q.SchedulingCycle())
// Clear its backoff to simulate its backoff expiration // Move clock to make the unschedulable pods complete backoff.
q.clearPodBackoff(&unschedulablePod) c.Step(DefaultPodInitialBackoffDuration + time.Second)
// Move all unschedulable pods to the active queue. // Move all unschedulable pods to the active queue.
q.MoveAllToActiveOrBackoffQueue("test") q.MoveAllToActiveOrBackoffQueue("test")
@@ -961,7 +976,8 @@ func TestHighPriorityBackoff(t *testing.T) {
// TestHighPriorityFlushUnschedulableQLeftover tests that pods will be moved to // TestHighPriorityFlushUnschedulableQLeftover tests that pods will be moved to
// activeQ after one minutes if it is in unschedulableQ // activeQ after one minutes if it is in unschedulableQ
func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) { func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
q := createAndRunPriorityQueue(newDefaultFramework()) c := clock.NewFakeClock(time.Now())
q := createAndRunPriorityQueue(newDefaultFramework(), WithClock(c))
midPod := v1.Pod{ midPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "test-midpod", Name: "test-midpod",
@@ -1005,12 +1021,9 @@ func TestHighPriorityFlushUnschedulableQLeftover(t *testing.T) {
Message: "fake scheduling failure", Message: "fake scheduling failure",
}) })
highPodInfo := q.newPodInfo(&highPod) q.AddUnschedulableIfNotPresent(q.newPodInfo(&highPod), q.SchedulingCycle())
highPodInfo.Timestamp = time.Now().Add(-1 * unschedulableQTimeInterval) q.AddUnschedulableIfNotPresent(q.newPodInfo(&midPod), q.SchedulingCycle())
midPodInfo := q.newPodInfo(&midPod) c.Step(unschedulableQTimeInterval + time.Second)
midPodInfo.Timestamp = time.Now().Add(-1 * unschedulableQTimeInterval)
addOrUpdateUnschedulablePod(q, highPodInfo)
addOrUpdateUnschedulablePod(q, midPodInfo)
if p, err := q.Pop(); err != nil || p.Pod != &highPod { if p, err := q.Pop(); err != nil || p.Pod != &highPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name) t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Pod.Name)
@@ -1062,9 +1075,6 @@ var (
moveAllToActiveOrBackoffQ = func(queue *PriorityQueue, _ *framework.PodInfo) { moveAllToActiveOrBackoffQ = func(queue *PriorityQueue, _ *framework.PodInfo) {
queue.MoveAllToActiveOrBackoffQueue("test") queue.MoveAllToActiveOrBackoffQueue("test")
} }
backoffPod = func(queue *PriorityQueue, pInfo *framework.PodInfo) {
queue.backoffPod(pInfo.Pod)
}
flushBackoffQ = func(queue *PriorityQueue, _ *framework.PodInfo) { flushBackoffQ = func(queue *PriorityQueue, _ *framework.PodInfo) {
queue.clock.(*clock.FakeClock).Step(2 * time.Second) queue.clock.(*clock.FakeClock).Step(2 * time.Second)
queue.flushBackoffQCompleted() queue.flushBackoffQCompleted()
@@ -1137,9 +1147,10 @@ func TestPodTimestamp(t *testing.T) {
operations: []operation{ operations: []operation{
addPodUnschedulableQ, addPodUnschedulableQ,
addPodUnschedulableQ, addPodUnschedulableQ,
moveClockForward,
moveAllToActiveOrBackoffQ, moveAllToActiveOrBackoffQ,
}, },
operands: []*framework.PodInfo{pInfo2, pInfo1, nil}, operands: []*framework.PodInfo{pInfo2, pInfo1, nil, nil},
expected: []*framework.PodInfo{pInfo1, pInfo2}, expected: []*framework.PodInfo{pInfo1, pInfo2},
}, },
{ {
@@ -1147,11 +1158,10 @@ func TestPodTimestamp(t *testing.T) {
operations: []operation{ operations: []operation{
addPodActiveQ, addPodActiveQ,
addPodBackoffQ, addPodBackoffQ,
backoffPod,
flushBackoffQ, flushBackoffQ,
moveAllToActiveOrBackoffQ, moveAllToActiveOrBackoffQ,
}, },
operands: []*framework.PodInfo{pInfo2, pInfo1, pInfo1, nil, nil}, operands: []*framework.PodInfo{pInfo2, pInfo1, nil, nil},
expected: []*framework.PodInfo{pInfo1, pInfo2}, expected: []*framework.PodInfo{pInfo1, pInfo2},
}, },
} }
@@ -1183,23 +1193,13 @@ func TestPodTimestamp(t *testing.T) {
// TestPendingPodsMetric tests Prometheus metrics related with pending pods // TestPendingPodsMetric tests Prometheus metrics related with pending pods
func TestPendingPodsMetric(t *testing.T) { func TestPendingPodsMetric(t *testing.T) {
total := 50
timestamp := time.Now() timestamp := time.Now()
metrics.Register() metrics.Register()
var pInfos = make([]*framework.PodInfo, 0, total) total := 50
for i := 1; i <= total; i++ { pInfos := makePodInfos(total, timestamp)
p := &framework.PodInfo{ totalWithDelay := 20
Pod: &v1.Pod{ pInfosWithDelay := makePodInfos(totalWithDelay, timestamp.Add(2*time.Second))
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("test-pod-%d", i),
Namespace: fmt.Sprintf("ns%d", i),
UID: types.UID(fmt.Sprintf("tp-%d", i)),
},
},
Timestamp: timestamp,
}
pInfos = append(pInfos, p)
}
tests := []struct { tests := []struct {
name string name string
operations []operation operations []operation
@@ -1230,14 +1230,12 @@ scheduler_pending_pods{queue="unschedulable"} 20
name: "add pods to all kinds of queues", name: "add pods to all kinds of queues",
operations: []operation{ operations: []operation{
addPodActiveQ, addPodActiveQ,
backoffPod,
addPodBackoffQ, addPodBackoffQ,
addPodUnschedulableQ, addPodUnschedulableQ,
}, },
operands: [][]*framework.PodInfo{ operands: [][]*framework.PodInfo{
pInfos[:15], pInfos[:15],
pInfos[15:40], pInfos[15:40],
pInfos[15:40],
pInfos[40:], pInfos[40:],
}, },
metricsName: "scheduler_pending_pods", metricsName: "scheduler_pending_pods",
@@ -1253,11 +1251,13 @@ scheduler_pending_pods{queue="unschedulable"} 10
name: "add pods to unschedulableQ and then move all to activeQ", name: "add pods to unschedulableQ and then move all to activeQ",
operations: []operation{ operations: []operation{
addPodUnschedulableQ, addPodUnschedulableQ,
moveClockForward,
moveAllToActiveOrBackoffQ, moveAllToActiveOrBackoffQ,
}, },
operands: [][]*framework.PodInfo{ operands: [][]*framework.PodInfo{
pInfos[:total], pInfos[:total],
{nil}, {nil},
{nil},
}, },
metricsName: "scheduler_pending_pods", metricsName: "scheduler_pending_pods",
wants: ` wants: `
@@ -1271,13 +1271,15 @@ scheduler_pending_pods{queue="unschedulable"} 0
{ {
name: "make some pods subject to backoff, add pods to unschedulableQ, and then move all to activeQ", name: "make some pods subject to backoff, add pods to unschedulableQ, and then move all to activeQ",
operations: []operation{ operations: []operation{
backoffPod, addPodUnschedulableQ,
moveClockForward,
addPodUnschedulableQ, addPodUnschedulableQ,
moveAllToActiveOrBackoffQ, moveAllToActiveOrBackoffQ,
}, },
operands: [][]*framework.PodInfo{ operands: [][]*framework.PodInfo{
pInfos[:20], pInfos[20:total],
pInfos[:total], {nil},
pInfosWithDelay[:20],
{nil}, {nil},
}, },
metricsName: "scheduler_pending_pods", metricsName: "scheduler_pending_pods",
@@ -1292,14 +1294,12 @@ scheduler_pending_pods{queue="unschedulable"} 0
{ {
name: "make some pods subject to backoff, add pods to unschedulableQ/activeQ, move all to activeQ, and finally flush backoffQ", name: "make some pods subject to backoff, add pods to unschedulableQ/activeQ, move all to activeQ, and finally flush backoffQ",
operations: []operation{ operations: []operation{
backoffPod,
addPodUnschedulableQ, addPodUnschedulableQ,
addPodActiveQ, addPodActiveQ,
moveAllToActiveOrBackoffQ, moveAllToActiveOrBackoffQ,
flushBackoffQ, flushBackoffQ,
}, },
operands: [][]*framework.PodInfo{ operands: [][]*framework.PodInfo{
pInfos[:20],
pInfos[:40], pInfos[:40],
pInfos[40:], pInfos[40:],
{nil}, {nil},
@@ -1564,10 +1564,7 @@ func TestBackOffFlow(t *testing.T) {
} }
// Check backoff duration. // Check backoff duration.
deadline, ok := q.podBackoff.GetBackoffTime(podID) deadline := q.getBackoffTime(podInfo)
if !ok {
t.Errorf("didn't get backoff for pod %s", podID)
}
backoff := deadline.Sub(timestamp) backoff := deadline.Sub(timestamp)
if backoff != step.wantBackoff { if backoff != step.wantBackoff {
t.Errorf("got backoff %s, want %s", backoff, step.wantBackoff) t.Errorf("got backoff %s, want %s", backoff, step.wantBackoff)
@@ -1588,11 +1585,22 @@ func TestBackOffFlow(t *testing.T) {
} }
}) })
} }
// After some time, backoff information is cleared. }
cl.Step(time.Hour)
q.podBackoff.CleanupPodsCompletesBackingoff() func makePodInfos(num int, timestamp time.Time) []*framework.PodInfo {
_, ok := q.podBackoff.GetBackoffTime(podID) var pInfos = make([]*framework.PodInfo, 0, num)
if ok { for i := 1; i <= num; i++ {
t.Errorf("backoff information for pod %s was not cleared", podID) p := &framework.PodInfo{
} Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("test-pod-%d", i),
Namespace: fmt.Sprintf("ns%d", i),
UID: types.UID(fmt.Sprintf("tp-%d", i)),
},
},
Timestamp: timestamp,
}
pInfos = append(pInfos, p)
}
return pInfos
} }