| @@ -249,8 +249,6 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc | |||||||
| 			} | 			} | ||||||
| 			klog.V(3).Infof("Starting watch for %s, rv=%s labels=%s fields=%s timeout=%s", req.URL.Path, opts.ResourceVersion, opts.LabelSelector, opts.FieldSelector, timeout) | 			klog.V(3).Infof("Starting watch for %s, rv=%s labels=%s fields=%s timeout=%s", req.URL.Path, opts.ResourceVersion, opts.LabelSelector, opts.FieldSelector, timeout) | ||||||
|  |  | ||||||
| 			ctx, cancel := context.WithTimeout(ctx, timeout) |  | ||||||
| 			defer cancel() |  | ||||||
| 			watcher, err := rw.Watch(ctx, &opts) | 			watcher, err := rw.Watch(ctx, &opts) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				scope.err(err, w, req) | 				scope.err(err, w, req) | ||||||
|   | |||||||
| @@ -19,7 +19,6 @@ package cacher | |||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"math" |  | ||||||
| 	"net/http" | 	"net/http" | ||||||
| 	"reflect" | 	"reflect" | ||||||
| 	"sync" | 	"sync" | ||||||
| @@ -365,16 +364,11 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, | |||||||
| 		chanSize = 1000 | 		chanSize = 1000 | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Determine watch timeout |  | ||||||
| 	timeout := time.Duration(math.MaxInt64) |  | ||||||
| 	if deadline, ok := ctx.Deadline(); ok { |  | ||||||
| 		timeout = deadline.Sub(time.Now()) |  | ||||||
| 	} |  | ||||||
| 	// Create a watcher here to reduce memory allocations under lock, | 	// Create a watcher here to reduce memory allocations under lock, | ||||||
| 	// given that memory allocation may trigger GC and block the thread. | 	// given that memory allocation may trigger GC and block the thread. | ||||||
| 	// Also note that emptyFunc is a placeholder, until we will be able | 	// Also note that emptyFunc is a placeholder, until we will be able | ||||||
| 	// to compute watcher.forget function (which has to happen under lock). | 	// to compute watcher.forget function (which has to happen under lock). | ||||||
| 	watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, timeout) | 	watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner) | ||||||
|  |  | ||||||
| 	// We explicitly use thread unsafe version and do locking ourself to ensure that | 	// We explicitly use thread unsafe version and do locking ourself to ensure that | ||||||
| 	// no new events will be processed in the meantime. The watchCache will be unlocked | 	// no new events will be processed in the meantime. The watchCache will be unlocked | ||||||
| @@ -407,7 +401,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, | |||||||
| 		c.watcherIdx++ | 		c.watcherIdx++ | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
| 	go watcher.process(ctx, initEvents, watchRV) | 	go watcher.process(initEvents, watchRV) | ||||||
| 	return watcher, nil | 	return watcher, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -894,34 +888,9 @@ type cacheWatcher struct { | |||||||
| 	stopped   bool | 	stopped   bool | ||||||
| 	forget    func() | 	forget    func() | ||||||
| 	versioner storage.Versioner | 	versioner storage.Versioner | ||||||
| 	timer     *time.Timer |  | ||||||
| } | } | ||||||
|  |  | ||||||
| var timerPool sync.Pool | func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher { | ||||||
|  |  | ||||||
| func newTimer(d time.Duration) *time.Timer { |  | ||||||
| 	t, ok := timerPool.Get().(*time.Timer) |  | ||||||
| 	if ok { |  | ||||||
| 		t.Reset(d) |  | ||||||
| 	} else { |  | ||||||
| 		t = time.NewTimer(d) |  | ||||||
| 	} |  | ||||||
| 	return t |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func freeTimer(timer *time.Timer) { |  | ||||||
| 	if !timer.Stop() { |  | ||||||
| 		// Consume triggered (but not yet received) timer event |  | ||||||
| 		// so that future reuse does not get a spurious timeout. |  | ||||||
| 		select { |  | ||||||
| 		case <-timer.C: |  | ||||||
| 		default: |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	timerPool.Put(timer) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, timeout time.Duration) *cacheWatcher { |  | ||||||
| 	return &cacheWatcher{ | 	return &cacheWatcher{ | ||||||
| 		input:     make(chan *watchCacheEvent, chanSize), | 		input:     make(chan *watchCacheEvent, chanSize), | ||||||
| 		result:    make(chan watch.Event, chanSize), | 		result:    make(chan watch.Event, chanSize), | ||||||
| @@ -930,7 +899,6 @@ func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), ve | |||||||
| 		stopped:   false, | 		stopped:   false, | ||||||
| 		forget:    forget, | 		forget:    forget, | ||||||
| 		versioner: versioner, | 		versioner: versioner, | ||||||
| 		timer:     newTimer(timeout), |  | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -951,7 +919,6 @@ func (c *cacheWatcher) stop() { | |||||||
| 		c.stopped = true | 		c.stopped = true | ||||||
| 		close(c.done) | 		close(c.done) | ||||||
| 		close(c.input) | 		close(c.input) | ||||||
| 		freeTimer(c.timer) |  | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -1040,7 +1007,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) { | func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) { | ||||||
| 	defer utilruntime.HandleCrash() | 	defer utilruntime.HandleCrash() | ||||||
|  |  | ||||||
| 	// Check how long we are processing initEvents. | 	// Check how long we are processing initEvents. | ||||||
| @@ -1076,20 +1043,10 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven | |||||||
|  |  | ||||||
| 	defer close(c.result) | 	defer close(c.result) | ||||||
| 	defer c.Stop() | 	defer c.Stop() | ||||||
| 	for { | 	for event := range c.input { | ||||||
| 		select { | 		// only send events newer than resourceVersion | ||||||
| 		case event, ok := <-c.input: | 		if event.ResourceVersion > resourceVersion { | ||||||
| 			if !ok { | 			c.sendWatchCacheEvent(event) | ||||||
| 				return |  | ||||||
| 			} |  | ||||||
| 			// only send events newer than resourceVersion |  | ||||||
| 			if event.ResourceVersion > resourceVersion { |  | ||||||
| 				c.sendWatchCacheEvent(event) |  | ||||||
| 			} |  | ||||||
| 		case <-ctx.Done(): |  | ||||||
| 			return |  | ||||||
| 		case <-c.timer.C: |  | ||||||
| 			return |  | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
| @@ -19,7 +19,6 @@ package cacher | |||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"math" |  | ||||||
| 	"reflect" | 	"reflect" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 	"sync" | 	"sync" | ||||||
| @@ -64,8 +63,8 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| 	// set the size of the buffer of w.result to 0, so that the writes to | 	// set the size of the buffer of w.result to 0, so that the writes to | ||||||
| 	// w.result is blocked. | 	// w.result is blocked. | ||||||
| 	w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Duration(math.MaxInt64)) | 	w = newCacheWatcher(0, filter, forget, testVersioner{}) | ||||||
| 	go w.process(context.Background(), initEvents, 0) | 	go w.process(initEvents, 0) | ||||||
| 	w.Stop() | 	w.Stop() | ||||||
| 	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { | 	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { | ||||||
| 		lock.RLock() | 		lock.RLock() | ||||||
| @@ -183,9 +182,8 @@ TestCase: | |||||||
| 		for j := range testCase.events { | 		for j := range testCase.events { | ||||||
| 			testCase.events[j].ResourceVersion = uint64(j) + 1 | 			testCase.events[j].ResourceVersion = uint64(j) + 1 | ||||||
| 		} | 		} | ||||||
|  | 		w := newCacheWatcher(0, filter, forget, testVersioner{}) | ||||||
| 		w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Duration(math.MaxInt64)) | 		go w.process(testCase.events, 0) | ||||||
| 		go w.process(context.Background(), testCase.events, 0) |  | ||||||
| 		ch := w.ResultChan() | 		ch := w.ResultChan() | ||||||
| 		for j, event := range testCase.expected { | 		for j, event := range testCase.expected { | ||||||
| 			e := <-ch | 			e := <-ch | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Benjamin Elder
					Benjamin Elder