Replace hand-written informers with generated ones

Replace existing uses of hand-written informers with generated ones.
Follow-up commits will switch the use of one-off informers to shared
informers.
Andy Goldstein
2017-02-06 13:35:50 -05:00
parent cb758738f9
commit 70c6087600
55 changed files with 936 additions and 823 deletions
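
For orientation before the diff: a generated informer bundles the shared index informer and a typed lister behind one small interface, which is why call sites below change from podInformer.AddEventHandler(...) to podInformer.Informer().AddEventHandler(...), and why the listers and HasSynced functions are pulled from the same object. A minimal sketch of that shape, inferred from the call sites in this diff rather than copied from the generated source:

```go
// Sketch of the generated informer interfaces, inferred from the call sites
// in this diff (Informer(), Lister(), HasSynced); not copied from the real
// generated code under pkg/client/informers/informers_generated/core/v1.
// The cache import path is the one in use in this era (assumption).
package sketch

import (
	"k8s.io/kubernetes/pkg/client/cache"
	corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
)

// PodInformer couples the shared machinery (event handlers, HasSynced)
// with a typed, read-only lister over the same underlying store.
type PodInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() corelisters.PodLister
}

// ReplicationControllerInformer is the analogous wrapper for RCs.
type ReplicationControllerInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() corelisters.ReplicationControllerLister
}
```

The wrapper itself does not force sharing; as the message says, moving call sites from one-off informers onto shared instances is left to follow-up commits.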


@@ -19,6 +19,7 @@ limitations under the License.
 package replication
 
 import (
 	"fmt"
+	"reflect"
 	"sort"
 	"sync"
@@ -41,9 +42,9 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
-	"k8s.io/kubernetes/pkg/client/legacylisters"
+	coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/core/v1"
+	corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
 	"k8s.io/kubernetes/pkg/controller"
-	"k8s.io/kubernetes/pkg/controller/informers"
 	"k8s.io/kubernetes/pkg/util/metrics"
 )
@@ -68,13 +69,6 @@ type ReplicationManager struct {
 	kubeClient clientset.Interface
 	podControl controller.PodControlInterface
 
-	// internalPodInformer is used to hold a personal informer. If we're using
-	// a normal shared informer, then the informer will be started for us. If
-	// we have a personal informer, we must start it ourselves. If you start
-	// the controller using NewReplicationManager(passing SharedInformer), this
-	// will be null
-	internalPodInformer cache.SharedIndexInformer
-
 	// An rc is temporarily suspended after creating/deleting these many replicas.
 	// It resumes normal action after observing the watch events for them.
 	burstReplicas int
@@ -84,15 +78,13 @@ type ReplicationManager struct {
 	// A TTLCache of pod creates/deletes each rc expects to see.
 	expectations *controller.UIDTrackingControllerExpectations
 
-	// A store of replication controllers, populated by the rcController
-	rcLister listers.StoreToReplicationControllerLister
-	// A store of pods, populated by the podController
-	podLister listers.StoreToPodLister
-	// Watches changes to all pods
-	podController cache.Controller
+	rcLister corelisters.ReplicationControllerLister
+	rcListerSynced cache.InformerSynced
+	podLister corelisters.PodLister
 	// podListerSynced returns true if the pod store has been synced at least once.
 	// Added as a member to the struct to allow injection for testing.
-	podListerSynced func() bool
+	podListerSynced cache.InformerSynced
 
 	lookupCache *controller.MatchingCache
@@ -105,7 +97,7 @@ type ReplicationManager struct {
 }
 
 // NewReplicationManager configures a replication manager with the specified event recorder
-func NewReplicationManager(podInformer, rcInformer cache.SharedIndexInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
+func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
 	if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
 		metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().RESTClient().GetRateLimiter())
 	}
@@ -126,7 +118,7 @@ func NewReplicationManager(podInformer, rcInformer cache.SharedIndexInformer, ku
 		garbageCollectorEnabled: garbageCollectorEnabled,
 	}
 
-	rcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+	rcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: rm.enqueueController,
 		UpdateFunc: rm.updateRC,
 		// This will enter the sync loop and no-op, because the controller has been deleted from the store.
@@ -134,7 +126,10 @@ func NewReplicationManager(podInformer, rcInformer cache.SharedIndexInformer, ku
 		// way of achieving this is by performing a `stop` operation on the controller.
 		DeleteFunc: rm.enqueueController,
 	})
-	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+	rm.rcLister = rcInformer.Lister()
+	rm.rcListerSynced = rcInformer.Informer().HasSynced
+
+	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: rm.addPod,
 		// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
 		// the most frequent pod update is status, and the associated rc will only list from local storage, so
@@ -142,24 +137,14 @@ func NewReplicationManager(podInformer, rcInformer cache.SharedIndexInformer, ku
 		UpdateFunc: rm.updatePod,
 		DeleteFunc: rm.deletePod,
 	})
+	rm.podLister = podInformer.Lister()
+	rm.podListerSynced = podInformer.Informer().HasSynced
 
 	rm.syncHandler = rm.syncReplicationController
-	rm.rcLister.Indexer = rcInformer.GetIndexer()
-	rm.podLister.Indexer = podInformer.GetIndexer()
-	rm.podListerSynced = podInformer.HasSynced
 	rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
 	return rm
 }
 
-// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
-func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
-	podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
-	rcInformer := informers.NewReplicationControllerInformer(kubeClient, resyncPeriod())
-	rm := NewReplicationManager(podInformer, rcInformer, kubeClient, burstReplicas, lookupCacheSize, false)
-	rm.internalPodInformer = podInformer
-	return rm
-}
-
 // SetEventRecorder replaces the event recorder used by the replication manager
 // with the given recorder. Only used for testing.
 func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) {
@@ -174,11 +159,9 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
 	defer rm.queue.ShutDown()
 
 	glog.Infof("Starting RC Manager")
-	if rm.internalPodInformer != nil {
-		go rm.internalPodInformer.Run(stopCh)
-	}
 
-	if !cache.WaitForCacheSync(stopCh, rm.podListerSynced) {
+	if !cache.WaitForCacheSync(stopCh, rm.podListerSynced, rm.rcListerSynced) {
 		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
 		return
 	}
@@ -606,16 +589,19 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
 		glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
 	}()
 
-	obj, exists, err := rm.rcLister.Indexer.GetByKey(key)
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		return err
 	}
-	if !exists {
+	rc, err := rm.rcLister.ReplicationControllers(namespace).Get(name)
+	if errors.IsNotFound(err) {
 		glog.Infof("Replication Controller has been deleted %v", key)
 		rm.expectations.DeleteExpectations(key)
 		return nil
 	}
-	rc := *obj.(*v1.ReplicationController)
+	if err != nil {
+		return err
+	}
 
 	trace.Step("ReplicationController restored")
 	rcNeedsSync := rm.expectations.SatisfiedExpectations(key)
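
The lookup sequence in the hunk above (split the queue key, read through the typed lister, treat IsNotFound as a clean deletion) is the standard shape for lister-backed sync handlers. A condensed sketch of just that pattern, using the lister type this diff introduces; the import paths are the era's (assumed), the function is a hypothetical stand-in rather than the controller's actual method, and expectations bookkeeping is elided:

```go
// Hypothetical condensed form of the lookup pattern above; syncRC is a
// stand-in, not the controller's real method.
package sketch

import (
	"github.com/golang/glog"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/kubernetes/pkg/client/cache"
	corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
)

func syncRC(key string, rcLister corelisters.ReplicationControllerLister) error {
	// Queue keys are "namespace/name"; a malformed key can never succeed,
	// so no retry will help and the error is simply returned.
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	// Typed read from the local cache: no API round-trip. IsNotFound
	// replaces the old Indexer.GetByKey "exists" boolean.
	rc, err := rcLister.ReplicationControllers(namespace).Get(name)
	if errors.IsNotFound(err) {
		glog.Infof("Replication Controller has been deleted %v", key)
		return nil // deleted between enqueue and sync: stop retrying
	}
	if err != nil {
		return err // transient error: returning requeues the key
	}
	// rc points into the shared cache: read-only here; the real sync
	// deep-copies before mutating, as the next hunk shows.
	_ = rc
	return nil
}
```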
@@ -680,14 +666,20 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
 	var manageReplicasErr error
 	if rcNeedsSync && rc.DeletionTimestamp == nil {
-		manageReplicasErr = rm.manageReplicas(filteredPods, &rc)
+		manageReplicasErr = rm.manageReplicas(filteredPods, rc)
 	}
 	trace.Step("manageReplicas done")
 
+	copy, err := api.Scheme.DeepCopy(rc)
+	if err != nil {
+		return err
+	}
+	rc = copy.(*v1.ReplicationController)
+
 	newStatus := calculateStatus(rc, filteredPods, manageReplicasErr)
 
 	// Always updates status as pods come up or die.
-	if err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, newStatus); err != nil {
+	if err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), *rc, newStatus); err != nil {
 		// Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop
 		return err
 	}
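
With NewReplicationManagerFromClient and the personal internalPodInformer gone, callers now construct the informers themselves. Once the follow-up shared-informer commits land, the wiring would look roughly like the sketch below. Only NewReplicationManager's signature comes from the diff; the factory package path is guessed from the generated-informers import above, and the factory constructor signature plus the burst/cache/worker numbers are assumptions:

```go
// Hypothetical caller wiring after this commit; only NewReplicationManager's
// signature is taken from the diff, everything else is assumed.
package sketch

import (
	"time"

	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated" // assumed factory path
	"k8s.io/kubernetes/pkg/controller/replication"
)

func startRCManager(kubeClient clientset.Interface, stopCh <-chan struct{}) {
	// Assumed constructor signature: one clientset plus a default resync.
	factory := informers.NewSharedInformerFactory(kubeClient, 30*time.Second)

	rm := replication.NewReplicationManager(
		factory.Core().V1().Pods(),                   // coreinformers.PodInformer
		factory.Core().V1().ReplicationControllers(), // coreinformers.ReplicationControllerInformer
		kubeClient,
		500,   // burstReplicas (assumed value)
		4096,  // lookupCacheSize (assumed value)
		false, // garbageCollectorEnabled
	)

	// The factory starts every informer requested from it above; Run then
	// blocks on WaitForCacheSync for both listers before starting workers.
	factory.Start(stopCh)
	go rm.Run(5, stopCh)
}
```

The design point is that factory.Start owns the informer lifecycles, which is exactly why the internalPodInformer special case could be deleted from Run in this commit.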