client-go: add jitter to flowcontrol.Backoff
This commit is contained in:
@@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package flowcontrol
|
package flowcontrol
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -36,23 +37,43 @@ type Backoff struct {
|
|||||||
defaultDuration time.Duration
|
defaultDuration time.Duration
|
||||||
maxDuration time.Duration
|
maxDuration time.Duration
|
||||||
perItemBackoff map[string]*backoffEntry
|
perItemBackoff map[string]*backoffEntry
|
||||||
|
rand *rand.Rand
|
||||||
|
|
||||||
|
// maxJitterFactor adds jitter to the exponentially backed off delay.
|
||||||
|
// if maxJitterFactor is zero, no jitter is added to the delay in
|
||||||
|
// order to maintain current behavior.
|
||||||
|
maxJitterFactor float64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFakeBackOff(initial, max time.Duration, tc *testingclock.FakeClock) *Backoff {
|
func NewFakeBackOff(initial, max time.Duration, tc *testingclock.FakeClock) *Backoff {
|
||||||
return &Backoff{
|
return newBackoff(tc, initial, max, 0.0)
|
||||||
perItemBackoff: map[string]*backoffEntry{},
|
|
||||||
Clock: tc,
|
|
||||||
defaultDuration: initial,
|
|
||||||
maxDuration: max,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBackOff(initial, max time.Duration) *Backoff {
|
func NewBackOff(initial, max time.Duration) *Backoff {
|
||||||
|
return NewBackOffWithJitter(initial, max, 0.0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFakeBackOffWithJitter(initial, max time.Duration, tc *testingclock.FakeClock, maxJitterFactor float64) *Backoff {
|
||||||
|
return newBackoff(tc, initial, max, maxJitterFactor)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBackOffWithJitter(initial, max time.Duration, maxJitterFactor float64) *Backoff {
|
||||||
|
clock := clock.RealClock{}
|
||||||
|
return newBackoff(clock, initial, max, maxJitterFactor)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBackoff(clock clock.Clock, initial, max time.Duration, maxJitterFactor float64) *Backoff {
|
||||||
|
var random *rand.Rand
|
||||||
|
if maxJitterFactor > 0 {
|
||||||
|
random = rand.New(rand.NewSource(clock.Now().UnixNano()))
|
||||||
|
}
|
||||||
return &Backoff{
|
return &Backoff{
|
||||||
perItemBackoff: map[string]*backoffEntry{},
|
perItemBackoff: map[string]*backoffEntry{},
|
||||||
Clock: clock.RealClock{},
|
Clock: clock,
|
||||||
defaultDuration: initial,
|
defaultDuration: initial,
|
||||||
maxDuration: max,
|
maxDuration: max,
|
||||||
|
maxJitterFactor: maxJitterFactor,
|
||||||
|
rand: random,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -75,8 +96,10 @@ func (p *Backoff) Next(id string, eventTime time.Time) {
|
|||||||
entry, ok := p.perItemBackoff[id]
|
entry, ok := p.perItemBackoff[id]
|
||||||
if !ok || hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
|
if !ok || hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
|
||||||
entry = p.initEntryUnsafe(id)
|
entry = p.initEntryUnsafe(id)
|
||||||
|
entry.backoff += p.jitter(entry.backoff)
|
||||||
} else {
|
} else {
|
||||||
delay := entry.backoff * 2 // exponential
|
delay := entry.backoff * 2 // exponential
|
||||||
|
delay += p.jitter(entry.backoff) // add some jitter to the delay
|
||||||
entry.backoff = time.Duration(integer.Int64Min(int64(delay), int64(p.maxDuration)))
|
entry.backoff = time.Duration(integer.Int64Min(int64(delay), int64(p.maxDuration)))
|
||||||
}
|
}
|
||||||
entry.lastUpdate = p.Clock.Now()
|
entry.lastUpdate = p.Clock.Now()
|
||||||
@@ -144,6 +167,14 @@ func (p *Backoff) initEntryUnsafe(id string) *backoffEntry {
|
|||||||
return entry
|
return entry
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Backoff) jitter(delay time.Duration) time.Duration {
|
||||||
|
if p.rand == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return time.Duration(p.rand.Float64() * p.maxJitterFactor * float64(delay))
|
||||||
|
}
|
||||||
|
|
||||||
// After 2*maxDuration we restart the backoff factor to the beginning
|
// After 2*maxDuration we restart the backoff factor to the beginning
|
||||||
func hasExpired(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool {
|
func hasExpired(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool {
|
||||||
return eventTime.Sub(lastUpdate) > maxDuration*2 // consider stable if it's ok for twice the maxDuration
|
return eventTime.Sub(lastUpdate) > maxDuration*2 // consider stable if it's ok for twice the maxDuration
|
||||||
|
@@ -193,3 +193,60 @@ func TestIsInBackOffSinceUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBackoffWithJitter(t *testing.T) {
|
||||||
|
id := "_idJitter"
|
||||||
|
tc := testingclock.NewFakeClock(time.Now())
|
||||||
|
|
||||||
|
// test setup: we show 11 iterations, series of delays we expect with
|
||||||
|
// a jitter factor of zero each time:
|
||||||
|
// 100ms 200ms 400ms 800ms 1.6s 3.2s 06.4s 12.8s 25.6s 51.2s 1m42s
|
||||||
|
// and with jitter factor of 0.1 (max) each time:
|
||||||
|
// 110ms 231ms 485ms 1.0s 2.1s 4.4s 09.4s 19.8s 41.6s 1m27s 2m6s
|
||||||
|
//
|
||||||
|
// with the following configuration, it is guaranteed that the maximum delay
|
||||||
|
// will be reached even though we are unlucky and get jitter factor of zero.
|
||||||
|
// This ensures that this test covers the code path for checking whether
|
||||||
|
// maximum delay has been reached with jitter enabled.
|
||||||
|
initial := 100 * time.Millisecond
|
||||||
|
maxDuration := time.Minute
|
||||||
|
maxJitterFactor := 0.1
|
||||||
|
attempts := 10
|
||||||
|
|
||||||
|
b := NewFakeBackOffWithJitter(initial, maxDuration, tc, maxJitterFactor)
|
||||||
|
|
||||||
|
assert := func(t *testing.T, factor int, prevDelayGot, curDelayGot time.Duration) {
|
||||||
|
low := time.Duration((float64(prevDelayGot) * float64(factor)))
|
||||||
|
high := low + time.Duration(maxJitterFactor*float64(prevDelayGot))
|
||||||
|
if !((curDelayGot > low && curDelayGot <= high) || curDelayGot == maxDuration) {
|
||||||
|
t.Errorf("jittered delay not within range: (%s - %s], but got %s", low, high, curDelayGot)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
delays := make([]time.Duration, 0)
|
||||||
|
next := func() time.Duration {
|
||||||
|
tc.Step(initial)
|
||||||
|
b.Next(id, tc.Now())
|
||||||
|
|
||||||
|
delay := b.Get(id)
|
||||||
|
delays = append(delays, delay)
|
||||||
|
return delay
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := b.Get(id); got != 0 {
|
||||||
|
t.Errorf("expected a zero wait durtion, but got: %s", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
delayGot := next()
|
||||||
|
assert(t, 1, initial, delayGot)
|
||||||
|
|
||||||
|
prevDelayGot := delayGot
|
||||||
|
for i := 0; i < attempts; i++ {
|
||||||
|
delayGot = next()
|
||||||
|
assert(t, 2, prevDelayGot, delayGot)
|
||||||
|
|
||||||
|
prevDelayGot = delayGot
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("exponentially backed off jittered delays: %v", delays)
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user