start sharing the pod cache and list/watch

This commit is contained in:
deads2k
2016-04-14 14:00:52 -04:00
parent 96d1f48b0b
commit f0c33d65b6
9 changed files with 169 additions and 82 deletions

View File

@@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@@ -66,6 +67,13 @@ type ReplicationManager struct {
kubeClient clientset.Interface
podControl controller.PodControlInterface
// internalPodInformer is used to hold a personal informer. If we're using
// a normal shared informer, then the informer will be started for us. If
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewReplicationManager(passing SharedInformer), this
// will be null
internalPodInformer framework.SharedInformer
// An rc is temporarily suspended after creating/deleting these many replicas.
// It resumes normal action after observing the watch events for them.
burstReplicas int
@@ -82,7 +90,7 @@ type ReplicationManager struct {
// A store of pods, populated by the podController
podStore cache.StoreToPodLister
// Watches changes to all pods
podController *framework.Controller
podController framework.ControllerInterface
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
@@ -93,8 +101,7 @@ type ReplicationManager struct {
queue *workqueue.Type
}
// NewReplicationManager creates a new ReplicationManager.
func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
func NewReplicationManager(podInformer framework.SharedInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
@@ -167,31 +174,31 @@ func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controll
},
)
rm.podStore.Store, rm.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return rm.kubeClient.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return rm.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
AddFunc: rm.addPod,
// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
// the most frequent pod update is status, and the associated rc will only list from local storage, so
// it should be ok.
UpdateFunc: rm.updatePod,
DeleteFunc: rm.deletePod,
},
)
podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
AddFunc: rm.addPod,
// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
// the most frequent pod update is status, and the associated rc will only list from local storage, so
// it should be ok.
UpdateFunc: rm.updatePod,
DeleteFunc: rm.deletePod,
})
rm.podStore.Store = podInformer.GetStore()
rm.podController = podInformer.GetController()
rm.syncHandler = rm.syncReplicationController
rm.podStoreSynced = rm.podController.HasSynced
rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
return rm
}
// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod())
rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize)
rm.internalPodInformer = podInformer
return rm
}
// SetEventRecorder replaces the event recorder used by the replication manager
@@ -211,6 +218,11 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
for i := 0; i < workers; i++ {
go wait.Until(rm.worker, time.Second, stopCh)
}
if rm.internalPodInformer != nil {
go rm.internalPodInformer.Run(stopCh)
}
<-stopCh
glog.Infof("Shutting down RC Manager")
rm.queue.ShutDown()
@@ -478,7 +490,7 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion
podKey := controller.PodKey(filteredPods[ix])
glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rc.Namespace, rc.Name)
glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
rm.expectations.DeletionObserved(rcKey, podKey)
utilruntime.HandleError(err)
}

View File

@@ -138,7 +138,7 @@ type serverResponse struct {
func TestSyncReplicationControllerDoesNothing(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// 2 running pods, a controller with 2 replicas, sync is a no-op
@@ -154,7 +154,7 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) {
func TestSyncReplicationControllerDeletes(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@@ -170,7 +170,7 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
func TestDeleteFinalStateUnknown(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@@ -202,7 +202,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
func TestSyncReplicationControllerCreates(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// A controller with 2 replicas and no pods in the store, 2 creates expected
@@ -225,7 +225,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// Steady state for the replication controller, no Status.Replicas updates expected
@@ -267,7 +267,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// Insufficient number of pods in the system, and Status.Replicas is wrong;
@@ -313,7 +313,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
// defer testServer.Close()
c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@@ -359,7 +359,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
}
func TestPodControllerLookup(t *testing.T) {
manager := NewReplicationManager(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
testCases := []struct {
inRCs []*api.ReplicationController
@@ -422,7 +422,7 @@ func TestWatchControllers(t *testing.T) {
fakeWatch := watch.NewFake()
c := &fake.Clientset{}
c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
var testControllerSpec api.ReplicationController
@@ -465,7 +465,7 @@ func TestWatchPods(t *testing.T) {
fakeWatch := watch.NewFake()
c := &fake.Clientset{}
c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// Put one rc and one pod into the controller's stores
@@ -492,6 +492,7 @@ func TestWatchPods(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
go manager.podController.Run(stopCh)
go manager.internalPodInformer.Run(stopCh)
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
pods := newPodList(nil, 1, api.PodRunning, testControllerSpec, "pod")
@@ -507,7 +508,7 @@ func TestWatchPods(t *testing.T) {
}
func TestUpdatePods(t *testing.T) {
manager := NewReplicationManager(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
received := make(chan string)
@@ -567,7 +568,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
// defer testServer.Close()
c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
rc := newReplicationController(1)
@@ -649,7 +650,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, burstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, burstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@@ -799,7 +800,7 @@ func (fe FakeRCExpectations) SatisfiedExpectations(controllerKey string) bool {
func TestRCSyncExpectations(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 2, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 2, 0)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@@ -823,7 +824,7 @@ func TestRCSyncExpectations(t *testing.T) {
func TestDeleteControllerAndExpectations(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 10, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
manager.podStoreSynced = alwaysReady
rc := newReplicationController(1)
@@ -866,7 +867,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
func TestRCManagerNotReady(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 2, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 2, 0)
manager.podControl = &fakePodControl
manager.podStoreSynced = func() bool { return false }
@@ -904,7 +905,7 @@ func TestOverlappingRCs(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
for i := 0; i < 5; i++ {
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 10, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
manager.podStoreSynced = alwaysReady
// Create 10 rcs, shuffled them randomly and insert them into the rc manager's store
@@ -933,7 +934,7 @@ func TestOverlappingRCs(t *testing.T) {
func TestDeletionTimestamp(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 10, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
manager.podStoreSynced = alwaysReady
controllerSpec := newReplicationController(1)
@@ -1020,7 +1021,7 @@ func TestDeletionTimestamp(t *testing.T) {
func BenchmarkGetPodControllerMultiNS(b *testing.B) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
const nsNum = 1000
@@ -1066,7 +1067,7 @@ func BenchmarkGetPodControllerMultiNS(b *testing.B) {
func BenchmarkGetPodControllerSingleNS(b *testing.B) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
const rcNum = 1000
const replicaNum = 3