Add a mutex to assure atomicity of reat_limited queue operations and remove 'leaky' version of it
This commit is contained in:
		@@ -20,6 +20,7 @@ import (
 | 
				
			|||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"net"
 | 
						"net"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/golang/glog"
 | 
						"github.com/golang/glog"
 | 
				
			||||||
@@ -88,7 +89,9 @@ type NodeController struct {
 | 
				
			|||||||
	// to aviod the problem with time skew across the cluster.
 | 
						// to aviod the problem with time skew across the cluster.
 | 
				
			||||||
	nodeStatusMap map[string]nodeStatusData
 | 
						nodeStatusMap map[string]nodeStatusData
 | 
				
			||||||
	now           func() util.Time
 | 
						now           func() util.Time
 | 
				
			||||||
	// worker that evicts pods from unresponsive nodes.
 | 
						// Lock to access evictor workers
 | 
				
			||||||
 | 
						evictorLock *sync.Mutex
 | 
				
			||||||
 | 
						// workers that evicts pods from unresponsive nodes.
 | 
				
			||||||
	podEvictor         *RateLimitedTimedQueue
 | 
						podEvictor         *RateLimitedTimedQueue
 | 
				
			||||||
	terminationEvictor *RateLimitedTimedQueue
 | 
						terminationEvictor *RateLimitedTimedQueue
 | 
				
			||||||
	podEvictionTimeout time.Duration
 | 
						podEvictionTimeout time.Duration
 | 
				
			||||||
@@ -120,6 +123,7 @@ func NewNodeController(
 | 
				
			|||||||
	if allocateNodeCIDRs && clusterCIDR == nil {
 | 
						if allocateNodeCIDRs && clusterCIDR == nil {
 | 
				
			||||||
		glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.")
 | 
							glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						evictorLock := sync.Mutex{}
 | 
				
			||||||
	return &NodeController{
 | 
						return &NodeController{
 | 
				
			||||||
		cloud:                  cloud,
 | 
							cloud:                  cloud,
 | 
				
			||||||
		knownNodeSet:           make(util.StringSet),
 | 
							knownNodeSet:           make(util.StringSet),
 | 
				
			||||||
@@ -127,8 +131,9 @@ func NewNodeController(
 | 
				
			|||||||
		recorder:               recorder,
 | 
							recorder:               recorder,
 | 
				
			||||||
		podEvictionTimeout:     podEvictionTimeout,
 | 
							podEvictionTimeout:     podEvictionTimeout,
 | 
				
			||||||
		maximumGracePeriod:     5 * time.Minute,
 | 
							maximumGracePeriod:     5 * time.Minute,
 | 
				
			||||||
		podEvictor:             NewRateLimitedTimedQueue(podEvictionLimiter, false),
 | 
							evictorLock:            &evictorLock,
 | 
				
			||||||
		terminationEvictor:     NewRateLimitedTimedQueue(podEvictionLimiter, false),
 | 
							podEvictor:             NewRateLimitedTimedQueue(podEvictionLimiter),
 | 
				
			||||||
 | 
							terminationEvictor:     NewRateLimitedTimedQueue(podEvictionLimiter),
 | 
				
			||||||
		nodeStatusMap:          make(map[string]nodeStatusData),
 | 
							nodeStatusMap:          make(map[string]nodeStatusData),
 | 
				
			||||||
		nodeMonitorGracePeriod: nodeMonitorGracePeriod,
 | 
							nodeMonitorGracePeriod: nodeMonitorGracePeriod,
 | 
				
			||||||
		nodeMonitorPeriod:      nodeMonitorPeriod,
 | 
							nodeMonitorPeriod:      nodeMonitorPeriod,
 | 
				
			||||||
@@ -162,6 +167,8 @@ func (nc *NodeController) Run(period time.Duration) {
 | 
				
			|||||||
	//    c. If there are pods still terminating, wait for their estimated completion
 | 
						//    c. If there are pods still terminating, wait for their estimated completion
 | 
				
			||||||
	//       before retrying
 | 
						//       before retrying
 | 
				
			||||||
	go util.Until(func() {
 | 
						go util.Until(func() {
 | 
				
			||||||
 | 
							nc.evictorLock.Lock()
 | 
				
			||||||
 | 
							defer nc.evictorLock.Unlock()
 | 
				
			||||||
		nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
 | 
							nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
 | 
				
			||||||
			remaining, err := nc.deletePods(value.Value)
 | 
								remaining, err := nc.deletePods(value.Value)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
@@ -178,6 +185,8 @@ func (nc *NodeController) Run(period time.Duration) {
 | 
				
			|||||||
	// TODO: replace with a controller that ensures pods that are terminating complete
 | 
						// TODO: replace with a controller that ensures pods that are terminating complete
 | 
				
			||||||
	// in a particular time period
 | 
						// in a particular time period
 | 
				
			||||||
	go util.Until(func() {
 | 
						go util.Until(func() {
 | 
				
			||||||
 | 
							nc.evictorLock.Lock()
 | 
				
			||||||
 | 
							defer nc.evictorLock.Unlock()
 | 
				
			||||||
		nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
 | 
							nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
 | 
				
			||||||
			completed, remaining, err := nc.terminatePods(value.Value, value.AddedAt)
 | 
								completed, remaining, err := nc.terminatePods(value.Value, value.AddedAt)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
@@ -551,12 +560,17 @@ func (nc *NodeController) hasPods(nodeName string) (bool, error) {
 | 
				
			|||||||
// evictPods queues an eviction for the provided node name, and returns false if the node is already
 | 
					// evictPods queues an eviction for the provided node name, and returns false if the node is already
 | 
				
			||||||
// queued for eviction.
 | 
					// queued for eviction.
 | 
				
			||||||
func (nc *NodeController) evictPods(nodeName string) bool {
 | 
					func (nc *NodeController) evictPods(nodeName string) bool {
 | 
				
			||||||
 | 
						nc.evictorLock.Lock()
 | 
				
			||||||
 | 
						defer nc.evictorLock.Unlock()
 | 
				
			||||||
	return nc.podEvictor.Add(nodeName)
 | 
						return nc.podEvictor.Add(nodeName)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// cancelPodEviction removes any queued evictions, typically because the node is available again. It
 | 
					// cancelPodEviction removes any queued evictions, typically because the node is available again. It
 | 
				
			||||||
// returns true if an eviction was queued.
 | 
					// returns true if an eviction was queued.
 | 
				
			||||||
func (nc *NodeController) cancelPodEviction(nodeName string) bool {
 | 
					func (nc *NodeController) cancelPodEviction(nodeName string) bool {
 | 
				
			||||||
 | 
						glog.V(2).Infof("Cancelling pod Eviction on Node: %v", nodeName)
 | 
				
			||||||
 | 
						nc.evictorLock.Lock()
 | 
				
			||||||
 | 
						defer nc.evictorLock.Unlock()
 | 
				
			||||||
	wasDeleting := nc.podEvictor.Remove(nodeName)
 | 
						wasDeleting := nc.podEvictor.Remove(nodeName)
 | 
				
			||||||
	wasTerminating := nc.terminationEvictor.Remove(nodeName)
 | 
						wasTerminating := nc.terminationEvictor.Remove(nodeName)
 | 
				
			||||||
	return wasDeleting || wasTerminating
 | 
						return wasDeleting || wasTerminating
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -136,19 +136,16 @@ func (q *UniqueQueue) Head() (TimedValue, bool) {
 | 
				
			|||||||
type RateLimitedTimedQueue struct {
 | 
					type RateLimitedTimedQueue struct {
 | 
				
			||||||
	queue   UniqueQueue
 | 
						queue   UniqueQueue
 | 
				
			||||||
	limiter util.RateLimiter
 | 
						limiter util.RateLimiter
 | 
				
			||||||
	leak    bool
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Creates new queue which will use given RateLimiter to oversee execution. If leak is true,
 | 
					// Creates new queue which will use given RateLimiter to oversee execution.
 | 
				
			||||||
// items which are rate limited will be leakped. Otherwise, rate limited items will be requeued.
 | 
					func NewRateLimitedTimedQueue(limiter util.RateLimiter) *RateLimitedTimedQueue {
 | 
				
			||||||
func NewRateLimitedTimedQueue(limiter util.RateLimiter, leak bool) *RateLimitedTimedQueue {
 | 
					 | 
				
			||||||
	return &RateLimitedTimedQueue{
 | 
						return &RateLimitedTimedQueue{
 | 
				
			||||||
		queue: UniqueQueue{
 | 
							queue: UniqueQueue{
 | 
				
			||||||
			queue: TimedQueue{},
 | 
								queue: TimedQueue{},
 | 
				
			||||||
			set:   util.NewStringSet(),
 | 
								set:   util.NewStringSet(),
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		limiter: limiter,
 | 
							limiter: limiter,
 | 
				
			||||||
		leak:    leak,
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -164,12 +161,9 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
 | 
				
			|||||||
	val, ok := q.queue.Head()
 | 
						val, ok := q.queue.Head()
 | 
				
			||||||
	for ok {
 | 
						for ok {
 | 
				
			||||||
		// rate limit the queue checking
 | 
							// rate limit the queue checking
 | 
				
			||||||
		if q.leak {
 | 
							if !q.limiter.CanAccept() {
 | 
				
			||||||
			if !q.limiter.CanAccept() {
 | 
								// Try again later
 | 
				
			||||||
				break
 | 
								break
 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		} else {
 | 
					 | 
				
			||||||
			q.limiter.Accept()
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		now := now()
 | 
							now := now()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -38,7 +38,7 @@ func CheckSetEq(lhs, rhs util.StringSet) bool {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestAddNode(t *testing.T) {
 | 
					func TestAddNode(t *testing.T) {
 | 
				
			||||||
	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
 | 
						evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 | 
				
			||||||
	evictor.Add("first")
 | 
						evictor.Add("first")
 | 
				
			||||||
	evictor.Add("second")
 | 
						evictor.Add("second")
 | 
				
			||||||
	evictor.Add("third")
 | 
						evictor.Add("third")
 | 
				
			||||||
@@ -61,7 +61,7 @@ func TestAddNode(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestDelNode(t *testing.T) {
 | 
					func TestDelNode(t *testing.T) {
 | 
				
			||||||
	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
 | 
						evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 | 
				
			||||||
	evictor.Add("first")
 | 
						evictor.Add("first")
 | 
				
			||||||
	evictor.Add("second")
 | 
						evictor.Add("second")
 | 
				
			||||||
	evictor.Add("third")
 | 
						evictor.Add("third")
 | 
				
			||||||
@@ -83,7 +83,7 @@ func TestDelNode(t *testing.T) {
 | 
				
			|||||||
		t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
 | 
							t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
 | 
						evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 | 
				
			||||||
	evictor.Add("first")
 | 
						evictor.Add("first")
 | 
				
			||||||
	evictor.Add("second")
 | 
						evictor.Add("second")
 | 
				
			||||||
	evictor.Add("third")
 | 
						evictor.Add("third")
 | 
				
			||||||
@@ -105,7 +105,7 @@ func TestDelNode(t *testing.T) {
 | 
				
			|||||||
		t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
 | 
							t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
 | 
						evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 | 
				
			||||||
	evictor.Add("first")
 | 
						evictor.Add("first")
 | 
				
			||||||
	evictor.Add("second")
 | 
						evictor.Add("second")
 | 
				
			||||||
	evictor.Add("third")
 | 
						evictor.Add("third")
 | 
				
			||||||
@@ -129,7 +129,7 @@ func TestDelNode(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestTry(t *testing.T) {
 | 
					func TestTry(t *testing.T) {
 | 
				
			||||||
	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
 | 
						evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 | 
				
			||||||
	evictor.Add("first")
 | 
						evictor.Add("first")
 | 
				
			||||||
	evictor.Add("second")
 | 
						evictor.Add("second")
 | 
				
			||||||
	evictor.Add("third")
 | 
						evictor.Add("third")
 | 
				
			||||||
@@ -151,7 +151,7 @@ func TestTry(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestTryOrdering(t *testing.T) {
 | 
					func TestTryOrdering(t *testing.T) {
 | 
				
			||||||
	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), false)
 | 
						evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 | 
				
			||||||
	evictor.Add("first")
 | 
						evictor.Add("first")
 | 
				
			||||||
	evictor.Add("second")
 | 
						evictor.Add("second")
 | 
				
			||||||
	evictor.Add("third")
 | 
						evictor.Add("third")
 | 
				
			||||||
@@ -183,7 +183,7 @@ func TestTryOrdering(t *testing.T) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestTryRemovingWhileTry(t *testing.T) {
 | 
					func TestTryRemovingWhileTry(t *testing.T) {
 | 
				
			||||||
	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), false)
 | 
						evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 | 
				
			||||||
	evictor.Add("first")
 | 
						evictor.Add("first")
 | 
				
			||||||
	evictor.Add("second")
 | 
						evictor.Add("second")
 | 
				
			||||||
	evictor.Add("third")
 | 
						evictor.Add("third")
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user