Merge pull request #49251 from enisoc/initialized-annotation
Automatic merge from submit-queue (batch tested with PRs 48636, 49088, 49251, 49417, 49494) StatefulSet: Remove `pod.alpha.kubernetes.io/initialized` annotation. The `pod.alpha.kubernetes.io/initialized` annotation was originally a tool for validating StatefulSet's ordered Pod creation guarantees during the feature's alpha phase. If set to "false" on a given Pod, it would interrupt StatefulSet's normal behavior. In v1.5.0, the annotation was deprecated and the default became "true" as part of StatefulSet's graduation to beta. The annotation is now ignored, meaning it cannot be used to interrupt StatefulSet Pod management. ```release-note StatefulSet: The deprecated `pod.alpha.kubernetes.io/initialized` annotation for interrupting StatefulSet Pod management is now ignored. If you were setting it to `true` or leaving it unset, no action is required. However, if you were setting it to `false`, be aware that previously-dormant StatefulSets may become active after upgrading. ``` ref #41605
This commit is contained in:
commit
7e5fd72aa6
@ -85,7 +85,6 @@ func TestStatefulSetControl(t *testing.T) {
|
||||
{ScalesDown, simpleSetFn},
|
||||
{ReplacesPods, largeSetFn},
|
||||
{RecreatesFailedPod, simpleSetFn},
|
||||
{SetsInitAnnotation, simpleSetFn},
|
||||
{CreatePodFailure, simpleSetFn},
|
||||
{UpdatePodFailure, simpleSetFn},
|
||||
{UpdateSetStatusFailure, simpleSetFn},
|
||||
@ -279,59 +278,6 @@ func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invarian
|
||||
}
|
||||
}
|
||||
|
||||
func SetsInitAnnotation(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
|
||||
client := fake.NewSimpleClientset(set)
|
||||
spc, _, ssc, stop := setupController(client)
|
||||
defer close(stop)
|
||||
|
||||
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err = ssc.UpdateStatefulSet(set, pods); err != nil {
|
||||
t.Errorf("Error updating StatefulSet %s", err)
|
||||
}
|
||||
if err = invariants(set, spc); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if pods, err = spc.setPodRunning(set, 0); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if pods, err = spc.setPodReady(set, 0); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if pods, err = spc.setPodInitStatus(set, 0, false); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
replicas := int(set.Status.Replicas)
|
||||
if err := ssc.UpdateStatefulSet(set, pods); err != nil {
|
||||
t.Errorf("Error updating StatefulSet %s", err)
|
||||
}
|
||||
if err := invariants(set, spc); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if replicas != int(set.Status.Replicas) {
|
||||
t.Errorf("StatefulSetControl does not block on %s=false", apps.StatefulSetInitAnnotation)
|
||||
}
|
||||
if pods, err = spc.setPodInitStatus(set, 0, true); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil {
|
||||
t.Errorf("Failed to turn up StatefulSet : %s", err)
|
||||
}
|
||||
set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting updated StatefulSet: %v", err)
|
||||
}
|
||||
if int(set.Status.Replicas) != 3 {
|
||||
t.Errorf("StatefulSetControl does not unblock on %s=true", apps.StatefulSetInitAnnotation)
|
||||
}
|
||||
}
|
||||
|
||||
func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
|
||||
client := fake.NewSimpleClientset(set)
|
||||
spc, _, ssc, stop := setupController(client)
|
||||
@ -1673,30 +1619,6 @@ func (spc *fakeStatefulPodControl) setPodReady(set *apps.StatefulSet, ordinal in
|
||||
return spc.podsLister.Pods(set.Namespace).List(selector)
|
||||
}
|
||||
|
||||
func (spc *fakeStatefulPodControl) setPodInitStatus(set *apps.StatefulSet, ordinal int, init bool) ([]*v1.Pod, error) {
|
||||
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if 0 > ordinal || ordinal >= len(pods) {
|
||||
return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods))
|
||||
}
|
||||
sort.Sort(ascendingOrdinal(pods))
|
||||
pod := copyPod(pods[ordinal])
|
||||
if init {
|
||||
pod.Annotations[apps.StatefulSetInitAnnotation] = "true"
|
||||
} else {
|
||||
pod.Annotations[apps.StatefulSetInitAnnotation] = "false"
|
||||
}
|
||||
fakeResourceVersion(pod)
|
||||
spc.podsIndexer.Update(pod)
|
||||
return spc.podsLister.Pods(set.Namespace).List(selector)
|
||||
}
|
||||
|
||||
func (spc *fakeStatefulPodControl) addTerminatingPod(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
|
||||
pod := newStatefulSetPod(set, ordinal)
|
||||
pod.Status.Phase = v1.PodRunning
|
||||
|
@ -31,8 +31,6 @@ import (
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/history"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// maxUpdateRetries is the maximum number of retries used for update conflict resolution prior to failure
|
||||
@ -191,26 +189,9 @@ func updateIdentity(set *apps.StatefulSet, pod *v1.Pod) {
|
||||
pod.Spec.Subdomain = set.Spec.ServiceName
|
||||
}
|
||||
|
||||
// isRunningAndReady returns true if pod is in the PodRunning Phase, if it has a condition of PodReady, and if the init
|
||||
// annotation has not explicitly disabled the Pod from being ready.
|
||||
// isRunningAndReady returns true if pod is in the PodRunning Phase, if it has a condition of PodReady.
|
||||
func isRunningAndReady(pod *v1.Pod) bool {
|
||||
if pod.Status.Phase != v1.PodRunning {
|
||||
return false
|
||||
}
|
||||
podReady := podutil.IsPodReady(pod)
|
||||
// User may have specified a pod readiness override through a debug annotation.
|
||||
initialized, ok := pod.Annotations[apps.StatefulSetInitAnnotation]
|
||||
if ok {
|
||||
if initAnnotation, err := strconv.ParseBool(initialized); err != nil {
|
||||
glog.V(4).Infof("Failed to parse %v annotation on pod %v: %v",
|
||||
apps.StatefulSetInitAnnotation, pod.Name, err)
|
||||
} else if !initAnnotation {
|
||||
glog.V(4).Infof("StatefulSet pod %v waiting on annotation %v", pod.Name,
|
||||
apps.StatefulSetInitAnnotation)
|
||||
podReady = initAnnotation
|
||||
}
|
||||
}
|
||||
return podReady
|
||||
return pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod)
|
||||
}
|
||||
|
||||
// isCreated returns true if pod has been created and is maintained by the API server
|
||||
|
@ -213,18 +213,6 @@ func TestIsRunningAndReady(t *testing.T) {
|
||||
if !isRunningAndReady(pod) {
|
||||
t.Error("Pod should be running and ready")
|
||||
}
|
||||
pod.Annotations[apps.StatefulSetInitAnnotation] = "true"
|
||||
if !isRunningAndReady(pod) {
|
||||
t.Error("isRunningAndReady does not respected init annotation set to true")
|
||||
}
|
||||
pod.Annotations[apps.StatefulSetInitAnnotation] = "false"
|
||||
if isRunningAndReady(pod) {
|
||||
t.Error("isRunningAndReady does not respected init annotation set to false")
|
||||
}
|
||||
pod.Annotations[apps.StatefulSetInitAnnotation] = "blah"
|
||||
if !isRunningAndReady(pod) {
|
||||
t.Error("isRunningAndReady does not erroneous init annotation")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAscendingOrdinal(t *testing.T) {
|
||||
|
@ -24,8 +24,6 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// StatefulSetInitAnnotation if present, and set to false, indicates that a Pod's readiness should be ignored.
|
||||
StatefulSetInitAnnotation = "pod.alpha.kubernetes.io/initialized"
|
||||
ControllerRevisionHashLabelKey = "controller-revision-hash"
|
||||
StatefulSetRevisionLabel = ControllerRevisionHashLabelKey
|
||||
)
|
||||
|
@ -28,7 +28,6 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
klabels "k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
@ -92,13 +91,12 @@ var _ = SIGDescribe("StatefulSet", func() {
|
||||
It("should provide basic identity", func() {
|
||||
By("Creating statefulset " + ssName + " in namespace " + ns)
|
||||
*(ss.Spec.Replicas) = 3
|
||||
framework.SetStatefulSetInitializedAnnotation(ss, "false")
|
||||
sst := framework.NewStatefulSetTester(c)
|
||||
sst.PauseNewPods(ss)
|
||||
|
||||
_, err := c.AppsV1beta1().StatefulSets(ns).Create(ss)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
sst := framework.NewStatefulSetTester(c)
|
||||
|
||||
By("Saturating stateful set " + ss.Name)
|
||||
sst.Saturate(ss)
|
||||
|
||||
@ -130,7 +128,8 @@ var _ = SIGDescribe("StatefulSet", func() {
|
||||
It("should adopt matching orphans and release non-matching pods", func() {
|
||||
By("Creating statefulset " + ssName + " in namespace " + ns)
|
||||
*(ss.Spec.Replicas) = 1
|
||||
framework.SetStatefulSetInitializedAnnotation(ss, "false")
|
||||
sst := framework.NewStatefulSetTester(c)
|
||||
sst.PauseNewPods(ss)
|
||||
|
||||
// Replace ss with the one returned from Create() so it has the UID.
|
||||
// Save Kind since it won't be populated in the returned ss.
|
||||
@ -139,8 +138,6 @@ var _ = SIGDescribe("StatefulSet", func() {
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
ss.Kind = kind
|
||||
|
||||
sst := framework.NewStatefulSetTester(c)
|
||||
|
||||
By("Saturating stateful set " + ss.Name)
|
||||
sst.Saturate(ss)
|
||||
pods := sst.GetPodList(ss)
|
||||
@ -214,20 +211,19 @@ var _ = SIGDescribe("StatefulSet", func() {
|
||||
It("should not deadlock when a pod's predecessor fails", func() {
|
||||
By("Creating statefulset " + ssName + " in namespace " + ns)
|
||||
*(ss.Spec.Replicas) = 2
|
||||
framework.SetStatefulSetInitializedAnnotation(ss, "false")
|
||||
sst := framework.NewStatefulSetTester(c)
|
||||
sst.PauseNewPods(ss)
|
||||
|
||||
_, err := c.AppsV1beta1().StatefulSets(ns).Create(ss)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
sst := framework.NewStatefulSetTester(c)
|
||||
sst.WaitForRunning(1, 0, ss)
|
||||
|
||||
sst.WaitForRunningAndReady(1, ss)
|
||||
|
||||
By("Marking stateful pod at index 0 as healthy.")
|
||||
sst.SetHealthy(ss)
|
||||
By("Resuming stateful pod at index 0.")
|
||||
sst.ResumeNextPod(ss)
|
||||
|
||||
By("Waiting for stateful pod at index 1 to enter running.")
|
||||
sst.WaitForRunningAndReady(2, ss)
|
||||
sst.WaitForRunning(2, 1, ss)
|
||||
|
||||
// Now we have 1 healthy and 1 unhealthy stateful pod. Deleting the healthy stateful pod should *not*
|
||||
// create a new stateful pod till the remaining stateful pod becomes healthy, which won't happen till
|
||||
@ -237,7 +233,7 @@ var _ = SIGDescribe("StatefulSet", func() {
|
||||
sst.DeleteStatefulPodAtIndex(0, ss)
|
||||
|
||||
By("Confirming stateful pod at index 0 is recreated.")
|
||||
sst.WaitForRunningAndReady(2, ss)
|
||||
sst.WaitForRunning(2, 0, ss)
|
||||
|
||||
By("Deleting unhealthy stateful pod at index 1.")
|
||||
sst.DeleteStatefulPodAtIndex(1, ss)
|
||||
@ -248,14 +244,11 @@ var _ = SIGDescribe("StatefulSet", func() {
|
||||
|
||||
It("should perform rolling updates and roll backs of template modifications", func() {
|
||||
By("Creating a new StatefulSet")
|
||||
testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{
|
||||
Path: "/index.html",
|
||||
Port: intstr.IntOrString{IntVal: 80}}}}
|
||||
ss := framework.NewStatefulSet("ss2", ns, headlessSvcName, 3, nil, nil, labels)
|
||||
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe
|
||||
sst := framework.NewStatefulSetTester(c)
|
||||
sst.SetHttpProbe(ss)
|
||||
ss, err := c.AppsV1beta1().StatefulSets(ns).Create(ss)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
sst := framework.NewStatefulSetTester(c)
|
||||
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
|
||||
ss = sst.WaitForStatus(ss)
|
||||
currentRevision, updateRevision := ss.Status.CurrentRevision, ss.Status.UpdateRevision
|
||||
@ -272,7 +265,7 @@ var _ = SIGDescribe("StatefulSet", func() {
|
||||
currentRevision))
|
||||
}
|
||||
sst.SortStatefulPods(pods)
|
||||
sst.BreakPodProbe(ss, &pods.Items[1], testProbe)
|
||||
sst.BreakPodHttpProbe(ss, &pods.Items[1])
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
ss, pods = sst.WaitForPodNotReady(ss, pods.Items[1].Name)
|
||||
newImage := NewNginxImage
|
||||
@ -294,7 +287,7 @@ var _ = SIGDescribe("StatefulSet", func() {
|
||||
By("Updating Pods in reverse ordinal order")
|
||||
pods = sst.GetPodList(ss)
|
||||
sst.SortStatefulPods(pods)
|
||||
sst.RestorePodProbe(ss, &pods.Items[1], testProbe)
|
||||
sst.RestorePodHttpProbe(ss, &pods.Items[1])
|
||||
ss, pods = sst.WaitForPodReady(ss, pods.Items[1].Name)
|
||||
ss, pods = sst.WaitForRollingUpdate(ss)
|
||||
Expect(ss.Status.CurrentRevision).To(Equal(updateRevision),
|
||||
@ -319,7 +312,7 @@ var _ = SIGDescribe("StatefulSet", func() {
|
||||
}
|
||||
|
||||
By("Rolling back to a previous revision")
|
||||
sst.BreakPodProbe(ss, &pods.Items[1], testProbe)
|
||||
sst.BreakPodHttpProbe(ss, &pods.Items[1])
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
ss, pods = sst.WaitForPodNotReady(ss, pods.Items[1].Name)
|
||||
priorRevision := currentRevision
|
||||
@ -338,7 +331,7 @@ var _ = SIGDescribe("StatefulSet", func() {
|
||||
By("Rolling back update in reverse ordinal order")
|
||||
pods = sst.GetPodList(ss)
|
||||
sst.SortStatefulPods(pods)
|
||||
sst.RestorePodProbe(ss, &pods.Items[1], testProbe)
|
||||
sst.RestorePodHttpProbe(ss, &pods.Items[1])
|
||||
ss, pods = sst.WaitForPodReady(ss, pods.Items[1].Name)
|
||||
ss, pods = sst.WaitForRollingUpdate(ss)
|
||||
Expect(ss.Status.CurrentRevision).To(Equal(priorRevision),
|
||||
@ -366,11 +359,9 @@ var _ = SIGDescribe("StatefulSet", func() {
|
||||
|
||||
It("should perform canary updates and phased rolling updates of template modifications", func() {
|
||||
By("Creating a new StaefulSet")
|
||||
testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{
|
||||
Path: "/index.html",
|
||||
Port: intstr.IntOrString{IntVal: 80}}}}
|
||||
ss := framework.NewStatefulSet("ss2", ns, headlessSvcName, 3, nil, nil, labels)
|
||||
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe
|
||||
sst := framework.NewStatefulSetTester(c)
|
||||
sst.SetHttpProbe(ss)
|
||||
ss.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
|
||||
Type: apps.RollingUpdateStatefulSetStrategyType,
|
||||
RollingUpdate: func() *apps.RollingUpdateStatefulSetStrategy {
|
||||
@ -383,7 +374,6 @@ var _ = SIGDescribe("StatefulSet", func() {
|
||||
}
|
||||
ss, err := c.AppsV1beta1().StatefulSets(ns).Create(ss)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
sst := framework.NewStatefulSetTester(c)
|
||||
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
|
||||
ss = sst.WaitForStatus(ss)
|
||||
currentRevision, updateRevision := ss.Status.CurrentRevision, ss.Status.UpdateRevision
|
||||
@ -578,17 +568,14 @@ var _ = SIGDescribe("StatefulSet", func() {
|
||||
|
||||
It("should implement legacy replacement when the update strategy is OnDelete", func() {
|
||||
By("Creating a new StatefulSet")
|
||||
testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{
|
||||
Path: "/index.html",
|
||||
Port: intstr.IntOrString{IntVal: 80}}}}
|
||||
ss := framework.NewStatefulSet("ss2", ns, headlessSvcName, 3, nil, nil, labels)
|
||||
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe
|
||||
sst := framework.NewStatefulSetTester(c)
|
||||
sst.SetHttpProbe(ss)
|
||||
ss.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
|
||||
Type: apps.OnDeleteStatefulSetStrategyType,
|
||||
}
|
||||
ss, err := c.AppsV1beta1().StatefulSets(ns).Create(ss)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
sst := framework.NewStatefulSetTester(c)
|
||||
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
|
||||
ss = sst.WaitForStatus(ss)
|
||||
currentRevision, updateRevision := ss.Status.CurrentRevision, ss.Status.UpdateRevision
|
||||
@ -668,27 +655,24 @@ var _ = SIGDescribe("StatefulSet", func() {
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
By("Creating stateful set " + ssName + " in namespace " + ns)
|
||||
testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{
|
||||
Path: "/index.html",
|
||||
Port: intstr.IntOrString{IntVal: 80}}}}
|
||||
ss := framework.NewStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, psLabels)
|
||||
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe
|
||||
sst := framework.NewStatefulSetTester(c)
|
||||
sst.SetHttpProbe(ss)
|
||||
ss, err = c.AppsV1beta1().StatefulSets(ns).Create(ss)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
By("Waiting until all stateful set " + ssName + " replicas will be running in namespace " + ns)
|
||||
sst := framework.NewStatefulSetTester(c)
|
||||
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
|
||||
|
||||
By("Confirming that stateful set scale up will halt with unhealthy stateful pod")
|
||||
sst.BreakProbe(ss, testProbe)
|
||||
sst.BreakHttpProbe(ss)
|
||||
sst.WaitForRunningAndNotReady(*ss.Spec.Replicas, ss)
|
||||
sst.WaitForStatusReadyReplicas(ss, 0)
|
||||
sst.UpdateReplicas(ss, 3)
|
||||
sst.ConfirmStatefulPodCount(1, ss, 10*time.Second, true)
|
||||
|
||||
By("Scaling up stateful set " + ssName + " to 3 replicas and waiting until all of them will be running in namespace " + ns)
|
||||
sst.RestoreProbe(ss, testProbe)
|
||||
sst.RestoreHttpProbe(ss)
|
||||
sst.WaitForRunningAndReady(3, ss)
|
||||
|
||||
By("Verifying that stateful set " + ssName + " was scaled up in order")
|
||||
@ -712,14 +696,14 @@ var _ = SIGDescribe("StatefulSet", func() {
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
sst.BreakProbe(ss, testProbe)
|
||||
sst.BreakHttpProbe(ss)
|
||||
sst.WaitForStatusReadyReplicas(ss, 0)
|
||||
sst.WaitForRunningAndNotReady(3, ss)
|
||||
sst.UpdateReplicas(ss, 0)
|
||||
sst.ConfirmStatefulPodCount(3, ss, 10*time.Second, true)
|
||||
|
||||
By("Scaling down stateful set " + ssName + " to 0 replicas and waiting until none of pods will run in namespace" + ns)
|
||||
sst.RestoreProbe(ss, testProbe)
|
||||
sst.RestoreHttpProbe(ss)
|
||||
sst.Scale(ss, 0)
|
||||
|
||||
By("Verifying that stateful set " + ssName + " was scaled down in reverse order")
|
||||
@ -742,39 +726,36 @@ var _ = SIGDescribe("StatefulSet", func() {
|
||||
psLabels := klabels.Set(labels)
|
||||
|
||||
By("Creating stateful set " + ssName + " in namespace " + ns)
|
||||
testProbe := &v1.Probe{Handler: v1.Handler{HTTPGet: &v1.HTTPGetAction{
|
||||
Path: "/index.html",
|
||||
Port: intstr.IntOrString{IntVal: 80}}}}
|
||||
ss := framework.NewStatefulSet(ssName, ns, headlessSvcName, 1, nil, nil, psLabels)
|
||||
ss.Spec.PodManagementPolicy = apps.ParallelPodManagement
|
||||
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = testProbe
|
||||
sst := framework.NewStatefulSetTester(c)
|
||||
sst.SetHttpProbe(ss)
|
||||
ss, err := c.AppsV1beta1().StatefulSets(ns).Create(ss)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
By("Waiting until all stateful set " + ssName + " replicas will be running in namespace " + ns)
|
||||
sst := framework.NewStatefulSetTester(c)
|
||||
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
|
||||
|
||||
By("Confirming that stateful set scale up will not halt with unhealthy stateful pod")
|
||||
sst.BreakProbe(ss, testProbe)
|
||||
sst.BreakHttpProbe(ss)
|
||||
sst.WaitForRunningAndNotReady(*ss.Spec.Replicas, ss)
|
||||
sst.WaitForStatusReadyReplicas(ss, 0)
|
||||
sst.UpdateReplicas(ss, 3)
|
||||
sst.ConfirmStatefulPodCount(3, ss, 10*time.Second, false)
|
||||
|
||||
By("Scaling up stateful set " + ssName + " to 3 replicas and waiting until all of them will be running in namespace " + ns)
|
||||
sst.RestoreProbe(ss, testProbe)
|
||||
sst.RestoreHttpProbe(ss)
|
||||
sst.WaitForRunningAndReady(3, ss)
|
||||
|
||||
By("Scale down will not halt with unhealthy stateful pod")
|
||||
sst.BreakProbe(ss, testProbe)
|
||||
sst.BreakHttpProbe(ss)
|
||||
sst.WaitForStatusReadyReplicas(ss, 0)
|
||||
sst.WaitForRunningAndNotReady(3, ss)
|
||||
sst.UpdateReplicas(ss, 0)
|
||||
sst.ConfirmStatefulPodCount(0, ss, 10*time.Second, false)
|
||||
|
||||
By("Scaling down stateful set " + ssName + " to 0 replicas and waiting until none of pods will run in namespace" + ns)
|
||||
sst.RestoreProbe(ss, testProbe)
|
||||
sst.RestoreHttpProbe(ss)
|
||||
sst.Scale(ss, 0)
|
||||
sst.WaitForStatusReadyReplicas(ss, 0)
|
||||
})
|
||||
|
@ -19,6 +19,7 @@ package framework
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strconv"
|
||||
@ -33,6 +34,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
@ -160,10 +162,10 @@ func (s *StatefulSetTester) CheckHostname(ss *apps.StatefulSet) error {
|
||||
func (s *StatefulSetTester) Saturate(ss *apps.StatefulSet) {
|
||||
var i int32
|
||||
for i = 0; i < *(ss.Spec.Replicas); i++ {
|
||||
Logf("Waiting for stateful pod at index " + fmt.Sprintf("%v", i+1) + " to enter Running")
|
||||
s.WaitForRunningAndReady(i+1, ss)
|
||||
Logf("Marking stateful pod at index " + fmt.Sprintf("%v", i) + " healthy")
|
||||
s.SetHealthy(ss)
|
||||
Logf("Waiting for stateful pod at index %v to enter Running", i)
|
||||
s.WaitForRunning(i+1, i, ss)
|
||||
Logf("Resuming stateful pod at index %v", i)
|
||||
s.ResumeNextPod(ss)
|
||||
}
|
||||
}
|
||||
|
||||
@ -282,18 +284,22 @@ func (s *StatefulSetTester) ConfirmStatefulPodCount(count int, ss *apps.Stateful
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StatefulSetTester) waitForRunning(numStatefulPods int32, ss *apps.StatefulSet, shouldBeReady bool) {
|
||||
// WaitForRunning waits for numPodsRunning in ss to be Running and for the first
|
||||
// numPodsReady ordinals to be Ready.
|
||||
func (s *StatefulSetTester) WaitForRunning(numPodsRunning, numPodsReady int32, ss *apps.StatefulSet) {
|
||||
pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
|
||||
func() (bool, error) {
|
||||
podList := s.GetPodList(ss)
|
||||
if int32(len(podList.Items)) < numStatefulPods {
|
||||
Logf("Found %d stateful pods, waiting for %d", len(podList.Items), numStatefulPods)
|
||||
s.SortStatefulPods(podList)
|
||||
if int32(len(podList.Items)) < numPodsRunning {
|
||||
Logf("Found %d stateful pods, waiting for %d", len(podList.Items), numPodsRunning)
|
||||
return false, nil
|
||||
}
|
||||
if int32(len(podList.Items)) > numStatefulPods {
|
||||
return false, fmt.Errorf("Too many pods scheduled, expected %d got %d", numStatefulPods, len(podList.Items))
|
||||
if int32(len(podList.Items)) > numPodsRunning {
|
||||
return false, fmt.Errorf("Too many pods scheduled, expected %d got %d", numPodsRunning, len(podList.Items))
|
||||
}
|
||||
for _, p := range podList.Items {
|
||||
shouldBeReady := getStatefulPodOrdinal(&p) < int(numPodsReady)
|
||||
isReady := podutil.IsPodReady(&p)
|
||||
desiredReadiness := shouldBeReady == isReady
|
||||
Logf("Waiting for pod %v to enter %v - Ready=%v, currently %v - Ready=%v", p.Name, v1.PodRunning, shouldBeReady, p.Status.Phase, isReady)
|
||||
@ -339,7 +345,7 @@ func (s *StatefulSetTester) WaitForStatus(set *apps.StatefulSet) *apps.StatefulS
|
||||
|
||||
// WaitForRunningAndReady waits for numStatefulPods in ss to be Running and Ready.
|
||||
func (s *StatefulSetTester) WaitForRunningAndReady(numStatefulPods int32, ss *apps.StatefulSet) {
|
||||
s.waitForRunning(numStatefulPods, ss, true)
|
||||
s.WaitForRunning(numStatefulPods, numStatefulPods, ss)
|
||||
}
|
||||
|
||||
// WaitForPodReady waits for the Pod named podName in set to exist and have a Ready condition.
|
||||
@ -473,12 +479,31 @@ func (s *StatefulSetTester) WaitForPartitionedRollingUpdate(set *apps.StatefulSe
|
||||
|
||||
// WaitForRunningAndReady waits for numStatefulPods in ss to be Running and not Ready.
|
||||
func (s *StatefulSetTester) WaitForRunningAndNotReady(numStatefulPods int32, ss *apps.StatefulSet) {
|
||||
s.waitForRunning(numStatefulPods, ss, false)
|
||||
s.WaitForRunning(numStatefulPods, 0, ss)
|
||||
}
|
||||
|
||||
// BreakProbe breaks the readiness probe for Nginx StatefulSet containers in ss.
|
||||
func (s *StatefulSetTester) BreakProbe(ss *apps.StatefulSet, probe *v1.Probe) error {
|
||||
path := probe.HTTPGet.Path
|
||||
var httpProbe = &v1.Probe{
|
||||
Handler: v1.Handler{
|
||||
HTTPGet: &v1.HTTPGetAction{
|
||||
Path: "/index.html",
|
||||
Port: intstr.IntOrString{IntVal: 80},
|
||||
},
|
||||
},
|
||||
PeriodSeconds: 1,
|
||||
SuccessThreshold: 1,
|
||||
FailureThreshold: 1,
|
||||
}
|
||||
|
||||
// SetHttpProbe sets the pod template's ReadinessProbe for Nginx StatefulSet containers.
|
||||
// This probe can then be controlled with BreakHttpProbe() and RestoreHttpProbe().
|
||||
// Note that this cannot be used together with PauseNewPods().
|
||||
func (s *StatefulSetTester) SetHttpProbe(ss *apps.StatefulSet) {
|
||||
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = httpProbe
|
||||
}
|
||||
|
||||
// BreakHttpProbe breaks the readiness probe for Nginx StatefulSet containers in ss.
|
||||
func (s *StatefulSetTester) BreakHttpProbe(ss *apps.StatefulSet) error {
|
||||
path := httpProbe.HTTPGet.Path
|
||||
if path == "" {
|
||||
return fmt.Errorf("Path expected to be not empty: %v", path)
|
||||
}
|
||||
@ -486,9 +511,9 @@ func (s *StatefulSetTester) BreakProbe(ss *apps.StatefulSet, probe *v1.Probe) er
|
||||
return s.ExecInStatefulPods(ss, cmd)
|
||||
}
|
||||
|
||||
// BreakProbe breaks the readiness probe for Nginx StatefulSet containers in pod.
|
||||
func (s *StatefulSetTester) BreakPodProbe(ss *apps.StatefulSet, pod *v1.Pod, probe *v1.Probe) error {
|
||||
path := probe.HTTPGet.Path
|
||||
// BreakPodHttpProbe breaks the readiness probe for Nginx StatefulSet containers in one pod.
|
||||
func (s *StatefulSetTester) BreakPodHttpProbe(ss *apps.StatefulSet, pod *v1.Pod) error {
|
||||
path := httpProbe.HTTPGet.Path
|
||||
if path == "" {
|
||||
return fmt.Errorf("Path expected to be not empty: %v", path)
|
||||
}
|
||||
@ -498,9 +523,9 @@ func (s *StatefulSetTester) BreakPodProbe(ss *apps.StatefulSet, pod *v1.Pod, pro
|
||||
return err
|
||||
}
|
||||
|
||||
// RestoreProbe restores the readiness probe for Nginx StatefulSet containers in ss.
|
||||
func (s *StatefulSetTester) RestoreProbe(ss *apps.StatefulSet, probe *v1.Probe) error {
|
||||
path := probe.HTTPGet.Path
|
||||
// RestoreHttpProbe restores the readiness probe for Nginx StatefulSet containers in ss.
|
||||
func (s *StatefulSetTester) RestoreHttpProbe(ss *apps.StatefulSet) error {
|
||||
path := httpProbe.HTTPGet.Path
|
||||
if path == "" {
|
||||
return fmt.Errorf("Path expected to be not empty: %v", path)
|
||||
}
|
||||
@ -508,9 +533,9 @@ func (s *StatefulSetTester) RestoreProbe(ss *apps.StatefulSet, probe *v1.Probe)
|
||||
return s.ExecInStatefulPods(ss, cmd)
|
||||
}
|
||||
|
||||
// RestoreProbe restores the readiness probe for Nginx StatefulSet containers in pod.
|
||||
func (s *StatefulSetTester) RestorePodProbe(ss *apps.StatefulSet, pod *v1.Pod, probe *v1.Probe) error {
|
||||
path := probe.HTTPGet.Path
|
||||
// RestorePodHttpProbe restores the readiness probe for Nginx StatefulSet containers in pod.
|
||||
func (s *StatefulSetTester) RestorePodHttpProbe(ss *apps.StatefulSet, pod *v1.Pod) error {
|
||||
path := httpProbe.HTTPGet.Path
|
||||
if path == "" {
|
||||
return fmt.Errorf("Path expected to be not empty: %v", path)
|
||||
}
|
||||
@ -520,26 +545,50 @@ func (s *StatefulSetTester) RestorePodProbe(ss *apps.StatefulSet, pod *v1.Pod, p
|
||||
return err
|
||||
}
|
||||
|
||||
// SetHealthy updates the StatefulSet InitAnnotation to true in order to set a StatefulSet Pod to be Running and Ready.
|
||||
func (s *StatefulSetTester) SetHealthy(ss *apps.StatefulSet) {
|
||||
var pauseProbe = &v1.Probe{
|
||||
Handler: v1.Handler{
|
||||
Exec: &v1.ExecAction{Command: []string{"test", "-f", "/tmp/statefulset-continue"}},
|
||||
},
|
||||
PeriodSeconds: 1,
|
||||
SuccessThreshold: 1,
|
||||
FailureThreshold: 1,
|
||||
}
|
||||
|
||||
func hasPauseProbe(pod *v1.Pod) bool {
|
||||
probe := pod.Spec.Containers[0].ReadinessProbe
|
||||
return probe != nil && reflect.DeepEqual(probe.Exec.Command, pauseProbe.Exec.Command)
|
||||
}
|
||||
|
||||
// PauseNewPods adds an always-failing ReadinessProbe to the StatefulSet PodTemplate.
|
||||
// This causes all newly-created Pods to stay Unready until they are manually resumed
|
||||
// with ResumeNextPod().
|
||||
// Note that this cannot be used together with SetHttpProbe().
|
||||
func (s *StatefulSetTester) PauseNewPods(ss *apps.StatefulSet) {
|
||||
ss.Spec.Template.Spec.Containers[0].ReadinessProbe = pauseProbe
|
||||
}
|
||||
|
||||
// ResumeNextPod allows the next Pod in the StatefulSet to continue by removing the ReadinessProbe
|
||||
// added by PauseNewPods(), if it's still there.
|
||||
// It fails the test if it finds any pods that are not in phase Running,
|
||||
// or if it finds more than one paused Pod existing at the same time.
|
||||
// This is a no-op if there are no paused pods.
|
||||
func (s *StatefulSetTester) ResumeNextPod(ss *apps.StatefulSet) {
|
||||
podList := s.GetPodList(ss)
|
||||
markedHealthyPod := ""
|
||||
resumedPod := ""
|
||||
for _, pod := range podList.Items {
|
||||
if pod.Status.Phase != v1.PodRunning {
|
||||
Failf("Found pod in %v cannot set health", pod.Status.Phase)
|
||||
Failf("Found pod in phase %q, cannot resume", pod.Status.Phase)
|
||||
}
|
||||
if IsStatefulSetPodInitialized(pod) {
|
||||
if podutil.IsPodReady(&pod) || !hasPauseProbe(&pod) {
|
||||
continue
|
||||
}
|
||||
if markedHealthyPod != "" {
|
||||
Failf("Found multiple non-healthy stateful pods: %v and %v", pod.Name, markedHealthyPod)
|
||||
if resumedPod != "" {
|
||||
Failf("Found multiple paused stateful pods: %v and %v", pod.Name, resumedPod)
|
||||
}
|
||||
p, err := UpdatePodWithRetries(s.c, pod.Namespace, pod.Name, func(update *v1.Pod) {
|
||||
update.Annotations[apps.StatefulSetInitAnnotation] = "true"
|
||||
})
|
||||
_, err := RunHostCmd(pod.Namespace, pod.Name, "touch /tmp/statefulset-continue")
|
||||
ExpectNoError(err)
|
||||
Logf("Set annotation %v to %v on pod %v", apps.StatefulSetInitAnnotation, p.Annotations[apps.StatefulSetInitAnnotation], pod.Name)
|
||||
markedHealthyPod = pod.Name
|
||||
Logf("Resumed pod %v", pod.Name)
|
||||
resumedPod = pod.Name
|
||||
}
|
||||
}
|
||||
|
||||
@ -682,19 +731,6 @@ func DeleteAllStatefulSets(c clientset.Interface, ns string) {
|
||||
}
|
||||
}
|
||||
|
||||
// IsStatefulSetPodInitialized returns true if pod's StatefulSetInitAnnotation exists and is set to true.
|
||||
func IsStatefulSetPodInitialized(pod v1.Pod) bool {
|
||||
initialized, ok := pod.Annotations[apps.StatefulSetInitAnnotation]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
inited, err := strconv.ParseBool(initialized)
|
||||
if err != nil {
|
||||
Failf("Couldn't parse statefulset init annotations %v", initialized)
|
||||
}
|
||||
return inited
|
||||
}
|
||||
|
||||
// NewStatefulSetPVC returns a PersistentVolumeClaim named name, for testing StatefulSets.
|
||||
func NewStatefulSetPVC(name string) v1.PersistentVolumeClaim {
|
||||
return v1.PersistentVolumeClaim{
|
||||
@ -776,11 +812,6 @@ func NewStatefulSet(name, ns, governingSvcName string, replicas int32, statefulP
|
||||
}
|
||||
}
|
||||
|
||||
// SetStatefulSetInitializedAnnotation sets teh StatefulSetInitAnnotation to value.
|
||||
func SetStatefulSetInitializedAnnotation(ss *apps.StatefulSet, value string) {
|
||||
ss.Spec.Template.ObjectMeta.Annotations["pod.alpha.kubernetes.io/initialized"] = value
|
||||
}
|
||||
|
||||
var statefulPodRegex = regexp.MustCompile("(.*)-([0-9]+)$")
|
||||
|
||||
func getStatefulPodOrdinal(pod *v1.Pod) int {
|
||||
|
@ -62,12 +62,12 @@ func (t *StatefulSetUpgradeTest) Setup(f *framework.Framework) {
|
||||
t.set = framework.NewStatefulSet(ssName, ns, headlessSvcName, 2, statefulPodMounts, podMounts, labels)
|
||||
t.service = framework.CreateStatefulSetService(ssName, labels)
|
||||
*(t.set.Spec.Replicas) = 3
|
||||
framework.SetStatefulSetInitializedAnnotation(t.set, "false")
|
||||
t.tester = framework.NewStatefulSetTester(f.ClientSet)
|
||||
t.tester.PauseNewPods(t.set)
|
||||
|
||||
By("Creating service " + headlessSvcName + " in namespace " + ns)
|
||||
_, err := f.ClientSet.Core().Services(ns).Create(t.service)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
t.tester = framework.NewStatefulSetTester(f.ClientSet)
|
||||
|
||||
By("Creating statefulset " + ssName + " in namespace " + ns)
|
||||
*(t.set.Spec.Replicas) = 3
|
||||
|
Loading…
Reference in New Issue
Block a user