fix service allocation concurrency issues

The service allocator is used to allocate IP addresses for the
Service IP allocator and node ports for the Service NodePort
allocator. It uses a bitmap backed by etcd to store the allocations,
but it tries to allocate the resources directly from the local
in-memory copy instead of from etcd, which can cause issues in
environments with high concurrency.
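
For orientation, the in-memory side of the allocator can be pictured as a
mutex-guarded bitmap. The sketch below is only an illustration of that idea;
the type and function names are invented here and are much simpler than the
real allocator implementation used by the Service IP and NodePort allocators.

// Package sketch: minimal, illustrative in-memory bitmap allocator over a
// fixed-size range of offsets. Not the real Kubernetes allocator.
package sketch

import "sync"

type bitmapAllocator struct {
	mu        sync.Mutex
	allocated []bool // allocated[i] == true means offset i is in use
}

func newBitmapAllocator(size int) *bitmapAllocator {
	return &bitmapAllocator{allocated: make([]bool, size)}
}

// Allocate marks offset as in use; it returns false if the offset is
// out of range or already allocated.
func (b *bitmapAllocator) Allocate(offset int) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if offset < 0 || offset >= len(b.allocated) || b.allocated[offset] {
		return false
	}
	b.allocated[offset] = true
	return true
}

// Release marks offset as free again.
func (b *bitmapAllocator) Release(offset int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if offset >= 0 && offset < len(b.allocated) {
		b.allocated[offset] = false
	}
}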

In deployments with multiple apiservers the resource allocation
information can get out of sync; this is more noticeable with
NodePorts, for example:

1. apiserver A creates a service with NodePort X
2. apiserver B deletes the service
3. apiserver A creates the service again

If apiserver A's allocation data was not refreshed after the deletion
performed by apiserver B, apiserver A fails the allocation because its
data is out of sync. The repair loops fix the problem later, but some
use cases require better concurrency in the allocation logic.
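
As a rough illustration of that sequence (the names and data structures below
are invented for the example, not taken from the Kubernetes code), it is the
stale local bitmap on apiserver A that rejects the second allocation:

package main

import "fmt"

func main() {
	// Each apiserver keeps its own in-memory view of the NodePort bitmap;
	// etcd is left out of this sketch.
	apiserverA := map[int]bool{} // apiserver A's local view
	apiserverB := map[int]bool{} // apiserver B's local view

	// 1. apiserver A creates a service with NodePort 32080.
	apiserverA[32080] = true

	// 2. apiserver B deletes the service; the release reaches etcd and B's
	//    local view, but apiserver A never observes it.
	delete(apiserverB, 32080)

	// 3. apiserver A creates the service again. With the pre-fix logic it
	//    consults its stale local bitmap first, still sees 32080 as
	//    allocated, and rejects the request even though the port is free.
	if apiserverA[32080] {
		fmt.Println("NodePort 32080 rejected: local allocation data is stale")
	}
}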

Instead of performing the Allocate and Release operations on the
local data first, we can check whether the local data is up to date
with etcd and operate on the most recent version of the data.
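
A minimal sketch of that approach, assuming a hypothetical etcd-like store
with compare-and-swap on a revision number (the real code goes through the
storage backend's guaranteed-update path keyed on the object's
ResourceVersion; everything below is simplified and the names are made up):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// snapshot is a hypothetical persisted allocation record: the bitmap data
// plus a revision used for optimistic concurrency control.
type snapshot struct {
	revision int64
	bits     map[int]bool
}

// store stands in for an etcd-like backend with compare-and-swap semantics.
type store struct {
	mu  sync.Mutex
	cur snapshot
}

var errConflict = errors.New("revision conflict")

func (s *store) get() snapshot {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.cur
}

// casPut writes next only if the stored revision still equals prevRev.
func (s *store) casPut(prevRev int64, next snapshot) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.cur.revision != prevRev {
		return errConflict
	}
	next.revision = prevRev + 1
	s.cur = next
	return nil
}

// allocate decides against the latest stored state instead of a possibly
// stale local copy, and retries when another writer wins the race.
func allocate(s *store, offset int) (bool, error) {
	for {
		latest := s.get()
		if latest.bits[offset] {
			return false, nil // already allocated per the source of truth
		}
		next := snapshot{bits: make(map[int]bool, len(latest.bits)+1)}
		for k, v := range latest.bits {
			next.bits[k] = v
		}
		next.bits[offset] = true
		switch err := s.casPut(latest.revision, next); {
		case err == nil:
			return true, nil
		case errors.Is(err, errConflict):
			continue // reload the latest state and try again
		default:
			return false, err
		}
	}
}

func main() {
	s := &store{cur: snapshot{bits: map[int]bool{}}}
	ok, err := allocate(s, 32080)
	fmt.Println(ok, err) // true <nil>
}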
Author: Antonio Ojea
Date: 2020-04-08 15:54:52 +02:00
parent cb87793d57
commit e3df13439a
4 changed files with 161 additions and 32 deletions


@@ -80,17 +80,12 @@ func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.Grou
 	}, nil
 }
-// Allocate attempts to allocate the item locally and then in etcd.
+// Allocate attempts to allocate the item.
 func (e *Etcd) Allocate(offset int) (bool, error) {
 	e.lock.Lock()
 	defer e.lock.Unlock()
-	ok, err := e.alloc.Allocate(offset)
-	if !ok || err != nil {
-		return ok, err
-	}
-	err = e.tryUpdate(func() error {
+	err := e.tryUpdate(func() error {
 		ok, err := e.alloc.Allocate(offset)
 		if err != nil {
 			return err
@@ -109,49 +104,44 @@ func (e *Etcd) Allocate(offset int) (bool, error) {
 	return true, nil
 }
-// AllocateNext attempts to allocate the next item locally and then in etcd.
+// AllocateNext attempts to allocate the next item.
 func (e *Etcd) AllocateNext() (int, bool, error) {
 	e.lock.Lock()
 	defer e.lock.Unlock()
-	offset, ok, err := e.alloc.AllocateNext()
-	if !ok || err != nil {
-		return offset, ok, err
-	}
+	var offset int
+	var ok bool
+	var err error
 	err = e.tryUpdate(func() error {
-		ok, err := e.alloc.Allocate(offset)
+		// update the offset here
+		offset, ok, err = e.alloc.AllocateNext()
 		if err != nil {
 			return err
 		}
 		if !ok {
-			// update the offset here
-			offset, ok, err = e.alloc.AllocateNext()
-			if err != nil {
-				return err
-			}
-			if !ok {
-				return errorUnableToAllocate
-			}
-			return nil
+			return errorUnableToAllocate
 		}
 		return nil
 	})
-	return offset, ok, err
+	if err != nil {
+		if err == errorUnableToAllocate {
+			return offset, false, nil
+		}
+		return offset, false, err
+	}
+	return offset, true, nil
 }
-// Release attempts to release the provided item locally and then in etcd.
+// Release attempts to release the provided item.
 func (e *Etcd) Release(item int) error {
 	e.lock.Lock()
 	defer e.lock.Unlock()
-	if err := e.alloc.Release(item); err != nil {
-		return err
-	}
 	return e.tryUpdate(func() error {
 		return e.alloc.Release(item)
 	})
 }
func (e *Etcd) ForEach(fn func(int)) {
@@ -172,9 +162,9 @@ func (e *Etcd) tryUpdate(fn func() error) error {
 				if err := e.alloc.Restore(existing.Range, existing.Data); err != nil {
 					return nil, err
 				}
-				if err := fn(); err != nil {
-					return nil, err
-				}
 			}
+			if err := fn(); err != nil {
+				return nil, err
+			}
 			e.last = existing.ResourceVersion
 			rangeSpec, data := e.alloc.Snapshot()


@@ -102,3 +102,59 @@ func TestStore(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+// Test that an item allocated in storage but released locally cannot be
+// allocated again: the operation should fail even though the item is free
+// in the local bitmap, because it is still allocated in the storage.
+func TestAllocatedStorageButReleasedLocally(t *testing.T) {
+	storage, server, backing, _ := newStorage(t)
+	defer server.Terminate(t)
+	if err := storage.storage.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// Allocate an item in the storage
+	if _, err := storage.Allocate(2); err != nil {
+		t.Fatal(err)
+	}
+
+	// Release the item in the local bitmap only,
+	// emulating a bitmap that is out of sync with the storage
+	err := backing.Release(2)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Allocating it again should fail despite it being free
+	// in the local bitmap, because it is allocated in the storage
+	ok, err := storage.Allocate(2)
+	if ok || err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Test that an item that is free in storage but allocated in the local
+// bitmap can still be allocated: the operation should succeed because the
+// storage, not the local bitmap, is the source of truth.
+func TestAllocatedLocallyButReleasedStorage(t *testing.T) {
+	storage, server, backing, _ := newStorage(t)
+	defer server.Terminate(t)
+	if err := storage.storage.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// Allocate an item in the local bitmap only, not in the storage,
+	// emulating a bitmap that is out of sync with the storage
+	if _, err := backing.Allocate(2); err != nil {
+		t.Fatal(err)
+	}
+
+	// It should be able to allocate the item
+	// because it is free in the storage
+	ok, err := storage.Allocate(2)
+	if !ok || err != nil {
+		t.Fatal(err)
+	}
+}


@@ -40,6 +40,7 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",


@@ -35,6 +35,7 @@ import (
 	apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
 	apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apiserver/pkg/registry/generic/registry"
 	"k8s.io/client-go/kubernetes"
@@ -494,3 +495,84 @@ func TestReconcilerMasterLeaseMultiMoreMasters(t *testing.T) {
 func TestReconcilerMasterLeaseMultiCombined(t *testing.T) {
 	testReconcilersMasterLease(t, 3, 3)
 }
+
+func TestMultiMasterNodePortAllocation(t *testing.T) {
+	var kubeAPIServers []*kubeapiservertesting.TestServer
+	var clientAPIServers []*kubernetes.Clientset
+	etcd := framework.SharedEtcd()
+
+	instanceOptions := &kubeapiservertesting.TestServerInstanceOptions{
+		DisableStorageCleanup: true,
+	}
+
+	// cleanup the registry storage
+	defer registry.CleanupStorage()
+
+	// create 2 api servers and 2 clients
+	for i := 0; i < 2; i++ {
+		// start master count api server
+		t.Logf("starting api server: %d", i)
+		server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{
+			"--advertise-address", fmt.Sprintf("10.0.1.%v", i+1),
+		}, etcd)
+		kubeAPIServers = append(kubeAPIServers, server)
+
+		// verify kube API servers have registered and create a client
+		if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
+			client, err := kubernetes.NewForConfig(kubeAPIServers[i].ClientConfig)
+			if err != nil {
+				t.Logf("create client error: %v", err)
+				return false, nil
+			}
+			clientAPIServers = append(clientAPIServers, client)
+			endpoints, err := client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{})
+			if err != nil {
+				t.Logf("error fetching endpoints: %v", err)
+				return false, nil
+			}
+			return verifyEndpointsWithIPs(kubeAPIServers, getEndpointIPs(endpoints)), nil
+		}); err != nil {
+			t.Fatalf("did not find only lease endpoints: %v", err)
+		}
+	}
+
+	serviceObject := &corev1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Labels: map[string]string{"foo": "bar"},
+			Name:   "test-node-port",
+		},
+		Spec: corev1.ServiceSpec{
+			Ports: []corev1.ServicePort{
+				{
+					Name:       "nodeport-test",
+					Port:       443,
+					TargetPort: intstr.IntOrString{IntVal: 443},
+					NodePort:   32080,
+					Protocol:   "TCP",
+				},
+			},
+			Type:     "NodePort",
+			Selector: map[string]string{"foo": "bar"},
+		},
+	}
+
+	// create and delete the same NodePort service using different API servers
+	// to check that API servers are using the same port allocation bitmap
+	for i := 0; i < 2; i++ {
+		// Create the service using the first API server
+		_, err := clientAPIServers[0].CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), serviceObject, metav1.CreateOptions{})
+		if err != nil {
+			t.Fatalf("unable to create service: %v", err)
+		}
+		// Delete the service using the second API server
+		if err := clientAPIServers[1].CoreV1().Services(metav1.NamespaceDefault).Delete(context.TODO(), serviceObject.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil {
+			t.Fatalf("got unexpected error: %v", err)
+		}
+	}
+
+	// shutdown the api servers
+	for _, server := range kubeAPIServers {
+		server.TearDownFn()
+	}
+}