Merge pull request #6627 from bprashanth/watcher_cancel
Fix RunUntil and stop leaking watch channel on etcd error
This commit is contained in:
		
							
								
								
									
										28
									
								
								pkg/client/cache/reflector.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										28
									
								
								pkg/client/cache/reflector.go
									
									
									
									
										vendored
									
									
								
							| @@ -85,13 +85,13 @@ func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyn | ||||
| // Run starts a watch and handles watch events. Will restart the watch if it is closed. | ||||
| // Run starts a goroutine and returns immediately. | ||||
| func (r *Reflector) Run() { | ||||
| 	go util.Forever(func() { r.listAndWatch() }, r.period) | ||||
| 	go util.Forever(func() { r.listAndWatch(util.NeverStop) }, r.period) | ||||
| } | ||||
|  | ||||
| // RunUntil starts a watch and handles watch events. Will restart the watch if it is closed. | ||||
| // RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed. | ||||
| func (r *Reflector) RunUntil(stopCh <-chan struct{}) { | ||||
| 	go util.Until(func() { r.listAndWatch() }, r.period, stopCh) | ||||
| 	go util.Until(func() { r.listAndWatch(stopCh) }, r.period, stopCh) | ||||
| } | ||||
|  | ||||
| var ( | ||||
| @@ -100,6 +100,10 @@ var ( | ||||
|  | ||||
| 	// Used to indicate that watching stopped so that a resync could happen. | ||||
| 	errorResyncRequested = errors.New("resync channel fired") | ||||
|  | ||||
| 	// Used to indicate that watching stopped because of a signal from the stop | ||||
| 	// channel passed in from a client of the reflector. | ||||
| 	errorStopRequested = errors.New("Stop requested") | ||||
| ) | ||||
|  | ||||
| // resyncChan returns a channel which will receive something when a resync is required. | ||||
| @@ -110,9 +114,9 @@ func (r *Reflector) resyncChan() <-chan time.Time { | ||||
| 	return time.After(r.resyncPeriod) | ||||
| } | ||||
|  | ||||
| func (r *Reflector) listAndWatch() { | ||||
| func (r *Reflector) listAndWatch(stopCh <-chan struct{}) { | ||||
| 	var resourceVersion string | ||||
| 	exitWatch := r.resyncChan() | ||||
| 	resyncCh := r.resyncChan() | ||||
|  | ||||
| 	list, err := r.listerWatcher.List() | ||||
| 	if err != nil { | ||||
| @@ -149,9 +153,9 @@ func (r *Reflector) listAndWatch() { | ||||
| 			} | ||||
| 			return | ||||
| 		} | ||||
| 		if err := r.watchHandler(w, &resourceVersion, exitWatch); err != nil { | ||||
| 		if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil { | ||||
| 			if err != errorResyncRequested { | ||||
| 				glog.Errorf("watch of %v ended with error: %v", r.expectedType, err) | ||||
| 				glog.Errorf("watch of %v ended with: %v", r.expectedType, err) | ||||
| 			} | ||||
| 			return | ||||
| 		} | ||||
| @@ -169,14 +173,20 @@ func (r *Reflector) syncWith(items []runtime.Object) error { | ||||
| } | ||||
|  | ||||
| // watchHandler watches w and keeps *resourceVersion up to date. | ||||
| func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, exitWatch <-chan time.Time) error { | ||||
| func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, resyncCh <-chan time.Time, stopCh <-chan struct{}) error { | ||||
| 	start := time.Now() | ||||
| 	eventCount := 0 | ||||
|  | ||||
| 	// Stopping the watcher should be idempotent and if we return from this function there's no way | ||||
| 	// we're coming back in with the same watch interface. | ||||
| 	defer w.Stop() | ||||
|  | ||||
| loop: | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-exitWatch: | ||||
| 			w.Stop() | ||||
| 		case <-stopCh: | ||||
| 			return errorStopRequested | ||||
| 		case <-resyncCh: | ||||
| 			return errorResyncRequested | ||||
| 		case event, ok := <-w.ResultChan(): | ||||
| 			if !ok { | ||||
|   | ||||
							
								
								
									
										78
									
								
								pkg/client/cache/reflector_test.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										78
									
								
								pkg/client/cache/reflector_test.go
									
									
									
									
										vendored
									
									
								
							| @@ -24,6 +24,7 @@ import ( | ||||
|  | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" | ||||
| ) | ||||
|  | ||||
| @@ -37,6 +38,60 @@ func (t *testLW) Watch(resourceVersion string) (watch.Interface, error) { | ||||
| 	return t.WatchFunc(resourceVersion) | ||||
| } | ||||
|  | ||||
| func TestCloseWatchChannelOnError(t *testing.T) { | ||||
| 	r := NewReflector(&testLW{}, &api.Pod{}, NewStore(MetaNamespaceKeyFunc), 0) | ||||
| 	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}} | ||||
| 	fw := watch.NewFake() | ||||
| 	r.listerWatcher = &testLW{ | ||||
| 		WatchFunc: func(rv string) (watch.Interface, error) { | ||||
| 			return fw, nil | ||||
| 		}, | ||||
| 		ListFunc: func() (runtime.Object, error) { | ||||
| 			return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "1"}}, nil | ||||
| 		}, | ||||
| 	} | ||||
| 	go r.listAndWatch(util.NeverStop) | ||||
| 	fw.Error(pod) | ||||
| 	select { | ||||
| 	case _, ok := <-fw.ResultChan(): | ||||
| 		if ok { | ||||
| 			t.Errorf("Watch channel left open after cancellation") | ||||
| 		} | ||||
| 	case <-time.After(100 * time.Millisecond): | ||||
| 		t.Errorf("the cancellation is at least 99 milliseconds late") | ||||
| 		break | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestRunUntil(t *testing.T) { | ||||
| 	stopCh := make(chan struct{}) | ||||
| 	store := NewStore(MetaNamespaceKeyFunc) | ||||
| 	r := NewReflector(&testLW{}, &api.Pod{}, store, 0) | ||||
| 	fw := watch.NewFake() | ||||
| 	r.listerWatcher = &testLW{ | ||||
| 		WatchFunc: func(rv string) (watch.Interface, error) { | ||||
| 			return fw, nil | ||||
| 		}, | ||||
| 		ListFunc: func() (runtime.Object, error) { | ||||
| 			return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "1"}}, nil | ||||
| 		}, | ||||
| 	} | ||||
| 	r.RunUntil(stopCh) | ||||
| 	// Synchronously add a dummy pod into the watch channel so we | ||||
| 	// know the RunUntil go routine is in the watch handler. | ||||
| 	fw.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}) | ||||
| 	stopCh <- struct{}{} | ||||
| 	select { | ||||
| 	case _, ok := <-fw.ResultChan(): | ||||
| 		if ok { | ||||
| 			t.Errorf("Watch channel left open after stopping the watch") | ||||
| 		} | ||||
| 	case <-time.After(100 * time.Millisecond): | ||||
| 		t.Errorf("the cancellation is at least 99 milliseconds late") | ||||
| 		break | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestReflector_resyncChan(t *testing.T) { | ||||
| 	s := NewStore(MetaNamespaceKeyFunc) | ||||
| 	g := NewReflector(&testLW{}, &api.Pod{}, s, time.Millisecond) | ||||
| @@ -57,7 +112,7 @@ func TestReflector_watchHandlerError(t *testing.T) { | ||||
| 		fw.Stop() | ||||
| 	}() | ||||
| 	var resumeRV string | ||||
| 	err := g.watchHandler(fw, &resumeRV, neverExitWatch) | ||||
| 	err := g.watchHandler(fw, &resumeRV, neverExitWatch, util.NeverStop) | ||||
| 	if err == nil { | ||||
| 		t.Errorf("unexpected non-error") | ||||
| 	} | ||||
| @@ -77,7 +132,7 @@ func TestReflector_watchHandler(t *testing.T) { | ||||
| 		fw.Stop() | ||||
| 	}() | ||||
| 	var resumeRV string | ||||
| 	err := g.watchHandler(fw, &resumeRV, neverExitWatch) | ||||
| 	err := g.watchHandler(fw, &resumeRV, neverExitWatch, util.NeverStop) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("unexpected error %v", err) | ||||
| 	} | ||||
| @@ -126,12 +181,25 @@ func TestReflector_watchHandlerTimeout(t *testing.T) { | ||||
| 	var resumeRV string | ||||
| 	exit := make(chan time.Time, 1) | ||||
| 	exit <- time.Now() | ||||
| 	err := g.watchHandler(fw, &resumeRV, exit) | ||||
| 	err := g.watchHandler(fw, &resumeRV, exit, util.NeverStop) | ||||
| 	if err != errorResyncRequested { | ||||
| 		t.Errorf("expected timeout error, but got %q", err) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestReflectorStopWatch(t *testing.T) { | ||||
| 	s := NewStore(MetaNamespaceKeyFunc) | ||||
| 	g := NewReflector(&testLW{}, &api.Pod{}, s, 0) | ||||
| 	fw := watch.NewFake() | ||||
| 	var resumeRV string | ||||
| 	stopWatch := make(chan struct{}, 1) | ||||
| 	stopWatch <- struct{}{} | ||||
| 	err := g.watchHandler(fw, &resumeRV, neverExitWatch, stopWatch) | ||||
| 	if err != errorStopRequested { | ||||
| 		t.Errorf("expected stop error, got %q", err) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestReflector_listAndWatch(t *testing.T) { | ||||
| 	createdFakes := make(chan *watch.FakeWatcher) | ||||
|  | ||||
| @@ -157,7 +225,7 @@ func TestReflector_listAndWatch(t *testing.T) { | ||||
| 	} | ||||
| 	s := NewFIFO(MetaNamespaceKeyFunc) | ||||
| 	r := NewReflector(lw, &api.Pod{}, s, 0) | ||||
| 	go r.listAndWatch() | ||||
| 	go r.listAndWatch(util.NeverStop) | ||||
|  | ||||
| 	ids := []string{"foo", "bar", "baz", "qux", "zoo"} | ||||
| 	var fw *watch.FakeWatcher | ||||
| @@ -274,6 +342,6 @@ func TestReflector_listAndWatchWithErrors(t *testing.T) { | ||||
| 			}, | ||||
| 		} | ||||
| 		r := NewReflector(lw, &api.Pod{}, s, 0) | ||||
| 		r.listAndWatch() | ||||
| 		r.listAndWatch(util.NeverStop) | ||||
| 	} | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Daniel Smith
					Daniel Smith