@@ -38,6 +38,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller/framework/informers"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
+	utilerrors "k8s.io/kubernetes/pkg/util/errors"
 	"k8s.io/kubernetes/pkg/util/metrics"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/wait"
@@ -55,10 +56,6 @@ const (
 	// performance requirements for kubernetes 1.0.
 	BurstReplicas = 500
 
-	// We must avoid counting pods until the pod store has synced. If it hasn't synced, to
-	// avoid a hot loop, we'll wait this long between checks.
-	PodStoreSyncedPollPeriod = 100 * time.Millisecond
-
 	// If sending a status upate to API server fails, we retry a finite number of times.
 	StatusUpdateRetries = 1
 )
@@ -99,12 +96,15 @@ type DaemonSetsController struct {
 	nodeController *framework.Controller
 	// podStoreSynced returns true if the pod store has been synced at least once.
 	// Added as a member to the struct to allow injection for testing.
-	podStoreSynced func() bool
+	podStoreSynced framework.InformerSynced
+	// nodeStoreSynced returns true if the node store has been synced at least once.
+	// Added as a member to the struct to allow injection for testing.
+	nodeStoreSynced framework.InformerSynced
 
 	lookupCache *controller.MatchingCache
 
-	// Daemon sets that need to be synced.
-	queue *workqueue.Type
+	// DaemonSet keys that need to be synced.
+	queue workqueue.RateLimitingInterface
 }
 
 func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
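Context for the type change: framework.InformerSynced names an informer's HasSynced method (at bottom just a func() bool), and framework.WaitForCacheSync, used in Run below, simply polls those predicates until they all report true. A minimal sketch of that contract, for illustration only (the names here are not the framework's actual implementation):

    import "time"

    // InformerSynced mirrors the framework type: a predicate reporting whether
    // an informer's initial LIST result has landed in its local store.
    type InformerSynced func() bool

    // waitForCacheSync polls each predicate until all return true, or returns
    // false once stopCh closes. Sketch of the contract, not the framework code.
    func waitForCacheSync(stopCh <-chan struct{}, synced ...InformerSynced) bool {
    	for {
    		allSynced := true
    		for _, fn := range synced {
    			if !fn() {
    				allSynced = false
    				break
    			}
    		}
    		if allSynced {
    			return true
    		}
    		select {
    		case <-stopCh:
    			return false
    		case <-time.After(100 * time.Millisecond):
    		}
    	}
    }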
@@ -125,7 +125,7 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
 		},
 		burstReplicas: BurstReplicas,
 		expectations:  controller.NewControllerExpectations(),
-		queue:         workqueue.NewNamed("daemonset"),
+		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
 	}
 	// Manage addition/update of daemon sets.
 	dsc.dsStore.Store, dsc.dsController = framework.NewInformer(
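Why DefaultControllerRateLimiter: at the time of writing it takes the max of a per-item exponential backoff (5ms, doubling, capped at 1000s) and an overall token bucket (10 qps, burst 100), so one persistently failing DaemonSet backs off on its own while total requeue traffic stays bounded. A hedged sketch of the caller-visible behavior, with a hypothetical key:

    import "k8s.io/kubernetes/pkg/util/workqueue"

    func rateLimitSketch() {
    	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example")
    	defer q.ShutDown()

    	key := "default/my-ds" // hypothetical namespace/name key
    	q.AddRateLimited(key)  // 1st failure: re-added after ~5ms
    	q.AddRateLimited(key)  // 2nd failure: ~10ms, then 20ms, 40ms, ...
    	q.Forget(key)          // success: per-item backoff counter reset
    }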
@@ -199,6 +199,8 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
 			UpdateFunc: dsc.updateNode,
 		},
 	)
+	dsc.nodeStoreSynced = dsc.nodeController.HasSynced
+
 	dsc.syncHandler = dsc.syncDaemonSet
 	dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
 	return dsc
@@ -233,10 +235,17 @@ func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
 // Run begins watching and syncing daemon sets.
 func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
+	defer dsc.queue.ShutDown()
+
 	glog.Infof("Starting Daemon Sets controller manager")
 	go dsc.dsController.Run(stopCh)
 	go dsc.podController.Run(stopCh)
 	go dsc.nodeController.Run(stopCh)
+
+	if !framework.WaitForCacheSync(stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced) {
+		return
+	}
+
 	for i := 0; i < workers; i++ {
 		go wait.Until(dsc.runWorker, time.Second, stopCh)
 	}
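With the HasSynced handles stored on the struct, Run can block once on WaitForCacheSync instead of every sync poll-sleeping on the PodStoreSyncedPollPeriod constant removed above. A sketch of how a caller might wire this up; podInformer, kubeClient, resyncPeriod, and lookupCacheSize are assumed to exist, and the worker count is illustrative:

    stopCh := make(chan struct{})
    defer close(stopCh)

    dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize)
    go podInformer.Run(stopCh) // the shared informer is started by the caller
    go dsc.Run(2, stopCh)      // worker count is whatever the controller-manager passes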
@@ -247,23 +256,33 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
 	<-stopCh
 	glog.Infof("Shutting down Daemon Set Controller")
-	dsc.queue.ShutDown()
 }
 
 func (dsc *DaemonSetsController) runWorker() {
-	for {
-		dsKey, quit := dsc.queue.Get()
-		if quit {
-			continue
-		}
-		err := dsc.syncHandler(dsKey.(string))
-		if err != nil {
-			glog.Errorf("Error syncing daemon set with key %s: %v", dsKey.(string), err)
-		}
-		dsc.queue.Done(dsKey)
+	for dsc.processNextWorkItem() {
 	}
 }
 
+// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
+func (dsc *DaemonSetsController) processNextWorkItem() bool {
+	dsKey, quit := dsc.queue.Get()
+	if quit {
+		return false
+	}
+	defer dsc.queue.Done(dsKey)
+
+	err := dsc.syncHandler(dsKey.(string))
+	if err == nil {
+		dsc.queue.Forget(dsKey)
+		return true
+	}
+
+	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
+	dsc.queue.AddRateLimited(dsKey)
+
+	return true
+}
+
 func (dsc *DaemonSetsController) enqueueDaemonSet(ds *extensions.DaemonSet) {
 	key, err := controller.KeyFunc(ds)
 	if err != nil {
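This is the standard workqueue consumption shape: Get/Done bracket each item, Forget on success, AddRateLimited on error. Factored out generically it looks like the following sketch, where syncFn stands in for dsc.syncHandler:

    import "k8s.io/kubernetes/pkg/util/workqueue"

    func processOne(q workqueue.RateLimitingInterface, syncFn func(key string) error) bool {
    	key, quit := q.Get()
    	if quit {
    		return false // queue shut down; the worker loop exits
    	}
    	defer q.Done(key) // always release the key, even when syncFn fails

    	if err := syncFn(key.(string)); err != nil {
    		q.AddRateLimited(key) // retry later with per-item backoff
    		return true
    	}
    	q.Forget(key) // success clears the backoff history for this key
    	return true
    }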
@@ -467,18 +486,18 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet)
 	return nodeToDaemonPods, nil
 }
 
-func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
+func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error {
 	// Find out which nodes are running the daemon pods selected by ds.
 	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
 	if err != nil {
-		glog.Errorf("Error getting node to daemon pod mapping for daemon set %#v: %v", ds, err)
+		return fmt.Errorf("error getting node to daemon pod mapping for daemon set %#v: %v", ds, err)
 	}
 
 	// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
 	// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
 	nodeList, err := dsc.nodeStore.List()
 	if err != nil {
-		glog.Errorf("Couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
+		return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
 	}
 	var nodesNeedingDaemonPods, podsToDelete []string
 	for _, node := range nodeList.Items {
@@ -508,8 +527,7 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
 	// We need to set expectations before creating/deleting pods to avoid race conditions.
 	dsKey, err := controller.KeyFunc(ds)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
-		return
+		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
 	}
 
 	createDiff := len(nodesNeedingDaemonPods)
@@ -524,6 +542,9 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
 
 	dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)
 
+	// error channel to communicate back failures. make the buffer big enough to avoid any blocking
+	errCh := make(chan error, createDiff+deleteDiff)
+
 	glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
 	createWait := sync.WaitGroup{}
 	createWait.Add(createDiff)
@@ -533,6 +554,7 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
 			if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &ds.Spec.Template, ds); err != nil {
 				glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
 				dsc.expectations.CreationObserved(dsKey)
+				errCh <- err
 				utilruntime.HandleError(err)
 			}
 		}(i)
@@ -548,11 +570,20 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
 			if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
 				glog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
 				dsc.expectations.DeletionObserved(dsKey)
+				errCh <- err
 				utilruntime.HandleError(err)
 			}
 		}(i)
 	}
 	deleteWait.Wait()
+
+	// collect errors if any for proper reporting/retry logic in the controller
+	errors := []error{}
+	close(errCh)
+	for err := range errCh {
+		errors = append(errors, err)
+	}
+	return utilerrors.NewAggregate(errors)
 }
 
 func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds *extensions.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int) error {
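utilerrors.NewAggregate (the new import above) returns nil for an empty slice and otherwise folds every collected error into one, so manage reports all parallel create/delete failures in a single return value instead of logging and dropping them. A small illustration with hypothetical failure messages:

    import (
    	"fmt"

    	utilerrors "k8s.io/kubernetes/pkg/util/errors"
    )

    func aggregateSketch() {
    	errs := []error{}
    	fmt.Println(utilerrors.NewAggregate(errs) == nil) // true: empty slice aggregates to nil

    	errs = append(errs,
    		fmt.Errorf("create pod on node-1: timed out"),  // hypothetical failure
    		fmt.Errorf("delete pod my-ds-abc12: not found")) // hypothetical failure
    	fmt.Println(utilerrors.NewAggregate(errs)) // both messages surface in one error
    }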
@@ -582,18 +613,16 @@ func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds *extensions.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int) error {
 	return updateErr
 }
 
-func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet) {
+func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet) error {
 	glog.V(4).Infof("Updating daemon set status")
 	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
 	if err != nil {
-		glog.Errorf("Error getting node to daemon pod mapping for daemon set %#v: %v", ds, err)
-		return
+		return fmt.Errorf("error getting node to daemon pod mapping for daemon set %#v: %v", ds, err)
 	}
 
 	nodeList, err := dsc.nodeStore.List()
 	if err != nil {
-		glog.Errorf("Couldn't get list of nodes when updating daemon set %#v: %v", ds, err)
-		return
+		return fmt.Errorf("couldn't get list of nodes when updating daemon set %#v: %v", ds, err)
 	}
 
 	var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int
@@ -616,8 +645,10 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet)
 	err = storeDaemonSetStatus(dsc.kubeClient.Extensions().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled)
 	if err != nil {
-		glog.Errorf("Error storing status for daemon set %#v: %v", ds, err)
+		return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err)
 	}
+
+	return nil
 }
 
 func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
@@ -626,19 +657,9 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 		glog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Now().Sub(startTime))
 	}()
 
-	if !dsc.podStoreSynced() {
-		// Sleep so we give the pod reflector goroutine a chance to run.
-		time.Sleep(PodStoreSyncedPollPeriod)
-		glog.Infof("Waiting for pods controller to sync, requeuing ds %v", key)
-		dsc.queue.Add(key)
-		return nil
-	}
-
 	obj, exists, err := dsc.dsStore.Store.GetByKey(key)
 	if err != nil {
-		glog.Infof("Unable to retrieve ds %v from store: %v", key, err)
-		dsc.queue.Add(key)
-		return err
+		return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
 	}
 	if !exists {
 		glog.V(3).Infof("daemon set has been deleted %v", key)
@@ -658,16 +679,16 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 	// then we do not want to call manage on foo until the daemon pods have been created.
 	dsKey, err := controller.KeyFunc(ds)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
-		return err
+		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
 	}
 	dsNeedsSync := dsc.expectations.SatisfiedExpectations(dsKey)
 	if dsNeedsSync && ds.DeletionTimestamp == nil {
-		dsc.manage(ds)
+		if err := dsc.manage(ds); err != nil {
+			return err
+		}
 	}
 
-	dsc.updateDaemonSetStatus(ds)
-	return nil
+	return dsc.updateDaemonSetStatus(ds)
 }
 
 func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *api.Node, ds *extensions.DaemonSet) bool {
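End-to-end, an error anywhere in syncDaemonSet now flows back through syncHandler to processNextWorkItem, which requeues the key with backoff; the old code logged and slept, or silently dropped the failure. A hypothetical unit-style sketch of that path, exercising the injectable syncHandler member from within the package:

    dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, 0)
    dsc.syncHandler = func(key string) error {
    	return fmt.Errorf("forced failure for %v", key) // injected to simulate a sync error
    }
    dsc.queue.Add("default/my-ds")
    dsc.processNextWorkItem() // returns true; the key is re-added, rate-limited, for retry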