diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index fa9f7ae6992..f55b7d1d67c 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -19,6 +19,7 @@ package statefulset import ( "context" "sort" + "sync" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -30,8 +31,12 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller/history" "k8s.io/kubernetes/pkg/features" + "k8s.io/utils/integer" ) +// Realistic value for maximum in-flight requests when processing in parallel mode. +const MaxBatchSize = 500 + // StatefulSetControl implements the control logic for updating StatefulSets and their children Pods. It is implemented // as an interface to allow for extensions that provide different semantics. Currently, there is only one implementation. type StatefulSetControlInterface interface { @@ -87,7 +92,11 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions) if err != nil { - return nil, utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)}) + errs := []error{err} + if agg, ok := err.(utilerrors.Aggregate); ok { + errs = agg.Errors() + } + return nil, utilerrors.NewAggregate(append(errs, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision))) } // maintain the set's revision history limit @@ -269,6 +278,244 @@ func (ssc *defaultStatefulSetControl) getStatefulSetRevisions( return currentRevision, updateRevision, collisionCount, nil } +func slowStartBatch(initialBatchSize int, remaining int, fn func(int) (bool, error)) (int, error) { + successes := 0 + j := 0 + for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(integer.IntMin(2*batchSize, remaining), MaxBatchSize) { + errCh := make(chan error, batchSize) + var wg sync.WaitGroup + wg.Add(batchSize) + for i := 0; i < batchSize; i++ { + go func(k int) { + defer wg.Done() + // Ignore the first parameter - relevant for monotonic only. + if _, err := fn(k); err != nil { + errCh <- err + } + }(j) + j++ + } + wg.Wait() + successes += batchSize - len(errCh) + close(errCh) + if len(errCh) > 0 { + errs := make([]error, 0) + for err := range errCh { + errs = append(errs, err) + } + return successes, utilerrors.NewAggregate(errs) + } + remaining -= batchSize + } + return successes, nil +} + +type replicaStatus struct { + replicas int32 + readyReplicas int32 + availableReplicas int32 + currentReplicas int32 + updatedReplicas int32 +} + +func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision) replicaStatus { + status := replicaStatus{} + for _, pod := range pods { + if isCreated(pod) { + status.replicas++ + } + + // count the number of running and ready replicas + if isRunningAndReady(pod) { + status.readyReplicas++ + // count the number of running and available replicas + if isRunningAndAvailable(pod, minReadySeconds) { + status.availableReplicas++ + } + + } + + // count the number of current and update replicas + if isCreated(pod) && !isTerminating(pod) { + if getPodRevision(pod) == currentRevision.Name { + status.currentReplicas++ + } + if getPodRevision(pod) == updateRevision.Name { + status.updatedReplicas++ + } + } + } + return status +} + +func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, podLists ...[]*v1.Pod) { + status.Replicas = 0 + status.ReadyReplicas = 0 + status.AvailableReplicas = 0 + status.CurrentReplicas = 0 + status.UpdatedReplicas = 0 + for _, list := range podLists { + replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision) + status.Replicas += replicaStatus.replicas + status.ReadyReplicas += replicaStatus.readyReplicas + status.AvailableReplicas += replicaStatus.availableReplicas + status.CurrentReplicas += replicaStatus.currentReplicas + status.UpdatedReplicas += replicaStatus.updatedReplicas + } +} + +func (ssc *defaultStatefulSetControl) processReplica( + ctx context.Context, + set *apps.StatefulSet, + currentRevision *apps.ControllerRevision, + updateRevision *apps.ControllerRevision, + currentSet *apps.StatefulSet, + updateSet *apps.StatefulSet, + monotonic bool, + replicas []*v1.Pod, + i int) (bool, error) { + logger := klog.FromContext(ctx) + // delete and recreate failed pods + if isFailed(replicas[i]) { + ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod", + "StatefulSet %s/%s is recreating failed Pod %s", + set.Namespace, + set.Name, + replicas[i].Name) + if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil { + return true, err + } + replicaOrd := i + getStartOrdinal(set) + replicas[i] = newVersionedStatefulSetPod( + currentSet, + updateSet, + currentRevision.Name, + updateRevision.Name, + replicaOrd) + } + // If we find a Pod that has not been created we create the Pod + if !isCreated(replicas[i]) { + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { + if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil { + return true, err + } else if isStale { + // If a pod has a stale PVC, no more work can be done this round. + return true, err + } + } + if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil { + return true, err + } + if monotonic { + // if the set does not allow bursting, return immediately + return true, nil + } + } + + // If the Pod is in pending state then trigger PVC creation to create missing PVCs + if isPending(replicas[i]) { + logger.V(4).Info( + "StatefulSet is triggering PVC creation for pending Pod", + "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) + if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil { + return true, err + } + } + + // If we find a Pod that is currently terminating, we must wait until graceful deletion + // completes before we continue to make progress. + if isTerminating(replicas[i]) && monotonic { + logger.V(4).Info("StatefulSet is waiting for Pod to Terminate", + "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) + return true, nil + } + + // If we have a Pod that has been created but is not running and ready we can not make progress. + // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its + // ordinal, are Running and Ready. + if !isRunningAndReady(replicas[i]) && monotonic { + logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready", + "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) + return true, nil + } + + // If we have a Pod that has been created but is not available we can not make progress. + // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its + // ordinal, are Available. + if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic { + logger.V(4).Info("StatefulSet is waiting for Pod to be Available", + "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) + return true, nil + } + + // Enforce the StatefulSet invariants + retentionMatch := true + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { + var err error + retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, replicas[i]) + // An error is expected if the pod is not yet fully updated, and so return is treated as matching. + if err != nil { + retentionMatch = true + } + } + + if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch { + return false, nil + } + + // Make a deep copy so we don't mutate the shared cache + replica := replicas[i].DeepCopy() + if err := ssc.podControl.UpdateStatefulPod(ctx, updateSet, replica); err != nil { + return true, err + } + + return false, nil +} + +func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int) (bool, error) { + logger := klog.FromContext(ctx) + if isTerminating(condemned[i]) { + // if we are in monotonic mode, block and wait for terminating pods to expire + if monotonic { + logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down", + "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i])) + return true, nil + } + return false, nil + } + // if we are in monotonic mode and the condemned target is not the first unhealthy Pod block + if !isRunningAndReady(condemned[i]) && monotonic && condemned[i] != firstUnhealthyPod { + logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down", + "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod)) + return true, nil + } + // if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block. + if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds) && monotonic && condemned[i] != firstUnhealthyPod { + logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down", + "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod)) + return true, nil + } + + logger.V(2).Info("Pod of StatefulSet is terminating for scale down", + "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i])) + return true, ssc.podControl.DeleteStatefulPod(set, condemned[i]) +} + +func runForAll(pods []*v1.Pod, fn func(i int) (bool, error), monotonic bool) (bool, error) { + if monotonic { + for i := range pods { + if shouldExit, err := fn(i); shouldExit || err != nil { + return true, err + } + } + } else { + if _, err := slowStartBatch(1, len(pods), fn); err != nil { + return true, err + } + } + return false, nil +} + // updateStatefulSet performs the update function for a StatefulSet. This method creates, updates, and deletes Pods in // the set in order to conform the system to the target state for the set. The target state always contains // set.Spec.Replicas Pods with a Ready Condition. If the UpdateStrategy.Type for the set is @@ -304,6 +551,8 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( status.CollisionCount = new(int32) *status.CollisionCount = collisionCount + updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, pods) + replicaCount := int(*set.Spec.Replicas) // slice that will contain all Pods such that getStartOrdinal(set) <= getOrdinal(pod) <= getEndOrdinal(set) replicas := make([]*v1.Pod, replicaCount) @@ -314,28 +563,6 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // First we partition pods into two lists valid replicas and condemned Pods for _, pod := range pods { - status.Replicas++ - - // count the number of running and ready replicas - if isRunningAndReady(pod) { - status.ReadyReplicas++ - // count the number of running and available replicas - if isRunningAndAvailable(pod, set.Spec.MinReadySeconds) { - status.AvailableReplicas++ - } - - } - - // count the number of current and update replicas - if isCreated(pod) && !isTerminating(pod) { - if getPodRevision(pod) == currentRevision.Name { - status.CurrentReplicas++ - } - if getPodRevision(pod) == updateRevision.Name { - status.UpdatedReplicas++ - } - } - if podInOrdinalRange(pod, set) { // if the ordinal of the pod is within the range of the current number of replicas, // insert it at the indirection of its ordinal @@ -360,7 +587,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( } // sort the condemned Pods by their ordinals - sort.Sort(ascendingOrdinal(condemned)) + sort.Sort(descendingOrdinal(condemned)) // find the first unhealthy Pod for i := range replicas { @@ -372,7 +599,8 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( } } - for i := range condemned { + // or the first unhealthy condemned Pod (condemned are sorted in descending order for ease of use) + for i := len(condemned) - 1; i >= 0; i-- { if !isHealthy(condemned[i]) { unhealthy++ if firstUnhealthyPod == nil { @@ -393,169 +621,48 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( monotonic := !allowsBurst(set) - // Examine each replica with respect to its ordinal - for i := range replicas { - // delete and recreate failed pods - if isFailed(replicas[i]) { - ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod", - "StatefulSet %s/%s is recreating failed Pod %s", - set.Namespace, - set.Name, - replicas[i].Name) - if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil { - return &status, err - } - if getPodRevision(replicas[i]) == currentRevision.Name { - status.CurrentReplicas-- - } - if getPodRevision(replicas[i]) == updateRevision.Name { - status.UpdatedReplicas-- - } - status.Replicas-- - replicaOrd := i + getStartOrdinal(set) - replicas[i] = newVersionedStatefulSetPod( - currentSet, - updateSet, - currentRevision.Name, - updateRevision.Name, - replicaOrd) - } - // If we find a Pod that has not been created we create the Pod - if !isCreated(replicas[i]) { - if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { - if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil { - return &status, err - } else if isStale { - // If a pod has a stale PVC, no more work can be done this round. - return &status, err + // First, process each living replica. Exit if we run into an error or something blocking in monotonic mode. + processReplicaFn := func(i int) (bool, error) { + return ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i) + } + if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil { + updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) + return &status, err + } + + // Fix pod claims for condemned pods, if necessary. + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { + fixPodClaim := func(i int) (bool, error) { + if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, condemned[i]); err != nil { + return true, err + } else if !matchPolicy { + if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(ctx, updateSet, condemned[i]); err != nil { + return true, err } } - if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil { - return &status, err - } - status.Replicas++ - if getPodRevision(replicas[i]) == currentRevision.Name { - status.CurrentReplicas++ - } - if getPodRevision(replicas[i]) == updateRevision.Name { - status.UpdatedReplicas++ - } - // if the set does not allow bursting, return immediately - if monotonic { - return &status, nil - } - // pod created, no more work possible for this round - continue + return false, nil } - - // If the Pod is in pending state then trigger PVC creation to create missing PVCs - if isPending(replicas[i]) { - logger.V(4).Info("StatefulSet is triggering PVC Creation for pending Pod", - "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) - if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil { - return &status, err - } - } - // If we find a Pod that is currently terminating, we must wait until graceful deletion - // completes before we continue to make progress. - if isTerminating(replicas[i]) && monotonic { - logger.V(4).Info("StatefulSet is waiting for Pod to Terminate", - "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) - return &status, nil - } - // If we have a Pod that has been created but is not running and ready we can not make progress. - // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its - // ordinal, are Running and Ready. - if !isRunningAndReady(replicas[i]) && monotonic { - logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready", - "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) - return &status, nil - } - // If we have a Pod that has been created but is not available we can not make progress. - // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its - // ordinal, are Available. - if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic { - logger.V(4).Info("StatefulSet is waiting for Pod to be Available", - "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) - return &status, nil - } - // Enforce the StatefulSet invariants - retentionMatch := true - if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { - var err error - retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, replicas[i]) - // An error is expected if the pod is not yet fully updated, and so return is treated as matching. - if err != nil { - retentionMatch = true - } - } - if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch { - continue - } - // Make a deep copy so we don't mutate the shared cache - replica := replicas[i].DeepCopy() - if err := ssc.podControl.UpdateStatefulPod(ctx, updateSet, replica); err != nil { + if shouldExit, err := runForAll(condemned, fixPodClaim, monotonic); shouldExit || err != nil { + updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) return &status, err } } - if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { - // Ensure ownerRefs are set correctly for the condemned pods. - for i := range condemned { - if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, condemned[i]); err != nil { - return &status, err - } else if !matchPolicy { - if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(ctx, updateSet, condemned[i]); err != nil { - return &status, err - } - } - } - } - - // At this point, all of the current Replicas are Running, Ready and Available, we can consider termination. + // At this point, in monotonic mode all of the current Replicas are Running, Ready and Available, + // and we can consider termination. // We will wait for all predecessors to be Running and Ready prior to attempting a deletion. // We will terminate Pods in a monotonically decreasing order. // Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over // updates. - for target := len(condemned) - 1; target >= 0; target-- { - // wait for terminating pods to expire - if isTerminating(condemned[target]) { - logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down", - "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[target])) - // block if we are in monotonic mode - if monotonic { - return &status, nil - } - continue - } - // if we are in monotonic mode and the condemned target is not the first unhealthy Pod block - if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod { - logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down", - "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod)) - return &status, nil - } - // if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block. - if !isRunningAndAvailable(condemned[target], set.Spec.MinReadySeconds) && monotonic && condemned[target] != firstUnhealthyPod { - logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down", - "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod)) - return &status, nil - } - logger.V(2).Info("Pod of StatefulSet is terminating for scale down", - "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[target])) - - if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil { - return &status, err - } - if getPodRevision(condemned[target]) == currentRevision.Name { - status.CurrentReplicas-- - } - if getPodRevision(condemned[target]) == updateRevision.Name { - status.UpdatedReplicas-- - } - if monotonic { - return &status, nil - } + processCondemnedFn := func(i int) (bool, error) { + return ssc.processCondemned(ctx, set, firstUnhealthyPod, monotonic, condemned, i) } + if shouldExit, err := runForAll(condemned, processCondemnedFn, monotonic); shouldExit || err != nil { + updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) + return &status, err + } + + updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) // for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted. if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType { diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index f12fba34e3c..c7fe131996c 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -26,6 +26,7 @@ import ( "sort" "strconv" "strings" + "sync" "testing" "time" @@ -2357,22 +2358,58 @@ func TestStatefulSetStatusUpdate(t *testing.T) { } type requestTracker struct { + sync.Mutex requests int err error after int + + parallelLock sync.Mutex + parallel int + maxParallel int + + delay time.Duration } func (rt *requestTracker) errorReady() bool { + rt.Lock() + defer rt.Unlock() return rt.err != nil && rt.requests >= rt.after } func (rt *requestTracker) inc() { + rt.parallelLock.Lock() + rt.parallel++ + if rt.maxParallel < rt.parallel { + rt.maxParallel = rt.parallel + } + rt.parallelLock.Unlock() + + rt.Lock() + defer rt.Unlock() rt.requests++ + if rt.delay != 0 { + time.Sleep(rt.delay) + } } func (rt *requestTracker) reset() { + rt.parallelLock.Lock() + rt.parallel = 0 + rt.parallelLock.Unlock() + + rt.Lock() + defer rt.Unlock() rt.err = nil rt.after = 0 + rt.delay = 0 +} + +func newRequestTracker(requests int, err error, after int) requestTracker { + return requestTracker{ + requests: requests, + err: err, + after: after, + } } type fakeObjectManager struct { @@ -2402,9 +2439,10 @@ func newFakeObjectManager(informerFactory informers.SharedInformerFactory) *fake claimInformer.Informer().GetIndexer(), setInformer.Informer().GetIndexer(), revisionInformer.Informer().GetIndexer(), - requestTracker{0, nil, 0}, - requestTracker{0, nil, 0}, - requestTracker{0, nil, 0}} + newRequestTracker(0, nil, 0), + newRequestTracker(0, nil, 0), + newRequestTracker(0, nil, 0), + } } func (om *fakeObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error { @@ -2619,7 +2657,7 @@ func newFakeStatefulSetStatusUpdater(setInformer appsinformers.StatefulSetInform return &fakeStatefulSetStatusUpdater{ setInformer.Lister(), setInformer.Informer().GetIndexer(), - requestTracker{0, nil, 0}, + newRequestTracker(0, nil, 0), } } @@ -2834,6 +2872,182 @@ func fakeResourceVersion(object interface{}) { } } +func TestParallelScale(t *testing.T) { + for _, tc := range []struct { + desc string + replicas int32 + desiredReplicas int32 + }{ + { + desc: "scale up from 3 to 30", + replicas: 3, + desiredReplicas: 30, + }, + { + desc: "scale down from 10 to 1", + replicas: 10, + desiredReplicas: 1, + }, + + { + desc: "scale down to 0", + replicas: 501, + desiredReplicas: 0, + }, + { + desc: "scale up from 0", + replicas: 0, + desiredReplicas: 1000, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + set := burst(newStatefulSet(0)) + parallelScale(t, set, tc.replicas, tc.desiredReplicas, assertBurstInvariants) + }) + } + +} + +func parallelScale(t *testing.T, set *apps.StatefulSet, replicas, desiredReplicas int32, invariants invariantFunc) { + var err error + diff := desiredReplicas - replicas + client := fake.NewSimpleClientset(set) + om, _, ssc := setupController(client) + om.createPodTracker.delay = time.Millisecond + + *set.Spec.Replicas = replicas + if err := parallelScaleUpStatefulSetControl(set, ssc, om, invariants); err != nil { + t.Errorf("Failed to turn up StatefulSet : %s", err) + } + set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } + if set.Status.Replicas != replicas { + t.Errorf("want %v, got %v replicas", replicas, set.Status.Replicas) + } + + fn := parallelScaleUpStatefulSetControl + if diff < 0 { + fn = parallelScaleDownStatefulSetControl + } + *set.Spec.Replicas = desiredReplicas + if err := fn(set, ssc, om, invariants); err != nil { + t.Errorf("Failed to scale StatefulSet : %s", err) + } + + set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatalf("Error getting updated StatefulSet: %v", err) + } + + if set.Status.Replicas != desiredReplicas { + t.Errorf("Failed to scale statefulset to %v replicas, got %v replicas", desiredReplicas, set.Status.Replicas) + } + + if (diff < -1 || diff > 1) && om.createPodTracker.maxParallel <= 1 { + t.Errorf("want max parallel requests > 1, got %v", om.createPodTracker.maxParallel) + } +} + +func parallelScaleUpStatefulSetControl(set *apps.StatefulSet, + ssc StatefulSetControlInterface, + om *fakeObjectManager, + invariants invariantFunc) error { + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + return err + } + + // Give up after 2 loops. + // 2 * 500 pods per loop = 1000 max pods <- this should be enough for all test cases. + // Anything slower than that (requiring more iterations) indicates a problem and should fail the test. + maxLoops := 2 + loops := maxLoops + for set.Status.Replicas < *set.Spec.Replicas { + if loops < 1 { + return fmt.Errorf("after %v loops: want %v, got replicas %v", maxLoops, *set.Spec.Replicas, set.Status.Replicas) + } + loops-- + pods, err := om.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + return err + } + sort.Sort(ascendingOrdinal(pods)) + + ordinals := []int{} + for _, pod := range pods { + if pod.Status.Phase == "" { + ordinals = append(ordinals, getOrdinal(pod)) + } + } + // ensure all pods are valid (have a phase) + for _, ord := range ordinals { + if pods, err = om.setPodPending(set, ord); err != nil { + return err + } + } + + // run the controller once and check invariants + _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods) + if err != nil { + return err + } + set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + return err + } + if err := invariants(set, om); err != nil { + return err + } + } + return invariants(set, om) +} + +func parallelScaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, om *fakeObjectManager, invariants invariantFunc) error { + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + return err + } + + // Give up after 2 loops. + // 2 * 500 pods per loop = 1000 max pods <- this should be enough for all test cases. + // Anything slower than that (requiring more iterations) indicates a problem and should fail the test. + maxLoops := 2 + loops := maxLoops + for set.Status.Replicas > *set.Spec.Replicas { + if loops < 1 { + return fmt.Errorf("after %v loops: want %v replicas, got %v", maxLoops, *set.Spec.Replicas, set.Status.Replicas) + } + loops-- + pods, err := om.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + return err + } + sort.Sort(ascendingOrdinal(pods)) + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + return err + } + set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + return err + } + if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + return err + } + } + + set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + return err + } + if err := invariants(set, om); err != nil { + return err + } + + return nil +} + func scaleUpStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, om *fakeObjectManager, diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index a741047f75f..5ab1f9af114 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -602,6 +602,23 @@ func (ao ascendingOrdinal) Less(i, j int) bool { return getOrdinal(ao[i]) < getOrdinal(ao[j]) } +// descendingOrdinal is a sort.Interface that Sorts a list of Pods based on the ordinals extracted +// from the Pod. Pod's that have not been constructed by StatefulSet's have an ordinal of -1, and are therefore pushed +// to the end of the list. +type descendingOrdinal []*v1.Pod + +func (do descendingOrdinal) Len() int { + return len(do) +} + +func (do descendingOrdinal) Swap(i, j int) { + do[i], do[j] = do[j], do[i] +} + +func (do descendingOrdinal) Less(i, j int) bool { + return getOrdinal(do[i]) > getOrdinal(do[j]) +} + // getStatefulSetMaxUnavailable calculates the real maxUnavailable number according to the replica count // and maxUnavailable from rollingUpdateStrategy. The number defaults to 1 if the maxUnavailable field is // not set, and it will be round down to at least 1 if the maxUnavailable value is a percentage.