Change aggregator to proxier for OpenAPI v3
This commit is contained in:
		| @@ -17,8 +17,11 @@ limitations under the License. | |||||||
| package aggregator | package aggregator | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"bytes" | ||||||
|  | 	"encoding/json" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"net/http" | 	"net/http" | ||||||
|  | 	"sort" | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
| @@ -26,15 +29,10 @@ import ( | |||||||
| 	"k8s.io/apiserver/pkg/server" | 	"k8s.io/apiserver/pkg/server" | ||||||
| 	v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" | 	v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" | ||||||
| 	"k8s.io/kube-openapi/pkg/common" | 	"k8s.io/kube-openapi/pkg/common" | ||||||
| 	"k8s.io/kube-openapi/pkg/handler3" |  | ||||||
| 	"k8s.io/kube-openapi/pkg/spec3" |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // SpecAggregator calls out to http handlers of APIServices and caches specs. It keeps state of the last | // SpecProxier proxies OpenAPI V3 requests to their respective APIService | ||||||
| // known specs including the http etag. | type SpecProxier interface { | ||||||
| // TODO(jefftree): remove the downloading and caching and proxy directly to the APIServices. This is possible because we |  | ||||||
| // don't have to merge here, which is cpu intensive in v2 |  | ||||||
| type SpecAggregator interface { |  | ||||||
| 	AddUpdateAPIService(handler http.Handler, apiService *v1.APIService) | 	AddUpdateAPIService(handler http.Handler, apiService *v1.APIService) | ||||||
| 	UpdateAPIServiceSpec(apiServiceName string) error | 	UpdateAPIServiceSpec(apiServiceName string) error | ||||||
| 	RemoveAPIServiceSpec(apiServiceName string) | 	RemoveAPIServiceSpec(apiServiceName string) | ||||||
| @@ -53,37 +51,27 @@ func IsLocalAPIService(apiServiceName string) bool { | |||||||
| 	return strings.HasPrefix(apiServiceName, localDelegateChainNamePrefix) | 	return strings.HasPrefix(apiServiceName, localDelegateChainNamePrefix) | ||||||
| } | } | ||||||
|  |  | ||||||
| // GetAPIServicesName returns the names of APIServices recorded in openAPIV3Specs. | // GetAPIServicesName returns the names of APIServices recorded in apiServiceInfo. | ||||||
| // We use this function to pass the names of local APIServices to the controller in this package, | // We use this function to pass the names of local APIServices to the controller in this package, | ||||||
| // so that the controller can periodically sync the OpenAPI spec from delegation API servers. | // so that the controller can periodically sync the OpenAPI spec from delegation API servers. | ||||||
| func (s *specAggregator) GetAPIServiceNames() []string { | func (s *specProxier) GetAPIServiceNames() []string { | ||||||
| 	s.rwMutex.Lock() | 	s.rwMutex.RLock() | ||||||
| 	defer s.rwMutex.Unlock() | 	defer s.rwMutex.RUnlock() | ||||||
|  |  | ||||||
| 	names := make([]string, len(s.openAPIV3Specs)) | 	names := make([]string, len(s.apiServiceInfo)) | ||||||
| 	for key := range s.openAPIV3Specs { | 	for key := range s.apiServiceInfo { | ||||||
| 		names = append(names, key) | 		names = append(names, key) | ||||||
| 	} | 	} | ||||||
| 	return names | 	return names | ||||||
| } | } | ||||||
|  |  | ||||||
| // BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup. | // BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup. | ||||||
| func BuildAndRegisterAggregator(downloader Downloader, delegationTarget server.DelegationTarget, pathHandler common.PathHandlerByGroupVersion) (SpecAggregator, error) { | func BuildAndRegisterAggregator(downloader Downloader, delegationTarget server.DelegationTarget, pathHandler common.PathHandlerByGroupVersion) (SpecProxier, error) { | ||||||
| 	var err error | 	s := &specProxier{ | ||||||
| 	s := &specAggregator{ | 		apiServiceInfo: map[string]*openAPIV3APIServiceInfo{}, | ||||||
| 		openAPIV3Specs: map[string]*openAPIV3APIServiceInfo{}, |  | ||||||
| 		downloader:     downloader, | 		downloader:     downloader, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	s.openAPIV3VersionedService, err = handler3.NewOpenAPIService(nil) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
| 	err = s.openAPIV3VersionedService.RegisterOpenAPIV3VersionedService("/openapi/v3", pathHandler) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	i := 1 | 	i := 1 | ||||||
| 	for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() { | 	for delegate := delegationTarget; delegate != nil; delegate = delegate.NextDelegate() { | ||||||
| 		handler := delegate.UnprotectedHandler() | 		handler := delegate.UnprotectedHandler() | ||||||
| @@ -98,109 +86,126 @@ func BuildAndRegisterAggregator(downloader Downloader, delegationTarget server.D | |||||||
| 		s.UpdateAPIServiceSpec(apiServiceName) | 		s.UpdateAPIServiceSpec(apiServiceName) | ||||||
| 		i++ | 		i++ | ||||||
| 	} | 	} | ||||||
|  | 	s.register(pathHandler) | ||||||
|  |  | ||||||
| 	return s, nil | 	return s, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // AddUpdateAPIService adds or updates the api service. It is thread safe. | // AddUpdateAPIService adds or updates the api service. It is thread safe. | ||||||
| func (s *specAggregator) AddUpdateAPIService(handler http.Handler, apiservice *v1.APIService) { | func (s *specProxier) AddUpdateAPIService(handler http.Handler, apiservice *v1.APIService) { | ||||||
| 	s.rwMutex.Lock() | 	s.rwMutex.Lock() | ||||||
| 	defer s.rwMutex.Unlock() | 	defer s.rwMutex.Unlock() | ||||||
| 	// If the APIService is being updated, use the existing struct. | 	// If the APIService is being updated, use the existing struct. | ||||||
| 	if apiServiceInfo, ok := s.openAPIV3Specs[apiservice.Name]; ok { | 	if apiServiceInfo, ok := s.apiServiceInfo[apiservice.Name]; ok { | ||||||
| 		apiServiceInfo.apiService = *apiservice | 		apiServiceInfo.apiService = *apiservice | ||||||
| 		apiServiceInfo.handler = handler | 		apiServiceInfo.handler = handler | ||||||
| 	} | 	} | ||||||
| 	s.openAPIV3Specs[apiservice.Name] = &openAPIV3APIServiceInfo{ | 	s.apiServiceInfo[apiservice.Name] = &openAPIV3APIServiceInfo{ | ||||||
| 		apiService: *apiservice, | 		apiService: *apiservice, | ||||||
| 		handler:    handler, | 		handler:    handler, | ||||||
| 		specs:      make(map[string]*openAPIV3SpecInfo), |  | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| // UpdateAPIServiceSpec updates all the OpenAPI v3 specs that the APIService serves. | // UpdateAPIServiceSpec updates all the OpenAPI v3 specs that the APIService serves. | ||||||
| // It is thread safe. | // It is thread safe. | ||||||
| func (s *specAggregator) UpdateAPIServiceSpec(apiServiceName string) error { | func (s *specProxier) UpdateAPIServiceSpec(apiServiceName string) error { | ||||||
| 	s.rwMutex.Lock() | 	s.rwMutex.Lock() | ||||||
| 	defer s.rwMutex.Unlock() | 	defer s.rwMutex.Unlock() | ||||||
|  |  | ||||||
| 	apiService, exists := s.openAPIV3Specs[apiServiceName] | 	apiService, exists := s.apiServiceInfo[apiServiceName] | ||||||
| 	if !exists { | 	if !exists { | ||||||
| 		return fmt.Errorf("APIService %s does not exist for update", apiServiceName) | 		return fmt.Errorf("APIService %s does not exist for update", apiServiceName) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Pass a list of old etags to the Downloader to prevent transfers if etags match | 	gv, err := s.downloader.OpenAPIV3Root(apiService.handler) | ||||||
| 	etagList := make(map[string]string) |  | ||||||
| 	for gv, specInfo := range apiService.specs { |  | ||||||
| 		etagList[gv] = specInfo.etag |  | ||||||
| 	} |  | ||||||
| 	groups, err := s.downloader.Download(apiService.handler, etagList) |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  | 	s.apiServiceInfo[apiServiceName].gvList = gv | ||||||
| 	// Remove any groups that do not exist anymore |  | ||||||
| 	for group := range s.openAPIV3Specs[apiServiceName].specs { |  | ||||||
| 		if _, exists := groups[group]; !exists { |  | ||||||
| 			s.openAPIV3VersionedService.DeleteGroupVersion(group) |  | ||||||
| 			delete(s.openAPIV3Specs[apiServiceName].specs, group) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	for group, info := range groups { |  | ||||||
| 		if info.spec == nil { |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		// If ETag has not changed, no update is necessary |  | ||||||
| 		oldInfo, exists := s.openAPIV3Specs[apiServiceName].specs[group] |  | ||||||
| 		if exists && oldInfo.etag == info.etag { |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 		s.openAPIV3Specs[apiServiceName].specs[group] = &openAPIV3SpecInfo{ |  | ||||||
| 			spec: info.spec, |  | ||||||
| 			etag: info.etag, |  | ||||||
| 		} |  | ||||||
| 		s.openAPIV3VersionedService.UpdateGroupVersion(group, info.spec) |  | ||||||
| 	} |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| type specAggregator struct { | type specProxier struct { | ||||||
| 	// mutex protects all members of this struct. | 	// mutex protects all members of this struct. | ||||||
| 	rwMutex sync.RWMutex | 	rwMutex sync.RWMutex | ||||||
|  |  | ||||||
| 	// OpenAPI V3 specs by APIService name | 	// OpenAPI V3 specs by APIService name | ||||||
| 	openAPIV3Specs map[string]*openAPIV3APIServiceInfo | 	apiServiceInfo map[string]*openAPIV3APIServiceInfo | ||||||
| 	// provided for dynamic OpenAPI spec |  | ||||||
| 	openAPIV3VersionedService *handler3.OpenAPIService |  | ||||||
|  |  | ||||||
| 	// For downloading the OpenAPI v3 specs from apiservices | 	// For downloading the OpenAPI v3 specs from apiservices | ||||||
| 	downloader Downloader | 	downloader Downloader | ||||||
| } | } | ||||||
|  |  | ||||||
| var _ SpecAggregator = &specAggregator{} | var _ SpecProxier = &specProxier{} | ||||||
|  |  | ||||||
| type openAPIV3APIServiceInfo struct { | type openAPIV3APIServiceInfo struct { | ||||||
| 	apiService v1.APIService | 	apiService v1.APIService | ||||||
| 	handler    http.Handler | 	handler    http.Handler | ||||||
| 	specs      map[string]*openAPIV3SpecInfo | 	gvList     []string | ||||||
| } |  | ||||||
|  |  | ||||||
| type openAPIV3SpecInfo struct { |  | ||||||
| 	spec *spec3.OpenAPI |  | ||||||
| 	etag string |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // RemoveAPIServiceSpec removes an api service from the OpenAPI map. If it does not exist, no error is returned. | // RemoveAPIServiceSpec removes an api service from the OpenAPI map. If it does not exist, no error is returned. | ||||||
| // It is thread safe. | // It is thread safe. | ||||||
| func (s *specAggregator) RemoveAPIServiceSpec(apiServiceName string) { | func (s *specProxier) RemoveAPIServiceSpec(apiServiceName string) { | ||||||
| 	s.rwMutex.Lock() | 	s.rwMutex.Lock() | ||||||
| 	defer s.rwMutex.Unlock() | 	defer s.rwMutex.Unlock() | ||||||
| 	if apiServiceInfo, ok := s.openAPIV3Specs[apiServiceName]; ok { | 	if _, ok := s.apiServiceInfo[apiServiceName]; ok { | ||||||
| 		for gv := range apiServiceInfo.specs { | 		delete(s.apiServiceInfo, apiServiceName) | ||||||
| 			s.openAPIV3VersionedService.DeleteGroupVersion(gv) |  | ||||||
| 		} |  | ||||||
| 		delete(s.openAPIV3Specs, apiServiceName) |  | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // handleDiscovery is the handler for OpenAPI V3 Discovery | ||||||
|  | func (s *specProxier) handleDiscovery(w http.ResponseWriter, r *http.Request) { | ||||||
|  | 	s.rwMutex.RLock() | ||||||
|  | 	defer s.rwMutex.RUnlock() | ||||||
|  |  | ||||||
|  | 	gvList := make(map[string]bool) | ||||||
|  | 	for _, apiServiceInfo := range s.apiServiceInfo { | ||||||
|  | 		for _, gv := range apiServiceInfo.gvList { | ||||||
|  | 			gvList[gv] = true | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	keys := make([]string, 0, len(gvList)) | ||||||
|  | 	for k := range gvList { | ||||||
|  | 		keys = append(keys, k) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	sort.Strings(keys) | ||||||
|  | 	output := map[string][]string{"Paths": keys} | ||||||
|  | 	j, err := json.Marshal(output) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	http.ServeContent(w, r, "/openapi/v3", time.Now(), bytes.NewReader(j)) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // handleGroupVersion is the OpenAPI V3 handler for a specified group/version | ||||||
|  | func (s *specProxier) handleGroupVersion(w http.ResponseWriter, r *http.Request) { | ||||||
|  | 	s.rwMutex.RLock() | ||||||
|  | 	defer s.rwMutex.RUnlock() | ||||||
|  |  | ||||||
|  | 	// TODO: Import this logic from kube-openapi instead of duplicating | ||||||
|  | 	// URLs for OpenAPI V3 have the format /openapi/v3/<groupversionpath> | ||||||
|  | 	// SplitAfterN with 4 yields ["", "openapi", "v3", <groupversionpath>] | ||||||
|  | 	url := strings.SplitAfterN(r.URL.Path, "/", 4) | ||||||
|  | 	targetGV := url[3] | ||||||
|  |  | ||||||
|  | 	for _, apiServiceInfo := range s.apiServiceInfo { | ||||||
|  | 		for _, gv := range apiServiceInfo.gvList { | ||||||
|  | 			if targetGV == gv { | ||||||
|  | 				apiServiceInfo.handler.ServeHTTP(w, r) | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	// No group-versions match the desired request | ||||||
|  | 	w.WriteHeader(404) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Register registers the OpenAPI V3 Discovery and GroupVersion handlers | ||||||
|  | func (s *specProxier) register(handler common.PathHandlerByGroupVersion) { | ||||||
|  | 	handler.Handle("/openapi/v3", http.HandlerFunc(s.handleDiscovery)) | ||||||
|  | 	handler.HandlePrefix("/openapi/v3/", http.HandlerFunc(s.handleGroupVersion)) | ||||||
|  | } | ||||||
|   | |||||||
| @@ -54,6 +54,34 @@ type SpecETag struct { | |||||||
| 	etag string | 	etag string | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // OpenAPIV3Root downloads the OpenAPI V3 root document from an APIService | ||||||
|  | func (s *Downloader) OpenAPIV3Root(handler http.Handler) ([]string, error) { | ||||||
|  | 	handler = s.handlerWithUser(handler, &user.DefaultInfo{Name: aggregatorUser}) | ||||||
|  | 	handler = http.TimeoutHandler(handler, specDownloadTimeout, "request timed out") | ||||||
|  |  | ||||||
|  | 	req, err := http.NewRequest("GET", "/openapi/v3", nil) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	req.Header.Add("Accept", "application/json") | ||||||
|  |  | ||||||
|  | 	writer := newInMemoryResponseWriter() | ||||||
|  | 	handler.ServeHTTP(writer, req) | ||||||
|  |  | ||||||
|  | 	switch writer.respCode { | ||||||
|  | 	case http.StatusNotFound: | ||||||
|  | 		// TODO: For APIServices, download the V2 spec and convert to V3 | ||||||
|  | 		return nil, nil | ||||||
|  | 	case http.StatusOK: | ||||||
|  | 		groups := gvList{} | ||||||
|  | 		if err := json.Unmarshal(writer.data, &groups); err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 		return groups.Paths, nil | ||||||
|  | 	} | ||||||
|  | 	return nil, fmt.Errorf("Error, could not get list of group versions for APIService") | ||||||
|  | } | ||||||
|  |  | ||||||
| // Download downloads OpenAPI v3 for all groups of a given handler | // Download downloads OpenAPI v3 for all groups of a given handler | ||||||
| func (s *Downloader) Download(handler http.Handler, etagList map[string]string) (returnSpec map[string]*SpecETag, err error) { | func (s *Downloader) Download(handler http.Handler, etagList map[string]string) (returnSpec map[string]*SpecETag, err error) { | ||||||
| 	// TODO(jefftree): https://github.com/kubernetes/kubernetes/pull/105945#issuecomment-966455034 | 	// TODO(jefftree): https://github.com/kubernetes/kubernetes/pull/105945#issuecomment-966455034 | ||||||
|   | |||||||
| @@ -32,10 +32,12 @@ type handlerTest struct { | |||||||
|  |  | ||||||
| var _ http.Handler = handlerTest{} | var _ http.Handler = handlerTest{} | ||||||
|  |  | ||||||
|  | var groupList = []string{"apis/group/version"} | ||||||
|  |  | ||||||
| func (h handlerTest) ServeHTTP(w http.ResponseWriter, r *http.Request) { | func (h handlerTest) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||||||
| 	// Create an APIService with a handler for one group/version | 	// Create an APIService with a handler for one group/version | ||||||
| 	group := make(map[string][]string) | 	group := make(map[string][]string) | ||||||
| 	group["Paths"] = []string{"apis/group/version"} | 	group["Paths"] = groupList | ||||||
| 	j, _ := json.Marshal(group) | 	j, _ := json.Marshal(group) | ||||||
| 	if r.URL.Path == "/openapi/v3" { | 	if r.URL.Path == "/openapi/v3" { | ||||||
| 		w.Write(j) | 		w.Write(j) | ||||||
| @@ -85,6 +87,11 @@ func assertDownloadedSpec(gvSpec map[string]*SpecETag, err error, expectedSpecID | |||||||
| func TestDownloadOpenAPISpec(t *testing.T) { | func TestDownloadOpenAPISpec(t *testing.T) { | ||||||
| 	s := Downloader{} | 	s := Downloader{} | ||||||
|  |  | ||||||
|  | 	groups, err := s.OpenAPIV3Root( | ||||||
|  | 		handlerTest{data: []byte(""), etag: ""}) | ||||||
|  | 	assert.NoError(t, err) | ||||||
|  | 	assert.Equal(t, groups, groupList) | ||||||
|  |  | ||||||
| 	// Test with eTag | 	// Test with eTag | ||||||
| 	gvSpec, err := s.Download( | 	gvSpec, err := s.Download( | ||||||
| 		handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test"}, map[string]string{}) | 		handlerTest{data: []byte("{\"openapi\": \"test\"}"), etag: "etag_test"}, map[string]string{}) | ||||||
|   | |||||||
| @@ -43,10 +43,9 @@ const ( | |||||||
| 	syncNothing | 	syncNothing | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // AggregationController periodically check for changes in OpenAPI specs of APIServices and update/remove | // AggregationController periodically checks the list of group-versions handled by each APIService and updates the discovery page periodically | ||||||
| // them if necessary. |  | ||||||
| type AggregationController struct { | type AggregationController struct { | ||||||
| 	openAPIAggregationManager aggregator.SpecAggregator | 	openAPIAggregationManager aggregator.SpecProxier | ||||||
| 	queue                     workqueue.RateLimitingInterface | 	queue                     workqueue.RateLimitingInterface | ||||||
|  |  | ||||||
| 	// To allow injection for testing. | 	// To allow injection for testing. | ||||||
| @@ -54,7 +53,7 @@ type AggregationController struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| // NewAggregationController creates new OpenAPI aggregation controller. | // NewAggregationController creates new OpenAPI aggregation controller. | ||||||
| func NewAggregationController(openAPIAggregationManager aggregator.SpecAggregator) *AggregationController { | func NewAggregationController(openAPIAggregationManager aggregator.SpecProxier) *AggregationController { | ||||||
| 	c := &AggregationController{ | 	c := &AggregationController{ | ||||||
| 		openAPIAggregationManager: openAPIAggregationManager, | 		openAPIAggregationManager: openAPIAggregationManager, | ||||||
| 		queue: workqueue.NewNamedRateLimitingQueue( | 		queue: workqueue.NewNamedRateLimitingQueue( | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Jefftree
					Jefftree