Merge pull request #42175 from enisoc/controller-ref-dep

Automatic merge from submit-queue

Deployment: Fully Respect ControllerRef

**What this PR does / why we need it**:

This is part of the completion of the [ControllerRef](https://github.com/kubernetes/community/blob/master/contributors/design-proposals/controller-ref.md) proposal. It brings Deployment into full compliance with ControllerRef. See the individual commit messages for details.

**Which issue this PR fixes**:

This ensures that Deployment does not fight with other controllers over control of Pods and ReplicaSets.

Ref: https://github.com/kubernetes/kubernetes/issues/24433

**Special notes for your reviewer**:

**Release note**:

```release-note
Deployment now fully respects ControllerRef to avoid fighting over Pods and ReplicaSets. At the time of upgrade, **you must not have Deployments with selectors that overlap**, or else [ownership of ReplicaSets may change](https://github.com/kubernetes/community/blob/master/contributors/design-proposals/controller-ref.md#upgrading).
```
cc @erictune @kubernetes/sig-apps-pr-reviews
This commit is contained in:
Kubernetes Submit Queue 2017-03-07 20:44:36 -08:00 committed by GitHub
commit d306acca86
24 changed files with 878 additions and 901 deletions

View File

@ -2382,10 +2382,6 @@ run_deployment_tests() {
# Check that trying to watch the status of a superseded revision returns an error
! kubectl rollout status deployment/nginx --revision=3
cat hack/testdata/deployment-revision1.yaml | $SED "s/name: nginx$/name: nginx2/" | kubectl create -f - "${kube_flags[@]}"
# Newest deployment should be marked as overlapping
kubectl get deployment nginx2 -o yaml "${kube_flags[@]}" | grep "deployment.kubernetes.io/error-selector-overlapping-with"
# Oldest deployment should not be marked as overlapping
! kubectl get deployment nginx -o yaml "${kube_flags[@]}" | grep "deployment.kubernetes.io/error-selector-overlapping-with"
# Deletion of both deployments should not be blocked
kubectl delete deployment nginx2 "${kube_flags[@]}"
# Clean up

View File

@ -34,7 +34,10 @@ type DeploymentListerExpansion interface {
// DeploymentNamespaeLister.
type DeploymentNamespaceListerExpansion interface{}
// GetDeploymentsForReplicaSet returns a list of deployments managing a replica set. Returns an error only if no matching deployments are found.
// GetDeploymentsForReplicaSet returns a list of Deployments that potentially
// match a ReplicaSet. Only the one specified in the ReplicaSet's ControllerRef
// will actually manage it.
// Returns an error only if no matching Deployments are found.
func (s *deploymentLister) GetDeploymentsForReplicaSet(rs *extensions.ReplicaSet) ([]*extensions.Deployment, error) {
if len(rs.Labels) == 0 {
return nil, fmt.Errorf("no deployments found for ReplicaSet %v because it has no labels", rs.Name)

View File

@ -34,7 +34,10 @@ type DeploymentListerExpansion interface {
// DeploymentNamespaeLister.
type DeploymentNamespaceListerExpansion interface{}
// GetDeploymentsForReplicaSet returns a list of deployments managing a replica set. Returns an error only if no matching deployments are found.
// GetDeploymentsForReplicaSet returns a list of Deployments that potentially
// match a ReplicaSet. Only the one specified in the ReplicaSet's ControllerRef
// will actually manage it.
// Returns an error only if no matching Deployments are found.
func (s *deploymentLister) GetDeploymentsForReplicaSet(rs *extensions.ReplicaSet) ([]*extensions.Deployment, error) {
if len(rs.Labels) == 0 {
return nil, fmt.Errorf("no deployments found for ReplicaSet %v because it has no labels", rs.Name)

View File

@ -314,7 +314,7 @@ func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(replicaSet *extensions.
// Note that ValidateOwnerReferences() will reject this patch if another
// OwnerReference exists with controller=true.
addControllerPatch := fmt.Sprintf(
`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true}],"uid":"%s"}}`,
`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`,
m.controllerKind.GroupVersion(), m.controllerKind.Kind,
m.controller.GetName(), m.controller.GetUID(), replicaSet.UID)
return m.rsControl.PatchReplicaSet(replicaSet.Namespace, replicaSet.Name, []byte(addControllerPatch))

View File

@ -33,11 +33,10 @@ go_library(
"//pkg/util/labels:go_default_library",
"//pkg/util/metrics:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/equality",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/errors",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",

View File

@ -23,16 +23,13 @@ package deployment
import (
"fmt"
"reflect"
"sort"
"time"
"github.com/golang/glog"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -58,9 +55,8 @@ const (
maxRetries = 5
)
func getDeploymentKind() schema.GroupVersionKind {
return extensions.SchemeGroupVersion.WithKind("Deployment")
}
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = extensions.SchemeGroupVersion.WithKind("Deployment")
// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running replica sets and pods.
@ -174,28 +170,6 @@ func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
curD := cur.(*extensions.Deployment)
glog.V(4).Infof("Updating deployment %s", oldD.Name)
dc.enqueueDeployment(curD)
// If the selector of the current deployment just changed, we need to requeue any old
// overlapping deployments. If the new selector steps on another deployment, the current
// deployment will get denied during the resync loop.
if !reflect.DeepEqual(curD.Spec.Selector, oldD.Spec.Selector) {
deployments, err := dc.dLister.Deployments(curD.Namespace).List(labels.Everything())
if err != nil {
utilruntime.HandleError(fmt.Errorf("error listing deployments in namespace %s: %v", curD.Namespace, err))
return
}
// Trigger cleanup of any old overlapping deployments; we don't care about any error
// returned here.
for i := range deployments {
otherD := deployments[i]
oldOverlaps, oldErr := util.OverlapsWith(oldD, otherD)
curOverlaps, curErr := util.OverlapsWith(curD, otherD)
// Enqueue otherD so it gets cleaned up
if oldErr == nil && curErr == nil && oldOverlaps && !curOverlaps {
dc.enqueueDeployment(otherD)
}
}
}
}
func (dc *DeploymentController) deleteDeployment(obj interface{}) {
@ -214,38 +188,47 @@ func (dc *DeploymentController) deleteDeployment(obj interface{}) {
}
glog.V(4).Infof("Deleting deployment %s", d.Name)
dc.enqueueDeployment(d)
deployments, err := dc.dLister.Deployments(d.Namespace).List(labels.Everything())
if err != nil {
utilruntime.HandleError(fmt.Errorf("error listing deployments in namespace %s: %v", d.Namespace, err))
return
}
// Trigger cleanup of any old overlapping deployments; we don't care about any error
// returned here.
for i := range deployments {
otherD := deployments[i]
overlaps, err := util.OverlapsWith(d, otherD)
// Enqueue otherD so it gets cleaned up
if err == nil && overlaps {
dc.enqueueDeployment(otherD)
}
}
}
// addReplicaSet enqueues the deployment that manages a ReplicaSet when the ReplicaSet is created.
func (dc *DeploymentController) addReplicaSet(obj interface{}) {
rs := obj.(*extensions.ReplicaSet)
glog.V(4).Infof("ReplicaSet %s added.", rs.Name)
if d := dc.getDeploymentForReplicaSet(rs); d != nil {
if rs.DeletionTimestamp != nil {
// On a restart of the controller manager, it's possible for an object to
// show up in a state that is already pending deletion.
dc.deleteReplicaSet(rs)
return
}
// If it has a ControllerRef, that's all that matters.
if controllerRef := controller.GetControllerOf(rs); controllerRef != nil {
d := dc.resolveControllerRef(rs.Namespace, controllerRef)
if d == nil {
return
}
glog.V(4).Infof("ReplicaSet %s added.", rs.Name)
dc.enqueueDeployment(d)
return
}
// Otherwise, it's an orphan. Get a list of all matching Deployments and sync
// them to see if anyone wants to adopt it.
ds := dc.getDeploymentsForReplicaSet(rs)
if len(ds) == 0 {
return
}
glog.V(4).Infof("Orphan ReplicaSet %s added.", rs.Name)
for _, d := range ds {
dc.enqueueDeployment(d)
}
}
// getDeploymentForReplicaSet returns the deployment managing the given ReplicaSet.
func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.ReplicaSet) *extensions.Deployment {
// getDeploymentsForReplicaSet returns a list of Deployments that potentially
// match a ReplicaSet.
func (dc *DeploymentController) getDeploymentsForReplicaSet(rs *extensions.ReplicaSet) []*extensions.Deployment {
deployments, err := dc.dLister.GetDeploymentsForReplicaSet(rs)
if err != nil || len(deployments) == 0 {
glog.V(4).Infof("Error: %v. No deployment found for ReplicaSet %v, deployment controller will avoid syncing.", err, rs.Name)
return nil
}
// Because all ReplicaSet's belonging to a deployment should have a unique label key,
@ -253,11 +236,12 @@ func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.Replic
// If that happens we should probably dynamically repair the situation by ultimately
// trying to clean up one of the controllers, for now we just return the older one
if len(deployments) > 1 {
sort.Sort(util.BySelectorLastUpdateTime(deployments))
// ControllerRef will ensure we don't do anything crazy, but more than one
// item in this list nevertheless constitutes user error.
glog.V(4).Infof("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s",
rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name)
}
return deployments[0]
return deployments
}
// updateReplicaSet figures out what deployment(s) manage a ReplicaSet when the ReplicaSet
@ -272,16 +256,39 @@ func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
// Two different versions of the same replica set will always have different RVs.
return
}
// TODO: Write a unittest for this case
glog.V(4).Infof("ReplicaSet %s updated.", curRS.Name)
if d := dc.getDeploymentForReplicaSet(curRS); d != nil {
dc.enqueueDeployment(d)
curControllerRef := controller.GetControllerOf(curRS)
oldControllerRef := controller.GetControllerOf(oldRS)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged && oldControllerRef != nil {
// The ControllerRef was changed. Sync the old controller, if any.
if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
dc.enqueueDeployment(d)
}
}
// A number of things could affect the old deployment: labels changing,
// pod template changing, etc.
if !apiequality.Semantic.DeepEqual(oldRS, curRS) {
if oldD := dc.getDeploymentForReplicaSet(oldRS); oldD != nil {
dc.enqueueDeployment(oldD)
// If it has a ControllerRef, that's all that matters.
if curControllerRef != nil {
d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
if d == nil {
return
}
glog.V(4).Infof("ReplicaSet %s updated.", curRS.Name)
dc.enqueueDeployment(d)
return
}
// Otherwise, it's an orphan. If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
if labelChanged || controllerRefChanged {
ds := dc.getDeploymentsForReplicaSet(curRS)
if len(ds) == 0 {
return
}
glog.V(4).Infof("Orphan ReplicaSet %s updated.", curRS.Name)
for _, d := range ds {
dc.enqueueDeployment(d)
}
}
}
@ -308,10 +315,18 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
return
}
}
glog.V(4).Infof("ReplicaSet %s deleted.", rs.Name)
if d := dc.getDeploymentForReplicaSet(rs); d != nil {
dc.enqueueDeployment(d)
controllerRef := controller.GetControllerOf(rs)
if controllerRef == nil {
// No controller should care about orphans being deleted.
return
}
d := dc.resolveControllerRef(rs.Namespace, controllerRef)
if d == nil {
return
}
glog.V(4).Infof("ReplicaSet %s deleted.", rs.Name)
dc.enqueueDeployment(d)
}
// deletePod will enqueue a Recreate Deployment once all of its pods have stopped running.
@ -336,8 +351,20 @@ func (dc *DeploymentController) deletePod(obj interface{}) {
}
glog.V(4).Infof("Pod %s deleted.", pod.Name)
if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == extensions.RecreateDeploymentStrategyType {
podList, err := dc.listPods(d)
if err == nil && len(podList.Items) == 0 {
// Sync if this Deployment now has no more Pods.
rsList, err := dc.getReplicaSetsForDeployment(d)
if err != nil {
return
}
podMap, err := dc.getPodMapForDeployment(d, rsList)
if err != nil {
return
}
numPods := 0
for _, podList := range podMap {
numPods += len(podList.Items)
}
if numPods == 0 {
dc.enqueueDeployment(d)
}
}
@ -379,35 +406,48 @@ func (dc *DeploymentController) getDeploymentForPod(pod *v1.Pod) *extensions.Dep
// Find the owning replica set
var rs *extensions.ReplicaSet
var err error
// Look at the owner reference
controllerRef := controller.GetControllerOf(&pod.ObjectMeta)
if controllerRef != nil {
controllerRef := controller.GetControllerOf(pod)
if controllerRef == nil {
// No controller owns this Pod.
return nil
}
if controllerRef.Kind != extensions.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
// Not a pod owned by a replica set.
if controllerRef.Kind != extensions.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
return nil
}
rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
if err != nil {
glog.V(4).Infof("Cannot get replicaset %q for pod %q: %v", controllerRef.Name, pod.Name, err)
return nil
}
} else {
// Fallback to listing replica sets.
rss, err := dc.rsLister.GetPodReplicaSets(pod)
if err != nil {
glog.V(4).Infof("Cannot list replica sets for pod %q: %v", pod.Name, err)
return nil
}
// TODO: Handle multiple replica sets gracefully
// For now we return the oldest replica set.
if len(rss) > 1 {
utilruntime.HandleError(fmt.Errorf("more than one ReplicaSet is selecting pod %q with labels: %+v", pod.Name, pod.Labels))
sort.Sort(controller.ReplicaSetsByCreationTimestamp(rss))
}
rs = rss[0]
return nil
}
rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
if err != nil || rs.UID != controllerRef.UID {
glog.V(4).Infof("Cannot get replicaset %q for pod %q: %v", controllerRef.Name, pod.Name, err)
return nil
}
return dc.getDeploymentForReplicaSet(rs)
// Now find the Deployment that owns that ReplicaSet.
controllerRef = controller.GetControllerOf(rs)
if controllerRef == nil {
return nil
}
return dc.resolveControllerRef(rs.Namespace, controllerRef)
}
// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the corrrect Kind.
func (dc *DeploymentController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *extensions.Deployment {
// We can't look up by UID, so look up by Name and then verify UID.
// Don't even try to look up by Name if it's the wrong Kind.
if controllerRef.Kind != controllerKind.Kind {
return nil
}
d, err := dc.dLister.Deployments(namespace).Get(controllerRef.Name)
if err != nil {
return nil
}
if d.UID != controllerRef.UID {
// The controller we found with this Name is not the same one that the
// ControllerRef points to.
return nil
}
return d
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
@ -447,23 +487,58 @@ func (dc *DeploymentController) handleErr(err error, key interface{}) {
dc.queue.Forget(key)
}
// claimReplicaSets uses NewReplicaSetControllerRefManager to classify ReplicaSets
// and adopts them if their labels match the Deployment but are missing the reference.
// It also removes the controllerRef for ReplicaSets, whose labels no longer matches
// the deployment.
func (dc *DeploymentController) claimReplicaSets(deployment *extensions.Deployment) error {
rsList, err := dc.rsLister.ReplicaSets(deployment.Namespace).List(labels.Everything())
// getReplicaSetsForDeployment uses ControllerRefManager to reconcile
// ControllerRef by adopting and orphaning.
// It returns the list of ReplicaSets that this Deployment should manage.
func (dc *DeploymentController) getReplicaSetsForDeployment(d *extensions.Deployment) ([]*extensions.ReplicaSet, error) {
// List all ReplicaSets to find those we own but that no longer match our
// selector. They will be orphaned by ClaimReplicaSets().
rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything())
if err != nil {
return err
return nil, err
}
deploymentSelector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
}
cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind)
return cm.ClaimReplicaSets(rsList)
}
deploymentSelector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
// getPodMapForDeployment returns the Pods managed by a Deployment.
//
// It returns a map from ReplicaSet UID to a list of Pods controlled by that RS,
// according to the Pod's ControllerRef.
func (dc *DeploymentController) getPodMapForDeployment(d *extensions.Deployment, rsList []*extensions.ReplicaSet) (map[types.UID]*v1.PodList, error) {
// Get all Pods that potentially belong to this Deployment.
selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
if err != nil {
return fmt.Errorf("deployment %s/%s has invalid label selector: %v", deployment.Namespace, deployment.Name, err)
return nil, err
}
cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, deployment, deploymentSelector, getDeploymentKind())
_, err = cm.ClaimReplicaSets(rsList)
return err
pods, err := dc.podLister.Pods(d.Namespace).List(selector)
if err != nil {
return nil, err
}
// Group Pods by their controller (if it's in rsList).
podMap := make(map[types.UID]*v1.PodList, len(rsList))
for _, rs := range rsList {
podMap[rs.UID] = &v1.PodList{}
}
for _, pod := range pods {
// Ignore inactive Pods since that's what ReplicaSet does.
if !controller.IsPodActive(pod) {
continue
}
controllerRef := controller.GetControllerOf(pod)
if controllerRef == nil {
continue
}
// Only append if we care about this UID.
if podList, ok := podMap[controllerRef.UID]; ok {
podList.Items = append(podList.Items, *pod)
}
}
return podMap, nil
}
// syncDeployment will sync the deployment with the given key.
@ -506,25 +581,20 @@ func (dc *DeploymentController) syncDeployment(key string) error {
return nil
}
deployments, err := dc.dLister.Deployments(d.Namespace).List(labels.Everything())
// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
// through adoption/orphaning.
rsList, err := dc.getReplicaSetsForDeployment(d)
if err != nil {
return fmt.Errorf("error listing deployments in namespace %s: %v", d.Namespace, err)
return err
}
// Handle overlapping deployments by deterministically avoid syncing deployments that fight over ReplicaSets.
overlaps, err := dc.handleOverlap(d, deployments)
// List all Pods owned by this Deployment, grouped by their ReplicaSet.
podMap, err := dc.getPodMapForDeployment(d, rsList)
if err != nil {
if overlaps {
// Emit an event and return a nil error for overlapping deployments so we won't resync them again.
dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectorOverlap", err.Error())
return nil
}
// For any other failure, we should retry the deployment.
return err
}
if d.DeletionTimestamp != nil {
return dc.syncStatusOnly(d)
return dc.syncStatusOnly(d, rsList, podMap)
}
// Why run the cleanup policy only when there is no rollback request?
@ -536,7 +606,7 @@ func (dc *DeploymentController) syncDeployment(key string) error {
// (and chances are higher that they will work again as opposed to others that didn't) for candidates to
// automatically roll back to (#23211) and the cleanup policy should help.
if d.Spec.RollbackTo == nil {
_, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false)
_, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
if err != nil {
return err
}
@ -546,11 +616,6 @@ func (dc *DeploymentController) syncDeployment(key string) error {
dc.cleanupDeployment(oldRSs, d)
}
err = dc.claimReplicaSets(d)
if err != nil {
return err
}
// Update deployment conditions with an Unknown condition when pausing/resuming
// a deployment. In this way, we can be sure that we won't timeout when a user
// resumes a Deployment with a set progressDeadlineSeconds.
@ -558,7 +623,7 @@ func (dc *DeploymentController) syncDeployment(key string) error {
return err
}
_, err = dc.hasFailed(d)
_, err = dc.hasFailed(d, rsList, podMap)
if err != nil {
return err
}
@ -567,152 +632,29 @@ func (dc *DeploymentController) syncDeployment(key string) error {
// See https://github.com/kubernetes/kubernetes/issues/23211.
if d.Spec.Paused {
return dc.sync(d)
return dc.sync(d, rsList, podMap)
}
// rollback is not re-entrant in case the underlying replica sets are updated with a new
// revision so we should ensure that we won't proceed to update replica sets until we
// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
if d.Spec.RollbackTo != nil {
return dc.rollback(d)
return dc.rollback(d, rsList, podMap)
}
scalingEvent, err := dc.isScalingEvent(d)
scalingEvent, err := dc.isScalingEvent(d, rsList, podMap)
if err != nil {
return err
}
if scalingEvent {
return dc.sync(d)
return dc.sync(d, rsList, podMap)
}
switch d.Spec.Strategy.Type {
case extensions.RecreateDeploymentStrategyType:
return dc.rolloutRecreate(d)
return dc.rolloutRecreate(d, rsList, podMap)
case extensions.RollingUpdateDeploymentStrategyType:
return dc.rolloutRolling(d)
return dc.rolloutRolling(d, rsList, podMap)
}
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
// handleOverlap will avoid syncing the newer overlapping ones (only sync the oldest one). New/old is
// determined by when the deployment's selector is last updated.
func (dc *DeploymentController) handleOverlap(d *extensions.Deployment, deployments []*extensions.Deployment) (bool, error) {
overlapping := false
var errs []error
for i := range deployments {
otherD := deployments[i]
if d.Name == otherD.Name {
continue
}
// Error is already checked during validation
foundOverlaps, _ := util.OverlapsWith(d, otherD)
// If the otherD deployment overlaps with the current we need to identify which one
// holds the set longer and mark the other as overlapping. Requeue the overlapping
// deployments if this one has been marked deleted, we only update its status as long
// as it is not actually deleted.
if foundOverlaps && d.DeletionTimestamp == nil {
overlapping = true
// Look at the overlapping annotation in both deployments. If one of them has it and points
// to the other one then we don't need to compare their timestamps.
otherOverlapsWith := otherD.Annotations[util.OverlapAnnotation]
currentOverlapsWith := d.Annotations[util.OverlapAnnotation]
// The other deployment is already marked as overlapping with the current one.
if otherOverlapsWith == d.Name {
var err error
if d, err = dc.clearDeploymentOverlap(d, otherD.Name); err != nil {
errs = append(errs, err)
}
continue
}
otherCopy, err := util.DeploymentDeepCopy(otherD)
if err != nil {
return false, err
}
// Skip syncing this one if older overlapping one is found.
if currentOverlapsWith == otherCopy.Name || util.SelectorUpdatedBefore(otherCopy, d) {
if _, err = dc.markDeploymentOverlap(d, otherCopy.Name); err != nil {
return false, err
}
if _, err = dc.clearDeploymentOverlap(otherCopy, d.Name); err != nil {
return false, err
}
return true, fmt.Errorf("deployment %s/%s has overlapping selector with an older deployment %s/%s, skip syncing it", d.Namespace, d.Name, otherCopy.Namespace, otherCopy.Name)
}
// TODO: We need to support annotations in deployments that overlap with multiple other
// deployments.
if _, err = dc.markDeploymentOverlap(otherCopy, d.Name); err != nil {
errs = append(errs, err)
}
// This is going to get some deployments into update hotlooping if we remove the overlapping
// annotation unconditionally.
//
// Scenario:
// --> Deployment foo with label selector A=A is created.
// --> Deployment bar with label selector A=A,B=B is created. Marked as overlapping since it
// overlaps with foo.
// --> Deployment baz with label selector B=B is created. Marked as overlapping, since it
// overlaps with bar, bar overlapping annotation is cleaned up. Next sync loop marks bar
// as overlapping and it gets in an update hotloop.
if d, err = dc.clearDeploymentOverlap(d, otherCopy.Name); err != nil {
errs = append(errs, err)
}
continue
}
// If the otherD deployment does not overlap with the current deployment *anymore*
// we need to cleanup otherD from the overlapping annotation so it can be synced by
// the deployment controller.
dName, hasOverlappingAnnotation := otherD.Annotations[util.OverlapAnnotation]
if hasOverlappingAnnotation && dName == d.Name {
otherCopy, err := util.DeploymentDeepCopy(otherD)
if err != nil {
return false, err
}
if _, err = dc.clearDeploymentOverlap(otherCopy, d.Name); err != nil {
errs = append(errs, err)
}
}
}
if !overlapping {
var err error
if d, err = dc.clearDeploymentOverlap(d, ""); err != nil {
errs = append(errs, err)
}
}
return false, utilerrors.NewAggregate(errs)
}
func (dc *DeploymentController) markDeploymentOverlap(deployment *extensions.Deployment, withDeployment string) (*extensions.Deployment, error) {
if deployment.Annotations[util.OverlapAnnotation] == withDeployment && deployment.Status.ObservedGeneration >= deployment.Generation {
return deployment, nil
}
if deployment.Annotations == nil {
deployment.Annotations = make(map[string]string)
}
// Update observedGeneration for overlapping deployments so that their deletion won't be blocked.
deployment.Status.ObservedGeneration = deployment.Generation
deployment.Annotations[util.OverlapAnnotation] = withDeployment
return dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment)
}
func (dc *DeploymentController) clearDeploymentOverlap(deployment *extensions.Deployment, otherName string) (*extensions.Deployment, error) {
overlapsWith := deployment.Annotations[util.OverlapAnnotation]
if len(overlapsWith) == 0 {
return deployment, nil
}
// This is not the deployment found in the annotation - do not remove the annotation.
if len(otherName) > 0 && otherName != overlapsWith {
return deployment, nil
}
delete(deployment.Annotations, util.OverlapAnnotation)
return dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment)
}

View File

@ -17,8 +17,8 @@ limitations under the License.
package deployment
import (
"strconv"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -108,13 +108,13 @@ func newDeployment(name string, replicas int, revisionHistoryLimit *int32, maxSu
}
func newReplicaSet(d *extensions.Deployment, name string, replicas int) *extensions.ReplicaSet {
control := true
return &extensions.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: uuid.NewUUID(),
Namespace: metav1.NamespaceDefault,
Labels: d.Spec.Selector.MatchLabels,
OwnerReferences: []metav1.OwnerReference{{APIVersion: getDeploymentKind().GroupVersion().Version, Kind: getDeploymentKind().Kind, Name: d.Name, UID: d.UID, Controller: &control}},
OwnerReferences: []metav1.OwnerReference{*newControllerRef(d)},
},
Spec: extensions.ReplicaSetSpec{
Selector: d.Spec.Selector,
@ -311,195 +311,6 @@ func TestReentrantRollback(t *testing.T) {
f.run(getKey(d, t))
}
// TestOverlappingDeployment ensures that an overlapping deployment will not be synced by
// the controller.
func TestOverlappingDeployment(t *testing.T) {
f := newFixture(t)
now := metav1.Now()
later := metav1.Time{Time: now.Add(time.Minute)}
foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
foo.CreationTimestamp = now
bar := newDeployment("bar", 1, nil, nil, nil, map[string]string{"foo": "bar", "app": "baz"})
bar.CreationTimestamp = later
f.dLister = append(f.dLister, foo, bar)
f.objects = append(f.objects, foo, bar)
f.expectUpdateDeploymentStatusAction(bar)
f.run(getKey(bar, t))
for _, a := range filterInformerActions(f.client.Actions()) {
action, ok := a.(core.UpdateAction)
if !ok {
continue
}
d, ok := action.GetObject().(*extensions.Deployment)
if !ok {
continue
}
if d.Name == "bar" && d.Annotations[util.OverlapAnnotation] != "foo" {
t.Errorf("annotations weren't updated for the overlapping deployment: %v", d.Annotations)
}
}
}
// TestSyncOverlappedDeployment ensures that from two overlapping deployments, the older
// one will be synced and the newer will be marked as overlapping. Note that in reality it's
// not always the older deployment that is the one that works vs the rest but the one which
// has the selector unchanged for longer time.
func TestSyncOverlappedDeployment(t *testing.T) {
f := newFixture(t)
now := metav1.Now()
later := metav1.Time{Time: now.Add(time.Minute)}
foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
foo.CreationTimestamp = now
bar := newDeployment("bar", 1, nil, nil, nil, map[string]string{"foo": "bar", "app": "baz"})
bar.CreationTimestamp = later
f.dLister = append(f.dLister, foo, bar)
f.objects = append(f.objects, foo, bar)
f.expectUpdateDeploymentStatusAction(bar)
f.expectCreateRSAction(newReplicaSet(foo, "foo-rs", 1))
f.expectUpdateDeploymentStatusAction(foo)
f.expectUpdateDeploymentStatusAction(foo)
f.run(getKey(foo, t))
for _, a := range filterInformerActions(f.client.Actions()) {
action, ok := a.(core.UpdateAction)
if !ok {
continue
}
d, ok := action.GetObject().(*extensions.Deployment)
if !ok {
continue
}
if d.Name == "bar" && d.Annotations[util.OverlapAnnotation] != "foo" {
t.Errorf("annotations weren't updated for the overlapping deployment: %v", d.Annotations)
}
}
}
// TestSelectorUpdate ensures that from two overlapping deployments, the one that is working won't
// be marked as overlapping if its selector is updated but still overlaps with the other one.
func TestSelectorUpdate(t *testing.T) {
f := newFixture(t)
now := metav1.Now()
later := metav1.Time{Time: now.Add(time.Minute)}
selectorUpdated := metav1.Time{Time: later.Add(time.Minute)}
foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
foo.CreationTimestamp = now
foo.Annotations = map[string]string{util.SelectorUpdateAnnotation: selectorUpdated.Format(time.RFC3339)}
bar := newDeployment("bar", 1, nil, nil, nil, map[string]string{"foo": "bar", "app": "baz"})
bar.CreationTimestamp = later
bar.Annotations = map[string]string{util.OverlapAnnotation: "foo"}
f.dLister = append(f.dLister, foo, bar)
f.objects = append(f.objects, foo, bar)
f.expectCreateRSAction(newReplicaSet(foo, "foo-rs", 1))
f.expectUpdateDeploymentStatusAction(foo)
f.expectUpdateDeploymentStatusAction(foo)
f.run(getKey(foo, t))
for _, a := range filterInformerActions(f.client.Actions()) {
action, ok := a.(core.UpdateAction)
if !ok {
continue
}
d, ok := action.GetObject().(*extensions.Deployment)
if !ok {
continue
}
if d.Name == "foo" && len(d.Annotations[util.OverlapAnnotation]) > 0 {
t.Errorf("deployment %q should not have the overlapping annotation", d.Name)
}
if d.Name == "bar" && len(d.Annotations[util.OverlapAnnotation]) == 0 {
t.Errorf("deployment %q should have the overlapping annotation", d.Name)
}
}
}
// TestDeletedDeploymentShouldCleanupOverlaps ensures that the deletion of a deployment
// will cleanup any deployments that overlap with it.
func TestDeletedDeploymentShouldCleanupOverlaps(t *testing.T) {
f := newFixture(t)
now := metav1.Now()
earlier := metav1.Time{Time: now.Add(-time.Minute)}
later := metav1.Time{Time: now.Add(time.Minute)}
foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
foo.CreationTimestamp = earlier
foo.DeletionTimestamp = &now
bar := newDeployment("bar", 1, nil, nil, nil, map[string]string{"foo": "bar"})
bar.CreationTimestamp = later
bar.Annotations = map[string]string{util.OverlapAnnotation: "foo"}
f.dLister = append(f.dLister, foo, bar)
f.objects = append(f.objects, foo, bar)
f.expectUpdateDeploymentStatusAction(bar)
f.expectUpdateDeploymentStatusAction(foo)
f.run(getKey(foo, t))
for _, a := range filterInformerActions(f.client.Actions()) {
action, ok := a.(core.UpdateAction)
if !ok {
continue
}
d := action.GetObject().(*extensions.Deployment)
if d.Name != "bar" {
continue
}
if len(d.Annotations[util.OverlapAnnotation]) > 0 {
t.Errorf("annotations weren't cleaned up for the overlapping deployment: %v", d.Annotations)
}
}
}
// TestDeletedDeploymentShouldNotCleanupOtherOverlaps ensures that the deletion of
// a deployment will not cleanup deployments that overlap with another deployment.
func TestDeletedDeploymentShouldNotCleanupOtherOverlaps(t *testing.T) {
f := newFixture(t)
now := metav1.Now()
earlier := metav1.Time{Time: now.Add(-time.Minute)}
later := metav1.Time{Time: now.Add(time.Minute)}
foo := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
foo.CreationTimestamp = earlier
foo.DeletionTimestamp = &now
bar := newDeployment("bar", 1, nil, nil, nil, map[string]string{"bla": "bla"})
bar.CreationTimestamp = later
// Notice this deployment is overlapping with another deployment
bar.Annotations = map[string]string{util.OverlapAnnotation: "baz"}
f.dLister = append(f.dLister, foo, bar)
f.objects = append(f.objects, foo, bar)
f.expectUpdateDeploymentStatusAction(foo)
f.run(getKey(foo, t))
for _, a := range filterInformerActions(f.client.Actions()) {
action, ok := a.(core.UpdateAction)
if !ok {
continue
}
d := action.GetObject().(*extensions.Deployment)
if d.Name != "bar" {
continue
}
if len(d.Annotations[util.OverlapAnnotation]) == 0 {
t.Errorf("overlapping annotation should not be cleaned up for bar: %v", d.Annotations)
}
}
}
// TestPodDeletionEnqueuesRecreateDeployment ensures that the deletion of a pod
// will requeue a Recreate deployment iff there is no other pod returned from the
// client.
@ -562,6 +373,429 @@ func TestPodDeletionDoesntEnqueueRecreateDeployment(t *testing.T) {
}
}
func TestGetReplicaSetsForDeployment(t *testing.T) {
f := newFixture(t)
// Two Deployments with same labels.
d1 := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
d2 := newDeployment("bar", 1, nil, nil, nil, map[string]string{"foo": "bar"})
// Two ReplicaSets that match labels for both Deployments,
// but have ControllerRefs to make ownership explicit.
rs1 := newReplicaSet(d1, "rs1", 1)
rs2 := newReplicaSet(d2, "rs2", 1)
f.dLister = append(f.dLister, d1, d2)
f.rsLister = append(f.rsLister, rs1, rs2)
f.objects = append(f.objects, d1, d2, rs1, rs2)
// Start the fixture.
c, informers := f.newController()
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
rsList, err := c.getReplicaSetsForDeployment(d1)
if err != nil {
t.Fatalf("getReplicaSetsForDeployment() error: %v", err)
}
rsNames := []string{}
for _, rs := range rsList {
rsNames = append(rsNames, rs.Name)
}
if len(rsNames) != 1 || rsNames[0] != rs1.Name {
t.Errorf("getReplicaSetsForDeployment() = %v, want [%v]", rsNames, rs1.Name)
}
rsList, err = c.getReplicaSetsForDeployment(d2)
if err != nil {
t.Fatalf("getReplicaSetsForDeployment() error: %v", err)
}
rsNames = []string{}
for _, rs := range rsList {
rsNames = append(rsNames, rs.Name)
}
if len(rsNames) != 1 || rsNames[0] != rs2.Name {
t.Errorf("getReplicaSetsForDeployment() = %v, want [%v]", rsNames, rs2.Name)
}
}
func TestGetReplicaSetsForDeploymentAdoptRelease(t *testing.T) {
f := newFixture(t)
d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
// RS with matching labels, but orphaned. Should be adopted and returned.
rsAdopt := newReplicaSet(d, "rsAdopt", 1)
rsAdopt.OwnerReferences = nil
// RS with matching ControllerRef, but wrong labels. Should be released.
rsRelease := newReplicaSet(d, "rsRelease", 1)
rsRelease.Labels = map[string]string{"foo": "notbar"}
f.dLister = append(f.dLister, d)
f.rsLister = append(f.rsLister, rsAdopt, rsRelease)
f.objects = append(f.objects, d, rsAdopt, rsRelease)
// Start the fixture.
c, informers := f.newController()
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
rsList, err := c.getReplicaSetsForDeployment(d)
if err != nil {
t.Fatalf("getReplicaSetsForDeployment() error: %v", err)
}
rsNames := []string{}
for _, rs := range rsList {
rsNames = append(rsNames, rs.Name)
}
if len(rsNames) != 1 || rsNames[0] != rsAdopt.Name {
t.Errorf("getReplicaSetsForDeployment() = %v, want [%v]", rsNames, rsAdopt.Name)
}
}
func TestGetPodMapForReplicaSets(t *testing.T) {
f := newFixture(t)
d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
rs1 := newReplicaSet(d, "rs1", 1)
rs2 := newReplicaSet(d, "rs2", 1)
// Add a Pod for each ReplicaSet.
pod1 := generatePodFromRS(rs1)
pod2 := generatePodFromRS(rs2)
// Add a Pod that has matching labels, but no ControllerRef.
pod3 := generatePodFromRS(rs1)
pod3.Name = "pod3"
pod3.OwnerReferences = nil
// Add a Pod that has matching labels and ControllerRef, but is inactive.
pod4 := generatePodFromRS(rs1)
pod4.Name = "pod4"
pod4.Status.Phase = v1.PodFailed
f.dLister = append(f.dLister, d)
f.rsLister = append(f.rsLister, rs1, rs2)
f.podLister = append(f.podLister, pod1, pod2, pod3, pod4)
f.objects = append(f.objects, d, rs1, rs2, pod1, pod2, pod3, pod4)
// Start the fixture.
c, informers := f.newController()
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
podMap, err := c.getPodMapForDeployment(d, f.rsLister)
if err != nil {
t.Fatalf("getPodMapForDeployment() error: %v", err)
}
podCount := 0
for _, podList := range podMap {
podCount += len(podList.Items)
}
if got, want := podCount, 2; got != want {
t.Errorf("podCount = %v, want %v", got, want)
}
if got, want := len(podMap), 2; got != want {
t.Errorf("len(podMap) = %v, want %v", got, want)
}
if got, want := len(podMap[rs1.UID].Items), 1; got != want {
t.Errorf("len(podMap[rs1]) = %v, want %v", got, want)
}
if got, want := podMap[rs1.UID].Items[0].Name, "rs1-pod"; got != want {
t.Errorf("podMap[rs1] = [%v], want [%v]", got, want)
}
if got, want := len(podMap[rs2.UID].Items), 1; got != want {
t.Errorf("len(podMap[rs2]) = %v, want %v", got, want)
}
if got, want := podMap[rs2.UID].Items[0].Name, "rs2-pod"; got != want {
t.Errorf("podMap[rs2] = [%v], want [%v]", got, want)
}
}
func TestAddReplicaSet(t *testing.T) {
f := newFixture(t)
d1 := newDeployment("d1", 1, nil, nil, nil, map[string]string{"foo": "bar"})
d2 := newDeployment("d2", 1, nil, nil, nil, map[string]string{"foo": "bar"})
// Two ReplicaSets that match labels for both Deployments,
// but have ControllerRefs to make ownership explicit.
rs1 := newReplicaSet(d1, "rs1", 1)
rs2 := newReplicaSet(d2, "rs2", 1)
f.dLister = append(f.dLister, d1, d2)
f.objects = append(f.objects, d1, d2, rs1, rs2)
// Create the fixture but don't start it,
// so nothing happens in the background.
dc, _ := f.newController()
dc.addReplicaSet(rs1)
if got, want := dc.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := dc.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for rs %v", rs1.Name)
}
expectedKey, _ := controller.KeyFunc(d1)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
dc.addReplicaSet(rs2)
if got, want := dc.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done = dc.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for rs %v", rs2.Name)
}
expectedKey, _ = controller.KeyFunc(d2)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
}
func TestAddReplicaSetOrphan(t *testing.T) {
f := newFixture(t)
// 2 will match the RS, 1 won't.
d1 := newDeployment("d1", 1, nil, nil, nil, map[string]string{"foo": "bar"})
d2 := newDeployment("d2", 1, nil, nil, nil, map[string]string{"foo": "bar"})
d3 := newDeployment("d3", 1, nil, nil, nil, map[string]string{"foo": "bar"})
d3.Spec.Selector.MatchLabels = map[string]string{"foo": "notbar"}
// Make the RS an orphan. Expect matching Deployments to be queued.
rs := newReplicaSet(d1, "rs1", 1)
rs.OwnerReferences = nil
f.dLister = append(f.dLister, d1, d2, d3)
f.objects = append(f.objects, d1, d2, d3)
// Create the fixture but don't start it,
// so nothing happens in the background.
dc, _ := f.newController()
dc.addReplicaSet(rs)
if got, want := dc.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func TestUpdateReplicaSet(t *testing.T) {
f := newFixture(t)
d1 := newDeployment("d1", 1, nil, nil, nil, map[string]string{"foo": "bar"})
d2 := newDeployment("d2", 1, nil, nil, nil, map[string]string{"foo": "bar"})
// Two ReplicaSets that match labels for both Deployments,
// but have ControllerRefs to make ownership explicit.
rs1 := newReplicaSet(d1, "rs1", 1)
rs2 := newReplicaSet(d2, "rs2", 1)
f.dLister = append(f.dLister, d1, d2)
f.rsLister = append(f.rsLister, rs1, rs2)
f.objects = append(f.objects, d1, d2, rs1, rs2)
// Create the fixture but don't start it,
// so nothing happens in the background.
dc, _ := f.newController()
prev := *rs1
next := *rs1
bumpResourceVersion(&next)
dc.updateReplicaSet(&prev, &next)
if got, want := dc.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := dc.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for rs %v", rs1.Name)
}
expectedKey, _ := controller.KeyFunc(d1)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
prev = *rs2
next = *rs2
bumpResourceVersion(&next)
dc.updateReplicaSet(&prev, &next)
if got, want := dc.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done = dc.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for rs %v", rs2.Name)
}
expectedKey, _ = controller.KeyFunc(d2)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
}
func TestUpdateReplicaSetOrphanWithNewLabels(t *testing.T) {
f := newFixture(t)
d1 := newDeployment("d1", 1, nil, nil, nil, map[string]string{"foo": "bar"})
d2 := newDeployment("d2", 1, nil, nil, nil, map[string]string{"foo": "bar"})
// RS matches both, but is an orphan.
rs := newReplicaSet(d1, "rs1", 1)
rs.OwnerReferences = nil
f.dLister = append(f.dLister, d1, d2)
f.rsLister = append(f.rsLister, rs)
f.objects = append(f.objects, d1, d2, rs)
// Create the fixture but don't start it,
// so nothing happens in the background.
dc, _ := f.newController()
// Change labels and expect all matching controllers to queue.
prev := *rs
prev.Labels = map[string]string{"foo": "notbar"}
next := *rs
bumpResourceVersion(&next)
dc.updateReplicaSet(&prev, &next)
if got, want := dc.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func TestUpdateReplicaSetChangeControllerRef(t *testing.T) {
f := newFixture(t)
d1 := newDeployment("d1", 1, nil, nil, nil, map[string]string{"foo": "bar"})
d2 := newDeployment("d2", 1, nil, nil, nil, map[string]string{"foo": "bar"})
rs := newReplicaSet(d1, "rs1", 1)
f.dLister = append(f.dLister, d1, d2)
f.rsLister = append(f.rsLister, rs)
f.objects = append(f.objects, d1, d2, rs)
// Create the fixture but don't start it,
// so nothing happens in the background.
dc, _ := f.newController()
// Change ControllerRef and expect both old and new to queue.
prev := *rs
prev.OwnerReferences = []metav1.OwnerReference{*newControllerRef(d2)}
next := *rs
bumpResourceVersion(&next)
dc.updateReplicaSet(&prev, &next)
if got, want := dc.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func TestUpdateReplicaSetRelease(t *testing.T) {
f := newFixture(t)
d1 := newDeployment("d1", 1, nil, nil, nil, map[string]string{"foo": "bar"})
d2 := newDeployment("d2", 1, nil, nil, nil, map[string]string{"foo": "bar"})
rs := newReplicaSet(d1, "rs1", 1)
f.dLister = append(f.dLister, d1, d2)
f.rsLister = append(f.rsLister, rs)
f.objects = append(f.objects, d1, d2, rs)
// Create the fixture but don't start it,
// so nothing happens in the background.
dc, _ := f.newController()
// Remove ControllerRef and expect all matching controller to sync orphan.
prev := *rs
next := *rs
next.OwnerReferences = nil
bumpResourceVersion(&next)
dc.updateReplicaSet(&prev, &next)
if got, want := dc.queue.Len(), 2; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func TestDeleteReplicaSet(t *testing.T) {
f := newFixture(t)
d1 := newDeployment("d1", 1, nil, nil, nil, map[string]string{"foo": "bar"})
d2 := newDeployment("d2", 1, nil, nil, nil, map[string]string{"foo": "bar"})
// Two ReplicaSets that match labels for both Deployments,
// but have ControllerRefs to make ownership explicit.
rs1 := newReplicaSet(d1, "rs1", 1)
rs2 := newReplicaSet(d2, "rs2", 1)
f.dLister = append(f.dLister, d1, d2)
f.rsLister = append(f.rsLister, rs1, rs2)
f.objects = append(f.objects, d1, d2, rs1, rs2)
// Create the fixture but don't start it,
// so nothing happens in the background.
dc, _ := f.newController()
dc.deleteReplicaSet(rs1)
if got, want := dc.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done := dc.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for rs %v", rs1.Name)
}
expectedKey, _ := controller.KeyFunc(d1)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
dc.deleteReplicaSet(rs2)
if got, want := dc.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
key, done = dc.queue.Get()
if key == nil || done {
t.Fatalf("failed to enqueue controller for rs %v", rs2.Name)
}
expectedKey, _ = controller.KeyFunc(d2)
if got, want := key.(string), expectedKey; got != want {
t.Errorf("queue.Get() = %v, want %v", got, want)
}
}
func TestDeleteReplicaSetOrphan(t *testing.T) {
f := newFixture(t)
d1 := newDeployment("d1", 1, nil, nil, nil, map[string]string{"foo": "bar"})
d2 := newDeployment("d2", 1, nil, nil, nil, map[string]string{"foo": "bar"})
// Make the RS an orphan. Expect matching Deployments to be queued.
rs := newReplicaSet(d1, "rs1", 1)
rs.OwnerReferences = nil
f.dLister = append(f.dLister, d1, d2)
f.rsLister = append(f.rsLister, rs)
f.objects = append(f.objects, d1, d2, rs)
// Create the fixture but don't start it,
// so nothing happens in the background.
dc, _ := f.newController()
dc.deleteReplicaSet(rs)
if got, want := dc.queue.Len(), 0; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
}
func bumpResourceVersion(obj metav1.Object) {
ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32)
obj.SetResourceVersion(strconv.FormatInt(ver+1, 10))
}
// generatePodFromRS creates a pod, with the input ReplicaSet's selector and its template
func generatePodFromRS(rs *extensions.ReplicaSet) *v1.Pod {
trueVar := true

View File

@ -23,6 +23,7 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/controller/deployment/util"
@ -33,12 +34,12 @@ import (
// and when new pods scale up or old pods scale down. Progress is not estimated for paused
// deployments or when users don't really care about it ie. progressDeadlineSeconds is not
// specified.
func (dc *DeploymentController) hasFailed(d *extensions.Deployment) (bool, error) {
func (dc *DeploymentController) hasFailed(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) (bool, error) {
if d.Spec.ProgressDeadlineSeconds == nil || d.Spec.RollbackTo != nil || d.Spec.Paused {
return false, nil
}
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false)
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
if err != nil {
return false, err
}

View File

@ -17,14 +17,16 @@ limitations under the License.
package deployment
import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/controller"
)
// rolloutRecreate implements the logic for recreating a replica set.
func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deployment) error {
func (dc *DeploymentController) rolloutRecreate(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
// Don't create a new RS if not already existed, so that we avoid scaling up before scaling down
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false)
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
if err != nil {
return err
}
@ -32,31 +34,29 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen
activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)
// scale down old replica sets
scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, deployment)
scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
if err != nil {
return err
}
if scaledDown {
// Update DeploymentStatus
return dc.syncRolloutStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, d)
}
newStatus := calculateStatus(allRSs, newRS, deployment)
newStatus := calculateStatus(allRSs, newRS, d)
// Do not process a deployment when it has old pods running.
if newStatus.UpdatedReplicas == 0 {
podList, err := dc.listPods(deployment)
if err != nil {
return err
}
if len(podList.Items) > 0 {
return dc.syncRolloutStatus(allRSs, newRS, deployment)
for _, podList := range podMap {
if len(podList.Items) > 0 {
return dc.syncRolloutStatus(allRSs, newRS, d)
}
}
}
// If we need to create a new RS, create it now
// TODO: Create a new RS without re-listing all RSs.
if newRS == nil {
newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(deployment, true)
newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true)
if err != nil {
return err
}
@ -64,17 +64,17 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen
}
// scale up new replica set
scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, deployment)
scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d)
if err != nil {
return err
}
if scaledUp {
// Update DeploymentStatus
return dc.syncRolloutStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, d)
}
// Sync deployment status
return dc.syncRolloutStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, d)
}
// scaleDownOldReplicaSetsForRecreate scales down old replica sets when deployment strategy is "Recreate"

View File

@ -21,14 +21,15 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
)
// rollback the deployment to the specified revision. In any case cleanup the rollback spec.
func (dc *DeploymentController) rollback(d *extensions.Deployment) error {
newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, true)
func (dc *DeploymentController) rollback(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true)
if err != nil {
return err
}

View File

@ -21,42 +21,44 @@ import (
"sort"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/integer"
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/controller"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
)
// rolloutRolling implements the logic for rolling a new replica set.
func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true)
func (dc *DeploymentController) rolloutRolling(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true)
if err != nil {
return err
}
allRSs := append(oldRSs, newRS)
// Scale up, if we can.
scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, deployment)
scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
if err != nil {
return err
}
if scaledUp {
// Update DeploymentStatus
return dc.syncRolloutStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, d)
}
// Scale down, if we can.
scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, deployment)
scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
if err != nil {
return err
}
if scaledDown {
// Update DeploymentStatus
return dc.syncRolloutStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, d)
}
// Sync deployment status
return dc.syncRolloutStatus(allRSs, newRS, deployment)
return dc.syncRolloutStatus(allRSs, newRS, d)
}
func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {

View File

@ -25,7 +25,7 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
@ -36,31 +36,31 @@ import (
)
// syncStatusOnly only updates Deployments Status and doesn't take any mutating actions.
func (dc *DeploymentController) syncStatusOnly(deployment *extensions.Deployment) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false)
func (dc *DeploymentController) syncStatusOnly(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
if err != nil {
return err
}
allRSs := append(oldRSs, newRS)
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
return dc.syncDeploymentStatus(allRSs, newRS, d)
}
// sync is responsible for reconciling deployments on scaling events or when they
// are paused.
func (dc *DeploymentController) sync(deployment *extensions.Deployment) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false)
func (dc *DeploymentController) sync(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
if err != nil {
return err
}
if err := dc.scale(deployment, newRS, oldRSs); err != nil {
if err := dc.scale(d, newRS, oldRSs); err != nil {
// If we get an error while trying to scale, the deployment will be requeued
// so we can abort this resync
return err
}
allRSs := append(oldRSs, newRS)
return dc.syncDeploymentStatus(allRSs, newRS, deployment)
return dc.syncDeploymentStatus(allRSs, newRS, d)
}
// checkPausedConditions checks if the given deployment is paused or not and adds an appropriate condition.
@ -98,25 +98,30 @@ func (dc *DeploymentController) checkPausedConditions(d *extensions.Deployment)
}
// getAllReplicaSetsAndSyncRevision returns all the replica sets for the provided deployment (new and all old), with new RS's and deployment's revision updated.
//
// rsList should come from getReplicaSetsForDeployment(d).
// podMap should come from getPodMapForDeployment(d, rsList).
//
// 1. Get all old RSes this deployment targets, and calculate the max revision number among them (maxOldV).
// 2. Get new RS this deployment targets (whose pod template matches deployment's), and update new RS's revision number to (maxOldV + 1),
// only if its revision number is smaller than (maxOldV + 1). If this step failed, we'll update it in the next deployment sync loop.
// 3. Copy new RS's revision number to deployment (update deployment's revision). If this step failed, we'll update it in the next deployment sync loop.
//
// Note that currently the deployment controller is using caches to avoid querying the server for reads.
// This may lead to stale reads of replica sets, thus incorrect deployment status.
func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *extensions.Deployment, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
// List the deployment's RSes & Pods and apply pod-template-hash info to deployment's adopted RSes/Pods
rsList, podList, err := dc.rsAndPodsWithHashKeySynced(deployment)
rsList, podList, err := dc.rsAndPodsWithHashKeySynced(d, rsList, podMap)
if err != nil {
return nil, nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err)
}
_, allOldRSs, err := deploymentutil.FindOldReplicaSets(deployment, rsList, podList)
_, allOldRSs, err := deploymentutil.FindOldReplicaSets(d, rsList, podList)
if err != nil {
return nil, nil, err
}
// Get new replica set with the updated revision number
newRS, err := dc.getNewReplicaSet(deployment, rsList, allOldRSs, createIfNotExisted)
newRS, err := dc.getNewReplicaSet(d, rsList, allOldRSs, createIfNotExisted)
if err != nil {
return nil, nil, err
}
@ -124,33 +129,27 @@ func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *ext
return newRS, allOldRSs, nil
}
// rsAndPodsWithHashKeySynced returns the RSes and pods the given deployment targets, with pod-template-hash information synced.
func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extensions.Deployment) ([]*extensions.ReplicaSet, *v1.PodList, error) {
rsList, err := deploymentutil.ListReplicaSets(deployment,
func(namespace string, options metav1.ListOptions) ([]*extensions.ReplicaSet, error) {
parsed, err := labels.Parse(options.LabelSelector)
if err != nil {
return nil, err
}
return dc.rsLister.ReplicaSets(namespace).List(parsed)
})
if err != nil {
return nil, nil, fmt.Errorf("error listing ReplicaSets: %v", err)
}
// rsAndPodsWithHashKeySynced returns the RSes and pods the given deployment
// targets, with pod-template-hash information synced.
//
// rsList should come from getReplicaSetsForDeployment(d).
// podMap should come from getPodMapForDeployment(d, rsList).
func (dc *DeploymentController) rsAndPodsWithHashKeySynced(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) ([]*extensions.ReplicaSet, *v1.PodList, error) {
syncedRSList := []*extensions.ReplicaSet{}
for _, rs := range rsList {
// Add pod-template-hash information if it's not in the RS.
// Otherwise, new RS produced by Deployment will overlap with pre-existing ones
// that aren't constrained by the pod-template-hash.
syncedRS, err := dc.addHashKeyToRSAndPods(rs)
syncedRS, err := dc.addHashKeyToRSAndPods(rs, podMap[rs.UID])
if err != nil {
return nil, nil, err
}
syncedRSList = append(syncedRSList, syncedRS)
}
syncedPodList, err := dc.listPods(deployment)
if err != nil {
return nil, nil, err
// Put all Pods from podMap into one list.
syncedPodList := &v1.PodList{}
for _, podList := range podMap {
syncedPodList.Items = append(syncedPodList.Items, podList.Items...)
}
return syncedRSList, syncedPodList, nil
}
@ -159,7 +158,7 @@ func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extension
// 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created
// 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas
// 3. Add hash label to the rs's label and selector
func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet) (*extensions.ReplicaSet, error) {
func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet, podList *v1.PodList) (*extensions.ReplicaSet, error) {
// If the rs already has the new hash label in its selector, it's done syncing
if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
return rs, nil
@ -189,24 +188,7 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet)
}
// 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted.
selector, err := metav1.LabelSelectorAsSelector(updatedRS.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err)
}
options := metav1.ListOptions{LabelSelector: selector.String()}
parsed, err := labels.Parse(options.LabelSelector)
if err != nil {
return nil, err
}
pods, err := dc.podLister.Pods(updatedRS.Namespace).List(parsed)
if err != nil {
return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", rs.Namespace, options, err)
}
podList := v1.PodList{Items: make([]v1.Pod, 0, len(pods))}
for i := range pods {
podList.Items = append(podList.Items, *pods[i])
}
if err := deploymentutil.LabelPodsWithHash(&podList, dc.client, dc.podLister, rs.Namespace, rs.Name, hash); err != nil {
if err := deploymentutil.LabelPodsWithHash(podList, dc.client, dc.podLister, rs.Namespace, rs.Name, hash); err != nil {
return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err)
}
@ -242,22 +224,6 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet)
return updatedRS, nil
}
func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*v1.PodList, error) {
return deploymentutil.ListPods(deployment,
func(namespace string, options metav1.ListOptions) (*v1.PodList, error) {
parsed, err := labels.Parse(options.LabelSelector)
if err != nil {
return nil, err
}
pods, err := dc.podLister.Pods(namespace).List(parsed)
result := v1.PodList{Items: make([]v1.Pod, 0, len(pods))}
for i := range pods {
result.Items = append(result.Items, *pods[i])
}
return &result, err
})
}
// Returns a replica set that matches the intent of the given deployment. Returns nil if the new replica set doesn't exist yet.
// 1. Get existing new RS (the RS that the given deployment targets, whose pod template is the same as deployment's).
// 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes.
@ -329,25 +295,17 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
newRS := extensions.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
// Make the name deterministic, to ensure idempotence
Name: deployment.Name + "-" + podTemplateSpecHash,
Namespace: namespace,
Name: deployment.Name + "-" + podTemplateSpecHash,
Namespace: namespace,
OwnerReferences: []metav1.OwnerReference{*newControllerRef(deployment)},
},
Spec: extensions.ReplicaSetSpec{
Replicas: func(i int32) *int32 { return &i }(0),
Replicas: new(int32),
MinReadySeconds: deployment.Spec.MinReadySeconds,
Selector: newRSSelector,
Template: newRSTemplate,
},
}
var trueVar = true
controllerRef := &metav1.OwnerReference{
APIVersion: getDeploymentKind().GroupVersion().String(),
Kind: getDeploymentKind().Kind,
Name: deployment.Name,
UID: deployment.UID,
Controller: &trueVar,
}
newRS.OwnerReferences = append(newRS.OwnerReferences, *controllerRef)
allRSs := append(oldRSs, &newRS)
newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, &newRS)
if err != nil {
@ -632,8 +590,11 @@ func calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaS
// isScalingEvent checks whether the provided deployment has been updated with a scaling event
// by looking at the desired-replicas annotation in the active replica sets of the deployment.
func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) (bool, error) {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false)
//
// rsList should come from getReplicaSetsForDeployment(d).
// podMap should come from getPodMapForDeployment(d, rsList).
func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) (bool, error) {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, false)
if err != nil {
return false, err
}
@ -649,3 +610,17 @@ func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) (bool,
}
return false, nil
}
// newControllerRef returns a ControllerRef pointing to the deployment.
func newControllerRef(d *extensions.Deployment) *metav1.OwnerReference {
blockOwnerDeletion := true
isController := true
return &metav1.OwnerReference{
APIVersion: controllerKind.GroupVersion().String(),
Kind: controllerKind.Kind,
Name: d.Name,
UID: d.UID,
BlockOwnerDeletion: &blockOwnerDeletion,
Controller: &isController,
}
}

View File

@ -37,6 +37,7 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/errors",
"//vendor:k8s.io/apimachinery/pkg/util/intstr",
"//vendor:k8s.io/apimachinery/pkg/util/wait",

View File

@ -30,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/errors"
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
@ -66,14 +67,6 @@ const (
RollbackTemplateUnchanged = "DeploymentRollbackTemplateUnchanged"
// RollbackDone is the done rollback event reason
RollbackDone = "DeploymentRollback"
// OverlapAnnotation marks deployments with overlapping selector with other deployments
// TODO: Delete this annotation when we gracefully handle overlapping selectors.
// See https://github.com/kubernetes/kubernetes/issues/2210
OverlapAnnotation = "deployment.kubernetes.io/error-selector-overlapping-with"
// SelectorUpdateAnnotation marks the last time deployment selector update
// TODO: Delete this annotation when we gracefully handle overlapping selectors.
// See https://github.com/kubernetes/kubernetes/issues/2210
SelectorUpdateAnnotation = "deployment.kubernetes.io/selector-updated-at"
// Reasons for deployment conditions
//
@ -294,8 +287,6 @@ var annotationsToSkip = map[string]bool{
RevisionHistoryAnnotation: true,
DesiredReplicasAnnotation: true,
MaxReplicasAnnotation: true,
OverlapAnnotation: true,
SelectorUpdateAnnotation: true,
}
// skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key
@ -507,7 +498,7 @@ func GetAllReplicaSets(deployment *extensions.Deployment, c clientset.Interface)
if err != nil {
return nil, nil, nil, err
}
podList, err := listPods(deployment, c)
podList, err := listPods(deployment, rsList, c)
if err != nil {
return nil, nil, nil, err
}
@ -529,7 +520,7 @@ func GetOldReplicaSets(deployment *extensions.Deployment, c clientset.Interface)
if err != nil {
return nil, nil, err
}
podList, err := listPods(deployment, c)
podList, err := listPods(deployment, rsList, c)
if err != nil {
return nil, nil, err
}
@ -563,8 +554,8 @@ func listReplicaSets(deployment *extensions.Deployment, c clientset.Interface) (
}
// listReplicaSets lists all Pods the given deployment targets with the given client interface.
func listPods(deployment *extensions.Deployment, c clientset.Interface) (*v1.PodList, error) {
return ListPods(deployment,
func listPods(deployment *extensions.Deployment, rsList []*extensions.ReplicaSet, c clientset.Interface) (*v1.PodList, error) {
return ListPods(deployment, rsList,
func(namespace string, options metav1.ListOptions) (*v1.PodList, error) {
return c.Core().Pods(namespace).List(options)
})
@ -575,28 +566,94 @@ type rsListFunc func(string, metav1.ListOptions) ([]*extensions.ReplicaSet, erro
type podListFunc func(string, metav1.ListOptions) (*v1.PodList, error)
// ListReplicaSets returns a slice of RSes the given deployment targets.
// Note that this does NOT attempt to reconcile ControllerRef (adopt/orphan),
// because only the controller itself should do that.
// However, it does filter out anything whose ControllerRef doesn't match.
func ListReplicaSets(deployment *extensions.Deployment, getRSList rsListFunc) ([]*extensions.ReplicaSet, error) {
// TODO: Right now we list replica sets by their labels. We should list them by selector, i.e. the replica set's selector
// should be a superset of the deployment's selector, see https://github.com/kubernetes/kubernetes/issues/19830;
// or use controllerRef, see https://github.com/kubernetes/kubernetes/issues/2210
// should be a superset of the deployment's selector, see https://github.com/kubernetes/kubernetes/issues/19830.
namespace := deployment.Namespace
selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
if err != nil {
return nil, err
}
options := metav1.ListOptions{LabelSelector: selector.String()}
return getRSList(namespace, options)
all, err := getRSList(namespace, options)
if err != nil {
return all, err
}
// Only include those whose ControllerRef matches the Deployment.
owned := make([]*extensions.ReplicaSet, 0, len(all))
for _, rs := range all {
controllerRef := controller.GetControllerOf(rs)
if controllerRef != nil && controllerRef.UID == deployment.UID {
owned = append(owned, rs)
}
}
return owned, nil
}
// ListReplicaSets returns a slice of RSes the given deployment targets.
// Note that this does NOT attempt to reconcile ControllerRef (adopt/orphan),
// because only the controller itself should do that.
// However, it does filter out anything whose ControllerRef doesn't match.
// TODO: Remove the duplicate.
func ListReplicaSetsInternal(deployment *internalextensions.Deployment, getRSList func(string, metav1.ListOptions) ([]*internalextensions.ReplicaSet, error)) ([]*internalextensions.ReplicaSet, error) {
// TODO: Right now we list replica sets by their labels. We should list them by selector, i.e. the replica set's selector
// should be a superset of the deployment's selector, see https://github.com/kubernetes/kubernetes/issues/19830.
namespace := deployment.Namespace
selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
if err != nil {
return nil, err
}
options := metav1.ListOptions{LabelSelector: selector.String()}
all, err := getRSList(namespace, options)
if err != nil {
return all, err
}
// Only include those whose ControllerRef matches the Deployment.
owned := make([]*internalextensions.ReplicaSet, 0, len(all))
for _, rs := range all {
controllerRef := controller.GetControllerOf(rs)
if controllerRef != nil && controllerRef.UID == deployment.UID {
owned = append(owned, rs)
}
}
return owned, nil
}
// ListPods returns a list of pods the given deployment targets.
func ListPods(deployment *extensions.Deployment, getPodList podListFunc) (*v1.PodList, error) {
// This needs a list of ReplicaSets for the Deployment,
// which can be found with ListReplicaSets().
// Note that this does NOT attempt to reconcile ControllerRef (adopt/orphan),
// because only the controller itself should do that.
// However, it does filter out anything whose ControllerRef doesn't match.
func ListPods(deployment *extensions.Deployment, rsList []*extensions.ReplicaSet, getPodList podListFunc) (*v1.PodList, error) {
namespace := deployment.Namespace
selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
if err != nil {
return nil, err
}
options := metav1.ListOptions{LabelSelector: selector.String()}
return getPodList(namespace, options)
all, err := getPodList(namespace, options)
if err != nil {
return all, err
}
// Only include those whose ControllerRef points to a ReplicaSet that is in
// turn owned by this Deployment.
rsMap := make(map[types.UID]bool, len(rsList))
for _, rs := range rsList {
rsMap[rs.UID] = true
}
owned := &v1.PodList{Items: make([]v1.Pod, 0, len(all.Items))}
for i := range all.Items {
pod := &all.Items[i]
controllerRef := controller.GetControllerOf(pod)
if controllerRef != nil && rsMap[controllerRef.UID] {
owned.Items = append(owned.Items, *pod)
}
}
return owned, nil
}
// EqualIgnoreHash returns true if two given podTemplateSpec are equal, ignoring the diff in value of Labels[pod-template-hash]
@ -982,59 +1039,3 @@ func DeploymentDeepCopy(deployment *extensions.Deployment) (*extensions.Deployme
}
return copied, nil
}
// SelectorUpdatedBefore returns true if the former deployment's selector
// is updated before the latter, false otherwise.
func SelectorUpdatedBefore(d1, d2 *extensions.Deployment) bool {
t1, t2 := LastSelectorUpdate(d1), LastSelectorUpdate(d2)
return t1.Before(t2)
}
// LastSelectorUpdate returns the last time given deployment's selector is updated
func LastSelectorUpdate(d *extensions.Deployment) metav1.Time {
t := d.Annotations[SelectorUpdateAnnotation]
if len(t) > 0 {
parsedTime, err := time.Parse(time.RFC3339, t)
// If failed to parse the time, use creation timestamp instead
if err != nil {
return d.CreationTimestamp
}
return metav1.Time{Time: parsedTime}
}
// If it's never updated, use creation timestamp instead
return d.CreationTimestamp
}
// BySelectorLastUpdateTime sorts a list of deployments by the last update time of their selector,
// first using their creation timestamp and then their names as a tie breaker.
type BySelectorLastUpdateTime []*extensions.Deployment
func (o BySelectorLastUpdateTime) Len() int { return len(o) }
func (o BySelectorLastUpdateTime) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o BySelectorLastUpdateTime) Less(i, j int) bool {
ti, tj := LastSelectorUpdate(o[i]), LastSelectorUpdate(o[j])
if ti.Equal(tj) {
if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
return o[i].Name < o[j].Name
}
return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
}
return ti.Before(tj)
}
// OverlapsWith returns true when two given deployments are different and overlap with each other
func OverlapsWith(current, other *extensions.Deployment) (bool, error) {
if current.UID == other.UID {
return false, nil
}
currentSelector, err := metav1.LabelSelectorAsSelector(current.Spec.Selector)
if err != nil {
return false, fmt.Errorf("deployment %s/%s has invalid label selector: %v", current.Namespace, current.Name, err)
}
otherSelector, err := metav1.LabelSelectorAsSelector(other.Spec.Selector)
if err != nil {
return false, fmt.Errorf("deployment %s/%s has invalid label selector: %v", other.Namespace, other.Name, err)
}
return (!currentSelector.Empty() && currentSelector.Matches(labels.Set(other.Spec.Template.Labels))) ||
(!otherSelector.Empty() && otherSelector.Matches(labels.Set(current.Spec.Template.Labels))), nil
}

View File

@ -105,11 +105,23 @@ func newPod(now time.Time, ready bool, beforeSec int) v1.Pod {
}
}
func newRSControllerRef(rs *extensions.ReplicaSet) *metav1.OwnerReference {
isController := true
return &metav1.OwnerReference{
APIVersion: "extensions/v1beta1",
Kind: "ReplicaSet",
Name: rs.GetName(),
UID: rs.GetUID(),
Controller: &isController,
}
}
// generatePodFromRS creates a pod, with the input ReplicaSet's selector and its template
func generatePodFromRS(rs extensions.ReplicaSet) v1.Pod {
return v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: rs.Labels,
Labels: rs.Labels,
OwnerReferences: []metav1.OwnerReference{*newRSControllerRef(&rs)},
},
Spec: rs.Spec.Template.Spec,
}
@ -161,14 +173,26 @@ func generateRSWithLabel(labels map[string]string, image string) extensions.Repl
}
}
func newDControllerRef(d *extensions.Deployment) *metav1.OwnerReference {
isController := true
return &metav1.OwnerReference{
APIVersion: "extensions/v1beta1",
Kind: "Deployment",
Name: d.GetName(),
UID: d.GetUID(),
Controller: &isController,
}
}
// generateRS creates a replica set, with the input deployment's template as its template
func generateRS(deployment extensions.Deployment) extensions.ReplicaSet {
template := GetNewReplicaSetTemplate(&deployment)
return extensions.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
UID: randomUID(),
Name: v1.SimpleNameGenerator.GenerateName("replicaset"),
Labels: template.Labels,
UID: randomUID(),
Name: v1.SimpleNameGenerator.GenerateName("replicaset"),
Labels: template.Labels,
OwnerReferences: []metav1.OwnerReference{*newDControllerRef(&deployment)},
},
Spec: extensions.ReplicaSetSpec{
Replicas: func() *int32 { i := int32(0); return &i }(),
@ -291,7 +315,8 @@ func TestGetOldRCs(t *testing.T) {
oldRS2.Status.FullyLabeledReplicas = *(oldRS2.Spec.Replicas)
oldPod2 := generatePodFromRS(oldRS2)
// create 1 ReplicaSet that existed before the deployment, with the same labels as the deployment
// create 1 ReplicaSet that existed before the deployment,
// with the same labels as the deployment, but no ControllerRef.
existedPod := generatePod(newDeployment.Spec.Template.Labels, "foo")
existedRS := generateRSWithLabel(newDeployment.Spec.Template.Labels, "foo")
existedRS.Status.FullyLabeledReplicas = *(existedRS.Spec.Replicas)
@ -345,7 +370,7 @@ func TestGetOldRCs(t *testing.T) {
},
},
},
[]*extensions.ReplicaSet{&oldRS, &oldRS2, &existedRS},
[]*extensions.ReplicaSet{&oldRS, &oldRS2},
},
}
@ -1164,111 +1189,3 @@ func TestDeploymentTimedOut(t *testing.T) {
}
}
}
func TestSelectorUpdatedBefore(t *testing.T) {
now := metav1.Now()
later := metav1.Time{Time: now.Add(time.Minute)}
selectorUpdated := metav1.Time{Time: later.Add(time.Minute)}
selectorUpdatedLater := metav1.Time{Time: selectorUpdated.Add(time.Minute)}
tests := []struct {
name string
d1 extensions.Deployment
creationTimestamp1 *metav1.Time
selectorUpdated1 *metav1.Time
d2 extensions.Deployment
creationTimestamp2 *metav1.Time
selectorUpdated2 *metav1.Time
expected bool
}{
{
name: "d1 created before d2",
d1: generateDeployment("foo"),
creationTimestamp1: &now,
d2: generateDeployment("bar"),
creationTimestamp2: &later,
expected: true,
},
{
name: "d1 created after d2",
d1: generateDeployment("foo"),
creationTimestamp1: &later,
d2: generateDeployment("bar"),
creationTimestamp2: &now,
expected: false,
},
{
// Think of the following scenario:
// d1 is created first, d2 is created after and its selector overlaps
// with d1. d2 is marked as overlapping correctly. If d1's selector is
// updated and continues to overlap with the selector of d2 then d1 is
// now marked overlapping and d2 is cleaned up. Proved by the following
// test case. Callers of SelectorUpdatedBefore should first check for
// the existence of the overlapping annotation in any of the two deployments
// prior to comparing their timestamps and as a matter of fact this is
// now handled in `(dc *DeploymentController) handleOverlap`.
name: "d1 created before d2 but updated its selector afterwards",
d1: generateDeployment("foo"),
creationTimestamp1: &now,
selectorUpdated1: &selectorUpdated,
d2: generateDeployment("bar"),
creationTimestamp2: &later,
expected: false,
},
{
name: "d1 selector is older than d2",
d1: generateDeployment("foo"),
selectorUpdated1: &selectorUpdated,
d2: generateDeployment("bar"),
selectorUpdated2: &selectorUpdatedLater,
expected: true,
},
{
name: "d1 selector is younger than d2",
d1: generateDeployment("foo"),
selectorUpdated1: &selectorUpdatedLater,
d2: generateDeployment("bar"),
selectorUpdated2: &selectorUpdated,
expected: false,
},
}
for _, test := range tests {
t.Logf("running scenario %q", test.name)
if test.creationTimestamp1 != nil {
test.d1.CreationTimestamp = *test.creationTimestamp1
}
if test.creationTimestamp2 != nil {
test.d2.CreationTimestamp = *test.creationTimestamp2
}
if test.selectorUpdated1 != nil {
test.d1.Annotations[SelectorUpdateAnnotation] = test.selectorUpdated1.Format(time.RFC3339)
}
if test.selectorUpdated2 != nil {
test.d2.Annotations[SelectorUpdateAnnotation] = test.selectorUpdated2.Format(time.RFC3339)
}
if got := SelectorUpdatedBefore(&test.d1, &test.d2); got != test.expected {
t.Errorf("expected d1 selector to be updated before d2: %t, got: %t", test.expected, got)
}
}
}

View File

@ -152,6 +152,7 @@ go_test(
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/util/intstr",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apimachinery/pkg/util/uuid",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/rest",
"//vendor:k8s.io/client-go/rest/fake",

View File

@ -420,7 +420,6 @@ func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gra
func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
deployments := reaper.dClient.Deployments(namespace)
replicaSets := reaper.rsClient.ReplicaSets(namespace)
rsReaper := &ReplicaSetReaper{reaper.rsClient, reaper.pollInterval, reaper.timeout}
deployment, err := reaper.updateDeploymentWithRetries(namespace, name, func(d *extensions.Deployment) {
@ -441,25 +440,26 @@ func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Durati
return err
}
// Do not cascade deletion for overlapping deployments.
if len(deployment.Annotations[deploymentutil.OverlapAnnotation]) > 0 {
return deployments.Delete(name, nil)
}
// Stop all replica sets.
selector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
// Stop all replica sets belonging to this Deployment.
rss, err := deploymentutil.ListReplicaSetsInternal(deployment,
func(namespace string, options metav1.ListOptions) ([]*extensions.ReplicaSet, error) {
rsList, err := reaper.rsClient.ReplicaSets(namespace).List(options)
if err != nil {
return nil, err
}
rss := make([]*extensions.ReplicaSet, 0, len(rsList.Items))
for i := range rsList.Items {
rss = append(rss, &rsList.Items[i])
}
return rss, nil
})
if err != nil {
return err
}
options := metav1.ListOptions{LabelSelector: selector.String()}
rsList, err := replicaSets.List(options)
if err != nil {
return err
}
errList := []error{}
for _, rc := range rsList.Items {
if err := rsReaper.Stop(rc.Namespace, rc.Name, timeout, gracePeriod); err != nil {
for _, rs := range rss {
if err := rsReaper.Stop(rs.Namespace, rs.Name, timeout, gracePeriod); err != nil {
scaleGetErr, ok := err.(ScaleError)
if errors.IsNotFound(err) || (ok && errors.IsNotFound(scaleGetErr.ActualError)) {
continue

View File

@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/watch"
testcore "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/api"
@ -429,6 +430,7 @@ func TestDeploymentStop(t *testing.T) {
deployment := extensions.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: uuid.NewUUID(),
Namespace: ns,
},
Spec: extensions.DeploymentSpec{
@ -440,6 +442,7 @@ func TestDeploymentStop(t *testing.T) {
},
}
template := deploymentutil.GetNewReplicaSetTemplateInternal(&deployment)
trueVar := true
tests := []struct {
Name string
Objs []runtime.Object
@ -478,6 +481,15 @@ func TestDeploymentStop(t *testing.T) {
Name: name,
Namespace: ns,
Labels: map[string]string{"k1": "v1"},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: extensions.SchemeGroupVersion.String(),
Kind: "ReplicaSet",
Name: deployment.Name,
UID: deployment.UID,
Controller: &trueVar,
},
},
},
Spec: extensions.ReplicaSetSpec{
Template: template,

View File

@ -2439,10 +2439,6 @@ func (dd *DeploymentDescriber) Describe(namespace, name string, describerSetting
}
w.Write(LEVEL_0, "NewReplicaSet:\t%s\n", printReplicaSetsByLabels(newRSs))
}
overlapWith := d.Annotations[deploymentutil.OverlapAnnotation]
if len(overlapWith) > 0 {
w.Write(LEVEL_0, "!!!WARNING!!! This deployment has overlapping label selector with deployment %q and won't behave as expected. Please fix it before continue.\n", overlapWith)
}
if describerSettings.ShowEvents {
events, err := dd.Core().Events(namespace).Search(api.Scheme, d)
if err == nil && events != nil {

View File

@ -20,7 +20,6 @@ go_library(
"//pkg/api:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/apis/extensions/validation:go_default_library",
"//pkg/controller/deployment/util:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/internalversion",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/fields",

View File

@ -19,9 +19,7 @@ package deployment
import (
"fmt"
"reflect"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
@ -34,7 +32,6 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apis/extensions/validation"
"k8s.io/kubernetes/pkg/controller/deployment/util"
)
// deploymentStrategy implements behavior for Deployments.
@ -93,15 +90,6 @@ func (deploymentStrategy) PrepareForUpdate(ctx genericapirequest.Context, obj, o
!reflect.DeepEqual(newDeployment.Annotations, oldDeployment.Annotations) {
newDeployment.Generation = oldDeployment.Generation + 1
}
// Records timestamp on selector updates in annotation
if !reflect.DeepEqual(newDeployment.Spec.Selector, oldDeployment.Spec.Selector) {
if newDeployment.Annotations == nil {
newDeployment.Annotations = make(map[string]string)
}
now := metav1.Now()
newDeployment.Annotations[util.SelectorUpdateAnnotation] = now.Format(time.RFC3339)
}
}
// ValidateUpdate is the default update validation for an end user.

View File

@ -34,7 +34,10 @@ type DeploymentListerExpansion interface {
// DeploymentNamespaeLister.
type DeploymentNamespaceListerExpansion interface{}
// GetDeploymentsForReplicaSet returns a list of deployments managing a replica set. Returns an error only if no matching deployments are found.
// GetDeploymentsForReplicaSet returns a list of Deployments that potentially
// match a ReplicaSet. Only the one specified in the ReplicaSet's ControllerRef
// will actually manage it.
// Returns an error only if no matching Deployments are found.
func (s *deploymentLister) GetDeploymentsForReplicaSet(rs *extensions.ReplicaSet) ([]*extensions.Deployment, error) {
if len(rs.Labels) == 0 {
return nil, fmt.Errorf("no deployments found for ReplicaSet %v because it has no labels", rs.Name)

View File

@ -19,7 +19,6 @@ package e2e
import (
"fmt"
"math/rand"
"strings"
"time"
. "github.com/onsi/ginkgo"
@ -136,15 +135,7 @@ func checkDeploymentRevision(c clientset.Interface, ns, deploymentName, revision
return deployment, newRS
}
func stopDeploymentOverlap(c clientset.Interface, internalClient internalclientset.Interface, ns, deploymentName, overlapWith string) {
stopDeploymentMaybeOverlap(c, internalClient, ns, deploymentName, overlapWith)
}
func stopDeployment(c clientset.Interface, internalClient internalclientset.Interface, ns, deploymentName string) {
stopDeploymentMaybeOverlap(c, internalClient, ns, deploymentName, "")
}
func stopDeploymentMaybeOverlap(c clientset.Interface, internalClient internalclientset.Interface, ns, deploymentName, overlapWith string) {
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
@ -166,18 +157,7 @@ func stopDeploymentMaybeOverlap(c clientset.Interface, internalClient internalcl
options := metav1.ListOptions{LabelSelector: selector.String()}
rss, err := c.Extensions().ReplicaSets(ns).List(options)
Expect(err).NotTo(HaveOccurred())
// RSes may be created by overlapping deployments right after this deployment is deleted, ignore them
if len(overlapWith) == 0 {
Expect(rss.Items).Should(HaveLen(0))
} else {
noOverlapRSes := []extensions.ReplicaSet{}
for _, rs := range rss.Items {
if !strings.HasPrefix(rs.Name, overlapWith) {
noOverlapRSes = append(noOverlapRSes, rs)
}
}
Expect(noOverlapRSes).Should(HaveLen(0))
}
Expect(rss.Items).Should(HaveLen(0))
framework.Logf("Ensuring deployment %s's Pods were deleted", deploymentName)
var pods *v1.PodList
if err := wait.PollImmediate(time.Second, timeout, func() (bool, error) {
@ -186,18 +166,8 @@ func stopDeploymentMaybeOverlap(c clientset.Interface, internalClient internalcl
return false, err
}
// Pods may be created by overlapping deployments right after this deployment is deleted, ignore them
if len(overlapWith) == 0 && len(pods.Items) == 0 {
if len(pods.Items) == 0 {
return true, nil
} else if len(overlapWith) != 0 {
noOverlapPods := []v1.Pod{}
for _, pod := range pods.Items {
if !strings.HasPrefix(pod.Name, overlapWith) {
noOverlapPods = append(noOverlapPods, pod)
}
}
if len(noOverlapPods) == 0 {
return true, nil
}
}
return false, nil
}); err != nil {
@ -1105,8 +1075,8 @@ func testScaledRolloutDeployment(f *framework.Framework) {
func testOverlappingDeployment(f *framework.Framework) {
ns := f.Namespace.Name
c := f.ClientSet
internalClient := f.InternalClientset
// Create first deployment.
deploymentName := "first-deployment"
podLabels := map[string]string{"name": redisImageName}
replicas := int32(1)
@ -1119,7 +1089,7 @@ func testOverlappingDeployment(f *framework.Framework) {
err = framework.WaitForDeploymentRevisionAndImage(c, ns, deploy.Name, "1", redisImage)
Expect(err).NotTo(HaveOccurred(), "The first deployment failed to update to revision 1")
Expect(err).NotTo(HaveOccurred())
// Create second deployment with overlapping selector.
deploymentName = "second-deployment"
By(fmt.Sprintf("Creating deployment %q with overlapping selector", deploymentName))
podLabels["other-label"] = "random-label"
@ -1127,85 +1097,16 @@ func testOverlappingDeployment(f *framework.Framework) {
deployOverlapping, err := c.Extensions().Deployments(ns).Create(d)
Expect(err).NotTo(HaveOccurred(), "Failed creating the second deployment")
// Wait for overlapping annotation updated to both deployments
By("Waiting for the overlapping deployment to have overlapping annotation")
err = framework.WaitForOverlappingAnnotationMatch(c, ns, deployOverlapping.Name, deploy.Name)
Expect(err).NotTo(HaveOccurred(), "Failed to update the second deployment's overlapping annotation")
err = framework.WaitForOverlappingAnnotationMatch(c, ns, deploy.Name, "")
Expect(err).NotTo(HaveOccurred(), "The deployment that holds the oldest selector shouldn't have the overlapping annotation")
// Only the first deployment is synced
By("Checking only the first overlapping deployment is synced")
options := metav1.ListOptions{}
rsList, err := c.Extensions().ReplicaSets(ns).List(options)
Expect(err).NotTo(HaveOccurred(), "Failed listing all replica sets in namespace %s", ns)
Expect(rsList.Items).To(HaveLen(int(replicas)))
Expect(rsList.Items[0].Spec.Template.Spec.Containers).To(HaveLen(1))
Expect(rsList.Items[0].Spec.Template.Spec.Containers[0].Image).To(Equal(deploy.Spec.Template.Spec.Containers[0].Image))
By("Deleting the first deployment")
stopDeploymentOverlap(c, internalClient, ns, deploy.Name, deployOverlapping.Name)
// Wait for overlapping annotation cleared
By("Waiting for the second deployment to clear overlapping annotation")
err = framework.WaitForOverlappingAnnotationMatch(c, ns, deployOverlapping.Name, "")
Expect(err).NotTo(HaveOccurred(), "Failed to clear the second deployment's overlapping annotation")
// Wait for it to be updated to revision 1
err = framework.WaitForDeploymentRevisionAndImage(c, ns, deployOverlapping.Name, "1", nginxImage)
Expect(err).NotTo(HaveOccurred(), "The second deployment failed to update to revision 1")
// Now the second deployment is synced
By("Checking the second overlapping deployment is synced")
rsList, err = c.Extensions().ReplicaSets(ns).List(options)
// Both deployments should proceed independently.
By("Checking each deployment creates its own replica set")
options := metav1.ListOptions{}
rsList, err := c.Extensions().ReplicaSets(ns).List(options)
Expect(err).NotTo(HaveOccurred(), "Failed listing all replica sets in namespace %s", ns)
Expect(rsList.Items).To(HaveLen(int(replicas)))
Expect(rsList.Items[0].Spec.Template.Spec.Containers).To(HaveLen(1))
Expect(rsList.Items[0].Spec.Template.Spec.Containers[0].Image).To(Equal(deployOverlapping.Spec.Template.Spec.Containers[0].Image))
deploymentName = "third-deployment"
podLabels = map[string]string{"name": nginxImageName}
By(fmt.Sprintf("Creating deployment %q", deploymentName))
d = framework.NewDeployment(deploymentName, replicas, podLabels, nginxImageName, nginxImage, extensions.RollingUpdateDeploymentStrategyType)
thirdDeployment, err := c.Extensions().Deployments(ns).Create(d)
Expect(err).NotTo(HaveOccurred(), "Failed creating the third deployment")
// Wait for it to be updated to revision 1
err = framework.WaitForDeploymentRevisionAndImage(c, ns, thirdDeployment.Name, "1", nginxImage)
Expect(err).NotTo(HaveOccurred(), "The third deployment failed to update to revision 1")
// Update the second deployment's selector to make it overlap with the third deployment
By(fmt.Sprintf("Updating deployment %q selector to make it overlap with existing one", deployOverlapping.Name))
deployOverlapping, err = framework.UpdateDeploymentWithRetries(c, ns, deployOverlapping.Name, func(update *extensions.Deployment) {
update.Spec.Selector = thirdDeployment.Spec.Selector
update.Spec.Template.Labels = thirdDeployment.Spec.Template.Labels
update.Spec.Template.Spec.Containers[0].Image = redisImage
})
Expect(err).NotTo(HaveOccurred())
// Wait for overlapping annotation updated to both deployments
By("Waiting for the second deployment to have the overlapping annotation")
err = framework.WaitForOverlappingAnnotationMatch(c, ns, deployOverlapping.Name, thirdDeployment.Name)
Expect(err).NotTo(HaveOccurred(), "Failed to update the second deployment's overlapping annotation")
err = framework.WaitForOverlappingAnnotationMatch(c, ns, thirdDeployment.Name, "")
Expect(err).NotTo(HaveOccurred(), "The deployment that holds the oldest selector shouldn't have the overlapping annotation")
// The second deployment shouldn't be synced
By("Checking the second deployment is not synced")
Expect(deployOverlapping.Annotations[deploymentutil.RevisionAnnotation]).To(Equal("1"))
// Update the second deployment's selector to make it not overlap with the third deployment
By(fmt.Sprintf("Updating deployment %q selector to make it not overlap with existing one", deployOverlapping.Name))
deployOverlapping, err = framework.UpdateDeploymentWithRetries(c, ns, deployOverlapping.Name, func(update *extensions.Deployment) {
update.Spec.Selector = deploy.Spec.Selector
update.Spec.Template.Labels = deploy.Spec.Template.Labels
})
Expect(err).NotTo(HaveOccurred())
// Wait for the second deployment to be synced
By("Checking the second deployment is now synced")
err = framework.WaitForDeploymentRevisionAndImage(c, ns, deployOverlapping.Name, "2", redisImage)
Expect(err).NotTo(HaveOccurred(), "The second deployment failed to update to revision 2")
Expect(rsList.Items).To(HaveLen(2))
}
func testFailedDeployment(f *framework.Framework) {

View File

@ -3391,19 +3391,6 @@ func WaitForDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName
return nil
}
func WaitForOverlappingAnnotationMatch(c clientset.Interface, ns, deploymentName, expected string) error {
return wait.Poll(Poll, 1*time.Minute, func() (bool, error) {
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
if err != nil {
return false, err
}
if deployment.Annotations[deploymentutil.OverlapAnnotation] == expected {
return true, nil
}
return false, nil
})
}
// CheckNewRSAnnotations check if the new RS's annotation is as expected
func CheckNewRSAnnotations(c clientset.Interface, ns, deploymentName string, expectedAnnotations map[string]string) error {
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
@ -3509,7 +3496,22 @@ func WaitForDeploymentWithCondition(c clientset.Interface, ns, deploymentName, r
func logPodsOfDeployment(c clientset.Interface, deployment *extensions.Deployment) {
minReadySeconds := deployment.Spec.MinReadySeconds
podList, err := deploymentutil.ListPods(deployment,
rsList, err := deploymentutil.ListReplicaSets(deployment,
func(namespace string, options metav1.ListOptions) ([]*extensions.ReplicaSet, error) {
rsList, err := c.Extensions().ReplicaSets(namespace).List(options)
if err != nil {
return nil, err
}
ret := make([]*extensions.ReplicaSet, 0, len(rsList.Items))
for i := range rsList.Items {
ret = append(ret, &rsList.Items[i])
}
return ret, nil
})
if err != nil {
Logf("Failed to list ReplicaSets of Deployment %s: %v", deployment.Name, err)
}
podList, err := deploymentutil.ListPods(deployment, rsList,
func(namespace string, options metav1.ListOptions) (*v1.PodList, error) {
return c.Core().Pods(namespace).List(options)
})