Merge pull request #30277 from wojtek-t/optimize_controllers
Automatic merge from submit-queue Avoid computing DeepEqual in controllers all the time Computing DeepCopy was responsible for ~33% of cpu usage of controller-manager before this PR. <!-- Checklist for submitting a Pull Request Please remove this comment block before submitting. 1. Please read our [contributor guidelines](https://github.com/kubernetes/kubernetes/blob/master/CONTRIBUTING.md). 2. See our [developer guide](https://github.com/kubernetes/kubernetes/blob/master/docs/devel/development.md). 3. If you want this PR to automatically close an issue when it is merged, add `fixes #<issue number>` or `fixes #<issue number>, fixes #<issue number>` to close multiple issues (see: https://github.com/blog/1506-closing-issues-via-pull-requests). 4. Follow the instructions for [labeling and writing a release note for this PR](https://github.com/kubernetes/kubernetes/blob/master/docs/devel/pull-requests.md#release-notes) in the block below. --> ```release-note * Use the release-note-* labels to set the release note state * Clear this block to use the PR title as the release note -OR- * Enter your extended release note here (newlines are formatted as bullets) ``` <!-- Reviewable:start --> --- This change is [<img src="https://reviewable.kubernetes.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.kubernetes.io/reviews/kubernetes/kubernetes/30277) <!-- Reviewable:end -->
This commit is contained in:
6
pkg/client/cache/reflector.go
vendored
6
pkg/client/cache/reflector.go
vendored
@@ -69,8 +69,6 @@ type Reflector struct {
|
|||||||
resyncPeriod time.Duration
|
resyncPeriod time.Duration
|
||||||
// now() returns current time - exposed for testing purposes
|
// now() returns current time - exposed for testing purposes
|
||||||
now func() time.Time
|
now func() time.Time
|
||||||
// nextResync is approximate time of next resync (0 if not scheduled)
|
|
||||||
nextResync time.Time
|
|
||||||
// lastSyncResourceVersion is the resource version token last
|
// lastSyncResourceVersion is the resource version token last
|
||||||
// observed when doing a sync with the underlying store
|
// observed when doing a sync with the underlying store
|
||||||
// it is thread safe, but not synchronized with the underlying store
|
// it is thread safe, but not synchronized with the underlying store
|
||||||
@@ -234,14 +232,12 @@ var (
|
|||||||
// required, and a cleanup function.
|
// required, and a cleanup function.
|
||||||
func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
|
func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
|
||||||
if r.resyncPeriod == 0 {
|
if r.resyncPeriod == 0 {
|
||||||
r.nextResync = time.Time{}
|
|
||||||
return neverExitWatch, func() bool { return false }
|
return neverExitWatch, func() bool { return false }
|
||||||
}
|
}
|
||||||
// The cleanup function is required: imagine the scenario where watches
|
// The cleanup function is required: imagine the scenario where watches
|
||||||
// always fail so we end up listing frequently. Then, if we don't
|
// always fail so we end up listing frequently. Then, if we don't
|
||||||
// manually stop the timer, we could end up with many timers active
|
// manually stop the timer, we could end up with many timers active
|
||||||
// concurrently.
|
// concurrently.
|
||||||
r.nextResync = r.now().Add(r.resyncPeriod)
|
|
||||||
t := time.NewTimer(r.resyncPeriod)
|
t := time.NewTimer(r.resyncPeriod)
|
||||||
return t.C, t.Stop
|
return t.C, t.Stop
|
||||||
}
|
}
|
||||||
@@ -285,7 +281,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
case <-stopCh:
|
case <-stopCh:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
glog.V(4).Infof("%s: next resync planned for %#v, forcing now", r.name, r.nextResync)
|
glog.V(4).Infof("%s: forcing resync", r.name)
|
||||||
if err := r.store.Resync(); err != nil {
|
if err := r.store.Resync(); err != nil {
|
||||||
resyncerrc <- err
|
resyncerrc <- err
|
||||||
return
|
return
|
||||||
|
@@ -354,16 +354,17 @@ func (dsc *DaemonSetsController) addPod(obj interface{}) {
|
|||||||
// up. If the labels of the pod have changed we need to awaken both the old
|
// up. If the labels of the pod have changed we need to awaken both the old
|
||||||
// and new set. old and cur must be *api.Pod types.
|
// and new set. old and cur must be *api.Pod types.
|
||||||
func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
|
func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
|
||||||
if api.Semantic.DeepEqual(old, cur) {
|
curPod := cur.(*api.Pod)
|
||||||
// A periodic relist will send update events for all known pods.
|
oldPod := old.(*api.Pod)
|
||||||
|
if curPod.ResourceVersion == oldPod.ResourceVersion {
|
||||||
|
// Periodic resync will send update events for all known pods.
|
||||||
|
// Two different versions of the same pod will always have different RVs.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
curPod := cur.(*api.Pod)
|
|
||||||
glog.V(4).Infof("Pod %s updated.", curPod.Name)
|
glog.V(4).Infof("Pod %s updated.", curPod.Name)
|
||||||
if curDS := dsc.getPodDaemonSet(curPod); curDS != nil {
|
if curDS := dsc.getPodDaemonSet(curPod); curDS != nil {
|
||||||
dsc.enqueueDaemonSet(curDS)
|
dsc.enqueueDaemonSet(curDS)
|
||||||
}
|
}
|
||||||
oldPod := old.(*api.Pod)
|
|
||||||
// If the labels have not changed, then the daemon set responsible for
|
// If the labels have not changed, then the daemon set responsible for
|
||||||
// the pod is the same as it was before. In that case we have enqueued the daemon
|
// the pod is the same as it was before. In that case we have enqueued the daemon
|
||||||
// set above, and do not have to enqueue the set again.
|
// set above, and do not have to enqueue the set again.
|
||||||
@@ -427,8 +428,8 @@ func (dsc *DaemonSetsController) addNode(obj interface{}) {
|
|||||||
func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
|
func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
|
||||||
oldNode := old.(*api.Node)
|
oldNode := old.(*api.Node)
|
||||||
curNode := cur.(*api.Node)
|
curNode := cur.(*api.Node)
|
||||||
if api.Semantic.DeepEqual(oldNode.Name, curNode.Name) && api.Semantic.DeepEqual(oldNode.Namespace, curNode.Namespace) && api.Semantic.DeepEqual(oldNode.Labels, curNode.Labels) {
|
if reflect.DeepEqual(oldNode.Labels, curNode.Labels) {
|
||||||
// A periodic relist will send update events for all known pods.
|
// If node labels didn't change, we can ignore this update.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
dsList, err := dsc.dsStore.List()
|
dsList, err := dsc.dsStore.List()
|
||||||
|
@@ -272,19 +272,20 @@ func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.Replic
|
|||||||
// awaken both the old and new deployments. old and cur must be *extensions.ReplicaSet
|
// awaken both the old and new deployments. old and cur must be *extensions.ReplicaSet
|
||||||
// types.
|
// types.
|
||||||
func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
|
func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
|
||||||
if api.Semantic.DeepEqual(old, cur) {
|
curRS := cur.(*extensions.ReplicaSet)
|
||||||
// A periodic relist will send update events for all known controllers.
|
oldRS := old.(*extensions.ReplicaSet)
|
||||||
|
if curRS.ResourceVersion == oldRS.ResourceVersion {
|
||||||
|
// Periodic resync will send update events for all known replica sets.
|
||||||
|
// Two different versions of the same replica set will always have different RVs.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// TODO: Write a unittest for this case
|
// TODO: Write a unittest for this case
|
||||||
curRS := cur.(*extensions.ReplicaSet)
|
|
||||||
glog.V(4).Infof("ReplicaSet %s updated.", curRS.Name)
|
glog.V(4).Infof("ReplicaSet %s updated.", curRS.Name)
|
||||||
if d := dc.getDeploymentForReplicaSet(curRS); d != nil {
|
if d := dc.getDeploymentForReplicaSet(curRS); d != nil {
|
||||||
dc.enqueueDeployment(d)
|
dc.enqueueDeployment(d)
|
||||||
}
|
}
|
||||||
// A number of things could affect the old deployment: labels changing,
|
// A number of things could affect the old deployment: labels changing,
|
||||||
// pod template changing, etc.
|
// pod template changing, etc.
|
||||||
oldRS := old.(*extensions.ReplicaSet)
|
|
||||||
if !api.Semantic.DeepEqual(oldRS, curRS) {
|
if !api.Semantic.DeepEqual(oldRS, curRS) {
|
||||||
if oldD := dc.getDeploymentForReplicaSet(oldRS); oldD != nil {
|
if oldD := dc.getDeploymentForReplicaSet(oldRS); oldD != nil {
|
||||||
dc.enqueueDeployment(oldD)
|
dc.enqueueDeployment(oldD)
|
||||||
@@ -354,11 +355,13 @@ func (dc *DeploymentController) addPod(obj interface{}) {
|
|||||||
// is updated and wake them up. If anything of the Pods have changed, we need to awaken both
|
// is updated and wake them up. If anything of the Pods have changed, we need to awaken both
|
||||||
// the old and new deployments. old and cur must be *api.Pod types.
|
// the old and new deployments. old and cur must be *api.Pod types.
|
||||||
func (dc *DeploymentController) updatePod(old, cur interface{}) {
|
func (dc *DeploymentController) updatePod(old, cur interface{}) {
|
||||||
if api.Semantic.DeepEqual(old, cur) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
curPod := cur.(*api.Pod)
|
curPod := cur.(*api.Pod)
|
||||||
oldPod := old.(*api.Pod)
|
oldPod := old.(*api.Pod)
|
||||||
|
if curPod.ResourceVersion == oldPod.ResourceVersion {
|
||||||
|
// Periodic resync will send update events for all known pods.
|
||||||
|
// Two different versions of the same pod will always have different RVs.
|
||||||
|
return
|
||||||
|
}
|
||||||
glog.V(4).Infof("Pod %s updated %#v -> %#v.", curPod.Name, oldPod, curPod)
|
glog.V(4).Infof("Pod %s updated %#v -> %#v.", curPod.Name, oldPod, curPod)
|
||||||
if d := dc.getDeploymentForPod(curPod); d != nil {
|
if d := dc.getDeploymentForPod(curPod); d != nil {
|
||||||
dc.enqueueDeployment(d)
|
dc.enqueueDeployment(d)
|
||||||
|
@@ -210,17 +210,19 @@ func (e *EndpointController) addPod(obj interface{}) {
|
|||||||
// and what services it will be a member of, and enqueue the union of these.
|
// and what services it will be a member of, and enqueue the union of these.
|
||||||
// old and cur must be *api.Pod types.
|
// old and cur must be *api.Pod types.
|
||||||
func (e *EndpointController) updatePod(old, cur interface{}) {
|
func (e *EndpointController) updatePod(old, cur interface{}) {
|
||||||
if api.Semantic.DeepEqual(old, cur) {
|
newPod := cur.(*api.Pod)
|
||||||
|
oldPod := old.(*api.Pod)
|
||||||
|
if newPod.ResourceVersion == oldPod.ResourceVersion {
|
||||||
|
// Periodic resync will send update events for all known pods.
|
||||||
|
// Two different versions of the same pod will always have different RVs.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
newPod := old.(*api.Pod)
|
|
||||||
services, err := e.getPodServiceMemberships(newPod)
|
services, err := e.getPodServiceMemberships(newPod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err)
|
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
oldPod := cur.(*api.Pod)
|
|
||||||
// Only need to get the old services if the labels changed.
|
// Only need to get the old services if the labels changed.
|
||||||
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
|
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
|
||||||
!hostNameAndDomainAreEqual(newPod, oldPod) {
|
!hostNameAndDomainAreEqual(newPod, oldPod) {
|
||||||
|
@@ -197,11 +197,13 @@ func (jm *JobController) addPod(obj interface{}) {
|
|||||||
// If the labels of the pod have changed we need to awaken both the old
|
// If the labels of the pod have changed we need to awaken both the old
|
||||||
// and new job. old and cur must be *api.Pod types.
|
// and new job. old and cur must be *api.Pod types.
|
||||||
func (jm *JobController) updatePod(old, cur interface{}) {
|
func (jm *JobController) updatePod(old, cur interface{}) {
|
||||||
if api.Semantic.DeepEqual(old, cur) {
|
curPod := cur.(*api.Pod)
|
||||||
// A periodic relist will send update events for all known pods.
|
oldPod := old.(*api.Pod)
|
||||||
|
if curPod.ResourceVersion == oldPod.ResourceVersion {
|
||||||
|
// Periodic resync will send update events for all known pods.
|
||||||
|
// Two different versions of the same pod will always have different RVs.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
curPod := cur.(*api.Pod)
|
|
||||||
if curPod.DeletionTimestamp != nil {
|
if curPod.DeletionTimestamp != nil {
|
||||||
// when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period,
|
// when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period,
|
||||||
// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
|
// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
|
||||||
@@ -213,7 +215,6 @@ func (jm *JobController) updatePod(old, cur interface{}) {
|
|||||||
if job := jm.getPodJob(curPod); job != nil {
|
if job := jm.getPodJob(curPod); job != nil {
|
||||||
jm.enqueueController(job)
|
jm.enqueueController(job)
|
||||||
}
|
}
|
||||||
oldPod := old.(*api.Pod)
|
|
||||||
// Only need to get the old job if the labels changed.
|
// Only need to get the old job if the labels changed.
|
||||||
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
|
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
|
||||||
// If the old and new job are the same, the first one that syncs
|
// If the old and new job are the same, the first one that syncs
|
||||||
|
@@ -166,11 +166,13 @@ func (psc *PetSetController) addPod(obj interface{}) {
|
|||||||
// updatePod adds the petset for the current and old pods to the sync queue.
|
// updatePod adds the petset for the current and old pods to the sync queue.
|
||||||
// If the labels of the pod didn't change, this method enqueues a single petset.
|
// If the labels of the pod didn't change, this method enqueues a single petset.
|
||||||
func (psc *PetSetController) updatePod(old, cur interface{}) {
|
func (psc *PetSetController) updatePod(old, cur interface{}) {
|
||||||
if api.Semantic.DeepEqual(old, cur) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
curPod := cur.(*api.Pod)
|
curPod := cur.(*api.Pod)
|
||||||
oldPod := old.(*api.Pod)
|
oldPod := old.(*api.Pod)
|
||||||
|
if curPod.ResourceVersion == oldPod.ResourceVersion {
|
||||||
|
// Periodic resync will send update events for all known pods.
|
||||||
|
// Two different versions of the same pod will always have different RVs.
|
||||||
|
return
|
||||||
|
}
|
||||||
ps := psc.getPetSetForPod(curPod)
|
ps := psc.getPetSetForPod(curPod)
|
||||||
if ps == nil {
|
if ps == nil {
|
||||||
return
|
return
|
||||||
|
@@ -352,12 +352,13 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) {
|
|||||||
// up. If the labels of the pod have changed we need to awaken both the old
|
// up. If the labels of the pod have changed we need to awaken both the old
|
||||||
// and new replica set. old and cur must be *api.Pod types.
|
// and new replica set. old and cur must be *api.Pod types.
|
||||||
func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
|
func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
|
||||||
if api.Semantic.DeepEqual(old, cur) {
|
|
||||||
// A periodic relist will send update events for all known pods.
|
|
||||||
return
|
|
||||||
}
|
|
||||||
curPod := cur.(*api.Pod)
|
curPod := cur.(*api.Pod)
|
||||||
oldPod := old.(*api.Pod)
|
oldPod := old.(*api.Pod)
|
||||||
|
if curPod.ResourceVersion == oldPod.ResourceVersion {
|
||||||
|
// Periodic resync will send update events for all known pods.
|
||||||
|
// Two different versions of the same pod will always have different RVs.
|
||||||
|
return
|
||||||
|
}
|
||||||
glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
|
glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
|
||||||
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
|
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
|
||||||
if curPod.DeletionTimestamp != nil {
|
if curPod.DeletionTimestamp != nil {
|
||||||
|
@@ -565,8 +565,10 @@ func TestUpdatePods(t *testing.T) {
|
|||||||
// then update its labels to match testRSSpec2. We expect to receive a sync
|
// then update its labels to match testRSSpec2. We expect to receive a sync
|
||||||
// request for both replica sets.
|
// request for both replica sets.
|
||||||
pod1 := newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
|
pod1 := newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
|
||||||
|
pod1.ResourceVersion = "1"
|
||||||
pod2 := pod1
|
pod2 := pod1
|
||||||
pod2.Labels = labelMap2
|
pod2.Labels = labelMap2
|
||||||
|
pod2.ResourceVersion = "2"
|
||||||
manager.updatePod(&pod1, &pod2)
|
manager.updatePod(&pod1, &pod2)
|
||||||
expected := sets.NewString(testRSSpec1.Name, testRSSpec2.Name)
|
expected := sets.NewString(testRSSpec1.Name, testRSSpec2.Name)
|
||||||
for _, name := range expected.List() {
|
for _, name := range expected.List() {
|
||||||
@@ -585,6 +587,7 @@ func TestUpdatePods(t *testing.T) {
|
|||||||
// its labels to match no replica set. We expect to receive a sync request
|
// its labels to match no replica set. We expect to receive a sync request
|
||||||
// for testRSSpec1.
|
// for testRSSpec1.
|
||||||
pod2.Labels = make(map[string]string)
|
pod2.Labels = make(map[string]string)
|
||||||
|
pod2.ResourceVersion = "2"
|
||||||
manager.updatePod(&pod1, &pod2)
|
manager.updatePod(&pod1, &pod2)
|
||||||
expected = sets.NewString(testRSSpec1.Name)
|
expected = sets.NewString(testRSSpec1.Name)
|
||||||
for _, name := range expected.List() {
|
for _, name := range expected.List() {
|
||||||
@@ -991,6 +994,7 @@ func TestDeletionTimestamp(t *testing.T) {
|
|||||||
}
|
}
|
||||||
pod := newPodList(nil, 1, api.PodPending, labelMap, rs, "pod").Items[0]
|
pod := newPodList(nil, 1, api.PodPending, labelMap, rs, "pod").Items[0]
|
||||||
pod.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
|
pod.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
|
||||||
|
pod.ResourceVersion = "1"
|
||||||
manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
|
manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
|
||||||
|
|
||||||
// A pod added with a deletion timestamp should decrement deletions, not creations.
|
// A pod added with a deletion timestamp should decrement deletions, not creations.
|
||||||
@@ -1010,6 +1014,7 @@ func TestDeletionTimestamp(t *testing.T) {
|
|||||||
// An update from no deletion timestamp to having one should be treated
|
// An update from no deletion timestamp to having one should be treated
|
||||||
// as a deletion.
|
// as a deletion.
|
||||||
oldPod := newPodList(nil, 1, api.PodPending, labelMap, rs, "pod").Items[0]
|
oldPod := newPodList(nil, 1, api.PodPending, labelMap, rs, "pod").Items[0]
|
||||||
|
oldPod.ResourceVersion = "2"
|
||||||
manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
|
manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
|
||||||
manager.updatePod(&oldPod, &pod)
|
manager.updatePod(&oldPod, &pod)
|
||||||
|
|
||||||
@@ -1035,6 +1040,7 @@ func TestDeletionTimestamp(t *testing.T) {
|
|||||||
}
|
}
|
||||||
manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(secondPod)})
|
manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(secondPod)})
|
||||||
oldPod.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
|
oldPod.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
|
||||||
|
oldPod.ResourceVersion = "2"
|
||||||
manager.updatePod(&oldPod, &pod)
|
manager.updatePod(&oldPod, &pod)
|
||||||
|
|
||||||
podExp, exists, err = manager.expectations.GetExpectations(rsKey)
|
podExp, exists, err = manager.expectations.GetExpectations(rsKey)
|
||||||
@@ -1182,12 +1188,14 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
|
|||||||
manager.rsStore.Store.Add(rs)
|
manager.rsStore.Store.Add(rs)
|
||||||
// put one pod in the podStore
|
// put one pod in the podStore
|
||||||
pod := newPod("pod", rs, api.PodRunning)
|
pod := newPod("pod", rs, api.PodRunning)
|
||||||
|
pod.ResourceVersion = "1"
|
||||||
var trueVar = true
|
var trueVar = true
|
||||||
rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar}
|
rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar}
|
||||||
pod.OwnerReferences = []api.OwnerReference{rsOwnerReference}
|
pod.OwnerReferences = []api.OwnerReference{rsOwnerReference}
|
||||||
updatedPod := *pod
|
updatedPod := *pod
|
||||||
// reset the labels
|
// reset the labels
|
||||||
updatedPod.Labels = make(map[string]string)
|
updatedPod.Labels = make(map[string]string)
|
||||||
|
updatedPod.ResourceVersion = "2"
|
||||||
// add the updatedPod to the store. This is consistent with the behavior of
|
// add the updatedPod to the store. This is consistent with the behavior of
|
||||||
// the Informer: Informer updates the store before call the handler
|
// the Informer: Informer updates the store before call the handler
|
||||||
// (updatePod() in this case).
|
// (updatePod() in this case).
|
||||||
|
@@ -367,12 +367,13 @@ func (rm *ReplicationManager) addPod(obj interface{}) {
|
|||||||
// up. If the labels of the pod have changed we need to awaken both the old
|
// up. If the labels of the pod have changed we need to awaken both the old
|
||||||
// and new controller. old and cur must be *api.Pod types.
|
// and new controller. old and cur must be *api.Pod types.
|
||||||
func (rm *ReplicationManager) updatePod(old, cur interface{}) {
|
func (rm *ReplicationManager) updatePod(old, cur interface{}) {
|
||||||
if api.Semantic.DeepEqual(old, cur) {
|
|
||||||
// A periodic relist will send update events for all known pods.
|
|
||||||
return
|
|
||||||
}
|
|
||||||
curPod := cur.(*api.Pod)
|
curPod := cur.(*api.Pod)
|
||||||
oldPod := old.(*api.Pod)
|
oldPod := old.(*api.Pod)
|
||||||
|
if curPod.ResourceVersion == oldPod.ResourceVersion {
|
||||||
|
// Periodic resync will send update events for all known pods.
|
||||||
|
// Two different versions of the same pod will always have different RVs.
|
||||||
|
return
|
||||||
|
}
|
||||||
glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
|
glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
|
||||||
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
|
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
|
||||||
if curPod.DeletionTimestamp != nil {
|
if curPod.DeletionTimestamp != nil {
|
||||||
|
@@ -547,8 +547,10 @@ func TestUpdatePods(t *testing.T) {
|
|||||||
// testControllerSpec1, then update its labels to match testControllerSpec2.
|
// testControllerSpec1, then update its labels to match testControllerSpec2.
|
||||||
// We expect to receive a sync request for both controllers.
|
// We expect to receive a sync request for both controllers.
|
||||||
pod1 := newPodList(manager.podStore.Indexer, 1, api.PodRunning, testControllerSpec1, "pod").Items[0]
|
pod1 := newPodList(manager.podStore.Indexer, 1, api.PodRunning, testControllerSpec1, "pod").Items[0]
|
||||||
|
pod1.ResourceVersion = "1"
|
||||||
pod2 := pod1
|
pod2 := pod1
|
||||||
pod2.Labels = testControllerSpec2.Spec.Selector
|
pod2.Labels = testControllerSpec2.Spec.Selector
|
||||||
|
pod2.ResourceVersion = "2"
|
||||||
manager.updatePod(&pod1, &pod2)
|
manager.updatePod(&pod1, &pod2)
|
||||||
expected := sets.NewString(testControllerSpec1.Name, testControllerSpec2.Name)
|
expected := sets.NewString(testControllerSpec1.Name, testControllerSpec2.Name)
|
||||||
for _, name := range expected.List() {
|
for _, name := range expected.List() {
|
||||||
@@ -567,6 +569,7 @@ func TestUpdatePods(t *testing.T) {
|
|||||||
// We update its labels to match no replication controller. We expect to
|
// We update its labels to match no replication controller. We expect to
|
||||||
// receive a sync request for testControllerSpec1.
|
// receive a sync request for testControllerSpec1.
|
||||||
pod2.Labels = make(map[string]string)
|
pod2.Labels = make(map[string]string)
|
||||||
|
pod2.ResourceVersion = "2"
|
||||||
manager.updatePod(&pod1, &pod2)
|
manager.updatePod(&pod1, &pod2)
|
||||||
expected = sets.NewString(testControllerSpec1.Name)
|
expected = sets.NewString(testControllerSpec1.Name)
|
||||||
for _, name := range expected.List() {
|
for _, name := range expected.List() {
|
||||||
@@ -969,6 +972,7 @@ func TestDeletionTimestamp(t *testing.T) {
|
|||||||
}
|
}
|
||||||
pod := newPodList(nil, 1, api.PodPending, controllerSpec, "pod").Items[0]
|
pod := newPodList(nil, 1, api.PodPending, controllerSpec, "pod").Items[0]
|
||||||
pod.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
|
pod.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
|
||||||
|
pod.ResourceVersion = "1"
|
||||||
manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(&pod)})
|
manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(&pod)})
|
||||||
|
|
||||||
// A pod added with a deletion timestamp should decrement deletions, not creations.
|
// A pod added with a deletion timestamp should decrement deletions, not creations.
|
||||||
@@ -988,6 +992,7 @@ func TestDeletionTimestamp(t *testing.T) {
|
|||||||
// An update from no deletion timestamp to having one should be treated
|
// An update from no deletion timestamp to having one should be treated
|
||||||
// as a deletion.
|
// as a deletion.
|
||||||
oldPod := newPodList(nil, 1, api.PodPending, controllerSpec, "pod").Items[0]
|
oldPod := newPodList(nil, 1, api.PodPending, controllerSpec, "pod").Items[0]
|
||||||
|
oldPod.ResourceVersion = "2"
|
||||||
manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(&pod)})
|
manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(&pod)})
|
||||||
manager.updatePod(&oldPod, &pod)
|
manager.updatePod(&oldPod, &pod)
|
||||||
|
|
||||||
@@ -1013,6 +1018,7 @@ func TestDeletionTimestamp(t *testing.T) {
|
|||||||
}
|
}
|
||||||
manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(secondPod)})
|
manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(secondPod)})
|
||||||
oldPod.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
|
oldPod.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
|
||||||
|
oldPod.ResourceVersion = "2"
|
||||||
manager.updatePod(&oldPod, &pod)
|
manager.updatePod(&oldPod, &pod)
|
||||||
|
|
||||||
podExp, exists, err = manager.expectations.GetExpectations(rcKey)
|
podExp, exists, err = manager.expectations.GetExpectations(rcKey)
|
||||||
@@ -1239,12 +1245,14 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
|
|||||||
manager.rcStore.Indexer.Add(rc)
|
manager.rcStore.Indexer.Add(rc)
|
||||||
// put one pod in the podStore
|
// put one pod in the podStore
|
||||||
pod := newPod("pod", rc, api.PodRunning)
|
pod := newPod("pod", rc, api.PodRunning)
|
||||||
|
pod.ResourceVersion = "1"
|
||||||
var trueVar = true
|
var trueVar = true
|
||||||
rcOwnerReference := api.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name, Controller: &trueVar}
|
rcOwnerReference := api.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name, Controller: &trueVar}
|
||||||
pod.OwnerReferences = []api.OwnerReference{rcOwnerReference}
|
pod.OwnerReferences = []api.OwnerReference{rcOwnerReference}
|
||||||
updatedPod := *pod
|
updatedPod := *pod
|
||||||
// reset the labels
|
// reset the labels
|
||||||
updatedPod.Labels = make(map[string]string)
|
updatedPod.Labels = make(map[string]string)
|
||||||
|
updatedPod.ResourceVersion = "2"
|
||||||
// add the updatedPod to the store. This is consistent with the behavior of
|
// add the updatedPod to the store. This is consistent with the behavior of
|
||||||
// the Informer: Informer updates the store before call the handler
|
// the Informer: Informer updates the store before call the handler
|
||||||
// (updatePod() in this case).
|
// (updatePod() in this case).
|
||||||
|
Reference in New Issue
Block a user