prevent RC hotloop on denied pods
@@ -112,7 +112,7 @@ type ReplicationManager struct {
	lookupCache *controller.MatchingCache

	// Controllers that need to be synced
	queue *workqueue.Type
	queue workqueue.RateLimitingInterface

	// garbageCollectorEnabled denotes if the garbage collector is enabled. RC
	// manager behaves differently if GC is enabled.
@@ -143,7 +143,7 @@ func newReplicationManager(eventRecorder record.EventRecorder, podInformer frame
		},
		burstReplicas: burstReplicas,
		expectations:  controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:         workqueue.New(),
		queue:         workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		garbageCollectorEnabled: garbageCollectorEnabled,
	}

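The queue swap above is the heart of the fix: workqueue.New() hands keys back with no delay, so a controller that immediately re-adds a failing key spins in a tight loop, while the rate-limited queue applies per-key exponential backoff plus an overall token-bucket limit. A minimal sketch of the intended usage, assuming the in-tree pkg/util/workqueue API of this era (newer trees import it from k8s.io/client-go/util/workqueue); the key value is illustrative:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/workqueue"
)

func main() {
	// DefaultControllerRateLimiter combines per-item exponential backoff
	// with an overall rate limit shared across all items.
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	defer queue.ShutDown()

	key := "default/my-rc" // illustrative namespace/name key

	// A failed sync re-adds the key after a backoff delay.
	queue.AddRateLimited(key)
	fmt.Println("requeues so far:", queue.NumRequeues(key))

	// A successful sync must Forget the key so its backoff history resets.
	queue.Forget(key)
	fmt.Println("requeues after Forget:", queue.NumRequeues(key))
}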
@@ -464,10 +464,15 @@ func (rm *ReplicationManager) worker() {
			return true
		}
		defer rm.queue.Done(key)

		err := rm.syncHandler(key.(string))
		if err != nil {
			glog.Errorf("Error syncing replication controller: %v", err)
		if err == nil {
			rm.queue.Forget(key)
			return false
		}

		rm.queue.AddRateLimited(key)
		utilruntime.HandleError(err)
		return false
	}
	for {
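The hunk above shows only the inner closure of the worker. Stitched together, the pattern looks roughly like the following self-contained sketch; the Controller type and its sync field are placeholders (not the real types in this package), and import paths follow the in-tree layout of this era:

package rcworkersketch

import (
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
	"k8s.io/kubernetes/pkg/util/workqueue"
)

// Controller is a placeholder standing in for ReplicationManager.
type Controller struct {
	queue workqueue.RateLimitingInterface
	sync  func(key string) error
}

func (c *Controller) worker() {
	for {
		key, quit := c.queue.Get()
		if quit {
			return
		}
		func() {
			defer c.queue.Done(key)
			if err := c.sync(key.(string)); err != nil {
				// Failure: requeue with per-key exponential backoff rather
				// than re-adding immediately, which is what caused the hotloop.
				c.queue.AddRateLimited(key)
				utilruntime.HandleError(err)
				return
			}
			// Success: reset the key's backoff so the next failure starts
			// from the shortest delay again.
			c.queue.Forget(key)
		}()
	}
}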
@@ -480,13 +485,16 @@ func (rm *ReplicationManager) worker() {

// manageReplicas checks and updates replicas for the given replication controller.
// Does NOT modify <filteredPods>.
func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.ReplicationController) {
func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.ReplicationController) error {
	diff := len(filteredPods) - int(rc.Spec.Replicas)
	rcKey, err := controller.KeyFunc(rc)
	if err != nil {
		glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
		return
		return err
	}
	if diff == 0 {
		return nil
	}

	if diff < 0 {
		diff *= -1
		if diff > rm.burstReplicas {
@@ -497,6 +505,7 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
		// UID, which would require locking *across* the create, which will turn
		// into a performance bottleneck. We should generate a UID for the pod
		// beforehand and store it via ExpectCreations.
		errCh := make(chan error, diff)
		rm.expectations.ExpectCreations(rcKey, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
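The expectations calls around this create loop are what make the failure handling below safe: ExpectCreations records how many pod additions the informer should observe, and CreationObserved decrements that count when a create fails and will therefore never be observed, so the controller is not left waiting out the expectation timeout. A hypothetical, stripped-down sketch of that bookkeeping (the real type is the ControllerExpectations machinery in pkg/controller; this is not its actual code):

package expectationsketch

import "sync"

// expectations is a deliberately simplified stand-in that only tracks
// outstanding creates per controller key.
type expectations struct {
	mu   sync.Mutex
	adds map[string]int // controller key -> creates still expected
}

func newExpectations() *expectations {
	return &expectations{adds: map[string]int{}}
}

// ExpectCreations is called before firing off n create requests.
func (e *expectations) ExpectCreations(key string, n int) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.adds[key] = n
}

// CreationObserved is called when a pod add is seen by the informer, or,
// as in the diff above, when a create fails and will never be observed.
func (e *expectations) CreationObserved(key string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.adds[key] > 0 {
		e.adds[key]--
	}
}

// Satisfied gates the next sync: while creates are still outstanding the
// sync is skipped, so leaking an expectation after a failed create would
// stall the controller until the expectation times out.
func (e *expectations) Satisfied(key string) bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.adds[key] == 0
}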
@@ -522,55 +531,79 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
					// Decrement the expected number of creates because the informer won't observe this pod
					glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
					rm.expectations.CreationObserved(rcKey)
					rm.enqueueController(rc)
					errCh <- err
					utilruntime.HandleError(err)
				}
			}()
		}
		wg.Wait()
	} else if diff > 0 {
		if diff > rm.burstReplicas {
			diff = rm.burstReplicas

		select {
		case err := <-errCh:
			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
			if err != nil {
				return err
			}
		default:
		}
		glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff)
		// No need to sort pods if we are about to delete all of them
		if rc.Spec.Replicas != 0 {
			// Sort the pods in the order such that not-ready < ready, unscheduled
			// < scheduled, and pending < running. This ensures that we delete pods
			// in the earlier stages whenever possible.
			sort.Sort(controller.ActivePods(filteredPods))
		}
		// Snapshot the UIDs (ns/name) of the pods we're expecting to see
		// deleted, so we know to record their expectations exactly once either
		// when we see it as an update of the deletion timestamp, or as a delete.
		// Note that if the labels on a pod/rc change in a way that the pod gets
		// orphaned, the rs will only wake up after the expectations have
		// expired even if other pods are deleted.
		deletedPodKeys := []string{}
		for i := 0; i < diff; i++ {
			deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
		}
		// We use pod namespace/name as a UID to wait for deletions, so if the
		// labels on a pod/rc change in a way that the pod gets orphaned, the
		// rc will only wake up after the expectation has expired.
		rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
		var wg sync.WaitGroup
		wg.Add(diff)
		for i := 0; i < diff; i++ {
			go func(ix int) {
				defer wg.Done()
				if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
					// Decrement the expected number of deletes because the informer won't observe this deletion
					podKey := controller.PodKey(filteredPods[ix])
					glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
					rm.expectations.DeletionObserved(rcKey, podKey)
					rm.enqueueController(rc)
					utilruntime.HandleError(err)
				}
			}(i)
		}
		wg.Wait()

		return nil
	}

	if diff > rm.burstReplicas {
		diff = rm.burstReplicas
	}
	glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff)
	// No need to sort pods if we are about to delete all of them
	if rc.Spec.Replicas != 0 {
		// Sort the pods in the order such that not-ready < ready, unscheduled
		// < scheduled, and pending < running. This ensures that we delete pods
		// in the earlier stages whenever possible.
		sort.Sort(controller.ActivePods(filteredPods))
	}
	// Snapshot the UIDs (ns/name) of the pods we're expecting to see
	// deleted, so we know to record their expectations exactly once either
	// when we see it as an update of the deletion timestamp, or as a delete.
	// Note that if the labels on a pod/rc change in a way that the pod gets
	// orphaned, the rs will only wake up after the expectations have
	// expired even if other pods are deleted.
	deletedPodKeys := []string{}
	for i := 0; i < diff; i++ {
		deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
	}
	// We use pod namespace/name as a UID to wait for deletions, so if the
	// labels on a pod/rc change in a way that the pod gets orphaned, the
	// rc will only wake up after the expectation has expired.
	errCh := make(chan error, diff)
	rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
	var wg sync.WaitGroup
	wg.Add(diff)
	for i := 0; i < diff; i++ {
		go func(ix int) {
			defer wg.Done()
			if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
				// Decrement the expected number of deletes because the informer won't observe this deletion
				podKey := controller.PodKey(filteredPods[ix])
				glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
				rm.expectations.DeletionObserved(rcKey, podKey)
				errCh <- err
				utilruntime.HandleError(err)
			}
		}(i)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
		if err != nil {
			return err
		}
	default:
	}

	return nil

}

// syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning
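Both branches of the reworked manageReplicas use the same fan-out shape: an error channel buffered to the number of goroutines, a WaitGroup to join them, and a non-blocking receive that surfaces only the first failure. A self-contained sketch of that pattern, with doWork standing in for the pod create/delete calls (not a function from this package):

package fanoutsketch

import "sync"

// runAll runs doWork n times concurrently and returns the first error, if any.
func runAll(n int, doWork func(i int) error) error {
	errCh := make(chan error, n) // buffered so no sender can ever block
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(ix int) {
			defer wg.Done()
			if err := doWork(ix); err != nil {
				errCh <- err
			}
		}(i)
	}
	wg.Wait()

	// Every failure was already reported individually; returning just the
	// first one is enough to make the caller requeue the key with backoff.
	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}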
@@ -600,8 +633,6 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
		return nil
	}
	if err != nil {
		glog.Infof("Unable to retrieve rc %v from store: %v", key, err)
		rm.queue.Add(key)
		return err
	}
	rc := *obj.(*api.ReplicationController)
@@ -672,8 +703,9 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
		filteredPods = controller.FilterActivePods(pods)
	}

	var manageReplicasErr error
	if rcNeedsSync && rc.DeletionTimestamp == nil {
		rm.manageReplicas(filteredPods, &rc)
		manageReplicasErr = rm.manageReplicas(filteredPods, &rc)
	}
	trace.Step("manageReplicas done")

@@ -692,10 +724,9 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {

	// Always updates status as pods come up or die.
	if err := updateReplicaCount(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, len(filteredPods), fullyLabeledReplicasCount); err != nil {
		// Multiple things could lead to this update failing. Requeuing the controller ensures
		// we retry with some fairness.
		glog.V(2).Infof("Failed to update replica count for controller %v/%v; requeuing; error: %v", rc.Namespace, rc.Name, err)
		rm.enqueueController(&rc)
		// Multiple things could lead to this update failing.  Returning an error causes a requeue without forcing a hotloop
		return err
	}
	return nil

	return manageReplicasErr
}

@@ -32,6 +32,7 @@ import (
	"k8s.io/kubernetes/pkg/client/cache"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
	fakeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
	"k8s.io/kubernetes/pkg/client/restclient"
	"k8s.io/kubernetes/pkg/client/testing/core"
	"k8s.io/kubernetes/pkg/controller"
@@ -606,23 +607,11 @@ func TestControllerUpdateRequeue(t *testing.T) {
	fakePodControl := controller.FakePodControl{}
	manager.podControl = &fakePodControl

	manager.syncReplicationController(getKey(rc, t))

	ch := make(chan interface{})
	go func() {
		item, _ := manager.queue.Get()
		ch <- item
	}()
	select {
	case key := <-ch:
		expectedKey := getKey(rc, t)
		if key != expectedKey {
			t.Errorf("Expected requeue of controller with key %s got %s", expectedKey, key)
		}
	case <-time.After(wait.ForeverTestTimeout):
		manager.queue.ShutDown()
		t.Errorf("Expected to find an rc in the queue, found none.")
	// an error from the sync function will be requeued, check to make sure we returned an error
	if err := manager.syncReplicationController(getKey(rc, t)); err == nil {
		t.Errorf("missing error for requeue")
	}

	// 1 Update and 1 GET, both of which fail
	fakeHandler.ValidateRequestCount(t, 2)
}
@@ -1136,8 +1125,8 @@ func BenchmarkGetPodControllerSingleNS(b *testing.B) {
}

// setupManagerWithGCEnabled creates a RC manager with a fakePodControl and with garbageCollectorEnabled set to true
func setupManagerWithGCEnabled() (manager *ReplicationManager, fakePodControl *controller.FakePodControl) {
	c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicationManager, fakePodControl *controller.FakePodControl) {
	c := fakeclientset.NewSimpleClientset(objs...)
	fakePodControl = &controller.FakePodControl{}
	manager = NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
	manager.garbageCollectorEnabled = true
@@ -1165,8 +1154,8 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
}

func TestPatchPodWithOtherOwnerRef(t *testing.T) {
	manager, fakePodControl := setupManagerWithGCEnabled()
	rc := newReplicationController(2)
	manager, fakePodControl := setupManagerWithGCEnabled(rc)
	manager.rcStore.Indexer.Add(rc)
	// add to podStore one more matching pod that doesn't have a controller
	// ref, but has an owner ref pointing to other object. Expect a patch to
@@ -1185,8 +1174,8 @@ func TestPatchPodWithOtherOwnerRef(t *testing.T) {
}

func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
	manager, fakePodControl := setupManagerWithGCEnabled()
	rc := newReplicationController(2)
	manager, fakePodControl := setupManagerWithGCEnabled(rc)
	manager.rcStore.Indexer.Add(rc)
	// add to podStore a matching pod that has an ownerRef pointing to the rc,
	// but ownerRef.Controller is false. Expect a patch to take control it.
@@ -1204,8 +1193,8 @@ func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
}

func TestPatchPodFails(t *testing.T) {
	manager, fakePodControl := setupManagerWithGCEnabled()
	rc := newReplicationController(2)
	manager, fakePodControl := setupManagerWithGCEnabled(rc)
	manager.rcStore.Indexer.Add(rc)
	// add to podStore two matching pods. Expect two patches to take control
	// them.
@@ -1215,16 +1204,16 @@ func TestPatchPodFails(t *testing.T) {
	// control of the pods and create new ones.
	fakePodControl.Err = fmt.Errorf("Fake Error")
	err := manager.syncReplicationController(getKey(rc, t))
	if err != nil {
		t.Fatal(err)
	if err == nil || err.Error() != "Fake Error" {
		t.Fatalf("expected Fake Error, got %v", err)
	}
	// 2 patches to take control of pod1 and pod2 (both fail), 2 creates.
	validateSyncReplication(t, fakePodControl, 2, 0, 2)
}

func TestPatchExtraPodsThenDelete(t *testing.T) {
	manager, fakePodControl := setupManagerWithGCEnabled()
	rc := newReplicationController(2)
	manager, fakePodControl := setupManagerWithGCEnabled(rc)
	manager.rcStore.Indexer.Add(rc)
	// add to podStore three matching pods. Expect three patches to take control
	// them, and later delete one of them.