Merge pull request #12340 from wojtek-t/rewrite_service_etcd
Refactor "service" registry to use standard REST storage (and generic etcd)
This commit is contained in:
75
pkg/registry/service/etcd/etcd.go
Normal file
75
pkg/registry/service/etcd/etcd.go
Normal file
@@ -0,0 +1,75 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/rest"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/registry/generic"
|
||||
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
)
|
||||
|
||||
type REST struct {
|
||||
etcdgeneric.Etcd
|
||||
}
|
||||
|
||||
func NewStorage(s storage.Interface) *REST {
|
||||
prefix := "/services/specs"
|
||||
store := etcdgeneric.Etcd{
|
||||
NewFunc: func() runtime.Object { return &api.Service{} },
|
||||
NewListFunc: func() runtime.Object { return &api.ServiceList{} },
|
||||
KeyRootFunc: func(ctx api.Context) string {
|
||||
return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
|
||||
},
|
||||
KeyFunc: func(ctx api.Context, name string) (string, error) {
|
||||
return etcdgeneric.NamespaceKeyFunc(ctx, prefix, name)
|
||||
},
|
||||
ObjectNameFunc: func(obj runtime.Object) (string, error) {
|
||||
return obj.(*api.Service).Name, nil
|
||||
},
|
||||
PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher {
|
||||
return MatchServices(label, field)
|
||||
},
|
||||
EndpointName: "services",
|
||||
|
||||
CreateStrategy: rest.Services,
|
||||
UpdateStrategy: rest.Services,
|
||||
|
||||
Storage: s,
|
||||
}
|
||||
return &REST{store}
|
||||
}
|
||||
|
||||
func MatchServices(label labels.Selector, field fields.Selector) generic.Matcher {
|
||||
return &generic.SelectionPredicate{label, field, ServiceAttributes}
|
||||
}
|
||||
|
||||
func ServiceAttributes(obj runtime.Object) (objLabels labels.Set, objFields fields.Set, err error) {
|
||||
service, ok := obj.(*api.Service)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("invalid object type %#v", obj)
|
||||
}
|
||||
return service.Labels, fields.Set{
|
||||
"metadata.name": service.Name,
|
||||
}, nil
|
||||
}
|
@@ -22,6 +22,8 @@ import (
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/registry/service"
|
||||
"k8s.io/kubernetes/pkg/registry/service/ipallocator"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
@@ -94,7 +96,7 @@ func (c *Repair) RunOnce() error {
|
||||
}
|
||||
|
||||
ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll)
|
||||
list, err := c.registry.ListServices(ctx)
|
||||
list, err := c.registry.ListServices(ctx, labels.Everything(), fields.Everything())
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to refresh the service IP block: %v", err)
|
||||
}
|
||||
|
@@ -21,6 +21,8 @@ import (
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/registry/service"
|
||||
"k8s.io/kubernetes/pkg/registry/service/portallocator"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
@@ -79,7 +81,7 @@ func (c *Repair) RunOnce() error {
|
||||
}
|
||||
|
||||
ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll)
|
||||
list, err := c.registry.ListServices(ctx)
|
||||
list, err := c.registry.ListServices(ctx, labels.Everything(), fields.Everything())
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to refresh the port block: %v", err)
|
||||
}
|
||||
|
@@ -18,6 +18,7 @@ package service
|
||||
|
||||
import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/rest"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
@@ -25,7 +26,7 @@ import (
|
||||
|
||||
// Registry is an interface for things that know how to store services.
|
||||
type Registry interface {
|
||||
ListServices(ctx api.Context) (*api.ServiceList, error)
|
||||
ListServices(ctx api.Context, label labels.Selector, field fields.Selector) (*api.ServiceList, error)
|
||||
CreateService(ctx api.Context, svc *api.Service) (*api.Service, error)
|
||||
GetService(ctx api.Context, name string) (*api.Service, error)
|
||||
DeleteService(ctx api.Context, name string) error
|
||||
@@ -33,6 +34,58 @@ type Registry interface {
|
||||
WatchServices(ctx api.Context, labels labels.Selector, fields fields.Selector, resourceVersion string) (watch.Interface, error)
|
||||
}
|
||||
|
||||
// storage puts strong typing around storage calls
|
||||
type storage struct {
|
||||
rest.StandardStorage
|
||||
}
|
||||
|
||||
// NewRegistry returns a new Registry interface for the given Storage. Any mismatched
|
||||
// types will panic.
|
||||
func NewRegistry(s rest.StandardStorage) Registry {
|
||||
return &storage{s}
|
||||
}
|
||||
|
||||
func (s *storage) ListServices(ctx api.Context, label labels.Selector, field fields.Selector) (*api.ServiceList, error) {
|
||||
obj, err := s.List(ctx, label, field)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return obj.(*api.ServiceList), nil
|
||||
}
|
||||
|
||||
func (s *storage) CreateService(ctx api.Context, svc *api.Service) (*api.Service, error) {
|
||||
obj, err := s.Create(ctx, svc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return obj.(*api.Service), nil
|
||||
}
|
||||
|
||||
func (s *storage) GetService(ctx api.Context, name string) (*api.Service, error) {
|
||||
obj, err := s.Get(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return obj.(*api.Service), nil
|
||||
}
|
||||
|
||||
func (s *storage) DeleteService(ctx api.Context, name string) error {
|
||||
_, err := s.Delete(ctx, name, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *storage) UpdateService(ctx api.Context, svc *api.Service) (*api.Service, error) {
|
||||
obj, _, err := s.Update(ctx, svc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return obj.(*api.Service), nil
|
||||
}
|
||||
|
||||
func (s *storage) WatchServices(ctx api.Context, labels labels.Selector, fields fields.Selector, resourceVersion string) (watch.Interface, error) {
|
||||
return s.Watch(ctx, labels, fields, resourceVersion)
|
||||
}
|
||||
|
||||
// TODO: Move to a general location (as other components may need allocation in future; it's not service specific)
|
||||
// RangeRegistry is a registry that can retrieve or persist a RangeAllocation object.
|
||||
type RangeRegistry interface {
|
||||
|
335
pkg/registry/service/registry_test.go
Normal file
335
pkg/registry/service/registry_test.go
Normal file
@@ -0,0 +1,335 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package service
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/latest"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
|
||||
etcdservice "k8s.io/kubernetes/pkg/registry/service/etcd"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
"k8s.io/kubernetes/pkg/tools/etcdtest"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
func NewTestEtcdRegistry(client tools.EtcdClient) (Registry, *etcdservice.REST) {
|
||||
storage := etcdstorage.NewEtcdStorage(client, latest.Codec, etcdtest.PathPrefix())
|
||||
rest := etcdservice.NewStorage(storage)
|
||||
registry := NewRegistry(rest)
|
||||
return registry, rest
|
||||
}
|
||||
|
||||
func makeTestService(name string) *api.Service {
|
||||
return &api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: name, Namespace: "default"},
|
||||
Spec: api.ServiceSpec{
|
||||
Ports: []api.ServicePort{
|
||||
{Name: "port", Protocol: api.ProtocolTCP, Port: 12345, TargetPort: util.NewIntOrStringFromInt(12345)},
|
||||
},
|
||||
Type: api.ServiceTypeClusterIP,
|
||||
SessionAffinity: api.ServiceAffinityNone,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdListServicesNotFound(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry, rest := NewTestEtcdRegistry(fakeClient)
|
||||
ctx := api.NewDefaultContext()
|
||||
key := rest.KeyRootFunc(ctx)
|
||||
key = etcdtest.AddPrefix(key)
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{},
|
||||
E: tools.EtcdErrorNotFound,
|
||||
}
|
||||
services, err := registry.ListServices(ctx, labels.Everything(), fields.Everything())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if len(services.Items) != 0 {
|
||||
t.Errorf("Unexpected services list: %#v", services)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdListServices(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry, rest := NewTestEtcdRegistry(fakeClient)
|
||||
key := rest.KeyRootFunc(ctx)
|
||||
key = etcdtest.AddPrefix(key)
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: &etcd.Node{
|
||||
Nodes: []*etcd.Node{
|
||||
{
|
||||
Value: runtime.EncodeOrDie(latest.Codec, makeTestService("foo")),
|
||||
},
|
||||
{
|
||||
Value: runtime.EncodeOrDie(latest.Codec, makeTestService("bar")),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
E: nil,
|
||||
}
|
||||
services, err := registry.ListServices(ctx, labels.Everything(), fields.Everything())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if len(services.Items) != 2 || services.Items[0].Name != "foo" || services.Items[1].Name != "bar" {
|
||||
t.Errorf("Unexpected service list: %#v", services)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdCreateService(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry, rest := NewTestEtcdRegistry(fakeClient)
|
||||
_, err := registry.CreateService(ctx, makeTestService("foo"))
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
key, _ := rest.KeyFunc(ctx, "foo")
|
||||
key = etcdtest.AddPrefix(key)
|
||||
resp, err := fakeClient.Get(key, false, false)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
var service api.Service
|
||||
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &service)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if service.Name != "foo" {
|
||||
t.Errorf("Unexpected service: %#v %s", service, resp.Node.Value)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdCreateServiceAlreadyExisting(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry, rest := NewTestEtcdRegistry(fakeClient)
|
||||
key, _ := rest.KeyFunc(ctx, "foo")
|
||||
key = etcdtest.AddPrefix(key)
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, makeTestService("foo")), 0)
|
||||
_, err := registry.CreateService(ctx, makeTestService("foo"))
|
||||
if !errors.IsAlreadyExists(err) {
|
||||
t.Errorf("expected already exists err, got %#v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEtcdGetServiceDifferentNamespace ensures same-name services in different namespaces do not clash
|
||||
func TestEtcdGetServiceDifferentNamespace(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry, rest := NewTestEtcdRegistry(fakeClient)
|
||||
|
||||
ctx1 := api.NewDefaultContext()
|
||||
ctx2 := api.WithNamespace(api.NewContext(), "other")
|
||||
|
||||
key1, _ := rest.KeyFunc(ctx1, "foo")
|
||||
key2, _ := rest.KeyFunc(ctx2, "foo")
|
||||
|
||||
key1 = etcdtest.AddPrefix(key1)
|
||||
key2 = etcdtest.AddPrefix(key2)
|
||||
|
||||
fakeClient.Set(key1, runtime.EncodeOrDie(latest.Codec, &api.Service{ObjectMeta: api.ObjectMeta{Namespace: "default", Name: "foo"}}), 0)
|
||||
fakeClient.Set(key2, runtime.EncodeOrDie(latest.Codec, &api.Service{ObjectMeta: api.ObjectMeta{Namespace: "other", Name: "foo"}}), 0)
|
||||
|
||||
service1, err := registry.GetService(ctx1, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if service1.Name != "foo" {
|
||||
t.Errorf("Unexpected service: %#v", service1)
|
||||
}
|
||||
if service1.Namespace != "default" {
|
||||
t.Errorf("Unexpected service: %#v", service1)
|
||||
}
|
||||
|
||||
service2, err := registry.GetService(ctx2, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if service2.Name != "foo" {
|
||||
t.Errorf("Unexpected service: %#v", service2)
|
||||
}
|
||||
if service2.Namespace != "other" {
|
||||
t.Errorf("Unexpected service: %#v", service2)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestEtcdGetService(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry, rest := NewTestEtcdRegistry(fakeClient)
|
||||
key, _ := rest.KeyFunc(ctx, "foo")
|
||||
key = etcdtest.AddPrefix(key)
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, makeTestService("foo")), 0)
|
||||
service, err := registry.GetService(ctx, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if service.Name != "foo" {
|
||||
t.Errorf("Unexpected service: %#v", service)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdGetServiceNotFound(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry, rest := NewTestEtcdRegistry(fakeClient)
|
||||
key, _ := rest.KeyFunc(ctx, "foo")
|
||||
key = etcdtest.AddPrefix(key)
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: nil,
|
||||
},
|
||||
E: tools.EtcdErrorNotFound,
|
||||
}
|
||||
_, err := registry.GetService(ctx, "foo")
|
||||
if !errors.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error returned: %#v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdDeleteService(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry, _ := NewTestEtcdRegistry(fakeClient)
|
||||
key, _ := etcdgeneric.NamespaceKeyFunc(ctx, "/services/specs", "foo")
|
||||
key = etcdtest.AddPrefix(key)
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, makeTestService("foo")), 0)
|
||||
path, _ := etcdgeneric.NamespaceKeyFunc(ctx, "/services/endpoints", "foo")
|
||||
endpointsKey := etcdtest.AddPrefix(path)
|
||||
fakeClient.Set(endpointsKey, runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
|
||||
|
||||
err := registry.DeleteService(ctx, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if len(fakeClient.DeletedKeys) != 1 {
|
||||
t.Errorf("Expected 2 delete, found %#v", fakeClient.DeletedKeys)
|
||||
}
|
||||
if fakeClient.DeletedKeys[0] != key {
|
||||
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdUpdateService(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.TestIndex = true
|
||||
registry, rest := NewTestEtcdRegistry(fakeClient)
|
||||
key, _ := rest.KeyFunc(ctx, "uniquefoo")
|
||||
key = etcdtest.AddPrefix(key)
|
||||
resp, _ := fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, makeTestService("uniquefoo")), 0)
|
||||
testService := api.Service{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "uniquefoo",
|
||||
ResourceVersion: strconv.FormatUint(resp.Node.ModifiedIndex, 10),
|
||||
Labels: map[string]string{
|
||||
"baz": "bar",
|
||||
},
|
||||
},
|
||||
Spec: api.ServiceSpec{
|
||||
Ports: []api.ServicePort{
|
||||
{Name: "port", Protocol: api.ProtocolTCP, Port: 12345, TargetPort: util.NewIntOrStringFromInt(12345)},
|
||||
},
|
||||
Selector: map[string]string{
|
||||
"baz": "bar",
|
||||
},
|
||||
SessionAffinity: "None",
|
||||
Type: api.ServiceTypeClusterIP,
|
||||
},
|
||||
}
|
||||
_, err := registry.UpdateService(ctx, &testService)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
svc, err := registry.GetService(ctx, "uniquefoo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Clear modified indices before the equality test.
|
||||
svc.ResourceVersion = ""
|
||||
testService.ResourceVersion = ""
|
||||
if !api.Semantic.DeepEqual(*svc, testService) {
|
||||
t.Errorf("Unexpected service: got\n %#v\n, wanted\n %#v", svc, testService)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdWatchServices(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry, _ := NewTestEtcdRegistry(fakeClient)
|
||||
watching, err := registry.WatchServices(ctx,
|
||||
labels.Everything(),
|
||||
fields.SelectorFromSet(fields.Set{"name": "foo"}),
|
||||
"1",
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
|
||||
select {
|
||||
case _, ok := <-watching.ResultChan():
|
||||
if !ok {
|
||||
t.Errorf("watching channel should be open")
|
||||
}
|
||||
default:
|
||||
}
|
||||
fakeClient.WatchInjectError <- nil
|
||||
if _, ok := <-watching.ResultChan(); ok {
|
||||
t.Errorf("watching channel should be closed")
|
||||
}
|
||||
watching.Stop()
|
||||
}
|
||||
|
||||
// TODO We need a test for the compare and swap behavior. This basically requires two things:
|
||||
// 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that
|
||||
// channel, this will enable us to orchestrate the flow of etcd requests in the test.
|
||||
// 2) We need to make the map from key to (response, error) actually be a [](response, error) and pop
|
||||
// our way through the responses. That will enable us to hand back multiple different responses for
|
||||
// the same key.
|
||||
// Once that infrastructure is in place, the test looks something like:
|
||||
// Routine #1 Routine #2
|
||||
// Read
|
||||
// Wait for sync on update Read
|
||||
// Update
|
||||
// Update
|
||||
// In the buggy case, this will result in lost data. In the correct case, the second update should fail
|
||||
// and be retried.
|
@@ -144,6 +144,13 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: can leave dangling endpoints, and potentially return incorrect
|
||||
// endpoints if a new service is created with the same name
|
||||
err = rs.endpoints.DeleteEndpoints(ctx, id)
|
||||
if err != nil && !errors.IsNotFound(err) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if api.IsServiceIPSet(service) {
|
||||
rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP))
|
||||
}
|
||||
@@ -160,26 +167,11 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
|
||||
}
|
||||
|
||||
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
|
||||
service, err := rs.registry.GetService(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return service, err
|
||||
return rs.registry.GetService(ctx, id)
|
||||
}
|
||||
|
||||
func (rs *REST) List(ctx api.Context, label labels.Selector, field fields.Selector) (runtime.Object, error) {
|
||||
list, err := rs.registry.ListServices(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var filtered []api.Service
|
||||
for _, service := range list.Items {
|
||||
if label.Matches(labels.Set(service.Labels)) {
|
||||
filtered = append(filtered, service)
|
||||
}
|
||||
}
|
||||
list.Items = filtered
|
||||
return list, err
|
||||
return rs.registry.ListServices(ctx, label, field)
|
||||
}
|
||||
|
||||
// Watch returns Services events via a watch.Interface.
|
||||
|
Reference in New Issue
Block a user