Filter List in Storage level to avoid additional copies.
This commit is contained in:
@@ -243,7 +243,7 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (h *etcdHelper) GetToList(key string, listObj runtime.Object) error {
|
||||
func (h *etcdHelper) GetToList(key string, filter storage.FilterFunc, listObj runtime.Object) error {
|
||||
trace := util.NewTrace("GetToList " + getTypeName(listObj))
|
||||
listPtr, err := runtime.GetItemsPtr(listObj)
|
||||
if err != nil {
|
||||
@@ -265,7 +265,7 @@ func (h *etcdHelper) GetToList(key string, listObj runtime.Object) error {
|
||||
nodes := make([]*etcd.Node, 0)
|
||||
nodes = append(nodes, response.Node)
|
||||
|
||||
if err := h.decodeNodeList(nodes, listPtr); err != nil {
|
||||
if err := h.decodeNodeList(nodes, filter, listPtr); err != nil {
|
||||
return err
|
||||
}
|
||||
trace.Step("Object decoded")
|
||||
@@ -278,7 +278,7 @@ func (h *etcdHelper) GetToList(key string, listObj runtime.Object) error {
|
||||
}
|
||||
|
||||
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
|
||||
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) error {
|
||||
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFunc, slicePtr interface{}) error {
|
||||
trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr))
|
||||
defer trace.LogIfLong(500 * time.Millisecond)
|
||||
v, err := conversion.EnforcePtr(slicePtr)
|
||||
@@ -289,14 +289,16 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
|
||||
for _, node := range nodes {
|
||||
if node.Dir {
|
||||
trace.Step("Decoding dir " + node.Key + " START")
|
||||
if err := h.decodeNodeList(node.Nodes, slicePtr); err != nil {
|
||||
if err := h.decodeNodeList(node.Nodes, filter, slicePtr); err != nil {
|
||||
return err
|
||||
}
|
||||
trace.Step("Decoding dir " + node.Key + " END")
|
||||
continue
|
||||
}
|
||||
if obj, found := h.getFromCache(node.ModifiedIndex); found {
|
||||
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
||||
if filter(obj) {
|
||||
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
||||
}
|
||||
} else {
|
||||
obj := reflect.New(v.Type().Elem())
|
||||
if err := h.codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
|
||||
@@ -306,7 +308,9 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
_ = h.versioner.UpdateObject(obj.Interface().(runtime.Object), node.Expiration, node.ModifiedIndex)
|
||||
}
|
||||
v.Set(reflect.Append(v, obj.Elem()))
|
||||
if filter(obj.Interface().(runtime.Object)) {
|
||||
v.Set(reflect.Append(v, obj.Elem()))
|
||||
}
|
||||
if node.ModifiedIndex != 0 {
|
||||
h.addToCache(node.ModifiedIndex, obj.Interface().(runtime.Object))
|
||||
}
|
||||
@@ -317,7 +321,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
|
||||
}
|
||||
|
||||
// Implements storage.Interface.
|
||||
func (h *etcdHelper) List(key string, listObj runtime.Object) error {
|
||||
func (h *etcdHelper) List(key string, filter storage.FilterFunc, listObj runtime.Object) error {
|
||||
trace := util.NewTrace("List " + getTypeName(listObj))
|
||||
defer trace.LogIfLong(time.Second)
|
||||
listPtr, err := runtime.GetItemsPtr(listObj)
|
||||
@@ -333,7 +337,7 @@ func (h *etcdHelper) List(key string, listObj runtime.Object) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := h.decodeNodeList(nodes, listPtr); err != nil {
|
||||
if err := h.decodeNodeList(nodes, filter, listPtr); err != nil {
|
||||
return err
|
||||
}
|
||||
trace.Step("Node list decoded")
|
||||
|
||||
@@ -155,7 +155,69 @@ func TestList(t *testing.T) {
|
||||
}
|
||||
|
||||
var got api.PodList
|
||||
err := helper.List("/some/key", &got)
|
||||
err := helper.List("/some/key", storage.Everything, &got)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
if e, a := expect, got; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %#v, got %#v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
func TestListFiltered(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
helper := newEtcdHelper(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||
key := etcdtest.AddPrefix("/some/key")
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
EtcdIndex: 10,
|
||||
Node: &etcd.Node{
|
||||
Dir: true,
|
||||
Nodes: []*etcd.Node{
|
||||
{
|
||||
Key: "/foo",
|
||||
Value: getEncodedPod("foo"),
|
||||
Dir: false,
|
||||
ModifiedIndex: 1,
|
||||
},
|
||||
{
|
||||
Key: "/bar",
|
||||
Value: getEncodedPod("bar"),
|
||||
Dir: false,
|
||||
ModifiedIndex: 2,
|
||||
},
|
||||
{
|
||||
Key: "/baz",
|
||||
Value: getEncodedPod("baz"),
|
||||
Dir: false,
|
||||
ModifiedIndex: 3,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
grace := int64(30)
|
||||
expect := api.PodList{
|
||||
ListMeta: api.ListMeta{ResourceVersion: "10"},
|
||||
Items: []api.Pod{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"},
|
||||
Spec: api.PodSpec{
|
||||
RestartPolicy: api.RestartPolicyAlways,
|
||||
DNSPolicy: api.DNSClusterFirst,
|
||||
TerminationGracePeriodSeconds: &grace,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
filter := func(obj runtime.Object) bool {
|
||||
pod := obj.(*api.Pod)
|
||||
return pod.Name == "bar"
|
||||
}
|
||||
|
||||
var got api.PodList
|
||||
err := helper.List("/some/key", filter, &got)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
@@ -243,7 +305,7 @@ func TestListAcrossDirectories(t *testing.T) {
|
||||
}
|
||||
|
||||
var got api.PodList
|
||||
err := helper.List("/some/key", &got)
|
||||
err := helper.List("/some/key", storage.Everything, &got)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
@@ -318,7 +380,7 @@ func TestListExcludesDirectories(t *testing.T) {
|
||||
}
|
||||
|
||||
var got api.PodList
|
||||
err := helper.List("/some/key", &got)
|
||||
err := helper.List("/some/key", storage.Everything, &got)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user