Store previous value in WatchCache for filtering
This commit is contained in:
@@ -184,7 +184,7 @@ func (c *Cacher) List(key string, listObj runtime.Object) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cacher) processEvent(event watch.Event) {
|
||||
func (c *Cacher) processEvent(event cache.WatchCacheEvent) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
for _, watcher := range c.watchers {
|
||||
@@ -271,16 +271,16 @@ func (lw *cacherListerWatcher) Watch(resourceVersion string) (watch.Interface, e
|
||||
// cacherWatch implements watch.Interface
|
||||
type cacheWatcher struct {
|
||||
sync.Mutex
|
||||
input chan watch.Event
|
||||
input chan cache.WatchCacheEvent
|
||||
result chan watch.Event
|
||||
filter FilterFunc
|
||||
stopped bool
|
||||
forget func()
|
||||
}
|
||||
|
||||
func newCacheWatcher(initEvents []watch.Event, filter FilterFunc, forget func()) *cacheWatcher {
|
||||
func newCacheWatcher(initEvents []cache.WatchCacheEvent, filter FilterFunc, forget func()) *cacheWatcher {
|
||||
watcher := &cacheWatcher{
|
||||
input: make(chan watch.Event, 10),
|
||||
input: make(chan cache.WatchCacheEvent, 10),
|
||||
result: make(chan watch.Event, 10),
|
||||
filter: filter,
|
||||
stopped: false,
|
||||
@@ -310,15 +310,29 @@ func (c *cacheWatcher) stop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cacheWatcher) add(event watch.Event) {
|
||||
func (c *cacheWatcher) add(event cache.WatchCacheEvent) {
|
||||
c.input <- event
|
||||
}
|
||||
|
||||
func (c *cacheWatcher) process(initEvents []watch.Event) {
|
||||
func (c *cacheWatcher) sendWatchCacheEvent(event cache.WatchCacheEvent) {
|
||||
curObjPasses := event.Type != watch.Deleted && c.filter(event.Object)
|
||||
oldObjPasses := false
|
||||
if event.PrevObject != nil {
|
||||
oldObjPasses = c.filter(event.PrevObject)
|
||||
}
|
||||
switch {
|
||||
case curObjPasses && !oldObjPasses:
|
||||
c.result <- watch.Event{watch.Added, event.Object}
|
||||
case curObjPasses && oldObjPasses:
|
||||
c.result <- watch.Event{watch.Modified, event.Object}
|
||||
case !curObjPasses && oldObjPasses:
|
||||
c.result <- watch.Event{watch.Deleted, event.Object}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cacheWatcher) process(initEvents []cache.WatchCacheEvent) {
|
||||
for _, event := range initEvents {
|
||||
if c.filter(event.Object) {
|
||||
c.result <- event
|
||||
}
|
||||
c.sendWatchCacheEvent(event)
|
||||
}
|
||||
defer close(c.result)
|
||||
defer c.Stop()
|
||||
@@ -327,8 +341,6 @@ func (c *cacheWatcher) process(initEvents []watch.Event) {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if c.filter(event.Object) {
|
||||
c.result <- event
|
||||
}
|
||||
c.sendWatchCacheEvent(event)
|
||||
}
|
||||
}
|
||||
|
@@ -25,7 +25,9 @@ import (
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
@@ -300,6 +302,146 @@ func TestWatch(t *testing.T) {
|
||||
close(fakeClient.WatchResponse)
|
||||
}
|
||||
|
||||
func TestFiltering(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
prefixedKey := etcdtest.AddPrefix("pods")
|
||||
fakeClient.ExpectNotFoundGet(prefixedKey)
|
||||
cacher := newTestCacher(fakeClient)
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
|
||||
podFoo := makeTestPod("foo")
|
||||
podFoo.ObjectMeta.Labels = map[string]string{"filter": "foo"}
|
||||
podFooFiltered := makeTestPod("foo")
|
||||
|
||||
testCases := []struct {
|
||||
object *api.Pod
|
||||
etcdResponse *etcd.Response
|
||||
filtered bool
|
||||
event watch.EventType
|
||||
}{
|
||||
{
|
||||
object: podFoo,
|
||||
etcdResponse: &etcd.Response{
|
||||
Action: "create",
|
||||
Node: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 1,
|
||||
},
|
||||
},
|
||||
filtered: true,
|
||||
event: watch.Added,
|
||||
},
|
||||
{
|
||||
object: podFooFiltered,
|
||||
etcdResponse: &etcd.Response{
|
||||
Action: "set",
|
||||
Node: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podFooFiltered)),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 2,
|
||||
},
|
||||
PrevNode: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 1,
|
||||
},
|
||||
},
|
||||
filtered: true,
|
||||
// Deleted, because the new object doesn't match filter.
|
||||
event: watch.Deleted,
|
||||
},
|
||||
{
|
||||
object: podFoo,
|
||||
etcdResponse: &etcd.Response{
|
||||
Action: "set",
|
||||
Node: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 3,
|
||||
},
|
||||
PrevNode: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podFooFiltered)),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 2,
|
||||
},
|
||||
},
|
||||
filtered: true,
|
||||
// Added, because the previous object didn't match filter.
|
||||
event: watch.Added,
|
||||
},
|
||||
{
|
||||
object: podFoo,
|
||||
etcdResponse: &etcd.Response{
|
||||
Action: "set",
|
||||
Node: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 4,
|
||||
},
|
||||
PrevNode: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 3,
|
||||
},
|
||||
},
|
||||
filtered: true,
|
||||
event: watch.Modified,
|
||||
},
|
||||
{
|
||||
object: podFoo,
|
||||
etcdResponse: &etcd.Response{
|
||||
Action: "delete",
|
||||
Node: &etcd.Node{
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 5,
|
||||
},
|
||||
PrevNode: &etcd.Node{
|
||||
Value: string(runtime.EncodeOrDie(testapi.Codec(), podFoo)),
|
||||
CreatedIndex: 1,
|
||||
ModifiedIndex: 4,
|
||||
},
|
||||
},
|
||||
filtered: true,
|
||||
event: watch.Deleted,
|
||||
},
|
||||
}
|
||||
|
||||
// Set up Watch for object "podFoo" with label filter set.
|
||||
selector := labels.SelectorFromSet(labels.Set{"filter": "foo"})
|
||||
filter := func(obj runtime.Object) bool {
|
||||
metadata, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
return false
|
||||
}
|
||||
return selector.Matches(labels.Set(metadata.Labels()))
|
||||
}
|
||||
watcher, err := cacher.Watch("pods/ns/foo", 1, filter)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
fakeClient.WatchResponse <- test.etcdResponse
|
||||
if test.filtered {
|
||||
event := <-watcher.ResultChan()
|
||||
if e, a := test.event, event.Type; e != a {
|
||||
t.Errorf("%v %v", e, a)
|
||||
}
|
||||
// unset fields that are set by the infrastructure
|
||||
obj := event.Object.(*api.Pod)
|
||||
obj.ObjectMeta.ResourceVersion = ""
|
||||
obj.ObjectMeta.CreationTimestamp = util.Time{}
|
||||
if e, a := test.object, obj; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("expected: %#v, got: %#v", e, a)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
close(fakeClient.WatchResponse)
|
||||
}
|
||||
|
||||
func TestStorageError(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
prefixedKey := etcdtest.AddPrefix("pods")
|
||||
|
Reference in New Issue
Block a user