Merge pull request #3444 from lavalamp/fix
eliminate possibility of double-calling
This commit is contained in:
@@ -80,19 +80,25 @@ func (c *timeCache) Get(key string) T {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// returns the item and true if it is found and not expired, otherwise nil and false.
|
// returns the item and true if it is found and not expired, otherwise nil and false.
|
||||||
|
// If this returns false, it has locked c.inFlightLock and it is caller's responsibility
|
||||||
|
// to unlock that.
|
||||||
func (c *timeCache) get(key string) (T, bool) {
|
func (c *timeCache) get(key string) (T, bool) {
|
||||||
c.lock.RLock()
|
c.lock.RLock()
|
||||||
defer c.lock.RUnlock()
|
defer c.lock.RUnlock()
|
||||||
data, ok := c.cache[key]
|
data, ok := c.cache[key]
|
||||||
now := c.clock.Now()
|
now := c.clock.Now()
|
||||||
if !ok || now.Sub(data.lastUpdate) > c.ttl {
|
if !ok || now.Sub(data.lastUpdate) > c.ttl {
|
||||||
|
// We must lock this while we hold c.lock-- otherwise, a writer could
|
||||||
|
// write to c.cache and remove the channel from c.inFlight before we
|
||||||
|
// manage to read c.inFlight.
|
||||||
|
c.inFlightLock.Lock()
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
return data.item, true
|
return data.item, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// c.inFlightLock MUST be locked before calling this. fillOrWait will unlock it.
|
||||||
func (c *timeCache) fillOrWait(key string) chan T {
|
func (c *timeCache) fillOrWait(key string) chan T {
|
||||||
c.inFlightLock.Lock()
|
|
||||||
defer c.inFlightLock.Unlock()
|
defer c.inFlightLock.Unlock()
|
||||||
|
|
||||||
// Already a call in progress?
|
// Already a call in progress?
|
||||||
@@ -104,7 +110,9 @@ func (c *timeCache) fillOrWait(key string) chan T {
|
|||||||
result := make(chan T, 1) // non-blocking
|
result := make(chan T, 1) // non-blocking
|
||||||
c.inFlight[key] = result
|
c.inFlight[key] = result
|
||||||
go func() {
|
go func() {
|
||||||
// Make potentially slow call
|
// Make potentially slow call.
|
||||||
|
// While this call is in flight, fillOrWait will
|
||||||
|
// presumably exit.
|
||||||
data := timeCacheEntry{
|
data := timeCacheEntry{
|
||||||
item: c.fillFunc(key),
|
item: c.fillFunc(key),
|
||||||
lastUpdate: c.clock.Now(),
|
lastUpdate: c.clock.Now(),
|
||||||
@@ -113,13 +121,13 @@ func (c *timeCache) fillOrWait(key string) chan T {
|
|||||||
|
|
||||||
// Store in cache
|
// Store in cache
|
||||||
c.lock.Lock()
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
c.cache[key] = data
|
c.cache[key] = data
|
||||||
c.lock.Unlock()
|
|
||||||
|
|
||||||
// Remove in flight entry
|
// Remove in flight entry
|
||||||
c.inFlightLock.Lock()
|
c.inFlightLock.Lock()
|
||||||
|
defer c.inFlightLock.Unlock()
|
||||||
delete(c.inFlight, key)
|
delete(c.inFlight, key)
|
||||||
c.inFlightLock.Unlock()
|
|
||||||
}()
|
}()
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
@@ -17,6 +17,8 @@ limitations under the License.
|
|||||||
package util
|
package util
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@@ -115,3 +117,67 @@ func TestCacheParallelOneCall(t *testing.T) {
|
|||||||
t.Errorf("Expected %v, got %v", e, a)
|
t.Errorf("Expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCacheParallelNoDeadlocksNoDoubleCalls(t *testing.T) {
|
||||||
|
// Make 50 random keys
|
||||||
|
keys := []string{}
|
||||||
|
fuzz.New().NilChance(0).NumElements(50, 50).Fuzz(&keys)
|
||||||
|
|
||||||
|
// Data structure for tracking when each key is accessed.
|
||||||
|
type callTrack struct {
|
||||||
|
sync.Mutex
|
||||||
|
accessTimes []time.Time
|
||||||
|
}
|
||||||
|
calls := map[string]*callTrack{}
|
||||||
|
for _, k := range keys {
|
||||||
|
calls[k] = &callTrack{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is called to fill the cache in the case of a cache miss
|
||||||
|
// or cache entry expiration. We record the time.
|
||||||
|
ff := func(key string) T {
|
||||||
|
ct := calls[key]
|
||||||
|
ct.Lock()
|
||||||
|
ct.accessTimes = append(ct.accessTimes, time.Now())
|
||||||
|
ct.Unlock()
|
||||||
|
// make sure that there is time for multiple requests to come in
|
||||||
|
// for the same key before this returns.
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
return key
|
||||||
|
}
|
||||||
|
|
||||||
|
cacheDur := 10 * time.Millisecond
|
||||||
|
c := NewTimeCache(RealClock{}, cacheDur, ff)
|
||||||
|
|
||||||
|
// Spawn a bunch of goroutines, each of which sequentially requests
|
||||||
|
// 500 random keys from the cache.
|
||||||
|
runtime.GOMAXPROCS(16)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < 500; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(seed int64) {
|
||||||
|
r := rand.New(rand.NewSource(seed))
|
||||||
|
for i := 0; i < 500; i++ {
|
||||||
|
c.Get(keys[r.Intn(len(keys))])
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}(rand.Int63())
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Since the cache should hold things for 10ms, no calls for a given key
|
||||||
|
// should be more closely spaced than that.
|
||||||
|
for k, ct := range calls {
|
||||||
|
if len(ct.accessTimes) < 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cur := ct.accessTimes[0]
|
||||||
|
for i := 1; i < len(ct.accessTimes); i++ {
|
||||||
|
next := ct.accessTimes[i]
|
||||||
|
if next.Sub(cur) < cacheDur {
|
||||||
|
t.Errorf("%v was called at %v and %v", k, cur, next)
|
||||||
|
}
|
||||||
|
cur = next
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user