Merge pull request #77170 from smarterclayton/delay_queue_reentrant
DelayingQueue.ShutDown() should be reentrant
This commit is contained in:
		| @@ -18,6 +18,7 @@ package workqueue | |||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"container/heap" | 	"container/heap" | ||||||
|  | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"k8s.io/apimachinery/pkg/util/clock" | 	"k8s.io/apimachinery/pkg/util/clock" | ||||||
| @@ -66,6 +67,8 @@ type delayingType struct { | |||||||
|  |  | ||||||
| 	// stopCh lets us signal a shutdown to the waiting loop | 	// stopCh lets us signal a shutdown to the waiting loop | ||||||
| 	stopCh chan struct{} | 	stopCh chan struct{} | ||||||
|  | 	// stopOnce guarantees we only signal shutdown a single time | ||||||
|  | 	stopOnce sync.Once | ||||||
|  |  | ||||||
| 	// heartbeat ensures we wait no more than maxWait before firing | 	// heartbeat ensures we wait no more than maxWait before firing | ||||||
| 	heartbeat clock.Ticker | 	heartbeat clock.Ticker | ||||||
| @@ -133,11 +136,14 @@ func (pq waitForPriorityQueue) Peek() interface{} { | |||||||
| 	return pq[0] | 	return pq[0] | ||||||
| } | } | ||||||
|  |  | ||||||
| // ShutDown gives a way to shut off this queue | // ShutDown stops the queue. After the queue drains, the returned shutdown bool | ||||||
|  | // on Get() will be true. This method may be invoked more than once. | ||||||
| func (q *delayingType) ShutDown() { | func (q *delayingType) ShutDown() { | ||||||
| 	q.Interface.ShutDown() | 	q.stopOnce.Do(func() { | ||||||
| 	close(q.stopCh) | 		q.Interface.ShutDown() | ||||||
| 	q.heartbeat.Stop() | 		close(q.stopCh) | ||||||
|  | 		q.heartbeat.Stop() | ||||||
|  | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
| // AddAfter adds the given item to the work queue after the given delay | // AddAfter adds the given item to the work queue after the given delay | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot