Merge pull request #117865 from aleksandra-malinowska/parallel-sts-3
Parallel StatefulSet pod create & delete
commit 229dd79efd
@@ -19,6 +19,7 @@ package statefulset
 import (
     "context"
     "sort"
+    "sync"

     apps "k8s.io/api/apps/v1"
     v1 "k8s.io/api/core/v1"
@@ -30,8 +31,12 @@ import (
     "k8s.io/klog/v2"
     "k8s.io/kubernetes/pkg/controller/history"
     "k8s.io/kubernetes/pkg/features"
+    "k8s.io/utils/integer"
 )

+// Realistic value for maximum in-flight requests when processing in parallel mode.
+const MaxBatchSize = 500
+
 // StatefulSetControl implements the control logic for updating StatefulSets and their children Pods. It is implemented
 // as an interface to allow for extensions that provide different semantics. Currently, there is only one implementation.
 type StatefulSetControlInterface interface {
@@ -87,7 +92,11 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set

     currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions)
     if err != nil {
-        return nil, utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)})
+        errs := []error{err}
+        if agg, ok := err.(utilerrors.Aggregate); ok {
+            errs = agg.Errors()
+        }
+        return nil, utilerrors.NewAggregate(append(errs, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)))
     }

     // maintain the set's revision history limit
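Illustration (not part of the PR diff): the change above flattens an already-aggregated update error before appending the history-truncation error, so callers see one flat list instead of a nested aggregate. A minimal standalone sketch of that flattening using k8s.io/apimachinery/pkg/util/errors; the combine helper and the sample messages are made up for illustration.

```go
package main

import (
    "errors"
    "fmt"

    utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

// combine mirrors the idea in the hunk above: if the update error is already an
// Aggregate, unwrap it so the truncation error is appended at the same level
// instead of nesting one aggregate inside another.
func combine(updateErr, truncateErr error) error {
    errs := []error{updateErr}
    if agg, ok := updateErr.(utilerrors.Aggregate); ok {
        errs = agg.Errors()
    }
    return utilerrors.NewAggregate(append(errs, truncateErr))
}

func main() {
    parallel := utilerrors.NewAggregate([]error{
        errors.New("create pod-0: timeout"),
        errors.New("create pod-3: timeout"),
    })
    // Prints all three errors at one level rather than [[e1, e2], e3].
    fmt.Println(combine(parallel, errors.New("truncate history: conflict")))
}
```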
@@ -269,6 +278,244 @@ func (ssc *defaultStatefulSetControl) getStatefulSetRevisions(
     return currentRevision, updateRevision, collisionCount, nil
 }

+func slowStartBatch(initialBatchSize int, remaining int, fn func(int) (bool, error)) (int, error) {
+    successes := 0
+    j := 0
+    for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(integer.IntMin(2*batchSize, remaining), MaxBatchSize) {
+        errCh := make(chan error, batchSize)
+        var wg sync.WaitGroup
+        wg.Add(batchSize)
+        for i := 0; i < batchSize; i++ {
+            go func(k int) {
+                defer wg.Done()
+                // Ignore the first parameter - relevant for monotonic only.
+                if _, err := fn(k); err != nil {
+                    errCh <- err
+                }
+            }(j)
+            j++
+        }
+        wg.Wait()
+        successes += batchSize - len(errCh)
+        close(errCh)
+        if len(errCh) > 0 {
+            errs := make([]error, 0)
+            for err := range errCh {
+                errs = append(errs, err)
+            }
+            return successes, utilerrors.NewAggregate(errs)
+        }
+        remaining -= batchSize
+    }
+    return successes, nil
+}
+
+type replicaStatus struct {
+    replicas          int32
+    readyReplicas     int32
+    availableReplicas int32
+    currentReplicas   int32
+    updatedReplicas   int32
+}
+
+func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision) replicaStatus {
+    status := replicaStatus{}
+    for _, pod := range pods {
+        if isCreated(pod) {
+            status.replicas++
+        }
+
+        // count the number of running and ready replicas
+        if isRunningAndReady(pod) {
+            status.readyReplicas++
+            // count the number of running and available replicas
+            if isRunningAndAvailable(pod, minReadySeconds) {
+                status.availableReplicas++
+            }
+
+        }
+
+        // count the number of current and update replicas
+        if isCreated(pod) && !isTerminating(pod) {
+            if getPodRevision(pod) == currentRevision.Name {
+                status.currentReplicas++
+            }
+            if getPodRevision(pod) == updateRevision.Name {
+                status.updatedReplicas++
+            }
+        }
+    }
+    return status
+}
+
+func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, podLists ...[]*v1.Pod) {
+    status.Replicas = 0
+    status.ReadyReplicas = 0
+    status.AvailableReplicas = 0
+    status.CurrentReplicas = 0
+    status.UpdatedReplicas = 0
+    for _, list := range podLists {
+        replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision)
+        status.Replicas += replicaStatus.replicas
+        status.ReadyReplicas += replicaStatus.readyReplicas
+        status.AvailableReplicas += replicaStatus.availableReplicas
+        status.CurrentReplicas += replicaStatus.currentReplicas
+        status.UpdatedReplicas += replicaStatus.updatedReplicas
+    }
+}
+
+func (ssc *defaultStatefulSetControl) processReplica(
+    ctx context.Context,
+    set *apps.StatefulSet,
+    currentRevision *apps.ControllerRevision,
+    updateRevision *apps.ControllerRevision,
+    currentSet *apps.StatefulSet,
+    updateSet *apps.StatefulSet,
+    monotonic bool,
+    replicas []*v1.Pod,
+    i int) (bool, error) {
+    logger := klog.FromContext(ctx)
+    // delete and recreate failed pods
+    if isFailed(replicas[i]) {
+        ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
+            "StatefulSet %s/%s is recreating failed Pod %s",
+            set.Namespace,
+            set.Name,
+            replicas[i].Name)
+        if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
+            return true, err
+        }
+        replicaOrd := i + getStartOrdinal(set)
+        replicas[i] = newVersionedStatefulSetPod(
+            currentSet,
+            updateSet,
+            currentRevision.Name,
+            updateRevision.Name,
+            replicaOrd)
+    }
+    // If we find a Pod that has not been created we create the Pod
+    if !isCreated(replicas[i]) {
+        if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
+            if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil {
+                return true, err
+            } else if isStale {
+                // If a pod has a stale PVC, no more work can be done this round.
+                return true, err
+            }
+        }
+        if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil {
+            return true, err
+        }
+        if monotonic {
+            // if the set does not allow bursting, return immediately
+            return true, nil
+        }
+    }
+
+    // If the Pod is in pending state then trigger PVC creation to create missing PVCs
+    if isPending(replicas[i]) {
+        logger.V(4).Info(
+            "StatefulSet is triggering PVC creation for pending Pod",
+            "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
+        if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil {
+            return true, err
+        }
+    }
+
+    // If we find a Pod that is currently terminating, we must wait until graceful deletion
+    // completes before we continue to make progress.
+    if isTerminating(replicas[i]) && monotonic {
+        logger.V(4).Info("StatefulSet is waiting for Pod to Terminate",
+            "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
+        return true, nil
+    }
+
+    // If we have a Pod that has been created but is not running and ready we can not make progress.
+    // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
+    // ordinal, are Running and Ready.
+    if !isRunningAndReady(replicas[i]) && monotonic {
+        logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready",
+            "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
+        return true, nil
+    }
+
+    // If we have a Pod that has been created but is not available we can not make progress.
+    // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
+    // ordinal, are Available.
+    if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
+        logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
+            "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
+        return true, nil
+    }
+
+    // Enforce the StatefulSet invariants
+    retentionMatch := true
+    if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
+        var err error
+        retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, replicas[i])
+        // An error is expected if the pod is not yet fully updated, and so return is treated as matching.
+        if err != nil {
+            retentionMatch = true
+        }
+    }
+
+    if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch {
+        return false, nil
+    }
+
+    // Make a deep copy so we don't mutate the shared cache
+    replica := replicas[i].DeepCopy()
+    if err := ssc.podControl.UpdateStatefulPod(ctx, updateSet, replica); err != nil {
+        return true, err
+    }
+
+    return false, nil
+}
+
+func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int) (bool, error) {
+    logger := klog.FromContext(ctx)
+    if isTerminating(condemned[i]) {
+        // if we are in monotonic mode, block and wait for terminating pods to expire
+        if monotonic {
+            logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down",
+                "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))
+            return true, nil
+        }
+        return false, nil
+    }
+    // if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
+    if !isRunningAndReady(condemned[i]) && monotonic && condemned[i] != firstUnhealthyPod {
+        logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down",
+            "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
+        return true, nil
+    }
+    // if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
+    if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds) && monotonic && condemned[i] != firstUnhealthyPod {
+        logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down",
+            "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
+        return true, nil
+    }
+
+    logger.V(2).Info("Pod of StatefulSet is terminating for scale down",
+        "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))
+    return true, ssc.podControl.DeleteStatefulPod(set, condemned[i])
+}
+
+func runForAll(pods []*v1.Pod, fn func(i int) (bool, error), monotonic bool) (bool, error) {
+    if monotonic {
+        for i := range pods {
+            if shouldExit, err := fn(i); shouldExit || err != nil {
+                return true, err
+            }
+        }
+    } else {
+        if _, err := slowStartBatch(1, len(pods), fn); err != nil {
+            return true, err
+        }
+    }
+    return false, nil
+}
+
 // updateStatefulSet performs the update function for a StatefulSet. This method creates, updates, and deletes Pods in
 // the set in order to conform the system to the target state for the set. The target state always contains
 // set.Spec.Replicas Pods with a Ready Condition. If the UpdateStrategy.Type for the set is
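Illustration (not part of the PR diff): the slow-start batching introduced above is easy to see in isolation. A minimal sketch with no Kubernetes dependencies, using a toy slowStart helper in place of slowStartBatch; batch sizes grow 1, 2, 4, ... after every fully successful batch, capped by the remaining work and by the maximum batch size, and each batch runs its calls concurrently.

```go
package main

import (
    "fmt"
    "sync"
)

// slowStart is a simplified stand-in for slowStartBatch above: it calls fn for
// `remaining` items, starting with a small concurrent batch and doubling the
// batch size after every fully successful batch, capped at maxBatch.
func slowStart(initial, remaining, maxBatch int, fn func(int) error) (int, error) {
    imin := func(a, b int) int {
        if a < b {
            return a
        }
        return b
    }
    successes, next := 0, 0
    for batch := imin(remaining, initial); batch > 0; batch = imin(imin(2*batch, remaining), maxBatch) {
        errCh := make(chan error, batch)
        var wg sync.WaitGroup
        wg.Add(batch)
        for i := 0; i < batch; i++ {
            go func(k int) {
                defer wg.Done()
                if err := fn(k); err != nil {
                    errCh <- err
                }
            }(next)
            next++
        }
        wg.Wait()
        close(errCh)
        successes += batch - len(errCh)
        if len(errCh) > 0 {
            return successes, <-errCh // the real code aggregates every error
        }
        remaining -= batch
    }
    return successes, nil
}

func main() {
    created, err := slowStart(1, 10, 500, func(ordinal int) error {
        fmt.Println("creating item", ordinal)
        return nil
    })
    // The 10 items are dispatched in concurrent batches of 1, 2, 4 and then the remaining 3.
    fmt.Println("created:", created, "err:", err)
}
```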
@@ -304,6 +551,8 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
     status.CollisionCount = new(int32)
     *status.CollisionCount = collisionCount

+    updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, pods)
+
     replicaCount := int(*set.Spec.Replicas)
     // slice that will contain all Pods such that getStartOrdinal(set) <= getOrdinal(pod) <= getEndOrdinal(set)
     replicas := make([]*v1.Pod, replicaCount)
@@ -314,28 +563,6 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(

     // First we partition pods into two lists valid replicas and condemned Pods
     for _, pod := range pods {
-        status.Replicas++
-
-        // count the number of running and ready replicas
-        if isRunningAndReady(pod) {
-            status.ReadyReplicas++
-            // count the number of running and available replicas
-            if isRunningAndAvailable(pod, set.Spec.MinReadySeconds) {
-                status.AvailableReplicas++
-            }
-
-        }
-
-        // count the number of current and update replicas
-        if isCreated(pod) && !isTerminating(pod) {
-            if getPodRevision(pod) == currentRevision.Name {
-                status.CurrentReplicas++
-            }
-            if getPodRevision(pod) == updateRevision.Name {
-                status.UpdatedReplicas++
-            }
-        }
-
         if podInOrdinalRange(pod, set) {
             // if the ordinal of the pod is within the range of the current number of replicas,
             // insert it at the indirection of its ordinal
@@ -360,7 +587,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
     }

     // sort the condemned Pods by their ordinals
-    sort.Sort(ascendingOrdinal(condemned))
+    sort.Sort(descendingOrdinal(condemned))

     // find the first unhealthy Pod
     for i := range replicas {
@@ -372,7 +599,8 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
         }
     }

-    for i := range condemned {
+    // or the first unhealthy condemned Pod (condemned are sorted in descending order for ease of use)
+    for i := len(condemned) - 1; i >= 0; i-- {
         if !isHealthy(condemned[i]) {
             unhealthy++
             if firstUnhealthyPod == nil {
@@ -393,169 +621,48 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(

     monotonic := !allowsBurst(set)

-    // Examine each replica with respect to its ordinal
-    for i := range replicas {
-        // delete and recreate failed pods
-        if isFailed(replicas[i]) {
-            ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
-                "StatefulSet %s/%s is recreating failed Pod %s",
-                set.Namespace,
-                set.Name,
-                replicas[i].Name)
-            if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
-                return &status, err
-            }
-            if getPodRevision(replicas[i]) == currentRevision.Name {
-                status.CurrentReplicas--
-            }
-            if getPodRevision(replicas[i]) == updateRevision.Name {
-                status.UpdatedReplicas--
-            }
-            status.Replicas--
-            replicaOrd := i + getStartOrdinal(set)
-            replicas[i] = newVersionedStatefulSetPod(
-                currentSet,
-                updateSet,
-                currentRevision.Name,
-                updateRevision.Name,
-                replicaOrd)
-        }
-        // If we find a Pod that has not been created we create the Pod
-        if !isCreated(replicas[i]) {
-            if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
-                if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil {
-                    return &status, err
-                } else if isStale {
-                    // If a pod has a stale PVC, no more work can be done this round.
-                    return &status, err
-                }
-            }
-            if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil {
-                return &status, err
-            }
-            status.Replicas++
-            if getPodRevision(replicas[i]) == currentRevision.Name {
-                status.CurrentReplicas++
-            }
-            if getPodRevision(replicas[i]) == updateRevision.Name {
-                status.UpdatedReplicas++
-            }
-            // if the set does not allow bursting, return immediately
-            if monotonic {
-                return &status, nil
-            }
-            // pod created, no more work possible for this round
-            continue
-        }
-
-        // If the Pod is in pending state then trigger PVC creation to create missing PVCs
-        if isPending(replicas[i]) {
-            logger.V(4).Info("StatefulSet is triggering PVC Creation for pending Pod",
-                "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
-            if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil {
-                return &status, err
-            }
-        }
-        // If we find a Pod that is currently terminating, we must wait until graceful deletion
-        // completes before we continue to make progress.
-        if isTerminating(replicas[i]) && monotonic {
-            logger.V(4).Info("StatefulSet is waiting for Pod to Terminate",
-                "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
-            return &status, nil
-        }
-        // If we have a Pod that has been created but is not running and ready we can not make progress.
-        // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
-        // ordinal, are Running and Ready.
-        if !isRunningAndReady(replicas[i]) && monotonic {
-            logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready",
-                "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
-            return &status, nil
-        }
-        // If we have a Pod that has been created but is not available we can not make progress.
-        // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
-        // ordinal, are Available.
-        if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
-            logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
-                "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
-            return &status, nil
-        }
-        // Enforce the StatefulSet invariants
-        retentionMatch := true
-        if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
-            var err error
-            retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, replicas[i])
-            // An error is expected if the pod is not yet fully updated, and so return is treated as matching.
-            if err != nil {
-                retentionMatch = true
-            }
-        }
-        if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch {
-            continue
-        }
-        // Make a deep copy so we don't mutate the shared cache
-        replica := replicas[i].DeepCopy()
-        if err := ssc.podControl.UpdateStatefulPod(ctx, updateSet, replica); err != nil {
-            return &status, err
-        }
-    }
-
-    if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
-        // Ensure ownerRefs are set correctly for the condemned pods.
-        for i := range condemned {
-            if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
-                return &status, err
-            } else if !matchPolicy {
-                if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
-                    return &status, err
-                }
-            }
-        }
-    }
-
-    // At this point, all of the current Replicas are Running, Ready and Available, we can consider termination.
-    // We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
-    // We will terminate Pods in a monotonically decreasing order.
-    // Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
-    // updates.
-    for target := len(condemned) - 1; target >= 0; target-- {
-        // wait for terminating pods to expire
-        if isTerminating(condemned[target]) {
-            logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down",
-                "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[target]))
-            // block if we are in monotonic mode
-            if monotonic {
-                return &status, nil
-            }
-            continue
-        }
-        // if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
-        if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
-            logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down",
-                "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
-            return &status, nil
-        }
-        // if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
-        if !isRunningAndAvailable(condemned[target], set.Spec.MinReadySeconds) && monotonic && condemned[target] != firstUnhealthyPod {
-            logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down",
-                "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
-            return &status, nil
-        }
-        logger.V(2).Info("Pod of StatefulSet is terminating for scale down",
-            "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[target]))
-
-        if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
-            return &status, err
-        }
-        if getPodRevision(condemned[target]) == currentRevision.Name {
-            status.CurrentReplicas--
-        }
-        if getPodRevision(condemned[target]) == updateRevision.Name {
-            status.UpdatedReplicas--
-        }
-        if monotonic {
-            return &status, nil
-        }
-    }
+    // First, process each living replica. Exit if we run into an error or something blocking in monotonic mode.
+    processReplicaFn := func(i int) (bool, error) {
+        return ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i)
+    }
+    if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil {
+        updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
+        return &status, err
+    }
+
+    // Fix pod claims for condemned pods, if necessary.
+    if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
+        fixPodClaim := func(i int) (bool, error) {
+            if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
+                return true, err
+            } else if !matchPolicy {
+                if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
+                    return true, err
+                }
+            }
+            return false, nil
+        }
+        if shouldExit, err := runForAll(condemned, fixPodClaim, monotonic); shouldExit || err != nil {
+            updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
+            return &status, err
+        }
+    }
+
+    // At this point, in monotonic mode all of the current Replicas are Running, Ready and Available,
+    // and we can consider termination.
+    // We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
+    // We will terminate Pods in a monotonically decreasing order.
+    // Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
+    // updates.
+    processCondemnedFn := func(i int) (bool, error) {
+        return ssc.processCondemned(ctx, set, firstUnhealthyPod, monotonic, condemned, i)
+    }
+    if shouldExit, err := runForAll(condemned, processCondemnedFn, monotonic); shouldExit || err != nil {
+        updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
+        return &status, err
+    }
+
+    updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)

     // for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
     if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
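Illustration (not part of the PR diff): whether updateStatefulSet takes the monotonic path or the parallel path above is decided by allowsBurst(set), which reflects the StatefulSet's spec.podManagementPolicy. A minimal sketch of opting a StatefulSet into burst mode with the apps/v1 types; the object itself is made up for illustration.

```go
package main

import (
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    replicas := int32(30)
    set := &appsv1.StatefulSet{
        ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "default"},
        Spec: appsv1.StatefulSetSpec{
            Replicas: &replicas,
            // Parallel lets the controller create and delete pods in bursts
            // (the non-monotonic path above); OrderedReady is the default and
            // keeps the one-pod-at-a-time behavior.
            PodManagementPolicy: appsv1.ParallelPodManagement,
        },
    }
    fmt.Println(set.Name, "uses", set.Spec.PodManagementPolicy)
}
```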
@@ -26,6 +26,7 @@ import (
     "sort"
     "strconv"
     "strings"
+    "sync"
     "testing"
     "time"

@@ -2357,22 +2358,58 @@ func TestStatefulSetStatusUpdate(t *testing.T) {
     }
 }

 type requestTracker struct {
+    sync.Mutex
     requests int
     err      error
     after    int
+
+    parallelLock sync.Mutex
+    parallel     int
+    maxParallel  int
+
+    delay time.Duration
 }

 func (rt *requestTracker) errorReady() bool {
+    rt.Lock()
+    defer rt.Unlock()
     return rt.err != nil && rt.requests >= rt.after
 }

 func (rt *requestTracker) inc() {
+    rt.parallelLock.Lock()
+    rt.parallel++
+    if rt.maxParallel < rt.parallel {
+        rt.maxParallel = rt.parallel
+    }
+    rt.parallelLock.Unlock()
+
+    rt.Lock()
+    defer rt.Unlock()
     rt.requests++
+    if rt.delay != 0 {
+        time.Sleep(rt.delay)
+    }
 }

 func (rt *requestTracker) reset() {
+    rt.parallelLock.Lock()
+    rt.parallel = 0
+    rt.parallelLock.Unlock()
+
+    rt.Lock()
+    defer rt.Unlock()
     rt.err = nil
     rt.after = 0
+    rt.delay = 0
+}
+
+func newRequestTracker(requests int, err error, after int) requestTracker {
+    return requestTracker{
+        requests: requests,
+        err:      err,
+        after:    after,
+    }
 }

 type fakeObjectManager struct {
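Illustration (not part of the PR diff): the parallel/maxParallel fields added to requestTracker record a concurrency high-water mark, which is how TestParallelScale later asserts that pod creates really overlapped; where the matching decrement happens in the fake object manager is outside this excerpt. A standalone sketch of the same pattern with a toy gauge type, not the test's code.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// gauge records how many goroutines are inside a tracked section at once and
// the highest value ever observed, similar in spirit to the parallel /
// maxParallel bookkeeping above (illustrative only).
type gauge struct {
    mu      sync.Mutex
    current int
    max     int
}

func (g *gauge) enter() {
    g.mu.Lock()
    defer g.mu.Unlock()
    g.current++
    if g.current > g.max {
        g.max = g.current
    }
}

func (g *gauge) exit() {
    g.mu.Lock()
    defer g.mu.Unlock()
    g.current--
}

func main() {
    var g gauge
    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            g.enter()
            time.Sleep(10 * time.Millisecond) // simulated API call
            g.exit()
        }()
    }
    wg.Wait()
    fmt.Println("max in-flight:", g.max) // close to 8 on most runs
}
```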
@@ -2402,9 +2439,10 @@ func newFakeObjectManager(informerFactory informers.SharedInformerFactory) *fake
         claimInformer.Informer().GetIndexer(),
         setInformer.Informer().GetIndexer(),
         revisionInformer.Informer().GetIndexer(),
-        requestTracker{0, nil, 0},
-        requestTracker{0, nil, 0},
-        requestTracker{0, nil, 0}}
+        newRequestTracker(0, nil, 0),
+        newRequestTracker(0, nil, 0),
+        newRequestTracker(0, nil, 0),
+    }
 }

 func (om *fakeObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error {
@@ -2619,7 +2657,7 @@ func newFakeStatefulSetStatusUpdater(setInformer appsinformers.StatefulSetInform
     return &fakeStatefulSetStatusUpdater{
         setInformer.Lister(),
         setInformer.Informer().GetIndexer(),
-        requestTracker{0, nil, 0},
+        newRequestTracker(0, nil, 0),
     }
 }

@@ -2834,6 +2872,182 @@ func fakeResourceVersion(object interface{}) {
     }
 }

+func TestParallelScale(t *testing.T) {
+    for _, tc := range []struct {
+        desc            string
+        replicas        int32
+        desiredReplicas int32
+    }{
+        {
+            desc:            "scale up from 3 to 30",
+            replicas:        3,
+            desiredReplicas: 30,
+        },
+        {
+            desc:            "scale down from 10 to 1",
+            replicas:        10,
+            desiredReplicas: 1,
+        },
+
+        {
+            desc:            "scale down to 0",
+            replicas:        501,
+            desiredReplicas: 0,
+        },
+        {
+            desc:            "scale up from 0",
+            replicas:        0,
+            desiredReplicas: 1000,
+        },
+    } {
+        t.Run(tc.desc, func(t *testing.T) {
+            set := burst(newStatefulSet(0))
+            parallelScale(t, set, tc.replicas, tc.desiredReplicas, assertBurstInvariants)
+        })
+    }
+
+}
+
+func parallelScale(t *testing.T, set *apps.StatefulSet, replicas, desiredReplicas int32, invariants invariantFunc) {
+    var err error
+    diff := desiredReplicas - replicas
+    client := fake.NewSimpleClientset(set)
+    om, _, ssc := setupController(client)
+    om.createPodTracker.delay = time.Millisecond
+
+    *set.Spec.Replicas = replicas
+    if err := parallelScaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
+        t.Errorf("Failed to turn up StatefulSet : %s", err)
+    }
+    set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
+    if err != nil {
+        t.Fatalf("Error getting updated StatefulSet: %v", err)
+    }
+    if set.Status.Replicas != replicas {
+        t.Errorf("want %v, got %v replicas", replicas, set.Status.Replicas)
+    }
+
+    fn := parallelScaleUpStatefulSetControl
+    if diff < 0 {
+        fn = parallelScaleDownStatefulSetControl
+    }
+    *set.Spec.Replicas = desiredReplicas
+    if err := fn(set, ssc, om, invariants); err != nil {
+        t.Errorf("Failed to scale StatefulSet : %s", err)
+    }
+
+    set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
+    if err != nil {
+        t.Fatalf("Error getting updated StatefulSet: %v", err)
+    }
+
+    if set.Status.Replicas != desiredReplicas {
+        t.Errorf("Failed to scale statefulset to %v replicas, got %v replicas", desiredReplicas, set.Status.Replicas)
+    }
+
+    if (diff < -1 || diff > 1) && om.createPodTracker.maxParallel <= 1 {
+        t.Errorf("want max parallel requests > 1, got %v", om.createPodTracker.maxParallel)
+    }
+}
+
+func parallelScaleUpStatefulSetControl(set *apps.StatefulSet,
+    ssc StatefulSetControlInterface,
+    om *fakeObjectManager,
+    invariants invariantFunc) error {
+    selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
+    if err != nil {
+        return err
+    }
+
+    // Give up after 2 loops.
+    // 2 * 500 pods per loop = 1000 max pods <- this should be enough for all test cases.
+    // Anything slower than that (requiring more iterations) indicates a problem and should fail the test.
+    maxLoops := 2
+    loops := maxLoops
+    for set.Status.Replicas < *set.Spec.Replicas {
+        if loops < 1 {
+            return fmt.Errorf("after %v loops: want %v, got replicas %v", maxLoops, *set.Spec.Replicas, set.Status.Replicas)
+        }
+        loops--
+        pods, err := om.podsLister.Pods(set.Namespace).List(selector)
+        if err != nil {
+            return err
+        }
+        sort.Sort(ascendingOrdinal(pods))
+
+        ordinals := []int{}
+        for _, pod := range pods {
+            if pod.Status.Phase == "" {
+                ordinals = append(ordinals, getOrdinal(pod))
+            }
+        }
+        // ensure all pods are valid (have a phase)
+        for _, ord := range ordinals {
+            if pods, err = om.setPodPending(set, ord); err != nil {
+                return err
+            }
+        }
+
+        // run the controller once and check invariants
+        _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
+        if err != nil {
+            return err
+        }
+        set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
+        if err != nil {
+            return err
+        }
+        if err := invariants(set, om); err != nil {
+            return err
+        }
+    }
+    return invariants(set, om)
+}
+
+func parallelScaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, om *fakeObjectManager, invariants invariantFunc) error {
+    selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
+    if err != nil {
+        return err
+    }
+
+    // Give up after 2 loops.
+    // 2 * 500 pods per loop = 1000 max pods <- this should be enough for all test cases.
+    // Anything slower than that (requiring more iterations) indicates a problem and should fail the test.
+    maxLoops := 2
+    loops := maxLoops
+    for set.Status.Replicas > *set.Spec.Replicas {
+        if loops < 1 {
+            return fmt.Errorf("after %v loops: want %v replicas, got %v", maxLoops, *set.Spec.Replicas, set.Status.Replicas)
+        }
+        loops--
+        pods, err := om.podsLister.Pods(set.Namespace).List(selector)
+        if err != nil {
+            return err
+        }
+        sort.Sort(ascendingOrdinal(pods))
+        if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+            return err
+        }
+        set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
+        if err != nil {
+            return err
+        }
+        if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+            return err
+        }
+    }
+
+    set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
+    if err != nil {
+        return err
+    }
+    if err := invariants(set, om); err != nil {
+        return err
+    }
+
+    return nil
+}
+
 func scaleUpStatefulSetControl(set *apps.StatefulSet,
     ssc StatefulSetControlInterface,
     om *fakeObjectManager,
@@ -602,6 +602,23 @@ func (ao ascendingOrdinal) Less(i, j int) bool {
     return getOrdinal(ao[i]) < getOrdinal(ao[j])
 }

+// descendingOrdinal is a sort.Interface that Sorts a list of Pods based on the ordinals extracted
+// from the Pod. Pod's that have not been constructed by StatefulSet's have an ordinal of -1, and are therefore pushed
+// to the end of the list.
+type descendingOrdinal []*v1.Pod
+
+func (do descendingOrdinal) Len() int {
+    return len(do)
+}
+
+func (do descendingOrdinal) Swap(i, j int) {
+    do[i], do[j] = do[j], do[i]
+}
+
+func (do descendingOrdinal) Less(i, j int) bool {
+    return getOrdinal(do[i]) > getOrdinal(do[j])
+}
+
 // getStatefulSetMaxUnavailable calculates the real maxUnavailable number according to the replica count
 // and maxUnavailable from rollingUpdateStrategy. The number defaults to 1 if the maxUnavailable field is
 // not set, and it will be round down to at least 1 if the maxUnavailable value is a percentage.
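Illustration (not part of the PR diff): descendingOrdinal mirrors the existing ascendingOrdinal helper with the comparison reversed, so condemned Pods can be examined from the highest ordinal down. A standalone sketch of the same sort.Interface pattern with a toy pod type, not the controller's.

```go
package main

import (
    "fmt"
    "sort"
)

type pod struct {
    name    string
    ordinal int // -1 for pods whose name does not parse, pushed to the end
}

// descending sorts pods by ordinal from highest to lowest, matching the
// behavior described for descendingOrdinal above.
type descending []pod

func (d descending) Len() int           { return len(d) }
func (d descending) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }
func (d descending) Less(i, j int) bool { return d[i].ordinal > d[j].ordinal }

func main() {
    condemned := []pod{{"web-3", 3}, {"stray", -1}, {"web-7", 7}, {"web-5", 5}}
    sort.Sort(descending(condemned))
    for _, p := range condemned {
        fmt.Println(p.name) // web-7, web-5, web-3, stray
    }
}
```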