start using apiservice status in controllers and serving
This commit is contained in:
@@ -66,3 +66,37 @@ func APIServiceNameToGroupVersion(apiServiceName string) schema.GroupVersion {
|
||||
tokens := strings.SplitN(apiServiceName, ".", 2)
|
||||
return schema.GroupVersion{Group: tokens[1], Version: tokens[0]}
|
||||
}
|
||||
|
||||
// SetAPIServiceCondition sets the status condition. It either overwrites the existing one or
|
||||
// creates a new one
|
||||
func SetAPIServiceCondition(apiService *APIService, newCondition APIServiceCondition) {
|
||||
var existingCondition *APIServiceCondition
|
||||
for i := range apiService.Status.Conditions {
|
||||
if apiService.Status.Conditions[i].Type == newCondition.Type {
|
||||
existingCondition = &apiService.Status.Conditions[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
if existingCondition == nil {
|
||||
apiService.Status.Conditions = append(apiService.Status.Conditions, newCondition)
|
||||
return
|
||||
}
|
||||
|
||||
if existingCondition.Status != newCondition.Status {
|
||||
existingCondition.Status = newCondition.Status
|
||||
existingCondition.LastTransitionTime = newCondition.LastTransitionTime
|
||||
}
|
||||
|
||||
existingCondition.Reason = newCondition.Reason
|
||||
existingCondition.Message = newCondition.Message
|
||||
}
|
||||
|
||||
// IsAPIServiceConditionTrue indicates if the condition is present and strictly true
|
||||
func IsAPIServiceConditionTrue(apiService *APIService, conditionType APIServiceConditionType) bool {
|
||||
for _, condition := range apiService.Status.Conditions {
|
||||
if condition.Type == conditionType && condition.Status == ConditionTrue {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
@@ -18,7 +18,6 @@ package validation
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/validation"
|
||||
"k8s.io/apimachinery/pkg/api/validation/path"
|
||||
@@ -101,7 +100,8 @@ func ValidateAPIServiceStatus(status *apiregistration.APIServiceStatus, fldPath
|
||||
if condition.Status != apiregistration.ConditionTrue &&
|
||||
condition.Status != apiregistration.ConditionFalse &&
|
||||
condition.Status != apiregistration.ConditionUnknown {
|
||||
allErrs = append(allErrs, field.NotSupported(fldPath.Child("conditions").Index(i).Child("status"), condition.Status, []string{apiregistration.ConditionTrue, apiregistration.ConditionFalse, apiregistration.ConditionUnknown}))
|
||||
allErrs = append(allErrs, field.NotSupported(fldPath.Child("conditions").Index(i).Child("status"), condition.Status, []string{
|
||||
string(apiregistration.ConditionTrue), string(apiregistration.ConditionFalse), string(apiregistration.ConditionUnknown)}))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,7 +109,7 @@ func ValidateAPIServiceStatus(status *apiregistration.APIServiceStatus, fldPath
|
||||
}
|
||||
|
||||
func ValidateAPIServiceStatusUpdate(newAPIService *apiregistration.APIService, oldAPIService *apiregistration.APIService) field.ErrorList {
|
||||
allErrs := validation.ValidateObjectMetaUpdate(&update.ObjectMeta, &old.ObjectMeta, field.NewPath("metadata"))
|
||||
allErrs = append(allErrs, ValidateAPIServiceStatus(&update.Status, field.NewPath("status"))...)
|
||||
allErrs := validation.ValidateObjectMetaUpdate(&newAPIService.ObjectMeta, &oldAPIService.ObjectMeta, field.NewPath("metadata"))
|
||||
allErrs = append(allErrs, ValidateAPIServiceStatus(&newAPIService.Status, field.NewPath("status"))...)
|
||||
return allErrs
|
||||
}
|
||||
|
@@ -81,6 +81,7 @@ go_library(
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/controllers:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/controllers/status:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/registry/apiservice/etcd:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@@ -32,7 +32,6 @@ import (
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
kubeclientset "k8s.io/client-go/kubernetes"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/pkg/version"
|
||||
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
@@ -41,6 +40,7 @@ import (
|
||||
"k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset"
|
||||
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion"
|
||||
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion"
|
||||
statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status"
|
||||
apiservicestorage "k8s.io/kube-aggregator/pkg/registry/apiservice/etcd"
|
||||
)
|
||||
|
||||
@@ -102,11 +102,6 @@ type APIAggregator struct {
|
||||
// controller state
|
||||
lister listers.APIServiceLister
|
||||
|
||||
// serviceLister is used by the aggregator handler to determine whether or not to try to expose the group
|
||||
serviceLister v1listers.ServiceLister
|
||||
// endpointsLister is used by the aggregator handler to determine whether or not to try to expose the group
|
||||
endpointsLister v1listers.EndpointsLister
|
||||
|
||||
// provided for easier embedding
|
||||
APIRegistrationInformers informers.SharedInformerFactory
|
||||
}
|
||||
@@ -140,23 +135,25 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
||||
return nil, err
|
||||
}
|
||||
|
||||
apiregistrationClient, err := internalclientset.NewForConfig(c.Config.GenericConfig.LoopbackClientConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
informerFactory := informers.NewSharedInformerFactory(
|
||||
internalclientset.NewForConfigOrDie(c.Config.GenericConfig.LoopbackClientConfig),
|
||||
apiregistrationClient,
|
||||
5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on.
|
||||
)
|
||||
kubeInformers := kubeinformers.NewSharedInformerFactory(c.CoreAPIServerClient, 5*time.Minute)
|
||||
|
||||
s := &APIAggregator{
|
||||
GenericAPIServer: genericServer,
|
||||
delegateHandler: delegationTarget.UnprotectedHandler(),
|
||||
contextMapper: c.GenericConfig.RequestContextMapper,
|
||||
proxyClientCert: c.ProxyClientCert,
|
||||
proxyClientKey: c.ProxyClientKey,
|
||||
proxyHandlers: map[string]*proxyHandler{},
|
||||
handledGroups: sets.String{},
|
||||
lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(),
|
||||
serviceLister: kubeInformers.Core().V1().Services().Lister(),
|
||||
endpointsLister: kubeInformers.Core().V1().Endpoints().Lister(),
|
||||
GenericAPIServer: genericServer,
|
||||
delegateHandler: delegationTarget.UnprotectedHandler(),
|
||||
contextMapper: c.GenericConfig.RequestContextMapper,
|
||||
proxyClientCert: c.ProxyClientCert,
|
||||
proxyClientKey: c.ProxyClientKey,
|
||||
proxyHandlers: map[string]*proxyHandler{},
|
||||
handledGroups: sets.String{},
|
||||
lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(),
|
||||
APIRegistrationInformers: informerFactory,
|
||||
}
|
||||
|
||||
@@ -173,15 +170,19 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
||||
}
|
||||
|
||||
apisHandler := &apisHandler{
|
||||
codecs: Codecs,
|
||||
lister: s.lister,
|
||||
serviceLister: s.serviceLister,
|
||||
endpointsLister: s.endpointsLister,
|
||||
codecs: Codecs,
|
||||
lister: s.lister,
|
||||
}
|
||||
s.GenericAPIServer.Handler.PostGoRestfulMux.Handle("/apis", apisHandler)
|
||||
s.GenericAPIServer.Handler.PostGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
|
||||
|
||||
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), kubeInformers.Core().V1().Services(), s)
|
||||
availableController := statuscontrollers.NewAvailableConditionController(
|
||||
informerFactory.Apiregistration().InternalVersion().APIServices(),
|
||||
kubeInformers.Core().V1().Services(),
|
||||
kubeInformers.Core().V1().Endpoints(),
|
||||
apiregistrationClient.Apiregistration(),
|
||||
)
|
||||
|
||||
s.GenericAPIServer.AddPostStartHook("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
|
||||
informerFactory.Start(stopCh)
|
||||
@@ -192,6 +193,10 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
||||
go apiserviceRegistrationController.Run(stopCh)
|
||||
return nil
|
||||
})
|
||||
s.GenericAPIServer.AddPostStartHook("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
|
||||
go availableController.Run(stopCh)
|
||||
return nil
|
||||
})
|
||||
|
||||
return s, nil
|
||||
}
|
||||
@@ -237,12 +242,10 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService, de
|
||||
// it's time to register the group aggregation endpoint
|
||||
groupPath := "/apis/" + apiService.Spec.Group
|
||||
groupDiscoveryHandler := &apiGroupHandler{
|
||||
codecs: Codecs,
|
||||
groupName: apiService.Spec.Group,
|
||||
lister: s.lister,
|
||||
serviceLister: s.serviceLister,
|
||||
endpointsLister: s.endpointsLister,
|
||||
delegate: s.delegateHandler,
|
||||
codecs: Codecs,
|
||||
groupName: apiService.Spec.Group,
|
||||
lister: s.lister,
|
||||
delegate: s.delegateHandler,
|
||||
}
|
||||
// aggregation is protected
|
||||
s.GenericAPIServer.Handler.PostGoRestfulMux.Handle(groupPath, groupDiscoveryHandler)
|
||||
|
@@ -96,6 +96,13 @@ func (c *APIServiceRegistrationController) sync(key string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// remove registration handling for APIServices which are not available
|
||||
if !apiregistration.IsAPIServiceConditionTrue(apiService, apiregistration.Available) {
|
||||
c.apiHandlerManager.RemoveAPIService(key)
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO move the destination host to status so that you can see where its going
|
||||
c.apiHandlerManager.AddAPIService(apiService, c.getDestinationHost(apiService))
|
||||
return nil
|
||||
}
|
||||
|
@@ -25,7 +25,6 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
|
||||
apiregistrationapi "k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
apiregistrationv1alpha1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1alpha1"
|
||||
@@ -37,9 +36,6 @@ import (
|
||||
type apisHandler struct {
|
||||
codecs serializer.CodecFactory
|
||||
lister listers.APIServiceLister
|
||||
|
||||
serviceLister v1listers.ServiceLister
|
||||
endpointsLister v1listers.EndpointsLister
|
||||
}
|
||||
|
||||
var discoveryGroup = metav1.APIGroup{
|
||||
@@ -74,7 +70,7 @@ func (r *apisHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
if len(apiGroupServers[0].Spec.Group) == 0 {
|
||||
continue
|
||||
}
|
||||
discoveryGroup := convertToDiscoveryAPIGroup(apiGroupServers, r.serviceLister, r.endpointsLister)
|
||||
discoveryGroup := convertToDiscoveryAPIGroup(apiGroupServers)
|
||||
if discoveryGroup != nil {
|
||||
discoveryGroupList.Groups = append(discoveryGroupList.Groups, *discoveryGroup)
|
||||
}
|
||||
@@ -85,33 +81,14 @@ func (r *apisHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
|
||||
// convertToDiscoveryAPIGroup takes apiservices in a single group and returns a discovery compatible object.
|
||||
// if none of the services are available, it will return nil.
|
||||
func convertToDiscoveryAPIGroup(apiServices []*apiregistrationapi.APIService, serviceLister v1listers.ServiceLister, endpointsLister v1listers.EndpointsLister) *metav1.APIGroup {
|
||||
func convertToDiscoveryAPIGroup(apiServices []*apiregistrationapi.APIService) *metav1.APIGroup {
|
||||
apiServicesByGroup := apiregistrationapi.SortedByGroup(apiServices)[0]
|
||||
|
||||
var discoveryGroup *metav1.APIGroup
|
||||
|
||||
for _, apiService := range apiServicesByGroup {
|
||||
if apiService.Spec.Service != nil {
|
||||
// skip any API services without actual services
|
||||
if _, err := serviceLister.Services(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
hasActiveEndpoints := false
|
||||
endpoints, err := endpointsLister.Endpoints(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
|
||||
// skip any API services without endpoints
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
for _, subset := range endpoints.Subsets {
|
||||
if len(subset.Addresses) > 0 {
|
||||
hasActiveEndpoints = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hasActiveEndpoints {
|
||||
continue
|
||||
}
|
||||
if !apiregistrationapi.IsAPIServiceConditionTrue(apiService, apiregistrationapi.Available) {
|
||||
continue
|
||||
}
|
||||
|
||||
// the first APIService which is valid becomes the default
|
||||
@@ -143,9 +120,6 @@ type apiGroupHandler struct {
|
||||
|
||||
lister listers.APIServiceLister
|
||||
|
||||
serviceLister v1listers.ServiceLister
|
||||
endpointsLister v1listers.EndpointsLister
|
||||
|
||||
delegate http.Handler
|
||||
}
|
||||
|
||||
@@ -172,7 +146,7 @@ func (r *apiGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
discoveryGroup := convertToDiscoveryAPIGroup(apiServicesForGroup, r.serviceLister, r.endpointsLister)
|
||||
discoveryGroup := convertToDiscoveryAPIGroup(apiServicesForGroup)
|
||||
if discoveryGroup == nil {
|
||||
http.Error(w, "", http.StatusNotFound)
|
||||
return
|
||||
|
@@ -27,8 +27,6 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
corev1 "k8s.io/client-go/pkg/api/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
@@ -65,6 +63,11 @@ func TestAPIs(t *testing.T) {
|
||||
Version: "v1",
|
||||
Priority: 10,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v1.bar"},
|
||||
@@ -77,6 +80,11 @@ func TestAPIs(t *testing.T) {
|
||||
Version: "v1",
|
||||
Priority: 11,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: &metav1.APIGroupList{
|
||||
@@ -126,6 +134,11 @@ func TestAPIs(t *testing.T) {
|
||||
Version: "v1",
|
||||
Priority: 20,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v2.bar"},
|
||||
@@ -138,6 +151,11 @@ func TestAPIs(t *testing.T) {
|
||||
Version: "v2",
|
||||
Priority: 11,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v2.foo"},
|
||||
@@ -150,6 +168,11 @@ func TestAPIs(t *testing.T) {
|
||||
Version: "v2",
|
||||
Priority: 1,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v1.bar"},
|
||||
@@ -162,6 +185,11 @@ func TestAPIs(t *testing.T) {
|
||||
Version: "v1",
|
||||
Priority: 11,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: &metav1.APIGroupList{
|
||||
@@ -209,25 +237,13 @@ func TestAPIs(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
endpointsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
handler := &apisHandler{
|
||||
codecs: Codecs,
|
||||
serviceLister: v1listers.NewServiceLister(serviceIndexer),
|
||||
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
|
||||
lister: listers.NewAPIServiceLister(indexer),
|
||||
codecs: Codecs,
|
||||
lister: listers.NewAPIServiceLister(indexer),
|
||||
}
|
||||
for _, o := range tc.apiservices {
|
||||
indexer.Add(o)
|
||||
}
|
||||
serviceIndexer.Add(&corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "api"}})
|
||||
endpointsIndexer.Add(&corev1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "api"},
|
||||
Subsets: []corev1.EndpointSubset{
|
||||
{Addresses: []corev1.EndpointAddress{{}}},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
@@ -319,6 +335,11 @@ func TestAPIGroup(t *testing.T) {
|
||||
Version: "v1",
|
||||
Priority: 20,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v2.bar"},
|
||||
@@ -331,6 +352,11 @@ func TestAPIGroup(t *testing.T) {
|
||||
Version: "v2",
|
||||
Priority: 11,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v2.foo"},
|
||||
@@ -343,6 +369,11 @@ func TestAPIGroup(t *testing.T) {
|
||||
Version: "v2",
|
||||
Priority: 1,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v1.bar"},
|
||||
@@ -355,6 +386,11 @@ func TestAPIGroup(t *testing.T) {
|
||||
Version: "v1",
|
||||
Priority: 11,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: &metav1.APIGroup{
|
||||
@@ -380,26 +416,14 @@ func TestAPIGroup(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
endpointsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
handler := &apiGroupHandler{
|
||||
codecs: Codecs,
|
||||
lister: listers.NewAPIServiceLister(indexer),
|
||||
serviceLister: v1listers.NewServiceLister(serviceIndexer),
|
||||
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
|
||||
groupName: "foo",
|
||||
codecs: Codecs,
|
||||
lister: listers.NewAPIServiceLister(indexer),
|
||||
groupName: "foo",
|
||||
}
|
||||
for _, o := range tc.apiservices {
|
||||
indexer.Add(o)
|
||||
}
|
||||
serviceIndexer.Add(&corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "api"}})
|
||||
endpointsIndexer.Add(&corev1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "api"},
|
||||
Subsets: []corev1.EndpointSubset{
|
||||
{Addresses: []corev1.EndpointAddress{{}}},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
|
@@ -0,0 +1,53 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["available_controller.go"],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/controllers:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["available_controller_test.go"],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/testing:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/fake:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion:go_default_library",
|
||||
],
|
||||
)
|
@@ -0,0 +1,364 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/conversion"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
v1informers "k8s.io/client-go/informers/core/v1"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/pkg/api/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion"
|
||||
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion"
|
||||
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion"
|
||||
"k8s.io/kube-aggregator/pkg/controllers"
|
||||
)
|
||||
|
||||
var cloner = conversion.NewCloner()
|
||||
|
||||
type AvailableConditionController struct {
|
||||
apiServiceClient apiregistrationclient.APIServicesGetter
|
||||
|
||||
apiServiceLister listers.APIServiceLister
|
||||
apiServiceSynced cache.InformerSynced
|
||||
|
||||
// serviceLister is used to get the IP to create the transport for
|
||||
serviceLister v1listers.ServiceLister
|
||||
servicesSynced cache.InformerSynced
|
||||
|
||||
endpointsLister v1listers.EndpointsLister
|
||||
endpointsSynced cache.InformerSynced
|
||||
|
||||
// To allow injection for testing.
|
||||
syncFn func(key string) error
|
||||
|
||||
queue workqueue.RateLimitingInterface
|
||||
}
|
||||
|
||||
func NewAvailableConditionController(
|
||||
apiServiceInformer informers.APIServiceInformer,
|
||||
serviceInformer v1informers.ServiceInformer,
|
||||
endpointsInformer v1informers.EndpointsInformer,
|
||||
apiServiceClient apiregistrationclient.APIServicesGetter,
|
||||
) *AvailableConditionController {
|
||||
c := &AvailableConditionController{
|
||||
apiServiceClient: apiServiceClient,
|
||||
apiServiceLister: apiServiceInformer.Lister(),
|
||||
apiServiceSynced: apiServiceInformer.Informer().HasSynced,
|
||||
serviceLister: serviceInformer.Lister(),
|
||||
servicesSynced: serviceInformer.Informer().HasSynced,
|
||||
endpointsLister: endpointsInformer.Lister(),
|
||||
endpointsSynced: endpointsInformer.Informer().HasSynced,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AvailableConditionController"),
|
||||
}
|
||||
|
||||
apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addAPIService,
|
||||
UpdateFunc: c.updateAPIService,
|
||||
DeleteFunc: c.deleteAPIService,
|
||||
})
|
||||
|
||||
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addService,
|
||||
UpdateFunc: c.updateService,
|
||||
DeleteFunc: c.deleteService,
|
||||
})
|
||||
|
||||
endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addEndpoints,
|
||||
UpdateFunc: c.updateEndpoints,
|
||||
DeleteFunc: c.deleteEndpoints,
|
||||
})
|
||||
|
||||
c.syncFn = c.sync
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) sync(key string) error {
|
||||
inAPIService, err := c.apiServiceLister.Get(key)
|
||||
if apierrors.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
apiService := &apiregistration.APIService{}
|
||||
if err := apiregistration.DeepCopy_apiregistration_APIService(inAPIService, apiService, cloner); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
availableCondition := apiregistration.APIServiceCondition{
|
||||
Type: apiregistration.Available,
|
||||
Status: apiregistration.ConditionTrue,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
}
|
||||
|
||||
// local API services are always considered available
|
||||
if apiService.Spec.Service == nil {
|
||||
availableCondition.Status = apiregistration.ConditionTrue
|
||||
availableCondition.Reason = "Local"
|
||||
availableCondition.Message = "Local APIServices are always available"
|
||||
apiregistration.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err := c.apiServiceClient.APIServices().UpdateStatus(apiService)
|
||||
return err
|
||||
}
|
||||
|
||||
service, err := c.serviceLister.Services(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
|
||||
if apierrors.IsNotFound(err) {
|
||||
availableCondition.Status = apiregistration.ConditionFalse
|
||||
availableCondition.Reason = "ServiceNotFound"
|
||||
availableCondition.Message = fmt.Sprintf("service/%s in %q is not present", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
|
||||
apiregistration.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err := c.apiServiceClient.APIServices().UpdateStatus(apiService)
|
||||
return err
|
||||
} else if err != nil {
|
||||
availableCondition.Status = apiregistration.ConditionUnknown
|
||||
availableCondition.Reason = "ServiceAccessError"
|
||||
availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err)
|
||||
apiregistration.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err := c.apiServiceClient.APIServices().UpdateStatus(apiService)
|
||||
return err
|
||||
}
|
||||
|
||||
if service.Spec.Type == v1.ServiceTypeClusterIP {
|
||||
endpoints, err := c.endpointsLister.Endpoints(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
|
||||
if apierrors.IsNotFound(err) {
|
||||
availableCondition.Status = apiregistration.ConditionFalse
|
||||
availableCondition.Reason = "EndpointsNotFound"
|
||||
availableCondition.Message = fmt.Sprintf("cannot find endpoints for service/%s in %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
|
||||
apiregistration.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err := c.apiServiceClient.APIServices().UpdateStatus(apiService)
|
||||
return err
|
||||
} else if err != nil {
|
||||
availableCondition.Status = apiregistration.ConditionUnknown
|
||||
availableCondition.Reason = "EndpointsAccessError"
|
||||
availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err)
|
||||
apiregistration.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err := c.apiServiceClient.APIServices().UpdateStatus(apiService)
|
||||
return err
|
||||
}
|
||||
hasActiveEndpoints := false
|
||||
for _, subset := range endpoints.Subsets {
|
||||
if len(subset.Addresses) > 0 {
|
||||
hasActiveEndpoints = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hasActiveEndpoints {
|
||||
availableCondition.Status = apiregistration.ConditionFalse
|
||||
availableCondition.Reason = "MissingEndpoints"
|
||||
availableCondition.Message = fmt.Sprintf("endpoints for service/%s in %q have no addresses", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
|
||||
apiregistration.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err := c.apiServiceClient.APIServices().UpdateStatus(apiService)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// TODO actually try to hit the discovery endpoint
|
||||
|
||||
availableCondition.Reason = "Passed"
|
||||
availableCondition.Message = "all checks passed"
|
||||
apiregistration.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err = c.apiServiceClient.APIServices().UpdateStatus(apiService)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) Run(stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.queue.ShutDown()
|
||||
|
||||
glog.Infof("Starting AvailableConditionController")
|
||||
defer glog.Infof("Shutting down AvailableConditionController")
|
||||
|
||||
if !controllers.WaitForCacheSync("AvailableConditionController", stopCh, c.apiServiceSynced, c.servicesSynced, c.endpointsSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
// only start one worker thread since its a slow moving API and the aggregation server adding bits
|
||||
// aren't threadsafe
|
||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) runWorker() {
|
||||
for c.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
|
||||
func (c *AvailableConditionController) processNextWorkItem() bool {
|
||||
key, quit := c.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer c.queue.Done(key)
|
||||
|
||||
err := c.syncFn(key.(string))
|
||||
if err == nil {
|
||||
c.queue.Forget(key)
|
||||
return true
|
||||
}
|
||||
|
||||
utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
|
||||
c.queue.AddRateLimited(key)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) enqueue(obj *apiregistration.APIService) {
|
||||
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %#v: %v", obj, err)
|
||||
return
|
||||
}
|
||||
|
||||
c.queue.Add(key)
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) addAPIService(obj interface{}) {
|
||||
castObj := obj.(*apiregistration.APIService)
|
||||
glog.V(4).Infof("Adding %s", castObj.Name)
|
||||
c.enqueue(castObj)
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) updateAPIService(obj, _ interface{}) {
|
||||
castObj := obj.(*apiregistration.APIService)
|
||||
glog.V(4).Infof("Updating %s", castObj.Name)
|
||||
c.enqueue(castObj)
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) deleteAPIService(obj interface{}) {
|
||||
castObj, ok := obj.(*apiregistration.APIService)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("Couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
castObj, ok = tombstone.Obj.(*apiregistration.APIService)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not expected %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
glog.V(4).Infof("Deleting %q", castObj.Name)
|
||||
c.enqueue(castObj)
|
||||
}
|
||||
|
||||
// there aren't very many apiservices, just check them all.
|
||||
func (c *AvailableConditionController) getAPIServicesFor(obj runtime.Object) []*apiregistration.APIService {
|
||||
metadata, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var ret []*apiregistration.APIService
|
||||
apiServiceList, _ := c.apiServiceLister.List(labels.Everything())
|
||||
for _, apiService := range apiServiceList {
|
||||
if apiService.Spec.Service == nil {
|
||||
continue
|
||||
}
|
||||
if apiService.Spec.Service.Namespace == metadata.GetNamespace() && apiService.Spec.Service.Name == metadata.GetName() {
|
||||
ret = append(ret, apiService)
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
// TODO, think of a way to avoid checking on every service manipulation
|
||||
|
||||
func (c *AvailableConditionController) addService(obj interface{}) {
|
||||
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
|
||||
c.enqueue(apiService)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) updateService(obj, _ interface{}) {
|
||||
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
|
||||
c.enqueue(apiService)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) deleteService(obj interface{}) {
|
||||
castObj, ok := obj.(*v1.Service)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("Couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
castObj, ok = tombstone.Obj.(*v1.Service)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not expected %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
for _, apiService := range c.getAPIServicesFor(castObj) {
|
||||
c.enqueue(apiService)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) addEndpoints(obj interface{}) {
|
||||
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) {
|
||||
c.enqueue(apiService)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) updateEndpoints(obj, _ interface{}) {
|
||||
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) {
|
||||
c.enqueue(apiService)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) deleteEndpoints(obj interface{}) {
|
||||
castObj, ok := obj.(*v1.Endpoints)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("Couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
castObj, ok = tombstone.Obj.(*v1.Endpoints)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not expected %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
for _, apiService := range c.getAPIServicesFor(castObj) {
|
||||
c.enqueue(apiService)
|
||||
}
|
||||
}
|
@@ -0,0 +1,206 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/pkg/api/v1"
|
||||
clienttesting "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
"k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/fake"
|
||||
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion"
|
||||
)
|
||||
|
||||
func newEndpoints(namespace, name string) *v1.Endpoints {
|
||||
return &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
|
||||
}
|
||||
}
|
||||
|
||||
func newEndpointsWithAddress(namespace, name string) *v1.Endpoints {
|
||||
return &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
|
||||
Subsets: []v1.EndpointSubset{
|
||||
{
|
||||
Addresses: []v1.EndpointAddress{
|
||||
{
|
||||
IP: "val",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newService(namespace, name string) *v1.Service {
|
||||
return &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
|
||||
Spec: v1.ServiceSpec{
|
||||
Type: v1.ServiceTypeClusterIP,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newLocalAPIService(name string) *apiregistration.APIService {
|
||||
return &apiregistration.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: name},
|
||||
}
|
||||
}
|
||||
|
||||
func newRemoteAPIService(name string) *apiregistration.APIService {
|
||||
return &apiregistration.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: name},
|
||||
Spec: apiregistration.APIServiceSpec{
|
||||
Service: &apiregistration.ServiceReference{
|
||||
Namespace: "foo",
|
||||
Name: "bar",
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestSync(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
||||
apiServiceName string
|
||||
apiServices []*apiregistration.APIService
|
||||
services []*v1.Service
|
||||
endpoints []*v1.Endpoints
|
||||
expectedAvailability apiregistration.APIServiceCondition
|
||||
}{
|
||||
{
|
||||
name: "local",
|
||||
apiServiceName: "local.group",
|
||||
apiServices: []*apiregistration.APIService{newLocalAPIService("local.group")},
|
||||
expectedAvailability: apiregistration.APIServiceCondition{
|
||||
Type: apiregistration.Available,
|
||||
Status: apiregistration.ConditionTrue,
|
||||
Reason: "Local",
|
||||
Message: "Local APIServices are always available",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no service",
|
||||
apiServiceName: "remote.group",
|
||||
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
|
||||
services: []*v1.Service{newService("foo", "not-bar")},
|
||||
expectedAvailability: apiregistration.APIServiceCondition{
|
||||
Type: apiregistration.Available,
|
||||
Status: apiregistration.ConditionFalse,
|
||||
Reason: "ServiceNotFound",
|
||||
Message: `service/bar in "foo" is not present`,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no endpoints",
|
||||
apiServiceName: "remote.group",
|
||||
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
|
||||
services: []*v1.Service{newService("foo", "bar")},
|
||||
expectedAvailability: apiregistration.APIServiceCondition{
|
||||
Type: apiregistration.Available,
|
||||
Status: apiregistration.ConditionFalse,
|
||||
Reason: "EndpointsNotFound",
|
||||
Message: `cannot find endpoints for service/bar in "foo"`,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "missing endpoints",
|
||||
apiServiceName: "remote.group",
|
||||
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
|
||||
services: []*v1.Service{newService("foo", "bar")},
|
||||
endpoints: []*v1.Endpoints{newEndpoints("foo", "bar")},
|
||||
expectedAvailability: apiregistration.APIServiceCondition{
|
||||
Type: apiregistration.Available,
|
||||
Status: apiregistration.ConditionFalse,
|
||||
Reason: "MissingEndpoints",
|
||||
Message: `endpoints for service/bar in "foo" have no addresses`,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "remote",
|
||||
apiServiceName: "remote.group",
|
||||
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
|
||||
services: []*v1.Service{newService("foo", "bar")},
|
||||
endpoints: []*v1.Endpoints{newEndpointsWithAddress("foo", "bar")},
|
||||
expectedAvailability: apiregistration.APIServiceCondition{
|
||||
Type: apiregistration.Available,
|
||||
Status: apiregistration.ConditionTrue,
|
||||
Reason: "Passed",
|
||||
Message: `all checks passed`,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
fakeClient := fake.NewSimpleClientset()
|
||||
apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
endpointsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
for _, obj := range tc.apiServices {
|
||||
apiServiceIndexer.Add(obj)
|
||||
}
|
||||
for _, obj := range tc.services {
|
||||
serviceIndexer.Add(obj)
|
||||
}
|
||||
for _, obj := range tc.endpoints {
|
||||
endpointsIndexer.Add(obj)
|
||||
}
|
||||
|
||||
c := AvailableConditionController{
|
||||
apiServiceClient: fakeClient.Apiregistration(),
|
||||
apiServiceLister: listers.NewAPIServiceLister(apiServiceIndexer),
|
||||
serviceLister: v1listers.NewServiceLister(serviceIndexer),
|
||||
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
|
||||
}
|
||||
c.sync(tc.apiServiceName)
|
||||
|
||||
// ought to have one action writing status
|
||||
if e, a := 1, len(fakeClient.Actions()); e != a {
|
||||
t.Errorf("%v expected %v, got %v", tc.name, e, fakeClient.Actions())
|
||||
continue
|
||||
}
|
||||
|
||||
action, ok := fakeClient.Actions()[0].(clienttesting.UpdateAction)
|
||||
if !ok {
|
||||
t.Errorf("%v got %v", tc.name, ok)
|
||||
continue
|
||||
}
|
||||
|
||||
if e, a := 1, len(action.GetObject().(*apiregistration.APIService).Status.Conditions); e != a {
|
||||
t.Errorf("%v expected %v, got %v", tc.name, e, action.GetObject())
|
||||
continue
|
||||
}
|
||||
condition := action.GetObject().(*apiregistration.APIService).Status.Conditions[0]
|
||||
if e, a := tc.expectedAvailability.Type, condition.Type; e != a {
|
||||
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
|
||||
}
|
||||
if e, a := tc.expectedAvailability.Status, condition.Status; e != a {
|
||||
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
|
||||
}
|
||||
if e, a := tc.expectedAvailability.Reason, condition.Reason; e != a {
|
||||
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
|
||||
}
|
||||
if e, a := tc.expectedAvailability.Message, condition.Message; e != a {
|
||||
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
|
||||
}
|
||||
}
|
||||
}
|
@@ -13,8 +13,10 @@ go_library(
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/registry/apiservice:go_default_library",
|
||||
],
|
||||
|
@@ -18,6 +18,7 @@ package etcd
|
||||
|
||||
import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
@@ -55,11 +56,11 @@ func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) *REST
|
||||
// NewStatusREST makes a RESTStorage for status that has more limited options.
|
||||
// It is based on the original REST so that we can share the same underlying store
|
||||
func NewStatusREST(scheme *runtime.Scheme, rest *REST) *StatusREST {
|
||||
statusStore := *rest.store
|
||||
statusStore := *rest.Store
|
||||
statusStore.CreateStrategy = nil
|
||||
statusStore.DeleteStrategy = nil
|
||||
statusStore.UpdateStrategy = apiservice.NewStatusStrategy(scheme)
|
||||
return &REST{store: statusStore}
|
||||
return &StatusREST{store: &statusStore}
|
||||
}
|
||||
|
||||
type StatusREST struct {
|
||||
@@ -69,7 +70,7 @@ type StatusREST struct {
|
||||
var _ = rest.Updater(&StatusREST{})
|
||||
|
||||
func (r *StatusREST) New() runtime.Object {
|
||||
return &extensions.Deployment{}
|
||||
return &apiregistration.APIService{}
|
||||
}
|
||||
|
||||
// Update alters the status subset of an object.
|
||||
|
@@ -46,7 +46,7 @@ func (apiServerStrategy) NamespaceScoped() bool {
|
||||
}
|
||||
|
||||
func (apiServerStrategy) PrepareForCreate(ctx genericapirequest.Context, obj runtime.Object) {
|
||||
apiservice = obj.(*apiregistration.APIService)
|
||||
apiservice := obj.(*apiregistration.APIService)
|
||||
apiservice.Status = apiregistration.APIServiceStatus{}
|
||||
}
|
||||
|
||||
@@ -81,14 +81,14 @@ type apiServerStatusStrategy struct {
|
||||
}
|
||||
|
||||
func NewStatusStrategy(typer runtime.ObjectTyper) apiServerStatusStrategy {
|
||||
return apiServerStrategy{typer, names.SimpleNameGenerator}
|
||||
return apiServerStatusStrategy{typer, names.SimpleNameGenerator}
|
||||
}
|
||||
|
||||
func (apiServerStrategy) NamespaceScoped() bool {
|
||||
func (apiServerStatusStrategy) NamespaceScoped() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (apiServerStrategy) PrepareForUpdate(ctx genericapirequest.Context, obj, old runtime.Object) {
|
||||
func (apiServerStatusStrategy) PrepareForUpdate(ctx genericapirequest.Context, obj, old runtime.Object) {
|
||||
newAPIService := obj.(*apiregistration.APIService)
|
||||
oldAPIService := old.(*apiregistration.APIService)
|
||||
newAPIService.Spec = oldAPIService.Spec
|
||||
@@ -98,18 +98,18 @@ func (apiServerStrategy) PrepareForUpdate(ctx genericapirequest.Context, obj, ol
|
||||
newAPIService.OwnerReferences = oldAPIService.OwnerReferences
|
||||
}
|
||||
|
||||
func (apiServerStrategy) AllowCreateOnUpdate() bool {
|
||||
func (apiServerStatusStrategy) AllowCreateOnUpdate() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (apiServerStrategy) AllowUnconditionalUpdate() bool {
|
||||
func (apiServerStatusStrategy) AllowUnconditionalUpdate() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (apiServerStrategy) Canonicalize(obj runtime.Object) {
|
||||
func (apiServerStatusStrategy) Canonicalize(obj runtime.Object) {
|
||||
}
|
||||
|
||||
func (apiServerStrategy) ValidateUpdate(ctx genericapirequest.Context, obj, old runtime.Object) field.ErrorList {
|
||||
func (apiServerStatusStrategy) ValidateUpdate(ctx genericapirequest.Context, obj, old runtime.Object) field.ErrorList {
|
||||
return validation.ValidateAPIServiceStatusUpdate(obj.(*apiregistration.APIService), old.(*apiregistration.APIService))
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user