Merge pull request #25414 from derekwaynecarr/quota_int_test_improvements

Automatic merge from submit-queue

Quota integration test improvements

This PR does the following:

* allow a replication manager to be created that does not record events
* improve the shutdown behavior of the replication manager and resource quota controllers so their doWork funcs exit properly
* update the quota integration test to use the non-event-generating replication manager and reduce the number of pods it provisions

I am hoping this combination of changes fixes the referenced flake.

Fixes https://github.com/kubernetes/kubernetes/issues/25037
k8s-merge-robot committed 2016-05-10 18:58:34 -07:00
6 changed files with 138 additions and 64 deletions
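
Before the diffs, a minimal self-contained sketch of the event-recording side of the change, since that is what makes the integration test cheaper: the recorder is threaded through an internal constructor so tests can inject one that silently drops events. Everything below (the `Recorder` interface, `apiRecorder`, `silentRecorder`, the `manager` type) is an illustrative stand-in, not the real `record.EventRecorder`/`record.FakeRecorder` API:

```go
package main

import "fmt"

// Recorder is a stand-in for an event recorder interface; the real controller
// uses record.EventRecorder, this sketch deliberately defines its own.
type Recorder interface {
	Eventf(reason, format string, args ...interface{})
}

// apiRecorder represents the normal path: events end up on the API server.
type apiRecorder struct{}

func (apiRecorder) Eventf(reason, format string, args ...interface{}) {
	fmt.Printf("event %s: %s\n", reason, fmt.Sprintf(format, args...))
}

// silentRecorder plays the role record.FakeRecorder plays in this PR: it
// satisfies the interface but drops every event.
type silentRecorder struct{}

func (silentRecorder) Eventf(string, string, ...interface{}) {}

// manager mirrors the constructor split: the exported constructor keeps its
// old behaviour, the internal one accepts whatever recorder the caller wants.
type manager struct{ recorder Recorder }

func newManager() *manager { return newManagerInternal(apiRecorder{}) }

func newManagerForIntegration() *manager { return newManagerInternal(silentRecorder{}) }

func newManagerInternal(r Recorder) *manager { return &manager{recorder: r} }

func (m *manager) sync() {
	m.recorder.Eventf("SuccessfulCreate", "created pod %q", "pod-0")
}

func main() {
	newManager().sync()               // records the event
	newManagerForIntegration().sync() // event is silently discarded
}
```

The replication manager diff below applies the same split for real: `NewReplicationManager` keeps its behavior, `newReplicationManagerInternal` accepts the recorder, and `NewReplicationManagerFromClientForIntegration` passes `&record.FakeRecorder{}`.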

Changes to the replication manager:

@@ -102,11 +102,18 @@ type ReplicationManager struct {
 	queue *workqueue.Type
 }
 // NewReplicationManager creates a replication manager
 func NewReplicationManager(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
 	eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
+	return newReplicationManagerInternal(
+		eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}),
+		podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize)
+}
+// newReplicationManagerInternal configures a replication manager with the specified event recorder
+func newReplicationManagerInternal(eventRecorder record.EventRecorder, podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
 	if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
 		metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
 	}
@@ -115,7 +122,7 @@ func NewReplicationManager(podInformer framework.SharedIndexInformer, kubeClient
 		kubeClient: kubeClient,
 		podControl: controller.RealPodControl{
 			KubeClient: kubeClient,
-			Recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}),
+			Recorder:   eventRecorder,
 		},
 		burstReplicas: burstReplicas,
 		expectations:  controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
@@ -195,7 +202,14 @@ func NewReplicationManager(podInformer framework.SharedIndexInformer, kubeClient
 	rm.podStoreSynced = rm.podController.HasSynced
 	rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
 	return rm
 }
+// NewReplicationManagerFromClientForIntegration creates a new ReplicationManager that runs its own informer. It disables event recording for use in integration tests.
+func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
+	podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod())
+	rm := newReplicationManagerInternal(&record.FakeRecorder{}, podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize)
+	rm.internalPodInformer = podInformer
+	return rm
+}
 // NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
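
A sketch of how the quota integration test might drive this new constructor. Only `NewReplicationManagerFromClientForIntegration` comes from the diff above, and `Run(workers, stopCh)` is assumed to be the controller's usual entry point; the helper name, worker count, burst-replica and lookup-cache values are arbitrary, and the snippet is written as if it lived in the controller's own package to avoid pinning import paths:

```go
// runReplicationManagerForTest starts a replication manager that will not write
// Event objects to the test API server. Names and numbers here are placeholders.
// (Assumes it sits in the replication controller's package, which already
// imports "time" and aliases the clientset package as "clientset".)
func runReplicationManagerForTest(kubeClient clientset.Interface, stopCh <-chan struct{}) *ReplicationManager {
	noResync := func() time.Duration { return 0 } // the test does not need periodic resyncs
	rm := NewReplicationManagerFromClientForIntegration(kubeClient, noResync, 500, 4096)
	go rm.Run(3, stopCh) // workers now exit cleanly when the controller is stopped
	return rm
}
```

Because the recorder is a fake, the test never waits on (or flakes on) Event writes to the API server.
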
@@ -413,18 +427,23 @@ func (rm *ReplicationManager) enqueueController(obj interface{}) {
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (rm *ReplicationManager) worker() {
+	workFunc := func() bool {
+		key, quit := rm.queue.Get()
+		if quit {
+			return true
+		}
+		defer rm.queue.Done(key)
+		err := rm.syncHandler(key.(string))
+		if err != nil {
+			glog.Errorf("Error syncing replication controller: %v", err)
+		}
+		return false
+	}
 	for {
-		func() {
-			key, quit := rm.queue.Get()
-			if quit {
-				return
-			}
-			defer rm.queue.Done(key)
-			err := rm.syncHandler(key.(string))
-			if err != nil {
-				glog.Errorf("Error syncing replication controller: %v", err)
-			}
-		}()
+		if quit := workFunc(); quit {
+			glog.Infof("replication controller worker shutting down")
+			return
+		}
 	}
 }
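
Why this restructuring matters: in the old loop, `quit` from `rm.queue.Get()` only returned from the anonymous func, ending one iteration, so once the queue was shut down the goroutine kept calling `Get` in a tight loop. Returning the quit signal from `workFunc` lets the worker goroutine exit. Below is a sketch of a test for that property, using the `workqueue` package behind the controller's `queue` field; the import path is the one used in this era of the tree, and the test name, worker count and timeout are mine:

```go
package replication_test // placeholder package; put the test wherever it fits

import (
	"sync"
	"testing"
	"time"

	"k8s.io/kubernetes/pkg/util/workqueue"
)

func TestWorkerExitsOnQueueShutdown(t *testing.T) {
	queue := workqueue.New()
	var wg sync.WaitGroup

	worker := func() {
		defer wg.Done()
		workFunc := func() bool {
			key, quit := queue.Get()
			if quit {
				return true // queue was shut down: tell the outer loop to stop
			}
			defer queue.Done(key)
			return false // pretend the key was synced; keep going
		}
		for {
			if quit := workFunc(); quit {
				return
			}
		}
	}

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go worker()
	}
	queue.Add("default/busybox-rc")
	queue.ShutDown()

	done := make(chan struct{})
	go func() { wg.Wait(); close(done) }()
	select {
	case <-done:
		// every worker returned; shutdown is clean
	case <-time.After(5 * time.Second):
		t.Fatal("workers did not exit after queue.ShutDown()")
	}
}
```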

Changes to the resource quota controller:

@@ -163,19 +163,24 @@ func (rq *ResourceQuotaController) enqueueResourceQuota(obj interface{}) {
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (rq *ResourceQuotaController) worker() {
+	workFunc := func() bool {
+		key, quit := rq.queue.Get()
+		if quit {
+			return true
+		}
+		defer rq.queue.Done(key)
+		err := rq.syncHandler(key.(string))
+		if err != nil {
+			utilruntime.HandleError(err)
+			rq.queue.Add(key)
+		}
+		return false
+	}
 	for {
-		func() {
-			key, quit := rq.queue.Get()
-			if quit {
-				return
-			}
-			defer rq.queue.Done(key)
-			err := rq.syncHandler(key.(string))
-			if err != nil {
-				utilruntime.HandleError(err)
-				rq.queue.Add(key)
-			}
-		}()
+		if quit := workFunc(); quit {
+			glog.Infof("resource quota controller worker shutting down")
+			return
+		}
 	}
 }
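
The quota worker gets the same treatment; the one behavioral difference from the replication manager worker is its error path, which re-adds the key so a failed quota recalculation is retried on a later pass instead of being dropped. A tiny self-contained illustration of that retry loop (the queue, key and error here are made up):

```go
package main

import "fmt"

func main() {
	// Tiny stand-in for the quota work queue: a buffered channel of quota keys.
	queue := make(chan string, 8)
	queue <- "default/compute-quota"

	failuresLeft := 1 // make the first sync attempt fail
	syncQuota := func(key string) error {
		if failuresLeft > 0 {
			failuresLeft--
			return fmt.Errorf("conflict while updating %s", key)
		}
		fmt.Println("recalculated usage for", key)
		return nil
	}

	// Mirror the quota worker's error path: handle the error, then re-add the
	// key so the quota is retried instead of being dropped.
	for len(queue) > 0 {
		key := <-queue
		if err := syncQuota(key); err != nil {
			fmt.Println("sync failed, requeueing:", err)
			queue <- key
		}
	}
}
```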