Merge pull request #112744 from pwschuurman/statefulset-slice-impl

Add implementation of KEP-3335, StatefulSetSlice
This commit is contained in:
Kubernetes Prow Robot
2022-11-09 11:12:28 -08:00
committed by GitHub
48 changed files with 2036 additions and 526 deletions

View File

@@ -303,52 +303,53 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
*status.CollisionCount = collisionCount
replicaCount := int(*set.Spec.Replicas)
// slice that will contain all Pods such that 0 <= getOrdinal(pod) < set.Spec.Replicas
// slice that will contain all Pods such that getStartOrdinal(set) <= getOrdinal(pod) <= getEndOrdinal(set)
replicas := make([]*v1.Pod, replicaCount)
// slice that will contain all Pods such that set.Spec.Replicas <= getOrdinal(pod)
// slice that will contain all Pods such that getOrdinal(pod) < getStartOrdinal(set) OR getOrdinal(pod) > getEndOrdinal(set)
condemned := make([]*v1.Pod, 0, len(pods))
unhealthy := 0
var firstUnhealthyPod *v1.Pod
// First we partition pods into two lists valid replicas and condemned Pods
for i := range pods {
for _, pod := range pods {
status.Replicas++
// count the number of running and ready replicas
if isRunningAndReady(pods[i]) {
if isRunningAndReady(pod) {
status.ReadyReplicas++
// count the number of running and available replicas
if isRunningAndAvailable(pods[i], set.Spec.MinReadySeconds) {
if isRunningAndAvailable(pod, set.Spec.MinReadySeconds) {
status.AvailableReplicas++
}
}
// count the number of current and update replicas
if isCreated(pods[i]) && !isTerminating(pods[i]) {
if getPodRevision(pods[i]) == currentRevision.Name {
if isCreated(pod) && !isTerminating(pod) {
if getPodRevision(pod) == currentRevision.Name {
status.CurrentReplicas++
}
if getPodRevision(pods[i]) == updateRevision.Name {
if getPodRevision(pod) == updateRevision.Name {
status.UpdatedReplicas++
}
}
if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount {
if podInOrdinalRange(pod, set) {
// if the ordinal of the pod is within the range of the current number of replicas,
// insert it at the indirection of its ordinal
replicas[ord] = pods[i]
} else if ord >= replicaCount {
// if the ordinal is greater than the number of replicas add it to the condemned list
condemned = append(condemned, pods[i])
replicas[getOrdinal(pod)-getStartOrdinal(set)] = pod
} else if getOrdinal(pod) >= 0 {
// if the ordinal is valid, but not within the range add it to the condemned list
condemned = append(condemned, pod)
}
// If the ordinal could not be parsed (ord < 0), ignore the Pod.
}
// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision
for ord := 0; ord < replicaCount; ord++ {
if replicas[ord] == nil {
replicas[ord] = newVersionedStatefulSetPod(
for ord := getStartOrdinal(set); ord <= getEndOrdinal(set); ord++ {
replicaIdx := ord - getStartOrdinal(set)
if replicas[replicaIdx] == nil {
replicas[replicaIdx] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
@@ -409,12 +410,13 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
status.UpdatedReplicas--
}
status.Replicas--
replicaOrd := i + getStartOrdinal(set)
replicas[i] = newVersionedStatefulSetPod(
currentSet,
updateSet,
currentRevision.Name,
updateRevision.Name,
i)
replicaOrd)
}
// If we find a Pod that has not been created we create the Pod
if !isCreated(replicas[i]) {
@@ -501,7 +503,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
// At this point, all of the current Replicas are Running, Ready and Available, we can consider termination.
// We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
// We will terminate Pods in a monotonically decreasing order over [len(pods),set.Spec.Replicas).
// We will terminate Pods in a monotonically decreasing order.
// Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
// updates.
for target := len(condemned) - 1; target >= 0; target-- {
@@ -614,7 +616,7 @@ func updateStatefulSetAfterInvariantEstablished(
}
}
// Collect all targets in the range between the 0 and Spec.Replicas. Count any targets in that range
// Collect all targets in the range between getStartOrdinal(set) and getEndOrdinal(set). Count any targets in that range
// that are unhealthy i.e. terminated or not running and ready as unavailable). Select the
// (MaxUnavailable - Unavailable) Pods, in order with respect to their ordinal for termination. Delete
// those pods and count the successful deletions. Update the status with the correct number of deletions.

View File

@@ -559,6 +559,74 @@ func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants in
}
}
func emptyInvariants(set *apps.StatefulSet, om *fakeObjectManager) error {
return nil
}
func TestStatefulSetControlWithStartOrdinal(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetStartOrdinal, true)()
simpleSetFn := func() *apps.StatefulSet {
statefulSet := newStatefulSet(3)
statefulSet.Spec.Ordinals = &apps.StatefulSetOrdinals{Start: int32(2)}
return statefulSet
}
testCases := []struct {
fn func(*testing.T, *apps.StatefulSet, invariantFunc)
obj func() *apps.StatefulSet
}{
{CreatesPodsWithStartOrdinal, simpleSetFn},
}
for _, testCase := range testCases {
testObj := testCase.obj
testFn := testCase.fn
set := testObj()
testFn(t, set, emptyInvariants)
}
}
func CreatesPodsWithStartOrdinal(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
om, _, ssc := setupController(client)
if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
var err error
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatalf("Error getting updated StatefulSet: %v", err)
}
if set.Status.Replicas != 3 {
t.Error("Failed to scale statefulset to 3 replicas")
}
if set.Status.ReadyReplicas != 3 {
t.Error("Failed to set ReadyReplicas correctly")
}
if set.Status.UpdatedReplicas != 3 {
t.Error("Failed to set UpdatedReplicas correctly")
}
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Error(err)
}
pods, err := om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
sort.Sort(ascendingOrdinal(pods))
for i, pod := range pods {
expectedOrdinal := 2 + i
actualPodOrdinal := getOrdinal(pod)
if actualPodOrdinal != expectedOrdinal {
t.Errorf("Expected pod ordinal %d. Got %d", expectedOrdinal, actualPodOrdinal)
}
}
}
func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
runTestOverPVCRetentionPolicies(
t, "", func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
@@ -2299,6 +2367,16 @@ func (om *fakeObjectManager) SetDeleteStatefulPodError(err error, after int) {
om.deletePodTracker.after = after
}
func findPodByOrdinal(pods []*v1.Pod, ordinal int) *v1.Pod {
for _, pod := range pods {
if getOrdinal(pod) == ordinal {
return pod.DeepCopy()
}
}
return nil
}
func (om *fakeObjectManager) setPodPending(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
@@ -2308,11 +2386,10 @@ func (om *fakeObjectManager) setPodPending(set *apps.StatefulSet, ordinal int) (
if err != nil {
return nil, err
}
if 0 > ordinal || ordinal >= len(pods) {
return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
pod := findPodByOrdinal(pods, ordinal)
if pod == nil {
return nil, fmt.Errorf("setPodPending: pod ordinal %d not found", ordinal)
}
sort.Sort(ascendingOrdinal(pods))
pod := pods[ordinal].DeepCopy()
pod.Status.Phase = v1.PodPending
fakeResourceVersion(pod)
om.podsIndexer.Update(pod)
@@ -2328,11 +2405,10 @@ func (om *fakeObjectManager) setPodRunning(set *apps.StatefulSet, ordinal int) (
if err != nil {
return nil, err
}
if 0 > ordinal || ordinal >= len(pods) {
return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
pod := findPodByOrdinal(pods, ordinal)
if pod == nil {
return nil, fmt.Errorf("setPodRunning: pod ordinal %d not found", ordinal)
}
sort.Sort(ascendingOrdinal(pods))
pod := pods[ordinal].DeepCopy()
pod.Status.Phase = v1.PodRunning
fakeResourceVersion(pod)
om.podsIndexer.Update(pod)
@@ -2348,11 +2424,10 @@ func (om *fakeObjectManager) setPodReady(set *apps.StatefulSet, ordinal int) ([]
if err != nil {
return nil, err
}
if 0 > ordinal || ordinal >= len(pods) {
return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
pod := findPodByOrdinal(pods, ordinal)
if pod == nil {
return nil, fmt.Errorf("setPodReady: pod ordinal %d not found", ordinal)
}
sort.Sort(ascendingOrdinal(pods))
pod := pods[ordinal].DeepCopy()
condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
podutil.UpdatePodCondition(&pod.Status, &condition)
fakeResourceVersion(pod)
@@ -2369,11 +2444,10 @@ func (om *fakeObjectManager) setPodAvailable(set *apps.StatefulSet, ordinal int,
if err != nil {
return nil, err
}
if 0 > ordinal || ordinal >= len(pods) {
return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
pod := findPodByOrdinal(pods, ordinal)
if pod == nil {
return nil, fmt.Errorf("setPodAvailable: pod ordinal %d not found", ordinal)
}
sort.Sort(ascendingOrdinal(pods))
pod := pods[ordinal].DeepCopy()
condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: lastTransitionTime}}
_, existingCondition := podutil.GetPodCondition(&pod.Status, condition.Type)
if existingCondition != nil {
@@ -2467,28 +2541,28 @@ func assertMonotonicInvariants(set *apps.StatefulSet, om *fakeObjectManager) err
return err
}
sort.Sort(ascendingOrdinal(pods))
for ord := 0; ord < len(pods); ord++ {
if ord > 0 && isRunningAndReady(pods[ord]) && !isRunningAndReady(pods[ord-1]) {
return fmt.Errorf("successor %s is Running and Ready while %s is not", pods[ord].Name, pods[ord-1].Name)
for idx := 0; idx < len(pods); idx++ {
if idx > 0 && isRunningAndReady(pods[idx]) && !isRunningAndReady(pods[idx-1]) {
return fmt.Errorf("successor %s is Running and Ready while %s is not", pods[idx].Name, pods[idx-1].Name)
}
if getOrdinal(pods[ord]) != ord {
return fmt.Errorf("pods %s deployed in the wrong order %d", pods[ord].Name, ord)
if ord := idx + getStartOrdinal(set); getOrdinal(pods[idx]) != ord {
return fmt.Errorf("pods %s deployed in the wrong order %d", pods[idx].Name, ord)
}
if !storageMatches(set, pods[ord]) {
return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name)
if !storageMatches(set, pods[idx]) {
return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[idx].Name, set.Name)
}
for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
for _, claim := range getPersistentVolumeClaims(set, pods[idx]) {
claim, _ := om.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
if err := checkClaimInvarients(set, pods[ord], claim, ord); err != nil {
if err := checkClaimInvarients(set, pods[idx], claim); err != nil {
return err
}
}
if !identityMatches(set, pods[ord]) {
return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ", pods[ord].Name, set.Name)
if !identityMatches(set, pods[idx]) {
return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ", pods[idx].Name, set.Name)
}
}
return nil
@@ -2504,24 +2578,24 @@ func assertBurstInvariants(set *apps.StatefulSet, om *fakeObjectManager) error {
return err
}
sort.Sort(ascendingOrdinal(pods))
for ord := 0; ord < len(pods); ord++ {
if !storageMatches(set, pods[ord]) {
return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name)
for _, pod := range pods {
if !storageMatches(set, pod) {
return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pod.Name, set.Name)
}
for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
for _, claim := range getPersistentVolumeClaims(set, pod) {
claim, err := om.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
if err != nil {
return err
}
if err := checkClaimInvarients(set, pods[ord], claim, ord); err != nil {
if err := checkClaimInvarients(set, pod, claim); err != nil {
return err
}
}
if !identityMatches(set, pods[ord]) {
if !identityMatches(set, pod) {
return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ",
pods[ord].Name,
pod.Name,
set.Name)
}
}
@@ -2538,24 +2612,24 @@ func assertUpdateInvariants(set *apps.StatefulSet, om *fakeObjectManager) error
return err
}
sort.Sort(ascendingOrdinal(pods))
for ord := 0; ord < len(pods); ord++ {
for _, pod := range pods {
if !storageMatches(set, pods[ord]) {
return fmt.Errorf("pod %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name)
if !storageMatches(set, pod) {
return fmt.Errorf("pod %s does not match the storage specification of StatefulSet %s ", pod.Name, set.Name)
}
for _, claim := range getPersistentVolumeClaims(set, pods[ord]) {
for _, claim := range getPersistentVolumeClaims(set, pod) {
claim, err := om.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
if err != nil {
return err
}
if err := checkClaimInvarients(set, pods[ord], claim, ord); err != nil {
if err := checkClaimInvarients(set, pod, claim); err != nil {
return err
}
}
if !identityMatches(set, pods[ord]) {
return fmt.Errorf("pod %s does not match the identity specification of StatefulSet %s ", pods[ord].Name, set.Name)
if !identityMatches(set, pod) {
return fmt.Errorf("pod %s does not match the identity specification of StatefulSet %s ", pod.Name, set.Name)
}
}
if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
@@ -2576,7 +2650,7 @@ func assertUpdateInvariants(set *apps.StatefulSet, om *fakeObjectManager) error
return nil
}
func checkClaimInvarients(set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim, ordinal int) error {
func checkClaimInvarients(set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim) error {
policy := apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
@@ -2618,14 +2692,14 @@ func checkClaimInvarients(set *apps.StatefulSet, pod *v1.Pod, claim *v1.Persiste
if hasOwnerRef(claim, set) {
return fmt.Errorf("claim %s has unexpected owner ref on %s for scaledown only", claim.Name, set.Name)
}
if ordinal >= int(*set.Spec.Replicas) && !hasOwnerRef(claim, pod) {
if !podInOrdinalRange(pod, set) && !hasOwnerRef(claim, pod) {
return fmt.Errorf("claim %s does not have owner ref on condemned pod %s for scaledown delete", claim.Name, pod.Name)
}
if ordinal < int(*set.Spec.Replicas) && hasOwnerRef(claim, pod) {
if podInOrdinalRange(pod, set) && hasOwnerRef(claim, pod) {
return fmt.Errorf("claim %s has unexpected owner ref on condemned pod %s for scaledown delete", claim.Name, pod.Name)
}
case policy.WhenScaled == delete && policy.WhenDeleted == delete:
if ordinal >= int(*set.Spec.Replicas) {
if !podInOrdinalRange(pod, set) {
if !hasOwnerRef(claim, pod) || hasOwnerRef(claim, set) {
return fmt.Errorf("condemned claim %s has bad owner refs: %v", claim.Name, claim.GetOwnerReferences())
}
@@ -2666,9 +2740,9 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet,
sort.Sort(ascendingOrdinal(pods))
// ensure all pods are valid (have a phase)
for ord, pod := range pods {
for _, pod := range pods {
if pod.Status.Phase == "" {
if pods, err = om.setPodPending(set, ord); err != nil {
if pods, err = om.setPodPending(set, getOrdinal(pod)); err != nil {
return err
}
break
@@ -2677,15 +2751,15 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet,
// select one of the pods and move it forward in status
if len(pods) > 0 {
ord := int(rand.Int63n(int64(len(pods))))
pod := pods[ord]
idx := int(rand.Int63n(int64(len(pods))))
pod := pods[idx]
switch pod.Status.Phase {
case v1.PodPending:
if pods, err = om.setPodRunning(set, ord); err != nil {
if pods, err = om.setPodRunning(set, getOrdinal(pod)); err != nil {
return err
}
case v1.PodRunning:
if pods, err = om.setPodReady(set, ord); err != nil {
if pods, err = om.setPodReady(set, getOrdinal(pod)); err != nil {
return err
}
default:
@@ -2720,7 +2794,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
return err
}
sort.Sort(ascendingOrdinal(pods))
if ordinal := len(pods) - 1; ordinal >= 0 {
if idx := len(pods) - 1; idx >= 0 {
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
return err
}
@@ -2728,7 +2802,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
if err != nil {
return err
}
if pods, err = om.addTerminatingPod(set, ordinal); err != nil {
if pods, err = om.addTerminatingPod(set, getOrdinal(pods[idx])); err != nil {
return err
}
if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
@@ -2860,9 +2934,9 @@ func updateStatefulSetControl(set *apps.StatefulSet,
}
sort.Sort(ascendingOrdinal(pods))
initialized := false
for ord, pod := range pods {
for _, pod := range pods {
if pod.Status.Phase == "" {
if pods, err = om.setPodPending(set, ord); err != nil {
if pods, err = om.setPodPending(set, getOrdinal(pod)); err != nil {
return err
}
break
@@ -2873,15 +2947,15 @@ func updateStatefulSetControl(set *apps.StatefulSet,
}
if len(pods) > 0 {
ord := int(rand.Int63n(int64(len(pods))))
pod := pods[ord]
idx := int(rand.Int63n(int64(len(pods))))
pod := pods[idx]
switch pod.Status.Phase {
case v1.PodPending:
if pods, err = om.setPodRunning(set, ord); err != nil {
if pods, err = om.setPodRunning(set, getOrdinal(pod)); err != nil {
return err
}
case v1.PodRunning:
if pods, err = om.setPodReady(set, ord); err != nil {
if pods, err = om.setPodReady(set, getOrdinal(pod)); err != nil {
return err
}
default:

View File

@@ -29,11 +29,13 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/strategicpatch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/pkg/features"
)
var patchCodec = scheme.Codecs.LegacyCodec(apps.SchemeGroupVersion)
@@ -85,6 +87,29 @@ func getOrdinal(pod *v1.Pod) int {
return ordinal
}
// getStartOrdinal gets the first possible ordinal (inclusive).
// Returns spec.ordinals.start if spec.ordinals is set, otherwise returns 0.
func getStartOrdinal(set *apps.StatefulSet) int {
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetStartOrdinal) {
if set.Spec.Ordinals != nil {
return int(set.Spec.Ordinals.Start)
}
}
return 0
}
// getEndOrdinal gets the last possible ordinal (inclusive).
func getEndOrdinal(set *apps.StatefulSet) int {
return getStartOrdinal(set) + int(*set.Spec.Replicas) - 1
}
// podInOrdinalRange returns true if the pod ordinal is within the allowed
// range of ordinals that this StatefulSet is set to control.
func podInOrdinalRange(pod *v1.Pod, set *apps.StatefulSet) bool {
ordinal := getOrdinal(pod)
return ordinal >= getStartOrdinal(set) && ordinal <= getEndOrdinal(set)
}
// getPodName gets the name of set's child Pod with an ordinal index of ordinal
func getPodName(set *apps.StatefulSet, ordinal int) string {
return fmt.Sprintf("%s-%d", set.Name, ordinal)
@@ -170,12 +195,12 @@ func claimOwnerMatchesSetAndPod(claim *v1.PersistentVolumeClaim, set *apps.State
if hasOwnerRef(claim, set) {
return false
}
podScaledDown := getOrdinal(pod) >= int(*set.Spec.Replicas)
podScaledDown := !podInOrdinalRange(pod, set)
if podScaledDown != hasOwnerRef(claim, pod) {
return false
}
case policy.WhenScaled == delete && policy.WhenDeleted == delete:
podScaledDown := getOrdinal(pod) >= int(*set.Spec.Replicas)
podScaledDown := !podInOrdinalRange(pod, set)
// If a pod is scaled down, there should be no set ref and a pod ref;
// if the pod is not scaled down it's the other way around.
if podScaledDown == hasOwnerRef(claim, set) {
@@ -226,7 +251,7 @@ func updateClaimOwnerRefForSetAndPod(claim *v1.PersistentVolumeClaim, set *apps.
needsUpdate = removeOwnerRef(claim, pod) || needsUpdate
case policy.WhenScaled == delete && policy.WhenDeleted == retain:
needsUpdate = removeOwnerRef(claim, set) || needsUpdate
podScaledDown := getOrdinal(pod) >= int(*set.Spec.Replicas)
podScaledDown := !podInOrdinalRange(pod, set)
if podScaledDown {
needsUpdate = setOwnerRef(claim, pod, &podMeta) || needsUpdate
}
@@ -234,7 +259,7 @@ func updateClaimOwnerRefForSetAndPod(claim *v1.PersistentVolumeClaim, set *apps.
needsUpdate = removeOwnerRef(claim, pod) || needsUpdate
}
case policy.WhenScaled == delete && policy.WhenDeleted == delete:
podScaledDown := getOrdinal(pod) >= int(*set.Spec.Replicas)
podScaledDown := !podInOrdinalRange(pod, set)
if podScaledDown {
needsUpdate = removeOwnerRef(claim, set) || needsUpdate
needsUpdate = setOwnerRef(claim, pod, &podMeta) || needsUpdate
@@ -440,8 +465,8 @@ func newStatefulSetPod(set *apps.StatefulSet, ordinal int) *v1.Pod {
// returned error is nil, the returned Pod is valid.
func newVersionedStatefulSetPod(currentSet, updateSet *apps.StatefulSet, currentRevision, updateRevision string, ordinal int) *v1.Pod {
if currentSet.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType &&
(currentSet.Spec.UpdateStrategy.RollingUpdate == nil && ordinal < int(currentSet.Status.CurrentReplicas)) ||
(currentSet.Spec.UpdateStrategy.RollingUpdate != nil && ordinal < int(*currentSet.Spec.UpdateStrategy.RollingUpdate.Partition)) {
(currentSet.Spec.UpdateStrategy.RollingUpdate == nil && ordinal < (getStartOrdinal(currentSet)+int(currentSet.Status.CurrentReplicas))) ||
(currentSet.Spec.UpdateStrategy.RollingUpdate != nil && ordinal < (getStartOrdinal(currentSet)+int(*currentSet.Spec.UpdateStrategy.RollingUpdate.Partition))) {
pod := newStatefulSetPod(currentSet, ordinal)
setPodRevision(pod, currentRevision)
return pod