Fix logging; extend queue length to 100

This commit is contained in:
Daniel Smith
2015-06-29 14:21:00 -07:00
parent 102a8a1e36
commit eabc344eed
3 changed files with 62 additions and 6 deletions

View File

@@ -18,6 +18,7 @@ package util
import (
"sync"
"sync/atomic"
)
// TODO(ArtfulCoder)
@@ -40,3 +41,20 @@ func (at *AtomicValue) Load() interface{} {
defer at.valueMutex.RUnlock()
return at.value
}
// HighWaterMark is a thread-safe object for tracking the maximum value seen
// for some quantity.
type HighWaterMark int64
// Check returns true iff 'current' is the highest value ever seen.
func (hwm *HighWaterMark) Check(current int64) bool {
for {
old := atomic.LoadInt64((*int64)(hwm))
if current <= old {
return false
}
if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
return true
}
}
}

View File

@@ -17,6 +17,8 @@ limitations under the License.
package util
import (
"math/rand"
"sync"
"testing"
"time"
)
@@ -48,3 +50,34 @@ func TestAtomicValue(t *testing.T) {
atomicValue.Store(10)
ExpectValue(t, atomicValue, 10)
}
func TestHighWaterMark(t *testing.T) {
var h HighWaterMark
for i := int64(10); i < 20; i++ {
if !h.Check(i) {
t.Errorf("unexpected false for %v", i)
}
if h.Check(i - 1) {
t.Errorf("unexpected true for %v", i-1)
}
}
m := int64(0)
wg := sync.WaitGroup{}
for i := 0; i < 300; i++ {
wg.Add(1)
v := rand.Int63()
go func(v int64) {
defer wg.Done()
h.Check(v)
}(v)
if v > m {
m = v
}
}
wg.Wait()
if m != int64(h) {
t.Errorf("unexpected value, wanted %v, got %v", m, int64(h))
}
}