Rewritte util.* -> wait.* wherever reasonable

This commit is contained in:
Jan Chaloupka
2016-02-02 11:57:06 +01:00
parent 43a47a8234
commit 4389b3f0d6
71 changed files with 231 additions and 225 deletions

View File

@@ -515,7 +515,7 @@ func watchForServices(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
UpdateFunc: ks.updateService,
},
)
go serviceController.Run(util.NeverStop)
go serviceController.Run(wait.NeverStop)
return serviceStore
}
@@ -533,7 +533,7 @@ func watchEndpoints(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
},
)
go eController.Run(util.NeverStop)
go eController.Run(wait.NeverStop)
return eStore
}
@@ -551,7 +551,7 @@ func watchPods(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
},
)
go eController.Run(util.NeverStop)
go eController.Run(wait.NeverStop)
return eStore
}

View File

@@ -191,11 +191,11 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
// ensure the service endpoints are sync'd several times within the window that the integration tests wait
go endpointcontroller.NewEndpointController(clientset, controller.NoResyncPeriodFunc).
Run(3, util.NeverStop)
Run(3, wait.NeverStop)
// TODO: Write an integration test for the replication controllers watch.
go replicationcontroller.NewReplicationManager(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas).
Run(3, util.NeverStop)
Run(3, wait.NeverStop)
nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(),
40*time.Second, 60*time.Second, 5*time.Second, nil, false)

View File

@@ -175,17 +175,17 @@ func Run(s *options.CMServer) error {
func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *client.Config, stop <-chan struct{}) error {
go endpointcontroller.NewEndpointController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "endpoint-controller")), ResyncPeriod(s)).
Run(s.ConcurrentEndpointSyncs, util.NeverStop)
Run(s.ConcurrentEndpointSyncs, wait.NeverStop)
go replicationcontroller.NewReplicationManager(
clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "replication-controller")),
ResyncPeriod(s),
replicationcontroller.BurstReplicas,
).Run(s.ConcurrentRCSyncs, util.NeverStop)
).Run(s.ConcurrentRCSyncs, wait.NeverStop)
if s.TerminatedPodGCThreshold > 0 {
go gc.New(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "garbage-collector")), ResyncPeriod(s), s.TerminatedPodGCThreshold).
Run(util.NeverStop)
Run(wait.NeverStop)
}
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
@@ -219,7 +219,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
go resourcequotacontroller.NewResourceQuotaController(
clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "resourcequota-controller")),
controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, util.NeverStop)
controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, wait.NeverStop)
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
@@ -265,25 +265,25 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if containsResource(resources, "daemonsets") {
glog.Infof("Starting daemon set controller")
go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s)).
Run(s.ConcurrentDSCSyncs, util.NeverStop)
Run(s.ConcurrentDSCSyncs, wait.NeverStop)
}
if containsResource(resources, "jobs") {
glog.Infof("Starting job controller")
go job.NewJobController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "job-controller")), ResyncPeriod(s)).
Run(s.ConcurrentJobSyncs, util.NeverStop)
Run(s.ConcurrentJobSyncs, wait.NeverStop)
}
if containsResource(resources, "deployments") {
glog.Infof("Starting deployment controller")
go deployment.NewDeploymentController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "deployment-controller")), ResyncPeriod(s)).
Run(s.ConcurrentDeploymentSyncs, util.NeverStop)
Run(s.ConcurrentDeploymentSyncs, wait.NeverStop)
}
if containsResource(resources, "replicasets") {
glog.Infof("Starting ReplicaSet controller")
go replicaset.NewReplicaSetController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas).
Run(s.ConcurrentRSSyncs, util.NeverStop)
Run(s.ConcurrentRSSyncs, wait.NeverStop)
}
}

View File

@@ -45,6 +45,7 @@ import (
utilnet "k8s.io/kubernetes/pkg/util/net"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/wait"
"github.com/golang/glog"
"github.com/spf13/cobra"
@@ -274,12 +275,12 @@ func (s *ProxyServer) Run() error {
http.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "%s", s.ProxyMode)
})
go util.Until(func() {
go wait.Until(func() {
err := http.ListenAndServe(s.Config.HealthzBindAddress+":"+strconv.Itoa(s.Config.HealthzPort), nil)
if err != nil {
glog.Errorf("Starting health server failed: %v", err)
}
}, 5*time.Second, util.NeverStop)
}, 5*time.Second, wait.NeverStop)
}
// Tune conntrack, if requested

View File

@@ -63,6 +63,7 @@ import (
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume"
)
@@ -322,12 +323,12 @@ func Run(s *options.KubeletServer, kcfg *KubeletConfig) error {
if s.HealthzPort > 0 {
healthz.DefaultHealthz()
go util.Until(func() {
go wait.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(s.HealthzPort)), nil)
if err != nil {
glog.Errorf("Starting health server failed: %v", err)
}
}, 5*time.Second, util.NeverStop)
}, 5*time.Second, wait.NeverStop)
}
if s.RunOnce {
@@ -611,18 +612,18 @@ func RunKubelet(kcfg *KubeletConfig) error {
func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) {
// start the kubelet
go util.Until(func() { k.Run(podCfg.Updates()) }, 0, util.NeverStop)
go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)
// start the kubelet server
if kc.EnableServer {
go util.Until(func() {
go wait.Until(func() {
k.ListenAndServe(kc.Address, kc.Port, kc.TLSOptions, kc.Auth, kc.EnableDebuggingHandlers)
}, 0, util.NeverStop)
}, 0, wait.NeverStop)
}
if kc.ReadOnlyPort > 0 {
go util.Until(func() {
go wait.Until(func() {
k.ListenAndServeReadOnly(kc.Address, kc.ReadOnlyPort)
}, 0, util.NeverStop)
}, 0, wait.NeverStop)
}
}

View File

@@ -128,14 +128,14 @@ func (s *CMServer) Run(_ []string) error {
}()
endpoints := s.createEndpointController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "endpoint-controller")))
go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop)
go endpoints.Run(s.ConcurrentEndpointSyncs, wait.NeverStop)
go replicationcontroller.NewReplicationManager(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "replication-controller")), s.resyncPeriod, replicationcontroller.BurstReplicas).
Run(s.ConcurrentRCSyncs, util.NeverStop)
Run(s.ConcurrentRCSyncs, wait.NeverStop)
if s.TerminatedPodGCThreshold > 0 {
go gc.New(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "garbage-collector")), s.resyncPeriod, s.TerminatedPodGCThreshold).
Run(util.NeverStop)
Run(wait.NeverStop)
}
//TODO(jdef) should eventually support more cloud providers here
@@ -154,7 +154,7 @@ func (s *CMServer) Run(_ []string) error {
nodeController.Run(s.NodeSyncPeriod)
nodeStatusUpdaterController := node.NewStatusUpdater(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "node-status-controller")), s.NodeMonitorPeriod, time.Now)
if err := nodeStatusUpdaterController.Run(util.NeverStop); err != nil {
if err := nodeStatusUpdaterController.Run(wait.NeverStop); err != nil {
glog.Fatalf("Failed to start node status update controller: %v", err)
}
@@ -173,7 +173,7 @@ func (s *CMServer) Run(_ []string) error {
}
go resourcequotacontroller.NewResourceQuotaController(
clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "resource-quota-controller")), controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, util.NeverStop)
clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "resource-quota-controller")), controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod)).Run(s.ConcurrentResourceQuotaSyncs, wait.NeverStop)
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
@@ -220,19 +220,19 @@ func (s *CMServer) Run(_ []string) error {
if containsResource(resources, "daemonsets") {
glog.Infof("Starting daemon set controller")
go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod).
Run(s.ConcurrentDSCSyncs, util.NeverStop)
Run(s.ConcurrentDSCSyncs, wait.NeverStop)
}
if containsResource(resources, "jobs") {
glog.Infof("Starting job controller")
go job.NewJobController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "job-controller")), s.resyncPeriod).
Run(s.ConcurrentJobSyncs, util.NeverStop)
Run(s.ConcurrentJobSyncs, wait.NeverStop)
}
if containsResource(resources, "deployments") {
glog.Infof("Starting deployment controller")
go deployment.NewDeploymentController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "deployment-controller")), s.resyncPeriod).
Run(s.ConcurrentDeploymentSyncs, util.NeverStop)
Run(s.ConcurrentDeploymentSyncs, wait.NeverStop)
}
}

View File

@@ -26,7 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
@@ -57,7 +57,7 @@ type etcdMasterElector struct {
func (e *etcdMasterElector) Elect(path, id string) watch.Interface {
e.done = make(chan empty)
e.events = make(chan watch.Event)
go util.Until(func() { e.run(path, id) }, time.Second*5, util.NeverStop)
go wait.Until(func() { e.run(path, id) }, time.Second*5, wait.NeverStop)
return e
}

View File

@@ -39,8 +39,8 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"github.com/mesos/mesos-go/mesosproto"
@@ -233,7 +233,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
executor.LaunchTask(mockDriver, taskInfo)
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
assertext.EventuallyTrue(t, wait.ForeverTestTimeout, func() bool {
executor.lock.Lock()
defer executor.lock.Unlock()
return !registry.empty()
@@ -251,7 +251,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
finished := kmruntime.After(statusUpdateCalls.Wait)
select {
case <-finished:
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timed out waiting for status update calls to finish")
}
@@ -267,7 +267,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
})
executor.KillTask(mockDriver, taskInfo.TaskId)
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
assertext.EventuallyTrue(t, wait.ForeverTestTimeout, func() bool {
executor.lock.Lock()
defer executor.lock.Unlock()
return registry.empty()
@@ -277,7 +277,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
finished = kmruntime.After(statusUpdateCalls.Wait)
select {
case <-finished:
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timed out waiting for status update calls to finish")
}
@@ -428,7 +428,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
executor.LaunchTask(mockDriver, taskInfo)
// must wait for this otherwise phase changes may not apply
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
assertext.EventuallyTrue(t, wait.ForeverTestTimeout, func() bool {
executor.lock.Lock()
defer executor.lock.Unlock()
return !registry.empty()
@@ -444,7 +444,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
// from k.tasks through the "task-lost:foo" message below.
select {
case <-called:
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timed out waiting for SendStatusUpdate for the running task")
}
@@ -462,7 +462,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
executor.FrameworkMessage(mockDriver, "task-lost:foo")
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
assertext.EventuallyTrue(t, wait.ForeverTestTimeout, func() bool {
executor.lock.Lock()
defer executor.lock.Unlock()
return registry.empty()
@@ -470,7 +470,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
select {
case <-called:
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timed out waiting for SendStatusUpdate")
}
@@ -614,7 +614,7 @@ func TestExecutorsendFrameworkMessage(t *testing.T) {
// guard against data race in mock driver between AssertExpectations and Called
select {
case <-called: // expected
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("expected call to SendFrameworkMessage")
}
mockDriver.AssertExpectations(t)

View File

@@ -20,8 +20,8 @@ import (
log "github.com/golang/glog"
"k8s.io/kubernetes/pkg/kubelet"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
// executorKubelet decorates the kubelet with a Run function that notifies the
@@ -76,7 +76,7 @@ func (kl *executorKubelet) Run(mergedUpdates <-chan kubetypes.PodUpdate) {
// will not take very long. Peeking into the future (current k8s master) it
// seems that the backlog has grown from 1 to 50 -- this may negatively impact
// us going forward, time will tell.
util.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone)
wait.Until(func() { kl.Kubelet.Run(closableUpdates) }, 0, kl.executorDone)
//TODO(jdef) revisit this if/when executor failover lands
// Force kubelet to delete all pods.

View File

@@ -51,7 +51,7 @@ import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
@@ -74,7 +74,7 @@ func (srv *TestServer) LookupNode(name string) *api.Node {
}
func (srv *TestServer) WaitForNode(name string) {
assertext.EventuallyTrue(srv.t, util.ForeverTestTimeout, func() bool {
assertext.EventuallyTrue(srv.t, wait.ForeverTestTimeout, func() bool {
return srv.LookupNode(name) != nil
})
}
@@ -366,7 +366,7 @@ func (o *EventObserver) PastEventf(object runtime.Object, timestamp unversioned.
func (a *EventAssertions) Event(observer *EventObserver, pred EventPredicate, msgAndArgs ...interface{}) bool {
// parse msgAndArgs: first possibly a duration, otherwise a format string with further args
timeout := util.ForeverTestTimeout
timeout := wait.ForeverTestTimeout
msg := "event not received"
msgArgStart := 0
if len(msgAndArgs) > 0 {
@@ -685,7 +685,7 @@ func TestScheduler_LifeCycle(t *testing.T) {
// and wait that framework message is sent to executor
lt.driver.AssertNumberOfCalls(t, "SendFrameworkMessage", 1)
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timed out waiting for launchTasks call")
}
@@ -721,7 +721,7 @@ func TestScheduler_LifeCycle(t *testing.T) {
}
t.Fatalf("unknown offer used to start a pod")
return nil, nil, nil
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Fatal("timed out waiting for launchTasks")
return nil, nil, nil
}
@@ -784,7 +784,7 @@ func TestScheduler_LifeCycle(t *testing.T) {
newTaskStatusForTask(launchedTask.taskInfo, mesos.TaskState_TASK_FINISHED),
)
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Fatal("timed out waiting for KillTask")
}
@@ -820,7 +820,7 @@ func TestScheduler_LifeCycle(t *testing.T) {
lt.framework.StatusUpdate(lt.driver, status)
// wait until pod is looked up at the apiserver
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
assertext.EventuallyTrue(t, wait.ForeverTestTimeout, func() bool {
return lt.apiServer.Stats(pod.Name) == beforePodLookups+1
}, "expect that reconcileTask will access apiserver for pod %v", pod.Name)
}
@@ -838,7 +838,7 @@ func TestScheduler_LifeCycle(t *testing.T) {
failPodFromExecutor(launchedTask.taskInfo)
podKey, _ := podtask.MakePodKey(api.NewDefaultContext(), pod.Name)
assertext.EventuallyTrue(t, util.ForeverTestTimeout, func() bool {
assertext.EventuallyTrue(t, wait.ForeverTestTimeout, func() bool {
t, _ := lt.sched.Tasks().ForPod(podKey)
return t == nil
})

View File

@@ -32,10 +32,10 @@ import (
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/intstr"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
@@ -123,7 +123,7 @@ func (e *endpointController) Run(workers int, stopCh <-chan struct{}) {
go e.serviceController.Run(stopCh)
go e.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go util.Until(e.worker, time.Second, stopCh)
go wait.Until(e.worker, time.Second, stopCh)
}
go func() {
defer utilruntime.HandleCrash()

View File

@@ -32,7 +32,7 @@ import (
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
)
type Tester struct {
@@ -927,7 +927,7 @@ func (t *Tester) testWatchFields(obj runtime.Object, emitFn EmitFunc, fieldsPass
if !ok {
t.Errorf("watch channel should be open")
}
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("unexpected timeout from result channel")
}
watcher.Stop()
@@ -975,7 +975,7 @@ func (t *Tester) testWatchLabels(obj runtime.Object, emitFn EmitFunc, labelsPass
if !ok {
t.Errorf("watch channel should be open")
}
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("unexpected timeout from result channel")
}
watcher.Stop()

View File

@@ -38,7 +38,7 @@ import (
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
@@ -418,8 +418,8 @@ func TestWatchHTTPTimeout(t *testing.T) {
if !watcher.Stopped {
t.Errorf("Leaked watch on timeout")
}
case <-time.After(util.ForeverTestTimeout):
t.Errorf("Failed to stop watcher after %s of timeout signal", util.ForeverTestTimeout.String())
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Failed to stop watcher after %s of timeout signal", wait.ForeverTestTimeout.String())
}
// Make sure we can't receive any more events through the timeout watch

View File

@@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
)
func TestTTLExpirationBasic(t *testing.T) {
@@ -55,7 +56,7 @@ func TestTTLExpirationBasic(t *testing.T) {
if delKey != key {
t.Errorf("Unexpected delete for key %s", key)
}
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Unexpected timeout waiting on delete")
}
close(deleteChan)
@@ -100,7 +101,7 @@ func TestTTLList(t *testing.T) {
t.Errorf("Unexpected delete for key %s", delKey)
}
expireKeys.Delete(delKey)
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Unexpected timeout waiting on delete")
return
}

View File

@@ -35,8 +35,8 @@ import (
apierrs "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
@@ -157,13 +157,13 @@ outer:
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run starts a goroutine and returns immediately.
func (r *Reflector) Run() {
go util.Until(func() { r.ListAndWatch(util.NeverStop) }, r.period, util.NeverStop)
go wait.Until(func() { r.ListAndWatch(wait.NeverStop) }, r.period, wait.NeverStop)
}
// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
go util.Until(func() { r.ListAndWatch(stopCh) }, r.period, stopCh)
go wait.Until(func() { r.ListAndWatch(stopCh) }, r.period, stopCh)
}
var (

View File

@@ -26,7 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
@@ -54,15 +54,15 @@ func TestCloseWatchChannelOnError(t *testing.T) {
return &api.PodList{ListMeta: unversioned.ListMeta{ResourceVersion: "1"}}, nil
},
}
go r.ListAndWatch(util.NeverStop)
go r.ListAndWatch(wait.NeverStop)
fw.Error(pod)
select {
case _, ok := <-fw.ResultChan():
if ok {
t.Errorf("Watch channel left open after cancellation")
}
case <-time.After(util.ForeverTestTimeout):
t.Errorf("the cancellation is at least %s late", util.ForeverTestTimeout.String())
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
break
}
}
@@ -90,8 +90,8 @@ func TestRunUntil(t *testing.T) {
if ok {
t.Errorf("Watch channel left open after stopping the watch")
}
case <-time.After(util.ForeverTestTimeout):
t.Errorf("the cancellation is at least %s late", util.ForeverTestTimeout.String())
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
break
}
}
@@ -100,7 +100,7 @@ func TestReflectorResyncChan(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &api.Pod{}, s, time.Millisecond)
a, _ := g.resyncChan()
b := time.After(util.ForeverTestTimeout)
b := time.After(wait.ForeverTestTimeout)
select {
case <-a:
t.Logf("got timeout as expected")
@@ -129,7 +129,7 @@ func TestReflectorWatchHandlerError(t *testing.T) {
fw.Stop()
}()
var resumeRV string
err := g.watchHandler(fw, &resumeRV, neverExitWatch, util.NeverStop)
err := g.watchHandler(fw, &resumeRV, neverExitWatch, wait.NeverStop)
if err == nil {
t.Errorf("unexpected non-error")
}
@@ -149,7 +149,7 @@ func TestReflectorWatchHandler(t *testing.T) {
fw.Stop()
}()
var resumeRV string
err := g.watchHandler(fw, &resumeRV, neverExitWatch, util.NeverStop)
err := g.watchHandler(fw, &resumeRV, neverExitWatch, wait.NeverStop)
if err != nil {
t.Errorf("unexpected error %v", err)
}
@@ -198,7 +198,7 @@ func TestReflectorWatchHandlerTimeout(t *testing.T) {
var resumeRV string
exit := make(chan time.Time, 1)
exit <- time.Now()
err := g.watchHandler(fw, &resumeRV, exit, util.NeverStop)
err := g.watchHandler(fw, &resumeRV, exit, wait.NeverStop)
if err != errorResyncRequested {
t.Errorf("expected timeout error, but got %q", err)
}
@@ -242,7 +242,7 @@ func TestReflectorListAndWatch(t *testing.T) {
}
s := NewFIFO(MetaNamespaceKeyFunc)
r := NewReflector(lw, &api.Pod{}, s, 0)
go r.ListAndWatch(util.NeverStop)
go r.ListAndWatch(wait.NeverStop)
ids := []string{"foo", "bar", "baz", "qux", "zoo"}
var fw *watch.FakeWatcher
@@ -359,7 +359,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
},
}
r := NewReflector(lw, &api.Pod{}, s, 0)
r.ListAndWatch(util.NeverStop)
r.ListAndWatch(wait.NeverStop)
}
}
@@ -397,7 +397,7 @@ func TestReflectorResync(t *testing.T) {
r := NewReflector(lw, &api.Pod{}, s, resyncPeriod)
r.now = func() time.Time { return currentTime }
r.ListAndWatch(util.NeverStop)
r.ListAndWatch(wait.NeverStop)
if iteration != 2 {
t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
}

View File

@@ -60,7 +60,6 @@ import (
"k8s.io/kubernetes/pkg/apis/componentconfig"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
@@ -204,7 +203,7 @@ func (le *LeaderElector) IsLeader() bool {
// acquire loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew succeeds.
func (le *LeaderElector) acquire() {
stop := make(chan struct{})
util.Until(func() {
wait.Until(func() {
succeeded := le.tryAcquireOrRenew()
le.maybeReportTransition()
if !succeeded {
@@ -221,7 +220,7 @@ func (le *LeaderElector) acquire() {
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails.
func (le *LeaderElector) renew() {
stop := make(chan struct{})
util.Until(func() {
wait.Until(func() {
err := wait.Poll(le.config.RetryPeriod, le.config.RenewDeadline, func() (bool, error) {
return le.tryAcquireOrRenew(), nil
})

View File

@@ -36,10 +36,10 @@ import (
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
@@ -192,7 +192,7 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
go dsc.podController.Run(stopCh)
go dsc.nodeController.Run(stopCh)
for i := 0; i < workers; i++ {
go util.Until(dsc.worker, time.Second, stopCh)
go wait.Until(dsc.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down Daemon Set Controller")

View File

@@ -42,6 +42,7 @@ import (
labelsutil "k8s.io/kubernetes/pkg/util/labels"
podutil "k8s.io/kubernetes/pkg/util/pod"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)
@@ -192,7 +193,7 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
go dc.rcController.Run(stopCh)
go dc.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go util.Until(dc.worker, time.Second, stopCh)
go wait.Until(dc.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down deployment controller")

View File

@@ -32,9 +32,9 @@ import (
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
@@ -128,7 +128,7 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
go e.serviceController.Run(stopCh)
go e.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go util.Until(e.worker, time.Second, stopCh)
go wait.Until(e.worker, time.Second, stopCh)
}
go func() {
defer utilruntime.HandleCrash()

View File

@@ -22,8 +22,8 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
// Config contains all the settings for a Controller.
@@ -94,7 +94,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
r.RunUntil(stopCh)
util.Until(c.processLoop, time.Second, stopCh)
wait.Until(c.processLoop, time.Second, stopCh)
}
// Returns true once this controller has completed an initial resource listing

View File

@@ -29,8 +29,8 @@ import (
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
@@ -79,7 +79,7 @@ func New(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFun
func (gcc *GCController) Run(stop <-chan struct{}) {
go gcc.podStoreSyncer.Run(stop)
go util.Until(gcc.gc, gcCheckPeriod, stop)
go wait.Until(gcc.gc, gcCheckPeriod, stop)
<-stop
}

View File

@@ -34,8 +34,8 @@ import (
"k8s.io/kubernetes/pkg/controller/framework"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)
@@ -140,7 +140,7 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
go jm.jobController.Run(stopCh)
go jm.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go util.Until(jm.worker, time.Second, stopCh)
go wait.Until(jm.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down Job Manager")

View File

@@ -31,8 +31,8 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/rand"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
@@ -626,7 +626,7 @@ func TestWatchJobs(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
go manager.jobController.Run(stopCh)
go util.Until(manager.worker, 10*time.Millisecond, stopCh)
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
// We're sending new job to see if it reaches syncHandler.
testJob.Name = "foo"
@@ -691,7 +691,7 @@ func TestWatchPods(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
go manager.podController.Run(stopCh)
go util.Until(manager.worker, 10*time.Millisecond, stopCh)
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
pods := newPodList(1, api.PodRunning, testJob)
testPod := pods[0]

View File

@@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/util"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/watch"
)
@@ -215,16 +216,16 @@ func NewNodeController(
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run(period time.Duration) {
go nc.nodeController.Run(util.NeverStop)
go nc.podController.Run(util.NeverStop)
go nc.daemonSetController.Run(util.NeverStop)
go nc.nodeController.Run(wait.NeverStop)
go nc.podController.Run(wait.NeverStop)
go nc.daemonSetController.Run(wait.NeverStop)
// Incorporate the results of node status pushed from kubelet to master.
go util.Until(func() {
go wait.Until(func() {
if err := nc.monitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err)
}
}, nc.nodeMonitorPeriod, util.NeverStop)
}, nc.nodeMonitorPeriod, wait.NeverStop)
// Managing eviction of nodes:
// 1. when we delete pods off a node, if the node was not empty at the time we then
@@ -238,7 +239,7 @@ func (nc *NodeController) Run(period time.Duration) {
// b. If there are no pods left terminating, exit
// c. If there are pods still terminating, wait for their estimated completion
// before retrying
go util.Until(func() {
go wait.Until(func() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
@@ -253,11 +254,11 @@ func (nc *NodeController) Run(period time.Duration) {
}
return true, 0
})
}, nodeEvictionPeriod, util.NeverStop)
}, nodeEvictionPeriod, wait.NeverStop)
// TODO: replace with a controller that ensures pods that are terminating complete
// in a particular time period
go util.Until(func() {
go wait.Until(func() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
@@ -280,7 +281,7 @@ func (nc *NodeController) Run(period time.Duration) {
}
return false, remaining
})
}, nodeEvictionPeriod, util.NeverStop)
}, nodeEvictionPeriod, wait.NeverStop)
}
// Generates num pod CIDRs that could be assigned to nodes.

View File

@@ -31,7 +31,7 @@ import (
unversioned_core "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
unversioned_extensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
)
const (
@@ -68,11 +68,11 @@ func NewHorizontalController(evtNamespacer unversioned_core.EventsGetter, scaleN
}
func (a *HorizontalController) Run(syncPeriod time.Duration) {
go util.Until(func() {
go wait.Until(func() {
if err := a.reconcileAutoscalers(); err != nil {
glog.Errorf("Couldn't reconcile horizontal pod autoscalers: %v", err)
}
}, syncPeriod, util.NeverStop)
}, syncPeriod, wait.NeverStop)
}
func (a *HorizontalController) computeReplicasForCPUUtilization(hpa extensions.HorizontalPodAutoscaler, scale *extensions.Scale) (int, *int, time.Time, error) {

View File

@@ -35,8 +35,8 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)
@@ -188,7 +188,7 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
go rsc.rsController.Run(stopCh)
go rsc.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go util.Until(rsc.worker, time.Second, stopCh)
go wait.Until(rsc.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down ReplicaSet Controller")

View File

@@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
@@ -198,7 +199,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
if key != expected {
t.Errorf("Unexpected sync all for ReplicaSet %v, expected %v", key, expected)
}
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Processing DeleteFinalStateUnknown took longer than expected")
}
}
@@ -454,14 +455,14 @@ func TestWatchControllers(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
go manager.rsController.Run(stopCh)
go util.Until(manager.worker, 10*time.Millisecond, stopCh)
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
testRSSpec.Name = "foo"
fakeWatch.Add(&testRSSpec)
select {
case <-received:
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Expected 1 call but got 0")
}
}
@@ -498,7 +499,7 @@ func TestWatchPods(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
go manager.podController.Run(stopCh)
go util.Until(manager.worker, 10*time.Millisecond, stopCh)
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
pods := newPodList(nil, 1, api.PodRunning, labelMap, testRSSpec)
testPod := pods.Items[0]
@@ -507,7 +508,7 @@ func TestWatchPods(t *testing.T) {
select {
case <-received:
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Expected 1 call but got 0")
}
}
@@ -529,7 +530,7 @@ func TestUpdatePods(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
go util.Until(manager.worker, 10*time.Millisecond, stopCh)
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
// Put 2 ReplicaSets and one pod into the controller's stores
labelMap1 := map[string]string{"foo": "bar"}
@@ -558,7 +559,7 @@ func TestUpdatePods(t *testing.T) {
if !expected.Has(got) {
t.Errorf("Expected keys %#v got %v", expected, got)
}
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Expected update notifications for replica sets within 100ms each")
}
}
@@ -599,7 +600,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
if key != expectedKey {
t.Errorf("Expected requeue of replica set with key %s got %s", expectedKey, key)
}
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
manager.queue.ShutDown()
t.Errorf("Expected to find a ReplicaSet in the queue, found none.")
}

View File

@@ -34,8 +34,8 @@ import (
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)
@@ -191,7 +191,7 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
go rm.rcController.Run(stopCh)
go rm.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go util.Until(rm.worker, time.Second, stopCh)
go wait.Until(rm.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down RC Manager")

View File

@@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
@@ -193,7 +194,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
if key != expected {
t.Errorf("Unexpected sync all for rc %v, expected %v", key, expected)
}
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Processing DeleteFinalStateUnknown took longer than expected")
}
}
@@ -441,14 +442,14 @@ func TestWatchControllers(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
go manager.rcController.Run(stopCh)
go util.Until(manager.worker, 10*time.Millisecond, stopCh)
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
testControllerSpec.Name = "foo"
fakeWatch.Add(&testControllerSpec)
select {
case <-received:
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Expected 1 call but got 0")
}
}
@@ -484,7 +485,7 @@ func TestWatchPods(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
go manager.podController.Run(stopCh)
go util.Until(manager.worker, 10*time.Millisecond, stopCh)
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
pods := newPodList(nil, 1, api.PodRunning, testControllerSpec)
testPod := pods.Items[0]
@@ -493,7 +494,7 @@ func TestWatchPods(t *testing.T) {
select {
case <-received:
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Expected 1 call but got 0")
}
}
@@ -515,7 +516,7 @@ func TestUpdatePods(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
go util.Until(manager.worker, 10*time.Millisecond, stopCh)
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
// Put 2 rcs and one pod into the controller's stores
testControllerSpec1 := newReplicationController(1)
@@ -542,7 +543,7 @@ func TestUpdatePods(t *testing.T) {
if !expected.Has(got) {
t.Errorf("Expected keys %#v got %v", expected, got)
}
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Expected update notifications for controllers within 100ms each")
}
}
@@ -583,7 +584,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
if key != expectedKey {
t.Errorf("Expected requeue of controller with key %s got %s", expectedKey, key)
}
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
manager.queue.ShutDown()
t.Errorf("Expected to find an rc in the queue, found none.")
}

View File

@@ -28,8 +28,8 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)
@@ -166,9 +166,9 @@ func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
go rq.rqController.Run(stopCh)
go rq.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go util.Until(rq.worker, time.Second, stopCh)
go wait.Until(rq.worker, time.Second, stopCh)
}
go util.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)
go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)
<-stopCh
glog.Infof("Shutting down ResourceQuotaController")
rq.queue.ShutDown()

View File

@@ -25,7 +25,7 @@ import (
"k8s.io/kubernetes/pkg/api"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
)
type RouteController struct {
@@ -45,11 +45,11 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, clusterNam
}
func (rc *RouteController) Run(syncPeriod time.Duration) {
go util.Until(func() {
go wait.Until(func() {
if err := rc.reconcileNodeRoutes(); err != nil {
glog.Errorf("Couldn't reconcile node routes: %v", err)
}
}, syncPeriod, util.NeverStop)
}, syncPeriod, wait.NeverStop)
}
func (rc *RouteController) reconcileNodeRoutes() error {

View File

@@ -33,12 +33,12 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/util"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/sets"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
"k8s.io/kubernetes/pkg/util/wait"
)
const (
@@ -272,7 +272,7 @@ func (cm *containerManagerImpl) Start(nodeConfig NodeConfig) error {
}
// Run ensure state functions every minute.
go util.Until(func() {
go wait.Until(func() {
for _, cont := range cm.systemContainers {
if cont.ensureStateFunc != nil {
if err := cont.ensureStateFunc(cont.manager); err != nil {
@@ -280,7 +280,7 @@ func (cm *containerManagerImpl) Start(nodeConfig NodeConfig) error {
}
}
}
}, time.Minute, util.NeverStop)
}, time.Minute, wait.NeverStop)
return nil
}

View File

@@ -27,7 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"github.com/golang/glog"
)
@@ -45,7 +45,7 @@ func NewSourceFile(path string, nodeName string, period time.Duration, updates c
updates: updates,
}
glog.V(1).Infof("Watching path %q", path)
go util.Until(config.run, period, util.NeverStop)
go wait.Until(config.run, period, wait.NeverStop)
}
func (s *sourceFile) run() {

View File

@@ -29,8 +29,8 @@ import (
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/util"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/util/wait"
)
func TestExtractFromNonExistentFile(t *testing.T) {
@@ -53,7 +53,7 @@ func TestUpdateOnNonExistentFile(t *testing.T) {
t.Fatalf("Expected %#v, Got %#v", expected, update)
}
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Expected update, timeout instead")
}
}
@@ -154,7 +154,7 @@ func TestReadPodsFromFile(t *testing.T) {
if !api.Semantic.DeepEqual(testCase.expected, update) {
t.Errorf("%s: Expected %#v, Got %#v", testCase.desc, testCase.expected, update)
}
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("%s: Expected update, timeout instead", testCase.desc)
}
}()

View File

@@ -26,7 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"github.com/golang/glog"
)
@@ -49,7 +49,7 @@ func NewSourceURL(url string, header http.Header, nodeName string, period time.D
data: nil,
}
glog.V(1).Infof("Watching URL %s", url)
go util.Until(config.run, period, util.NeverStop)
go wait.Until(config.run, period, wait.NeverStop)
}
func (s *sourceURL) run() {

View File

@@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
)
type imagePullRequest struct {
@@ -59,7 +60,7 @@ func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, im
backOff: imageBackOff,
pullRequests: make(chan *imagePullRequest, 10),
}
go util.Until(imagePuller.pullImages, time.Second, util.NeverStop)
go wait.Until(imagePuller.pullImages, time.Second, wait.NeverStop)
return imagePuller
}

View File

@@ -28,8 +28,8 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/container"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
)
const (
@@ -130,7 +130,7 @@ func newImageManager(runtime container.Runtime, cadvisorInterface cadvisor.Inter
}
func (im *realImageManager) Start() error {
go util.Until(func() {
go wait.Until(func() {
// Initial detection make detected time "unknown" in the past.
var ts time.Time
if im.initialized {
@@ -142,7 +142,7 @@ func (im *realImageManager) Start() error {
} else {
im.initialized = true
}
}, 5*time.Minute, util.NeverStop)
}, 5*time.Minute, wait.NeverStop)
return nil
}

View File

@@ -82,6 +82,7 @@ import (
"k8s.io/kubernetes/pkg/util/selinux"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/watch"
@@ -890,17 +891,17 @@ func (kl *Kubelet) GetNode() (*api.Node, error) {
// Starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
go util.Until(func() {
go wait.Until(func() {
if err := kl.containerGC.GarbageCollect(); err != nil {
glog.Errorf("Container garbage collection failed: %v", err)
}
}, time.Minute, util.NeverStop)
}, time.Minute, wait.NeverStop)
go util.Until(func() {
go wait.Until(func() {
if err := kl.imageManager.GarbageCollect(); err != nil {
glog.Errorf("Image garbage collection failed: %v", err)
}
}, 5*time.Minute, util.NeverStop)
}, 5*time.Minute, wait.NeverStop)
}
// initializeModules will initialize internal modules that do not require the container runtime to be up.
@@ -974,14 +975,14 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
if kl.kubeClient != nil {
// Start syncing node status immediately, this may set up things the runtime needs to run.
go util.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, util.NeverStop)
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
}
go util.Until(kl.syncNetworkStatus, 30*time.Second, util.NeverStop)
go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)
go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
go util.Until(kl.podKiller, 1*time.Second, util.NeverStop)
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
// Start component sync loops.
kl.statusManager.Start()

View File

@@ -61,6 +61,7 @@ import (
"k8s.io/kubernetes/pkg/util/bandwidth"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume"
_ "k8s.io/kubernetes/pkg/volume/host_path"
@@ -3877,7 +3878,7 @@ func TestRegisterExistingNodeWithApiserver(t *testing.T) {
done <- struct{}{}
}()
select {
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for registration")
case <-done:
return

View File

@@ -24,8 +24,8 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
)
// GenericPLEG is an extremely simple generic PLEG that relies solely on
@@ -110,7 +110,7 @@ func (g *GenericPLEG) Watch() chan *PodLifecycleEvent {
// Start spawns a goroutine to relist periodically.
func (g *GenericPLEG) Start() {
go util.Until(g.relist, g.relistPeriod, util.NeverStop)
go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
func generateEvent(podID types.UID, cid string, oldState, newState plegContainerState) *PodLifecycleEvent {

View File

@@ -27,8 +27,8 @@ import (
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
)
// Manager manages pod probing. It creates a probe "worker" for every container that specifies a
@@ -97,7 +97,7 @@ func NewManager(
// Start syncing probe status. This should only be called once.
func (m *manager) Start() {
// Start syncing readiness.
go util.Forever(m.updateReadiness, 0)
go wait.Forever(m.updateReadiness, 0)
}
// Key uniquely identifying container probes

View File

@@ -26,7 +26,6 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
@@ -260,7 +259,7 @@ func TestUpdateReadiness(t *testing.T) {
// Start syncing readiness without leaking goroutine.
stopCh := make(chan struct{})
go util.Until(m.updateReadiness, 0, stopCh)
go wait.Until(m.updateReadiness, 0, stopCh)
defer func() {
close(stopCh)
// Send an update to exit updateReadiness()
@@ -332,7 +331,7 @@ func waitForWorkerExit(m *manager, workerPaths []probeKey) error {
continue // Already exited, no need to poll.
}
glog.Infof("Polling %v", w)
if err := wait.Poll(interval, util.ForeverTestTimeout, condition); err != nil {
if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil {
return err
}
}
@@ -357,7 +356,7 @@ func waitForReadyStatus(m *manager, ready bool) error {
return status.ContainerStatuses[0].Ready == ready, nil
}
glog.Infof("Polling for ready state %v", ready)
if err := wait.Poll(interval, util.ForeverTestTimeout, condition); err != nil {
if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil {
return err
}
@@ -378,7 +377,7 @@ func cleanup(t *testing.T, m *manager) {
if exited, _ := condition(); exited {
return // Already exited, no need to poll.
}
if err := wait.Poll(interval, util.ForeverTestTimeout, condition); err != nil {
if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil {
t.Fatalf("Error during cleanup: %v", err)
}
}

View File

@@ -23,7 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
)
func TestCacheOperations(t *testing.T) {
@@ -58,7 +58,7 @@ func TestUpdates(t *testing.T) {
if expected != u {
t.Errorf("Expected update %v, recieved %v: %s %s", expected, u, msg)
}
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Timed out waiting for update %v: %s", expected, msg)
}
}

View File

@@ -30,7 +30,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
@@ -232,7 +231,7 @@ func TestCleanUp(t *testing.T) {
return ready == results.Success, nil
}
if ready, _ := condition(); !ready {
if err := wait.Poll(100*time.Millisecond, util.ForeverTestTimeout, condition); err != nil {
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, condition); err != nil {
t.Fatalf("[%s] Error waiting for worker ready: %v", probeType, err)
}
}

View File

@@ -24,7 +24,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume"
"github.com/golang/glog"
@@ -68,7 +68,7 @@ func (s *fsResourceAnalyzer) Start() {
return
}
glog.Info("Starting FS ResourceAnalyzer")
go util.Forever(func() {
go wait.Forever(func() {
startTime := time.Now()
s.updateCachedPodVolumeStats()
glog.V(3).Infof("Finished calculating volume stats in %v.", time.Now().Sub(startTime))

View File

@@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
)
// A wrapper around api.PodStatus that includes a version to enforce that stale pod statuses are
@@ -125,7 +126,7 @@ func (m *manager) Start() {
glog.Info("Starting to sync pod status with apiserver")
syncTicker := time.Tick(syncPeriod)
// syncPod and syncBatch share the same go routine to avoid sync races.
go util.Forever(func() {
go wait.Forever(func() {
select {
case syncRequest := <-m.podStatusChannel:
m.syncPod(syncRequest.podUID, syncRequest.status)

View File

@@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/util/intstr"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
// Controller is the controller manager for the core bootstrap Kubernetes controller
@@ -99,7 +100,7 @@ func (c *Controller) Start() {
// RunKubernetesService periodically updates the kubernetes service
func (c *Controller) RunKubernetesService(ch chan struct{}) {
util.Until(func() {
wait.Until(func() {
// Service definition is not reconciled after first
// run, ports and type will be corrected only during
// start.

View File

@@ -74,8 +74,8 @@ import (
"k8s.io/kubernetes/pkg/storage"
etcdmetrics "k8s.io/kubernetes/pkg/storage/etcd/metrics"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
daemonetcd "k8s.io/kubernetes/pkg/registry/daemonset/etcd"
horizontalpodautoscaleretcd "k8s.io/kubernetes/pkg/registry/horizontalpodautoscaler/etcd"
@@ -605,7 +605,7 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage {
thirdPartyResourceRegistry: thirdPartyResourceStorage,
}
go func() {
util.Forever(func() {
wait.Forever(func() {
if err := thirdPartyControl.SyncResources(); err != nil {
glog.Warningf("third party resource sync failed: %v", err)
}

View File

@@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/ssh"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
@@ -125,7 +126,7 @@ func (c *SSHTunneler) SecondsSinceSync() int64 {
}
func (c *SSHTunneler) installSSHKeySyncLoop(user, publicKeyfile string) {
go util.Until(func() {
go wait.Until(func() {
if c.InstallSSHKey == nil {
glog.Error("Won't attempt to install ssh key: InstallSSHKey function is nil")
return
@@ -150,7 +151,7 @@ func (c *SSHTunneler) installSSHKeySyncLoop(user, publicKeyfile string) {
// each time (Update() is a noop if no changes are necessary).
func (c *SSHTunneler) nodesSyncLoop() {
// TODO (cjcullen) make this watch.
go util.Until(func() {
go wait.Until(func() {
addrs, err := c.getAddresses()
glog.Infof("Calling update w/ addrs: %v", addrs)
if err != nil {

View File

@@ -24,7 +24,7 @@ import (
"k8s.io/kubernetes/pkg/api"
. "k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
)
const TomcatPort int = 8080
@@ -76,7 +76,7 @@ func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []a
}
// Unittests will hard timeout in 5m with a stack trace, prevent that
// and surface a clearer reason for failure.
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Timed out. Expected %#v, Got %#v", expectedServices, services)
return
}
@@ -122,7 +122,7 @@ func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints
}
// Unittests will hard timeout in 5m with a stack trace, prevent that
// and surface a clearer reason for failure.
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Timed out. Expected %#v, Got %#v", expectedEndpoints, endpoints)
return
}

View File

@@ -23,8 +23,8 @@ import (
"sync"
"time"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/wait"
)
var (
@@ -83,7 +83,7 @@ func newPortRangeAllocator(r net.PortRange) PortAllocator {
ports: make(chan int, portsBufSize),
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
}
go util.Until(func() { ra.fillPorts(util.NeverStop) }, nextFreePortCooldown, util.NeverStop)
go wait.Until(func() { ra.fillPorts(wait.NeverStop) }, nextFreePortCooldown, wait.NeverStop)
return ra
}

View File

@@ -26,7 +26,6 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/registry/service"
"k8s.io/kubernetes/pkg/registry/service/ipallocator"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
@@ -66,7 +65,7 @@ func NewRepair(interval time.Duration, registry service.Registry, network *net.I
// RunUntil starts the controller until the provided ch is closed.
func (c *Repair) RunUntil(ch chan struct{}) {
util.Until(func() {
wait.Until(func() {
if err := c.RunOnce(); err != nil {
runtime.HandleError(err)
}

View File

@@ -25,9 +25,9 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/registry/service"
"k8s.io/kubernetes/pkg/registry/service/portallocator"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)
// See ipallocator/controller/repair.go; this is a copy for ports.
@@ -51,7 +51,7 @@ func NewRepair(interval time.Duration, registry service.Registry, portRange net.
// RunUntil starts the controller until the provided ch is closed.
func (c *Repair) RunUntil(ch chan struct{}) {
util.Until(func() {
wait.Until(func() {
if err := c.RunOnce(); err != nil {
runtime.HandleError(err)
}

View File

@@ -39,7 +39,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/crypto/ssh"
"k8s.io/kubernetes/pkg/util"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
@@ -297,7 +296,7 @@ func NewSSHTunnelList(user, keyfile string, healthCheckURL *url.URL, stopChan ch
healthCheckURL: healthCheckURL,
}
healthCheckPoll := 1 * time.Minute
go util.Until(func() {
go wait.Until(func() {
l.tunnelsLock.Lock()
defer l.tunnelsLock.Unlock()
// Healthcheck each tunnel every minute

View File

@@ -30,7 +30,7 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
@@ -156,7 +156,7 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
keyFunc: config.KeyFunc,
stopped: false,
// We need to (potentially) stop both:
// - util.Until go-routine
// - wait.Until go-routine
// - reflector.ListAndWatch
// and there are no guarantees on the order that they will stop.
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
@@ -171,7 +171,7 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
stopCh := cacher.stopCh
cacher.stopWg.Add(1)
go func() {
util.Until(
wait.Until(
func() {
if !cacher.isStopped() {
cacher.startCaching(stopCh)

View File

@@ -33,8 +33,8 @@ import (
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"golang.org/x/net/context"
@@ -158,7 +158,7 @@ func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType
if e, a := eventObject, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a)
}
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Timed out waiting for an event")
}
}

View File

@@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
@@ -319,7 +320,7 @@ func TestReflectorForWatchCache(t *testing.T) {
},
}
r := cache.NewReflector(lw, &api.Pod{}, store, 0)
r.ListAndWatch(util.NeverStop)
r.ListAndWatch(wait.NeverStop)
{
_, version, err := store.WaitUntilFreshAndList(10)

View File

@@ -20,7 +20,7 @@ import (
"testing"
"time"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
)
func ExpectValue(t *testing.T, atomicValue *Value, expectedValue interface{}) {
@@ -38,7 +38,7 @@ func ExpectValue(t *testing.T, atomicValue *Value, expectedValue interface{}) {
t.Errorf("Expected to find %v, found %v", expectedValue, actualValue)
return
}
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Error("Value could not be read")
return
}

View File

@@ -19,7 +19,7 @@ package config
import (
"sync"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
)
type Merger interface {
@@ -74,7 +74,7 @@ func (m *Mux) Channel(source string) chan interface{} {
}
newChannel := make(chan interface{})
m.sources[source] = newChannel
go util.Until(func() { m.listen(source, newChannel) }, 0, util.NeverStop)
go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
return newChannel
}

View File

@@ -25,7 +25,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
@@ -99,7 +99,7 @@ func TestDecoder_SourceClose(t *testing.T) {
select {
case <-done:
break
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Error("Timeout")
}
}

View File

@@ -23,7 +23,7 @@ import (
"time"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
)
type myType struct {
@@ -113,7 +113,7 @@ func TestBroadcasterWatcherStopDeadlock(t *testing.T) {
}(m.Watch(), m.Watch())
m.Action(Added, &myType{})
select {
case <-time.After(util.ForeverTestTimeout):
case <-time.After(wait.ForeverTestTimeout):
t.Error("timeout: deadlocked")
case <-done:
}

View File

@@ -24,7 +24,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/metrics"
@@ -104,7 +104,7 @@ func New(c *Config) *Scheduler {
// Run begins watching and scheduling. It starts a goroutine and returns immediately.
func (s *Scheduler) Run() {
go util.Until(s.scheduleOne, 0, s.config.StopEverything)
go wait.Until(s.scheduleOne, 0, s.config.StopEverything)
}
func (s *Scheduler) scheduleOne() {

View File

@@ -30,7 +30,6 @@ import (
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
. "github.com/onsi/ginkgo"
@@ -290,7 +289,7 @@ func testMasterUpgrade(ip, v string, mUp func(v string) error) {
done := make(chan struct{}, 1)
// Let's make sure we've finished the heartbeat before shutting things down.
var wg sync.WaitGroup
go util.Until(func() {
go wait.Until(func() {
defer GinkgoRecover()
wg.Add(1)
defer wg.Done()

View File

@@ -36,8 +36,8 @@ import (
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
)
// KubeletMetric stores metrics scraped from the kubelet server's /metric endpoint.
@@ -397,7 +397,7 @@ func (r *resourceCollector) Start() {
r.stopCh = make(chan struct{}, 1)
// Keep the last observed stats for comparison.
oldStats := make(map[string]*cadvisorapi.ContainerStats)
go util.Until(func() { r.collectStats(oldStats) }, r.pollingInterval, r.stopCh)
go wait.Until(func() { r.collectStats(oldStats) }, r.pollingInterval, r.stopCh)
}
// Stop sends a signal to terminate the stats collecting goroutine.

View File

@@ -506,12 +506,12 @@ func (config *KubeProxyTestConfig) deleteNetProxyPod() {
config.getPodClient().Delete(pod.Name, api.NewDeleteOptions(0))
config.endpointPods = config.endpointPods[1:]
// wait for pod being deleted.
err := waitForPodToDisappear(config.f.Client, config.f.Namespace.Name, pod.Name, labels.Everything(), time.Second, util.ForeverTestTimeout)
err := waitForPodToDisappear(config.f.Client, config.f.Namespace.Name, pod.Name, labels.Everything(), time.Second, wait.ForeverTestTimeout)
if err != nil {
Failf("Failed to delete %s pod: %v", pod.Name, err)
}
// wait for endpoint being removed.
err = waitForServiceEndpointsNum(config.f.Client, config.f.Namespace.Name, nodePortServiceName, len(config.endpointPods), time.Second, util.ForeverTestTimeout)
err = waitForServiceEndpointsNum(config.f.Client, config.f.Namespace.Name, nodePortServiceName, len(config.endpointPods), time.Second, wait.ForeverTestTimeout)
if err != nil {
Failf("Failed to remove endpoint from service: %s", nodePortServiceName)
}

View File

@@ -23,7 +23,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -62,13 +62,13 @@ var _ = Describe("Mesos", func() {
It("starts static pods on every node in the mesos cluster", func() {
client := framework.Client
expectNoError(allNodesReady(client, util.ForeverTestTimeout), "all nodes ready")
expectNoError(allNodesReady(client, wait.ForeverTestTimeout), "all nodes ready")
nodelist := ListSchedulableNodesOrDie(framework.Client)
const ns = "static-pods"
numpods := len(nodelist.Items)
expectNoError(waitForPodsRunningReady(ns, numpods, util.ForeverTestTimeout),
expectNoError(waitForPodsRunningReady(ns, numpods, wait.ForeverTestTimeout),
fmt.Sprintf("number of static pods in namespace %s is %d", ns, numpods))
})

View File

@@ -27,7 +27,7 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
. "github.com/onsi/ginkgo"
)
@@ -150,7 +150,7 @@ var _ = Describe("Port forwarding", func() {
By("Closing the connection to the local port")
conn.Close()
logOutput := runKubectlWithTimeout(util.ForeverTestTimeout, "logs", fmt.Sprintf("--namespace=%v", framework.Namespace.Name), "-f", podName)
logOutput := runKubectlWithTimeout(wait.ForeverTestTimeout, "logs", fmt.Sprintf("--namespace=%v", framework.Namespace.Name), "-f", podName)
verifyLogMessage(logOutput, "Accepted client connection")
verifyLogMessage(logOutput, "Expected to read 3 bytes from client, but got 0 instead")
})
@@ -195,7 +195,7 @@ var _ = Describe("Port forwarding", func() {
Failf("Expected %q from server, got %q", e, a)
}
logOutput := runKubectlWithTimeout(util.ForeverTestTimeout, "logs", fmt.Sprintf("--namespace=%v", framework.Namespace.Name), "-f", podName)
logOutput := runKubectlWithTimeout(wait.ForeverTestTimeout, "logs", fmt.Sprintf("--namespace=%v", framework.Namespace.Name), "-f", podName)
verifyLogMessage(logOutput, "^Accepted client connection$")
verifyLogMessage(logOutput, "^Received expected client data$")
verifyLogMessage(logOutput, "^Done$")
@@ -232,7 +232,7 @@ var _ = Describe("Port forwarding", func() {
Failf("Expected %q from server, got %q", e, a)
}
logOutput := runKubectlWithTimeout(util.ForeverTestTimeout, "logs", fmt.Sprintf("--namespace=%v", framework.Namespace.Name), "-f", podName)
logOutput := runKubectlWithTimeout(wait.ForeverTestTimeout, "logs", fmt.Sprintf("--namespace=%v", framework.Namespace.Name), "-f", podName)
verifyLogMessage(logOutput, "Accepted client connection")
verifyLogMessage(logOutput, "Done")
})

View File

@@ -31,7 +31,7 @@ import (
"k8s.io/kubernetes/pkg/api/testapi"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/test/integration/framework"
@@ -160,8 +160,8 @@ func TestSingleWatch(t *testing.T) {
defer w.Stop()
select {
case <-time.After(util.ForeverTestTimeout):
t.Fatalf("watch took longer than %s", util.ForeverTestTimeout.String())
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("watch took longer than %s", wait.ForeverTestTimeout.String())
case got, ok := <-w.ResultChan():
if !ok {
t.Fatal("Watch channel closed unexpectedly.")

View File

@@ -36,7 +36,6 @@ import (
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/plugin/pkg/scheduler"
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
@@ -295,7 +294,7 @@ func DoTestPodScheduling(t *testing.T, restClient *client.Client) {
t.Fatalf("Failed to create pod: %v", err)
}
err = wait.Poll(time.Second, util.ForeverTestTimeout, podScheduled(restClient, myPod.Namespace, myPod.Name))
err = wait.Poll(time.Second, wait.ForeverTestTimeout, podScheduled(restClient, myPod.Namespace, myPod.Name))
if err != nil {
t.Fatalf("Failed to schedule pod: %v", err)
}

View File

@@ -36,7 +36,6 @@ import (
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/plugin/pkg/scheduler"
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
@@ -242,7 +241,7 @@ func DoTestUnschedulableNodes(t *testing.T, restClient *client.Client, nodeStore
}
// There are no schedulable nodes - the pod shouldn't be scheduled.
err = wait.Poll(time.Second, util.ForeverTestTimeout, podScheduled(restClient, myPod.Namespace, myPod.Name))
err = wait.Poll(time.Second, wait.ForeverTestTimeout, podScheduled(restClient, myPod.Namespace, myPod.Name))
if err == nil {
t.Errorf("Pod scheduled successfully on unschedulable nodes")
}
@@ -260,7 +259,7 @@ func DoTestUnschedulableNodes(t *testing.T, restClient *client.Client, nodeStore
mod.makeSchedulable(t, schedNode, nodeStore, restClient)
// Wait until the pod is scheduled.
err = wait.Poll(time.Second, util.ForeverTestTimeout, podScheduled(restClient, myPod.Namespace, myPod.Name))
err = wait.Poll(time.Second, wait.ForeverTestTimeout, podScheduled(restClient, myPod.Namespace, myPod.Name))
if err != nil {
t.Errorf("Test %d: failed to schedule a pod: %v", i, err)
} else {