Merge pull request #113171 from Jefftree/aggregated-discovery-generic

Aggregated discovery server changes
This commit is contained in:
Kubernetes Prow Robot 2022-11-09 00:08:12 -08:00 committed by GitHub
commit 8058e8eff8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 3913 additions and 40 deletions

View File

@ -134,6 +134,13 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega
apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),
autoRegistrationController)
// Imbue all builtin group-priorities onto the aggregated discovery
if aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil {
for gv, entry := range apiVersionPriorities {
aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupPriority(gv.Group, int(entry.group))
}
}
err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
go crdRegistrationController.Run(5, context.StopCh)
go func() {

View File

@ -40,6 +40,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericfeatures "k8s.io/apiserver/pkg/features"
@ -478,6 +479,9 @@ func buildGenericConfig(
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {
genericConfig.FlowControl, lastErr = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
genericConfig.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager()
}
return
}

View File

@ -213,7 +213,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
s.GenericAPIServer.RegisterDestroyFunc(crdHandler.destroy)
discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler, genericServer.AggregatedDiscoveryGroupManager)
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())

View File

@ -23,6 +23,7 @@ import (
"k8s.io/klog/v2"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
autoscaling "k8s.io/api/autoscaling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@ -31,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/endpoints/discovery"
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
@ -43,6 +45,7 @@ import (
type DiscoveryController struct {
versionHandler *versionDiscoveryHandler
groupHandler *groupDiscoveryHandler
resourceManager discoveryendpoint.ResourceManager
crdLister listers.CustomResourceDefinitionLister
crdsSynced cache.InformerSynced
@ -53,10 +56,16 @@ type DiscoveryController struct {
queue workqueue.RateLimitingInterface
}
func NewDiscoveryController(crdInformer informers.CustomResourceDefinitionInformer, versionHandler *versionDiscoveryHandler, groupHandler *groupDiscoveryHandler) *DiscoveryController {
func NewDiscoveryController(
crdInformer informers.CustomResourceDefinitionInformer,
versionHandler *versionDiscoveryHandler,
groupHandler *groupDiscoveryHandler,
resourceManager discoveryendpoint.ResourceManager,
) *DiscoveryController {
c := &DiscoveryController{
versionHandler: versionHandler,
groupHandler: groupHandler,
resourceManager: resourceManager,
crdLister: crdInformer.Lister(),
crdsSynced: crdInformer.Informer().HasSynced,
@ -78,6 +87,7 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error {
apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{}
apiResourcesForDiscovery := []metav1.APIResource{}
aggregatedApiResourcesForDiscovery := []apidiscoveryv2beta1.APIResourceDiscovery{}
versionsForDiscoveryMap := map[metav1.GroupVersion]bool{}
crds, err := c.crdLister.List(labels.Everything())
@ -146,6 +156,53 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error {
if err != nil {
return err
}
if c.resourceManager != nil {
var scope apidiscoveryv2beta1.ResourceScope
if crd.Spec.Scope == apiextensionsv1.NamespaceScoped {
scope = apidiscoveryv2beta1.ScopeNamespace
} else {
scope = apidiscoveryv2beta1.ScopeCluster
}
apiResourceDiscovery := apidiscoveryv2beta1.APIResourceDiscovery{
Resource: crd.Status.AcceptedNames.Plural,
SingularResource: crd.Status.AcceptedNames.Singular,
Scope: scope,
ResponseKind: &metav1.GroupVersionKind{
Group: version.Group,
Version: version.Version,
Kind: crd.Status.AcceptedNames.Kind,
},
Verbs: verbs,
ShortNames: crd.Status.AcceptedNames.ShortNames,
Categories: crd.Status.AcceptedNames.Categories,
}
if subresources != nil && subresources.Status != nil {
apiResourceDiscovery.Subresources = append(apiResourceDiscovery.Subresources, apidiscoveryv2beta1.APISubresourceDiscovery{
Subresource: "status",
ResponseKind: &metav1.GroupVersionKind{
Group: version.Group,
Version: version.Version,
Kind: crd.Status.AcceptedNames.Kind,
},
Verbs: metav1.Verbs([]string{"get", "patch", "update"}),
})
}
if subresources != nil && subresources.Scale != nil {
apiResourceDiscovery.Subresources = append(apiResourceDiscovery.Subresources, apidiscoveryv2beta1.APISubresourceDiscovery{
Subresource: "scale",
ResponseKind: &metav1.GroupVersionKind{
Group: autoscaling.GroupName,
Version: "v1",
Kind: "Scale",
},
Verbs: metav1.Verbs([]string{"get", "patch", "update"}),
})
}
aggregatedApiResourcesForDiscovery = append(aggregatedApiResourcesForDiscovery, apiResourceDiscovery)
}
if subresources != nil && subresources.Status != nil {
apiResourcesForDiscovery = append(apiResourcesForDiscovery, metav1.APIResource{
Name: crd.Status.AcceptedNames.Plural + "/status",
@ -170,6 +227,10 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error {
if !foundGroup {
c.groupHandler.unsetDiscovery(version.Group)
c.versionHandler.unsetDiscovery(version)
if c.resourceManager != nil {
c.resourceManager.RemoveGroup(version.Group)
}
return nil
}
@ -186,12 +247,30 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error {
if !foundVersion {
c.versionHandler.unsetDiscovery(version)
if c.resourceManager != nil {
c.resourceManager.RemoveGroupVersion(metav1.GroupVersion{
Group: version.Group,
Version: version.Version,
})
}
return nil
}
c.versionHandler.setDiscovery(version, discovery.NewAPIVersionHandler(Codecs, version, discovery.APIResourceListerFunc(func() []metav1.APIResource {
return apiResourcesForDiscovery
})))
sort.Slice(aggregatedApiResourcesForDiscovery[:], func(i, j int) bool {
return aggregatedApiResourcesForDiscovery[i].Resource < aggregatedApiResourcesForDiscovery[j].Resource
})
if c.resourceManager != nil {
c.resourceManager.AddGroupVersion(version.Group, apidiscoveryv2beta1.APIVersionDiscovery{
Version: version.Version,
Resources: aggregatedApiResourcesForDiscovery,
})
// Default priority for CRDs
c.resourceManager.SetGroupPriority(version.Group, 1000)
}
return nil
}

View File

@ -0,0 +1,408 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
"k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/discovery"
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
)
var coolFooCRD = &v1.CustomResourceDefinition{
TypeMeta: metav1.TypeMeta{
APIVersion: "apiextensions.k8s.io/v1",
Kind: "CustomResourceDefinition",
},
ObjectMeta: metav1.ObjectMeta{
Name: "coolfoo.stable.example.com",
},
Spec: v1.CustomResourceDefinitionSpec{
Group: "stable.example.com",
Names: v1.CustomResourceDefinitionNames{
Plural: "coolfoos",
Singular: "coolfoo",
ShortNames: []string{"foo"},
Kind: "CoolFoo",
ListKind: "CoolFooList",
Categories: []string{"cool"},
},
Scope: v1.ClusterScoped,
Versions: []v1.CustomResourceDefinitionVersion{
{
Name: "v1",
Served: true,
Storage: true,
Deprecated: false,
Subresources: &v1.CustomResourceSubresources{
// This CRD has a /status subresource
Status: &v1.CustomResourceSubresourceStatus{},
},
Schema: &v1.CustomResourceValidation{
// Unused by discovery
OpenAPIV3Schema: &v1.JSONSchemaProps{},
},
},
},
Conversion: &v1.CustomResourceConversion{},
PreserveUnknownFields: false,
},
Status: v1.CustomResourceDefinitionStatus{
Conditions: []v1.CustomResourceDefinitionCondition{
{
Type: v1.Established,
Status: v1.ConditionTrue,
},
},
},
}
var coolBarCRD = &v1.CustomResourceDefinition{
TypeMeta: metav1.TypeMeta{
APIVersion: "apiextensions.k8s.io/v1",
Kind: "CustomResourceDefinition",
},
ObjectMeta: metav1.ObjectMeta{
Name: "coolbar.stable.example.com",
},
Spec: v1.CustomResourceDefinitionSpec{
Group: "stable.example.com",
Names: v1.CustomResourceDefinitionNames{
Plural: "coolbars",
Singular: "coolbar",
ShortNames: []string{"bar"},
Kind: "CoolBar",
ListKind: "CoolBarList",
Categories: []string{"cool"},
},
Scope: v1.ClusterScoped,
Versions: []v1.CustomResourceDefinitionVersion{
{
Name: "v1",
Served: true,
Storage: true,
Deprecated: false,
Schema: &v1.CustomResourceValidation{
// Unused by discovery
OpenAPIV3Schema: &v1.JSONSchemaProps{},
},
},
},
Conversion: &v1.CustomResourceConversion{},
PreserveUnknownFields: false,
},
Status: v1.CustomResourceDefinitionStatus{
Conditions: []v1.CustomResourceDefinitionCondition{
{
Type: v1.Established,
Status: v1.ConditionTrue,
},
},
},
}
var coolFooDiscovery apidiscoveryv2beta1.APIVersionDiscovery = apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1",
Resources: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "coolfoos",
Scope: apidiscoveryv2beta1.ScopeCluster,
SingularResource: "coolfoo",
Verbs: []string{"delete", "deletecollection", "get", "list", "patch", "create", "update", "watch"},
ShortNames: []string{"foo"},
Categories: []string{"cool"},
ResponseKind: &metav1.GroupVersionKind{
Group: "stable.example.com",
Version: "v1",
Kind: "CoolFoo",
},
Subresources: []apidiscoveryv2beta1.APISubresourceDiscovery{
{
Subresource: "status",
Verbs: []string{"get", "patch", "update"},
AcceptedTypes: nil, // is this correct?
ResponseKind: &metav1.GroupVersionKind{
Group: "stable.example.com",
Version: "v1",
Kind: "CoolFoo",
},
},
},
},
},
}
var mergedDiscovery apidiscoveryv2beta1.APIVersionDiscovery = apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1",
Resources: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "coolbars",
Scope: apidiscoveryv2beta1.ScopeCluster,
SingularResource: "coolbar",
Verbs: []string{"delete", "deletecollection", "get", "list", "patch", "create", "update", "watch"},
ShortNames: []string{"bar"},
Categories: []string{"cool"},
ResponseKind: &metav1.GroupVersionKind{
Group: "stable.example.com",
Version: "v1",
Kind: "CoolBar",
},
}, {
Resource: "coolfoos",
Scope: apidiscoveryv2beta1.ScopeCluster,
SingularResource: "coolfoo",
Verbs: []string{"delete", "deletecollection", "get", "list", "patch", "create", "update", "watch"},
ShortNames: []string{"foo"},
Categories: []string{"cool"},
ResponseKind: &metav1.GroupVersionKind{
Group: "stable.example.com",
Version: "v1",
Kind: "CoolFoo",
},
Subresources: []apidiscoveryv2beta1.APISubresourceDiscovery{
{
Subresource: "status",
Verbs: []string{"get", "patch", "update"},
AcceptedTypes: nil, // is this correct?
ResponseKind: &metav1.GroupVersionKind{
Group: "stable.example.com",
Version: "v1",
Kind: "CoolFoo",
},
},
},
},
},
}
func init() {
// Not testing against an apiserver, so just assume names are accepted
coolFooCRD.Status.AcceptedNames = coolFooCRD.Spec.Names
coolBarCRD.Status.AcceptedNames = coolBarCRD.Spec.Names
}
// Provides an apiextensions-apiserver client
type testEnvironment struct {
clientset.Interface
// Discovery test details
versionDiscoveryHandler
groupDiscoveryHandler
aggregated.FakeResourceManager
}
func (env *testEnvironment) Start(ctx context.Context) {
discoverySyncedCh := make(chan struct{})
factory := externalversions.NewSharedInformerFactoryWithOptions(
env.Interface, 30*time.Second)
discoveryController := NewDiscoveryController(
factory.Apiextensions().V1().CustomResourceDefinitions(),
&env.versionDiscoveryHandler,
&env.groupDiscoveryHandler,
env.FakeResourceManager,
)
factory.Start(ctx.Done())
go discoveryController.Run(ctx.Done(), discoverySyncedCh)
select {
case <-discoverySyncedCh:
case <-ctx.Done():
}
}
func setup() *testEnvironment {
env := &testEnvironment{
Interface: fake.NewSimpleClientset(),
FakeResourceManager: aggregated.NewFakeResourceManager(),
versionDiscoveryHandler: versionDiscoveryHandler{
discovery: make(map[schema.GroupVersion]*discovery.APIVersionHandler),
},
groupDiscoveryHandler: groupDiscoveryHandler{
discovery: make(map[string]*discovery.APIGroupHandler),
},
}
return env
}
func TestResourceManagerExistingCRD(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := setup()
_, err := env.Interface.
ApiextensionsV1().
CustomResourceDefinitions().
Create(
ctx,
coolFooCRD,
metav1.CreateOptions{
FieldManager: "resource-manager-test",
},
)
require.NoError(t, err)
env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery)
env.FakeResourceManager.Expect().
SetGroupPriority(coolFooCRD.Spec.Group, 1000)
env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery)
env.FakeResourceManager.Expect().
SetGroupPriority(coolFooCRD.Spec.Group, 1000)
env.Start(ctx)
err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second)
require.NoError(t, err)
}
// Tests that if a CRD is added a runtime, the discovery controller will
// put its information in the discovery document
func TestResourceManagerAddedCRD(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := setup()
env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery)
env.FakeResourceManager.Expect().SetGroupPriority(coolFooCRD.Spec.Group, 1000)
env.Start(ctx)
// Create CRD after the controller has already started
_, err := env.Interface.
ApiextensionsV1().
CustomResourceDefinitions().
Create(
ctx,
coolFooCRD,
metav1.CreateOptions{
FieldManager: "resource-manager-test",
},
)
require.NoError(t, err)
err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second)
require.NoError(t, err)
}
// Test that having multiple CRDs in the same version will add both
// versions to discovery.
func TestMultipleCRDSameVersion(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := setup()
env.Start(ctx)
_, err := env.Interface.
ApiextensionsV1().
CustomResourceDefinitions().
Create(
ctx,
coolFooCRD,
metav1.CreateOptions{
FieldManager: "resource-manager-test",
},
)
require.NoError(t, err)
env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery)
env.FakeResourceManager.Expect().SetGroupPriority(coolFooCRD.Spec.Group, 1000)
err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second)
require.NoError(t, err)
_, err = env.Interface.
ApiextensionsV1().
CustomResourceDefinitions().
Create(
ctx,
coolBarCRD,
metav1.CreateOptions{
FieldManager: "resource-manager-test",
},
)
require.NoError(t, err)
env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, mergedDiscovery)
env.FakeResourceManager.Expect().SetGroupPriority(coolFooCRD.Spec.Group, 1000)
err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second)
require.NoError(t, err)
}
// Tests that if a CRD is deleted at runtime, the discovery controller will
// remove its information from its ResourceManager
func TestDiscoveryControllerResourceManagerRemovedCRD(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := setup()
env.Start(ctx)
// Create CRD after the controller has already started
_, err := env.Interface.
ApiextensionsV1().
CustomResourceDefinitions().
Create(
ctx,
coolFooCRD,
metav1.CreateOptions{},
)
require.NoError(t, err)
// Wait for the Controller to pick up the Create event and add it to the
// Resource Manager
env.FakeResourceManager.Expect().
AddGroupVersion(coolFooCRD.Spec.Group, coolFooDiscovery)
env.FakeResourceManager.Expect().SetGroupPriority(coolFooCRD.Spec.Group, 1000)
err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second)
require.NoError(t, err)
err = env.Interface.
ApiextensionsV1().
CustomResourceDefinitions().
Delete(ctx, coolFooCRD.Name, metav1.DeleteOptions{})
require.NoError(t, err)
// Wait for the Controller to detect there are no more CRDs of this group
// and remove the entire group
env.FakeResourceManager.Expect().RemoveGroup(coolFooCRD.Spec.Group)
err = env.FakeResourceManager.WaitForActions(ctx, 1*time.Second)
require.NoError(t, err)
}

View File

@ -254,7 +254,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
group.GroupVersion = grouplessGroupVersion
group.OptionsExternalVersion = &grouplessGroupVersion
group.Serializer = codecs
if _, err := (&group).InstallREST(container); err != nil {
if _, _, err := (&group).InstallREST(container); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
}
}
@ -266,7 +266,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
group.GroupVersion = testGroupVersion
group.OptionsExternalVersion = &testGroupVersion
group.Serializer = codecs
if _, err := (&group).InstallREST(container); err != nil {
if _, _, err := (&group).InstallREST(container); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
}
}
@ -278,7 +278,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
group.GroupVersion = newGroupVersion
group.OptionsExternalVersion = &newGroupVersion
group.Serializer = codecs
if _, err := (&group).InstallREST(container); err != nil {
if _, _, err := (&group).InstallREST(container); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
}
}
@ -3311,7 +3311,7 @@ func TestParentResourceIsRequired(t *testing.T) {
ParameterCodec: parameterCodec,
}
container := restful.NewContainer()
if _, err := group.InstallREST(container); err == nil {
if _, _, err := group.InstallREST(container); err == nil {
t.Fatal("expected error")
}
@ -3343,7 +3343,7 @@ func TestParentResourceIsRequired(t *testing.T) {
ParameterCodec: parameterCodec,
}
container = restful.NewContainer()
if _, err := group.InstallREST(container); err != nil {
if _, _, err := group.InstallREST(container); err != nil {
t.Fatal(err)
}
@ -3821,7 +3821,7 @@ func (obj *UnregisteredAPIObject) DeepCopyObject() runtime.Object {
func TestWriteJSONDecodeError(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(codecs, negotiation.DefaultEndpointRestrictions, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"})
responsewriters.WriteObjectNegotiated(codecs, negotiation.DefaultEndpointRestrictions, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"}, false)
}))
defer server.Close()
// Decode error response behavior is dictated by
@ -4328,7 +4328,7 @@ func TestXGSubresource(t *testing.T) {
Serializer: codecs,
}
if _, err := (&group).InstallREST(container); err != nil {
if _, _, err := (&group).InstallREST(container); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
}

View File

@ -0,0 +1,84 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated
import (
"crypto/sha512"
"encoding/json"
"fmt"
"net/http"
"strconv"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
)
// This file exposes helper functions used for calculating the E-Tag header
// used in discovery endpoint responses
// Attaches Cache-Busting functionality to an endpoint
// - Sets ETag header to provided hash
// - Replies with 304 Not Modified, if If-None-Match header matches hash
//
// hash should be the value of calculateETag on object. If hash is empty, then
//
// the object is simply serialized without E-Tag functionality
func ServeHTTPWithETag(
object runtime.Object,
hash string,
serializer runtime.NegotiatedSerializer,
w http.ResponseWriter,
req *http.Request,
) {
// ETag must be enclosed in double quotes:
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag
quotedHash := strconv.Quote(hash)
w.Header().Set("ETag", quotedHash)
w.Header().Set("Vary", "Accept")
w.Header().Set("Cache-Control", "public")
// If Request includes If-None-Match and matches hash, reply with 304
// Otherwise, we delegate to the handler for actual content
//
// According to documentation, An Etag within an If-None-Match
// header will be enclosed within doule quotes:
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-None-Match#directives
if clientCachedHash := req.Header.Get("If-None-Match"); quotedHash == clientCachedHash {
w.WriteHeader(http.StatusNotModified)
return
}
responsewriters.WriteObjectNegotiated(
serializer,
DiscoveryEndpointRestrictions,
AggregatedDiscoveryGV,
w,
req,
http.StatusOK,
object,
true,
)
}
func calculateETag(resources interface{}) (string, error) {
serialized, err := json.Marshal(resources)
if err != nil {
return "", err
}
return fmt.Sprintf("%X", sha512.Sum512(serialized)), nil
}

View File

@ -0,0 +1,170 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated
import (
"context"
"errors"
"net/http"
"reflect"
"sync"
"time"
"github.com/emicklei/go-restful/v3"
"github.com/google/go-cmp/cmp"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
)
type FakeResourceManager interface {
ResourceManager
Expect() ResourceManager
HasExpectedNumberActions() bool
Validate() error
WaitForActions(ctx context.Context, timeout time.Duration) error
}
func NewFakeResourceManager() FakeResourceManager {
return &fakeResourceManager{}
}
// a resource manager with helper functions for checking the actions
// match expected. For Use in tests
type fakeResourceManager struct {
recorderResourceManager
expect recorderResourceManager
}
// a resource manager which instead of managing a discovery document,
// simply records the calls to its interface functoins for testing
type recorderResourceManager struct {
lock sync.RWMutex
Actions []recorderResourceManagerAction
}
var _ ResourceManager = &fakeResourceManager{}
var _ ResourceManager = &recorderResourceManager{}
// Storage type for a call to the resource manager
type recorderResourceManagerAction struct {
Type string
Group string
Version string
Value interface{}
}
func (f *fakeResourceManager) Expect() ResourceManager {
return &f.expect
}
func (f *fakeResourceManager) HasExpectedNumberActions() bool {
f.lock.RLock()
defer f.lock.RUnlock()
f.expect.lock.RLock()
defer f.expect.lock.RUnlock()
return len(f.Actions) >= len(f.expect.Actions)
}
func (f *fakeResourceManager) Validate() error {
f.lock.RLock()
defer f.lock.RUnlock()
f.expect.lock.RLock()
defer f.expect.lock.RUnlock()
if !reflect.DeepEqual(f.expect.Actions, f.Actions) {
return errors.New(cmp.Diff(f.expect.Actions, f.Actions))
}
return nil
}
func (f *fakeResourceManager) WaitForActions(ctx context.Context, timeout time.Duration) error {
err := wait.PollImmediateWithContext(
ctx,
100*time.Millisecond, // try every 100ms
timeout, // timeout after timeout
func(ctx context.Context) (done bool, err error) {
if f.HasExpectedNumberActions() {
return true, f.Validate()
}
return false, nil
})
return err
}
func (f *recorderResourceManager) SetGroupPriority(groupName string, priority int) {
f.lock.Lock()
defer f.lock.Unlock()
f.Actions = append(f.Actions, recorderResourceManagerAction{
Type: "SetGroupPriority",
Group: groupName,
Value: priority,
})
}
func (f *recorderResourceManager) AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) {
f.lock.Lock()
defer f.lock.Unlock()
f.Actions = append(f.Actions, recorderResourceManagerAction{
Type: "AddGroupVersion",
Group: groupName,
Value: value,
})
}
func (f *recorderResourceManager) RemoveGroup(groupName string) {
f.lock.Lock()
defer f.lock.Unlock()
f.Actions = append(f.Actions, recorderResourceManagerAction{
Type: "RemoveGroup",
Group: groupName,
})
}
func (f *recorderResourceManager) RemoveGroupVersion(gv metav1.GroupVersion) {
f.lock.Lock()
defer f.lock.Unlock()
f.Actions = append(f.Actions, recorderResourceManagerAction{
Type: "RemoveGroupVersion",
Group: gv.Group,
Version: gv.Version,
})
}
func (f *recorderResourceManager) SetGroups(values []apidiscoveryv2beta1.APIGroupDiscovery) {
f.lock.Lock()
defer f.lock.Unlock()
f.Actions = append(f.Actions, recorderResourceManagerAction{
Type: "SetGroups",
Value: values,
})
}
func (f *recorderResourceManager) WebService() *restful.WebService {
panic("unimplemented")
}
func (f *recorderResourceManager) ServeHTTP(http.ResponseWriter, *http.Request) {
panic("unimplemented")
}

View File

@ -0,0 +1,302 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated
import (
"net/http"
"reflect"
"sort"
"sync"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"sync/atomic"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
)
// This handler serves the /apis endpoint for an aggregated list of
// api resources indexed by their group version.
type ResourceManager interface {
// Adds knowledge of the given groupversion to the discovery document
// If it was already being tracked, updates the stored APIVersionDiscovery
// Thread-safe
AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery)
// Sets priority for a group for sorting discovery.
// If a priority is set before the group is known, the priority will be ignored
// Once a group is removed, the priority is forgotten.
SetGroupPriority(groupName string, priority int)
// Removes all group versions for a given group
// Thread-safe
RemoveGroup(groupName string)
// Removes a specific groupversion. If all versions of a group have been
// removed, then the entire group is unlisted.
// Thread-safe
RemoveGroupVersion(gv metav1.GroupVersion)
// Resets the manager's known list of group-versions and replaces them
// with the given groups
// Thread-Safe
SetGroups([]apidiscoveryv2beta1.APIGroupDiscovery)
http.Handler
}
type resourceDiscoveryManager struct {
serializer runtime.NegotiatedSerializer
// cache is an atomic pointer to avoid the use of locks
cache atomic.Pointer[cachedGroupList]
// Writes protected by the lock.
// List of all apigroups & resources indexed by the resource manager
lock sync.RWMutex
apiGroups map[string]*apidiscoveryv2beta1.APIGroupDiscovery
apiGroupNames map[string]int
}
func NewResourceManager() ResourceManager {
scheme := runtime.NewScheme()
codecs := serializer.NewCodecFactory(scheme)
utilruntime.Must(apidiscoveryv2beta1.AddToScheme(scheme))
return &resourceDiscoveryManager{serializer: codecs, apiGroupNames: make(map[string]int)}
}
func (rdm *resourceDiscoveryManager) SetGroupPriority(group string, priority int) {
rdm.lock.Lock()
defer rdm.lock.Unlock()
if _, exists := rdm.apiGroupNames[group]; exists {
rdm.apiGroupNames[group] = priority
rdm.cache.Store(nil)
} else {
klog.Warningf("DiscoveryManager: Attempted to set priority for group %s but does not exist", group)
}
}
func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIGroupDiscovery) {
rdm.lock.Lock()
defer rdm.lock.Unlock()
rdm.apiGroups = nil
rdm.cache.Store(nil)
for _, group := range groups {
for _, version := range group.Versions {
rdm.addGroupVersionLocked(group.Name, version)
}
}
// Filter unused out apiGroupNames
for name := range rdm.apiGroupNames {
if _, exists := rdm.apiGroups[name]; !exists {
delete(rdm.apiGroupNames, name)
}
}
}
func (rdm *resourceDiscoveryManager) AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) {
rdm.lock.Lock()
defer rdm.lock.Unlock()
rdm.addGroupVersionLocked(groupName, value)
}
func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) {
klog.Infof("Adding GroupVersion %s %s to ResourceManager", groupName, value.Version)
if rdm.apiGroups == nil {
rdm.apiGroups = make(map[string]*apidiscoveryv2beta1.APIGroupDiscovery)
}
if existing, groupExists := rdm.apiGroups[groupName]; groupExists {
// If this version already exists, replace it
versionExists := false
// Not very efficient, but in practice there are generally not many versions
for i := range existing.Versions {
if existing.Versions[i].Version == value.Version {
// The new gv is the exact same as what is already in
// the map. This is a noop and cache should not be
// invalidated.
if reflect.DeepEqual(existing.Versions[i], value) {
return
}
existing.Versions[i] = value
versionExists = true
break
}
}
if !versionExists {
existing.Versions = append(existing.Versions, value)
}
} else {
group := &apidiscoveryv2beta1.APIGroupDiscovery{
ObjectMeta: metav1.ObjectMeta{
Name: groupName,
},
Versions: []apidiscoveryv2beta1.APIVersionDiscovery{value},
}
rdm.apiGroups[groupName] = group
rdm.apiGroupNames[groupName] = 0
}
// Reset response document so it is recreated lazily
rdm.cache.Store(nil)
}
func (rdm *resourceDiscoveryManager) RemoveGroupVersion(apiGroup metav1.GroupVersion) {
rdm.lock.Lock()
defer rdm.lock.Unlock()
group, exists := rdm.apiGroups[apiGroup.Group]
if !exists {
return
}
modified := false
for i := range group.Versions {
if group.Versions[i].Version == apiGroup.Version {
group.Versions = append(group.Versions[:i], group.Versions[i+1:]...)
modified = true
break
}
}
// If no modification was done, cache does not need to be cleared
if !modified {
return
}
if len(group.Versions) == 0 {
delete(rdm.apiGroups, group.Name)
delete(rdm.apiGroupNames, group.Name)
}
// Reset response document so it is recreated lazily
rdm.cache.Store(nil)
}
func (rdm *resourceDiscoveryManager) RemoveGroup(groupName string) {
rdm.lock.Lock()
defer rdm.lock.Unlock()
delete(rdm.apiGroups, groupName)
delete(rdm.apiGroupNames, groupName)
// Reset response document so it is recreated lazily
rdm.cache.Store(nil)
}
// Prepares the api group list for serving by converting them from map into
// list and sorting them according to insertion order
func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2beta1.APIGroupDiscovery {
// Re-order the apiGroups by their priority.
groups := []apidiscoveryv2beta1.APIGroupDiscovery{}
for _, group := range rdm.apiGroups {
groups = append(groups, *group.DeepCopy())
}
sort.SliceStable(groups, func(i, j int) bool {
iName := groups[i].Name
jName := groups[j].Name
// Default to 0 priority by default
iPriority := rdm.apiGroupNames[iName]
jPriority := rdm.apiGroupNames[jName]
// Sort discovery based on apiservice priority.
// Duplicated from staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helpers.go
if iPriority == jPriority {
// Equal priority uses name to break ties
return iName < jName
}
// i sorts before j if it has a lower priority
return iPriority > jPriority
})
return groups
}
// Fetches from cache if it exists. If cache is empty, create it.
func (rdm *resourceDiscoveryManager) fetchFromCache() *cachedGroupList {
rdm.lock.RLock()
defer rdm.lock.RUnlock()
cacheLoad := rdm.cache.Load()
if cacheLoad != nil {
return cacheLoad
}
response := apidiscoveryv2beta1.APIGroupDiscoveryList{
Items: rdm.calculateAPIGroupsLocked(),
}
etag, err := calculateETag(response)
if err != nil {
klog.Errorf("failed to calculate etag for discovery document: %s", etag)
etag = ""
}
cached := &cachedGroupList{
cachedResponse: response,
cachedResponseETag: etag,
}
rdm.cache.Store(cached)
return cached
}
type cachedGroupList struct {
cachedResponse apidiscoveryv2beta1.APIGroupDiscoveryList
cachedResponseETag string
}
func (rdm *resourceDiscoveryManager) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
cache := rdm.fetchFromCache()
response := cache.cachedResponse
etag := cache.cachedResponseETag
if len(etag) > 0 {
// Use proper e-tag headers if one is available
ServeHTTPWithETag(
&response,
etag,
rdm.serializer,
resp,
req,
)
} else {
// Default to normal response in rare case etag is
// not cached with the object for some reason.
responsewriters.WriteObjectNegotiated(
rdm.serializer,
DiscoveryEndpointRestrictions,
AggregatedDiscoveryGV,
resp,
req,
http.StatusOK,
&response,
true,
)
}
}

View File

@ -0,0 +1,501 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated_test
import (
"encoding/json"
"math/rand"
"net/http"
"net/http/httptest"
"sort"
"strconv"
"strings"
"sync"
"testing"
fuzz "github.com/google/gofuzz"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer"
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
)
var scheme = runtime.NewScheme()
var codecs = runtimeserializer.NewCodecFactory(scheme)
const discoveryPath = "/apis"
func init() {
// Add all builtin types to scheme
apidiscoveryv2beta1.AddToScheme(scheme)
codecs = runtimeserializer.NewCodecFactory(scheme)
}
func fuzzAPIGroups(atLeastNumGroups, maxNumGroups int, seed int64) apidiscoveryv2beta1.APIGroupDiscoveryList {
fuzzer := fuzz.NewWithSeed(seed)
fuzzer.NumElements(atLeastNumGroups, maxNumGroups)
fuzzer.NilChance(0)
fuzzer.Funcs(func(o *apidiscoveryv2beta1.APIGroupDiscovery, c fuzz.Continue) {
c.FuzzNoCustom(o)
// The ResourceManager will just not serve the group if its versions
// list is empty
atLeastOne := apidiscoveryv2beta1.APIVersionDiscovery{}
c.Fuzz(&atLeastOne)
o.Versions = append(o.Versions, atLeastOne)
o.TypeMeta = metav1.TypeMeta{}
var name string
c.Fuzz(&name)
o.ObjectMeta = metav1.ObjectMeta{
Name: name,
}
})
var apis []apidiscoveryv2beta1.APIGroupDiscovery
fuzzer.Fuzz(&apis)
sort.Slice(apis[:], func(i, j int) bool {
return apis[i].Name < apis[j].Name
})
return apidiscoveryv2beta1.APIGroupDiscoveryList{
TypeMeta: metav1.TypeMeta{
Kind: "APIGroupDiscoveryList",
APIVersion: "apidiscovery.k8s.io/v2beta1",
},
Items: apis,
}
}
func fetchPath(handler http.Handler, acceptPrefix string, path string, etag string) (*http.Response, []byte, *apidiscoveryv2beta1.APIGroupDiscoveryList) {
// Expect json-formatted apis group list
w := httptest.NewRecorder()
req := httptest.NewRequest("GET", discoveryPath, nil)
// Ask for JSON response
req.Header.Set("Accept", acceptPrefix+";g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList")
if etag != "" {
// Quote provided etag if unquoted
quoted := etag
if !strings.HasPrefix(etag, "\"") {
quoted = strconv.Quote(etag)
}
req.Header.Set("If-None-Match", quoted)
}
handler.ServeHTTP(w, req)
bytes := w.Body.Bytes()
var decoded *apidiscoveryv2beta1.APIGroupDiscoveryList
if len(bytes) > 0 {
decoded = &apidiscoveryv2beta1.APIGroupDiscoveryList{}
runtime.DecodeInto(codecs.UniversalDecoder(), bytes, decoded)
}
return w.Result(), bytes, decoded
}
// Add all builtin APIServices to the manager and check the output
func TestBasicResponse(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 10)
manager.SetGroups(apis.Items)
response, body, decoded := fetchPath(manager, "application/json", discoveryPath, "")
jsonFormatted, err := json.Marshal(&apis)
require.NoError(t, err, "json marshal should always succeed")
assert.Equal(t, http.StatusOK, response.StatusCode, "response should be 200 OK")
assert.Equal(t, "application/json;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList", response.Header.Get("Content-Type"), "Content-Type response header should be as requested in Accept header if supported")
assert.NotEmpty(t, response.Header.Get("ETag"), "E-Tag should be set")
assert.NoError(t, err, "decode should always succeed")
assert.EqualValues(t, &apis, decoded, "decoded value should equal input")
assert.Equal(t, string(jsonFormatted)+"\n", string(body), "response should be the api group list")
}
// Test that protobuf is outputted correctly
func TestBasicResponseProtobuf(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 10)
manager.SetGroups(apis.Items)
response, _, decoded := fetchPath(manager, "application/vnd.kubernetes.protobuf", discoveryPath, "")
assert.Equal(t, http.StatusOK, response.StatusCode, "response should be 200 OK")
assert.Equal(t, "application/vnd.kubernetes.protobuf;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList", response.Header.Get("Content-Type"), "Content-Type response header should be as requested in Accept header if supported")
assert.NotEmpty(t, response.Header.Get("ETag"), "E-Tag should be set")
assert.EqualValues(t, &apis, decoded, "decoded value should equal input")
}
// Test that an etag associated with the service only depends on the apiresources
// e.g.: Multiple services with the same contents should have the same etag.
func TestEtagConsistent(t *testing.T) {
// Create 2 managers, add a bunch of services to each
manager1 := discoveryendpoint.NewResourceManager()
manager2 := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 11)
manager1.SetGroups(apis.Items)
manager2.SetGroups(apis.Items)
// Make sure etag of each is the same
res1_initial, _, _ := fetchPath(manager1, "application/json", discoveryPath, "")
res2_initial, _, _ := fetchPath(manager2, "application/json", discoveryPath, "")
assert.NotEmpty(t, res1_initial.Header.Get("ETag"), "Etag should be populated")
assert.NotEmpty(t, res2_initial.Header.Get("ETag"), "Etag should be populated")
assert.Equal(t, res1_initial.Header.Get("ETag"), res2_initial.Header.Get("ETag"), "etag should be deterministic")
// Then add one service to only one.
// Make sure etag is changed, but other is the same
apis = fuzzAPIGroups(1, 1, 11)
for _, group := range apis.Items {
for _, version := range group.Versions {
manager1.AddGroupVersion(group.Name, version)
}
}
res1_addedToOne, _, _ := fetchPath(manager1, "application/json", discoveryPath, "")
res2_addedToOne, _, _ := fetchPath(manager2, "application/json", discoveryPath, "")
assert.NotEmpty(t, res1_addedToOne.Header.Get("ETag"), "Etag should be populated")
assert.NotEmpty(t, res2_addedToOne.Header.Get("ETag"), "Etag should be populated")
assert.NotEqual(t, res1_initial.Header.Get("ETag"), res1_addedToOne.Header.Get("ETag"), "ETag should be changed since version was added")
assert.Equal(t, res2_initial.Header.Get("ETag"), res2_addedToOne.Header.Get("ETag"), "ETag should be unchanged since data was unchanged")
// Then add service to other one
// Make sure etag is the same
for _, group := range apis.Items {
for _, version := range group.Versions {
manager2.AddGroupVersion(group.Name, version)
}
}
res1_addedToBoth, _, _ := fetchPath(manager1, "application/json", discoveryPath, "")
res2_addedToBoth, _, _ := fetchPath(manager2, "application/json", discoveryPath, "")
assert.NotEmpty(t, res1_addedToOne.Header.Get("ETag"), "Etag should be populated")
assert.NotEmpty(t, res2_addedToOne.Header.Get("ETag"), "Etag should be populated")
assert.Equal(t, res1_addedToBoth.Header.Get("ETag"), res2_addedToBoth.Header.Get("ETag"), "ETags should be equal since content is equal")
assert.NotEqual(t, res2_initial.Header.Get("ETag"), res2_addedToBoth.Header.Get("ETag"), "ETag should be changed since data was changed")
// Remove the group version from both. Initial E-Tag should be restored
for _, group := range apis.Items {
for _, version := range group.Versions {
manager1.RemoveGroupVersion(metav1.GroupVersion{
Group: group.Name,
Version: version.Version,
})
manager2.RemoveGroupVersion(metav1.GroupVersion{
Group: group.Name,
Version: version.Version,
})
}
}
res1_removeFromBoth, _, _ := fetchPath(manager1, "application/json", discoveryPath, "")
res2_removeFromBoth, _, _ := fetchPath(manager2, "application/json", discoveryPath, "")
assert.NotEmpty(t, res1_addedToOne.Header.Get("ETag"), "Etag should be populated")
assert.NotEmpty(t, res2_addedToOne.Header.Get("ETag"), "Etag should be populated")
assert.Equal(t, res1_removeFromBoth.Header.Get("ETag"), res2_removeFromBoth.Header.Get("ETag"), "ETags should be equal since content is equal")
assert.Equal(t, res1_initial.Header.Get("ETag"), res1_removeFromBoth.Header.Get("ETag"), "ETag should be equal to initial value since added content was removed")
}
// Test that if a request comes in with an If-None-Match header with an incorrect
// E-Tag, that fresh content is returned.
func TestEtagNonMatching(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 12)
manager.SetGroups(apis.Items)
// fetch the document once
initial, _, _ := fetchPath(manager, "application/json", discoveryPath, "")
assert.NotEmpty(t, initial.Header.Get("ETag"), "ETag should be populated")
// Send another request with a wrong e-tag. The same response should
// get sent again
second, _, _ := fetchPath(manager, "application/json", discoveryPath, "wrongetag")
assert.Equal(t, http.StatusOK, initial.StatusCode, "response should be 200 OK")
assert.Equal(t, http.StatusOK, second.StatusCode, "response should be 200 OK")
assert.Equal(t, initial.Header.Get("ETag"), second.Header.Get("ETag"), "ETag of both requests should be equal")
}
// Test that if a request comes in with an If-None-Match header with a correct
// E-Tag, that 304 Not Modified is returned
func TestEtagMatching(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 12)
manager.SetGroups(apis.Items)
// fetch the document once
initial, initialBody, _ := fetchPath(manager, "application/json", discoveryPath, "")
assert.NotEmpty(t, initial.Header.Get("ETag"), "ETag should be populated")
assert.NotEmpty(t, initialBody, "body should not be empty")
// Send another request with a wrong e-tag. The same response should
// get sent again
second, secondBody, _ := fetchPath(manager, "application/json", discoveryPath, initial.Header.Get("ETag"))
assert.Equal(t, http.StatusOK, initial.StatusCode, "initial response should be 200 OK")
assert.Equal(t, http.StatusNotModified, second.StatusCode, "second response should be 304 Not Modified")
assert.Equal(t, initial.Header.Get("ETag"), second.Header.Get("ETag"), "ETag of both requests should be equal")
assert.Empty(t, secondBody, "body should be empty when returning 304 Not Modified")
}
// Test that if a request comes in with an If-None-Match header with an old
// E-Tag, that fresh content is returned
func TestEtagOutdated(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 15)
manager.SetGroups(apis.Items)
// fetch the document once
initial, initialBody, _ := fetchPath(manager, "application/json", discoveryPath, "")
assert.NotEmpty(t, initial.Header.Get("ETag"), "ETag should be populated")
assert.NotEmpty(t, initialBody, "body should not be empty")
// Then add some services so the etag changes
apis = fuzzAPIGroups(1, 3, 14)
for _, group := range apis.Items {
for _, version := range group.Versions {
manager.AddGroupVersion(group.Name, version)
}
}
// Send another request with the old e-tag. Response should not be 304 Not Modified
second, secondBody, _ := fetchPath(manager, "application/json", discoveryPath, initial.Header.Get("ETag"))
assert.Equal(t, http.StatusOK, initial.StatusCode, "initial response should be 200 OK")
assert.Equal(t, http.StatusOK, second.StatusCode, "second response should be 304 Not Modified")
assert.NotEqual(t, initial.Header.Get("ETag"), second.Header.Get("ETag"), "ETag of both requests should be unequal since contents differ")
assert.NotEmpty(t, secondBody, "body should be not empty when returning 304 Not Modified")
}
// Test that an api service can be added or removed
func TestAddRemove(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 15)
for _, group := range apis.Items {
for _, version := range group.Versions {
manager.AddGroupVersion(group.Name, version)
}
}
_, _, initialDocument := fetchPath(manager, "application/json", discoveryPath, "")
for _, group := range apis.Items {
for _, version := range group.Versions {
manager.RemoveGroupVersion(metav1.GroupVersion{
Group: group.Name,
Version: version.Version,
})
}
}
_, _, secondDocument := fetchPath(manager, "application/json", discoveryPath, "")
require.NotNil(t, initialDocument, "initial document should parse")
require.NotNil(t, secondDocument, "second document should parse")
assert.Len(t, initialDocument.Items, len(apis.Items), "initial document should have set number of groups")
assert.Len(t, secondDocument.Items, 0, "second document should have no groups")
}
// Show that updating an existing service replaces and does not add the entry
// and instead replaces it
func TestUpdateService(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 15)
for _, group := range apis.Items {
for _, version := range group.Versions {
manager.AddGroupVersion(group.Name, version)
}
}
_, _, initialDocument := fetchPath(manager, "application/json", discoveryPath, "")
assert.Equal(t, initialDocument, &apis, "should have returned expected document")
b, err := json.Marshal(apis)
if err != nil {
t.Error(err)
}
var newapis apidiscoveryv2beta1.APIGroupDiscoveryList
err = json.Unmarshal(b, &newapis)
if err != nil {
t.Error(err)
}
newapis.Items[0].Versions[0].Resources[0].Resource = "changed a resource name!"
for _, group := range newapis.Items {
for _, version := range group.Versions {
manager.AddGroupVersion(group.Name, version)
}
}
_, _, secondDocument := fetchPath(manager, "application/json", discoveryPath, "")
assert.Equal(t, secondDocument, &newapis, "should have returned expected document")
assert.NotEqual(t, secondDocument, initialDocument, "should have returned expected document")
}
// Show the discovery manager is capable of serving requests to multiple users
// with unchanging data
func TestConcurrentRequests(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 15)
manager.SetGroups(apis.Items)
waitGroup := sync.WaitGroup{}
numReaders := 100
numRequestsPerReader := 100
// Spawn a bunch of readers that will keep sending requests to the server
for i := 0; i < numReaders; i++ {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
etag := ""
for j := 0; j < numRequestsPerReader; j++ {
usedEtag := etag
if j%2 == 0 {
// Disable use of etag for every second request
usedEtag = ""
}
response, body, document := fetchPath(manager, "application/json", discoveryPath, usedEtag)
if usedEtag != "" {
assert.Equal(t, http.StatusNotModified, response.StatusCode, "response should be Not Modified if etag was used")
assert.Empty(t, body, "body should be empty if etag used")
} else {
assert.Equal(t, http.StatusOK, response.StatusCode, "response should be OK if etag was unused")
assert.Equal(t, &apis, document, "document should be equal")
}
etag = response.Header.Get("ETag")
}
}()
}
waitGroup.Wait()
}
// Show the handler is capable of serving many concurrent readers and many
// concurrent writers without tripping up. Good to run with go '-race' detector
// since there are not many "correctness" checks
func TestAbuse(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
numReaders := 100
numRequestsPerReader := 1000
numWriters := 10
numWritesPerWriter := 1000
waitGroup := sync.WaitGroup{}
// Spawn a bunch of writers that randomly add groups, remove groups, and
// reset the list of groups
for i := 0; i < numWriters; i++ {
source := rand.NewSource(int64(i))
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
// track list of groups we've added so that we can remove them
// randomly
var addedGroups []metav1.GroupVersion
for j := 0; j < numWritesPerWriter; j++ {
switch source.Int63() % 3 {
case 0:
// Add a fuzzed group
apis := fuzzAPIGroups(1, 2, 15)
for _, group := range apis.Items {
for _, version := range group.Versions {
manager.AddGroupVersion(group.Name, version)
addedGroups = append(addedGroups, metav1.GroupVersion{
Group: group.Name,
Version: version.Version,
})
}
}
case 1:
// Remove a group that we have added
if len(addedGroups) > 0 {
manager.RemoveGroupVersion(addedGroups[0])
addedGroups = addedGroups[1:]
} else {
// Send a request and try to remove a group someone else
// might have added
_, _, document := fetchPath(manager, "application/json", discoveryPath, "")
assert.NotNil(t, document, "manager should always succeed in returning a document")
if len(document.Items) > 0 {
manager.RemoveGroupVersion(metav1.GroupVersion{
Group: document.Items[0].Name,
Version: document.Items[0].Versions[0].Version,
})
}
}
case 2:
manager.SetGroups(nil)
addedGroups = nil
default:
panic("unreachable")
}
}
}()
}
// Spawn a bunch of readers that will keep sending requests to the server
// and making sure the response makes sense
for i := 0; i < numReaders; i++ {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
etag := ""
for j := 0; j < numRequestsPerReader; j++ {
response, body, document := fetchPath(manager, "application/json", discoveryPath, etag)
if response.StatusCode == http.StatusNotModified {
assert.Equal(t, etag, response.Header.Get("ETag"))
assert.Empty(t, body, "body should be empty if etag used")
assert.Nil(t, document)
} else {
assert.Equal(t, http.StatusOK, response.StatusCode, "response should be OK if etag was unused")
assert.NotNil(t, document)
}
etag = response.Header.Get("ETag")
}
}()
}
waitGroup.Wait()
}

View File

@ -0,0 +1,45 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated
import (
"k8s.io/apimachinery/pkg/runtime/schema"
)
var AggregatedDiscoveryGV = schema.GroupVersion{Group: "apidiscovery.k8s.io", Version: "v2beta1"}
// Interface is from "k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
// DiscoveryEndpointRestrictions allows requests to /apis to provide a Content Negotiation GVK for aggregated discovery.
var DiscoveryEndpointRestrictions = discoveryEndpointRestrictions{}
type discoveryEndpointRestrictions struct{}
func (discoveryEndpointRestrictions) AllowsMediaTypeTransform(mimeType string, mimeSubType string, gvk *schema.GroupVersionKind) bool {
return IsAggregatedDiscoveryGVK(gvk)
}
func (discoveryEndpointRestrictions) AllowsServerVersion(string) bool { return false }
func (discoveryEndpointRestrictions) AllowsStreamSchema(s string) bool { return s == "watch" }
// IsAggregatedDiscoveryGVK checks if a provided GVK is the GVK for serving aggregated discovery.
func IsAggregatedDiscoveryGVK(gvk *schema.GroupVersionKind) bool {
if gvk != nil {
return gvk.Group == "apidiscovery.k8s.io" && gvk.Version == "v2beta1" && gvk.Kind == "APIGroupDiscoveryList"
}
return false
}

View File

@ -0,0 +1,78 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated
import (
"net/http"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
"k8s.io/apimachinery/pkg/runtime/serializer"
"github.com/emicklei/go-restful/v3"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
)
type WrappedHandler struct {
s runtime.NegotiatedSerializer
handler http.Handler
aggHandler http.Handler
}
func (wrapped *WrappedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
mediaType, _ := negotiation.NegotiateMediaTypeOptions(req.Header.Get("Accept"), wrapped.s.SupportedMediaTypes(), DiscoveryEndpointRestrictions)
// mediaType.Convert looks at the request accept headers and is used to control whether the discovery document will be aggregated.
if IsAggregatedDiscoveryGVK(mediaType.Convert) {
wrapped.aggHandler.ServeHTTP(resp, req)
return
}
}
wrapped.handler.ServeHTTP(resp, req)
}
func (wrapped *WrappedHandler) restfulHandle(req *restful.Request, resp *restful.Response) {
wrapped.ServeHTTP(resp.ResponseWriter, req.Request)
}
func (wrapped *WrappedHandler) GenerateWebService(prefix string, returnType interface{}) *restful.WebService {
mediaTypes, _ := negotiation.MediaTypesForSerializer(wrapped.s)
ws := new(restful.WebService)
ws.Path(prefix)
ws.Doc("get available API versions")
ws.Route(ws.GET("/").To(wrapped.restfulHandle).
Doc("get available API versions").
Operation("getAPIVersions").
Produces(mediaTypes...).
Consumes(mediaTypes...).
Writes(returnType))
return ws
}
// WrapAggregatedDiscoveryToHandler wraps a handler with an option to
// emit the aggregated discovery by passing in the aggregated
// discovery type in content negotiation headers: eg: (Accept:
// application/json;v=v2beta1;g=apidiscovery.k8s.io;as=APIGroupDiscoveryList)
func WrapAggregatedDiscoveryToHandler(handler http.Handler, aggHandler http.Handler) *WrappedHandler {
scheme := runtime.NewScheme()
apidiscoveryv2beta1.AddToScheme(scheme)
codecs := serializer.NewCodecFactory(scheme)
return &WrappedHandler{codecs, handler, aggHandler}
}

View File

@ -0,0 +1,156 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated
import (
"net/http"
"net/http/httptest"
"io"
"testing"
"github.com/stretchr/testify/assert"
genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
const discoveryPath = "/apis"
const jsonAccept = "application/json"
const protobufAccept = "application/vnd.kubernetes.protobuf"
const aggregatedAcceptSuffix = ";g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList"
const aggregatedJSONAccept = jsonAccept + aggregatedAcceptSuffix
const aggregatedProtoAccept = protobufAccept + aggregatedAcceptSuffix
func fetchPath(handler http.Handler, path, accept string) string {
w := httptest.NewRecorder()
req := httptest.NewRequest("GET", discoveryPath, nil)
// Ask for JSON response
req.Header.Set("Accept", accept)
handler.ServeHTTP(w, req)
return string(w.Body.Bytes())
}
type fakeHTTPHandler struct {
data string
}
func (f fakeHTTPHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
io.WriteString(resp, f.data)
}
func TestAggregationEnabled(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, true)()
unaggregated := fakeHTTPHandler{data: "unaggregated"}
aggregated := fakeHTTPHandler{data: "aggregated"}
wrapped := WrapAggregatedDiscoveryToHandler(unaggregated, aggregated)
testCases := []struct {
accept string
expected string
}{
{
// Misconstructed/incorrect accept headers should be passed to the unaggregated handler to return an error
accept: "application/json;foo=bar",
expected: "unaggregated",
}, {
// Empty accept headers are valid and should be handled by the unaggregated handler
accept: "",
expected: "unaggregated",
}, {
accept: aggregatedJSONAccept,
expected: "aggregated",
}, {
accept: aggregatedProtoAccept,
expected: "aggregated",
}, {
accept: jsonAccept,
expected: "unaggregated",
}, {
accept: protobufAccept,
expected: "unaggregated",
}, {
// Server should return the first accepted type
accept: aggregatedJSONAccept + "," + jsonAccept,
expected: "aggregated",
}, {
// Server should return the first accepted type
accept: aggregatedProtoAccept + "," + protobufAccept,
expected: "aggregated",
},
}
for _, tc := range testCases {
body := fetchPath(wrapped, discoveryPath, tc.accept)
assert.Equal(t, tc.expected, body)
}
}
func TestAggregationDisabled(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, false)()
unaggregated := fakeHTTPHandler{data: "unaggregated"}
aggregated := fakeHTTPHandler{data: "aggregated"}
wrapped := WrapAggregatedDiscoveryToHandler(unaggregated, aggregated)
testCases := []struct {
accept string
expected string
}{
{
// Misconstructed/incorrect accept headers should be passed to the unaggregated handler to return an error
accept: "application/json;foo=bar",
expected: "unaggregated",
}, {
// Empty accept headers are valid and should be handled by the unaggregated handler
accept: "",
expected: "unaggregated",
}, {
accept: aggregatedJSONAccept,
expected: "unaggregated",
}, {
accept: aggregatedProtoAccept,
expected: "unaggregated",
}, {
accept: jsonAccept,
expected: "unaggregated",
}, {
accept: protobufAccept,
expected: "unaggregated",
}, {
// Server should return the first accepted type.
// If aggregation is disabled, the unaggregated type should be returned.
accept: aggregatedJSONAccept + "," + jsonAccept,
expected: "unaggregated",
}, {
// Server should return the first accepted type.
// If aggregation is disabled, the unaggregated type should be returned.
accept: aggregatedProtoAccept + "," + protobufAccept,
expected: "unaggregated",
},
}
for _, tc := range testCases {
body := fetchPath(wrapped, discoveryPath, tc.accept)
assert.Equal(t, tc.expected, body)
}
}

View File

@ -69,5 +69,5 @@ func (s *APIGroupHandler) handle(req *restful.Request, resp *restful.Response) {
}
func (s *APIGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, &s.group)
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, &s.group, false)
}

View File

@ -56,7 +56,7 @@ func (s *legacyRootAPIHandler) WebService() *restful.WebService {
ws := new(restful.WebService)
ws.Path(s.apiPrefix)
ws.Doc("get available API versions")
ws.Route(ws.GET("/").To(s.handle).
ws.Route(ws.GET("/").To(s.restfulHandle).
Doc("get available API versions").
Operation("getAPIVersions").
Produces(mediaTypes...).
@ -65,12 +65,16 @@ func (s *legacyRootAPIHandler) WebService() *restful.WebService {
return ws
}
func (s *legacyRootAPIHandler) handle(req *restful.Request, resp *restful.Response) {
clientIP := utilnet.GetClientIP(req.Request)
func (s *legacyRootAPIHandler) restfulHandle(req *restful.Request, resp *restful.Response) {
s.ServeHTTP(resp.ResponseWriter, req.Request)
}
func (s *legacyRootAPIHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
clientIP := utilnet.GetClientIP(req)
apiVersions := &metav1.APIVersions{
ServerAddressByClientCIDRs: s.addresses.ServerAddressByClientCIDRs(clientIP),
Versions: []string{"v1"},
}
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, apiVersions)
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, resp, req, http.StatusOK, apiVersions, false)
}

View File

@ -35,7 +35,7 @@ import (
type GroupManager interface {
AddGroup(apiGroup metav1.APIGroup)
RemoveGroup(groupName string)
ServeHTTP(resp http.ResponseWriter, req *http.Request)
WebService() *restful.WebService
}
@ -111,7 +111,7 @@ func (s *rootAPIsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
groups[i].ServerAddressByClientCIDRs = serverCIDR
}
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, resp, req, http.StatusOK, &metav1.APIGroupList{Groups: groups})
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, resp, req, http.StatusOK, &metav1.APIGroupList{Groups: groups}, false)
}
func (s *rootAPIsHandler) restfulHandle(req *restful.Request, resp *restful.Response) {

View File

@ -79,5 +79,5 @@ func (s *APIVersionHandler) handle(req *restful.Request, resp *restful.Response)
func (s *APIVersionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK,
&metav1.APIResourceList{GroupVersion: s.groupVersion.String(), APIResources: s.apiResourceLister.ListAPIResources()})
&metav1.APIResourceList{GroupVersion: s.groupVersion.String(), APIResources: s.apiResourceLister.ListAPIResources()}, false)
}

View File

@ -22,6 +22,7 @@ import (
restful "github.com/emicklei/go-restful/v3"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -105,7 +106,7 @@ type APIGroupVersion struct {
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]apidiscoveryv2beta1.APIResourceDiscovery, []*storageversion.ResourceInfo, error) {
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
@ -117,7 +118,11 @@ func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storagev
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
versionDiscoveryHandler.AddToWebService(ws)
container.Add(ws)
return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
aggregatedDiscoveryResources, err := ConvertGroupVersionIntoToDiscovery(apiResources)
if err != nil {
registrationErrors = append(registrationErrors, err)
}
return aggregatedDiscoveryResources, removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}
func removeNonPersistedResources(infos []*storageversion.ResourceInfo) []*storageversion.ResourceInfo {

View File

@ -146,7 +146,7 @@ func transformResponseObject(ctx context.Context, scope *RequestScope, req *http
return
}
kind, serializer, _ := targetEncodingForTransform(scope, mediaType, req)
responsewriters.WriteObjectNegotiated(serializer, scope, kind.GroupVersion(), w, req, statusCode, obj)
responsewriters.WriteObjectNegotiated(serializer, scope, kind.GroupVersion(), w, req, statusCode, obj, false)
}
// errNotAcceptable indicates Accept negotiation has failed

View File

@ -259,7 +259,7 @@ func (w *deferredResponseWriter) Close() error {
}
// WriteObjectNegotiated renders an object in the content type negotiated by the client.
func WriteObjectNegotiated(s runtime.NegotiatedSerializer, restrictions negotiation.EndpointRestrictions, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
func WriteObjectNegotiated(s runtime.NegotiatedSerializer, restrictions negotiation.EndpointRestrictions, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object, listGVKInContentType bool) {
stream, ok := object.(rest.ResourceStreamer)
if ok {
requestInfo, _ := request.RequestInfoFrom(req.Context())
@ -269,7 +269,7 @@ func WriteObjectNegotiated(s runtime.NegotiatedSerializer, restrictions negotiat
return
}
_, serializer, err := negotiation.NegotiateOutputMediaType(req, s, restrictions)
mediaType, serializer, err := negotiation.NegotiateOutputMediaType(req, s, restrictions)
if err != nil {
// if original statusCode was not successful we need to return the original error
// we cannot hide it behind negotiation problems
@ -286,10 +286,30 @@ func WriteObjectNegotiated(s runtime.NegotiatedSerializer, restrictions negotiat
encoder := s.EncoderForVersion(serializer.Serializer, gv)
request.TrackSerializeResponseObjectLatency(req.Context(), func() {
if listGVKInContentType {
SerializeObject(generateMediaTypeWithGVK(serializer.MediaType, mediaType.Convert), encoder, w, req, statusCode, object)
} else {
SerializeObject(serializer.MediaType, encoder, w, req, statusCode, object)
}
})
}
func generateMediaTypeWithGVK(mediaType string, gvk *schema.GroupVersionKind) string {
if gvk == nil {
return mediaType
}
if gvk.Group != "" {
mediaType += ";g=" + gvk.Group
}
if gvk.Version != "" {
mediaType += ";v=" + gvk.Version
}
if gvk.Kind != "" {
mediaType += ";as=" + gvk.Kind
}
return mediaType
}
// ErrorNegotiated renders an error to the response. Returns the HTTP status code of the error.
// The context is optional and may be nil.
func ErrorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request) int {
@ -306,7 +326,7 @@ func ErrorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupV
return code
}
WriteObjectNegotiated(s, negotiation.DefaultEndpointRestrictions, gv, w, req, code, status)
WriteObjectNegotiated(s, negotiation.DefaultEndpointRestrictions, gv, w, req, code, status, false)
return code
}

View File

@ -237,7 +237,7 @@ type responder struct {
}
func (r *responder) Object(statusCode int, obj runtime.Object) {
responsewriters.WriteObjectNegotiated(r.scope.Serializer, r.scope, r.scope.Kind.GroupVersion(), r.w, r.req, statusCode, obj)
responsewriters.WriteObjectNegotiated(r.scope.Serializer, r.scope, r.scope.Kind.GroupVersion(), r.w, r.req, statusCode, obj, false)
}
func (r *responder) Error(err error) {

View File

@ -26,6 +26,7 @@ import (
"unicode"
restful "github.com/emicklei/go-restful/v3"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
@ -68,6 +69,94 @@ type action struct {
AllNamespaces bool // true iff the action is namespaced but works on aggregate result for all namespaces
}
func ConvertGroupVersionIntoToDiscovery(list []metav1.APIResource) ([]apidiscoveryv2beta1.APIResourceDiscovery, error) {
var apiResourceList []apidiscoveryv2beta1.APIResourceDiscovery
parentResources := map[string]*apidiscoveryv2beta1.APIResourceDiscovery{}
// Loop through all top-level resources
for _, r := range list {
if strings.Contains(r.Name, "/") {
// Skip subresources for now so we can get the list of resources
continue
}
var scope apidiscoveryv2beta1.ResourceScope
if r.Namespaced {
scope = apidiscoveryv2beta1.ScopeNamespace
} else {
scope = apidiscoveryv2beta1.ScopeCluster
}
apiResourceList = append(apiResourceList, apidiscoveryv2beta1.APIResourceDiscovery{
Resource: r.Name,
Scope: scope,
ResponseKind: &metav1.GroupVersionKind{
Group: r.Group,
Version: r.Version,
Kind: r.Kind,
},
Verbs: r.Verbs,
ShortNames: r.ShortNames,
Categories: r.Categories,
SingularResource: r.SingularName,
})
parentResources[r.Name] = &apiResourceList[len(apiResourceList)-1]
}
// Loop through all subresources
for _, r := range list {
// Split resource name and subresource name
split := strings.SplitN(r.Name, "/", 2)
if len(split) != 2 {
// Skip parent resources
continue
}
var scope apidiscoveryv2beta1.ResourceScope
if r.Namespaced {
scope = apidiscoveryv2beta1.ScopeNamespace
} else {
scope = apidiscoveryv2beta1.ScopeCluster
}
var parent *apidiscoveryv2beta1.APIResourceDiscovery
var exists bool
parent, exists = parentResources[split[0]]
if !exists {
// If a subresource exists without a parent, create a parent
apiResourceList = append(apiResourceList, apidiscoveryv2beta1.APIResourceDiscovery{
Resource: split[0],
Scope: scope,
})
parentResources[split[0]] = &apiResourceList[len(apiResourceList)-1]
parent = &apiResourceList[len(apiResourceList)-1]
parentResources[split[0]] = parent
}
if parent.Scope != scope {
return nil, fmt.Errorf("Error: Parent %s (scope: %s) and subresource %s (scope: %s) scope do not match", split[0], parent.Scope, split[1], scope)
//
}
subresource := apidiscoveryv2beta1.APISubresourceDiscovery{
Subresource: split[1],
Verbs: r.Verbs,
}
if r.Kind != "" {
subresource.ResponseKind = &metav1.GroupVersionKind{
Group: r.Group,
Version: r.Version,
Kind: r.Kind,
}
}
parent.Subresources = append(parent.Subresources, subresource)
}
return apiResourceList, nil
}
// An interface to see if one storage supports override its default verb for monitoring
type StorageMetricsOverride interface {
// OverrideMetricsVerb gives a storage object an opportunity to override the verb reported to the metrics endpoint

View File

@ -18,6 +18,10 @@ package endpoints
import (
"testing"
"github.com/stretchr/testify/require"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestIsVowel(t *testing.T) {
@ -97,3 +101,243 @@ func TestGetArticleForNoun(t *testing.T) {
}
}
}
func TestConvertAPIResourceToDiscovery(t *testing.T) {
tests := []struct {
name string
resources []metav1.APIResource
wantAPIResourceDiscovery []apidiscoveryv2beta1.APIResourceDiscovery
wantErr bool
}{
{
name: "Basic Test",
resources: []metav1.APIResource{
{
Name: "pods",
Namespaced: true,
Kind: "Pod",
ShortNames: []string{"po"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "pods",
Scope: apidiscoveryv2beta1.ScopeNamespace,
ResponseKind: &metav1.GroupVersionKind{
Kind: "Pod",
},
ShortNames: []string{"po"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
},
{
name: "Basic Group Version Test",
resources: []metav1.APIResource{
{
Name: "cronjobs",
Namespaced: true,
Group: "batch",
Version: "v1",
Kind: "CronJob",
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "cronjobs",
Scope: apidiscoveryv2beta1.ScopeNamespace,
ResponseKind: &metav1.GroupVersionKind{
Group: "batch",
Version: "v1",
Kind: "CronJob",
},
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
},
{
name: "Test with subresource",
resources: []metav1.APIResource{
{
Name: "cronjobs",
Namespaced: true,
Kind: "CronJob",
Group: "batch",
Version: "v1",
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
{
Name: "cronjobs/status",
Namespaced: true,
Kind: "CronJob",
Group: "batch",
Version: "v1",
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "cronjobs",
Scope: apidiscoveryv2beta1.ScopeNamespace,
ResponseKind: &metav1.GroupVersionKind{
Group: "batch",
Version: "v1",
Kind: "CronJob",
},
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
Subresources: []apidiscoveryv2beta1.APISubresourceDiscovery{{
Subresource: "status",
ResponseKind: &metav1.GroupVersionKind{
Group: "batch",
Version: "v1",
Kind: "CronJob",
},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
}},
},
},
},
{
name: "Test with subresource with no parent",
resources: []metav1.APIResource{
{
Name: "cronjobs/status",
Namespaced: true,
Kind: "CronJob",
Group: "batch",
Version: "v1",
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "cronjobs",
Scope: apidiscoveryv2beta1.ScopeNamespace,
Subresources: []apidiscoveryv2beta1.APISubresourceDiscovery{{
Subresource: "status",
ResponseKind: &metav1.GroupVersionKind{
Group: "batch",
Version: "v1",
Kind: "CronJob",
},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
}},
},
},
},
{
name: "Test with mismatch parent and subresource scope",
resources: []metav1.APIResource{
{
Name: "cronjobs",
Namespaced: true,
Kind: "CronJob",
Group: "batch",
Version: "v1",
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
{
Name: "cronjobs/status",
Namespaced: false,
Kind: "CronJob",
Group: "batch",
Version: "v1",
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{},
wantErr: true,
},
{
name: "Cluster Scope Test",
resources: []metav1.APIResource{
{
Name: "nodes",
Namespaced: false,
Kind: "Node",
ShortNames: []string{"no"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "nodes",
Scope: apidiscoveryv2beta1.ScopeCluster,
ResponseKind: &metav1.GroupVersionKind{
Kind: "Node",
},
ShortNames: []string{"no"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
},
{
name: "Namespace Scope Test",
resources: []metav1.APIResource{
{
Name: "nodes",
Namespaced: true,
Kind: "Node",
ShortNames: []string{"no"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "nodes",
Scope: apidiscoveryv2beta1.ScopeNamespace,
ResponseKind: &metav1.GroupVersionKind{
Kind: "Node",
},
ShortNames: []string{"no"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
},
{
name: "Singular Resource Name",
resources: []metav1.APIResource{
{
Name: "nodes",
SingularName: "node",
Kind: "Node",
ShortNames: []string{"no"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "nodes",
SingularResource: "node",
Scope: apidiscoveryv2beta1.ScopeCluster,
ResponseKind: &metav1.GroupVersionKind{
Kind: "Node",
},
ShortNames: []string{"no"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
},
}
for _, tt := range tests {
discoveryAPIResources, err := ConvertGroupVersionIntoToDiscovery(tt.resources)
if err != nil {
if tt.wantErr == false {
t.Error(err)
}
} else {
require.Equal(t, tt.wantAPIResourceDiscovery, discoveryAPIResources)
}
}
}

View File

@ -50,6 +50,7 @@ import (
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/discovery"
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
"k8s.io/apiserver/pkg/endpoints/filterlatency"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi"
@ -125,6 +126,7 @@ type Config struct {
EnableIndex bool
EnableProfiling bool
EnableDiscovery bool
// Requires generic profiling enabled
EnableContentionProfiling bool
EnableMetrics bool
@ -262,6 +264,9 @@ type Config struct {
// StorageVersionManager holds the storage versions of the API resources installed by this server.
StorageVersionManager storageversion.Manager
// AggregatedDiscoveryGroupManager serves /apis in an aggregated form.
AggregatedDiscoveryGroupManager discoveryendpoint.ResourceManager
}
type RecommendedConfig struct {
@ -677,6 +682,14 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
muxAndDiscoveryCompleteSignals: map[string]<-chan struct{}{},
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
manager := c.AggregatedDiscoveryGroupManager
if manager == nil {
manager = discoveryendpoint.NewResourceManager()
}
s.AggregatedDiscoveryGroupManager = manager
s.AggregatedLegacyDiscoveryGroupManager = discoveryendpoint.NewResourceManager()
}
for {
if c.JSONPatchMaxCopyBytes <= 0 {
break

View File

@ -26,6 +26,7 @@ import (
systemd "github.com/coreos/go-systemd/v22/daemon"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -39,6 +40,7 @@ import (
"k8s.io/apiserver/pkg/authorization/authorizer"
genericapi "k8s.io/apiserver/pkg/endpoints"
"k8s.io/apiserver/pkg/endpoints/discovery"
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/rest"
@ -137,9 +139,15 @@ type GenericAPIServer struct {
// listedPathProvider is a lister which provides the set of paths to show at /
listedPathProvider routes.ListedPathProvider
// DiscoveryGroupManager serves /apis
// DiscoveryGroupManager serves /apis in an unaggregated form.
DiscoveryGroupManager discovery.GroupManager
// AggregatedDiscoveryGroupManager serves /apis in an aggregated form.
AggregatedDiscoveryGroupManager discoveryendpoint.ResourceManager
// AggregatedLegacyDiscoveryGroupManager serves /api in an aggregated form.
AggregatedLegacyDiscoveryGroupManager discoveryendpoint.ResourceManager
// Enable swagger and/or OpenAPI if these configs are non-nil.
openAPIConfig *openapicommon.Config
@ -672,11 +680,35 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A
apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes
r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
discoveryAPIResources, r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
if err != nil {
return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
}
resourceInfos = append(resourceInfos, r...)
if utilfeature.DefaultFeatureGate.Enabled(features.AggregatedDiscoveryEndpoint) {
// Aggregated discovery only aggregates resources under /apis
if apiPrefix == APIGroupPrefix {
s.AggregatedDiscoveryGroupManager.AddGroupVersion(
groupVersion.Group,
apidiscoveryv2beta1.APIVersionDiscovery{
Version: groupVersion.Version,
Resources: discoveryAPIResources,
},
)
} else {
// There is only one group version for legacy resources, priority can be defaulted to 0.
s.AggregatedLegacyDiscoveryGroupManager.AddGroupVersion(
groupVersion.Group,
apidiscoveryv2beta1.APIVersionDiscovery{
Version: groupVersion.Version,
Resources: discoveryAPIResources,
},
)
}
}
}
s.RegisterDestroyFunc(apiGroupInfo.destroyStorage)
@ -711,7 +743,13 @@ func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo
// Install the version handler.
// Add a handler at /<apiPrefix> to enumerate the supported api versions.
s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())
legacyRootAPIHandler := discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix)
if utilfeature.DefaultFeatureGate.Enabled(features.AggregatedDiscoveryEndpoint) {
wrapped := discoveryendpoint.WrapAggregatedDiscoveryToHandler(legacyRootAPIHandler, s.AggregatedLegacyDiscoveryGroupManager)
s.Handler.GoRestfulContainer.Add(wrapped.GenerateWebService("/api", metav1.APIVersions{}))
} else {
s.Handler.GoRestfulContainer.Add(legacyRootAPIHandler.WebService())
}
return nil
}

View File

@ -8,6 +8,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/emicklei/go-restful/v3 v3.9.0
github.com/gogo/protobuf v1.3.2
github.com/google/gofuzz v1.1.0
github.com/spf13/cobra v1.6.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.0
@ -46,7 +47,6 @@ require (
github.com/google/cel-go v0.12.5 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.1.2 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
genericfeatures "k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
@ -154,6 +155,11 @@ type APIAggregator struct {
// openAPIV3AggregationController downloads and caches OpenAPI v3 specs.
openAPIV3AggregationController *openapiv3controller.AggregationController
// discoveryAggregationController downloads and caches discovery documents
// from all aggregated apiservices so they are available from /apis endpoint
// when discovery with resources are requested
discoveryAggregationController DiscoveryAggregationController
// egressSelector selects the proper egress dialer to communicate with the custom apiserver
// overwrites proxyTransport dialer if not nil
egressSelector *egressselector.EgressSelector
@ -244,7 +250,13 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
lister: s.lister,
discoveryGroup: discoveryGroup(enabledVersions),
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
apisHandlerWithAggregationSupport := aggregated.WrapAggregatedDiscoveryToHandler(apisHandler, s.GenericAPIServer.AggregatedDiscoveryGroupManager)
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandlerWithAggregationSupport)
} else {
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)
}
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s)
@ -365,8 +377,8 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
return s, nil
}
// PrepareRun prepares the aggregator to run, by setting up the OpenAPI spec and calling
// the generic PrepareRun.
// PrepareRun prepares the aggregator to run, by setting up the OpenAPI spec &
// aggregated discovery document and calling the generic PrepareRun.
func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
// add post start hook before generic PrepareRun in order to be before /healthz installation
if s.openAPIConfig != nil {
@ -383,6 +395,20 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
})
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
s.discoveryAggregationController = NewDiscoveryManager(
s.GenericAPIServer.AggregatedDiscoveryGroupManager,
)
// Setup discovery endpoint
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-discovery-controller", func(context genericapiserver.PostStartHookContext) error {
// Run discovery manager's worker to watch for new/removed/updated
// APIServices to the discovery document can be updated at runtime
go s.discoveryAggregationController.Run(context.StopCh)
return nil
})
}
prepared := s.GenericAPIServer.PrepareRun()
// delay OpenAPI setup until the delegate had a chance to setup their OpenAPI handlers
@ -432,6 +458,12 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
if s.openAPIV3AggregationController != nil {
s.openAPIV3AggregationController.UpdateAPIService(proxyHandler, apiService)
}
// Forward calls to discovery manager to update discovery document
if s.discoveryAggregationController != nil {
handlerCopy := *proxyHandler
handlerCopy.setServiceAvailable(true)
s.discoveryAggregationController.AddAPIService(apiService, &handlerCopy)
}
return nil
}
@ -457,6 +489,10 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
if s.openAPIV3AggregationController != nil {
s.openAPIV3AggregationController.AddAPIService(proxyHandler, apiService)
}
if s.discoveryAggregationController != nil {
s.discoveryAggregationController.AddAPIService(apiService, proxyHandler)
}
s.proxyHandlers[apiService.Name] = proxyHandler
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)
@ -489,6 +525,11 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
// RemoveAPIService removes the APIService from being handled. It is not thread-safe, so only call it on one thread at a time please.
// It's a slow moving API, so it's ok to run the controller on a single thread.
func (s *APIAggregator) RemoveAPIService(apiServiceName string) {
// Forward calls to discovery manager to update discovery document
if s.discoveryAggregationController != nil {
s.discoveryAggregationController.RemoveAPIService(apiServiceName)
}
version := v1helper.APIServiceNameToGroupVersion(apiServiceName)
proxyPath := "/apis/" + version.Group + "/" + version.Version

View File

@ -91,7 +91,7 @@ func (r *apisHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}
responsewriters.WriteObjectNegotiated(r.codecs, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroupList)
responsewriters.WriteObjectNegotiated(r.codecs, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroupList, false)
}
// convertToDiscoveryAPIGroup takes apiservices in a single group and returns a discovery compatible object.
@ -162,5 +162,5 @@ func (r *apiGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
http.Error(w, "", http.StatusNotFound)
return
}
responsewriters.WriteObjectNegotiated(r.codecs, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroup)
responsewriters.WriteObjectNegotiated(r.codecs, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroup, false)
}

View File

@ -0,0 +1,572 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"errors"
"fmt"
"net/http"
"sync"
"time"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints"
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
"k8s.io/apiserver/pkg/endpoints/request"
scheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
)
var APIRegistrationGroup string = "apiregistration.k8s.io"
var APIRegistrationGroupPriority int = 18000
// Given a list of APIServices and proxyHandlers for contacting them,
// DiscoveryManager caches a list of discovery documents for each server
type DiscoveryAggregationController interface {
// Adds or Updates an APIService from the Aggregated Discovery Controller's
// knowledge base
// Thread-safe
AddAPIService(apiService *apiregistrationv1.APIService, handler http.Handler)
// Removes an APIService from the Aggregated Discovery Controller's Knowledge
// bank
// Thread-safe
RemoveAPIService(apiServiceName string)
// Spwans a worker which waits for added/updated apiservices and updates
// the unified discovery document by contacting the aggregated api services
Run(stopCh <-chan struct{})
// Returns true if all non-local APIServices that have been added
// are synced at least once to the discovery document
ExternalServicesSynced() bool
}
type discoveryManager struct {
// Locks `services`
servicesLock sync.RWMutex
// Map from APIService's name (or a unique string for local servers)
// to information about contacting that API Service
apiServices map[string]groupVersionInfo
// Locks cachedResults
resultsLock sync.RWMutex
// Map from APIService.Spec.Service to the previously fetched value
// (Note that many APIServices might use the same APIService.Spec.Service)
cachedResults map[serviceKey]cachedResult
// Queue of dirty apiServiceKey which need to be refreshed
// It is important that the reconciler for this queue does not excessively
// contact the apiserver if a key was enqueued before the server was last
// contacted.
dirtyAPIServiceQueue workqueue.RateLimitingInterface
// Merged handler which stores all known groupversions
mergedDiscoveryHandler discoveryendpoint.ResourceManager
}
// Version of Service/Spec with relevant fields for use as a cache key
type serviceKey struct {
Namespace string
Name string
Port int32
}
// Human-readable String representation used for logs
func (s serviceKey) String() string {
return fmt.Sprintf("%v/%v:%v", s.Namespace, s.Name, s.Port)
}
func newServiceKey(service apiregistrationv1.ServiceReference) serviceKey {
// Docs say. Defaults to 443 for compatibility reasons.
// BETA: Should this be a shared constant to avoid drifting with the
// implementation?
port := int32(443)
if service.Port != nil {
port = *service.Port
}
return serviceKey{
Name: service.Name,
Namespace: service.Namespace,
Port: port,
}
}
type cachedResult struct {
// Currently cached discovery document for this service
// Map from group name to version name to
discovery map[metav1.GroupVersion]apidiscoveryv2beta1.APIVersionDiscovery
// ETag hash of the cached discoveryDocument
etag string
// Guaranteed to be a time less than the time the server responded with the
// discovery data.
lastUpdated time.Time
}
// Information about a specific APIService/GroupVersion
type groupVersionInfo struct {
// Date this APIService was marked dirty.
// Guaranteed to be a time greater than the most recent time the APIService
// was known to be modified.
//
// Used for request deduplication to ensure the data used to reconcile each
// apiservice was retrieved after the time of the APIService change:
// real_apiservice_change_time < groupVersionInfo.lastMarkedDirty < cachedResult.lastUpdated < real_document_fresh_time
//
// This ensures that if the apiservice was changed after the last cached entry
// was stored, the discovery document will always be re-fetched.
lastMarkedDirty time.Time
// Last time sync function was run for this GV.
lastReconciled time.Time
// ServiceReference of this GroupVersion. This identifies the Service which
// describes how to contact the server responsible for this GroupVersion.
service serviceKey
// groupPriority describes the priority of the APIService for sorting
groupPriority int
// Method for contacting the service
handler http.Handler
}
var _ DiscoveryAggregationController = &discoveryManager{}
func NewDiscoveryManager(
target discoveryendpoint.ResourceManager,
) DiscoveryAggregationController {
return &discoveryManager{
mergedDiscoveryHandler: target,
apiServices: make(map[string]groupVersionInfo),
cachedResults: make(map[serviceKey]cachedResult),
dirtyAPIServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "discovery-manager"),
}
}
// Returns discovery data for the given apiservice.
// Caches the result.
// Returns the cached result if it is retrieved after the apiservice was last
// marked dirty
// If there was an error in fetching, returns the stale cached result if it exists,
// and a non-nil error
// If the result is current, returns nil error and non-nil result
func (dm *discoveryManager) fetchFreshDiscoveryForService(gv metav1.GroupVersion, info groupVersionInfo) (*cachedResult, error) {
// Lookup last cached result for this apiservice's service.
cached, exists := dm.getCacheEntryForService(info.service)
// If entry exists and was updated after the given time, just stop now
if exists && cached.lastUpdated.After(info.lastMarkedDirty) {
return &cached, nil
}
// If we have a handler to contact the server for this APIService, and
// the cache entry is too old to use, refresh the cache entry now.
handler := http.TimeoutHandler(info.handler, 5*time.Second, "request timed out")
req, err := http.NewRequest("GET", "/apis", nil)
if err != nil {
// NewRequest should not fail, but if it does for some reason,
// log it and continue
return &cached, fmt.Errorf("failed to create http.Request: %v", err)
}
// Apply aggregator user to request
req = req.WithContext(
request.WithUser(
req.Context(), &user.DefaultInfo{Name: "system:kube-aggregator", Groups: []string{"system:masters"}}))
req = req.WithContext(request.WithRequestInfo(req.Context(), &request.RequestInfo{
Path: req.URL.Path,
IsResourceRequest: false,
}))
req.Header.Add("Accept", runtime.ContentTypeJSON+";g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList")
if exists && len(cached.etag) > 0 {
req.Header.Add("If-None-Match", cached.etag)
}
// Important that the time recorded in the data's "lastUpdated" is conservatively
// from BEFORE the request is dispatched so that lastUpdated can be used to
// de-duplicate requests.
now := time.Now()
writer := newInMemoryResponseWriter()
handler.ServeHTTP(writer, req)
switch writer.respCode {
case http.StatusNotModified:
dm.resultsLock.Lock()
defer dm.resultsLock.Unlock()
// Keep old entry, update timestamp
cached = cachedResult{
discovery: cached.discovery,
etag: cached.etag,
lastUpdated: now,
}
dm.setCacheEntryForService(info.service, cached)
return &cached, nil
case http.StatusNotFound:
// Discovery Document is not being served at all.
// Fall back to legacy discovery information
if len(gv.Version) == 0 {
return nil, errors.New("not found")
}
var path string
if len(gv.Group) == 0 {
path = "/api/" + gv.Version
} else {
path = "/apis/" + gv.Group + "/" + gv.Version
}
req, err := http.NewRequest("GET", path, nil)
if err != nil {
// NewRequest should not fail, but if it does for some reason,
// log it and continue
return nil, fmt.Errorf("failed to create http.Request: %v", err)
}
// Apply aggregator user to request
req = req.WithContext(
request.WithUser(
req.Context(), &user.DefaultInfo{Name: "system:kube-aggregator"}))
// req.Header.Add("Accept", runtime.ContentTypeProtobuf)
req.Header.Add("Accept", runtime.ContentTypeJSON)
if exists && len(cached.etag) > 0 {
req.Header.Add("If-None-Match", cached.etag)
}
writer := newInMemoryResponseWriter()
handler.ServeHTTP(writer, req)
if writer.respCode != http.StatusOK {
return nil, fmt.Errorf("failed to download discovery for %s: %v", path, writer.String())
}
parsed := &metav1.APIResourceList{}
if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.data, parsed); err != nil {
return nil, err
}
// Create a discomap with single group-version
resources, err := endpoints.ConvertGroupVersionIntoToDiscovery(parsed.APIResources)
if err != nil {
return nil, err
}
discoMap := map[metav1.GroupVersion]apidiscoveryv2beta1.APIVersionDiscovery{
// Convert old-style APIGroupList to new information
gv: {
Version: gv.Version,
Resources: resources,
},
}
cached = cachedResult{
discovery: discoMap,
lastUpdated: now,
}
// Save the resolve, because it is still useful in case other services
// are already marked dirty. THey can use it without making http request
dm.setCacheEntryForService(info.service, cached)
return &cached, nil
case http.StatusOK:
parsed := &apidiscoveryv2beta1.APIGroupDiscoveryList{}
if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), writer.data, parsed); err != nil {
return nil, err
}
klog.V(3).Infof("DiscoveryManager: Successfully downloaded discovery for %s", info.service.String())
// Convert discovery info into a map for convenient lookup later
discoMap := map[metav1.GroupVersion]apidiscoveryv2beta1.APIVersionDiscovery{}
for _, g := range parsed.Items {
for _, v := range g.Versions {
discoMap[metav1.GroupVersion{Group: g.Name, Version: v.Version}] = v
}
}
// Save cached result
cached = cachedResult{
discovery: discoMap,
etag: writer.Header().Get("Etag"),
lastUpdated: now,
}
dm.setCacheEntryForService(info.service, cached)
return &cached, nil
default:
klog.Infof("DiscoveryManager: Failed to download discovery for %v: %v %s",
info.service.String(), writer.respCode, writer.data)
return nil, fmt.Errorf("service %s returned non-success response code: %v",
info.service.String(), writer.respCode)
}
}
// Try to sync a single APIService.
func (dm *discoveryManager) syncAPIService(apiServiceName string) error {
info, exists := dm.getInfoForAPIService(apiServiceName)
gv := helper.APIServiceNameToGroupVersion(apiServiceName)
mgv := metav1.GroupVersion{Group: gv.Group, Version: gv.Version}
if !exists {
// apiservice was removed. remove it from merged discovery
dm.mergedDiscoveryHandler.RemoveGroupVersion(mgv)
return nil
}
// Lookup last cached result for this apiservice's service.
now := time.Now()
cached, err := dm.fetchFreshDiscoveryForService(mgv, info)
info.lastReconciled = now
dm.setInfoForAPIService(apiServiceName, &info)
var entry apidiscoveryv2beta1.APIVersionDiscovery
// Extract the APIService's specific resource information from the
// groupversion
if cached == nil {
// There was an error fetching discovery for this APIService, and
// there is nothing in the cache for this GV.
//
// Just use empty GV to mark that GV exists, but no resources.
// Also mark that it is stale to indicate the fetch failed
// TODO: Maybe also stick in a status for the version the error?
entry = apidiscoveryv2beta1.APIVersionDiscovery{
Version: gv.Version,
}
} else {
// Find our specific groupversion within the discovery document
entry, exists = cached.discovery[mgv]
if exists {
// The stale/fresh entry has our GV, so we can include it in the doc
} else {
// Successfully fetched discovery information from the server, but
// the server did not include this groupversion?
entry = apidiscoveryv2beta1.APIVersionDiscovery{
Version: gv.Version,
}
}
}
// The entry's staleness depends upon if `fetchFreshDiscoveryForService`
// returned an error or not.
if err == nil {
entry.Freshness = apidiscoveryv2beta1.DiscoveryFreshnessCurrent
} else {
entry.Freshness = apidiscoveryv2beta1.DiscoveryFreshnessStale
}
dm.mergedDiscoveryHandler.AddGroupVersion(gv.Group, entry)
return nil
}
// Spwans a goroutune which waits for added/updated apiservices and updates
// the discovery document accordingly
func (dm *discoveryManager) Run(stopCh <-chan struct{}) {
klog.Info("Starting ResourceDiscoveryManager")
// Shutdown the queue since stopCh was signalled
defer dm.dirtyAPIServiceQueue.ShutDown()
// Spawn workers
// These workers wait for APIServices to be marked dirty.
// Worker ensures the cached discovery document hosted by the ServiceReference of
// the APIService is at least as fresh as the APIService, then includes the
// APIService's groupversion into the merged document
for i := 0; i < 2; i++ {
go func() {
for {
next, shutdown := dm.dirtyAPIServiceQueue.Get()
if shutdown {
return
}
func() {
defer dm.dirtyAPIServiceQueue.Done(next)
if err := dm.syncAPIService(next.(string)); err != nil {
dm.dirtyAPIServiceQueue.AddRateLimited(next)
} else {
dm.dirtyAPIServiceQueue.Forget(next)
}
}()
}
}()
}
// Ensure that apiregistration.k8s.io is the first group in the discovery group.
dm.mergedDiscoveryHandler.SetGroupPriority(APIRegistrationGroup, APIRegistrationGroupPriority)
wait.PollUntil(1*time.Minute, func() (done bool, err error) {
dm.servicesLock.Lock()
defer dm.servicesLock.Unlock()
now := time.Now()
// Mark all non-local APIServices as dirty
for key, info := range dm.apiServices {
info.lastMarkedDirty = now
dm.apiServices[key] = info
dm.dirtyAPIServiceQueue.Add(key)
}
return false, nil
}, stopCh)
}
// Adds an APIService to be tracked by the discovery manager. If the APIService
// is already known
func (dm *discoveryManager) AddAPIService(apiService *apiregistrationv1.APIService, handler http.Handler) {
// If service is nil then its information is contained by a local APIService
// which is has already been added to the manager.
if apiService.Spec.Service == nil {
return
}
// Add or update APIService record and mark it as dirty
dm.setInfoForAPIService(apiService.Name, &groupVersionInfo{
groupPriority: int(apiService.Spec.GroupPriorityMinimum),
handler: handler,
lastMarkedDirty: time.Now(),
service: newServiceKey(*apiService.Spec.Service),
})
dm.dirtyAPIServiceQueue.Add(apiService.Name)
}
func (dm *discoveryManager) RemoveAPIService(apiServiceName string) {
if dm.setInfoForAPIService(apiServiceName, nil) != nil {
// mark dirty if there was actually something deleted
dm.dirtyAPIServiceQueue.Add(apiServiceName)
}
}
func (dm *discoveryManager) ExternalServicesSynced() bool {
dm.servicesLock.RLock()
defer dm.servicesLock.RUnlock()
for _, info := range dm.apiServices {
if info.lastReconciled.IsZero() {
return false
}
}
return true
}
//
// Lock-protected accessors
//
func (dm *discoveryManager) getCacheEntryForService(key serviceKey) (cachedResult, bool) {
dm.resultsLock.RLock()
defer dm.resultsLock.RUnlock()
result, ok := dm.cachedResults[key]
return result, ok
}
func (dm *discoveryManager) setCacheEntryForService(key serviceKey, result cachedResult) {
dm.resultsLock.Lock()
defer dm.resultsLock.Unlock()
dm.cachedResults[key] = result
}
func (dm *discoveryManager) getInfoForAPIService(name string) (groupVersionInfo, bool) {
dm.servicesLock.RLock()
defer dm.servicesLock.RUnlock()
result, ok := dm.apiServices[name]
return result, ok
}
func (dm *discoveryManager) setInfoForAPIService(name string, result *groupVersionInfo) (oldValueIfExisted *groupVersionInfo) {
dm.servicesLock.Lock()
defer dm.servicesLock.Unlock()
if oldValue, exists := dm.apiServices[name]; exists {
oldValueIfExisted = &oldValue
}
if result != nil {
dm.apiServices[name] = *result
} else {
delete(dm.apiServices, name)
}
return oldValueIfExisted
}
// !TODO: This was copied from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator/downloader.go
// which was copied from staging/src/k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator/downloader.go
// so we should find a home for this
// inMemoryResponseWriter is a http.Writer that keep the response in memory.
type inMemoryResponseWriter struct {
writeHeaderCalled bool
header http.Header
respCode int
data []byte
}
func newInMemoryResponseWriter() *inMemoryResponseWriter {
return &inMemoryResponseWriter{header: http.Header{}}
}
func (r *inMemoryResponseWriter) Header() http.Header {
return r.header
}
func (r *inMemoryResponseWriter) WriteHeader(code int) {
r.writeHeaderCalled = true
r.respCode = code
}
func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
if !r.writeHeaderCalled {
r.WriteHeader(http.StatusOK)
}
r.data = append(r.data, in...)
return len(in), nil
}
func (r *inMemoryResponseWriter) String() string {
s := fmt.Sprintf("ResponseCode: %d", r.respCode)
if r.data != nil {
s += fmt.Sprintf(", Body: %s", string(r.data))
}
if r.header != nil {
s += fmt.Sprintf(", Header: %s", r.header)
}
return s
}

View File

@ -0,0 +1,359 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver_test
import (
"context"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
fuzz "github.com/google/gofuzz"
"github.com/stretchr/testify/require"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints"
"k8s.io/apiserver/pkg/endpoints/discovery"
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
scheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/apiserver"
)
// Test that the discovery manager starts and aggregates from two local API services
func TestBasic(t *testing.T) {
service1 := discoveryendpoint.NewResourceManager()
service2 := discoveryendpoint.NewResourceManager()
apiGroup1 := fuzzAPIGroups(2, 5, 25)
apiGroup2 := fuzzAPIGroups(2, 5, 50)
service1.SetGroups(apiGroup1.Items)
service2.SetGroups(apiGroup2.Items)
aggregatedResourceManager := discoveryendpoint.NewResourceManager()
aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager)
for _, g := range apiGroup1.Items {
for _, v := range g.Versions {
aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{
Name: v.Version + "." + g.Name,
},
Spec: apiregistrationv1.APIServiceSpec{
Group: g.Name,
Version: v.Version,
Service: &apiregistrationv1.ServiceReference{
Name: "service1",
},
},
}, service1)
}
}
for _, g := range apiGroup2.Items {
for _, v := range g.Versions {
aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{
Name: v.Version + "." + g.Name,
},
Spec: apiregistrationv1.APIServiceSpec{
Group: g.Name,
Version: v.Version,
Service: &apiregistrationv1.ServiceReference{
Name: "service2",
},
},
}, service2)
}
}
testCtx, _ := context.WithCancel(context.Background())
go aggregatedManager.Run(testCtx.Done())
cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced)
response, _, parsed := fetchPath(aggregatedResourceManager, "")
if response.StatusCode != 200 {
t.Fatalf("unexpected status code %d", response.StatusCode)
}
checkAPIGroups(t, apiGroup1, parsed)
checkAPIGroups(t, apiGroup2, parsed)
}
func checkAPIGroups(t *testing.T, api apidiscoveryv2beta1.APIGroupDiscoveryList, response *apidiscoveryv2beta1.APIGroupDiscoveryList) {
if len(response.Items) < len(api.Items) {
t.Errorf("expected to check for at least %d groups, only have %d groups in response", len(api.Items), len(response.Items))
}
for _, knownGroup := range api.Items {
found := false
for _, possibleGroup := range response.Items {
if knownGroup.Name == possibleGroup.Name {
t.Logf("found %s", knownGroup.Name)
found = true
}
}
if found == false {
t.Errorf("could not find %s", knownGroup.Name)
}
}
}
// Test that a handler associated with an APIService gets pinged after the
// APIService has been marked as dirty
func TestDirty(t *testing.T) {
pinged := false
service := discoveryendpoint.NewResourceManager()
aggregatedResourceManager := discoveryendpoint.NewResourceManager()
aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager)
aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{
Name: "v1.stable.example.com",
},
Spec: apiregistrationv1.APIServiceSpec{
Group: "stable.example.com",
Version: "v1",
Service: &apiregistrationv1.ServiceReference{
Name: "test-service",
},
},
}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pinged = true
service.ServeHTTP(w, r)
}))
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go aggregatedManager.Run(testCtx.Done())
cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced)
// immediately check for ping, since Run() should block for local services
if !pinged {
t.Errorf("service handler never pinged")
}
}
// Show that an APIService can be removed and that its group no longer remains
// if there are no versions
func TestRemoveAPIService(t *testing.T) {
aggyService := discoveryendpoint.NewResourceManager()
service := discoveryendpoint.NewResourceManager()
apiGroup := fuzzAPIGroups(2, 3, 10)
service.SetGroups(apiGroup.Items)
var apiServices []*apiregistrationv1.APIService
for _, g := range apiGroup.Items {
for _, v := range g.Versions {
apiservice := &apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{
Name: v.Version + "." + g.Name,
},
Spec: apiregistrationv1.APIServiceSpec{
Group: g.Name,
Version: v.Version,
Service: &apiregistrationv1.ServiceReference{
Namespace: "serviceNamespace",
Name: "serviceName",
},
},
}
apiServices = append(apiServices, apiservice)
}
}
aggregatedManager := apiserver.NewDiscoveryManager(aggyService)
for _, s := range apiServices {
aggregatedManager.AddAPIService(s, service)
}
testCtx, _ := context.WithCancel(context.Background())
go aggregatedManager.Run(testCtx.Done())
for _, s := range apiServices {
aggregatedManager.RemoveAPIService(s.Name)
}
cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced)
response, _, parsed := fetchPath(aggyService, "")
if response.StatusCode != 200 {
t.Fatalf("unexpected status code %d", response.StatusCode)
}
if len(parsed.Items) > 0 {
t.Errorf("expected to find no groups after service deletion (got %d groups)", len(parsed.Items))
}
}
func TestLegacyFallback(t *testing.T) {
aggregatedResourceManager := discoveryendpoint.NewResourceManager()
legacyGroupHandler := discovery.NewAPIGroupHandler(scheme.Codecs, metav1.APIGroup{
Name: "stable.example.com",
PreferredVersion: metav1.GroupVersionForDiscovery{
GroupVersion: "stable.example.com/v1",
Version: "v1",
},
Versions: []metav1.GroupVersionForDiscovery{
{
GroupVersion: "stable.example.com/v1",
Version: "v1",
},
{
GroupVersion: "stable.example.com/v1beta1",
Version: "v1beta1",
},
},
})
resource := metav1.APIResource{
Name: "foos",
SingularName: "foo",
Group: "stable.example.com",
Version: "v1",
Namespaced: false,
Kind: "Foo",
Verbs: []string{"get", "list", "watch", "create", "update", "patch", "delete", "deletecollection"},
Categories: []string{"all"},
}
legacyResourceHandler := discovery.NewAPIVersionHandler(scheme.Codecs, schema.GroupVersion{
Group: "stable.example.com",
Version: "v1",
}, discovery.APIResourceListerFunc(func() []metav1.APIResource {
return []metav1.APIResource{
resource,
}
}))
aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager)
aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{
Name: "v1.stable.example.com",
},
Spec: apiregistrationv1.APIServiceSpec{
Group: "stable.example.com",
Version: "v1",
Service: &apiregistrationv1.ServiceReference{
Name: "test-service",
},
},
}, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/apis/stable.example.com" {
legacyGroupHandler.ServeHTTP(w, r)
} else if r.URL.Path == "/apis/stable.example.com/v1" {
// defer to legacy discovery
legacyResourceHandler.ServeHTTP(w, r)
} else {
// Unknown url
w.WriteHeader(http.StatusNotFound)
}
}))
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go aggregatedManager.Run(testCtx.Done())
require.True(t, cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced))
// At this point external services have synced. Check if discovery document
// includes the legacy resources
_, _, doc := fetchPath(aggregatedResourceManager, "")
converted, err := endpoints.ConvertGroupVersionIntoToDiscovery([]metav1.APIResource{resource})
require.NoError(t, err)
require.Equal(t, []apidiscoveryv2beta1.APIGroupDiscovery{
{
ObjectMeta: metav1.ObjectMeta{
Name: resource.Group,
},
Versions: []apidiscoveryv2beta1.APIVersionDiscovery{
{
Version: resource.Version,
Resources: converted,
Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent,
},
},
},
}, doc.Items)
}
// copied from staging/src/k8s.io/apiserver/pkg/endpoints/discovery/v2/handler_test.go
func fuzzAPIGroups(atLeastNumGroups, maxNumGroups int, seed int64) apidiscoveryv2beta1.APIGroupDiscoveryList {
fuzzer := fuzz.NewWithSeed(seed)
fuzzer.NumElements(atLeastNumGroups, maxNumGroups)
fuzzer.NilChance(0)
fuzzer.Funcs(func(o *apidiscoveryv2beta1.APIGroupDiscovery, c fuzz.Continue) {
c.FuzzNoCustom(o)
// The ResourceManager will just not serve the grouop if its versions
// list is empty
atLeastOne := apidiscoveryv2beta1.APIVersionDiscovery{}
c.Fuzz(&atLeastOne)
o.Versions = append(o.Versions, atLeastOne)
o.TypeMeta = metav1.TypeMeta{
Kind: "APIGroupDiscovery",
APIVersion: "v1",
}
})
var apis []apidiscoveryv2beta1.APIGroupDiscovery
fuzzer.Fuzz(&apis)
return apidiscoveryv2beta1.APIGroupDiscoveryList{
TypeMeta: metav1.TypeMeta{
Kind: "APIGroupDiscoveryList",
APIVersion: "v1",
},
Items: apis,
}
}
// copied from staging/src/k8s.io/apiserver/pkg/endpoints/discovery/v2/handler_test.go
func fetchPath(handler http.Handler, etag string) (*http.Response, []byte, *apidiscoveryv2beta1.APIGroupDiscoveryList) {
// Expect json-formatted apis group list
w := httptest.NewRecorder()
req := httptest.NewRequest("GET", "/apis", nil)
// Ask for JSON response
req.Header.Set("Accept", runtime.ContentTypeJSON+";g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList")
if etag != "" {
// Quote provided etag if unquoted
quoted := etag
if !strings.HasPrefix(etag, "\"") {
quoted = strconv.Quote(etag)
}
req.Header.Set("If-None-Match", quoted)
}
handler.ServeHTTP(w, req)
bytes := w.Body.Bytes()
var decoded *apidiscoveryv2beta1.APIGroupDiscoveryList
if len(bytes) > 0 {
decoded = &apidiscoveryv2beta1.APIGroupDiscoveryList{}
runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), bytes, decoded)
}
return w.Result(), bytes, decoded
}

View File

@ -231,6 +231,14 @@ func (r *responder) Error(_ http.ResponseWriter, _ *http.Request, err error) {
// these methods provide locked access to fields
// Sets serviceAvailable value on proxyHandler
// not thread safe
func (r *proxyHandler) setServiceAvailable(value bool) {
info := r.handlingInfo.Load().(proxyHandlingInfo)
info.serviceAvailable = true
r.handlingInfo.Store(info)
}
func (r *proxyHandler) updateAPIService(apiService *apiregistrationv1api.APIService) {
if apiService.Spec.Service == nil {
r.handlingInfo.Store(proxyHandlingInfo{local: true})

View File

@ -0,0 +1,472 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package discovery
import (
"bytes"
"context"
"encoding/json"
"errors"
"net/http"
"reflect"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
kubernetes "k8s.io/client-go/kubernetes"
k8sscheme "k8s.io/client-go/kubernetes/scheme"
featuregatetesting "k8s.io/component-base/featuregate/testing"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
aggregator "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework"
)
type kubeClientSet = kubernetes.Interface
type aggegatorClientSet = aggregator.Interface
type apiextensionsClientSet = apiextensions.Interface
type dynamicClientset = dynamic.Interface
type testClientSet struct {
kubeClientSet
aggegatorClientSet
apiextensionsClientSet
dynamicClientset
}
func (t testClientSet) Discovery() discovery.DiscoveryInterface {
return t.kubeClientSet.Discovery()
}
var (
scheme = runtime.NewScheme()
codecs = runtimeserializer.NewCodecFactory(scheme)
serialize runtime.NegotiatedSerializer
basicTestGroup = apidiscoveryv2beta1.APIGroupDiscovery{
ObjectMeta: metav1.ObjectMeta{
Name: "stable.example.com",
},
Versions: []apidiscoveryv2beta1.APIVersionDiscovery{
{
Version: "v1",
Resources: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "jobs",
Verbs: []string{"create", "list", "watch", "delete"},
ShortNames: []string{"jz"},
Categories: []string{"all"},
},
},
Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent,
},
},
}
)
func init() {
// Add all builtin types to scheme
utilruntime.Must(k8sscheme.AddToScheme(scheme))
utilruntime.Must(aggregatorclientsetscheme.AddToScheme(scheme))
utilruntime.Must(apiextensionsv1.AddToScheme(scheme))
info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
if !ok {
panic("failed to create serializer info")
}
serialize = runtime.NewSimpleNegotiatedSerializer(info)
}
// Spins up an api server which is cleaned up at the end up the test
// Returns some kubernetes clients
func setup(t *testing.T) (context.Context, testClientSet, context.CancelFunc) {
ctx, cancelCtx := context.WithCancel(context.Background())
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
t.Cleanup(server.TearDownFn)
kubeClientSet, err := kubernetes.NewForConfig(server.ClientConfig)
require.NoError(t, err)
aggegatorClientSet, err := aggregator.NewForConfig(server.ClientConfig)
require.NoError(t, err)
apiextensionsClientSet, err := apiextensions.NewForConfig(server.ClientConfig)
require.NoError(t, err)
dynamicClientset, err := dynamic.NewForConfig(server.ClientConfig)
require.NoError(t, err)
client := testClientSet{
kubeClientSet: kubeClientSet,
aggegatorClientSet: aggegatorClientSet,
apiextensionsClientSet: apiextensionsClientSet,
dynamicClientset: dynamicClientset,
}
return ctx, client, cancelCtx
}
func registerAPIService(ctx context.Context, client aggregator.Interface, gv metav1.GroupVersion, service FakeService) error {
port := service.Port()
if port == nil {
return errors.New("service not yet started")
}
// Register the APIService
patch := apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{
Name: gv.Version + "." + gv.Group,
},
TypeMeta: metav1.TypeMeta{
Kind: "APIService",
APIVersion: "apiregistration.k8s.io/v1",
},
Spec: apiregistrationv1.APIServiceSpec{
Group: gv.Group,
Version: gv.Version,
InsecureSkipTLSVerify: true,
GroupPriorityMinimum: 1000,
VersionPriority: 15,
Service: &apiregistrationv1.ServiceReference{
Namespace: "default",
Name: service.Name(),
Port: port,
},
},
}
_, err := client.
ApiregistrationV1().
APIServices().
Create(context.TODO(), &patch, metav1.CreateOptions{FieldManager: "test-manager"})
return err
}
func unregisterAPIService(ctx context.Context, client aggregator.Interface, gv metav1.GroupVersion) error {
return client.ApiregistrationV1().APIServices().Delete(ctx, gv.Version+"."+gv.Group, metav1.DeleteOptions{})
}
func WaitForGroupsAbsent(ctx context.Context, client testClientSet, groups ...string) error {
return WaitForResultWithCondition(ctx, client, func(groupList apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
for _, searchGroup := range groups {
for _, docGroup := range groupList.Items {
if docGroup.Name == searchGroup {
return false
}
}
}
return true
})
}
func WaitForGroups(ctx context.Context, client testClientSet, groups ...apidiscoveryv2beta1.APIGroupDiscovery) error {
return WaitForResultWithCondition(ctx, client, func(groupList apidiscoveryv2beta1.APIGroupDiscoveryList) bool {
for _, searchGroup := range groups {
for _, docGroup := range groupList.Items {
if reflect.DeepEqual(searchGroup, docGroup) {
return true
}
}
}
return false
})
}
func WaitForResultWithCondition(ctx context.Context, client testClientSet, condition func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool) error {
// Keep repeatedly fetching document from aggregator.
// Check to see if it contains our service within a reasonable amount of time
return wait.PollWithContext(
ctx,
250*time.Millisecond,
1*time.Second,
func(ctx context.Context) (done bool, err error) {
result, err := client.
Discovery().
RESTClient().
Get().
AbsPath("/apis").
SetHeader("Accept", "application/json;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList").
Do(ctx).
Raw()
if err != nil {
return false, err
}
groupList := apidiscoveryv2beta1.APIGroupDiscoveryList{}
err = json.Unmarshal(result, &groupList)
if err != nil {
panic(err)
}
if condition(groupList) {
return true, nil
}
return false, nil
})
}
func TestAggregatedAPIServiceDiscovery(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, true)()
// Keep any goroutines spawned from running past the execution of this test
ctx, client, cleanup := setup(t)
defer cleanup()
// Create a resource manager whichs serves our GroupVersion
resourceManager := discoveryendpoint.NewResourceManager()
resourceManager.SetGroups([]apidiscoveryv2beta1.APIGroupDiscovery{basicTestGroup})
// Install our ResourceManager as an Aggregated APIService to the
// test server
service := NewFakeService("test-server", client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.URL.Path, "/apis") {
resourceManager.ServeHTTP(w, r)
} else if strings.HasPrefix(r.URL.Path, "/apis/stable.example.com") {
// Return invalid response so APIService can be marked as "available"
w.WriteHeader(http.StatusOK)
} else {
// reject openapi/v2, openapi/v3, apis/<group>/<version>
w.WriteHeader(http.StatusNotFound)
}
}))
service.Start(t, ctx)
// For each groupversion served by our resourcemanager, create an APIService
// object connected to our fake APIServer
for _, versionInfo := range basicTestGroup.Versions {
groupVersion := metav1.GroupVersion{
Group: basicTestGroup.Name,
Version: versionInfo.Version,
}
require.NoError(t, registerAPIService(ctx, client, groupVersion, service))
defer func() {
require.NoError(t, unregisterAPIService(ctx, client, groupVersion))
}()
}
// Keep repeatedly fetching document from aggregator.
// Check to see if it contains our service within a reasonable amount of time
require.NoError(t, WaitForGroups(ctx, client, basicTestGroup))
}
// Shows that the following sequence is handled correctly:
// 1. Create an APIService
// - Check that API service is in discovery doc
// 2. Create CRD with the same GroupVersion as APIService
// 3. Delete APIService
// - Check that API service is removed from discovery
// 4. Update CRD
// - Check that CRD is in discovery document
func TestOverlappingCRDAndAPIService(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, true)()
// Keep any goroutines spawned from running past the execution of this test
ctx, client, cleanup := setup(t)
defer cleanup()
// Create a resource manager whichs serves our GroupVersion
resourceManager := discoveryendpoint.NewResourceManager()
resourceManager.SetGroups([]apidiscoveryv2beta1.APIGroupDiscovery{basicTestGroup})
// Install our ResourceManager as an Aggregated APIService to the
// test server
service := NewFakeService("test-server", client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/apis" {
resourceManager.ServeHTTP(w, r)
} else if strings.HasPrefix(r.URL.Path, "/apis/") {
// Return "valid" response so APIService can be marked as "available"
w.WriteHeader(http.StatusOK)
} else {
// reject openapi/v2, openapi/v3, apis/<group>/<version>
w.WriteHeader(http.StatusNotFound)
}
}))
service.Start(t, ctx)
// For each groupversion served by our resourcemanager, create an APIService
// object connected to our fake APIServer
for _, versionInfo := range basicTestGroup.Versions {
groupVersion := metav1.GroupVersion{
Group: basicTestGroup.Name,
Version: versionInfo.Version,
}
registerAPIService(ctx, client, groupVersion, service)
}
// Keep repeatedly fetching document from aggregator.
// Check to see if it contains our service within a reasonable amount of time
require.NoError(t, WaitForGroups(ctx, client, basicTestGroup))
// Create a CRD
crd, err := client.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, &apiextensionsv1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: "foos.stable.example.com",
},
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
Group: "stable.example.com",
Names: apiextensionsv1.CustomResourceDefinitionNames{
Singular: "foo",
Plural: "foos",
Kind: "Foo",
},
Scope: apiextensionsv1.ClusterScoped,
Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
{
Name: "v1",
Served: true,
Storage: true,
Schema: &apiextensionsv1.CustomResourceValidation{
OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
Type: "object",
Properties: map[string]apiextensionsv1.JSONSchemaProps{
"stringMap": {
Description: "a map[string]string",
Type: "object",
AdditionalProperties: &apiextensionsv1.JSONSchemaPropsOrBool{
Schema: &apiextensionsv1.JSONSchemaProps{
Type: "string",
},
},
},
},
},
},
},
},
},
}, metav1.CreateOptions{
FieldManager: "test-manager",
})
require.NoError(t, err)
// Create a CR for the CRD
// Keep trying until it succeeds (or should we try for discovery?)
require.NoError(t, wait.PollWithContext(ctx, 100*time.Millisecond, 1*time.Second, func(ctx context.Context) (done bool, err error) {
toCreate := &unstructured.Unstructured{}
toCreate.SetUnstructuredContent(map[string]any{
"apiVersion": "stable.example.com/v1",
"kind": "Foo",
"key": "value",
})
_, err = client.dynamicClientset.Resource(schema.GroupVersionResource{
Group: "stable.example.com",
Version: "v1",
Resource: "foos",
}).Create(ctx, toCreate, metav1.CreateOptions{
FieldManager: "test-manager",
})
return err != nil, nil
}))
// For each groupversion served by our resourcemanager, delete an APIService
// object connected to our fake APIServer
for _, versionInfo := range basicTestGroup.Versions {
groupVersion := metav1.GroupVersion{
Group: basicTestGroup.Name,
Version: versionInfo.Version,
}
unregisterAPIService(ctx, client, groupVersion)
}
// Wait for the apiservice to be deleted from discovery
require.NoError(t, WaitForGroupsAbsent(ctx, client, "stable.example.com"))
// Update the CRD with a minor change to show that reconciliation will
// eventually refresh the discovery group on resync
obj := &unstructured.Unstructured{}
obj.SetUnstructuredContent(map[string]interface{}{
"apiVersion": "apiextensions.k8s.io/v1",
"kind": "CustomResourceDefinition",
"metadata": map[string]any{
"name": crd.Name,
},
"spec": map[string]interface{}{
"names": map[string]any{
"categories": []string{"all"},
},
},
})
buf := bytes.NewBuffer(nil)
err = unstructured.UnstructuredJSONScheme.Encode(obj, buf)
require.NoError(t, err)
//Is there a better way to force crd resync?
_, err = client.ApiextensionsV1().CustomResourceDefinitions().Patch(
ctx,
crd.Name,
types.ApplyPatchType,
buf.Bytes(),
metav1.PatchOptions{
FieldManager: "test-manager",
},
)
require.NoError(t, err)
// Wait until the crd appears in discovery
expectedDiscovery := apidiscoveryv2beta1.APIGroupDiscovery{
ObjectMeta: metav1.ObjectMeta{
Name: basicTestGroup.Name,
},
Versions: []apidiscoveryv2beta1.APIVersionDiscovery{
{
Version: "v1",
Resources: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "foos",
ResponseKind: &metav1.GroupVersionKind{
Group: basicTestGroup.Name,
Version: "v1",
Kind: "Foo",
},
Scope: apidiscoveryv2beta1.ScopeCluster,
SingularResource: crd.Spec.Names.Singular,
Verbs: []string{"delete", "deletecollection", "get", "list", "patch", "create", "update", "watch"},
Categories: []string{"all"},
},
},
//!TODO: set freshness of builtin/crds
Freshness: "",
},
},
}
require.NoError(t, WaitForGroups(ctx, client, expectedDiscovery))
}

View File

@ -0,0 +1,27 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package discovery
import (
"testing"
"k8s.io/kubernetes/test/integration/framework"
)
func TestMain(m *testing.M) {
framework.EtcdMain(m.Run)
}

View File

@ -0,0 +1,146 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package discovery
import (
"context"
"errors"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
)
type FakeService interface {
Start(t *testing.T, ctx context.Context)
Port() *int32
Name() string
}
// Creates and registers an in-process Service capable of communicating with the
// kubernetes integration test apiserver
type fakeService struct {
name string
client kubernetes.Interface
handler http.Handler
lock sync.RWMutex
activePort *int32
}
func NewFakeService(name string, client kubernetes.Interface, handler http.Handler) *fakeService {
return &fakeService{
name: name,
client: client,
handler: handler,
}
}
func (f *fakeService) run(ctx context.Context) error {
aggregatedServer := httptest.NewUnstartedServer(f.handler)
aggregatedServer.StartTLS()
defer aggregatedServer.Close()
serverURL, err := url.Parse(aggregatedServer.URL)
if err != nil {
// This should never occur
panic(err)
}
serverPort, err := strconv.Atoi(serverURL.Port())
if err != nil {
// This should never occur
panic(err)
}
port := int32(serverPort)
// Install service into the cluster
service, err := f.client.CoreV1().Services("default").Apply(
ctx,
corev1apply.Service(f.name, "default").
WithSpec(corev1apply.ServiceSpec().
WithPorts(
corev1apply.ServicePort().
WithPort(port)).
WithType("ExternalName").
WithExternalName("localhost")),
metav1.ApplyOptions{
FieldManager: "test-manager",
},
)
if err != nil {
return err
}
f.lock.Lock()
f.activePort = &port
f.lock.Unlock()
<-ctx.Done()
f.lock.Lock()
f.activePort = nil
f.lock.Unlock()
// Uninstall service from the cluser
err = f.client.CoreV1().Services("default").Delete(ctx, service.Name, metav1.DeleteOptions{})
return err
}
func (f *fakeService) Start(t *testing.T, ctx context.Context) {
go func() {
err := f.run(ctx)
if errors.Is(err, context.Canceled) {
err = nil
}
require.NoError(t, err)
}()
err := wait.PollWithContext(ctx, 1*time.Second, 200*time.Millisecond, func(ctx context.Context) (done bool, err error) {
return f.Port() != nil, nil
})
if errors.Is(err, context.Canceled) {
err = nil
}
require.NoError(t, err, "service should have come alive in a reasonable amount of time")
}
func (f *fakeService) Port() *int32 {
// Returns the port of the server if it is running or nil
// if it is not running
f.lock.RLock()
defer f.lock.RUnlock()
return f.activePort
}
func (f *fakeService) Name() string {
return f.name
}

1
vendor/modules.txt vendored
View File

@ -1501,6 +1501,7 @@ k8s.io/apiserver/pkg/cel/metrics
k8s.io/apiserver/pkg/endpoints
k8s.io/apiserver/pkg/endpoints/deprecation
k8s.io/apiserver/pkg/endpoints/discovery
k8s.io/apiserver/pkg/endpoints/discovery/aggregated
k8s.io/apiserver/pkg/endpoints/filterlatency
k8s.io/apiserver/pkg/endpoints/filters
k8s.io/apiserver/pkg/endpoints/handlers