diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 2e66881f9c8..869d6b03875 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/controller/daemon" "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/pkg/controller/namespace" "k8s.io/kubernetes/pkg/controller/node" @@ -63,6 +64,7 @@ type CMServer struct { CloudConfigFile string ConcurrentEndpointSyncs int ConcurrentRCSyncs int + ConcurrentDSCSyncs int ServiceSyncPeriod time.Duration NodeSyncPeriod time.Duration ResourceQuotaSyncPeriod time.Duration @@ -98,6 +100,7 @@ func NewCMServer() *CMServer { Address: net.ParseIP("127.0.0.1"), ConcurrentEndpointSyncs: 5, ConcurrentRCSyncs: 5, + ConcurrentDSCSyncs: 2, ServiceSyncPeriod: 5 * time.Minute, NodeSyncPeriod: 10 * time.Second, ResourceQuotaSyncPeriod: 10 * time.Second, @@ -213,6 +216,9 @@ func (s *CMServer) Run(_ []string) error { controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient, replicationControllerPkg.BurstReplicas) go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop) + go daemon.NewDaemonSetsController(kubeClient). + Run(s.ConcurrentDSCSyncs, util.NeverStop) + cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) if err != nil { glog.Fatalf("Cloud provider could not be initialized: %v", err) diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index ad44a1c0e2b..70d653ba088 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -47,6 +47,7 @@ import ( "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" "github.com/spf13/pflag" + "k8s.io/kubernetes/pkg/controller/daemon" ) // CMServer is the main context object for the controller manager. @@ -113,6 +114,9 @@ func (s *CMServer) Run(_ []string) error { controllerManager := replicationcontroller.NewReplicationManager(kubeClient, replicationcontroller.BurstReplicas) go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop) + go daemon.NewDaemonSetsController(kubeClient). + Run(s.ConcurrentDSCSyncs, util.NeverStop) + //TODO(jdef) should eventually support more cloud providers here if s.CloudProvider != mesos.ProviderName { glog.Fatalf("Only provider %v is supported, you specified %v", mesos.ProviderName, s.CloudProvider) diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 128ca80fd29..d49d720981b 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -20,17 +20,19 @@ import ( "fmt" "time" + "sync/atomic" + "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/latest" "k8s.io/kubernetes/pkg/api/validation" + "k8s.io/kubernetes/pkg/apis/experimental" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" - "sync/atomic" ) const ( @@ -213,6 +215,8 @@ func NewControllerExpectations() *ControllerExpectations { type PodControlInterface interface { // CreateReplica creates new replicated pods according to the spec. CreateReplica(namespace string, controller *api.ReplicationController) error + // CreateReplicaOnNode creates a new pod according to the spec on the specified node. + CreateReplicaOnNode(namespace string, ds *experimental.DaemonSet, nodeName string) error // DeletePod deletes the pod identified by podID. DeletePod(namespace string, podID string) error } @@ -290,6 +294,40 @@ func (r RealPodControl) CreateReplica(namespace string, controller *api.Replicat return nil } +func (r RealPodControl) CreateReplicaOnNode(namespace string, ds *experimental.DaemonSet, nodeName string) error { + desiredLabels := getReplicaLabelSet(ds.Spec.Template) + desiredAnnotations, err := getReplicaAnnotationSet(ds.Spec.Template, ds) + if err != nil { + return err + } + prefix := getReplicaPrefix(ds.Name) + + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Labels: desiredLabels, + Annotations: desiredAnnotations, + GenerateName: prefix, + }, + } + if err := api.Scheme.Convert(&ds.Spec.Template.Spec, &pod.Spec); err != nil { + return fmt.Errorf("unable to convert pod template: %v", err) + } + // if a pod does not have labels then it cannot be controlled by any controller + if labels.Set(pod.Labels).AsSelector().Empty() { + return fmt.Errorf("unable to create pod replica, no labels") + } + pod.Spec.NodeName = nodeName + if newPod, err := r.KubeClient.Pods(namespace).Create(pod); err != nil { + r.Recorder.Eventf(ds, "failedCreate", "Error creating: %v", err) + return fmt.Errorf("unable to create pod replica: %v", err) + } else { + glog.V(4).Infof("Controller %v created pod %v", ds.Name, newPod.Name) + r.Recorder.Eventf(ds, "successfulCreate", "Created pod: %v", newPod.Name) + } + + return nil +} + func (r RealPodControl) DeletePod(namespace, podID string) error { return r.KubeClient.Pods(namespace).Delete(podID, nil) } diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go new file mode 100644 index 00000000000..e6db35b14d1 --- /dev/null +++ b/pkg/controller/daemon/controller.go @@ -0,0 +1,497 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package daemon + +import ( + "reflect" + "sort" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/experimental" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/record" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/workqueue" + "k8s.io/kubernetes/pkg/watch" +) + +const ( + // Daemon sets will periodically check that their daemon pods are running as expected. + FullDaemonSetResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable. + // Nodes don't need relisting. + FullNodeResyncPeriod = 0 + // Daemon pods don't need relisting. + FullDaemonPodResyncPeriod = 0 + // If sending a status upate to API server fails, we retry a finite number of times. + StatusUpdateRetries = 1 +) + +// DaemonSetsController is responsible for synchronizing DaemonSet objects stored +// in the system with actual running pods. +type DaemonSetsController struct { + kubeClient client.Interface + podControl controller.PodControlInterface + + // To allow injection of syncDaemonSet for testing. + syncHandler func(dsKey string) error + // A TTLCache of pod creates/deletes each ds expects to see + expectations controller.ControllerExpectationsInterface + // A store of daemon sets + dsStore cache.StoreToDaemonSetLister + // A store of pods + podStore cache.StoreToPodLister + // A store of nodes + nodeStore cache.StoreToNodeLister + // Watches changes to all daemon sets. + dsController *framework.Controller + // Watches changes to all pods + podController *framework.Controller + // Watches changes to all nodes. + nodeController *framework.Controller + // Daemon sets that need to be synced. + queue *workqueue.Type +} + +func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) + + dsc := &DaemonSetsController{ + kubeClient: kubeClient, + podControl: controller.RealPodControl{ + KubeClient: kubeClient, + Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "daemon-set"}), + }, + expectations: controller.NewControllerExpectations(), + queue: workqueue.New(), + } + // Manage addition/update of daemon sets. + dsc.dsStore.Store, dsc.dsController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return dsc.kubeClient.Experimental().DaemonSets(api.NamespaceAll).List(labels.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return dsc.kubeClient.Experimental().DaemonSets(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &experimental.DaemonSet{}, + FullDaemonSetResyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + ds := obj.(*experimental.DaemonSet) + glog.V(4).Infof("Adding daemon set %s", ds.Name) + dsc.enqueueDaemonSet(obj) + }, + UpdateFunc: func(old, cur interface{}) { + oldDS := old.(*experimental.DaemonSet) + glog.V(4).Infof("Updating daemon set %s", oldDS.Name) + dsc.enqueueDaemonSet(cur) + }, + DeleteFunc: func(obj interface{}) { + ds := obj.(*experimental.DaemonSet) + glog.V(4).Infof("Deleting daemon set %s", ds.Name) + dsc.enqueueDaemonSet(obj) + }, + }, + ) + // Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete + // more pods until all the effects (expectations) of a daemon set's create/delete have been observed. + dsc.podStore.Store, dsc.podController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return dsc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return dsc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &api.Pod{}, + FullDaemonPodResyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: dsc.addPod, + UpdateFunc: dsc.updatePod, + DeleteFunc: dsc.deletePod, + }, + ) + // Watch for new nodes or updates to nodes - daemon pods are launched on new nodes, and possibly when labels on nodes change, + dsc.nodeStore.Store, dsc.nodeController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return dsc.kubeClient.Nodes().List(labels.Everything(), fields.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return dsc.kubeClient.Nodes().Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &api.Node{}, + FullNodeResyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: dsc.addNode, + UpdateFunc: dsc.updateNode, + }, + ) + dsc.syncHandler = dsc.syncDaemonSet + return dsc +} + +// Run begins watching and syncing daemon sets. +func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) { + go dsc.dsController.Run(stopCh) + go dsc.podController.Run(stopCh) + go dsc.nodeController.Run(stopCh) + for i := 0; i < workers; i++ { + go util.Until(dsc.worker, time.Second, stopCh) + } + <-stopCh + glog.Infof("Shutting down Daemon Set Controller") + dsc.queue.ShutDown() +} + +func (dsc *DaemonSetsController) worker() { + for { + func() { + dsKey, quit := dsc.queue.Get() + if quit { + return + } + defer dsc.queue.Done(dsKey) + err := dsc.syncHandler(dsKey.(string)) + if err != nil { + glog.Errorf("Error syncing daemon set with key %s: %v", dsKey.(string), err) + } + }() + } +} + +func (dsc *DaemonSetsController) enqueueAllDaemonSets() { + glog.V(4).Infof("Enqueueing all daemon sets") + ds, err := dsc.dsStore.List() + if err != nil { + glog.Errorf("Error enqueueing daemon sets: %v", err) + return + } + for i := range ds { + dsc.enqueueDaemonSet(&ds[i]) + } +} + +func (dsc *DaemonSetsController) enqueueDaemonSet(obj interface{}) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + return + } + dsc.queue.Add(key) +} + +func (dsc *DaemonSetsController) getPodDaemonSet(pod *api.Pod) *experimental.DaemonSet { + sets, err := dsc.dsStore.GetPodDaemonSets(pod) + if err != nil { + glog.V(4).Infof("No daemon sets found for pod %v, daemon set controller will avoid syncing", pod.Name) + return nil + } + // More than two items in this list indicates user error. If two daemon + // sets overlap, sort by creation timestamp, subsort by name, then pick + // the first. + glog.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels) + sort.Sort(byCreationTimestamp(sets)) + return &sets[0] +} + +func (dsc *DaemonSetsController) addPod(obj interface{}) { + pod := obj.(*api.Pod) + glog.V(4).Infof("Pod %s added.", pod.Name) + if ds := dsc.getPodDaemonSet(pod); ds != nil { + dsKey, err := controller.KeyFunc(ds) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", ds, err) + return + } + dsc.expectations.CreationObserved(dsKey) + dsc.enqueueDaemonSet(ds) + } +} + +// When a pod is updated, figure out what sets manage it and wake them +// up. If the labels of the pod have changed we need to awaken both the old +// and new set. old and cur must be *api.Pod types. +func (dsc *DaemonSetsController) updatePod(old, cur interface{}) { + if api.Semantic.DeepEqual(old, cur) { + // A periodic relist will send update events for all known pods. + return + } + curPod := cur.(*api.Pod) + glog.V(4).Infof("Pod %s updated.", curPod.Name) + if curDS := dsc.getPodDaemonSet(curPod); curDS != nil { + dsc.enqueueDaemonSet(curDS) + } + oldPod := old.(*api.Pod) + // If the labels have not changed, then the daemon set responsible for + // the pod is the same as it was before. In that case we have enqueued the daemon + // set above, and do not have to enqueue the set again. + if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) { + // It's ok if both oldDS and curDS are the same, because curDS will set + // the expectations on its run so oldDS will have no effect. + if oldDS := dsc.getPodDaemonSet(oldPod); oldDS != nil { + dsc.enqueueDaemonSet(oldDS) + } + } +} + +func (dsc *DaemonSetsController) deletePod(obj interface{}) { + pod, ok := obj.(*api.Pod) + glog.V(4).Infof("Pod %s deleted.", pod.Name) + // When a delete is dropped, the relist will notice a pod in the store not + // in the list, leading to the insertion of a tombstone object which contains + // the deleted key/value. Note that this value might be stale. If the pod + // changed labels the new rc will not be woken up till the periodic resync. + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("Couldn't get object from tombstone %+v", obj) + return + } + pod, ok = tombstone.Obj.(*api.Pod) + if !ok { + glog.Errorf("Tombstone contained object that is not a pod %+v", obj) + return + } + } + if ds := dsc.getPodDaemonSet(pod); ds != nil { + dsKey, err := controller.KeyFunc(ds) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", ds, err) + return + } + dsc.expectations.DeletionObserved(dsKey) + dsc.enqueueDaemonSet(ds) + } +} + +func (dsc *DaemonSetsController) addNode(obj interface{}) { + // TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too). + dsc.enqueueAllDaemonSets() +} + +func (dsc *DaemonSetsController) updateNode(old, cur interface{}) { + oldNode := old.(*api.Node) + curNode := cur.(*api.Node) + if api.Semantic.DeepEqual(oldNode.Name, curNode.Name) && api.Semantic.DeepEqual(oldNode.Namespace, curNode.Namespace) && api.Semantic.DeepEqual(oldNode.Labels, curNode.Labels) { + // A periodic relist will send update events for all known pods. + return + } + // TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too). + dsc.enqueueAllDaemonSets() +} + +// getNodesToDaemonSetPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes. +func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *experimental.DaemonSet) (map[string][]*api.Pod, error) { + nodeToDaemonPods := make(map[string][]*api.Pod) + daemonPods, err := dsc.podStore.Pods(ds.Namespace).List(labels.Set(ds.Spec.Selector).AsSelector()) + if err != nil { + return nodeToDaemonPods, err + } + for i := range daemonPods.Items { + nodeName := daemonPods.Items[i].Spec.NodeName + nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], &daemonPods.Items[i]) + } + return nodeToDaemonPods, nil +} + +func (dsc *DaemonSetsController) manage(ds *experimental.DaemonSet) { + // Find out which nodes are running the daemon pods selected by ds. + nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) + if err != nil { + glog.Errorf("Error getting node to daemon pod mapping for daemon set %+v: %v", ds, err) + } + + // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon + // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node. + nodeList, err := dsc.nodeStore.List() + if err != nil { + glog.Errorf("Couldn't get list of nodes when syncing daemon set %+v: %v", ds, err) + } + var nodesNeedingDaemonPods, podsToDelete []string + for i := range nodeList.Items { + // Check if the node satisfies the daemon set's node selector. + nodeSelector := labels.Set(ds.Spec.Template.Spec.NodeSelector).AsSelector() + shouldRun := nodeSelector.Matches(labels.Set(nodeList.Items[i].Labels)) + // If the daemon set specifies a node name, check that it matches with nodeName. + nodeName := nodeList.Items[i].Name + shouldRun = shouldRun && (ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == nodeName) + daemonPods, isRunning := nodeToDaemonPods[nodeName] + if shouldRun && !isRunning { + // If daemon pod is supposed to be running on node, but isn't, create daemon pod. + nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodeName) + } else if shouldRun && len(daemonPods) > 1 { + // If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods. + // TODO: sort the daemon pods by creation time, so the the oldest is preserved. + for i := 1; i < len(daemonPods); i++ { + podsToDelete = append(podsToDelete, daemonPods[i].Name) + } + } else if !shouldRun && isRunning { + // If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node. + for i := range daemonPods { + podsToDelete = append(podsToDelete, daemonPods[i].Name) + } + } + } + + // We need to set expectations before creating/deleting pods to avoid race conditions. + dsKey, err := controller.KeyFunc(ds) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", ds, err) + return + } + dsc.expectations.SetExpectations(dsKey, len(nodesNeedingDaemonPods), len(podsToDelete)) + + glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v", ds.Name, nodesNeedingDaemonPods) + for i := range nodesNeedingDaemonPods { + if err := dsc.podControl.CreateReplicaOnNode(ds.Namespace, ds, nodesNeedingDaemonPods[i]); err != nil { + glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) + dsc.expectations.CreationObserved(dsKey) + util.HandleError(err) + } + } + + glog.V(4).Infof("Pods to delete for daemon set %s: %+v", ds.Name, podsToDelete) + for i := range podsToDelete { + if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[i]); err != nil { + glog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name) + dsc.expectations.DeletionObserved(dsKey) + util.HandleError(err) + } + } +} + +func storeDaemonSetStatus(dsClient client.DaemonSetInterface, ds *experimental.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int) error { + if ds.Status.DesiredNumberScheduled == desiredNumberScheduled && ds.Status.CurrentNumberScheduled == currentNumberScheduled && ds.Status.NumberMisscheduled == numberMisscheduled { + return nil + } + + var updateErr, getErr error + for i := 0; i <= StatusUpdateRetries; i++ { + ds.Status.DesiredNumberScheduled = desiredNumberScheduled + ds.Status.CurrentNumberScheduled = currentNumberScheduled + ds.Status.NumberMisscheduled = numberMisscheduled + _, updateErr = dsClient.Update(ds) + if updateErr == nil { + // successful update + return nil + } + // Update the set with the latest resource version for the next poll + if ds, getErr = dsClient.Get(ds.Name); getErr != nil { + // If the GET fails we can't trust status.Replicas anymore. This error + // is bound to be more interesting than the update failure. + return getErr + } + } + return updateErr +} + +func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *experimental.DaemonSet) { + glog.Infof("Updating daemon set status") + nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) + if err != nil { + glog.Errorf("Error getting node to daemon pod mapping for daemon set %+v: %v", ds, err) + } + + nodeList, err := dsc.nodeStore.List() + if err != nil { + glog.Errorf("Couldn't get list of nodes when updating daemon set %+v: %v", ds, err) + } + + var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int + for _, node := range nodeList.Items { + nodeSelector := labels.Set(ds.Spec.Template.Spec.NodeSelector).AsSelector() + shouldRun := nodeSelector.Matches(labels.Set(node.Labels)) + numDaemonPods := len(nodeToDaemonPods[node.Name]) + + if numDaemonPods > 0 { + currentNumberScheduled++ + } + + if shouldRun { + desiredNumberScheduled++ + } else if numDaemonPods >= 0 { + numberMisscheduled++ + } + } + + err = storeDaemonSetStatus(dsc.kubeClient.Experimental().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled) + if err != nil { + glog.Errorf("Error storing status for daemon set %+v: %v", ds, err) + } +} + +func (dsc *DaemonSetsController) syncDaemonSet(key string) error { + startTime := time.Now() + defer func() { + glog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Now().Sub(startTime)) + }() + obj, exists, err := dsc.dsStore.Store.GetByKey(key) + if err != nil { + glog.Infof("Unable to retrieve ds %v from store: %v", key, err) + dsc.queue.Add(key) + return err + } + if !exists { + glog.V(3).Infof("daemon set has been deleted %v", key) + dsc.expectations.DeleteExpectations(key) + return nil + } + ds := obj.(*experimental.DaemonSet) + + // Don't process a daemon set until all its creations and deletions have been processed. + // For example if daemon set foo asked for 3 new daemon pods in the previous call to manage, + // then we do not want to call manage on foo until the daemon pods have been created. + dsKey, err := controller.KeyFunc(ds) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", ds, err) + return err + } + dsNeedsSync := dsc.expectations.SatisfiedExpectations(dsKey) + if dsNeedsSync { + dsc.manage(ds) + } + + dsc.updateDaemonSetStatus(ds) + return nil +} + +// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker. +type byCreationTimestamp []experimental.DaemonSet + +func (o byCreationTimestamp) Len() int { return len(o) } +func (o byCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] } + +func (o byCreationTimestamp) Less(i, j int) bool { + if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { + return o[i].Name < o[j].Name + } + return o[i].CreationTimestamp.Before(o[j].CreationTimestamp) +} diff --git a/pkg/controller/daemon/controller_test.go b/pkg/controller/daemon/controller_test.go new file mode 100644 index 00000000000..1ab55b2b72c --- /dev/null +++ b/pkg/controller/daemon/controller_test.go @@ -0,0 +1,321 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package daemon + +import ( + "fmt" + "sync" + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/apis/experimental" + "k8s.io/kubernetes/pkg/client/cache" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/securitycontext" +) + +var ( + simpleDaemonSetLabel = map[string]string{"name": "simple-daemon", "type": "production"} + simpleDaemonSetLabel2 = map[string]string{"name": "simple-daemon", "type": "test"} + simpleNodeLabel = map[string]string{"color": "blue", "speed": "fast"} + simpleNodeLabel2 = map[string]string{"color": "red", "speed": "fast"} +) + +type FakePodControl struct { + daemonSet []experimental.DaemonSet + deletePodName []string + lock sync.Mutex + err error +} + +func init() { + api.ForTesting_ReferencesAllowBlankSelfLinks = true +} + +func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationController) error { + return nil +} + +func (f *FakePodControl) CreateReplicaOnNode(namespace string, ds *experimental.DaemonSet, nodeName string) error { + f.lock.Lock() + defer f.lock.Unlock() + if f.err != nil { + return f.err + } + f.daemonSet = append(f.daemonSet, *ds) + return nil +} + +func (f *FakePodControl) DeletePod(namespace string, podName string) error { + f.lock.Lock() + defer f.lock.Unlock() + if f.err != nil { + return f.err + } + f.deletePodName = append(f.deletePodName, podName) + return nil +} +func (f *FakePodControl) clear() { + f.lock.Lock() + defer f.lock.Unlock() + f.deletePodName = []string{} + f.daemonSet = []experimental.DaemonSet{} +} + +func newDaemonSet(name string) *experimental.DaemonSet { + return &experimental.DaemonSet{ + TypeMeta: api.TypeMeta{APIVersion: testapi.Experimental.Version()}, + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: api.NamespaceDefault, + }, + Spec: experimental.DaemonSetSpec{ + Selector: simpleDaemonSetLabel, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: simpleDaemonSetLabel, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Image: "foo/bar", + TerminationMessagePath: api.TerminationMessagePathDefault, + ImagePullPolicy: api.PullIfNotPresent, + SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(), + }, + }, + DNSPolicy: api.DNSDefault, + }, + }, + }, + } +} + +func newNode(name string, label map[string]string) *api.Node { + return &api.Node{ + TypeMeta: api.TypeMeta{APIVersion: testapi.Default.Version()}, + ObjectMeta: api.ObjectMeta{ + Name: name, + Labels: label, + Namespace: api.NamespaceDefault, + }, + } +} + +func addNodes(nodeStore cache.Store, startIndex, numNodes int, label map[string]string) { + for i := startIndex; i < startIndex+numNodes; i++ { + nodeStore.Add(newNode(fmt.Sprintf("node-%d", i), label)) + } +} + +func newPod(podName string, nodeName string, label map[string]string) *api.Pod { + pod := &api.Pod{ + TypeMeta: api.TypeMeta{APIVersion: testapi.Default.Version()}, + ObjectMeta: api.ObjectMeta{ + GenerateName: podName, + Labels: label, + Namespace: api.NamespaceDefault, + }, + Spec: api.PodSpec{ + NodeName: nodeName, + Containers: []api.Container{ + { + Image: "foo/bar", + TerminationMessagePath: api.TerminationMessagePathDefault, + ImagePullPolicy: api.PullIfNotPresent, + SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(), + }, + }, + DNSPolicy: api.DNSDefault, + }, + } + api.GenerateName(api.SimpleNameGenerator, &pod.ObjectMeta) + return pod +} + +func addPods(podStore cache.Store, nodeName string, label map[string]string, number int) { + for i := 0; i < number; i++ { + podStore.Add(newPod(fmt.Sprintf("%s-", nodeName), nodeName, label)) + } +} + +func newTestController() (*DaemonSetsController, *FakePodControl) { + client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()}) + manager := NewDaemonSetsController(client) + podControl := &FakePodControl{} + manager.podControl = podControl + return manager, podControl +} + +func validateSyncDaemonSets(t *testing.T, fakePodControl *FakePodControl, expectedCreates, expectedDeletes int) { + if len(fakePodControl.daemonSet) != expectedCreates { + t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.daemonSet)) + } + if len(fakePodControl.deletePodName) != expectedDeletes { + t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.deletePodName)) + } +} + +func syncAndValidateDaemonSets(t *testing.T, manager *DaemonSetsController, ds *experimental.DaemonSet, podControl *FakePodControl, expectedCreates, expectedDeletes int) { + key, err := controller.KeyFunc(ds) + if err != nil { + t.Errorf("Could not get key for daemon.") + } + manager.syncHandler(key) + validateSyncDaemonSets(t, podControl, expectedCreates, expectedDeletes) +} + +// DaemonSets without node selectors should launch pods on every node. +func TestSimpleDaemonSetLaunchesPods(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 5, nil) + ds := newDaemonSet("foo") + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 0) +} + +// DaemonSets without node selectors should launch pods on every node. +func TestNoNodesDoesNothing(t *testing.T) { + manager, podControl := newTestController() + ds := newDaemonSet("foo") + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) +} + +// DaemonSets without node selectors should launch pods on every node. +func TestOneNodeDaemonLaunchesPod(t *testing.T) { + manager, podControl := newTestController() + manager.nodeStore.Add(newNode("only-node", nil)) + ds := newDaemonSet("foo") + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) +} + +// Controller should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods. +func TestDealsWithExistingPods(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 5, nil) + addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 1) + addPods(manager.podStore.Store, "node-2", simpleDaemonSetLabel, 2) + addPods(manager.podStore.Store, "node-3", simpleDaemonSetLabel, 5) + addPods(manager.podStore.Store, "node-4", simpleDaemonSetLabel2, 2) + ds := newDaemonSet("foo") + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 2, 5) +} + +// Daemon with node selector should launch pods on nodes matching selector. +func TestSelectorDaemonLaunchesPods(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 4, nil) + addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel) + daemon := newDaemonSet("foo") + daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel + manager.dsStore.Add(daemon) + syncAndValidateDaemonSets(t, manager, daemon, podControl, 3, 0) +} + +// Daemon with node selector should delete pods from nodes that do not satisfy selector. +func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 5, nil) + addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel) + addPods(manager.podStore.Store, "node-0", simpleDaemonSetLabel2, 2) + addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 3) + addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel2, 1) + addPods(manager.podStore.Store, "node-4", simpleDaemonSetLabel, 1) + daemon := newDaemonSet("foo") + daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel + manager.dsStore.Add(daemon) + syncAndValidateDaemonSets(t, manager, daemon, podControl, 5, 4) +} + +// DaemonSet with node selector should launch pods on nodes matching selector, but also deal with existing pods on nodes. +func TestSelectorDaemonDealsWithExistingPods(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 5, nil) + addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel) + addPods(manager.podStore.Store, "node-0", simpleDaemonSetLabel, 1) + addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 3) + addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel2, 2) + addPods(manager.podStore.Store, "node-2", simpleDaemonSetLabel, 4) + addPods(manager.podStore.Store, "node-6", simpleDaemonSetLabel, 13) + addPods(manager.podStore.Store, "node-7", simpleDaemonSetLabel2, 4) + addPods(manager.podStore.Store, "node-9", simpleDaemonSetLabel, 1) + addPods(manager.podStore.Store, "node-9", simpleDaemonSetLabel2, 1) + ds := newDaemonSet("foo") + ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 3, 20) +} + +// DaemonSet with node selector which does not match any node labels should not launch pods. +func TestBadSelectorDaemonDoesNothing(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 4, nil) + addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel) + ds := newDaemonSet("foo") + ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel2 + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) +} + +// DaemonSet with node name should launch pod on node with corresponding name. +func TestNameDaemonSetLaunchesPods(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 5, nil) + ds := newDaemonSet("foo") + ds.Spec.Template.Spec.NodeName = "node-0" + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) +} + +// DaemonSet with node name that does not exist should not launch pods. +func TestBadNameDaemonSetDoesNothing(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 5, nil) + ds := newDaemonSet("foo") + ds.Spec.Template.Spec.NodeName = "node-10" + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) +} + +// DaemonSet with node selector, and node name, matching a node, should launch a pod on the node. +func TestNameAndSelectorDaemonSetLaunchesPods(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 4, nil) + addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel) + ds := newDaemonSet("foo") + ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel + ds.Spec.Template.Spec.NodeName = "node-6" + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) +} + +// DaemonSet with node selector that matches some nodes, and node name that matches a different node, should do nothing. +func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) { + manager, podControl := newTestController() + addNodes(manager.nodeStore.Store, 0, 4, nil) + addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel) + ds := newDaemonSet("foo") + ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel + ds.Spec.Template.Spec.NodeName = "node-0" + manager.dsStore.Add(ds) + syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) +} diff --git a/pkg/controller/daemon/doc.go b/pkg/controller/daemon/doc.go new file mode 100644 index 00000000000..db689ac1bb6 --- /dev/null +++ b/pkg/controller/daemon/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package daemon contains logic for watching and synchronizing +// daemons. +package daemon diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index fae13b3e011..404099a0271 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/apis/experimental" "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/testclient" @@ -69,6 +70,10 @@ func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationCo return nil } +func (f *FakePodControl) CreateReplicaOnNode(namespace string, daemon *experimental.DaemonSet, nodeName string) error { + return nil +} + func (f *FakePodControl) DeletePod(namespace string, podName string) error { f.lock.Lock() defer f.lock.Unlock() diff --git a/test/e2e/daemon_set.go b/test/e2e/daemon_set.go new file mode 100644 index 00000000000..05f412768ce --- /dev/null +++ b/test/e2e/daemon_set.go @@ -0,0 +1,221 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/experimental" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/wait" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Daemon set", func() { + f := &Framework{BaseName: "daemonsets"} + + BeforeEach(func() { + f.beforeEach() + err := clearNodeLabels(f.Client) + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + err := clearNodeLabels(f.Client) + Expect(err).NotTo(HaveOccurred()) + f.afterEach() + }) + + It("should launch a daemon pod on every node of the cluster", func() { + testDaemonSets(f) + }) +}) + +func clearNodeLabels(c *client.Client) error { + nodeClient := c.Nodes() + nodeList, err := nodeClient.List(labels.Everything(), fields.Everything()) + if err != nil { + return err + } + for _, node := range nodeList.Items { + if len(node.Labels) != 0 { + node.Labels = map[string]string{} + newNode, err := nodeClient.Update(&node) + if err != nil { + return err + } else if len(newNode.Labels) != 0 { + return fmt.Errorf("Could not make node labels nil.") + } + } + } + return nil +} + +func checkDaemonPodOnNodes(f *Framework, selector map[string]string, nodeNames []string) func() (bool, error) { + return func() (bool, error) { + podList, err := f.Client.Pods(f.Namespace.Name).List(labels.Set(selector).AsSelector(), fields.Everything()) + if err != nil { + return false, nil + } + pods := podList.Items + + nodesToPodCount := make(map[string]int) + for _, pod := range pods { + nodesToPodCount[pod.Spec.NodeName] += 1 + } + + // Ensure that exactly 1 pod is running on all nodes in nodeNames. + for _, nodeName := range nodeNames { + if nodesToPodCount[nodeName] != 1 { + return false, nil + } + } + + // Ensure that sizes of the lists are the same. We've verified that every element of nodeNames is in + // nodesToPodCount, so verifying the lengths are equal ensures that there aren't pods running on any + // other nodes. + return len(nodesToPodCount) == len(nodeNames), nil + } +} + +func checkRunningOnAllNodes(f *Framework, selector map[string]string) func() (bool, error) { + return func() (bool, error) { + nodeList, err := f.Client.Nodes().List(labels.Everything(), fields.Everything()) + if err != nil { + return false, nil + } + nodeNames := make([]string, 0) + for _, node := range nodeList.Items { + nodeNames = append(nodeNames, node.Name) + } + return checkDaemonPodOnNodes(f, selector, nodeNames)() + } +} + +func checkRunningOnNoNodes(f *Framework, selector map[string]string) func() (bool, error) { + return checkDaemonPodOnNodes(f, selector, make([]string, 0)) +} + +func testDaemonSets(f *Framework) { + ns := f.Namespace.Name + c := f.Client + simpleDSName := "simple-daemon-set" + image := "gcr.io/google_containers/serve_hostname:1.1" + label := map[string]string{"name": simpleDSName} + retryTimeout := 1 * time.Minute + retryInterval := 5 * time.Second + + Logf("Creating simple daemon set %s", simpleDSName) + _, err := c.DaemonSets(ns).Create(&experimental.DaemonSet{ + ObjectMeta: api.ObjectMeta{ + Name: simpleDSName, + }, + Spec: experimental.DaemonSetSpec{ + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: label, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: simpleDSName, + Image: image, + Ports: []api.ContainerPort{{ContainerPort: 9376}}, + }, + }, + }, + }, + }, + }) + Expect(err).NotTo(HaveOccurred()) + + By("Check that daemon pods launch on every node of the cluster.") + Expect(err).NotTo(HaveOccurred()) + err = wait.Poll(retryInterval, retryTimeout, checkRunningOnAllNodes(f, label)) + Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to start") + + By("Stop a daemon pod, check that the daemon pod is revived.") + podClient := c.Pods(ns) + + podList, err := podClient.List(labels.Set(label).AsSelector(), fields.Everything()) + Expect(err).NotTo(HaveOccurred()) + Expect(len(podList.Items)).To(BeNumerically(">", 0)) + pod := podList.Items[0] + err = podClient.Delete(pod.Name, nil) + Expect(err).NotTo(HaveOccurred()) + err = wait.Poll(retryInterval, retryTimeout, checkRunningOnAllNodes(f, label)) + Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to revive") + + complexDSName := "complex-daemon-set" + complexLabel := map[string]string{"name": complexDSName} + nodeSelector := map[string]string{"color": "blue"} + Logf("Creating daemon with a node selector %s", complexDSName) + _, err = c.DaemonSets(ns).Create(&experimental.DaemonSet{ + ObjectMeta: api.ObjectMeta{ + Name: complexDSName, + }, + Spec: experimental.DaemonSetSpec{ + Selector: complexLabel, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: complexLabel, + }, + Spec: api.PodSpec{ + NodeSelector: nodeSelector, + Containers: []api.Container{ + { + Name: complexDSName, + Image: image, + Ports: []api.ContainerPort{{ContainerPort: 9376}}, + }, + }, + }, + }, + }, + }) + Expect(err).NotTo(HaveOccurred()) + + By("Initially, daemon pods should not be running on any nodes.") + err = wait.Poll(retryInterval, retryTimeout, checkRunningOnNoNodes(f, complexLabel)) + Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pods to be running on no nodes") + + By("Change label of node, check that daemon pod is launched.") + nodeClient := c.Nodes() + nodeList, err := nodeClient.List(labels.Everything(), fields.Everything()) + Expect(len(nodeList.Items)).To(BeNumerically(">", 0)) + nodeList.Items[0].Labels = nodeSelector + newNode, err := nodeClient.Update(&nodeList.Items[0]) + Expect(err).NotTo(HaveOccurred()) + Expect(len(newNode.Labels)).To(Equal(1)) + err = wait.Poll(retryInterval, retryTimeout, checkDaemonPodOnNodes(f, complexLabel, []string{newNode.Name})) + Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pods to be running on new nodes") + + By("remove the node selector and wait for") + newNode, err = nodeClient.Get(newNode.Name) + Expect(err).NotTo(HaveOccurred(), "error getting node") + newNode.Labels = map[string]string{} + newNode, err = nodeClient.Update(newNode) + Expect(err).NotTo(HaveOccurred()) + Expect(wait.Poll(retryInterval, retryTimeout, checkRunningOnNoNodes(f, complexLabel))). + NotTo(HaveOccurred(), "error waiting for daemon pod to not be running on nodes") +}