Merge pull request #41510 from kargakis/fix-progress-check-requeue

Automatic merge from submit-queue (batch tested with PRs 41714, 41510, 42052, 41918, 31515)

controller: fix requeueing progressing deployments

Drop the secondary queue and instead enqueue deployments directly
in the main queue, either rate-limited or after the required amount
of time we need to wait. This way we can always be sure that we
will sync the Deployment again if its progress has yet to resolve
into a complete (NewReplicaSetAvailable) or TimedOut condition.

This should also simplify the deployment controller a bit.

Fixes https://github.com/kubernetes/kubernetes/issues/39785. Once this change soaks, I will move the test out of the flaky suite.
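For reference, a minimal, self-contained sketch of the two main-queue primitives this change relies on (assuming client-go's `workqueue` package, which the diff below also uses; the `default/foo` key is illustrative):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// A single rate-limited main queue, as in this PR: no secondary progress queue.
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment")
	defer queue.ShutDown()

	key := "default/foo" // illustrative namespace/name key

	// Requeue with exponential backoff, e.g. after a failed sync.
	queue.AddRateLimited(key)

	// Requeue after a fixed delay, e.g. the time left until the
	// progress deadline can first be exceeded.
	queue.AddAfter(key, 2*time.Second)

	// A worker pops keys exactly as the main deployment worker does.
	item, shutdown := queue.Get()
	if !shutdown {
		fmt.Println("syncing", item)
		queue.Forget(item) // successful sync: reset this key's backoff
		queue.Done(item)
	}
}
```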

@kubernetes/sig-apps-misc
Kubernetes Submit Queue committed by GitHub on 2017-02-25 02:17:53 -08:00
4 changed files with 225 additions and 84 deletions


@@ -94,8 +94,6 @@ type DeploymentController struct {
 	// Deployments that need to be synced
 	queue workqueue.RateLimitingInterface
-	// Deployments that need to be checked for progress.
-	progressQueue workqueue.RateLimitingInterface
 }

 // NewDeploymentController creates a new DeploymentController.
@@ -112,7 +110,6 @@ func NewDeploymentController(dInformer extensionsinformers.DeploymentInformer, r
 		client:        client,
 		eventRecorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "deployment-controller"}),
 		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
-		progressQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "progress-check"),
 	}
 	dc.rsControl = controller.RealRSControl{
 		KubeClient: client,
@@ -150,7 +147,6 @@ func NewDeploymentController(dInformer extensionsinformers.DeploymentInformer, r
 func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	defer dc.queue.ShutDown()
-	defer dc.progressQueue.ShutDown()

 	glog.Infof("Starting deployment controller")
@@ -162,7 +158,6 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
 	for i := 0; i < workers; i++ {
 		go wait.Until(dc.worker, time.Second, stopCh)
 	}
-	go wait.Until(dc.progressWorker, time.Second, stopCh)

 	<-stopCh
 	glog.Infof("Shutting down deployment controller")
@@ -357,17 +352,25 @@ func (dc *DeploymentController) enqueue(deployment *extensions.Deployment) {
 	dc.queue.Add(key)
 }

-// checkProgressAfter will enqueue a deployment after the provided amount of time in a secondary queue.
-// Once the deployment is popped out of the secondary queue, it is checked for progress and requeued
-// back to the main queue iff it has failed progressing.
-func (dc *DeploymentController) checkProgressAfter(deployment *extensions.Deployment, after time.Duration) {
+func (dc *DeploymentController) enqueueRateLimited(deployment *extensions.Deployment) {
 	key, err := controller.KeyFunc(deployment)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
 		return
 	}
-	dc.progressQueue.AddAfter(key, after)
+	dc.queue.AddRateLimited(key)
 }

+// enqueueAfter will enqueue a deployment after the provided amount of time.
+func (dc *DeploymentController) enqueueAfter(deployment *extensions.Deployment, after time.Duration) {
+	key, err := controller.KeyFunc(deployment)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
+		return
+	}
+	dc.queue.AddAfter(key, after)
+}
+
 // getDeploymentForPod returns the deployment managing the given Pod.
@@ -712,62 +715,3 @@ func (dc *DeploymentController) clearDeploymentOverlap(deployment *extensions.De
 	delete(deployment.Annotations, util.OverlapAnnotation)
 	return dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(deployment)
 }
-
-// progressWorker runs a worker thread that pops items out of a secondary queue, checks if they
-// have failed progressing and if so it adds them back to the main queue.
-func (dc *DeploymentController) progressWorker() {
-	for dc.checkNextItemForProgress() {
-	}
-}
-
-// checkNextItemForProgress checks if a deployment has failed progressing and if so it adds it back
-// to the main queue.
-func (dc *DeploymentController) checkNextItemForProgress() bool {
-	key, quit := dc.progressQueue.Get()
-	if quit {
-		return false
-	}
-	defer dc.progressQueue.Done(key)
-
-	needsResync, err := dc.checkForProgress(key.(string))
-	if err != nil {
-		utilruntime.HandleError(err)
-	}
-	if err == nil && needsResync {
-		glog.V(2).Infof("Deployment %q has failed progressing - syncing it back to the main queue for an update", key.(string))
-		dc.queue.AddRateLimited(key)
-	}
-	dc.progressQueue.Forget(key)
-	return true
-}
-
-// checkForProgress checks the progress for the provided deployment. Meant to be called
-// by the progressWorker and work on items synced in a secondary queue.
-func (dc *DeploymentController) checkForProgress(key string) (bool, error) {
-	namespace, name, err := cache.SplitMetaNamespaceKey(key)
-	if err != nil {
-		return false, err
-	}
-	deployment, err := dc.dLister.Deployments(namespace).Get(name)
-	if errors.IsNotFound(err) {
-		return false, nil
-	}
-	if err != nil {
-		glog.V(2).Infof("Cannot retrieve deployment %q found in the secondary queue: %#v", key, err)
-		return false, err
-	}
-	cond := util.GetDeploymentCondition(deployment.Status, extensions.DeploymentProgressing)
-	// Already marked with a terminal reason - no need to add it back to the main queue.
-	if cond != nil && (cond.Reason == util.TimedOutReason || cond.Reason == util.NewRSAvailableReason) {
-		return false, nil
-	}
-	// Deep-copy otherwise we may mutate our cache.
-	// TODO: Remove deep-copying from here. This worker does not need to sync the annotations
-	// in the deployment.
-	d, err := util.DeploymentDeepCopy(deployment)
-	if err != nil {
-		return false, err
-	}
-	glog.V(2).Infof("Syncing deployment %q for a progress check", key)
-	return dc.hasFailed(d)
-}
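To illustrate where `enqueueAfter` now fits, here is a hedged, self-contained sketch — not code from this PR; `requeueDelay` and its parameters are illustrative — of computing the delay until a deployment can first be declared TimedOut:

```go
package main

import (
	"fmt"
	"time"
)

// requeueDelay returns how long to wait before re-checking progress:
// the time remaining from the Progressing condition's last update until
// spec.progressDeadlineSeconds elapses. Names are illustrative only.
func requeueDelay(lastProgressUpdate time.Time, progressDeadline time.Duration, now time.Time) time.Duration {
	remaining := lastProgressUpdate.Add(progressDeadline).Sub(now)
	if remaining < 0 {
		return 0 // deadline already passed: sync immediately
	}
	return remaining
}

func main() {
	lastUpdate := time.Now().Add(-30 * time.Second)
	delay := requeueDelay(lastUpdate, 600*time.Second, time.Now())
	// The controller would then call something like dc.enqueueAfter(d, delay),
	// guaranteeing a resync that can set the TimedOut condition.
	fmt.Printf("re-check progress in %v\n", delay) // ~9m30s
}
```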