Deployment: Use ControllerRef to route watch events.

This is part of the completion of ControllerRef, as described here:

https://github.com/kubernetes/community/blob/master/contributors/design-proposals/controller-ref.md#watches
This commit is contained in:
Anthony Yeh
2017-02-26 16:25:21 -08:00
parent 887acb07ea
commit 0d9c9bfee0
3 changed files with 393 additions and 50 deletions

View File

@@ -23,11 +23,9 @@ package deployment
import (
"fmt"
"reflect"
"sort"
"time"
"github.com/golang/glog"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -195,17 +193,46 @@ func (dc *DeploymentController) deleteDeployment(obj interface{}) {
// addReplicaSet enqueues the deployment that manages a ReplicaSet when the ReplicaSet is created.
func (dc *DeploymentController) addReplicaSet(obj interface{}) {
rs := obj.(*extensions.ReplicaSet)
glog.V(4).Infof("ReplicaSet %s added.", rs.Name)
if d := dc.getDeploymentForReplicaSet(rs); d != nil {
if rs.DeletionTimestamp != nil {
// On a restart of the controller manager, it's possible for an object to
// show up in a state that is already pending deletion.
dc.deleteReplicaSet(rs)
return
}
// If it has a ControllerRef, that's all that matters.
if controllerRef := controller.GetControllerOf(rs); controllerRef != nil {
if controllerRef.Kind != controllerKind.Kind {
// It's controller by a different type of controller.
return
}
glog.V(4).Infof("ReplicaSet %s added.", rs.Name)
d, err := dc.dLister.Deployments(rs.Namespace).Get(controllerRef.Name)
if err != nil {
return
}
dc.enqueueDeployment(d)
return
}
// Otherwise, it's an orphan. Get a list of all matching Deployments and sync
// them to see if anyone wants to adopt it.
ds := dc.getDeploymentsForReplicaSet(rs)
if len(ds) == 0 {
return
}
glog.V(4).Infof("Orphan ReplicaSet %s added.", rs.Name)
for _, d := range ds {
dc.enqueueDeployment(d)
}
}
// getDeploymentForReplicaSet returns the deployment managing the given ReplicaSet.
func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.ReplicaSet) *extensions.Deployment {
// getDeploymentsForReplicaSet returns a list of Deployments that potentially
// match a ReplicaSet.
func (dc *DeploymentController) getDeploymentsForReplicaSet(rs *extensions.ReplicaSet) []*extensions.Deployment {
deployments, err := dc.dLister.GetDeploymentsForReplicaSet(rs)
if err != nil || len(deployments) == 0 {
glog.V(4).Infof("Error: %v. No deployment found for ReplicaSet %v, deployment controller will avoid syncing.", err, rs.Name)
return nil
}
// Because all ReplicaSet's belonging to a deployment should have a unique label key,
@@ -213,11 +240,12 @@ func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.Replic
// If that happens we should probably dynamically repair the situation by ultimately
// trying to clean up one of the controllers, for now we just return the older one
if len(deployments) > 1 {
sort.Sort(util.BySelectorLastUpdateTime(deployments))
// ControllerRef will ensure we don't do anything crazy, but more than one
// item in this list nevertheless constitutes user error.
glog.V(4).Infof("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s",
rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name)
}
return deployments[0]
return deployments
}
// updateReplicaSet figures out what deployment(s) manage a ReplicaSet when the ReplicaSet
@@ -232,16 +260,45 @@ func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
// Two different versions of the same replica set will always have different RVs.
return
}
// TODO: Write a unittest for this case
glog.V(4).Infof("ReplicaSet %s updated.", curRS.Name)
if d := dc.getDeploymentForReplicaSet(curRS); d != nil {
dc.enqueueDeployment(d)
curControllerRef := controller.GetControllerOf(curRS)
oldControllerRef := controller.GetControllerOf(oldRS)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged &&
oldControllerRef != nil && oldControllerRef.Kind == controllerKind.Kind {
// The ControllerRef was changed. Sync the old controller, if any.
d, err := dc.dLister.Deployments(oldRS.Namespace).Get(oldControllerRef.Name)
if err == nil {
dc.enqueueDeployment(d)
}
}
// A number of things could affect the old deployment: labels changing,
// pod template changing, etc.
if !apiequality.Semantic.DeepEqual(oldRS, curRS) {
if oldD := dc.getDeploymentForReplicaSet(oldRS); oldD != nil {
dc.enqueueDeployment(oldD)
// If it has a ControllerRef, that's all that matters.
if curControllerRef != nil {
if curControllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
glog.V(4).Infof("ReplicaSet %s updated.", curRS.Name)
d, err := dc.dLister.Deployments(curRS.Namespace).Get(curControllerRef.Name)
if err != nil {
return
}
dc.enqueueDeployment(d)
return
}
// Otherwise, it's an orphan. If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
if labelChanged || controllerRefChanged {
ds := dc.getDeploymentsForReplicaSet(curRS)
if len(ds) == 0 {
return
}
glog.V(4).Infof("Orphan ReplicaSet %s updated.", curRS.Name)
for _, d := range ds {
dc.enqueueDeployment(d)
}
}
}
@@ -268,10 +325,23 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
return
}
}
glog.V(4).Infof("ReplicaSet %s deleted.", rs.Name)
if d := dc.getDeploymentForReplicaSet(rs); d != nil {
dc.enqueueDeployment(d)
controllerRef := controller.GetControllerOf(rs)
if controllerRef == nil {
// No controller should care about orphans being deleted.
return
}
if controllerRef.Kind != controllerKind.Kind {
// It's controlled by a different type of controller.
return
}
glog.V(4).Infof("ReplicaSet %s deleted.", rs.Name)
d, err := dc.dLister.Deployments(rs.Namespace).Get(controllerRef.Name)
if err != nil {
return
}
dc.enqueueDeployment(d)
}
// deletePod will enqueue a Recreate Deployment once all of its pods have stopped running.
@@ -351,35 +421,34 @@ func (dc *DeploymentController) getDeploymentForPod(pod *v1.Pod) *extensions.Dep
// Find the owning replica set
var rs *extensions.ReplicaSet
var err error
// Look at the owner reference
controllerRef := controller.GetControllerOf(&pod.ObjectMeta)
if controllerRef != nil {
controllerRef := controller.GetControllerOf(pod)
if controllerRef == nil {
// No controller owns this Pod.
return nil
}
if controllerRef.Kind != extensions.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
// Not a pod owned by a replica set.
if controllerRef.Kind != extensions.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
return nil
}
rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
if err != nil {
glog.V(4).Infof("Cannot get replicaset %q for pod %q: %v", controllerRef.Name, pod.Name, err)
return nil
}
} else {
// Fallback to listing replica sets.
rss, err := dc.rsLister.GetPodReplicaSets(pod)
if err != nil {
glog.V(4).Infof("Cannot list replica sets for pod %q: %v", pod.Name, err)
return nil
}
// TODO: Handle multiple replica sets gracefully
// For now we return the oldest replica set.
if len(rss) > 1 {
utilruntime.HandleError(fmt.Errorf("more than one ReplicaSet is selecting pod %q with labels: %+v", pod.Name, pod.Labels))
sort.Sort(controller.ReplicaSetsByCreationTimestamp(rss))
}
rs = rss[0]
return nil
}
rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
if err != nil {
glog.V(4).Infof("Cannot get replicaset %q for pod %q: %v", controllerRef.Name, pod.Name, err)
return nil
}
return dc.getDeploymentForReplicaSet(rs)
// Now find the Deployment that owns that ReplicaSet.
controllerRef = controller.GetControllerOf(rs)
if controllerRef == nil {
return nil
}
if controllerRef.Kind != controllerKind.Kind {
return nil
}
d, err := dc.dLister.Deployments(rs.Namespace).Get(controllerRef.Name)
if err != nil {
return nil
}
return d
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.