Terminate watchers when watch cache is destroyed
This commit is contained in:
		@@ -351,6 +351,7 @@ func NewCacherFromConfig(config Config) *Cacher {
 | 
				
			|||||||
	cacher.stopWg.Add(1)
 | 
						cacher.stopWg.Add(1)
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
		defer cacher.stopWg.Done()
 | 
							defer cacher.stopWg.Done()
 | 
				
			||||||
 | 
							defer cacher.terminateAllWatchers()
 | 
				
			||||||
		wait.Until(
 | 
							wait.Until(
 | 
				
			||||||
			func() {
 | 
								func() {
 | 
				
			||||||
				if !cacher.isStopped() {
 | 
									if !cacher.isStopped() {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -496,6 +496,42 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
 | 
				
			||||||
 | 
						backingStorage := &dummyStorage{}
 | 
				
			||||||
 | 
						cacher, _ := newTestCacher(backingStorage, 1000)
 | 
				
			||||||
 | 
						defer cacher.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Wait until cacher is initialized.
 | 
				
			||||||
 | 
						cacher.ready.wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						w, err := cacher.Watch(context.Background(), "pods/ns", "0", storage.Everything)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Failed to create watch: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						watchClosed := make(chan struct{})
 | 
				
			||||||
 | 
						go func() {
 | 
				
			||||||
 | 
							defer close(watchClosed)
 | 
				
			||||||
 | 
							for event := range w.ResultChan() {
 | 
				
			||||||
 | 
								switch event.Type {
 | 
				
			||||||
 | 
								case watch.Added, watch.Modified, watch.Deleted:
 | 
				
			||||||
 | 
									// ok
 | 
				
			||||||
 | 
								default:
 | 
				
			||||||
 | 
									t.Errorf("unexpected event %#v", event)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						cacher.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-watchClosed:
 | 
				
			||||||
 | 
						case <-time.After(wait.ForeverTestTimeout):
 | 
				
			||||||
 | 
							t.Errorf("timed out waiting for watch to close")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestTimeBucketWatchersBasic(t *testing.T) {
 | 
					func TestTimeBucketWatchersBasic(t *testing.T) {
 | 
				
			||||||
	filter := func(_ string, _ labels.Set, _ fields.Set) bool {
 | 
						filter := func(_ string, _ labels.Set, _ fields.Set) bool {
 | 
				
			||||||
		return true
 | 
							return true
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user