Merge pull request #118212 from Jefftree/updated-lazy-aggregator-v2
Fix making OpenAPI V2 aggregator lazy
This commit is contained in:
		@@ -17,331 +17,240 @@ limitations under the License.
 | 
			
		||||
package aggregator
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"crypto/sha256"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	restful "github.com/emicklei/go-restful/v3"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/klog/v2"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/apiserver/pkg/server"
 | 
			
		||||
	"k8s.io/klog/v2"
 | 
			
		||||
	v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
 | 
			
		||||
	"k8s.io/kube-openapi/pkg/aggregator"
 | 
			
		||||
	"k8s.io/kube-openapi/pkg/builder"
 | 
			
		||||
	"k8s.io/kube-openapi/pkg/cached"
 | 
			
		||||
	"k8s.io/kube-openapi/pkg/common"
 | 
			
		||||
	"k8s.io/kube-openapi/pkg/common/restfuladapter"
 | 
			
		||||
	"k8s.io/kube-openapi/pkg/handler"
 | 
			
		||||
	"k8s.io/kube-openapi/pkg/validation/spec"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var ErrAPIServiceNotFound = errors.New("resource not found")
 | 
			
		||||
 | 
			
		||||
// SpecAggregator calls out to http handlers of APIServices and merges specs. It keeps state of the last
 | 
			
		||||
// known specs including the http etag.
 | 
			
		||||
type SpecAggregator interface {
 | 
			
		||||
	AddUpdateAPIService(handler http.Handler, apiService *v1.APIService) error
 | 
			
		||||
	UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error
 | 
			
		||||
	RemoveAPIServiceSpec(apiServiceName string) error
 | 
			
		||||
	GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool)
 | 
			
		||||
	GetAPIServiceNames() []string
 | 
			
		||||
	AddUpdateAPIService(apiService *v1.APIService, handler http.Handler) error
 | 
			
		||||
	UpdateAPIServiceSpec(apiServiceName string) error
 | 
			
		||||
	RemoveAPIService(apiServiceName string) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	aggregatorUser                = "system:aggregator"
 | 
			
		||||
	specDownloadTimeout           = 60 * time.Second
 | 
			
		||||
	localDelegateChainNamePrefix  = "k8s_internal_local_delegation_chain_"
 | 
			
		||||
	localDelegateChainNamePattern = localDelegateChainNamePrefix + "%010d"
 | 
			
		||||
	specDownloadTimeout           = time.Minute
 | 
			
		||||
	localDelegateChainNamePattern = "k8s_internal_local_delegation_chain_%010d"
 | 
			
		||||
 | 
			
		||||
	// A randomly generated UUID to differentiate local and remote eTags.
 | 
			
		||||
	locallyGeneratedEtagPrefix = "\"6E8F849B434D4B98A569B9D7718876E9-"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// IsLocalAPIService returns true for local specs from delegates.
 | 
			
		||||
func IsLocalAPIService(apiServiceName string) bool {
 | 
			
		||||
	return strings.HasPrefix(apiServiceName, localDelegateChainNamePrefix)
 | 
			
		||||
// openAPISpecInfo is used to store OpenAPI specs.
 | 
			
		||||
// The apiService object is used to sort specs with their priorities.
 | 
			
		||||
type openAPISpecInfo struct {
 | 
			
		||||
	apiService v1.APIService
 | 
			
		||||
	// spec is the cached OpenAPI spec
 | 
			
		||||
	spec cached.Replaceable[*spec.Swagger]
 | 
			
		||||
 | 
			
		||||
	// The downloader is used only for non-local apiservices to
 | 
			
		||||
	// re-update the spec every so often.
 | 
			
		||||
	downloader cached.Data[*spec.Swagger]
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetAPIServiceNames returns the names of APIServices recorded in specAggregator.openAPISpecs.
 | 
			
		||||
// We use this function to pass the names of local APIServices to the controller in this package,
 | 
			
		||||
// so that the controller can periodically sync the OpenAPI spec from delegation API servers.
 | 
			
		||||
func (s *specAggregator) GetAPIServiceNames() []string {
 | 
			
		||||
	names := make([]string, 0, len(s.openAPISpecs))
 | 
			
		||||
	for key := range s.openAPISpecs {
 | 
			
		||||
		names = append(names, key)
 | 
			
		||||
type specAggregator struct {
 | 
			
		||||
	// mutex protects the specsByAPIServiceName map and its contents.
 | 
			
		||||
	mutex sync.Mutex
 | 
			
		||||
 | 
			
		||||
	// Map of API Services' OpenAPI specs by their name
 | 
			
		||||
	specsByAPIServiceName map[string]*openAPISpecInfo
 | 
			
		||||
 | 
			
		||||
	// provided for dynamic OpenAPI spec
 | 
			
		||||
	openAPIVersionedService *handler.OpenAPIService
 | 
			
		||||
 | 
			
		||||
	downloader *Downloader
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func buildAndRegisterSpecAggregatorForLocalServices(downloader *Downloader, aggregatorSpec *spec.Swagger, delegationHandlers []http.Handler, pathHandler common.PathHandler) *specAggregator {
 | 
			
		||||
	s := &specAggregator{
 | 
			
		||||
		downloader:            downloader,
 | 
			
		||||
		specsByAPIServiceName: map[string]*openAPISpecInfo{},
 | 
			
		||||
	}
 | 
			
		||||
	return names
 | 
			
		||||
	cachedAggregatorSpec := cached.NewResultOK(aggregatorSpec, "never-changes")
 | 
			
		||||
	s.addLocalSpec(fmt.Sprintf(localDelegateChainNamePattern, 0), cachedAggregatorSpec)
 | 
			
		||||
	for i, handler := range delegationHandlers {
 | 
			
		||||
		name := fmt.Sprintf(localDelegateChainNamePattern, i+1)
 | 
			
		||||
 | 
			
		||||
		spec := NewCacheableDownloader(downloader, handler)
 | 
			
		||||
		spec = decorateError(name, spec)
 | 
			
		||||
		s.addLocalSpec(name, spec)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	s.openAPIVersionedService = handler.NewOpenAPIServiceLazy(s.buildMergeSpecLocked())
 | 
			
		||||
	s.openAPIVersionedService.RegisterOpenAPIVersionedService("/openapi/v2", pathHandler)
 | 
			
		||||
	return s
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup.
 | 
			
		||||
func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.DelegationTarget, webServices []*restful.WebService,
 | 
			
		||||
	config *common.Config, pathHandler common.PathHandler) (SpecAggregator, error) {
 | 
			
		||||
	s := &specAggregator{
 | 
			
		||||
		openAPISpecs: map[string]*openAPISpecInfo{},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	i := 0
 | 
			
		||||
	// Build Aggregator's spec
 | 
			
		||||
	aggregatorOpenAPISpec, err := builder.BuildOpenAPISpecFromRoutes(restfuladapter.AdaptWebServices(webServices), config)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	aggregatorOpenAPISpec.Definitions = handler.PruneDefaults(aggregatorOpenAPISpec.Definitions)
 | 
			
		||||
 | 
			
		||||
	var delegationHandlers []http.Handler
 | 
			
		||||
 | 
			
		||||
	// Reserving non-name spec for aggregator's Spec.
 | 
			
		||||
	s.addLocalSpec(aggregatorOpenAPISpec, nil, fmt.Sprintf(localDelegateChainNamePattern, i), "")
 | 
			
		||||
	i++
 | 
			
		||||
	for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() {
 | 
			
		||||
		handler := delegate.UnprotectedHandler()
 | 
			
		||||
		if handler == nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		delegateSpec, etag, _, err := downloader.Download(handler, "")
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			// ignore errors for the empty delegate we attach at the end the chain
 | 
			
		||||
			// atm the empty delegate returns 503 when the server hasn't been fully initialized
 | 
			
		||||
			// and the spec downloader only silences 404s
 | 
			
		||||
			if len(delegate.ListedPaths()) == 0 && delegate.NextDelegate() == nil {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		if delegateSpec == nil {
 | 
			
		||||
		// ignore errors for the empty delegate we attach at the end the chain
 | 
			
		||||
		// atm the empty delegate returns 503 when the server hasn't been fully initialized
 | 
			
		||||
		// and the spec downloader only silences 404s
 | 
			
		||||
		if len(delegate.ListedPaths()) == 0 && delegate.NextDelegate() == nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		s.addLocalSpec(delegateSpec, handler, fmt.Sprintf(localDelegateChainNamePattern, i), etag)
 | 
			
		||||
		i++
 | 
			
		||||
		delegationHandlers = append(delegationHandlers, handler)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Build initial spec to serve.
 | 
			
		||||
	klog.V(2).Infof("Building initial OpenAPI spec")
 | 
			
		||||
	defer func(start time.Time) {
 | 
			
		||||
		duration := time.Since(start)
 | 
			
		||||
		klog.V(2).Infof("Finished initial OpenAPI spec generation after %v", duration)
 | 
			
		||||
 | 
			
		||||
		regenerationCounter.With(map[string]string{"apiservice": "*", "reason": "startup"})
 | 
			
		||||
		regenerationDurationGauge.With(map[string]string{"reason": "startup"}).Set(duration.Seconds())
 | 
			
		||||
	}(time.Now())
 | 
			
		||||
	specToServe, err := s.buildOpenAPISpec()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Install handler
 | 
			
		||||
	s.openAPIVersionedService = handler.NewOpenAPIService(specToServe)
 | 
			
		||||
	s.openAPIVersionedService.RegisterOpenAPIVersionedService("/openapi/v2", pathHandler)
 | 
			
		||||
 | 
			
		||||
	s := buildAndRegisterSpecAggregatorForLocalServices(downloader, aggregatorOpenAPISpec, delegationHandlers, pathHandler)
 | 
			
		||||
	return s, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type specAggregator struct {
 | 
			
		||||
	// mutex protects all members of this struct.
 | 
			
		||||
	rwMutex sync.RWMutex
 | 
			
		||||
 | 
			
		||||
	// Map of API Services' OpenAPI specs by their name
 | 
			
		||||
	openAPISpecs map[string]*openAPISpecInfo
 | 
			
		||||
 | 
			
		||||
	// provided for dynamic OpenAPI spec
 | 
			
		||||
	openAPIVersionedService *handler.OpenAPIService
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ SpecAggregator = &specAggregator{}
 | 
			
		||||
 | 
			
		||||
// This function is not thread safe as it only being called on startup.
 | 
			
		||||
func (s *specAggregator) addLocalSpec(spec *spec.Swagger, localHandler http.Handler, name, etag string) {
 | 
			
		||||
	localAPIService := v1.APIService{}
 | 
			
		||||
	localAPIService.Name = name
 | 
			
		||||
	s.openAPISpecs[name] = &openAPISpecInfo{
 | 
			
		||||
		etag:       etag,
 | 
			
		||||
		apiService: localAPIService,
 | 
			
		||||
		handler:    localHandler,
 | 
			
		||||
		spec:       spec,
 | 
			
		||||
func (s *specAggregator) addLocalSpec(name string, spec cached.Data[*spec.Swagger]) {
 | 
			
		||||
	service := v1.APIService{}
 | 
			
		||||
	service.Name = name
 | 
			
		||||
	info := &openAPISpecInfo{
 | 
			
		||||
		apiService: service,
 | 
			
		||||
	}
 | 
			
		||||
	info.spec.Replace(spec)
 | 
			
		||||
	s.specsByAPIServiceName[name] = info
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// openAPISpecInfo is used to store OpenAPI spec with its priority.
 | 
			
		||||
// It can be used to sort specs with their priorities.
 | 
			
		||||
type openAPISpecInfo struct {
 | 
			
		||||
	apiService v1.APIService
 | 
			
		||||
// buildMergeSpecLocked creates a new cached mergeSpec from the list of cached specs.
 | 
			
		||||
func (s *specAggregator) buildMergeSpecLocked() cached.Data[*spec.Swagger] {
 | 
			
		||||
	apiServices := make([]*v1.APIService, 0, len(s.specsByAPIServiceName))
 | 
			
		||||
	for k := range s.specsByAPIServiceName {
 | 
			
		||||
		apiServices = append(apiServices, &s.specsByAPIServiceName[k].apiService)
 | 
			
		||||
	}
 | 
			
		||||
	sortByPriority(apiServices)
 | 
			
		||||
	caches := make([]cached.Data[*spec.Swagger], len(apiServices))
 | 
			
		||||
	for i, apiService := range apiServices {
 | 
			
		||||
		caches[i] = &(s.specsByAPIServiceName[apiService.Name].spec)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Specification of this API Service. If null then the spec is not loaded yet.
 | 
			
		||||
	spec    *spec.Swagger
 | 
			
		||||
	handler http.Handler
 | 
			
		||||
	etag    string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// buildOpenAPISpec aggregates all OpenAPI specs.  It is not thread-safe. The caller is responsible to hold proper locks.
 | 
			
		||||
func (s *specAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err error) {
 | 
			
		||||
	specs := []openAPISpecInfo{}
 | 
			
		||||
	for _, specInfo := range s.openAPISpecs {
 | 
			
		||||
		if specInfo.spec == nil {
 | 
			
		||||
			continue
 | 
			
		||||
	return cached.NewListMerger(func(results []cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] {
 | 
			
		||||
		var merged *spec.Swagger
 | 
			
		||||
		etags := make([]string, 0, len(results))
 | 
			
		||||
		for _, specInfo := range results {
 | 
			
		||||
			result := specInfo.Get()
 | 
			
		||||
			if result.Err != nil {
 | 
			
		||||
				// APIService name and err message will be included in
 | 
			
		||||
				// the error message as part of decorateError
 | 
			
		||||
				klog.Warning(result.Err)
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			if merged == nil {
 | 
			
		||||
				merged = &spec.Swagger{}
 | 
			
		||||
				*merged = *result.Data
 | 
			
		||||
				// Paths, Definitions and parameters are set by
 | 
			
		||||
				// MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters
 | 
			
		||||
				merged.Paths = nil
 | 
			
		||||
				merged.Definitions = nil
 | 
			
		||||
				merged.Parameters = nil
 | 
			
		||||
			}
 | 
			
		||||
			etags = append(etags, result.Etag)
 | 
			
		||||
			if err := aggregator.MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters(merged, result.Data); err != nil {
 | 
			
		||||
				return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to build merge specs: %v", err))
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		// Copy the spec before removing the defaults.
 | 
			
		||||
		localSpec := *specInfo.spec
 | 
			
		||||
		localSpecInfo := *specInfo
 | 
			
		||||
		localSpecInfo.spec = &localSpec
 | 
			
		||||
		localSpecInfo.spec.Definitions = handler.PruneDefaults(specInfo.spec.Definitions)
 | 
			
		||||
		specs = append(specs, localSpecInfo)
 | 
			
		||||
	}
 | 
			
		||||
	if len(specs) == 0 {
 | 
			
		||||
		return &spec.Swagger{}, nil
 | 
			
		||||
	}
 | 
			
		||||
	sortByPriority(specs)
 | 
			
		||||
	for _, specInfo := range specs {
 | 
			
		||||
		if specToReturn == nil {
 | 
			
		||||
			specToReturn = &spec.Swagger{}
 | 
			
		||||
			*specToReturn = *specInfo.spec
 | 
			
		||||
			// Paths, Definitions and parameters are set by MergeSpecsIgnorePathConflict
 | 
			
		||||
			specToReturn.Paths = nil
 | 
			
		||||
			specToReturn.Definitions = nil
 | 
			
		||||
			specToReturn.Parameters = nil
 | 
			
		||||
		}
 | 
			
		||||
		if err := aggregator.MergeSpecsIgnorePathConflictRenamingDefinitionsAndParameters(specToReturn, specInfo.spec); err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return specToReturn, nil
 | 
			
		||||
		// Printing the etags list is stable because it is sorted.
 | 
			
		||||
		return cached.NewResultOK(merged, fmt.Sprintf("%x", sha256.Sum256([]byte(fmt.Sprintf("%#v", etags)))))
 | 
			
		||||
	}, caches)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// updateOpenAPISpec aggregates all OpenAPI specs.  It is not thread-safe. The caller is responsible to hold proper locks.
 | 
			
		||||
func (s *specAggregator) updateOpenAPISpec() error {
 | 
			
		||||
	if s.openAPIVersionedService == nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	specToServe, err := s.buildOpenAPISpec()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	return s.openAPIVersionedService.UpdateSpec(specToServe)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// tryUpdatingServiceSpecs tries updating openAPISpecs map with specified specInfo, and keeps the map intact
 | 
			
		||||
// if the update fails.
 | 
			
		||||
func (s *specAggregator) tryUpdatingServiceSpecs(specInfo *openAPISpecInfo) error {
 | 
			
		||||
	if specInfo == nil {
 | 
			
		||||
		return fmt.Errorf("invalid input: specInfo must be non-nil")
 | 
			
		||||
	}
 | 
			
		||||
	_, updated := s.openAPISpecs[specInfo.apiService.Name]
 | 
			
		||||
	origSpecInfo, existedBefore := s.openAPISpecs[specInfo.apiService.Name]
 | 
			
		||||
	s.openAPISpecs[specInfo.apiService.Name] = specInfo
 | 
			
		||||
 | 
			
		||||
	// Skip aggregation if OpenAPI spec didn't change
 | 
			
		||||
	if existedBefore && origSpecInfo != nil && origSpecInfo.etag == specInfo.etag {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	klog.V(2).Infof("Updating OpenAPI spec because %s is updated", specInfo.apiService.Name)
 | 
			
		||||
	defer func(start time.Time) {
 | 
			
		||||
		duration := time.Since(start)
 | 
			
		||||
		klog.V(2).Infof("Finished OpenAPI spec generation after %v", duration)
 | 
			
		||||
 | 
			
		||||
		reason := "add"
 | 
			
		||||
		if updated {
 | 
			
		||||
			reason = "update"
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		regenerationCounter.With(map[string]string{"apiservice": specInfo.apiService.Name, "reason": reason})
 | 
			
		||||
		regenerationDurationGauge.With(map[string]string{"reason": reason}).Set(duration.Seconds())
 | 
			
		||||
	}(time.Now())
 | 
			
		||||
	if err := s.updateOpenAPISpec(); err != nil {
 | 
			
		||||
		if existedBefore {
 | 
			
		||||
			s.openAPISpecs[specInfo.apiService.Name] = origSpecInfo
 | 
			
		||||
		} else {
 | 
			
		||||
			delete(s.openAPISpecs, specInfo.apiService.Name)
 | 
			
		||||
		}
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// tryDeleteServiceSpecs tries delete specified specInfo from openAPISpecs map, and keeps the map intact
 | 
			
		||||
// if the update fails.
 | 
			
		||||
func (s *specAggregator) tryDeleteServiceSpecs(apiServiceName string) error {
 | 
			
		||||
	orgSpecInfo, exists := s.openAPISpecs[apiServiceName]
 | 
			
		||||
// updateServiceLocked updates the spec cache by downloading the latest
 | 
			
		||||
// version of the spec.
 | 
			
		||||
func (s *specAggregator) updateServiceLocked(name string) error {
 | 
			
		||||
	specInfo, exists := s.specsByAPIServiceName[name]
 | 
			
		||||
	if !exists {
 | 
			
		||||
		return ErrAPIServiceNotFound
 | 
			
		||||
	}
 | 
			
		||||
	result := specInfo.downloader.Get()
 | 
			
		||||
	filteredResult := cached.NewTransformer[*spec.Swagger](func(result cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] {
 | 
			
		||||
		if result.Err != nil {
 | 
			
		||||
			return result
 | 
			
		||||
		}
 | 
			
		||||
		return cached.NewResultOK(aggregator.FilterSpecByPathsWithoutSideEffects(result.Data, []string{"/apis/"}), result.Etag)
 | 
			
		||||
	}, result)
 | 
			
		||||
	specInfo.spec.Replace(filteredResult)
 | 
			
		||||
	return result.Err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UpdateAPIServiceSpec updates the api service. It is thread safe.
 | 
			
		||||
func (s *specAggregator) UpdateAPIServiceSpec(apiServiceName string) error {
 | 
			
		||||
	s.mutex.Lock()
 | 
			
		||||
	defer s.mutex.Unlock()
 | 
			
		||||
	return s.updateServiceLocked(apiServiceName)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddUpdateAPIService adds the api service. It is thread safe. If the
 | 
			
		||||
// apiservice already exists, it will be updated.
 | 
			
		||||
func (s *specAggregator) AddUpdateAPIService(apiService *v1.APIService, handler http.Handler) error {
 | 
			
		||||
	if apiService.Spec.Service == nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	delete(s.openAPISpecs, apiServiceName)
 | 
			
		||||
	klog.V(2).Infof("Updating OpenAPI spec because %s is removed", apiServiceName)
 | 
			
		||||
	defer func(start time.Time) {
 | 
			
		||||
		duration := time.Since(start)
 | 
			
		||||
		klog.V(2).Infof("Finished OpenAPI spec generation after %v", duration)
 | 
			
		||||
	s.mutex.Lock()
 | 
			
		||||
	defer s.mutex.Unlock()
 | 
			
		||||
 | 
			
		||||
		regenerationCounter.With(map[string]string{"apiservice": apiServiceName, "reason": "delete"})
 | 
			
		||||
		regenerationDurationGauge.With(map[string]string{"reason": "delete"}).Set(duration.Seconds())
 | 
			
		||||
	}(time.Now())
 | 
			
		||||
	if err := s.updateOpenAPISpec(); err != nil {
 | 
			
		||||
		s.openAPISpecs[apiServiceName] = orgSpecInfo
 | 
			
		||||
		return err
 | 
			
		||||
	_, exists := s.specsByAPIServiceName[apiService.Name]
 | 
			
		||||
	if !exists {
 | 
			
		||||
		s.specsByAPIServiceName[apiService.Name] = &openAPISpecInfo{
 | 
			
		||||
			apiService: *apiService,
 | 
			
		||||
			downloader: decorateError(apiService.Name, NewCacheableDownloader(s.downloader, handler)),
 | 
			
		||||
		}
 | 
			
		||||
		s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return s.updateServiceLocked(apiService.Name)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RemoveAPIService removes an api service from OpenAPI aggregation. If it does not exist, no error is returned.
 | 
			
		||||
// It is thread safe.
 | 
			
		||||
func (s *specAggregator) RemoveAPIService(apiServiceName string) error {
 | 
			
		||||
	s.mutex.Lock()
 | 
			
		||||
	defer s.mutex.Unlock()
 | 
			
		||||
 | 
			
		||||
	if _, exists := s.specsByAPIServiceName[apiServiceName]; !exists {
 | 
			
		||||
		return ErrAPIServiceNotFound
 | 
			
		||||
	}
 | 
			
		||||
	delete(s.specsByAPIServiceName, apiServiceName)
 | 
			
		||||
	// Re-create the mergeSpec for the new list of apiservices
 | 
			
		||||
	s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked())
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UpdateAPIServiceSpec updates the api service's OpenAPI spec. It is thread safe.
 | 
			
		||||
func (s *specAggregator) UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error {
 | 
			
		||||
	s.rwMutex.Lock()
 | 
			
		||||
	defer s.rwMutex.Unlock()
 | 
			
		||||
 | 
			
		||||
	specInfo, existingService := s.openAPISpecs[apiServiceName]
 | 
			
		||||
	if !existingService {
 | 
			
		||||
		return fmt.Errorf("APIService %q does not exists", apiServiceName)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// For APIServices (non-local) specs, only merge their /apis/ prefixed endpoint as it is the only paths
 | 
			
		||||
	// proxy handler delegates.
 | 
			
		||||
	if specInfo.apiService.Spec.Service != nil {
 | 
			
		||||
		spec = aggregator.FilterSpecByPathsWithoutSideEffects(spec, []string{"/apis/"})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return s.tryUpdatingServiceSpecs(&openAPISpecInfo{
 | 
			
		||||
		apiService: specInfo.apiService,
 | 
			
		||||
		spec:       spec,
 | 
			
		||||
		handler:    specInfo.handler,
 | 
			
		||||
		etag:       etag,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// AddUpdateAPIService adds or updates the api service. It is thread safe.
 | 
			
		||||
func (s *specAggregator) AddUpdateAPIService(handler http.Handler, apiService *v1.APIService) error {
 | 
			
		||||
	s.rwMutex.Lock()
 | 
			
		||||
	defer s.rwMutex.Unlock()
 | 
			
		||||
 | 
			
		||||
	if apiService.Spec.Service == nil {
 | 
			
		||||
		// All local specs should be already aggregated using local delegate chain
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	newSpec := &openAPISpecInfo{
 | 
			
		||||
		apiService: *apiService,
 | 
			
		||||
		handler:    handler,
 | 
			
		||||
	}
 | 
			
		||||
	if specInfo, existingService := s.openAPISpecs[apiService.Name]; existingService {
 | 
			
		||||
		newSpec.etag = specInfo.etag
 | 
			
		||||
		newSpec.spec = specInfo.spec
 | 
			
		||||
	}
 | 
			
		||||
	return s.tryUpdatingServiceSpecs(newSpec)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RemoveAPIServiceSpec removes an api service from OpenAPI aggregation. If it does not exist, no error is returned.
 | 
			
		||||
// It is thread safe.
 | 
			
		||||
func (s *specAggregator) RemoveAPIServiceSpec(apiServiceName string) error {
 | 
			
		||||
	s.rwMutex.Lock()
 | 
			
		||||
	defer s.rwMutex.Unlock()
 | 
			
		||||
 | 
			
		||||
	if _, existingService := s.openAPISpecs[apiServiceName]; !existingService {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return s.tryDeleteServiceSpecs(apiServiceName)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetAPIServiceSpec returns api service spec info
 | 
			
		||||
func (s *specAggregator) GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool) {
 | 
			
		||||
	s.rwMutex.RLock()
 | 
			
		||||
	defer s.rwMutex.RUnlock()
 | 
			
		||||
 | 
			
		||||
	if info, existingService := s.openAPISpecs[apiServiceName]; existingService {
 | 
			
		||||
		return info.handler, info.etag, true
 | 
			
		||||
	}
 | 
			
		||||
	return nil, "", false
 | 
			
		||||
// decorateError creates a new cache that wraps a downloader
 | 
			
		||||
// cache the name of the apiservice to help with debugging.
 | 
			
		||||
func decorateError(name string, cache cached.Data[*spec.Swagger]) cached.Data[*spec.Swagger] {
 | 
			
		||||
	return cached.NewTransformer(func(result cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] {
 | 
			
		||||
		if result.Err != nil {
 | 
			
		||||
			return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to download %v: %v", name, result.Err))
 | 
			
		||||
		}
 | 
			
		||||
		return result
 | 
			
		||||
	}, cache)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -0,0 +1,372 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2023 The Kubernetes Authors.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package aggregator
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"io"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"net/http/httptest"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"bytes"
 | 
			
		||||
	v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
 | 
			
		||||
	"k8s.io/kube-openapi/pkg/common"
 | 
			
		||||
	"k8s.io/kube-openapi/pkg/validation/spec"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestBasicPathsMerged(t *testing.T) {
 | 
			
		||||
	mux := http.NewServeMux()
 | 
			
		||||
	delegationHandlers := []http.Handler{
 | 
			
		||||
		&openAPIHandler{
 | 
			
		||||
			openapi: &spec.Swagger{
 | 
			
		||||
				SwaggerProps: spec.SwaggerProps{
 | 
			
		||||
					Paths: &spec.Paths{
 | 
			
		||||
						Paths: map[string]spec.PathItem{
 | 
			
		||||
							"/apis/foo/v1": {},
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	buildAndRegisterSpecAggregator(delegationHandlers, mux)
 | 
			
		||||
 | 
			
		||||
	swagger, err := fetchOpenAPI(mux)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Error(err)
 | 
			
		||||
	}
 | 
			
		||||
	expectPath(t, swagger, "/apis/foo/v1")
 | 
			
		||||
	expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestAddUpdateAPIService(t *testing.T) {
 | 
			
		||||
	mux := http.NewServeMux()
 | 
			
		||||
	var delegationHandlers []http.Handler
 | 
			
		||||
	delegate1 := &openAPIHandler{openapi: &spec.Swagger{
 | 
			
		||||
		SwaggerProps: spec.SwaggerProps{
 | 
			
		||||
			Paths: &spec.Paths{
 | 
			
		||||
				Paths: map[string]spec.PathItem{
 | 
			
		||||
					"/apis/foo/v1": {},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}}
 | 
			
		||||
	delegationHandlers = append(delegationHandlers, delegate1)
 | 
			
		||||
	s := buildAndRegisterSpecAggregator(delegationHandlers, mux)
 | 
			
		||||
 | 
			
		||||
	apiService := &v1.APIService{
 | 
			
		||||
		Spec: v1.APIServiceSpec{
 | 
			
		||||
			Service: &v1.ServiceReference{Name: "dummy"},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	apiService.Name = "apiservice"
 | 
			
		||||
 | 
			
		||||
	handler := &openAPIHandler{openapi: &spec.Swagger{
 | 
			
		||||
		SwaggerProps: spec.SwaggerProps{
 | 
			
		||||
			Paths: &spec.Paths{
 | 
			
		||||
				Paths: map[string]spec.PathItem{
 | 
			
		||||
					"/apis/apiservicegroup/v1": {},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}}
 | 
			
		||||
 | 
			
		||||
	if err := s.AddUpdateAPIService(apiService, handler); err != nil {
 | 
			
		||||
		t.Error(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	swagger, err := fetchOpenAPI(mux)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Error(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	expectPath(t, swagger, "/apis/apiservicegroup/v1")
 | 
			
		||||
	expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1")
 | 
			
		||||
 | 
			
		||||
	t.Log("Update APIService OpenAPI")
 | 
			
		||||
	handler.openapi = &spec.Swagger{
 | 
			
		||||
		SwaggerProps: spec.SwaggerProps{
 | 
			
		||||
			Paths: &spec.Paths{
 | 
			
		||||
				Paths: map[string]spec.PathItem{
 | 
			
		||||
					"/apis/apiservicegroup/v2": {},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	s.UpdateAPIServiceSpec(apiService.Name)
 | 
			
		||||
 | 
			
		||||
	swagger, err = fetchOpenAPI(mux)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Error(err)
 | 
			
		||||
	}
 | 
			
		||||
	// Ensure that the if the APIService OpenAPI is updated, the
 | 
			
		||||
	// aggregated OpenAPI is also updated.
 | 
			
		||||
	expectPath(t, swagger, "/apis/apiservicegroup/v2")
 | 
			
		||||
	expectNoPath(t, swagger, "/apis/apiservicegroup/v1")
 | 
			
		||||
	expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestAddRemoveAPIService(t *testing.T) {
 | 
			
		||||
	mux := http.NewServeMux()
 | 
			
		||||
	var delegationHandlers []http.Handler
 | 
			
		||||
	delegate1 := &openAPIHandler{openapi: &spec.Swagger{
 | 
			
		||||
		SwaggerProps: spec.SwaggerProps{
 | 
			
		||||
			Paths: &spec.Paths{
 | 
			
		||||
				Paths: map[string]spec.PathItem{
 | 
			
		||||
					"/apis/foo/v1": {},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}}
 | 
			
		||||
	delegationHandlers = append(delegationHandlers, delegate1)
 | 
			
		||||
 | 
			
		||||
	s := buildAndRegisterSpecAggregator(delegationHandlers, mux)
 | 
			
		||||
 | 
			
		||||
	apiService := &v1.APIService{
 | 
			
		||||
		Spec: v1.APIServiceSpec{
 | 
			
		||||
			Service: &v1.ServiceReference{Name: "dummy"},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	apiService.Name = "apiservice"
 | 
			
		||||
 | 
			
		||||
	handler := &openAPIHandler{openapi: &spec.Swagger{
 | 
			
		||||
		SwaggerProps: spec.SwaggerProps{
 | 
			
		||||
			Paths: &spec.Paths{
 | 
			
		||||
				Paths: map[string]spec.PathItem{
 | 
			
		||||
					"/apis/apiservicegroup/v1": {},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}}
 | 
			
		||||
 | 
			
		||||
	if err := s.AddUpdateAPIService(apiService, handler); err != nil {
 | 
			
		||||
		t.Error(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	swagger, err := fetchOpenAPI(mux)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Error(err)
 | 
			
		||||
	}
 | 
			
		||||
	expectPath(t, swagger, "/apis/apiservicegroup/v1")
 | 
			
		||||
	expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1")
 | 
			
		||||
 | 
			
		||||
	t.Logf("Remove APIService %s", apiService.Name)
 | 
			
		||||
	s.RemoveAPIService(apiService.Name)
 | 
			
		||||
 | 
			
		||||
	swagger, err = fetchOpenAPI(mux)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Error(err)
 | 
			
		||||
	}
 | 
			
		||||
	// Ensure that the if the APIService is added then removed, the OpenAPI disappears from the aggregated OpenAPI as well.
 | 
			
		||||
	expectNoPath(t, swagger, "/apis/apiservicegroup/v1")
 | 
			
		||||
	expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestFailingAPIServiceSkippedAggregation(t *testing.T) {
 | 
			
		||||
	mux := http.NewServeMux()
 | 
			
		||||
	var delegationHandlers []http.Handler
 | 
			
		||||
	delegate1 := &openAPIHandler{openapi: &spec.Swagger{
 | 
			
		||||
		SwaggerProps: spec.SwaggerProps{
 | 
			
		||||
			Paths: &spec.Paths{
 | 
			
		||||
				Paths: map[string]spec.PathItem{
 | 
			
		||||
					"/apis/foo/v1": {},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}}
 | 
			
		||||
	delegationHandlers = append(delegationHandlers, delegate1)
 | 
			
		||||
 | 
			
		||||
	s := buildAndRegisterSpecAggregator(delegationHandlers, mux)
 | 
			
		||||
 | 
			
		||||
	apiServiceFailed := &v1.APIService{
 | 
			
		||||
		Spec: v1.APIServiceSpec{
 | 
			
		||||
			Service: &v1.ServiceReference{Name: "dummy"},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	apiServiceFailed.Name = "apiserviceFailed"
 | 
			
		||||
 | 
			
		||||
	handlerFailed := &openAPIHandler{
 | 
			
		||||
		returnErr: true,
 | 
			
		||||
		openapi: &spec.Swagger{
 | 
			
		||||
			SwaggerProps: spec.SwaggerProps{
 | 
			
		||||
				Paths: &spec.Paths{
 | 
			
		||||
					Paths: map[string]spec.PathItem{
 | 
			
		||||
						"/apis/failed/v1": {},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	apiServiceSuccess := &v1.APIService{
 | 
			
		||||
		Spec: v1.APIServiceSpec{
 | 
			
		||||
			Service: &v1.ServiceReference{Name: "dummy2"},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	apiServiceSuccess.Name = "apiserviceSuccess"
 | 
			
		||||
 | 
			
		||||
	handlerSuccess := &openAPIHandler{
 | 
			
		||||
		openapi: &spec.Swagger{
 | 
			
		||||
			SwaggerProps: spec.SwaggerProps{
 | 
			
		||||
				Paths: &spec.Paths{
 | 
			
		||||
					Paths: map[string]spec.PathItem{
 | 
			
		||||
						"/apis/success/v1": {},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	s.AddUpdateAPIService(apiServiceFailed, handlerFailed)
 | 
			
		||||
	s.AddUpdateAPIService(apiServiceSuccess, handlerSuccess)
 | 
			
		||||
 | 
			
		||||
	swagger, err := fetchOpenAPI(mux)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Error(err)
 | 
			
		||||
	}
 | 
			
		||||
	expectPath(t, swagger, "/apis/foo/v1")
 | 
			
		||||
	expectNoPath(t, swagger, "/apis/failed/v1")
 | 
			
		||||
	expectPath(t, swagger, "/apis/success/v1")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestAPIServiceFailSuccessTransition(t *testing.T) {
 | 
			
		||||
	mux := http.NewServeMux()
 | 
			
		||||
	var delegationHandlers []http.Handler
 | 
			
		||||
	delegate1 := &openAPIHandler{openapi: &spec.Swagger{
 | 
			
		||||
		SwaggerProps: spec.SwaggerProps{
 | 
			
		||||
			Paths: &spec.Paths{
 | 
			
		||||
				Paths: map[string]spec.PathItem{
 | 
			
		||||
					"/apis/foo/v1": {},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}}
 | 
			
		||||
	delegationHandlers = append(delegationHandlers, delegate1)
 | 
			
		||||
 | 
			
		||||
	s := buildAndRegisterSpecAggregator(delegationHandlers, mux)
 | 
			
		||||
 | 
			
		||||
	apiService := &v1.APIService{
 | 
			
		||||
		Spec: v1.APIServiceSpec{
 | 
			
		||||
			Service: &v1.ServiceReference{Name: "dummy"},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	apiService.Name = "apiservice"
 | 
			
		||||
 | 
			
		||||
	handler := &openAPIHandler{
 | 
			
		||||
		returnErr: true,
 | 
			
		||||
		openapi: &spec.Swagger{
 | 
			
		||||
			SwaggerProps: spec.SwaggerProps{
 | 
			
		||||
				Paths: &spec.Paths{
 | 
			
		||||
					Paths: map[string]spec.PathItem{
 | 
			
		||||
						"/apis/apiservicegroup/v1": {},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	s.AddUpdateAPIService(apiService, handler)
 | 
			
		||||
 | 
			
		||||
	swagger, err := fetchOpenAPI(mux)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Error(err)
 | 
			
		||||
	}
 | 
			
		||||
	expectPath(t, swagger, "/apis/foo/v1")
 | 
			
		||||
	expectNoPath(t, swagger, "/apis/apiservicegroup/v1")
 | 
			
		||||
 | 
			
		||||
	t.Log("Transition APIService to not return error")
 | 
			
		||||
	handler.returnErr = false
 | 
			
		||||
	err = s.UpdateAPIServiceSpec(apiService.Name)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Error(err)
 | 
			
		||||
	}
 | 
			
		||||
	swagger, err = fetchOpenAPI(mux)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Error(err)
 | 
			
		||||
	}
 | 
			
		||||
	expectPath(t, swagger, "/apis/foo/v1")
 | 
			
		||||
	expectPath(t, swagger, "/apis/apiservicegroup/v1")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type openAPIHandler struct {
 | 
			
		||||
	openapi   *spec.Swagger
 | 
			
		||||
	returnErr bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (o *openAPIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
	if o.returnErr {
 | 
			
		||||
		w.WriteHeader(500)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	data, err := json.Marshal(o.openapi)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	http.ServeContent(w, r, "/openapi/v2", time.Now(), bytes.NewReader(data))
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func fetchOpenAPI(mux *http.ServeMux) (*spec.Swagger, error) {
 | 
			
		||||
	server := httptest.NewServer(mux)
 | 
			
		||||
	defer server.Close()
 | 
			
		||||
	client := server.Client()
 | 
			
		||||
 | 
			
		||||
	req, err := http.NewRequest("GET", server.URL+"/openapi/v2", nil)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	resp, err := client.Do(req)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	body, err := io.ReadAll(resp.Body)
 | 
			
		||||
 | 
			
		||||
	swagger := &spec.Swagger{}
 | 
			
		||||
	if err := swagger.UnmarshalJSON(body); err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return swagger, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func buildAndRegisterSpecAggregator(delegationHandlers []http.Handler, mux common.PathHandler) *specAggregator {
 | 
			
		||||
	downloader := NewDownloader()
 | 
			
		||||
	aggregatorSpec := &spec.Swagger{
 | 
			
		||||
		SwaggerProps: spec.SwaggerProps{
 | 
			
		||||
			Paths: &spec.Paths{
 | 
			
		||||
				Paths: map[string]spec.PathItem{
 | 
			
		||||
					"/apis/apiregistration.k8s.io/v1": {},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	s := buildAndRegisterSpecAggregatorForLocalServices(&downloader, aggregatorSpec, delegationHandlers, mux)
 | 
			
		||||
	return s
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func expectPath(t *testing.T, swagger *spec.Swagger, path string) {
 | 
			
		||||
	if _, ok := swagger.Paths.Paths[path]; !ok {
 | 
			
		||||
		t.Errorf("Expected path %s to exist in aggregated paths", path)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func expectNoPath(t *testing.T, swagger *spec.Swagger, path string) {
 | 
			
		||||
	if _, ok := swagger.Paths.Paths[path]; ok {
 | 
			
		||||
		t.Errorf("Expected path %s to be omitted in aggregated paths", path)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -24,9 +24,50 @@ import (
 | 
			
		||||
 | 
			
		||||
	"k8s.io/apiserver/pkg/authentication/user"
 | 
			
		||||
	"k8s.io/apiserver/pkg/endpoints/request"
 | 
			
		||||
	"k8s.io/kube-openapi/pkg/cached"
 | 
			
		||||
	"k8s.io/kube-openapi/pkg/validation/spec"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// cacheableDownloader is a downloader that will always return the data
 | 
			
		||||
// and the etag.
 | 
			
		||||
type cacheableDownloader struct {
 | 
			
		||||
	downloader *Downloader
 | 
			
		||||
	handler    http.Handler
 | 
			
		||||
	etag       string
 | 
			
		||||
	spec       *spec.Swagger
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Creates a downloader that also returns the etag, making it useful to use as a cached dependency.
 | 
			
		||||
func NewCacheableDownloader(downloader *Downloader, handler http.Handler) cached.Data[*spec.Swagger] {
 | 
			
		||||
	return &cacheableDownloader{
 | 
			
		||||
		downloader: downloader,
 | 
			
		||||
		handler:    handler,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *cacheableDownloader) Get() cached.Result[*spec.Swagger] {
 | 
			
		||||
	swagger, etag, status, err := d.downloader.Download(d.handler, d.etag)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return cached.NewResultErr[*spec.Swagger](err)
 | 
			
		||||
	}
 | 
			
		||||
	switch status {
 | 
			
		||||
	case http.StatusNotModified:
 | 
			
		||||
		// Nothing has changed, do nothing.
 | 
			
		||||
	case http.StatusOK:
 | 
			
		||||
		if swagger != nil {
 | 
			
		||||
			d.etag = etag
 | 
			
		||||
			d.spec = swagger
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		fallthrough
 | 
			
		||||
	case http.StatusNotFound:
 | 
			
		||||
		return cached.NewResultErr[*spec.Swagger](ErrAPIServiceNotFound)
 | 
			
		||||
	default:
 | 
			
		||||
		return cached.NewResultErr[*spec.Swagger](fmt.Errorf("invalid status code: %v", status))
 | 
			
		||||
	}
 | 
			
		||||
	return cached.NewResultOK(d.spec, d.etag)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Downloader is the OpenAPI downloader type. It will try to download spec from /openapi/v2 or /swagger.json endpoint.
 | 
			
		||||
type Downloader struct {
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -18,56 +18,60 @@ package aggregator
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"sort"
 | 
			
		||||
 | 
			
		||||
	apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// byPriority can be used in sort.Sort to sort specs with their priorities.
 | 
			
		||||
type byPriority struct {
 | 
			
		||||
	specs           []openAPISpecInfo
 | 
			
		||||
	apiServices     []*apiregistrationv1.APIService
 | 
			
		||||
	groupPriorities map[string]int32
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (a byPriority) Len() int      { return len(a.specs) }
 | 
			
		||||
func (a byPriority) Swap(i, j int) { a.specs[i], a.specs[j] = a.specs[j], a.specs[i] }
 | 
			
		||||
func (a byPriority) Len() int { return len(a.apiServices) }
 | 
			
		||||
func (a byPriority) Swap(i, j int) {
 | 
			
		||||
	a.apiServices[i], a.apiServices[j] = a.apiServices[j], a.apiServices[i]
 | 
			
		||||
}
 | 
			
		||||
func (a byPriority) Less(i, j int) bool {
 | 
			
		||||
	// All local specs will come first
 | 
			
		||||
	if a.specs[i].apiService.Spec.Service == nil && a.specs[j].apiService.Spec.Service != nil {
 | 
			
		||||
	if a.apiServices[i].Spec.Service == nil && a.apiServices[j].Spec.Service != nil {
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
	if a.specs[i].apiService.Spec.Service != nil && a.specs[j].apiService.Spec.Service == nil {
 | 
			
		||||
	if a.apiServices[i].Spec.Service != nil && a.apiServices[j].Spec.Service == nil {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	// WARNING: This will result in not following priorities for local APIServices.
 | 
			
		||||
	if a.specs[i].apiService.Spec.Service == nil {
 | 
			
		||||
	if a.apiServices[i].Spec.Service == nil {
 | 
			
		||||
		// Sort local specs with their name. This is the order in the delegation chain (aggregator first).
 | 
			
		||||
		return a.specs[i].apiService.Name < a.specs[j].apiService.Name
 | 
			
		||||
		return a.apiServices[i].Name < a.apiServices[j].Name
 | 
			
		||||
	}
 | 
			
		||||
	var iPriority, jPriority int32
 | 
			
		||||
	if a.specs[i].apiService.Spec.Group == a.specs[j].apiService.Spec.Group {
 | 
			
		||||
		iPriority = a.specs[i].apiService.Spec.VersionPriority
 | 
			
		||||
		jPriority = a.specs[i].apiService.Spec.VersionPriority
 | 
			
		||||
	if a.apiServices[i].Spec.Group == a.apiServices[j].Spec.Group {
 | 
			
		||||
		iPriority = a.apiServices[i].Spec.VersionPriority
 | 
			
		||||
		jPriority = a.apiServices[i].Spec.VersionPriority
 | 
			
		||||
	} else {
 | 
			
		||||
		iPriority = a.groupPriorities[a.specs[i].apiService.Spec.Group]
 | 
			
		||||
		jPriority = a.groupPriorities[a.specs[j].apiService.Spec.Group]
 | 
			
		||||
		iPriority = a.groupPriorities[a.apiServices[i].Spec.Group]
 | 
			
		||||
		jPriority = a.groupPriorities[a.apiServices[j].Spec.Group]
 | 
			
		||||
	}
 | 
			
		||||
	if iPriority != jPriority {
 | 
			
		||||
		// Sort by priority, higher first
 | 
			
		||||
		return iPriority > jPriority
 | 
			
		||||
	}
 | 
			
		||||
	// Sort by service name.
 | 
			
		||||
	return a.specs[i].apiService.Name < a.specs[j].apiService.Name
 | 
			
		||||
	return a.apiServices[i].Name < a.apiServices[j].Name
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func sortByPriority(specs []openAPISpecInfo) {
 | 
			
		||||
func sortByPriority(apiServices []*apiregistrationv1.APIService) {
 | 
			
		||||
	b := byPriority{
 | 
			
		||||
		specs:           specs,
 | 
			
		||||
		apiServices:     apiServices,
 | 
			
		||||
		groupPriorities: map[string]int32{},
 | 
			
		||||
	}
 | 
			
		||||
	for _, spec := range specs {
 | 
			
		||||
		if spec.apiService.Spec.Service == nil {
 | 
			
		||||
	for _, apiService := range apiServices {
 | 
			
		||||
		if apiService.Spec.Service == nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		if pr, found := b.groupPriorities[spec.apiService.Spec.Group]; !found || spec.apiService.Spec.GroupPriorityMinimum > pr {
 | 
			
		||||
			b.groupPriorities[spec.apiService.Spec.Group] = spec.apiService.Spec.GroupPriorityMinimum
 | 
			
		||||
		if pr, found := b.groupPriorities[apiService.Spec.Group]; !found || apiService.Spec.GroupPriorityMinimum > pr {
 | 
			
		||||
			b.groupPriorities[apiService.Spec.Group] = apiService.Spec.GroupPriorityMinimum
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	sort.Sort(b)
 | 
			
		||||
 
 | 
			
		||||
@@ -21,23 +21,22 @@ import (
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
 | 
			
		||||
	"k8s.io/kube-openapi/pkg/validation/spec"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func newAPIServiceForTest(name, group string, minGroupPriority, versionPriority int32, svc *apiregistrationv1.ServiceReference) apiregistrationv1.APIService {
 | 
			
		||||
func newAPIServiceForTest(name, group string, minGroupPriority, versionPriority int32, svc *apiregistrationv1.ServiceReference) *apiregistrationv1.APIService {
 | 
			
		||||
	r := apiregistrationv1.APIService{}
 | 
			
		||||
	r.Spec.Group = group
 | 
			
		||||
	r.Spec.GroupPriorityMinimum = minGroupPriority
 | 
			
		||||
	r.Spec.VersionPriority = versionPriority
 | 
			
		||||
	r.Spec.Service = svc
 | 
			
		||||
	r.Name = name
 | 
			
		||||
	return r
 | 
			
		||||
	return &r
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames []string) {
 | 
			
		||||
func assertSortedServices(t *testing.T, actual []*apiregistrationv1.APIService, expectedNames []string) {
 | 
			
		||||
	actualNames := []string{}
 | 
			
		||||
	for _, a := range actual {
 | 
			
		||||
		actualNames = append(actualNames, a.apiService.Name)
 | 
			
		||||
		actualNames = append(actualNames, a.Name)
 | 
			
		||||
	}
 | 
			
		||||
	if !reflect.DeepEqual(actualNames, expectedNames) {
 | 
			
		||||
		t.Errorf("Expected %s got %s.", expectedNames, actualNames)
 | 
			
		||||
@@ -45,32 +44,14 @@ func assertSortedServices(t *testing.T, actual []openAPISpecInfo, expectedNames
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestAPIServiceSort(t *testing.T) {
 | 
			
		||||
	list := []openAPISpecInfo{
 | 
			
		||||
		{
 | 
			
		||||
			apiService: newAPIServiceForTest("FirstService", "Group1", 10, 5, &apiregistrationv1.ServiceReference{}),
 | 
			
		||||
			spec:       &spec.Swagger{},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			apiService: newAPIServiceForTest("SecondService", "Group2", 15, 3, &apiregistrationv1.ServiceReference{}),
 | 
			
		||||
			spec:       &spec.Swagger{},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			apiService: newAPIServiceForTest("FirstServiceInternal", "Group1", 16, 3, &apiregistrationv1.ServiceReference{}),
 | 
			
		||||
			spec:       &spec.Swagger{},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			apiService: newAPIServiceForTest("ThirdService", "Group3", 15, 3, &apiregistrationv1.ServiceReference{}),
 | 
			
		||||
			spec:       &spec.Swagger{},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			apiService: newAPIServiceForTest("local_service_1", "Group4", 15, 1, nil),
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			apiService: newAPIServiceForTest("local_service_3", "Group5", 15, 2, nil),
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			apiService: newAPIServiceForTest("local_service_2", "Group6", 15, 3, nil),
 | 
			
		||||
		},
 | 
			
		||||
	list := []*apiregistrationv1.APIService{
 | 
			
		||||
		newAPIServiceForTest("FirstService", "Group1", 10, 5, &apiregistrationv1.ServiceReference{}),
 | 
			
		||||
		newAPIServiceForTest("SecondService", "Group2", 15, 3, &apiregistrationv1.ServiceReference{}),
 | 
			
		||||
		newAPIServiceForTest("FirstServiceInternal", "Group1", 16, 3, &apiregistrationv1.ServiceReference{}),
 | 
			
		||||
		newAPIServiceForTest("ThirdService", "Group3", 15, 3, &apiregistrationv1.ServiceReference{}),
 | 
			
		||||
		newAPIServiceForTest("local_service_1", "Group4", 15, 1, nil),
 | 
			
		||||
		newAPIServiceForTest("local_service_3", "Group5", 15, 2, nil),
 | 
			
		||||
		newAPIServiceForTest("local_service_2", "Group6", 15, 3, nil),
 | 
			
		||||
	}
 | 
			
		||||
	sortByPriority(list)
 | 
			
		||||
	assertSortedServices(t, list, []string{"local_service_1", "local_service_2", "local_service_3", "FirstService", "FirstServiceInternal", "SecondService", "ThirdService"})
 | 
			
		||||
 
 | 
			
		||||
@@ -25,7 +25,7 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/client-go/util/workqueue"
 | 
			
		||||
	"k8s.io/klog/v2"
 | 
			
		||||
	"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
 | 
			
		||||
	v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
 | 
			
		||||
	"k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -67,11 +67,6 @@ func NewAggregationController(downloader *aggregator.Downloader, openAPIAggregat
 | 
			
		||||
 | 
			
		||||
	c.syncHandler = c.sync
 | 
			
		||||
 | 
			
		||||
	// update each service at least once, also those which are not coming from APIServices, namely local services
 | 
			
		||||
	for _, name := range openAPIAggregationManager.GetAPIServiceNames() {
 | 
			
		||||
		c.queue.AddAfter(name, time.Second)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return c
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -100,55 +95,31 @@ func (c *AggregationController) processNextWorkItem() bool {
 | 
			
		||||
	if quit {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if aggregator.IsLocalAPIService(key.(string)) {
 | 
			
		||||
		// for local delegation targets that are aggregated once per second, log at
 | 
			
		||||
		// higher level to avoid flooding the log
 | 
			
		||||
		klog.V(6).Infof("OpenAPI AggregationController: Processing item %s", key)
 | 
			
		||||
	} else {
 | 
			
		||||
		klog.V(4).Infof("OpenAPI AggregationController: Processing item %s", key)
 | 
			
		||||
	}
 | 
			
		||||
	klog.V(4).Infof("OpenAPI AggregationController: Processing item %s", key)
 | 
			
		||||
 | 
			
		||||
	action, err := c.syncHandler(key.(string))
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		c.queue.Forget(key)
 | 
			
		||||
	} else {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		utilruntime.HandleError(fmt.Errorf("loading OpenAPI spec for %q failed with: %v", key, err))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	switch action {
 | 
			
		||||
	case syncRequeue:
 | 
			
		||||
		if aggregator.IsLocalAPIService(key.(string)) {
 | 
			
		||||
			klog.V(7).Infof("OpenAPI AggregationController: action for local item %s: Requeue after %s.", key, successfulUpdateDelayLocal)
 | 
			
		||||
			c.queue.AddAfter(key, successfulUpdateDelayLocal)
 | 
			
		||||
		} else {
 | 
			
		||||
			klog.V(7).Infof("OpenAPI AggregationController: action for item %s: Requeue.", key)
 | 
			
		||||
			c.queue.AddAfter(key, successfulUpdateDelay)
 | 
			
		||||
		}
 | 
			
		||||
		c.queue.AddAfter(key, successfulUpdateDelay)
 | 
			
		||||
	case syncRequeueRateLimited:
 | 
			
		||||
		klog.Infof("OpenAPI AggregationController: action for item %s: Rate Limited Requeue.", key)
 | 
			
		||||
		c.queue.AddRateLimited(key)
 | 
			
		||||
	case syncNothing:
 | 
			
		||||
		klog.Infof("OpenAPI AggregationController: action for item %s: Nothing (removed from the queue).", key)
 | 
			
		||||
		c.queue.Forget(key)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *AggregationController) sync(key string) (syncAction, error) {
 | 
			
		||||
	handler, etag, exists := c.openAPIAggregationManager.GetAPIServiceInfo(key)
 | 
			
		||||
	if !exists || handler == nil {
 | 
			
		||||
		return syncNothing, nil
 | 
			
		||||
	}
 | 
			
		||||
	returnSpec, newEtag, httpStatus, err := c.downloader.Download(handler, etag)
 | 
			
		||||
	switch {
 | 
			
		||||
	case err != nil:
 | 
			
		||||
		return syncRequeueRateLimited, err
 | 
			
		||||
	case httpStatus == http.StatusNotModified:
 | 
			
		||||
	case httpStatus == http.StatusNotFound || returnSpec == nil:
 | 
			
		||||
		return syncRequeueRateLimited, fmt.Errorf("OpenAPI spec does not exist")
 | 
			
		||||
	case httpStatus == http.StatusOK:
 | 
			
		||||
		if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key, returnSpec, newEtag); err != nil {
 | 
			
		||||
	if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key); err != nil {
 | 
			
		||||
		if err == aggregator.ErrAPIServiceNotFound {
 | 
			
		||||
			return syncNothing, nil
 | 
			
		||||
		} else {
 | 
			
		||||
			return syncRequeueRateLimited, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@@ -160,7 +131,7 @@ func (c *AggregationController) AddAPIService(handler http.Handler, apiService *
 | 
			
		||||
	if apiService.Spec.Service == nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	if err := c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService); err != nil {
 | 
			
		||||
	if err := c.openAPIAggregationManager.AddUpdateAPIService(apiService, handler); err != nil {
 | 
			
		||||
		utilruntime.HandleError(fmt.Errorf("adding %q to AggregationController failed with: %v", apiService.Name, err))
 | 
			
		||||
	}
 | 
			
		||||
	c.queue.AddAfter(apiService.Name, time.Second)
 | 
			
		||||
@@ -168,11 +139,8 @@ func (c *AggregationController) AddAPIService(handler http.Handler, apiService *
 | 
			
		||||
 | 
			
		||||
// UpdateAPIService updates API Service's info and handler.
 | 
			
		||||
func (c *AggregationController) UpdateAPIService(handler http.Handler, apiService *v1.APIService) {
 | 
			
		||||
	if apiService.Spec.Service == nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	if err := c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService); err != nil {
 | 
			
		||||
		utilruntime.HandleError(fmt.Errorf("updating %q to AggregationController failed with: %v", apiService.Name, err))
 | 
			
		||||
	if err := c.openAPIAggregationManager.AddUpdateAPIService(apiService, handler); err != nil {
 | 
			
		||||
		utilruntime.HandleError(fmt.Errorf("Error updating APIService %q with err: %v", apiService.Name, err))
 | 
			
		||||
	}
 | 
			
		||||
	key := apiService.Name
 | 
			
		||||
	if c.queue.NumRequeues(key) > 0 {
 | 
			
		||||
@@ -187,7 +155,7 @@ func (c *AggregationController) UpdateAPIService(handler http.Handler, apiServic
 | 
			
		||||
 | 
			
		||||
// RemoveAPIService removes API Service from OpenAPI Aggregation Controller.
 | 
			
		||||
func (c *AggregationController) RemoveAPIService(apiServiceName string) {
 | 
			
		||||
	if err := c.openAPIAggregationManager.RemoveAPIServiceSpec(apiServiceName); err != nil {
 | 
			
		||||
	if err := c.openAPIAggregationManager.RemoveAPIService(apiServiceName); err != nil {
 | 
			
		||||
		utilruntime.HandleError(fmt.Errorf("removing %q from AggregationController failed with: %v", apiServiceName, err))
 | 
			
		||||
	}
 | 
			
		||||
	// This will only remove it if it was failing before. If it was successful, processNextWorkItem will figure it out
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user