diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index df5b6457621..2ce713e0041 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -343,6 +343,22 @@ func NewUIDTrackingControllerExpectations(ce ControllerExpectationsInterface) *U return &UIDTrackingControllerExpectations{ControllerExpectationsInterface: ce, uidStore: cache.NewStore(UIDSetKeyFunc)} } +// Reasons for pod events +const ( + // FailedCreatePodReason is added in an event and in a replica set condition + // when a pod for a replica set fails to be created. + FailedCreatePodReason = "FailedCreate" + // SuccessfulCreatePodReason is added in an event when a pod for a replica set + // is successfully created. + SuccessfulCreatePodReason = "SuccessfulCreate" + // FailedDeletePodReason is added in an event and in a replica set condition + // when a pod for a replica set fails to be deleted. + FailedDeletePodReason = "FailedDelete" + // SuccessfulDeletePodReason is added in an event when a pod for a replica set + // is successfully deleted. + SuccessfulDeletePodReason = "SuccessfulDelete" +) + // PodControlInterface is an interface that knows how to add or delete pods // created as an interface to allow testing. type PodControlInterface interface { @@ -485,7 +501,7 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *api.Pod return fmt.Errorf("unable to create pods, no labels") } if newPod, err := r.KubeClient.Core().Pods(namespace).Create(pod); err != nil { - r.Recorder.Eventf(object, api.EventTypeWarning, "FailedCreate", "Error creating: %v", err) + r.Recorder.Eventf(object, api.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err) return fmt.Errorf("unable to create pods: %v", err) } else { accessor, err := meta.Accessor(object) @@ -494,7 +510,7 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *api.Pod return nil } glog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name) - r.Recorder.Eventf(object, api.EventTypeNormal, "SuccessfulCreate", "Created pod: %v", newPod.Name) + r.Recorder.Eventf(object, api.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", newPod.Name) } return nil } @@ -505,11 +521,11 @@ func (r RealPodControl) DeletePod(namespace string, podID string, object runtime return fmt.Errorf("object does not have ObjectMeta, %v", err) } if err := r.KubeClient.Core().Pods(namespace).Delete(podID, nil); err != nil { - r.Recorder.Eventf(object, api.EventTypeWarning, "FailedDelete", "Error deleting: %v", err) + r.Recorder.Eventf(object, api.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err) return fmt.Errorf("unable to delete pods: %v", err) } else { glog.V(4).Infof("Controller %v deleted pod %v", accessor.GetName(), podID) - r.Recorder.Eventf(object, api.EventTypeNormal, "SuccessfulDelete", "Deleted pod: %v", podID) + r.Recorder.Eventf(object, api.EventTypeNormal, SuccessfulDeletePodReason, "Deleted pod: %v", podID) } return nil } diff --git a/pkg/controller/deployment/BUILD b/pkg/controller/deployment/BUILD index 508df1fda44..c50ebd0d620 100644 --- a/pkg/controller/deployment/BUILD +++ b/pkg/controller/deployment/BUILD @@ -14,6 +14,7 @@ go_library( name = "go_default_library", srcs = [ "deployment_controller.go", + "progress.go", "recreate.go", "rollback.go", "rolling.go", diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 528858ed883..f51a53c99f6 100644 --- 
a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -350,6 +350,21 @@ func (dc *DeploymentController) syncDeployment(key string) error { return nil } + // Update deployment conditions with an Unknown condition when pausing or resuming + // a deployment. This way, we can be sure that we won't time out when a user + // resumes a Deployment that has progressDeadlineSeconds set. + if err = dc.checkPausedConditions(d); err != nil { + return err + } + + _, err = dc.hasFailed(d) + if err != nil { + return err + } + // TODO: Automatically roll back here if we failed above. Locate the last complete + // revision and populate the rollback spec with it. + // See https://github.com/kubernetes/kubernetes/issues/23211. + if d.Spec.Paused { return dc.sync(d) } diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 7272055e7d6..03f40b13e5e 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -152,14 +152,6 @@ func (f *fixture) expectCreateRSAction(rs *extensions.ReplicaSet) { f.actions = append(f.actions, core.NewCreateAction(unversioned.GroupVersionResource{Resource: "replicasets"}, rs.Namespace, rs)) } -func (f *fixture) expectUpdateRSAction(rs *extensions.ReplicaSet) { - f.actions = append(f.actions, core.NewUpdateAction(unversioned.GroupVersionResource{Resource: "replicasets"}, rs.Namespace, rs)) -} - -func (f *fixture) expectListPodAction(namespace string, opt api.ListOptions) { - f.actions = append(f.actions, core.NewListAction(unversioned.GroupVersionResource{Resource: "pods"}, namespace, opt)) -} - func newFixture(t *testing.T) *fixture { f := &fixture{} f.t = t diff --git a/pkg/controller/deployment/progress.go b/pkg/controller/deployment/progress.go new file mode 100644 index 00000000000..0d915e62e6b --- /dev/null +++ b/pkg/controller/deployment/progress.go @@ -0,0 +1,188 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deployment + +import ( + "fmt" + "reflect" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/extensions" + "k8s.io/kubernetes/pkg/controller/deployment/util" +) + +// hasFailed determines whether a deployment has failed by estimating its progress. +// A deployment makes progress when a new replica set is created or adopted, and when +// new pods scale up or old pods scale down. Progress is not estimated for paused +// deployments or for deployments that do not specify progressDeadlineSeconds, i.e. when +// the user has not asked for progress to be tracked. 
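Concretely, the gate described in this comment is a single boolean check that hasFailed (below) inlines at its top; a minimal sketch under that assumption, with a hypothetical helper name:

```go
// progressEstimationWanted reports whether progress should be estimated for d.
// Hypothetical helper; hasFailed inlines the same check.
func progressEstimationWanted(d *extensions.Deployment) bool {
	// Skip paused deployments, pending rollbacks, and deployments that never
	// asked for a progress deadline.
	return d.Spec.ProgressDeadlineSeconds != nil && d.Spec.RollbackTo == nil && !d.Spec.Paused
}
```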
+func (dc *DeploymentController) hasFailed(d *extensions.Deployment) (bool, error) { + if d.Spec.ProgressDeadlineSeconds == nil || d.Spec.RollbackTo != nil || d.Spec.Paused { + return false, nil + } + + newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false) + if err != nil { + return false, err + } + + // There is a template change so we don't need to check for any progress right now. + if newRS == nil { + return false, nil + } + + // Look at the status of the deployment - if there is already a NewRSAvailableReason + // then we don't need to estimate any progress. This is needed in order to avoid + // estimating progress for scaling events after a rollout has finished. + cond := util.GetDeploymentCondition(d.Status, extensions.DeploymentProgressing) + if cond != nil && cond.Reason == util.NewRSAvailableReason { + return false, nil + } + + // TODO: Look for permanent failures here. + // See https://github.com/kubernetes/kubernetes/issues/18568 + + allRSs := append(oldRSs, newRS) + newStatus := dc.calculateStatus(allRSs, newRS, d) + + // If the deployment is complete or it is progressing, there is no need to check if it + // has timed out. + if util.DeploymentComplete(d, &newStatus) || util.DeploymentProgressing(d, &newStatus) { + return false, nil + } + + // Check if the deployment has timed out. + return util.DeploymentTimedOut(d, &newStatus), nil +} + +// syncRolloutStatus updates the status of a deployment during a rollout. There are +// cases this helper will run that cannot be prevented from the scaling detection, +// for example a resync of the deployment after it was scaled up. In those cases, +// we shouldn't try to estimate any progress. +func (dc *DeploymentController) syncRolloutStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error { + newStatus := dc.calculateStatus(allRSs, newRS, d) + + // If there is no progressDeadlineSeconds set, remove any Progressing condition. + if d.Spec.ProgressDeadlineSeconds == nil { + util.RemoveDeploymentCondition(&newStatus, extensions.DeploymentProgressing) + } + + // If there is only one replica set that is active then that means we are not running + // a new rollout and this is a resync where we don't need to estimate any progress. + // In such a case, we should simply not estimate any progress for this deployment. + currentCond := util.GetDeploymentCondition(d.Status, extensions.DeploymentProgressing) + isResyncEvent := newStatus.Replicas == newStatus.UpdatedReplicas && currentCond != nil && currentCond.Reason == util.NewRSAvailableReason + // Check for progress only if there is a progress deadline set and the latest rollout + // hasn't completed yet. + if d.Spec.ProgressDeadlineSeconds != nil && !isResyncEvent { + switch { + case util.DeploymentComplete(d, &newStatus): + // Update the deployment conditions with a message for the new replica set that + // was successfully deployed. If the condition already exists, we ignore this update. + msg := fmt.Sprintf("Replica set %q has successfully progressed.", newRS.Name) + condition := util.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionTrue, util.NewRSAvailableReason, msg) + util.SetDeploymentCondition(&newStatus, *condition) + + case util.DeploymentProgressing(d, &newStatus): + // If there is any progress made, continue by not checking if the deployment failed. This + // behavior emulates the rolling updater progressDeadline check. 
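Both hasFailed above and the switch being assembled in syncRolloutStatus evaluate the same three predicates in the same order: completeness first, then forward progress, then the deadline. A rough summary sketch (hypothetical helper and enum, reusing the util predicates this patch adds):

```go
// rolloutOutcome is a hypothetical enum summarizing how the controller
// classifies a rollout on each sync.
type rolloutOutcome int

const (
	outcomeComplete rolloutOutcome = iota
	outcomeProgressing
	outcomeTimedOut
	outcomeNoChange
)

// classifyRollout mirrors the order of checks used by hasFailed and
// syncRolloutStatus: anything that is neither complete, progressing, nor past
// its deadline is left untouched until the next sync.
func classifyRollout(d *extensions.Deployment, newStatus *extensions.DeploymentStatus) rolloutOutcome {
	switch {
	case util.DeploymentComplete(d, newStatus):
		return outcomeComplete
	case util.DeploymentProgressing(d, newStatus):
		return outcomeProgressing
	case util.DeploymentTimedOut(d, newStatus):
		return outcomeTimedOut
	}
	return outcomeNoChange
}
```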
+ msg := fmt.Sprintf("Replica set %q is progressing.", newRS.Name) + condition := util.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionTrue, util.ReplicaSetUpdatedReason, msg) + // Update the current Progressing condition or add a new one if it doesn't exist. + // If a Progressing condition with status=true already exists, we should update + // everything but lastTransitionTime. SetDeploymentCondition already does that but + // it also is not updating conditions when the reason of the new condition is the + // same as the old. The Progressing condition is a special case because we want to + // update with the same reason and change just lastUpdateTime iff we notice any + // progress. That's why we handle it here. + if currentCond != nil { + if currentCond.Status == api.ConditionTrue { + condition.LastTransitionTime = currentCond.LastTransitionTime + } + util.RemoveDeploymentCondition(&newStatus, extensions.DeploymentProgressing) + } + util.SetDeploymentCondition(&newStatus, *condition) + + case util.DeploymentTimedOut(d, &newStatus): + // Update the deployment with a timeout condition. If the condition already exists, + // we ignore this update. + msg := fmt.Sprintf("Replica set %q has timed out progressing.", newRS.Name) + condition := util.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionFalse, util.TimedOutReason, msg) + util.SetDeploymentCondition(&newStatus, *condition) + } + } + + // Move failure conditions of all replica sets in deployment conditions. For now, + // only one failure condition is returned from getReplicaFailures. + if replicaFailureCond := dc.getReplicaFailures(allRSs, newRS); len(replicaFailureCond) > 0 { + // There will be only one ReplicaFailure condition on the replica set. + util.SetDeploymentCondition(&newStatus, replicaFailureCond[0]) + } else { + util.RemoveDeploymentCondition(&newStatus, extensions.DeploymentReplicaFailure) + } + + // Do not update if there is nothing new to add. + if reflect.DeepEqual(d.Status, newStatus) { + // TODO: If there is no sign of progress at this point then there is a high chance that the + // deployment is stuck. We should resync this deployment at some point[1] in the future[2] and + // check if it has timed out. We definitely need this, otherwise we depend on the controller + // resync interval. See https://github.com/kubernetes/kubernetes/issues/34458. + // + // [1] time.Now() + progressDeadlineSeconds - lastUpdateTime (of the Progressing condition). + // [2] Use dc.queue.AddAfter + return nil + } + + newDeployment := d + newDeployment.Status = newStatus + _, err := dc.client.Extensions().Deployments(newDeployment.Namespace).UpdateStatus(newDeployment) + return err +} + +// getReplicaFailures will convert replica failure conditions from replica sets +// to deployment conditions. +func (dc *DeploymentController) getReplicaFailures(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) []extensions.DeploymentCondition { + var conditions []extensions.DeploymentCondition + if newRS != nil { + for _, c := range newRS.Status.Conditions { + if c.Type != extensions.ReplicaSetReplicaFailure { + continue + } + conditions = append(conditions, util.ReplicaSetToDeploymentCondition(c)) + } + } + + // Return failures for the new replica set over failures from old replica sets. 
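The Progressing special case described in the long comment above boils down to "keep lastTransitionTime, refresh lastUpdateTime". A sketch of that sequence as a standalone helper (hypothetical function name and message; it reuses the condition helpers this patch adds to the util package):

```go
// refreshProgressingCondition keeps lastTransitionTime when the status has not
// changed, but removes the old Progressing condition first so that
// SetDeploymentCondition cannot short-circuit on an identical reason and skip
// the lastUpdateTime refresh.
func refreshProgressingCondition(status *extensions.DeploymentStatus, msg string) {
	condition := util.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionTrue,
		util.ReplicaSetUpdatedReason, msg)
	if currentCond := util.GetDeploymentCondition(*status, extensions.DeploymentProgressing); currentCond != nil {
		if currentCond.Status == api.ConditionTrue {
			condition.LastTransitionTime = currentCond.LastTransitionTime
		}
		util.RemoveDeploymentCondition(status, extensions.DeploymentProgressing)
	}
	util.SetDeploymentCondition(status, *condition)
}
```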
+ if len(conditions) > 0 { + return conditions + } + + for i := range allRSs { + rs := allRSs[i] + if rs == nil { + continue + } + + for _, c := range rs.Status.Conditions { + if c.Type != extensions.ReplicaSetReplicaFailure { + continue + } + conditions = append(conditions, util.ReplicaSetToDeploymentCondition(c)) + } + } + return conditions +} diff --git a/pkg/controller/deployment/recreate.go b/pkg/controller/deployment/recreate.go index a5214c2d7dc..5255e930c85 100644 --- a/pkg/controller/deployment/recreate.go +++ b/pkg/controller/deployment/recreate.go @@ -42,7 +42,7 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen } if scaledDown { // Update DeploymentStatus - return dc.syncDeploymentStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, deployment) } // Wait for all old replica set to scale down to zero. @@ -67,13 +67,13 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen } if scaledUp { // Update DeploymentStatus - return dc.syncDeploymentStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, deployment) } dc.cleanupDeployment(oldRSs, deployment) // Sync deployment status - return dc.syncDeploymentStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, deployment) } // scaleDownOldReplicaSetsForRecreate scales down old replica sets when deployment strategy is "Recreate" diff --git a/pkg/controller/deployment/rolling.go b/pkg/controller/deployment/rolling.go index 3a3e18f5491..47d7d6f623f 100644 --- a/pkg/controller/deployment/rolling.go +++ b/pkg/controller/deployment/rolling.go @@ -42,7 +42,7 @@ func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment } if scaledUp { // Update DeploymentStatus - return dc.syncDeploymentStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, deployment) } // Scale down, if we can. @@ -52,13 +52,13 @@ func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment } if scaledDown { // Update DeploymentStatus - return dc.syncDeploymentStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, deployment) } dc.cleanupDeployment(oldRSs, deployment) // Sync deployment status - return dc.syncDeploymentStatus(allRSs, newRS, deployment) + return dc.syncRolloutStatus(allRSs, newRS, deployment) } func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index 73a8e51ba98..3880b5470ba 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -64,6 +64,40 @@ func (dc *DeploymentController) sync(deployment *extensions.Deployment) error { return dc.syncDeploymentStatus(allRSs, newRS, deployment) } +// checkPausedConditions checks if the given deployment is paused or not and adds an appropriate condition. +// These conditions are needed so that we won't accidentally report lack of progress for resumed deployments +// that were paused for longer than progressDeadlineSeconds. 
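The paused/resumed handling described in this comment reduces to a small decision; a condensed sketch (hypothetical helper name, reusing the reasons and helpers this patch adds to deploymentutil) before the full implementation with the status update that follows:

```go
// decidePausedCondition distills checkPausedConditions: it returns the
// Progressing condition to record, or nil when nothing should change.
func decidePausedCondition(d *extensions.Deployment) *extensions.DeploymentCondition {
	if d.Spec.ProgressDeadlineSeconds == nil {
		return nil // nobody asked for progress tracking
	}
	cond := deploymentutil.GetDeploymentCondition(d.Status, extensions.DeploymentProgressing)
	if cond != nil && cond.Reason == deploymentutil.TimedOutReason {
		return nil // never overwrite a reported timeout with a paused/resumed condition
	}
	pausedCondExists := cond != nil && cond.Reason == deploymentutil.PausedDeployReason
	switch {
	case d.Spec.Paused && !pausedCondExists:
		return deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionUnknown, deploymentutil.PausedDeployReason, "Deployment is paused")
	case !d.Spec.Paused && pausedCondExists:
		return deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionUnknown, deploymentutil.ResumedDeployReason, "Deployment is resumed")
	}
	return nil
}
```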
+func (dc *DeploymentController) checkPausedConditions(d *extensions.Deployment) error { + if d.Spec.ProgressDeadlineSeconds == nil { + return nil + } + cond := deploymentutil.GetDeploymentCondition(d.Status, extensions.DeploymentProgressing) + if cond != nil && cond.Reason == deploymentutil.TimedOutReason { + // If we have reported lack of progress, do not overwrite it with a paused condition. + return nil + } + pausedCondExists := cond != nil && cond.Reason == deploymentutil.PausedDeployReason + + needsUpdate := false + if d.Spec.Paused && !pausedCondExists { + condition := deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionUnknown, deploymentutil.PausedDeployReason, "Deployment is paused") + deploymentutil.SetDeploymentCondition(&d.Status, *condition) + needsUpdate = true + } else if !d.Spec.Paused && pausedCondExists { + condition := deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionUnknown, deploymentutil.ResumedDeployReason, "Deployment is resumed") + deploymentutil.SetDeploymentCondition(&d.Status, *condition) + needsUpdate = true + } + + if !needsUpdate { + return nil + } + + var err error + d, err = dc.client.Extensions().Deployments(d.Namespace).UpdateStatus(d) + return err +} + // getAllReplicaSetsAndSyncRevision returns all the replica sets for the provided deployment (new and all old), with new RS's and deployment's revision updated. // 1. Get all old RSes this deployment targets, and calculate the max revision number among them (maxOldV). // 2. Get new RS this deployment targets (whose pod template matches deployment's), and update new RS's revision number to (maxOldV + 1), @@ -267,6 +301,16 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme } updateConditions := deploymentutil.SetDeploymentRevision(deployment, newRevision) + // If no other Progressing condition has been recorded and we need to estimate the progress + // of this deployment then it is likely that old users started caring about progress. In that + // case we need to take into account the first time we noticed their new replica set. + cond := deploymentutil.GetDeploymentCondition(deployment.Status, extensions.DeploymentProgressing) + if deployment.Spec.ProgressDeadlineSeconds != nil && cond == nil { + msg := fmt.Sprintf("Found new replica set %q", rsCopy.Name) + condition := deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionTrue, deploymentutil.FoundNewRSReason, msg) + deploymentutil.SetDeploymentCondition(&deployment.Status, *condition) + updateConditions = true + } if updateConditions { if deployment, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment); err != nil { @@ -311,14 +355,36 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme // Set new replica set's annotation deploymentutil.SetNewReplicaSetAnnotations(deployment, &newRS, newRevision, false) createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS) - if err != nil { - return nil, fmt.Errorf("error creating replica set %v: %v", deployment.Name, err) + switch { + // We may end up hitting this due to a slow cache or a fast resync of the deployment. 
+ case errors.IsAlreadyExists(err): + return dc.rsLister.ReplicaSets(namespace).Get(newRS.Name) + case err != nil: + msg := fmt.Sprintf("Failed to create new replica set %q: %v", newRS.Name, err) + if deployment.Spec.ProgressDeadlineSeconds != nil { + cond := deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionFalse, deploymentutil.FailedRSCreateReason, msg) + deploymentutil.SetDeploymentCondition(&deployment.Status, *cond) + // We don't really care about this error at this point, since we have a bigger issue to report. + // TODO: Update the rest of the Deployment status, too. We may need to do this every time we + // error out in all other places in the controller so that we let users know that their deployments + // have been noticed by the controller, albeit with errors. + // TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account + // these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568 + _, _ = dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(deployment) + } + dc.eventRecorder.Eventf(deployment, api.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg) + return nil, err } if newReplicasCount > 0 { - dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount) + dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Created new replica set %q and scaled up to %d", createdRS.Name, newReplicasCount) } deploymentutil.SetDeploymentRevision(deployment, newRevision) + if deployment.Spec.ProgressDeadlineSeconds != nil { + msg := fmt.Sprintf("Created new replica set %q", createdRS.Name) + condition := deploymentutil.NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionTrue, deploymentutil.NewReplicaSetReason, msg) + deploymentutil.SetDeploymentCondition(&deployment.Status, *condition) + } _, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment) return createdRS, err } @@ -442,7 +508,7 @@ func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newSc deploymentutil.SetReplicasAnnotations(rs, deployment.Spec.Replicas, deployment.Spec.Replicas+deploymentutil.MaxSurge(*deployment)) rs, err := dc.client.Extensions().ReplicaSets(rs.Namespace).Update(rs) if err == nil { - dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale) + dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %q to %d", scalingOperation, rs.Name, newScale) } return rs, err } @@ -496,6 +562,14 @@ func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, availableReplicas := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs) totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs) + if availableReplicas >= deployment.Spec.Replicas-deploymentutil.MaxUnavailable(*deployment) { + minAvailability := deploymentutil.NewDeploymentCondition(extensions.DeploymentAvailable, api.ConditionTrue, deploymentutil.MinimumReplicasAvailable, "Deployment has minimum availability.") + deploymentutil.SetDeploymentCondition(&deployment.Status, *minAvailability) + } else { + noMinAvailability := deploymentutil.NewDeploymentCondition(extensions.DeploymentAvailable, api.ConditionFalse, deploymentutil.MinimumReplicasUnavailable, "Deployment does 
not have minimum availability.") + deploymentutil.SetDeploymentCondition(&deployment.Status, *noMinAvailability) + } + return extensions.DeploymentStatus{ // TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value. ObservedGeneration: deployment.Generation, @@ -503,6 +577,7 @@ func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, UpdatedReplicas: deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS}), AvailableReplicas: availableReplicas, UnavailableReplicas: totalReplicas - availableReplicas, + Conditions: deployment.Status.Conditions, } } diff --git a/pkg/controller/deployment/util/deployment_util.go b/pkg/controller/deployment/util/deployment_util.go index 3502e77d8f7..66f1c3f8d37 100644 --- a/pkg/controller/deployment/util/deployment_util.go +++ b/pkg/controller/deployment/util/deployment_util.go @@ -70,8 +70,111 @@ const ( // TODO: Delete this annotation when we gracefully handle overlapping selectors. // See https://github.com/kubernetes/kubernetes/issues/2210 SelectorUpdateAnnotation = "deployment.kubernetes.io/selector-updated-at" + + // Reasons for deployment conditions + // + // Progressing: + // + // ReplicaSetUpdatedReason is added in a deployment when one of its replica sets is updated as part + // of the rollout process. + ReplicaSetUpdatedReason = "ReplicaSetUpdated" + // FailedRSCreateReason is added in a deployment when it cannot create a new replica set. + FailedRSCreateReason = "ReplicaSetCreateError" + // NewReplicaSetReason is added in a deployment when it creates a new replica set. + NewReplicaSetReason = "NewReplicaSetCreated" + // FoundNewRSReason is added in a deployment when it adopts an existing replica set. + FoundNewRSReason = "FoundNewReplicaSet" + // NewRSAvailableReason is added in a deployment when its newest replica set is made available, + // i.e. the number of new pods that have passed readiness checks and run for at least minReadySeconds + // is at least the minimum available pods that need to run for the deployment. + NewRSAvailableReason = "NewReplicaSetAvailable" + // TimedOutReason is added in a deployment when its newest replica set fails to show any progress + // within the given deadline (progressDeadlineSeconds). + TimedOutReason = "ProgressDeadlineExceeded" + // PausedDeployReason is added in a deployment when it is paused. Lack of progress shouldn't be + // estimated once a deployment is paused. + PausedDeployReason = "DeploymentPaused" + // ResumedDeployReason is added in a deployment when it is resumed. Useful so that we don't accidentally + // fail deployments that were paused in the middle of a rollout and are bounded by a deadline. + ResumedDeployReason = "DeploymentResumed" + // + // Available: + // + // MinimumReplicasAvailable is added in a deployment when it has the minimum number of replicas required available. + MinimumReplicasAvailable = "MinimumReplicasAvailable" + // MinimumReplicasUnavailable is added in a deployment when it doesn't have the minimum required replicas + // available. + MinimumReplicasUnavailable = "MinimumReplicasUnavailable" ) +// NewDeploymentCondition creates a new deployment condition. 
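Returning to the Available condition added in calculateStatus above, a worked example of the threshold with made-up numbers (assuming MaxUnavailable resolves percentages by rounding down, as the rolling-update fencepost logic does):

```go
// illustrateAvailability is a hypothetical snippet: with spec.replicas=10 and
// maxUnavailable=25% (resolved to 2), the Available condition flips to True
// once at least 8 replicas are available.
func illustrateAvailability() {
	desired, maxUnavailable := int32(10), int32(2)
	for _, available := range []int32{9, 7} {
		if available >= desired-maxUnavailable {
			fmt.Printf("%d available => DeploymentAvailable=True (MinimumReplicasAvailable)\n", available)
		} else {
			fmt.Printf("%d available => DeploymentAvailable=False (MinimumReplicasUnavailable)\n", available)
		}
	}
}
```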
+func NewDeploymentCondition(condType extensions.DeploymentConditionType, status api.ConditionStatus, reason, message string) *extensions.DeploymentCondition { + return &extensions.DeploymentCondition{ + Type: condType, + Status: status, + LastUpdateTime: unversioned.Now(), + LastTransitionTime: unversioned.Now(), + Reason: reason, + Message: message, + } +} + +// GetDeploymentCondition returns the condition with the provided type. +func GetDeploymentCondition(status extensions.DeploymentStatus, condType extensions.DeploymentConditionType) *extensions.DeploymentCondition { + for i := range status.Conditions { + c := status.Conditions[i] + if c.Type == condType { + return &c + } + } + return nil +} + +// SetDeploymentCondition updates the deployment to include the provided condition. If the condition that +// we are about to add already exists and has the same status and reason then we are not going to update. +func SetDeploymentCondition(status *extensions.DeploymentStatus, condition extensions.DeploymentCondition) { + currentCond := GetDeploymentCondition(*status, condition.Type) + if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason { + return + } + // Do not update lastTransitionTime if the status of the condition doesn't change. + if currentCond != nil && currentCond.Status == condition.Status { + condition.LastTransitionTime = currentCond.LastTransitionTime + } + newConditions := filterOutCondition(status.Conditions, condition.Type) + status.Conditions = append(newConditions, condition) +} + +// RemoveDeploymentCondition removes the deployment condition with the provided type. +func RemoveDeploymentCondition(status *extensions.DeploymentStatus, condType extensions.DeploymentConditionType) { + status.Conditions = filterOutCondition(status.Conditions, condType) +} + +// filterOutCondition returns a new slice of deployment conditions without conditions with the provided type. +func filterOutCondition(conditions []extensions.DeploymentCondition, condType extensions.DeploymentConditionType) []extensions.DeploymentCondition { + var newConditions []extensions.DeploymentCondition + for _, c := range conditions { + if c.Type == condType { + continue + } + newConditions = append(newConditions, c) + } + return newConditions +} + +// ReplicaSetToDeploymentCondition converts a replica set condition into a deployment condition. +// Useful for promoting replica set failure conditions into deployments. +func ReplicaSetToDeploymentCondition(cond extensions.ReplicaSetCondition) extensions.DeploymentCondition { + return extensions.DeploymentCondition{ + Type: extensions.DeploymentConditionType(cond.Type), + Status: cond.Status, + LastTransitionTime: cond.LastTransitionTime, + LastUpdateTime: cond.LastTransitionTime, + Reason: cond.Reason, + Message: cond.Message, + } +} + // SetDeploymentRevision updates the revision for a deployment. func SetDeploymentRevision(deployment *extensions.Deployment, revision string) bool { updated := false @@ -696,6 +799,56 @@ func IsRollingUpdate(deployment *extensions.Deployment) bool { return deployment.Spec.Strategy.Type == extensions.RollingUpdateDeploymentStrategyType } +// DeploymentComplete considers a deployment to be complete once its desired replicas equals its +// updatedReplicas and it doesn't violate minimum availability. 
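Stepping back to the condition helpers above, the update semantics of SetDeploymentCondition are easiest to see in a short usage sketch (written as if inside the util package; condition values are illustrative):

```go
func exampleSetDeploymentCondition() {
	status := &extensions.DeploymentStatus{}

	// First write: the condition is appended as-is.
	SetDeploymentCondition(status, *NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionTrue, ReplicaSetUpdatedReason, "rolling out"))

	// Same type, status and reason: a no-op, even though the message differs.
	SetDeploymentCondition(status, *NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionTrue, ReplicaSetUpdatedReason, "still rolling out"))

	// Same status but a new reason: the condition is replaced, and the previous
	// LastTransitionTime is carried over because the status did not change.
	SetDeploymentCondition(status, *NewDeploymentCondition(extensions.DeploymentProgressing, api.ConditionTrue, NewRSAvailableReason, "new replica set is available"))
}
```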
+func DeploymentComplete(deployment *extensions.Deployment, newStatus *extensions.DeploymentStatus) bool { + return newStatus.UpdatedReplicas == deployment.Spec.Replicas && + newStatus.AvailableReplicas >= deployment.Spec.Replicas-MaxUnavailable(*deployment) +} + +// DeploymentProgressing reports progress for a deployment. Progress is estimated by comparing the +// current with the new status of the deployment that the controller is observing. The following +// algorithm is already used in the kubectl rolling updater to report lack of progress. +func DeploymentProgressing(deployment *extensions.Deployment, newStatus *extensions.DeploymentStatus) bool { + oldStatus := deployment.Status + + // Old replicas that need to be scaled down + oldStatusOldReplicas := oldStatus.Replicas - oldStatus.UpdatedReplicas + newStatusOldReplicas := newStatus.Replicas - newStatus.UpdatedReplicas + + return (newStatus.UpdatedReplicas > oldStatus.UpdatedReplicas) || (newStatusOldReplicas < oldStatusOldReplicas) +} + +// used for unit testing +var nowFn = func() time.Time { return time.Now() } + +// DeploymentTimedOut considers a deployment to have timed out once its condition that reports progress +// is older than progressDeadlineSeconds or a Progressing condition with a TimedOutReason reason already +// exists. +func DeploymentTimedOut(deployment *extensions.Deployment, newStatus *extensions.DeploymentStatus) bool { + if deployment.Spec.ProgressDeadlineSeconds == nil { + return false + } + + // Look for the Progressing condition. If it doesn't exist, we have no base to estimate progress. + // If it's already set with a TimedOutReason reason, we have already timed out, no need to check + // again. + condition := GetDeploymentCondition(*newStatus, extensions.DeploymentProgressing) + if condition == nil { + return false + } + if condition.Reason == TimedOutReason { + return true + } + + // Look at the difference in seconds between now and the last time we reported any + // progress or tried to create a replica set, or resumed a paused deployment and + // compare against progressDeadlineSeconds. + from := condition.LastTransitionTime + delta := time.Duration(*deployment.Spec.ProgressDeadlineSeconds) * time.Second + return from.Add(delta).Before(nowFn()) +} + // NewRSNewReplicas calculates the number of replicas a deployment's new RS should have. // When one of the followings is true, we're rolling out the deployment; otherwise, we're scaling it. 
// 1) The new RS is saturated: newRS's replicas == deployment's replicas diff --git a/pkg/controller/deployment/util/deployment_util_test.go b/pkg/controller/deployment/util/deployment_util_test.go index 2d85bea2847..96fb29431a4 100644 --- a/pkg/controller/deployment/util/deployment_util_test.go +++ b/pkg/controller/deployment/util/deployment_util_test.go @@ -688,7 +688,6 @@ func TestResolveFenceposts(t *testing.T) { } func TestNewRSNewReplicas(t *testing.T) { - tests := []struct { test string strategyType extensions.DeploymentStrategyType @@ -703,12 +702,12 @@ func TestNewRSNewReplicas(t *testing.T) { 1, 5, 1, 5, }, { - "scale up - to depDeplicas", + "scale up - to depReplicas", extensions.RollingUpdateDeploymentStrategyType, 6, 2, 10, 6, }, { - "recreate - to depDeplicas", + "recreate - to depReplicas", extensions.RecreateDeploymentStrategyType, 3, 1, 1, 3, }, @@ -735,3 +734,373 @@ func TestNewRSNewReplicas(t *testing.T) { } } } + +var ( + condProgressing = func() extensions.DeploymentCondition { + return extensions.DeploymentCondition{ + Type: extensions.DeploymentProgressing, + Status: api.ConditionFalse, + Reason: "ForSomeReason", + } + } + + condProgressing2 = func() extensions.DeploymentCondition { + return extensions.DeploymentCondition{ + Type: extensions.DeploymentProgressing, + Status: api.ConditionTrue, + Reason: "BecauseItIs", + } + } + + condAvailable = func() extensions.DeploymentCondition { + return extensions.DeploymentCondition{ + Type: extensions.DeploymentAvailable, + Status: api.ConditionTrue, + Reason: "AwesomeController", + } + } + + status = func() *extensions.DeploymentStatus { + return &extensions.DeploymentStatus{ + Conditions: []extensions.DeploymentCondition{condProgressing(), condAvailable()}, + } + } +) + +func TestGetCondition(t *testing.T) { + exampleStatus := status() + + tests := []struct { + name string + + status extensions.DeploymentStatus + condType extensions.DeploymentConditionType + condStatus api.ConditionStatus + condReason string + + expected bool + }{ + { + name: "condition exists", + + status: *exampleStatus, + condType: extensions.DeploymentAvailable, + + expected: true, + }, + { + name: "condition does not exist", + + status: *exampleStatus, + condType: extensions.DeploymentReplicaFailure, + + expected: false, + }, + } + + for _, test := range tests { + cond := GetDeploymentCondition(test.status, test.condType) + exists := cond != nil + if exists != test.expected { + t.Errorf("%s: expected condition to exist: %t, got: %t", test.name, test.expected, exists) + } + } +} + +func TestSetCondition(t *testing.T) { + tests := []struct { + name string + + status *extensions.DeploymentStatus + cond extensions.DeploymentCondition + + expectedStatus *extensions.DeploymentStatus + }{ + { + name: "set for the first time", + + status: &extensions.DeploymentStatus{}, + cond: condAvailable(), + + expectedStatus: &extensions.DeploymentStatus{Conditions: []extensions.DeploymentCondition{condAvailable()}}, + }, + { + name: "simple set", + + status: &extensions.DeploymentStatus{Conditions: []extensions.DeploymentCondition{condProgressing()}}, + cond: condAvailable(), + + expectedStatus: status(), + }, + { + name: "overwrite", + + status: &extensions.DeploymentStatus{Conditions: []extensions.DeploymentCondition{condProgressing()}}, + cond: condProgressing2(), + + expectedStatus: &extensions.DeploymentStatus{Conditions: []extensions.DeploymentCondition{condProgressing2()}}, + }, + } + + for _, test := range tests { + SetDeploymentCondition(test.status, test.cond) 
+ if !reflect.DeepEqual(test.status, test.expectedStatus) { + t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status) + } + } +} + +func TestRemoveCondition(t *testing.T) { + tests := []struct { + name string + + status *extensions.DeploymentStatus + condType extensions.DeploymentConditionType + + expectedStatus *extensions.DeploymentStatus + }{ + { + name: "remove from empty status", + + status: &extensions.DeploymentStatus{}, + condType: extensions.DeploymentProgressing, + + expectedStatus: &extensions.DeploymentStatus{}, + }, + { + name: "simple remove", + + status: &extensions.DeploymentStatus{Conditions: []extensions.DeploymentCondition{condProgressing()}}, + condType: extensions.DeploymentProgressing, + + expectedStatus: &extensions.DeploymentStatus{}, + }, + { + name: "doesn't remove anything", + + status: status(), + condType: extensions.DeploymentReplicaFailure, + + expectedStatus: status(), + }, + } + + for _, test := range tests { + RemoveDeploymentCondition(test.status, test.condType) + if !reflect.DeepEqual(test.status, test.expectedStatus) { + t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status) + } + } +} + +func TestDeploymentComplete(t *testing.T) { + deployment := func(desired, current, updated, available, maxUnavailable int32) *extensions.Deployment { + return &extensions.Deployment{ + Spec: extensions.DeploymentSpec{ + Replicas: desired, + Strategy: extensions.DeploymentStrategy{ + RollingUpdate: &extensions.RollingUpdateDeployment{ + MaxUnavailable: intstr.FromInt(int(maxUnavailable)), + }, + Type: extensions.RollingUpdateDeploymentStrategyType, + }, + }, + Status: extensions.DeploymentStatus{ + Replicas: current, + UpdatedReplicas: updated, + AvailableReplicas: available, + }, + } + } + + tests := []struct { + name string + + d *extensions.Deployment + + expected bool + }{ + { + name: "complete", + + d: deployment(5, 5, 5, 4, 1), + expected: true, + }, + { + name: "not complete", + + d: deployment(5, 5, 5, 3, 1), + expected: false, + }, + { + name: "complete #2", + + d: deployment(5, 5, 5, 5, 0), + expected: true, + }, + { + name: "not complete #2", + + d: deployment(5, 5, 4, 5, 0), + expected: false, + }, + } + + for _, test := range tests { + t.Log(test.name) + + if got, exp := DeploymentComplete(test.d, &test.d.Status), test.expected; got != exp { + t.Errorf("expected complete: %t, got: %t", exp, got) + } + } +} + +func TestDeploymentProgressing(t *testing.T) { + deployment := func(current, updated int32) *extensions.Deployment { + return &extensions.Deployment{ + Status: extensions.DeploymentStatus{ + Replicas: current, + UpdatedReplicas: updated, + }, + } + } + newStatus := func(current, updated int32) extensions.DeploymentStatus { + return extensions.DeploymentStatus{ + Replicas: current, + UpdatedReplicas: updated, + } + } + + tests := []struct { + name string + + d *extensions.Deployment + newStatus extensions.DeploymentStatus + + expected bool + }{ + { + name: "progressing", + + d: deployment(10, 4), + newStatus: newStatus(10, 6), + + expected: true, + }, + { + name: "not progressing", + + d: deployment(10, 4), + newStatus: newStatus(10, 4), + + expected: false, + }, + { + name: "progressing #2", + + d: deployment(10, 4), + newStatus: newStatus(8, 4), + + expected: true, + }, + { + name: "not progressing #2", + + d: deployment(10, 7), + newStatus: newStatus(10, 6), + + expected: false, + }, + { + name: "progressing #3", + + d: deployment(10, 4), + newStatus: newStatus(8, 8), + + expected: 
true, + }, + { + name: "not progressing #3", + + d: deployment(10, 7), + newStatus: newStatus(10, 7), + + expected: false, + }, + } + + for _, test := range tests { + t.Log(test.name) + + if got, exp := DeploymentProgressing(test.d, &test.newStatus), test.expected; got != exp { + t.Errorf("expected progressing: %t, got: %t", exp, got) + } + } +} + +func TestDeploymentTimedOut(t *testing.T) { + var ( + null *int32 + ten = int32(10) + ) + + timeFn := func(min, sec int) time.Time { + return time.Date(2016, 1, 1, 0, min, sec, 0, time.UTC) + } + deployment := func(condType extensions.DeploymentConditionType, status api.ConditionStatus, pds *int32, from time.Time) extensions.Deployment { + return extensions.Deployment{ + Spec: extensions.DeploymentSpec{ + ProgressDeadlineSeconds: pds, + }, + Status: extensions.DeploymentStatus{ + Conditions: []extensions.DeploymentCondition{ + { + Type: condType, + Status: status, + LastTransitionTime: unversioned.Time{Time: from}, + }, + }, + }, + } + } + + tests := []struct { + name string + + d extensions.Deployment + nowFn func() time.Time + + expected bool + }{ + { + name: "no progressDeadlineSeconds specified - no timeout", + + d: deployment(extensions.DeploymentProgressing, api.ConditionTrue, null, timeFn(1, 9)), + nowFn: func() time.Time { return timeFn(1, 20) }, + expected: false, + }, + { + name: "progressDeadlineSeconds: 10s, now - started => 00:01:20 - 00:01:09 => 11s", + + d: deployment(extensions.DeploymentProgressing, api.ConditionTrue, &ten, timeFn(1, 9)), + nowFn: func() time.Time { return timeFn(1, 20) }, + expected: true, + }, + { + name: "progressDeadlineSeconds: 10s, now - started => 00:01:20 - 00:01:11 => 9s", + + d: deployment(extensions.DeploymentProgressing, api.ConditionTrue, &ten, timeFn(1, 11)), + nowFn: func() time.Time { return timeFn(1, 20) }, + expected: false, + }, + } + + for _, test := range tests { + t.Log(test.name) + + nowFn = test.nowFn + if got, exp := DeploymentTimedOut(&test.d, &test.d.Status), test.expected; got != exp { + t.Errorf("expected timeout: %t, got: %t", exp, got) + } + } +}
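Since TestDeploymentTimedOut above swaps the package-level nowFn, a follow-on test would typically restore it so later tests see the real clock. A hypothetical sketch of that pattern, not part of this patch, using the same types as the existing tests:

```go
func TestDeploymentTimedOutRestoresClock(t *testing.T) {
	// Stub the injected clock and restore it when the test finishes.
	oldNowFn := nowFn
	defer func() { nowFn = oldNowFn }()
	nowFn = func() time.Time { return time.Date(2016, 1, 1, 0, 10, 1, 0, time.UTC) }

	deadline := int32(600)
	d := extensions.Deployment{
		Spec: extensions.DeploymentSpec{ProgressDeadlineSeconds: &deadline},
		Status: extensions.DeploymentStatus{
			Conditions: []extensions.DeploymentCondition{{
				Type:               extensions.DeploymentProgressing,
				Status:             api.ConditionTrue,
				LastTransitionTime: unversioned.Time{Time: time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC)},
			}},
		},
	}

	// 601s have elapsed against a 600s deadline, so the deployment has timed out.
	if !DeploymentTimedOut(&d, &d.Status) {
		t.Errorf("expected the deployment to have timed out")
	}
}
```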