From 1c75d7646e21c0230db859d61c6acdff67d44767 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 9 Oct 2014 13:13:08 -0700 Subject: [PATCH 1/6] Shorten 'CodecForVersionOrDie' name, add 'ResourceVersioner' to testapi --- pkg/api/testapi/testapi.go | 16 ++++++++++++++-- pkg/client/restclient_test.go | 2 +- pkg/controller/replication_controller_test.go | 4 ++-- pkg/service/endpoints_controller_test.go | 4 ++-- plugin/pkg/scheduler/factory/factory_test.go | 2 +- 5 files changed, 20 insertions(+), 8 deletions(-) diff --git a/pkg/api/testapi/testapi.go b/pkg/api/testapi/testapi.go index 79e4fedcf95..23a5b25391b 100644 --- a/pkg/api/testapi/testapi.go +++ b/pkg/api/testapi/testapi.go @@ -24,7 +24,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) -// Version returns the API version to test against as set by the KUBE_API_VERSION env var. +// Version returns the API version to test against, as set by the KUBE_API_VERSION env var. func Version() string { version := os.Getenv("KUBE_API_VERSION") if version == "" { @@ -33,10 +33,22 @@ func Version() string { return version } -func CodecForVersionOrDie() runtime.Codec { +// Codec returns the codec for the API version to test against, as set by the +// KUBE_API_VERSION env var. +func Codec() runtime.Codec { interfaces, err := latest.InterfacesFor(Version()) if err != nil { panic(err) } return interfaces.Codec } + +// ResourceVersioner returns the ResourceVersioner for the API version to test against, +// as set by the KUBE_API_VERSION env var. +func ResourceVersioner() runtime.ResourceVersioner { + interfaces, err := latest.InterfacesFor(Version()) + if err != nil { + panic(err) + } + return interfaces.ResourceVersioner +} diff --git a/pkg/client/restclient_test.go b/pkg/client/restclient_test.go index 0240fcc8d4c..8c1df0d0ae8 100644 --- a/pkg/client/restclient_test.go +++ b/pkg/client/restclient_test.go @@ -109,7 +109,7 @@ func TestDoRequest(t *testing.T) { {Request: testRequest{Method: "GET", Path: "error"}, Response: Response{StatusCode: 500}, Error: true}, {Request: testRequest{Method: "POST", Path: "faildecode"}, Response: Response{StatusCode: 200, RawBody: &invalid}}, {Request: testRequest{Method: "GET", Path: "failread"}, Response: Response{StatusCode: 200, RawBody: &invalid}}, - {Client: &Client{&RESTClient{baseURL: uri, Codec: testapi.CodecForVersionOrDie()}}, Request: testRequest{Method: "GET", Path: "nocertificate"}, Error: true}, + {Client: &Client{&RESTClient{baseURL: uri, Codec: testapi.Codec()}}, Request: testRequest{Method: "GET", Path: "nocertificate"}, Error: true}, } for _, c := range testClients { client := c.Setup() diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index fa3c3890d91..4b81737c6cb 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -149,7 +149,7 @@ func TestSyncReplicationControllerDeletes(t *testing.T) { } func TestSyncReplicationControllerCreates(t *testing.T) { - body := runtime.EncodeOrDie(testapi.CodecForVersionOrDie(), newPodList(0)) + body := runtime.EncodeOrDie(testapi.Codec(), newPodList(0)) fakeHandler := util.FakeHandler{ StatusCode: 200, ResponseBody: string(body), @@ -170,7 +170,7 @@ func TestSyncReplicationControllerCreates(t *testing.T) { func TestCreateReplica(t *testing.T) { ctx := api.NewDefaultContext() - body := runtime.EncodeOrDie(testapi.CodecForVersionOrDie(), &api.Pod{}) + body := runtime.EncodeOrDie(testapi.Codec(), &api.Pod{}) fakeHandler := util.FakeHandler{ StatusCode: 200, ResponseBody: string(body), diff --git a/pkg/service/endpoints_controller_test.go b/pkg/service/endpoints_controller_test.go index e85f1d77f3e..fd074cc24ab 100644 --- a/pkg/service/endpoints_controller_test.go +++ b/pkg/service/endpoints_controller_test.go @@ -203,7 +203,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { if err := endpoints.SyncServiceEndpoints(); err != nil { t.Errorf("unexpected error: %v", err) } - data := runtime.EncodeOrDie(testapi.CodecForVersionOrDie(), &api.Endpoints{ + data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{ TypeMeta: api.TypeMeta{ ID: "foo", ResourceVersion: "1", @@ -261,7 +261,7 @@ func TestSyncEndpointsItems(t *testing.T) { if err := endpoints.SyncServiceEndpoints(); err != nil { t.Errorf("unexpected error: %v", err) } - data := runtime.EncodeOrDie(testapi.CodecForVersionOrDie(), &api.Endpoints{ + data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{ TypeMeta: api.TypeMeta{ ResourceVersion: "", }, diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index f333f1e3fb3..d7b3cf008ff 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -317,7 +317,7 @@ func TestBind(t *testing.T) { t.Errorf("Unexpected error: %v", err) continue } - expectedBody := runtime.EncodeOrDie(testapi.CodecForVersionOrDie(), item.binding) + expectedBody := runtime.EncodeOrDie(testapi.Codec(), item.binding) handler.ValidateRequest(t, "/api/"+testapi.Version()+"/bindings", "POST", &expectedBody) } } From 9a9362e896cfb953f8c77cb95f19fa228eaaf957 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 9 Oct 2014 13:56:30 -0700 Subject: [PATCH 2/6] Add generic registry object so we can stop rewriting this code --- pkg/registry/generic/doc.go | 19 ++ pkg/registry/generic/etcd.go | 119 +++++++ pkg/registry/generic/etcd_test.go | 437 ++++++++++++++++++++++++++ pkg/registry/generic/registry.go | 67 ++++ pkg/registry/generic/registry_test.go | 92 ++++++ pkg/registry/registrytest/generic.go | 87 +++++ pkg/tools/fake_etcd_client.go | 1 + 7 files changed, 822 insertions(+) create mode 100644 pkg/registry/generic/doc.go create mode 100644 pkg/registry/generic/etcd.go create mode 100644 pkg/registry/generic/etcd_test.go create mode 100644 pkg/registry/generic/registry.go create mode 100644 pkg/registry/generic/registry_test.go create mode 100644 pkg/registry/registrytest/generic.go diff --git a/pkg/registry/generic/doc.go b/pkg/registry/generic/doc.go new file mode 100644 index 00000000000..67a48db6aa5 --- /dev/null +++ b/pkg/registry/generic/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package generic provides a generic object store interface and a +// generic label/field matching type. +package generic diff --git a/pkg/registry/generic/etcd.go b/pkg/registry/generic/etcd.go new file mode 100644 index 00000000000..e43405635dd --- /dev/null +++ b/pkg/registry/generic/etcd.go @@ -0,0 +1,119 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// Etcd implements generic.Registry, backing it with etcd storage. +// It's intended to be embeddable, so that you can implement any +// non-generic functions if needed. +// You must supply a value for every field below before use; these are +// left public as it's meant to be overridable if need be. +type Etcd struct { + // Called to make a new object, should return e.g., &api.Pod{} + NewFunc func() runtime.Object + + // Called to make a new listing object, should return e.g., &api.PodList{} + NewListFunc func() runtime.Object + + // Used for error reporting + EndpointName string + + // Used for listing/watching; should not include trailing "/" + KeyRoot string + + // Called for Create/Update/Get/Delete + KeyFunc func(id string) string + + // Used for all etcd access functions + Helper tools.EtcdHelper +} + +// List returns a list of all the items matching m. +func (e *Etcd) List(ctx api.Context, m Matcher) (runtime.Object, error) { + list := e.NewListFunc() + err := e.Helper.ExtractToList(e.KeyRoot, list) + if err != nil { + return nil, err + } + return FilterList(list, m) +} + +// FilterList filters any list object that conforms to the api conventions, +// provided that 'm' works with the concrete type of list. +func FilterList(list runtime.Object, m Matcher) (filtered runtime.Object, err error) { + // TODO: push a matcher down into tools.EtcdHelper to avoid all this + // nonsense. This is a lot of unnecessary copies. + items, err := runtime.ExtractList(list) + if err != nil { + return nil, err + } + var filteredItems []runtime.Object + for _, obj := range items { + if match, err := m.Matches(obj); err == nil && match { + filteredItems = append(filteredItems, obj) + } + } + err = runtime.SetList(list, filteredItems) + if err != nil { + return nil, err + } + return list, nil +} + +// Create inserts a new item. +func (e *Etcd) Create(ctx api.Context, id string, obj runtime.Object) error { + err := e.Helper.CreateObj(e.KeyFunc(id), obj, 0) + return etcderr.InterpretCreateError(err, e.EndpointName, id) +} + +// Update updates the item. +func (e *Etcd) Update(ctx api.Context, id string, obj runtime.Object) error { + err := e.Helper.SetObj(e.KeyFunc(id), obj) + return etcderr.InterpretUpdateError(err, e.EndpointName, id) +} + +// Get retrieves the item from etcd. +func (e *Etcd) Get(ctx api.Context, id string) (runtime.Object, error) { + obj := e.NewFunc() + err := e.Helper.ExtractObj(e.KeyFunc(id), obj, false) + if err != nil { + return nil, etcderr.InterpretGetError(err, e.EndpointName, id) + } + return obj, nil +} + +// Delete removes the item from etcd. +func (e *Etcd) Delete(ctx api.Context, id string) error { + err := e.Helper.Delete(e.KeyFunc(id), false) + return etcderr.InterpretDeleteError(err, e.EndpointName, id) +} + +// Watch starts a watch for the items that m matches. +// TODO: Detect if m references a single object instead of a list. +func (e *Etcd) Watch(ctx api.Context, m Matcher, resourceVersion uint64) (watch.Interface, error) { + return e.Helper.WatchList(e.KeyRoot, resourceVersion, func(obj runtime.Object) bool { + matches, err := m.Matches(obj) + return err == nil && matches + }) +} diff --git a/pkg/registry/generic/etcd_test.go b/pkg/registry/generic/etcd_test.go new file mode 100644 index 00000000000..52571776b74 --- /dev/null +++ b/pkg/registry/generic/etcd_test.go @@ -0,0 +1,437 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +import ( + "fmt" + "path" + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + + "github.com/coreos/go-etcd/etcd" +) + +func NewTestGenericEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, *Etcd) { + f := tools.NewFakeEtcdClient(t) + f.TestIndex = true + h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.ResourceVersioner()}} + return f, &Etcd{ + NewFunc: func() runtime.Object { return &api.Pod{} }, + NewListFunc: func() runtime.Object { return &api.PodList{} }, + EndpointName: "pods", + KeyRoot: "/registry/pods", + KeyFunc: func(id string) string { + return path.Join("/registry/pods", id) + }, + Helper: h, + } +} + +// SetMatcher is a matcher that matches any pod with id in the set. +// Makes testing simpler. +type SetMatcher struct { + util.StringSet +} + +func (sm SetMatcher) Matches(obj runtime.Object) (bool, error) { + pod, ok := obj.(*api.Pod) + if !ok { + return false, fmt.Errorf("wrong object") + } + return sm.Has(pod.ID), nil +} + +// EverythingMatcher matches everything +type EverythingMatcher struct{} + +func (EverythingMatcher) Matches(obj runtime.Object) (bool, error) { + return true, nil +} + +func TestEtcdList(t *testing.T) { + podA := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{Host: "machine"}, + } + podB := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "bar"}, + DesiredState: api.PodState{Host: "machine"}, + } + + normalListResp := &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{ + {Value: runtime.EncodeOrDie(testapi.Codec(), podA)}, + {Value: runtime.EncodeOrDie(testapi.Codec(), podB)}, + }, + }, + } + + table := map[string]struct { + in tools.EtcdResponseWithError + m Matcher + out runtime.Object + succeed bool + }{ + "empty": { + in: tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{}, + }, + }, + E: nil, + }, + m: EverythingMatcher{}, + out: &api.PodList{Items: []api.Pod{}}, + succeed: true, + }, + "notFound": { + in: tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + }, + m: EverythingMatcher{}, + out: &api.PodList{Items: []api.Pod{}}, + succeed: true, + }, + "normal": { + in: tools.EtcdResponseWithError{ + R: normalListResp, + E: nil, + }, + m: EverythingMatcher{}, + out: &api.PodList{Items: []api.Pod{*podA, *podB}}, + succeed: true, + }, + "normalFiltered": { + in: tools.EtcdResponseWithError{ + R: normalListResp, + E: nil, + }, + m: SetMatcher{util.NewStringSet("foo")}, + out: &api.PodList{Items: []api.Pod{*podA}}, + succeed: true, + }, + } + + for name, item := range table { + fakeClient, registry := NewTestGenericEtcdRegistry(t) + fakeClient.Data[registry.KeyRoot] = item.in + list, err := registry.List(api.NewContext(), item.m) + if e, a := item.succeed, err == nil; e != a { + t.Errorf("%v: expected %v, got %v", name, e, a) + continue + } + + if e, a := item.out, list; !reflect.DeepEqual(e, a) { + t.Errorf("%v: Expected %#v, got %#v", name, e, a) + } + } +} + +func TestEtcdCreate(t *testing.T) { + podA := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{Host: "machine"}, + } + podB := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{Host: "machine2"}, + } + + nodeWithPodA := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podA), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + E: nil, + } + + emptyNode := tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + + path := "/registry/pods/foo" + key := "foo" + + table := map[string]struct { + existing tools.EtcdResponseWithError + expect tools.EtcdResponseWithError + toCreate runtime.Object + errOK func(error) bool + }{ + "normal": { + existing: emptyNode, + expect: nodeWithPodA, + toCreate: podA, + errOK: func(err error) bool { return err == nil }, + }, + "preExisting": { + existing: nodeWithPodA, + expect: nodeWithPodA, + toCreate: podB, + errOK: errors.IsAlreadyExists, + }, + } + + for name, item := range table { + fakeClient, registry := NewTestGenericEtcdRegistry(t) + fakeClient.Data[path] = item.existing + err := registry.Create(api.NewContext(), key, item.toCreate) + if !item.errOK(err) { + t.Errorf("%v: unexpected error: %v", name, err) + } + + if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) { + t.Errorf("%v:\n%s", name, runtime.ObjectDiff(e, a)) + } + } +} + +func TestEtcdUpdate(t *testing.T) { + podA := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{Host: "machine"}, + } + podB := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}, + DesiredState: api.PodState{Host: "machine2"}, + } + + nodeWithPodA := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podA), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + E: nil, + } + + nodeWithPodB := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podB), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + E: nil, + } + + emptyNode := tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + + path := "/registry/pods/foo" + key := "foo" + + table := map[string]struct { + existing tools.EtcdResponseWithError + expect tools.EtcdResponseWithError + toUpdate runtime.Object + errOK func(error) bool + }{ + "normal": { + existing: nodeWithPodA, + expect: nodeWithPodB, + toUpdate: podB, + errOK: func(err error) bool { return err == nil }, + }, + "notExisting": { + existing: emptyNode, + expect: nodeWithPodA, + toUpdate: podA, + // TODO: Should updating a non-existing thing fail? + errOK: func(err error) bool { return err == nil }, + }, + } + + for name, item := range table { + fakeClient, registry := NewTestGenericEtcdRegistry(t) + fakeClient.Data[path] = item.existing + err := registry.Update(api.NewContext(), key, item.toUpdate) + if !item.errOK(err) { + t.Errorf("%v: unexpected error: %v", name, err) + } + + if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) { + t.Errorf("%v:\n%s", name, runtime.ObjectDiff(e, a)) + } + } +} + +func TestEtcdGet(t *testing.T) { + podA := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}, + DesiredState: api.PodState{Host: "machine"}, + } + + nodeWithPodA := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podA), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + E: nil, + } + + emptyNode := tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + + path := "/registry/pods/foo" + key := "foo" + + table := map[string]struct { + existing tools.EtcdResponseWithError + expect runtime.Object + errOK func(error) bool + }{ + "normal": { + existing: nodeWithPodA, + expect: podA, + errOK: func(err error) bool { return err == nil }, + }, + "notExisting": { + existing: emptyNode, + expect: nil, + errOK: errors.IsNotFound, + }, + } + + for name, item := range table { + fakeClient, registry := NewTestGenericEtcdRegistry(t) + fakeClient.Data[path] = item.existing + got, err := registry.Get(api.NewContext(), key) + if !item.errOK(err) { + t.Errorf("%v: unexpected error: %v", name, err) + } + + if e, a := item.expect, got; !reflect.DeepEqual(e, a) { + t.Errorf("%v:\n%s", name, runtime.ObjectDiff(e, a)) + } + } +} + +func TestEtcdDelete(t *testing.T) { + podA := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}, + DesiredState: api.PodState{Host: "machine"}, + } + + nodeWithPodA := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podA), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + E: nil, + } + + emptyNode := tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + + path := "/registry/pods/foo" + key := "foo" + + table := map[string]struct { + existing tools.EtcdResponseWithError + expect tools.EtcdResponseWithError + errOK func(error) bool + }{ + "normal": { + existing: nodeWithPodA, + expect: emptyNode, + errOK: func(err error) bool { return err == nil }, + }, + "notExisting": { + existing: emptyNode, + expect: emptyNode, + errOK: func(err error) bool { return err == nil }, + }, + } + + for name, item := range table { + fakeClient, registry := NewTestGenericEtcdRegistry(t) + fakeClient.Data[path] = item.existing + err := registry.Delete(api.NewContext(), key) + if !item.errOK(err) { + t.Errorf("%v: unexpected error: %v", name, err) + } + + if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) { + t.Errorf("%v:\n%s", name, runtime.ObjectDiff(e, a)) + } + } +} + +func TestEtcdWatch(t *testing.T) { + podA := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}, + DesiredState: api.PodState{Host: "machine"}, + } + respWithPodA := &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podA), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + Action: "create", + } + + fakeClient, registry := NewTestGenericEtcdRegistry(t) + wi, err := registry.Watch(api.NewContext(), EverythingMatcher{}, 1) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + go func() { + fakeClient.WatchResponse <- respWithPodA + }() + + got, open := <-wi.ResultChan() + if !open { + t.Fatalf("unexpected channel close") + } + + if e, a := podA, got.Object; !reflect.DeepEqual(e, a) { + t.Errorf("difference: %s", runtime.ObjectDiff(e, a)) + } +} diff --git a/pkg/registry/generic/registry.go b/pkg/registry/generic/registry.go new file mode 100644 index 00000000000..e34373c0957 --- /dev/null +++ b/pkg/registry/generic/registry.go @@ -0,0 +1,67 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// AttrFunc returns label and field sets for List or Watch to compare against, or an error. +type AttrFunc func(obj runtime.Object) (label, field labels.Set, err error) + +// SelectionPredicate implements a generic predicate that can be passed to +// GenericRegistry's List or Watch methods. Implements the Matcher interface. +type SelectionPredicate struct { + Label labels.Selector + Field labels.Selector + GetAttrs AttrFunc +} + +// Matches returns true if the given object's labels and fields (as +// returned by s.GetAttrs) match s.Label and s.Field. An error is +// returned if s.GetAttrs fails. +func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) { + if s.Label.Empty() && s.Field.Empty() { + return true, nil + } + labels, fields, err := s.GetAttrs(obj) + if err != nil { + return false, err + } + return s.Label.Matches(labels) && s.Field.Matches(fields), nil +} + +// Matcher can return true if an object matches the Matcher's selection +// criteria. +type Matcher interface { + Matches(obj runtime.Object) (bool, error) +} + +// Registry knows how to store & list any runtime.Object. Can be used for +// any object types which don't require special features from the storage +// layer. +type Registry interface { + List(api.Context, Matcher) (runtime.Object, error) + Create(ctx api.Context, id string, obj runtime.Object) error + Update(ctx api.Context, id string, obj runtime.Object) error + Get(ctx api.Context, id string) (runtime.Object, error) + Delete(ctx api.Context, id string) error + Watch(ctx api.Context, m Matcher, resourceVersion uint64) (watch.Interface, error) +} diff --git a/pkg/registry/generic/registry_test.go b/pkg/registry/generic/registry_test.go new file mode 100644 index 00000000000..e0c5b843b9b --- /dev/null +++ b/pkg/registry/generic/registry_test.go @@ -0,0 +1,92 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +import ( + "errors" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" +) + +type Ignored struct{} + +func (*Ignored) IsAnAPIObject() {} + +func TestSelectionPredicate(t *testing.T) { + table := map[string]struct { + labelSelector, fieldSelector string + labels, fields labels.Set + err error + shouldMatch bool + }{ + "A": { + labelSelector: "name=foo", + fieldSelector: "uid=12345", + labels: labels.Set{"name": "foo"}, + fields: labels.Set{"uid": "12345"}, + shouldMatch: true, + }, + "B": { + labelSelector: "name=foo", + fieldSelector: "uid=12345", + labels: labels.Set{"name": "foo"}, + fields: labels.Set{}, + shouldMatch: false, + }, + "C": { + labelSelector: "name=foo", + fieldSelector: "uid=12345", + labels: labels.Set{}, + fields: labels.Set{"uid": "12345"}, + shouldMatch: false, + }, + "error": { + labelSelector: "name=foo", + fieldSelector: "uid=12345", + err: errors.New("maybe this is a 'wrong object type' error"), + shouldMatch: false, + }, + } + + for name, item := range table { + parsedLabel, err := labels.ParseSelector(item.labelSelector) + if err != nil { + panic(err) + } + parsedField, err := labels.ParseSelector(item.fieldSelector) + if err != nil { + panic(err) + } + sp := &SelectionPredicate{ + Label: parsedLabel, + Field: parsedField, + GetAttrs: func(runtime.Object) (label, field labels.Set, err error) { + return item.labels, item.fields, item.err + }, + } + got, err := sp.Matches(&Ignored{}) + if e, a := item.err, err; e != a { + t.Errorf("%v: expected %v, got %v", name, e, a) + continue + } + if e, a := item.shouldMatch, got; e != a { + t.Errorf("%v: expected %v, got %v", name, e, a) + } + } +} diff --git a/pkg/registry/registrytest/generic.go b/pkg/registry/registrytest/generic.go new file mode 100644 index 00000000000..01b5d0293b3 --- /dev/null +++ b/pkg/registry/registrytest/generic.go @@ -0,0 +1,87 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registrytest + +import ( + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// GenericRegistry knows how to store & list any runtime.Object. Events don't require +// any non-generic features from the storage layer. +type GenericRegistry struct { + Err error + Object runtime.Object + ObjectList runtime.Object + sync.Mutex + + Mux *watch.Mux +} + +func NewGeneric(list runtime.Object) *GenericRegistry { + return &GenericRegistry{ + ObjectList: list, + Mux: watch.NewMux(0), + } +} + +func (r *GenericRegistry) List(ctx api.Context, m generic.Matcher) (runtime.Object, error) { + r.Lock() + defer r.Unlock() + if r.Err != nil { + return nil, r.Err + } + return generic.FilterList(r.ObjectList, m) +} + +func (r *GenericRegistry) Watch(ctx api.Context, m generic.Matcher, resourceVersion uint64) (watch.Interface, error) { + // TODO: wire filter down into the mux; it needs access to current and previous state :( + return r.Mux.Watch(), nil +} + +func (r *GenericRegistry) Get(ctx api.Context, id string) (runtime.Object, error) { + r.Lock() + defer r.Unlock() + return r.Object, r.Err +} + +func (r *GenericRegistry) Create(ctx api.Context, id string, obj runtime.Object) error { + r.Lock() + defer r.Unlock() + r.Object = obj + r.Mux.Action(watch.Added, obj) + return r.Err +} + +func (r *GenericRegistry) Update(ctx api.Context, id string, obj runtime.Object) error { + r.Lock() + defer r.Unlock() + r.Object = obj + r.Mux.Action(watch.Modified, obj) + return r.Err +} + +func (r *GenericRegistry) Delete(ctx api.Context, id string) error { + r.Lock() + defer r.Unlock() + r.Mux.Action(watch.Deleted, r.Object) + return r.Err +} diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index bc8dec9330f..ff9e7311963 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -167,6 +167,7 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons Value: value, CreatedIndex: i, ModifiedIndex: i, + TTL: int64(ttl), }, }, } From 3e076e12fe22f0e59982a80b951a453ab947d894 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 9 Oct 2014 15:21:44 -0700 Subject: [PATCH 3/6] Add event registry and type --- pkg/registry/event/doc.go | 19 +++ pkg/registry/event/registry.go | 57 +++++++ pkg/registry/event/registry_test.go | 104 +++++++++++++ pkg/registry/event/rest.go | 121 +++++++++++++++ pkg/registry/event/rest_test.go | 226 ++++++++++++++++++++++++++++ 5 files changed, 527 insertions(+) create mode 100644 pkg/registry/event/doc.go create mode 100644 pkg/registry/event/registry.go create mode 100644 pkg/registry/event/registry_test.go create mode 100644 pkg/registry/event/rest.go create mode 100644 pkg/registry/event/rest_test.go diff --git a/pkg/registry/event/doc.go b/pkg/registry/event/doc.go new file mode 100644 index 00000000000..18aed52243e --- /dev/null +++ b/pkg/registry/event/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package event provides Registry interface and it's REST +// implementation for storing Event api objects. +package event diff --git a/pkg/registry/event/registry.go b/pkg/registry/event/registry.go new file mode 100644 index 00000000000..e4cd9073a8f --- /dev/null +++ b/pkg/registry/event/registry.go @@ -0,0 +1,57 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package event + +import ( + "path" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" +) + +// registry implements custom changes to generic.Etcd. +type registry struct { + *generic.Etcd + ttl uint64 +} + +// Create stores the object with a ttl, so that events don't stay in the system forever. +func (r registry) Create(ctx api.Context, id string, obj runtime.Object) error { + err := r.Etcd.Helper.CreateObj(r.Etcd.KeyFunc(id), obj, r.ttl) + return etcderr.InterpretCreateError(err, r.Etcd.EndpointName, id) +} + +// NewEtcdRegistry returns a registry which will store Events in the given +// EtcdHelper. ttl is the time that Events will be retained by the system. +func NewEtcdRegistry(h tools.EtcdHelper, ttl uint64) generic.Registry { + return registry{ + Etcd: &generic.Etcd{ + NewFunc: func() runtime.Object { return &api.Event{} }, + NewListFunc: func() runtime.Object { return &api.EventList{} }, + EndpointName: "events", + KeyRoot: "/registry/events", + KeyFunc: func(id string) string { + return path.Join("/registry/events", id) + }, + Helper: h, + }, + ttl: ttl, + } +} diff --git a/pkg/registry/event/registry_test.go b/pkg/registry/event/registry_test.go new file mode 100644 index 00000000000..fa60b9d5297 --- /dev/null +++ b/pkg/registry/event/registry_test.go @@ -0,0 +1,104 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package event + +import ( + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + + "github.com/coreos/go-etcd/etcd" +) + +var testTTL uint64 = 60 + +func NewTestEventEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) { + f := tools.NewFakeEtcdClient(t) + f.TestIndex = true + h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.ResourceVersioner()}} + return f, NewEtcdRegistry(h, testTTL) +} + +func TestEventCreate(t *testing.T) { + eventA := &api.Event{ + TypeMeta: api.TypeMeta{ID: "foo"}, + Reason: "forTesting", + } + eventB := &api.Event{ + TypeMeta: api.TypeMeta{ID: "foo"}, + Reason: "forTesting", + } + + nodeWithEventA := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), eventA), + ModifiedIndex: 1, + CreatedIndex: 1, + TTL: int64(testTTL), + }, + }, + E: nil, + } + + emptyNode := tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + + path := "/registry/events/foo" + key := "foo" + + table := map[string]struct { + existing tools.EtcdResponseWithError + expect tools.EtcdResponseWithError + toCreate runtime.Object + errOK func(error) bool + }{ + "normal": { + existing: emptyNode, + expect: nodeWithEventA, + toCreate: eventA, + errOK: func(err error) bool { return err == nil }, + }, + "preExisting": { + existing: nodeWithEventA, + expect: nodeWithEventA, + toCreate: eventB, + errOK: errors.IsAlreadyExists, + }, + } + + for name, item := range table { + fakeClient, registry := NewTestEventEtcdRegistry(t) + fakeClient.Data[path] = item.existing + err := registry.Create(api.NewContext(), key, item.toCreate) + if !item.errOK(err) { + t.Errorf("%v: unexpected error: %v", name, err) + } + + if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) { + t.Errorf("%v:\n%s", name, runtime.ObjectDiff(e, a)) + } + } +} diff --git a/pkg/registry/event/rest.go b/pkg/registry/event/rest.go new file mode 100644 index 00000000000..b43088e2a29 --- /dev/null +++ b/pkg/registry/event/rest.go @@ -0,0 +1,121 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package event + +import ( + "fmt" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// REST adapts an event registry into apiserver's RESTStorage model. +type REST struct { + registry generic.Registry +} + +// NewREST returns a new REST. You must use a registry created by +// NewEtcdRegistry unless you're testing. +func NewREST(registry generic.Registry) *REST { + return &REST{ + registry: registry, + } +} + +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Object, error) { + event, ok := obj.(*api.Event) + if !ok { + return nil, fmt.Errorf("invalid object type") + } + event.CreationTimestamp = util.Now() + + return apiserver.MakeAsync(func() (runtime.Object, error) { + err := rs.registry.Create(ctx, event.ID, event) + if err != nil { + return nil, err + } + return rs.registry.Get(ctx, event.ID) + }), nil +} + +func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) { + obj, err := rs.registry.Get(ctx, id) + if err != nil { + return nil, err + } + _, ok := obj.(*api.Event) + if !ok { + return nil, fmt.Errorf("invalid object type") + } + return apiserver.MakeAsync(func() (runtime.Object, error) { + return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, id) + }), nil +} + +func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { + obj, err := rs.registry.Get(ctx, id) + if err != nil { + return nil, err + } + event, ok := obj.(*api.Event) + if !ok { + return nil, fmt.Errorf("invalid object type") + } + return event, err +} + +func (rs *REST) getAttrs(obj runtime.Object) (objLabels, objFields labels.Set, err error) { + event, ok := obj.(*api.Event) + if !ok { + return nil, nil, fmt.Errorf("invalid object type") + } + return labels.Set{}, labels.Set{ + "InvolvedObject.Kind": event.InvolvedObject.Kind, + "InvolvedObject.Name": event.InvolvedObject.Name, + "InvolvedObject.UID": event.InvolvedObject.UID, + "InvolvedObject.APIVersion": event.InvolvedObject.APIVersion, + "InvolvedObject.ResourceVersion": fmt.Sprintf("%s", event.InvolvedObject.ResourceVersion), + "InvolvedObject.FieldPath": event.InvolvedObject.FieldPath, + "Status": event.Status, + "Reason": event.Reason, + }, nil +} + +func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { + return rs.registry.List(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}) +} + +// Watch returns Events events via a watch.Interface. +// It implements apiserver.ResourceWatcher. +func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return rs.registry.Watch(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion) +} + +// New returns a new api.Event +func (*REST) New() runtime.Object { + return &api.Event{} +} + +// Update returns an error: Events are not mutable. +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Object, error) { + return nil, fmt.Errorf("not allowed: 'Event' objects are not mutable") +} diff --git a/pkg/registry/event/rest_test.go b/pkg/registry/event/rest_test.go new file mode 100644 index 00000000000..9bb1b0c67c9 --- /dev/null +++ b/pkg/registry/event/rest_test.go @@ -0,0 +1,226 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package event + +import ( + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +type testRegistry struct { + *registrytest.GenericRegistry +} + +func NewTestREST() (testRegistry, *REST) { + reg := testRegistry{registrytest.NewGeneric(nil)} + return reg, NewREST(reg) +} + +func TestRESTCreate(t *testing.T) { + _, rest := NewTestREST() + eventA := &api.Event{ + TypeMeta: api.TypeMeta{ID: "foo"}, + Reason: "forTesting", + } + c, err := rest.Create(api.NewContext(), eventA) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + if e, a := eventA, <-c; !reflect.DeepEqual(e, a) { + t.Errorf("diff: %s", runtime.ObjectDiff(e, a)) + } +} + +func TestRESTDelete(t *testing.T) { + _, rest := NewTestREST() + eventA := &api.Event{ + TypeMeta: api.TypeMeta{ID: "foo"}, + Reason: "forTesting", + } + c, err := rest.Create(api.NewContext(), eventA) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + <-c + c, err = rest.Delete(api.NewContext(), eventA.ID) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + if stat := (<-c).(*api.Status); stat.Status != api.StatusSuccess { + t.Errorf("unexpected status: %v", stat) + } +} + +func TestRESTGet(t *testing.T) { + _, rest := NewTestREST() + eventA := &api.Event{ + TypeMeta: api.TypeMeta{ID: "foo"}, + Reason: "forTesting", + } + c, err := rest.Create(api.NewContext(), eventA) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + <-c + got, err := rest.Get(api.NewContext(), eventA.ID) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + if e, a := eventA, got; !reflect.DeepEqual(e, a) { + t.Errorf("diff: %s", runtime.ObjectDiff(e, a)) + } +} + +func TestRESTgetAttrs(t *testing.T) { + _, rest := NewTestREST() + eventA := &api.Event{ + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "foo", + UID: "long uid string", + APIVersion: testapi.Version(), + ResourceVersion: "0", + FieldPath: "", + }, + Status: "tested", + Reason: "forTesting", + } + label, field, err := rest.getAttrs(eventA) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + if e, a := label, (labels.Set{}); !reflect.DeepEqual(e, a) { + t.Errorf("diff: %s", runtime.ObjectDiff(e, a)) + } + expect := labels.Set{ + "InvolvedObject.Kind": "Pod", + "InvolvedObject.Name": "foo", + "InvolvedObject.UID": "long uid string", + "InvolvedObject.APIVersion": testapi.Version(), + "InvolvedObject.ResourceVersion": "0", + "InvolvedObject.FieldPath": "", + "Status": "tested", + "Reason": "forTesting", + } + if e, a := expect, field; !reflect.DeepEqual(e, a) { + t.Errorf("diff: %s", runtime.ObjectDiff(e, a)) + } +} + +func TestRESTUpdate(t *testing.T) { + _, rest := NewTestREST() + eventA := &api.Event{ + TypeMeta: api.TypeMeta{ID: "foo"}, + Reason: "forTesting", + } + c, err := rest.Create(api.NewContext(), eventA) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + <-c + _, err = rest.Update(api.NewContext(), eventA) + if err == nil { + t.Errorf("unexpected non-error") + } +} + +func TestRESTList(t *testing.T) { + reg, rest := NewTestREST() + eventA := &api.Event{ + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "foo", + UID: "long uid string", + APIVersion: testapi.Version(), + ResourceVersion: "0", + FieldPath: "", + }, + Status: "tested", + Reason: "forTesting", + } + eventB := &api.Event{ + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "bar", + UID: "other long uid string", + APIVersion: testapi.Version(), + ResourceVersion: "0", + FieldPath: "", + }, + Status: "tested", + Reason: "forTesting", + } + eventC := &api.Event{ + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "baz", + UID: "yet another long uid string", + APIVersion: testapi.Version(), + ResourceVersion: "0", + FieldPath: "", + }, + Status: "untested", + Reason: "forTesting", + } + reg.ObjectList = &api.EventList{ + Items: []api.Event{*eventA, *eventB, *eventC}, + } + got, err := rest.List(api.NewContext(), labels.Everything(), labels.Set{"Status": "tested"}.AsSelector()) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + expect := &api.EventList{ + Items: []api.Event{*eventA, *eventB}, + } + if e, a := expect, got; !reflect.DeepEqual(e, a) { + t.Errorf("diff: %s", runtime.ObjectDiff(e, a)) + } +} + +func TestRESTWatch(t *testing.T) { + eventA := &api.Event{ + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "foo", + UID: "long uid string", + APIVersion: testapi.Version(), + ResourceVersion: "0", + FieldPath: "", + }, + Status: "tested", + Reason: "forTesting", + } + reg, rest := NewTestREST() + wi, err := rest.Watch(api.NewContext(), labels.Everything(), labels.Everything(), 0) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + go func() { + reg.Mux.Action(watch.Added, eventA) + }() + got := <-wi.ResultChan() + if e, a := eventA, got.Object; !reflect.DeepEqual(e, a) { + t.Errorf("diff: %s", runtime.ObjectDiff(e, a)) + } +} From 15680731f7a14bdd5b4410272cd52a5f21541c08 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 9 Oct 2014 15:46:41 -0700 Subject: [PATCH 4/6] Add event endpoint to apiserver --- cmd/apiserver/apiserver.go | 14 ++++++++------ pkg/master/master.go | 6 ++++++ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/cmd/apiserver/apiserver.go b/cmd/apiserver/apiserver.go index 46fa835812f..a96d39cb987 100644 --- a/cmd/apiserver/apiserver.go +++ b/cmd/apiserver/apiserver.go @@ -49,17 +49,18 @@ import ( var ( port = flag.Uint("port", 8080, "The port to listen on. Default 8080") address = util.IP(net.ParseIP("127.0.0.1")) - apiPrefix = flag.String("api_prefix", "/api", "The prefix for API requests on the server. Default '/api'") + apiPrefix = flag.String("api_prefix", "/api", "The prefix for API requests on the server. Default '/api'.") storageVersion = flag.String("storage_version", "", "The version to store resources with. Defaults to server preferred") cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.") cloudConfigFile = flag.String("cloud_config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.") - minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs") + minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs.") minionPort = flag.Uint("minion_port", 10250, "The port at which kubelet will be listening on the minions.") - healthCheckMinions = flag.Bool("health_check_minions", true, "If true, health check minions and filter unhealthy ones. Default true") - minionCacheTTL = flag.Duration("minion_cache_ttl", 30*time.Second, "Duration of time to cache minion information. Default 30 seconds") - tokenAuthFile = flag.String("token_auth_file", "", "If set, the file that will be used to secure the API server via token authentication") + healthCheckMinions = flag.Bool("health_check_minions", true, "If true, health check minions and filter unhealthy ones. Default true.") + minionCacheTTL = flag.Duration("minion_cache_ttl", 30*time.Second, "Duration of time to cache minion information. Default 30 seconds.") + eventTTL = flag.Duration("event_ttl", 48*time.Hour, "Amount of time to retain events. Default 2 days.") + tokenAuthFile = flag.String("token_auth_file", "", "If set, the file that will be used to secure the API server via token authentication.") etcdServerList util.StringList - etcdConfigFile = flag.String("etcd_config", "", "The config file for the etcd client. Mutually exclusive with -etcd_servers") + etcdConfigFile = flag.String("etcd_config", "", "The config file for the etcd client. Mutually exclusive with -etcd_servers.") machineList util.StringList corsAllowedOriginList util.StringList allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow privileged containers.") @@ -178,6 +179,7 @@ func main() { HealthCheckMinions: *healthCheckMinions, Minions: machineList, MinionCacheTTL: *minionCacheTTL, + EventTTL: *eventTTL, MinionRegexp: *minionRegexp, PodInfoGetter: podInfoGetter, NodeResources: api.NodeResources{ diff --git a/pkg/master/master.go b/pkg/master/master.go index 0d53423ff89..56370c759e5 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -31,6 +31,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/event" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" @@ -49,6 +51,7 @@ type Config struct { HealthCheckMinions bool Minions []string MinionCacheTTL time.Duration + EventTTL time.Duration MinionRegexp string PodInfoGetter client.PodInfoGetter NodeResources api.NodeResources @@ -62,6 +65,7 @@ type Master struct { endpointRegistry endpoint.Registry minionRegistry minion.Registry bindingRegistry binding.Registry + eventRegistry generic.Registry storage map[string]apiserver.RESTStorage client *client.Client } @@ -92,6 +96,7 @@ func New(c *Config) *Master { serviceRegistry: serviceRegistry, endpointRegistry: etcd.NewRegistry(c.EtcdHelper, nil), bindingRegistry: etcd.NewRegistry(c.EtcdHelper, manifestFactory), + eventRegistry: event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())), minionRegistry: minionRegistry, client: c.Client, } @@ -147,6 +152,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf "services": service.NewREST(m.serviceRegistry, cloud, m.minionRegistry), "endpoints": endpoint.NewREST(m.endpointRegistry), "minions": minion.NewREST(m.minionRegistry), + "events": event.NewREST(m.eventRegistry), // TODO: should appear only in scheduler API group. "bindings": binding.NewREST(m.bindingRegistry), From d3d9f7ac8bfda807e2b5bf8a5043a273ee033b49 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 9 Oct 2014 15:55:15 -0700 Subject: [PATCH 5/6] ObjectDiff moved after rebase --- pkg/registry/event/registry_test.go | 3 ++- pkg/registry/event/rest_test.go | 14 +++++++------- pkg/registry/generic/etcd_test.go | 10 +++++----- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/pkg/registry/event/registry_test.go b/pkg/registry/event/registry_test.go index fa60b9d5297..db08b7e0bcc 100644 --- a/pkg/registry/event/registry_test.go +++ b/pkg/registry/event/registry_test.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" ) @@ -98,7 +99,7 @@ func TestEventCreate(t *testing.T) { } if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) { - t.Errorf("%v:\n%s", name, runtime.ObjectDiff(e, a)) + t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) } } } diff --git a/pkg/registry/event/rest_test.go b/pkg/registry/event/rest_test.go index 9bb1b0c67c9..84c42727c9d 100644 --- a/pkg/registry/event/rest_test.go +++ b/pkg/registry/event/rest_test.go @@ -24,7 +24,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" - "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -48,7 +48,7 @@ func TestRESTCreate(t *testing.T) { t.Fatalf("Unexpected error %v", err) } if e, a := eventA, <-c; !reflect.DeepEqual(e, a) { - t.Errorf("diff: %s", runtime.ObjectDiff(e, a)) + t.Errorf("diff: %s", util.ObjectDiff(e, a)) } } @@ -88,7 +88,7 @@ func TestRESTGet(t *testing.T) { t.Fatalf("Unexpected error %v", err) } if e, a := eventA, got; !reflect.DeepEqual(e, a) { - t.Errorf("diff: %s", runtime.ObjectDiff(e, a)) + t.Errorf("diff: %s", util.ObjectDiff(e, a)) } } @@ -111,7 +111,7 @@ func TestRESTgetAttrs(t *testing.T) { t.Fatalf("Unexpected error %v", err) } if e, a := label, (labels.Set{}); !reflect.DeepEqual(e, a) { - t.Errorf("diff: %s", runtime.ObjectDiff(e, a)) + t.Errorf("diff: %s", util.ObjectDiff(e, a)) } expect := labels.Set{ "InvolvedObject.Kind": "Pod", @@ -124,7 +124,7 @@ func TestRESTgetAttrs(t *testing.T) { "Reason": "forTesting", } if e, a := expect, field; !reflect.DeepEqual(e, a) { - t.Errorf("diff: %s", runtime.ObjectDiff(e, a)) + t.Errorf("diff: %s", util.ObjectDiff(e, a)) } } @@ -194,7 +194,7 @@ func TestRESTList(t *testing.T) { Items: []api.Event{*eventA, *eventB}, } if e, a := expect, got; !reflect.DeepEqual(e, a) { - t.Errorf("diff: %s", runtime.ObjectDiff(e, a)) + t.Errorf("diff: %s", util.ObjectDiff(e, a)) } } @@ -221,6 +221,6 @@ func TestRESTWatch(t *testing.T) { }() got := <-wi.ResultChan() if e, a := eventA, got.Object; !reflect.DeepEqual(e, a) { - t.Errorf("diff: %s", runtime.ObjectDiff(e, a)) + t.Errorf("diff: %s", util.ObjectDiff(e, a)) } } diff --git a/pkg/registry/generic/etcd_test.go b/pkg/registry/generic/etcd_test.go index 52571776b74..f34eeeaca05 100644 --- a/pkg/registry/generic/etcd_test.go +++ b/pkg/registry/generic/etcd_test.go @@ -209,7 +209,7 @@ func TestEtcdCreate(t *testing.T) { } if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) { - t.Errorf("%v:\n%s", name, runtime.ObjectDiff(e, a)) + t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) } } } @@ -284,7 +284,7 @@ func TestEtcdUpdate(t *testing.T) { } if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) { - t.Errorf("%v:\n%s", name, runtime.ObjectDiff(e, a)) + t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) } } } @@ -340,7 +340,7 @@ func TestEtcdGet(t *testing.T) { } if e, a := item.expect, got; !reflect.DeepEqual(e, a) { - t.Errorf("%v:\n%s", name, runtime.ObjectDiff(e, a)) + t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) } } } @@ -396,7 +396,7 @@ func TestEtcdDelete(t *testing.T) { } if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) { - t.Errorf("%v:\n%s", name, runtime.ObjectDiff(e, a)) + t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) } } } @@ -432,6 +432,6 @@ func TestEtcdWatch(t *testing.T) { } if e, a := podA, got.Object; !reflect.DeepEqual(e, a) { - t.Errorf("difference: %s", runtime.ObjectDiff(e, a)) + t.Errorf("difference: %s", util.ObjectDiff(e, a)) } } From b1a6b3eee84eef0326ba204f7103594095a8c705 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 10 Oct 2014 15:46:30 -0700 Subject: [PATCH 6/6] Split generic; add test, address other review comments --- pkg/registry/event/registry.go | 5 ++- pkg/registry/event/rest.go | 16 +++---- pkg/registry/event/rest_test.go | 18 ++++---- pkg/registry/generic/etcd/doc.go | 19 ++++++++ pkg/registry/generic/{ => etcd}/etcd.go | 32 +++---------- pkg/registry/generic/{ => etcd}/etcd_test.go | 5 ++- pkg/registry/generic/registry.go | 39 ++++++++++++++++ pkg/registry/generic/registry_test.go | 47 +++++++++++++++++++- 8 files changed, 132 insertions(+), 49 deletions(-) create mode 100644 pkg/registry/generic/etcd/doc.go rename pkg/registry/generic/{ => etcd}/etcd.go (76%) rename pkg/registry/generic/{ => etcd}/etcd_test.go (99%) diff --git a/pkg/registry/event/registry.go b/pkg/registry/event/registry.go index e4cd9073a8f..f36d0163e5b 100644 --- a/pkg/registry/event/registry.go +++ b/pkg/registry/event/registry.go @@ -22,13 +22,14 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" + etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" ) // registry implements custom changes to generic.Etcd. type registry struct { - *generic.Etcd + *etcdgeneric.Etcd ttl uint64 } @@ -42,7 +43,7 @@ func (r registry) Create(ctx api.Context, id string, obj runtime.Object) error { // EtcdHelper. ttl is the time that Events will be retained by the system. func NewEtcdRegistry(h tools.EtcdHelper, ttl uint64) generic.Registry { return registry{ - Etcd: &generic.Etcd{ + Etcd: &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.Event{} }, NewListFunc: func() runtime.Object { return &api.EventList{} }, EndpointName: "events", diff --git a/pkg/registry/event/rest.go b/pkg/registry/event/rest.go index b43088e2a29..83031f6cdb1 100644 --- a/pkg/registry/event/rest.go +++ b/pkg/registry/event/rest.go @@ -89,14 +89,14 @@ func (rs *REST) getAttrs(obj runtime.Object) (objLabels, objFields labels.Set, e return nil, nil, fmt.Errorf("invalid object type") } return labels.Set{}, labels.Set{ - "InvolvedObject.Kind": event.InvolvedObject.Kind, - "InvolvedObject.Name": event.InvolvedObject.Name, - "InvolvedObject.UID": event.InvolvedObject.UID, - "InvolvedObject.APIVersion": event.InvolvedObject.APIVersion, - "InvolvedObject.ResourceVersion": fmt.Sprintf("%s", event.InvolvedObject.ResourceVersion), - "InvolvedObject.FieldPath": event.InvolvedObject.FieldPath, - "Status": event.Status, - "Reason": event.Reason, + "involvedObject.kind": event.InvolvedObject.Kind, + "involvedObject.name": event.InvolvedObject.Name, + "involvedObject.uid": event.InvolvedObject.UID, + "involvedObject.apiVersion": event.InvolvedObject.APIVersion, + "involvedObject.resourceVersion": fmt.Sprintf("%s", event.InvolvedObject.ResourceVersion), + "involvedObject.fieldPath": event.InvolvedObject.FieldPath, + "status": event.Status, + "reason": event.Reason, }, nil } diff --git a/pkg/registry/event/rest_test.go b/pkg/registry/event/rest_test.go index 84c42727c9d..820562485ca 100644 --- a/pkg/registry/event/rest_test.go +++ b/pkg/registry/event/rest_test.go @@ -114,14 +114,14 @@ func TestRESTgetAttrs(t *testing.T) { t.Errorf("diff: %s", util.ObjectDiff(e, a)) } expect := labels.Set{ - "InvolvedObject.Kind": "Pod", - "InvolvedObject.Name": "foo", - "InvolvedObject.UID": "long uid string", - "InvolvedObject.APIVersion": testapi.Version(), - "InvolvedObject.ResourceVersion": "0", - "InvolvedObject.FieldPath": "", - "Status": "tested", - "Reason": "forTesting", + "involvedObject.kind": "Pod", + "involvedObject.name": "foo", + "involvedObject.uid": "long uid string", + "involvedObject.apiVersion": testapi.Version(), + "involvedObject.resourceVersion": "0", + "involvedObject.fieldPath": "", + "status": "tested", + "reason": "forTesting", } if e, a := expect, field; !reflect.DeepEqual(e, a) { t.Errorf("diff: %s", util.ObjectDiff(e, a)) @@ -186,7 +186,7 @@ func TestRESTList(t *testing.T) { reg.ObjectList = &api.EventList{ Items: []api.Event{*eventA, *eventB, *eventC}, } - got, err := rest.List(api.NewContext(), labels.Everything(), labels.Set{"Status": "tested"}.AsSelector()) + got, err := rest.List(api.NewContext(), labels.Everything(), labels.Set{"status": "tested"}.AsSelector()) if err != nil { t.Fatalf("Unexpected error %v", err) } diff --git a/pkg/registry/generic/etcd/doc.go b/pkg/registry/generic/etcd/doc.go new file mode 100644 index 00000000000..c96fbd56f5c --- /dev/null +++ b/pkg/registry/generic/etcd/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package etcd has a generic implementation of a registry that +// stores things in etcd. +package etcd diff --git a/pkg/registry/generic/etcd.go b/pkg/registry/generic/etcd/etcd.go similarity index 76% rename from pkg/registry/generic/etcd.go rename to pkg/registry/generic/etcd/etcd.go index e43405635dd..90ca035b707 100644 --- a/pkg/registry/generic/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -14,11 +14,12 @@ See the License for the specific language governing permissions and limitations under the License. */ -package generic +package etcd import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -50,35 +51,13 @@ type Etcd struct { } // List returns a list of all the items matching m. -func (e *Etcd) List(ctx api.Context, m Matcher) (runtime.Object, error) { +func (e *Etcd) List(ctx api.Context, m generic.Matcher) (runtime.Object, error) { list := e.NewListFunc() err := e.Helper.ExtractToList(e.KeyRoot, list) if err != nil { return nil, err } - return FilterList(list, m) -} - -// FilterList filters any list object that conforms to the api conventions, -// provided that 'm' works with the concrete type of list. -func FilterList(list runtime.Object, m Matcher) (filtered runtime.Object, err error) { - // TODO: push a matcher down into tools.EtcdHelper to avoid all this - // nonsense. This is a lot of unnecessary copies. - items, err := runtime.ExtractList(list) - if err != nil { - return nil, err - } - var filteredItems []runtime.Object - for _, obj := range items { - if match, err := m.Matches(obj); err == nil && match { - filteredItems = append(filteredItems, obj) - } - } - err = runtime.SetList(list, filteredItems) - if err != nil { - return nil, err - } - return list, nil + return generic.FilterList(list, m) } // Create inserts a new item. @@ -89,6 +68,7 @@ func (e *Etcd) Create(ctx api.Context, id string, obj runtime.Object) error { // Update updates the item. func (e *Etcd) Update(ctx api.Context, id string, obj runtime.Object) error { + // TODO: verify that SetObj checks ResourceVersion before succeeding. err := e.Helper.SetObj(e.KeyFunc(id), obj) return etcderr.InterpretUpdateError(err, e.EndpointName, id) } @@ -111,7 +91,7 @@ func (e *Etcd) Delete(ctx api.Context, id string) error { // Watch starts a watch for the items that m matches. // TODO: Detect if m references a single object instead of a list. -func (e *Etcd) Watch(ctx api.Context, m Matcher, resourceVersion uint64) (watch.Interface, error) { +func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion uint64) (watch.Interface, error) { return e.Helper.WatchList(e.KeyRoot, resourceVersion, func(obj runtime.Object) bool { matches, err := m.Matches(obj) return err == nil && matches diff --git a/pkg/registry/generic/etcd_test.go b/pkg/registry/generic/etcd/etcd_test.go similarity index 99% rename from pkg/registry/generic/etcd_test.go rename to pkg/registry/generic/etcd/etcd_test.go index f34eeeaca05..0772bd96fc6 100644 --- a/pkg/registry/generic/etcd_test.go +++ b/pkg/registry/generic/etcd/etcd_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package generic +package etcd import ( "fmt" @@ -25,6 +25,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -90,7 +91,7 @@ func TestEtcdList(t *testing.T) { table := map[string]struct { in tools.EtcdResponseWithError - m Matcher + m generic.Matcher out runtime.Object succeed bool }{ diff --git a/pkg/registry/generic/registry.go b/pkg/registry/generic/registry.go index e34373c0957..d368905cd8e 100644 --- a/pkg/registry/generic/registry.go +++ b/pkg/registry/generic/registry.go @@ -54,6 +54,19 @@ type Matcher interface { Matches(obj runtime.Object) (bool, error) } +// MatcherFunc makes a matcher from the provided function. For easy definition +// of matchers for testing. +func MatcherFunc(f func(obj runtime.Object) (bool, error)) Matcher { + return matcherFunc(f) +} + +type matcherFunc func(obj runtime.Object) (bool, error) + +// Matches calls the embedded function. +func (m matcherFunc) Matches(obj runtime.Object) (bool, error) { + return m(obj) +} + // Registry knows how to store & list any runtime.Object. Can be used for // any object types which don't require special features from the storage // layer. @@ -65,3 +78,29 @@ type Registry interface { Delete(ctx api.Context, id string) error Watch(ctx api.Context, m Matcher, resourceVersion uint64) (watch.Interface, error) } + +// FilterList filters any list object that conforms to the api conventions, +// provided that 'm' works with the concrete type of list. +func FilterList(list runtime.Object, m Matcher) (filtered runtime.Object, err error) { + // TODO: push a matcher down into tools.EtcdHelper to avoid all this + // nonsense. This is a lot of unnecessary copies. + items, err := runtime.ExtractList(list) + if err != nil { + return nil, err + } + var filteredItems []runtime.Object + for _, obj := range items { + match, err := m.Matches(obj) + if err != nil { + return nil, err + } + if match { + filteredItems = append(filteredItems, obj) + } + } + err = runtime.SetList(list, filteredItems) + if err != nil { + return nil, err + } + return list, nil +} diff --git a/pkg/registry/generic/registry_test.go b/pkg/registry/generic/registry_test.go index e0c5b843b9b..81c2ddc731a 100644 --- a/pkg/registry/generic/registry_test.go +++ b/pkg/registry/generic/registry_test.go @@ -18,15 +18,23 @@ package generic import ( "errors" + "reflect" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) -type Ignored struct{} +type Ignored struct { + ID string +} -func (*Ignored) IsAnAPIObject() {} +type IgnoredList struct { + Items []Ignored +} + +func (*Ignored) IsAnAPIObject() {} +func (*IgnoredList) IsAnAPIObject() {} func TestSelectionPredicate(t *testing.T) { table := map[string]struct { @@ -90,3 +98,38 @@ func TestSelectionPredicate(t *testing.T) { } } } + +func TestFilterList(t *testing.T) { + try := &IgnoredList{ + Items: []Ignored{ + {"foo"}, + {"bar"}, + {"baz"}, + {"qux"}, + {"zot"}, + }, + } + expect := &IgnoredList{ + Items: []Ignored{ + {"bar"}, + {"baz"}, + }, + } + + got, err := FilterList(try, + MatcherFunc(func(obj runtime.Object) (bool, error) { + i, ok := obj.(*Ignored) + if !ok { + return false, errors.New("wrong type") + } + return i.ID[0] == 'b', nil + }), + ) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + + if e, a := expect, got; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } +}