Sync local APIService objects once

This commit is contained in:
Jordan Liggitt
2017-08-29 23:19:34 -04:00
parent 0529dd405b
commit 8ca6d9994e
4 changed files with 251 additions and 22 deletions

View File

@@ -229,7 +229,7 @@ func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget,
for _, curr := range delegateAPIServer.ListedPaths() {
if curr == "/api/v1" {
apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"})
registration.AddAPIServiceToSync(apiService)
registration.AddAPIServiceToSyncOnStart(apiService)
apiServices = append(apiServices, apiService)
continue
}
@@ -247,7 +247,7 @@ func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget,
if apiService == nil {
continue
}
registration.AddAPIServiceToSync(apiService)
registration.AddAPIServiceToSyncOnStart(apiService)
apiServices = append(apiServices, apiService)
}

View File

@@ -27,6 +27,7 @@ go_library(
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",

View File

@@ -25,6 +25,7 @@ import (
"github.com/golang/glog"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
@@ -39,12 +40,19 @@ import (
const (
AutoRegisterManagedLabel = "kube-aggregator.kubernetes.io/automanaged"
// manageOnStart is a value for the AutoRegisterManagedLabel that indicates the APIService wants to be synced one time when the controller starts.
manageOnStart = "onstart"
// manageContinuously is a value for the AutoRegisterManagedLabel that indicates the APIService wants to be synced continuously.
manageContinuously = "true"
)
// AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for
// adding and removing APIServices
type AutoAPIServiceRegistration interface {
// AddAPIServiceToSync adds an API service to auto-register.
// AddAPIServiceToSyncOnStart adds an API service to sync on start.
AddAPIServiceToSyncOnStart(in *apiregistration.APIService)
// AddAPIServiceToSync adds an API service to sync continuously.
AddAPIServiceToSync(in *apiregistration.APIService)
// RemoveAPIServiceToSync removes an API service to auto-register.
RemoveAPIServiceToSync(name string)
@@ -62,6 +70,13 @@ type autoRegisterController struct {
syncHandler func(apiServiceName string) error
// track which services we have synced
syncedSuccessfullyLock *sync.RWMutex
syncedSuccessfully map[string]bool
// remember names of services that existed when we started
apiServicesAtStart map[string]bool
// queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors
queue workqueue.RateLimitingInterface
}
@@ -72,7 +87,13 @@ func NewAutoRegisterController(apiServiceInformer informers.APIServiceInformer,
apiServiceSynced: apiServiceInformer.Informer().HasSynced,
apiServiceClient: apiServiceClient,
apiServicesToSync: map[string]*apiregistration.APIService{},
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "autoregister"),
apiServicesAtStart: map[string]bool{},
syncedSuccessfullyLock: &sync.RWMutex{},
syncedSuccessfully: map[string]bool{},
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "autoregister"),
}
c.syncHandler = c.checkAPIService
@@ -120,6 +141,13 @@ func (c *autoRegisterController) Run(threadiness int, stopCh <-chan struct{}) {
return
}
// record APIService objects that existed when we started
if services, err := c.apiServiceLister.List(labels.Everything()); err == nil {
for _, service := range services {
c.apiServicesAtStart[service.Name] = true
}
}
// start up your worker threads based on threadiness. Some controllers have multiple kinds of workers
for i := 0; i < threadiness; i++ {
// runWorker will loop until "something bad" happens. The .Until will then rekick the worker
@@ -169,29 +197,61 @@ func (c *autoRegisterController) processNextWorkItem() bool {
return true
}
func (c *autoRegisterController) checkAPIService(name string) error {
// checkAPIService syncs the current APIService against a list of desired APIService objects
//
// | A. desired: not found | B. desired: sync on start | C. desired: sync always
// ------------------------------------------------|-----------------------|---------------------------|------------------------
// 1. current: lookup error | error | error | error
// 2. current: not found | - | create once | create
// 3. current: no sync | - | - | -
// 4. current: sync on start, not present at start | - | - | -
// 5. current: sync on start, present at start | delete once | update once | update once
// 6. current: sync always | delete | update once | update
func (c *autoRegisterController) checkAPIService(name string) (err error) {
desired := c.GetAPIServiceToSync(name)
curr, err := c.apiServiceLister.Get(name)
// if we've never synced this service successfully, record a successful sync.
hasSynced := c.hasSyncedSuccessfully(name)
if !hasSynced {
defer func() {
if err == nil {
c.setSyncedSuccessfully(name)
}
}()
}
switch {
// we had a real error, just return it
// we had a real error, just return it (1A,1B,1C)
case err != nil && !apierrors.IsNotFound(err):
return err
// we don't have an entry and we don't want one
// we don't have an entry and we don't want one (2A)
case apierrors.IsNotFound(err) && desired == nil:
return nil
// we don't have an entry and we do want one
// the local object only wants to sync on start and has already synced (2B,5B,6B "once" enforcement)
case isAutomanagedOnStart(desired) && hasSynced:
return nil
// we don't have an entry and we do want one (2B,2C)
case apierrors.IsNotFound(err) && desired != nil:
_, err := c.apiServiceClient.APIServices().Create(desired)
return err
// we aren't trying to manage this APIService. If the user removes the label, he's taken over management himself
case curr.Labels[AutoRegisterManagedLabel] != "true":
// we aren't trying to manage this APIService (3A,3B,3C)
case !isAutomanaged(curr):
return nil
// we have a spurious APIService that we're managing, delete it
// the remote object only wants to sync on start, but was added after we started (4A,4B,4C)
case isAutomanagedOnStart(curr) && !c.apiServicesAtStart[name]:
return nil
// the remote object only wants to sync on start and has already synced (5A,5B,5C "once" enforcement)
case isAutomanagedOnStart(curr) && hasSynced:
return nil
// we have a spurious APIService that we're managing, delete it (5A,6A)
case desired == nil:
return c.apiServiceClient.APIServices().Delete(curr.Name, nil)
@@ -200,7 +260,7 @@ func (c *autoRegisterController) checkAPIService(name string) error {
return nil
}
// we have an entry and we have a desired, now we deconflict. Only a few fields matter.
// we have an entry and we have a desired, now we deconflict. Only a few fields matter. (5B,5C,6B,6C)
apiService := curr.DeepCopy()
apiService.Spec = desired.Spec
_, err = c.apiServiceClient.APIServices().Update(apiService)
@@ -214,7 +274,15 @@ func (c *autoRegisterController) GetAPIServiceToSync(name string) *apiregistrati
return c.apiServicesToSync[name]
}
func (c *autoRegisterController) AddAPIServiceToSyncOnStart(in *apiregistration.APIService) {
c.addAPIServiceToSync(in, manageOnStart)
}
func (c *autoRegisterController) AddAPIServiceToSync(in *apiregistration.APIService) {
c.addAPIServiceToSync(in, manageContinuously)
}
func (c *autoRegisterController) addAPIServiceToSync(in *apiregistration.APIService, syncType string) {
c.apiServicesToSyncLock.Lock()
defer c.apiServicesToSyncLock.Unlock()
@@ -222,7 +290,7 @@ func (c *autoRegisterController) AddAPIServiceToSync(in *apiregistration.APIServ
if apiService.Labels == nil {
apiService.Labels = map[string]string{}
}
apiService.Labels[AutoRegisterManagedLabel] = "true"
apiService.Labels[AutoRegisterManagedLabel] = syncType
c.apiServicesToSync[apiService.Name] = apiService
c.queue.Add(apiService.Name)
@@ -235,3 +303,31 @@ func (c *autoRegisterController) RemoveAPIServiceToSync(name string) {
delete(c.apiServicesToSync, name)
c.queue.Add(name)
}
func (c *autoRegisterController) hasSyncedSuccessfully(name string) bool {
c.syncedSuccessfullyLock.RLock()
defer c.syncedSuccessfullyLock.RUnlock()
return c.syncedSuccessfully[name]
}
func (c *autoRegisterController) setSyncedSuccessfully(name string) {
c.syncedSuccessfullyLock.Lock()
defer c.syncedSuccessfullyLock.Unlock()
c.syncedSuccessfully[name] = true
}
func automanagedType(service *apiregistration.APIService) string {
if service == nil {
return ""
}
return service.Labels[AutoRegisterManagedLabel]
}
func isAutomanagedOnStart(service *apiregistration.APIService) bool {
return automanagedType(service) == manageOnStart
}
func isAutomanaged(service *apiregistration.APIService) bool {
managedType := automanagedType(service)
return managedType == manageOnStart || managedType == manageContinuously
}

View File

@@ -18,6 +18,7 @@ package autoregister
import (
"fmt"
"sync"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -35,6 +36,12 @@ func newAutoRegisterManagedAPIService(name string) *apiregistration.APIService {
}
}
func newAutoRegisterManagedOnStartAPIService(name string) *apiregistration.APIService {
return &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: name, Labels: map[string]string{AutoRegisterManagedLabel: string("onstart")}},
}
}
func newAutoRegisterManagedModifiedAPIService(name string) *apiregistration.APIService {
return &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: name, Labels: map[string]string{AutoRegisterManagedLabel: string("true")}},
@@ -44,6 +51,15 @@ func newAutoRegisterManagedModifiedAPIService(name string) *apiregistration.APIS
}
}
func newAutoRegisterManagedOnStartModifiedAPIService(name string) *apiregistration.APIService {
return &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: name, Labels: map[string]string{AutoRegisterManagedLabel: string("onstart")}},
Spec: apiregistration.APIServiceSpec{
Group: "something",
},
}
}
func newAPIService(name string) *apiregistration.APIService {
return &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: name},
@@ -80,6 +96,28 @@ func checkForCreate(name string, client *fake.Clientset) error {
return nil
}
func checkForCreateOnStart(name string, client *fake.Clientset) error {
if len(client.Actions()) == 0 {
return nil
}
if len(client.Actions()) > 1 {
return fmt.Errorf("unexpected action: %v", client.Actions())
}
action := client.Actions()[0]
createAction, ok := action.(clienttesting.CreateAction)
if !ok {
return fmt.Errorf("unexpected action: %v", client.Actions())
}
apiService := createAction.GetObject().(*apiregistration.APIService)
if apiService.Name != name || apiService.Labels[AutoRegisterManagedLabel] != "onstart" {
return fmt.Errorf("bad name or label %v", createAction)
}
return nil
}
func checkForUpdate(name string, client *fake.Clientset) error {
if len(client.Actions()) == 0 {
return nil
@@ -121,13 +159,16 @@ func checkForDelete(name string, client *fake.Clientset) error {
func TestSync(t *testing.T) {
tests := []struct {
name string
apiServiceName string
addAPIServices []*apiregistration.APIService
updateAPIServices []*apiregistration.APIService
addSyncAPIServices []*apiregistration.APIService
delSyncAPIServices []string
expectedResults func(name string, client *fake.Clientset) error
name string
apiServiceName string
addAPIServices []*apiregistration.APIService
updateAPIServices []*apiregistration.APIService
addSyncAPIServices []*apiregistration.APIService
addSyncOnStartAPIServices []*apiregistration.APIService
delSyncAPIServices []string
alreadySynced map[string]bool
presentAtStart map[string]bool
expectedResults func(name string, client *fake.Clientset) error
}{
{
name: "adding an API service which isn't auto-managed does nothing",
@@ -166,7 +207,7 @@ func TestSync(t *testing.T) {
expectedResults: checkForDelete,
},
{
name: "removing auto-manged then RemoveAPIService should not touch APIService",
name: "removing auto-managed then RemoveAPIService should not touch APIService",
apiServiceName: "foo",
addAPIServices: []*apiregistration.APIService{},
updateAPIServices: []*apiregistration.APIService{newAPIService("foo")},
@@ -192,17 +233,104 @@ func TestSync(t *testing.T) {
delSyncAPIServices: []string{},
expectedResults: checkForUpdate,
},
{
name: "adding one to auto-register on start should create",
apiServiceName: "foo",
addAPIServices: []*apiregistration.APIService{},
updateAPIServices: []*apiregistration.APIService{},
addSyncOnStartAPIServices: []*apiregistration.APIService{newAPIService("foo")},
delSyncAPIServices: []string{},
expectedResults: checkForCreateOnStart,
},
{
name: "adding one to auto-register on start already synced should do nothing",
apiServiceName: "foo",
addAPIServices: []*apiregistration.APIService{},
updateAPIServices: []*apiregistration.APIService{},
addSyncOnStartAPIServices: []*apiregistration.APIService{newAPIService("foo")},
delSyncAPIServices: []string{},
alreadySynced: map[string]bool{"foo": true},
expectedResults: checkForNothing,
},
{
name: "managed onstart apiservice present at start without a matching request should delete",
apiServiceName: "foo",
addAPIServices: []*apiregistration.APIService{newAPIService("foo")},
updateAPIServices: []*apiregistration.APIService{newAutoRegisterManagedOnStartAPIService("foo")},
addSyncAPIServices: []*apiregistration.APIService{},
delSyncAPIServices: []string{},
presentAtStart: map[string]bool{"foo": true},
alreadySynced: map[string]bool{},
expectedResults: checkForDelete,
},
{
name: "managed onstart apiservice present at start without a matching request already synced once should no-op",
apiServiceName: "foo",
addAPIServices: []*apiregistration.APIService{newAPIService("foo")},
updateAPIServices: []*apiregistration.APIService{newAutoRegisterManagedOnStartAPIService("foo")},
addSyncAPIServices: []*apiregistration.APIService{},
delSyncAPIServices: []string{},
presentAtStart: map[string]bool{"foo": true},
alreadySynced: map[string]bool{"foo": true},
expectedResults: checkForNothing,
},
{
name: "managed onstart apiservice not present at start without a matching request should no-op",
apiServiceName: "foo",
addAPIServices: []*apiregistration.APIService{newAPIService("foo")},
updateAPIServices: []*apiregistration.APIService{newAutoRegisterManagedOnStartAPIService("foo")},
addSyncAPIServices: []*apiregistration.APIService{},
delSyncAPIServices: []string{},
presentAtStart: map[string]bool{},
alreadySynced: map[string]bool{},
expectedResults: checkForNothing,
},
{
name: "modifying onstart it should result in stomping",
apiServiceName: "foo",
addAPIServices: []*apiregistration.APIService{},
updateAPIServices: []*apiregistration.APIService{newAutoRegisterManagedModifiedAPIService("foo")},
addSyncOnStartAPIServices: []*apiregistration.APIService{newAutoRegisterManagedOnStartAPIService("foo")},
delSyncAPIServices: []string{},
expectedResults: checkForUpdate,
},
{
name: "modifying onstart already synced should no-op",
apiServiceName: "foo",
addAPIServices: []*apiregistration.APIService{},
updateAPIServices: []*apiregistration.APIService{newAutoRegisterManagedModifiedAPIService("foo")},
addSyncOnStartAPIServices: []*apiregistration.APIService{newAutoRegisterManagedOnStartAPIService("foo")},
delSyncAPIServices: []string{},
alreadySynced: map[string]bool{"foo": true},
expectedResults: checkForNothing,
},
}
for _, test := range tests {
fakeClient := fake.NewSimpleClientset()
apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
c := autoRegisterController{
alreadySynced := map[string]bool{}
for k, v := range test.alreadySynced {
alreadySynced[k] = v
}
presentAtStart := map[string]bool{}
for k, v := range test.presentAtStart {
presentAtStart[k] = v
}
c := &autoRegisterController{
apiServiceClient: fakeClient.Apiregistration(),
apiServiceLister: listers.NewAPIServiceLister(apiServiceIndexer),
apiServicesToSync: map[string]*apiregistration.APIService{},
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "autoregister"),
syncedSuccessfullyLock: &sync.RWMutex{},
syncedSuccessfully: alreadySynced,
apiServicesAtStart: presentAtStart,
}
for _, obj := range test.addAPIServices {
@@ -217,6 +345,10 @@ func TestSync(t *testing.T) {
c.AddAPIServiceToSync(obj)
}
for _, obj := range test.addSyncOnStartAPIServices {
c.AddAPIServiceToSyncOnStart(obj)
}
for _, objName := range test.delSyncAPIServices {
c.RemoveAPIServiceToSync(objName)
}