Łukasz Oleś
2017-02-16 11:18:16 +01:00
parent 8124705c81
commit b27308c317
20 changed files with 1091 additions and 89 deletions

View File

@@ -44,6 +44,7 @@ import (
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/daemon/util"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
@@ -255,6 +256,17 @@ func (dsc *DaemonSetsController) enqueueDaemonSet(ds *extensions.DaemonSet) {
dsc.queue.Add(key)
}
func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after time.Duration) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
// TODO: Handle overlapping controllers better. See comment in ReplicationManager.
dsc.queue.AddAfter(key, after)
}
func (dsc *DaemonSetsController) getPodDaemonSet(pod *v1.Pod) *extensions.DaemonSet {
// look up in the cache, if cached and the cache is valid, just return cached value
if obj, cached := dsc.lookupCache.GetMatchingObject(pod); cached {
@@ -342,8 +354,14 @@ func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
return
}
glog.V(4).Infof("Pod %s updated.", curPod.Name)
changedToReady := !v1.IsPodReady(oldPod) && v1.IsPodReady(curPod)
if curDS := dsc.getPodDaemonSet(curPod); curDS != nil {
dsc.enqueueDaemonSet(curDS)
// See https://github.com/kubernetes/kubernetes/pull/38076 for more details
if changedToReady && curDS.Spec.MinReadySeconds > 0 {
dsc.enqueueDaemonSetAfter(curDS, time.Duration(curDS.Spec.MinReadySeconds)*time.Second)
}
}
// If the labels have not changed, then the daemon set responsible for
// the pod is the same as it was before. In that case we have enqueued the daemon
@@ -521,11 +539,23 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error {
}
}
}
errors := dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods)
// Throw an error when the daemon pods fail, to use ratelimiter to prevent kill-recreate hot loop
if failedPodsObserved > 0 {
errors = append(errors, fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name))
}
return utilerrors.NewAggregate(errors)
}
// syncNodes deletes given pods and creates new daemon set pods on the given node
// returns slice with erros if any
func (dsc *DaemonSetsController) syncNodes(ds *extensions.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string) []error {
// We need to set expectations before creating/deleting pods to avoid race conditions.
dsKey, err := controller.KeyFunc(ds)
if err != nil {
return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
return []error{fmt.Errorf("couldn't get key for object %#v: %v", ds, err)}
}
createDiff := len(nodesNeedingDaemonPods)
@@ -546,10 +576,11 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error {
glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
createWait := sync.WaitGroup{}
createWait.Add(createDiff)
template := util.GetPodTemplateWithGeneration(ds.Spec.Template, ds.TemplateGeneration)
for i := 0; i < createDiff; i++ {
go func(ix int) {
defer createWait.Done()
if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &ds.Spec.Template, ds); err != nil {
if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &template, ds); err != nil {
glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
dsc.expectations.CreationObserved(dsKey)
errCh <- err
@@ -581,18 +612,17 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error {
for err := range errCh {
errors = append(errors, err)
}
// Throw an error when the daemon pods fail, to use ratelimiter to prevent kill-recreate hot loop
if failedPodsObserved > 0 {
errors = append(errors, fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name))
}
return utilerrors.NewAggregate(errors)
return errors
}
func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds *extensions.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady int) error {
func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds *extensions.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable int) error {
if int(ds.Status.DesiredNumberScheduled) == desiredNumberScheduled &&
int(ds.Status.CurrentNumberScheduled) == currentNumberScheduled &&
int(ds.Status.NumberMisscheduled) == numberMisscheduled &&
int(ds.Status.NumberReady) == numberReady &&
int(ds.Status.UpdatedNumberScheduled) == updatedNumberScheduled &&
int(ds.Status.NumberAvailable) == numberAvailable &&
int(ds.Status.NumberUnavailable) == numberUnavailable &&
ds.Status.ObservedGeneration >= ds.Generation {
return nil
}
@@ -611,6 +641,9 @@ func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds
toUpdate.Status.CurrentNumberScheduled = int32(currentNumberScheduled)
toUpdate.Status.NumberMisscheduled = int32(numberMisscheduled)
toUpdate.Status.NumberReady = int32(numberReady)
toUpdate.Status.UpdatedNumberScheduled = int32(updatedNumberScheduled)
toUpdate.Status.NumberAvailable = int32(numberAvailable)
toUpdate.Status.NumberUnavailable = int32(numberUnavailable)
if _, updateErr = dsClient.UpdateStatus(toUpdate); updateErr == nil {
return nil
@@ -638,7 +671,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet)
return fmt.Errorf("couldn't get list of nodes when updating daemon set %#v: %v", ds, err)
}
var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady int
var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int
for i := range nodeList {
node := nodeList[i]
wantToRun, _, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
@@ -652,11 +685,18 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet)
desiredNumberScheduled++
if scheduled {
currentNumberScheduled++
// Sort the daemon pods by creation time, so the the oldest is first.
// Sort the daemon pods by creation time, so that the oldest is first.
daemonPods, _ := nodeToDaemonPods[node.Name]
sort.Sort(podByCreationTimestamp(daemonPods))
if v1.IsPodReady(daemonPods[0]) {
pod := daemonPods[0]
if v1.IsPodReady(pod) {
numberReady++
if v1.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) {
numberAvailable++
}
}
if util.IsPodUpdated(ds.TemplateGeneration, pod) {
updatedNumberScheduled++
}
}
} else {
@@ -665,8 +705,9 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet)
}
}
}
numberUnavailable := desiredNumberScheduled - numberAvailable
err = storeDaemonSetStatus(dsc.kubeClient.Extensions().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady)
err = storeDaemonSetStatus(dsc.kubeClient.Extensions().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable)
if err != nil {
return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err)
}
@@ -714,6 +755,17 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
}
}
dsNeedsSync = dsc.expectations.SatisfiedExpectations(dsKey)
if dsNeedsSync && ds.DeletionTimestamp == nil {
switch ds.Spec.UpdateStrategy.Type {
case extensions.RollingUpdateDaemonSetStrategyType:
err = dsc.rollingUpdate(ds)
}
if err != nil {
return err
}
}
return dsc.updateDaemonSetStatus(ds)
}