Fix race for sending errors in watch
This commit is contained in:
		| @@ -438,7 +438,12 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) { | |||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| 		case e := <-wc.incomingEventChan: | 		case e := <-wc.incomingEventChan: | ||||||
| 			res := wc.transform(e) | 			res, err := wc.transform(e) | ||||||
|  | 			if err != nil { | ||||||
|  | 				wc.sendError(err) | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  |  | ||||||
| 			if res == nil { | 			if res == nil { | ||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
| @@ -461,10 +466,8 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) { | |||||||
|  |  | ||||||
| func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) { | func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) { | ||||||
| 	p := concurrentOrderedEventProcessing{ | 	p := concurrentOrderedEventProcessing{ | ||||||
| 		input:           wc.incomingEventChan, | 		wc:              wc, | ||||||
| 		processFunc:     wc.transform, | 		processingQueue: make(chan chan *processingResult, processEventConcurrency-1), | ||||||
| 		output:          wc.resultChan, |  | ||||||
| 		processingQueue: make(chan chan *watch.Event, processEventConcurrency-1), |  | ||||||
|  |  | ||||||
| 		objectType:    wc.watcher.objectType, | 		objectType:    wc.watcher.objectType, | ||||||
| 		groupResource: wc.watcher.groupResource, | 		groupResource: wc.watcher.groupResource, | ||||||
| @@ -481,12 +484,15 @@ func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) { | |||||||
| 	}() | 	}() | ||||||
| } | } | ||||||
|  |  | ||||||
| type concurrentOrderedEventProcessing struct { | type processingResult struct { | ||||||
| 	input       chan *event | 	event *watch.Event | ||||||
| 	processFunc func(*event) *watch.Event | 	err   error | ||||||
| 	output      chan watch.Event | } | ||||||
|  |  | ||||||
| 	processingQueue chan chan *watch.Event | type concurrentOrderedEventProcessing struct { | ||||||
|  | 	wc *watchChan | ||||||
|  |  | ||||||
|  | 	processingQueue chan chan *processingResult | ||||||
| 	// Metadata for logging | 	// Metadata for logging | ||||||
| 	objectType    string | 	objectType    string | ||||||
| 	groupResource schema.GroupResource | 	groupResource schema.GroupResource | ||||||
| @@ -498,28 +504,29 @@ func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.C | |||||||
| 		select { | 		select { | ||||||
| 		case <-ctx.Done(): | 		case <-ctx.Done(): | ||||||
| 			return | 			return | ||||||
| 		case e = <-p.input: | 		case e = <-p.wc.incomingEventChan: | ||||||
| 		} | 		} | ||||||
| 		processingResponse := make(chan *watch.Event, 1) | 		processingResponse := make(chan *processingResult, 1) | ||||||
| 		select { | 		select { | ||||||
| 		case <-ctx.Done(): | 		case <-ctx.Done(): | ||||||
| 			return | 			return | ||||||
| 		case p.processingQueue <- processingResponse: | 		case p.processingQueue <- processingResponse: | ||||||
| 		} | 		} | ||||||
| 		wg.Add(1) | 		wg.Add(1) | ||||||
| 		go func(e *event, response chan<- *watch.Event) { | 		go func(e *event, response chan<- *processingResult) { | ||||||
| 			defer wg.Done() | 			defer wg.Done() | ||||||
|  | 			responseEvent, err := p.wc.transform(e) | ||||||
| 			select { | 			select { | ||||||
| 			case <-ctx.Done(): | 			case <-ctx.Done(): | ||||||
| 			case response <- p.processFunc(e): | 			case response <- &processingResult{event: responseEvent, err: err}: | ||||||
| 			} | 			} | ||||||
| 		}(e, processingResponse) | 		}(e, processingResponse) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) { | func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) { | ||||||
| 	var processingResponse chan *watch.Event | 	var processingResponse chan *processingResult | ||||||
| 	var e *watch.Event | 	var r *processingResult | ||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| 		case <-ctx.Done(): | 		case <-ctx.Done(): | ||||||
| @@ -529,21 +536,25 @@ func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Co | |||||||
| 		select { | 		select { | ||||||
| 		case <-ctx.Done(): | 		case <-ctx.Done(): | ||||||
| 			return | 			return | ||||||
| 		case e = <-processingResponse: | 		case r = <-processingResponse: | ||||||
| 		} | 		} | ||||||
| 		if e == nil { | 		if r.err != nil { | ||||||
|  | 			p.wc.sendError(r.err) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 		if r.event == nil { | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		if len(p.output) == cap(p.output) { | 		if len(p.wc.resultChan) == cap(p.wc.resultChan) { | ||||||
| 			klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.objectType, "groupResource", p.groupResource) | 			klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.wc.watcher.objectType, "groupResource", p.wc.watcher.groupResource) | ||||||
| 		} | 		} | ||||||
| 		// If the user cannot receive results fast enough, we also block incoming events from the watcher, | 		// If the user cannot receive results fast enough, we also block incoming events from the watcher, | ||||||
| 		// because buffering events locally would cause more memory usage. | 		// because buffering events locally would cause more memory usage. | ||||||
| 		// The worst case would be closing the fast watcher. | 		// The worst case would be closing the fast watcher. | ||||||
| 		select { | 		select { | ||||||
| 		case <-ctx.Done(): | 		case p.wc.resultChan <- *r.event: | ||||||
|  | 		case <-p.wc.ctx.Done(): | ||||||
| 			return | 			return | ||||||
| 		case p.output <- *e: |  | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| @@ -561,12 +572,11 @@ func (wc *watchChan) acceptAll() bool { | |||||||
| } | } | ||||||
|  |  | ||||||
| // transform transforms an event into a result for user if not filtered. | // transform transforms an event into a result for user if not filtered. | ||||||
| func (wc *watchChan) transform(e *event) (res *watch.Event) { | func (wc *watchChan) transform(e *event) (res *watch.Event, err error) { | ||||||
| 	curObj, oldObj, err := wc.prepareObjs(e) | 	curObj, oldObj, err := wc.prepareObjs(e) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		klog.Errorf("failed to prepare current and previous objects: %v", err) | 		klog.Errorf("failed to prepare current and previous objects: %v", err) | ||||||
| 		wc.sendError(err) | 		return nil, err | ||||||
| 		return nil |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	switch { | 	switch { | ||||||
| @@ -574,12 +584,11 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { | |||||||
| 		object := wc.watcher.newFunc() | 		object := wc.watcher.newFunc() | ||||||
| 		if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil { | 		if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil { | ||||||
| 			klog.Errorf("failed to propagate object version: %v", err) | 			klog.Errorf("failed to propagate object version: %v", err) | ||||||
| 			return nil | 			return nil, fmt.Errorf("failed to propagate object resource version: %w", err) | ||||||
| 		} | 		} | ||||||
| 		if e.isInitialEventsEndBookmark { | 		if e.isInitialEventsEndBookmark { | ||||||
| 			if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil { | 			if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil { | ||||||
| 				wc.sendError(fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %v", wc.watcher.groupResource, wc.watcher.objectType, object, err)) | 				return nil, fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %w", wc.watcher.groupResource, wc.watcher.objectType, object, err) | ||||||
| 				return nil |  | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 		res = &watch.Event{ | 		res = &watch.Event{ | ||||||
| @@ -588,7 +597,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { | |||||||
| 		} | 		} | ||||||
| 	case e.isDeleted: | 	case e.isDeleted: | ||||||
| 		if !wc.filter(oldObj) { | 		if !wc.filter(oldObj) { | ||||||
| 			return nil | 			return nil, nil | ||||||
| 		} | 		} | ||||||
| 		res = &watch.Event{ | 		res = &watch.Event{ | ||||||
| 			Type:   watch.Deleted, | 			Type:   watch.Deleted, | ||||||
| @@ -596,7 +605,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { | |||||||
| 		} | 		} | ||||||
| 	case e.isCreated: | 	case e.isCreated: | ||||||
| 		if !wc.filter(curObj) { | 		if !wc.filter(curObj) { | ||||||
| 			return nil | 			return nil, nil | ||||||
| 		} | 		} | ||||||
| 		res = &watch.Event{ | 		res = &watch.Event{ | ||||||
| 			Type:   watch.Added, | 			Type:   watch.Added, | ||||||
| @@ -608,7 +617,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { | |||||||
| 				Type:   watch.Modified, | 				Type:   watch.Modified, | ||||||
| 				Object: curObj, | 				Object: curObj, | ||||||
| 			} | 			} | ||||||
| 			return res | 			return res, nil | ||||||
| 		} | 		} | ||||||
| 		curObjPasses := wc.filter(curObj) | 		curObjPasses := wc.filter(curObj) | ||||||
| 		oldObjPasses := wc.filter(oldObj) | 		oldObjPasses := wc.filter(oldObj) | ||||||
| @@ -630,7 +639,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { | |||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	return res | 	return res, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func transformErrorToEvent(err error) *watch.Event { | func transformErrorToEvent(err error) *watch.Event { | ||||||
|   | |||||||
| @@ -144,6 +144,11 @@ func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) { | |||||||
| 	storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store) | 	storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func TestWatchErrorEventIsBlockingFurtherEvent(t *testing.T) { | ||||||
|  | 	ctx, store, _ := testSetup(t) | ||||||
|  | 	storagetesting.RunWatchErrorIsBlockingFurtherEvents(ctx, t, &storeWithPrefixTransformer{store}) | ||||||
|  | } | ||||||
|  |  | ||||||
| // ======================================================================= | // ======================================================================= | ||||||
| // Implementation-specific tests are following. | // Implementation-specific tests are following. | ||||||
| // The following tests are exercising the details of the implementation | // The following tests are exercising the details of the implementation | ||||||
|   | |||||||
| @@ -1571,6 +1571,73 @@ func RunWatchSemanticInitialEventsExtended(ctx context.Context, t *testing.T, st | |||||||
| 	testCheckNoMoreResults(t, w) | 	testCheckNoMoreResults(t, w) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func RunWatchErrorIsBlockingFurtherEvents(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) { | ||||||
|  | 	foo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"}} | ||||||
|  | 	fooKey := fmt.Sprintf("/pods/%s/%s", foo.Namespace, foo.Name) | ||||||
|  | 	fooCreated := &example.Pod{} | ||||||
|  | 	if err := store.Create(context.Background(), fooKey, foo, fooCreated, 0); err != nil { | ||||||
|  | 		t.Errorf("failed to create object: %v", err) | ||||||
|  | 	} | ||||||
|  | 	bar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "bar"}} | ||||||
|  | 	barKey := fmt.Sprintf("/pods/%s/%s", bar.Namespace, bar.Name) | ||||||
|  | 	barCreated := &example.Pod{} | ||||||
|  | 	if err := store.Create(context.Background(), barKey, bar, barCreated, 0); err != nil { | ||||||
|  | 		t.Errorf("failed to create object: %v", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Update the transformer prefix so that the objects stored so far (foo and bar) become effectively corrupted. | ||||||
|  | 	revertTransformer := store.UpdatePrefixTransformer( | ||||||
|  | 		func(transformer *PrefixTransformer) value.Transformer { | ||||||
|  | 			transformer.prefix = []byte("other-prefix") | ||||||
|  | 			return transformer | ||||||
|  | 		}) | ||||||
|  | 	defer revertTransformer() | ||||||
|  |  | ||||||
|  | 	baz := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "baz"}} | ||||||
|  | 	bazKey := fmt.Sprintf("/pods/%s/%s", baz.Namespace, baz.Name) | ||||||
|  | 	bazCreated := &example.Pod{} | ||||||
|  | 	if err := store.Create(context.Background(), bazKey, baz, bazCreated, 0); err != nil { | ||||||
|  | 		t.Errorf("failed to create object: %v", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	opts := storage.ListOptions{ | ||||||
|  | 		ResourceVersion: fooCreated.ResourceVersion, | ||||||
|  | 		Predicate:       storage.Everything, | ||||||
|  | 		Recursive:       true, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Run N concurrent watches. Given the asynchronous nature, we increase the | ||||||
|  | 	// probability of hitting the race in at least one of those watches. | ||||||
|  | 	concurrentWatches := 10 | ||||||
|  | 	wg := sync.WaitGroup{} | ||||||
|  | 	for i := 0; i < concurrentWatches; i++ { | ||||||
|  | 		wg.Add(1) | ||||||
|  | 		go func() { | ||||||
|  | 			defer wg.Done() | ||||||
|  | 			w, err := store.Watch(ctx, "/pods", opts) | ||||||
|  | 			if err != nil { | ||||||
|  | 				t.Errorf("failed to create watch: %v", err) | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			// We issue the watch starting from object bar. | ||||||
|  | 			// The object fails TransformFromStorage and generates an ERROR watch event. | ||||||
|  | 			// Further events (i.e. the ADDED event for the baz object) should not be | ||||||
|  | 			// emitted, so we verify no events other than ERROR type are emitted. | ||||||
|  | 			for { | ||||||
|  | 				event, ok := <-w.ResultChan() | ||||||
|  | 				if !ok { | ||||||
|  | 					break | ||||||
|  | 				} | ||||||
|  | 				if event.Type != watch.Error { | ||||||
|  | 					t.Errorf("unexpected event: %#v", event) | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		}() | ||||||
|  | 	} | ||||||
|  | 	wg.Wait() | ||||||
|  | } | ||||||
|  |  | ||||||
| func makePod(namePrefix string) *example.Pod { | func makePod(namePrefix string) *example.Pod { | ||||||
| 	return &example.Pod{ | 	return &example.Pod{ | ||||||
| 		ObjectMeta: metav1.ObjectMeta{ | 		ObjectMeta: metav1.ObjectMeta{ | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Wojciech Tyczyński
					Wojciech Tyczyński