Merge pull request #78029 from liggitt/crd-watch
Terminate custom resource watches when storage is destroyed
This commit is contained in:
		@@ -27,6 +27,8 @@ import (
 | 
			
		||||
	"k8s.io/apiextensions-apiserver/test/integration/fixtures"
 | 
			
		||||
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/watch"
 | 
			
		||||
	"k8s.io/client-go/dynamic"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -91,7 +93,7 @@ func TestChangeCRD(t *testing.T) {
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// Set up 100 loops creating and reading custom resources
 | 
			
		||||
	// Set up 100 loops creating and reading and watching custom resources
 | 
			
		||||
	for i := 0; i < 100; i++ {
 | 
			
		||||
		wg.Add(1)
 | 
			
		||||
		go func(i int) {
 | 
			
		||||
@@ -112,6 +114,31 @@ func TestChangeCRD(t *testing.T) {
 | 
			
		||||
				time.Sleep(10 * time.Millisecond)
 | 
			
		||||
			}
 | 
			
		||||
		}(i)
 | 
			
		||||
 | 
			
		||||
		wg.Add(1)
 | 
			
		||||
		go func(i int) {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
			for {
 | 
			
		||||
				select {
 | 
			
		||||
				case <-stopChan:
 | 
			
		||||
					return
 | 
			
		||||
				default:
 | 
			
		||||
					w, err := noxuNamespacedResourceClient.Watch(metav1.ListOptions{})
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						t.Fatalf("unexpected error establishing watch: %v", err)
 | 
			
		||||
					}
 | 
			
		||||
					for event := range w.ResultChan() {
 | 
			
		||||
						switch event.Type {
 | 
			
		||||
						case watch.Added, watch.Modified, watch.Deleted:
 | 
			
		||||
							// all expected
 | 
			
		||||
						default:
 | 
			
		||||
							t.Errorf("unexpected watch event: %#v", event)
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				time.Sleep(10 * time.Millisecond)
 | 
			
		||||
			}
 | 
			
		||||
		}(i)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Let all the established get request loops soak
 | 
			
		||||
@@ -121,5 +148,15 @@ func TestChangeCRD(t *testing.T) {
 | 
			
		||||
	close(stopChan)
 | 
			
		||||
 | 
			
		||||
	// Let loops drain
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
	drained := make(chan struct{})
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(drained)
 | 
			
		||||
		wg.Wait()
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	select {
 | 
			
		||||
	case <-drained:
 | 
			
		||||
	case <-time.After(wait.ForeverTestTimeout):
 | 
			
		||||
		t.Error("timed out waiting for clients to complete")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -351,6 +351,7 @@ func NewCacherFromConfig(config Config) *Cacher {
 | 
			
		||||
	cacher.stopWg.Add(1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer cacher.stopWg.Done()
 | 
			
		||||
		defer cacher.terminateAllWatchers()
 | 
			
		||||
		wait.Until(
 | 
			
		||||
			func() {
 | 
			
		||||
				if !cacher.isStopped() {
 | 
			
		||||
 
 | 
			
		||||
@@ -496,6 +496,42 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
 | 
			
		||||
	backingStorage := &dummyStorage{}
 | 
			
		||||
	cacher, _ := newTestCacher(backingStorage, 1000)
 | 
			
		||||
	defer cacher.Stop()
 | 
			
		||||
 | 
			
		||||
	// Wait until cacher is initialized.
 | 
			
		||||
	cacher.ready.wait()
 | 
			
		||||
 | 
			
		||||
	w, err := cacher.Watch(context.Background(), "pods/ns", "0", storage.Everything)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Failed to create watch: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	watchClosed := make(chan struct{})
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(watchClosed)
 | 
			
		||||
		for event := range w.ResultChan() {
 | 
			
		||||
			switch event.Type {
 | 
			
		||||
			case watch.Added, watch.Modified, watch.Deleted:
 | 
			
		||||
				// ok
 | 
			
		||||
			default:
 | 
			
		||||
				t.Errorf("unexpected event %#v", event)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	cacher.Stop()
 | 
			
		||||
 | 
			
		||||
	select {
 | 
			
		||||
	case <-watchClosed:
 | 
			
		||||
	case <-time.After(wait.ForeverTestTimeout):
 | 
			
		||||
		t.Errorf("timed out waiting for watch to close")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestTimeBucketWatchersBasic(t *testing.T) {
 | 
			
		||||
	filter := func(_ string, _ labels.Set, _ fields.Set) bool {
 | 
			
		||||
		return true
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user