Implement DaemonSet history logic in controller

1. Create controllerrevisions (history) and label pods with the template
   hash for both the RollingUpdate and OnDelete update strategies (see the
   hash-labeling sketch after this list)
2. Clean up old, non-live history based on revisionHistoryLimit
3. Remove duplicate controllerrevisions (the ones with the same template)
   and relabel their pods
4. Update RBAC to allow DaemonSet controller to manage
   controllerrevisions
5. In DaemonSet controller unit tests, create new pods with hash labels
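Point 1 above centers on stamping each daemon pod with the hash of the ControllerRevision it was created from. In the diff below this happens via util.CreatePodTemplate, whose body is not part of this change set; the following is only a minimal sketch of the idea, with a hypothetical helper name, not the commit's actual implementation:

```go
package daemon

import (
	"k8s.io/kubernetes/pkg/api/v1"
	extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
)

// templateWithHash is a hypothetical helper (the real work is done by
// util.CreatePodTemplate, not shown in this diff). It copies a DaemonSet's pod
// template and stamps it with the current ControllerRevision's hash label, so
// every pod created from it can be matched back to its revision.
func templateWithHash(template v1.PodTemplateSpec, hash string) v1.PodTemplateSpec {
	newTemplate := template
	// Give the copy its own Labels map so the cached template is never mutated.
	labels := make(map[string]string, len(template.Labels)+1)
	for k, v := range template.Labels {
		labels[k] = v
	}
	if hash != "" {
		labels[extensions.DefaultDaemonSetUniqueLabelKey] = hash
	}
	newTemplate.Labels = labels
	return newTemplate
}
```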
Author: Janet Kuo
Date: 2017-05-17 16:53:46 -07:00
Parent: 4e6f70ff67
Commit: d02f40a5e7
18 changed files with 742 additions and 136 deletions


@@ -39,11 +39,14 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/extensions/v1beta1"
appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/apps/v1beta1"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/extensions/v1beta1"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
"k8s.io/kubernetes/pkg/controller"
@@ -99,6 +102,11 @@ type DaemonSetsController struct {
// dsStoreSynced returns true if the daemonset store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
dsStoreSynced cache.InformerSynced
// historyLister can list/get ControllerRevisions (history) from the shared informer's store
historyLister appslisters.ControllerRevisionLister
// historyStoreSynced returns true if the history store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
historyStoreSynced cache.InformerSynced
// podLister can list/get pods from the shared informer's store
podLister corelisters.PodLister
// podStoreSynced returns true if the pod store has been synced at least once.
@@ -114,7 +122,7 @@ type DaemonSetsController struct {
queue workqueue.RateLimitingInterface
}
func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) *DaemonSetsController {
func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, historyInformer appsinformers.ControllerRevisionInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) *DaemonSetsController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every client has moved to use the clientset.
@@ -152,6 +160,14 @@ func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInfo
dsc.dsLister = daemonSetInformer.Lister()
dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced
historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addHistory,
UpdateFunc: dsc.updateHistory,
DeleteFunc: dsc.deleteHistory,
})
dsc.historyLister = historyInformer.Lister()
dsc.historyStoreSynced = historyInformer.Informer().HasSynced
// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -273,6 +289,138 @@ func (dsc *DaemonSetsController) getPodDaemonSets(pod *v1.Pod) []*extensions.Dae
return sets
}
// getDaemonSetsForHistory returns a list of DaemonSets that potentially
// match a ControllerRevision.
func (dsc *DaemonSetsController) getDaemonSetsForHistory(history *apps.ControllerRevision) []*extensions.DaemonSet {
daemonSets, err := dsc.dsLister.GetHistoryDaemonSets(history)
if err != nil || len(daemonSets) == 0 {
return nil
}
if len(daemonSets) > 1 {
// ControllerRef will ensure we don't do anything crazy, but more than one
// item in this list nevertheless constitutes user error.
glog.V(4).Infof("User error! more than one DaemonSets is selecting ControllerRevision %s/%s with labels: %#v",
history.Namespace, history.Name, history.Labels)
}
return daemonSets
}
// addHistory enqueues the DaemonSet that manages a ControllerRevision when the ControllerRevision is created
// or when the controller manager is restarted.
func (dsc *DaemonSetsController) addHistory(obj interface{}) {
history := obj.(*apps.ControllerRevision)
if history.DeletionTimestamp != nil {
// On a restart of the controller manager, it's possible for an object to
// show up in a state that is already pending deletion.
dsc.deleteHistory(history)
return
}
// If it has a ControllerRef, that's all that matters.
if controllerRef := controller.GetControllerOf(history); controllerRef != nil {
ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
if ds == nil {
return
}
glog.V(4).Infof("ControllerRevision %s added.", history.Name)
return
}
// Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync
// them to see if anyone wants to adopt it.
daemonSets := dsc.getDaemonSetsForHistory(history)
if len(daemonSets) == 0 {
return
}
glog.V(4).Infof("Orphan ControllerRevision %s added.", history.Name)
for _, ds := range daemonSets {
dsc.enqueueDaemonSet(ds)
}
}
// updateHistory figures out what DaemonSet(s) manage a ControllerRevision when the ControllerRevision
// is updated and wakes them up. If anything of the ControllerRevision has changed, we need to
// awaken both the old and new DaemonSets.
func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) {
curHistory := cur.(*apps.ControllerRevision)
oldHistory := old.(*apps.ControllerRevision)
if curHistory.ResourceVersion == oldHistory.ResourceVersion {
// Periodic resync will send update events for all known ControllerRevisions.
return
}
curControllerRef := controller.GetControllerOf(curHistory)
oldControllerRef := controller.GetControllerOf(oldHistory)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged && oldControllerRef != nil {
// The ControllerRef was changed. Sync the old controller, if any.
if ds := dsc.resolveControllerRef(oldHistory.Namespace, oldControllerRef); ds != nil {
dsc.enqueueDaemonSet(ds)
}
}
// If it has a ControllerRef, that's all that matters.
if curControllerRef != nil {
ds := dsc.resolveControllerRef(curHistory.Namespace, curControllerRef)
if ds == nil {
return
}
glog.V(4).Infof("ControllerRevision %s updated.", curHistory.Name)
dsc.enqueueDaemonSet(ds)
return
}
// Otherwise, it's an orphan. If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
labelChanged := !reflect.DeepEqual(curHistory.Labels, oldHistory.Labels)
if labelChanged || controllerRefChanged {
daemonSets := dsc.getDaemonSetsForHistory(curHistory)
if len(daemonSets) == 0 {
return
}
glog.V(4).Infof("Orphan ControllerRevision %s updated.", curHistory.Name)
for _, ds := range daemonSets {
dsc.enqueueDaemonSet(ds)
}
}
}
// deleteHistory enqueues the DaemonSet that manages a ControllerRevision when
// the ControllerRevision is deleted. obj could be an *apps.ControllerRevision, or
// a DeletionFinalStateUnknown marker item.
func (dsc *DaemonSetsController) deleteHistory(obj interface{}) {
history, ok := obj.(*apps.ControllerRevision)
// When a delete is dropped, the relist will notice a ControllerRevision in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the ControllerRevision
// changed labels the new DaemonSet will not be woken up till the periodic resync.
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
return
}
history, ok = tombstone.Obj.(*apps.ControllerRevision)
if !ok {
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ControllerRevision %#v", obj))
return
}
}
controllerRef := controller.GetControllerOf(history)
if controllerRef == nil {
// No controller should care about orphans being deleted.
return
}
ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
if ds == nil {
return
}
glog.V(4).Infof("ControllerRevision %s deleted.", history.Name)
dsc.enqueueDaemonSet(ds)
}
func (dsc *DaemonSetsController) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
@@ -486,11 +634,11 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
}
}
// getNodesToDaemonSetPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes.
// getDaemonPods returns daemon pods owned by the given ds.
// This also reconciles ControllerRef by adopting/orphaning.
// Note that returned Pods are pointers to objects in the cache.
// If you want to modify one, you need to deep-copy it first.
func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet) (map[string][]*v1.Pod, error) {
func (dsc *DaemonSetsController) getDaemonPods(ds *extensions.DaemonSet) ([]*v1.Pod, error) {
selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
if err != nil {
return nil, err
@@ -516,7 +664,15 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet)
})
// Use ControllerRefManager to adopt/orphan as needed.
cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind, canAdoptFunc)
claimedPods, err := cm.ClaimPods(pods)
return cm.ClaimPods(pods)
}
// getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes.
// This also reconciles ControllerRef by adopting/orphaning.
// Note that returned Pods are pointers to objects in the cache.
// If you want to modify one, you need to deep-copy it first.
func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet) (map[string][]*v1.Pod, error) {
claimedPods, err := dsc.getDaemonPods(ds)
if err != nil {
return nil, err
}
@@ -554,18 +710,18 @@ func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controll
return ds
}
func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error {
func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) (string, error) {
// Find out which nodes are running the daemon pods controlled by ds.
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
return "", fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}
// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
nodeList, err := dsc.nodeLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
return "", fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
}
var nodesNeedingDaemonPods, podsToDelete []string
var failedPodsObserved int
@@ -612,23 +768,33 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error {
}
}
}
errors := dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods)
// Find the current history of the DaemonSet, and label newly created pods with its hash label value
cur, _, err := dsc.constructHistory(ds)
if err != nil {
return "", fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
}
hash := cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey]
if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
return "", err
}
// Return an error when failed daemon pods were observed, so the rate limiter prevents a kill-recreate hot loop
if failedPodsObserved > 0 {
errors = append(errors, fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name))
return "", fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name)
}
return utilerrors.NewAggregate(errors)
return hash, nil
}
// syncNodes deletes given pods and creates new daemon set pods on the given nodes
// returns an aggregate error if any pod creations or deletions failed
func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string) []error {
func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
// We need to set expectations before creating/deleting pods to avoid race conditions.
dsKey, err := controller.KeyFunc(ds)
if err != nil {
return []error{fmt.Errorf("couldn't get key for object %#v: %v", ds, err)}
return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
}
createDiff := len(nodesNeedingDaemonPods)
@@ -649,7 +815,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelet
glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
createWait := sync.WaitGroup{}
createWait.Add(createDiff)
template := util.GetPodTemplateWithGeneration(ds.Spec.Template, ds.Spec.TemplateGeneration)
template := util.CreatePodTemplate(ds.Spec.Template, ds.Spec.TemplateGeneration, hash)
for i := 0; i < createDiff; i++ {
go func(ix int) {
defer createWait.Done()
@@ -685,7 +851,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelet
for err := range errCh {
errors = append(errors, err)
}
return errors
return utilerrors.NewAggregate(errors)
}
func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds *extensions.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable int) error {
@@ -767,7 +933,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet)
numberAvailable++
}
}
if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod) {
if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod, ds.Labels[extensions.DefaultDaemonSetUniqueLabelKey]) {
updatedNumberScheduled++
}
}
@@ -825,21 +991,28 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
return dsc.updateDaemonSetStatus(ds)
}
if err := dsc.manage(ds); err != nil {
hash, err := dsc.manage(ds)
if err != nil {
return err
}
// Process rolling updates if we're ready.
if dsc.expectations.SatisfiedExpectations(dsKey) {
switch ds.Spec.UpdateStrategy.Type {
case extensions.OnDeleteDaemonSetStrategyType:
case extensions.RollingUpdateDaemonSetStrategyType:
err = dsc.rollingUpdate(ds)
err = dsc.rollingUpdate(ds, hash)
}
if err != nil {
return err
}
}
err = dsc.cleanupHistory(ds)
if err != nil {
return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
}
return dsc.updateDaemonSetStatus(ds)
}
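The cleanupHistory call near the end of syncDaemonSet implements point 2 of the commit message, but its body is not shown in this hunk. As a rough sketch of what pruning non-live revisions down to revisionHistoryLimit looks like conceptually (the helper name and the isLive/del callbacks are assumptions, not the commit's code):

```go
package daemon

import (
	"sort"

	apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
)

// pruneHistory is a sketch only. It keeps every live revision (one still backing
// running pods, as decided by the caller-supplied isLive) plus the `limit` most
// recent non-live revisions, and deletes the remaining non-live ones oldest-first.
func pruneHistory(histories []*apps.ControllerRevision, limit int,
	isLive func(*apps.ControllerRevision) bool,
	del func(*apps.ControllerRevision) error) error {
	var nonLive []*apps.ControllerRevision
	for _, h := range histories {
		if !isLive(h) {
			nonLive = append(nonLive, h)
		}
	}
	// Sort ascending by revision number so the oldest entries are deleted first.
	sort.Slice(nonLive, func(i, j int) bool { return nonLive[i].Revision < nonLive[j].Revision })
	for i := 0; i < len(nonLive)-limit; i++ {
		if err := del(nonLive[i]); err != nil {
			return err
		}
	}
	return nil
}
```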