diff --git a/cluster/addons/dns/kube2sky/kube2sky.go b/cluster/addons/dns/kube2sky/kube2sky.go index 0c5916787d3..a1537a210b4 100644 --- a/cluster/addons/dns/kube2sky/kube2sky.go +++ b/cluster/addons/dns/kube2sky/kube2sky.go @@ -515,7 +515,7 @@ func watchForServices(kubeClient *kclient.Client, ks *kube2sky) kcache.Store { UpdateFunc: ks.updateService, }, ) - go serviceController.Run(util.NeverStop) + go serviceController.Run(wait.NeverStop) return serviceStore } @@ -533,7 +533,7 @@ func watchEndpoints(kubeClient *kclient.Client, ks *kube2sky) kcache.Store { }, ) - go eController.Run(util.NeverStop) + go eController.Run(wait.NeverStop) return eStore } @@ -551,7 +551,7 @@ func watchPods(kubeClient *kclient.Client, ks *kube2sky) kcache.Store { }, ) - go eController.Run(util.NeverStop) + go eController.Run(wait.NeverStop) return eStore } diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index d37f4bd58a7..9e19d5d8f1b 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -191,11 +191,11 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string // ensure the service endpoints are sync'd several times within the window that the integration tests wait go endpointcontroller.NewEndpointController(clientset, controller.NoResyncPeriodFunc). - Run(3, util.NeverStop) + Run(3, wait.NeverStop) // TODO: Write an integration test for the replication controllers watch. go replicationcontroller.NewReplicationManager(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas). - Run(3, util.NeverStop) + Run(3, wait.NeverStop) nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, nil, false) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 58024abf1a3..16afe173aae 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -175,17 +175,17 @@ func Run(s *options.CMServer) error { func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *client.Config, stop <-chan struct{}) error { go endpointcontroller.NewEndpointController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "endpoint-controller")), ResyncPeriod(s)). - Run(s.ConcurrentEndpointSyncs, util.NeverStop) + Run(s.ConcurrentEndpointSyncs, wait.NeverStop) go replicationcontroller.NewReplicationManager( clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "replication-controller")), ResyncPeriod(s), replicationcontroller.BurstReplicas, - ).Run(s.ConcurrentRCSyncs, util.NeverStop) + ).Run(s.ConcurrentRCSyncs, wait.NeverStop) if s.TerminatedPodGCThreshold > 0 { go gc.New(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "garbage-collector")), ResyncPeriod(s), s.TerminatedPodGCThreshold). 
- Run(util.NeverStop) + Run(wait.NeverStop) } cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) @@ -219,7 +219,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig go resourcequotacontroller.NewResourceQuotaController( clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "resourcequota-controller")), - controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, util.NeverStop) + controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, wait.NeverStop) // If apiserver is not running we should wait for some time and fail only then. This is particularly // important when we start apiserver and controller manager at the same time. @@ -265,25 +265,25 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig if containsResource(resources, "daemonsets") { glog.Infof("Starting daemon set controller") go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s)). - Run(s.ConcurrentDSCSyncs, util.NeverStop) + Run(s.ConcurrentDSCSyncs, wait.NeverStop) } if containsResource(resources, "jobs") { glog.Infof("Starting job controller") go job.NewJobController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "job-controller")), ResyncPeriod(s)). - Run(s.ConcurrentJobSyncs, util.NeverStop) + Run(s.ConcurrentJobSyncs, wait.NeverStop) } if containsResource(resources, "deployments") { glog.Infof("Starting deployment controller") go deployment.NewDeploymentController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "deployment-controller")), ResyncPeriod(s)). - Run(s.ConcurrentDeploymentSyncs, util.NeverStop) + Run(s.ConcurrentDeploymentSyncs, wait.NeverStop) } if containsResource(resources, "replicasets") { glog.Infof("Starting ReplicaSet controller") go replicaset.NewReplicaSetController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas). 
- Run(s.ConcurrentRSSyncs, util.NeverStop) + Run(s.ConcurrentRSSyncs, wait.NeverStop) } } diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 0923876c57c..f43d2166063 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -45,6 +45,7 @@ import ( utilnet "k8s.io/kubernetes/pkg/util/net" nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/oom" + "k8s.io/kubernetes/pkg/util/wait" "github.com/golang/glog" "github.com/spf13/cobra" @@ -274,12 +275,12 @@ func (s *ProxyServer) Run() error { http.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "%s", s.ProxyMode) }) - go util.Until(func() { + go wait.Until(func() { err := http.ListenAndServe(s.Config.HealthzBindAddress+":"+strconv.Itoa(s.Config.HealthzPort), nil) if err != nil { glog.Errorf("Starting health server failed: %v", err) } - }, 5*time.Second, util.NeverStop) + }, 5*time.Second, wait.NeverStop) } // Tune conntrack, if requested diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 09eea24a6e6..8dfbc5601bf 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -63,6 +63,7 @@ import ( nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/volume" ) @@ -322,12 +323,12 @@ func Run(s *options.KubeletServer, kcfg *KubeletConfig) error { if s.HealthzPort > 0 { healthz.DefaultHealthz() - go util.Until(func() { + go wait.Until(func() { err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(s.HealthzPort)), nil) if err != nil { glog.Errorf("Starting health server failed: %v", err) } - }, 5*time.Second, util.NeverStop) + }, 5*time.Second, wait.NeverStop) } if s.RunOnce { @@ -611,18 +612,18 @@ func RunKubelet(kcfg *KubeletConfig) error { func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) { // start the kubelet - go util.Until(func() { k.Run(podCfg.Updates()) }, 0, util.NeverStop) + go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop) // start the kubelet server if kc.EnableServer { - go util.Until(func() { + go wait.Until(func() { k.ListenAndServe(kc.Address, kc.Port, kc.TLSOptions, kc.Auth, kc.EnableDebuggingHandlers) - }, 0, util.NeverStop) + }, 0, wait.NeverStop) } if kc.ReadOnlyPort > 0 { - go util.Until(func() { + go wait.Until(func() { k.ListenAndServeReadOnly(kc.Address, kc.ReadOnlyPort) - }, 0, util.NeverStop) + }, 0, wait.NeverStop) } } diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index 60cbb6c705a..eb573016c3d 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -128,14 +128,14 @@ func (s *CMServer) Run(_ []string) error { }() endpoints := s.createEndpointController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "endpoint-controller"))) - go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop) + go endpoints.Run(s.ConcurrentEndpointSyncs, wait.NeverStop) go replicationcontroller.NewReplicationManager(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "replication-controller")), s.resyncPeriod, replicationcontroller.BurstReplicas). 
- Run(s.ConcurrentRCSyncs, util.NeverStop) + Run(s.ConcurrentRCSyncs, wait.NeverStop) if s.TerminatedPodGCThreshold > 0 { go gc.New(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "garbage-collector")), s.resyncPeriod, s.TerminatedPodGCThreshold). - Run(util.NeverStop) + Run(wait.NeverStop) } //TODO(jdef) should eventually support more cloud providers here @@ -154,7 +154,7 @@ func (s *CMServer) Run(_ []string) error { nodeController.Run(s.NodeSyncPeriod) nodeStatusUpdaterController := node.NewStatusUpdater(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "node-status-controller")), s.NodeMonitorPeriod, time.Now) - if err := nodeStatusUpdaterController.Run(util.NeverStop); err != nil { + if err := nodeStatusUpdaterController.Run(wait.NeverStop); err != nil { glog.Fatalf("Failed to start node status update controller: %v", err) } @@ -173,7 +173,7 @@ func (s *CMServer) Run(_ []string) error { } go resourcequotacontroller.NewResourceQuotaController( - clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "resource-quota-controller")), controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, util.NeverStop) + clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "resource-quota-controller")), controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, wait.NeverStop) // If apiserver is not running we should wait for some time and fail only then. This is particularly // important when we start apiserver and controller manager at the same time. @@ -220,19 +220,19 @@ func (s *CMServer) Run(_ []string) error { if containsResource(resources, "daemonsets") { glog.Infof("Starting daemon set controller") go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod). - Run(s.ConcurrentDSCSyncs, util.NeverStop) + Run(s.ConcurrentDSCSyncs, wait.NeverStop) } if containsResource(resources, "jobs") { glog.Infof("Starting job controller") go job.NewJobController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "job-controller")), s.resyncPeriod). - Run(s.ConcurrentJobSyncs, util.NeverStop) + Run(s.ConcurrentJobSyncs, wait.NeverStop) } if containsResource(resources, "deployments") { glog.Infof("Starting deployment controller") go deployment.NewDeploymentController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "deployment-controller")), s.resyncPeriod). 
- Run(s.ConcurrentDeploymentSyncs, util.NeverStop) + Run(s.ConcurrentDeploymentSyncs, wait.NeverStop) } } diff --git a/contrib/mesos/pkg/election/etcd_master.go b/contrib/mesos/pkg/election/etcd_master.go index f93a3ac8e38..92fa08ea23b 100644 --- a/contrib/mesos/pkg/election/etcd_master.go +++ b/contrib/mesos/pkg/election/etcd_master.go @@ -26,7 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" ) @@ -57,7 +57,7 @@ type etcdMasterElector struct { func (e *etcdMasterElector) Elect(path, id string) watch.Interface { e.done = make(chan empty) e.events = make(chan watch.Event) - go util.Until(func() { e.run(path, id) }, time.Second*5, util.NeverStop) + go wait.Until(func() { e.run(path, id) }, time.Second*5, wait.NeverStop) return e } diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index 8d86762943b..47a20421578 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -39,8 +39,8 @@ import ( "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" utiltesting "k8s.io/kubernetes/pkg/util/testing" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" "github.com/mesos/mesos-go/mesosproto" @@ -233,7 +233,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { executor.LaunchTask(mockDriver, taskInfo) - assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { + assertext.EventuallyTrue(t, wait.ForeverTestTimeout, func() bool { executor.lock.Lock() defer executor.lock.Unlock() return !registry.empty() @@ -251,7 +251,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { finished := kmruntime.After(statusUpdateCalls.Wait) select { case <-finished: - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Fatalf("timed out waiting for status update calls to finish") } @@ -267,7 +267,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { }) executor.KillTask(mockDriver, taskInfo.TaskId) - assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { + assertext.EventuallyTrue(t, wait.ForeverTestTimeout, func() bool { executor.lock.Lock() defer executor.lock.Unlock() return registry.empty() @@ -277,7 +277,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) { finished = kmruntime.After(statusUpdateCalls.Wait) select { case <-finished: - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Fatalf("timed out waiting for status update calls to finish") } @@ -428,7 +428,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { executor.LaunchTask(mockDriver, taskInfo) // must wait for this otherwise phase changes may not apply - assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { + assertext.EventuallyTrue(t, wait.ForeverTestTimeout, func() bool { executor.lock.Lock() defer executor.lock.Unlock() return !registry.empty() @@ -444,7 +444,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { // from k.tasks through the "task-lost:foo" message below. 
select { case <-called: - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Fatalf("timed out waiting for SendStatusUpdate for the running task") } @@ -462,7 +462,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { executor.FrameworkMessage(mockDriver, "task-lost:foo") - assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { + assertext.EventuallyTrue(t, wait.ForeverTestTimeout, func() bool { executor.lock.Lock() defer executor.lock.Unlock() return registry.empty() @@ -470,7 +470,7 @@ func TestExecutorFrameworkMessage(t *testing.T) { select { case <-called: - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Fatalf("timed out waiting for SendStatusUpdate") } @@ -614,7 +614,7 @@ func TestExecutorsendFrameworkMessage(t *testing.T) { // guard against data race in mock driver between AssertExpectations and Called select { case <-called: // expected - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Fatalf("expected call to SendFrameworkMessage") } mockDriver.AssertExpectations(t) diff --git a/contrib/mesos/pkg/executor/service/kubelet.go b/contrib/mesos/pkg/executor/service/kubelet.go index 7e089b372c6..f4525294c51 100644 --- a/contrib/mesos/pkg/executor/service/kubelet.go +++ b/contrib/mesos/pkg/executor/service/kubelet.go @@ -20,8 +20,8 @@ import ( log "github.com/golang/glog" "k8s.io/kubernetes/pkg/kubelet" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" ) // executorKubelet decorates the kubelet with a Run function that notifies the @@ -76,7 +76,7 @@ func (kl *executorKubelet) Run(mergedUpdates <-chan kubetypes.PodUpdate) { // will not take very long. Peeking into the future (current k8s master) it // seems that the backlog has grown from 1 to 50 -- this may negatively impact // us going forward, time will tell. - util.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone) + wait.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone) //TODO(jdef) revisit this if/when executor failover lands // Force kubelet to delete all pods. diff --git a/contrib/mesos/pkg/scheduler/integration/integration_test.go b/contrib/mesos/pkg/scheduler/integration/integration_test.go index bb89063d379..239856e4d17 100644 --- a/contrib/mesos/pkg/scheduler/integration/integration_test.go +++ b/contrib/mesos/pkg/scheduler/integration/integration_test.go @@ -51,7 +51,7 @@ import ( clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" ) @@ -74,7 +74,7 @@ func (srv *TestServer) LookupNode(name string) *api.Node { } func (srv *TestServer) WaitForNode(name string) { - assertext.EventuallyTrue(srv.t, util.ForeverTestTimeout, func() bool { + assertext.EventuallyTrue(srv.t, wait.ForeverTestTimeout, func() bool { return srv.LookupNode(name) != nil }) } @@ -366,7 +366,7 @@ func (o *EventObserver) PastEventf(object runtime.Object, timestamp unversioned. 
func (a *EventAssertions) Event(observer *EventObserver, pred EventPredicate, msgAndArgs ...interface{}) bool { // parse msgAndArgs: first possibly a duration, otherwise a format string with further args - timeout := util.ForeverTestTimeout + timeout := wait.ForeverTestTimeout msg := "event not received" msgArgStart := 0 if len(msgAndArgs) > 0 { @@ -685,7 +685,7 @@ func TestScheduler_LifeCycle(t *testing.T) { // and wait that framework message is sent to executor lt.driver.AssertNumberOfCalls(t, "SendFrameworkMessage", 1) - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Fatalf("timed out waiting for launchTasks call") } @@ -721,7 +721,7 @@ func TestScheduler_LifeCycle(t *testing.T) { } t.Fatalf("unknown offer used to start a pod") return nil, nil, nil - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Fatal("timed out waiting for launchTasks") return nil, nil, nil } @@ -784,7 +784,7 @@ func TestScheduler_LifeCycle(t *testing.T) { newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_FINISHED), ) - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Fatal("timed out waiting for KillTask") } @@ -820,7 +820,7 @@ func TestScheduler_LifeCycle(t *testing.T) { lt.framework.StatusUpdate(lt.driver, status) // wait until pod is looked up at the apiserver - assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { + assertext.EventuallyTrue(t, wait.ForeverTestTimeout, func() bool { return lt.apiServer.Stats(pod.Name) == beforePodLookups+1 }, "expect that reconcileTask will access apiserver for pod %v", pod.Name) } @@ -838,7 +838,7 @@ func TestScheduler_LifeCycle(t *testing.T) { failPodFromExecutor(launchedTask.taskInfo) podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name) - assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool { + assertext.EventuallyTrue(t, wait.ForeverTestTimeout, func() bool { t, _ := lt.sched.Tasks().ForPod(podKey) return t == nil }) diff --git a/contrib/mesos/pkg/service/endpoints_controller.go b/contrib/mesos/pkg/service/endpoints_controller.go index 0a02b0346ef..1b032c0e0dc 100644 --- a/contrib/mesos/pkg/service/endpoints_controller.go +++ b/contrib/mesos/pkg/service/endpoints_controller.go @@ -32,10 +32,10 @@ import ( "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/intstr" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" @@ -123,7 +123,7 @@ func (e *endpointController) Run(workers int, stopCh <-chan struct{}) { go e.serviceController.Run(stopCh) go e.podController.Run(stopCh) for i := 0; i < workers; i++ { - go util.Until(e.worker, time.Second, stopCh) + go wait.Until(e.worker, time.Second, stopCh) } go func() { defer utilruntime.HandleCrash() diff --git a/pkg/api/rest/resttest/resttest.go b/pkg/api/rest/resttest/resttest.go index f170ec7711d..fbf0bf87d81 100644 --- a/pkg/api/rest/resttest/resttest.go +++ b/pkg/api/rest/resttest/resttest.go @@ -32,7 +32,7 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" ) type Tester struct { @@ -927,7 +927,7 @@ func (t *Tester) testWatchFields(obj runtime.Object, emitFn EmitFunc, fieldsPass if !ok 
{ t.Errorf("watch channel should be open") } - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("unexpected timeout from result channel") } watcher.Stop() @@ -975,7 +975,7 @@ func (t *Tester) testWatchLabels(obj runtime.Object, emitFn EmitFunc, labelsPass if !ok { t.Errorf("watch channel should be open") } - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("unexpected timeout from result channel") } watcher.Stop() diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index 7f5165afe88..ab4854b5f5c 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -38,7 +38,7 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" ) @@ -418,8 +418,8 @@ func TestWatchHTTPTimeout(t *testing.T) { if !watcher.Stopped { t.Errorf("Leaked watch on timeout") } - case <-time.After(util.ForeverTestTimeout): - t.Errorf("Failed to stop watcher after %s of timeout signal", util.ForeverTestTimeout.String()) + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("Failed to stop watcher after %s of timeout signal", wait.ForeverTestTimeout.String()) } // Make sure we can't receive any more events through the timeout watch diff --git a/pkg/client/cache/expiration_cache_test.go b/pkg/client/cache/expiration_cache_test.go index ff6bd1d4fa7..2e8cc5b572f 100644 --- a/pkg/client/cache/expiration_cache_test.go +++ b/pkg/client/cache/expiration_cache_test.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" ) func TestTTLExpirationBasic(t *testing.T) { @@ -55,7 +56,7 @@ func TestTTLExpirationBasic(t *testing.T) { if delKey != key { t.Errorf("Unexpected delete for key %s", key) } - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("Unexpected timeout waiting on delete") } close(deleteChan) @@ -100,7 +101,7 @@ func TestTTLList(t *testing.T) { t.Errorf("Unexpected delete for key %s", delKey) } expireKeys.Delete(delKey) - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("Unexpected timeout waiting on delete") return } diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 59eaac779ad..fa761d5c593 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -35,8 +35,8 @@ import ( apierrs "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" ) @@ -157,13 +157,13 @@ outer: // Run starts a watch and handles watch events. Will restart the watch if it is closed. // Run starts a goroutine and returns immediately. func (r *Reflector) Run() { - go util.Until(func() { r.ListAndWatch(util.NeverStop) }, r.period, util.NeverStop) + go wait.Until(func() { r.ListAndWatch(wait.NeverStop) }, r.period, wait.NeverStop) } // RunUntil starts a watch and handles watch events. Will restart the watch if it is closed. // RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed. 
func (r *Reflector) RunUntil(stopCh <-chan struct{}) { - go util.Until(func() { r.ListAndWatch(stopCh) }, r.period, stopCh) + go wait.Until(func() { r.ListAndWatch(stopCh) }, r.period, stopCh) } var ( diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index b4c0da0b243..223769adea0 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -26,7 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" ) @@ -54,15 +54,15 @@ func TestCloseWatchChannelOnError(t *testing.T) { return &api.PodList{ListMeta: unversioned.ListMeta{ResourceVersion: "1"}}, nil }, } - go r.ListAndWatch(util.NeverStop) + go r.ListAndWatch(wait.NeverStop) fw.Error(pod) select { case _, ok := <-fw.ResultChan(): if ok { t.Errorf("Watch channel left open after cancellation") } - case <-time.After(util.ForeverTestTimeout): - t.Errorf("the cancellation is at least %s late", util.ForeverTestTimeout.String()) + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String()) break } } @@ -90,8 +90,8 @@ func TestRunUntil(t *testing.T) { if ok { t.Errorf("Watch channel left open after stopping the watch") } - case <-time.After(util.ForeverTestTimeout): - t.Errorf("the cancellation is at least %s late", util.ForeverTestTimeout.String()) + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String()) break } } @@ -100,7 +100,7 @@ func TestReflectorResyncChan(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &api.Pod{}, s, time.Millisecond) a, _ := g.resyncChan() - b := time.After(util.ForeverTestTimeout) + b := time.After(wait.ForeverTestTimeout) select { case <-a: t.Logf("got timeout as expected") @@ -129,7 +129,7 @@ func TestReflectorWatchHandlerError(t *testing.T) { fw.Stop() }() var resumeRV string - err := g.watchHandler(fw, &resumeRV, neverExitWatch, util.NeverStop) + err := g.watchHandler(fw, &resumeRV, neverExitWatch, wait.NeverStop) if err == nil { t.Errorf("unexpected non-error") } @@ -149,7 +149,7 @@ func TestReflectorWatchHandler(t *testing.T) { fw.Stop() }() var resumeRV string - err := g.watchHandler(fw, &resumeRV, neverExitWatch, util.NeverStop) + err := g.watchHandler(fw, &resumeRV, neverExitWatch, wait.NeverStop) if err != nil { t.Errorf("unexpected error %v", err) } @@ -198,7 +198,7 @@ func TestReflectorWatchHandlerTimeout(t *testing.T) { var resumeRV string exit := make(chan time.Time, 1) exit <- time.Now() - err := g.watchHandler(fw, &resumeRV, exit, util.NeverStop) + err := g.watchHandler(fw, &resumeRV, exit, wait.NeverStop) if err != errorResyncRequested { t.Errorf("expected timeout error, but got %q", err) } @@ -242,7 +242,7 @@ func TestReflectorListAndWatch(t *testing.T) { } s := NewFIFO(MetaNamespaceKeyFunc) r := NewReflector(lw, &api.Pod{}, s, 0) - go r.ListAndWatch(util.NeverStop) + go r.ListAndWatch(wait.NeverStop) ids := []string{"foo", "bar", "baz", "qux", "zoo"} var fw *watch.FakeWatcher @@ -359,7 +359,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { }, } r := NewReflector(lw, &api.Pod{}, s, 0) - r.ListAndWatch(util.NeverStop) + r.ListAndWatch(wait.NeverStop) } } @@ -397,7 +397,7 @@ func TestReflectorResync(t *testing.T) { r := NewReflector(lw, &api.Pod{}, s, resyncPeriod) r.now = func() time.Time { return 
currentTime } - r.ListAndWatch(util.NeverStop) + r.ListAndWatch(wait.NeverStop) if iteration != 2 { t.Errorf("exactly 2 iterations were expected, got: %v", iteration) } diff --git a/pkg/client/leaderelection/leaderelection.go b/pkg/client/leaderelection/leaderelection.go index 6c2f52c6634..e7a3dac8df3 100644 --- a/pkg/client/leaderelection/leaderelection.go +++ b/pkg/client/leaderelection/leaderelection.go @@ -60,7 +60,6 @@ import ( "k8s.io/kubernetes/pkg/apis/componentconfig" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" @@ -204,7 +203,7 @@ func (le *LeaderElector) IsLeader() bool { // acquire loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew succeeds. func (le *LeaderElector) acquire() { stop := make(chan struct{}) - util.Until(func() { + wait.Until(func() { succeeded := le.tryAcquireOrRenew() le.maybeReportTransition() if !succeeded { @@ -221,7 +220,7 @@ func (le *LeaderElector) acquire() { // renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails. func (le *LeaderElector) renew() { stop := make(chan struct{}) - util.Until(func() { + wait.Until(func() { err := wait.Poll(le.config.RetryPeriod, le.config.RenewDeadline, func() (bool, error) { return le.tryAcquireOrRenew(), nil }) diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go index fb761ffbb5e..7fbff804288 100644 --- a/pkg/controller/daemon/controller.go +++ b/pkg/controller/daemon/controller.go @@ -36,10 +36,10 @@ import ( "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/validation/field" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" @@ -192,7 +192,7 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) { go dsc.podController.Run(stopCh) go dsc.nodeController.Run(stopCh) for i := 0; i < workers; i++ { - go util.Until(dsc.worker, time.Second, stopCh) + go wait.Until(dsc.worker, time.Second, stopCh) } <-stopCh glog.Infof("Shutting down Daemon Set Controller") diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 508d6f58de4..6bbe2162092 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -42,6 +42,7 @@ import ( labelsutil "k8s.io/kubernetes/pkg/util/labels" podutil "k8s.io/kubernetes/pkg/util/pod" utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" ) @@ -192,7 +193,7 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { go dc.rcController.Run(stopCh) go dc.podController.Run(stopCh) for i := 0; i < workers; i++ { - go util.Until(dc.worker, time.Second, stopCh) + go wait.Until(dc.worker, time.Second, stopCh) } <-stopCh glog.Infof("Shutting down deployment controller") diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 8b2716fd6a1..a8d327eb745 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ 
b/pkg/controller/endpoint/endpoints_controller.go @@ -32,9 +32,9 @@ import ( "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" @@ -128,7 +128,7 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) { go e.serviceController.Run(stopCh) go e.podController.Run(stopCh) for i := 0; i < workers; i++ { - go util.Until(e.worker, time.Second, stopCh) + go wait.Until(e.worker, time.Second, stopCh) } go func() { defer utilruntime.HandleCrash() diff --git a/pkg/controller/framework/controller.go b/pkg/controller/framework/controller.go index 8e7abd2592b..ed819525266 100644 --- a/pkg/controller/framework/controller.go +++ b/pkg/controller/framework/controller.go @@ -22,8 +22,8 @@ import ( "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" ) // Config contains all the settings for a Controller. @@ -94,7 +94,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { r.RunUntil(stopCh) - util.Until(c.processLoop, time.Second, stopCh) + wait.Until(c.processLoop, time.Second, stopCh) } // Returns true once this controller has completed an initial resource listing diff --git a/pkg/controller/gc/gc_controller.go b/pkg/controller/gc/gc_controller.go index da65447d8eb..bf09ae928b2 100644 --- a/pkg/controller/gc/gc_controller.go +++ b/pkg/controller/gc/gc_controller.go @@ -29,8 +29,8 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" @@ -79,7 +79,7 @@ func New(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFun func (gcc *GCController) Run(stop <-chan struct{}) { go gcc.podStoreSyncer.Run(stop) - go util.Until(gcc.gc, gcCheckPeriod, stop) + go wait.Until(gcc.gc, gcCheckPeriod, stop) <-stop } diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go index a64f764edbf..5acb0685f26 100644 --- a/pkg/controller/job/controller.go +++ b/pkg/controller/job/controller.go @@ -34,8 +34,8 @@ import ( "k8s.io/kubernetes/pkg/controller/framework" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" ) @@ -140,7 +140,7 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) { go jm.jobController.Run(stopCh) go jm.podController.Run(stopCh) for i := 0; i < workers; i++ { - go util.Until(jm.worker, time.Second, stopCh) + go wait.Until(jm.worker, time.Second, stopCh) } <-stopCh glog.Infof("Shutting down Job Manager") diff --git a/pkg/controller/job/controller_test.go b/pkg/controller/job/controller_test.go index 90ebd5d1ef7..c8838ead320 100644 --- a/pkg/controller/job/controller_test.go +++ b/pkg/controller/job/controller_test.go @@ -31,8 +31,8 @@ import ( client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/testclient" "k8s.io/kubernetes/pkg/controller" - 
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/rand" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" ) @@ -626,7 +626,7 @@ func TestWatchJobs(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) go manager.jobController.Run(stopCh) - go util.Until(manager.worker, 10*time.Millisecond, stopCh) + go wait.Until(manager.worker, 10*time.Millisecond, stopCh) // We're sending new job to see if it reaches syncHandler. testJob.Name = "foo" @@ -691,7 +691,7 @@ func TestWatchPods(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) go manager.podController.Run(stopCh) - go util.Until(manager.worker, 10*time.Millisecond, stopCh) + go wait.Until(manager.worker, 10*time.Millisecond, stopCh) pods := newPodList(1, api.PodRunning, testJob) testPod := pods[0] diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index 228a4c579f3..b89947c9889 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -42,6 +42,7 @@ import ( "k8s.io/kubernetes/pkg/util" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/watch" ) @@ -215,16 +216,16 @@ func NewNodeController( // Run starts an asynchronous loop that monitors the status of cluster nodes. func (nc *NodeController) Run(period time.Duration) { - go nc.nodeController.Run(util.NeverStop) - go nc.podController.Run(util.NeverStop) - go nc.daemonSetController.Run(util.NeverStop) + go nc.nodeController.Run(wait.NeverStop) + go nc.podController.Run(wait.NeverStop) + go nc.daemonSetController.Run(wait.NeverStop) // Incorporate the results of node status pushed from kubelet to master. - go util.Until(func() { + go wait.Until(func() { if err := nc.monitorNodeStatus(); err != nil { glog.Errorf("Error monitoring node status: %v", err) } - }, nc.nodeMonitorPeriod, util.NeverStop) + }, nc.nodeMonitorPeriod, wait.NeverStop) // Managing eviction of nodes: // 1. when we delete pods off a node, if the node was not empty at the time we then @@ -238,7 +239,7 @@ func (nc *NodeController) Run(period time.Duration) { // b. If there are no pods left terminating, exit // c. If there are pods still terminating, wait for their estimated completion // before retrying - go util.Until(func() { + go wait.Until(func() { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) { @@ -253,11 +254,11 @@ func (nc *NodeController) Run(period time.Duration) { } return true, 0 }) - }, nodeEvictionPeriod, util.NeverStop) + }, nodeEvictionPeriod, wait.NeverStop) // TODO: replace with a controller that ensures pods that are terminating complete // in a particular time period - go util.Until(func() { + go wait.Until(func() { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) { @@ -280,7 +281,7 @@ func (nc *NodeController) Run(period time.Duration) { } return false, remaining }) - }, nodeEvictionPeriod, util.NeverStop) + }, nodeEvictionPeriod, wait.NeverStop) } // Generates num pod CIDRs that could be assigned to nodes. 
diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go index 34d43151d05..4fa3e33716a 100644 --- a/pkg/controller/podautoscaler/horizontal.go +++ b/pkg/controller/podautoscaler/horizontal.go @@ -31,7 +31,7 @@ import ( unversioned_core "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" unversioned_extensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" ) const ( @@ -68,11 +68,11 @@ func NewHorizontalController(evtNamespacer unversioned_core.EventsGetter, scaleN } func (a *HorizontalController) Run(syncPeriod time.Duration) { - go util.Until(func() { + go wait.Until(func() { if err := a.reconcileAutoscalers(); err != nil { glog.Errorf("Couldn't reconcile horizontal pod autoscalers: %v", err) } - }, syncPeriod, util.NeverStop) + }, syncPeriod, wait.NeverStop) } func (a *HorizontalController) computeReplicasForCPUUtilization(hpa extensions.HorizontalPodAutoscaler, scale *extensions.Scale) (int, *int, time.Time, error) { diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 14080bc405b..8ca3393e279 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -35,8 +35,8 @@ import ( "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" ) @@ -188,7 +188,7 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { go rsc.rsController.Run(stopCh) go rsc.podController.Run(stopCh) for i := 0; i < workers; i++ { - go util.Until(rsc.worker, time.Second, stopCh) + go wait.Until(rsc.worker, time.Second, stopCh) } <-stopCh glog.Infof("Shutting down ReplicaSet Controller") diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index ad20255be0d..4476060e432 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -41,6 +41,7 @@ import ( "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" utiltesting "k8s.io/kubernetes/pkg/util/testing" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" ) @@ -198,7 +199,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { if key != expected { t.Errorf("Unexpected sync all for ReplicaSet %v, expected %v", key, expected) } - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("Processing DeleteFinalStateUnknown took longer than expected") } } @@ -454,14 +455,14 @@ func TestWatchControllers(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) go manager.rsController.Run(stopCh) - go util.Until(manager.worker, 10*time.Millisecond, stopCh) + go wait.Until(manager.worker, 10*time.Millisecond, stopCh) testRSSpec.Name = "foo" fakeWatch.Add(&testRSSpec) select { case <-received: - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("Expected 1 call but got 0") } } @@ -498,7 +499,7 @@ func TestWatchPods(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) go manager.podController.Run(stopCh) - go util.Until(manager.worker, 10*time.Millisecond, stopCh) + go 
wait.Until(manager.worker, 10*time.Millisecond, stopCh) pods := newPodList(nil, 1, api.PodRunning, labelMap, testRSSpec) testPod := pods.Items[0] @@ -507,7 +508,7 @@ func TestWatchPods(t *testing.T) { select { case <-received: - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("Expected 1 call but got 0") } } @@ -529,7 +530,7 @@ func TestUpdatePods(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - go util.Until(manager.worker, 10*time.Millisecond, stopCh) + go wait.Until(manager.worker, 10*time.Millisecond, stopCh) // Put 2 ReplicaSets and one pod into the controller's stores labelMap1 := map[string]string{"foo": "bar"} @@ -558,7 +559,7 @@ func TestUpdatePods(t *testing.T) { if !expected.Has(got) { t.Errorf("Expected keys %#v got %v", expected, got) } - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("Expected update notifications for replica sets within 100ms each") } } @@ -599,7 +600,7 @@ func TestControllerUpdateRequeue(t *testing.T) { if key != expectedKey { t.Errorf("Expected requeue of replica set with key %s got %s", expectedKey, key) } - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): manager.queue.ShutDown() t.Errorf("Expected to find a ReplicaSet in the queue, found none.") } diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 3cb562e0bb5..b757d3bc0f4 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -34,8 +34,8 @@ import ( "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" ) @@ -191,7 +191,7 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) { go rm.rcController.Run(stopCh) go rm.podController.Run(stopCh) for i := 0; i < workers; i++ { - go util.Until(rm.worker, time.Second, stopCh) + go wait.Until(rm.worker, time.Second, stopCh) } <-stopCh glog.Infof("Shutting down RC Manager") diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index 9e519de95c1..b5361d73bee 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" utiltesting "k8s.io/kubernetes/pkg/util/testing" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" ) @@ -193,7 +194,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { if key != expected { t.Errorf("Unexpected sync all for rc %v, expected %v", key, expected) } - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("Processing DeleteFinalStateUnknown took longer than expected") } } @@ -441,14 +442,14 @@ func TestWatchControllers(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) go manager.rcController.Run(stopCh) - go util.Until(manager.worker, 10*time.Millisecond, stopCh) + go wait.Until(manager.worker, 10*time.Millisecond, stopCh) testControllerSpec.Name = "foo" fakeWatch.Add(&testControllerSpec) select { case <-received: - case <-time.After(util.ForeverTestTimeout): + case 
<-time.After(wait.ForeverTestTimeout): t.Errorf("Expected 1 call but got 0") } } @@ -484,7 +485,7 @@ func TestWatchPods(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) go manager.podController.Run(stopCh) - go util.Until(manager.worker, 10*time.Millisecond, stopCh) + go wait.Until(manager.worker, 10*time.Millisecond, stopCh) pods := newPodList(nil, 1, api.PodRunning, testControllerSpec) testPod := pods.Items[0] @@ -493,7 +494,7 @@ func TestWatchPods(t *testing.T) { select { case <-received: - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("Expected 1 call but got 0") } } @@ -515,7 +516,7 @@ func TestUpdatePods(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - go util.Until(manager.worker, 10*time.Millisecond, stopCh) + go wait.Until(manager.worker, 10*time.Millisecond, stopCh) // Put 2 rcs and one pod into the controller's stores testControllerSpec1 := newReplicationController(1) @@ -542,7 +543,7 @@ func TestUpdatePods(t *testing.T) { if !expected.Has(got) { t.Errorf("Expected keys %#v got %v", expected, got) } - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("Expected update notifications for controllers within 100ms each") } } @@ -583,7 +584,7 @@ func TestControllerUpdateRequeue(t *testing.T) { if key != expectedKey { t.Errorf("Expected requeue of controller with key %s got %s", expectedKey, key) } - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): manager.queue.ShutDown() t.Errorf("Expected to find an rc in the queue, found none.") } diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index 7ea41a494a9..58f0f5444c1 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -28,8 +28,8 @@ import ( "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" ) @@ -166,9 +166,9 @@ func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) { go rq.rqController.Run(stopCh) go rq.podController.Run(stopCh) for i := 0; i < workers; i++ { - go util.Until(rq.worker, time.Second, stopCh) + go wait.Until(rq.worker, time.Second, stopCh) } - go util.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh) + go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh) <-stopCh glog.Infof("Shutting down ResourceQuotaController") rq.queue.ShutDown() diff --git a/pkg/controller/route/routecontroller.go b/pkg/controller/route/routecontroller.go index 951b135470f..f21ee3a57ca 100644 --- a/pkg/controller/route/routecontroller.go +++ b/pkg/controller/route/routecontroller.go @@ -25,7 +25,7 @@ import ( "k8s.io/kubernetes/pkg/api" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/cloudprovider" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" ) type RouteController struct { @@ -45,11 +45,11 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, clusterNam } func (rc *RouteController) Run(syncPeriod time.Duration) { - go util.Until(func() { + go wait.Until(func() { if err := rc.reconcileNodeRoutes(); err != nil { 
glog.Errorf("Couldn't reconcile node routes: %v", err) } - }, syncPeriod, util.NeverStop) + }, syncPeriod, wait.NeverStop) } func (rc *RouteController) reconcileNodeRoutes() error { diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index e472a067599..17644008655 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -33,12 +33,12 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/kubelet/cadvisor" - "k8s.io/kubernetes/pkg/util" utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/sets" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" + "k8s.io/kubernetes/pkg/util/wait" ) const ( @@ -272,7 +272,7 @@ func (cm *containerManagerImpl) Start(nodeConfig NodeConfig) error { } // Run ensure state functions every minute. - go util.Until(func() { + go wait.Until(func() { for _, cont := range cm.systemContainers { if cont.ensureStateFunc != nil { if err := cont.ensureStateFunc(cont.manager); err != nil { @@ -280,7 +280,7 @@ func (cm *containerManagerImpl) Start(nodeConfig NodeConfig) error { } } } - }, time.Minute, util.NeverStop) + }, time.Minute, wait.NeverStop) return nil } diff --git a/pkg/kubelet/config/file.go b/pkg/kubelet/config/file.go index eff6c6956b6..da5cd74007d 100644 --- a/pkg/kubelet/config/file.go +++ b/pkg/kubelet/config/file.go @@ -27,7 +27,7 @@ import ( "k8s.io/kubernetes/pkg/api" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" "github.com/golang/glog" ) @@ -45,7 +45,7 @@ func NewSourceFile(path string, nodeName string, period time.Duration, updates c updates: updates, } glog.V(1).Infof("Watching path %q", path) - go util.Until(config.run, period, util.NeverStop) + go wait.Until(config.run, period, wait.NeverStop) } func (s *sourceFile) run() { diff --git a/pkg/kubelet/config/file_test.go b/pkg/kubelet/config/file_test.go index bfa26291edc..fad1f227b99 100644 --- a/pkg/kubelet/config/file_test.go +++ b/pkg/kubelet/config/file_test.go @@ -29,8 +29,8 @@ import ( kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/securitycontext" - "k8s.io/kubernetes/pkg/util" utiltesting "k8s.io/kubernetes/pkg/util/testing" + "k8s.io/kubernetes/pkg/util/wait" ) func TestExtractFromNonExistentFile(t *testing.T) { @@ -53,7 +53,7 @@ func TestUpdateOnNonExistentFile(t *testing.T) { t.Fatalf("Expected %#v, Got %#v", expected, update) } - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("Expected update, timeout instead") } } @@ -154,7 +154,7 @@ func TestReadPodsFromFile(t *testing.T) { if !api.Semantic.DeepEqual(testCase.expected, update) { t.Errorf("%s: Expected %#v, Got %#v", testCase.desc, testCase.expected, update) } - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("%s: Expected update, timeout instead", testCase.desc) } }() diff --git a/pkg/kubelet/config/http.go b/pkg/kubelet/config/http.go index fbdde92352f..11bed577278 100644 --- a/pkg/kubelet/config/http.go +++ b/pkg/kubelet/config/http.go @@ -26,7 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" "github.com/golang/glog" ) @@ -49,7 +49,7 @@ func NewSourceURL(url string, header 
http.Header, nodeName string, period time.D data: nil, } glog.V(1).Infof("Watching URL %s", url) - go util.Until(config.run, period, util.NeverStop) + go wait.Until(config.run, period, wait.NeverStop) } func (s *sourceURL) run() { diff --git a/pkg/kubelet/container/serialized_image_puller.go b/pkg/kubelet/container/serialized_image_puller.go index e8d3deffb64..04507491994 100644 --- a/pkg/kubelet/container/serialized_image_puller.go +++ b/pkg/kubelet/container/serialized_image_puller.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" ) type imagePullRequest struct { @@ -59,7 +60,7 @@ func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, im backOff: imageBackOff, pullRequests: make(chan *imagePullRequest, 10), } - go util.Until(imagePuller.pullImages, time.Second, util.NeverStop) + go wait.Until(imagePuller.pullImages, time.Second, wait.NeverStop) return imagePuller } diff --git a/pkg/kubelet/image_manager.go b/pkg/kubelet/image_manager.go index 73eeb09efb3..96c15fac95b 100644 --- a/pkg/kubelet/image_manager.go +++ b/pkg/kubelet/image_manager.go @@ -28,8 +28,8 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" ) const ( @@ -130,7 +130,7 @@ func newImageManager(runtime container.Runtime, cadvisorInterface cadvisor.Inter } func (im *realImageManager) Start() error { - go util.Until(func() { + go wait.Until(func() { // Initial detection make detected time "unknown" in the past. var ts time.Time if im.initialized { @@ -142,7 +142,7 @@ func (im *realImageManager) Start() error { } else { im.initialized = true } - }, 5*time.Minute, util.NeverStop) + }, 5*time.Minute, wait.NeverStop) return nil } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index cad8eb87edb..d964668814a 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -82,6 +82,7 @@ import ( "k8s.io/kubernetes/pkg/util/selinux" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/validation/field" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/watch" @@ -890,17 +891,17 @@ func (kl *Kubelet) GetNode() (*api.Node, error) { // Starts garbage collection threads. func (kl *Kubelet) StartGarbageCollection() { - go util.Until(func() { + go wait.Until(func() { if err := kl.containerGC.GarbageCollect(); err != nil { glog.Errorf("Container garbage collection failed: %v", err) } - }, time.Minute, util.NeverStop) + }, time.Minute, wait.NeverStop) - go util.Until(func() { + go wait.Until(func() { if err := kl.imageManager.GarbageCollect(); err != nil { glog.Errorf("Image garbage collection failed: %v", err) } - }, 5*time.Minute, util.NeverStop) + }, 5*time.Minute, wait.NeverStop) } // initializeModules will initialize internal modules that do not require the container runtime to be up. @@ -974,14 +975,14 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { if kl.kubeClient != nil { // Start syncing node status immediately, this may set up things the runtime needs to run. 
- go util.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, util.NeverStop) + go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop) } - go util.Until(kl.syncNetworkStatus, 30*time.Second, util.NeverStop) - go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop) + go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop) + go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop) // Start a goroutine responsible for killing pods (that are not properly // handled by pod workers). - go util.Until(kl.podKiller, 1*time.Second, util.NeverStop) + go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop) // Start component sync loops. kl.statusManager.Start() diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 6d2c29a3ad9..bbb74e505a6 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -61,6 +61,7 @@ import ( "k8s.io/kubernetes/pkg/util/bandwidth" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume" _ "k8s.io/kubernetes/pkg/volume/host_path" @@ -3877,7 +3878,7 @@ func TestRegisterExistingNodeWithApiserver(t *testing.T) { done <- struct{}{} }() select { - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("timed out waiting for registration") case <-done: return diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index 07ecf71ed1b..c603659ca33 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -24,8 +24,8 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" ) // GenericPLEG is an extremely simple generic PLEG that relies solely on @@ -110,7 +110,7 @@ func (g *GenericPLEG) Watch() chan *PodLifecycleEvent { // Start spawns a goroutine to relist periodically. func (g *GenericPLEG) Start() { - go util.Until(g.relist, g.relistPeriod, util.NeverStop) + go wait.Until(g.relist, g.relistPeriod, wait.NeverStop) } func generateEvent(podID types.UID, cid string, oldState, newState plegContainerState) *PodLifecycleEvent { diff --git a/pkg/kubelet/prober/manager.go b/pkg/kubelet/prober/manager.go index 969a42c443b..4541c21d5c8 100644 --- a/pkg/kubelet/prober/manager.go +++ b/pkg/kubelet/prober/manager.go @@ -27,8 +27,8 @@ import ( "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" ) // Manager manages pod probing. It creates a probe "worker" for every container that specifies a @@ -97,7 +97,7 @@ func NewManager( // Start syncing probe status. This should only be called once. func (m *manager) Start() { // Start syncing readiness. 
- go util.Forever(m.updateReadiness, 0) + go wait.Forever(m.updateReadiness, 0) } // Key uniquely identifying container probes diff --git a/pkg/kubelet/prober/manager_test.go b/pkg/kubelet/prober/manager_test.go index b915aa424cb..f01ba50cf04 100644 --- a/pkg/kubelet/prober/manager_test.go +++ b/pkg/kubelet/prober/manager_test.go @@ -26,7 +26,6 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/probe" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" ) @@ -260,7 +259,7 @@ func TestUpdateReadiness(t *testing.T) { // Start syncing readiness without leaking goroutine. stopCh := make(chan struct{}) - go util.Until(m.updateReadiness, 0, stopCh) + go wait.Until(m.updateReadiness, 0, stopCh) defer func() { close(stopCh) // Send an update to exit updateReadiness() @@ -332,7 +331,7 @@ func waitForWorkerExit(m *manager, workerPaths []probeKey) error { continue // Already exited, no need to poll. } glog.Infof("Polling %v", w) - if err := wait.Poll(interval, util.ForeverTestTimeout, condition); err != nil { + if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil { return err } } @@ -357,7 +356,7 @@ func waitForReadyStatus(m *manager, ready bool) error { return status.ContainerStatuses[0].Ready == ready, nil } glog.Infof("Polling for ready state %v", ready) - if err := wait.Poll(interval, util.ForeverTestTimeout, condition); err != nil { + if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil { return err } @@ -378,7 +377,7 @@ func cleanup(t *testing.T, m *manager) { if exited, _ := condition(); exited { return // Already exited, no need to poll. } - if err := wait.Poll(interval, util.ForeverTestTimeout, condition); err != nil { + if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil { t.Fatalf("Error during cleanup: %v", err) } } diff --git a/pkg/kubelet/prober/results/results_manager_test.go b/pkg/kubelet/prober/results/results_manager_test.go index 64e815b8fb4..d42aebb6e9a 100644 --- a/pkg/kubelet/prober/results/results_manager_test.go +++ b/pkg/kubelet/prober/results/results_manager_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/kubernetes/pkg/api" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" ) func TestCacheOperations(t *testing.T) { @@ -58,7 +58,7 @@ func TestUpdates(t *testing.T) { if expected != u { t.Errorf("Expected update %v, recieved %v: %s %s", expected, u, msg) } - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("Timed out waiting for update %v: %s", expected, msg) } } diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go index 3b828e151de..192bc30c0c5 100644 --- a/pkg/kubelet/prober/worker_test.go +++ b/pkg/kubelet/prober/worker_test.go @@ -30,7 +30,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/probe" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" @@ -232,7 +231,7 @@ func TestCleanUp(t *testing.T) { return ready == results.Success, nil } if ready, _ := condition(); !ready { - if err := wait.Poll(100*time.Millisecond, util.ForeverTestTimeout, condition); err != nil { + if err := wait.Poll(100*time.Millisecond, 
wait.ForeverTestTimeout, condition); err != nil { t.Fatalf("[%s] Error waiting for worker ready: %v", probeType, err) } } diff --git a/pkg/kubelet/server/stats/fs_resource_analyzer.go b/pkg/kubelet/server/stats/fs_resource_analyzer.go index f988e7a20b4..829bab8107d 100644 --- a/pkg/kubelet/server/stats/fs_resource_analyzer.go +++ b/pkg/kubelet/server/stats/fs_resource_analyzer.go @@ -24,7 +24,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/volume" "github.com/golang/glog" @@ -68,7 +68,7 @@ func (s *fsResourceAnalyzer) Start() { return } glog.Info("Starting FS ResourceAnalyzer") - go util.Forever(func() { + go wait.Forever(func() { startTime := time.Now() s.updateCachedPodVolumeStats() glog.V(3).Infof("Finished calculating volume stats in %v.", time.Now().Sub(startTime)) diff --git a/pkg/kubelet/status/manager.go b/pkg/kubelet/status/manager.go index c50d8431abd..d991f6f619e 100644 --- a/pkg/kubelet/status/manager.go +++ b/pkg/kubelet/status/manager.go @@ -33,6 +33,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" ) // A wrapper around api.PodStatus that includes a version to enforce that stale pod statuses are @@ -125,7 +126,7 @@ func (m *manager) Start() { glog.Info("Starting to sync pod status with apiserver") syncTicker := time.Tick(syncPeriod) // syncPod and syncBatch share the same go routine to avoid sync races. - go util.Forever(func() { + go wait.Forever(func() { select { case syncRequest := <-m.podStatusChannel: m.syncPod(syncRequest.podUID, syncRequest.status) diff --git a/pkg/master/controller.go b/pkg/master/controller.go index ee4672f3eec..09ba85c9c97 100644 --- a/pkg/master/controller.go +++ b/pkg/master/controller.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/util/intstr" utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" ) // Controller is the controller manager for the core bootstrap Kubernetes controller @@ -99,7 +100,7 @@ func (c *Controller) Start() { // RunKubernetesService periodically updates the kubernetes service func (c *Controller) RunKubernetesService(ch chan struct{}) { - util.Until(func() { + wait.Until(func() { // Service definition is not reconciled after first // run, ports and type will be corrected only during // start. 
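// Illustrative sketch, not part of the patch: wait.Forever, adopted above by the
// status manager and the FS resource analyzer, behaves like wait.Until with
// wait.NeverStop — the loop body runs for the life of the process, separated by the
// given period (with a period of 0, the next run starts as soon as the previous one
// returns). The refresh function below is invented.
package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/wait"
)

func main() {
	go wait.Forever(func() {
		start := time.Now()
		// stand-in for updateCachedPodVolumeStats()-style work
		fmt.Printf("refresh took %v\n", time.Since(start))
	}, time.Second)

	// Let the loop run a few times, then exit main; wait.Forever itself never returns.
	time.Sleep(3 * time.Second)
	fmt.Println("exiting example")
}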
diff --git a/pkg/master/master.go b/pkg/master/master.go index e114fb4cfd5..0f02c0a7834 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -74,8 +74,8 @@ import ( "k8s.io/kubernetes/pkg/storage" etcdmetrics "k8s.io/kubernetes/pkg/storage/etcd/metrics" etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" daemonetcd "k8s.io/kubernetes/pkg/registry/daemonset/etcd" horizontalpodautoscaleretcd "k8s.io/kubernetes/pkg/registry/horizontalpodautoscaler/etcd" @@ -605,7 +605,7 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage { thirdPartyResourceRegistry: thirdPartyResourceStorage, } go func() { - util.Forever(func() { + wait.Forever(func() { if err := thirdPartyControl.SyncResources(); err != nil { glog.Warningf("third party resource sync failed: %v", err) } diff --git a/pkg/master/tunneler.go b/pkg/master/tunneler.go index e79b9e5cf0f..8a23ce3ff42 100644 --- a/pkg/master/tunneler.go +++ b/pkg/master/tunneler.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/ssh" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" @@ -125,7 +126,7 @@ func (c *SSHTunneler) SecondsSinceSync() int64 { } func (c *SSHTunneler) installSSHKeySyncLoop(user, publicKeyfile string) { - go util.Until(func() { + go wait.Until(func() { if c.InstallSSHKey == nil { glog.Error("Won't attempt to install ssh key: InstallSSHKey function is nil") return @@ -150,7 +151,7 @@ func (c *SSHTunneler) installSSHKeySyncLoop(user, publicKeyfile string) { // each time (Update() is a noop if no changes are necessary). func (c *SSHTunneler) nodesSyncLoop() { // TODO (cjcullen) make this watch. - go util.Until(func() { + go wait.Until(func() { addrs, err := c.getAddresses() glog.Infof("Calling update w/ addrs: %v", addrs) if err != nil { diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index bd606088180..7855a93ec9c 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -24,7 +24,7 @@ import ( "k8s.io/kubernetes/pkg/api" . "k8s.io/kubernetes/pkg/proxy/config" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" ) const TomcatPort int = 8080 @@ -76,7 +76,7 @@ func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []a } // Unittests will hard timeout in 5m with a stack trace, prevent that // and surface a clearer reason for failure. - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("Timed out. Expected %#v, Got %#v", expectedServices, services) return } @@ -122,7 +122,7 @@ func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints } // Unittests will hard timeout in 5m with a stack trace, prevent that // and surface a clearer reason for failure. - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("Timed out. 
Expected %#v, Got %#v", expectedEndpoints, endpoints) return } diff --git a/pkg/proxy/userspace/port_allocator.go b/pkg/proxy/userspace/port_allocator.go index a1ff408ea62..22182f9c98b 100644 --- a/pkg/proxy/userspace/port_allocator.go +++ b/pkg/proxy/userspace/port_allocator.go @@ -23,8 +23,8 @@ import ( "sync" "time" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/net" + "k8s.io/kubernetes/pkg/util/wait" ) var ( @@ -83,7 +83,7 @@ func newPortRangeAllocator(r net.PortRange) PortAllocator { ports: make(chan int, portsBufSize), rand: rand.New(rand.NewSource(time.Now().UnixNano())), } - go util.Until(func() { ra.fillPorts(util.NeverStop) }, nextFreePortCooldown, util.NeverStop) + go wait.Until(func() { ra.fillPorts(wait.NeverStop) }, nextFreePortCooldown, wait.NeverStop) return ra } diff --git a/pkg/registry/service/ipallocator/controller/repair.go b/pkg/registry/service/ipallocator/controller/repair.go index 8f51cb2f10c..b0a2b90a5dd 100644 --- a/pkg/registry/service/ipallocator/controller/repair.go +++ b/pkg/registry/service/ipallocator/controller/repair.go @@ -26,7 +26,6 @@ import ( client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/registry/service" "k8s.io/kubernetes/pkg/registry/service/ipallocator" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" ) @@ -66,7 +65,7 @@ func NewRepair(interval time.Duration, registry service.Registry, network *net.I // RunUntil starts the controller until the provided ch is closed. func (c *Repair) RunUntil(ch chan struct{}) { - util.Until(func() { + wait.Until(func() { if err := c.RunOnce(); err != nil { runtime.HandleError(err) } diff --git a/pkg/registry/service/portallocator/controller/repair.go b/pkg/registry/service/portallocator/controller/repair.go index cb8f1c2f11f..cff3f5f3edb 100644 --- a/pkg/registry/service/portallocator/controller/repair.go +++ b/pkg/registry/service/portallocator/controller/repair.go @@ -25,9 +25,9 @@ import ( client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/registry/service" "k8s.io/kubernetes/pkg/registry/service/portallocator" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wait" ) // See ipallocator/controller/repair.go; this is a copy for ports. @@ -51,7 +51,7 @@ func NewRepair(interval time.Duration, registry service.Registry, portRange net. // RunUntil starts the controller until the provided ch is closed. 
func (c *Repair) RunUntil(ch chan struct{}) { - util.Until(func() { + wait.Until(func() { if err := c.RunOnce(); err != nil { runtime.HandleError(err) } diff --git a/pkg/ssh/ssh.go b/pkg/ssh/ssh.go index a015bee916f..a85bb760537 100644 --- a/pkg/ssh/ssh.go +++ b/pkg/ssh/ssh.go @@ -39,7 +39,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "golang.org/x/crypto/ssh" - "k8s.io/kubernetes/pkg/util" utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" @@ -297,7 +296,7 @@ func NewSSHTunnelList(user, keyfile string, healthCheckURL *url.URL, stopChan ch healthCheckURL: healthCheckURL, } healthCheckPoll := 1 * time.Minute - go util.Until(func() { + go wait.Until(func() { l.tunnelsLock.Lock() defer l.tunnelsLock.Unlock() // Healthcheck each tunnel every minute diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 9469e46de26..63faf16cbbc 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -30,7 +30,7 @@ import ( "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" @@ -156,7 +156,7 @@ func NewCacherFromConfig(config CacherConfig) *Cacher { keyFunc: config.KeyFunc, stopped: false, // We need to (potentially) stop both: - // - util.Until go-routine + // - wait.Until go-routine // - reflector.ListAndWatch // and there are no guarantees on the order that they will stop. // So we will be simply closing the channel, and synchronizing on the WaitGroup. @@ -171,7 +171,7 @@ func NewCacherFromConfig(config CacherConfig) *Cacher { stopCh := cacher.stopCh cacher.stopWg.Add(1) go func() { - util.Until( + wait.Until( func() { if !cacher.isStopped() { cacher.startCaching(stopCh) diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index 24b3e4df015..dd829739c91 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -33,8 +33,8 @@ import ( etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest" etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" "golang.org/x/net/context" @@ -158,7 +158,7 @@ func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType if e, a := eventObject, event.Object; !api.Semantic.DeepDerivative(e, a) { t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a) } - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Errorf("Timed out waiting for an event") } } diff --git a/pkg/storage/watch_cache_test.go b/pkg/storage/watch_cache_test.go index 3251be96731..e2268fe956d 100644 --- a/pkg/storage/watch_cache_test.go +++ b/pkg/storage/watch_cache_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" ) @@ -319,7 +320,7 @@ func TestReflectorForWatchCache(t *testing.T) { }, } r := cache.NewReflector(lw, &api.Pod{}, store, 0) - r.ListAndWatch(util.NeverStop) + r.ListAndWatch(wait.NeverStop) { _, version, err := store.WaitUntilFreshAndList(10) diff --git a/pkg/util/atomic/value_test.go b/pkg/util/atomic/value_test.go index 0cb839dd5e5..052b350828b 100644 --- a/pkg/util/atomic/value_test.go +++ b/pkg/util/atomic/value_test.go @@ -20,7 
+20,7 @@ import ( "testing" "time" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" ) func ExpectValue(t *testing.T, atomicValue *Value, expectedValue interface{}) { @@ -38,7 +38,7 @@ func ExpectValue(t *testing.T, atomicValue *Value, expectedValue interface{}) { t.Errorf("Expected to find %v, found %v", expectedValue, actualValue) return } - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Error("Value could not be read") return } diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 7bb49b3836f..37f6f6ab186 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -19,7 +19,7 @@ package config import ( "sync" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" ) type Merger interface { @@ -74,7 +74,7 @@ func (m *Mux) Channel(source string) chan interface{} { } newChannel := make(chan interface{}) m.sources[source] = newChannel - go util.Until(func() { m.listen(source, newChannel) }, 0, util.NeverStop) + go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop) return newChannel } diff --git a/pkg/watch/json/decoder_test.go b/pkg/watch/json/decoder_test.go index 39bb892dfae..ceb50b2ce0f 100644 --- a/pkg/watch/json/decoder_test.go +++ b/pkg/watch/json/decoder_test.go @@ -25,7 +25,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" ) @@ -99,7 +99,7 @@ func TestDecoder_SourceClose(t *testing.T) { select { case <-done: break - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Error("Timeout") } } diff --git a/pkg/watch/mux_test.go b/pkg/watch/mux_test.go index 066b898bc4e..23976c06596 100644 --- a/pkg/watch/mux_test.go +++ b/pkg/watch/mux_test.go @@ -23,7 +23,7 @@ import ( "time" "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" ) type myType struct { @@ -113,7 +113,7 @@ func TestBroadcasterWatcherStopDeadlock(t *testing.T) { }(m.Watch(), m.Watch()) m.Action(Added, &myType{}) select { - case <-time.After(util.ForeverTestTimeout): + case <-time.After(wait.ForeverTestTimeout): t.Error("timeout: deadlocked") case <-done: } diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 371a3240ed5..124d8240b5e 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -24,7 +24,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/record" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/metrics" @@ -104,7 +104,7 @@ func New(c *Config) *Scheduler { // Run begins watching and scheduling. It starts a goroutine and returns immediately. func (s *Scheduler) Run() { - go util.Until(s.scheduleOne, 0, s.config.StopEverything) + go wait.Until(s.scheduleOne, 0, s.config.StopEverything) } func (s *Scheduler) scheduleOne() { diff --git a/test/e2e/cluster_upgrade.go b/test/e2e/cluster_upgrade.go index 3c1bc14f208..35294ad7e9b 100644 --- a/test/e2e/cluster_upgrade.go +++ b/test/e2e/cluster_upgrade.go @@ -30,7 +30,6 @@ import ( "k8s.io/kubernetes/pkg/api" client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/wait" . 
"github.com/onsi/ginkgo" @@ -290,7 +289,7 @@ func testMasterUpgrade(ip, v string, mUp func(v string) error) { done := make(chan struct{}, 1) // Let's make sure we've finished the heartbeat before shutting things down. var wg sync.WaitGroup - go util.Until(func() { + go wait.Until(func() { defer GinkgoRecover() wg.Add(1) defer wg.Done() diff --git a/test/e2e/kubelet_stats.go b/test/e2e/kubelet_stats.go index c874ffa0d35..6e8c1c5d7a0 100644 --- a/test/e2e/kubelet_stats.go +++ b/test/e2e/kubelet_stats.go @@ -36,8 +36,8 @@ import ( "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/master/ports" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" ) // KubeletMetric stores metrics scraped from the kubelet server's /metric endpoint. @@ -397,7 +397,7 @@ func (r *resourceCollector) Start() { r.stopCh = make(chan struct{}, 1) // Keep the last observed stats for comparison. oldStats := make(map[string]*cadvisorapi.ContainerStats) - go util.Until(func() { r.collectStats(oldStats) }, r.pollingInterval, r.stopCh) + go wait.Until(func() { r.collectStats(oldStats) }, r.pollingInterval, r.stopCh) } // Stop sends a signal to terminate the stats collecting goroutine. diff --git a/test/e2e/kubeproxy.go b/test/e2e/kubeproxy.go index ed0ff49d6ef..a945a4d6380 100644 --- a/test/e2e/kubeproxy.go +++ b/test/e2e/kubeproxy.go @@ -506,12 +506,12 @@ func (config *KubeProxyTestConfig) deleteNetProxyPod() { config.getPodClient().Delete(pod.Name, api.NewDeleteOptions(0)) config.endpointPods = config.endpointPods[1:] // wait for pod being deleted. - err := waitForPodToDisappear(config.f.Client, config.f.Namespace.Name, pod.Name, labels.Everything(), time.Second, util.ForeverTestTimeout) + err := waitForPodToDisappear(config.f.Client, config.f.Namespace.Name, pod.Name, labels.Everything(), time.Second, wait.ForeverTestTimeout) if err != nil { Failf("Failed to delete %s pod: %v", pod.Name, err) } // wait for endpoint being removed. - err = waitForServiceEndpointsNum(config.f.Client, config.f.Namespace.Name, nodePortServiceName, len(config.endpointPods), time.Second, util.ForeverTestTimeout) + err = waitForServiceEndpointsNum(config.f.Client, config.f.Namespace.Name, nodePortServiceName, len(config.endpointPods), time.Second, wait.ForeverTestTimeout) if err != nil { Failf("Failed to remove endpoint from service: %s", nodePortServiceName) } diff --git a/test/e2e/mesos.go b/test/e2e/mesos.go index 3335026e71e..36f36179229 100644 --- a/test/e2e/mesos.go +++ b/test/e2e/mesos.go @@ -23,7 +23,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" @@ -62,13 +62,13 @@ var _ = Describe("Mesos", func() { It("starts static pods on every node in the mesos cluster", func() { client := framework.Client - expectNoError(allNodesReady(client, util.ForeverTestTimeout), "all nodes ready") + expectNoError(allNodesReady(client, wait.ForeverTestTimeout), "all nodes ready") nodelist := ListSchedulableNodesOrDie(framework.Client) const ns = "static-pods" numpods := len(nodelist.Items) - expectNoError(waitForPodsRunningReady(ns, numpods, util.ForeverTestTimeout), + expectNoError(waitForPodsRunningReady(ns, numpods, wait.ForeverTestTimeout), fmt.Sprintf("number of static pods in namespace %s is %d", ns, numpods)) }) diff --git a/test/e2e/portforward.go b/test/e2e/portforward.go index c07a3ae9458..03566bd8372 100644 --- a/test/e2e/portforward.go +++ b/test/e2e/portforward.go @@ -27,7 +27,7 @@ import ( "time" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" . "github.com/onsi/ginkgo" ) @@ -150,7 +150,7 @@ var _ = Describe("Port forwarding", func() { By("Closing the connection to the local port") conn.Close() - logOutput := runKubectlWithTimeout(util.ForeverTestTimeout, "logs", fmt.Sprintf("--namespace=%v", framework.Namespace.Name), "-f", podName) + logOutput := runKubectlWithTimeout(wait.ForeverTestTimeout, "logs", fmt.Sprintf("--namespace=%v", framework.Namespace.Name), "-f", podName) verifyLogMessage(logOutput, "Accepted client connection") verifyLogMessage(logOutput, "Expected to read 3 bytes from client, but got 0 instead") }) @@ -195,7 +195,7 @@ var _ = Describe("Port forwarding", func() { Failf("Expected %q from server, got %q", e, a) } - logOutput := runKubectlWithTimeout(util.ForeverTestTimeout, "logs", fmt.Sprintf("--namespace=%v", framework.Namespace.Name), "-f", podName) + logOutput := runKubectlWithTimeout(wait.ForeverTestTimeout, "logs", fmt.Sprintf("--namespace=%v", framework.Namespace.Name), "-f", podName) verifyLogMessage(logOutput, "^Accepted client connection$") verifyLogMessage(logOutput, "^Received expected client data$") verifyLogMessage(logOutput, "^Done$") @@ -232,7 +232,7 @@ var _ = Describe("Port forwarding", func() { Failf("Expected %q from server, got %q", e, a) } - logOutput := runKubectlWithTimeout(util.ForeverTestTimeout, "logs", fmt.Sprintf("--namespace=%v", framework.Namespace.Name), "-f", podName) + logOutput := runKubectlWithTimeout(wait.ForeverTestTimeout, "logs", fmt.Sprintf("--namespace=%v", framework.Namespace.Name), "-f", podName) verifyLogMessage(logOutput, "Accepted client connection") verifyLogMessage(logOutput, "Done") }) diff --git a/test/integration/client_test.go b/test/integration/client_test.go index 32fe76767ea..a8f4194031d 100644 --- a/test/integration/client_test.go +++ b/test/integration/client_test.go @@ -31,7 +31,7 @@ import ( "k8s.io/kubernetes/pkg/api/testapi" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/test/integration/framework" @@ -160,8 +160,8 @@ func TestSingleWatch(t *testing.T) { defer w.Stop() select { - case <-time.After(util.ForeverTestTimeout): - t.Fatalf("watch took longer than %s", util.ForeverTestTimeout.String()) + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("watch took longer than %s", wait.ForeverTestTimeout.String()) case got, ok := <-w.ResultChan(): if !ok { t.Fatal("Watch channel closed unexpectedly.") diff --git 
a/test/integration/extender_test.go b/test/integration/extender_test.go index 5a1b7a6aee2..c3178c9d3a7 100644 --- a/test/integration/extender_test.go +++ b/test/integration/extender_test.go @@ -36,7 +36,6 @@ import ( "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/master" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/plugin/pkg/scheduler" _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" @@ -295,7 +294,7 @@ func DoTestPodScheduling(t *testing.T, restClient *client.Client) { t.Fatalf("Failed to create pod: %v", err) } - err = wait.Poll(time.Second, util.ForeverTestTimeout, podScheduled(restClient, myPod.Namespace, myPod.Name)) + err = wait.Poll(time.Second, wait.ForeverTestTimeout, podScheduled(restClient, myPod.Namespace, myPod.Name)) if err != nil { t.Fatalf("Failed to schedule pod: %v", err) } diff --git a/test/integration/scheduler_test.go b/test/integration/scheduler_test.go index e7c73840d26..2573f45b994 100644 --- a/test/integration/scheduler_test.go +++ b/test/integration/scheduler_test.go @@ -36,7 +36,6 @@ import ( "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/master" - "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/plugin/pkg/scheduler" _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" @@ -242,7 +241,7 @@ func DoTestUnschedulableNodes(t *testing.T, restClient *client.Client, nodeStore } // There are no schedulable nodes - the pod shouldn't be scheduled. - err = wait.Poll(time.Second, util.ForeverTestTimeout, podScheduled(restClient, myPod.Namespace, myPod.Name)) + err = wait.Poll(time.Second, wait.ForeverTestTimeout, podScheduled(restClient, myPod.Namespace, myPod.Name)) if err == nil { t.Errorf("Pod scheduled successfully on unschedulable nodes") } @@ -260,7 +259,7 @@ func DoTestUnschedulableNodes(t *testing.T, restClient *client.Client, nodeStore mod.makeSchedulable(t, schedNode, nodeStore, restClient) // Wait until the pod is scheduled. - err = wait.Poll(time.Second, util.ForeverTestTimeout, podScheduled(restClient, myPod.Namespace, myPod.Name)) + err = wait.Poll(time.Second, wait.ForeverTestTimeout, podScheduled(restClient, myPod.Namespace, myPod.Name)) if err != nil { t.Errorf("Test %d: failed to schedule a pod: %v", i, err) } else {
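// Illustrative sketch, not part of the patch: unlike the call sites that pass
// wait.NeverStop, the scheduler above ties its wait.Until loop to a shared
// StopEverything channel, so closing that single channel shuts down every loop
// started from the same config. The worker type and its fields are invented.
package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/wait"
)

type worker struct {
	stopEverything chan struct{}
}

// Run starts the processing loop and returns immediately, mirroring Scheduler.Run.
func (w *worker) Run() {
	go wait.Until(w.processOne, 0, w.stopEverything)
}

func (w *worker) processOne() {
	fmt.Println("processing one item")
	time.Sleep(200 * time.Millisecond) // stand-in for real work
}

func main() {
	w := &worker{stopEverything: make(chan struct{})}
	w.Run()
	time.Sleep(time.Second)
	close(w.stopEverything) // stops the loop started in Run
	time.Sleep(300 * time.Millisecond)
}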