controller: fix requeueing progressing deployments
Drop the secondary queue and add either ratelimited or after the required amount of time that we need to wait directly in the main queue. In this way we can always be sure that we will sync back the Deployment if its progress has yet to resolve into a complete (NewReplicaSetAvailable) or TimedOut condition.
This commit is contained in:
@@ -94,8 +94,6 @@ type DeploymentController struct {
|
||||
|
||||
// Deployments that need to be synced
|
||||
queue workqueue.RateLimitingInterface
|
||||
// Deployments that need to be checked for progress.
|
||||
progressQueue workqueue.RateLimitingInterface
|
||||
}
|
||||
|
||||
// NewDeploymentController creates a new DeploymentController.
|
||||
@@ -112,7 +110,6 @@ func NewDeploymentController(dInformer extensionsinformers.DeploymentInformer, r
|
||||
client: client,
|
||||
eventRecorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "deployment-controller"}),
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
|
||||
progressQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "progress-check"),
|
||||
}
|
||||
dc.rsControl = controller.RealRSControl{
|
||||
KubeClient: client,
|
||||
@@ -150,7 +147,6 @@ func NewDeploymentController(dInformer extensionsinformers.DeploymentInformer, r
|
||||
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer dc.queue.ShutDown()
|
||||
defer dc.progressQueue.ShutDown()
|
||||
|
||||
glog.Infof("Starting deployment controller")
|
||||
|
||||
@@ -162,7 +158,6 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(dc.worker, time.Second, stopCh)
|
||||
}
|
||||
go wait.Until(dc.progressWorker, time.Second, stopCh)
|
||||
|
||||
<-stopCh
|
||||
glog.Infof("Shutting down deployment controller")
|
||||
@@ -357,17 +352,25 @@ func (dc *DeploymentController) enqueue(deployment *extensions.Deployment) {
|
||||
dc.queue.Add(key)
|
||||
}
|
||||
|
||||
// checkProgressAfter will enqueue a deployment after the provided amount of time in a secondary queue.
|
||||
// Once the deployment is popped out of the secondary queue, it is checked for progress and requeued
|
||||
// back to the main queue iff it has failed progressing.
|
||||
func (dc *DeploymentController) checkProgressAfter(deployment *extensions.Deployment, after time.Duration) {
|
||||
func (dc *DeploymentController) enqueueRateLimited(deployment *extensions.Deployment) {
|
||||
key, err := controller.KeyFunc(deployment)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
|
||||
return
|
||||
}
|
||||
|
||||
dc.progressQueue.AddAfter(key, after)
|
||||
dc.queue.AddRateLimited(key)
|
||||
}
|
||||
|
||||
// enqueueAfter will enqueue a deployment after the provided amount of time.
|
||||
func (dc *DeploymentController) enqueueAfter(deployment *extensions.Deployment, after time.Duration) {
|
||||
key, err := controller.KeyFunc(deployment)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
|
||||
return
|
||||
}
|
||||
|
||||
dc.queue.AddAfter(key, after)
|
||||
}
|
||||
|
||||
// getDeploymentForPod returns the deployment managing the given Pod.
|
||||
@@ -735,62 +738,3 @@ func (dc *DeploymentController) clearDeploymentOverlap(deployment *extensions.De
|
||||
delete(deployment.Annotations, util.OverlapAnnotation)
|
||||
return dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment)
|
||||
}
|
||||
|
||||
// progressWorker runs a worker thread that pops items out of a secondary queue, checks if they
|
||||
// have failed progressing and if so it adds them back to the main queue.
|
||||
func (dc *DeploymentController) progressWorker() {
|
||||
for dc.checkNextItemForProgress() {
|
||||
}
|
||||
}
|
||||
|
||||
// checkNextItemForProgress checks if a deployment has failed progressing and if so it adds it back
|
||||
// to the main queue.
|
||||
func (dc *DeploymentController) checkNextItemForProgress() bool {
|
||||
key, quit := dc.progressQueue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer dc.progressQueue.Done(key)
|
||||
|
||||
needsResync, err := dc.checkForProgress(key.(string))
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
if err == nil && needsResync {
|
||||
glog.V(2).Infof("Deployment %q has failed progressing - syncing it back to the main queue for an update", key.(string))
|
||||
dc.queue.AddRateLimited(key)
|
||||
}
|
||||
dc.progressQueue.Forget(key)
|
||||
return true
|
||||
}
|
||||
|
||||
// checkForProgress checks the progress for the provided deployment. Meant to be called
|
||||
// by the progressWorker and work on items synced in a secondary queue.
|
||||
func (dc *DeploymentController) checkForProgress(key string) (bool, error) {
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
deployment, err := dc.dLister.Deployments(namespace).Get(name)
|
||||
if errors.IsNotFound(err) {
|
||||
return false, nil
|
||||
}
|
||||
if err != nil {
|
||||
glog.V(2).Infof("Cannot retrieve deployment %q found in the secondary queue: %#v", key, err)
|
||||
return false, err
|
||||
}
|
||||
cond := util.GetDeploymentCondition(deployment.Status, extensions.DeploymentProgressing)
|
||||
// Already marked with a terminal reason - no need to add it back to the main queue.
|
||||
if cond != nil && (cond.Reason == util.TimedOutReason || cond.Reason == util.NewRSAvailableReason) {
|
||||
return false, nil
|
||||
}
|
||||
// Deep-copy otherwise we may mutate our cache.
|
||||
// TODO: Remove deep-copying from here. This worker does not need to sync the annotations
|
||||
// in the deployment.
|
||||
d, err := util.DeploymentDeepCopy(deployment)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
glog.V(2).Infof("Syncing deployment %q for a progress check", key)
|
||||
return dc.hasFailed(d)
|
||||
}
|
||||
|
Reference in New Issue
Block a user