Merge pull request #43141 from deads2k/tpr-04-register

Automatic merge from submit-queue (batch tested with PRs 43429, 43416, 43312, 43141, 43421)

Create controller to auto register TPRs with the aggregator

Builds on https://github.com/kubernetes/kubernetes/pull/42732 (already lgtmed)

Creates a simple controller to wire TPRs with the API Service autoregistration controller.

@kubernetes/sig-api-machinery-misc @ncdc
This commit is contained in:
Kubernetes Submit Queue 2017-03-25 22:24:27 -07:00 committed by GitHub
commit 484ac692f8
6 changed files with 1014 additions and 1 deletions

View File

@ -5,23 +5,32 @@ licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["thirdparty.go"],
srcs = [
"thirdparty.go",
"tprregistration_controller.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/client/informers/informers_generated/internalversion/extensions/internalversion:go_default_library",
"//pkg/client/listers/extensions/internalversion:go_default_library",
"//pkg/registry/extensions/rest:go_default_library",
"//pkg/registry/extensions/thirdpartyresourcedata:go_default_library",
"//pkg/registry/extensions/thirdpartyresourcedata/storage:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/meta",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apiserver/pkg/endpoints",
"//vendor:k8s.io/apiserver/pkg/endpoints/handlers",
"//vendor:k8s.io/apiserver/pkg/endpoints/request",
@ -30,6 +39,9 @@ go_library(
"//vendor:k8s.io/apiserver/pkg/server",
"//vendor:k8s.io/apiserver/pkg/server/storage",
"//vendor:k8s.io/apiserver/pkg/storage/storagebackend",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/util/workqueue",
"//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration",
],
)
@ -45,3 +57,19 @@ filegroup(
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["tprregistration_controller_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/apis/extensions:go_default_library",
"//pkg/client/listers/extensions/internalversion:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/util/workqueue",
"//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration",
],
)

View File

@ -0,0 +1,214 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package thirdparty
import (
"fmt"
"time"
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kubernetes/pkg/apis/extensions"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/extensions/internalversion"
listers "k8s.io/kubernetes/pkg/client/listers/extensions/internalversion"
"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
)
// AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for
// adding and removing APIServices
type AutoAPIServiceRegistration interface {
// AddAPIServiceToSync adds an API service to auto-register.
AddAPIServiceToSync(in *apiregistration.APIService)
// RemoveAPIServiceToSync removes an API service to auto-register.
RemoveAPIServiceToSync(name string)
}
type tprRegistrationController struct {
tprLister listers.ThirdPartyResourceLister
tprSynced cache.InformerSynced
apiServiceRegistration AutoAPIServiceRegistration
syncHandler func(groupVersion schema.GroupVersion) error
// queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors
// this is actually keyed by a groupVersion
queue workqueue.RateLimitingInterface
}
// NewAutoRegistrationController returns a controller which will register TPR GroupVersions with the auto APIService registration
// controller so they automatically stay in sync.
func NewAutoRegistrationController(tprInformer informers.ThirdPartyResourceInformer, apiServiceRegistration AutoAPIServiceRegistration) *tprRegistrationController {
c := &tprRegistrationController{
tprLister: tprInformer.Lister(),
tprSynced: tprInformer.Informer().HasSynced,
apiServiceRegistration: apiServiceRegistration,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "tpr-autoregister"),
}
c.syncHandler = c.handleTPR
tprInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cast := obj.(*extensions.ThirdPartyResource)
c.enqueueTPR(cast)
},
UpdateFunc: func(_, obj interface{}) {
cast := obj.(*extensions.ThirdPartyResource)
c.enqueueTPR(cast)
},
DeleteFunc: func(obj interface{}) {
cast, ok := obj.(*extensions.ThirdPartyResource)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
return
}
cast, ok = tombstone.Obj.(*extensions.ThirdPartyResource)
if !ok {
glog.V(2).Infof("Tombstone contained unexpected object: %#v", obj)
return
}
}
c.enqueueTPR(cast)
},
})
return c
}
func (c *tprRegistrationController) Run(threadiness int, stopCh chan struct{}) {
// don't let panics crash the process
defer utilruntime.HandleCrash()
// make sure the work queue is shutdown which will trigger workers to end
defer c.queue.ShutDown()
glog.Infof("Starting tpr-autoregister controller")
defer glog.Infof("Shutting down tpr-autoregister controller")
// wait for your secondary caches to fill before starting your work
if !cache.WaitForCacheSync(stopCh, c.tprSynced) {
return
}
// start up your worker threads based on threadiness. Some controllers have multiple kinds of workers
for i := 0; i < threadiness; i++ {
// runWorker will loop until "something bad" happens. The .Until will then rekick the worker
// after one second
go wait.Until(c.runWorker, time.Second, stopCh)
}
// wait until we're told to stop
<-stopCh
}
func (c *tprRegistrationController) runWorker() {
// hot loop until we're told to stop. processNextWorkItem will automatically wait until there's work
// available, so we don't worry about secondary waits
for c.processNextWorkItem() {
}
}
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (c *tprRegistrationController) processNextWorkItem() bool {
// pull the next work item from queue. It should be a key we use to lookup something in a cache
key, quit := c.queue.Get()
if quit {
return false
}
// you always have to indicate to the queue that you've completed a piece of work
defer c.queue.Done(key)
// do your work on the key. This method will contains your "do stuff" logic
err := c.syncHandler(key.(schema.GroupVersion))
if err == nil {
// if you had no error, tell the queue to stop tracking history for your key. This will
// reset things like failure counts for per-item rate limiting
c.queue.Forget(key)
return true
}
// there was a failure so be sure to report it. This method allows for pluggable error handling
// which can be used for things like cluster-monitoring
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
// since we failed, we should requeue the item to work on later. This method will add a backoff
// to avoid hotlooping on particular items (they're probably still not going to work right away)
// and overall controller protection (everything I've done is broken, this controller needs to
// calm down or it can starve other useful work) cases.
c.queue.AddRateLimited(key)
return true
}
func (c *tprRegistrationController) enqueueTPR(tpr *extensions.ThirdPartyResource) {
_, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(tpr)
if err != nil {
utilruntime.HandleError(err)
return
}
for _, version := range tpr.Versions {
c.queue.Add(schema.GroupVersion{Group: group, Version: version.Name})
}
}
func (c *tprRegistrationController) handleTPR(groupVersion schema.GroupVersion) error {
// check all TPRs. There shouldn't that many, but if we have problems later we can index them
tprs, err := c.tprLister.List(labels.Everything())
if err != nil {
return err
}
found := false
for _, tpr := range tprs {
_, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(tpr)
if err != nil {
return err
}
for _, version := range tpr.Versions {
if version.Name == groupVersion.Version && group == groupVersion.Group {
found = true
break
}
}
}
apiServiceName := groupVersion.Version + "." + groupVersion.Group
if !found {
c.apiServiceRegistration.RemoveAPIServiceToSync(apiServiceName)
return nil
}
c.apiServiceRegistration.AddAPIServiceToSync(&apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: apiServiceName},
Spec: apiregistration.APIServiceSpec{
Group: groupVersion.Group,
Version: groupVersion.Version,
Priority: 500, // TPRs should have relatively low priority
},
})
return nil
}

View File

@ -0,0 +1,141 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package thirdparty
import (
"reflect"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kubernetes/pkg/apis/extensions"
listers "k8s.io/kubernetes/pkg/client/listers/extensions/internalversion"
)
func TestEnqueue(t *testing.T) {
c := tprRegistrationController{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "tpr-autoregister"),
}
tpr := &extensions.ThirdPartyResource{
ObjectMeta: metav1.ObjectMeta{Name: "resource.group.example.com"},
Versions: []extensions.APIVersion{
{Name: "v1alpha1"},
{Name: "v1"},
},
}
c.enqueueTPR(tpr)
first, _ := c.queue.Get()
expectedFirst := schema.GroupVersion{Group: "group.example.com", Version: "v1alpha1"}
if first != expectedFirst {
t.Errorf("expected %v, got %v", expectedFirst, first)
}
second, _ := c.queue.Get()
expectedSecond := schema.GroupVersion{Group: "group.example.com", Version: "v1"}
if second != expectedSecond {
t.Errorf("expected %v, got %v", expectedSecond, second)
}
}
func TestHandleTPR(t *testing.T) {
tests := []struct {
name string
startingTPRs []*extensions.ThirdPartyResource
version schema.GroupVersion
expectedAdded []*apiregistration.APIService
expectedRemoved []string
}{
{
name: "simple add",
startingTPRs: []*extensions.ThirdPartyResource{
{
ObjectMeta: metav1.ObjectMeta{Name: "resource.group.com"},
Versions: []extensions.APIVersion{
{Name: "v1"},
},
},
},
version: schema.GroupVersion{Group: "group.com", Version: "v1"},
expectedAdded: []*apiregistration.APIService{
{
ObjectMeta: metav1.ObjectMeta{Name: "v1.group.com"},
Spec: apiregistration.APIServiceSpec{
Group: "group.com",
Version: "v1",
Priority: 500,
},
},
},
},
{
name: "simple remove",
startingTPRs: []*extensions.ThirdPartyResource{
{
ObjectMeta: metav1.ObjectMeta{Name: "resource.group.com"},
Versions: []extensions.APIVersion{
{Name: "v1"},
},
},
},
version: schema.GroupVersion{Group: "group.com", Version: "v2"},
expectedRemoved: []string{"v2.group.com"},
},
}
for _, test := range tests {
registration := &fakeAPIServiceRegistration{}
tprCache := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
tprLister := listers.NewThirdPartyResourceLister(tprCache)
c := tprRegistrationController{
tprLister: tprLister,
apiServiceRegistration: registration,
}
for i := range test.startingTPRs {
tprCache.Add(test.startingTPRs[i])
}
c.handleTPR(test.version)
if !reflect.DeepEqual(test.expectedAdded, registration.added) {
t.Errorf("%s expected %v, got %v", test.name, test.expectedAdded, registration.added)
}
if !reflect.DeepEqual(test.expectedRemoved, registration.removed) {
t.Errorf("%s expected %v, got %v", test.name, test.expectedRemoved, registration.removed)
}
}
}
type fakeAPIServiceRegistration struct {
added []*apiregistration.APIService
removed []string
}
func (a *fakeAPIServiceRegistration) AddAPIServiceToSync(in *apiregistration.APIService) {
a.added = append(a.added, in)
}
func (a *fakeAPIServiceRegistration) RemoveAPIServiceToSync(name string) {
a.removed = append(a.removed, name)
}

View File

@ -0,0 +1,249 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package autoregister
import (
"fmt"
"reflect"
"sync"
"time"
"github.com/golang/glog"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/conversion"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion"
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion"
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion"
)
const (
AutoRegisterManagedLabel = "kube-aggregator.kubernetes.io/automanaged"
)
var (
cloner = conversion.NewCloner()
)
// AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for
// adding and removing APIServices
type AutoAPIServiceRegistration interface {
// AddAPIServiceToSync adds an API service to auto-register.
AddAPIServiceToSync(in *apiregistration.APIService)
// RemoveAPIServiceToSync removes an API service to auto-register.
RemoveAPIServiceToSync(name string)
}
// autoRegisterController is used to keep a particular set of APIServices present in the API. It is useful
// for cases where you want to auto-register APIs like TPRs or groups from the core kube-apiserver
type autoRegisterController struct {
apiServiceLister listers.APIServiceLister
apiServiceSynced cache.InformerSynced
apiServiceClient apiregistrationclient.APIServicesGetter
apiServicesToSyncLock sync.RWMutex
apiServicesToSync map[string]*apiregistration.APIService
syncHandler func(apiServiceName string) error
// queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors
queue workqueue.RateLimitingInterface
}
func NewAutoRegisterController(apiServiceInformer informers.APIServiceInformer, apiServiceClient apiregistrationclient.APIServicesGetter) *autoRegisterController {
c := &autoRegisterController{
apiServiceLister: apiServiceInformer.Lister(),
apiServiceSynced: apiServiceInformer.Informer().HasSynced,
apiServiceClient: apiServiceClient,
apiServicesToSync: map[string]*apiregistration.APIService{},
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "autoregister"),
}
c.syncHandler = c.checkAPIService
apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cast := obj.(*apiregistration.APIService)
c.queue.Add(cast.Name)
},
UpdateFunc: func(_, obj interface{}) {
cast := obj.(*apiregistration.APIService)
c.queue.Add(cast.Name)
},
DeleteFunc: func(obj interface{}) {
cast, ok := obj.(*apiregistration.APIService)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
return
}
cast, ok = tombstone.Obj.(*apiregistration.APIService)
if !ok {
glog.V(2).Infof("Tombstone contained unexpected object: %#v", obj)
return
}
}
c.queue.Add(cast.Name)
},
})
return c
}
func (c *autoRegisterController) Run(threadiness int, stopCh chan struct{}) {
// don't let panics crash the process
defer utilruntime.HandleCrash()
// make sure the work queue is shutdown which will trigger workers to end
defer c.queue.ShutDown()
glog.Infof("Starting autoregister controller")
defer glog.Infof("Shutting down autoregister controller")
// wait for your secondary caches to fill before starting your work
if !cache.WaitForCacheSync(stopCh, c.apiServiceSynced) {
return
}
// start up your worker threads based on threadiness. Some controllers have multiple kinds of workers
for i := 0; i < threadiness; i++ {
// runWorker will loop until "something bad" happens. The .Until will then rekick the worker
// after one second
go wait.Until(c.runWorker, time.Second, stopCh)
}
// wait until we're told to stop
<-stopCh
}
func (c *autoRegisterController) runWorker() {
// hot loop until we're told to stop. processNextWorkItem will automatically wait until there's work
// available, so we don't worry about secondary waits
for c.processNextWorkItem() {
}
}
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (c *autoRegisterController) processNextWorkItem() bool {
// pull the next work item from queue. It should be a key we use to lookup something in a cache
key, quit := c.queue.Get()
if quit {
return false
}
// you always have to indicate to the queue that you've completed a piece of work
defer c.queue.Done(key)
// do your work on the key. This method will contains your "do stuff" logic
err := c.syncHandler(key.(string))
if err == nil {
// if you had no error, tell the queue to stop tracking history for your key. This will
// reset things like failure counts for per-item rate limiting
c.queue.Forget(key)
return true
}
// there was a failure so be sure to report it. This method allows for pluggable error handling
// which can be used for things like cluster-monitoring
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
// since we failed, we should requeue the item to work on later. This method will add a backoff
// to avoid hotlooping on particular items (they're probably still not going to work right away)
// and overall controller protection (everything I've done is broken, this controller needs to
// calm down or it can starve other useful work) cases.
c.queue.AddRateLimited(key)
return true
}
func (c *autoRegisterController) checkAPIService(name string) error {
desired := c.GetAPIServiceToSync(name)
curr, err := c.apiServiceLister.Get(name)
switch {
// we had a real error, just return it
case err != nil && !apierrors.IsNotFound(err):
return err
// we don't have an entry and we don't want one
case apierrors.IsNotFound(err) && desired == nil:
return nil
// we don't have an entry and we do want one
case apierrors.IsNotFound(err) && desired != nil:
_, err := c.apiServiceClient.APIServices().Create(desired)
return err
// we aren't trying to manage this APIService. If the user removes the label, he's taken over management himself
case curr.Labels[AutoRegisterManagedLabel] != "true":
return nil
// we have a spurious APIService that we're managing, delete it
case desired == nil:
return c.apiServiceClient.APIServices().Delete(curr.Name, nil)
// if the specs already match, nothing for us to do
case reflect.DeepEqual(curr.Spec, desired.Spec):
return nil
}
// we have an entry and we have a desired, now we deconflict. Only a few fields matter.
apiService := &apiregistration.APIService{}
if err := apiregistration.DeepCopy_apiregistration_APIService(curr, apiService, cloner); err != nil {
return err
}
apiService.Spec = desired.Spec
_, err = c.apiServiceClient.APIServices().Update(apiService)
return err
}
func (c *autoRegisterController) GetAPIServiceToSync(name string) *apiregistration.APIService {
c.apiServicesToSyncLock.RLock()
defer c.apiServicesToSyncLock.RUnlock()
return c.apiServicesToSync[name]
}
func (c *autoRegisterController) AddAPIServiceToSync(in *apiregistration.APIService) {
c.apiServicesToSyncLock.Lock()
defer c.apiServicesToSyncLock.Unlock()
apiService := &apiregistration.APIService{}
if err := apiregistration.DeepCopy_apiregistration_APIService(in, apiService, cloner); err != nil {
// this shouldn't happen
utilruntime.HandleError(err)
return
}
if apiService.Labels == nil {
apiService.Labels = map[string]string{}
}
apiService.Labels[AutoRegisterManagedLabel] = "true"
c.apiServicesToSync[apiService.Name] = apiService
c.queue.Add(apiService.Name)
}
func (c *autoRegisterController) RemoveAPIServiceToSync(name string) {
c.apiServicesToSyncLock.Lock()
defer c.apiServicesToSyncLock.Unlock()
delete(c.apiServicesToSync, name)
c.queue.Add(name)
}

View File

@ -0,0 +1,343 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package autoregister
import (
"fmt"
"reflect"
"testing"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
core "k8s.io/client-go/testing"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
"k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset"
"k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/fake"
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion"
)
func alwaysReady() bool { return true }
func waitForNothing(startTime time.Time, client *fake.Clientset) (bool, error) {
if len(client.Actions()) > 0 {
return false, fmt.Errorf("unexpected action: %v", client.Actions())
}
if time.Now().After(startTime.Add(3 * time.Second)) {
return true, nil
}
return false, nil
}
func waitForCreate(name string) func(startTime time.Time, client *fake.Clientset) (bool, error) {
return func(startTime time.Time, client *fake.Clientset) (bool, error) {
if len(client.Actions()) == 0 {
return false, nil
}
if len(client.Actions()) > 1 {
return false, fmt.Errorf("unexpected action: %v", client.Actions())
}
action := client.Actions()[0]
createAction, ok := action.(core.CreateAction)
if !ok {
return false, fmt.Errorf("unexpected action: %v", client.Actions())
}
apiService := createAction.GetObject().(*apiregistration.APIService)
if apiService.Name != name || apiService.Labels[AutoRegisterManagedLabel] != "true" {
return false, fmt.Errorf("bad name or label %v", createAction)
}
return true, nil
}
}
func waitForUpdate(name string) func(startTime time.Time, client *fake.Clientset) (bool, error) {
return func(startTime time.Time, client *fake.Clientset) (bool, error) {
if len(client.Actions()) == 0 {
return false, nil
}
if len(client.Actions()) > 1 {
return false, fmt.Errorf("unexpected action: %v", client.Actions())
}
action := client.Actions()[0]
updateAction, ok := action.(core.UpdateAction)
if !ok {
return false, fmt.Errorf("unexpected action: %v", client.Actions())
}
apiService := updateAction.GetObject().(*apiregistration.APIService)
if apiService.Name != name || apiService.Labels[AutoRegisterManagedLabel] != "true" || apiService.Spec.Group != "" {
return false, fmt.Errorf("bad name, label, or group %v", updateAction)
}
return true, nil
}
}
func waitForDelete(name string) func(startTime time.Time, client *fake.Clientset) (bool, error) {
return func(startTime time.Time, client *fake.Clientset) (bool, error) {
if len(client.Actions()) == 0 {
return false, nil
}
// tolerate delete being called multiple times. This happens if the delete fails on missing resource which
// happens on an unsynced cache
for _, action := range client.Actions() {
deleteAction, ok := action.(core.DeleteAction)
if !ok {
return false, fmt.Errorf("unexpected action: %v", client.Actions())
}
if deleteAction.GetName() != name {
return false, fmt.Errorf("bad name %v", deleteAction)
}
}
return true, nil
}
}
func TestCheckAPIService(t *testing.T) {
tests := []struct {
name string
steps []func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface)
expectedResults []func(startTime time.Time, client *fake.Clientset) (bool, error)
}{
{
name: "do nothing",
steps: []func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface){
// adding an API service which isn't auto-managed does nothing
func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) {
fakeWatch.Add(&apiregistration.APIService{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
},
// removing an auto-sync that doesn't exist should do nothing
func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) {
c.RemoveAPIServiceToSync("bar")
},
},
expectedResults: []func(startTime time.Time, client *fake.Clientset) (bool, error){
waitForNothing,
waitForNothing,
},
},
{
name: "simple create and delete",
steps: []func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface){
// adding one to auto-register should create
func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) {
c.AddAPIServiceToSync(&apiregistration.APIService{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
},
// adding the same item again shouldn't do anything
func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) {
c.AddAPIServiceToSync(&apiregistration.APIService{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
},
// removing entry should delete the API service since its managed
func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) {
c.RemoveAPIServiceToSync("foo")
},
},
expectedResults: []func(startTime time.Time, client *fake.Clientset) (bool, error){
waitForCreate("foo"),
waitForNothing,
waitForDelete("foo"),
},
},
{
name: "create, user manage, then delete",
steps: []func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface){
// adding one to auto-register should create
func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) {
c.AddAPIServiceToSync(&apiregistration.APIService{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
},
// adding an API service to take ownership shouldn't cause the controller to do anything
func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) {
fakeWatch.Modify(&apiregistration.APIService{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
},
// removing entry should NOT delete the API service since its user owned
func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) {
c.RemoveAPIServiceToSync("foo")
},
},
expectedResults: []func(startTime time.Time, client *fake.Clientset) (bool, error){
waitForCreate("foo"),
waitForNothing,
waitForNothing,
},
},
{
name: "create managed apiservice without a matching request",
steps: []func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface){
// adding an API service which isn't auto-managed does nothing
func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) {
fakeWatch.Add(&apiregistration.APIService{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
},
// adding an API service which claims to be managed but isn't should be deleted
func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) {
fakeWatch.Modify(&apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Labels: map[string]string{AutoRegisterManagedLabel: "true"},
}})
},
},
expectedResults: []func(startTime time.Time, client *fake.Clientset) (bool, error){
waitForNothing,
waitForDelete("foo"),
},
},
{
name: "modifying it should result in stomping",
steps: []func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface){
// adding one to auto-register should create
func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) {
c.AddAPIServiceToSync(&apiregistration.APIService{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
},
// updating a managed APIService should result in stomping it
func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) {
fakeWatch.Modify(&apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Labels: map[string]string{AutoRegisterManagedLabel: "true"},
},
Spec: apiregistration.APIServiceSpec{
Group: "something",
},
})
},
},
expectedResults: []func(startTime time.Time, client *fake.Clientset) (bool, error){
waitForCreate("foo"),
waitForUpdate("foo"),
},
},
}
NextTest:
for _, test := range tests {
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
fakeWatch := watch.NewFake()
client.PrependWatchReactor("apiservices", core.DefaultWatchReactor(fakeWatch, nil))
c := NewAutoRegisterController(informerFactory.Apiregistration().InternalVersion().APIServices(), client.Apiregistration())
stopCh := make(chan struct{})
go informerFactory.Start(stopCh)
go c.Run(3, stopCh)
// wait for the initial sync to complete
err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
return c.apiServiceSynced(), nil
})
if err != nil {
t.Errorf("%s %v", test.name, err)
close(stopCh)
continue NextTest
}
for i, step := range test.steps {
client.ClearActions()
step(c, fakeWatch, client)
startTime := time.Now()
err := wait.PollImmediate(10*time.Millisecond, 20*time.Second, func() (bool, error) {
return test.expectedResults[i](startTime, client)
})
if err != nil {
t.Errorf("%s[%d] %v", test.name, i, err)
close(stopCh)
continue NextTest
}
// make sure that any create/update/delete is propagated to the watch
for _, a := range client.Actions() {
switch action := a.(type) {
case core.CreateAction:
fakeWatch.Add(action.GetObject())
metadata, err := meta.Accessor(action.GetObject())
if err != nil {
t.Fatal(err)
}
err = wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
if _, err := c.apiServiceLister.Get(metadata.GetName()); err == nil {
return true, nil
}
return false, nil
})
if err != nil {
t.Errorf("%s[%d] %v", test.name, i, err)
close(stopCh)
continue NextTest
}
case core.DeleteAction:
obj, err := c.apiServiceLister.Get(action.GetName())
if apierrors.IsNotFound(err) {
close(stopCh)
continue NextTest
}
if err != nil {
t.Fatal(err)
}
fakeWatch.Delete(obj)
err = wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
if _, err := c.apiServiceLister.Get(action.GetName()); apierrors.IsNotFound(err) {
return true, nil
}
return false, nil
})
if err != nil {
t.Errorf("%s[%d] %v", test.name, i, err)
close(stopCh)
continue NextTest
}
case core.UpdateAction:
fakeWatch.Modify(action.GetObject())
metadata, err := meta.Accessor(action.GetObject())
if err != nil {
t.Fatal(err)
}
err = wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
obj, err := c.apiServiceLister.Get(metadata.GetName())
if err != nil {
return false, err
}
if reflect.DeepEqual(obj, action.GetObject()) {
return true, nil
}
return false, nil
})
if err != nil {
t.Errorf("%s[%d] %v", test.name, i, err)
close(stopCh)
continue NextTest
}
}
}
}
close(stopCh)
}
}

38
vendor/BUILD vendored
View File

@ -16321,3 +16321,41 @@ filegroup(
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "k8s.io/kube-aggregator/pkg/controllers/autoregister_test",
srcs = ["k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller_test.go"],
library = ":k8s.io/kube-aggregator/pkg/controllers/autoregister",
tags = ["automanaged"],
deps = [
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/api/meta",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/testing",
"//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration",
"//vendor:k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset",
"//vendor:k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/fake",
"//vendor:k8s.io/kube-aggregator/pkg/client/informers/internalversion",
],
)
go_library(
name = "k8s.io/kube-aggregator/pkg/controllers/autoregister",
srcs = ["k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller.go"],
tags = ["automanaged"],
deps = [
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/conversion",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/util/workqueue",
"//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration",
"//vendor:k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion",
"//vendor:k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion",
"//vendor:k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion",
],
)