workqueue: make queue as configurable
The default queue implementation is mostly FIFO and it is not exchangeable unless we implement the whole `workqueue.Interface` which is less desirable as we have to duplicate a lot of code. There was one attempt done in [kubernetes/kubernetes#109349][1] which tried to implement a priority queue. That is really useful and [knative/pkg][2] implemented something called two-lane-queue. While two lane queue is great, but isn't perfect since a full slow queue can still slow down items in fast queue. This change proposes a swappable queue implementation while not adding extra maintenance effort in kubernetes community. We are happy to maintain our own queue implementation (similar to two-lane-queue) in downstream. [1]: https://github.com/kubernetes/kubernetes/pull/109349 [2]: https://github.com/knative/pkg/blob/main/controller/two_lane_queue.go
This commit is contained in:
		
				
					committed by
					
						
						zhouhaibing089
					
				
			
			
				
	
			
			
			
						parent
						
							20d0ab7ae8
						
					
				
				
					commit
					87b4279e07
				
			@@ -41,7 +41,7 @@ func TestMetricShutdown(t *testing.T) {
 | 
				
			|||||||
		updateCalled: ch,
 | 
							updateCalled: ch,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	c := testingclock.NewFakeClock(time.Now())
 | 
						c := testingclock.NewFakeClock(time.Now())
 | 
				
			||||||
	q := newQueue(c, m, time.Millisecond)
 | 
						q := newQueue(c, DefaultQueue(), m, time.Millisecond)
 | 
				
			||||||
	for !c.HasWaiters() {
 | 
						for !c.HasWaiters() {
 | 
				
			||||||
		// Wait for the go routine to call NewTicker()
 | 
							// Wait for the go routine to call NewTicker()
 | 
				
			||||||
		time.Sleep(time.Millisecond)
 | 
							time.Sleep(time.Millisecond)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -33,6 +33,48 @@ type Interface interface {
 | 
				
			|||||||
	ShuttingDown() bool
 | 
						ShuttingDown() bool
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Queue is the underlying storage for items. The functions below are always
 | 
				
			||||||
 | 
					// called from the same goroutine.
 | 
				
			||||||
 | 
					type Queue interface {
 | 
				
			||||||
 | 
						// Touch can be hooked when an existing item is added again. This may be
 | 
				
			||||||
 | 
						// useful if the implementation allows priority change for the given item.
 | 
				
			||||||
 | 
						Touch(item interface{})
 | 
				
			||||||
 | 
						// Push adds a new item.
 | 
				
			||||||
 | 
						Push(item interface{})
 | 
				
			||||||
 | 
						// Len tells the total number of items.
 | 
				
			||||||
 | 
						Len() int
 | 
				
			||||||
 | 
						// Pop retrieves an item.
 | 
				
			||||||
 | 
						Pop() (item interface{})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// DefaultQueue is a slice based FIFO queue.
 | 
				
			||||||
 | 
					func DefaultQueue() Queue {
 | 
				
			||||||
 | 
						return new(queue)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// queue is a slice which implements Queue.
 | 
				
			||||||
 | 
					type queue []interface{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (q *queue) Touch(item interface{}) {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (q *queue) Push(item interface{}) {
 | 
				
			||||||
 | 
						*q = append(*q, item)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (q *queue) Len() int {
 | 
				
			||||||
 | 
						return len(*q)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (q *queue) Pop() (item interface{}) {
 | 
				
			||||||
 | 
						item = (*q)[0]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// The underlying array still exists and reference this object, so the object will not be garbage collected.
 | 
				
			||||||
 | 
						(*q)[0] = nil
 | 
				
			||||||
 | 
						*q = (*q)[1:]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return item
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// QueueConfig specifies optional configurations to customize an Interface.
 | 
					// QueueConfig specifies optional configurations to customize an Interface.
 | 
				
			||||||
type QueueConfig struct {
 | 
					type QueueConfig struct {
 | 
				
			||||||
	// Name for the queue. If unnamed, the metrics will not be registered.
 | 
						// Name for the queue. If unnamed, the metrics will not be registered.
 | 
				
			||||||
@@ -44,6 +86,9 @@ type QueueConfig struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// Clock ability to inject real or fake clock for testing purposes.
 | 
						// Clock ability to inject real or fake clock for testing purposes.
 | 
				
			||||||
	Clock clock.WithTicker
 | 
						Clock clock.WithTicker
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Queue provides the underlying queue to use. It is optional and defaults to slice based FIFO queue.
 | 
				
			||||||
 | 
						Queue Queue
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// New constructs a new work queue (see the package comment).
 | 
					// New constructs a new work queue (see the package comment).
 | 
				
			||||||
@@ -83,16 +128,22 @@ func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type {
 | 
				
			|||||||
		config.Clock = clock.RealClock{}
 | 
							config.Clock = clock.RealClock{}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if config.Queue == nil {
 | 
				
			||||||
 | 
							config.Queue = DefaultQueue()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return newQueue(
 | 
						return newQueue(
 | 
				
			||||||
		config.Clock,
 | 
							config.Clock,
 | 
				
			||||||
 | 
							config.Queue,
 | 
				
			||||||
		metricsFactory.newQueueMetrics(config.Name, config.Clock),
 | 
							metricsFactory.newQueueMetrics(config.Name, config.Clock),
 | 
				
			||||||
		updatePeriod,
 | 
							updatePeriod,
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func newQueue(c clock.WithTicker, metrics queueMetrics, updatePeriod time.Duration) *Type {
 | 
					func newQueue(c clock.WithTicker, queue Queue, metrics queueMetrics, updatePeriod time.Duration) *Type {
 | 
				
			||||||
	t := &Type{
 | 
						t := &Type{
 | 
				
			||||||
		clock:                      c,
 | 
							clock:                      c,
 | 
				
			||||||
 | 
							queue:                      queue,
 | 
				
			||||||
		dirty:                      set{},
 | 
							dirty:                      set{},
 | 
				
			||||||
		processing:                 set{},
 | 
							processing:                 set{},
 | 
				
			||||||
		cond:                       sync.NewCond(&sync.Mutex{}),
 | 
							cond:                       sync.NewCond(&sync.Mutex{}),
 | 
				
			||||||
@@ -116,7 +167,7 @@ type Type struct {
 | 
				
			|||||||
	// queue defines the order in which we will work on items. Every
 | 
						// queue defines the order in which we will work on items. Every
 | 
				
			||||||
	// element of queue should be in the dirty set and not in the
 | 
						// element of queue should be in the dirty set and not in the
 | 
				
			||||||
	// processing set.
 | 
						// processing set.
 | 
				
			||||||
	queue []t
 | 
						queue Queue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// dirty defines all of the items that need to be processed.
 | 
						// dirty defines all of the items that need to be processed.
 | 
				
			||||||
	dirty set
 | 
						dirty set
 | 
				
			||||||
@@ -167,6 +218,11 @@ func (q *Type) Add(item interface{}) {
 | 
				
			|||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if q.dirty.has(item) {
 | 
						if q.dirty.has(item) {
 | 
				
			||||||
 | 
							// the same item is added again before it is processed, call the Touch
 | 
				
			||||||
 | 
							// function if the queue cares about it (for e.g, reset its priority)
 | 
				
			||||||
 | 
							if !q.processing.has(item) {
 | 
				
			||||||
 | 
								q.queue.Touch(item)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -177,7 +233,7 @@ func (q *Type) Add(item interface{}) {
 | 
				
			|||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	q.queue = append(q.queue, item)
 | 
						q.queue.Push(item)
 | 
				
			||||||
	q.cond.Signal()
 | 
						q.cond.Signal()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -187,7 +243,7 @@ func (q *Type) Add(item interface{}) {
 | 
				
			|||||||
func (q *Type) Len() int {
 | 
					func (q *Type) Len() int {
 | 
				
			||||||
	q.cond.L.Lock()
 | 
						q.cond.L.Lock()
 | 
				
			||||||
	defer q.cond.L.Unlock()
 | 
						defer q.cond.L.Unlock()
 | 
				
			||||||
	return len(q.queue)
 | 
						return q.queue.Len()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Get blocks until it can return an item to be processed. If shutdown = true,
 | 
					// Get blocks until it can return an item to be processed. If shutdown = true,
 | 
				
			||||||
@@ -196,18 +252,15 @@ func (q *Type) Len() int {
 | 
				
			|||||||
func (q *Type) Get() (item interface{}, shutdown bool) {
 | 
					func (q *Type) Get() (item interface{}, shutdown bool) {
 | 
				
			||||||
	q.cond.L.Lock()
 | 
						q.cond.L.Lock()
 | 
				
			||||||
	defer q.cond.L.Unlock()
 | 
						defer q.cond.L.Unlock()
 | 
				
			||||||
	for len(q.queue) == 0 && !q.shuttingDown {
 | 
						for q.queue.Len() == 0 && !q.shuttingDown {
 | 
				
			||||||
		q.cond.Wait()
 | 
							q.cond.Wait()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if len(q.queue) == 0 {
 | 
						if q.queue.Len() == 0 {
 | 
				
			||||||
		// We must be shutting down.
 | 
							// We must be shutting down.
 | 
				
			||||||
		return nil, true
 | 
							return nil, true
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	item = q.queue[0]
 | 
						item = q.queue.Pop()
 | 
				
			||||||
	// The underlying array still exists and reference this object, so the object will not be garbage collected.
 | 
					 | 
				
			||||||
	q.queue[0] = nil
 | 
					 | 
				
			||||||
	q.queue = q.queue[1:]
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	q.metrics.get(item)
 | 
						q.metrics.get(item)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -228,7 +281,7 @@ func (q *Type) Done(item interface{}) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	q.processing.delete(item)
 | 
						q.processing.delete(item)
 | 
				
			||||||
	if q.dirty.has(item) {
 | 
						if q.dirty.has(item) {
 | 
				
			||||||
		q.queue = append(q.queue, item)
 | 
							q.queue.Push(item)
 | 
				
			||||||
		q.cond.Signal()
 | 
							q.cond.Signal()
 | 
				
			||||||
	} else if q.processing.len() == 0 {
 | 
						} else if q.processing.len() == 0 {
 | 
				
			||||||
		q.cond.Signal()
 | 
							q.cond.Signal()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -27,6 +27,23 @@ import (
 | 
				
			|||||||
	"k8s.io/client-go/util/workqueue"
 | 
						"k8s.io/client-go/util/workqueue"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// traceQueue traces whether items are touched
 | 
				
			||||||
 | 
					type traceQueue struct {
 | 
				
			||||||
 | 
						workqueue.Queue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						touched map[interface{}]struct{}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (t *traceQueue) Touch(item interface{}) {
 | 
				
			||||||
 | 
						t.Queue.Touch(item)
 | 
				
			||||||
 | 
						if t.touched == nil {
 | 
				
			||||||
 | 
							t.touched = make(map[interface{}]struct{})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						t.touched[item] = struct{}{}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var _ workqueue.Queue = &traceQueue{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestBasic(t *testing.T) {
 | 
					func TestBasic(t *testing.T) {
 | 
				
			||||||
	tests := []struct {
 | 
						tests := []struct {
 | 
				
			||||||
		queue         *workqueue.Type
 | 
							queue         *workqueue.Type
 | 
				
			||||||
@@ -198,7 +215,11 @@ func TestReinsert(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestCollapse(t *testing.T) {
 | 
					func TestCollapse(t *testing.T) {
 | 
				
			||||||
	q := workqueue.New()
 | 
						tq := &traceQueue{Queue: workqueue.DefaultQueue()}
 | 
				
			||||||
 | 
						q := workqueue.NewWithConfig(workqueue.QueueConfig{
 | 
				
			||||||
 | 
							Name:  "",
 | 
				
			||||||
 | 
							Queue: tq,
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
	// Add a new one twice
 | 
						// Add a new one twice
 | 
				
			||||||
	q.Add("bar")
 | 
						q.Add("bar")
 | 
				
			||||||
	q.Add("bar")
 | 
						q.Add("bar")
 | 
				
			||||||
@@ -216,10 +237,18 @@ func TestCollapse(t *testing.T) {
 | 
				
			|||||||
	if a := q.Len(); a != 0 {
 | 
						if a := q.Len(); a != 0 {
 | 
				
			||||||
		t.Errorf("Expected queue to be empty. Has %v items", a)
 | 
							t.Errorf("Expected queue to be empty. Has %v items", a)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if _, ok := tq.touched["bar"]; !ok {
 | 
				
			||||||
 | 
							t.Errorf("Expected bar to be Touched")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestCollapseWhileProcessing(t *testing.T) {
 | 
					func TestCollapseWhileProcessing(t *testing.T) {
 | 
				
			||||||
	q := workqueue.New()
 | 
						tq := &traceQueue{Queue: workqueue.DefaultQueue()}
 | 
				
			||||||
 | 
						q := workqueue.NewWithConfig(workqueue.QueueConfig{
 | 
				
			||||||
 | 
							Name:  "",
 | 
				
			||||||
 | 
							Queue: tq,
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
	q.Add("foo")
 | 
						q.Add("foo")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Start processing
 | 
						// Start processing
 | 
				
			||||||
@@ -261,6 +290,10 @@ func TestCollapseWhileProcessing(t *testing.T) {
 | 
				
			|||||||
	if a := q.Len(); a != 0 {
 | 
						if a := q.Len(); a != 0 {
 | 
				
			||||||
		t.Errorf("Expected queue to be empty. Has %v items", a)
 | 
							t.Errorf("Expected queue to be empty. Has %v items", a)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if _, ok := tq.touched["foo"]; ok {
 | 
				
			||||||
 | 
							t.Errorf("Unexpected Touch")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestQueueDrainageUsingShutDownWithDrain(t *testing.T) {
 | 
					func TestQueueDrainageUsingShutDownWithDrain(t *testing.T) {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user