Better waiting for watch event delivery in cacher
This commit is contained in:
@@ -532,17 +532,23 @@ func (c *Cacher) dispatchEvents() {
|
||||
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
|
||||
triggerValues, supported := c.triggerValues(event)
|
||||
|
||||
// TODO: For now we assume we have a given <timeout> budget for dispatching
|
||||
// a single event. We should consider changing to the approach with:
|
||||
// - budget has upper bound at <max_timeout>
|
||||
// - we add <portion> to current timeout every second
|
||||
timeout := time.Duration(250) * time.Millisecond
|
||||
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
// Iterate over "allWatchers" no matter what the trigger function is.
|
||||
for _, watcher := range c.watchers.allWatchers {
|
||||
watcher.add(event)
|
||||
watcher.add(event, &timeout)
|
||||
}
|
||||
if supported {
|
||||
// Iterate over watchers interested in the given values of the trigger.
|
||||
for _, triggerValue := range triggerValues {
|
||||
for _, watcher := range c.watchers.valueWatchers[triggerValue] {
|
||||
watcher.add(event)
|
||||
watcher.add(event, &timeout)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -555,7 +561,7 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
|
||||
// Iterate over watchers interested in exact values for all values.
|
||||
for _, watchers := range c.watchers.valueWatchers {
|
||||
for _, watcher := range watchers {
|
||||
watcher.add(event)
|
||||
watcher.add(event, &timeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -729,7 +735,7 @@ func (c *cacheWatcher) stop() {
|
||||
|
||||
var timerPool sync.Pool
|
||||
|
||||
func (c *cacheWatcher) add(event *watchCacheEvent) {
|
||||
func (c *cacheWatcher) add(event *watchCacheEvent, timeout *time.Duration) {
|
||||
// Try to send the event immediately, without blocking.
|
||||
select {
|
||||
case c.input <- *event:
|
||||
@@ -737,20 +743,16 @@ func (c *cacheWatcher) add(event *watchCacheEvent) {
|
||||
default:
|
||||
}
|
||||
|
||||
// OK, block sending, but only for up to 5 seconds.
|
||||
// OK, block sending, but only for up to <timeout>.
|
||||
// cacheWatcher.add is called very often, so arrange
|
||||
// to reuse timers instead of constantly allocating.
|
||||
trace := util.NewTrace(
|
||||
fmt.Sprintf("cacheWatcher %v: waiting for add (initial result size %v)",
|
||||
reflect.TypeOf(event.Object).String(), len(c.result)))
|
||||
defer trace.LogIfLong(50 * time.Millisecond)
|
||||
startTime := time.Now()
|
||||
|
||||
const timeout = 5 * time.Second
|
||||
t, ok := timerPool.Get().(*time.Timer)
|
||||
if ok {
|
||||
t.Reset(timeout)
|
||||
t.Reset(*timeout)
|
||||
} else {
|
||||
t = time.NewTimer(timeout)
|
||||
t = time.NewTimer(*timeout)
|
||||
}
|
||||
defer timerPool.Put(t)
|
||||
|
||||
@@ -769,6 +771,10 @@ func (c *cacheWatcher) add(event *watchCacheEvent) {
|
||||
c.forget(false)
|
||||
c.stop()
|
||||
}
|
||||
|
||||
if *timeout = *timeout - time.Since(startTime); *timeout < 0 {
|
||||
*timeout = 0
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
|
||||
|
Reference in New Issue
Block a user