Fix data race by eliminating concurrency in test
I broke out the error retry logic into a named function that can be tested independently of the rest of the event processing framework. This allows the test to know when the retry logic is done. The problem with the original test was that there was no reliable way to know when it was done trying to record an event. A sentinel event was being used, but there is no ordering guarantee. I could have added synchronization around attempts tracking to fix the data race, but the test case would still have been fundamentally flawed and would have errored occasionally.
This commit is contained in:
@@ -109,38 +109,42 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin
|
||||
eventCorrelator := NewEventCorrelator(util.RealClock{})
|
||||
return eventBroadcaster.StartEventWatcher(
|
||||
func(event *api.Event) {
|
||||
// Make a copy before modification, because there could be multiple listeners.
|
||||
// Events are safe to copy like this.
|
||||
eventCopy := *event
|
||||
event = &eventCopy
|
||||
result, err := eventCorrelator.EventCorrelate(event)
|
||||
if err != nil {
|
||||
util.HandleError(err)
|
||||
}
|
||||
if result.Skip {
|
||||
return
|
||||
}
|
||||
tries := 0
|
||||
for {
|
||||
if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
|
||||
break
|
||||
}
|
||||
tries++
|
||||
if tries >= maxTriesPerEvent {
|
||||
glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
|
||||
break
|
||||
}
|
||||
// Randomize the first sleep so that various clients won't all be
|
||||
// synced up if the master goes down.
|
||||
if tries == 1 {
|
||||
time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
|
||||
} else {
|
||||
time.Sleep(sleepDuration)
|
||||
}
|
||||
}
|
||||
recordToSink(sink, event, eventCorrelator, randGen)
|
||||
})
|
||||
}
|
||||
|
||||
func recordToSink(sink EventSink, event *api.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand) {
|
||||
// Make a copy before modification, because there could be multiple listeners.
|
||||
// Events are safe to copy like this.
|
||||
eventCopy := *event
|
||||
event = &eventCopy
|
||||
result, err := eventCorrelator.EventCorrelate(event)
|
||||
if err != nil {
|
||||
util.HandleError(err)
|
||||
}
|
||||
if result.Skip {
|
||||
return
|
||||
}
|
||||
tries := 0
|
||||
for {
|
||||
if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
|
||||
break
|
||||
}
|
||||
tries++
|
||||
if tries >= maxTriesPerEvent {
|
||||
glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
|
||||
break
|
||||
}
|
||||
// Randomize the first sleep so that various clients won't all be
|
||||
// synced up if the master goes down.
|
||||
if tries == 1 {
|
||||
time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
|
||||
} else {
|
||||
time.Sleep(sleepDuration)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isKeyNotFoundError(err error) bool {
|
||||
statusErr, _ := err.(*errors.StatusError)
|
||||
// At the moment the server is returning 500 instead of a more specific
|
||||
|
@@ -19,7 +19,7 @@ package record
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -382,16 +382,8 @@ func recorderWithFakeClock(eventSource api.EventSource, eventBroadcaster EventBr
|
||||
}
|
||||
|
||||
func TestWriteEventError(t *testing.T) {
|
||||
ref := &api.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Name: "foo",
|
||||
Namespace: "baz",
|
||||
UID: "bar",
|
||||
APIVersion: "version",
|
||||
}
|
||||
type entry struct {
|
||||
timesToSendError int
|
||||
attemptsMade int
|
||||
attemptsWanted int
|
||||
err error
|
||||
}
|
||||
@@ -422,42 +414,25 @@ func TestWriteEventError(t *testing.T) {
|
||||
err: fmt.Errorf("A weird error"),
|
||||
},
|
||||
}
|
||||
done := make(chan struct{})
|
||||
|
||||
eventBroadcaster := NewBroadcaster()
|
||||
defer eventBroadcaster.StartRecordingToSink(
|
||||
&testEventSink{
|
||||
eventCorrelator := NewEventCorrelator(util.RealClock{})
|
||||
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
for caseName, ent := range table {
|
||||
attempts := 0
|
||||
sink := &testEventSink{
|
||||
OnCreate: func(event *api.Event) (*api.Event, error) {
|
||||
if event.Message == "finished" {
|
||||
close(done)
|
||||
return event, nil
|
||||
}
|
||||
item, ok := table[event.Message]
|
||||
if !ok {
|
||||
t.Errorf("Unexpected event: %#v", event)
|
||||
return event, nil
|
||||
}
|
||||
item.attemptsMade++
|
||||
if item.attemptsMade < item.timesToSendError {
|
||||
return nil, item.err
|
||||
attempts++
|
||||
if attempts < ent.timesToSendError {
|
||||
return nil, ent.err
|
||||
}
|
||||
return event, nil
|
||||
},
|
||||
},
|
||||
).Stop()
|
||||
clock := &util.FakeClock{time.Now()}
|
||||
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
|
||||
for caseName := range table {
|
||||
clock.Step(1 * time.Second)
|
||||
recorder.Event(ref, api.EventTypeNormal, "Reason", caseName)
|
||||
runtime.Gosched()
|
||||
}
|
||||
recorder.Event(ref, api.EventTypeNormal, "Reason", "finished")
|
||||
<-done
|
||||
|
||||
for caseName, item := range table {
|
||||
if e, a := item.attemptsWanted, item.attemptsMade; e != a {
|
||||
t.Errorf("case %v: wanted %v, got %v attempts", caseName, e, a)
|
||||
}
|
||||
ev := &api.Event{}
|
||||
recordToSink(sink, ev, eventCorrelator, randGen)
|
||||
if attempts != ent.attemptsWanted {
|
||||
t.Errorf("case %v: wanted %d, got %d attempts", caseName, ent.attemptsWanted, attempts)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user