Add OpenAPI v3 publishing under feature gate OpenAPIV3

This commit is contained in:
Jefftree 2021-10-26 11:11:59 -07:00
parent 42d8b2f3b9
commit 5bf3ed7a98
15 changed files with 1293 additions and 60 deletions

View File

@ -32,6 +32,7 @@ import (
"k8s.io/apiextensions-apiserver/pkg/controller/finalizer"
"k8s.io/apiextensions-apiserver/pkg/controller/nonstructuralschema"
openapicontroller "k8s.io/apiextensions-apiserver/pkg/controller/openapi"
openapiv3controller "k8s.io/apiextensions-apiserver/pkg/controller/openapiv3"
"k8s.io/apiextensions-apiserver/pkg/controller/status"
"k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -41,10 +42,12 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/endpoints/discovery"
"k8s.io/apiserver/pkg/features"
genericregistry "k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/webhook"
)
@ -218,6 +221,10 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
crdHandler,
)
openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
var openapiv3Controller *openapiv3controller.Controller
if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
openapiv3Controller = openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
}
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
s.Informers.Start(context.StopCh)
@ -230,6 +237,9 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
// and StaticOpenAPISpec are both null. In that case we don't run the CRD OpenAPI controller.
if s.GenericAPIServer.OpenAPIVersionedService != nil && s.GenericAPIServer.StaticOpenAPISpec != nil {
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh)
}
}
go namingController.Run(context.StopCh)

View File

@ -1361,7 +1361,8 @@ func buildOpenAPIModelsForApply(staticOpenAPISpec *spec.Swagger, crd *apiextensi
specs := []*spec.Swagger{}
for _, v := range crd.Spec.Versions {
// Defaults are not pruned here, but before being served.
s, err := builder.BuildSwagger(crd, v.Name, builder.Options{V2: false, StripValueValidation: true, StripNullable: true, AllowNonStructural: false})
// See flag description in builder.go for flag usage
s, err := builder.BuildOpenAPIV2(crd, v.Name, builder.Options{V2: true, SkipFilterSchemaForKubectlOpenAPIV2Validation: true, StripValueValidation: true, StripNullable: true, AllowNonStructural: false})
if err != nil {
return nil, err
}

View File

@ -43,7 +43,9 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilopenapi "k8s.io/apiserver/pkg/util/openapi"
openapibuilder "k8s.io/kube-openapi/pkg/builder"
"k8s.io/kube-openapi/pkg/builder3"
"k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/spec3"
"k8s.io/kube-openapi/pkg/util"
"k8s.io/kube-openapi/pkg/validation/spec"
)
@ -52,10 +54,13 @@ const (
// Reference and Go types for built-in metadata
objectMetaSchemaRef = "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"
listMetaSchemaRef = "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ListMeta"
listMetaType = "k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta"
typeMetaType = "k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta"
definitionPrefix = "#/definitions/"
listMetaType = "k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta"
typeMetaType = "k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta"
objectMetaType = "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"
definitionPrefix = "#/definitions/"
v3DefinitionPrefix = "#/components/schemas/"
)
var (
@ -66,6 +71,15 @@ var (
namespaceToken = "{namespace}"
)
// The path for definitions in OpenAPI v2 and v3 are different. Translate the path if necessary
// The provided schemaRef uses a v2 prefix and is converted to v3 if the v2 bool is false
func refForOpenAPIVersion(schemaRef string, v2 bool) string {
if v2 {
return schemaRef
}
return strings.Replace(schemaRef, definitionPrefix, v3DefinitionPrefix, 1)
}
var definitions map[string]common.OpenAPIDefinition
var buildDefinitions sync.Once
var namer *openapi.DefinitionNamer
@ -75,6 +89,12 @@ type Options struct {
// Convert to OpenAPI v2.
V2 bool
// Only takes effect if the flag and V2 and both set to true. If the condition is reached,
// publish OpenAPI V2 but skip running the spec through ToStructuralOpenAPIV2
// This prevents XPreserveUnknownFields:true fields from being cleared
// Used only by server side apply
SkipFilterSchemaForKubectlOpenAPIV2Validation bool
// Strip value validation.
StripValueValidation bool
@ -85,8 +105,7 @@ type Options struct {
AllowNonStructural bool
}
// BuildSwagger builds swagger for the given crd in the given version
func BuildSwagger(crd *apiextensionsv1.CustomResourceDefinition, version string, opts Options) (*spec.Swagger, error) {
func generateBuilder(crd *apiextensionsv1.CustomResourceDefinition, version string, opts Options) (*builder, error) {
var schema *structuralschema.Structural
s, err := apiextensionshelpers.GetSchemaForVersion(crd, version)
if err != nil {
@ -122,7 +141,7 @@ func BuildSwagger(crd *apiextensionsv1.CustomResourceDefinition, version string,
// comes from function registerResourceHandlers() in k8s.io/apiserver.
// Alternatives are either (ideally) refactoring registerResourceHandlers() to
// reuse the code, or faking an APIInstaller for CR to feed to registerResourceHandlers().
b := newBuilder(crd, version, schema, opts.V2)
b := newBuilder(crd, version, schema, opts)
// Sample response types for building web service
sample := &CRDCanonicalTypeNamer{
@ -173,13 +192,26 @@ func BuildSwagger(crd *apiextensionsv1.CustomResourceDefinition, version string,
for _, route := range routes {
b.ws.Route(route)
}
return b, nil
}
openAPISpec, err := openapibuilder.BuildOpenAPISpec([]*restful.WebService{b.ws}, b.getOpenAPIConfig())
func BuildOpenAPIV3(crd *apiextensionsv1.CustomResourceDefinition, version string, opts Options) (*spec3.OpenAPI, error) {
b, err := generateBuilder(crd, version, opts)
if err != nil {
return nil, err
}
return openAPISpec, nil
return builder3.BuildOpenAPISpec([]*restful.WebService{b.ws}, b.getOpenAPIConfig(false))
}
// BuildOpenAPIV2 builds OpenAPI v2 for the given crd in the given version
func BuildOpenAPIV2(crd *apiextensionsv1.CustomResourceDefinition, version string, opts Options) (*spec.Swagger, error) {
b, err := generateBuilder(crd, version, opts)
if err != nil {
return nil, err
}
return openapibuilder.BuildOpenAPISpec([]*restful.WebService{b.ws}, b.getOpenAPIConfig(true))
}
// Implements CanonicalTypeNamer
@ -349,26 +381,26 @@ func (b *builder) buildRoute(root, path, httpMethod, actionVerb, operationVerb s
// buildKubeNative builds input schema with Kubernetes' native object meta, type meta and
// extensions
func (b *builder) buildKubeNative(schema *structuralschema.Structural, v2 bool, crdPreserveUnknownFields bool) (ret *spec.Schema) {
func (b *builder) buildKubeNative(schema *structuralschema.Structural, opts Options, crdPreserveUnknownFields bool) (ret *spec.Schema) {
// only add properties if we have a schema. Otherwise, kubectl would (wrongly) assume additionalProperties=false
// and forbid anything outside of apiVersion, kind and metadata. We have to fix kubectl to stop doing this, e.g. by
// adding additionalProperties=true support to explicitly allow additional fields.
// TODO: fix kubectl to understand additionalProperties=true
if schema == nil || (v2 && (schema.XPreserveUnknownFields || crdPreserveUnknownFields)) {
if schema == nil || ((opts.V2 && !opts.SkipFilterSchemaForKubectlOpenAPIV2Validation) && (schema.XPreserveUnknownFields || crdPreserveUnknownFields)) {
ret = &spec.Schema{
SchemaProps: spec.SchemaProps{Type: []string{"object"}},
}
// no, we cannot add more properties here, not even TypeMeta/ObjectMeta because kubectl will complain about
// unknown fields for anything else.
} else {
if v2 {
if opts.V2 && !opts.SkipFilterSchemaForKubectlOpenAPIV2Validation {
schema = openapiv2.ToStructuralOpenAPIV2(schema)
}
ret = schema.ToKubeOpenAPI()
ret.SetProperty("metadata", *spec.RefSchema(objectMetaSchemaRef).
WithDescription(swaggerPartialObjectMetadataDescriptions["metadata"]))
addTypeMetaProperties(ret)
addEmbeddedProperties(ret, v2)
ret.SetProperty("metadata", *spec.RefSchema(refForOpenAPIVersion(objectMetaSchemaRef, opts.V2)).WithDescription(swaggerPartialObjectMetadataDescriptions["metadata"]))
addTypeMetaProperties(ret, opts.V2)
addEmbeddedProperties(ret, opts)
}
ret.AddExtension(endpoints.ROUTE_META_GVK, []interface{}{
map[string]interface{}{
@ -381,36 +413,36 @@ func (b *builder) buildKubeNative(schema *structuralschema.Structural, v2 bool,
return ret
}
func addEmbeddedProperties(s *spec.Schema, v2 bool) {
func addEmbeddedProperties(s *spec.Schema, opts Options) {
if s == nil {
return
}
for k := range s.Properties {
v := s.Properties[k]
addEmbeddedProperties(&v, v2)
addEmbeddedProperties(&v, opts)
s.Properties[k] = v
}
if s.Items != nil {
addEmbeddedProperties(s.Items.Schema, v2)
addEmbeddedProperties(s.Items.Schema, opts)
}
if s.AdditionalProperties != nil {
addEmbeddedProperties(s.AdditionalProperties.Schema, v2)
addEmbeddedProperties(s.AdditionalProperties.Schema, opts)
}
if isTrue, ok := s.VendorExtensible.Extensions.GetBool("x-kubernetes-preserve-unknown-fields"); ok && isTrue && v2 {
if isTrue, ok := s.VendorExtensible.Extensions.GetBool("x-kubernetes-preserve-unknown-fields"); ok && isTrue && opts.V2 && !opts.SkipFilterSchemaForKubectlOpenAPIV2Validation {
// don't add metadata properties if we're publishing to openapi v2 and are allowing unknown fields.
// adding these metadata properties makes kubectl refuse to validate unknown fields.
return
}
if isTrue, ok := s.VendorExtensible.Extensions.GetBool("x-kubernetes-embedded-resource"); ok && isTrue {
s.SetProperty("apiVersion", withDescription(getDefinition(typeMetaType).SchemaProps.Properties["apiVersion"],
s.SetProperty("apiVersion", withDescription(getDefinition(typeMetaType, opts.V2).SchemaProps.Properties["apiVersion"],
"apiVersion defines the versioned schema of this representation of an object. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources",
))
s.SetProperty("kind", withDescription(getDefinition(typeMetaType).SchemaProps.Properties["kind"],
s.SetProperty("kind", withDescription(getDefinition(typeMetaType, opts.V2).SchemaProps.Properties["kind"],
"kind is a string value representing the type of this object. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds",
))
s.SetProperty("metadata", *spec.RefSchema(objectMetaSchemaRef).WithDescription(swaggerPartialObjectMetadataDescriptions["metadata"]))
s.SetProperty("metadata", *spec.RefSchema(refForOpenAPIVersion(objectMetaSchemaRef, opts.V2)).WithDescription(swaggerPartialObjectMetadataDescriptions["metadata"]))
req := sets.NewString(s.Required...)
if !req.Has("kind") {
@ -424,8 +456,8 @@ func addEmbeddedProperties(s *spec.Schema, v2 bool) {
// getDefinition gets definition for given Kubernetes type. This function is extracted from
// kube-openapi builder logic
func getDefinition(name string) spec.Schema {
buildDefinitions.Do(buildDefinitionsFunc)
func getDefinition(name string, v2 bool) spec.Schema {
buildDefinitions.Do(generateBuildDefinitionsFunc(v2))
return definitions[name].Schema
}
@ -433,31 +465,37 @@ func withDescription(s spec.Schema, desc string) spec.Schema {
return *s.WithDescription(desc)
}
func buildDefinitionsFunc() {
namer = openapi.NewDefinitionNamer(runtime.NewScheme())
definitions = utilopenapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)(func(name string) spec.Ref {
defName, _ := namer.GetDefinitionName(name)
return spec.MustCreateRef(definitionPrefix + common.EscapeJsonPointer(defName))
})
func generateBuildDefinitionsFunc(v2 bool) func() {
return func() {
namer = openapi.NewDefinitionNamer(runtime.NewScheme())
definitions = utilopenapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)(func(name string) spec.Ref {
defName, _ := namer.GetDefinitionName(name)
prefix := v3DefinitionPrefix
if v2 {
prefix = definitionPrefix
}
return spec.MustCreateRef(prefix + common.EscapeJsonPointer(defName))
})
}
}
// addTypeMetaProperties adds Kubernetes-specific type meta properties to input schema:
// apiVersion and kind
func addTypeMetaProperties(s *spec.Schema) {
s.SetProperty("apiVersion", getDefinition(typeMetaType).SchemaProps.Properties["apiVersion"])
s.SetProperty("kind", getDefinition(typeMetaType).SchemaProps.Properties["kind"])
func addTypeMetaProperties(s *spec.Schema, v2 bool) {
s.SetProperty("apiVersion", getDefinition(typeMetaType, v2).SchemaProps.Properties["apiVersion"])
s.SetProperty("kind", getDefinition(typeMetaType, v2).SchemaProps.Properties["kind"])
}
// buildListSchema builds the list kind schema for the CRD
func (b *builder) buildListSchema() *spec.Schema {
func (b *builder) buildListSchema(v2 bool) *spec.Schema {
name := definitionPrefix + util.ToRESTFriendlyName(fmt.Sprintf("%s/%s/%s", b.group, b.version, b.kind))
doc := fmt.Sprintf("List of %s. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md", b.plural)
s := new(spec.Schema).WithDescription(fmt.Sprintf("%s is a list of %s", b.listKind, b.kind)).
WithRequired("items").
SetProperty("items", *spec.ArrayProperty(spec.RefSchema(name)).WithDescription(doc)).
SetProperty("metadata", *spec.RefSchema(listMetaSchemaRef).WithDescription(swaggerPartialObjectMetadataListDescriptions["metadata"]))
SetProperty("metadata", *spec.RefSchema(refForOpenAPIVersion(listMetaSchemaRef, v2)).WithDescription(swaggerPartialObjectMetadataListDescriptions["metadata"]))
addTypeMetaProperties(s)
addTypeMetaProperties(s, v2)
s.AddExtension(endpoints.ROUTE_META_GVK, []map[string]string{
{
"group": b.group,
@ -469,7 +507,7 @@ func (b *builder) buildListSchema() *spec.Schema {
}
// getOpenAPIConfig builds config which wires up generated definitions for kube-openapi to consume
func (b *builder) getOpenAPIConfig() *common.Config {
func (b *builder) getOpenAPIConfig(v2 bool) *common.Config {
return &common.Config{
ProtocolList: []string{"https"},
Info: &spec.Info{
@ -487,13 +525,14 @@ func (b *builder) getOpenAPIConfig() *common.Config {
},
GetOperationIDAndTags: openapi.GetOperationIDAndTags,
GetDefinitionName: func(name string) (string, spec.Extensions) {
buildDefinitions.Do(buildDefinitionsFunc)
buildDefinitions.Do(generateBuildDefinitionsFunc(v2))
return namer.GetDefinitionName(name)
},
GetDefinitions: func(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition {
def := utilopenapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)(ref)
def[fmt.Sprintf("%s/%s.%s", b.group, b.version, b.kind)] = common.OpenAPIDefinition{
Schema: *b.schema,
Schema: *b.schema,
Dependencies: []string{objectMetaType},
}
def[fmt.Sprintf("%s/%s.%s", b.group, b.version, b.listKind)] = common.OpenAPIDefinition{
Schema: *b.listSchema,
@ -503,7 +542,7 @@ func (b *builder) getOpenAPIConfig() *common.Config {
}
}
func newBuilder(crd *apiextensionsv1.CustomResourceDefinition, version string, schema *structuralschema.Structural, v2 bool) *builder {
func newBuilder(crd *apiextensionsv1.CustomResourceDefinition, version string, schema *structuralschema.Structural, opts Options) *builder {
b := &builder{
schema: &spec.Schema{
SchemaProps: spec.SchemaProps{Type: []string{"object"}},
@ -522,8 +561,8 @@ func newBuilder(crd *apiextensionsv1.CustomResourceDefinition, version string, s
}
// Pre-build schema with Kubernetes native properties
b.schema = b.buildKubeNative(schema, v2, crd.Spec.PreserveUnknownFields)
b.listSchema = b.buildListSchema()
b.schema = b.buildKubeNative(schema, opts, crd.Spec.PreserveUnknownFields)
b.listSchema = b.buildListSchema(opts.V2)
return b
}

View File

@ -45,37 +45,43 @@ func TestNewBuilder(t *testing.T) {
wantedSchema string
wantedItemsSchema string
v2 bool // produce OpenAPIv2
v2 bool // produce OpenAPIv2
skipFilterSchemaForKubectlOpenAPIV2Validation bool // produce OpenAPIv2 without going through the ToStructuralOpenAPIV2 path
}{
{
"nil",
"",
`{"type":"object","x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`, `{"$ref":"#/definitions/io.k8s.bar.v1.Foo"}`,
true,
false,
},
{"with properties",
`{"type":"object","properties":{"spec":{"type":"object"},"status":{"type":"object"}}}`,
`{"type":"object","properties":{"apiVersion":{"type":"string"},"kind":{"type":"string"},"metadata":{"$ref":"#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"},"spec":{"type":"object"},"status":{"type":"object"}},"x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`,
`{"$ref":"#/definitions/io.k8s.bar.v1.Foo"}`,
true,
false,
},
{"type only",
`{"type":"object"}`,
`{"type":"object","properties":{"apiVersion":{"type":"string"},"kind":{"type":"string"},"metadata":{"$ref":"#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"}},"x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`,
`{"$ref":"#/definitions/io.k8s.bar.v1.Foo"}`,
true,
false,
},
{"preserve unknown at root v2",
`{"type":"object","x-kubernetes-preserve-unknown-fields":true}`,
`{"type":"object","x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`,
`{"$ref":"#/definitions/io.k8s.bar.v1.Foo"}`,
true,
false,
},
{"preserve unknown at root v3",
`{"type":"object","x-kubernetes-preserve-unknown-fields":true}`,
`{"type":"object","x-kubernetes-preserve-unknown-fields":true,"properties":{"apiVersion":{"type":"string"},"kind":{"type":"string"},"metadata":{"$ref":"#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"}},"x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`,
`{"$ref":"#/definitions/io.k8s.bar.v1.Foo"}`,
false,
true,
true,
},
{"with extensions",
`
@ -179,6 +185,7 @@ func TestNewBuilder(t *testing.T) {
}`,
`{"$ref":"#/definitions/io.k8s.bar.v1.Foo"}`,
true,
false,
},
{"with extensions as v3 schema",
`
@ -344,7 +351,8 @@ func TestNewBuilder(t *testing.T) {
"x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]
}`,
`{"$ref":"#/definitions/io.k8s.bar.v1.Foo"}`,
false,
true,
true,
},
}
for _, tt := range tests {
@ -384,7 +392,7 @@ func TestNewBuilder(t *testing.T) {
},
Scope: apiextensionsv1.NamespaceScoped,
},
}, "v1", schema, tt.v2)
}, "v1", schema, Options{V2: tt.v2, SkipFilterSchemaForKubectlOpenAPIV2Validation: tt.skipFilterSchemaForKubectlOpenAPIV2Validation})
var wantedSchema, wantedItemsSchema spec.Schema
if err := json.Unmarshal([]byte(tt.wantedSchema), &wantedSchema); err != nil {
@ -500,7 +508,7 @@ func TestCRDRouteParameterBuilder(t *testing.T) {
},
},
}
swagger, err := BuildSwagger(testNamespacedCRD, testCRDVersion, Options{V2: true})
swagger, err := BuildOpenAPIV2(testNamespacedCRD, testCRDVersion, Options{V2: true})
require.NoError(t, err)
require.Equal(t, len(testCase.paths), len(swagger.Paths.Paths), testCase.scope)
for path, expected := range testCase.paths {
@ -567,7 +575,7 @@ func schemaDiff(a, b *spec.Schema) string {
return diff.StringDiff(string(as), string(bs))
}
func TestBuildSwagger(t *testing.T) {
func TestBuildOpenAPIV2(t *testing.T) {
tests := []struct {
name string
schema string
@ -622,7 +630,7 @@ func TestBuildSwagger(t *testing.T) {
`{"type":"object","properties":{"foo":{"type":"string","oneOf":[{"pattern":"a"},{"pattern":"b"}]}}}`,
nil,
`{"type":"object","properties":{"apiVersion":{"type":"string"},"kind":{"type":"string"},"metadata":{"$ref":"#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"},"foo":{"type":"string","oneOf":[{"pattern":"a"},{"pattern":"b"}]}},"x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`,
Options{V2: false},
Options{V2: true, SkipFilterSchemaForKubectlOpenAPIV2Validation: true},
},
}
for _, tt := range tests {
@ -642,7 +650,7 @@ func TestBuildSwagger(t *testing.T) {
}
// TODO: mostly copied from the test above. reuse code to cleanup
got, err := BuildSwagger(&apiextensionsv1.CustomResourceDefinition{
got, err := BuildOpenAPIV2(&apiextensionsv1.CustomResourceDefinition{
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
Group: "bar.k8s.io",
Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
@ -691,3 +699,106 @@ func TestBuildSwagger(t *testing.T) {
})
}
}
func TestBuildOpenAPIV3(t *testing.T) {
tests := []struct {
name string
schema string
preserveUnknownFields *bool
wantedSchema string
opts Options
}{
{
"nil",
"",
nil,
`{"type":"object","x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`,
Options{},
},
{
"with properties",
`{"type":"object","properties":{"spec":{"type":"object"},"status":{"type":"object"}}}`,
nil,
`{"type":"object","properties":{"apiVersion":{"type":"string"},"kind":{"type":"string"},"metadata":{"$ref":"#/components/schemas/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"},"spec":{"type":"object"},"status":{"type":"object"}},"x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`,
Options{},
},
{
"with v3 nullable field",
`{"type":"object","properties":{"spec":{"type":"object", "nullable": true},"status":{"type":"object"}}}`,
nil,
`{"type":"object","properties":{"apiVersion":{"type":"string"},"kind":{"type":"string"},"metadata":{"$ref":"#/components/schemas/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"},"spec":{"type":"object", "nullable": true},"status":{"type":"object"}},"x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`,
Options{},
},
{
"with default not pruned for v3",
`{"type":"object","properties":{"spec":{"type":"object","properties":{"field":{"type":"string","default":"foo"}}},"status":{"type":"object"}}}`,
nil,
`{"type":"object","properties":{"apiVersion":{"type":"string"},"kind":{"type":"string"},"metadata":{"$ref":"#/components/schemas/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"},"spec":{"type":"object","properties":{"field":{"type":"string","default":"foo"}}},"status":{"type":"object"}},"x-kubernetes-group-version-kind":[{"group":"bar.k8s.io","kind":"Foo","version":"v1"}]}`,
Options{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var validation *apiextensionsv1.CustomResourceValidation
if len(tt.schema) > 0 {
v1Schema := &apiextensionsv1.JSONSchemaProps{}
if err := json.Unmarshal([]byte(tt.schema), &v1Schema); err != nil {
t.Fatal(err)
}
validation = &apiextensionsv1.CustomResourceValidation{
OpenAPIV3Schema: v1Schema,
}
}
if tt.preserveUnknownFields != nil && *tt.preserveUnknownFields {
validation.OpenAPIV3Schema.XPreserveUnknownFields = utilpointer.BoolPtr(true)
}
got, err := BuildOpenAPIV3(&apiextensionsv1.CustomResourceDefinition{
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
Group: "bar.k8s.io",
Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
{
Name: "v1",
Schema: validation,
},
},
Names: apiextensionsv1.CustomResourceDefinitionNames{
Plural: "foos",
Singular: "foo",
Kind: "Foo",
ListKind: "FooList",
},
Scope: apiextensionsv1.NamespaceScoped,
},
}, "v1", tt.opts)
if err != nil {
t.Fatal(err)
}
var wantedSchema spec.Schema
if err := json.Unmarshal([]byte(tt.wantedSchema), &wantedSchema); err != nil {
t.Fatal(err)
}
gotSchema := *got.Components.Schemas["io.k8s.bar.v1.Foo"]
gotProperties := properties(gotSchema.Properties)
wantedProperties := properties(wantedSchema.Properties)
if !gotProperties.Equal(wantedProperties) {
t.Fatalf("unexpected properties, got: %s, expected: %s", gotProperties.List(), wantedProperties.List())
}
// wipe out TypeMeta/ObjectMeta content, with those many lines of descriptions. We trust that they match here.
for _, metaField := range []string{"kind", "apiVersion", "metadata"} {
if _, found := gotSchema.Properties["kind"]; found {
prop := gotSchema.Properties[metaField]
prop.Description = ""
gotSchema.Properties[metaField] = prop
}
}
if !reflect.DeepEqual(&wantedSchema, &gotSchema) {
t.Errorf("unexpected schema: %s\nwant = %#v\ngot = %#v", schemaDiff(&wantedSchema, &gotSchema), &wantedSchema, &gotSchema)
}
})
}
}

View File

@ -17,7 +17,9 @@ limitations under the License.
package builder
import (
"k8s.io/klog/v2"
"k8s.io/kube-openapi/pkg/aggregator"
"k8s.io/kube-openapi/pkg/spec3"
"k8s.io/kube-openapi/pkg/validation/spec"
)
@ -79,3 +81,51 @@ func mergeSpec(dest, source *spec.Swagger) {
dest.Paths.Paths[k] = v
}
}
// MergeSpecsV3 merges OpenAPI v3 specs for CRDs
// For V3, the static spec is never merged with the individual CRD specs so no conflict resolution is necessary
func MergeSpecsV3(crdSpecs ...*spec3.OpenAPI) (*spec3.OpenAPI, error) {
// create shallow copy of staticSpec, but replace paths and definitions because we modify them.
crdSpec := &spec3.OpenAPI{}
if len(crdSpecs) > 0 {
crdSpec.Version = crdSpecs[0].Version
crdSpec.Info = crdSpecs[0].Info
}
for _, s := range crdSpecs {
// merge specs without checking conflicts, since the naming controller prevents
// conflicts between user-defined CRDs
mergeSpecV3(crdSpec, s)
}
return crdSpec, nil
}
// mergeSpecV3 copies paths and definitions from source to dest, mutating dest, but not source.
// We assume that conflicts do not matter.
func mergeSpecV3(dest, source *spec3.OpenAPI) {
if source == nil || source.Paths == nil {
return
}
if dest.Paths == nil {
dest.Paths = &spec3.Paths{}
}
for k, v := range source.Components.Schemas {
if dest.Components == nil {
dest.Components = &spec3.Components{}
}
if dest.Components.Schemas == nil {
dest.Components.Schemas = map[string]*spec.Schema{}
}
if _, exists := dest.Components.Schemas[k]; exists {
klog.Warningf("Should not happen: OpenAPI V3 merge schema conflict on %s", k)
}
dest.Components.Schemas[k] = v
}
for k, v := range source.Paths.Paths {
if dest.Paths.Paths == nil {
dest.Paths.Paths = map[string]*spec3.Path{}
}
dest.Paths.Paths[k] = v
}
}

View File

@ -202,7 +202,7 @@ func buildVersionSpecs(crd *apiextensionsv1.CustomResourceDefinition, oldSpecs m
continue
}
// Defaults are not pruned here, but before being served.
spec, err := builder.BuildSwagger(crd, v.Name, builder.Options{V2: true})
spec, err := builder.BuildOpenAPIV2(crd, v.Name, builder.Options{V2: true})
if err != nil {
return nil, false, err
}

View File

@ -0,0 +1,272 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openapiv3
import (
"fmt"
"reflect"
"sync"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/kube-openapi/pkg/handler3"
"k8s.io/kube-openapi/pkg/spec3"
apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
"k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder"
)
// Controller watches CustomResourceDefinitions and publishes OpenAPI v3
type Controller struct {
crdLister listers.CustomResourceDefinitionLister
crdsSynced cache.InformerSynced
// To allow injection for testing.
syncFn func(string) error
queue workqueue.RateLimitingInterface
openAPIV3Service *handler3.OpenAPIService
// specs per version and per CRD name
lock sync.Mutex
specsByGVandName map[schema.GroupVersion]map[string]*spec3.OpenAPI
}
// NewController creates a new Controller with input CustomResourceDefinition informer
func NewController(crdInformer informers.CustomResourceDefinitionInformer) *Controller {
c := &Controller{
crdLister: crdInformer.Lister(),
crdsSynced: crdInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_openapi_v3_controller"),
specsByGVandName: map[schema.GroupVersion]map[string]*spec3.OpenAPI{},
}
crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addCustomResourceDefinition,
UpdateFunc: c.updateCustomResourceDefinition,
DeleteFunc: c.deleteCustomResourceDefinition,
})
c.syncFn = c.sync
return c
}
// Run sets openAPIAggregationManager and starts workers
func (c *Controller) Run(openAPIV3Service *handler3.OpenAPIService, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
defer klog.Infof("Shutting down OpenAPI V3 controller")
klog.Infof("Starting OpenAPI V3 controller")
c.openAPIV3Service = openAPIV3Service
if !cache.WaitForCacheSync(stopCh, c.crdsSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
crds, err := c.crdLister.List(labels.Everything())
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to initially list all CRDs: %v", err))
return
}
for _, crd := range crds {
if !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
continue
}
for _, v := range crd.Spec.Versions {
if !v.Served {
continue
}
c.buildV3Spec(crd, crd.Name, v.Name)
}
}
// only start one worker thread since its a slow moving API
go wait.Until(c.runWorker, time.Second, stopCh)
<-stopCh
}
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
func (c *Controller) processNextWorkItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
// log slow aggregations
start := time.Now()
defer func() {
elapsed := time.Since(start)
if elapsed > time.Second {
klog.Warningf("slow openapi aggregation of %q: %s", key.(string), elapsed)
}
}()
err := c.syncFn(key.(string))
if err == nil {
c.queue.Forget(key)
return true
}
utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
c.queue.AddRateLimited(key)
return true
}
func (c *Controller) sync(name string) error {
c.lock.Lock()
defer c.lock.Unlock()
crd, err := c.crdLister.Get(name)
if err != nil && !errors.IsNotFound(err) {
return err
}
if errors.IsNotFound(err) || !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
c.deleteCRD(name)
return nil
}
for _, v := range crd.Spec.Versions {
if !v.Served {
continue
}
c.buildV3Spec(crd, name, v.Name)
}
return nil
}
func (c *Controller) deleteCRD(name string) {
for gv, crdListForGV := range c.specsByGVandName {
_, needOpenAPIUpdate := crdListForGV[name]
if needOpenAPIUpdate {
delete(crdListForGV, name)
if len(crdListForGV) == 0 {
delete(c.specsByGVandName, gv)
}
c.updateGroupVersion(gv)
}
}
}
func (c *Controller) updateGroupVersion(gv schema.GroupVersion) error {
if _, ok := c.specsByGVandName[gv]; !ok {
c.openAPIV3Service.DeleteGroupVersion(groupVersionToOpenAPIV3Path(gv))
return nil
}
var specs []*spec3.OpenAPI
for _, spec := range c.specsByGVandName[gv] {
specs = append(specs, spec)
}
mergedSpec, err := builder.MergeSpecsV3(specs...)
if err != nil {
return fmt.Errorf("failed to merge specs: %v", err)
}
c.openAPIV3Service.UpdateGroupVersion(groupVersionToOpenAPIV3Path(gv), mergedSpec)
return nil
}
func (c *Controller) updateCRDSpec(crd *apiextensionsv1.CustomResourceDefinition, name, versionName string, v3 *spec3.OpenAPI) error {
gv := schema.GroupVersion{
Group: crd.Spec.Group,
Version: versionName,
}
_, ok := c.specsByGVandName[gv]
if !ok {
c.specsByGVandName[gv] = map[string]*spec3.OpenAPI{}
}
oldSpec, ok := c.specsByGVandName[gv][name]
if ok {
if reflect.DeepEqual(oldSpec, v3) {
// no changes to CRD
return nil
}
}
c.specsByGVandName[gv][name] = v3
return c.updateGroupVersion(gv)
}
func (c *Controller) buildV3Spec(crd *apiextensionsv1.CustomResourceDefinition, name, versionName string) error {
v3, err := builder.BuildOpenAPIV3(crd, versionName, builder.Options{V2: false})
if err != nil {
return err
}
c.updateCRDSpec(crd, name, versionName, v3)
return nil
}
func (c *Controller) addCustomResourceDefinition(obj interface{}) {
castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
klog.V(4).Infof("Adding customresourcedefinition %s", castObj.Name)
c.enqueue(castObj)
}
func (c *Controller) updateCustomResourceDefinition(oldObj, newObj interface{}) {
castNewObj := newObj.(*apiextensionsv1.CustomResourceDefinition)
klog.V(4).Infof("Updating customresourcedefinition %s", castNewObj.Name)
c.enqueue(castNewObj)
}
func (c *Controller) deleteCustomResourceDefinition(obj interface{}) {
castObj, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
castObj, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition)
if !ok {
klog.Errorf("Tombstone contained object that is not expected %#v", obj)
return
}
}
klog.V(4).Infof("Deleting customresourcedefinition %q", castObj.Name)
c.enqueue(castObj)
}
func (c *Controller) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
c.queue.Add(obj.Name)
}

View File

@ -0,0 +1,25 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openapiv3
import (
"k8s.io/apimachinery/pkg/runtime/schema"
)
func groupVersionToOpenAPIV3Path(gv schema.GroupVersion) string {
return "apis/" + gv.Group + "/" + gv.Version
}

View File

@ -49,9 +49,10 @@ import (
utilopenapi "k8s.io/apiserver/pkg/util/openapi"
restclient "k8s.io/client-go/rest"
"k8s.io/klog/v2"
openapibuilder "k8s.io/kube-openapi/pkg/builder"
openapibuilder2 "k8s.io/kube-openapi/pkg/builder"
openapicommon "k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/handler"
"k8s.io/kube-openapi/pkg/handler3"
openapiutil "k8s.io/kube-openapi/pkg/util"
openapiproto "k8s.io/kube-openapi/pkg/util/proto"
"k8s.io/kube-openapi/pkg/validation/spec"
@ -144,6 +145,10 @@ type GenericAPIServer struct {
// It is set during PrepareRun if `openAPIConfig` is non-nil unless `skipOpenAPIInstallation` is true.
OpenAPIVersionedService *handler.OpenAPIService
// OpenAPIV3VersionedService controls the /openapi/v3 endpoint and can be used to update the served spec.
// It is set during PrepareRun if `openAPIConfig` is non-nil unless `skipOpenAPIInstallation` is true.
OpenAPIV3VersionedService *handler3.OpenAPIService
// StaticOpenAPISpec is the spec derived from the restful container endpoints.
// It is set during PrepareRun.
StaticOpenAPISpec *spec.Swagger
@ -345,7 +350,12 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
if s.openAPIConfig != nil && !s.skipOpenAPIInstallation {
s.OpenAPIVersionedService, s.StaticOpenAPISpec = routes.OpenAPI{
Config: s.openAPIConfig,
}.Install(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
}.InstallV2(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
s.OpenAPIV3VersionedService = routes.OpenAPI{
Config: s.openAPIConfig,
}.InstallV3(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
}
}
s.installHealthz()
@ -706,7 +716,7 @@ func (s *GenericAPIServer) getOpenAPIModels(apiPrefix string, apiGroupInfos ...*
}
// Build the openapi definitions for those resources and convert it to proto models
openAPISpec, err := openapibuilder.BuildOpenAPIDefinitionsForResources(s.openAPIConfig, resourceNames...)
openAPISpec, err := openapibuilder2.BuildOpenAPIDefinitionsForResources(s.openAPIConfig, resourceNames...)
if err != nil {
return nil, err
}

View File

@ -21,9 +21,11 @@ import (
"k8s.io/klog/v2"
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/kube-openapi/pkg/builder"
builder2 "k8s.io/kube-openapi/pkg/builder"
"k8s.io/kube-openapi/pkg/builder3"
"k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/handler"
"k8s.io/kube-openapi/pkg/handler3"
"k8s.io/kube-openapi/pkg/validation/spec"
)
@ -33,8 +35,8 @@ type OpenAPI struct {
}
// Install adds the SwaggerUI webservice to the given mux.
func (oa OpenAPI) Install(c *restful.Container, mux *mux.PathRecorderMux) (*handler.OpenAPIService, *spec.Swagger) {
spec, err := builder.BuildOpenAPISpec(c.RegisteredWebServices(), oa.Config)
func (oa OpenAPI) InstallV2(c *restful.Container, mux *mux.PathRecorderMux) (*handler.OpenAPIService, *spec.Swagger) {
spec, err := builder2.BuildOpenAPISpec(c.RegisteredWebServices(), oa.Config)
if err != nil {
klog.Fatalf("Failed to build open api spec for root: %v", err)
}
@ -51,3 +53,34 @@ func (oa OpenAPI) Install(c *restful.Container, mux *mux.PathRecorderMux) (*hand
return openAPIVersionedService, spec
}
// InstallV3 adds the static group/versions defined in the RegisteredWebServices to the OpenAPI v3 spec
func (oa OpenAPI) InstallV3(c *restful.Container, mux *mux.PathRecorderMux) *handler3.OpenAPIService {
openAPIVersionedService, err := handler3.NewOpenAPIService(nil)
if err != nil {
klog.Fatalf("Failed to create OpenAPIService: %v", err)
}
err = openAPIVersionedService.RegisterOpenAPIV3VersionedService("/openapi/v3", mux)
if err != nil {
klog.Fatalf("Failed to register versioned open api spec for root: %v", err)
}
grouped := make(map[string][]*restful.WebService)
for _, t := range c.RegisteredWebServices() {
// Strip the "/" prefix from the name
gvName := t.RootPath()[1:]
grouped[gvName] = []*restful.WebService{t}
}
for gv, ws := range grouped {
spec, err := builder3.BuildOpenAPISpec(ws, oa.Config)
if err != nil {
klog.Errorf("Failed to build OpenAPI v3 for group %s, %q", gv, err)
}
openAPIVersionedService.UpdateGroupVersion(gv, spec)
}
return openAPIVersionedService
}

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/features"
genericfeatures "k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
@ -46,6 +47,8 @@ import (
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
openapicontroller "k8s.io/kube-aggregator/pkg/controllers/openapi"
openapiaggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
openapiv3controller "k8s.io/kube-aggregator/pkg/controllers/openapiv3"
openapiv3aggregator "k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator"
statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status"
apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest"
)
@ -141,9 +144,12 @@ type APIAggregator struct {
// Enable swagger and/or OpenAPI if these configs are non-nil.
openAPIConfig *openapicommon.Config
// openAPIAggregationController downloads and merges OpenAPI specs.
// openAPIAggregationController downloads and merges OpenAPI v2 specs.
openAPIAggregationController *openapicontroller.AggregationController
// openAPIV3AggregationController downloads and caches OpenAPI v3 specs.
openAPIV3AggregationController *openapiv3controller.AggregationController
// egressSelector selects the proper egress dialer to communicate with the custom apiserver
// overwrites proxyTransport dialer if not nil
egressSelector *egressselector.EgressSelector
@ -344,6 +350,9 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
if s.openAPIConfig != nil {
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {
go s.openAPIAggregationController.Run(context.StopCh)
if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
go s.openAPIV3AggregationController.Run(context.StopCh)
}
return nil
})
}
@ -363,6 +372,18 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
return preparedAPIAggregator{}, err
}
s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator)
if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
specDownloaderV3 := openapiv3aggregator.NewDownloader()
openAPIV3Aggregator, err := openapiv3aggregator.BuildAndRegisterAggregator(
specDownloaderV3,
s.GenericAPIServer.NextDelegate(),
s.GenericAPIServer.Handler.NonGoRestfulMux)
if err != nil {
return preparedAPIAggregator{}, err
}
_ = openAPIV3Aggregator
s.openAPIV3AggregationController = openapiv3controller.NewAggregationController(openAPIV3Aggregator)
}
}
return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
@ -382,6 +403,9 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
if s.openAPIAggregationController != nil {
s.openAPIAggregationController.UpdateAPIService(proxyHandler, apiService)
}
if s.openAPIV3AggregationController != nil {
s.openAPIV3AggregationController.UpdateAPIService(proxyHandler, apiService)
}
return nil
}
@ -403,6 +427,9 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
if s.openAPIAggregationController != nil {
s.openAPIAggregationController.AddAPIService(proxyHandler, apiService)
}
if s.openAPIV3AggregationController != nil {
s.openAPIV3AggregationController.AddAPIService(proxyHandler, apiService)
}
s.proxyHandlers[apiService.Name] = proxyHandler
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)
@ -447,6 +474,9 @@ func (s *APIAggregator) RemoveAPIService(apiServiceName string) {
if s.openAPIAggregationController != nil {
s.openAPIAggregationController.RemoveAPIService(apiServiceName)
}
if s.openAPIV3AggregationController != nil {
s.openAPIAggregationController.RemoveAPIService(apiServiceName)
}
delete(s.proxyHandlers, apiServiceName)
// TODO unregister group level discovery when there are no more versions for the group

View File

@ -0,0 +1,206 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregator
import (
"fmt"
"net/http"
"strings"
"sync"
"time"
"k8s.io/apiserver/pkg/server"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/handler3"
"k8s.io/kube-openapi/pkg/spec3"
)
// SpecAggregator calls out to http handlers of APIServices and caches specs. It keeps state of the last
// known specs including the http etag.
// TODO(jefftree): remove the downloading and caching and proxy directly to the APIServices. This is possible because we
// don't have to merge here, which is cpu intensive in v2
type SpecAggregator interface {
AddUpdateAPIService(handler http.Handler, apiService *v1.APIService)
UpdateAPIServiceSpec(apiServiceName string) error
RemoveAPIServiceSpec(apiServiceName string)
GetAPIServiceNames() []string
}
const (
aggregatorUser = "system:aggregator"
specDownloadTimeout = 60 * time.Second
localDelegateChainNamePrefix = "k8s_internal_local_delegation_chain_"
localDelegateChainNamePattern = localDelegateChainNamePrefix + "%010d"
)
// IsLocalAPIService returns true for local specs from delegates.
func IsLocalAPIService(apiServiceName string) bool {
return strings.HasPrefix(apiServiceName, localDelegateChainNamePrefix)
}
// GetAPIServicesName returns the names of APIServices recorded in openAPIV3Specs.
// We use this function to pass the names of local APIServices to the controller in this package,
// so that the controller can periodically sync the OpenAPI spec from delegation API servers.
func (s *specAggregator) GetAPIServiceNames() []string {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
names := make([]string, len(s.openAPIV3Specs))
for key := range s.openAPIV3Specs {
names = append(names, key)
}
return names
}
// BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup.
func BuildAndRegisterAggregator(downloader Downloader, delegationTarget server.DelegationTarget, pathHandler common.PathHandlerByGroupVersion) (SpecAggregator, error) {
var err error
s := &specAggregator{
openAPIV3Specs: map[string]*openAPIV3APIServiceInfo{},
downloader: downloader,
}
s.openAPIV3VersionedService, err = handler3.NewOpenAPIService(nil)
if err != nil {
return nil, err
}
err = s.openAPIV3VersionedService.RegisterOpenAPIV3VersionedService("/openapi/v3", pathHandler)
if err != nil {
return nil, err
}
i := 1
for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() {
handler := delegate.UnprotectedHandler()
if handler == nil {
continue
}
apiServiceName := fmt.Sprintf(localDelegateChainNamePattern, i)
localAPIService := v1.APIService{}
localAPIService.Name = apiServiceName
s.AddUpdateAPIService(handler, &localAPIService)
s.UpdateAPIServiceSpec(apiServiceName)
i++
}
return s, nil
}
// AddUpdateAPIService adds or updates the api service. It is thread safe.
func (s *specAggregator) AddUpdateAPIService(handler http.Handler, apiservice *v1.APIService) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
// If the APIService is being updated, use the existing struct.
if apiServiceInfo, ok := s.openAPIV3Specs[apiservice.Name]; ok {
apiServiceInfo.apiService = *apiservice
apiServiceInfo.handler = handler
}
s.openAPIV3Specs[apiservice.Name] = &openAPIV3APIServiceInfo{
apiService: *apiservice,
handler: handler,
specs: make(map[string]*openAPIV3SpecInfo),
}
}
// UpdateAPIServiceSpec updates all the OpenAPI v3 specs that the APIService serves.
// It is thread safe.
func (s *specAggregator) UpdateAPIServiceSpec(apiServiceName string) error {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
apiService, exists := s.openAPIV3Specs[apiServiceName]
if !exists {
return fmt.Errorf("APIService %s does not exist for update", apiServiceName)
}
// Pass a list of old etags to the Downloader to prevent transfers if etags match
etagList := make(map[string]string)
for gv, specInfo := range apiService.specs {
etagList[gv] = specInfo.etag
}
groups, err := s.downloader.Download(apiService.handler, etagList)
if err != nil {
return err
}
// Remove any groups that do not exist anymore
for group := range s.openAPIV3Specs[apiServiceName].specs {
if _, exists := groups[group]; !exists {
s.openAPIV3VersionedService.DeleteGroupVersion(group)
delete(s.openAPIV3Specs[apiServiceName].specs, group)
}
}
for group, info := range groups {
if info.spec == nil {
continue
}
// If ETag has not changed, no update is necessary
oldInfo, exists := s.openAPIV3Specs[apiServiceName].specs[group]
if exists && oldInfo.etag == info.etag {
continue
}
s.openAPIV3Specs[apiServiceName].specs[group] = &openAPIV3SpecInfo{
spec: info.spec,
etag: info.etag,
}
s.openAPIV3VersionedService.UpdateGroupVersion(group, info.spec)
}
return nil
}
type specAggregator struct {
// mutex protects all members of this struct.
rwMutex sync.RWMutex
// OpenAPI V3 specs by APIService name
openAPIV3Specs map[string]*openAPIV3APIServiceInfo
// provided for dynamic OpenAPI spec
openAPIV3VersionedService *handler3.OpenAPIService
// For downloading the OpenAPI v3 specs from apiservices
downloader Downloader
}
var _ SpecAggregator = &specAggregator{}
type openAPIV3APIServiceInfo struct {
apiService v1.APIService
handler http.Handler
specs map[string]*openAPIV3SpecInfo
}
type openAPIV3SpecInfo struct {
spec *spec3.OpenAPI
etag string
}
// RemoveAPIServiceSpec removes an api service from the OpenAPI map. If it does not exist, no error is returned.
// It is thread safe.
func (s *specAggregator) RemoveAPIServiceSpec(apiServiceName string) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
if apiServiceInfo, ok := s.openAPIV3Specs[apiServiceName]; ok {
for gv := range apiServiceInfo.specs {
s.openAPIV3VersionedService.DeleteGroupVersion(gv)
}
delete(s.openAPIV3Specs, apiServiceName)
}
}

View File

@ -0,0 +1,169 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregator
import (
"encoding/json"
"fmt"
"net/http"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog/v2"
"k8s.io/kube-openapi/pkg/spec3"
)
// Downloader is the OpenAPI downloader type. It will try to download spec from /openapi/v3 and /openap/v3/<group>/<version> endpoints.
type Downloader struct {
}
// NewDownloader creates a new OpenAPI Downloader.
func NewDownloader() Downloader {
return Downloader{}
}
func (s *Downloader) handlerWithUser(handler http.Handler, info user.Info) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
req = req.WithContext(request.WithUser(req.Context(), info))
handler.ServeHTTP(w, req)
})
}
// gvList is a struct for the response of the /openapi/v3 endpoint to unmarshal into
type gvList struct {
Paths []string `json:"Paths"`
}
// SpecETag is a OpenAPI v3 spec and etag pair for the endpoint of each OpenAPI group/version
type SpecETag struct {
spec *spec3.OpenAPI
etag string
}
// Download downloads OpenAPI v3 for all groups of a given handler
func (s *Downloader) Download(handler http.Handler, etagList map[string]string) (returnSpec map[string]*SpecETag, err error) {
// TODO(jefftree): https://github.com/kubernetes/kubernetes/pull/105945#issuecomment-966455034
// Move to proxy request in the aggregator and let the APIServices serve the OpenAPI directly
handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser})
handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out")
req, err := http.NewRequest("GET", "/openapi/v3", nil)
if err != nil {
return nil, err
}
req.Header.Add("Accept", "application/json")
writer := newInMemoryResponseWriter()
handler.ServeHTTP(writer, req)
switch writer.respCode {
case http.StatusNotFound:
// Gracefully skip 404, assuming the server won't provide any spec
return nil, nil
case http.StatusOK:
groups := gvList{}
aggregated := make(map[string]*SpecETag)
if err := json.Unmarshal(writer.data, &groups); err != nil {
return nil, err
}
for _, path := range groups.Paths {
reqPath := fmt.Sprintf("/openapi/v3/%s", path)
req, err := http.NewRequest("GET", reqPath, nil)
if err != nil {
return nil, err
}
req.Header.Add("Accept", "application/json")
oldEtag, ok := etagList[path]
if ok {
req.Header.Add("If-None-Match", oldEtag)
}
openAPIWriter := newInMemoryResponseWriter()
handler.ServeHTTP(openAPIWriter, req)
switch openAPIWriter.respCode {
case http.StatusNotFound:
continue
case http.StatusNotModified:
aggregated[path] = &SpecETag{
etag: oldEtag,
}
case http.StatusOK:
var spec spec3.OpenAPI
// TODO|jefftree: For OpenAPI v3 Beta, if the v3 spec is empty then
// we should request the v2 endpoint and convert it to v3
if len(openAPIWriter.data) > 0 {
err = json.Unmarshal(openAPIWriter.data, &spec)
if err != nil {
return nil, err
}
etag := openAPIWriter.Header().Get("Etag")
aggregated[path] = &SpecETag{
spec: &spec,
etag: etag,
}
}
default:
klog.Errorf("Error: unknown status %v", openAPIWriter.respCode)
}
}
return aggregated, nil
default:
return nil, fmt.Errorf("failed to retrieve openAPI spec, http error: %s", writer.String())
}
}
// inMemoryResponseWriter is a http.Writer that keep the response in memory.
type inMemoryResponseWriter struct {
writeHeaderCalled bool
header http.Header
respCode int
data []byte
}
func newInMemoryResponseWriter() *inMemoryResponseWriter {
return &inMemoryResponseWriter{header: http.Header{}}
}
func (r *inMemoryResponseWriter) Header() http.Header {
return r.header
}
func (r *inMemoryResponseWriter) WriteHeader(code int) {
r.writeHeaderCalled = true
r.respCode = code
}
func (r *inMemoryResponseWriter) Write(in []byte) (int, error) {
if !r.writeHeaderCalled {
r.WriteHeader(http.StatusOK)
}
r.data = append(r.data, in...)
return len(in), nil
}
func (r *inMemoryResponseWriter) String() string {
s := fmt.Sprintf("ResponseCode: %d", r.respCode)
if r.data != nil {
s += fmt.Sprintf(", Body: %s", string(r.data))
}
if r.header != nil {
s += fmt.Sprintf(", Header: %s", r.header)
}
return s
}

View File

@ -0,0 +1,102 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregator
import (
"encoding/json"
"fmt"
"net/http"
"testing"
"github.com/stretchr/testify/assert"
)
type handlerTest struct {
etag string
data []byte
}
var _ http.Handler = handlerTest{}
func (h handlerTest) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Create an APIService with a handler for one group/version
group := make(map[string][]string)
group["Paths"] = []string{"apis/group/version"}
j, _ := json.Marshal(group)
if r.URL.Path == "/openapi/v3" {
w.Write(j)
return
}
if r.URL.Path == "/openapi/v3/apis/group/version" {
if len(h.etag) > 0 {
w.Header().Add("Etag", h.etag)
}
ifNoneMatches := r.Header["If-None-Match"]
for _, match := range ifNoneMatches {
if match == h.etag {
w.WriteHeader(http.StatusNotModified)
return
}
}
w.Write(h.data)
}
}
func assertDownloadedSpec(gvSpec map[string]*SpecETag, err error, expectedSpecID string, expectedEtag string) error {
if err != nil {
return fmt.Errorf("downloadOpenAPISpec failed : %s", err)
}
specInfo, ok := gvSpec["apis/group/version"]
if !ok {
if expectedSpecID == "" {
return nil
}
return fmt.Errorf("expected to download spec, no spec downloaded")
}
if specInfo.spec != nil && expectedSpecID == "" {
return fmt.Errorf("expected ID %s, actual ID %s", expectedSpecID, specInfo.spec.Version)
}
if specInfo.spec != nil && specInfo.spec.Version != expectedSpecID {
return fmt.Errorf("expected ID %s, actual ID %s", expectedSpecID, specInfo.spec.Version)
}
if specInfo.etag != expectedEtag {
return fmt.Errorf("expected ETag '%s', actual ETag '%s'", expectedEtag, specInfo.etag)
}
return nil
}
func TestDownloadOpenAPISpec(t *testing.T) {
s := Downloader{}
// Test with eTag
gvSpec, err := s.Download(
handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test"}, map[string]string{})
assert.NoError(t, assertDownloadedSpec(gvSpec, err, "test", "etag_test"))
// Test not modified
gvSpec, err = s.Download(
handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test"}, map[string]string{"apis/group/version": "etag_test"})
assert.NoError(t, assertDownloadedSpec(gvSpec, err, "", "etag_test"))
// Test different eTags
gvSpec, err = s.Download(
handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test1"}, map[string]string{"apis/group/version": "etag_test2"})
assert.NoError(t, assertDownloadedSpec(gvSpec, err, "test", "etag_test1"))
}

View File

@ -0,0 +1,175 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openapiv3
import (
"fmt"
"net/http"
"time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator"
)
const (
successfulUpdateDelay = time.Minute
successfulUpdateDelayLocal = time.Second
failedUpdateMaxExpDelay = time.Hour
)
type syncAction int
const (
syncRequeue syncAction = iota
syncRequeueRateLimited
syncNothing
)
// AggregationController periodically check for changes in OpenAPI specs of APIServices and update/remove
// them if necessary.
type AggregationController struct {
openAPIAggregationManager aggregator.SpecAggregator
queue workqueue.RateLimitingInterface
// To allow injection for testing.
syncHandler func(key string) (syncAction, error)
}
// NewAggregationController creates new OpenAPI aggregation controller.
func NewAggregationController(openAPIAggregationManager aggregator.SpecAggregator) *AggregationController {
c := &AggregationController{
openAPIAggregationManager: openAPIAggregationManager,
queue: workqueue.NewNamedRateLimitingQueue(
workqueue.NewItemExponentialFailureRateLimiter(successfulUpdateDelay, failedUpdateMaxExpDelay),
"open_api_v3_aggregation_controller",
),
}
c.syncHandler = c.sync
// update each service at least once, also those which are not coming from APIServices, namely local services
for _, name := range openAPIAggregationManager.GetAPIServiceNames() {
c.queue.AddAfter(name, time.Second)
}
return c
}
// Run starts OpenAPI AggregationController
func (c *AggregationController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
klog.Info("Starting OpenAPI V3 AggregationController")
defer klog.Info("Shutting down OpenAPI V3 AggregationController")
go wait.Until(c.runWorker, time.Second, stopCh)
<-stopCh
}
func (c *AggregationController) runWorker() {
for c.processNextWorkItem() {
}
}
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (c *AggregationController) processNextWorkItem() bool {
key, quit := c.queue.Get()
defer c.queue.Done(key)
if quit {
return false
}
if aggregator.IsLocalAPIService(key.(string)) {
// for local delegation targets that are aggregated once per second, log at
// higher level to avoid flooding the log
klog.V(6).Infof("OpenAPI AggregationController: Processing item %s", key)
} else {
klog.V(4).Infof("OpenAPI AggregationController: Processing item %s", key)
}
action, err := c.syncHandler(key.(string))
if err == nil {
c.queue.Forget(key)
} else {
utilruntime.HandleError(fmt.Errorf("loading OpenAPI spec for %q failed with: %v", key, err))
}
switch action {
case syncRequeue:
if aggregator.IsLocalAPIService(key.(string)) {
klog.V(7).Infof("OpenAPI AggregationController: action for local item %s: Requeue after %s.", key, successfulUpdateDelayLocal)
c.queue.AddAfter(key, successfulUpdateDelayLocal)
} else {
klog.V(7).Infof("OpenAPI AggregationController: action for item %s: Requeue.", key)
c.queue.AddAfter(key, successfulUpdateDelay)
}
case syncRequeueRateLimited:
klog.Infof("OpenAPI AggregationController: action for item %s: Rate Limited Requeue.", key)
c.queue.AddRateLimited(key)
case syncNothing:
klog.Infof("OpenAPI AggregationController: action for item %s: Nothing (removed from the queue).", key)
}
return true
}
func (c *AggregationController) sync(key string) (syncAction, error) {
err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key)
switch {
case err != nil:
return syncRequeueRateLimited, err
}
return syncRequeue, nil
}
// AddAPIService adds a new API Service to OpenAPI Aggregation.
func (c *AggregationController) AddAPIService(handler http.Handler, apiService *v1.APIService) {
if apiService.Spec.Service == nil {
return
}
c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService)
c.queue.AddAfter(apiService.Name, time.Second)
}
// UpdateAPIService updates API Service's info and handler.
func (c *AggregationController) UpdateAPIService(handler http.Handler, apiService *v1.APIService) {
if apiService.Spec.Service == nil {
return
}
c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService)
key := apiService.Name
if c.queue.NumRequeues(key) > 0 {
// The item has failed before. Remove it from failure queue and
// update it in a second
c.queue.Forget(key)
c.queue.AddAfter(key, time.Second)
}
}
// RemoveAPIService removes API Service from OpenAPI Aggregation Controller.
func (c *AggregationController) RemoveAPIService(apiServiceName string) {
c.openAPIAggregationManager.RemoveAPIServiceSpec(apiServiceName)
// This will only remove it if it was failing before. If it was successful, processNextWorkItem will figure it out
// and will not add it again to the queue.
c.queue.Forget(apiServiceName)
}