Separate GetOldRS and GetNewRS in deployment controller (get and mutate) and deployment util (get only)
This commit is contained in:
@@ -26,6 +26,7 @@ import (
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
@@ -40,6 +41,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/util/integer"
|
||||
labelsutil "k8s.io/kubernetes/pkg/util/labels"
|
||||
podutil "k8s.io/kubernetes/pkg/util/pod"
|
||||
rsutil "k8s.io/kubernetes/pkg/util/replicaset"
|
||||
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||
@@ -672,33 +674,31 @@ func lastRevision(allRSs []*extensions.ReplicaSet) int64 {
|
||||
|
||||
// getOldReplicaSets returns two sets of old replica sets of the deployment. The first set of old replica sets doesn't include
|
||||
// the ones with no pods, and the second set of old replica sets include all old replica sets.
|
||||
// Note that the pod-template-hash will be added to adopted RSes and pods.
|
||||
func (dc *DeploymentController) getOldReplicaSets(deployment *extensions.Deployment) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
|
||||
return deploymentutil.GetOldReplicaSetsFromLists(deployment, dc.client,
|
||||
func(namespace string, options api.ListOptions) (*api.PodList, error) {
|
||||
podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
|
||||
return &podList, err
|
||||
},
|
||||
func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
|
||||
return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector)
|
||||
})
|
||||
// List the deployment's RSes & Pods and apply pod-template-hash info to deployment's adopted RSes/Pods
|
||||
rsList, podList, err := dc.rsAndPodsWithHashKeySynced(deployment)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err)
|
||||
}
|
||||
return deploymentutil.FindOldReplicaSets(deployment, rsList, podList)
|
||||
}
|
||||
|
||||
// Returns a replica set that matches the intent of the given deployment.
|
||||
// Returns a replica set that matches the intent of the given deployment. Returns nil if the new replica set doesn't exist yet.
|
||||
// 1. Get existing new RS (the RS that the given deployment targets, whose pod template is the same as deployment's).
|
||||
// 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes.
|
||||
// 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas.
|
||||
// Note that the pod-template-hash will be added to adopted RSes and pods.
|
||||
func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployment, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) {
|
||||
// Calculate revision number for this new replica set
|
||||
newRevision := strconv.FormatInt(maxOldRevision+1, 10)
|
||||
|
||||
existingNewRS, err := deploymentutil.GetNewReplicaSetFromList(deployment, dc.client,
|
||||
func(namespace string, options api.ListOptions) (*api.PodList, error) {
|
||||
podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
|
||||
return &podList, err
|
||||
},
|
||||
func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
|
||||
return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector)
|
||||
})
|
||||
// List the deployment's RSes and apply pod-template-hash info to deployment's adopted RSes/Pods
|
||||
rsList, _, err := dc.rsAndPodsWithHashKeySynced(deployment)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err)
|
||||
}
|
||||
existingNewRS, err := deploymentutil.FindNewReplicaSet(deployment, rsList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if existingNewRS != nil {
|
||||
@@ -754,6 +754,130 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
|
||||
return createdRS, dc.updateDeploymentRevision(deployment, newRevision)
|
||||
}
|
||||
|
||||
// rsAndPodsWithHashKeySynced returns the RSes and pods the given deployment targets, with pod-template-hash information synced.
|
||||
func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extensions.Deployment) ([]extensions.ReplicaSet, *api.PodList, error) {
|
||||
rsList, err := deploymentutil.ListReplicaSets(deployment,
|
||||
func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
|
||||
return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error listing ReplicaSets: %v", err)
|
||||
}
|
||||
syncedRSList := []extensions.ReplicaSet{}
|
||||
for _, rs := range rsList {
|
||||
// Add pod-template-hash information if it's not in the RS.
|
||||
// Otherwise, new RS produced by Deployment will overlap with pre-existing ones
|
||||
// that aren't constrained by the pod-template-hash.
|
||||
syncedRS, err := dc.addHashKeyToRSAndPods(rs)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
syncedRSList = append(syncedRSList, *syncedRS)
|
||||
}
|
||||
syncedPodList, err := deploymentutil.ListPods(deployment,
|
||||
func(namespace string, options api.ListOptions) (*api.PodList, error) {
|
||||
podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
|
||||
return &podList, err
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return syncedRSList, syncedPodList, nil
|
||||
}
|
||||
|
||||
// addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps:
|
||||
// 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created
|
||||
// 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas
|
||||
// 3. Add hash label to the rs's label and selector
|
||||
func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) (updatedRS *extensions.ReplicaSet, err error) {
|
||||
updatedRS = &rs
|
||||
// If the rs already has the new hash label in its selector, it's done syncing
|
||||
if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
|
||||
return
|
||||
}
|
||||
namespace := rs.Namespace
|
||||
hash := rsutil.GetPodTemplateSpecHash(rs)
|
||||
rsUpdated := false
|
||||
// 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label.
|
||||
updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS,
|
||||
func(updated *extensions.ReplicaSet) error {
|
||||
// Precondition: the RS doesn't contain the new hash in its pod template label.
|
||||
if updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
|
||||
return utilerrors.ErrPreconditionViolated
|
||||
}
|
||||
updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
|
||||
}
|
||||
if !rsUpdated {
|
||||
// If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error.
|
||||
// Return here and retry in the next sync loop.
|
||||
return &rs, nil
|
||||
}
|
||||
// Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods).
|
||||
if updatedRS.Generation > updatedRS.Status.ObservedGeneration {
|
||||
if err = deploymentutil.WaitForReplicaSetUpdated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil {
|
||||
return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err)
|
||||
}
|
||||
}
|
||||
glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)
|
||||
|
||||
// 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted.
|
||||
selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err)
|
||||
}
|
||||
options := api.ListOptions{LabelSelector: selector}
|
||||
podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err)
|
||||
}
|
||||
allPodsLabeled := false
|
||||
if allPodsLabeled, err = deploymentutil.LabelPodsWithHash(&podList, updatedRS, dc.client, namespace, hash); err != nil {
|
||||
return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err)
|
||||
}
|
||||
// If not all pods are labeled but didn't return error in step 2, we've hit at least one pod not found error.
|
||||
// Return here and retry in the next sync loop.
|
||||
if !allPodsLabeled {
|
||||
return updatedRS, nil
|
||||
}
|
||||
|
||||
// We need to wait for the replicaset controller to observe the pods being
|
||||
// labeled with pod template hash. Because previously we've called
|
||||
// WaitForReplicaSetUpdated, the replicaset controller should have dropped
|
||||
// FullyLabeledReplicas to 0 already, we only need to wait it to increase
|
||||
// back to the number of replicas in the spec.
|
||||
if err = deploymentutil.WaitForPodsHashPopulated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil {
|
||||
return nil, fmt.Errorf("%s %s/%s: error waiting for replicaset controller to observe pods being labeled with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
|
||||
}
|
||||
|
||||
// 3. Update rs label and selector to include the new hash label
|
||||
// Copy the old selector, so that we can scrub out any orphaned pods
|
||||
if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS,
|
||||
func(updated *extensions.ReplicaSet) error {
|
||||
// Precondition: the RS doesn't contain the new hash in its label or selector.
|
||||
if updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash && updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
|
||||
return utilerrors.ErrPreconditionViolated
|
||||
}
|
||||
updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
|
||||
updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
|
||||
}
|
||||
if rsUpdated {
|
||||
glog.V(4).Infof("Updated %s %s/%s's selector and label with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)
|
||||
}
|
||||
// If the RS isn't actually updated in step 3, that's okay, we'll retry in the next sync loop since its selector isn't updated yet.
|
||||
|
||||
// TODO: look for orphaned pods and label them in the background somewhere else periodically
|
||||
|
||||
return updatedRS, nil
|
||||
}
|
||||
|
||||
// setNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and
|
||||
// copying required deployment annotations to it; it returns true if replica set's annotation is changed.
|
||||
func setNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string) bool {
|
||||
|
Reference in New Issue
Block a user