Kubelet watching only its own Node

This commit is contained in:
Wojciech Tyczynski
2015-04-02 10:57:28 +02:00
parent eac24b3241
commit 6b08958264
8 changed files with 127 additions and 11 deletions

View File

@@ -65,6 +65,10 @@ type Etcd struct {
// Called for Create/Update/Get/Delete
KeyFunc func(ctx api.Context, name string) (string, error)
// If field.Selector of Watch contains a label with such name, this will be
// translated to watching a single object (not all objects of that type).
WatchSingleFieldName string
// Called to get the name of an object
ObjectNameFunc func(obj runtime.Object) (string, error)
@@ -404,19 +408,29 @@ func (e *Etcd) finalizeDelete(obj runtime.Object, runHooks bool) (runtime.Object
}
// WatchPredicate starts a watch for the items that m matches.
// TODO: Detect if m references a single object instead of a list.
func (e *Etcd) Watch(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion)
if value, found := field.RequiresExactMatch(e.WatchSingleFieldName); found && len(e.WatchSingleFieldName) > 0 {
key, err := e.KeyFunc(ctx, value)
if err != nil {
return nil, err
}
return e.watchPredicate(key, e.PredicateFunc(label, field), resourceVersion)
}
return e.watchPredicate(e.KeyRootFunc(ctx), e.PredicateFunc(label, field), resourceVersion)
}
// WatchPredicate starts a watch for the items that m matches.
// TODO: Detect if m references a single object instead of a list.
func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
return e.watchPredicate(e.KeyRootFunc(ctx), m, resourceVersion)
}
func (e *Etcd) watchPredicate(key string, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
version, err := tools.ParseWatchResourceVersion(resourceVersion, e.EndpointName)
if err != nil {
return nil, err
}
return e.Helper.WatchList(e.KeyRootFunc(ctx), version, func(obj runtime.Object) bool {
return e.Helper.WatchList(key, version, func(obj runtime.Object) bool {
matches, err := m.Matches(obj)
if err != nil {
glog.Errorf("unable to match watch: %v", err)

View File

@@ -49,6 +49,7 @@ func NewStorage(h tools.EtcdHelper, connection client.ConnectionInfoGetter) *RES
KeyFunc: func(ctx api.Context, name string) (string, error) {
return prefix + "/" + name, nil
},
WatchSingleFieldName: "name",
ObjectNameFunc: func(obj runtime.Object) (string, error) {
return obj.(*api.Node).Name, nil
},

View File

@@ -34,6 +34,11 @@ import (
"github.com/coreos/go-etcd/etcd"
)
const (
PASS = iota
FAIL
)
type fakeConnectionInfoGetter struct {
}
@@ -342,6 +347,59 @@ func TestEtcdWatchNodesMatch(t *testing.T) {
watching.Stop()
}
func TestEtcdWatchNodesFields(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
node := validNewNode()
nodeBytes, _ := latest.Codec.Encode(node)
testFieldMap := map[int][]fields.Set{
PASS: {
{"name": "foo"},
},
FAIL: {
{"name": "bar"},
},
}
for _, singleWatchField := range []string{"", "name"} {
storage.WatchSingleFieldName = singleWatchField
for expectedResult, fieldSet := range testFieldMap {
for _, field := range fieldSet {
watching, err := storage.Watch(ctx,
labels.Everything(),
field.AsSelector(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(nodeBytes),
},
}
select {
case r, ok := <-watching.ResultChan():
if expectedResult == FAIL {
t.Errorf("unexpected result from channel %#v", r)
}
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
if expectedResult == PASS {
t.Errorf("unexpected timeout from result channel")
}
}
watching.Stop()
}
}
}
}
func TestEtcdWatchNodesNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)

View File

@@ -82,6 +82,13 @@ type ResourceGetter interface {
Get(api.Context, string) (runtime.Object, error)
}
// NodeToSelectableFields returns a label set that represents the object.
func NodeToSelectableFields(node *api.Node) labels.Set {
return labels.Set{
"name": node.Name,
}
}
// MatchNode returns a generic matcher for a given label and field selector.
func MatchNode(label labels.Selector, field fields.Selector) generic.Matcher {
return generic.MatcherFunc(func(obj runtime.Object) (bool, error) {
@@ -89,8 +96,8 @@ func MatchNode(label labels.Selector, field fields.Selector) generic.Matcher {
if !ok {
return false, fmt.Errorf("not a node")
}
// TODO: Add support for filtering based on field, once NodeStatus is defined.
return label.Matches(labels.Set(nodeObj.Labels)), nil
fields := NodeToSelectableFields(nodeObj)
return label.Matches(labels.Set(nodeObj.Labels)) && field.Matches(fields), nil
})
}