DaemonSet: Use ControllerRefManager to adopt/orphan.

This commit is contained in:
Anthony Yeh
2017-02-25 16:22:54 -08:00
parent 2217363845
commit 421e0bbd83
4 changed files with 163 additions and 103 deletions

View File

@@ -465,32 +465,37 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
}
// getNodesToDaemonSetPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes.
// This also reconciles ControllerRef by adopting/orphaning.
// Note that returned Pods are pointers to objects in the cache.
// If you want to modify one, you need to deep-copy it first.
func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet) (map[string][]*v1.Pod, error) {
nodeToDaemonPods := make(map[string][]*v1.Pod)
selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
if err != nil {
return nil, err
}
daemonPods, err := dsc.podLister.Pods(ds.Namespace).List(selector)
// List all pods to include those that don't match the selector anymore but
// have a ControllerRef pointing to this controller.
pods, err := dsc.podLister.Pods(ds.Namespace).List(labels.Everything())
if err != nil {
return nodeToDaemonPods, err
return nil, err
}
for i := range daemonPods {
// TODO: Do we need to copy here?
daemonPod := &(*daemonPods[i])
nodeName := daemonPod.Spec.NodeName
nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], daemonPod)
// Use ControllerRefManager to adopt/orphan as needed.
cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, ControllerKind)
claimedPods, err := cm.ClaimPods(pods)
if err != nil {
return nil, err
}
// Group Pods by Node name.
nodeToDaemonPods := make(map[string][]*v1.Pod)
for _, pod := range claimedPods {
nodeName := pod.Spec.NodeName
nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
}
return nodeToDaemonPods, nil
}
func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error {
// Find out which nodes are running the daemon pods selected by ds.
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
if err != nil {
return fmt.Errorf("error getting node to daemon pod mapping for daemon set %#v: %v", ds, err)
}
func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) error {
// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
nodeList, err := dsc.nodeLister.List(labels.Everything())
@@ -589,15 +594,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelet
for i := 0; i < createDiff; i++ {
go func(ix int) {
defer createWait.Done()
isController := true
controllerRef := &metav1.OwnerReference{
APIVersion: controllerKind.GroupVersion().String(),
Kind: controllerKind.Kind,
Name: ds.Name,
UID: ds.UID,
Controller: &isController,
}
if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, controllerRef); err != nil {
if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds, newControllerRef(ds)); err != nil {
glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
dsc.expectations.CreationObserved(dsKey)
errCh <- err
@@ -676,12 +673,8 @@ func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds
return updateErr
}
func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet) error {
func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) error {
glog.V(4).Infof("Updating daemon set status")
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
if err != nil {
return fmt.Errorf("error getting node to daemon pod mapping for daemon set %#v: %v", ds, err)
}
nodeList, err := dsc.nodeLister.List(labels.Everything())
if err != nil {
@@ -758,6 +751,12 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
return nil
}
// Find out which nodes are running the daemon pods controlled by ds.
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}
// Don't process a daemon set until all its creations and deletions have been processed.
// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
// then we do not want to call manage on foo until the daemon pods have been created.
@@ -767,7 +766,7 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
}
dsNeedsSync := dsc.expectations.SatisfiedExpectations(dsKey)
if dsNeedsSync && ds.DeletionTimestamp == nil {
if err := dsc.manage(ds); err != nil {
if err := dsc.manage(ds, nodeToDaemonPods); err != nil {
return err
}
}
@@ -776,14 +775,14 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
if dsNeedsSync && ds.DeletionTimestamp == nil {
switch ds.Spec.UpdateStrategy.Type {
case extensions.RollingUpdateDaemonSetStrategyType:
err = dsc.rollingUpdate(ds)
err = dsc.rollingUpdate(ds, nodeToDaemonPods)
}
if err != nil {
return err
}
}
return dsc.updateDaemonSetStatus(ds)
return dsc.updateDaemonSetStatus(ds, nodeToDaemonPods)
}
// nodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a
@@ -972,6 +971,18 @@ func Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorit
return len(predicateFails) == 0, predicateFails, nil
}
// newControllerRef creates a ControllerRef pointing to the given DaemonSet.
func newControllerRef(ds *extensions.DaemonSet) *metav1.OwnerReference {
isController := true
return &metav1.OwnerReference{
APIVersion: ControllerKind.GroupVersion().String(),
Kind: ControllerKind.Kind,
Name: ds.Name,
UID: ds.UID,
Controller: &isController,
}
}
// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
type byCreationTimestamp []*extensions.DaemonSet