package events

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/sirupsen/logrus"
)

// RetryingSink retries the write until success or an ErrSinkClosed is
// returned. The underlying sink must have a nonzero probability of
// succeeding or the sink will block. Retry behavior is configured with a
// RetryStrategy. Concurrent calls to a retrying sink are serialized through
// the sink, meaning that if one is in-flight, another will not proceed.
type RetryingSink struct {
	sink     Sink
	strategy RetryStrategy
	closed   chan struct{}
	once     sync.Once
}

// NewRetryingSink returns a sink that will retry writes to a sink, backing
// off on failure. The retry and backoff behavior is governed entirely by the
// provided RetryStrategy.
func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink {
	rs := &RetryingSink{
		sink:     sink,
		strategy: strategy,
		closed:   make(chan struct{}),
	}

	return rs
}
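
// newBreakerRetryingSink is an illustrative sketch, not part of the original
// API: it shows one plausible way to wire a RetryingSink with a
// circuit-breaker strategy. The threshold (5 consecutive failures) and the
// 10-second backoff are assumptions chosen for the example, not recommended
// defaults.
func newBreakerRetryingSink(sink Sink) *RetryingSink {
	// Trip the breaker after 5 consecutive failures, then hold writes off
	// until 10 seconds have elapsed since the most recent failure.
	return NewRetryingSink(sink, NewBreaker(5, 10*time.Second))
}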

// Write attempts to flush the events to the downstream sink until it succeeds
// or the sink is closed.
func (rs *RetryingSink) Write(event Event) error {
	logger := logrus.WithField("event", event)

retry:
	select {
	case <-rs.closed:
		return ErrSinkClosed
	default:
	}

	if backoff := rs.strategy.Proceed(event); backoff > 0 {
		select {
		case <-time.After(backoff):
			// TODO(stevvooe): This branch holds up the next try. Before, we
			// would simply break to the "retry" label and then possibly wait
			// again. However, this requires all retry strategies to have a
			// large probability of probing the sink for success, rather than
			// just backing off and sending the request.
		case <-rs.closed:
			return ErrSinkClosed
		}
	}

	if err := rs.sink.Write(event); err != nil {
		if err == ErrSinkClosed {
			// terminal!
			return err
		}

		logger := logger.WithError(err) // shadow!!

		if rs.strategy.Failure(event, err) {
			logger.Errorf("retryingsink: dropped event")
			return nil
		}

		logger.Errorf("retryingsink: error writing event, retrying")
		goto retry
	}

	rs.strategy.Success(event)
	return nil
}

// Close closes the sink and the underlying sink.
func (rs *RetryingSink) Close() error {
	rs.once.Do(func() {
		close(rs.closed)
	})

	return nil
}

func (rs *RetryingSink) String() string {
	// Serialize a copy of the RetryingSink without the sync.Once, to avoid
	// a data race.
	rs2 := map[string]interface{}{
		"sink":     rs.sink,
		"strategy": rs.strategy,
		"closed":   rs.closed,
	}
	return fmt.Sprint(rs2)
}

// RetryStrategy defines a strategy for retrying event sink writes.
//
// All methods should be goroutine safe.
type RetryStrategy interface {
	// Proceed is called before every event send. If Proceed returns a
	// positive, non-zero duration, the retryer will back off by the returned
	// duration.
	//
	// An event is provided, but may be ignored.
	Proceed(event Event) time.Duration

	// Failure reports a failure to the strategy. If this method returns true,
	// the event should be dropped.
	Failure(event Event, err error) bool

	// Success should be called when an event is sent successfully.
	Success(event Event)
}
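
// immediateRetry is an illustrative sketch, not part of the original API: it
// is a minimal RetryStrategy that never backs off and never drops events,
// shown only to demonstrate the smallest possible implementation of the
// interface. In practice it can spin hot against a persistently failing sink.
type immediateRetry struct{}

// Proceed never asks the retryer to wait.
func (immediateRetry) Proceed(event Event) time.Duration { return 0 }

// Failure records nothing and never drops the event.
func (immediateRetry) Failure(event Event, err error) bool { return false }

// Success records nothing.
func (immediateRetry) Success(event Event) {}

var _ RetryStrategy = immediateRetry{}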

// Breaker implements a circuit breaker retry strategy.
//
// The current implementation never drops events.
type Breaker struct {
	threshold int
	recent    int
	last      time.Time
	backoff   time.Duration // time after which we retry after failure.
	mu        sync.Mutex
}

var _ RetryStrategy = &Breaker{}

// NewBreaker returns a breaker that will back off after the threshold has
// been tripped. A Breaker is thread safe and may be shared by many
// goroutines.
func NewBreaker(threshold int, backoff time.Duration) *Breaker {
	return &Breaker{
		threshold: threshold,
		backoff:   backoff,
	}
}

// Proceed checks the failures against the threshold.
func (b *Breaker) Proceed(event Event) time.Duration {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.recent < b.threshold {
		return 0
	}

	return time.Until(b.last.Add(b.backoff))
}

// Success resets the breaker.
func (b *Breaker) Success(event Event) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.recent = 0
	b.last = time.Time{}
}

// Failure records the failure and latest failure time.
func (b *Breaker) Failure(event Event, err error) bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.recent++
	b.last = time.Now().UTC()
	return false // never drop events.
}
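
// As a worked illustration (the values here are assumptions, not defaults):
// with NewBreaker(3, 10*time.Second), Proceed returns 0 until three
// consecutive failures have been recorded. After the third failure at time t,
// Proceed returns the time remaining until t+10s (or a non-positive duration
// once that window has passed, letting the write proceed), and a single
// Success resets the failure count to zero.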

var (
	// DefaultExponentialBackoffConfig provides a default configuration for
	// exponential backoff.
	DefaultExponentialBackoffConfig = ExponentialBackoffConfig{
		Base:   time.Second,
		Factor: time.Second,
		Max:    20 * time.Second,
	}
)

// ExponentialBackoffConfig configures backoff parameters.
//
// Note that these parameters operate on the upper bound for choosing a random
// value. For example, at Base=1s, a random value in [0,1s) will be chosen for
// the backoff value.
type ExponentialBackoffConfig struct {
	// Base is the minimum bound for backing off after failure.
	Base time.Duration

	// Factor sets the amount of time by which the backoff grows with each
	// failure.
	Factor time.Duration

	// Max is the absolute maximum bound for a single backoff.
	Max time.Duration
}

// ExponentialBackoff implements random backoff with exponentially increasing
// bounds as the number of consecutive failures increases.
type ExponentialBackoff struct {
	config   ExponentialBackoffConfig
	failures uint64 // consecutive failure counter.
}

// NewExponentialBackoff returns an exponential backoff strategy with the
// desired config. Zero-valued Factor and Max fields fall back to the
// corresponding DefaultExponentialBackoffConfig values.
func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff {
	return &ExponentialBackoff{
		config: config,
	}
}
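
// newDefaultRetryingSink is an illustrative sketch, not part of the original
// API: it wires a sink to a RetryingSink using the default exponential
// backoff configuration, which is a plausible common pairing of the two
// types defined in this file.
func newDefaultRetryingSink(sink Sink) *RetryingSink {
	return NewRetryingSink(sink, NewExponentialBackoff(DefaultExponentialBackoffConfig))
}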

// Proceed returns the next randomly bound exponential backoff time.
func (b *ExponentialBackoff) Proceed(event Event) time.Duration {
	return b.backoff(atomic.LoadUint64(&b.failures))
}

// Success resets the failures counter.
func (b *ExponentialBackoff) Success(event Event) {
	atomic.StoreUint64(&b.failures, 0)
}

// Failure increments the failure counter.
func (b *ExponentialBackoff) Failure(event Event, err error) bool {
	atomic.AddUint64(&b.failures, 1)
	return false
}
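
// As a worked example with DefaultExponentialBackoffConfig (the failure
// counts below are just examples): after three consecutive failures the
// bound is Base + Factor*2^(3-1) = 1s + 4s = 5s, so Proceed returns a
// uniform random value in [0, 5s). After six failures the raw bound is
// 1s + 32s = 33s, which exceeds Max and is clamped to 20s.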

// backoff calculates the amount of time to wait based on the number of
// consecutive failures.
func (b *ExponentialBackoff) backoff(failures uint64) time.Duration {
	if failures == 0 {
		// proceed normally when there are no failures.
		return 0
	}

	factor := b.config.Factor
	if factor <= 0 {
		factor = DefaultExponentialBackoffConfig.Factor
	}

	backoff := b.config.Base + factor*time.Duration(1<<(failures-1))

	max := b.config.Max
	if max <= 0 {
		max = DefaultExponentialBackoffConfig.Max
	}

	if backoff > max || backoff < 0 {
		backoff = max
	}

	// Choose a uniformly distributed value from [0, backoff).
	return time.Duration(rand.Int63n(int64(backoff)))
}
