Count ttl for assumed pod when binding is finished

In such cases when api server is overloaded and returns a lot of
429 (too many requests) errors - binding may take a lot of time
to succeed due to retry policy implemented in rest client.
In such events cache ttl for assumed pods wasn't big enough.

In order to minimize probability of such errors ttl for assumed pods
will be counted from the time when binding for particular pod is finished
(either with error or success)

Change-Id: Ib0122f8a76dc57c82f2c7c52497aad1bdd8be411
This commit is contained in:
Dmitry Shulyak
2016-12-08 18:15:06 +02:00
parent 0a0294cad6
commit 530ee716e3
6 changed files with 143 additions and 14 deletions

View File

@@ -138,6 +138,9 @@ func (s *Scheduler) scheduleOne() {
// If binding succeeded then PodScheduled condition will be updated in apiserver so that
// it's atomic with setting host.
err := s.config.Binder.Bind(b)
if err := s.config.SchedulerCache.FinishBinding(&assumed); err != nil {
glog.Errorf("scheduler cache FinishBinding failed: %v", err)
}
if err != nil {
glog.V(1).Infof("Failed to bind pod: %v/%v", pod.Namespace, pod.Name)
if err := s.config.SchedulerCache.ForgetPod(&assumed); err != nil {

View File

@@ -297,6 +297,65 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
}
}
// Scheduler should preserve predicate constraint even if binding was longer
// than cache ttl
func TestSchedulerErrorWithLongBinding(t *testing.T) {
stop := make(chan struct{})
defer close(stop)
firstPod := podWithPort("foo", "", 8080)
conflictPod := podWithPort("bar", "", 8080)
pods := map[string]*v1.Pod{firstPod.Name: firstPod, conflictPod.Name: conflictPod}
for _, test := range []struct {
Expected map[string]bool
CacheTTL time.Duration
BindingDuration time.Duration
}{
{
Expected: map[string]bool{firstPod.Name: true},
CacheTTL: 100 * time.Millisecond,
BindingDuration: 300 * time.Millisecond,
},
{
Expected: map[string]bool{firstPod.Name: true},
CacheTTL: 10 * time.Second,
BindingDuration: 300 * time.Millisecond,
},
} {
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
scache := schedulercache.New(test.CacheTTL, stop)
node := v1.Node{ObjectMeta: v1.ObjectMeta{Name: "machine1"}}
scache.AddNode(&node)
nodeLister := algorithm.FakeNodeLister([]*v1.Node{&node})
predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
scheduler, bindingChan := setupTestSchedulerLongBindingWithRetry(
queuedPodStore, scache, nodeLister, predicateMap, stop, test.BindingDuration)
scheduler.Run()
queuedPodStore.Add(firstPod)
queuedPodStore.Add(conflictPod)
resultBindings := map[string]bool{}
waitChan := time.After(5 * time.Second)
for finished := false; !finished; {
select {
case b := <-bindingChan:
resultBindings[b.Name] = true
p := pods[b.Name]
p.Spec.NodeName = b.Target.Name
scache.AddPod(p)
case <-waitChan:
finished = true
}
}
if !reflect.DeepEqual(resultBindings, test.Expected) {
t.Errorf("Result binding are not equal to expected. %v != %v", resultBindings, test.Expected)
}
}
}
// queuedPodStore: pods queued before processing.
// cache: scheduler cache that might contain assumed pods.
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulercache.Cache,
@@ -429,3 +488,34 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
}
return New(cfg), bindingChan, errChan
}
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister algorithm.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
algo := NewGenericScheduler(
scache,
predicateMap,
algorithm.EmptyMetadataProducer,
[]algorithm.PriorityConfig{},
algorithm.EmptyMetadataProducer,
[]algorithm.SchedulerExtender{})
bindingChan := make(chan *v1.Binding, 2)
cfg := &Config{
SchedulerCache: scache,
NodeLister: nodeLister,
Algorithm: algo,
Binder: fakeBinder{func(b *v1.Binding) error {
time.Sleep(bindingTime)
bindingChan <- b
return nil
}},
NextPod: func() *v1.Pod {
return clientcache.Pop(queuedPodStore).(*v1.Pod)
},
Error: func(p *v1.Pod, err error) {
queuedPodStore.AddIfNotPresent(p)
},
Recorder: &record.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
StopEverything: stop,
}
return New(cfg), bindingChan
}

View File

@@ -60,6 +60,8 @@ type podState struct {
pod *v1.Pod
// Used by assumedPod to determinate expiration.
deadline *time.Time
// Used to block cache from expiring assumedPod if binding still runs
bindingFinished bool
}
func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
@@ -105,11 +107,6 @@ func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) {
}
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
return cache.assumePod(pod, time.Now())
}
// assumePod exists for making test deterministic by taking time as input argument.
func (cache *schedulerCache) assumePod(pod *v1.Pod, now time.Time) error {
key, err := getPodKey(pod)
if err != nil {
return err
@@ -122,16 +119,38 @@ func (cache *schedulerCache) assumePod(pod *v1.Pod, now time.Time) error {
}
cache.addPod(pod)
dl := now.Add(cache.ttl)
ps := &podState{
pod: pod,
deadline: &dl,
pod: pod,
}
cache.podStates[key] = ps
cache.assumedPods[key] = true
return nil
}
func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
return cache.finishBinding(pod, time.Now())
}
// finishBinding exists to make tests determinitistic by injecting now as an argument
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
key, err := getPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
glog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
currState, ok := cache.podStates[key]
if ok && cache.assumedPods[key] {
dl := now.Add(cache.ttl)
currState.bindingFinished = true
currState.deadline = &dl
}
return nil
}
func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
key, err := getPodKey(pod)
if err != nil {
@@ -343,6 +362,11 @@ func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
if !ok {
panic("Key found in assumed set but not in podStates. Potentially a logical error.")
}
if !ps.bindingFinished {
glog.Warningf("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
ps.pod.Namespace, ps.pod.Name)
continue
}
if now.After(*ps.deadline) {
glog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
if err := cache.expirePod(key, ps); err != nil {

View File

@@ -123,6 +123,13 @@ type testExpirePodStruct struct {
assumedTime time.Time
}
func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time.Time) error {
if err := cache.AssumePod(pod); err != nil {
return err
}
return cache.finishBinding(pod, assumedTime)
}
// TestExpirePod tests that assumed pods will be removed if expired.
// The removal will be reflected in node info.
func TestExpirePod(t *testing.T) {
@@ -168,7 +175,7 @@ func TestExpirePod(t *testing.T) {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, pod := range tt.pods {
if err := cache.assumePod(pod.pod, pod.assumedTime); err != nil {
if err := assumeAndFinishBinding(cache, pod.pod, pod.assumedTime); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
@@ -215,7 +222,7 @@ func TestAddPodWillConfirm(t *testing.T) {
for i, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, podToAssume := range tt.podsToAssume {
if err := cache.assumePod(podToAssume, now); err != nil {
if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
@@ -259,7 +266,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
now := time.Now()
for i, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
if err := cache.assumePod(tt.pod, now); err != nil {
if err := assumeAndFinishBinding(cache, tt.pod, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
cache.cleanupAssumedPods(now.Add(2 * ttl))
@@ -388,7 +395,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
for _, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, podToAssume := range tt.podsToAssume {
if err := cache.assumePod(podToAssume, now); err != nil {
if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
@@ -471,7 +478,7 @@ func TestForgetPod(t *testing.T) {
for i, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, pod := range tt.pods {
if err := cache.assumePod(pod, now); err != nil {
if err := assumeAndFinishBinding(cache, pod, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
@@ -565,7 +572,7 @@ func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time)
objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10)
pod := makeBasePod(nodeName, objName, "0", "0", nil)
err := cache.assumePod(pod, assumedTime)
err := assumeAndFinishBinding(cache, pod, assumedTime)
if err != nil {
b.Fatalf("assumePod failed: %v", err)
}

View File

@@ -61,6 +61,9 @@ type Cache interface {
// After expiration, its information would be subtracted.
AssumePod(pod *v1.Pod) error
// FinishBinding signals that cache for assumed pod can be expired
FinishBinding(pod *v1.Pod) error
// ForgetPod removes an assumed pod from cache.
ForgetPod(pod *v1.Pod) error

View File

@@ -32,6 +32,8 @@ func (f *FakeCache) AssumePod(pod *v1.Pod) error {
return nil
}
func (f *FakeCache) FinishBinding(pod *v1.Pod) error { return nil }
func (f *FakeCache) ForgetPod(pod *v1.Pod) error { return nil }
func (f *FakeCache) AddPod(pod *v1.Pod) error { return nil }