Merge pull request #32298 from wojtek-t/hwm_in_cacher

Automatic merge from submit-queue

Log water mark for incoming queue in cacher

Ref #32257
This commit is contained in:
Kubernetes Submit Queue
2016-09-09 04:06:57 -07:00
committed by GitHub
5 changed files with 62 additions and 55 deletions

View File

@@ -162,7 +162,8 @@ type Cacher struct {
watchers indexedWatchers watchers indexedWatchers
// Incoming events that should be dispatched to watchers. // Incoming events that should be dispatched to watchers.
incoming chan watchCacheEvent incoming chan watchCacheEvent
incomingHWM HighWaterMark
// Handling graceful termination. // Handling graceful termination.
stopLock sync.RWMutex stopLock sync.RWMutex
@@ -410,6 +411,10 @@ func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
} }
func (c *Cacher) processEvent(event watchCacheEvent) { func (c *Cacher) processEvent(event watchCacheEvent) {
if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
// Monitor if this gets backed up, and how much.
glog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
}
c.incoming <- event c.incoming <- event
} }

View File

@@ -21,7 +21,6 @@ import (
"net/http" "net/http"
"reflect" "reflect"
"sync" "sync"
"sync/atomic"
"time" "time"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
@@ -47,23 +46,6 @@ const (
EtcdExpire = "expire" EtcdExpire = "expire"
) )
// HighWaterMark is a thread-safe object for tracking the maximum value seen
// for some quantity.
type HighWaterMark int64
// Update returns true if and only if 'current' is the highest value ever seen.
func (hwm *HighWaterMark) Update(current int64) bool {
for {
old := atomic.LoadInt64((*int64)(hwm))
if current <= old {
return false
}
if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
return true
}
}
}
// TransformFunc attempts to convert an object to another object for use with a watcher. // TransformFunc attempts to convert an object to another object for use with a watcher.
type TransformFunc func(runtime.Object) (runtime.Object, error) type TransformFunc func(runtime.Object) (runtime.Object, error)
@@ -109,8 +91,8 @@ type etcdWatcher struct {
emit func(watch.Event) emit func(watch.Event)
// HighWaterMarks for performance debugging. // HighWaterMarks for performance debugging.
incomingHWM HighWaterMark incomingHWM storage.HighWaterMark
outgoingHWM HighWaterMark outgoingHWM storage.HighWaterMark
cache etcdCache cache etcdCache
} }

View File

@@ -17,9 +17,7 @@ limitations under the License.
package etcd package etcd
import ( import (
"math/rand"
rt "runtime" rt "runtime"
"sync"
"testing" "testing"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
@@ -557,34 +555,3 @@ func TestWatchPurposefulShutdown(t *testing.T) {
t.Errorf("Unexpected event from stopped watcher: %#v", event) t.Errorf("Unexpected event from stopped watcher: %#v", event)
} }
} }
func TestHighWaterMark(t *testing.T) {
var h HighWaterMark
for i := int64(10); i < 20; i++ {
if !h.Update(i) {
t.Errorf("unexpected false for %v", i)
}
if h.Update(i - 1) {
t.Errorf("unexpected true for %v", i-1)
}
}
m := int64(0)
wg := sync.WaitGroup{}
for i := 0; i < 300; i++ {
wg.Add(1)
v := rand.Int63()
go func(v int64) {
defer wg.Done()
h.Update(v)
}(v)
if v > m {
m = v
}
}
wg.Wait()
if m != int64(h) {
t.Errorf("unexpected value, wanted %v, got %v", m, int64(h))
}
}

View File

@@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
"sync/atomic"
"k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/validation/path" "k8s.io/kubernetes/pkg/api/validation/path"
@@ -149,3 +150,20 @@ func hasPathPrefix(s, pathPrefix string) bool {
} }
return false return false
} }
// HighWaterMark is a thread-safe object for tracking the maximum value seen
// for some quantity.
type HighWaterMark int64
// Update returns true if and only if 'current' is the highest value ever seen.
func (hwm *HighWaterMark) Update(current int64) bool {
for {
old := atomic.LoadInt64((*int64)(hwm))
if current <= old {
return false
}
if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
return true
}
}
}

View File

@@ -16,7 +16,11 @@ limitations under the License.
package storage package storage
import "testing" import (
"math/rand"
"sync"
"testing"
)
func TestEtcdParseWatchResourceVersion(t *testing.T) { func TestEtcdParseWatchResourceVersion(t *testing.T) {
testCases := []struct { testCases := []struct {
@@ -99,3 +103,34 @@ func TestHasPathPrefix(t *testing.T) {
} }
} }
} }
func TestHighWaterMark(t *testing.T) {
var h HighWaterMark
for i := int64(10); i < 20; i++ {
if !h.Update(i) {
t.Errorf("unexpected false for %v", i)
}
if h.Update(i - 1) {
t.Errorf("unexpected true for %v", i-1)
}
}
m := int64(0)
wg := sync.WaitGroup{}
for i := 0; i < 300; i++ {
wg.Add(1)
v := rand.Int63()
go func(v int64) {
defer wg.Done()
h.Update(v)
}(v)
if v > m {
m = v
}
}
wg.Wait()
if m != int64(h) {
t.Errorf("unexpected value, wanted %v, got %v", m, int64(h))
}
}