From ca13b9e532149b61fc5d08d7bc05902e499a071c Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Thu, 23 Feb 2017 08:58:28 -0800 Subject: [PATCH 1/5] RC/RS: Use ControllerRef to route watch events. This is part of the completion of ControllerRef, as described here: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/controller-ref.md#watches This also removes the need for the Pod->Controller mapping cache in RC and RS. This mapping is now persisted in the Pod's ControllerRef instead. --- cmd/kube-controller-manager/app/core.go | 1 - cmd/kube-controller-manager/app/extensions.go | 1 - .../app/options/options.go | 4 +- pkg/controller/replicaset/BUILD | 1 - pkg/controller/replicaset/replica_set.go | 233 +++++++--------- pkg/controller/replicaset/replica_set_test.go | 192 +++++++++---- pkg/controller/replication/BUILD | 1 - .../replication/replication_controller.go | 259 ++++++++---------- .../replication_controller_test.go | 203 +++++++++----- test/integration/framework/master_utils.go | 2 +- test/integration/quota/quota_test.go | 2 - .../integration/replicaset/replicaset_test.go | 1 - .../replicationcontroller_test.go | 2 +- 13 files changed, 486 insertions(+), 416 deletions(-) diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index cab11f6d47b..8de3eb9fae0 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -59,7 +59,6 @@ func startReplicationController(ctx ControllerContext) (bool, error) { ctx.InformerFactory.Core().V1().ReplicationControllers(), ctx.ClientBuilder.ClientOrDie("replication-controller"), replicationcontroller.BurstReplicas, - int(ctx.Options.LookupCacheSizeForRC), ).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop) return true, nil } diff --git a/cmd/kube-controller-manager/app/extensions.go b/cmd/kube-controller-manager/app/extensions.go index c4ea15401f3..f349ecbe212 100644 --- a/cmd/kube-controller-manager/app/extensions.go +++ b/cmd/kube-controller-manager/app/extensions.go @@ -63,7 +63,6 @@ func startReplicaSetController(ctx ControllerContext) (bool, error) { ctx.InformerFactory.Core().V1().Pods(), ctx.ClientBuilder.ClientOrDie("replicaset-controller"), replicaset.BurstReplicas, - int(ctx.Options.LookupCacheSizeForRS), ).Run(int(ctx.Options.ConcurrentRSSyncs), ctx.Stop) return true, nil } diff --git a/cmd/kube-controller-manager/app/options/options.go b/cmd/kube-controller-manager/app/options/options.go index bc7f8e9fde6..196ba2742bd 100644 --- a/cmd/kube-controller-manager/app/options/options.go +++ b/cmd/kube-controller-manager/app/options/options.go @@ -132,8 +132,8 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet, allControllers []string, disabled fs.Int32Var(&s.ConcurrentDeploymentSyncs, "concurrent-deployment-syncs", s.ConcurrentDeploymentSyncs, "The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load") fs.Int32Var(&s.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", s.ConcurrentNamespaceSyncs, "The number of namespace objects that are allowed to sync concurrently. Larger number = more responsive namespace termination, but more CPU (and network) load") fs.Int32Var(&s.ConcurrentSATokenSyncs, "concurrent-serviceaccount-token-syncs", s.ConcurrentSATokenSyncs, "The number of service account token objects that are allowed to sync concurrently. 
Larger number = more responsive token generation, but more CPU (and network) load") - fs.Int32Var(&s.LookupCacheSizeForRC, "replication-controller-lookup-cache-size", s.LookupCacheSizeForRC, "The the size of lookup cache for replication controllers. Larger number = more responsive replica management, but more MEM load.") - fs.Int32Var(&s.LookupCacheSizeForRS, "replicaset-lookup-cache-size", s.LookupCacheSizeForRS, "The the size of lookup cache for replicatsets. Larger number = more responsive replica management, but more MEM load.") + fs.Int32Var(&s.LookupCacheSizeForRC, "replication-controller-lookup-cache-size", s.LookupCacheSizeForRC, "This flag is deprecated and will be removed in future releases. ReplicationController no longer requires a lookup cache.") + fs.Int32Var(&s.LookupCacheSizeForRS, "replicaset-lookup-cache-size", s.LookupCacheSizeForRS, "This flag is deprecated and will be removed in future releases. ReplicaSet no longer requires a lookup cache.") fs.Int32Var(&s.LookupCacheSizeForDaemonSet, "daemonset-lookup-cache-size", s.LookupCacheSizeForDaemonSet, "The the size of lookup cache for daemonsets. Larger number = more responsive daemonsets, but more MEM load.") fs.DurationVar(&s.ServiceSyncPeriod.Duration, "service-sync-period", s.ServiceSyncPeriod.Duration, "The period for syncing services with their external load balancers") fs.DurationVar(&s.NodeSyncPeriod.Duration, "node-sync-period", 0, ""+ diff --git a/pkg/controller/replicaset/BUILD b/pkg/controller/replicaset/BUILD index 0a93111b2e5..57e78d44f4f 100644 --- a/pkg/controller/replicaset/BUILD +++ b/pkg/controller/replicaset/BUILD @@ -32,7 +32,6 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/labels", - "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/client-go/kubernetes/typed/core/v1", diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index e165a9e5c56..d7b8f802838 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -59,9 +58,8 @@ const ( statusUpdateRetries = 1 ) -func getRSKind() schema.GroupVersionKind { - return v1beta1.SchemeGroupVersion.WithKind("ReplicaSet") -} +// controllerKind contains the schema.GroupVersionKind for this controller type. +var controllerKind = v1beta1.SchemeGroupVersion.WithKind("ReplicaSet") // ReplicaSetController is responsible for synchronizing ReplicaSet objects stored // in the system with actual running pods. @@ -90,14 +88,12 @@ type ReplicaSetController struct { // Added as a member to the struct to allow injection for testing. 
podListerSynced cache.InformerSynced - lookupCache *controller.MatchingCache - // Controllers that need to be synced queue workqueue.RateLimitingInterface } // NewReplicaSetController configures a replica set controller with the specified event recorder -func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int) *ReplicaSetController { +func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController { if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.Core().RESTClient().GetRateLimiter()) } @@ -139,7 +135,6 @@ func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer, rsc.podListerSynced = podInformer.Informer().HasSynced rsc.syncHandler = rsc.syncReplicaSet - rsc.lookupCache = controller.NewMatchingCache(lookupCacheSize) return rsc } @@ -172,46 +167,19 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { glog.Infof("Shutting down ReplicaSet Controller") } -// getPodReplicaSet returns the replica set managing the given pod. -// TODO: Surface that we are ignoring multiple replica sets for a single pod. -// TODO: use ownerReference.Controller to determine if the rs controls the pod. -func (rsc *ReplicaSetController) getPodReplicaSet(pod *v1.Pod) *extensions.ReplicaSet { - // look up in the cache, if cached and the cache is valid, just return cached value - if obj, cached := rsc.lookupCache.GetMatchingObject(pod); cached { - rs, ok := obj.(*extensions.ReplicaSet) - if !ok { - // This should not happen - utilruntime.HandleError(fmt.Errorf("lookup cache does not return a ReplicaSet object")) - return nil - } - if cached && rsc.isCacheValid(pod, rs) { - return rs - } - } - - // if not cached or cached value is invalid, search all the rs to find the matching one, and update cache +// getPodReplicaSets returns a list of ReplicaSets matching the given pod. +func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*extensions.ReplicaSet { rss, err := rsc.rsLister.GetPodReplicaSets(pod) if err != nil { glog.V(4).Infof("No ReplicaSets found for pod %v, ReplicaSet controller will avoid syncing", pod.Name) return nil } - // In theory, overlapping ReplicaSets is user error. This sorting will not prevent - // oscillation of replicas in all cases, eg: - // rs1 (older rs): [(k1=v1)], replicas=1 rs2: [(k2=v2)], replicas=2 - // pod: [(k1:v1), (k2:v2)] will wake both rs1 and rs2, and we will sync rs1. - // pod: [(k2:v2)] will wake rs2 which creates a new replica. if len(rss) > 1 { - // More than two items in this list indicates user error. If two replicasets - // overlap, sort by creation timestamp, subsort by name, then pick - // the first. + // ControllerRef will ensure we don't do anything crazy, but more than one + // item in this list nevertheless constitutes user error. utilruntime.HandleError(fmt.Errorf("user error! 
more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels)) - sort.Sort(controller.ReplicaSetsByCreationTimestamp(rss)) } - - // update lookup cache - rsc.lookupCache.Update(pod, rss[0]) - - return rss[0] + return rss } // callback when RS is updated @@ -219,21 +187,6 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { oldRS := old.(*extensions.ReplicaSet) curRS := cur.(*extensions.ReplicaSet) - // We should invalidate the whole lookup cache if a RS's selector has been updated. - // - // Imagine that you have two RSs: - // * old RS1 - // * new RS2 - // You also have a pod that is attached to RS2 (because it doesn't match RS1 selector). - // Now imagine that you are changing RS1 selector so that it is now matching that pod, - // in such case we must invalidate the whole cache so that pod could be adopted by RS1 - // - // This makes the lookup cache less helpful, but selector update does not happen often, - // so it's not a big problem - if !reflect.DeepEqual(oldRS.Spec.Selector, curRS.Spec.Selector) { - rsc.lookupCache.InvalidateAll() - } - // You might imagine that we only really need to enqueue the // replica set when Spec changes, but it is safer to sync any // time this function is triggered. That way a full informer @@ -252,57 +205,44 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { rsc.enqueueReplicaSet(cur) } -// isCacheValid check if the cache is valid -func (rsc *ReplicaSetController) isCacheValid(pod *v1.Pod, cachedRS *extensions.ReplicaSet) bool { - _, err := rsc.rsLister.ReplicaSets(cachedRS.Namespace).Get(cachedRS.Name) - // rs has been deleted or updated, cache is invalid - if err != nil || !isReplicaSetMatch(pod, cachedRS) { - return false - } - return true -} - -// isReplicaSetMatch take a Pod and ReplicaSet, return whether the Pod and ReplicaSet are matching -// TODO(mqliang): This logic is a copy from GetPodReplicaSets(), remove the duplication -func isReplicaSetMatch(pod *v1.Pod, rs *extensions.ReplicaSet) bool { - if rs.Namespace != pod.Namespace { - return false - } - selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector) - if err != nil { - err = fmt.Errorf("invalid selector: %v", err) - return false - } - - // If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything. - if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { - return false - } - return true -} - -// When a pod is created, enqueue the replica set that manages it and update it's expectations. +// When a pod is created, enqueue the replica set that manages it and update its expectations. func (rsc *ReplicaSetController) addPod(obj interface{}) { pod := obj.(*v1.Pod) glog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod) - rs := rsc.getPodReplicaSet(pod) - if rs == nil { - return - } - rsKey, err := controller.KeyFunc(rs) - if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for replica set %#v: %v", rs, err)) - return - } if pod.DeletionTimestamp != nil { // on a restart of the controller manager, it's possible a new pod shows up in a state that // is already pending deletion. Prevent the pod from being a creation observation. rsc.deletePod(pod) return } - rsc.expectations.CreationObserved(rsKey) - rsc.enqueueReplicaSet(rs) + + // If it has a ControllerRef, that's all that matters. + if controllerRef := controller.GetControllerOf(pod); controllerRef != nil { + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. 
+ return + } + rs, err := rsc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } + rsKey, err := controller.KeyFunc(rs) + if err != nil { + return + } + rsc.expectations.CreationObserved(rsKey) + rsc.enqueueReplicaSet(rs) + return + } + + // Otherwise, it's an orphan. Get a list of all matching ReplicaSets and sync + // them to see if anyone wants to adopt it. + // DO NOT observe creation because no controller should be waiting for an + // orphan. + for _, rs := range rsc.getPodReplicaSets(pod) { + rsc.enqueueReplicaSet(rs) + } } // When a pod is updated, figure out what replica set/s manage it and wake them @@ -317,6 +257,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { return } glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) + labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) if curPod.DeletionTimestamp != nil { // when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period, @@ -332,18 +273,29 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { return } - // Enqueue the oldRC before the curRC to give curRC a chance to adopt the oldPod. - if labelChanged { - // If the old and new ReplicaSet are the same, the first one that syncs - // will set expectations preventing any damage from the second. - if oldRS := rsc.getPodReplicaSet(oldPod); oldRS != nil { - rsc.enqueueReplicaSet(oldRS) + curControllerRef := controller.GetControllerOf(curPod) + oldControllerRef := controller.GetControllerOf(oldPod) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && + oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind { + // The ControllerRef was changed. Sync the old controller, if any. + rs, err := rsc.rsLister.ReplicaSets(oldPod.Namespace).Get(oldControllerRef.Name) + if err == nil { + rsc.enqueueReplicaSet(rs) } } - changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod) - if curRS := rsc.getPodReplicaSet(curPod); curRS != nil { - rsc.enqueueReplicaSet(curRS) + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + if curControllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + rs, err := rsc.rsLister.ReplicaSets(curPod.Namespace).Get(curControllerRef.Name) + if err != nil { + return + } + rsc.enqueueReplicaSet(rs) // TODO: MinReadySeconds in the Pod will generate an Available condition to be added in // the Pod status which in turn will trigger a requeue of the owning replica set thus // having its status updated with the newly available replica. For now, we can fake the @@ -351,9 +303,18 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { // a Pod transitioned to Ready. // Note that this still suffers from #29229, we are just moving the problem one level // "closer" to kubelet (from the deployment to the replica set controller). 
- if changedToReady && curRS.Spec.MinReadySeconds > 0 { - glog.V(2).Infof("ReplicaSet %q will be enqueued after %ds for availability check", curRS.Name, curRS.Spec.MinReadySeconds) - rsc.enqueueReplicaSetAfter(curRS, time.Duration(curRS.Spec.MinReadySeconds)*time.Second) + if !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 { + glog.V(2).Infof("ReplicaSet %q will be enqueued after %ds for availability check", rs.Name, rs.Spec.MinReadySeconds) + rsc.enqueueReplicaSetAfter(rs, time.Duration(rs.Spec.MinReadySeconds)*time.Second) + } + return + } + + // Otherwise, it's an orphan. If anything changed, sync matching controllers + // to see if anyone wants to adopt it now. + if labelChanged || controllerRefChanged { + for _, rs := range rsc.getPodReplicaSets(curPod) { + rsc.enqueueReplicaSet(rs) } } } @@ -370,41 +331,46 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %+v", obj)) + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj)) return } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { - utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj)) + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj)) return } } glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod) - if rs := rsc.getPodReplicaSet(pod); rs != nil { - rsKey, err := controller.KeyFunc(rs) - if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err)) - return - } - rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod)) - rsc.enqueueReplicaSet(rs) + + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + // No controller should care about orphans being deleted. + return } + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + + rs, err := rsc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } + rsKey, err := controller.KeyFunc(rs) + if err != nil { + return + } + rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod)) + rsc.enqueueReplicaSet(rs) } // obj could be an *extensions.ReplicaSet, or a DeletionFinalStateUnknown marker item. func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) { key, err := controller.KeyFunc(obj) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) return } - - // TODO: Handle overlapping replica sets better. Either disallow them at admission time or - // deterministically avoid syncing replica sets that fight over pods. Currently, we only - // ensure that the same replica set is synced for a given pod. When we periodically relist - // all replica sets there will still be some replica instability. One way to handle this is - // by querying the store for all replica sets that this replica set overlaps, as well as all - // replica sets that overlap this ReplicaSet, and sorting them. 
rsc.queue.Add(key) } @@ -412,16 +378,9 @@ func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) { func (rsc *ReplicaSetController) enqueueReplicaSetAfter(obj interface{}, after time.Duration) { key, err := controller.KeyFunc(obj) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) return } - - // TODO: Handle overlapping replica sets better. Either disallow them at admission time or - // deterministically avoid syncing replica sets that fight over pods. Currently, we only - // ensure that the same replica set is synced for a given pod. When we periodically relist - // all replica sets there will still be some replica instability. One way to handle this is - // by querying the store for all replica sets that this replica set overlaps, as well as all - // replica sets that overlap this ReplicaSet, and sorting them. rsc.queue.AddAfter(key, after) } @@ -483,8 +442,8 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte var err error boolPtr := func(b bool) *bool { return &b } controllerRef := &metav1.OwnerReference{ - APIVersion: getRSKind().GroupVersion().String(), - Kind: getRSKind().Kind, + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, Name: rs.Name, UID: rs.UID, BlockOwnerDeletion: boolPtr(true), @@ -592,7 +551,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { if err != nil { return err } - cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, getRSKind()) + cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, controllerKind) filteredPods, err = cm.ClaimPods(pods) if err != nil { // Something went wrong with adoption or release. 
diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index a7fbb5cb51a..80e3e672288 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -51,7 +51,7 @@ import ( "k8s.io/kubernetes/pkg/securitycontext" ) -func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh chan struct{}, burstReplicas int, lookupCacheSize int) (*ReplicaSetController, informers.SharedInformerFactory) { +func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh chan struct{}, burstReplicas int) (*ReplicaSetController, informers.SharedInformerFactory) { informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) ret := NewReplicaSetController( @@ -59,7 +59,6 @@ func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh ch informers.Core().V1().Pods(), client, burstReplicas, - lookupCacheSize, ) ret.podListerSynced = alwaysReady @@ -216,7 +215,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) { fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) // 2 running pods, a controller with 2 replicas, sync is a no-op labelMap := map[string]string{"foo": "bar"} @@ -234,7 +233,7 @@ func TestSyncReplicaSetDeletes(t *testing.T) { fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) manager.podControl = &fakePodControl // 2 running pods and a controller with 1 replica, one pod delete expected @@ -252,7 +251,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) manager.podControl = &fakePodControl received := make(chan string) @@ -286,7 +285,7 @@ func TestSyncReplicaSetCreates(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) // A controller with 2 replicas and no pods in the store, 2 creates expected labelMap := map[string]string{"foo": "bar"} @@ -311,7 +310,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) // Steady state for the ReplicaSet, no Status.Replicas updates expected activePods 
:= 5 @@ -356,7 +355,7 @@ func TestControllerUpdateReplicas(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) // Insufficient number of pods in the system, and Status.Replicas is wrong; // Status.Replica should update to match number of pods in system, 1 new pod should be created. @@ -405,7 +404,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) manager.podControl = &fakePodControl @@ -461,7 +460,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) { func TestPodControllerLookup(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}), stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}), stopCh, BurstReplicas) testCases := []struct { inRSs []*extensions.ReplicaSet pod *v1.Pod @@ -509,7 +508,12 @@ func TestPodControllerLookup(t *testing.T) { for _, r := range c.inRSs { informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(r) } - if rs := manager.getPodReplicaSet(c.pod); rs != nil { + if rss := manager.getPodReplicaSets(c.pod); rss != nil { + if len(rss) != 1 { + t.Errorf("len(rss) = %v, want %v", len(rss), 1) + continue + } + rs := rss[0] if c.outRSName != rs.Name { t.Errorf("Got replica set %+v expected %+v", rs.Name, c.outRSName) } @@ -536,7 +540,6 @@ func TestWatchControllers(t *testing.T) { informers.Core().V1().Pods(), client, BurstReplicas, - 0, ) informers.Start(stopCh) @@ -581,7 +584,7 @@ func TestWatchPods(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) // Put one ReplicaSet into the shared informer labelMap := map[string]string{"foo": "bar"} @@ -627,7 +630,7 @@ func TestWatchPods(t *testing.T) { func TestUpdatePods(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(fake.NewSimpleClientset(), stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(fake.NewSimpleClientset(), stopCh, BurstReplicas) received := make(chan string) @@ -656,16 +659,19 @@ func TestUpdatePods(t *testing.T) { testRSSpec2.Name = "barfoo" informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(&testRSSpec2) - // case 1: We put in the podLister a pod with labels matching testRSSpec1, - // then update its labels to match testRSSpec2. 
We expect to receive a sync
-	// request for both replica sets.
+	isController := true
+	controllerRef1 := metav1.OwnerReference{UID: testRSSpec1.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: testRSSpec1.Name, Controller: &isController}
+	controllerRef2 := metav1.OwnerReference{UID: testRSSpec2.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: testRSSpec2.Name, Controller: &isController}
+
+	// case 1: Pod with a ControllerRef
 	pod1 := newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
+	pod1.OwnerReferences = []metav1.OwnerReference{controllerRef1}
 	pod1.ResourceVersion = "1"
 	pod2 := pod1
 	pod2.Labels = labelMap2
 	pod2.ResourceVersion = "2"
 	manager.updatePod(&pod1, &pod2)
-	expected := sets.NewString(testRSSpec1.Name, testRSSpec2.Name)
+	expected := sets.NewString(testRSSpec1.Name)
 	for _, name := range expected.List() {
 		t.Logf("Expecting update for %+v", name)
 		select {
@@ -674,17 +680,20 @@ func TestUpdatePods(t *testing.T) {
 				t.Errorf("Expected keys %#v got %v", expected, got)
 			}
 		case <-time.After(wait.ForeverTestTimeout):
-			t.Errorf("Expected update notifications for replica sets within 100ms each")
+			t.Errorf("Expected update notifications for replica sets")
 		}
 	}

-	// case 2: pod1 in the podLister has labels matching testRSSpec1. We update
-	// its labels to match no replica set. We expect to receive a sync request
-	// for testRSSpec1.
-	pod2.Labels = make(map[string]string)
+	// case 2: Remove ControllerRef (orphan). Expect to sync label-matching RS.
+	pod1 = newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
+	pod1.ResourceVersion = "1"
+	pod1.Labels = labelMap2
+	pod1.OwnerReferences = []metav1.OwnerReference{controllerRef2}
+	pod2 = pod1
+	pod2.OwnerReferences = nil
 	pod2.ResourceVersion = "2"
 	manager.updatePod(&pod1, &pod2)
-	expected = sets.NewString(testRSSpec1.Name)
+	expected = sets.NewString(testRSSpec2.Name)
 	for _, name := range expected.List() {
 		t.Logf("Expecting update for %+v", name)
 		select {
@@ -693,7 +702,52 @@ func TestUpdatePods(t *testing.T) {
 				t.Errorf("Expected keys %#v got %v", expected, got)
 			}
 		case <-time.After(wait.ForeverTestTimeout):
-			t.Errorf("Expected update notifications for replica sets within 100ms each")
+			t.Errorf("Expected update notifications for replica sets")
 		}
 	}
+
+	// case 3: Remove ControllerRef (orphan). Expect to sync both former owner and
+	// any label-matching RS.
+	pod1 = newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
+	pod1.ResourceVersion = "1"
+	pod1.Labels = labelMap2
+	pod1.OwnerReferences = []metav1.OwnerReference{controllerRef1}
+	pod2 = pod1
+	pod2.OwnerReferences = nil
+	pod2.ResourceVersion = "2"
+	manager.updatePod(&pod1, &pod2)
+	expected = sets.NewString(testRSSpec1.Name, testRSSpec2.Name)
+	for _, name := range expected.List() {
+		t.Logf("Expecting update for %+v", name)
+		select {
+		case got := <-received:
+			if !expected.Has(got) {
+				t.Errorf("Expected keys %#v got %v", expected, got)
+			}
+		case <-time.After(wait.ForeverTestTimeout):
+			t.Errorf("Expected update notifications for replica sets")
+		}
+	}
+
+	// case 4: Keep ControllerRef, change labels. Expect to sync owning RS.
+ pod1 = newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0] + pod1.ResourceVersion = "1" + pod1.Labels = labelMap1 + pod1.OwnerReferences = []metav1.OwnerReference{controllerRef2} + pod2 = pod1 + pod2.Labels = labelMap2 + pod2.ResourceVersion = "2" + manager.updatePod(&pod1, &pod2) + expected = sets.NewString(testRSSpec2.Name) + for _, name := range expected.List() { + t.Logf("Expecting update for %+v", name) + select { + case got := <-received: + if !expected.Has(got) { + t.Errorf("Expected keys %#v got %v", expected, got) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Expected update notifications for replica sets") } } } @@ -711,7 +765,7 @@ func TestControllerUpdateRequeue(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) labelMap := map[string]string{"foo": "bar"} rs := newReplicaSet(1, labelMap) @@ -782,7 +836,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, burstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, burstReplicas) manager.podControl = &fakePodControl labelMap := map[string]string{"foo": "bar"} @@ -845,6 +899,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // the rs is waiting for. 
expectedDels := manager.expectations.GetUIDs(getKey(rsSpec, t)) podsToDelete := []*v1.Pod{} + isController := true for _, key := range expectedDels.List() { nsName := strings.Split(key, "/") podsToDelete = append(podsToDelete, &v1.Pod{ @@ -852,6 +907,9 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) Name: nsName[1], Namespace: nsName[0], Labels: rsSpec.Spec.Selector.MatchLabels, + OwnerReferences: []metav1.OwnerReference{ + {UID: rsSpec.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rsSpec.Name, Controller: &isController}, + }, }, }) } @@ -888,11 +946,15 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) t.Fatalf("Waiting on unexpected number of deletes.") } nsName := strings.Split(expectedDel.List()[0], "/") + isController := true lastPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: nsName[1], Namespace: nsName[0], Labels: rsSpec.Spec.Selector.MatchLabels, + OwnerReferences: []metav1.OwnerReference{ + {UID: rsSpec.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rsSpec.Name, Controller: &isController}, + }, }, } informers.Core().V1().Pods().Informer().GetIndexer().Delete(lastPod) @@ -935,7 +997,7 @@ func TestRSSyncExpectations(t *testing.T) { fakePodControl := controller.FakePodControl{} stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 2, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 2) manager.podControl = &fakePodControl labelMap := map[string]string{"foo": "bar"} @@ -961,7 +1023,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10) rs := newReplicaSet(1, map[string]string{"foo": "bar"}) informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rs) @@ -1015,34 +1077,42 @@ func TestOverlappingRSs(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) labelMap := map[string]string{"foo": "bar"} - for i := 0; i < 5; i++ { - func() { - stopCh := make(chan struct{}) - defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10, 0) + stopCh := make(chan struct{}) + defer close(stopCh) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10) - // Create 10 ReplicaSets, shuffled them randomly and insert them into the ReplicaSet controller's store - var controllers []*extensions.ReplicaSet - for j := 1; j < 10; j++ { - rsSpec := newReplicaSet(1, labelMap) - rsSpec.CreationTimestamp = metav1.Date(2014, time.December, j, 0, 0, 0, 0, time.Local) - rsSpec.Name = string(uuid.NewUUID()) - controllers = append(controllers, rsSpec) - } - shuffledControllers := shuffle(controllers) - for j := range shuffledControllers { - informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(shuffledControllers[j]) - } - // Add a pod and make sure only the oldest ReplicaSet is synced - pods := newPodList(nil, 1, v1.PodPending, labelMap, controllers[0], "pod") - rsKey := getKey(controllers[0], t) + // 
Create 10 ReplicaSets, shuffled them randomly and insert them into the + // ReplicaSet controller's store. + // All use the same CreationTimestamp since ControllerRef should be able + // to handle that. + timestamp := metav1.Date(2014, time.December, 0, 0, 0, 0, 0, time.Local) + var controllers []*extensions.ReplicaSet + for j := 1; j < 10; j++ { + rsSpec := newReplicaSet(1, labelMap) + rsSpec.CreationTimestamp = timestamp + rsSpec.Name = fmt.Sprintf("rs%d", j) + controllers = append(controllers, rsSpec) + } + shuffledControllers := shuffle(controllers) + for j := range shuffledControllers { + informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(shuffledControllers[j]) + } + // Add a pod with a ControllerRef and make sure only the corresponding + // ReplicaSet is synced. Pick a RS in the middle since the old code used to + // sort by name if all timestamps were equal. + rs := controllers[3] + pods := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod") + pod := &pods.Items[0] + isController := true + pod.OwnerReferences = []metav1.OwnerReference{ + {UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name, Controller: &isController}, + } + rsKey := getKey(rs, t) - manager.addPod(&pods.Items[0]) - queueRS, _ := manager.queue.Get() - if queueRS != rsKey { - t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) - } - }() + manager.addPod(pod) + queueRS, _ := manager.queue.Get() + if queueRS != rsKey { + t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) } } @@ -1051,7 +1121,7 @@ func TestDeletionTimestamp(t *testing.T) { labelMap := map[string]string{"foo": "bar"} stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(c, stopCh, 10, 0) + manager, informers := testNewReplicaSetControllerFromClient(c, stopCh, 10) rs := newReplicaSet(1, labelMap) informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rs) @@ -1098,11 +1168,15 @@ func TestDeletionTimestamp(t *testing.T) { // An update to the pod (including an update to the deletion timestamp) // should not be counted as a second delete. + isController := true secondPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: pod.Namespace, Name: "secondPod", Labels: pod.Labels, + OwnerReferences: []metav1.OwnerReference{ + {UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name, Controller: &isController}, + }, }, } manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(secondPod)}) @@ -1142,7 +1216,7 @@ func TestDeletionTimestamp(t *testing.T) { func setupManagerWithGCEnabled(stopCh chan struct{}, objs ...runtime.Object) (manager *ReplicaSetController, fakePodControl *controller.FakePodControl, informers informers.SharedInformerFactory) { c := fakeclientset.NewSimpleClientset(objs...) 
fakePodControl = &controller.FakePodControl{} - manager, informers = testNewReplicaSetControllerFromClient(c, stopCh, BurstReplicas, 0) + manager, informers = testNewReplicaSetControllerFromClient(c, stopCh, BurstReplicas) manager.podControl = fakePodControl return manager, fakePodControl, informers @@ -1372,7 +1446,7 @@ func TestReadyReplicas(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) // Status.Replica should update to match number of pods in system, 1 new pod should be created. labelMap := map[string]string{"foo": "bar"} @@ -1414,7 +1488,7 @@ func TestAvailableReplicas(t *testing.T) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) stopCh := make(chan struct{}) defer close(stopCh) - manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas, 0) + manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) // Status.Replica should update to match number of pods in system, 1 new pod should be created. labelMap := map[string]string{"foo": "bar"} diff --git a/pkg/controller/replication/BUILD b/pkg/controller/replication/BUILD index f0e4a0c8e02..9a920a39e6c 100644 --- a/pkg/controller/replication/BUILD +++ b/pkg/controller/replication/BUILD @@ -29,7 +29,6 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/labels", - "//vendor:k8s.io/apimachinery/pkg/runtime/schema", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apiserver/pkg/util/trace", diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index fba38f0df82..5c23a0b0974 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" utiltrace "k8s.io/apiserver/pkg/util/trace" @@ -56,9 +55,8 @@ const ( statusUpdateRetries = 1 ) -func getRCKind() schema.GroupVersionKind { - return v1.SchemeGroupVersion.WithKind("ReplicationController") -} +// controllerKind contains the schema.GroupVersionKind for this controller type. +var controllerKind = v1.SchemeGroupVersion.WithKind("ReplicationController") // ReplicationManager is responsible for synchronizing ReplicationController objects stored // in the system with actual running pods. @@ -85,14 +83,12 @@ type ReplicationManager struct { // Added as a member to the struct to allow injection for testing. 
podListerSynced cache.InformerSynced - lookupCache *controller.MatchingCache - // Controllers that need to be synced queue workqueue.RateLimitingInterface } // NewReplicationManager configures a replication manager with the specified event recorder -func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int) *ReplicationManager { +func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicationManager { if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil { metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().RESTClient().GetRateLimiter()) } @@ -135,7 +131,6 @@ func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer cor rm.podListerSynced = podInformer.Informer().HasSynced rm.syncHandler = rm.syncReplicationController - rm.lookupCache = controller.NewMatchingCache(lookupCacheSize) return rm } @@ -167,71 +162,19 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) { glog.Infof("Shutting down RC Manager") } -// getPodController returns the controller managing the given pod. -// TODO: Surface that we are ignoring multiple controllers for a single pod. -// TODO: use ownerReference.Controller to determine if the rc controls the pod. -func (rm *ReplicationManager) getPodController(pod *v1.Pod) *v1.ReplicationController { - // look up in the cache, if cached and the cache is valid, just return cached value - if obj, cached := rm.lookupCache.GetMatchingObject(pod); cached { - controller, ok := obj.(*v1.ReplicationController) - if !ok { - // This should not happen - utilruntime.HandleError(fmt.Errorf("lookup cache does not return a ReplicationController object")) - return nil - } - if cached && rm.isCacheValid(pod, controller) { - return controller - } - } - - // if not cached or cached value is invalid, search all the rc to find the matching one, and update cache - controllers, err := rm.rcLister.GetPodControllers(pod) +// getPodControllers returns a list of ReplicationControllers matching the given pod. +func (rm *ReplicationManager) getPodControllers(pod *v1.Pod) []*v1.ReplicationController { + rcs, err := rm.rcLister.GetPodControllers(pod) if err != nil { - glog.V(4).Infof("No controllers found for pod %v, replication manager will avoid syncing", pod.Name) + glog.V(4).Infof("No ReplicationControllers found for pod %v, controller will avoid syncing", pod.Name) return nil } - // In theory, overlapping controllers is user error. This sorting will not prevent - // oscillation of replicas in all cases, eg: - // rc1 (older rc): [(k1=v1)], replicas=1 rc2: [(k2=v2)], replicas=2 - // pod: [(k1:v1), (k2:v2)] will wake both rc1 and rc2, and we will sync rc1. - // pod: [(k2:v2)] will wake rc2 which creates a new replica. - if len(controllers) > 1 { - // More than two items in this list indicates user error. If two replication-controller - // overlap, sort by creation timestamp, subsort by name, then pick - // the first. - utilruntime.HandleError(fmt.Errorf("user error! 
more than one replication controller is selecting pods with labels: %+v", pod.Labels)) - sort.Sort(OverlappingControllers(controllers)) + if len(rcs) > 1 { + // ControllerRef will ensure we don't do anything crazy, but more than one + // item in this list nevertheless constitutes user error. + utilruntime.HandleError(fmt.Errorf("user error! more than one ReplicationController is selecting pods with labels: %+v", pod.Labels)) } - - // update lookup cache - rm.lookupCache.Update(pod, controllers[0]) - - return controllers[0] -} - -// isCacheValid check if the cache is valid -func (rm *ReplicationManager) isCacheValid(pod *v1.Pod, cachedRC *v1.ReplicationController) bool { - _, err := rm.rcLister.ReplicationControllers(cachedRC.Namespace).Get(cachedRC.Name) - // rc has been deleted or updated, cache is invalid - if err != nil || !isControllerMatch(pod, cachedRC) { - return false - } - return true -} - -// isControllerMatch take a Pod and ReplicationController, return whether the Pod and ReplicationController are matching -// TODO(mqliang): This logic is a copy from GetPodControllers(), remove the duplication -func isControllerMatch(pod *v1.Pod, rc *v1.ReplicationController) bool { - if rc.Namespace != pod.Namespace { - return false - } - selector := labels.Set(rc.Spec.Selector).AsSelectorPreValidated() - - // If an rc with a nil or empty selector creeps in, it should match nothing, not everything. - if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { - return false - } - return true + return rcs } // callback when RC is updated @@ -239,20 +182,6 @@ func (rm *ReplicationManager) updateRC(old, cur interface{}) { oldRC := old.(*v1.ReplicationController) curRC := cur.(*v1.ReplicationController) - // We should invalidate the whole lookup cache if a RC's selector has been updated. - // - // Imagine that you have two RCs: - // * old RC1 - // * new RC2 - // You also have a pod that is attached to RC2 (because it doesn't match RC1 selector). - // Now imagine that you are changing RC1 selector so that it is now matching that pod, - // in such case, we must invalidate the whole cache so that pod could be adopted by RC1 - // - // This makes the lookup cache less helpful, but selector update does not happen often, - // so it's not a big problem - if !reflect.DeepEqual(oldRC.Spec.Selector, curRC.Spec.Selector) { - rm.lookupCache.InvalidateAll() - } // TODO: Remove when #31981 is resolved! glog.Infof("Observed updated replication controller %v. Desired pod count change: %d->%d", curRC.Name, *(oldRC.Spec.Replicas), *(curRC.Spec.Replicas)) @@ -275,19 +204,10 @@ func (rm *ReplicationManager) updateRC(old, cur interface{}) { rm.enqueueController(cur) } -// When a pod is created, enqueue the controller that manages it and update it's expectations. +// When a pod is created, enqueue the ReplicationController that manages it and update its expectations. 
func (rm *ReplicationManager) addPod(obj interface{}) { pod := obj.(*v1.Pod) - - rc := rm.getPodController(pod) - if rc == nil { - return - } - rcKey, err := controller.KeyFunc(rc) - if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)) - return - } + glog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod) if pod.DeletionTimestamp != nil { // on a restart of the controller manager, it's possible a new pod shows up in a state that @@ -295,13 +215,38 @@ func (rm *ReplicationManager) addPod(obj interface{}) { rm.deletePod(pod) return } - rm.expectations.CreationObserved(rcKey) - rm.enqueueController(rc) + + // If it has a ControllerRef, that's all that matters. + if controllerRef := controller.GetControllerOf(pod); controllerRef != nil { + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + rc, err := rm.rcLister.ReplicationControllers(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } + rsKey, err := controller.KeyFunc(rc) + if err != nil { + return + } + rm.expectations.CreationObserved(rsKey) + rm.enqueueController(rc) + return + } + + // Otherwise, it's an orphan. Get a list of all matching ReplicationControllers and sync + // them to see if anyone wants to adopt it. + // DO NOT observe creation because no controller should be waiting for an + // orphan. + for _, rc := range rm.getPodControllers(pod) { + rm.enqueueController(rc) + } } -// When a pod is updated, figure out what controller/s manage it and wake them +// When a pod is updated, figure out what ReplicationController/s manage it and wake them // up. If the labels of the pod have changed we need to awaken both the old -// and new controller. old and cur must be *v1.Pod types. +// and new ReplicationController. old and cur must be *v1.Pod types. func (rm *ReplicationManager) updatePod(old, cur interface{}) { curPod := cur.(*v1.Pod) oldPod := old.(*v1.Pod) @@ -311,6 +256,7 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) { return } glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) + labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) if curPod.DeletionTimestamp != nil { // when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period, @@ -326,34 +272,53 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) { return } - // Only need to get the old controller if the labels changed. - // Enqueue the oldRC before the curRC to give curRC a chance to adopt the oldPod. - if labelChanged { - // If the old and new rc are the same, the first one that syncs - // will set expectations preventing any damage from the second. - if oldRC := rm.getPodController(oldPod); oldRC != nil { - rm.enqueueController(oldRC) + curControllerRef := controller.GetControllerOf(curPod) + oldControllerRef := controller.GetControllerOf(oldPod) + controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) + if controllerRefChanged && + oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind { + // The ControllerRef was changed. Sync the old controller, if any. 
+ rc, err := rm.rcLister.ReplicationControllers(oldPod.Namespace).Get(oldControllerRef.Name) + if err == nil { + rm.enqueueController(rc) } } - changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod) - if curRC := rm.getPodController(curPod); curRC != nil { - rm.enqueueController(curRC) + // If it has a ControllerRef, that's all that matters. + if curControllerRef != nil { + if curControllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + rc, err := rm.rcLister.ReplicationControllers(curPod.Namespace).Get(curControllerRef.Name) + if err != nil { + return + } + rm.enqueueController(rc) // TODO: MinReadySeconds in the Pod will generate an Available condition to be added in - // the Pod status which in turn will trigger a requeue of the owning replication controller - // thus having its status updated with the newly available replica. For now, we can fake the - // update by resyncing the controller MinReadySeconds after the it is requeued because a Pod - // transitioned to Ready. + // the Pod status which in turn will trigger a requeue of the owning ReplicationController thus + // having its status updated with the newly available replica. For now, we can fake the + // update by resyncing the controller MinReadySeconds after the it is requeued because + // a Pod transitioned to Ready. // Note that this still suffers from #29229, we are just moving the problem one level - // "closer" to kubelet (from the deployment to the replication controller manager). - if changedToReady && curRC.Spec.MinReadySeconds > 0 { - glog.V(2).Infof("ReplicationController %q will be enqueued after %ds for availability check", curRC.Name, curRC.Spec.MinReadySeconds) - rm.enqueueControllerAfter(curRC, time.Duration(curRC.Spec.MinReadySeconds)*time.Second) + // "closer" to kubelet (from the deployment to the ReplicationController controller). + if !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod) && rc.Spec.MinReadySeconds > 0 { + glog.V(2).Infof("ReplicationController %q will be enqueued after %ds for availability check", rc.Name, rc.Spec.MinReadySeconds) + rm.enqueueControllerAfter(rc, time.Duration(rc.Spec.MinReadySeconds)*time.Second) + } + return + } + + // Otherwise, it's an orphan. If anything changed, sync matching controllers + // to see if anyone wants to adopt it now. + if labelChanged || controllerRefChanged { + for _, rc := range rm.getPodControllers(curPod) { + rm.enqueueController(rc) } } } -// When a pod is deleted, enqueue the controller that manages the pod and update its expectations. +// When a pod is deleted, enqueue the ReplicationController that manages the pod and update its expectations. // obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item. func (rm *ReplicationManager) deletePod(obj interface{}) { pod, ok := obj.(*v1.Pod) @@ -361,45 +326,50 @@ func (rm *ReplicationManager) deletePod(obj interface{}) { // When a delete is dropped, the relist will notice a pod in the store not // in the list, leading to the insertion of a tombstone object which contains // the deleted key/value. Note that this value might be stale. If the pod - // changed labels the new rc will not be woken up till the periodic resync. + // changed labels the new ReplicationController will not be woken up till the periodic resync. 
if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj)) return } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { - utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj)) + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj)) return } } - glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v, labels %+v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod.Labels) - if rc := rm.getPodController(pod); rc != nil { - rcKey, err := controller.KeyFunc(rc) - if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)) - return - } - rm.expectations.DeletionObserved(rcKey, controller.PodKey(pod)) - rm.enqueueController(rc) + glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod) + + controllerRef := controller.GetControllerOf(pod) + if controllerRef == nil { + // No controller should care about orphans being deleted. + return } + if controllerRef.Kind != controllerKind.Kind { + // It's controlled by a different type of controller. + return + } + + rc, err := rm.rcLister.ReplicationControllers(pod.Namespace).Get(controllerRef.Name) + if err != nil { + return + } + rsKey, err := controller.KeyFunc(rc) + if err != nil { + return + } + rm.expectations.DeletionObserved(rsKey, controller.PodKey(pod)) + rm.enqueueController(rc) } // obj could be an *v1.ReplicationController, or a DeletionFinalStateUnknown marker item. func (rm *ReplicationManager) enqueueController(obj interface{}) { key, err := controller.KeyFunc(obj) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) return } - - // TODO: Handle overlapping controllers better. Either disallow them at admission time or - // deterministically avoid syncing controllers that fight over pods. Currently, we only - // ensure that the same controller is synced for a given pod. When we periodically relist - // all controllers there will still be some replica instability. One way to handle this is - // by querying the store for all controllers that this rc overlaps, as well as all - // controllers that overlap this rc, and sorting them. rm.queue.Add(key) } @@ -407,16 +377,9 @@ func (rm *ReplicationManager) enqueueController(obj interface{}) { func (rm *ReplicationManager) enqueueControllerAfter(obj interface{}, after time.Duration) { key, err := controller.KeyFunc(obj) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) return } - - // TODO: Handle overlapping controllers better. Either disallow them at admission time or - // deterministically avoid syncing controllers that fight over pods. Currently, we only - // ensure that the same controller is synced for a given pod. When we periodically relist - // all controllers there will still be some replica instability. One way to handle this is - // by querying the store for all controllers that this rc overlaps, as well as all - // controllers that overlap this rc, and sorting them. 
rm.queue.AddAfter(key, after) } @@ -481,8 +444,8 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.Repl var err error boolPtr := func(b bool) *bool { return &b } controllerRef := &metav1.OwnerReference{ - APIVersion: getRCKind().GroupVersion().String(), - Kind: getRCKind().Kind, + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, Name: rc.Name, UID: rc.UID, BlockOwnerDeletion: boolPtr(true), @@ -610,7 +573,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { rm.queue.Add(key) return err } - cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind()) + cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), controllerKind) filteredPods, err = cm.ClaimPods(pods) if err != nil { // Something went wrong with adoption or release. diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index 27fac3aa5c1..dda9435f9db 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -167,11 +167,11 @@ type serverResponse struct { obj interface{} } -func NewReplicationManagerFromClient(kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int) (*ReplicationManager, coreinformers.PodInformer, coreinformers.ReplicationControllerInformer) { +func newReplicationManagerFromClient(kubeClient clientset.Interface, burstReplicas int) (*ReplicationManager, coreinformers.PodInformer, coreinformers.ReplicationControllerInformer) { informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) podInformer := informerFactory.Core().V1().Pods() rcInformer := informerFactory.Core().V1().ReplicationControllers() - rm := NewReplicationManager(podInformer, rcInformer, kubeClient, burstReplicas, lookupCacheSize) + rm := NewReplicationManager(podInformer, rcInformer, kubeClient, burstReplicas) rm.podListerSynced = alwaysReady rm.rcListerSynced = alwaysReady return rm, podInformer, rcInformer @@ -180,7 +180,7 @@ func NewReplicationManagerFromClient(kubeClient clientset.Interface, burstReplic func TestSyncReplicationControllerDoesNothing(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) // 2 running pods, a controller with 2 replicas, sync is a no-op controllerSpec := newReplicationController(2) @@ -195,7 +195,7 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) { func TestSyncReplicationControllerDeletes(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) manager.podControl = &fakePodControl // 2 running pods and a controller with 1 replica, one pod delete expected @@ -210,7 +210,7 @@ func 
TestSyncReplicationControllerDeletes(t *testing.T) { func TestDeleteFinalStateUnknown(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager, _, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, _, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) manager.podControl = &fakePodControl received := make(chan string) @@ -241,7 +241,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { func TestSyncReplicationControllerCreates(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, _, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, _, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) // A controller with 2 replicas and no pods in the store, 2 creates expected rc := newReplicationController(2) @@ -262,7 +262,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) // Steady state for the replication controller, no Status.Replicas updates expected activePods := 5 @@ -302,7 +302,7 @@ func TestControllerUpdateReplicas(t *testing.T) { testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) // Insufficient number of pods in the system, and Status.Replicas is wrong; // Status.Replica should update to match number of pods in system, 1 new pod should be created. 
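The test hunks that follow repeatedly mark fake Pods as owned by a specific ReplicationController, so that addPod/updatePod route events by ControllerRef rather than by label matching. A minimal sketch of that pattern, assuming the import paths used elsewhere in this series (the helper name withControllerRef is illustrative only; the actual tests inline the OwnerReference literal):

package replication

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/api/v1"
)

// withControllerRef marks pod as controlled by rc, which is what the updated
// tests below do before calling manager.addPod or manager.updatePod.
func withControllerRef(pod *v1.Pod, rc *v1.ReplicationController) *v1.Pod {
	isController := true
	pod.OwnerReferences = []metav1.OwnerReference{{
		APIVersion: "v1",
		Kind:       "ReplicationController",
		Name:       rc.Name,
		UID:        rc.UID,
		Controller: &isController,
	}}
	return pod
}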
@@ -347,7 +347,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) manager.podControl = &fakePodControl controllerSpec := newReplicationController(2) @@ -399,7 +399,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { } func TestPodControllerLookup(t *testing.T) { - manager, _, rcInformer := NewReplicationManagerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}), BurstReplicas, 0) + manager, _, rcInformer := newReplicationManagerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}), BurstReplicas) testCases := []struct { inRCs []*v1.ReplicationController pod *v1.Pod @@ -447,7 +447,12 @@ func TestPodControllerLookup(t *testing.T) { for _, r := range c.inRCs { rcInformer.Informer().GetIndexer().Add(r) } - if rc := manager.getPodController(c.pod); rc != nil { + if rcs := manager.getPodControllers(c.pod); rcs != nil { + if len(rcs) != 1 { + t.Errorf("len(rcs) = %v, want %v", len(rcs), 1) + continue + } + rc := rcs[0] if c.outRCName != rc.Name { t.Errorf("Got controller %+v expected %+v", rc.Name, c.outRCName) } @@ -466,7 +471,7 @@ func TestWatchControllers(t *testing.T) { informers := informers.NewSharedInformerFactory(c, controller.NoResyncPeriodFunc()) podInformer := informers.Core().V1().Pods() rcInformer := informers.Core().V1().ReplicationControllers() - manager := NewReplicationManager(podInformer, rcInformer, c, BurstReplicas, 0) + manager := NewReplicationManager(podInformer, rcInformer, c, BurstReplicas) informers.Start(stopCh) var testControllerSpec v1.ReplicationController @@ -506,7 +511,7 @@ func TestWatchPods(t *testing.T) { fakeWatch := watch.NewFake() c := &fake.Clientset{} c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) // Put one rc and one pod into the controller's stores testControllerSpec := newReplicationController(1) @@ -547,7 +552,7 @@ func TestWatchPods(t *testing.T) { } func TestUpdatePods(t *testing.T) { - manager, podInformer, rcInformer := NewReplicationManagerFromClient(fake.NewSimpleClientset(), BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(fake.NewSimpleClientset(), BurstReplicas) received := make(chan string) @@ -565,23 +570,29 @@ func TestUpdatePods(t *testing.T) { go wait.Until(manager.worker, 10*time.Millisecond, stopCh) // Put 2 rcs and one pod into the controller's stores + labelMap1 := map[string]string{"foo": "bar"} testControllerSpec1 := newReplicationController(1) + testControllerSpec1.Spec.Selector = labelMap1 rcInformer.Informer().GetIndexer().Add(testControllerSpec1) + labelMap2 := map[string]string{"bar": "foo"} testControllerSpec2 := *testControllerSpec1 - testControllerSpec2.Spec.Selector = map[string]string{"bar": "foo"} 
+	testControllerSpec2.Spec.Selector = labelMap2
 	testControllerSpec2.Name = "barfoo"
 	rcInformer.Informer().GetIndexer().Add(&testControllerSpec2)
-	// case 1: We put in the podLister a pod with labels matching
-	// testControllerSpec1, then update its labels to match testControllerSpec2.
-	// We expect to receive a sync request for both controllers.
+	isController := true
+	controllerRef1 := metav1.OwnerReference{UID: testControllerSpec1.UID, APIVersion: "v1", Kind: "ReplicationController", Name: testControllerSpec1.Name, Controller: &isController}
+	controllerRef2 := metav1.OwnerReference{UID: testControllerSpec2.UID, APIVersion: "v1", Kind: "ReplicationController", Name: testControllerSpec2.Name, Controller: &isController}
+
+	// case 1: Pod with a ControllerRef
 	pod1 := newPodList(podInformer.Informer().GetIndexer(), 1, v1.PodRunning, testControllerSpec1, "pod").Items[0]
+	pod1.OwnerReferences = []metav1.OwnerReference{controllerRef1}
 	pod1.ResourceVersion = "1"
 	pod2 := pod1
-	pod2.Labels = testControllerSpec2.Spec.Selector
+	pod2.Labels = labelMap2
 	pod2.ResourceVersion = "2"
 	manager.updatePod(&pod1, &pod2)
-	expected := sets.NewString(testControllerSpec1.Name, testControllerSpec2.Name)
+	expected := sets.NewString(testControllerSpec1.Name)
 	for _, name := range expected.List() {
 		t.Logf("Expecting update for %+v", name)
 		select {
@@ -590,17 +601,20 @@ func TestUpdatePods(t *testing.T) {
 				t.Errorf("Expected keys %#v got %v", expected, got)
 			}
 		case <-time.After(wait.ForeverTestTimeout):
-			t.Errorf("Expected update notifications for controllers within 100ms each")
+			t.Errorf("Expected update notifications for ReplicationControllers")
 		}
 	}
 
-	// case 2: pod1 in the podLister has labels matching testControllerSpec1.
-	// We update its labels to match no replication controller. We expect to
-	// receive a sync request for testControllerSpec1.
-	pod2.Labels = make(map[string]string)
+	// case 2: Remove ControllerRef (orphan). Expect to sync label-matching RC.
+	pod1 = newPodList(podInformer.Informer().GetIndexer(), 1, v1.PodRunning, testControllerSpec1, "pod").Items[0]
+	pod1.ResourceVersion = "1"
+	pod1.Labels = labelMap2
+	pod1.OwnerReferences = []metav1.OwnerReference{controllerRef2}
+	pod2 = pod1
+	pod2.OwnerReferences = nil
 	pod2.ResourceVersion = "2"
 	manager.updatePod(&pod1, &pod2)
-	expected = sets.NewString(testControllerSpec1.Name)
+	expected = sets.NewString(testControllerSpec2.Name)
 	for _, name := range expected.List() {
 		t.Logf("Expecting update for %+v", name)
 		select {
@@ -609,7 +623,52 @@ func TestUpdatePods(t *testing.T) {
 				t.Errorf("Expected keys %#v got %v", expected, got)
 			}
 		case <-time.After(wait.ForeverTestTimeout):
-			t.Errorf("Expected update notifications for controllers within 100ms each")
+			t.Errorf("Expected update notifications for ReplicationControllers")
+		}
+	}
+
+	// case 3: Remove ControllerRef (orphan). Expect to sync both former owner and
+	// any label-matching RC.
+ pod1 = newPodList(podInformer.Informer().GetIndexer(), 1, v1.PodRunning, testControllerSpec1, "pod").Items[0] + pod1.ResourceVersion = "1" + pod1.Labels = labelMap2 + pod1.OwnerReferences = []metav1.OwnerReference{controllerRef1} + pod2 = pod1 + pod2.OwnerReferences = nil + pod2.ResourceVersion = "2" + manager.updatePod(&pod1, &pod2) + expected = sets.NewString(testControllerSpec1.Name, testControllerSpec2.Name) + for _, name := range expected.List() { + t.Logf("Expecting update for %+v", name) + select { + case got := <-received: + if !expected.Has(got) { + t.Errorf("Expected keys %#v got %v", expected, got) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Expected update notifications for ReplicationControllers") + } + } + + // case 4: Keep ControllerRef, change labels. Expect to sync owning RC. + pod1 = newPodList(podInformer.Informer().GetIndexer(), 1, v1.PodRunning, testControllerSpec1, "pod").Items[0] + pod1.ResourceVersion = "1" + pod1.Labels = labelMap1 + pod1.OwnerReferences = []metav1.OwnerReference{controllerRef2} + pod2 = pod1 + pod2.Labels = labelMap2 + pod2.ResourceVersion = "2" + manager.updatePod(&pod1, &pod2) + expected = sets.NewString(testControllerSpec2.Name) + for _, name := range expected.List() { + t.Logf("Expecting update for %+v", name) + select { + case got := <-received: + if !expected.Has(got) { + t.Errorf("Expected keys %#v got %v", expected, got) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Expected update notifications for ReplicationControllers") } } } @@ -624,7 +683,7 @@ func TestControllerUpdateRequeue(t *testing.T) { defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) rc := newReplicationController(1) rcInformer.Informer().GetIndexer().Add(rc) @@ -694,7 +753,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) { func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, burstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, burstReplicas) manager.podControl = &fakePodControl controllerSpec := newReplicationController(numReplicas) @@ -755,6 +814,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) // the rc is waiting for. 
expectedDels := manager.expectations.GetUIDs(getKey(controllerSpec, t)) podsToDelete := []*v1.Pod{} + isController := true for _, key := range expectedDels.List() { nsName := strings.Split(key, "/") podsToDelete = append(podsToDelete, &v1.Pod{ @@ -762,6 +822,9 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) Name: nsName[1], Namespace: nsName[0], Labels: controllerSpec.Spec.Selector, + OwnerReferences: []metav1.OwnerReference{ + {UID: controllerSpec.UID, APIVersion: "v1", Kind: "ReplicationController", Name: controllerSpec.Name, Controller: &isController}, + }, }, }) } @@ -798,11 +861,15 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) t.Fatalf("Waiting on unexpected number of deletes.") } nsName := strings.Split(expectedDel.List()[0], "/") + isController := true lastPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: nsName[1], Namespace: nsName[0], Labels: controllerSpec.Spec.Selector, + OwnerReferences: []metav1.OwnerReference{ + {UID: controllerSpec.UID, APIVersion: "v1", Kind: "ReplicationController", Name: controllerSpec.Name, Controller: &isController}, + }, }, } podInformer.Informer().GetIndexer().Delete(lastPod) @@ -843,7 +910,7 @@ func (fe FakeRCExpectations) SatisfiedExpectations(controllerKey string) bool { func TestRCSyncExpectations(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) fakePodControl := controller.FakePodControl{} - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, 2, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, 2) manager.podControl = &fakePodControl controllerSpec := newReplicationController(2) @@ -866,7 +933,7 @@ func TestRCSyncExpectations(t *testing.T) { func TestDeleteControllerAndExpectations(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, 10, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, 10) rc := newReplicationController(1) rcInformer.Informer().GetIndexer().Add(rc) @@ -919,36 +986,46 @@ func shuffle(controllers []*v1.ReplicationController) []*v1.ReplicationControlle func TestOverlappingRCs(t *testing.T) { c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - for i := 0; i < 5; i++ { - manager, _, rcInformer := NewReplicationManagerFromClient(c, 10, 0) + manager, _, rcInformer := newReplicationManagerFromClient(c, 10) - // Create 10 rcs, shuffled them randomly and insert them into the rc manager's store - var controllers []*v1.ReplicationController - for j := 1; j < 10; j++ { - controllerSpec := newReplicationController(1) - controllerSpec.CreationTimestamp = metav1.Date(2014, time.December, j, 0, 0, 0, 0, time.Local) - controllerSpec.Name = string(uuid.NewUUID()) - controllers = append(controllers, controllerSpec) - } - shuffledControllers := shuffle(controllers) - for j := range shuffledControllers { - rcInformer.Informer().GetIndexer().Add(shuffledControllers[j]) - } - // Add a pod and make sure only the oldest rc is synced - pods := newPodList(nil, 1, v1.PodPending, controllers[0], "pod") - rcKey := getKey(controllers[0], t) + // Create 10 
rcs, shuffle them randomly, and insert them into the
+	// rc manager's store.
+	// All use the same CreationTimestamp since ControllerRef should be able
+	// to handle that.
+	var controllers []*v1.ReplicationController
+	timestamp := metav1.Date(2014, time.December, 0, 0, 0, 0, 0, time.Local)
+	for j := 1; j < 10; j++ {
+		controllerSpec := newReplicationController(1)
+		controllerSpec.CreationTimestamp = timestamp
+		controllerSpec.Name = fmt.Sprintf("rc%d", j)
+		controllers = append(controllers, controllerSpec)
+	}
+	shuffledControllers := shuffle(controllers)
+	for j := range shuffledControllers {
+		rcInformer.Informer().GetIndexer().Add(shuffledControllers[j])
+	}
+	// Add a pod with a ControllerRef and make sure only the corresponding
+	// ReplicationController is synced. Pick an RC in the middle since the old code
+	// used to sort by name if all timestamps were equal.
+	rc := controllers[3]
+	pods := newPodList(nil, 1, v1.PodPending, rc, "pod")
+	pod := &pods.Items[0]
+	isController := true
+	pod.OwnerReferences = []metav1.OwnerReference{
+		{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name, Controller: &isController},
+	}
+	rcKey := getKey(rc, t)
-		manager.addPod(&pods.Items[0])
-		queueRC, _ := manager.queue.Get()
-		if queueRC != rcKey {
-			t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
-		}
+	manager.addPod(pod)
+	queueRC, _ := manager.queue.Get()
+	if queueRC != rcKey {
+		t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
 	}
 }
 
 func TestDeletionTimestamp(t *testing.T) {
 	c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
-	manager, _, rcInformer := NewReplicationManagerFromClient(c, 10, 0)
+	manager, _, rcInformer := newReplicationManagerFromClient(c, 10)
 	controllerSpec := newReplicationController(1)
 	rcInformer.Informer().GetIndexer().Add(controllerSpec)
@@ -995,11 +1072,15 @@ func TestDeletionTimestamp(t *testing.T) {
 	// An update to the pod (including an update to the deletion timestamp)
 	// should not be counted as a second delete.
+ isController := true secondPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: pod.Namespace, Name: "secondPod", Labels: pod.Labels, + OwnerReferences: []metav1.OwnerReference{ + {UID: controllerSpec.UID, APIVersion: "v1", Kind: "ReplicationController", Name: controllerSpec.Name, Controller: &isController}, + }, }, } manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(secondPod)}) @@ -1037,7 +1118,7 @@ func TestDeletionTimestamp(t *testing.T) { func BenchmarkGetPodControllerMultiNS(b *testing.B) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, _, rcInformer := NewReplicationManagerFromClient(client, BurstReplicas, 0) + manager, _, rcInformer := newReplicationManagerFromClient(client, BurstReplicas) const nsNum = 1000 @@ -1076,14 +1157,14 @@ func BenchmarkGetPodControllerMultiNS(b *testing.B) { for i := 0; i < b.N; i++ { for _, pod := range pods { - manager.getPodController(&pod) + manager.getPodControllers(&pod) } } } func BenchmarkGetPodControllerSingleNS(b *testing.B) { client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, _, rcInformer := NewReplicationManagerFromClient(client, BurstReplicas, 0) + manager, _, rcInformer := newReplicationManagerFromClient(client, BurstReplicas) const rcNum = 1000 const replicaNum = 3 @@ -1116,7 +1197,7 @@ func BenchmarkGetPodControllerSingleNS(b *testing.B) { for i := 0; i < b.N; i++ { for _, pod := range pods { - manager.getPodController(&pod) + manager.getPodControllers(&pod) } } } @@ -1125,7 +1206,7 @@ func BenchmarkGetPodControllerSingleNS(b *testing.B) { func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicationManager, fakePodControl *controller.FakePodControl, podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer) { c := fakeclientset.NewSimpleClientset(objs...) fakePodControl = &controller.FakePodControl{} - manager, podInformer, rcInformer = NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer = newReplicationManagerFromClient(c, BurstReplicas) manager.podControl = fakePodControl return manager, fakePodControl, podInformer, rcInformer } @@ -1327,7 +1408,7 @@ func TestReadyReplicas(t *testing.T) { defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) // Status.Replica should update to match number of pods in system, 1 new pod should be created. rc := newReplicationController(2) @@ -1365,7 +1446,7 @@ func TestAvailableReplicas(t *testing.T) { defer testServer.Close() c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) - manager, podInformer, rcInformer := NewReplicationManagerFromClient(c, BurstReplicas, 0) + manager, podInformer, rcInformer := newReplicationManagerFromClient(c, BurstReplicas) // Status.Replica should update to match number of pods in system, 1 new pod should be created. 
rc := newReplicationController(2) diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index 6729fa85b0a..c0ca8cb9fd6 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -124,7 +124,7 @@ func NewMasterComponents(c *Config) *MasterComponents { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}, QPS: c.QPS, Burst: c.Burst}) rcStopCh := make(chan struct{}) informerFactory := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) - controllerManager := replicationcontroller.NewReplicationManager(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().ReplicationControllers(), clientset, c.Burst, 4096) + controllerManager := replicationcontroller.NewReplicationManager(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().ReplicationControllers(), clientset, c.Burst) // TODO: Support events once we can cleanly shutdown an event recorder. controllerManager.SetEventRecorder(&record.FakeRecorder{}) diff --git a/test/integration/quota/quota_test.go b/test/integration/quota/quota_test.go index d1f1343d42a..f8ce95b3fe0 100644 --- a/test/integration/quota/quota_test.go +++ b/test/integration/quota/quota_test.go @@ -96,7 +96,6 @@ func TestQuota(t *testing.T) { informers.Core().V1().ReplicationControllers(), clientset, replicationcontroller.BurstReplicas, - 4096, ) rm.SetEventRecorder(&record.FakeRecorder{}) go rm.Run(3, controllerCh) @@ -280,7 +279,6 @@ func TestQuotaLimitedResourceDenial(t *testing.T) { informers.Core().V1().ReplicationControllers(), clientset, replicationcontroller.BurstReplicas, - 4096, ) rm.SetEventRecorder(&record.FakeRecorder{}) go rm.Run(3, controllerCh) diff --git a/test/integration/replicaset/replicaset_test.go b/test/integration/replicaset/replicaset_test.go index b9feaff9449..bd0c0aebf04 100644 --- a/test/integration/replicaset/replicaset_test.go +++ b/test/integration/replicaset/replicaset_test.go @@ -143,7 +143,6 @@ func rmSetup(t *testing.T) (*httptest.Server, *replicaset.ReplicaSetController, informers.Core().V1().Pods(), clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replicaset-controller")), replicaset.BurstReplicas, - 4096, ) if err != nil { diff --git a/test/integration/replicationcontroller/replicationcontroller_test.go b/test/integration/replicationcontroller/replicationcontroller_test.go index 4ee05ce72df..afcfed13da4 100644 --- a/test/integration/replicationcontroller/replicationcontroller_test.go +++ b/test/integration/replicationcontroller/replicationcontroller_test.go @@ -135,7 +135,7 @@ func rmSetup(t *testing.T, stopCh chan struct{}) (*httptest.Server, *replication resyncPeriod := 12 * time.Hour informers := informers.NewSharedInformerFactory(clientSet, resyncPeriod) - rm := replication.NewReplicationManager(informers.Core().V1().Pods(), informers.Core().V1().ReplicationControllers(), clientSet, replication.BurstReplicas, 4096) + rm := replication.NewReplicationManager(informers.Core().V1().Pods(), informers.Core().V1().ReplicationControllers(), clientSet, replication.BurstReplicas) informers.Start(stopCh) return s, rm, informers, clientSet From f54a5c9728a91336433ad190e4323b44b4de86a0 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Thu, 23 Feb 2017 10:50:37 -0800 Subject: [PATCH 2/5] RC/RS: Update Lister documentation for ControllerRef. 
The RC/RS Listers still use selectors, because this is the behavior expected by callers. This clarifies the meaning of the returned list. Some callers may need to switch to using GetControllerOf() instead, but that is a separate, case-by-case issue. --- .../core/internalversion/replicationcontroller_expansion.go | 4 +++- pkg/client/listers/core/v1/replicationcontroller_expansion.go | 4 +++- .../extensions/internalversion/replicaset_expansion.go | 4 +++- pkg/client/listers/extensions/v1beta1/replicaset_expansion.go | 4 +++- .../listers/core/v1/replicationcontroller_expansion.go | 4 +++- .../listers/extensions/v1beta1/replicaset_expansion.go | 4 +++- 6 files changed, 18 insertions(+), 6 deletions(-) diff --git a/pkg/client/listers/core/internalversion/replicationcontroller_expansion.go b/pkg/client/listers/core/internalversion/replicationcontroller_expansion.go index e9070bd5abf..98ea3115ee2 100644 --- a/pkg/client/listers/core/internalversion/replicationcontroller_expansion.go +++ b/pkg/client/listers/core/internalversion/replicationcontroller_expansion.go @@ -33,7 +33,9 @@ type ReplicationControllerListerExpansion interface { // ReplicationControllerNamespaeLister. type ReplicationControllerNamespaceListerExpansion interface{} -// GetPodControllers returns a list of replication controllers managing a pod. Returns an error only if no matching controllers are found. +// GetPodControllers returns a list of ReplicationControllers that potentially match a pod. +// Only the one specified in the Pod's ControllerRef will actually manage it. +// Returns an error only if no matching ReplicationControllers are found. func (s *replicationControllerLister) GetPodControllers(pod *api.Pod) ([]*api.ReplicationController, error) { if len(pod.Labels) == 0 { return nil, fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name) diff --git a/pkg/client/listers/core/v1/replicationcontroller_expansion.go b/pkg/client/listers/core/v1/replicationcontroller_expansion.go index aa5202d5e42..d3adae7f5d2 100644 --- a/pkg/client/listers/core/v1/replicationcontroller_expansion.go +++ b/pkg/client/listers/core/v1/replicationcontroller_expansion.go @@ -33,7 +33,9 @@ type ReplicationControllerListerExpansion interface { // ReplicationControllerNamespaeLister. type ReplicationControllerNamespaceListerExpansion interface{} -// GetPodControllers returns a list of replication controllers managing a pod. Returns an error only if no matching controllers are found. +// GetPodControllers returns a list of ReplicationControllers that potentially match a pod. +// Only the one specified in the Pod's ControllerRef will actually manage it. +// Returns an error only if no matching ReplicationControllers are found. func (s *replicationControllerLister) GetPodControllers(pod *v1.Pod) ([]*v1.ReplicationController, error) { if len(pod.Labels) == 0 { return nil, fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name) diff --git a/pkg/client/listers/extensions/internalversion/replicaset_expansion.go b/pkg/client/listers/extensions/internalversion/replicaset_expansion.go index d2ab13ec7f7..d5f4031b8b7 100644 --- a/pkg/client/listers/extensions/internalversion/replicaset_expansion.go +++ b/pkg/client/listers/extensions/internalversion/replicaset_expansion.go @@ -35,7 +35,9 @@ type ReplicaSetListerExpansion interface { // ReplicaSetNamespaeLister. type ReplicaSetNamespaceListerExpansion interface{} -// GetPodReplicaSets returns a list of ReplicaSets managing a pod. 
Returns an error only if no matching ReplicaSets are found. +// GetPodReplicaSets returns a list of ReplicaSets that potentially match a pod. +// Only the one specified in the Pod's ControllerRef will actually manage it. +// Returns an error only if no matching ReplicaSets are found. func (s *replicaSetLister) GetPodReplicaSets(pod *api.Pod) ([]*extensions.ReplicaSet, error) { if len(pod.Labels) == 0 { return nil, fmt.Errorf("no ReplicaSets found for pod %v because it has no labels", pod.Name) diff --git a/pkg/client/listers/extensions/v1beta1/replicaset_expansion.go b/pkg/client/listers/extensions/v1beta1/replicaset_expansion.go index cfef9588406..3f3db42b67e 100644 --- a/pkg/client/listers/extensions/v1beta1/replicaset_expansion.go +++ b/pkg/client/listers/extensions/v1beta1/replicaset_expansion.go @@ -35,7 +35,9 @@ type ReplicaSetListerExpansion interface { // ReplicaSetNamespaeLister. type ReplicaSetNamespaceListerExpansion interface{} -// GetPodReplicaSets returns a list of ReplicaSets managing a pod. Returns an error only if no matching ReplicaSets are found. +// GetPodReplicaSets returns a list of ReplicaSets that potentially match a pod. +// Only the one specified in the Pod's ControllerRef will actually manage it. +// Returns an error only if no matching ReplicaSets are found. func (s *replicaSetLister) GetPodReplicaSets(pod *v1.Pod) ([]*extensions.ReplicaSet, error) { if len(pod.Labels) == 0 { return nil, fmt.Errorf("no ReplicaSets found for pod %v because it has no labels", pod.Name) diff --git a/staging/src/k8s.io/client-go/listers/core/v1/replicationcontroller_expansion.go b/staging/src/k8s.io/client-go/listers/core/v1/replicationcontroller_expansion.go index 6a639868124..c0d667454b8 100644 --- a/staging/src/k8s.io/client-go/listers/core/v1/replicationcontroller_expansion.go +++ b/staging/src/k8s.io/client-go/listers/core/v1/replicationcontroller_expansion.go @@ -33,7 +33,9 @@ type ReplicationControllerListerExpansion interface { // ReplicationControllerNamespaeLister. type ReplicationControllerNamespaceListerExpansion interface{} -// GetPodControllers returns a list of replication controllers managing a pod. Returns an error only if no matching controllers are found. +// GetPodControllers returns a list of ReplicationControllers that potentially match a pod. +// Only the one specified in the Pod's ControllerRef will actually manage it. +// Returns an error only if no matching ReplicationControllers are found. func (s *replicationControllerLister) GetPodControllers(pod *v1.Pod) ([]*v1.ReplicationController, error) { if len(pod.Labels) == 0 { return nil, fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name) diff --git a/staging/src/k8s.io/client-go/listers/extensions/v1beta1/replicaset_expansion.go b/staging/src/k8s.io/client-go/listers/extensions/v1beta1/replicaset_expansion.go index 8b92a32835c..8b0c5555f40 100644 --- a/staging/src/k8s.io/client-go/listers/extensions/v1beta1/replicaset_expansion.go +++ b/staging/src/k8s.io/client-go/listers/extensions/v1beta1/replicaset_expansion.go @@ -35,7 +35,9 @@ type ReplicaSetListerExpansion interface { // ReplicaSetNamespaeLister. type ReplicaSetNamespaceListerExpansion interface{} -// GetPodReplicaSets returns a list of ReplicaSets managing a pod. Returns an error only if no matching ReplicaSets are found. +// GetPodReplicaSets returns a list of ReplicaSets that potentially match a pod. +// Only the one specified in the Pod's ControllerRef will actually manage it. 
+// Returns an error only if no matching ReplicaSets are found. func (s *replicaSetLister) GetPodReplicaSets(pod *v1.Pod) ([]*extensions.ReplicaSet, error) { if len(pod.Labels) == 0 { return nil, fmt.Errorf("no ReplicaSets found for pod %v because it has no labels", pod.Name) From db6665251a366fa145552deaa1d6102fa37d3c55 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Sat, 25 Feb 2017 13:09:15 -0800 Subject: [PATCH 3/5] RC/RS: Don't requeue on error inside sync function. Returning an error from the sync function already triggers a requeue in processNextWorkItem(). --- pkg/controller/replicaset/replica_set.go | 3 -- pkg/controller/replicaset/replica_set_test.go | 20 ++++++++- .../replication/replication_controller.go | 41 ++++++++----------- .../replication_controller_test.go | 20 ++++++++- 4 files changed, 55 insertions(+), 29 deletions(-) diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index d7b8f802838..665f08578ba 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -554,9 +554,6 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, controllerKind) filteredPods, err = cm.ClaimPods(pods) if err != nil { - // Something went wrong with adoption or release. - // Requeue and try again so we don't leave orphans sitting around. - rsc.queue.Add(key) return err } diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index 80e3e672288..ae79874dd97 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -189,6 +189,24 @@ func newPodList(store cache.Store, count int, status v1.PodPhase, labelMap map[s } } +// processSync initiates a sync via processNextWorkItem() to test behavior that +// depends on both functions (such as re-queueing on sync error). +func processSync(rsc *ReplicaSetController, key string) error { + // Save old syncHandler and replace with one that captures the error. + oldSyncHandler := rsc.syncHandler + defer func() { + rsc.syncHandler = oldSyncHandler + }() + var syncErr error + rsc.syncHandler = func(key string) error { + syncErr = oldSyncHandler(key) + return syncErr + } + rsc.queue.Add(key) + rsc.processNextWorkItem() + return syncErr +} + func validateSyncReplicaSet(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) { if e, a := expectedCreates, len(fakePodControl.Templates); e != a { t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a) @@ -1303,7 +1321,7 @@ func TestPatchPodFails(t *testing.T) { // control of the pods and requeue to try again. fakePodControl.Err = fmt.Errorf("Fake Error") rsKey := getKey(rs, t) - err := manager.syncReplicaSet(rsKey) + err := processSync(manager, rsKey) if err == nil || !strings.Contains(err.Error(), "Fake Error") { t.Errorf("expected Fake Error, got %+v", err) } diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 5c23a0b0974..df34de1c63f 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -386,29 +386,27 @@ func (rm *ReplicationManager) enqueueControllerAfter(obj interface{}, after time // worker runs a worker thread that just dequeues items, processes them, and marks them done. 
// It enforces that the syncHandler is never invoked concurrently with the same key. func (rm *ReplicationManager) worker() { - workFunc := func() bool { - key, quit := rm.queue.Get() - if quit { - return true - } - defer rm.queue.Done(key) + for rm.processNextWorkItem() { + } + glog.Infof("replication controller worker shutting down") +} - err := rm.syncHandler(key.(string)) - if err == nil { - rm.queue.Forget(key) - return false - } - - rm.queue.AddRateLimited(key) - utilruntime.HandleError(err) +func (rm *ReplicationManager) processNextWorkItem() bool { + key, quit := rm.queue.Get() + if quit { return false } - for { - if quit := workFunc(); quit { - glog.Infof("replication controller worker shutting down") - return - } + defer rm.queue.Done(key) + + err := rm.syncHandler(key.(string)) + if err == nil { + rm.queue.Forget(key) + return true } + + rm.queue.AddRateLimited(key) + utilruntime.HandleError(err) + return true } // manageReplicas checks and updates replicas for the given replication controller. @@ -569,16 +567,11 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { // anymore but has the stale controller ref. pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Everything()) if err != nil { - utilruntime.HandleError(fmt.Errorf("Error getting pods for rc %q: %v", key, err)) - rm.queue.Add(key) return err } cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), controllerKind) filteredPods, err = cm.ClaimPods(pods) if err != nil { - // Something went wrong with adoption or release. - // Requeue and try again so we don't leave orphans sitting around. - rm.queue.Add(key) return err } diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index dda9435f9db..23ce15afa90 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -146,6 +146,24 @@ func newPodList(store cache.Store, count int, status v1.PodPhase, rc *v1.Replica } } +// processSync initiates a sync via processNextWorkItem() to test behavior that +// depends on both functions (such as re-queueing on sync error). +func processSync(rm *ReplicationManager, key string) error { + // Save old syncHandler and replace with one that captures the error. + oldSyncHandler := rm.syncHandler + defer func() { + rm.syncHandler = oldSyncHandler + }() + var syncErr error + rm.syncHandler = func(key string) error { + syncErr = oldSyncHandler(key) + return syncErr + } + rm.queue.Add(key) + rm.processNextWorkItem() + return syncErr +} + func validateSyncReplication(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) { if e, a := expectedCreates, len(fakePodControl.Templates); e != a { t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a) @@ -1280,7 +1298,7 @@ func TestPatchPodFails(t *testing.T) { // control of the pods and requeue to try again. fakePodControl.Err = fmt.Errorf("Fake Error") rcKey := getKey(rc, t) - err := manager.syncReplicationController(rcKey) + err := processSync(manager, rcKey) if err == nil || !strings.Contains(err.Error(), "Fake Error") { t.Fatalf("expected Fake Error, got %v", err) } From 01d025a7cca4ab5db5b10ab720991cb77d40cc51 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Sat, 25 Feb 2017 12:39:49 -0800 Subject: [PATCH 4/5] ControllerRefManager: Don't always filter inactive Pods. 
Some controllers, like DaemonSet, want to see all Pods. --- pkg/controller/controller_ref_manager.go | 5 ----- pkg/controller/replicaset/replica_set.go | 16 +++++++++++----- .../replication/replication_controller.go | 14 ++++++++++---- 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/pkg/controller/controller_ref_manager.go b/pkg/controller/controller_ref_manager.go index 226c9b8d3ec..d3bc26473a0 100644 --- a/pkg/controller/controller_ref_manager.go +++ b/pkg/controller/controller_ref_manager.go @@ -163,11 +163,6 @@ func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod) ([]*v1.Pod, error) { } for _, pod := range pods { - if !IsPodActive(pod) { - glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v", - pod.Namespace, pod.Name, pod.Status.Phase, pod.DeletionTimestamp) - continue - } ok, err := m.claimObject(pod, adopt, release) if err != nil { errlist = append(errlist, err) diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 665f08578ba..83c67819bf9 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -541,18 +541,24 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { return nil } - // NOTE: filteredPods are pointing to objects from cache - if you need to - // modify them, you need to copy it first. - // TODO: Do the List and Filter in a single pass, or use an index. - var filteredPods []*v1.Pod // list all pods to include the pods that don't match the rs`s selector // anymore but has the stale controller ref. + // TODO: Do the List and Filter in a single pass, or use an index. pods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything()) if err != nil { return err } + // Ignore inactive pods. + var filteredPods []*v1.Pod + for _, pod := range pods { + if controller.IsPodActive(pod) { + filteredPods = append(filteredPods, pod) + } + } cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, controllerKind) - filteredPods, err = cm.ClaimPods(pods) + // NOTE: filteredPods are pointing to objects from cache - if you need to + // modify them, you need to copy it first. + filteredPods, err = cm.ClaimPods(filteredPods) if err != nil { return err } diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index df34de1c63f..9a6f8cff16e 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -559,17 +559,23 @@ func (rm *ReplicationManager) syncReplicationController(key string) error { rcNeedsSync := rm.expectations.SatisfiedExpectations(key) trace.Step("Expectations restored") - // NOTE: filteredPods are pointing to objects from cache - if you need to - // modify them, you need to copy it first. - // TODO: Do the List and Filter in a single pass, or use an index. - var filteredPods []*v1.Pod // list all pods to include the pods that don't match the rc's selector // anymore but has the stale controller ref. + // TODO: Do the List and Filter in a single pass, or use an index. pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Everything()) if err != nil { return err } + // Ignore inactive pods. 
+ var filteredPods []*v1.Pod + for _, pod := range pods { + if controller.IsPodActive(pod) { + filteredPods = append(filteredPods, pod) + } + } cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), controllerKind) + // NOTE: filteredPods are pointing to objects from cache - if you need to + // modify them, you need to copy it first. filteredPods, err = cm.ClaimPods(pods) if err != nil { return err From 2c2fc9c707efd66cd4ac98ba918e09fa803578be Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Wed, 1 Mar 2017 14:59:25 -0800 Subject: [PATCH 5/5] RC/RS: Don't log Pod events unless some RC/RS actually cares. --- pkg/controller/replicaset/replica_set.go | 21 +++++++++++++------ .../replication/replication_controller.go | 21 +++++++++++++------ 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 83c67819bf9..91a1b87d33b 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -171,7 +171,6 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*extensions.ReplicaSet { rss, err := rsc.rsLister.GetPodReplicaSets(pod) if err != nil { - glog.V(4).Infof("No ReplicaSets found for pod %v, ReplicaSet controller will avoid syncing", pod.Name) return nil } if len(rss) > 1 { @@ -208,7 +207,6 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) { // When a pod is created, enqueue the replica set that manages it and update its expectations. func (rsc *ReplicaSetController) addPod(obj interface{}) { pod := obj.(*v1.Pod) - glog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod) if pod.DeletionTimestamp != nil { // on a restart of the controller manager, it's possible a new pod shows up in a state that @@ -223,6 +221,7 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) { // It's controlled by a different type of controller. return } + glog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod) rs, err := rsc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name) if err != nil { return @@ -240,7 +239,12 @@ func (rsc *ReplicaSetController) addPod(obj interface{}) { // them to see if anyone wants to adopt it. // DO NOT observe creation because no controller should be waiting for an // orphan. - for _, rs := range rsc.getPodReplicaSets(pod) { + rss := rsc.getPodReplicaSets(pod) + if len(rss) == 0 { + return + } + glog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod) + for _, rs := range rss { rsc.enqueueReplicaSet(rs) } } @@ -256,7 +260,6 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { // Two different versions of the same pod will always have different RVs. return } - glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) if curPod.DeletionTimestamp != nil { @@ -291,6 +294,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { // It's controlled by a different type of controller. return } + glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) rs, err := rsc.rsLister.ReplicaSets(curPod.Namespace).Get(curControllerRef.Name) if err != nil { return @@ -313,7 +317,12 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) { // Otherwise, it's an orphan. 
If anything changed, sync matching controllers // to see if anyone wants to adopt it now. if labelChanged || controllerRefChanged { - for _, rs := range rsc.getPodReplicaSets(curPod) { + rss := rsc.getPodReplicaSets(curPod) + if len(rss) == 0 { + return + } + glog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) + for _, rs := range rss { rsc.enqueueReplicaSet(rs) } } @@ -340,7 +349,6 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) { return } } - glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod) controllerRef := controller.GetControllerOf(pod) if controllerRef == nil { @@ -351,6 +359,7 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) { // It's controlled by a different type of controller. return } + glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod) rs, err := rsc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name) if err != nil { diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 9a6f8cff16e..8818f2e1d27 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -166,7 +166,6 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) { func (rm *ReplicationManager) getPodControllers(pod *v1.Pod) []*v1.ReplicationController { rcs, err := rm.rcLister.GetPodControllers(pod) if err != nil { - glog.V(4).Infof("No ReplicationControllers found for pod %v, controller will avoid syncing", pod.Name) return nil } if len(rcs) > 1 { @@ -207,7 +206,6 @@ func (rm *ReplicationManager) updateRC(old, cur interface{}) { // When a pod is created, enqueue the ReplicationController that manages it and update its expectations. func (rm *ReplicationManager) addPod(obj interface{}) { pod := obj.(*v1.Pod) - glog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod) if pod.DeletionTimestamp != nil { // on a restart of the controller manager, it's possible a new pod shows up in a state that @@ -222,6 +220,7 @@ func (rm *ReplicationManager) addPod(obj interface{}) { // It's controlled by a different type of controller. return } + glog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod) rc, err := rm.rcLister.ReplicationControllers(pod.Namespace).Get(controllerRef.Name) if err != nil { return @@ -239,7 +238,12 @@ func (rm *ReplicationManager) addPod(obj interface{}) { // them to see if anyone wants to adopt it. // DO NOT observe creation because no controller should be waiting for an // orphan. - for _, rc := range rm.getPodControllers(pod) { + rcs := rm.getPodControllers(pod) + if len(rcs) == 0 { + return + } + glog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod) + for _, rc := range rcs { rm.enqueueController(rc) } } @@ -255,7 +259,6 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) { // Two different versions of the same pod will always have different RVs. return } - glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) if curPod.DeletionTimestamp != nil { @@ -290,6 +293,7 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) { // It's controlled by a different type of controller. 
return } + glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) rc, err := rm.rcLister.ReplicationControllers(curPod.Namespace).Get(curControllerRef.Name) if err != nil { return @@ -312,7 +316,12 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) { // Otherwise, it's an orphan. If anything changed, sync matching controllers // to see if anyone wants to adopt it now. if labelChanged || controllerRefChanged { - for _, rc := range rm.getPodControllers(curPod) { + rcs := rm.getPodControllers(curPod) + if len(rcs) == 0 { + return + } + glog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) + for _, rc := range rcs { rm.enqueueController(rc) } } @@ -339,7 +348,6 @@ func (rm *ReplicationManager) deletePod(obj interface{}) { return } } - glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod) controllerRef := controller.GetControllerOf(pod) if controllerRef == nil { @@ -350,6 +358,7 @@ func (rm *ReplicationManager) deletePod(obj interface{}) { // It's controlled by a different type of controller. return } + glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod) rc, err := rm.rcLister.ReplicationControllers(pod.Namespace).Get(controllerRef.Name) if err != nil {
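Taken together, the handler changes in this series all apply the same routing rule: a Pod event is attributed to the controller named in its ControllerRef, and label matching is consulted only for orphans that might be adopted. The sketch below condenses that rule for the ReplicationController case; resolveControllerRef and routePodEvent are illustrative names, not functions added by these patches, and the sketch omits the expectation bookkeeping and the labelChanged/controllerRefChanged gating that the real handlers keep.

package replication

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/pkg/controller"
)

// resolveControllerRef returns the ReplicationController a ControllerRef points
// at, or nil if the ref is absent, names a different kind, or the RC is gone.
func (rm *ReplicationManager) resolveControllerRef(namespace string, ref *metav1.OwnerReference) *v1.ReplicationController {
	if ref == nil || ref.Kind != controllerKind.Kind {
		return nil
	}
	rc, err := rm.rcLister.ReplicationControllers(namespace).Get(ref.Name)
	if err != nil {
		return nil
	}
	return rc
}

// routePodEvent sketches how addPod and updatePod above decide what to enqueue;
// deletePod uses only the ControllerRef branch, since nothing adopts a deleted Pod.
func (rm *ReplicationManager) routePodEvent(pod *v1.Pod) {
	ref := controller.GetControllerOf(pod)
	if rc := rm.resolveControllerRef(pod.Namespace, ref); rc != nil {
		// The owner gets the event, whatever the Pod's labels say.
		rm.enqueueController(rc)
		return
	}
	if ref == nil {
		// Orphan: offer it to every label-matching RC for possible adoption.
		for _, rc := range rm.getPodControllers(pod) {
			rm.enqueueController(rc)
		}
	}
}

The ReplicaSet handlers in these patches follow the same shape, with the lister and controllerKind swapped for their extensions/v1beta1 equivalents.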