From dd9301ede92679626ce94c7198d50dbc6831fa69 Mon Sep 17 00:00:00 2001 From: Kris Date: Thu, 7 Jan 2016 10:13:22 -0800 Subject: [PATCH] Fix data race by eliminating concurrency in test I broke out the error retry logic into a named function that could be tested independently of the rest of the event processing framework. This allows the test to know when the retry logic is done. The problem with the original test was there was no reliable way to know when it was done trying record an event. A sentinal event was being used, but there is no ordering guarantee. I could have added synchronization around attempts tracking to fix the data race, but the test case was still fundamentally flawed and would error occasionally. --- pkg/client/record/event.go | 62 ++++++++++++++++++--------------- pkg/client/record/event_test.go | 55 ++++++++--------------------- 2 files changed, 48 insertions(+), 69 deletions(-) diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index c8b98c86277..66ff252d665 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -109,38 +109,42 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin eventCorrelator := NewEventCorrelator(util.RealClock{}) return eventBroadcaster.StartEventWatcher( func(event *api.Event) { - // Make a copy before modification, because there could be multiple listeners. - // Events are safe to copy like this. - eventCopy := *event - event = &eventCopy - result, err := eventCorrelator.EventCorrelate(event) - if err != nil { - util.HandleError(err) - } - if result.Skip { - return - } - tries := 0 - for { - if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) { - break - } - tries++ - if tries >= maxTriesPerEvent { - glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) - break - } - // Randomize the first sleep so that various clients won't all be - // synced up if the master goes down. - if tries == 1 { - time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64())) - } else { - time.Sleep(sleepDuration) - } - } + recordToSink(sink, event, eventCorrelator, randGen) }) } +func recordToSink(sink EventSink, event *api.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand) { + // Make a copy before modification, because there could be multiple listeners. + // Events are safe to copy like this. + eventCopy := *event + event = &eventCopy + result, err := eventCorrelator.EventCorrelate(event) + if err != nil { + util.HandleError(err) + } + if result.Skip { + return + } + tries := 0 + for { + if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) { + break + } + tries++ + if tries >= maxTriesPerEvent { + glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) + break + } + // Randomize the first sleep so that various clients won't all be + // synced up if the master goes down. + if tries == 1 { + time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64())) + } else { + time.Sleep(sleepDuration) + } + } +} + func isKeyNotFoundError(err error) bool { statusErr, _ := err.(*errors.StatusError) // At the moment the server is returning 500 instead of a more specific diff --git a/pkg/client/record/event_test.go b/pkg/client/record/event_test.go index d0baa92bea3..3c8983b5b02 100644 --- a/pkg/client/record/event_test.go +++ b/pkg/client/record/event_test.go @@ -19,7 +19,7 @@ package record import ( "encoding/json" "fmt" - "runtime" + "math/rand" "strconv" "testing" "time" @@ -382,16 +382,8 @@ func recorderWithFakeClock(eventSource api.EventSource, eventBroadcaster EventBr } func TestWriteEventError(t *testing.T) { - ref := &api.ObjectReference{ - Kind: "Pod", - Name: "foo", - Namespace: "baz", - UID: "bar", - APIVersion: "version", - } type entry struct { timesToSendError int - attemptsMade int attemptsWanted int err error } @@ -422,42 +414,25 @@ func TestWriteEventError(t *testing.T) { err: fmt.Errorf("A weird error"), }, } - done := make(chan struct{}) - eventBroadcaster := NewBroadcaster() - defer eventBroadcaster.StartRecordingToSink( - &testEventSink{ + eventCorrelator := NewEventCorrelator(util.RealClock{}) + randGen := rand.New(rand.NewSource(time.Now().UnixNano())) + + for caseName, ent := range table { + attempts := 0 + sink := &testEventSink{ OnCreate: func(event *api.Event) (*api.Event, error) { - if event.Message == "finished" { - close(done) - return event, nil - } - item, ok := table[event.Message] - if !ok { - t.Errorf("Unexpected event: %#v", event) - return event, nil - } - item.attemptsMade++ - if item.attemptsMade < item.timesToSendError { - return nil, item.err + attempts++ + if attempts < ent.timesToSendError { + return nil, ent.err } return event, nil }, - }, - ).Stop() - clock := &util.FakeClock{time.Now()} - recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock) - for caseName := range table { - clock.Step(1 * time.Second) - recorder.Event(ref, api.EventTypeNormal, "Reason", caseName) - runtime.Gosched() - } - recorder.Event(ref, api.EventTypeNormal, "Reason", "finished") - <-done - - for caseName, item := range table { - if e, a := item.attemptsWanted, item.attemptsMade; e != a { - t.Errorf("case %v: wanted %v, got %v attempts", caseName, e, a) + } + ev := &api.Event{} + recordToSink(sink, ev, eventCorrelator, randGen) + if attempts != ent.attemptsWanted { + t.Errorf("case %v: wanted %d, got %d attempts", caseName, ent.attemptsWanted, attempts) } } }