CRD Conversion
This commit is contained in:
@@ -28,6 +28,7 @@ import (
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
genericoptions "k8s.io/apiserver/pkg/server/options"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/apiserver/pkg/util/webhook"
|
||||
kubeexternalinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
||||
)
|
||||
@@ -38,6 +39,8 @@ func createAPIExtensionsConfig(
|
||||
pluginInitializers []admission.PluginInitializer,
|
||||
commandOptions *options.ServerRunOptions,
|
||||
masterCount int,
|
||||
serviceResolver webhook.ServiceResolver,
|
||||
authResolverWrapper webhook.AuthenticationInfoResolverWrapper,
|
||||
) (*apiextensionsapiserver.Config, error) {
|
||||
// make a shallow copy to let us twiddle a few things
|
||||
// most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the apiextensions
|
||||
@@ -74,6 +77,8 @@ func createAPIExtensionsConfig(
|
||||
ExtraConfig: apiextensionsapiserver.ExtraConfig{
|
||||
CRDRESTOptionsGetter: apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions),
|
||||
MasterCount: masterCount,
|
||||
AuthResolverWrapper: authResolverWrapper,
|
||||
ServiceResolver: serviceResolver,
|
||||
},
|
||||
}
|
||||
|
||||
|
@@ -165,7 +165,8 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
|
||||
}
|
||||
|
||||
// If additional API servers are added, they should be gated.
|
||||
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount)
|
||||
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
|
||||
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -31,6 +31,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||
"k8s.io/apiserver/pkg/util/webhook"
|
||||
|
||||
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
|
||||
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/install"
|
||||
@@ -78,6 +79,11 @@ type ExtraConfig struct {
|
||||
// MasterCount is used to detect whether cluster is HA, and if it is
|
||||
// the CRD Establishing will be hold by 5 seconds.
|
||||
MasterCount int
|
||||
|
||||
// ServiceResolver is used in CR webhook converters to resolve webhook's service names
|
||||
ServiceResolver webhook.ServiceResolver
|
||||
// AuthResolverWrapper is used in CR webhook converters
|
||||
AuthResolverWrapper webhook.AuthenticationInfoResolverWrapper
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
@@ -167,7 +173,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
|
||||
delegate: delegateHandler,
|
||||
}
|
||||
establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
|
||||
crdHandler := NewCustomResourceDefinitionHandler(
|
||||
crdHandler, err := NewCustomResourceDefinitionHandler(
|
||||
versionDiscoveryHandler,
|
||||
groupDiscoveryHandler,
|
||||
s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
|
||||
@@ -175,8 +181,13 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
|
||||
c.ExtraConfig.CRDRESTOptionsGetter,
|
||||
c.GenericConfig.AdmissionControl,
|
||||
establishingController,
|
||||
c.ExtraConfig.ServiceResolver,
|
||||
c.ExtraConfig.AuthResolverWrapper,
|
||||
c.ExtraConfig.MasterCount,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
|
||||
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
|
||||
|
||||
|
@@ -20,39 +20,74 @@ import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
|
||||
apiextensionsfeatures "k8s.io/apiextensions-apiserver/pkg/features"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/apiserver/pkg/util/webhook"
|
||||
)
|
||||
|
||||
// NewCRDConverter returns a new CRD converter based on the conversion settings in crd object.
|
||||
func NewCRDConverter(crd *apiextensions.CustomResourceDefinition) (safe, unsafe runtime.ObjectConvertor) {
|
||||
// CRConverterFactory is the factory for all CR converters.
|
||||
type CRConverterFactory struct {
|
||||
// webhookConverterFactory is the factory for webhook converters.
|
||||
// This field should not be used if CustomResourceWebhookConversion feature is disabled.
|
||||
webhookConverterFactory *webhookConverterFactory
|
||||
}
|
||||
|
||||
// NewCRConverterFactory creates a new CRConverterFactory
|
||||
func NewCRConverterFactory(serviceResolver webhook.ServiceResolver, authResolverWrapper webhook.AuthenticationInfoResolverWrapper) (*CRConverterFactory, error) {
|
||||
converterFactory := &CRConverterFactory{}
|
||||
if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourceWebhookConversion) {
|
||||
webhookConverterFactory, err := newWebhookConverterFactory(serviceResolver, authResolverWrapper)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
converterFactory.webhookConverterFactory = webhookConverterFactory
|
||||
}
|
||||
return converterFactory, nil
|
||||
}
|
||||
|
||||
// NewConverter returns a new CR converter based on the conversion settings in crd object.
|
||||
func (m *CRConverterFactory) NewConverter(crd *apiextensions.CustomResourceDefinition) (safe, unsafe runtime.ObjectConvertor, err error) {
|
||||
validVersions := map[schema.GroupVersion]bool{}
|
||||
for _, version := range crd.Spec.Versions {
|
||||
validVersions[schema.GroupVersion{Group: crd.Spec.Group, Version: version.Name}] = true
|
||||
}
|
||||
|
||||
// The only converter right now is nopConverter. More converters will be returned based on the
|
||||
// CRD object when they introduced.
|
||||
unsafe = &crdConverter{
|
||||
switch crd.Spec.Conversion.Strategy {
|
||||
case apiextensions.NoneConverter:
|
||||
unsafe = &crConverter{
|
||||
clusterScoped: crd.Spec.Scope == apiextensions.ClusterScoped,
|
||||
delegate: &nopConverter{
|
||||
validVersions: validVersions,
|
||||
},
|
||||
}
|
||||
return &safeConverterWrapper{unsafe}, unsafe
|
||||
return &safeConverterWrapper{unsafe}, unsafe, nil
|
||||
case apiextensions.WebhookConverter:
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourceWebhookConversion) {
|
||||
return nil, nil, fmt.Errorf("webhook conversion is disabled on this cluster")
|
||||
}
|
||||
unsafe, err := m.webhookConverterFactory.NewWebhookConverter(validVersions, crd)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return &safeConverterWrapper{unsafe}, unsafe, nil
|
||||
}
|
||||
|
||||
return nil, nil, fmt.Errorf("unknown conversion strategy %q for CRD %s", crd.Spec.Conversion.Strategy, crd.Name)
|
||||
}
|
||||
|
||||
var _ runtime.ObjectConvertor = &crdConverter{}
|
||||
var _ runtime.ObjectConvertor = &crConverter{}
|
||||
|
||||
// crdConverter extends the delegate with generic CRD conversion behaviour. The delegate will implement the
|
||||
// crConverter extends the delegate with generic CR conversion behaviour. The delegate will implement the
|
||||
// user defined conversion strategy given in the CustomResourceDefinition.
|
||||
type crdConverter struct {
|
||||
type crConverter struct {
|
||||
delegate runtime.ObjectConvertor
|
||||
clusterScoped bool
|
||||
}
|
||||
|
||||
func (c *crdConverter) ConvertFieldLabel(gvk schema.GroupVersionKind, label, value string) (string, string, error) {
|
||||
func (c *crConverter) ConvertFieldLabel(gvk schema.GroupVersionKind, label, value string) (string, string, error) {
|
||||
// We currently only support metadata.namespace and metadata.name.
|
||||
switch {
|
||||
case label == "metadata.name":
|
||||
@@ -64,12 +99,12 @@ func (c *crdConverter) ConvertFieldLabel(gvk schema.GroupVersionKind, label, val
|
||||
}
|
||||
}
|
||||
|
||||
func (c *crdConverter) Convert(in, out, context interface{}) error {
|
||||
func (c *crConverter) Convert(in, out, context interface{}) error {
|
||||
return c.delegate.Convert(in, out, context)
|
||||
}
|
||||
|
||||
// ConvertToVersion converts in object to the given gvk in place and returns the same `in` object.
|
||||
func (c *crdConverter) ConvertToVersion(in runtime.Object, target runtime.GroupVersioner) (runtime.Object, error) {
|
||||
func (c *crConverter) ConvertToVersion(in runtime.Object, target runtime.GroupVersioner) (runtime.Object, error) {
|
||||
// Run the converter on the list items instead of list itself
|
||||
if list, ok := in.(*unstructured.UnstructuredList); ok {
|
||||
for i := range list.Items {
|
||||
|
@@ -49,11 +49,11 @@ func (c *nopConverter) Convert(in, out, context interface{}) error {
|
||||
|
||||
outGVK := unstructOut.GroupVersionKind()
|
||||
if !c.validVersions[outGVK.GroupVersion()] {
|
||||
return fmt.Errorf("request to convert CRD from an invalid group/version: %s", outGVK.String())
|
||||
return fmt.Errorf("request to convert CR from an invalid group/version: %s", outGVK.String())
|
||||
}
|
||||
inGVK := unstructIn.GroupVersionKind()
|
||||
if !c.validVersions[inGVK.GroupVersion()] {
|
||||
return fmt.Errorf("request to convert CRD to an invalid group/version: %s", inGVK.String())
|
||||
return fmt.Errorf("request to convert CR to an invalid group/version: %s", inGVK.String())
|
||||
}
|
||||
|
||||
unstructOut.SetUnstructuredContent(unstructIn.UnstructuredContent())
|
||||
@@ -72,7 +72,7 @@ func (c *nopConverter) ConvertToVersion(in runtime.Object, target runtime.GroupV
|
||||
return nil, fmt.Errorf("%v is unstructured and is not suitable for converting to %q", kind, target)
|
||||
}
|
||||
if !c.validVersions[gvk.GroupVersion()] {
|
||||
return nil, fmt.Errorf("request to convert CRD to an invalid group/version: %s", gvk.String())
|
||||
return nil, fmt.Errorf("request to convert CR to an invalid group/version: %s", gvk.String())
|
||||
}
|
||||
in.GetObjectKind().SetGroupVersionKind(gvk)
|
||||
return in, nil
|
||||
|
@@ -0,0 +1,350 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package conversion
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/apiserver/pkg/util/webhook"
|
||||
"k8s.io/client-go/rest"
|
||||
|
||||
internal "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
|
||||
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
|
||||
)
|
||||
|
||||
type webhookConverterFactory struct {
|
||||
clientManager webhook.ClientManager
|
||||
}
|
||||
|
||||
func newWebhookConverterFactory(serviceResolver webhook.ServiceResolver, authResolverWrapper webhook.AuthenticationInfoResolverWrapper) (*webhookConverterFactory, error) {
|
||||
clientManager, err := webhook.NewClientManager(v1beta1.SchemeGroupVersion, v1beta1.AddToScheme)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
authInfoResolver, err := webhook.NewDefaultAuthenticationInfoResolver("")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Set defaults which may be overridden later.
|
||||
clientManager.SetAuthenticationInfoResolver(authInfoResolver)
|
||||
clientManager.SetAuthenticationInfoResolverWrapper(authResolverWrapper)
|
||||
clientManager.SetServiceResolver(serviceResolver)
|
||||
return &webhookConverterFactory{clientManager}, nil
|
||||
}
|
||||
|
||||
// webhookConverter is a converter that calls an external webhook to do the CR conversion.
|
||||
type webhookConverter struct {
|
||||
validVersions map[schema.GroupVersion]bool
|
||||
clientManager webhook.ClientManager
|
||||
restClient *rest.RESTClient
|
||||
name string
|
||||
nopConverter nopConverter
|
||||
}
|
||||
|
||||
func webhookClientConfigForCRD(crd *internal.CustomResourceDefinition) *webhook.ClientConfig {
|
||||
apiConfig := crd.Spec.Conversion.WebhookClientConfig
|
||||
ret := webhook.ClientConfig{
|
||||
Name: fmt.Sprintf("conversion_webhook_for_%s", crd.Name),
|
||||
CABundle: apiConfig.CABundle,
|
||||
}
|
||||
if apiConfig.URL != nil {
|
||||
ret.URL = *apiConfig.URL
|
||||
}
|
||||
if apiConfig.Service != nil {
|
||||
ret.Service = &webhook.ClientConfigService{
|
||||
Name: apiConfig.Service.Name,
|
||||
Namespace: apiConfig.Service.Namespace,
|
||||
}
|
||||
if apiConfig.Service.Path != nil {
|
||||
ret.Service.Path = *apiConfig.Service.Path
|
||||
}
|
||||
}
|
||||
return &ret
|
||||
}
|
||||
|
||||
var _ runtime.ObjectConvertor = &webhookConverter{}
|
||||
|
||||
func (f *webhookConverterFactory) NewWebhookConverter(validVersions map[schema.GroupVersion]bool, crd *internal.CustomResourceDefinition) (*webhookConverter, error) {
|
||||
restClient, err := f.clientManager.HookClient(*webhookClientConfigForCRD(crd))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &webhookConverter{
|
||||
clientManager: f.clientManager,
|
||||
validVersions: validVersions,
|
||||
restClient: restClient,
|
||||
name: crd.Name,
|
||||
nopConverter: nopConverter{validVersions: validVersions},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (webhookConverter) ConvertFieldLabel(gvk schema.GroupVersionKind, label, value string) (string, string, error) {
|
||||
return "", "", errors.New("unstructured cannot convert field labels")
|
||||
}
|
||||
|
||||
func (c *webhookConverter) Convert(in, out, context interface{}) error {
|
||||
unstructIn, ok := in.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
return fmt.Errorf("input type %T in not valid for unstructured conversion", in)
|
||||
}
|
||||
|
||||
unstructOut, ok := out.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
return fmt.Errorf("output type %T in not valid for unstructured conversion", out)
|
||||
}
|
||||
|
||||
outGVK := unstructOut.GroupVersionKind()
|
||||
if !c.validVersions[outGVK.GroupVersion()] {
|
||||
return fmt.Errorf("request to convert CR from an invalid group/version: %s", outGVK.String())
|
||||
}
|
||||
inGVK := unstructIn.GroupVersionKind()
|
||||
if !c.validVersions[inGVK.GroupVersion()] {
|
||||
return fmt.Errorf("request to convert CR to an invalid group/version: %s", inGVK.String())
|
||||
}
|
||||
|
||||
converted, err := c.ConvertToVersion(unstructIn, outGVK.GroupVersion())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
unstructuredConverted, ok := converted.(runtime.Unstructured)
|
||||
if !ok {
|
||||
// this should not happened
|
||||
return fmt.Errorf("CR conversion failed")
|
||||
}
|
||||
unstructOut.SetUnstructuredContent(unstructuredConverted.UnstructuredContent())
|
||||
return nil
|
||||
}
|
||||
|
||||
func createConversionReview(obj runtime.Object, apiVersion string) *v1beta1.ConversionReview {
|
||||
listObj, isList := obj.(*unstructured.UnstructuredList)
|
||||
var objects []runtime.RawExtension
|
||||
if isList {
|
||||
for i := 0; i < len(listObj.Items); i++ {
|
||||
// Only sent item for conversion, if the apiVersion is different
|
||||
if listObj.Items[i].GetAPIVersion() != apiVersion {
|
||||
objects = append(objects, runtime.RawExtension{Object: &listObj.Items[i]})
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if obj.GetObjectKind().GroupVersionKind().GroupVersion().String() != apiVersion {
|
||||
objects = []runtime.RawExtension{{Object: obj}}
|
||||
}
|
||||
}
|
||||
return &v1beta1.ConversionReview{
|
||||
Request: &v1beta1.ConversionRequest{
|
||||
Objects: objects,
|
||||
DesiredAPIVersion: apiVersion,
|
||||
UID: uuid.NewUUID(),
|
||||
},
|
||||
Response: &v1beta1.ConversionResponse{},
|
||||
}
|
||||
}
|
||||
|
||||
func getRawExtensionObject(rx runtime.RawExtension) (runtime.Object, error) {
|
||||
if rx.Object != nil {
|
||||
return rx.Object, nil
|
||||
}
|
||||
u := unstructured.Unstructured{}
|
||||
err := u.UnmarshalJSON(rx.Raw)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &u, nil
|
||||
}
|
||||
|
||||
// getTargetGroupVersion returns group/version which should be used to convert in objects to.
|
||||
// String version of the return item is APIVersion.
|
||||
func getTargetGroupVersion(in runtime.Object, target runtime.GroupVersioner) (schema.GroupVersion, error) {
|
||||
fromGVK := in.GetObjectKind().GroupVersionKind()
|
||||
toGVK, ok := target.KindForGroupVersionKinds([]schema.GroupVersionKind{fromGVK})
|
||||
if !ok {
|
||||
// TODO: should this be a typed error?
|
||||
return schema.GroupVersion{}, fmt.Errorf("%v is unstructured and is not suitable for converting to %q", fromGVK.String(), target)
|
||||
}
|
||||
return toGVK.GroupVersion(), nil
|
||||
}
|
||||
|
||||
func (c *webhookConverter) ConvertToVersion(in runtime.Object, target runtime.GroupVersioner) (runtime.Object, error) {
|
||||
// In general, the webhook should not do any defaulting or validation. A special case of that is an empty object
|
||||
// conversion that must result an empty object and practically is the same as nopConverter.
|
||||
// A smoke test in API machinery calls the converter on empty objects. As this case happens consistently
|
||||
// it special cased here not to call webhook converter. The test initiated here:
|
||||
// https://github.com/kubernetes/kubernetes/blob/dbb448bbdcb9e440eee57024ffa5f1698956a054/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go#L201
|
||||
if isEmptyUnstructuredObject(in) {
|
||||
return c.nopConverter.ConvertToVersion(in, target)
|
||||
}
|
||||
|
||||
toGV, err := getTargetGroupVersion(in, target)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !c.validVersions[toGV] {
|
||||
return nil, fmt.Errorf("request to convert CR to an invalid group/version: %s", toGV.String())
|
||||
}
|
||||
fromGV := in.GetObjectKind().GroupVersionKind().GroupVersion()
|
||||
if !c.validVersions[fromGV] {
|
||||
return nil, fmt.Errorf("request to convert CR from an invalid group/version: %s", fromGV.String())
|
||||
}
|
||||
listObj, isList := in.(*unstructured.UnstructuredList)
|
||||
if isList {
|
||||
for i, item := range listObj.Items {
|
||||
fromGV := item.GroupVersionKind().GroupVersion()
|
||||
if !c.validVersions[fromGV] {
|
||||
return nil, fmt.Errorf("input list has invalid group/version `%v` at `%v` index", fromGV, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
request := createConversionReview(in, toGV.String())
|
||||
if len(request.Request.Objects) == 0 {
|
||||
if !isList {
|
||||
return in, nil
|
||||
}
|
||||
out := listObj.DeepCopy()
|
||||
out.SetAPIVersion(toGV.String())
|
||||
return out, nil
|
||||
}
|
||||
response := &v1beta1.ConversionReview{}
|
||||
// TODO: Figure out if adding one second timeout make sense here.
|
||||
ctx := context.TODO()
|
||||
r := c.restClient.Post().Context(ctx).Body(request).Do()
|
||||
if err := r.Into(response); err != nil {
|
||||
// TODO: Return a webhook specific error to be able to convert it to meta.Status
|
||||
return nil, fmt.Errorf("calling to conversion webhook failed for %s: %v", c.name, err)
|
||||
}
|
||||
|
||||
if response.Response == nil {
|
||||
// TODO: Return a webhook specific error to be able to convert it to meta.Status
|
||||
return nil, fmt.Errorf("conversion webhook response was absent for %s", c.name)
|
||||
}
|
||||
|
||||
if response.Response.Result.Status != v1.StatusSuccess {
|
||||
// TODO return status message as error
|
||||
return nil, fmt.Errorf("conversion request failed for %v, Response: %v", in.GetObjectKind(), response)
|
||||
}
|
||||
|
||||
if len(response.Response.ConvertedObjects) != len(request.Request.Objects) {
|
||||
return nil, fmt.Errorf("expected %v converted objects, got %v", len(request.Request.Objects), len(response.Response.ConvertedObjects))
|
||||
}
|
||||
|
||||
if isList {
|
||||
convertedList := listObj.DeepCopy()
|
||||
// Collection of items sent for conversion is different than list items
|
||||
// because only items that needed conversion has been sent.
|
||||
convertedIndex := 0
|
||||
for i := 0; i < len(listObj.Items); i++ {
|
||||
if listObj.Items[i].GetAPIVersion() == toGV.String() {
|
||||
// This item has not been sent for conversion, skip it.
|
||||
continue
|
||||
}
|
||||
converted, err := getRawExtensionObject(response.Response.ConvertedObjects[convertedIndex])
|
||||
convertedIndex++
|
||||
original := listObj.Items[i]
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid converted object at index %v: %v", convertedIndex, err)
|
||||
}
|
||||
if e, a := toGV, converted.GetObjectKind().GroupVersionKind().GroupVersion(); e != a {
|
||||
return nil, fmt.Errorf("invalid converted object at index %v: invalid groupVersion, e=%v, a=%v", convertedIndex, e, a)
|
||||
}
|
||||
if e, a := original.GetObjectKind().GroupVersionKind().Kind, converted.GetObjectKind().GroupVersionKind().Kind; e != a {
|
||||
return nil, fmt.Errorf("invalid converted object at index %v: invalid kind, e=%v, a=%v", convertedIndex, e, a)
|
||||
}
|
||||
unstructConverted, ok := converted.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
// this should not happened
|
||||
return nil, fmt.Errorf("CR conversion failed")
|
||||
}
|
||||
if err := validateConvertedObject(&listObj.Items[i], unstructConverted); err != nil {
|
||||
return nil, fmt.Errorf("invalid converted object at index %v: %v", convertedIndex, err)
|
||||
}
|
||||
convertedList.Items[i] = *unstructConverted
|
||||
}
|
||||
convertedList.SetAPIVersion(toGV.String())
|
||||
return convertedList, nil
|
||||
}
|
||||
|
||||
if len(response.Response.ConvertedObjects) != 1 {
|
||||
// This should not happened
|
||||
return nil, fmt.Errorf("CR conversion failed")
|
||||
}
|
||||
converted, err := getRawExtensionObject(response.Response.ConvertedObjects[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if e, a := toGV, converted.GetObjectKind().GroupVersionKind().GroupVersion(); e != a {
|
||||
return nil, fmt.Errorf("invalid converted object: invalid groupVersion, e=%v, a=%v", e, a)
|
||||
}
|
||||
if e, a := in.GetObjectKind().GroupVersionKind().Kind, converted.GetObjectKind().GroupVersionKind().Kind; e != a {
|
||||
return nil, fmt.Errorf("invalid converted object: invalid kind, e=%v, a=%v", e, a)
|
||||
}
|
||||
unstructConverted, ok := converted.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
// this should not happened
|
||||
return nil, fmt.Errorf("CR conversion failed")
|
||||
}
|
||||
unstructIn, ok := in.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
// this should not happened
|
||||
return nil, fmt.Errorf("CR conversion failed")
|
||||
}
|
||||
if err := validateConvertedObject(unstructIn, unstructConverted); err != nil {
|
||||
return nil, fmt.Errorf("invalid converted object: %v", err)
|
||||
}
|
||||
return converted, nil
|
||||
}
|
||||
|
||||
func validateConvertedObject(unstructIn, unstructOut *unstructured.Unstructured) error {
|
||||
if e, a := unstructIn.GetKind(), unstructOut.GetKind(); e != a {
|
||||
return fmt.Errorf("must have the same kind: %v != %v", e, a)
|
||||
}
|
||||
if e, a := unstructIn.GetName(), unstructOut.GetName(); e != a {
|
||||
return fmt.Errorf("must have the same name: %v != %v", e, a)
|
||||
}
|
||||
if e, a := unstructIn.GetNamespace(), unstructOut.GetNamespace(); e != a {
|
||||
return fmt.Errorf("must have the same namespace: %v != %v", e, a)
|
||||
}
|
||||
if e, a := unstructIn.GetUID(), unstructOut.GetUID(); e != a {
|
||||
return fmt.Errorf("must have the same UID: %v != %v", e, a)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// isEmptyUnstructuredObject returns true if in is an empty unstructured object, i.e. an unstructured object that does
|
||||
// not have any field except apiVersion and kind.
|
||||
func isEmptyUnstructuredObject(in runtime.Object) bool {
|
||||
u, ok := in.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
if len(u.Object) != 2 {
|
||||
return false
|
||||
}
|
||||
if _, ok := u.Object["kind"]; !ok {
|
||||
return false
|
||||
}
|
||||
if _, ok := u.Object["apiVersion"]; !ok {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
@@ -67,6 +67,7 @@ import (
|
||||
apiextensionsfeatures "k8s.io/apiextensions-apiserver/pkg/features"
|
||||
"k8s.io/apiextensions-apiserver/pkg/registry/customresource"
|
||||
"k8s.io/apiextensions-apiserver/pkg/registry/customresource/tableconvertor"
|
||||
"k8s.io/apiserver/pkg/util/webhook"
|
||||
)
|
||||
|
||||
// crdHandler serves the `/apis` endpoint.
|
||||
@@ -93,6 +94,8 @@ type crdHandler struct {
|
||||
// MasterCount is used to implement sleep to improve
|
||||
// CRD establishing process for HA clusters.
|
||||
masterCount int
|
||||
|
||||
converterFactory *conversion.CRConverterFactory
|
||||
}
|
||||
|
||||
// crdInfo stores enough information to serve the storage for the custom resource
|
||||
@@ -129,7 +132,9 @@ func NewCustomResourceDefinitionHandler(
|
||||
restOptionsGetter generic.RESTOptionsGetter,
|
||||
admission admission.Interface,
|
||||
establishingController *establish.EstablishingController,
|
||||
masterCount int) *crdHandler {
|
||||
serviceResolver webhook.ServiceResolver,
|
||||
authResolverWrapper webhook.AuthenticationInfoResolverWrapper,
|
||||
masterCount int) (*crdHandler, error) {
|
||||
ret := &crdHandler{
|
||||
versionDiscoveryHandler: versionDiscoveryHandler,
|
||||
groupDiscoveryHandler: groupDiscoveryHandler,
|
||||
@@ -147,10 +152,15 @@ func NewCustomResourceDefinitionHandler(
|
||||
ret.removeDeadStorage()
|
||||
},
|
||||
})
|
||||
crConverterFactory, err := conversion.NewCRConverterFactory(serviceResolver, authResolverWrapper)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret.converterFactory = crConverterFactory
|
||||
|
||||
ret.customStorage.Store(crdStorageMap{})
|
||||
|
||||
return ret
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
@@ -433,7 +443,10 @@ func (r *crdHandler) getOrCreateServingInfoFor(crd *apiextensions.CustomResource
|
||||
scaleScopes := map[string]handlers.RequestScope{}
|
||||
|
||||
for _, v := range crd.Spec.Versions {
|
||||
safeConverter, unsafeConverter := conversion.NewCRDConverter(crd)
|
||||
safeConverter, unsafeConverter, err := r.converterFactory.NewConverter(crd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// In addition to Unstructured objects (Custom Resources), we also may sometimes need to
|
||||
// decode unversioned Options objects, so we delegate to parameterScheme for such types.
|
||||
parameterScheme := runtime.NewScheme()
|
||||
|
@@ -79,14 +79,24 @@ func TestConvertFieldLabel(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
|
||||
crd := apiextensions.CustomResourceDefinition{}
|
||||
crd := apiextensions.CustomResourceDefinition{
|
||||
Spec: apiextensions.CustomResourceDefinitionSpec{
|
||||
Conversion: &apiextensions.CustomResourceConversion{
|
||||
Strategy: "None",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if test.clusterScoped {
|
||||
crd.Spec.Scope = apiextensions.ClusterScoped
|
||||
} else {
|
||||
crd.Spec.Scope = apiextensions.NamespaceScoped
|
||||
}
|
||||
_, c := conversion.NewCRDConverter(&crd)
|
||||
f, err := conversion.NewCRConverterFactory(nil, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, c, err := f.NewConverter(&crd)
|
||||
|
||||
label, value, err := c.ConvertFieldLabel(schema.GroupVersionKind{}, test.label, "value")
|
||||
if e, a := test.expectError, err != nil; e != a {
|
||||
|
@@ -20,6 +20,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
|
||||
@@ -30,6 +31,9 @@ import (
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
genericoptions "k8s.io/apiserver/pkg/server/options"
|
||||
"k8s.io/apiserver/pkg/util/proxy"
|
||||
"k8s.io/apiserver/pkg/util/webhook"
|
||||
"k8s.io/client-go/listers/core/v1"
|
||||
)
|
||||
|
||||
const defaultEtcdPathPrefix = "/registry/apiextensions.kubernetes.io"
|
||||
@@ -94,6 +98,8 @@ func (o CustomResourceDefinitionsServerOptions) Config() (*apiserver.Config, err
|
||||
GenericConfig: serverConfig,
|
||||
ExtraConfig: apiserver.ExtraConfig{
|
||||
CRDRESTOptionsGetter: NewCRDRESTOptionsGetter(*o.RecommendedOptions.Etcd),
|
||||
ServiceResolver: &serviceResolver{serverConfig.SharedInformerFactory.Core().V1().Services().Lister()},
|
||||
AuthResolverWrapper: webhook.NewDefaultAuthenticationInfoResolverWrapper(nil, serverConfig.LoopbackClientConfig),
|
||||
},
|
||||
}
|
||||
return config, nil
|
||||
@@ -114,3 +120,11 @@ func NewCRDRESTOptionsGetter(etcdOptions genericoptions.EtcdOptions) genericregi
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
type serviceResolver struct {
|
||||
services v1.ServiceLister
|
||||
}
|
||||
|
||||
func (r *serviceResolver) ResolveEndpoint(namespace, name string) (*url.URL, error) {
|
||||
return proxy.ResolveCluster(r.services, namespace, name)
|
||||
}
|
||||
|
@@ -76,8 +76,8 @@ func newNamespacedCustomResourceClient(ns string, client dynamic.Interface, crd
|
||||
return newNamespacedCustomResourceVersionedClient(ns, client, crd, crd.Spec.Versions[0].Name)
|
||||
}
|
||||
|
||||
// updateCustomResourceDefinitionWithRetry updates a CRD, retrying up to 5 times on version conflict errors.
|
||||
func updateCustomResourceDefinitionWithRetry(client clientset.Interface, name string, update func(*apiextensionsv1beta1.CustomResourceDefinition)) (*apiextensionsv1beta1.CustomResourceDefinition, error) {
|
||||
// UpdateCustomResourceDefinitionWithRetry updates a CRD, retrying up to 5 times on version conflict errors.
|
||||
func UpdateCustomResourceDefinitionWithRetry(client clientset.Interface, name string, update func(*apiextensionsv1beta1.CustomResourceDefinition)) (*apiextensionsv1beta1.CustomResourceDefinition, error) {
|
||||
for i := 0; i < 5; i++ {
|
||||
crd, err := client.ApiextensionsV1beta1().CustomResourceDefinitions().Get(name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
|
@@ -445,7 +445,7 @@ func TestCRValidationOnCRDUpdate(t *testing.T) {
|
||||
}
|
||||
|
||||
// update the CRD to a less stricter schema
|
||||
_, err = updateCustomResourceDefinitionWithRetry(apiExtensionClient, "noxus.mygroup.example.com", func(crd *apiextensionsv1beta1.CustomResourceDefinition) {
|
||||
_, err = UpdateCustomResourceDefinitionWithRetry(apiExtensionClient, "noxus.mygroup.example.com", func(crd *apiextensionsv1beta1.CustomResourceDefinition) {
|
||||
validationSchema, err := getSchemaForVersion(crd, v.Name)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@@ -64,7 +64,7 @@ func TestInternalVersionIsHandlerVersion(t *testing.T) {
|
||||
|
||||
// update validation via update because the cache priming in CreateNewCustomResourceDefinition will fail otherwise
|
||||
t.Logf("Updating CRD to validate apiVersion")
|
||||
noxuDefinition, err = updateCustomResourceDefinitionWithRetry(apiExtensionClient, noxuDefinition.Name, func(crd *apiextensionsv1beta1.CustomResourceDefinition) {
|
||||
noxuDefinition, err = UpdateCustomResourceDefinitionWithRetry(apiExtensionClient, noxuDefinition.Name, func(crd *apiextensionsv1beta1.CustomResourceDefinition) {
|
||||
crd.Spec.Validation = &apiextensionsv1beta1.CustomResourceValidation{
|
||||
OpenAPIV3Schema: &apiextensionsv1beta1.JSONSchemaProps{
|
||||
Properties: map[string]apiextensionsv1beta1.JSONSchemaProps{
|
||||
|
@@ -164,6 +164,7 @@ func (a *Webhook) Dispatch(attr admission.Attributes) error {
|
||||
return admission.NewForbidden(attr, fmt.Errorf("not yet ready to handle request"))
|
||||
}
|
||||
hooks := a.hookSource.Webhooks()
|
||||
// TODO: Figure out if adding one second timeout make sense here.
|
||||
ctx := context.TODO()
|
||||
|
||||
var relevantHooks []*v1beta1.Webhook
|
||||
|
@@ -17,13 +17,13 @@ limitations under the License.
|
||||
package apimachinery
|
||||
|
||||
import (
|
||||
"k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver/test/integration"
|
||||
"time"
|
||||
|
||||
apps "k8s.io/api/apps/v1"
|
||||
"k8s.io/api/core/v1"
|
||||
rbacv1 "k8s.io/api/rbac/v1"
|
||||
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
|
||||
"k8s.io/apiextensions-apiserver/test/integration"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
|
Reference in New Issue
Block a user