Merge pull request #27600 from caesarxuchao/rc-gc

Automatic merge from submit-queue

[GarbageCollector] Let the RC manager set/remove ControllerRef

What's done:
* RC manager sets ControllerRef when creating new pods (a sketch of the resulting `OwnerReference` follows this list)
* RC manager sets ControllerRef when adopting pods whose labels match but which have no controller
* RC manager clears ControllerRef when a pod's labels change to no longer match
* RC manager clears pods' ControllerRef when the rc's selector changes
* RC manager stops adopting/creating/deleting pods once the rc's DeletionTimestamp is set
* RC manager bumps up ObservedGeneration: the [original code](https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/replication/replication_controller_utils.go#L36) already does this.
* Integration tests:
  * verify that changing the RC's selector or a pod's labels triggers adoption/abandoning
* e2e tests (split out to #27151):
  * verify that the GC deletes the pods created by the RC if DeleteOptions.OrphanDependents=false, and orphans the pods if DeleteOptions.OrphanDependents=true.
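
For orientation, a minimal sketch of the controllerRef shape these changes revolve around — a hedged illustration, not the manager's actual code (`controllerRefFor` is a hypothetical helper). A controllerRef is just an `OwnerReference` whose `Controller` field is set to true; adoption appends such an entry to `pod.OwnerReferences` and abandoning removes it:

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api/v1"
)

// controllerRefFor builds the ownerReference that marks rc as the pod's
// managing controller. Controller == &trueVar is what distinguishes a
// controllerRef from a plain ownerRef.
func controllerRefFor(rc *v1.ReplicationController) v1.OwnerReference {
	trueVar := true
	return v1.OwnerReference{
		APIVersion: "v1",
		Kind:       "ReplicationController",
		Name:       rc.Name,
		UID:        rc.UID,
		Controller: &trueVar,
	}
}

func main() {
	rc := &v1.ReplicationController{ObjectMeta: v1.ObjectMeta{Name: "rc", UID: "1"}}
	pod := &v1.Pod{}
	// Adoption: attach the controllerRef.
	pod.OwnerReferences = append(pod.OwnerReferences, controllerRefFor(rc))
	// Abandoning: drop it again.
	pod.OwnerReferences = nil
	fmt.Printf("controllerRef = %+v\n", controllerRefFor(rc))
}
```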

TODO:

- [x] we need to be able to select pods that have a specific ControllerRef. Then each time we sync the RC, we will iterate through all the pods that have a controllerRef pointing to the RC, even if the pods' labels no longer match the RC's selector. This prevents a pod from being stuck with a stale controllerRef, which could be caused by a race between the abandoner (the goroutine that removes the controllerRef) and the worker (the goroutine that adds the controllerRef to pods). A sketch of such controllerRef-based selection follows this list.
- [ ] use controllerRef instead of calling `getPodController`. This might be carried out by the control-plane team.
- [ ] according to the controllerRef proposal (#25256): "For debugging purposes we want to add an adoptionTime annotation prefixed with `kubernetes.io/` which will keep the time of last controller ownership transfer." This might be carried out by the control-plane team.
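
As a rough sketch of the first item — selecting pods by controllerRef rather than by labels — the filtering could look like this (`podsControlledBy` is a hypothetical helper, not an API added by this PR):

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api/v1"
)

// podsControlledBy returns the pods whose controllerRef points at rc,
// regardless of whether their labels still match rc's selector. This is
// what lets a sync notice (and fix) a stale controllerRef.
func podsControlledBy(rc *v1.ReplicationController, pods []v1.Pod) []v1.Pod {
	var owned []v1.Pod
	for _, pod := range pods {
		for _, ref := range pod.OwnerReferences {
			if ref.Controller != nil && *ref.Controller && ref.UID == rc.UID {
				owned = append(owned, pod)
				break
			}
		}
	}
	return owned
}

func main() {
	rc := &v1.ReplicationController{ObjectMeta: v1.ObjectMeta{Name: "rc", UID: "1"}}
	fmt.Println(len(podsControlledBy(rc, nil))) // 0: no pods to scan
}
```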

cc @lavalamp @gmarek
Committed by k8s-merge-robot (via GitHub) on 2016-07-15 04:40:40 -07:00
10 changed files with 1033 additions and 103 deletions

@@ -0,0 +1,443 @@
// +build integration,!no-etcd

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
	"fmt"
	"net/http/httptest"
	"reflect"
	"testing"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/api/v1"
	internalclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3"
	"k8s.io/kubernetes/pkg/client/restclient"
	controllerframework "k8s.io/kubernetes/pkg/controller/framework"
	"k8s.io/kubernetes/pkg/controller/framework/informers"
	"k8s.io/kubernetes/pkg/controller/replication"
	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/test/integration/framework"
)

func testLabels() map[string]string {
	return map[string]string{"name": "test"}
}

func newRC(name, namespace string, replicas int) *v1.ReplicationController {
	replicasCopy := int32(replicas)
	return &v1.ReplicationController{
		TypeMeta: unversioned.TypeMeta{
			Kind:       "ReplicationController",
			APIVersion: "v1",
		},
		ObjectMeta: v1.ObjectMeta{
			Namespace: namespace,
			Name:      name,
		},
		Spec: v1.ReplicationControllerSpec{
			Selector: testLabels(),
			Replicas: &replicasCopy,
			Template: &v1.PodTemplateSpec{
				ObjectMeta: v1.ObjectMeta{
					Labels: testLabels(),
				},
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:  "fake-name",
							Image: "fakeimage",
						},
					},
				},
			},
		},
	}
}

func newMatchingPod(podName, namespace string) *v1.Pod {
	return &v1.Pod{
		TypeMeta: unversioned.TypeMeta{
			Kind:       "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: v1.ObjectMeta{
			Name:      podName,
			Namespace: namespace,
			Labels:    testLabels(),
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:  "fake-name",
					Image: "fakeimage",
				},
			},
		},
		Status: v1.PodStatus{
			Phase: v1.PodRunning,
		},
	}
}

// verifyRemainingObjects checks whether the numbers of the remaining
// replication controllers and pods equal rcNum and podNum. It returns an
// error if the communication with the API server fails.
func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespace string, rcNum, podNum int) (bool, error) {
	rcClient := clientSet.Core().ReplicationControllers(namespace)
	podClient := clientSet.Core().Pods(namespace)
	pods, err := podClient.List(api.ListOptions{})
	if err != nil {
		return false, fmt.Errorf("Failed to list pods: %v", err)
	}
	var ret = true
	if len(pods.Items) != podNum {
		ret = false
		t.Logf("expect %d pods, got %d pods", podNum, len(pods.Items))
	}
	rcs, err := rcClient.List(api.ListOptions{})
	if err != nil {
		return false, fmt.Errorf("Failed to list replication controllers: %v", err)
	}
	if len(rcs.Items) != rcNum {
		ret = false
		t.Logf("expect %d RCs, got %d RCs", rcNum, len(rcs.Items))
	}
	return ret, nil
}

func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *replication.ReplicationManager, controllerframework.SharedIndexInformer, clientset.Interface) {
	masterConfig := framework.NewIntegrationTestMasterConfig()
	masterConfig.EnableCoreControllers = false
	_, s := framework.RunAMaster(masterConfig)

	config := restclient.Config{Host: s.URL}
	clientSet, err := clientset.NewForConfig(&config)
	if err != nil {
		t.Fatalf("Failed to create clientset: %v", err)
	}
	resyncPeriod := 12 * time.Hour
	resyncPeriodFunc := func() time.Duration {
		return resyncPeriod
	}
	podInformer := informers.CreateSharedPodIndexInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod)
	rm := replication.NewReplicationManager(
		podInformer,
		internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replication-controller")),
		resyncPeriodFunc,
		replication.BurstReplicas,
		4096,
		enableGarbageCollector,
	)
	return s, rm, podInformer, clientSet
}

func TestAdoption(t *testing.T) {
	var trueVar = true
	testCases := []struct {
		name                    string
		existingOwnerReferences func(rc *v1.ReplicationController) []v1.OwnerReference
		expectedOwnerReferences func(rc *v1.ReplicationController) []v1.OwnerReference
	}{
		{
			"pod refers rc as an owner, not a controller",
			func(rc *v1.ReplicationController) []v1.OwnerReference {
				return []v1.OwnerReference{{UID: rc.UID, Name: rc.Name, APIVersion: "v1", Kind: "ReplicationController"}}
			},
			func(rc *v1.ReplicationController) []v1.OwnerReference {
				return []v1.OwnerReference{{UID: rc.UID, Name: rc.Name, APIVersion: "v1", Kind: "ReplicationController", Controller: &trueVar}}
			},
		},
		{
			"pod doesn't have owner references",
			func(rc *v1.ReplicationController) []v1.OwnerReference {
				return []v1.OwnerReference{}
			},
			func(rc *v1.ReplicationController) []v1.OwnerReference {
				return []v1.OwnerReference{{UID: rc.UID, Name: rc.Name, APIVersion: "v1", Kind: "ReplicationController", Controller: &trueVar}}
			},
		},
		{
			"pod refers rc as a controller",
			func(rc *v1.ReplicationController) []v1.OwnerReference {
				return []v1.OwnerReference{{UID: rc.UID, Name: rc.Name, APIVersion: "v1", Kind: "ReplicationController", Controller: &trueVar}}
			},
			func(rc *v1.ReplicationController) []v1.OwnerReference {
				return []v1.OwnerReference{{UID: rc.UID, Name: rc.Name, APIVersion: "v1", Kind: "ReplicationController", Controller: &trueVar}}
			},
		},
		{
			"pod refers other rc as the controller, refers the rc as an owner",
			func(rc *v1.ReplicationController) []v1.OwnerReference {
				return []v1.OwnerReference{
					{UID: "1", Name: "anotherRC", APIVersion: "v1", Kind: "ReplicationController", Controller: &trueVar},
					{UID: rc.UID, Name: rc.Name, APIVersion: "v1", Kind: "ReplicationController"},
				}
			},
			func(rc *v1.ReplicationController) []v1.OwnerReference {
				return []v1.OwnerReference{
					{UID: "1", Name: "anotherRC", APIVersion: "v1", Kind: "ReplicationController", Controller: &trueVar},
					{UID: rc.UID, Name: rc.Name, APIVersion: "v1", Kind: "ReplicationController"},
				}
			},
		},
	}
	for i, tc := range testCases {
		s, rm, podInformer, clientSet := rmSetup(t, true)
		ns := framework.CreateTestingNamespace(fmt.Sprintf("adoption-%d", i), s, t)
		defer framework.DeleteTestingNamespace(ns, s, t)

		rcClient := clientSet.Core().ReplicationControllers(ns.Name)
		podClient := clientSet.Core().Pods(ns.Name)
		const rcName = "rc"
		rc, err := rcClient.Create(newRC(rcName, ns.Name, 1))
		if err != nil {
			t.Fatalf("Failed to create replication controller: %v", err)
		}
		pod := newMatchingPod("pod1", ns.Name)
		pod.OwnerReferences = tc.existingOwnerReferences(rc)
		_, err = podClient.Create(pod)
		if err != nil {
			t.Fatalf("Failed to create Pod: %v", err)
		}

		stopCh := make(chan struct{})
		go podInformer.Run(stopCh)
		go rm.Run(5, stopCh)
		if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
			updatedPod, err := podClient.Get(pod.Name)
			if err != nil {
				return false, err
			}
			if e, a := tc.expectedOwnerReferences(rc), updatedPod.OwnerReferences; reflect.DeepEqual(e, a) {
				return true, nil
			} else {
				t.Logf("ownerReferences don't match, expect %v, got %v", e, a)
				return false, nil
			}
		}); err != nil {
			t.Fatal(err)
		}
		close(stopCh)
	}
}

func createRCsPods(t *testing.T, clientSet clientset.Interface, rcs []*v1.ReplicationController, pods []*v1.Pod, ns string) {
	rcClient := clientSet.Core().ReplicationControllers(ns)
	podClient := clientSet.Core().Pods(ns)
	for _, rc := range rcs {
		if _, err := rcClient.Create(rc); err != nil {
			t.Fatalf("Failed to create replication controller %s: %v", rc.Name, err)
		}
	}
	for _, pod := range pods {
		if _, err := podClient.Create(pod); err != nil {
			t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
		}
	}
}

func waitRCStable(t *testing.T, clientSet clientset.Interface, rc *v1.ReplicationController, ns string) {
	rcClient := clientSet.Core().ReplicationControllers(ns)
	if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
		updatedRC, err := rcClient.Get(rc.Name)
		if err != nil {
			return false, err
		}
		// The rc is stable once its observed replica count matches the spec.
		return updatedRC.Status.Replicas == *rc.Spec.Replicas, nil
	}); err != nil {
		t.Fatal(err)
	}
}

func TestUpdateSelectorToAdopt(t *testing.T) {
	// We have pod1, pod2 and rc. rc.spec.replicas=1. At first rc.Selector
	// matches pod1 only; change the selector to match pod2 as well. Verify
	// that the rc adopts pod2 and deletes one of the pods, leaving only one
	// pod.
	s, rm, podInformer, clientSet := rmSetup(t, true)
	ns := framework.CreateTestingNamespace("update-selector-to-adopt", s, t)
	defer framework.DeleteTestingNamespace(ns, s, t)
	rc := newRC("rc", ns.Name, 1)
	// make rc's selector match only pod1
	rc.Spec.Selector["uniqueKey"] = "1"
	rc.Spec.Template.Labels["uniqueKey"] = "1"
	pod1 := newMatchingPod("pod1", ns.Name)
	pod1.Labels["uniqueKey"] = "1"
	pod2 := newMatchingPod("pod2", ns.Name)
	pod2.Labels["uniqueKey"] = "2"
	createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name)

	stopCh := make(chan struct{})
	go podInformer.Run(stopCh)
	go rm.Run(5, stopCh)
	waitRCStable(t, clientSet, rc, ns.Name)

	// change the rc's selector to match both pods; in a strategic merge
	// patch, setting "uniqueKey" to null removes the key from the selector
	patch := `{"spec":{"selector":{"uniqueKey":null}}}`
	rcClient := clientSet.Core().ReplicationControllers(ns.Name)
	rc, err := rcClient.Patch(rc.Name, api.StrategicMergePatchType, []byte(patch))
	if err != nil {
		t.Fatalf("Failed to patch replication controller: %v", err)
	}
	t.Logf("patched rc = %#v", rc)
	// wait for the rc to select both pods and delete one of them
	if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
		return verifyRemainingObjects(t, clientSet, ns.Name, 1, 1)
	}); err != nil {
		t.Fatal(err)
	}
	close(stopCh)
}

func TestUpdateSelectorToRemoveControllerRef(t *testing.T) {
	// We have pod1, pod2 and rc. rc.spec.replicas=2. At first rc.Selector
	// matches pod1 and pod2; change the selector to match only pod1. Verify
	// that rc creates one more pod, so there are 3 pods. Also verify that
	// pod2's controllerRef is cleared.
	s, rm, podInformer, clientSet := rmSetup(t, true)
	ns := framework.CreateTestingNamespace("update-selector-to-remove-controllerref", s, t)
	defer framework.DeleteTestingNamespace(ns, s, t)
	rc := newRC("rc", ns.Name, 2)
	pod1 := newMatchingPod("pod1", ns.Name)
	pod1.Labels["uniqueKey"] = "1"
	pod2 := newMatchingPod("pod2", ns.Name)
	pod2.Labels["uniqueKey"] = "2"
	createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name)

	stopCh := make(chan struct{})
	go podInformer.Run(stopCh)
	go rm.Run(5, stopCh)
	waitRCStable(t, clientSet, rc, ns.Name)

	// change the rc's selector to match only pod1
	patch := `{"spec":{"selector":{"uniqueKey":"1"},"template":{"metadata":{"labels":{"uniqueKey":"1"}}}}}`
	rcClient := clientSet.Core().ReplicationControllers(ns.Name)
	rc, err := rcClient.Patch(rc.Name, api.StrategicMergePatchType, []byte(patch))
	if err != nil {
		t.Fatalf("Failed to patch replication controller: %v", err)
	}
	t.Logf("patched rc = %#v", rc)
	// wait for the rc to create one more pod
	if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
		return verifyRemainingObjects(t, clientSet, ns.Name, 1, 3)
	}); err != nil {
		t.Fatal(err)
	}
	podClient := clientSet.Core().Pods(ns.Name)
	pod2, err = podClient.Get(pod2.Name)
	if err != nil {
		t.Fatalf("Failed to get pod2: %v", err)
	}
	if len(pod2.OwnerReferences) != 0 {
		t.Fatalf("ownerReferences of pod2 is not cleared, got %#v", pod2.OwnerReferences)
	}
	close(stopCh)
}

func TestUpdateLabelToRemoveControllerRef(t *testing.T) {
	// We have pod1, pod2 and rc. rc.spec.replicas=2. At first rc.Selector
	// matches pod1 and pod2; change pod2's labels to non-matching. Verify
	// that rc creates one more pod, so there are 3 pods. Also verify that
	// pod2's controllerRef is cleared.
	s, rm, podInformer, clientSet := rmSetup(t, true)
	ns := framework.CreateTestingNamespace("update-label-to-remove-controllerref", s, t)
	defer framework.DeleteTestingNamespace(ns, s, t)
	rc := newRC("rc", ns.Name, 2)
	pod1 := newMatchingPod("pod1", ns.Name)
	pod2 := newMatchingPod("pod2", ns.Name)
	createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name)

	stopCh := make(chan struct{})
	go podInformer.Run(stopCh)
	go rm.Run(5, stopCh)
	waitRCStable(t, clientSet, rc, ns.Name)

	// change pod2's labels so it no longer matches the rc's selector
	patch := `{"metadata":{"labels":{"name":null}}}`
	podClient := clientSet.Core().Pods(ns.Name)
	pod2, err := podClient.Patch(pod2.Name, api.StrategicMergePatchType, []byte(patch))
	if err != nil {
		t.Fatalf("Failed to patch pod2: %v", err)
	}
	t.Logf("patched pod2 = %#v", pod2)
	// wait for the rc to create one more pod
	if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
		return verifyRemainingObjects(t, clientSet, ns.Name, 1, 3)
	}); err != nil {
		t.Fatal(err)
	}
	pod2, err = podClient.Get(pod2.Name)
	if err != nil {
		t.Fatalf("Failed to get pod2: %v", err)
	}
	if len(pod2.OwnerReferences) != 0 {
		t.Fatalf("ownerReferences of pod2 is not cleared, got %#v", pod2.OwnerReferences)
	}
	close(stopCh)
}

func TestUpdateLabelToBeAdopted(t *testing.T) {
	// We have pod1, pod2 and rc. rc.spec.replicas=1. At first rc.Selector
	// matches pod1 only; change pod2's labels to be matching. Verify that the
	// rc adopts pod2 and deletes one of the pods, so there is only 1 pod
	// left.
	s, rm, podInformer, clientSet := rmSetup(t, true)
	ns := framework.CreateTestingNamespace("update-label-to-be-adopted", s, t)
	defer framework.DeleteTestingNamespace(ns, s, t)
	rc := newRC("rc", ns.Name, 1)
	// make rc's selector match only pod1
	rc.Spec.Selector["uniqueKey"] = "1"
	rc.Spec.Template.Labels["uniqueKey"] = "1"
	pod1 := newMatchingPod("pod1", ns.Name)
	pod1.Labels["uniqueKey"] = "1"
	pod2 := newMatchingPod("pod2", ns.Name)
	pod2.Labels["uniqueKey"] = "2"
	createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name)

	stopCh := make(chan struct{})
	go podInformer.Run(stopCh)
	go rm.Run(5, stopCh)
	waitRCStable(t, clientSet, rc, ns.Name)

	// change pod2's labels to match the rc's selector
	patch := `{"metadata":{"labels":{"uniqueKey":"1"}}}`
	podClient := clientSet.Core().Pods(ns.Name)
	pod2, err := podClient.Patch(pod2.Name, api.StrategicMergePatchType, []byte(patch))
	if err != nil {
		t.Fatalf("Failed to patch pod2: %v", err)
	}
	t.Logf("patched pod2 = %#v", pod2)
	// wait for the rc to select both pods and delete one of them
	if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
		return verifyRemainingObjects(t, clientSet, ns.Name, 1, 1)
	}); err != nil {
		t.Fatal(err)
	}
	close(stopCh)
}