Merge pull request #41163 from janetkuo/deployment-cleanup
Automatic merge from submit-queue (batch tested with PRs 41517, 41494, 41163) Deployment: filter out old RSes that are deleted or with non-zero replicas before cleanup Fixes #36379 cc @zmerlynn @yujuhong @kargakis @kubernetes/sig-apps-bugs
This commit is contained in:
		| @@ -61,6 +61,7 @@ go_test( | |||||||
|         "//pkg/api:go_default_library", |         "//pkg/api:go_default_library", | ||||||
|         "//pkg/api/testapi:go_default_library", |         "//pkg/api/testapi:go_default_library", | ||||||
|         "//pkg/api/v1:go_default_library", |         "//pkg/api/v1:go_default_library", | ||||||
|  |         "//pkg/apis/extensions/v1beta1:go_default_library", | ||||||
|         "//pkg/client/clientset_generated/clientset:go_default_library", |         "//pkg/client/clientset_generated/clientset:go_default_library", | ||||||
|         "//pkg/securitycontext:go_default_library", |         "//pkg/securitycontext:go_default_library", | ||||||
|         "//vendor:k8s.io/apimachinery/pkg/api/equality", |         "//vendor:k8s.io/apimachinery/pkg/api/equality", | ||||||
|   | |||||||
| @@ -767,15 +767,23 @@ func IsPodActive(p *v1.Pod) bool { | |||||||
|  |  | ||||||
| // FilterActiveReplicaSets returns replica sets that have (or at least ought to have) pods. | // FilterActiveReplicaSets returns replica sets that have (or at least ought to have) pods. | ||||||
| func FilterActiveReplicaSets(replicaSets []*extensions.ReplicaSet) []*extensions.ReplicaSet { | func FilterActiveReplicaSets(replicaSets []*extensions.ReplicaSet) []*extensions.ReplicaSet { | ||||||
| 	active := []*extensions.ReplicaSet{} | 	activeFilter := func(rs *extensions.ReplicaSet) bool { | ||||||
| 	for i := range replicaSets { | 		return rs != nil && *(rs.Spec.Replicas) > 0 | ||||||
| 		rs := replicaSets[i] | 	} | ||||||
|  | 	return FilterReplicaSets(replicaSets, activeFilter) | ||||||
|  | } | ||||||
|  |  | ||||||
| 		if rs != nil && *(rs.Spec.Replicas) > 0 { | type filterRS func(rs *extensions.ReplicaSet) bool | ||||||
| 			active = append(active, replicaSets[i]) |  | ||||||
|  | // FilterReplicaSets returns replica sets that are filtered by filterFn (all returned ones should match filterFn). | ||||||
|  | func FilterReplicaSets(RSes []*extensions.ReplicaSet, filterFn filterRS) []*extensions.ReplicaSet { | ||||||
|  | 	var filtered []*extensions.ReplicaSet | ||||||
|  | 	for i := range RSes { | ||||||
|  | 		if filterFn(RSes[i]) { | ||||||
|  | 			filtered = append(filtered, RSes[i]) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	return active | 	return filtered | ||||||
| } | } | ||||||
|  |  | ||||||
| // PodKey returns a key unique to the given pod within a cluster. | // PodKey returns a key unique to the given pod within a cluster. | ||||||
|   | |||||||
| @@ -40,6 +40,7 @@ import ( | |||||||
| 	"k8s.io/kubernetes/pkg/api" | 	"k8s.io/kubernetes/pkg/api" | ||||||
| 	"k8s.io/kubernetes/pkg/api/testapi" | 	"k8s.io/kubernetes/pkg/api/testapi" | ||||||
| 	"k8s.io/kubernetes/pkg/api/v1" | 	"k8s.io/kubernetes/pkg/api/v1" | ||||||
|  | 	extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" | ||||||
| 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" | 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" | ||||||
| 	"k8s.io/kubernetes/pkg/securitycontext" | 	"k8s.io/kubernetes/pkg/securitycontext" | ||||||
| ) | ) | ||||||
| @@ -116,6 +117,45 @@ func newPodList(store cache.Store, count int, status v1.PodPhase, rc *v1.Replica | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func newReplicaSet(name string, replicas int) *extensions.ReplicaSet { | ||||||
|  | 	return &extensions.ReplicaSet{ | ||||||
|  | 		TypeMeta: metav1.TypeMeta{APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String()}, | ||||||
|  | 		ObjectMeta: metav1.ObjectMeta{ | ||||||
|  | 			UID:             uuid.NewUUID(), | ||||||
|  | 			Name:            name, | ||||||
|  | 			Namespace:       metav1.NamespaceDefault, | ||||||
|  | 			ResourceVersion: "18", | ||||||
|  | 		}, | ||||||
|  | 		Spec: extensions.ReplicaSetSpec{ | ||||||
|  | 			Replicas: func() *int32 { i := int32(replicas); return &i }(), | ||||||
|  | 			Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, | ||||||
|  | 			Template: v1.PodTemplateSpec{ | ||||||
|  | 				ObjectMeta: metav1.ObjectMeta{ | ||||||
|  | 					Labels: map[string]string{ | ||||||
|  | 						"name": "foo", | ||||||
|  | 						"type": "production", | ||||||
|  | 					}, | ||||||
|  | 				}, | ||||||
|  | 				Spec: v1.PodSpec{ | ||||||
|  | 					Containers: []v1.Container{ | ||||||
|  | 						{ | ||||||
|  | 							Image: "foo/bar", | ||||||
|  | 							TerminationMessagePath: v1.TerminationMessagePathDefault, | ||||||
|  | 							ImagePullPolicy:        v1.PullIfNotPresent, | ||||||
|  | 							SecurityContext:        securitycontext.ValidSecurityContextWithContainerDefaults(), | ||||||
|  | 						}, | ||||||
|  | 					}, | ||||||
|  | 					RestartPolicy: v1.RestartPolicyAlways, | ||||||
|  | 					DNSPolicy:     v1.DNSDefault, | ||||||
|  | 					NodeSelector: map[string]string{ | ||||||
|  | 						"baz": "blah", | ||||||
|  | 					}, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| func TestControllerExpectations(t *testing.T) { | func TestControllerExpectations(t *testing.T) { | ||||||
| 	ttl := 30 * time.Second | 	ttl := 30 * time.Second | ||||||
| 	e, fakeClock := NewFakeControllerExpectationsLookup(ttl) | 	e, fakeClock := NewFakeControllerExpectationsLookup(ttl) | ||||||
| @@ -381,3 +421,25 @@ func TestSortingActivePods(t *testing.T) { | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func TestActiveReplicaSetsFiltering(t *testing.T) { | ||||||
|  | 	var replicaSets []*extensions.ReplicaSet | ||||||
|  | 	replicaSets = append(replicaSets, newReplicaSet("zero", 0)) | ||||||
|  | 	replicaSets = append(replicaSets, nil) | ||||||
|  | 	replicaSets = append(replicaSets, newReplicaSet("foo", 1)) | ||||||
|  | 	replicaSets = append(replicaSets, newReplicaSet("bar", 2)) | ||||||
|  | 	expectedNames := sets.NewString() | ||||||
|  | 	for _, rs := range replicaSets[2:] { | ||||||
|  | 		expectedNames.Insert(rs.Name) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	got := FilterActiveReplicaSets(replicaSets) | ||||||
|  | 	gotNames := sets.NewString() | ||||||
|  | 	for _, rs := range got { | ||||||
|  | 		gotNames.Insert(rs.Name) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if expectedNames.Difference(gotNames).Len() != 0 || gotNames.Difference(expectedNames).Len() != 0 { | ||||||
|  | 		t.Errorf("expected %v, got %v", expectedNames.List(), gotNames.List()) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|   | |||||||
| @@ -541,18 +541,25 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*extensions.ReplicaSe | |||||||
| 	if deployment.Spec.RevisionHistoryLimit == nil { | 	if deployment.Spec.RevisionHistoryLimit == nil { | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| 	diff := int32(len(oldRSs)) - *deployment.Spec.RevisionHistoryLimit |  | ||||||
|  | 	// Avoid deleting replica set with deletion timestamp set | ||||||
|  | 	aliveFilter := func(rs *extensions.ReplicaSet) bool { | ||||||
|  | 		return rs != nil && rs.ObjectMeta.DeletionTimestamp == nil | ||||||
|  | 	} | ||||||
|  | 	cleanableRSes := controller.FilterReplicaSets(oldRSs, aliveFilter) | ||||||
|  |  | ||||||
|  | 	diff := int32(len(cleanableRSes)) - *deployment.Spec.RevisionHistoryLimit | ||||||
| 	if diff <= 0 { | 	if diff <= 0 { | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs)) | 	sort.Sort(controller.ReplicaSetsByCreationTimestamp(cleanableRSes)) | ||||||
| 	glog.V(2).Infof("Looking to cleanup old replica sets for deployment %q", deployment.Name) | 	glog.V(2).Infof("Looking to cleanup old replica sets for deployment %q", deployment.Name) | ||||||
|  |  | ||||||
| 	var errList []error | 	var errList []error | ||||||
| 	// TODO: This should be parallelized. | 	// TODO: This should be parallelized. | ||||||
| 	for i := int32(0); i < diff; i++ { | 	for i := int32(0); i < diff; i++ { | ||||||
| 		rs := oldRSs[i] | 		rs := cleanableRSes[i] | ||||||
| 		// Avoid delete replica set with non-zero replica counts | 		// Avoid delete replica set with non-zero replica counts | ||||||
| 		if rs.Status.Replicas != 0 || *(rs.Spec.Replicas) != 0 || rs.Generation > rs.Status.ObservedGeneration || rs.DeletionTimestamp != nil { | 		if rs.Status.Replicas != 0 || *(rs.Spec.Replicas) != 0 || rs.Generation > rs.Status.ObservedGeneration || rs.DeletionTimestamp != nil { | ||||||
| 			continue | 			continue | ||||||
|   | |||||||
| @@ -381,7 +381,7 @@ func testDeploymentCleanUpPolicy(f *framework.Framework) { | |||||||
|  |  | ||||||
| 	// Create a deployment to delete nginx pods and instead bring up redis pods. | 	// Create a deployment to delete nginx pods and instead bring up redis pods. | ||||||
| 	deploymentName := "test-cleanup-deployment" | 	deploymentName := "test-cleanup-deployment" | ||||||
| 	framework.Logf("Creating deployment %s", deploymentName) | 	By(fmt.Sprintf("Creating deployment %s", deploymentName)) | ||||||
|  |  | ||||||
| 	pods, err := c.Core().Pods(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()}) | 	pods, err := c.Core().Pods(ns).List(metav1.ListOptions{LabelSelector: labels.Everything().String()}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @@ -424,6 +424,7 @@ func testDeploymentCleanUpPolicy(f *framework.Framework) { | |||||||
| 	_, err = c.Extensions().Deployments(ns).Create(d) | 	_, err = c.Extensions().Deployments(ns).Create(d) | ||||||
| 	Expect(err).NotTo(HaveOccurred()) | 	Expect(err).NotTo(HaveOccurred()) | ||||||
|  |  | ||||||
|  | 	By(fmt.Sprintf("Waiting for deployment %s history to be cleaned up", deploymentName)) | ||||||
| 	err = framework.WaitForDeploymentOldRSsNum(c, ns, deploymentName, int(*revisionHistoryLimit)) | 	err = framework.WaitForDeploymentOldRSsNum(c, ns, deploymentName, int(*revisionHistoryLimit)) | ||||||
| 	Expect(err).NotTo(HaveOccurred()) | 	Expect(err).NotTo(HaveOccurred()) | ||||||
| 	close(stopCh) | 	close(stopCh) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Submit Queue
					Kubernetes Submit Queue