From d47e21f19feeb5322de51fb25ee8ce51d0e0ea1c Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Mon, 26 Oct 2015 10:34:45 +0100 Subject: [PATCH] Reuse TCP connections in Reflector between resync periods. --- contrib/mesos/pkg/executor/executor_test.go | 2 +- contrib/mesos/pkg/scheduler/plugin_test.go | 2 +- .../mesos/pkg/service/endpoints_controller.go | 6 +- pkg/client/cache/listwatch.go | 23 +++++-- pkg/client/cache/listwatch_test.go | 2 +- pkg/client/cache/reflector.go | 65 ++++++++++++++++++- pkg/client/cache/reflector_test.go | 58 ++++++++++++++--- pkg/controller/daemon/controller.go | 9 +-- .../endpoint/endpoints_controller.go | 6 +- .../framework/fake_controller_source.go | 4 +- .../framework/fake_controller_source_test.go | 6 +- pkg/controller/gc/gc_controller.go | 3 +- pkg/controller/job/controller.go | 6 +- .../namespace/namespace_controller.go | 3 +- pkg/controller/node/nodecontroller.go | 6 +- ...ersistentvolume_claim_binder_controller.go | 6 +- .../persistentvolume_recycler_controller.go | 3 +- .../replication/replication_controller.go | 6 +- .../serviceaccounts_controller.go | 6 +- .../serviceaccount/tokens_controller.go | 6 +- pkg/kubelet/config/apiserver_test.go | 2 +- pkg/kubelet/kubelet.go | 6 +- pkg/proxy/config/api_test.go | 2 +- pkg/storage/cacher.go | 4 +- pkg/storage/watch_cache_test.go | 8 +-- pkg/volume/util.go | 3 +- plugin/pkg/admission/limitranger/admission.go | 3 +- .../namespace/autoprovision/admission.go | 3 +- .../admission/namespace/exists/admission.go | 3 +- .../namespace/lifecycle/admission.go | 3 +- .../pkg/admission/resourcequota/admission.go | 3 +- .../pkg/admission/serviceaccount/admission.go | 6 +- test/e2e/daemon_restart.go | 3 +- test/e2e/density.go | 6 +- test/e2e/latency.go | 3 +- test/e2e/service_latency.go | 3 +- test/e2e/util.go | 3 +- 37 files changed, 184 insertions(+), 108 deletions(-) diff --git a/contrib/mesos/pkg/executor/executor_test.go b/contrib/mesos/pkg/executor/executor_test.go index 54547ff2053..fc147a336ef 100644 --- a/contrib/mesos/pkg/executor/executor_test.go +++ b/contrib/mesos/pkg/executor/executor_test.go @@ -543,7 +543,7 @@ func NewMockPodsListWatch(initialPodList api.PodList) *MockPodsListWatch { list: initialPodList, } lw.ListWatch = cache.ListWatch{ - WatchFunc: func(resourceVersion string) (watch.Interface, error) { + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return lw.fakeWatcher, nil }, ListFunc: func() (runtime.Object, error) { diff --git a/contrib/mesos/pkg/scheduler/plugin_test.go b/contrib/mesos/pkg/scheduler/plugin_test.go index c8fea700210..68fa9246a53 100644 --- a/contrib/mesos/pkg/scheduler/plugin_test.go +++ b/contrib/mesos/pkg/scheduler/plugin_test.go @@ -166,7 +166,7 @@ func NewMockPodsListWatch(initialPodList api.PodList) *MockPodsListWatch { list: initialPodList, } lw.ListWatch = cache.ListWatch{ - WatchFunc: func(resourceVersion string) (watch.Interface, error) { + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return lw.fakeWatcher, nil }, ListFunc: func() (runtime.Object, error) { diff --git a/contrib/mesos/pkg/service/endpoints_controller.go b/contrib/mesos/pkg/service/endpoints_controller.go index e64d277f46c..e7fd9fdd68a 100644 --- a/contrib/mesos/pkg/service/endpoints_controller.go +++ b/contrib/mesos/pkg/service/endpoints_controller.go @@ -60,8 +60,7 @@ func NewEndpointController(client *client.Client) *endpointController { ListFunc: func() (runtime.Object, error) { return e.client.Services(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return e.client.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, @@ -81,8 +80,7 @@ func NewEndpointController(client *client.Client) *endpointController { ListFunc: func() (runtime.Object, error) { return e.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return e.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, diff --git a/pkg/client/cache/listwatch.go b/pkg/client/cache/listwatch.go index 1383565dbc0..c057fce43bc 100644 --- a/pkg/client/cache/listwatch.go +++ b/pkg/client/cache/listwatch.go @@ -17,6 +17,9 @@ limitations under the License. package cache import ( + "time" + + "k8s.io/kubernetes/pkg/api" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/runtime" @@ -27,7 +30,7 @@ import ( type ListFunc func() (runtime.Object, error) // WatchFunc knows how to watch resources -type WatchFunc func(resourceVersion string) (watch.Interface, error) +type WatchFunc func(options api.ListOptions) (watch.Interface, error) // ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface. // It is a convenience function for users of NewReflector, etc. @@ -52,23 +55,33 @@ func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSe Do(). Get() } - watchFunc := func(resourceVersion string) (watch.Interface, error) { + watchFunc := func(options api.ListOptions) (watch.Interface, error) { return c.Get(). Prefix("watch"). Namespace(namespace). Resource(resource). + // TODO: Use VersionedParams once this is supported for non v1 API. + Param("resourceVersion", options.ResourceVersion). + TimeoutSeconds(timeoutFromListOptions(options)). FieldsSelectorParam(fieldSelector). - Param("resourceVersion", resourceVersion).Watch() + Watch() } return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc} } +func timeoutFromListOptions(options api.ListOptions) time.Duration { + if options.TimeoutSeconds != nil { + return time.Duration(*options.TimeoutSeconds) * time.Second + } + return 0 +} + // List a set of apiserver resources func (lw *ListWatch) List() (runtime.Object, error) { return lw.ListFunc() } // Watch a set of apiserver resources -func (lw *ListWatch) Watch(resourceVersion string) (watch.Interface, error) { - return lw.WatchFunc(resourceVersion) +func (lw *ListWatch) Watch(options api.ListOptions) (watch.Interface, error) { + return lw.WatchFunc(options) } diff --git a/pkg/client/cache/listwatch_test.go b/pkg/client/cache/listwatch_test.go index db34c48ebba..27b4eb0803c 100644 --- a/pkg/client/cache/listwatch_test.go +++ b/pkg/client/cache/listwatch_test.go @@ -165,7 +165,7 @@ func TestListWatchesCanWatch(t *testing.T) { client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Default.Version()}) lw := NewListWatchFromClient(client, item.resource, item.namespace, item.fieldSelector) // This test merely tests that the correct request is made. - lw.Watch(item.rv) + lw.Watch(api.ListOptions{ResourceVersion: item.rv}) handler.ValidateRequest(t, item.location, "GET", nil) } } diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 4c2d664205a..d7a02806e12 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io" + "math/rand" "net" "net/url" "reflect" @@ -30,6 +31,7 @@ import ( "time" "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" apierrs "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/runtime" @@ -43,7 +45,7 @@ type ListerWatcher interface { // ResourceVersion field will be used to start the watch in the right place. List() (runtime.Object, error) // Watch should begin a watch at the specified version. - Watch(resourceVersion string) (watch.Interface, error) + Watch(options api.ListOptions) (watch.Interface, error) } // Reflector watches a specified resource and causes all changes to be reflected in the given store. @@ -61,6 +63,8 @@ type Reflector struct { // the beginning of the next one. period time.Duration resyncPeriod time.Duration + // nextResync is approximate time of next resync (0 if not scheduled) + nextResync time.Time // lastSyncResourceVersion is the resource version token last // observed when doing a sync with the underlying store // it is thread safe, but not synchronized with the underlying store @@ -69,6 +73,22 @@ type Reflector struct { lastSyncResourceVersionMutex sync.RWMutex } +var ( + // We try to spread the load on apiserver by setting timeouts for + // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout]. + // However, it can be modified to avoid periodic resync to break the + // TCP connection. + minWatchTimeout = 5 * time.Minute + + now func() time.Time = time.Now + // If we are within 'forceResyncThreshold' from the next planned resync + // and are just before issueing Watch(), resync will be forced now. + forceResyncThreshold = 3 * time.Second + // We try to set timeouts for Watch() so that we will finish about + // than 'timeoutThreshold' from next planned periodic resync. + timeoutThreshold = 1 * time.Second +) + // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector // The indexer is configured to key on namespace func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) { @@ -160,16 +180,47 @@ var ( // required, and a cleanup function. func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { if r.resyncPeriod == 0 { + r.nextResync = time.Time{} return neverExitWatch, func() bool { return false } } // The cleanup function is required: imagine the scenario where watches // always fail so we end up listing frequently. Then, if we don't // manually stop the timer, we could end up with many timers active // concurrently. + r.nextResync = now().Add(r.resyncPeriod) t := time.NewTimer(r.resyncPeriod) return t.C, t.Stop } +// We want to avoid situations when periodic resyncing is breaking the TCP +// connection. +// If response`s body is not read to completion before calling body.Close(), +// that TCP connection will not be reused in the future - see #15664 issue +// for more details. +// Thus, we set timeout for watch requests to be smaller than the remaining +// time until next periodic resync and force resyncing ourself to avoid +// breaking TCP connection. +// +// TODO: This should be parametrizable based on server load. +func (r *Reflector) timeoutForWatch() *int64 { + randTimeout := time.Duration(float64(minWatchTimeout) * (rand.Float64() + 1.0)) + timeout := r.nextResync.Sub(now()) - timeoutThreshold + if timeout < 0 || randTimeout < timeout { + timeout = randTimeout + } + timeoutSeconds := int64(timeout.Seconds()) + return &timeoutSeconds +} + +// Returns true if we are close enough to next planned periodic resync +// and we can force resyncing ourself now. +func (r *Reflector) canForceResyncNow() bool { + if r.nextResync.IsZero() { + return false + } + return now().Add(forceResyncThreshold).After(r.nextResync) +} + // Returns error if ListAndWatch didn't even tried to initialize watch. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { var resourceVersion string @@ -195,7 +246,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { r.setLastSyncResourceVersion(resourceVersion) for { - w, err := r.listerWatcher.Watch(resourceVersion) + options := api.ListOptions{ + ResourceVersion: resourceVersion, + // We want to avoid situations when resyncing is breaking the TCP connection + // - see comment for 'timeoutForWatch()' for more details. + TimeoutSeconds: r.timeoutForWatch(), + } + w, err := r.listerWatcher.Watch(options) if err != nil { switch err { case io.EOF: @@ -225,6 +282,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { } return nil } + if r.canForceResyncNow() { + glog.V(4).Infof("%s: next resync planned for %#v, forcing now", r.name, r.nextResync) + return nil + } } } diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index f0593434893..aff04e47e6c 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -36,8 +36,8 @@ type testLW struct { } func (t *testLW) List() (runtime.Object, error) { return t.ListFunc() } -func (t *testLW) Watch(resourceVersion string) (watch.Interface, error) { - return t.WatchFunc(resourceVersion) +func (t *testLW) Watch(options api.ListOptions) (watch.Interface, error) { + return t.WatchFunc(options.ResourceVersion) } func TestCloseWatchChannelOnError(t *testing.T) { @@ -94,7 +94,7 @@ func TestRunUntil(t *testing.T) { } } -func TestReflector_resyncChan(t *testing.T) { +func TestReflectorResyncChan(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &api.Pod{}, s, time.Millisecond) a, _ := g.resyncChan() @@ -107,7 +107,7 @@ func TestReflector_resyncChan(t *testing.T) { } } -func BenchmarkReflector_resyncChanMany(b *testing.B) { +func BenchmarkReflectorResyncChanMany(b *testing.B) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &api.Pod{}, s, 25*time.Millisecond) // The improvement to this (calling the timer's Stop() method) makes @@ -119,7 +119,7 @@ func BenchmarkReflector_resyncChanMany(b *testing.B) { } } -func TestReflector_watchHandlerError(t *testing.T) { +func TestReflectorWatchHandlerError(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &api.Pod{}, s, 0) fw := watch.NewFake() @@ -133,7 +133,7 @@ func TestReflector_watchHandlerError(t *testing.T) { } } -func TestReflector_watchHandler(t *testing.T) { +func TestReflectorWatchHandler(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &api.Pod{}, s, 0) fw := watch.NewFake() @@ -189,7 +189,7 @@ func TestReflector_watchHandler(t *testing.T) { } } -func TestReflector_watchHandlerTimeout(t *testing.T) { +func TestReflectorWatchHandlerTimeout(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &api.Pod{}, s, 0) fw := watch.NewFake() @@ -215,7 +215,7 @@ func TestReflectorStopWatch(t *testing.T) { } } -func TestReflector_ListAndWatch(t *testing.T) { +func TestReflectorListAndWatch(t *testing.T) { createdFakes := make(chan *watch.FakeWatcher) // The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc @@ -273,7 +273,7 @@ func TestReflector_ListAndWatch(t *testing.T) { } } -func TestReflector_ListAndWatchWithErrors(t *testing.T) { +func TestReflectorListAndWatchWithErrors(t *testing.T) { mkPod := func(id string, rv string) *api.Pod { return &api.Pod{ObjectMeta: api.ObjectMeta{Name: id, ResourceVersion: rv}} } @@ -360,3 +360,43 @@ func TestReflector_ListAndWatchWithErrors(t *testing.T) { r.ListAndWatch(util.NeverStop) } } + +func TestReflectorResync(t *testing.T) { + s := NewStore(MetaNamespaceKeyFunc) + + currentTime := time.Time{} + now = func() time.Time { return currentTime } + iteration := 0 + + lw := &testLW{ + WatchFunc: func(rv string) (watch.Interface, error) { + if iteration == 0 { + // Move time, but do not force resync. + currentTime = currentTime.Add(30 * time.Second) + } else if iteration == 1 { + // Move time to force resync. + currentTime = currentTime.Add(28 * time.Second) + } else if iteration >= 2 { + t.Fatalf("should have forced resync earlier") + } + iteration++ + fw := watch.NewFake() + // Send something to the watcher to avoid "watch too short" errors. + go func() { + fw.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: strconv.Itoa(iteration)}}) + fw.Stop() + }() + return fw, nil + }, + ListFunc: func() (runtime.Object, error) { + return &api.PodList{ListMeta: unversioned.ListMeta{ResourceVersion: "0"}}, nil + }, + } + resyncPeriod := time.Minute + r := NewReflector(lw, &api.Pod{}, s, resyncPeriod) + + r.ListAndWatch(util.NeverStop) + if iteration != 2 { + t.Errorf("exactly 2 iterations were expected, got: %v", iteration) + } +} diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go index f928456d75c..fc0d599e54e 100644 --- a/pkg/controller/daemon/controller.go +++ b/pkg/controller/daemon/controller.go @@ -99,8 +99,7 @@ func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controlle ListFunc: func() (runtime.Object, error) { return dsc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return dsc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, @@ -132,8 +131,7 @@ func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controlle ListFunc: func() (runtime.Object, error) { return dsc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return dsc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, @@ -151,8 +149,7 @@ func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controlle ListFunc: func() (runtime.Object, error) { return dsc.kubeClient.Nodes().List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return dsc.kubeClient.Nodes().Watch(labels.Everything(), fields.Everything(), options) }, }, diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 0fd2b708a94..303591c81bc 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -64,8 +64,7 @@ func NewEndpointController(client *client.Client, resyncPeriod controller.Resync ListFunc: func() (runtime.Object, error) { return e.client.Services(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return e.client.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, @@ -86,8 +85,7 @@ func NewEndpointController(client *client.Client, resyncPeriod controller.Resync ListFunc: func() (runtime.Object, error) { return e.client.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return e.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, diff --git a/pkg/controller/framework/fake_controller_source.go b/pkg/controller/framework/fake_controller_source.go index 986346e6457..654f022da20 100644 --- a/pkg/controller/framework/fake_controller_source.go +++ b/pkg/controller/framework/fake_controller_source.go @@ -150,10 +150,10 @@ func (f *FakeControllerSource) List() (runtime.Object, error) { // Watch returns a watch, which will be pre-populated with all changes // after resourceVersion. -func (f *FakeControllerSource) Watch(resourceVersion string) (watch.Interface, error) { +func (f *FakeControllerSource) Watch(options api.ListOptions) (watch.Interface, error) { f.lock.RLock() defer f.lock.RUnlock() - rc, err := strconv.Atoi(resourceVersion) + rc, err := strconv.Atoi(options.ResourceVersion) if err != nil { return nil, err } diff --git a/pkg/controller/framework/fake_controller_source_test.go b/pkg/controller/framework/fake_controller_source_test.go index fcf618fb6ff..a70833669ad 100644 --- a/pkg/controller/framework/fake_controller_source_test.go +++ b/pkg/controller/framework/fake_controller_source_test.go @@ -64,7 +64,7 @@ func TestRCNumber(t *testing.T) { source.Modify(pod("foo")) source.Modify(pod("foo")) - w, err := source.Watch("1") + w, err := source.Watch(api.ListOptions{ResourceVersion: "1"}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -78,13 +78,13 @@ func TestRCNumber(t *testing.T) { t.Errorf("wanted %v, got %v", e, a) } - w2, err := source.Watch("2") + w2, err := source.Watch(api.ListOptions{ResourceVersion: "2"}) if err != nil { t.Fatalf("Unexpected error: %v", err) } go consume(t, w2, []string{"3"}, wg) - w3, err := source.Watch("3") + w3, err := source.Watch(api.ListOptions{ResourceVersion: "3"}) if err != nil { t.Fatalf("Unexpected error: %v", err) } diff --git a/pkg/controller/gc/gc_controller.go b/pkg/controller/gc/gc_controller.go index 11bb0412e8e..bb9cd196953 100644 --- a/pkg/controller/gc/gc_controller.go +++ b/pkg/controller/gc/gc_controller.go @@ -68,8 +68,7 @@ func New(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc, ListFunc: func() (runtime.Object, error) { return gcc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), terminatedSelector) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return gcc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), terminatedSelector, options) }, }, diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go index 84500f6c9d7..b414823fe82 100644 --- a/pkg/controller/job/controller.go +++ b/pkg/controller/job/controller.go @@ -88,8 +88,7 @@ func NewJobController(kubeClient client.Interface, resyncPeriod controller.Resyn ListFunc: func() (runtime.Object, error) { return jm.kubeClient.Extensions().Jobs(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return jm.kubeClient.Extensions().Jobs(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, @@ -112,8 +111,7 @@ func NewJobController(kubeClient client.Interface, resyncPeriod controller.Resyn ListFunc: func() (runtime.Object, error) { return jm.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return jm.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index c576e5ff0db..2b6545209af 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -50,8 +50,7 @@ func NewNamespaceController(kubeClient client.Interface, versions *unversioned.A ListFunc: func() (runtime.Object, error) { return kubeClient.Namespaces().List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: resourceVersion} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return kubeClient.Namespaces().Watch(labels.Everything(), fields.Everything(), options) }, }, diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index 7ca602df99e..7feb0f59c90 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -166,8 +166,7 @@ func NewNodeController( ListFunc: func() (runtime.Object, error) { return nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return nc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, @@ -183,8 +182,7 @@ func NewNodeController( ListFunc: func() (runtime.Object, error) { return nc.kubeClient.Nodes().List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return nc.kubeClient.Nodes().Watch(labels.Everything(), fields.Everything(), options) }, }, diff --git a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go index c149f331592..a68ddd069ab 100644 --- a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go +++ b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go @@ -58,8 +58,7 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time ListFunc: func() (runtime.Object, error) { return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: resourceVersion} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return kubeClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), options) }, }, @@ -77,8 +76,7 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time ListFunc: func() (runtime.Object, error) { return kubeClient.PersistentVolumeClaims(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: resourceVersion} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return kubeClient.PersistentVolumeClaims(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, diff --git a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go index 0c64328e224..5788ce0c445 100644 --- a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go +++ b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go @@ -66,8 +66,7 @@ func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Du ListFunc: func() (runtime.Object, error) { return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: resourceVersion} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return kubeClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), options) }, }, diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index 5ba9c35ca37..1cbf4d705b4 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -110,8 +110,7 @@ func NewReplicationManager(kubeClient client.Interface, resyncPeriod controller. ListFunc: func() (runtime.Object, error) { return rm.kubeClient.ReplicationControllers(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return rm.kubeClient.ReplicationControllers(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, @@ -152,8 +151,7 @@ func NewReplicationManager(kubeClient client.Interface, resyncPeriod controller. ListFunc: func() (runtime.Object, error) { return rm.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return rm.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, diff --git a/pkg/controller/serviceaccount/serviceaccounts_controller.go b/pkg/controller/serviceaccount/serviceaccounts_controller.go index 6a8e76ee352..12b71029e3b 100644 --- a/pkg/controller/serviceaccount/serviceaccounts_controller.go +++ b/pkg/controller/serviceaccount/serviceaccounts_controller.go @@ -79,8 +79,7 @@ func NewServiceAccountsController(cl client.Interface, options ServiceAccountsCo ListFunc: func() (runtime.Object, error) { return e.client.ServiceAccounts(api.NamespaceAll).List(labels.Everything(), accountSelector) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return e.client.ServiceAccounts(api.NamespaceAll).Watch(labels.Everything(), accountSelector, options) }, }, @@ -97,8 +96,7 @@ func NewServiceAccountsController(cl client.Interface, options ServiceAccountsCo ListFunc: func() (runtime.Object, error) { return e.client.Namespaces().List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return e.client.Namespaces().Watch(labels.Everything(), fields.Everything(), options) }, }, diff --git a/pkg/controller/serviceaccount/tokens_controller.go b/pkg/controller/serviceaccount/tokens_controller.go index 92202e12ff2..844fbb4591f 100644 --- a/pkg/controller/serviceaccount/tokens_controller.go +++ b/pkg/controller/serviceaccount/tokens_controller.go @@ -65,8 +65,7 @@ func NewTokensController(cl client.Interface, options TokensControllerOptions) * ListFunc: func() (runtime.Object, error) { return e.client.ServiceAccounts(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return e.client.ServiceAccounts(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, @@ -86,8 +85,7 @@ func NewTokensController(cl client.Interface, options TokensControllerOptions) * ListFunc: func() (runtime.Object, error) { return e.client.Secrets(api.NamespaceAll).List(labels.Everything(), tokenSelector) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return e.client.Secrets(api.NamespaceAll).Watch(labels.Everything(), tokenSelector, options) }, }, diff --git a/pkg/kubelet/config/apiserver_test.go b/pkg/kubelet/config/apiserver_test.go index 8563df66d52..763acb614c1 100644 --- a/pkg/kubelet/config/apiserver_test.go +++ b/pkg/kubelet/config/apiserver_test.go @@ -35,7 +35,7 @@ func (lw fakePodLW) List() (runtime.Object, error) { return lw.listResp, nil } -func (lw fakePodLW) Watch(resourceVersion string) (watch.Interface, error) { +func (lw fakePodLW) Watch(options api.ListOptions) (watch.Interface, error) { return lw.watchResp, nil } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index bdf0c1836da..f68656fb838 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -213,8 +213,7 @@ func NewMainKubelet( ListFunc: func() (runtime.Object, error) { return kubeClient.Services(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: resourceVersion} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return kubeClient.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, } @@ -231,8 +230,7 @@ func NewMainKubelet( ListFunc: func() (runtime.Object, error) { return kubeClient.Nodes().List(labels.Everything(), fieldSelector) }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: resourceVersion} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return kubeClient.Nodes().Watch(labels.Everything(), fieldSelector, options) }, } diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index fa4fa122b2e..a6e65d82280 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -35,7 +35,7 @@ func (lw fakeLW) List() (runtime.Object, error) { return lw.listResp, nil } -func (lw fakeLW) Watch(resourceVersion string) (watch.Interface, error) { +func (lw fakeLW) Watch(options api.ListOptions) (watch.Interface, error) { return lw.watchResp, nil } diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index 7fd1c68c071..73312aeae93 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -351,8 +351,8 @@ func (lw *cacherListerWatcher) List() (runtime.Object, error) { } // Implements cache.ListerWatcher interface. -func (lw *cacherListerWatcher) Watch(resourceVersion string) (watch.Interface, error) { - version, err := ParseWatchResourceVersion(resourceVersion, lw.resourcePrefix) +func (lw *cacherListerWatcher) Watch(options api.ListOptions) (watch.Interface, error) { + version, err := ParseWatchResourceVersion(options.ResourceVersion, lw.resourcePrefix) if err != nil { return nil, err } diff --git a/pkg/storage/watch_cache_test.go b/pkg/storage/watch_cache_test.go index bc62ca85b69..d462e67a603 100644 --- a/pkg/storage/watch_cache_test.go +++ b/pkg/storage/watch_cache_test.go @@ -232,12 +232,12 @@ func TestEvents(t *testing.T) { type testLW struct { ListFunc func() (runtime.Object, error) - WatchFunc func(resourceVersion string) (watch.Interface, error) + WatchFunc func(options api.ListOptions) (watch.Interface, error) } func (t *testLW) List() (runtime.Object, error) { return t.ListFunc() } -func (t *testLW) Watch(resourceVersion string) (watch.Interface, error) { - return t.WatchFunc(resourceVersion) +func (t *testLW) Watch(options api.ListOptions) (watch.Interface, error) { + return t.WatchFunc(options) } func TestReflectorForWatchCache(t *testing.T) { @@ -251,7 +251,7 @@ func TestReflectorForWatchCache(t *testing.T) { } lw := &testLW{ - WatchFunc: func(rv string) (watch.Interface, error) { + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { fw := watch.NewFake() go fw.Stop() return fw, nil diff --git a/pkg/volume/util.go b/pkg/volume/util.go index 84db4e32cb9..c7a6c8a1f9f 100644 --- a/pkg/volume/util.go +++ b/pkg/volume/util.go @@ -111,8 +111,7 @@ func (c *realRecyclerClient) WatchPod(name, namespace, resourceVersion string, s ListFunc: func() (runtime.Object, error) { return c.client.Pods(namespace).List(labels.Everything(), fieldSelector) }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: resourceVersion} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return c.client.Pods(namespace).Watch(labels.Everything(), fieldSelector, options) }, } diff --git a/plugin/pkg/admission/limitranger/admission.go b/plugin/pkg/admission/limitranger/admission.go index eb74f2f673e..76f6f6d3212 100644 --- a/plugin/pkg/admission/limitranger/admission.go +++ b/plugin/pkg/admission/limitranger/admission.go @@ -102,8 +102,7 @@ func NewLimitRanger(client client.Interface, limitFunc LimitFunc) admission.Inte ListFunc: func() (runtime.Object, error) { return client.LimitRanges(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: resourceVersion} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return client.LimitRanges(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, } diff --git a/plugin/pkg/admission/namespace/autoprovision/admission.go b/plugin/pkg/admission/namespace/autoprovision/admission.go index b23c9181e62..6ff36e12cae 100644 --- a/plugin/pkg/admission/namespace/autoprovision/admission.go +++ b/plugin/pkg/admission/namespace/autoprovision/admission.go @@ -87,8 +87,7 @@ func NewProvision(c client.Interface) admission.Interface { ListFunc: func() (runtime.Object, error) { return c.Namespaces().List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: resourceVersion} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return c.Namespaces().Watch(labels.Everything(), fields.Everything(), options) }, }, diff --git a/plugin/pkg/admission/namespace/exists/admission.go b/plugin/pkg/admission/namespace/exists/admission.go index badc431729f..ea23c47208a 100644 --- a/plugin/pkg/admission/namespace/exists/admission.go +++ b/plugin/pkg/admission/namespace/exists/admission.go @@ -94,8 +94,7 @@ func NewExists(c client.Interface) admission.Interface { ListFunc: func() (runtime.Object, error) { return c.Namespaces().List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: resourceVersion} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return c.Namespaces().Watch(labels.Everything(), fields.Everything(), options) }, }, diff --git a/plugin/pkg/admission/namespace/lifecycle/admission.go b/plugin/pkg/admission/namespace/lifecycle/admission.go index e094342ff74..18d18e2f963 100644 --- a/plugin/pkg/admission/namespace/lifecycle/admission.go +++ b/plugin/pkg/admission/namespace/lifecycle/admission.go @@ -111,8 +111,7 @@ func NewLifecycle(c client.Interface) admission.Interface { ListFunc: func() (runtime.Object, error) { return c.Namespaces().List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: resourceVersion} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return c.Namespaces().Watch(labels.Everything(), fields.Everything(), options) }, }, diff --git a/plugin/pkg/admission/resourcequota/admission.go b/plugin/pkg/admission/resourcequota/admission.go index c49500bae7e..890b60de32f 100644 --- a/plugin/pkg/admission/resourcequota/admission.go +++ b/plugin/pkg/admission/resourcequota/admission.go @@ -53,8 +53,7 @@ func NewResourceQuota(client client.Interface) admission.Interface { ListFunc: func() (runtime.Object, error) { return client.ResourceQuotas(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: resourceVersion} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return client.ResourceQuotas(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, } diff --git a/plugin/pkg/admission/serviceaccount/admission.go b/plugin/pkg/admission/serviceaccount/admission.go index b6fb7790ba0..ced32529aea 100644 --- a/plugin/pkg/admission/serviceaccount/admission.go +++ b/plugin/pkg/admission/serviceaccount/admission.go @@ -85,8 +85,7 @@ func NewServiceAccount(cl client.Interface) *serviceAccount { ListFunc: func() (runtime.Object, error) { return cl.ServiceAccounts(api.NamespaceAll).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: resourceVersion} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return cl.ServiceAccounts(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), options) }, }, @@ -100,8 +99,7 @@ func NewServiceAccount(cl client.Interface) *serviceAccount { ListFunc: func() (runtime.Object, error) { return cl.Secrets(api.NamespaceAll).List(labels.Everything(), tokenSelector) }, - WatchFunc: func(resourceVersion string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: resourceVersion} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return cl.Secrets(api.NamespaceAll).Watch(labels.Everything(), tokenSelector, options) }, }, diff --git a/test/e2e/daemon_restart.go b/test/e2e/daemon_restart.go index 581b2f265e6..91a86c4d4e5 100644 --- a/test/e2e/daemon_restart.go +++ b/test/e2e/daemon_restart.go @@ -224,8 +224,7 @@ var _ = Describe("DaemonRestart", func() { ListFunc: func() (runtime.Object, error) { return framework.Client.Pods(ns).List(labelSelector, fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return framework.Client.Pods(ns).Watch(labelSelector, fields.Everything(), options) }, }, diff --git a/test/e2e/density.go b/test/e2e/density.go index 5733d74baa6..696a2292386 100644 --- a/test/e2e/density.go +++ b/test/e2e/density.go @@ -202,8 +202,7 @@ var _ = Describe("Density", func() { ListFunc: func() (runtime.Object, error) { return c.Events(ns).List(labels.Everything(), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return c.Events(ns).Watch(labels.Everything(), fields.Everything(), options) }, }, @@ -286,8 +285,7 @@ var _ = Describe("Density", func() { ListFunc: func() (runtime.Object, error) { return c.Pods(ns).List(labels.SelectorFromSet(labels.Set{"name": additionalPodsPrefix}), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return c.Pods(ns).Watch(labels.SelectorFromSet(labels.Set{"name": additionalPodsPrefix}), fields.Everything(), options) }, }, diff --git a/test/e2e/latency.go b/test/e2e/latency.go index 57c646f8dca..30b7ebb0995 100644 --- a/test/e2e/latency.go +++ b/test/e2e/latency.go @@ -152,8 +152,7 @@ func runLatencyTest(nodeCount int, c *client.Client, ns string) { ListFunc: func() (runtime.Object, error) { return c.Pods(ns).List(labels.SelectorFromSet(labels.Set{"name": additionalPodsPrefix}), fields.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return c.Pods(ns).Watch(labels.SelectorFromSet(labels.Set{"name": additionalPodsPrefix}), fields.Everything(), options) }, }, diff --git a/test/e2e/service_latency.go b/test/e2e/service_latency.go index f734b888e68..f383685c526 100644 --- a/test/e2e/service_latency.go +++ b/test/e2e/service_latency.go @@ -281,8 +281,7 @@ func startEndpointWatcher(f *Framework, q *endpointQueries) { ListFunc: func() (runtime.Object, error) { return f.Client.Endpoints(f.Namespace.Name).List(labels.Everything()) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return f.Client.Endpoints(f.Namespace.Name).Watch(labels.Everything(), fields.Everything(), options) }, }, diff --git a/test/e2e/util.go b/test/e2e/util.go index 9ee36c4900e..08d7378459d 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -144,8 +144,7 @@ func newPodStore(c *client.Client, namespace string, label labels.Selector, fiel ListFunc: func() (runtime.Object, error) { return c.Pods(namespace).List(label, field) }, - WatchFunc: func(rv string) (watch.Interface, error) { - options := api.ListOptions{ResourceVersion: rv} + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { return c.Pods(namespace).Watch(label, field, options) }, }