281 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			281 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
 | 
						|
 | 
						|
package workqueue
 | 
						|
 | 
						|
import (
 | 
						|
	"container/heap"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"k8s.io/apimachinery/pkg/util/clock"
 | 
						|
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | 
						|
)
 | 
						|
 | 
						|
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
 | 
						|
// requeue items after failures without ending up in a hot-loop.
 | 
						|
type DelayingInterface interface {
 | 
						|
	Interface
 | 
						|
	// AddAfter adds an item to the workqueue after the indicated duration has passed
 | 
						|
	AddAfter(item interface{}, duration time.Duration)
 | 
						|
}
 | 
						|
 | 
						|
// NewDelayingQueue constructs a new workqueue with delayed queuing ability
 | 
						|
func NewDelayingQueue() DelayingInterface {
 | 
						|
	return NewDelayingQueueWithCustomClock(clock.RealClock{}, "")
 | 
						|
}
 | 
						|
 | 
						|
// NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
 | 
						|
// inject custom queue Interface instead of the default one
 | 
						|
func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
 | 
						|
	return newDelayingQueue(clock.RealClock{}, q, name)
 | 
						|
}
 | 
						|
 | 
						|
// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability
 | 
						|
func NewNamedDelayingQueue(name string) DelayingInterface {
 | 
						|
	return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
 | 
						|
}
 | 
						|
 | 
						|
// NewDelayingQueueWithCustomClock constructs a new named workqueue
 | 
						|
// with ability to inject real or fake clock for testing purposes
 | 
						|
func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
 | 
						|
	return newDelayingQueue(clock, NewNamed(name), name)
 | 
						|
}
 | 
						|
 | 
						|
func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType {
 | 
						|
	ret := &delayingType{
 | 
						|
		Interface:       q,
 | 
						|
		clock:           clock,
 | 
						|
		heartbeat:       clock.NewTicker(maxWait),
 | 
						|
		stopCh:          make(chan struct{}),
 | 
						|
		waitingForAddCh: make(chan *waitFor, 1000),
 | 
						|
		metrics:         newRetryMetrics(name),
 | 
						|
	}
 | 
						|
 | 
						|
	go ret.waitingLoop()
 | 
						|
	return ret
 | 
						|
}
 | 
						|
 | 
						|
// delayingType wraps an Interface and provides delayed re-enquing
 | 
						|
type delayingType struct {
 | 
						|
	Interface
 | 
						|
 | 
						|
	// clock tracks time for delayed firing
 | 
						|
	clock clock.Clock
 | 
						|
 | 
						|
	// stopCh lets us signal a shutdown to the waiting loop
 | 
						|
	stopCh chan struct{}
 | 
						|
	// stopOnce guarantees we only signal shutdown a single time
 | 
						|
	stopOnce sync.Once
 | 
						|
 | 
						|
	// heartbeat ensures we wait no more than maxWait before firing
 | 
						|
	heartbeat clock.Ticker
 | 
						|
 | 
						|
	// waitingForAddCh is a buffered channel that feeds waitingForAdd
 | 
						|
	waitingForAddCh chan *waitFor
 | 
						|
 | 
						|
	// metrics counts the number of retries
 | 
						|
	metrics retryMetrics
 | 
						|
}
 | 
						|
 | 
						|
// waitFor holds the data to add and the time it should be added
 | 
						|
type waitFor struct {
 | 
						|
	data    t
 | 
						|
	readyAt time.Time
 | 
						|
	// index in the priority queue (heap)
 | 
						|
	index int
 | 
						|
}
 | 
						|
 | 
						|
// waitForPriorityQueue implements a priority queue for waitFor items.
 | 
						|
//
 | 
						|
// waitForPriorityQueue implements heap.Interface. The item occurring next in
 | 
						|
// time (i.e., the item with the smallest readyAt) is at the root (index 0).
 | 
						|
// Peek returns this minimum item at index 0. Pop returns the minimum item after
 | 
						|
// it has been removed from the queue and placed at index Len()-1 by
 | 
						|
// container/heap. Push adds an item at index Len(), and container/heap
 | 
						|
// percolates it into the correct location.
 | 
						|
type waitForPriorityQueue []*waitFor
 | 
						|
 | 
						|
func (pq waitForPriorityQueue) Len() int {
 | 
						|
	return len(pq)
 | 
						|
}
 | 
						|
func (pq waitForPriorityQueue) Less(i, j int) bool {
 | 
						|
	return pq[i].readyAt.Before(pq[j].readyAt)
 | 
						|
}
 | 
						|
func (pq waitForPriorityQueue) Swap(i, j int) {
 | 
						|
	pq[i], pq[j] = pq[j], pq[i]
 | 
						|
	pq[i].index = i
 | 
						|
	pq[j].index = j
 | 
						|
}
 | 
						|
 | 
						|
// Push adds an item to the queue. Push should not be called directly; instead,
 | 
						|
// use `heap.Push`.
 | 
						|
func (pq *waitForPriorityQueue) Push(x interface{}) {
 | 
						|
	n := len(*pq)
 | 
						|
	item := x.(*waitFor)
 | 
						|
	item.index = n
 | 
						|
	*pq = append(*pq, item)
 | 
						|
}
 | 
						|
 | 
						|
// Pop removes an item from the queue. Pop should not be called directly;
 | 
						|
// instead, use `heap.Pop`.
 | 
						|
func (pq *waitForPriorityQueue) Pop() interface{} {
 | 
						|
	n := len(*pq)
 | 
						|
	item := (*pq)[n-1]
 | 
						|
	item.index = -1
 | 
						|
	*pq = (*pq)[0:(n - 1)]
 | 
						|
	return item
 | 
						|
}
 | 
						|
 | 
						|
// Peek returns the item at the beginning of the queue, without removing the
 | 
						|
// item or otherwise mutating the queue. It is safe to call directly.
 | 
						|
func (pq waitForPriorityQueue) Peek() interface{} {
 | 
						|
	return pq[0]
 | 
						|
}
 | 
						|
 | 
						|
// ShutDown stops the queue. After the queue drains, the returned shutdown bool
 | 
						|
// on Get() will be true. This method may be invoked more than once.
 | 
						|
func (q *delayingType) ShutDown() {
 | 
						|
	q.stopOnce.Do(func() {
 | 
						|
		q.Interface.ShutDown()
 | 
						|
		close(q.stopCh)
 | 
						|
		q.heartbeat.Stop()
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// AddAfter adds the given item to the work queue after the given delay
 | 
						|
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
 | 
						|
	// don't add if we're already shutting down
 | 
						|
	if q.ShuttingDown() {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	q.metrics.retry()
 | 
						|
 | 
						|
	// immediately add things with no delay
 | 
						|
	if duration <= 0 {
 | 
						|
		q.Add(item)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	select {
 | 
						|
	case <-q.stopCh:
 | 
						|
		// unblock if ShutDown() is called
 | 
						|
	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// maxWait is an upper bound on how long the waiting loop sleeps between
// checks. It is only insurance against weird things happening: checking the
// queue every 10 seconds isn't expensive, and it means an expired item never
// sits for more than 10 seconds.
const maxWait = time.Second * 10
 | 
						|
 | 
						|
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
 | 
						|
func (q *delayingType) waitingLoop() {
 | 
						|
	defer utilruntime.HandleCrash()
 | 
						|
 | 
						|
	// Make a placeholder channel to use when there are no items in our list
 | 
						|
	never := make(<-chan time.Time)
 | 
						|
 | 
						|
	// Make a timer that expires when the item at the head of the waiting queue is ready
 | 
						|
	var nextReadyAtTimer clock.Timer
 | 
						|
 | 
						|
	waitingForQueue := &waitForPriorityQueue{}
 | 
						|
	heap.Init(waitingForQueue)
 | 
						|
 | 
						|
	waitingEntryByData := map[t]*waitFor{}
 | 
						|
 | 
						|
	for {
 | 
						|
		if q.Interface.ShuttingDown() {
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		now := q.clock.Now()
 | 
						|
 | 
						|
		// Add ready entries
 | 
						|
		for waitingForQueue.Len() > 0 {
 | 
						|
			entry := waitingForQueue.Peek().(*waitFor)
 | 
						|
			if entry.readyAt.After(now) {
 | 
						|
				break
 | 
						|
			}
 | 
						|
 | 
						|
			entry = heap.Pop(waitingForQueue).(*waitFor)
 | 
						|
			q.Add(entry.data)
 | 
						|
			delete(waitingEntryByData, entry.data)
 | 
						|
		}
 | 
						|
 | 
						|
		// Set up a wait for the first item's readyAt (if one exists)
 | 
						|
		nextReadyAt := never
 | 
						|
		if waitingForQueue.Len() > 0 {
 | 
						|
			if nextReadyAtTimer != nil {
 | 
						|
				nextReadyAtTimer.Stop()
 | 
						|
			}
 | 
						|
			entry := waitingForQueue.Peek().(*waitFor)
 | 
						|
			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
 | 
						|
			nextReadyAt = nextReadyAtTimer.C()
 | 
						|
		}
 | 
						|
 | 
						|
		select {
 | 
						|
		case <-q.stopCh:
 | 
						|
			return
 | 
						|
 | 
						|
		case <-q.heartbeat.C():
 | 
						|
			// continue the loop, which will add ready items
 | 
						|
 | 
						|
		case <-nextReadyAt:
 | 
						|
			// continue the loop, which will add ready items
 | 
						|
 | 
						|
		case waitEntry := <-q.waitingForAddCh:
 | 
						|
			if waitEntry.readyAt.After(q.clock.Now()) {
 | 
						|
				insert(waitingForQueue, waitingEntryByData, waitEntry)
 | 
						|
			} else {
 | 
						|
				q.Add(waitEntry.data)
 | 
						|
			}
 | 
						|
 | 
						|
			drained := false
 | 
						|
			for !drained {
 | 
						|
				select {
 | 
						|
				case waitEntry := <-q.waitingForAddCh:
 | 
						|
					if waitEntry.readyAt.After(q.clock.Now()) {
 | 
						|
						insert(waitingForQueue, waitingEntryByData, waitEntry)
 | 
						|
					} else {
 | 
						|
						q.Add(waitEntry.data)
 | 
						|
					}
 | 
						|
				default:
 | 
						|
					drained = true
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
 | 
						|
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
 | 
						|
	// if the entry already exists, update the time only if it would cause the item to be queued sooner
 | 
						|
	existing, exists := knownEntries[entry.data]
 | 
						|
	if exists {
 | 
						|
		if existing.readyAt.After(entry.readyAt) {
 | 
						|
			existing.readyAt = entry.readyAt
 | 
						|
			heap.Fix(q, existing.index)
 | 
						|
		}
 | 
						|
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	heap.Push(q, entry)
 | 
						|
	knownEntries[entry.data] = entry
 | 
						|
}
 |