run hack/update-staging-client-go.sh after the copy.sh changes
@@ -0,0 +1,211 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

import (
	"math"
	"sync"
	"time"

	"github.com/juju/ratelimit"
)

type RateLimiter interface {
	// When gets an item and gets to decide how long that item should wait
	When(item interface{}) time.Duration
	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
	// or for success, we'll stop tracking it
	Forget(item interface{})
	// NumRequeues returns back how many failures the item has had
	NumRequeues(item interface{}) int
}

// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue.  It has
// both overall and per-item rate limiting.  The overall is a token bucket and the per-item is exponential
func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size.  This is only for retry speed and it's only the overall factor (not per item)
		&BucketRateLimiter{Bucket: ratelimit.NewBucketWithRate(float64(10), int64(100))},
	)
}

// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type BucketRateLimiter struct {
	*ratelimit.Bucket
}

var _ RateLimiter = &BucketRateLimiter{}

func (r *BucketRateLimiter) When(item interface{}) time.Duration {
	return r.Bucket.Take(1)
}

func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
	return 0
}

func (r *BucketRateLimiter) Forget(item interface{}) {
}

// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
// dealing with max failures and expiration are up to the caller
type ItemExponentialFailureRateLimiter struct {
	failuresLock sync.Mutex
	failures     map[interface{}]int

	baseDelay time.Duration
	maxDelay  time.Duration
}

var _ RateLimiter = &ItemExponentialFailureRateLimiter{}

func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
	return &ItemExponentialFailureRateLimiter{
		failures:  map[interface{}]int{},
		baseDelay: baseDelay,
		maxDelay:  maxDelay,
	}
}

func DefaultItemBasedRateLimiter() RateLimiter {
	return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
}

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	exp := r.failures[item]
	r.failures[item] = r.failures[item] + 1

	// The backoff is capped such that 'calculated' value never overflows.
	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
	if backoff > math.MaxInt64 {
		return r.maxDelay
	}

	calculated := time.Duration(backoff)
	if calculated > r.maxDelay {
		return r.maxDelay
	}

	return calculated
}

func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	return r.failures[item]
}

func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	delete(r.failures, item)
}

// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
type ItemFastSlowRateLimiter struct {
	failuresLock sync.Mutex
	failures     map[interface{}]int

	maxFastAttempts int
	fastDelay       time.Duration
	slowDelay       time.Duration
}

var _ RateLimiter = &ItemFastSlowRateLimiter{}

func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
	return &ItemFastSlowRateLimiter{
		failures:        map[interface{}]int{},
		fastDelay:       fastDelay,
		slowDelay:       slowDelay,
		maxFastAttempts: maxFastAttempts,
	}
}

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	r.failures[item] = r.failures[item] + 1

	if r.failures[item] <= r.maxFastAttempts {
		return r.fastDelay
	}

	return r.slowDelay
}

func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	return r.failures[item]
}

func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	delete(r.failures, item)
}

// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type MaxOfRateLimiter struct {
	limiters []RateLimiter
}

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
	ret := time.Duration(0)
	for _, limiter := range r.limiters {
		curr := limiter.When(item)
		if curr > ret {
			ret = curr
		}
	}

	return ret
}

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
	return &MaxOfRateLimiter{limiters: limiters}
}

func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
	ret := 0
	for _, limiter := range r.limiters {
		curr := limiter.NumRequeues(item)
		if curr > ret {
			ret = curr
		}
	}

	return ret
}

func (r *MaxOfRateLimiter) Forget(item interface{}) {
	for _, limiter := range r.limiters {
		limiter.Forget(item)
	}
}
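Usage sketch (not part of the diff): the per-item exponential limiter above doubles the delay on every When call for a given key and resets once Forget is called. A minimal, hypothetical driver; the key name is made up, and the import path is the one declared in doc.go further below:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/pkg/util/workqueue"
)

func main() {
	// Same per-item values DefaultControllerRateLimiter uses: 5ms base delay, 1000s cap.
	limiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second)

	for i := 0; i < 4; i++ {
		// Each When records another failure for the (hypothetical) key and returns
		// the next backoff: 5ms, 10ms, 20ms, 40ms, ... capped at maxDelay.
		fmt.Println(limiter.When("node-1"))
	}
	fmt.Println(limiter.NumRequeues("node-1")) // 4

	// On success (or permanent failure) stop tracking the key, so the next failure starts at 5ms again.
	limiter.Forget("node-1")
}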
@@ -0,0 +1,184 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

import (
	"testing"
	"time"
)

func TestItemExponentialFailureRateLimiter(t *testing.T) {
	limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)

	if e, a := 1*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 2*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 4*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 8*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 16*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5, limiter.NumRequeues("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	if e, a := 1*time.Millisecond, limiter.When("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 2*time.Millisecond, limiter.When("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 2, limiter.NumRequeues("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	limiter.Forget("one")
	if e, a := 0, limiter.NumRequeues("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 1*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

}

func TestItemExponentialFailureRateLimiterOverFlow(t *testing.T) {
	limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1000*time.Second)
	for i := 0; i < 5; i++ {
		limiter.When("one")
	}
	if e, a := 32*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	for i := 0; i < 1000; i++ {
		limiter.When("overflow1")
	}
	if e, a := 1000*time.Second, limiter.When("overflow1"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	limiter = NewItemExponentialFailureRateLimiter(1*time.Minute, 1000*time.Hour)
	for i := 0; i < 2; i++ {
		limiter.When("two")
	}
	if e, a := 4*time.Minute, limiter.When("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	for i := 0; i < 1000; i++ {
		limiter.When("overflow2")
	}
	if e, a := 1000*time.Hour, limiter.When("overflow2"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

}

func TestItemFastSlowRateLimiter(t *testing.T) {
	limiter := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)

	if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 10*time.Second, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 10*time.Second, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5, limiter.NumRequeues("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 2, limiter.NumRequeues("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	limiter.Forget("one")
	if e, a := 0, limiter.NumRequeues("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

}

func TestMaxOfRateLimiter(t *testing.T) {
	limiter := NewMaxOfRateLimiter(
		NewItemFastSlowRateLimiter(5*time.Millisecond, 3*time.Second, 3),
		NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second),
	)

	if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 3*time.Second, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 3*time.Second, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5, limiter.NumRequeues("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5*time.Millisecond, limiter.When("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 2, limiter.NumRequeues("two"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	limiter.Forget("one")
	if e, a := 0, limiter.NumRequeues("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 5*time.Millisecond, limiter.When("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

}
@@ -0,0 +1,246 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

import (
	"sort"
	"time"

	"k8s.io/client-go/pkg/util/clock"
	utilruntime "k8s.io/client-go/pkg/util/runtime"
)

// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item interface{}, duration time.Duration)
}

// NewDelayingQueue constructs a new workqueue with delayed queuing ability
func NewDelayingQueue() DelayingInterface {
	return newDelayingQueue(clock.RealClock{}, "")
}

func NewNamedDelayingQueue(name string) DelayingInterface {
	return newDelayingQueue(clock.RealClock{}, name)
}

func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
	ret := &delayingType{
		Interface:          NewNamed(name),
		clock:              clock,
		heartbeat:          clock.Tick(maxWait),
		stopCh:             make(chan struct{}),
		waitingTimeByEntry: map[t]time.Time{},
		waitingForAddCh:    make(chan waitFor, 1000),
		metrics:            newRetryMetrics(name),
	}

	go ret.waitingLoop()

	return ret
}

// delayingType wraps an Interface and provides delayed re-enqueuing
type delayingType struct {
	Interface

	// clock tracks time for delayed firing
	clock clock.Clock

	// stopCh lets us signal a shutdown to the waiting loop
	stopCh chan struct{}

	// heartbeat ensures we wait no more than maxWait before firing
	//
	// TODO: replace with Ticker (and add to clock) so this can be cleaned up.
	// clock.Tick will leak.
	heartbeat <-chan time.Time

	// waitingForAdd is an ordered slice of items to be added to the contained work queue
	waitingForAdd []waitFor
	// waitingTimeByEntry holds wait time by entry, so we can look up pre-existing indexes
	waitingTimeByEntry map[t]time.Time
	// waitingForAddCh is a buffered channel that feeds waitingForAdd
	waitingForAddCh chan waitFor

	// metrics counts the number of retries
	metrics retryMetrics
}

// waitFor holds the data to add and the time it should be added
type waitFor struct {
	data    t
	readyAt time.Time
}

// ShutDown gives a way to shut off this queue
func (q *delayingType) ShutDown() {
	q.Interface.ShutDown()
	close(q.stopCh)
}

// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
	// don't add if we're already shutting down
	if q.ShuttingDown() {
		return
	}

	q.metrics.retry()

	// immediately add things with no delay
	if duration <= 0 {
		q.Add(item)
		return
	}

	select {
	case <-q.stopCh:
		// unblock if ShutDown() is called
	case q.waitingForAddCh <- waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
	}
}

// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an
// expired item sitting for more than 10 seconds.
const maxWait = 10 * time.Second

// waitingLoop runs until the workqueue is shut down and keeps a check on the list of items to be added.
func (q *delayingType) waitingLoop() {
	defer utilruntime.HandleCrash()

	// Make a placeholder channel to use when there are no items in our list
	never := make(<-chan time.Time)

	for {
		if q.Interface.ShuttingDown() {
			// discard waiting entries
			q.waitingForAdd = nil
			q.waitingTimeByEntry = nil
			return
		}

		now := q.clock.Now()

		// Add ready entries
		readyEntries := 0
		for _, entry := range q.waitingForAdd {
			if entry.readyAt.After(now) {
				break
			}
			q.Add(entry.data)
			delete(q.waitingTimeByEntry, entry.data)
			readyEntries++
		}
		q.waitingForAdd = q.waitingForAdd[readyEntries:]

		// Set up a wait for the first item's readyAt (if one exists)
		nextReadyAt := never
		if len(q.waitingForAdd) > 0 {
			nextReadyAt = q.clock.After(q.waitingForAdd[0].readyAt.Sub(now))
		}

		select {
		case <-q.stopCh:
			return

		case <-q.heartbeat:
			// continue the loop, which will add ready items

		case <-nextReadyAt:
			// continue the loop, which will add ready items

		case waitEntry := <-q.waitingForAddCh:
			if waitEntry.readyAt.After(q.clock.Now()) {
				q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
			} else {
				q.Add(waitEntry.data)
			}

			drained := false
			for !drained {
				select {
				case waitEntry := <-q.waitingForAddCh:
					if waitEntry.readyAt.After(q.clock.Now()) {
						q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
					} else {
						q.Add(waitEntry.data)
					}
				default:
					drained = true
				}
			}
		}
	}
}

// inserts the given entry into the sorted entries list
// same semantics as append()... the given slice may be modified,
// and the returned value should be used
//
// TODO: This should probably be converted to use container/heap to improve
// running time for a large number of items.
func insert(entries []waitFor, knownEntries map[t]time.Time, entry waitFor) []waitFor {
	// if the entry is already in our retry list and the existing time is before the new one, just skip it
	existingTime, exists := knownEntries[entry.data]
	if exists && existingTime.Before(entry.readyAt) {
		return entries
	}

	// if the entry exists and is scheduled for later, go ahead and remove the entry
	if exists {
		if existingIndex := findEntryIndex(entries, existingTime, entry.data); existingIndex >= 0 && existingIndex < len(entries) {
			entries = append(entries[:existingIndex], entries[existingIndex+1:]...)
		}
	}

	insertionIndex := sort.Search(len(entries), func(i int) bool {
		return entry.readyAt.Before(entries[i].readyAt)
	})

	// grow by 1
	entries = append(entries, waitFor{})
	// shift items from the insertion point to the end
	copy(entries[insertionIndex+1:], entries[insertionIndex:])
	// insert the record
	entries[insertionIndex] = entry

	knownEntries[entry.data] = entry.readyAt

	return entries
}

// findEntryIndex returns the index for an existing entry
func findEntryIndex(entries []waitFor, existingTime time.Time, data t) int {
	index := sort.Search(len(entries), func(i int) bool {
		return entries[i].readyAt.After(existingTime) || existingTime == entries[i].readyAt
	})

	// we know this is the earliest possible index, but there could be multiple with the same time
	// iterate from here to find the dupe
	for ; index < len(entries); index++ {
		if entries[index].data == data {
			break
		}
	}

	return index
}
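Usage sketch (not part of the diff): AddAfter is how a caller requeues a failed item without hot-looping; the waiting loop above hands the item to the underlying queue once its readyAt time passes. A minimal, hypothetical example using only methods that appear in this commit:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/pkg/util/workqueue"
)

func main() {
	q := workqueue.NewDelayingQueue()
	defer q.ShutDown()

	// The item only becomes visible to Get about 250ms from now;
	// a duration <= 0 would add it immediately.
	q.AddAfter("retry-me", 250*time.Millisecond)

	item, shutdown := q.Get() // blocks until the item is ready or the queue shuts down
	if shutdown {
		return
	}
	fmt.Println("processing", item)
	q.Done(item) // mark it finished so the same item may be added again later
}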
@@ -0,0 +1,236 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

import (
	"fmt"
	"reflect"
	"testing"
	"time"

	"k8s.io/client-go/pkg/util/clock"
	"k8s.io/client-go/pkg/util/wait"
)

func TestSimpleQueue(t *testing.T) {
	fakeClock := clock.NewFakeClock(time.Now())
	q := newDelayingQueue(fakeClock, "")

	first := "foo"

	q.AddAfter(first, 50*time.Millisecond)
	if err := waitForWaitingQueueToFill(q); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	if q.Len() != 0 {
		t.Errorf("should not have added")
	}

	fakeClock.Step(60 * time.Millisecond)

	if err := waitForAdded(q, 1); err != nil {
		t.Errorf("should have added")
	}
	item, _ := q.Get()
	q.Done(item)

	// step past the next heartbeat
	fakeClock.Step(10 * time.Second)

	err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) {
		if q.Len() > 0 {
			return false, fmt.Errorf("added to queue")
		}

		return false, nil
	})
	if err != wait.ErrWaitTimeout {
		t.Errorf("expected timeout, got: %v", err)
	}

	if q.Len() != 0 {
		t.Errorf("should not have added")
	}
}

func TestDeduping(t *testing.T) {
	fakeClock := clock.NewFakeClock(time.Now())
	q := newDelayingQueue(fakeClock, "")

	first := "foo"

	q.AddAfter(first, 50*time.Millisecond)
	if err := waitForWaitingQueueToFill(q); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	q.AddAfter(first, 70*time.Millisecond)
	if err := waitForWaitingQueueToFill(q); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	if q.Len() != 0 {
		t.Errorf("should not have added")
	}

	// step past the first block, we should receive now
	fakeClock.Step(60 * time.Millisecond)
	if err := waitForAdded(q, 1); err != nil {
		t.Errorf("should have added")
	}
	item, _ := q.Get()
	q.Done(item)

	// step past the second add
	fakeClock.Step(20 * time.Millisecond)
	if q.Len() != 0 {
		t.Errorf("should not have added")
	}

	// test again, but this time the earlier should override
	q.AddAfter(first, 50*time.Millisecond)
	q.AddAfter(first, 30*time.Millisecond)
	if err := waitForWaitingQueueToFill(q); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	if q.Len() != 0 {
		t.Errorf("should not have added")
	}

	fakeClock.Step(40 * time.Millisecond)
	if err := waitForAdded(q, 1); err != nil {
		t.Errorf("should have added")
	}
	item, _ = q.Get()
	q.Done(item)

	// step past the second add
	fakeClock.Step(20 * time.Millisecond)
	if q.Len() != 0 {
		t.Errorf("should not have added")
	}
	if q.Len() != 0 {
		t.Errorf("should not have added")
	}
}

func TestAddTwoFireEarly(t *testing.T) {
	fakeClock := clock.NewFakeClock(time.Now())
	q := newDelayingQueue(fakeClock, "")

	first := "foo"
	second := "bar"
	third := "baz"

	q.AddAfter(first, 1*time.Second)
	q.AddAfter(second, 50*time.Millisecond)
	if err := waitForWaitingQueueToFill(q); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	if q.Len() != 0 {
		t.Errorf("should not have added")
	}

	fakeClock.Step(60 * time.Millisecond)

	if err := waitForAdded(q, 1); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	item, _ := q.Get()
	if !reflect.DeepEqual(item, second) {
		t.Errorf("expected %v, got %v", second, item)
	}

	q.AddAfter(third, 2*time.Second)

	fakeClock.Step(1 * time.Second)
	if err := waitForAdded(q, 1); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	item, _ = q.Get()
	if !reflect.DeepEqual(item, first) {
		t.Errorf("expected %v, got %v", first, item)
	}

	fakeClock.Step(2 * time.Second)
	if err := waitForAdded(q, 1); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	item, _ = q.Get()
	if !reflect.DeepEqual(item, third) {
		t.Errorf("expected %v, got %v", third, item)
	}

}

func TestCopyShifting(t *testing.T) {
	fakeClock := clock.NewFakeClock(time.Now())
	q := newDelayingQueue(fakeClock, "")

	first := "foo"
	second := "bar"
	third := "baz"

	q.AddAfter(first, 1*time.Second)
	q.AddAfter(second, 500*time.Millisecond)
	q.AddAfter(third, 250*time.Millisecond)
	if err := waitForWaitingQueueToFill(q); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	if q.Len() != 0 {
		t.Errorf("should not have added")
	}

	fakeClock.Step(2 * time.Second)

	if err := waitForAdded(q, 3); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	actualFirst, _ := q.Get()
	if !reflect.DeepEqual(actualFirst, third) {
		t.Errorf("expected %v, got %v", third, actualFirst)
	}
	actualSecond, _ := q.Get()
	if !reflect.DeepEqual(actualSecond, second) {
		t.Errorf("expected %v, got %v", second, actualSecond)
	}
	actualThird, _ := q.Get()
	if !reflect.DeepEqual(actualThird, first) {
		t.Errorf("expected %v, got %v", first, actualThird)
	}
}

func waitForAdded(q DelayingInterface, depth int) error {
	return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
		if q.Len() == depth {
			return true, nil
		}

		return false, nil
	})
}

func waitForWaitingQueueToFill(q DelayingInterface) error {
	return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
		if len(q.(*delayingType).waitingForAddCh) == 0 {
			return true, nil
		}

		return false, nil
	})
}
26  staging/src/k8s.io/client-go/pkg/util/workqueue/doc.go  Normal file
@@ -0,0 +1,26 @@
/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package workqueue provides a simple queue that supports the following
// features:
//  * Fair: items processed in the order in which they are added.
//  * Stingy: a single item will not be processed multiple times concurrently,
//      and if an item is added multiple times before it can be processed, it
//      will only be processed once.
//  * Multiple consumers and producers. In particular, it is allowed for an
//      item to be reenqueued while it is being processed.
//  * Shutdown notifications.
package workqueue // import "k8s.io/client-go/pkg/util/workqueue"
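Usage sketch (not part of the diff): the Fair/Stingy guarantees above mean a key added several times before being picked up is handed to exactly one consumer, once. A minimal, hypothetical producer/consumer pair; NewNamed and the Add/Get/Done/ShutDown methods come from the package's queue.go, which this commit references but does not show:

package main

import (
	"fmt"

	"k8s.io/client-go/pkg/util/workqueue"
)

func main() {
	q := workqueue.NewNamed("example")

	go func() {
		// "a" is added twice before any Get, so it is processed only once (Stingy).
		for _, key := range []string{"a", "b", "a"} {
			q.Add(key)
		}
		q.ShutDown() // once drained, Get starts reporting shutdown to consumers
	}()

	for {
		key, shutdown := q.Get()
		if shutdown {
			return
		}
		fmt.Println("processing", key)
		q.Done(key) // required so the key can be re-added and processed again later
	}
}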
							
								
								
									
195  staging/src/k8s.io/client-go/pkg/util/workqueue/metrics.go  Normal file
@@ -0,0 +1,195 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

import (
	"sync"
	"time"
)

// This file provides abstractions for setting the provider (e.g., prometheus)
// of metrics.

type queueMetrics interface {
	add(item t)
	get(item t)
	done(item t)
}

// GaugeMetric represents a single numerical value that can arbitrarily go up
// and down.
type GaugeMetric interface {
	Inc()
	Dec()
}

// CounterMetric represents a single numerical value that only ever
// goes up.
type CounterMetric interface {
	Inc()
}

// SummaryMetric captures individual observations.
type SummaryMetric interface {
	Observe(float64)
}

type noopMetric struct{}

func (noopMetric) Inc()            {}
func (noopMetric) Dec()            {}
func (noopMetric) Observe(float64) {}

type defaultQueueMetrics struct {
	// current depth of a workqueue
	depth GaugeMetric
	// total number of adds handled by a workqueue
	adds CounterMetric
	// how long an item stays in a workqueue
	latency SummaryMetric
	// how long processing an item from a workqueue takes
	workDuration         SummaryMetric
	addTimes             map[t]time.Time
	processingStartTimes map[t]time.Time
}

func (m *defaultQueueMetrics) add(item t) {
	if m == nil {
		return
	}

	m.adds.Inc()
	m.depth.Inc()
	if _, exists := m.addTimes[item]; !exists {
		m.addTimes[item] = time.Now()
	}
}

func (m *defaultQueueMetrics) get(item t) {
	if m == nil {
		return
	}

	m.depth.Dec()
	m.processingStartTimes[item] = time.Now()
	if startTime, exists := m.addTimes[item]; exists {
		m.latency.Observe(sinceInMicroseconds(startTime))
		delete(m.addTimes, item)
	}
}

func (m *defaultQueueMetrics) done(item t) {
	if m == nil {
		return
	}

	if startTime, exists := m.processingStartTimes[item]; exists {
		m.workDuration.Observe(sinceInMicroseconds(startTime))
		delete(m.processingStartTimes, item)
	}
}

// Gets the time since the specified start in microseconds.
func sinceInMicroseconds(start time.Time) float64 {
	return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}

type retryMetrics interface {
	retry()
}

type defaultRetryMetrics struct {
	retries CounterMetric
}

func (m *defaultRetryMetrics) retry() {
	if m == nil {
		return
	}

	m.retries.Inc()
}

// MetricsProvider generates various metrics used by the queue.
type MetricsProvider interface {
	NewDepthMetric(name string) GaugeMetric
	NewAddsMetric(name string) CounterMetric
	NewLatencyMetric(name string) SummaryMetric
	NewWorkDurationMetric(name string) SummaryMetric
	NewRetriesMetric(name string) CounterMetric
}

type noopMetricsProvider struct{}

func (_ noopMetricsProvider) NewDepthMetric(name string) GaugeMetric {
	return noopMetric{}
}

func (_ noopMetricsProvider) NewAddsMetric(name string) CounterMetric {
	return noopMetric{}
}

func (_ noopMetricsProvider) NewLatencyMetric(name string) SummaryMetric {
	return noopMetric{}
}

func (_ noopMetricsProvider) NewWorkDurationMetric(name string) SummaryMetric {
	return noopMetric{}
}

func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
	return noopMetric{}
}

var metricsFactory = struct {
	metricsProvider MetricsProvider
	setProviders    sync.Once
}{
	metricsProvider: noopMetricsProvider{},
}

func newQueueMetrics(name string) queueMetrics {
	var ret *defaultQueueMetrics
	if len(name) == 0 {
		return ret
	}
	return &defaultQueueMetrics{
		depth:                metricsFactory.metricsProvider.NewDepthMetric(name),
		adds:                 metricsFactory.metricsProvider.NewAddsMetric(name),
		latency:              metricsFactory.metricsProvider.NewLatencyMetric(name),
 | 
				
			||||||
 | 
							workDuration:         metricsFactory.metricsProvider.NewWorkDurationMetric(name),
 | 
				
			||||||
 | 
							addTimes:             map[t]time.Time{},
 | 
				
			||||||
 | 
							processingStartTimes: map[t]time.Time{},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func newRetryMetrics(name string) retryMetrics {
 | 
				
			||||||
 | 
						var ret *defaultRetryMetrics
 | 
				
			||||||
 | 
						if len(name) == 0 {
 | 
				
			||||||
 | 
							return ret
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return &defaultRetryMetrics{
 | 
				
			||||||
 | 
							retries: metricsFactory.metricsProvider.NewRetriesMetric(name),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// SetProvider sets the metrics provider of the metricsFactory.
 | 
				
			||||||
 | 
					func SetProvider(metricsProvider MetricsProvider) {
 | 
				
			||||||
 | 
						metricsFactory.setProviders.Do(func() {
 | 
				
			||||||
 | 
							metricsFactory.metricsProvider = metricsProvider
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
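As a rough usage sketch (assuming the k8s.io/client-go/pkg/util/workqueue import path used by the tests in this commit), a caller could install its own MetricsProvider before constructing named queues. The simpleCounter and simpleProvider types below are hypothetical stand-ins, not part of this package:

package main

import (
	"fmt"
	"sync/atomic"

	"k8s.io/client-go/pkg/util/workqueue"
)

// simpleCounter is a hypothetical metric that satisfies GaugeMetric,
// CounterMetric, and SummaryMetric by just counting events.
type simpleCounter struct{ n int64 }

func (c *simpleCounter) Inc()            { atomic.AddInt64(&c.n, 1) }
func (c *simpleCounter) Dec()            { atomic.AddInt64(&c.n, -1) }
func (c *simpleCounter) Observe(float64) { atomic.AddInt64(&c.n, 1) }

// simpleProvider is a hypothetical MetricsProvider that hands out simpleCounters.
type simpleProvider struct{}

func (simpleProvider) NewDepthMetric(name string) workqueue.GaugeMetric          { return &simpleCounter{} }
func (simpleProvider) NewAddsMetric(name string) workqueue.CounterMetric         { return &simpleCounter{} }
func (simpleProvider) NewLatencyMetric(name string) workqueue.SummaryMetric      { return &simpleCounter{} }
func (simpleProvider) NewWorkDurationMetric(name string) workqueue.SummaryMetric { return &simpleCounter{} }
func (simpleProvider) NewRetriesMetric(name string) workqueue.CounterMetric      { return &simpleCounter{} }

func main() {
	// SetProvider is guarded by sync.Once, so install the provider before
	// building queues; unnamed queues (New()) skip metrics via newQueueMetrics("").
	workqueue.SetProvider(simpleProvider{})
	q := workqueue.NewNamed("example")
	q.Add("item")
	fmt.Println("queue length:", q.Len())
}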
@@ -0,0 +1,52 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

import (
	"sync"

	utilruntime "k8s.io/client-go/pkg/util/runtime"
)

type DoWorkPieceFunc func(piece int)

// Parallelize is a very simple framework that allows for parallelizing
// N independent pieces of work.
func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) {
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)

	if pieces < workers {
		workers = pieces
	}

	wg := sync.WaitGroup{}
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer utilruntime.HandleCrash()
			defer wg.Done()
			for piece := range toProcess {
				doWorkPiece(piece)
			}
		}()
	}
	wg.Wait()
}
				
			||||||
							
								
								
									
										172
									
								
								staging/src/k8s.io/client-go/pkg/util/workqueue/queue.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										172
									
								
								staging/src/k8s.io/client-go/pkg/util/workqueue/queue.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,172 @@
 | 
				
			|||||||
 | 
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

import (
	"sync"
)

type Interface interface {
	Add(item interface{})
	Len() int
	Get() (item interface{}, shutdown bool)
	Done(item interface{})
	ShutDown()
	ShuttingDown() bool
}

// New constructs a new workqueue (see the package comment).
func New() *Type {
	return NewNamed("")
}

func NewNamed(name string) *Type {
	return &Type{
		dirty:      set{},
		processing: set{},
		cond:       sync.NewCond(&sync.Mutex{}),
		metrics:    newQueueMetrics(name),
	}
}

// Type is a work queue (see the package comment).
type Type struct {
	// queue defines the order in which we will work on items. Every
	// element of queue should be in the dirty set and not in the
	// processing set.
	queue []t

	// dirty defines all of the items that need to be processed.
	dirty set

	// Things that are currently being processed are in the processing set.
	// These things may be simultaneously in the dirty set. When we finish
	// processing something and remove it from this set, we'll check if
	// it's in the dirty set, and if so, add it to the queue.
	processing set

	cond *sync.Cond

	shuttingDown bool

	metrics queueMetrics
}

type empty struct{}
type t interface{}
type set map[t]empty

func (s set) has(item t) bool {
	_, exists := s[item]
	return exists
}

func (s set) insert(item t) {
	s[item] = empty{}
}

func (s set) delete(item t) {
	delete(s, item)
}

// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if q.shuttingDown {
		return
	}
	if q.dirty.has(item) {
		return
	}

	q.metrics.add(item)

	q.dirty.insert(item)
	if q.processing.has(item) {
		return
	}

	q.queue = append(q.queue, item)
	q.cond.Signal()
}

// Len returns the current queue length, for informational purposes only. You
// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
// value; that can't be synchronized properly.
func (q *Type) Len() int {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	return len(q.queue)
}

// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.queue) == 0 {
		// We must be shutting down.
		return nil, true
	}

	item, q.queue = q.queue[0], q.queue[1:]

	q.metrics.get(item)

	q.processing.insert(item)
	q.dirty.delete(item)

	return item, false
}

// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.metrics.done(item)

	q.processing.delete(item)
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
		q.cond.Signal()
	}
}

// ShutDown will cause q to ignore all new items added to it. As soon as the
// worker goroutines have drained the existing items in the queue, they will be
// instructed to exit.
func (q *Type) ShutDown() {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	q.shuttingDown = true
	q.cond.Broadcast()
}

func (q *Type) ShuttingDown() bool {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	return q.shuttingDown
}
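As a rough sketch of the Add/Get/Done contract above (a single-goroutine toy; real callers usually run several worker goroutines, as in the tests that follow):

package main

import (
	"fmt"

	"k8s.io/client-go/pkg/util/workqueue"
)

func main() {
	q := workqueue.New()
	for i := 0; i < 5; i++ {
		q.Add(i)
	}
	// After ShutDown, Get keeps handing out already-queued items and only
	// reports shutdown once the queue has drained.
	q.ShutDown()

	for {
		item, shutdown := q.Get()
		if shutdown {
			return
		}
		fmt.Println("processing", item)
		q.Done(item) // every Get must be paired with Done
	}
}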
161  staging/src/k8s.io/client-go/pkg/util/workqueue/queue_test.go  Normal file
@@ -0,0 +1,161 @@
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue_test

import (
	"sync"
	"testing"
	"time"

	"k8s.io/client-go/pkg/util/workqueue"
)

func TestBasic(t *testing.T) {
	// If something is seriously wrong this test will never complete.
	q := workqueue.New()

	// Start producers
	const producers = 50
	producerWG := sync.WaitGroup{}
	producerWG.Add(producers)
	for i := 0; i < producers; i++ {
		go func(i int) {
			defer producerWG.Done()
			for j := 0; j < 50; j++ {
				q.Add(i)
				time.Sleep(time.Millisecond)
			}
		}(i)
	}

	// Start consumers
	const consumers = 10
	consumerWG := sync.WaitGroup{}
	consumerWG.Add(consumers)
	for i := 0; i < consumers; i++ {
		go func(i int) {
			defer consumerWG.Done()
			for {
				item, quit := q.Get()
				if item == "added after shutdown!" {
					t.Errorf("Got an item added after shutdown.")
				}
				if quit {
					return
				}
				t.Logf("Worker %v: begin processing %v", i, item)
				time.Sleep(3 * time.Millisecond)
				t.Logf("Worker %v: done processing %v", i, item)
				q.Done(item)
			}
		}(i)
	}

	producerWG.Wait()
	q.ShutDown()
	q.Add("added after shutdown!")
	consumerWG.Wait()
}

func TestAddWhileProcessing(t *testing.T) {
	q := workqueue.New()

	// Start producers
	const producers = 50
	producerWG := sync.WaitGroup{}
	producerWG.Add(producers)
	for i := 0; i < producers; i++ {
		go func(i int) {
			defer producerWG.Done()
			q.Add(i)
		}(i)
	}

	// Start consumers
	const consumers = 10
	consumerWG := sync.WaitGroup{}
	consumerWG.Add(consumers)
	for i := 0; i < consumers; i++ {
		go func(i int) {
			defer consumerWG.Done()
			// Every worker will re-add every item up to two times.
			// This tests the dirty-while-processing case.
			counters := map[interface{}]int{}
			for {
				item, quit := q.Get()
				if quit {
					return
				}
				counters[item]++
				if counters[item] < 2 {
					q.Add(item)
				}
				q.Done(item)
			}
		}(i)
	}

	producerWG.Wait()
	q.ShutDown()
	consumerWG.Wait()
}

func TestLen(t *testing.T) {
	q := workqueue.New()
	q.Add("foo")
	if e, a := 1, q.Len(); e != a {
		t.Errorf("Expected %v, got %v", e, a)
	}
	q.Add("bar")
	if e, a := 2, q.Len(); e != a {
		t.Errorf("Expected %v, got %v", e, a)
	}
	q.Add("foo") // should not increase the queue length.
	if e, a := 2, q.Len(); e != a {
		t.Errorf("Expected %v, got %v", e, a)
	}
}

func TestReinsert(t *testing.T) {
	q := workqueue.New()
	q.Add("foo")

	// Start processing
	i, _ := q.Get()
	if i != "foo" {
		t.Errorf("Expected %v, got %v", "foo", i)
	}

	// Add it back while processing
	q.Add(i)

	// Finish it up
	q.Done(i)

	// It should be back on the queue
	i, _ = q.Get()
	if i != "foo" {
		t.Errorf("Expected %v, got %v", "foo", i)
	}

	// Finish that one up
	q.Done(i)

	if a := q.Len(); a != 0 {
		t.Errorf("Expected queue to be empty. Has %v items", a)
	}
}
@@ -0,0 +1,69 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
	DelayingInterface

	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
	AddRateLimited(item interface{})

	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for a permanent
	// failure or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
	// still have to call `Done` on the queue.
	Forget(item interface{})

	// NumRequeues returns how many times the item was requeued
	NumRequeues(item interface{}) int
}

// NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
// Remember to call Forget!  If you don't, you may end up tracking failures forever.
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
	return &rateLimitingType{
		DelayingInterface: NewDelayingQueue(),
		rateLimiter:       rateLimiter,
	}
}

func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
	return &rateLimitingType{
		DelayingInterface: NewNamedDelayingQueue(name),
		rateLimiter:       rateLimiter,
	}
}

// rateLimitingType wraps an Interface and provides rate-limited re-enqueuing
type rateLimitingType struct {
	DelayingInterface

	rateLimiter RateLimiter
}

// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
	q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

func (q *rateLimitingType) NumRequeues(item interface{}) int {
	return q.rateLimiter.NumRequeues(item)
}

func (q *rateLimitingType) Forget(item interface{}) {
	q.rateLimiter.Forget(item)
}
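A brief sketch of the retry pattern this interface is meant for, using NewItemExponentialFailureRateLimiter from this package; the process function below is a hypothetical stand-in for real work:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/pkg/util/workqueue"
)

// process is a hypothetical work function; it is not part of this change.
func process(item interface{}) error {
	fmt.Println("processing", item)
	return nil
}

func main() {
	limiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, time.Second)
	q := workqueue.NewRateLimitingQueue(limiter)

	q.Add("node-1")

	item, shutdown := q.Get()
	if shutdown {
		return
	}
	if err := process(item); err != nil {
		// Re-queue with exponential backoff; NumRequeues can gate how long to keep retrying.
		q.AddRateLimited(item)
	} else {
		// On success (or when giving up), clear the per-item backoff state.
		q.Forget(item)
	}
	q.Done(item)
}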
@@ -0,0 +1,75 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

import (
	"testing"
	"time"

	"k8s.io/client-go/pkg/util/clock"
)

func TestRateLimitingQueue(t *testing.T) {
	limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
	queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
	fakeClock := clock.NewFakeClock(time.Now())
	delayingQueue := &delayingType{
		Interface:       New(),
		clock:           fakeClock,
		heartbeat:       fakeClock.Tick(maxWait),
		stopCh:          make(chan struct{}),
		waitingForAddCh: make(chan waitFor, 1000),
		metrics:         newRetryMetrics(""),
	}
	queue.DelayingInterface = delayingQueue

	queue.AddRateLimited("one")
	waitEntry := <-delayingQueue.waitingForAddCh
	if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	queue.AddRateLimited("one")
	waitEntry = <-delayingQueue.waitingForAddCh
	if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	if e, a := 2, queue.NumRequeues("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	queue.AddRateLimited("two")
	waitEntry = <-delayingQueue.waitingForAddCh
	if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	queue.AddRateLimited("two")
	waitEntry = <-delayingQueue.waitingForAddCh
	if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}

	queue.Forget("one")
	if e, a := 0, queue.NumRequeues("one"); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
	queue.AddRateLimited("one")
	waitEntry = <-delayingQueue.waitingForAddCh
	if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
		t.Errorf("expected %v, got %v", e, a)
	}
}
@@ -0,0 +1,52 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

import "time"

type TimedWorkQueue struct {
	*Type
}

type TimedWorkQueueItem struct {
	StartTime time.Time
	Object    interface{}
}

func NewTimedWorkQueue() *TimedWorkQueue {
	return &TimedWorkQueue{New()}
}

// Add adds the obj along with its timestamp to the queue.
func (q TimedWorkQueue) Add(timedItem *TimedWorkQueueItem) {
	q.Type.Add(timedItem)
}

// Get gets the obj along with its timestamp from the queue.
func (q TimedWorkQueue) Get() (timedItem *TimedWorkQueueItem, shutdown bool) {
	origin, shutdown := q.Type.Get()
	if origin == nil {
		return nil, shutdown
	}
	timedItem, _ = origin.(*TimedWorkQueueItem)
	return timedItem, shutdown
}

func (q TimedWorkQueue) Done(timedItem *TimedWorkQueueItem) error {
	q.Type.Done(timedItem)
	return nil
}
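A short sketch of using the timed queue to track how long an object has been waiting; the object name and timing output are illustrative only, and the caller supplies the StartTime when enqueuing:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/pkg/util/workqueue"
)

func main() {
	q := workqueue.NewTimedWorkQueue()
	// The caller records the start time explicitly when enqueuing.
	q.Add(&workqueue.TimedWorkQueueItem{StartTime: time.Now(), Object: "pod-a"})

	item, shutdown := q.Get()
	if shutdown || item == nil {
		return
	}
	fmt.Printf("%v waited %v\n", item.Object, time.Since(item.StartTime))
	q.Done(item)
}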
@@ -0,0 +1,38 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

import (
	"testing"
	"time"

	"k8s.io/client-go/pkg/api/v1"
)

func TestNoMemoryLeak(t *testing.T) {
	timedQueue := NewTimedWorkQueue()
	timedQueue.Add(&TimedWorkQueueItem{Object: &v1.Pod{}, StartTime: time.Time{}})
	item, _ := timedQueue.Get()
	timedQueue.Add(item)
	// The item should still be in the timedQueue.
	timedQueue.Done(item)
	item, _ = timedQueue.Get()
	timedQueue.Done(item)
	if len(timedQueue.Type.processing) != 0 {
		t.Errorf("expect timedQueue.Type.processing to be empty!")
	}
}