diff --git a/cmd/apiserver/apiserver.go b/cmd/apiserver/apiserver.go index 46fa835812f..a96d39cb987 100644 --- a/cmd/apiserver/apiserver.go +++ b/cmd/apiserver/apiserver.go @@ -49,17 +49,18 @@ import ( var ( port = flag.Uint("port", 8080, "The port to listen on. Default 8080") address = util.IP(net.ParseIP("127.0.0.1")) - apiPrefix = flag.String("api_prefix", "/api", "The prefix for API requests on the server. Default '/api'") + apiPrefix = flag.String("api_prefix", "/api", "The prefix for API requests on the server. Default '/api'.") storageVersion = flag.String("storage_version", "", "The version to store resources with. Defaults to server preferred") cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.") cloudConfigFile = flag.String("cloud_config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.") - minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs") + minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs.") minionPort = flag.Uint("minion_port", 10250, "The port at which kubelet will be listening on the minions.") - healthCheckMinions = flag.Bool("health_check_minions", true, "If true, health check minions and filter unhealthy ones. Default true") - minionCacheTTL = flag.Duration("minion_cache_ttl", 30*time.Second, "Duration of time to cache minion information. Default 30 seconds") - tokenAuthFile = flag.String("token_auth_file", "", "If set, the file that will be used to secure the API server via token authentication") + healthCheckMinions = flag.Bool("health_check_minions", true, "If true, health check minions and filter unhealthy ones. Default true.") + minionCacheTTL = flag.Duration("minion_cache_ttl", 30*time.Second, "Duration of time to cache minion information. Default 30 seconds.") + eventTTL = flag.Duration("event_ttl", 48*time.Hour, "Amount of time to retain events. Default 2 days.") + tokenAuthFile = flag.String("token_auth_file", "", "If set, the file that will be used to secure the API server via token authentication.") etcdServerList util.StringList - etcdConfigFile = flag.String("etcd_config", "", "The config file for the etcd client. Mutually exclusive with -etcd_servers") + etcdConfigFile = flag.String("etcd_config", "", "The config file for the etcd client. Mutually exclusive with -etcd_servers.") machineList util.StringList corsAllowedOriginList util.StringList allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow privileged containers.") @@ -178,6 +179,7 @@ func main() { HealthCheckMinions: *healthCheckMinions, Minions: machineList, MinionCacheTTL: *minionCacheTTL, + EventTTL: *eventTTL, MinionRegexp: *minionRegexp, PodInfoGetter: podInfoGetter, NodeResources: api.NodeResources{ diff --git a/pkg/master/master.go b/pkg/master/master.go index 0d53423ff89..56370c759e5 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -31,6 +31,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/event" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" @@ -49,6 +51,7 @@ type Config struct { HealthCheckMinions bool Minions []string MinionCacheTTL time.Duration + EventTTL time.Duration MinionRegexp string PodInfoGetter client.PodInfoGetter NodeResources api.NodeResources @@ -62,6 +65,7 @@ type Master struct { endpointRegistry endpoint.Registry minionRegistry minion.Registry bindingRegistry binding.Registry + eventRegistry generic.Registry storage map[string]apiserver.RESTStorage client *client.Client } @@ -92,6 +96,7 @@ func New(c *Config) *Master { serviceRegistry: serviceRegistry, endpointRegistry: etcd.NewRegistry(c.EtcdHelper, nil), bindingRegistry: etcd.NewRegistry(c.EtcdHelper, manifestFactory), + eventRegistry: event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())), minionRegistry: minionRegistry, client: c.Client, } @@ -147,6 +152,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf "services": service.NewREST(m.serviceRegistry, cloud, m.minionRegistry), "endpoints": endpoint.NewREST(m.endpointRegistry), "minions": minion.NewREST(m.minionRegistry), + "events": event.NewREST(m.eventRegistry), // TODO: should appear only in scheduler API group. "bindings": binding.NewREST(m.bindingRegistry), diff --git a/pkg/registry/event/doc.go b/pkg/registry/event/doc.go new file mode 100644 index 00000000000..18aed52243e --- /dev/null +++ b/pkg/registry/event/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package event provides Registry interface and it's REST +// implementation for storing Event api objects. +package event diff --git a/pkg/registry/event/registry.go b/pkg/registry/event/registry.go new file mode 100644 index 00000000000..f36d0163e5b --- /dev/null +++ b/pkg/registry/event/registry.go @@ -0,0 +1,58 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package event + +import ( + "path" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" + etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" +) + +// registry implements custom changes to generic.Etcd. +type registry struct { + *etcdgeneric.Etcd + ttl uint64 +} + +// Create stores the object with a ttl, so that events don't stay in the system forever. +func (r registry) Create(ctx api.Context, id string, obj runtime.Object) error { + err := r.Etcd.Helper.CreateObj(r.Etcd.KeyFunc(id), obj, r.ttl) + return etcderr.InterpretCreateError(err, r.Etcd.EndpointName, id) +} + +// NewEtcdRegistry returns a registry which will store Events in the given +// EtcdHelper. ttl is the time that Events will be retained by the system. +func NewEtcdRegistry(h tools.EtcdHelper, ttl uint64) generic.Registry { + return registry{ + Etcd: &etcdgeneric.Etcd{ + NewFunc: func() runtime.Object { return &api.Event{} }, + NewListFunc: func() runtime.Object { return &api.EventList{} }, + EndpointName: "events", + KeyRoot: "/registry/events", + KeyFunc: func(id string) string { + return path.Join("/registry/events", id) + }, + Helper: h, + }, + ttl: ttl, + } +} diff --git a/pkg/registry/event/registry_test.go b/pkg/registry/event/registry_test.go new file mode 100644 index 00000000000..db08b7e0bcc --- /dev/null +++ b/pkg/registry/event/registry_test.go @@ -0,0 +1,105 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package event + +import ( + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + + "github.com/coreos/go-etcd/etcd" +) + +var testTTL uint64 = 60 + +func NewTestEventEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) { + f := tools.NewFakeEtcdClient(t) + f.TestIndex = true + h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.ResourceVersioner()}} + return f, NewEtcdRegistry(h, testTTL) +} + +func TestEventCreate(t *testing.T) { + eventA := &api.Event{ + TypeMeta: api.TypeMeta{ID: "foo"}, + Reason: "forTesting", + } + eventB := &api.Event{ + TypeMeta: api.TypeMeta{ID: "foo"}, + Reason: "forTesting", + } + + nodeWithEventA := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), eventA), + ModifiedIndex: 1, + CreatedIndex: 1, + TTL: int64(testTTL), + }, + }, + E: nil, + } + + emptyNode := tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + + path := "/registry/events/foo" + key := "foo" + + table := map[string]struct { + existing tools.EtcdResponseWithError + expect tools.EtcdResponseWithError + toCreate runtime.Object + errOK func(error) bool + }{ + "normal": { + existing: emptyNode, + expect: nodeWithEventA, + toCreate: eventA, + errOK: func(err error) bool { return err == nil }, + }, + "preExisting": { + existing: nodeWithEventA, + expect: nodeWithEventA, + toCreate: eventB, + errOK: errors.IsAlreadyExists, + }, + } + + for name, item := range table { + fakeClient, registry := NewTestEventEtcdRegistry(t) + fakeClient.Data[path] = item.existing + err := registry.Create(api.NewContext(), key, item.toCreate) + if !item.errOK(err) { + t.Errorf("%v: unexpected error: %v", name, err) + } + + if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) { + t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) + } + } +} diff --git a/pkg/registry/event/rest.go b/pkg/registry/event/rest.go new file mode 100644 index 00000000000..83031f6cdb1 --- /dev/null +++ b/pkg/registry/event/rest.go @@ -0,0 +1,121 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package event + +import ( + "fmt" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// REST adapts an event registry into apiserver's RESTStorage model. +type REST struct { + registry generic.Registry +} + +// NewREST returns a new REST. You must use a registry created by +// NewEtcdRegistry unless you're testing. +func NewREST(registry generic.Registry) *REST { + return &REST{ + registry: registry, + } +} + +func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Object, error) { + event, ok := obj.(*api.Event) + if !ok { + return nil, fmt.Errorf("invalid object type") + } + event.CreationTimestamp = util.Now() + + return apiserver.MakeAsync(func() (runtime.Object, error) { + err := rs.registry.Create(ctx, event.ID, event) + if err != nil { + return nil, err + } + return rs.registry.Get(ctx, event.ID) + }), nil +} + +func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) { + obj, err := rs.registry.Get(ctx, id) + if err != nil { + return nil, err + } + _, ok := obj.(*api.Event) + if !ok { + return nil, fmt.Errorf("invalid object type") + } + return apiserver.MakeAsync(func() (runtime.Object, error) { + return &api.Status{Status: api.StatusSuccess}, rs.registry.Delete(ctx, id) + }), nil +} + +func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { + obj, err := rs.registry.Get(ctx, id) + if err != nil { + return nil, err + } + event, ok := obj.(*api.Event) + if !ok { + return nil, fmt.Errorf("invalid object type") + } + return event, err +} + +func (rs *REST) getAttrs(obj runtime.Object) (objLabels, objFields labels.Set, err error) { + event, ok := obj.(*api.Event) + if !ok { + return nil, nil, fmt.Errorf("invalid object type") + } + return labels.Set{}, labels.Set{ + "involvedObject.kind": event.InvolvedObject.Kind, + "involvedObject.name": event.InvolvedObject.Name, + "involvedObject.uid": event.InvolvedObject.UID, + "involvedObject.apiVersion": event.InvolvedObject.APIVersion, + "involvedObject.resourceVersion": fmt.Sprintf("%s", event.InvolvedObject.ResourceVersion), + "involvedObject.fieldPath": event.InvolvedObject.FieldPath, + "status": event.Status, + "reason": event.Reason, + }, nil +} + +func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { + return rs.registry.List(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}) +} + +// Watch returns Events events via a watch.Interface. +// It implements apiserver.ResourceWatcher. +func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return rs.registry.Watch(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion) +} + +// New returns a new api.Event +func (*REST) New() runtime.Object { + return &api.Event{} +} + +// Update returns an error: Events are not mutable. +func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Object, error) { + return nil, fmt.Errorf("not allowed: 'Event' objects are not mutable") +} diff --git a/pkg/registry/event/rest_test.go b/pkg/registry/event/rest_test.go new file mode 100644 index 00000000000..820562485ca --- /dev/null +++ b/pkg/registry/event/rest_test.go @@ -0,0 +1,226 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package event + +import ( + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +type testRegistry struct { + *registrytest.GenericRegistry +} + +func NewTestREST() (testRegistry, *REST) { + reg := testRegistry{registrytest.NewGeneric(nil)} + return reg, NewREST(reg) +} + +func TestRESTCreate(t *testing.T) { + _, rest := NewTestREST() + eventA := &api.Event{ + TypeMeta: api.TypeMeta{ID: "foo"}, + Reason: "forTesting", + } + c, err := rest.Create(api.NewContext(), eventA) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + if e, a := eventA, <-c; !reflect.DeepEqual(e, a) { + t.Errorf("diff: %s", util.ObjectDiff(e, a)) + } +} + +func TestRESTDelete(t *testing.T) { + _, rest := NewTestREST() + eventA := &api.Event{ + TypeMeta: api.TypeMeta{ID: "foo"}, + Reason: "forTesting", + } + c, err := rest.Create(api.NewContext(), eventA) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + <-c + c, err = rest.Delete(api.NewContext(), eventA.ID) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + if stat := (<-c).(*api.Status); stat.Status != api.StatusSuccess { + t.Errorf("unexpected status: %v", stat) + } +} + +func TestRESTGet(t *testing.T) { + _, rest := NewTestREST() + eventA := &api.Event{ + TypeMeta: api.TypeMeta{ID: "foo"}, + Reason: "forTesting", + } + c, err := rest.Create(api.NewContext(), eventA) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + <-c + got, err := rest.Get(api.NewContext(), eventA.ID) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + if e, a := eventA, got; !reflect.DeepEqual(e, a) { + t.Errorf("diff: %s", util.ObjectDiff(e, a)) + } +} + +func TestRESTgetAttrs(t *testing.T) { + _, rest := NewTestREST() + eventA := &api.Event{ + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "foo", + UID: "long uid string", + APIVersion: testapi.Version(), + ResourceVersion: "0", + FieldPath: "", + }, + Status: "tested", + Reason: "forTesting", + } + label, field, err := rest.getAttrs(eventA) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + if e, a := label, (labels.Set{}); !reflect.DeepEqual(e, a) { + t.Errorf("diff: %s", util.ObjectDiff(e, a)) + } + expect := labels.Set{ + "involvedObject.kind": "Pod", + "involvedObject.name": "foo", + "involvedObject.uid": "long uid string", + "involvedObject.apiVersion": testapi.Version(), + "involvedObject.resourceVersion": "0", + "involvedObject.fieldPath": "", + "status": "tested", + "reason": "forTesting", + } + if e, a := expect, field; !reflect.DeepEqual(e, a) { + t.Errorf("diff: %s", util.ObjectDiff(e, a)) + } +} + +func TestRESTUpdate(t *testing.T) { + _, rest := NewTestREST() + eventA := &api.Event{ + TypeMeta: api.TypeMeta{ID: "foo"}, + Reason: "forTesting", + } + c, err := rest.Create(api.NewContext(), eventA) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + <-c + _, err = rest.Update(api.NewContext(), eventA) + if err == nil { + t.Errorf("unexpected non-error") + } +} + +func TestRESTList(t *testing.T) { + reg, rest := NewTestREST() + eventA := &api.Event{ + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "foo", + UID: "long uid string", + APIVersion: testapi.Version(), + ResourceVersion: "0", + FieldPath: "", + }, + Status: "tested", + Reason: "forTesting", + } + eventB := &api.Event{ + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "bar", + UID: "other long uid string", + APIVersion: testapi.Version(), + ResourceVersion: "0", + FieldPath: "", + }, + Status: "tested", + Reason: "forTesting", + } + eventC := &api.Event{ + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "baz", + UID: "yet another long uid string", + APIVersion: testapi.Version(), + ResourceVersion: "0", + FieldPath: "", + }, + Status: "untested", + Reason: "forTesting", + } + reg.ObjectList = &api.EventList{ + Items: []api.Event{*eventA, *eventB, *eventC}, + } + got, err := rest.List(api.NewContext(), labels.Everything(), labels.Set{"status": "tested"}.AsSelector()) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + expect := &api.EventList{ + Items: []api.Event{*eventA, *eventB}, + } + if e, a := expect, got; !reflect.DeepEqual(e, a) { + t.Errorf("diff: %s", util.ObjectDiff(e, a)) + } +} + +func TestRESTWatch(t *testing.T) { + eventA := &api.Event{ + InvolvedObject: api.ObjectReference{ + Kind: "Pod", + Name: "foo", + UID: "long uid string", + APIVersion: testapi.Version(), + ResourceVersion: "0", + FieldPath: "", + }, + Status: "tested", + Reason: "forTesting", + } + reg, rest := NewTestREST() + wi, err := rest.Watch(api.NewContext(), labels.Everything(), labels.Everything(), 0) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + go func() { + reg.Mux.Action(watch.Added, eventA) + }() + got := <-wi.ResultChan() + if e, a := eventA, got.Object; !reflect.DeepEqual(e, a) { + t.Errorf("diff: %s", util.ObjectDiff(e, a)) + } +} diff --git a/pkg/registry/generic/doc.go b/pkg/registry/generic/doc.go new file mode 100644 index 00000000000..67a48db6aa5 --- /dev/null +++ b/pkg/registry/generic/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package generic provides a generic object store interface and a +// generic label/field matching type. +package generic diff --git a/pkg/registry/generic/etcd/doc.go b/pkg/registry/generic/etcd/doc.go new file mode 100644 index 00000000000..c96fbd56f5c --- /dev/null +++ b/pkg/registry/generic/etcd/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package etcd has a generic implementation of a registry that +// stores things in etcd. +package etcd diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go new file mode 100644 index 00000000000..90ca035b707 --- /dev/null +++ b/pkg/registry/generic/etcd/etcd.go @@ -0,0 +1,99 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// Etcd implements generic.Registry, backing it with etcd storage. +// It's intended to be embeddable, so that you can implement any +// non-generic functions if needed. +// You must supply a value for every field below before use; these are +// left public as it's meant to be overridable if need be. +type Etcd struct { + // Called to make a new object, should return e.g., &api.Pod{} + NewFunc func() runtime.Object + + // Called to make a new listing object, should return e.g., &api.PodList{} + NewListFunc func() runtime.Object + + // Used for error reporting + EndpointName string + + // Used for listing/watching; should not include trailing "/" + KeyRoot string + + // Called for Create/Update/Get/Delete + KeyFunc func(id string) string + + // Used for all etcd access functions + Helper tools.EtcdHelper +} + +// List returns a list of all the items matching m. +func (e *Etcd) List(ctx api.Context, m generic.Matcher) (runtime.Object, error) { + list := e.NewListFunc() + err := e.Helper.ExtractToList(e.KeyRoot, list) + if err != nil { + return nil, err + } + return generic.FilterList(list, m) +} + +// Create inserts a new item. +func (e *Etcd) Create(ctx api.Context, id string, obj runtime.Object) error { + err := e.Helper.CreateObj(e.KeyFunc(id), obj, 0) + return etcderr.InterpretCreateError(err, e.EndpointName, id) +} + +// Update updates the item. +func (e *Etcd) Update(ctx api.Context, id string, obj runtime.Object) error { + // TODO: verify that SetObj checks ResourceVersion before succeeding. + err := e.Helper.SetObj(e.KeyFunc(id), obj) + return etcderr.InterpretUpdateError(err, e.EndpointName, id) +} + +// Get retrieves the item from etcd. +func (e *Etcd) Get(ctx api.Context, id string) (runtime.Object, error) { + obj := e.NewFunc() + err := e.Helper.ExtractObj(e.KeyFunc(id), obj, false) + if err != nil { + return nil, etcderr.InterpretGetError(err, e.EndpointName, id) + } + return obj, nil +} + +// Delete removes the item from etcd. +func (e *Etcd) Delete(ctx api.Context, id string) error { + err := e.Helper.Delete(e.KeyFunc(id), false) + return etcderr.InterpretDeleteError(err, e.EndpointName, id) +} + +// Watch starts a watch for the items that m matches. +// TODO: Detect if m references a single object instead of a list. +func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion uint64) (watch.Interface, error) { + return e.Helper.WatchList(e.KeyRoot, resourceVersion, func(obj runtime.Object) bool { + matches, err := m.Matches(obj) + return err == nil && matches + }) +} diff --git a/pkg/registry/generic/etcd/etcd_test.go b/pkg/registry/generic/etcd/etcd_test.go new file mode 100644 index 00000000000..0772bd96fc6 --- /dev/null +++ b/pkg/registry/generic/etcd/etcd_test.go @@ -0,0 +1,438 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "fmt" + "path" + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + + "github.com/coreos/go-etcd/etcd" +) + +func NewTestGenericEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, *Etcd) { + f := tools.NewFakeEtcdClient(t) + f.TestIndex = true + h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.ResourceVersioner()}} + return f, &Etcd{ + NewFunc: func() runtime.Object { return &api.Pod{} }, + NewListFunc: func() runtime.Object { return &api.PodList{} }, + EndpointName: "pods", + KeyRoot: "/registry/pods", + KeyFunc: func(id string) string { + return path.Join("/registry/pods", id) + }, + Helper: h, + } +} + +// SetMatcher is a matcher that matches any pod with id in the set. +// Makes testing simpler. +type SetMatcher struct { + util.StringSet +} + +func (sm SetMatcher) Matches(obj runtime.Object) (bool, error) { + pod, ok := obj.(*api.Pod) + if !ok { + return false, fmt.Errorf("wrong object") + } + return sm.Has(pod.ID), nil +} + +// EverythingMatcher matches everything +type EverythingMatcher struct{} + +func (EverythingMatcher) Matches(obj runtime.Object) (bool, error) { + return true, nil +} + +func TestEtcdList(t *testing.T) { + podA := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{Host: "machine"}, + } + podB := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "bar"}, + DesiredState: api.PodState{Host: "machine"}, + } + + normalListResp := &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{ + {Value: runtime.EncodeOrDie(testapi.Codec(), podA)}, + {Value: runtime.EncodeOrDie(testapi.Codec(), podB)}, + }, + }, + } + + table := map[string]struct { + in tools.EtcdResponseWithError + m generic.Matcher + out runtime.Object + succeed bool + }{ + "empty": { + in: tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Nodes: []*etcd.Node{}, + }, + }, + E: nil, + }, + m: EverythingMatcher{}, + out: &api.PodList{Items: []api.Pod{}}, + succeed: true, + }, + "notFound": { + in: tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + }, + m: EverythingMatcher{}, + out: &api.PodList{Items: []api.Pod{}}, + succeed: true, + }, + "normal": { + in: tools.EtcdResponseWithError{ + R: normalListResp, + E: nil, + }, + m: EverythingMatcher{}, + out: &api.PodList{Items: []api.Pod{*podA, *podB}}, + succeed: true, + }, + "normalFiltered": { + in: tools.EtcdResponseWithError{ + R: normalListResp, + E: nil, + }, + m: SetMatcher{util.NewStringSet("foo")}, + out: &api.PodList{Items: []api.Pod{*podA}}, + succeed: true, + }, + } + + for name, item := range table { + fakeClient, registry := NewTestGenericEtcdRegistry(t) + fakeClient.Data[registry.KeyRoot] = item.in + list, err := registry.List(api.NewContext(), item.m) + if e, a := item.succeed, err == nil; e != a { + t.Errorf("%v: expected %v, got %v", name, e, a) + continue + } + + if e, a := item.out, list; !reflect.DeepEqual(e, a) { + t.Errorf("%v: Expected %#v, got %#v", name, e, a) + } + } +} + +func TestEtcdCreate(t *testing.T) { + podA := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{Host: "machine"}, + } + podB := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{Host: "machine2"}, + } + + nodeWithPodA := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podA), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + E: nil, + } + + emptyNode := tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + + path := "/registry/pods/foo" + key := "foo" + + table := map[string]struct { + existing tools.EtcdResponseWithError + expect tools.EtcdResponseWithError + toCreate runtime.Object + errOK func(error) bool + }{ + "normal": { + existing: emptyNode, + expect: nodeWithPodA, + toCreate: podA, + errOK: func(err error) bool { return err == nil }, + }, + "preExisting": { + existing: nodeWithPodA, + expect: nodeWithPodA, + toCreate: podB, + errOK: errors.IsAlreadyExists, + }, + } + + for name, item := range table { + fakeClient, registry := NewTestGenericEtcdRegistry(t) + fakeClient.Data[path] = item.existing + err := registry.Create(api.NewContext(), key, item.toCreate) + if !item.errOK(err) { + t.Errorf("%v: unexpected error: %v", name, err) + } + + if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) { + t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) + } + } +} + +func TestEtcdUpdate(t *testing.T) { + podA := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo"}, + DesiredState: api.PodState{Host: "machine"}, + } + podB := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}, + DesiredState: api.PodState{Host: "machine2"}, + } + + nodeWithPodA := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podA), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + E: nil, + } + + nodeWithPodB := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podB), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + E: nil, + } + + emptyNode := tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + + path := "/registry/pods/foo" + key := "foo" + + table := map[string]struct { + existing tools.EtcdResponseWithError + expect tools.EtcdResponseWithError + toUpdate runtime.Object + errOK func(error) bool + }{ + "normal": { + existing: nodeWithPodA, + expect: nodeWithPodB, + toUpdate: podB, + errOK: func(err error) bool { return err == nil }, + }, + "notExisting": { + existing: emptyNode, + expect: nodeWithPodA, + toUpdate: podA, + // TODO: Should updating a non-existing thing fail? + errOK: func(err error) bool { return err == nil }, + }, + } + + for name, item := range table { + fakeClient, registry := NewTestGenericEtcdRegistry(t) + fakeClient.Data[path] = item.existing + err := registry.Update(api.NewContext(), key, item.toUpdate) + if !item.errOK(err) { + t.Errorf("%v: unexpected error: %v", name, err) + } + + if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) { + t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) + } + } +} + +func TestEtcdGet(t *testing.T) { + podA := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}, + DesiredState: api.PodState{Host: "machine"}, + } + + nodeWithPodA := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podA), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + E: nil, + } + + emptyNode := tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + + path := "/registry/pods/foo" + key := "foo" + + table := map[string]struct { + existing tools.EtcdResponseWithError + expect runtime.Object + errOK func(error) bool + }{ + "normal": { + existing: nodeWithPodA, + expect: podA, + errOK: func(err error) bool { return err == nil }, + }, + "notExisting": { + existing: emptyNode, + expect: nil, + errOK: errors.IsNotFound, + }, + } + + for name, item := range table { + fakeClient, registry := NewTestGenericEtcdRegistry(t) + fakeClient.Data[path] = item.existing + got, err := registry.Get(api.NewContext(), key) + if !item.errOK(err) { + t.Errorf("%v: unexpected error: %v", name, err) + } + + if e, a := item.expect, got; !reflect.DeepEqual(e, a) { + t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) + } + } +} + +func TestEtcdDelete(t *testing.T) { + podA := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}, + DesiredState: api.PodState{Host: "machine"}, + } + + nodeWithPodA := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podA), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + E: nil, + } + + emptyNode := tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + + path := "/registry/pods/foo" + key := "foo" + + table := map[string]struct { + existing tools.EtcdResponseWithError + expect tools.EtcdResponseWithError + errOK func(error) bool + }{ + "normal": { + existing: nodeWithPodA, + expect: emptyNode, + errOK: func(err error) bool { return err == nil }, + }, + "notExisting": { + existing: emptyNode, + expect: emptyNode, + errOK: func(err error) bool { return err == nil }, + }, + } + + for name, item := range table { + fakeClient, registry := NewTestGenericEtcdRegistry(t) + fakeClient.Data[path] = item.existing + err := registry.Delete(api.NewContext(), key) + if !item.errOK(err) { + t.Errorf("%v: unexpected error: %v", name, err) + } + + if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) { + t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) + } + } +} + +func TestEtcdWatch(t *testing.T) { + podA := &api.Pod{ + TypeMeta: api.TypeMeta{ID: "foo", ResourceVersion: "1"}, + DesiredState: api.PodState{Host: "machine"}, + } + respWithPodA := &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podA), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + Action: "create", + } + + fakeClient, registry := NewTestGenericEtcdRegistry(t) + wi, err := registry.Watch(api.NewContext(), EverythingMatcher{}, 1) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + go func() { + fakeClient.WatchResponse <- respWithPodA + }() + + got, open := <-wi.ResultChan() + if !open { + t.Fatalf("unexpected channel close") + } + + if e, a := podA, got.Object; !reflect.DeepEqual(e, a) { + t.Errorf("difference: %s", util.ObjectDiff(e, a)) + } +} diff --git a/pkg/registry/generic/registry.go b/pkg/registry/generic/registry.go new file mode 100644 index 00000000000..d368905cd8e --- /dev/null +++ b/pkg/registry/generic/registry.go @@ -0,0 +1,106 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// AttrFunc returns label and field sets for List or Watch to compare against, or an error. +type AttrFunc func(obj runtime.Object) (label, field labels.Set, err error) + +// SelectionPredicate implements a generic predicate that can be passed to +// GenericRegistry's List or Watch methods. Implements the Matcher interface. +type SelectionPredicate struct { + Label labels.Selector + Field labels.Selector + GetAttrs AttrFunc +} + +// Matches returns true if the given object's labels and fields (as +// returned by s.GetAttrs) match s.Label and s.Field. An error is +// returned if s.GetAttrs fails. +func (s *SelectionPredicate) Matches(obj runtime.Object) (bool, error) { + if s.Label.Empty() && s.Field.Empty() { + return true, nil + } + labels, fields, err := s.GetAttrs(obj) + if err != nil { + return false, err + } + return s.Label.Matches(labels) && s.Field.Matches(fields), nil +} + +// Matcher can return true if an object matches the Matcher's selection +// criteria. +type Matcher interface { + Matches(obj runtime.Object) (bool, error) +} + +// MatcherFunc makes a matcher from the provided function. For easy definition +// of matchers for testing. +func MatcherFunc(f func(obj runtime.Object) (bool, error)) Matcher { + return matcherFunc(f) +} + +type matcherFunc func(obj runtime.Object) (bool, error) + +// Matches calls the embedded function. +func (m matcherFunc) Matches(obj runtime.Object) (bool, error) { + return m(obj) +} + +// Registry knows how to store & list any runtime.Object. Can be used for +// any object types which don't require special features from the storage +// layer. +type Registry interface { + List(api.Context, Matcher) (runtime.Object, error) + Create(ctx api.Context, id string, obj runtime.Object) error + Update(ctx api.Context, id string, obj runtime.Object) error + Get(ctx api.Context, id string) (runtime.Object, error) + Delete(ctx api.Context, id string) error + Watch(ctx api.Context, m Matcher, resourceVersion uint64) (watch.Interface, error) +} + +// FilterList filters any list object that conforms to the api conventions, +// provided that 'm' works with the concrete type of list. +func FilterList(list runtime.Object, m Matcher) (filtered runtime.Object, err error) { + // TODO: push a matcher down into tools.EtcdHelper to avoid all this + // nonsense. This is a lot of unnecessary copies. + items, err := runtime.ExtractList(list) + if err != nil { + return nil, err + } + var filteredItems []runtime.Object + for _, obj := range items { + match, err := m.Matches(obj) + if err != nil { + return nil, err + } + if match { + filteredItems = append(filteredItems, obj) + } + } + err = runtime.SetList(list, filteredItems) + if err != nil { + return nil, err + } + return list, nil +} diff --git a/pkg/registry/generic/registry_test.go b/pkg/registry/generic/registry_test.go new file mode 100644 index 00000000000..81c2ddc731a --- /dev/null +++ b/pkg/registry/generic/registry_test.go @@ -0,0 +1,135 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package generic + +import ( + "errors" + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" +) + +type Ignored struct { + ID string +} + +type IgnoredList struct { + Items []Ignored +} + +func (*Ignored) IsAnAPIObject() {} +func (*IgnoredList) IsAnAPIObject() {} + +func TestSelectionPredicate(t *testing.T) { + table := map[string]struct { + labelSelector, fieldSelector string + labels, fields labels.Set + err error + shouldMatch bool + }{ + "A": { + labelSelector: "name=foo", + fieldSelector: "uid=12345", + labels: labels.Set{"name": "foo"}, + fields: labels.Set{"uid": "12345"}, + shouldMatch: true, + }, + "B": { + labelSelector: "name=foo", + fieldSelector: "uid=12345", + labels: labels.Set{"name": "foo"}, + fields: labels.Set{}, + shouldMatch: false, + }, + "C": { + labelSelector: "name=foo", + fieldSelector: "uid=12345", + labels: labels.Set{}, + fields: labels.Set{"uid": "12345"}, + shouldMatch: false, + }, + "error": { + labelSelector: "name=foo", + fieldSelector: "uid=12345", + err: errors.New("maybe this is a 'wrong object type' error"), + shouldMatch: false, + }, + } + + for name, item := range table { + parsedLabel, err := labels.ParseSelector(item.labelSelector) + if err != nil { + panic(err) + } + parsedField, err := labels.ParseSelector(item.fieldSelector) + if err != nil { + panic(err) + } + sp := &SelectionPredicate{ + Label: parsedLabel, + Field: parsedField, + GetAttrs: func(runtime.Object) (label, field labels.Set, err error) { + return item.labels, item.fields, item.err + }, + } + got, err := sp.Matches(&Ignored{}) + if e, a := item.err, err; e != a { + t.Errorf("%v: expected %v, got %v", name, e, a) + continue + } + if e, a := item.shouldMatch, got; e != a { + t.Errorf("%v: expected %v, got %v", name, e, a) + } + } +} + +func TestFilterList(t *testing.T) { + try := &IgnoredList{ + Items: []Ignored{ + {"foo"}, + {"bar"}, + {"baz"}, + {"qux"}, + {"zot"}, + }, + } + expect := &IgnoredList{ + Items: []Ignored{ + {"bar"}, + {"baz"}, + }, + } + + got, err := FilterList(try, + MatcherFunc(func(obj runtime.Object) (bool, error) { + i, ok := obj.(*Ignored) + if !ok { + return false, errors.New("wrong type") + } + return i.ID[0] == 'b', nil + }), + ) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + + if e, a := expect, got; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } +} diff --git a/pkg/registry/registrytest/generic.go b/pkg/registry/registrytest/generic.go new file mode 100644 index 00000000000..01b5d0293b3 --- /dev/null +++ b/pkg/registry/registrytest/generic.go @@ -0,0 +1,87 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registrytest + +import ( + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// GenericRegistry knows how to store & list any runtime.Object. Events don't require +// any non-generic features from the storage layer. +type GenericRegistry struct { + Err error + Object runtime.Object + ObjectList runtime.Object + sync.Mutex + + Mux *watch.Mux +} + +func NewGeneric(list runtime.Object) *GenericRegistry { + return &GenericRegistry{ + ObjectList: list, + Mux: watch.NewMux(0), + } +} + +func (r *GenericRegistry) List(ctx api.Context, m generic.Matcher) (runtime.Object, error) { + r.Lock() + defer r.Unlock() + if r.Err != nil { + return nil, r.Err + } + return generic.FilterList(r.ObjectList, m) +} + +func (r *GenericRegistry) Watch(ctx api.Context, m generic.Matcher, resourceVersion uint64) (watch.Interface, error) { + // TODO: wire filter down into the mux; it needs access to current and previous state :( + return r.Mux.Watch(), nil +} + +func (r *GenericRegistry) Get(ctx api.Context, id string) (runtime.Object, error) { + r.Lock() + defer r.Unlock() + return r.Object, r.Err +} + +func (r *GenericRegistry) Create(ctx api.Context, id string, obj runtime.Object) error { + r.Lock() + defer r.Unlock() + r.Object = obj + r.Mux.Action(watch.Added, obj) + return r.Err +} + +func (r *GenericRegistry) Update(ctx api.Context, id string, obj runtime.Object) error { + r.Lock() + defer r.Unlock() + r.Object = obj + r.Mux.Action(watch.Modified, obj) + return r.Err +} + +func (r *GenericRegistry) Delete(ctx api.Context, id string) error { + r.Lock() + defer r.Unlock() + r.Mux.Action(watch.Deleted, r.Object) + return r.Err +} diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index bc8dec9330f..ff9e7311963 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -167,6 +167,7 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons Value: value, CreatedIndex: i, ModifiedIndex: i, + TTL: int64(ttl), }, }, }