add goroutine to move unschedulablepods to activeq regularly

This commit is contained in:
wangqingcan
2019-01-16 12:08:19 +08:00
parent 10bb353a95
commit de8cfdcd79
2 changed files with 86 additions and 0 deletions

View File

@@ -49,6 +49,10 @@ var (
queueClosed = "scheduling queue is closed"
)
// If the pod stays in unschedulableQ longer than the unschedulableQTimeInterval,
// the pod will be moved from unschedulableQ to activeQ.
const unschedulableQTimeInterval = 60 * time.Second
// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
// makes it easy to use those data structures as a SchedulingQueue.
@@ -279,6 +283,7 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority
// run starts the goroutine to pump from podBackoffQ to activeQ
func (p *PriorityQueue) run() {
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}
// Add adds a pod to the active queue. It should be called only when a new pod
@@ -443,6 +448,26 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
}
}
// flushUnschedulableQLeftover moves pod which stays in unschedulableQ longer than the durationStayUnschedulableQ
// to activeQ.
func (p *PriorityQueue) flushUnschedulableQLeftover() {
p.lock.Lock()
defer p.lock.Unlock()
var podsToMove []*v1.Pod
currentTime := p.clock.Now()
for _, pod := range p.unschedulableQ.pods {
lastScheduleTime := podTimestamp(pod)
if !lastScheduleTime.IsZero() && currentTime.Sub(lastScheduleTime.Time) > unschedulableQTimeInterval {
podsToMove = append(podsToMove, pod)
}
}
if len(podsToMove) > 0 {
p.movePodsToActiveQueue(podsToMove)
}
}
// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It also
// clears receivedMoveRequest to mark the beginning of a new scheduling cycle.