WatchList should not convey events for the root key
It's possible to watch /foo/bar, get events for /foo/bar/baz, and then if someone deletes /foo/bar you'll get an event for /foo/bar without a value. This results in an error being printed in the logs because /foo/bar has value "" and we can't decode that. This commit excludes the parent directory from watch events for now.
This commit is contained in:
@@ -61,7 +61,7 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
|
||||
// watch.Interface. resourceVersion may be used to specify what version to begin
|
||||
// watching (e.g., for reconnecting without missing any updates).
|
||||
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
|
||||
w := newEtcdWatcher(true, filter, h.Codec, h.ResourceVersioner, nil)
|
||||
w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.ResourceVersioner, nil)
|
||||
go w.etcdWatch(h.Client, key, resourceVersion)
|
||||
return w, nil
|
||||
}
|
||||
@@ -90,7 +90,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) watch.Interface {
|
||||
//
|
||||
// Errors will be sent down the channel.
|
||||
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
|
||||
w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform)
|
||||
w := newEtcdWatcher(false, nil, Everything, h.Codec, h.ResourceVersioner, transform)
|
||||
go w.etcdWatch(h.Client, key, resourceVersion)
|
||||
return w
|
||||
}
|
||||
@@ -98,14 +98,25 @@ func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, trans
|
||||
// TransformFunc attempts to convert an object to another object for use with a watcher.
|
||||
type TransformFunc func(runtime.Object) (runtime.Object, error)
|
||||
|
||||
// includeFunc returns true if the given key should be considered part of a watch
|
||||
type includeFunc func(key string) bool
|
||||
|
||||
// exceptKey is an includeFunc that returns false when the provided key matches the watched key
|
||||
func exceptKey(except string) includeFunc {
|
||||
return func(key string) bool {
|
||||
return key != except
|
||||
}
|
||||
}
|
||||
|
||||
// etcdWatcher converts a native etcd watch to a watch.Interface.
|
||||
type etcdWatcher struct {
|
||||
encoding runtime.Codec
|
||||
versioner EtcdResourceVersioner
|
||||
transform TransformFunc
|
||||
|
||||
list bool // If we're doing a recursive watch, should be true.
|
||||
filter FilterFunc
|
||||
list bool // If we're doing a recursive watch, should be true.
|
||||
include includeFunc
|
||||
filter FilterFunc
|
||||
|
||||
etcdIncoming chan *etcd.Response
|
||||
etcdError chan error
|
||||
@@ -126,12 +137,13 @@ const watchWaitDuration = 100 * time.Millisecond
|
||||
|
||||
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
|
||||
// and a versioner, the versioner must be able to handle the objects that transform creates.
|
||||
func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner EtcdResourceVersioner, transform TransformFunc) *etcdWatcher {
|
||||
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdResourceVersioner, transform TransformFunc) *etcdWatcher {
|
||||
w := &etcdWatcher{
|
||||
encoding: encoding,
|
||||
versioner: versioner,
|
||||
transform: transform,
|
||||
list: list,
|
||||
include: include,
|
||||
filter: filter,
|
||||
etcdIncoming: make(chan *etcd.Response),
|
||||
etcdError: make(chan error, 1),
|
||||
@@ -258,6 +270,9 @@ func (w *etcdWatcher) sendAdd(res *etcd.Response) {
|
||||
glog.Errorf("unexpected nil node: %#v", res)
|
||||
return
|
||||
}
|
||||
if w.include != nil && !w.include(res.Node.Key) {
|
||||
return
|
||||
}
|
||||
data := []byte(res.Node.Value)
|
||||
obj, err := w.decodeObject(data, res.Node.ModifiedIndex)
|
||||
if err != nil {
|
||||
@@ -285,6 +300,9 @@ func (w *etcdWatcher) sendModify(res *etcd.Response) {
|
||||
glog.Errorf("unexpected nil node: %#v", res)
|
||||
return
|
||||
}
|
||||
if w.include != nil && !w.include(res.Node.Key) {
|
||||
return
|
||||
}
|
||||
curData := []byte(res.Node.Value)
|
||||
curObj, err := w.decodeObject(curData, res.Node.ModifiedIndex)
|
||||
if err != nil {
|
||||
@@ -331,6 +349,9 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {
|
||||
glog.Errorf("unexpected nil prev node: %#v", res)
|
||||
return
|
||||
}
|
||||
if w.include != nil && !w.include(res.PrevNode.Key) {
|
||||
return
|
||||
}
|
||||
data := []byte(res.PrevNode.Value)
|
||||
index := res.PrevNode.ModifiedIndex
|
||||
if res.Node != nil {
|
||||
|
Reference in New Issue
Block a user