controller: sync stuck deployments in a secondary queue
This commit is contained in:
@@ -85,6 +85,8 @@ type DeploymentController struct {
|
||||
|
||||
// Deployments that need to be synced
|
||||
queue workqueue.RateLimitingInterface
|
||||
// Deployments that need to be checked for progress.
|
||||
progressQueue workqueue.RateLimitingInterface
|
||||
}
|
||||
|
||||
// NewDeploymentController creates a new DeploymentController.
|
||||
@@ -101,13 +103,14 @@ func NewDeploymentController(dInformer informers.DeploymentInformer, rsInformer
|
||||
client: client,
|
||||
eventRecorder: eventBroadcaster.NewRecorder(v1.EventSource{Component: "deployment-controller"}),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
|
||||
progressQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "progress-check"),
|
||||
}
|
||||
|
||||
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: dc.addDeploymentNotification,
|
||||
UpdateFunc: dc.updateDeploymentNotification,
|
||||
AddFunc: dc.addDeployment,
|
||||
UpdateFunc: dc.updateDeployment,
|
||||
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
|
||||
DeleteFunc: dc.deleteDeploymentNotification,
|
||||
DeleteFunc: dc.deleteDeployment,
|
||||
})
|
||||
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: dc.addReplicaSet,
|
||||
@@ -129,6 +132,7 @@ func NewDeploymentController(dInformer informers.DeploymentInformer, rsInformer
|
||||
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer dc.queue.ShutDown()
|
||||
defer dc.progressQueue.ShutDown()
|
||||
|
||||
glog.Infof("Starting deployment controller")
|
||||
|
||||
@@ -139,25 +143,26 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(dc.worker, time.Second, stopCh)
|
||||
}
|
||||
go wait.Until(dc.progressWorker, time.Second, stopCh)
|
||||
|
||||
<-stopCh
|
||||
glog.Infof("Shutting down deployment controller")
|
||||
}
|
||||
|
||||
func (dc *DeploymentController) addDeploymentNotification(obj interface{}) {
|
||||
func (dc *DeploymentController) addDeployment(obj interface{}) {
|
||||
d := obj.(*extensions.Deployment)
|
||||
glog.V(4).Infof("Adding deployment %s", d.Name)
|
||||
dc.enqueueDeployment(d)
|
||||
}
|
||||
|
||||
func (dc *DeploymentController) updateDeploymentNotification(old, cur interface{}) {
|
||||
func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
|
||||
oldD := old.(*extensions.Deployment)
|
||||
glog.V(4).Infof("Updating deployment %s", oldD.Name)
|
||||
// Resync on deployment object relist.
|
||||
dc.enqueueDeployment(cur.(*extensions.Deployment))
|
||||
}
|
||||
|
||||
func (dc *DeploymentController) deleteDeploymentNotification(obj interface{}) {
|
||||
func (dc *DeploymentController) deleteDeployment(obj interface{}) {
|
||||
d, ok := obj.(*extensions.Deployment)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
@@ -266,27 +271,37 @@ func (dc *DeploymentController) enqueueDeployment(deployment *extensions.Deploym
|
||||
dc.queue.Add(key)
|
||||
}
|
||||
|
||||
// enqueueAfter will enqueue a deployment after the provided amount of time in a secondary queue.
|
||||
// Once the deployment is popped out of the secondary queue, it is checked for progress and requeued
|
||||
// back to the main queue iff it has failed progressing.
|
||||
func (dc *DeploymentController) enqueueAfter(deployment *extensions.Deployment, after time.Duration) {
|
||||
key, err := controller.KeyFunc(deployment)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
|
||||
return
|
||||
}
|
||||
|
||||
dc.progressQueue.AddAfter(key, after)
|
||||
}
|
||||
|
||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
||||
func (dc *DeploymentController) worker() {
|
||||
work := func() bool {
|
||||
key, quit := dc.queue.Get()
|
||||
if quit {
|
||||
return true
|
||||
}
|
||||
defer dc.queue.Done(key)
|
||||
|
||||
err := dc.syncHandler(key.(string))
|
||||
dc.handleErr(err, key)
|
||||
for dc.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
func (dc *DeploymentController) processNextWorkItem() bool {
|
||||
key, quit := dc.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer dc.queue.Done(key)
|
||||
|
||||
for {
|
||||
if quit := work(); quit {
|
||||
return
|
||||
}
|
||||
}
|
||||
err := dc.syncHandler(key.(string))
|
||||
dc.handleErr(err, key)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (dc *DeploymentController) handleErr(err error, key interface{}) {
|
||||
@@ -310,6 +325,7 @@ func (dc *DeploymentController) handleErr(err error, key interface{}) {
|
||||
// This function is not meant to be invoked concurrently with the same key.
|
||||
func (dc *DeploymentController) syncDeployment(key string) error {
|
||||
startTime := time.Now()
|
||||
glog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
|
||||
defer func() {
|
||||
glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime))
|
||||
}()
|
||||
@@ -453,3 +469,57 @@ func (dc *DeploymentController) clearDeploymentOverlap(deployment *extensions.De
|
||||
delete(deployment.Annotations, util.OverlapAnnotation)
|
||||
return dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment)
|
||||
}
|
||||
|
||||
// progressWorker runs a worker thread that pops items out of a secondary queue, checks if they
|
||||
// have failed progressing and if so it adds them back to the main queue.
|
||||
func (dc *DeploymentController) progressWorker() {
|
||||
for dc.checkNextItemForProgress() {
|
||||
}
|
||||
}
|
||||
|
||||
// checkNextItemForProgress checks if a deployment has failed progressing and if so it adds it back
|
||||
// to the main queue.
|
||||
func (dc *DeploymentController) checkNextItemForProgress() bool {
|
||||
key, quit := dc.progressQueue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer dc.progressQueue.Done(key)
|
||||
|
||||
needsResync, err := dc.checkForProgress(key.(string))
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
if err == nil && needsResync {
|
||||
dc.queue.AddRateLimited(key)
|
||||
}
|
||||
dc.progressQueue.Forget(key)
|
||||
return true
|
||||
}
|
||||
|
||||
// checkForProgress checks the progress for the provided deployment. Meant to be called
|
||||
// by the progressWorker and work on items synced in a secondary queue.
|
||||
func (dc *DeploymentController) checkForProgress(key string) (bool, error) {
|
||||
obj, exists, err := dc.dLister.Indexer.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.V(2).Infof("Cannot retrieve deployment %q found in the secondary queue: %#v", key, err)
|
||||
return false, err
|
||||
}
|
||||
if !exists {
|
||||
return false, nil
|
||||
}
|
||||
deployment := obj.(*extensions.Deployment)
|
||||
cond := util.GetDeploymentCondition(deployment.Status, extensions.DeploymentProgressing)
|
||||
// Already marked with a terminal reason - no need to add it back to the main queue.
|
||||
if cond != nil && (cond.Reason == util.TimedOutReason || cond.Reason == util.NewRSAvailableReason) {
|
||||
return false, nil
|
||||
}
|
||||
// Deep-copy otherwise we may mutate our cache.
|
||||
// TODO: Remove deep-copying from here. This worker does not need to sync the annotations
|
||||
// in the deployment.
|
||||
d, err := util.DeploymentDeepCopy(deployment)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return dc.hasFailed(d)
|
||||
}
|
||||
|
Reference in New Issue
Block a user