controller: proportionally scale paused and rolling deployments

Enable paused and rolling deployments to be proportionally scaled.
Also have cleanup policy work for paused deployments.
Michail Kargakis
2016-01-28 17:35:14 +01:00
parent a098d9fd24
commit f3d2e3ff22
11 changed files with 798 additions and 129 deletions


@@ -438,13 +438,9 @@ func (dc *DeploymentController) syncDeployment(key string) error {
}
if d.Spec.Paused {
// TODO: Implement scaling for paused deployments.
// Don't take any action for paused deployment.
// But keep the status up-to-date.
// Ignore paused deployments
glog.V(4).Infof("Updating status only for paused deployment %s/%s", d.Namespace, d.Name)
return dc.syncPausedDeploymentStatus(d)
return dc.sync(d)
}
if d.Spec.RollbackTo != nil {
revision := d.Spec.RollbackTo.Revision
if _, err = dc.rollback(d, &revision); err != nil {
@@ -452,27 +448,135 @@ func (dc *DeploymentController) syncDeployment(key string) error {
}
}
if dc.isScalingEvent(d) {
return dc.sync(d)
}
switch d.Spec.Strategy.Type {
case extensions.RecreateDeploymentStrategyType:
return dc.syncRecreateDeployment(d)
return dc.rolloutRecreate(d)
case extensions.RollingUpdateDeploymentStrategyType:
return dc.syncRollingUpdateDeployment(d)
return dc.rolloutRolling(d)
}
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
// Updates the status of a paused deployment
func (dc *DeploymentController) syncPausedDeploymentStatus(deployment *extensions.Deployment) error {
// sync is responsible for reconciling deployments on scaling events or when they
// are paused.
func (dc *DeploymentController) sync(deployment *extensions.Deployment) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false)
if err != nil {
return err
}
allRSs := append(controller.FilterActiveReplicaSets(oldRSs), newRS)
if err := dc.scale(deployment, newRS, oldRSs); err != nil {
// If we get an error while trying to scale, the deployment will be requeued
// so we can abort this resync
return err
}
dc.cleanupDeployment(oldRSs, deployment)
// Sync deployment status
allRSs := append(oldRSs, newRS)
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
}
// scale scales proportionally in order to mitigate risk. Otherwise, scaling up can increase the size
// of the new replica set and scaling down can decrease the sizes of the old ones, both of which would
// have the effect of hastening the rollout progress, which could produce a higher proportion of unavailable
// replicas in the event of a problem with the rolled out template. Should run only on scaling events or
// when a deployment is paused and not during the normal rollout process.
func (dc *DeploymentController) scale(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) error {
// If there is only one active replica set then we should scale that up to the full count of the
// deployment. If there is no active replica set, then we should scale up the newest replica set.
if activeOrLatest := findActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
if activeOrLatest.Spec.Replicas == deployment.Spec.Replicas {
return nil
}
_, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, deployment.Spec.Replicas, deployment)
return err
}
// If the new replica set is saturated, old replica sets should be fully scaled down.
// This case handles replica set adoption while the new replica set is saturated.
if deploymentutil.IsSaturated(deployment, newRS) {
for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil {
return err
}
}
return nil
}
// There are old replica sets with pods and the new replica set is not saturated.
// We need to proportionally scale all replica sets (new and old) in case of a
// rolling deployment.
if deploymentutil.IsRollingUpdate(deployment) {
allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
allowedSize := int32(0)
if deployment.Spec.Replicas > 0 {
allowedSize = deployment.Spec.Replicas + maxSurge(*deployment)
}
// Number of additional replicas that can be either added or removed from the total
// replicas count. These replicas should be distributed proportionally to the active
// replica sets.
deploymentReplicasToAdd := allowedSize - allRSsReplicas
// The additional replicas should be distributed proportionally amongst the active
// replica sets, from the largest to the smallest. Scaling direction decides what
// happens when replica sets are the same size: when scaling up, we scale up newer
// replica sets first, and when scaling down, we scale down older replica sets first.
scalingOperation := "up"
switch {
case deploymentReplicasToAdd > 0:
sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
case deploymentReplicasToAdd < 0:
sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
scalingOperation = "down"
default: /* deploymentReplicasToAdd == 0 */
// Nothing to add.
return nil
}
// Iterate over all active replica sets and estimate proportions for each of them.
// The absolute value of deploymentReplicasAdded should never exceed the absolute
// value of deploymentReplicasToAdd.
deploymentReplicasAdded := int32(0)
for i := range allRSs {
rs := allRSs[i]
proportion := getProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)
rs.Spec.Replicas += proportion
deploymentReplicasAdded += proportion
}
// Update all replica sets
for i := range allRSs {
rs := allRSs[i]
// Add/remove any leftover replicas to/from the largest replica set.
if i == 0 {
leftover := deploymentReplicasToAdd - deploymentReplicasAdded
rs.Spec.Replicas += leftover
if rs.Spec.Replicas < 0 {
rs.Spec.Replicas = 0
}
}
if _, err := dc.scaleReplicaSet(rs, rs.Spec.Replicas, deployment, scalingOperation); err != nil {
// Return as soon as we fail; the deployment will be requeued
return err
}
}
}
return nil
}
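// The proportional math above, as a standalone sketch (not part of this
// commit). The helper below is illustrative: it approximates what
// getProportion/getReplicaSetFraction do, assuming annotatedMax is the
// max-replicas annotation recorded at the previous scale; the controller's
// exact rounding and clamping may differ in detail.
package main

import (
	"fmt"
	"math"
)

// proportion estimates how many of the replicas being added (or removed) a
// replica set of the given size should receive. The fraction is the share of
// allowedSize the replica set owned at the previous scale; the result is
// clamped so the remaining budget (toAdd - added) is never exceeded.
func proportion(size, allowedSize, annotatedMax, toAdd, added int32) int32 {
	if toAdd == 0 || toAdd == added || size == 0 || annotatedMax == 0 {
		return 0
	}
	frac := int32(math.Round(float64(size*allowedSize) / float64(annotatedMax)))
	p := frac - size
	rem := toAdd - added
	if toAdd > 0 {
		if p > rem {
			p = rem // scaling up: hand out at most the remaining budget
		}
	} else if p < rem {
		p = rem // scaling down: remove at most the remaining budget
	}
	return p
}

func main() {
	// A deployment scaled 10 -> 20 with maxSurge=3 mid-rollout: allowedSize is
	// 20+3=23, the max-replicas annotations still read 10+3=13, and 13 replicas
	// exist in total (old RS: 10, new RS: 3), so toAdd = 23-13 = 10.
	added := int32(0)
	for _, size := range []int32{10, 3} { // largest replica set first
		p := proportion(size, 23, 13, 10, added)
		added += p
		fmt.Printf("replica set of size %d gets %+d replicas\n", size, p)
	}
}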
// rollback rolls a deployment back to a revision; it's a no-op if toRevision is the deployment's current revision
func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) {
newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true)
@@ -526,13 +630,13 @@ func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(deployment *e
return dc.updateDeployment(deployment)
}
func (dc *DeploymentController) syncRecreateDeployment(deployment *extensions.Deployment) error {
func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deployment) error {
// Don't create a new RS if one doesn't already exist, so that we avoid scaling up before scaling down
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false)
if err != nil {
return err
}
allRSs := append(controller.FilterActiveReplicaSets(oldRSs), newRS)
allRSs := append(oldRSs, newRS)
// scale down old replica sets
scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(controller.FilterActiveReplicaSets(oldRSs), deployment)
@@ -564,21 +668,18 @@ func (dc *DeploymentController) syncRecreateDeployment(deployment *extensions.De
return dc.updateDeploymentStatus(allRSs, newRS, deployment)
}
if deployment.Spec.RevisionHistoryLimit != nil {
// Cleanup old replica sets
dc.cleanupOldReplicaSets(oldRSs, deployment)
}
dc.cleanupDeployment(oldRSs, deployment)
// Sync deployment status
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
}
func (dc *DeploymentController) syncRollingUpdateDeployment(deployment *extensions.Deployment) error {
func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true)
if err != nil {
return err
}
allRSs := append(controller.FilterActiveReplicaSets(oldRSs), newRS)
allRSs := append(oldRSs, newRS)
// Scale up, if we can.
scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, deployment)
@@ -600,10 +701,7 @@ func (dc *DeploymentController) syncRollingUpdateDeployment(deployment *extensio
return dc.updateDeploymentStatus(allRSs, newRS, deployment)
}
if deployment.Spec.RevisionHistoryLimit != nil {
// Cleanup old replica sets
dc.cleanupOldReplicaSets(oldRSs, deployment)
}
dc.cleanupDeployment(oldRSs, deployment)
// Sync deployment status
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
@@ -611,11 +709,11 @@ func (dc *DeploymentController) syncRollingUpdateDeployment(deployment *extensio
// syncDeploymentStatus checks if the status is up-to-date and syncs it if necessary
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error {
totalActualReplicas, updatedReplicas, availableReplicas, _, err := dc.calculateStatus(allRSs, newRS, d)
newStatus, err := dc.calculateStatus(allRSs, newRS, d)
if err != nil {
return err
}
if d.Generation > d.Status.ObservedGeneration || d.Status.Replicas != totalActualReplicas || d.Status.UpdatedReplicas != updatedReplicas || d.Status.AvailableReplicas != availableReplicas {
if !reflect.DeepEqual(d.Status, newStatus) {
return dc.updateDeploymentStatus(allRSs, newRS, d)
}
return nil
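// A standalone sketch (not part of this commit) of the comparison above:
// calculateStatus now builds the whole DeploymentStatus, and the controller
// updates only when it differs from the stored one, instead of comparing
// individual counters field by field. The struct here is a stand-in for
// extensions.DeploymentStatus.
package main

import (
	"fmt"
	"reflect"
)

type deploymentStatus struct {
	ObservedGeneration                                                int64
	Replicas, UpdatedReplicas, AvailableReplicas, UnavailableReplicas int32
}

func main() {
	current := deploymentStatus{ObservedGeneration: 2, Replicas: 3, AvailableReplicas: 3}
	calculated := deploymentStatus{ObservedGeneration: 2, Replicas: 3, AvailableReplicas: 2, UnavailableReplicas: 1}
	if !reflect.DeepEqual(current, calculated) {
		fmt.Println("status drifted; issue UpdateStatus") // this branch runs
	}
}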
@@ -626,6 +724,8 @@ func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.Replic
// 2. Get new RS this deployment targets (whose pod template matches deployment's), and update new RS's revision number to (maxOldV + 1),
// only if its revision number is smaller than (maxOldV + 1). If this step failed, we'll update it in the next deployment sync loop.
// 3. Copy new RS's revision number to deployment (update deployment's revision). If this step failed, we'll update it in the next deployment sync loop.
// Note that currently the deployment controller is using caches to avoid querying the server for reads.
// This may lead to stale reads of replica sets, thus incorrect deployment status.
func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *extensions.Deployment, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
// List the deployment's RSes & Pods and apply pod-template-hash info to deployment's adopted RSes/Pods
rsList, podList, err := dc.rsAndPodsWithHashKeySynced(deployment)
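// A standalone sketch (not part of this commit) of steps 1-2 in the comment
// above: find the max revision among the old replica sets and bump the new
// replica set to max+1. The annotation key matches the controller's
// RevisionAnnotation.
package main

import (
	"fmt"
	"strconv"
)

const revisionAnnotation = "deployment.kubernetes.io/revision"

// maxRevision returns the highest revision recorded in the old replica sets'
// annotations; missing or unparseable annotations are skipped.
func maxRevision(oldRSAnnotations []map[string]string) int64 {
	max := int64(0)
	for _, ann := range oldRSAnnotations {
		if v, err := strconv.ParseInt(ann[revisionAnnotation], 10, 64); err == nil && v > max {
			max = v
		}
	}
	return max
}

func main() {
	oldRSs := []map[string]string{
		{revisionAnnotation: "1"},
		{revisionAnnotation: "3"},
		{}, // an adopted replica set without a revision yet
	}
	fmt.Println("new replica set revision:", maxRevision(oldRSs)+1) // 4
}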
@@ -701,7 +801,7 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
return nil, err
} else if existingNewRS != nil {
// Set existing new replica set's annotation
if setNewReplicaSetAnnotations(deployment, existingNewRS, newRevision) {
if setNewReplicaSetAnnotations(deployment, existingNewRS, newRevision, true) {
return dc.client.Extensions().ReplicaSets(deployment.ObjectMeta.Namespace).Update(existingNewRS)
}
return existingNewRS, nil
@@ -731,8 +831,6 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
Template: newRSTemplate,
},
}
// Set new replica set's annotation
setNewReplicaSetAnnotations(deployment, &newRS, newRevision)
allRSs := append(oldRSs, &newRS)
newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, &newRS)
if err != nil {
@@ -740,6 +838,8 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
}
newRS.Spec.Replicas = newReplicasCount
// Set new replica set's annotation
setNewReplicaSetAnnotations(deployment, &newRS, newRevision, false)
createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
if err != nil {
dc.enqueueDeployment(deployment)
@@ -881,7 +981,7 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet)
// setNewReplicaSetAnnotations sets a new replica set's annotations appropriately by updating its revision and
// copying required deployment annotations to it; it returns true if the replica set's annotations have changed.
func setNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string) bool {
func setNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string, exists bool) bool {
// First, copy deployment's annotations (except for apply and revision annotations)
annotationChanged := copyDeploymentAnnotationsToReplicaSet(deployment, newRS)
// Then, update replica set's revision annotation
@@ -894,17 +994,26 @@ func setNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *exten
if newRS.Annotations[deploymentutil.RevisionAnnotation] < newRevision {
newRS.Annotations[deploymentutil.RevisionAnnotation] = newRevision
annotationChanged = true
glog.V(4).Infof("updating replica set %q's revision to %s - %+v\n", newRS.Name, newRevision, newRS)
glog.V(4).Infof("Updating replica set %q revision to %s", newRS.Name, newRevision)
}
if !exists && setReplicasAnnotations(newRS, deployment.Spec.Replicas, deployment.Spec.Replicas+maxSurge(*deployment)) {
annotationChanged = true
}
return annotationChanged
}
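// A standalone sketch (not part of this commit) of the bookkeeping that
// setReplicasAnnotations (used above and in scaleReplicaSet below) performs:
// it records the deployment's size and size+maxSurge on the replica set so a
// later scaling event can compute proportions against them. The keys match
// the DesiredReplicasAnnotation/MaxReplicasAnnotation referenced below.
package main

import (
	"fmt"
	"strconv"
)

const (
	desiredReplicasAnnotation = "deployment.kubernetes.io/desired-replicas"
	maxReplicasAnnotation     = "deployment.kubernetes.io/max-replicas"
)

// setReplicasAnnotations stores desired and max sizes on a replica set's
// annotations, returning true if anything changed so callers know to update.
func setReplicasAnnotations(ann map[string]string, desired, max int32) bool {
	changed := false
	if d := strconv.Itoa(int(desired)); ann[desiredReplicasAnnotation] != d {
		ann[desiredReplicasAnnotation] = d
		changed = true
	}
	if m := strconv.Itoa(int(max)); ann[maxReplicasAnnotation] != m {
		ann[maxReplicasAnnotation] = m
		changed = true
	}
	return changed
}

func main() {
	ann := map[string]string{}
	fmt.Println(setReplicasAnnotations(ann, 10, 13)) // true: replicas=10, maxSurge=3
	fmt.Println(setReplicasAnnotations(ann, 10, 13)) // false: already up to date
}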
var annotationsToSkip = map[string]bool{
annotations.LastAppliedConfigAnnotation: true,
deploymentutil.RevisionAnnotation: true,
deploymentutil.DesiredReplicasAnnotation: true,
deploymentutil.MaxReplicasAnnotation: true,
}
// skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key
// TODO: How to decide which annotations should / should not be copied?
// See https://github.com/kubernetes/kubernetes/pull/20035#issuecomment-179558615
func skipCopyAnnotation(key string) bool {
// Skip apply annotations and revision annotations.
return key == annotations.LastAppliedConfigAnnotation || key == deploymentutil.RevisionAnnotation
return annotationsToSkip[key]
}
func getSkippedAnnotations(annotations map[string]string) map[string]string {
@@ -980,12 +1089,12 @@ func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.Repl
return scaled, err
}
func (dc *DeploymentController) getAvailablePodsForReplicaSets(deployment *extensions.Deployment, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) {
func (dc *DeploymentController) getAvailablePodsForReplicaSets(deployment *extensions.Deployment, rss []*extensions.ReplicaSet) (int32, error) {
podList, err := dc.listPods(deployment)
if err != nil {
return 0, err
}
return deploymentutil.CountAvailablePodsForReplicaSets(podList, rss, minReadySeconds)
return deploymentutil.CountAvailablePodsForReplicaSets(podList, rss, deployment.Spec.MinReadySeconds)
}
func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
@@ -1002,11 +1111,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.Rep
if err != nil {
return false, fmt.Errorf("could not find available pods: %v", err)
}
_, maxUnavailable, err := deploymentutil.ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas)
if err != nil {
return false, err
}
maxUnavailable := maxUnavailable(*deployment)
// Check if we can scale down. We can scale down in the following 2 cases:
// * Some old replica sets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further
@@ -1108,10 +1213,7 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.Re
// scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate".
// We need to check maxUnavailable to ensure availability
func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) (int32, error) {
_, maxUnavailable, err := deploymentutil.ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas)
if err != nil {
return 0, err
}
maxUnavailable := maxUnavailable(*deployment)
// Check if we can scale down.
minAvailable := deployment.Spec.Replicas - maxUnavailable
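// A standalone sketch (not part of this commit) of the fencepost resolution
// that the new maxSurge/maxUnavailable helpers wrap: percentages resolve
// against the replica count with surge rounding up and unavailability
// rounding down, and if both resolve to zero the controller falls back to
// maxUnavailable=1 so a rollout cannot deadlock. The helper below is
// illustrative and only handles percentage values.
package main

import (
	"fmt"
	"math"
)

func resolveFenceposts(surgePct, unavailPct float64, replicas int32) (surge, unavail int32) {
	surge = int32(math.Ceil(surgePct / 100 * float64(replicas)))      // round up
	unavail = int32(math.Floor(unavailPct / 100 * float64(replicas))) // round down
	if surge == 0 && unavail == 0 {
		unavail = 1 // both zero would block the rollout entirely
	}
	return surge, unavail
}

func main() {
	fmt.Println(resolveFenceposts(25, 25, 10)) // 3 2
	fmt.Println(resolveFenceposts(10, 5, 3))   // 1 0
}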
@@ -1183,7 +1285,13 @@ func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extension
return scaled, err
}
func (dc *DeploymentController) cleanupOldReplicaSets(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) error {
// cleanupDeployment is responsible for cleaning up a deployment, i.e. it deletes all but the latest N old replica
// sets, where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of a deployment's pod template
// that are kept around by default 1) for historical reasons and 2) for the ability to roll back a deployment.
func (dc *DeploymentController) cleanupDeployment(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) error {
if deployment.Spec.RevisionHistoryLimit == nil {
return nil
}
diff := int32(len(oldRSs)) - *deployment.Spec.RevisionHistoryLimit
if diff <= 0 {
return nil
@@ -1209,39 +1317,31 @@ func (dc *DeploymentController) cleanupOldReplicaSets(oldRSs []*extensions.Repli
}
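// A standalone sketch (not part of this commit) of the retention policy that
// cleanupDeployment implements above: keep the newest RevisionHistoryLimit old
// replica sets and delete the rest. Sorting here is by revision for brevity;
// the controller orders old replica sets before deleting them and skips any
// that still own pods.
package main

import (
	"fmt"
	"sort"
)

type rsInfo struct {
	name     string
	revision int64
	replicas int32
}

// trimHistory returns the old replica sets to delete so at most limit remain.
func trimHistory(old []rsInfo, limit int32) []rsInfo {
	diff := int32(len(old)) - limit
	if diff <= 0 {
		return nil
	}
	sort.Slice(old, func(i, j int) bool { return old[i].revision < old[j].revision })
	var toDelete []rsInfo
	for _, r := range old[:diff] {
		if r.replicas != 0 {
			continue // never delete a replica set that still owns pods
		}
		toDelete = append(toDelete, r)
	}
	return toDelete
}

func main() {
	old := []rsInfo{{"rs-a", 1, 0}, {"rs-b", 2, 0}, {"rs-c", 3, 0}}
	fmt.Println(trimHistory(old, 1)) // [{rs-a 1 0} {rs-b 2 0}]
}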
func (dc *DeploymentController) updateDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) error {
totalActualReplicas, updatedReplicas, availableReplicas, unavailableReplicas, err := dc.calculateStatus(allRSs, newRS, deployment)
newStatus, err := dc.calculateStatus(allRSs, newRS, deployment)
if err != nil {
return err
}
newDeployment := *deployment
// TODO: Reconcile this with API definition. API definition talks about ready pods, while this just computes created pods.
newDeployment.Status = extensions.DeploymentStatus{
// TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
ObservedGeneration: deployment.Generation,
Replicas: totalActualReplicas,
UpdatedReplicas: updatedReplicas,
AvailableReplicas: availableReplicas,
UnavailableReplicas: unavailableReplicas,
}
_, err = dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(&newDeployment)
if err == nil {
glog.V(4).Infof("Updated deployment %s status: %+v", deployment.Name, newDeployment.Status)
}
newDeployment := deployment
newDeployment.Status = newStatus
_, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(newDeployment)
return err
}
func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (totalActualReplicas, updatedReplicas, availableReplicas, unavailableReplicas int32, err error) {
totalActualReplicas = deploymentutil.GetActualReplicaCountForReplicaSets(allRSs)
updatedReplicas = deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS})
minReadySeconds := deployment.Spec.MinReadySeconds
availableReplicas, err = dc.getAvailablePodsForReplicaSets(deployment, allRSs, minReadySeconds)
func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (extensions.DeploymentStatus, error) {
availableReplicas, err := dc.getAvailablePodsForReplicaSets(deployment, allRSs)
if err != nil {
err = fmt.Errorf("failed to count available pods: %v", err)
return
return deployment.Status, fmt.Errorf("failed to count available pods: %v", err)
}
totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
unavailableReplicas = totalReplicas - availableReplicas
return
return extensions.DeploymentStatus{
// TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
ObservedGeneration: deployment.Generation,
Replicas: deploymentutil.GetActualReplicaCountForReplicaSets(allRSs),
UpdatedReplicas: deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS}),
AvailableReplicas: availableReplicas,
UnavailableReplicas: totalReplicas - availableReplicas,
}, nil
}
func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment) (bool, *extensions.ReplicaSet, error) {
@@ -1255,24 +1355,25 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.Rep
} else {
scalingOperation = "down"
}
newRS, err := dc.scaleReplicaSet(rs, newScale)
if err == nil {
dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
} else {
dc.enqueueDeployment(deployment)
}
newRS, err := dc.scaleReplicaSet(rs, newScale, deployment, scalingOperation)
return true, newRS, err
}
func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int32) (*extensions.ReplicaSet, error) {
// TODO: Using client for now, update to use store when it is ready.
func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment, scalingOperation string) (*extensions.ReplicaSet, error) {
// NOTE: This mutates the ReplicaSet passed in. Not sure if that's a good idea.
rs.Spec.Replicas = newScale
return dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs)
setReplicasAnnotations(rs, deployment.Spec.Replicas, deployment.Spec.Replicas+maxSurge(*deployment))
rs, err := dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs)
if err == nil {
dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
} else {
glog.Warningf("Cannot update replica set %q: %v", rs.Name, err)
dc.enqueueDeployment(deployment)
}
return rs, err
}
func (dc *DeploymentController) updateDeployment(deployment *extensions.Deployment) (*extensions.Deployment, error) {
// TODO: Using client for now, update to use store when it is ready.
return dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
}
@@ -1300,3 +1401,28 @@ func (dc *DeploymentController) rollbackToTemplate(deployment *extensions.Deploy
d, err = dc.updateDeploymentAndClearRollbackTo(deployment)
return
}
// isScalingEvent checks whether the provided deployment has been updated with a scaling event
// by looking at the desired-replicas annotation in the active replica sets of the deployment.
func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) bool {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false)
if err != nil {
return false
}
// If there is no new replica set matching this deployment and the deployment isn't paused,
// then there is a new rollout waiting to happen.
if newRS == nil && !d.Spec.Paused {
return false
}
allRSs := append(oldRSs, newRS)
for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
desired, ok := getDesiredReplicasAnnotation(rs)
if !ok {
continue
}
if desired != d.Spec.Replicas {
return true
}
}
return false
}
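// A standalone sketch (not part of this commit) of the check above: a
// deployment is treated as "only scaled" when some active replica set's
// desired-replicas annotation disagrees with .spec.replicas. The key matches
// the one written by setReplicasAnnotations.
package main

import (
	"fmt"
	"strconv"
)

const desiredReplicasAnnotation = "deployment.kubernetes.io/desired-replicas"

// isScalingEvent reports whether any active replica set recorded a different
// desired size than the deployment currently specifies.
func isScalingEvent(specReplicas int32, activeRSAnnotations []map[string]string) bool {
	for _, ann := range activeRSAnnotations {
		desired, err := strconv.ParseInt(ann[desiredReplicasAnnotation], 10, 32)
		if err != nil {
			continue // no parseable annotation; nothing to compare against
		}
		if int32(desired) != specReplicas {
			return true
		}
	}
	return false
}

func main() {
	rss := []map[string]string{{desiredReplicasAnnotation: "10"}}
	fmt.Println(isScalingEvent(15, rss)) // true: user scaled 10 -> 15
	fmt.Println(isScalingEvent(10, rss)) // false: sizes agree
}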