Fix expectations in Deployment. Ref #19299.

This commit is contained in:
Brian Grant
2016-02-25 06:40:14 +00:00
parent e63127e0eb
commit 39f0edca75
5 changed files with 148 additions and 61 deletions

View File

@@ -173,6 +173,8 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
&api.Pod{},
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
// When pod is created, we need to update deployment's expectations
AddFunc: dc.addPod,
// When pod updates (becomes ready), we need to enqueue deployment
UpdateFunc: dc.updatePod,
// When pod is deleted, we need to update deployment's expectations
@@ -210,7 +212,8 @@ func (dc *DeploymentController) addReplicaSet(obj interface{}) {
glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
return
}
dc.rsExpectations.CreationObserved(dKey)
// Decrement expected creations
dc.rsExpectations.LowerExpectations(dKey, 1, 0)
dc.enqueueDeployment(d)
}
}
@@ -302,6 +305,25 @@ func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.De
return nil
}
// When a pod is created, update expectations of the controller that manages the pod.
func (dc *DeploymentController) addPod(obj interface{}) {
pod, ok := obj.(*api.Pod)
if !ok {
return
}
glog.V(4).Infof("Pod %s created.", pod.Name)
if d := dc.getDeploymentForPod(pod); d != nil {
dKey, err := controller.KeyFunc(d)
if err != nil {
glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
return
}
// Decrement expected creations
dc.podExpectations.LowerExpectations(dKey, 1, 0)
dc.enqueueDeployment(d)
}
}
// updatePod figures out what deployment(s) manage the ReplicaSet that manages the Pod when the Pod
// is updated and wake them up. If anything of the Pods have changed, we need to awaken both
// the old and new deployments. old and cur must be *api.Pod types.
@@ -350,7 +372,9 @@ func (dc *DeploymentController) deletePod(obj interface{}) {
glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
return
}
dc.podExpectations.DeletionObserved(dKey)
// Decrement expected deletions
dc.podExpectations.LowerExpectations(dKey, 0, 1)
dc.enqueueDeployment(d)
}
}
@@ -383,7 +407,7 @@ func (dc *DeploymentController) worker() {
defer dc.queue.Done(key)
err := dc.syncHandler(key.(string))
if err != nil {
glog.Errorf("Error syncing deployment: %v", err)
glog.Errorf("Error syncing deployment %v: %v", key, err)
}
}()
}
@@ -417,8 +441,26 @@ func (dc *DeploymentController) syncDeployment(key string) error {
dc.rsExpectations.DeleteExpectations(key)
return nil
}
d := *obj.(*extensions.Deployment)
// Note: The expectations cache is not thread-safe for a given key.
// Check the replica set expectations of the deployment before creating a new one.
// TODO: Explicitly expire expectations if we haven't sync'ed in a long time.
dKey, err := controller.KeyFunc(&d)
if err != nil {
return fmt.Errorf("couldn't get key for deployment %#v: %v", d, err)
}
if !dc.rsExpectations.SatisfiedExpectations(dKey) {
return fmt.Errorf("replicaset expectations not met yet for %v in syncDeployment", dKey)
}
if !dc.podExpectations.SatisfiedExpectations(dKey) {
return fmt.Errorf("pod expectations not met yet for %v in syncDeployment", dKey)
}
// Ensure that an expectations record exists and clear previous expectations.
dc.rsExpectations.SetExpectations(dKey, 0, 0)
dc.podExpectations.SetExpectations(dKey, 0, 0)
if d.Spec.Paused {
// TODO: Implement scaling for paused deployments.
// Dont take any action for paused deployment.
@@ -573,7 +615,7 @@ func (dc *DeploymentController) syncRollingUpdateDeployment(deployment extension
}
// Scale down, if we can.
scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, deployment, true)
scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, deployment)
if err != nil {
return err
}
@@ -701,15 +743,6 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen
return nil, nil
}
// Check the replica set expectations of the deployment before creating a new one.
dKey, err := controller.KeyFunc(&deployment)
if err != nil {
return nil, fmt.Errorf("couldn't get key for deployment %#v: %v", deployment, err)
}
if !dc.rsExpectations.SatisfiedExpectations(dKey) {
dc.enqueueDeployment(&deployment)
return nil, fmt.Errorf("replica set expectations not met yet before getting new replica set\n")
}
// new ReplicaSet does not exist, create one.
namespace := deployment.ObjectMeta.Namespace
podTemplateSpecHash := podutil.GetPodTemplateSpecHash(deployment.Spec.Template)
@@ -717,12 +750,15 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen
// Add podTemplateHash label to selector.
newRSSelector := labelsutil.CloneSelectorAndAddLabel(deployment.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
// Set ReplicaSet expectations (1 ReplicaSet should be created)
dKey, err = controller.KeyFunc(&deployment)
// Set ReplicaSet expectations (1 ReplicaSet should be created).
// This clobbers previous expectations, but we checked that in syncDeployment.
// We don't set expectations for deletions of 0-replica ReplicaSets because re-setting
// expectations would clobber these, and redundant deletions shouldn't cause harm.
dKey, err := controller.KeyFunc(&deployment)
if err != nil {
return nil, fmt.Errorf("couldn't get key for deployment controller %#v: %v", deployment, err)
return nil, fmt.Errorf("couldn't get key for deployment %#v: %v", deployment, err)
}
dc.rsExpectations.ExpectCreations(dKey, 1)
// Create new ReplicaSet
newRS := extensions.ReplicaSet{
ObjectMeta: api.ObjectMeta{
@@ -742,11 +778,23 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen
if err != nil {
return nil, err
}
// Increment expected creations
dc.rsExpectations.RaiseExpectations(dKey, 1, 0)
if newReplicasCount != 0 {
dc.podExpectations.RaiseExpectations(dKey, newReplicasCount, 0)
}
newRS.Spec.Replicas = newReplicasCount
createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
if err != nil {
dc.rsExpectations.DeleteExpectations(dKey)
return nil, fmt.Errorf("error creating replica set: %v", err)
// Decrement expected creations
dc.rsExpectations.LowerExpectations(dKey, 1, 0)
if newReplicasCount != 0 {
dc.podExpectations.LowerExpectations(dKey, newReplicasCount, 0)
}
dc.enqueueDeployment(deployment)
return nil, fmt.Errorf("error creating replica set %v: %v", dKey, err)
}
if newReplicasCount > 0 {
dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount)
@@ -822,24 +870,13 @@ func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.Repl
return scaled, err
}
// Set expectationsCheck to false to bypass expectations check when testing
func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment, expectationsCheck bool) (bool, error) {
func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment extensions.Deployment) (bool, error) {
oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
if oldPodsCount == 0 {
// Can't scale down further
return false, nil
}
// Check the expectations of deployment before reconciling
dKey, err := controller.KeyFunc(&deployment)
if err != nil {
return false, fmt.Errorf("Couldn't get key for deployment %#v: %v", deployment, err)
}
if expectationsCheck && !dc.podExpectations.SatisfiedExpectations(dKey) {
glog.V(4).Infof("Pod expectations not met yet before reconciling old replica sets\n")
return false, nil
}
minReadySeconds := deployment.Spec.MinReadySeconds
allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
newRSAvailablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, []*extensions.ReplicaSet{newRS}, minReadySeconds)
@@ -903,10 +940,6 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.Rep
}
totalScaledDown := cleanupCount + scaledDownCount
if expectationsCheck {
dc.podExpectations.ExpectDeletions(dKey, totalScaledDown)
}
return totalScaledDown > 0, nil
}
@@ -1081,13 +1114,35 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.Rep
if rs.Spec.Replicas == newScale {
return false, rs, nil
}
scalingOperation := "down"
dKey, err := controller.KeyFunc(&deployment)
if err != nil {
return false, nil, fmt.Errorf("couldn't get key for deployment %#v: %v", deployment, err)
}
var scalingOperation string
// Set expectations first, because if the update is successful, the expectations will be handled asynchronously immediately.
if rs.Spec.Replicas < newScale {
scalingOperation = "up"
// Increment expected creations
dc.podExpectations.RaiseExpectations(dKey, newScale-rs.Spec.Replicas, 0)
} else {
scalingOperation = "down"
// Increment expected deletions
dc.podExpectations.RaiseExpectations(dKey, 0, rs.Spec.Replicas-newScale)
}
newRS, err := dc.scaleReplicaSet(rs, newScale)
if err == nil {
dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
} else {
// Back out the expectation changes. If we observed a failure even though the update succeeded, this will be wrong.
if rs.Spec.Replicas < newScale {
// Decrement expected creations
dc.podExpectations.LowerExpectations(dKey, newScale-rs.Spec.Replicas, 0)
dc.enqueueDeployment(deployment)
} else {
// Decrement expected deletions
dc.podExpectations.LowerExpectations(dKey, 0, rs.Spec.Replicas-newScale)
dc.enqueueDeployment(deployment)
}
}
return true, newRS, err
}