Shorten execution time of HPA tests
This commit is contained in:
		| @@ -35,7 +35,7 @@ import ( | |||||||
|  |  | ||||||
| const maxTriesPerEvent = 12 | const maxTriesPerEvent = 12 | ||||||
|  |  | ||||||
| var sleepDuration = 10 * time.Second | var defaultSleepDuration = 10 * time.Second | ||||||
|  |  | ||||||
| const maxQueuedEvents = 1000 | const maxQueuedEvents = 1000 | ||||||
|  |  | ||||||
| @@ -93,11 +93,16 @@ type EventBroadcaster interface { | |||||||
|  |  | ||||||
| // Creates a new event broadcaster. | // Creates a new event broadcaster. | ||||||
| func NewBroadcaster() EventBroadcaster { | func NewBroadcaster() EventBroadcaster { | ||||||
| 	return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)} | 	return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster { | ||||||
|  | 	return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration} | ||||||
| } | } | ||||||
|  |  | ||||||
| type eventBroadcasterImpl struct { | type eventBroadcasterImpl struct { | ||||||
| 	*watch.Broadcaster | 	*watch.Broadcaster | ||||||
|  | 	sleepDuration time.Duration | ||||||
| } | } | ||||||
|  |  | ||||||
| // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink. | // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink. | ||||||
| @@ -110,11 +115,11 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin | |||||||
| 	eventCorrelator := NewEventCorrelator(util.RealClock{}) | 	eventCorrelator := NewEventCorrelator(util.RealClock{}) | ||||||
| 	return eventBroadcaster.StartEventWatcher( | 	return eventBroadcaster.StartEventWatcher( | ||||||
| 		func(event *api.Event) { | 		func(event *api.Event) { | ||||||
| 			recordToSink(sink, event, eventCorrelator, randGen) | 			recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration) | ||||||
| 		}) | 		}) | ||||||
| } | } | ||||||
|  |  | ||||||
| func recordToSink(sink EventSink, event *api.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand) { | func recordToSink(sink EventSink, event *api.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) { | ||||||
| 	// Make a copy before modification, because there could be multiple listeners. | 	// Make a copy before modification, because there could be multiple listeners. | ||||||
| 	// Events are safe to copy like this. | 	// Events are safe to copy like this. | ||||||
| 	eventCopy := *event | 	eventCopy := *event | ||||||
|   | |||||||
| @@ -33,11 +33,6 @@ import ( | |||||||
| 	"k8s.io/kubernetes/pkg/util/strategicpatch" | 	"k8s.io/kubernetes/pkg/util/strategicpatch" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func init() { |  | ||||||
| 	// Don't bother sleeping between retries. |  | ||||||
| 	sleepDuration = 0 |  | ||||||
| } |  | ||||||
|  |  | ||||||
| type testEventSink struct { | type testEventSink struct { | ||||||
| 	OnCreate func(e *api.Event) (*api.Event, error) | 	OnCreate func(e *api.Event) (*api.Event, error) | ||||||
| 	OnUpdate func(e *api.Event) (*api.Event, error) | 	OnUpdate func(e *api.Event) (*api.Event, error) | ||||||
| @@ -346,7 +341,7 @@ func TestEventf(t *testing.T) { | |||||||
| 		}, | 		}, | ||||||
| 		OnPatch: OnPatchFactory(testCache, patchEvent), | 		OnPatch: OnPatchFactory(testCache, patchEvent), | ||||||
| 	} | 	} | ||||||
| 	eventBroadcaster := NewBroadcaster() | 	eventBroadcaster := NewBroadcasterForTests(0) | ||||||
| 	sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) | 	sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) | ||||||
|  |  | ||||||
| 	clock := util.NewFakeClock(time.Now()) | 	clock := util.NewFakeClock(time.Now()) | ||||||
| @@ -431,7 +426,7 @@ func TestWriteEventError(t *testing.T) { | |||||||
| 			}, | 			}, | ||||||
| 		} | 		} | ||||||
| 		ev := &api.Event{} | 		ev := &api.Event{} | ||||||
| 		recordToSink(sink, ev, eventCorrelator, randGen) | 		recordToSink(sink, ev, eventCorrelator, randGen, 0) | ||||||
| 		if attempts != ent.attemptsWanted { | 		if attempts != ent.attemptsWanted { | ||||||
| 			t.Errorf("case %v: wanted %d, got %d attempts", caseName, ent.attemptsWanted, attempts) | 			t.Errorf("case %v: wanted %d, got %d attempts", caseName, ent.attemptsWanted, attempts) | ||||||
| 		} | 		} | ||||||
| @@ -460,7 +455,7 @@ func TestLotsOfEvents(t *testing.T) { | |||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	eventBroadcaster := NewBroadcaster() | 	eventBroadcaster := NewBroadcasterForTests(0) | ||||||
| 	sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) | 	sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) | ||||||
| 	logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { | 	logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { | ||||||
| 		loggerCalled <- struct{}{} | 		loggerCalled <- struct{}{} | ||||||
| @@ -557,7 +552,7 @@ func TestEventfNoNamespace(t *testing.T) { | |||||||
| 		}, | 		}, | ||||||
| 		OnPatch: OnPatchFactory(testCache, patchEvent), | 		OnPatch: OnPatchFactory(testCache, patchEvent), | ||||||
| 	} | 	} | ||||||
| 	eventBroadcaster := NewBroadcaster() | 	eventBroadcaster := NewBroadcasterForTests(0) | ||||||
| 	sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) | 	sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) | ||||||
|  |  | ||||||
| 	clock := util.NewFakeClock(time.Now()) | 	clock := util.NewFakeClock(time.Now()) | ||||||
| @@ -846,7 +841,7 @@ func TestMultiSinkCache(t *testing.T) { | |||||||
| 		OnPatch: OnPatchFactory(testCache2, patchEvent2), | 		OnPatch: OnPatchFactory(testCache2, patchEvent2), | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	eventBroadcaster := NewBroadcaster() | 	eventBroadcaster := NewBroadcasterForTests(0) | ||||||
| 	clock := util.NewFakeClock(time.Now()) | 	clock := util.NewFakeClock(time.Now()) | ||||||
| 	recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock) | 	recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -65,19 +65,8 @@ type HorizontalController struct { | |||||||
| var downscaleForbiddenWindow = 5 * time.Minute | var downscaleForbiddenWindow = 5 * time.Minute | ||||||
| var upscaleForbiddenWindow = 3 * time.Minute | var upscaleForbiddenWindow = 3 * time.Minute | ||||||
|  |  | ||||||
| func NewHorizontalController(evtNamespacer unversionedcore.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedextensions.HorizontalPodAutoscalersGetter, metricsClient metrics.MetricsClient, resyncPeriod time.Duration) *HorizontalController { | func newInformer(controller *HorizontalController, resyncPeriod time.Duration) (cache.Store, *framework.Controller) { | ||||||
| 	broadcaster := record.NewBroadcaster() | 	return framework.NewInformer( | ||||||
| 	broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{evtNamespacer.Events("")}) |  | ||||||
| 	recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"}) |  | ||||||
|  |  | ||||||
| 	controller := &HorizontalController{ |  | ||||||
| 		metricsClient:   metricsClient, |  | ||||||
| 		eventRecorder:   recorder, |  | ||||||
| 		scaleNamespacer: scaleNamespacer, |  | ||||||
| 		hpaNamespacer:   hpaNamespacer, |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	controller.store, controller.controller = framework.NewInformer( |  | ||||||
| 		&cache.ListWatch{ | 		&cache.ListWatch{ | ||||||
| 			ListFunc: func(options api.ListOptions) (runtime.Object, error) { | 			ListFunc: func(options api.ListOptions) (runtime.Object, error) { | ||||||
| 				return controller.hpaNamespacer.HorizontalPodAutoscalers(api.NamespaceAll).List(options) | 				return controller.hpaNamespacer.HorizontalPodAutoscalers(api.NamespaceAll).List(options) | ||||||
| @@ -111,6 +100,22 @@ func NewHorizontalController(evtNamespacer unversionedcore.EventsGetter, scaleNa | |||||||
| 			// We are not interested in deletions. | 			// We are not interested in deletions. | ||||||
| 		}, | 		}, | ||||||
| 	) | 	) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewHorizontalController(evtNamespacer unversionedcore.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedextensions.HorizontalPodAutoscalersGetter, metricsClient metrics.MetricsClient, resyncPeriod time.Duration) *HorizontalController { | ||||||
|  | 	broadcaster := record.NewBroadcaster() | ||||||
|  | 	broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{evtNamespacer.Events("")}) | ||||||
|  | 	recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"}) | ||||||
|  |  | ||||||
|  | 	controller := &HorizontalController{ | ||||||
|  | 		metricsClient:   metricsClient, | ||||||
|  | 		eventRecorder:   recorder, | ||||||
|  | 		scaleNamespacer: scaleNamespacer, | ||||||
|  | 		hpaNamespacer:   hpaNamespacer, | ||||||
|  | 	} | ||||||
|  | 	store, frameworkController := newInformer(controller, resyncPeriod) | ||||||
|  | 	controller.store = store | ||||||
|  | 	controller.controller = frameworkController | ||||||
|  |  | ||||||
| 	return controller | 	return controller | ||||||
| } | } | ||||||
|   | |||||||
| @@ -29,8 +29,10 @@ import ( | |||||||
| 	_ "k8s.io/kubernetes/pkg/apimachinery/registered" | 	_ "k8s.io/kubernetes/pkg/apimachinery/registered" | ||||||
| 	"k8s.io/kubernetes/pkg/apis/extensions" | 	"k8s.io/kubernetes/pkg/apis/extensions" | ||||||
| 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" | 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" | ||||||
|  | 	"k8s.io/kubernetes/pkg/client/record" | ||||||
| 	"k8s.io/kubernetes/pkg/client/restclient" | 	"k8s.io/kubernetes/pkg/client/restclient" | ||||||
| 	"k8s.io/kubernetes/pkg/client/testing/core" | 	"k8s.io/kubernetes/pkg/client/testing/core" | ||||||
|  | 	unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" | ||||||
| 	"k8s.io/kubernetes/pkg/client/unversioned/testclient" | 	"k8s.io/kubernetes/pkg/client/unversioned/testclient" | ||||||
| 	"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" | 	"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" | ||||||
| 	"k8s.io/kubernetes/pkg/runtime" | 	"k8s.io/kubernetes/pkg/runtime" | ||||||
| @@ -329,13 +331,29 @@ func (tc *testCase) verifyResults(t *testing.T) { | |||||||
| func (tc *testCase) runTest(t *testing.T) { | func (tc *testCase) runTest(t *testing.T) { | ||||||
| 	testClient := tc.prepareTestClient(t) | 	testClient := tc.prepareTestClient(t) | ||||||
| 	metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort) | 	metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort) | ||||||
| 	hpaController := NewHorizontalController(testClient.Core(), testClient.Extensions(), testClient.Extensions(), metricsClient, 0) |  | ||||||
|  | 	broadcaster := record.NewBroadcasterForTests(0) | ||||||
|  | 	broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{testClient.Core().Events("")}) | ||||||
|  | 	recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"}) | ||||||
|  |  | ||||||
|  | 	hpaController := &HorizontalController{ | ||||||
|  | 		metricsClient:   metricsClient, | ||||||
|  | 		eventRecorder:   recorder, | ||||||
|  | 		scaleNamespacer: testClient.Extensions(), | ||||||
|  | 		hpaNamespacer:   testClient.Extensions(), | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	store, frameworkController := newInformer(hpaController, time.Minute) | ||||||
|  | 	hpaController.store = store | ||||||
|  | 	hpaController.controller = frameworkController | ||||||
|  |  | ||||||
| 	stop := make(chan struct{}) | 	stop := make(chan struct{}) | ||||||
| 	defer close(stop) | 	defer close(stop) | ||||||
| 	go hpaController.Run(stop) | 	go hpaController.Run(stop) | ||||||
|  |  | ||||||
| 	if tc.verifyEvents { | 	if tc.verifyEvents { | ||||||
| 		// We need to wait for events to be broadcasted (sleep for longer than record.sleepDuration). | 		// We need to wait for events to be broadcasted (sleep for longer than record.sleepDuration). | ||||||
| 		time.Sleep(12 * time.Second) | 		time.Sleep(2 * time.Second) | ||||||
| 	} | 	} | ||||||
| 	// Wait for HPA to be processed. | 	// Wait for HPA to be processed. | ||||||
| 	<-tc.processed | 	<-tc.processed | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Marcin Wielgus
					Marcin Wielgus