Combine statefulset burst and monotonic scaling tests

Use subtests to avoid duplicating entire suite of control logic.
This commit is contained in:
Clayton Coleman 2017-04-25 00:40:45 -04:00
parent 2861ae5eb9
commit 20d45af694
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
3 changed files with 221 additions and 231 deletions

View File

@ -19,8 +19,12 @@ package statefulset
import ( import (
"errors" "errors"
"fmt" "fmt"
"math/rand"
"reflect"
"runtime"
"sort" "sort"
"strconv" "strconv"
"strings"
"testing" "testing"
"time" "time"
@ -28,9 +32,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1" apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/apps/v1beta1" appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/apps/v1beta1"
@ -40,24 +46,75 @@ import (
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
) )
func TestDefaultStatefulSetControlCreatesPods(t *testing.T) { type invariantFunc func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
set := newStatefulSet(3)
client := fake.NewSimpleClientset(set)
func setupController(client clientset.Interface) (*fakeStatefulPodControl, StatefulSetControlInterface, chan struct{}) {
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc) ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{}) stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop) informerFactory.Start(stop)
cache.WaitForCacheSync( cache.WaitForCacheSync(
stop, stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced, informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced, informerFactory.Core().V1().Pods().Informer().HasSynced,
) )
return spc, ssc, stop
}
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { func burst(set *apps.StatefulSet) *apps.StatefulSet {
set.Spec.PodManagementPolicy = apps.ParallelPodManagement
return set
}
func TestStatefulSetControl(t *testing.T) {
simpleSetFn := func() *apps.StatefulSet { return newStatefulSet(3) }
largeSetFn := func() *apps.StatefulSet { return newStatefulSet(5) }
testCases := []struct {
fn func(*testing.T, *apps.StatefulSet, invariantFunc)
obj func() *apps.StatefulSet
}{
{CreatesPods, simpleSetFn},
{ScalesUp, simpleSetFn},
{ScalesDown, simpleSetFn},
{ReplacesPods, largeSetFn},
{RecreatesFailedPod, simpleSetFn},
{SetsInitAnnotation, simpleSetFn},
{CreatePodFailure, simpleSetFn},
{UpdatePodFailure, simpleSetFn},
{UpdateSetStatusFailure, simpleSetFn},
{PodRecreateDeleteFailure, simpleSetFn},
}
for _, testCase := range testCases {
fnName := runtime.FuncForPC(reflect.ValueOf(testCase.fn).Pointer()).Name()
if i := strings.LastIndex(fnName, "."); i != -1 {
fnName = fnName[i+1:]
}
t.Run(
fmt.Sprintf("%s/Monotonic", fnName),
func(t *testing.T) {
testCase.fn(t, testCase.obj(), assertMonotonicInvariants)
},
)
t.Run(
fmt.Sprintf("%s/Burst", fnName),
func(t *testing.T) {
set := burst(testCase.obj())
testCase.fn(t, set, assertBurstInvariants)
},
)
}
}
func CreatesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
spc, ssc, stop := setupController(client)
defer close(stop)
if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Errorf("Failed to turn up StatefulSet : %s", err)
} }
var err error var err error
@ -70,28 +127,16 @@ func TestDefaultStatefulSetControlCreatesPods(t *testing.T) {
} }
} }
func TestStatefulSetControlScaleUp(t *testing.T) { func ScalesUp(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
set := newStatefulSet(3)
client := fake.NewSimpleClientset(set) client := fake.NewSimpleClientset(set)
spc, ssc, stop := setupController(client)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
defer close(stop) defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Errorf("Failed to turn up StatefulSet : %s", err)
} }
*set.Spec.Replicas = 4 *set.Spec.Replicas = 4
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to scale StatefulSet : %s", err) t.Errorf("Failed to scale StatefulSet : %s", err)
} }
var err error var err error
@ -104,28 +149,16 @@ func TestStatefulSetControlScaleUp(t *testing.T) {
} }
} }
func TestStatefulSetControlScaleDown(t *testing.T) { func ScalesDown(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
set := newStatefulSet(3)
client := fake.NewSimpleClientset(set) client := fake.NewSimpleClientset(set)
spc, ssc, stop := setupController(client)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
defer close(stop) defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Errorf("Failed to turn up StatefulSet : %s", err)
} }
*set.Spec.Replicas = 0 *set.Spec.Replicas = 0
if err := scaleDownStatefulSetControl(set, ssc, spc); err != nil { if err := scaleDownStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to scale StatefulSet : %s", err) t.Errorf("Failed to scale StatefulSet : %s", err)
} }
if set.Status.Replicas != 0 { if set.Status.Replicas != 0 {
@ -133,24 +166,12 @@ func TestStatefulSetControlScaleDown(t *testing.T) {
} }
} }
func TestStatefulSetControlReplacesPods(t *testing.T) { func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
set := newStatefulSet(5)
client := fake.NewSimpleClientset(set) client := fake.NewSimpleClientset(set)
spc, ssc, stop := setupController(client)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
defer close(stop) defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Errorf("Failed to turn up StatefulSet : %s", err)
} }
var err error var err error
@ -215,12 +236,11 @@ func TestStatefulSetControlReplacesPods(t *testing.T) {
} }
} }
func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) { func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets()) spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc) ssc := NewDefaultStatefulSetControl(spc)
set := newStatefulSet(3)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -232,7 +252,7 @@ func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) {
if err := ssc.UpdateStatefulSet(set, pods); err != nil { if err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err) t.Errorf("Error updating StatefulSet %s", err)
} }
if err := assertInvariants(set, spc); err != nil { if err := invariants(set, spc); err != nil {
t.Error(err) t.Error(err)
} }
pods, err = spc.podsLister.Pods(set.Namespace).List(selector) pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
@ -244,7 +264,7 @@ func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) {
if err := ssc.UpdateStatefulSet(set, pods); err != nil { if err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err) t.Errorf("Error updating StatefulSet %s", err)
} }
if err := assertInvariants(set, spc); err != nil { if err := invariants(set, spc); err != nil {
t.Error(err) t.Error(err)
} }
pods, err = spc.podsLister.Pods(set.Namespace).List(selector) pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
@ -256,22 +276,10 @@ func TestDefaultStatefulSetControlRecreatesFailedPod(t *testing.T) {
} }
} }
func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) { func SetsInitAnnotation(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
set := newStatefulSet(3)
client := fake.NewSimpleClientset(set) client := fake.NewSimpleClientset(set)
spc, ssc, stop := setupController(client)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
defer close(stop) defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil { if err != nil {
@ -284,7 +292,7 @@ func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) {
if err = ssc.UpdateStatefulSet(set, pods); err != nil { if err = ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err) t.Errorf("Error updating StatefulSet %s", err)
} }
if err = assertInvariants(set, spc); err != nil { if err = invariants(set, spc); err != nil {
t.Error(err) t.Error(err)
} }
if pods, err = spc.setPodRunning(set, 0); err != nil { if pods, err = spc.setPodRunning(set, 0); err != nil {
@ -300,7 +308,7 @@ func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) {
if err := ssc.UpdateStatefulSet(set, pods); err != nil { if err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err) t.Errorf("Error updating StatefulSet %s", err)
} }
if err := assertInvariants(set, spc); err != nil { if err := invariants(set, spc); err != nil {
t.Error(err) t.Error(err)
} }
if replicas != int(set.Status.Replicas) { if replicas != int(set.Status.Replicas) {
@ -309,7 +317,7 @@ func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) {
if pods, err = spc.setPodInitStatus(set, 0, true); err != nil { if pods, err = spc.setPodInitStatus(set, 0, true); err != nil {
t.Error(err) t.Error(err)
} }
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Errorf("Failed to turn up StatefulSet : %s", err)
} }
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@ -321,28 +329,16 @@ func TestDefaultStatefulSetControlInitAnnotation(t *testing.T) {
} }
} }
func TestDefaultStatefulSetControlCreatePodFailure(t *testing.T) { func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
set := newStatefulSet(3)
client := fake.NewSimpleClientset(set) client := fake.NewSimpleClientset(set)
spc, ssc, stop := setupController(client)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) defer close(stop)
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
stop := make(chan struct{}) if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); !apierrors.IsInternalError(err) {
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err) t.Errorf("StatefulSetControl did not return InternalError found %s", err)
} }
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Errorf("Failed to turn up StatefulSet : %s", err)
} }
var err error var err error
@ -355,26 +351,14 @@ func TestDefaultStatefulSetControlCreatePodFailure(t *testing.T) {
} }
} }
func TestDefaultStatefulSetControlUpdatePodFailure(t *testing.T) { func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
set := newStatefulSet(3)
client := fake.NewSimpleClientset(set) client := fake.NewSimpleClientset(set)
spc, ssc, stop := setupController(client)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) defer close(stop)
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
// have to have 1 successful loop first // have to have 1 successful loop first
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
var err error var err error
@ -404,25 +388,13 @@ func TestDefaultStatefulSetControlUpdatePodFailure(t *testing.T) {
} }
} }
func TestDefaultStatefulSetControlBlocksOnTerminating(t *testing.T) { func testDefaultStatefulSetControlBlocksOnTerminating(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
set := newStatefulSet(3)
client := fake.NewSimpleClientset(set) client := fake.NewSimpleClientset(set)
spc, ssc, stop := setupController(client)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) defer close(stop)
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
stop := make(chan struct{}) if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
var err error var err error
@ -458,7 +430,7 @@ func TestDefaultStatefulSetControlBlocksOnTerminating(t *testing.T) {
if len(pods) != 2 { if len(pods) != 2 {
t.Fatalf("Expected 3 pods, got %d", len(pods)) t.Fatalf("Expected 3 pods, got %d", len(pods))
} }
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@ -470,28 +442,16 @@ func TestDefaultStatefulSetControlBlocksOnTerminating(t *testing.T) {
} }
} }
func TestDefaultStatefulSetControlUpdateSetStatusFailure(t *testing.T) { func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
set := newStatefulSet(3)
client := fake.NewSimpleClientset(set) client := fake.NewSimpleClientset(set)
spc, ssc, stop := setupController(client)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) defer close(stop)
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
spc.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2) spc.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
stop := make(chan struct{}) if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); !apierrors.IsInternalError(err) {
defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError found %s", err) t.Errorf("StatefulSetControl did not return InternalError found %s", err)
} }
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Errorf("Failed to turn up StatefulSet : %s", err)
} }
var err error var err error
@ -504,22 +464,10 @@ func TestDefaultStatefulSetControlUpdateSetStatusFailure(t *testing.T) {
} }
} }
func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) { func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
set := newStatefulSet(3)
client := fake.NewSimpleClientset(set) client := fake.NewSimpleClientset(set)
spc, ssc, stop := setupController(client)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
defer close(stop) defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil { if err != nil {
@ -532,7 +480,7 @@ func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) {
if err := ssc.UpdateStatefulSet(set, pods); err != nil { if err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err) t.Errorf("Error updating StatefulSet %s", err)
} }
if err := assertInvariants(set, spc); err != nil { if err := invariants(set, spc); err != nil {
t.Error(err) t.Error(err)
} }
pods, err = spc.podsLister.Pods(set.Namespace).List(selector) pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
@ -545,13 +493,13 @@ func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) {
if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) { if err := ssc.UpdateStatefulSet(set, pods); !apierrors.IsInternalError(err) {
t.Errorf("StatefulSet failed to %s", err) t.Errorf("StatefulSet failed to %s", err)
} }
if err := assertInvariants(set, spc); err != nil { if err := invariants(set, spc); err != nil {
t.Error(err) t.Error(err)
} }
if err := ssc.UpdateStatefulSet(set, pods); err != nil { if err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err) t.Errorf("Error updating StatefulSet %s", err)
} }
if err := assertInvariants(set, spc); err != nil { if err := invariants(set, spc); err != nil {
t.Error(err) t.Error(err)
} }
pods, err = spc.podsLister.Pods(set.Namespace).List(selector) pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
@ -564,23 +512,13 @@ func TestDefaultStatefulSetControlPodRecreateDeleteError(t *testing.T) {
} }
func TestStatefulSetControlScaleDownDeleteError(t *testing.T) { func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
invariants := assertMonotonicInvariants
set := newStatefulSet(3) set := newStatefulSet(3)
client := fake.NewSimpleClientset(set) client := fake.NewSimpleClientset(set)
spc, ssc, stop := setupController(client)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1beta1().StatefulSets())
ssc := NewDefaultStatefulSetControl(spc)
stop := make(chan struct{})
defer close(stop) defer close(stop)
informerFactory.Start(stop)
cache.WaitForCacheSync(
stop,
informerFactory.Apps().V1beta1().StatefulSets().Informer().HasSynced,
informerFactory.Core().V1().Pods().Informer().HasSynced,
)
if err := scaleUpStatefulSetControl(set, ssc, spc); err != nil { if err := scaleUpStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err) t.Errorf("Failed to turn up StatefulSet : %s", err)
} }
var err error var err error
@ -590,10 +528,10 @@ func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
} }
*set.Spec.Replicas = 0 *set.Spec.Replicas = 0
spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
if err := scaleDownStatefulSetControl(set, ssc, spc); !apierrors.IsInternalError(err) { if err := scaleDownStatefulSetControl(t, set, ssc, spc, invariants); !apierrors.IsInternalError(err) {
t.Errorf("StatefulSetControl failed to throw error on delete %s", err) t.Errorf("StatefulSetControl failed to throw error on delete %s", err)
} }
if err := scaleDownStatefulSetControl(set, ssc, spc); err != nil { if err := scaleDownStatefulSetControl(t, set, ssc, spc, invariants); err != nil {
t.Errorf("Failed to turn down StatefulSet %s", err) t.Errorf("Failed to turn down StatefulSet %s", err)
} }
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@ -672,6 +610,14 @@ func (spc *fakeStatefulPodControl) SetUpdateStatefulSetStatusError(err error, af
spc.updateStatusTracker.after = after spc.updateStatusTracker.after = after
} }
func copyPod(pod *v1.Pod) *v1.Pod {
obj, err := api.Scheme.Copy(pod)
if err != nil {
panic(err)
}
return obj.(*v1.Pod)
}
func (spc *fakeStatefulPodControl) setPodPending(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) { func (spc *fakeStatefulPodControl) setPodPending(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil { if err != nil {
@ -685,7 +631,7 @@ func (spc *fakeStatefulPodControl) setPodPending(set *apps.StatefulSet, ordinal
return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods)) return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
} }
sort.Sort(ascendingOrdinal(pods)) sort.Sort(ascendingOrdinal(pods))
pod := pods[ordinal] pod := copyPod(pods[ordinal])
pod.Status.Phase = v1.PodPending pod.Status.Phase = v1.PodPending
fakeResourceVersion(pod) fakeResourceVersion(pod)
spc.podsIndexer.Update(pod) spc.podsIndexer.Update(pod)
@ -705,7 +651,7 @@ func (spc *fakeStatefulPodControl) setPodRunning(set *apps.StatefulSet, ordinal
return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods)) return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
} }
sort.Sort(ascendingOrdinal(pods)) sort.Sort(ascendingOrdinal(pods))
pod := pods[ordinal] pod := copyPod(pods[ordinal])
pod.Status.Phase = v1.PodRunning pod.Status.Phase = v1.PodRunning
fakeResourceVersion(pod) fakeResourceVersion(pod)
spc.podsIndexer.Update(pod) spc.podsIndexer.Update(pod)
@ -725,7 +671,7 @@ func (spc *fakeStatefulPodControl) setPodReady(set *apps.StatefulSet, ordinal in
return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods)) return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
} }
sort.Sort(ascendingOrdinal(pods)) sort.Sort(ascendingOrdinal(pods))
pod := pods[ordinal] pod := copyPod(pods[ordinal])
condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue} condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
podutil.UpdatePodCondition(&pod.Status, &condition) podutil.UpdatePodCondition(&pod.Status, &condition)
fakeResourceVersion(pod) fakeResourceVersion(pod)
@ -746,7 +692,7 @@ func (spc *fakeStatefulPodControl) setPodInitStatus(set *apps.StatefulSet, ordin
return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods)) return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
} }
sort.Sort(ascendingOrdinal(pods)) sort.Sort(ascendingOrdinal(pods))
pod := pods[ordinal] pod := copyPod(pods[ordinal])
if init { if init {
pod.Annotations[apps.StatefulSetInitAnnotation] = "true" pod.Annotations[apps.StatefulSetInitAnnotation] = "true"
} else { } else {
@ -850,7 +796,7 @@ func (spc *fakeStatefulPodControl) UpdateStatefulSetStatus(set *apps.StatefulSet
var _ StatefulPodControlInterface = &fakeStatefulPodControl{} var _ StatefulPodControlInterface = &fakeStatefulPodControl{}
func assertInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error { func assertMonotonicInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil { if err != nil {
return err return err
@ -862,31 +808,61 @@ func assertInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error
sort.Sort(ascendingOrdinal(pods)) sort.Sort(ascendingOrdinal(pods))
for ord := 0; ord < len(pods); ord++ { for ord := 0; ord < len(pods); ord++ {
if ord > 0 && isRunningAndReady(pods[ord]) && !isRunningAndReady(pods[ord-1]) { if ord > 0 && isRunningAndReady(pods[ord]) && !isRunningAndReady(pods[ord-1]) {
return fmt.Errorf("Predecessor %s is Running and Ready while %s is not", return fmt.Errorf("Successor %s is Running and Ready while %s is not", pods[ord].Name, pods[ord-1].Name)
pods[ord-1].Name,
pods[ord].Name)
} }
if getOrdinal(pods[ord]) != ord { if getOrdinal(pods[ord]) != ord {
return fmt.Errorf("Pods %s deployed in the wrong order", return fmt.Errorf("pods %s deployed in the wrong order", pods[ord].Name)
pods[ord].Name)
} }
if !storageMatches(set, pods[ord]) { if !storageMatches(set, pods[ord]) {
return fmt.Errorf("Pods %s does not match the storage specification of StatefulSet %s ", return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name)
pods[ord]. }
Name, set.Name)
} else { for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
for _, claim := range getPersistentVolumeClaims(set, pods[ord]) { claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
if claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name); err != nil { if err != nil {
return err return err
} else if claim == nil { }
return fmt.Errorf("claim %s for Pod %s was not created", if claim == nil {
claim.Name, return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name)
pods[ord].Name)
}
} }
} }
if !identityMatches(set, pods[ord]) { if !identityMatches(set, pods[ord]) {
return fmt.Errorf("Pods %s does not match the identity specification of StatefulSet %s ", return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ", pods[ord].Name, set.Name)
}
}
return nil
}
func assertBurstInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return err
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
return err
}
sort.Sort(ascendingOrdinal(pods))
for ord := 0; ord < len(pods); ord++ {
if !storageMatches(set, pods[ord]) {
return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name)
}
for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
if err != nil {
return err
}
if claim == nil {
return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name)
}
}
if !identityMatches(set, pods[ord]) {
return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ",
pods[ord].Name, pods[ord].Name,
set.Name) set.Name)
} }
@ -898,14 +874,15 @@ func fakeResourceVersion(object interface{}) {
obj, isObj := object.(metav1.Object) obj, isObj := object.(metav1.Object)
if !isObj { if !isObj {
return return
} else if version := obj.GetResourceVersion(); version == "" { }
if version := obj.GetResourceVersion(); version == "" {
obj.SetResourceVersion("1") obj.SetResourceVersion("1")
} else if intValue, err := strconv.ParseInt(version, 10, 32); err == nil { } else if intValue, err := strconv.ParseInt(version, 10, 32); err == nil {
obj.SetResourceVersion(strconv.FormatInt(intValue+1, 10)) obj.SetResourceVersion(strconv.FormatInt(intValue+1, 10))
} }
} }
func scaleUpStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, spc *fakeStatefulPodControl) error { func scaleUpStatefulSetControl(t *testing.T, set *apps.StatefulSet, ssc StatefulSetControlInterface, spc *fakeStatefulPodControl, invariants invariantFunc) error {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil { if err != nil {
return err return err
@ -915,47 +892,59 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInte
if err != nil { if err != nil {
return err return err
} }
if ord := len(pods) - 1; ord >= 0 { sort.Sort(ascendingOrdinal(pods))
ord := len(pods) - 1
if pods, err = spc.setPodPending(set, ord); err != nil { // ensure all pods are valid (have a phase)
return err initialized := false
} for ord, pod := range pods {
if err = ssc.UpdateStatefulSet(set, pods); err != nil { if pod.Status.Phase == "" {
return err t.Logf("found pod %s pending", pod.Name)
} if pods, err = spc.setPodPending(set, ord); err != nil {
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) return err
if err != nil { }
return err break
}
if pods, err = spc.setPodRunning(set, ord); err != nil {
return err
}
if err = ssc.UpdateStatefulSet(set, pods); err != nil {
return err
}
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
return err
}
if pods, err = spc.setPodReady(set, ord); err != nil {
return err
} }
} }
if err := ssc.UpdateStatefulSet(set, pods); err != nil { if initialized {
continue
}
// select one of the pods and move it forward in status
if len(pods) > 0 {
ord := int(rand.Int63n(int64(len(pods))))
pod := pods[ord]
switch pod.Status.Phase {
case v1.PodPending:
t.Logf("set pod %s running", pod.Name)
if pods, err = spc.setPodRunning(set, ord); err != nil {
return err
}
case v1.PodRunning:
t.Logf("set pod %s ready", pod.Name)
if pods, err = spc.setPodReady(set, ord); err != nil {
return err
}
default:
continue
}
}
// run the controller once and check invariants
if err = ssc.UpdateStatefulSet(set, pods); err != nil {
return err return err
} }
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil { if err != nil {
return err return err
} }
if err := assertInvariants(set, spc); err != nil { if err := invariants(set, spc); err != nil {
return err return err
} }
} }
return assertInvariants(set, spc) return invariants(set, spc)
} }
func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, spc *fakeStatefulPodControl) error { func scaleDownStatefulSetControl(t *testing.T, set *apps.StatefulSet, ssc StatefulSetControlInterface, spc *fakeStatefulPodControl, invariants invariantFunc) error {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil { if err != nil {
return err return err
@ -965,6 +954,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
if err != nil { if err != nil {
return err return err
} }
sort.Sort(ascendingOrdinal(pods))
if ordinal := len(pods) - 1; ordinal >= 0 { if ordinal := len(pods) - 1; ordinal >= 0 {
if err := ssc.UpdateStatefulSet(set, pods); err != nil { if err := ssc.UpdateStatefulSet(set, pods); err != nil {
return err return err
@ -997,9 +987,9 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
if err != nil { if err != nil {
return err return err
} }
if err := assertInvariants(set, spc); err != nil { if err := invariants(set, spc); err != nil {
return err return err
} }
} }
return assertInvariants(set, spc) return invariants(set, spc)
} }

View File

@ -640,7 +640,7 @@ func scaleUpStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetControl
pod = getPodAtOrdinal(pods, ord) pod = getPodAtOrdinal(pods, ord)
ssc.updatePod(&prev, pod) ssc.updatePod(&prev, pod)
fakeWorker(ssc) fakeWorker(ssc)
if err := assertInvariants(set, spc); err != nil { if err := assertMonotonicInvariants(set, spc); err != nil {
return err return err
} }
if obj, _, err := spc.setsIndexer.Get(set); err != nil { if obj, _, err := spc.setsIndexer.Get(set); err != nil {
@ -650,7 +650,7 @@ func scaleUpStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetControl
} }
} }
return assertInvariants(set, spc) return assertMonotonicInvariants(set, spc)
} }
func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetController, spc *fakeStatefulPodControl) error { func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetController, spc *fakeStatefulPodControl) error {
@ -692,5 +692,5 @@ func scaleDownStatefulSetController(set *apps.StatefulSet, ssc *StatefulSetContr
set = obj.(*apps.StatefulSet) set = obj.(*apps.StatefulSet)
} }
} }
return assertInvariants(set, spc) return assertMonotonicInvariants(set, spc)
} }

View File

@ -229,7 +229,7 @@ func isHealthy(pod *v1.Pod) bool {
// allowsBurst is true if the alpha burst annotation is set. // allowsBurst is true if the alpha burst annotation is set.
func allowsBurst(set *apps.StatefulSet) bool { func allowsBurst(set *apps.StatefulSet) bool {
return set.Annotations[apps.StatefulSetBurstAnnotation] == "true" return set.Spec.PodManagementPolicy == apps.ParallelPodManagement
} }
// newControllerRef returns an ControllerRef pointing to a given StatefulSet. // newControllerRef returns an ControllerRef pointing to a given StatefulSet.