controller: ensure deployment rollback is re-entrant

Make rollbacks re-entrant in the Deployment controller, otherwise
fast enqueues of a Deployment may end up in undesired behavior
- redundant rollbacks.
This commit is contained in:
Michail Kargakis
2017-03-05 02:15:03 +01:00
parent 1a94d0186f
commit 0eeef8e683
4 changed files with 107 additions and 77 deletions

View File

@@ -570,11 +570,11 @@ func (dc *DeploymentController) syncDeployment(key string) error {
return dc.sync(d) return dc.sync(d)
} }
// rollback is not re-entrant in case the underlying replica sets are updated with a new
// revision so we should ensure that we won't proceed to update replica sets until we
// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
if d.Spec.RollbackTo != nil { if d.Spec.RollbackTo != nil {
revision := d.Spec.RollbackTo.Revision return dc.rollback(d)
if d, err = dc.rollback(d, &revision); err != nil {
return err
}
} }
scalingEvent, err := dc.isScalingEvent(d) scalingEvent, err := dc.isScalingEvent(d)

View File

@@ -17,7 +17,6 @@ limitations under the License.
package deployment package deployment
import ( import (
"fmt"
"testing" "testing"
"time" "time"
@@ -109,11 +108,13 @@ func newDeployment(name string, replicas int, revisionHistoryLimit *int32, maxSu
} }
func newReplicaSet(d *extensions.Deployment, name string, replicas int) *extensions.ReplicaSet { func newReplicaSet(d *extensions.Deployment, name string, replicas int) *extensions.ReplicaSet {
control := true
return &extensions.ReplicaSet{ return &extensions.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: name, Name: name,
Namespace: metav1.NamespaceDefault, Namespace: metav1.NamespaceDefault,
Labels: d.Spec.Selector.MatchLabels, Labels: d.Spec.Selector.MatchLabels,
OwnerReferences: []metav1.OwnerReference{{APIVersion: getDeploymentKind().GroupVersion().Version, Kind: getDeploymentKind().Kind, Name: d.Name, UID: d.UID, Controller: &control}},
}, },
Spec: extensions.ReplicaSetSpec{ Spec: extensions.ReplicaSetSpec{
Selector: d.Spec.Selector, Selector: d.Spec.Selector,
@@ -153,6 +154,11 @@ func (f *fixture) expectUpdateDeploymentStatusAction(d *extensions.Deployment) {
f.actions = append(f.actions, action) f.actions = append(f.actions, action)
} }
func (f *fixture) expectUpdateDeploymentAction(d *extensions.Deployment) {
action := core.NewUpdateAction(schema.GroupVersionResource{Resource: "deployments"}, d.Namespace, d)
f.actions = append(f.actions, action)
}
func (f *fixture) expectCreateRSAction(rs *extensions.ReplicaSet) { func (f *fixture) expectCreateRSAction(rs *extensions.ReplicaSet) {
f.actions = append(f.actions, core.NewCreateAction(schema.GroupVersionResource{Resource: "replicasets"}, rs.Namespace, rs)) f.actions = append(f.actions, core.NewCreateAction(schema.GroupVersionResource{Resource: "replicasets"}, rs.Namespace, rs))
} }
@@ -214,6 +220,24 @@ func (f *fixture) run(deploymentName string) {
} }
} }
func filterInformerActions(actions []core.Action) []core.Action {
ret := []core.Action{}
for _, action := range actions {
if len(action.GetNamespace()) == 0 &&
(action.Matches("list", "pods") ||
action.Matches("list", "deployments") ||
action.Matches("list", "replicasets") ||
action.Matches("watch", "pods") ||
action.Matches("watch", "deployments") ||
action.Matches("watch", "replicasets")) {
continue
}
ret = append(ret, action)
}
return ret
}
func TestSyncDeploymentCreatesReplicaSet(t *testing.T) { func TestSyncDeploymentCreatesReplicaSet(t *testing.T) {
f := newFixture(t) f := newFixture(t)
@@ -244,53 +268,47 @@ func TestSyncDeploymentDontDoAnythingDuringDeletion(t *testing.T) {
} }
// issue: https://github.com/kubernetes/kubernetes/issues/23218 // issue: https://github.com/kubernetes/kubernetes/issues/23218
func TestDeploymentController_dontSyncDeploymentsWithEmptyPodSelector(t *testing.T) { func TestDontSyncDeploymentsWithEmptyPodSelector(t *testing.T) {
fake := &fake.Clientset{} f := newFixture(t)
informers := informers.NewSharedInformerFactory(fake, controller.NoResyncPeriodFunc())
controller := NewDeploymentController(informers.Extensions().V1beta1().Deployments(), informers.Extensions().V1beta1().ReplicaSets(), informers.Core().V1().Pods(), fake)
controller.eventRecorder = &record.FakeRecorder{}
controller.dListerSynced = alwaysReady
controller.rsListerSynced = alwaysReady
controller.podListerSynced = alwaysReady
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
empty := metav1.LabelSelector{} d.Spec.Selector = &metav1.LabelSelector{}
d.Spec.Selector = &empty f.dLister = append(f.dLister, d)
informers.Extensions().V1beta1().Deployments().Informer().GetIndexer().Add(d) f.objects = append(f.objects, d)
// We expect the deployment controller to not take action here since it's configuration
// is invalid, even though no replicasets exist that match it's selector.
controller.syncDeployment(fmt.Sprintf("%s/%s", d.ObjectMeta.Namespace, d.ObjectMeta.Name))
filteredActions := filterInformerActions(fake.Actions()) // Normally there should be a status update to sync observedGeneration but the fake
if len(filteredActions) == 0 { // deployment has no generation set so there is no action happpening here.
return f.run(getKey(d, t))
}
for _, action := range filteredActions {
t.Logf("unexpected action: %#v", action)
}
t.Errorf("expected deployment controller to not take action")
} }
func filterInformerActions(actions []core.Action) []core.Action { func TestReentrantRollback(t *testing.T) {
ret := []core.Action{} f := newFixture(t)
for _, action := range actions {
if len(action.GetNamespace()) == 0 &&
(action.Matches("list", "pods") ||
action.Matches("list", "deployments") ||
action.Matches("list", "replicasets") ||
action.Matches("watch", "pods") ||
action.Matches("watch", "deployments") ||
action.Matches("watch", "replicasets")) {
continue
}
ret = append(ret, action)
}
return ret d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
d.Spec.RollbackTo = &extensions.RollbackConfig{Revision: 0}
// TODO: This is 1 for now until FindOldReplicaSets gets fixed.
// See https://github.com/kubernetes/kubernetes/issues/42570.
d.Annotations = map[string]string{util.RevisionAnnotation: "1"}
f.dLister = append(f.dLister, d)
rs1 := newReplicaSet(d, "deploymentrs-old", 0)
rs1.Annotations = map[string]string{util.RevisionAnnotation: "1"}
one := int64(1)
rs1.Spec.Template.Spec.TerminationGracePeriodSeconds = &one
rs1.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] = "hash"
rs2 := newReplicaSet(d, "deploymentrs-new", 1)
rs2.Annotations = map[string]string{util.RevisionAnnotation: "2"}
rs2.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] = "hash"
f.rsLister = append(f.rsLister, rs1, rs2)
f.objects = append(f.objects, d, rs1, rs2)
// Rollback is done here
f.expectUpdateDeploymentAction(d)
// Expect no update on replica sets though
f.run(getKey(d, t))
} }
// TestOverlappingDeployment ensures that an overlapping deployment will not be synced by // TestOverlappingDeployment ensures that an overlapping deployment will not be synced by

View File

@@ -20,25 +20,28 @@ import (
"fmt" "fmt"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
) )
// Rolling back to a revision; no-op if the toRevision is deployment's current revision // rollback the deployment to the specified revision. In any case cleanup the rollback spec.
func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) { func (dc *DeploymentController) rollback(d *extensions.Deployment) error {
newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true) newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, true)
if err != nil { if err != nil {
return nil, err return err
} }
allRSs := append(allOldRSs, newRS) allRSs := append(allOldRSs, newRS)
toRevision := &d.Spec.RollbackTo.Revision
// If rollback revision is 0, rollback to the last revision // If rollback revision is 0, rollback to the last revision
if *toRevision == 0 { if *toRevision == 0 {
if *toRevision = deploymentutil.LastRevision(allRSs); *toRevision == 0 { if *toRevision = deploymentutil.LastRevision(allRSs); *toRevision == 0 {
// If we still can't find the last revision, gives up rollback // If we still can't find the last revision, gives up rollback
dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.") dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.")
// Gives up rollback // Gives up rollback
return dc.updateDeploymentAndClearRollbackTo(deployment) return dc.updateDeploymentAndClearRollbackTo(d)
} }
} }
for _, rs := range allRSs { for _, rs := range allRSs {
@@ -52,22 +55,26 @@ func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRe
// rollback by copying podTemplate.Spec from the replica set // rollback by copying podTemplate.Spec from the replica set
// revision number will be incremented during the next getAllReplicaSetsAndSyncRevision call // revision number will be incremented during the next getAllReplicaSetsAndSyncRevision call
// no-op if the the spec matches current deployment's podTemplate.Spec // no-op if the the spec matches current deployment's podTemplate.Spec
deployment, performedRollback, err := dc.rollbackToTemplate(deployment, rs) performedRollback, err := dc.rollbackToTemplate(d, rs)
if performedRollback && err == nil { if performedRollback && err == nil {
dc.emitRollbackNormalEvent(deployment, fmt.Sprintf("Rolled back deployment %q to revision %d", deployment.Name, *toRevision)) dc.emitRollbackNormalEvent(d, fmt.Sprintf("Rolled back deployment %q to revision %d", d.Name, *toRevision))
} }
return deployment, err return err
} }
} }
dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackRevisionNotFound, "Unable to find the revision to rollback to.") dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find the revision to rollback to.")
// Gives up rollback // Gives up rollback
return dc.updateDeploymentAndClearRollbackTo(deployment) return dc.updateDeploymentAndClearRollbackTo(d)
} }
func (dc *DeploymentController) rollbackToTemplate(deployment *extensions.Deployment, rs *extensions.ReplicaSet) (d *extensions.Deployment, performedRollback bool, err error) { // rollbackToTemplate compares the templates of the provided deployment and replica set and
if !deploymentutil.EqualIgnoreHash(deployment.Spec.Template, rs.Spec.Template) { // updates the deployment with the replica set template in case they are different. It also
glog.V(4).Infof("Rolling back deployment %q to template spec %+v", deployment.Name, rs.Spec.Template.Spec) // cleans up the rollback spec so subsequent requeues of the deployment won't end up in here.
deploymentutil.SetFromReplicaSetTemplate(deployment, rs.Spec.Template) func (dc *DeploymentController) rollbackToTemplate(d *extensions.Deployment, rs *extensions.ReplicaSet) (bool, error) {
performedRollback := false
if !deploymentutil.EqualIgnoreHash(d.Spec.Template, rs.Spec.Template) {
glog.V(4).Infof("Rolling back deployment %q to template spec %+v", d.Name, rs.Spec.Template.Spec)
deploymentutil.SetFromReplicaSetTemplate(d, rs.Spec.Template)
// set RS (the old RS we'll rolling back to) annotations back to the deployment; // set RS (the old RS we'll rolling back to) annotations back to the deployment;
// otherwise, the deployment's current annotations (should be the same as current new RS) will be copied to the RS after the rollback. // otherwise, the deployment's current annotations (should be the same as current new RS) will be copied to the RS after the rollback.
// //
@@ -79,27 +86,31 @@ func (dc *DeploymentController) rollbackToTemplate(deployment *extensions.Deploy
// //
// If we don't copy the annotations back from RS to deployment on rollback, the Deployment will stay as {change-cause:edit}, // If we don't copy the annotations back from RS to deployment on rollback, the Deployment will stay as {change-cause:edit},
// and new RS1 becomes {change-cause:edit} (copied from deployment after rollback), old RS2 {change-cause:edit}, which is not correct. // and new RS1 becomes {change-cause:edit} (copied from deployment after rollback), old RS2 {change-cause:edit}, which is not correct.
deploymentutil.SetDeploymentAnnotationsTo(deployment, rs) deploymentutil.SetDeploymentAnnotationsTo(d, rs)
performedRollback = true performedRollback = true
} else { } else {
glog.V(4).Infof("Rolling back to a revision that contains the same template as current deployment %s, skipping rollback...", deployment.Name) glog.V(4).Infof("Rolling back to a revision that contains the same template as current deployment %q, skipping rollback...", d.Name)
dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackTemplateUnchanged, fmt.Sprintf("The rollback revision contains the same template as current deployment %q", deployment.Name)) eventMsg := fmt.Sprintf("The rollback revision contains the same template as current deployment %q", d.Name)
} dc.emitRollbackWarningEvent(d, deploymentutil.RollbackTemplateUnchanged, eventMsg)
d, err = dc.updateDeploymentAndClearRollbackTo(deployment)
return
} }
func (dc *DeploymentController) emitRollbackWarningEvent(deployment *extensions.Deployment, reason, message string) { return performedRollback, dc.updateDeploymentAndClearRollbackTo(d)
dc.eventRecorder.Eventf(deployment, v1.EventTypeWarning, reason, message)
} }
func (dc *DeploymentController) emitRollbackNormalEvent(deployment *extensions.Deployment, message string) { func (dc *DeploymentController) emitRollbackWarningEvent(d *extensions.Deployment, reason, message string) {
dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, deploymentutil.RollbackDone, message) dc.eventRecorder.Eventf(d, v1.EventTypeWarning, reason, message)
}
func (dc *DeploymentController) emitRollbackNormalEvent(d *extensions.Deployment, message string) {
dc.eventRecorder.Eventf(d, v1.EventTypeNormal, deploymentutil.RollbackDone, message)
} }
// updateDeploymentAndClearRollbackTo sets .spec.rollbackTo to nil and update the input deployment // updateDeploymentAndClearRollbackTo sets .spec.rollbackTo to nil and update the input deployment
func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(deployment *extensions.Deployment) (*extensions.Deployment, error) { // It is assumed that the caller will have updated the deployment template appropriately (in case
glog.V(4).Infof("Cleans up rollbackTo of deployment %s", deployment.Name) // we want to rollback).
deployment.Spec.RollbackTo = nil func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(d *extensions.Deployment) error {
return dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment) glog.V(4).Infof("Cleans up rollbackTo of deployment %q", d.Name)
d.Spec.RollbackTo = nil
_, err := dc.client.Extensions().Deployments(d.Namespace).Update(d)
return err
} }

View File

@@ -650,6 +650,7 @@ func FindOldReplicaSets(deployment *extensions.Deployment, rsList []*extensions.
if newRS != nil && rs.UID == newRS.UID { if newRS != nil && rs.UID == newRS.UID {
continue continue
} }
// TODO: If there are no pods for a deployment, we will never return old replica sets....!
allOldRSs[rs.ObjectMeta.Name] = rs allOldRSs[rs.ObjectMeta.Name] = rs
if rsLabelsSelector.Matches(podLabelsSelector) { if rsLabelsSelector.Matches(podLabelsSelector) {
oldRSs[rs.ObjectMeta.Name] = rs oldRSs[rs.ObjectMeta.Name] = rs