diff --git a/pkg/registry/core/service/allocator/storage/storage.go b/pkg/registry/core/service/allocator/storage/storage.go index ed9cbfdb8c0..19c96164f51 100644 --- a/pkg/registry/core/service/allocator/storage/storage.go +++ b/pkg/registry/core/service/allocator/storage/storage.go @@ -80,17 +80,12 @@ func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.Grou }, nil } -// Allocate attempts to allocate the item locally and then in etcd. +// Allocate attempts to allocate the item. func (e *Etcd) Allocate(offset int) (bool, error) { e.lock.Lock() defer e.lock.Unlock() - ok, err := e.alloc.Allocate(offset) - if !ok || err != nil { - return ok, err - } - - err = e.tryUpdate(func() error { + err := e.tryUpdate(func() error { ok, err := e.alloc.Allocate(offset) if err != nil { return err @@ -109,49 +104,44 @@ func (e *Etcd) Allocate(offset int) (bool, error) { return true, nil } -// AllocateNext attempts to allocate the next item locally and then in etcd. +// AllocateNext attempts to allocate the next item. func (e *Etcd) AllocateNext() (int, bool, error) { e.lock.Lock() defer e.lock.Unlock() - - offset, ok, err := e.alloc.AllocateNext() - if !ok || err != nil { - return offset, ok, err - } + var offset int + var ok bool + var err error err = e.tryUpdate(func() error { - ok, err := e.alloc.Allocate(offset) + // update the offset here + offset, ok, err = e.alloc.AllocateNext() if err != nil { return err } if !ok { - // update the offset here - offset, ok, err = e.alloc.AllocateNext() - if err != nil { - return err - } - if !ok { - return errorUnableToAllocate - } - return nil + return errorUnableToAllocate } return nil }) - return offset, ok, err + + if err != nil { + if err == errorUnableToAllocate { + return offset, false, nil + } + return offset, false, err + } + return offset, true, nil } -// Release attempts to release the provided item locally and then in etcd. +// Release attempts to release the provided item. func (e *Etcd) Release(item int) error { e.lock.Lock() defer e.lock.Unlock() - if err := e.alloc.Release(item); err != nil { - return err - } - return e.tryUpdate(func() error { return e.alloc.Release(item) }) + } func (e *Etcd) ForEach(fn func(int)) { @@ -172,9 +162,9 @@ func (e *Etcd) tryUpdate(fn func() error) error { if err := e.alloc.Restore(existing.Range, existing.Data); err != nil { return nil, err } - if err := fn(); err != nil { - return nil, err - } + } + if err := fn(); err != nil { + return nil, err } e.last = existing.ResourceVersion rangeSpec, data := e.alloc.Snapshot() diff --git a/pkg/registry/core/service/allocator/storage/storage_test.go b/pkg/registry/core/service/allocator/storage/storage_test.go index bfb39cef84f..34d5040bc4d 100644 --- a/pkg/registry/core/service/allocator/storage/storage_test.go +++ b/pkg/registry/core/service/allocator/storage/storage_test.go @@ -103,3 +103,59 @@ func TestStore(t *testing.T) { t.Fatal(err) } } + +// Test that one item is allocated in storage but is not allocated locally +// When try to allocate it, it should fail despite it's free in the local bitmap +// bot not in the storage +func TestAllocatedStorageButReleasedLocally(t *testing.T) { + storage, server, backing, _ := newStorage(t) + defer server.Terminate(t) + if err := storage.storage.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Allocate an item in the storage + if _, err := storage.Allocate(2); err != nil { + t.Fatal(err) + } + + // Release the item in the local bitmap + // emulating it's out of sync with the storage + err := backing.Release(2) + if err != nil { + t.Fatal(err) + } + + // It should fail trying to allocate it deespite it's free + // in the local bitmap because it's not in the storage + ok, err := storage.Allocate(2) + if ok || err != nil { + t.Fatal(err) + } + +} + +// Test that one item is free in storage but is allocated locally +// When try to allocate it, it should succeed despite it's allocated +// in the local bitmap bot not in the storage +func TestAllocatedLocallyButReleasedStorage(t *testing.T) { + storage, server, backing, _ := newStorage(t) + defer server.Terminate(t) + if err := storage.storage.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Allocate an item in the local bitmap only but not in the storage + // emulating it's out of sync with the storage + if _, err := backing.Allocate(2); err != nil { + t.Fatal(err) + } + + // It should be able to allocate it + // because it's free in the storage + ok, err := storage.Allocate(2) + if !ok || err != nil { + t.Fatal(err) + } + +} diff --git a/pkg/registry/core/service/portallocator/BUILD b/pkg/registry/core/service/portallocator/BUILD index 2317938eae8..4f42bb77b3e 100644 --- a/pkg/registry/core/service/portallocator/BUILD +++ b/pkg/registry/core/service/portallocator/BUILD @@ -47,6 +47,7 @@ filegroup( srcs = [ ":package-srcs", "//pkg/registry/core/service/portallocator/controller:all-srcs", + "//pkg/registry/core/service/portallocator/storage:all-srcs", ], tags = ["automanaged"], ) diff --git a/pkg/registry/core/service/portallocator/storage/BUILD b/pkg/registry/core/service/portallocator/storage/BUILD new file mode 100644 index 00000000000..98ce69e0658 --- /dev/null +++ b/pkg/registry/core/service/portallocator/storage/BUILD @@ -0,0 +1,45 @@ +package(default_visibility = ["//visibility:public"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = ["storage_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/apis/core:go_default_library", + "//pkg/apis/core/install:go_default_library", + "//pkg/registry/core/service/allocator:go_default_library", + "//pkg/registry/core/service/allocator/storage:go_default_library", + "//pkg/registry/core/service/portallocator:go_default_library", + "//pkg/registry/registrytest:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library", + ], +) + +go_library( + name = "go_default_library", + srcs = ["storage.go"], + importpath = "k8s.io/kubernetes/pkg/registry/core/service/portallocator/storage", +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/registry/core/service/portallocator/storage/storage.go b/pkg/registry/core/service/portallocator/storage/storage.go new file mode 100644 index 00000000000..8b40a8ba012 --- /dev/null +++ b/pkg/registry/core/service/portallocator/storage/storage.go @@ -0,0 +1,19 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +// Keep CI happy; it is unhappy if a directory only contains tests diff --git a/pkg/registry/core/service/portallocator/storage/storage_test.go b/pkg/registry/core/service/portallocator/storage/storage_test.go new file mode 100644 index 00000000000..a86c1ddf4c2 --- /dev/null +++ b/pkg/registry/core/service/portallocator/storage/storage_test.go @@ -0,0 +1,184 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "context" + "fmt" + "strings" + "testing" + + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/storage" + etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" + "k8s.io/apiserver/pkg/storage/storagebackend/factory" + api "k8s.io/kubernetes/pkg/apis/core" + _ "k8s.io/kubernetes/pkg/apis/core/install" + "k8s.io/kubernetes/pkg/registry/core/service/allocator" + allocatorstore "k8s.io/kubernetes/pkg/registry/core/service/allocator/storage" + "k8s.io/kubernetes/pkg/registry/core/service/portallocator" + "k8s.io/kubernetes/pkg/registry/registrytest" +) + +const ( + basePortRange = 30000 + sizePortRange = 2768 +) + +func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, portallocator.Interface, allocator.Interface, storage.Interface, factory.DestroyFunc) { + etcdStorage, server := registrytest.NewEtcdStorage(t, "") + + serviceNodePortRange := utilnet.PortRange{Base: basePortRange, Size: sizePortRange} + var backing allocator.Interface + storage, err := portallocator.NewPortAllocatorCustom(serviceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) { + mem := allocator.NewAllocationMap(max, rangeSpec) + backing = mem + etcd, err := allocatorstore.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), etcdStorage) + if err != nil { + return nil, err + } + return etcd, nil + }) + if err != nil { + t.Fatalf("unexpected error creating etcd: %v", err) + } + s, d, err := generic.NewRawStorage(etcdStorage) + if err != nil { + t.Fatalf("Couldn't create storage: %v", err) + } + destroyFunc := func() { + d() + server.Terminate(t) + } + return server, storage, backing, s, destroyFunc +} + +func validNewRangeAllocation() *api.RangeAllocation { + portRange := fmt.Sprintf("%d-%d", basePortRange, basePortRange+sizePortRange-1) + return &api.RangeAllocation{ + Range: portRange, + } +} + +func key() string { + return "/ranges/servicenodeports" +} + +// TestEmpty fails to allocate ports if the storage wasn't initialized with a servicenodeport range +func TestEmpty(t *testing.T) { + _, storage, _, _, destroyFunc := newStorage(t) + defer destroyFunc() + if err := storage.Allocate(31000); !strings.Contains(err.Error(), "cannot allocate resources of type servicenodeportallocations at this time") { + t.Fatal(err) + } +} + +// TestAllocate fails to allocate ports out of the valid port range +func TestAllocate(t *testing.T) { + _, storage, _, si, destroyFunc := newStorage(t) + defer destroyFunc() + if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + tests := []struct { + name string + port int + errMsg string + }{ + { + name: "Allocate base port", + port: basePortRange, + errMsg: "", + }, + { + name: "Allocate maximum from the port range", + port: basePortRange + sizePortRange - 1, + errMsg: "", + }, + { + name: "Allocate invalid port: base port minus 1", + port: basePortRange - 1, + errMsg: fmt.Sprintf("provided port is not in the valid range. The range of valid ports is %d-%d", basePortRange, basePortRange+sizePortRange-1), + }, + { + name: "Allocate invalid port: maximum port from the port range plus 1", + port: basePortRange + sizePortRange, + errMsg: fmt.Sprintf("provided port is not in the valid range. The range of valid ports is %d-%d", basePortRange, basePortRange+sizePortRange-1), + }, + { + name: "Allocate invalid port", + port: -2, + errMsg: fmt.Sprintf("provided port is not in the valid range. The range of valid ports is %d-%d", basePortRange, basePortRange+sizePortRange-1), + }, + } + for _, tt := range tests { + tt := tt // NOTE: https://github.com/golang/go/wiki/CommonMistakes#using-goroutines-on-loop-iterator-variables + t.Run(tt.name, func(t *testing.T) { + err := storage.Allocate(tt.port) + if (err == nil) != (tt.errMsg == "") { + t.Fatalf("Error expected %v, received %v", tt.errMsg, err) + } + if err != nil && err.Error() != tt.errMsg { + t.Fatalf("Error message expected %v, received %v", tt.errMsg, err) + } + }) + } + +} + +// TestReallocate test that we can not allocate a port already allocated until it is released +func TestReallocate(t *testing.T) { + _, storage, backing, si, destroyFunc := newStorage(t) + defer destroyFunc() + if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Allocate a port inside the valid port range + if err := storage.Allocate(30100); err != nil { + t.Fatal(err) + } + + // Try to allocate the same port in the local bitmap + // The local bitmap stores the offset of the port + // offset = port - base (30100 - 30000 = 100) + ok, err := backing.Allocate(100) + if err != nil { + t.Fatal(err) + } + // It should not allocate the port because it was already allocated + if ok { + t.Fatal("Expected allocation to fail") + } + // Try to allocate the port again should fail + if err := storage.Allocate(30100); err != portallocator.ErrAllocated { + t.Fatal(err) + } + + // Release the port + if err := storage.Release(30100); err != nil { + t.Fatal(err) + } + + // Try to allocate the port again should succeed because we've released it + if err := storage.Allocate(30100); err != nil { + t.Fatal(err) + } + +} diff --git a/test/integration/master/BUILD b/test/integration/master/BUILD index 9acb4055e62..b384dfe56d5 100644 --- a/test/integration/master/BUILD +++ b/test/integration/master/BUILD @@ -37,6 +37,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library", diff --git a/test/integration/master/kube_apiserver_test.go b/test/integration/master/kube_apiserver_test.go index e294a272a71..9323927f58c 100644 --- a/test/integration/master/kube_apiserver_test.go +++ b/test/integration/master/kube_apiserver_test.go @@ -34,6 +34,7 @@ import ( apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/client-go/kubernetes" @@ -493,3 +494,84 @@ func TestReconcilerMasterLeaseMultiMoreMasters(t *testing.T) { func TestReconcilerMasterLeaseMultiCombined(t *testing.T) { testReconcilersMasterLease(t, 3, 3) } + +func TestMultiMasterNodePortAllocation(t *testing.T) { + var kubeAPIServers []*kubeapiservertesting.TestServer + var clientAPIServers []*kubernetes.Clientset + etcd := framework.SharedEtcd() + + instanceOptions := &kubeapiservertesting.TestServerInstanceOptions{ + DisableStorageCleanup: true, + } + + // cleanup the registry storage + defer registry.CleanupStorage() + + // create 2 api servers and 2 clients + for i := 0; i < 2; i++ { + // start master count api server + t.Logf("starting api server: %d", i) + server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{ + "--advertise-address", fmt.Sprintf("10.0.1.%v", i+1), + }, etcd) + kubeAPIServers = append(kubeAPIServers, server) + + // verify kube API servers have registered and create a client + if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) { + client, err := kubernetes.NewForConfig(kubeAPIServers[i].ClientConfig) + if err != nil { + t.Logf("create client error: %v", err) + return false, nil + } + clientAPIServers = append(clientAPIServers, client) + endpoints, err := client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{}) + if err != nil { + t.Logf("error fetching endpoints: %v", err) + return false, nil + } + return verifyEndpointsWithIPs(kubeAPIServers, getEndpointIPs(endpoints)), nil + }); err != nil { + t.Fatalf("did not find only lease endpoints: %v", err) + } + } + + serviceObject := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"foo": "bar"}, + Name: "test-node-port", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "nodeport-test", + Port: 443, + TargetPort: intstr.IntOrString{IntVal: 443}, + NodePort: 32080, + Protocol: "TCP", + }, + }, + Type: "NodePort", + Selector: map[string]string{"foo": "bar"}, + }, + } + + // create and delete the same nodePortservice using different APIservers + // to check that API servers are using the same port allocation bitmap + for i := 0; i < 2; i++ { + // Create the service using the first API server + _, err := clientAPIServers[0].CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), serviceObject, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("unable to create service: %v", err) + } + // Delete the service using the second API server + if err := clientAPIServers[1].CoreV1().Services(metav1.NamespaceDefault).Delete(context.TODO(), serviceObject.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil { + t.Fatalf("got unexpected error: %v", err) + } + } + + // shutdown the api servers + for _, server := range kubeAPIServers { + server.TearDownFn() + } + +}