From e729c57534bbb9e8f356485366ce4493079ce569 Mon Sep 17 00:00:00 2001 From: "Timothy St. Clair" Date: Tue, 7 Mar 2017 14:38:38 -0600 Subject: [PATCH] Add the ability to lock on ConfigMaps to support HA for self hosted components w/o locking on an endpoint which has a series of issues. --- .../leaderelection/leaderelection_test.go | 237 +++++++++++++++++- pkg/client/leaderelection/resourcelock/BUILD | 1 + .../resourcelock/configmaplock.go | 109 ++++++++ .../resourcelock/endpointslock.go | 1 + 4 files changed, 343 insertions(+), 5 deletions(-) create mode 100644 pkg/client/leaderelection/resourcelock/configmaplock.go diff --git a/pkg/client/leaderelection/leaderelection_test.go b/pkg/client/leaderelection/leaderelection_test.go index 3b75dc6ebd3..a7f40049891 100644 --- a/pkg/client/leaderelection/leaderelection_test.go +++ b/pkg/client/leaderelection/leaderelection_test.go @@ -14,10 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package leaderelection implements leader election of a set of endpoints. -// It uses an annotation in the endpoints object to store the record of the -// election state. - package leaderelection import ( @@ -36,7 +32,8 @@ import ( rl "k8s.io/kubernetes/pkg/client/leaderelection/resourcelock" ) -func TestTryAcquireOrRenew(t *testing.T) { +// Will test leader election using endpoints as the resource +func TestTryAcquireOrRenewEndpoints(t *testing.T) { future := time.Now().Add(1000 * time.Hour) past := time.Now().Add(-1000 * time.Hour) @@ -264,3 +261,233 @@ func TestTryAcquireOrRenew(t *testing.T) { } } } + +// Will test leader election using configmap as the resource +func TestTryAcquireOrRenewConfigMap(t *testing.T) { + future := time.Now().Add(1000 * time.Hour) + past := time.Now().Add(-1000 * time.Hour) + + tests := []struct { + observedRecord rl.LeaderElectionRecord + observedTime time.Time + reactors []struct { + verb string + reaction core.ReactionFunc + } + + expectSuccess bool + transitionLeader bool + outHolder string + }{ + // acquire from no onfigmap + { + reactors: []struct { + verb string + reaction core.ReactionFunc + }{ + { + verb: "get", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, errors.NewNotFound(action.(core.GetAction).GetResource().GroupResource(), action.(core.GetAction).GetName()) + }, + }, + { + verb: "create", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(core.CreateAction).GetObject().(*v1.ConfigMap), nil + }, + }, + }, + expectSuccess: true, + outHolder: "baz", + }, + // acquire from unled configmap + { + reactors: []struct { + verb string + reaction core.ReactionFunc + }{ + { + verb: "get", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: action.GetNamespace(), + Name: action.(core.GetAction).GetName(), + }, + }, nil + }, + }, + { + verb: "update", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(core.CreateAction).GetObject().(*v1.ConfigMap), nil + }, + }, + }, + + expectSuccess: true, + transitionLeader: true, + outHolder: "baz", + }, + // acquire from led, unacked configmap + { + reactors: []struct { + verb string + reaction core.ReactionFunc + }{ + { + verb: "get", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: action.GetNamespace(), + Name: action.(core.GetAction).GetName(), + Annotations: map[string]string{ + rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`, + }, + }, + }, nil + }, + }, + { + verb: "update", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(core.CreateAction).GetObject().(*v1.ConfigMap), nil + }, + }, + }, + observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"}, + observedTime: past, + + expectSuccess: true, + transitionLeader: true, + outHolder: "baz", + }, + // don't acquire from led, acked configmap + { + reactors: []struct { + verb string + reaction core.ReactionFunc + }{ + { + verb: "get", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: action.GetNamespace(), + Name: action.(core.GetAction).GetName(), + Annotations: map[string]string{ + rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":"bing"}`, + }, + }, + }, nil + }, + }, + }, + observedTime: future, + + expectSuccess: false, + outHolder: "bing", + }, + // renew already acquired configmap + { + reactors: []struct { + verb string + reaction core.ReactionFunc + }{ + { + verb: "get", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: action.GetNamespace(), + Name: action.(core.GetAction).GetName(), + Annotations: map[string]string{ + rl.LeaderElectionRecordAnnotationKey: `{"holderIdentity":"baz"}`, + }, + }, + }, nil + }, + }, + { + verb: "update", + reaction: func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, action.(core.CreateAction).GetObject().(*v1.ConfigMap), nil + }, + }, + }, + observedTime: future, + observedRecord: rl.LeaderElectionRecord{HolderIdentity: "baz"}, + + expectSuccess: true, + outHolder: "baz", + }, + } + + for i, test := range tests { + // OnNewLeader is called async so we have to wait for it. + var wg sync.WaitGroup + wg.Add(1) + var reportedLeader string + + lock := rl.ConfigMapLock{ + ConfigMapMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}, + LockConfig: rl.ResourceLockConfig{ + Identity: "baz", + EventRecorder: &record.FakeRecorder{}, + }, + } + + lec := LeaderElectionConfig{ + Lock: &lock, + LeaseDuration: 10 * time.Second, + Callbacks: LeaderCallbacks{ + OnNewLeader: func(l string) { + defer wg.Done() + reportedLeader = l + }, + }, + } + c := &fakeclientset.Clientset{Fake: core.Fake{}} + for _, reactor := range test.reactors { + c.AddReactor(reactor.verb, "configmaps", reactor.reaction) + } + c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { + t.Errorf("[%v] unreachable action. testclient called too many times: %+v", i, action) + return true, nil, fmt.Errorf("uncreachable action") + }) + + le := &LeaderElector{ + config: lec, + observedRecord: test.observedRecord, + observedTime: test.observedTime, + } + lock.Client = c + + if test.expectSuccess != le.tryAcquireOrRenew() { + t.Errorf("[%v]unexpected result of tryAcquireOrRenew: [succeded=%v]", i, !test.expectSuccess) + } + + le.observedRecord.AcquireTime = metav1.Time{} + le.observedRecord.RenewTime = metav1.Time{} + if le.observedRecord.HolderIdentity != test.outHolder { + t.Errorf("[%v]expected holder:\n\t%+v\ngot:\n\t%+v", i, test.outHolder, le.observedRecord.HolderIdentity) + } + if len(test.reactors) != len(c.Actions()) { + t.Errorf("[%v]wrong number of api interactions", i) + } + if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 { + t.Errorf("[%v]leader should have transitioned but did not", i) + } + if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 { + t.Errorf("[%v]leader should not have transitioned but did", i) + } + + le.maybeReportTransition() + wg.Wait() + if reportedLeader != test.outHolder { + t.Errorf("[%v]reported leader was not the new leader. expected %q, got %q", i, test.outHolder, reportedLeader) + } + } +} diff --git a/pkg/client/leaderelection/resourcelock/BUILD b/pkg/client/leaderelection/resourcelock/BUILD index bb0588a30a8..54751b7ce70 100644 --- a/pkg/client/leaderelection/resourcelock/BUILD +++ b/pkg/client/leaderelection/resourcelock/BUILD @@ -10,6 +10,7 @@ load( go_library( name = "go_default_library", srcs = [ + "configmaplock.go", "endpointslock.go", "interface.go", ], diff --git a/pkg/client/leaderelection/resourcelock/configmaplock.go b/pkg/client/leaderelection/resourcelock/configmaplock.go new file mode 100644 index 00000000000..20aaf3625a6 --- /dev/null +++ b/pkg/client/leaderelection/resourcelock/configmaplock.go @@ -0,0 +1,109 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcelock + +import ( + "encoding/json" + "errors" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" +) + +// TODO: This is almost a exact replica of Endpoints lock. +// going forwards as we self host more and more components +// and use ConfigMaps as the means to pass that configuration +// data we will likely move to deprecate the Endpoints lock. + +type ConfigMapLock struct { + // ConfigMapMeta should contain a Name and a Namespace of an + // ConfigMapMeta object that the Leadercmlector will attempt to lead. + ConfigMapMeta metav1.ObjectMeta + Client clientset.Interface + LockConfig ResourceLockConfig + cm *v1.ConfigMap +} + +// Get returns the cmlection record from a ConfigMap Annotation +func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, error) { + var record LeaderElectionRecord + var err error + cml.cm, err = cml.Client.Core().ConfigMaps(cml.ConfigMapMeta.Namespace).Get(cml.ConfigMapMeta.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + if cml.cm.Annotations == nil { + cml.cm.Annotations = make(map[string]string) + } + if recordBytes, found := cml.cm.Annotations[LeaderElectionRecordAnnotationKey]; found { + if err := json.Unmarshal([]byte(recordBytes), &record); err != nil { + return nil, err + } + } + return &record, nil +} + +// Create attempts to create a LeadercmlectionRecord annotation +func (cml *ConfigMapLock) Create(ler LeaderElectionRecord) error { + recordBytes, err := json.Marshal(ler) + if err != nil { + return err + } + cml.cm, err = cml.Client.Core().ConfigMaps(cml.ConfigMapMeta.Namespace).Create(&v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: cml.ConfigMapMeta.Name, + Namespace: cml.ConfigMapMeta.Namespace, + Annotations: map[string]string{ + LeaderElectionRecordAnnotationKey: string(recordBytes), + }, + }, + }) + return err +} + +// Update will update and existing annotation on a given resource. +func (cml *ConfigMapLock) Update(ler LeaderElectionRecord) error { + if cml.cm == nil { + return errors.New("endpoint not initialized, call get or create first") + } + recordBytes, err := json.Marshal(ler) + if err != nil { + return err + } + cml.cm.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes) + cml.cm, err = cml.Client.Core().ConfigMaps(cml.ConfigMapMeta.Namespace).Update(cml.cm) + return err +} + +// RecordEvent in leader cmlection while adding meta-data +func (cml *ConfigMapLock) RecordEvent(s string) { + events := fmt.Sprintf("%v %v", cml.LockConfig.Identity, s) + cml.LockConfig.EventRecorder.Eventf(&v1.ConfigMap{ObjectMeta: cml.cm.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events) +} + +// Describe is used to convert details on current resource lock +// into a string +func (cml *ConfigMapLock) Describe() string { + return fmt.Sprintf("%v/%v", cml.ConfigMapMeta.Namespace, cml.ConfigMapMeta.Name) +} + +// returns the Identity of the lock +func (cml *ConfigMapLock) Identity() string { + return cml.LockConfig.Identity +} diff --git a/pkg/client/leaderelection/resourcelock/endpointslock.go b/pkg/client/leaderelection/resourcelock/endpointslock.go index 5001438f807..215c3b65525 100644 --- a/pkg/client/leaderelection/resourcelock/endpointslock.go +++ b/pkg/client/leaderelection/resourcelock/endpointslock.go @@ -35,6 +35,7 @@ type EndpointsLock struct { e *v1.Endpoints } +// Get returns the election record from a Endpoints Annotation func (el *EndpointsLock) Get() (*LeaderElectionRecord, error) { var record LeaderElectionRecord var err error