diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index c8b98c86277..66ff252d665 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -109,38 +109,42 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin eventCorrelator := NewEventCorrelator(util.RealClock{}) return eventBroadcaster.StartEventWatcher( func(event *api.Event) { - // Make a copy before modification, because there could be multiple listeners. - // Events are safe to copy like this. - eventCopy := *event - event = &eventCopy - result, err := eventCorrelator.EventCorrelate(event) - if err != nil { - util.HandleError(err) - } - if result.Skip { - return - } - tries := 0 - for { - if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) { - break - } - tries++ - if tries >= maxTriesPerEvent { - glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) - break - } - // Randomize the first sleep so that various clients won't all be - // synced up if the master goes down. - if tries == 1 { - time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64())) - } else { - time.Sleep(sleepDuration) - } - } + recordToSink(sink, event, eventCorrelator, randGen) }) } +func recordToSink(sink EventSink, event *api.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand) { + // Make a copy before modification, because there could be multiple listeners. + // Events are safe to copy like this. + eventCopy := *event + event = &eventCopy + result, err := eventCorrelator.EventCorrelate(event) + if err != nil { + util.HandleError(err) + } + if result.Skip { + return + } + tries := 0 + for { + if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) { + break + } + tries++ + if tries >= maxTriesPerEvent { + glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) + break + } + // Randomize the first sleep so that various clients won't all be + // synced up if the master goes down. + if tries == 1 { + time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64())) + } else { + time.Sleep(sleepDuration) + } + } +} + func isKeyNotFoundError(err error) bool { statusErr, _ := err.(*errors.StatusError) // At the moment the server is returning 500 instead of a more specific diff --git a/pkg/client/record/event_test.go b/pkg/client/record/event_test.go index d0baa92bea3..3c8983b5b02 100644 --- a/pkg/client/record/event_test.go +++ b/pkg/client/record/event_test.go @@ -19,7 +19,7 @@ package record import ( "encoding/json" "fmt" - "runtime" + "math/rand" "strconv" "testing" "time" @@ -382,16 +382,8 @@ func recorderWithFakeClock(eventSource api.EventSource, eventBroadcaster EventBr } func TestWriteEventError(t *testing.T) { - ref := &api.ObjectReference{ - Kind: "Pod", - Name: "foo", - Namespace: "baz", - UID: "bar", - APIVersion: "version", - } type entry struct { timesToSendError int - attemptsMade int attemptsWanted int err error } @@ -422,42 +414,25 @@ func TestWriteEventError(t *testing.T) { err: fmt.Errorf("A weird error"), }, } - done := make(chan struct{}) - eventBroadcaster := NewBroadcaster() - defer eventBroadcaster.StartRecordingToSink( - &testEventSink{ + eventCorrelator := NewEventCorrelator(util.RealClock{}) + randGen := rand.New(rand.NewSource(time.Now().UnixNano())) + + for caseName, ent := range table { + attempts := 0 + sink := &testEventSink{ OnCreate: func(event *api.Event) (*api.Event, error) { - if event.Message == "finished" { - close(done) - return event, nil - } - item, ok := table[event.Message] - if !ok { - t.Errorf("Unexpected event: %#v", event) - return event, nil - } - item.attemptsMade++ - if item.attemptsMade < item.timesToSendError { - return nil, item.err + attempts++ + if attempts < ent.timesToSendError { + return nil, ent.err } return event, nil }, - }, - ).Stop() - clock := &util.FakeClock{time.Now()} - recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock) - for caseName := range table { - clock.Step(1 * time.Second) - recorder.Event(ref, api.EventTypeNormal, "Reason", caseName) - runtime.Gosched() - } - recorder.Event(ref, api.EventTypeNormal, "Reason", "finished") - <-done - - for caseName, item := range table { - if e, a := item.attemptsWanted, item.attemptsMade; e != a { - t.Errorf("case %v: wanted %v, got %v attempts", caseName, e, a) + } + ev := &api.Event{} + recordToSink(sink, ev, eventCorrelator, randGen) + if attempts != ent.attemptsWanted { + t.Errorf("case %v: wanted %d, got %d attempts", caseName, ent.attemptsWanted, attempts) } } }