add list of served versions to storage version
This commit is contained in:
		@@ -56,6 +56,11 @@ type APIGroupVersion struct {
 | 
				
			|||||||
	// GroupVersion is the external group version
 | 
						// GroupVersion is the external group version
 | 
				
			||||||
	GroupVersion schema.GroupVersion
 | 
						GroupVersion schema.GroupVersion
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// AllServedVersionsByResource is indexed by resource and maps to a list of versions that resource exists in.
 | 
				
			||||||
 | 
						// This was created so that StorageVersion for APIs can include a list of all version that are served for each
 | 
				
			||||||
 | 
						// GroupResource tuple.
 | 
				
			||||||
 | 
						AllServedVersionsByResource map[string][]string
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// OptionsExternalVersion controls the Kubernetes APIVersion used for common objects in the apiserver
 | 
						// OptionsExternalVersion controls the Kubernetes APIVersion used for common objects in the apiserver
 | 
				
			||||||
	// schema like api.Status, api.DeleteOptions, and metav1.ListOptions. Other implementors may
 | 
						// schema like api.Status, api.DeleteOptions, and metav1.ListOptions. Other implementors may
 | 
				
			||||||
	// define a version "v1beta1" but want to use the Kubernetes "v1" internal objects. If
 | 
						// define a version "v1beta1" but want to use the Kubernetes "v1" internal objects. If
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -600,6 +600,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
 | 
				
			|||||||
		if a.group.ConvertabilityChecker != nil {
 | 
							if a.group.ConvertabilityChecker != nil {
 | 
				
			||||||
			decodableVersions = a.group.ConvertabilityChecker.VersionsForGroupKind(fqKindToRegister.GroupKind())
 | 
								decodableVersions = a.group.ConvertabilityChecker.VersionsForGroupKind(fqKindToRegister.GroupKind())
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		resourceInfo = &storageversion.ResourceInfo{
 | 
							resourceInfo = &storageversion.ResourceInfo{
 | 
				
			||||||
			GroupResource: schema.GroupResource{
 | 
								GroupResource: schema.GroupResource{
 | 
				
			||||||
				Group:    a.group.GroupVersion.Group,
 | 
									Group:    a.group.GroupVersion.Group,
 | 
				
			||||||
@@ -612,6 +613,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
 | 
				
			|||||||
			EquivalentResourceMapper: a.group.EquivalentResourceRegistry,
 | 
								EquivalentResourceMapper: a.group.EquivalentResourceRegistry,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			DirectlyDecodableVersions: decodableVersions,
 | 
								DirectlyDecodableVersions: decodableVersions,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								ServedVersions: a.group.AllServedVersionsByResource[path],
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -911,8 +911,21 @@ func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (s *GenericAPIServer) newAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion) *genericapi.APIGroupVersion {
 | 
					func (s *GenericAPIServer) newAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion) *genericapi.APIGroupVersion {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						allServedVersionsByResource := map[string][]string{}
 | 
				
			||||||
 | 
						for version, resourcesInVersion := range apiGroupInfo.VersionedResourcesStorageMap {
 | 
				
			||||||
 | 
							for resource := range resourcesInVersion {
 | 
				
			||||||
 | 
								if len(groupVersion.Group) == 0 {
 | 
				
			||||||
 | 
									allServedVersionsByResource[resource] = append(allServedVersionsByResource[resource], version)
 | 
				
			||||||
 | 
								} else {
 | 
				
			||||||
 | 
									allServedVersionsByResource[resource] = append(allServedVersionsByResource[resource], fmt.Sprintf("%s/%s", groupVersion.Group, version))
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return &genericapi.APIGroupVersion{
 | 
						return &genericapi.APIGroupVersion{
 | 
				
			||||||
		GroupVersion:                groupVersion,
 | 
							GroupVersion:                groupVersion,
 | 
				
			||||||
 | 
							AllServedVersionsByResource: allServedVersionsByResource,
 | 
				
			||||||
		MetaGroupVersion:            apiGroupInfo.MetaGroupVersion,
 | 
							MetaGroupVersion:            apiGroupInfo.MetaGroupVersion,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		ParameterCodec:        apiGroupInfo.ParameterCodec,
 | 
							ParameterCodec:        apiGroupInfo.ParameterCodec,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -25,7 +25,6 @@ import (
 | 
				
			|||||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
						"k8s.io/apimachinery/pkg/runtime/schema"
 | 
				
			||||||
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | 
						utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | 
				
			||||||
	serverstorage "k8s.io/apiserver/pkg/server/storage"
 | 
					 | 
				
			||||||
	"k8s.io/client-go/kubernetes"
 | 
						"k8s.io/client-go/kubernetes"
 | 
				
			||||||
	"k8s.io/client-go/rest"
 | 
						"k8s.io/client-go/rest"
 | 
				
			||||||
	_ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration
 | 
						_ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration
 | 
				
			||||||
@@ -45,6 +44,10 @@ type ResourceInfo struct {
 | 
				
			|||||||
	// DirectlyDecodableVersions is a list of versions that the converter for REST storage knows how to convert.  This
 | 
						// DirectlyDecodableVersions is a list of versions that the converter for REST storage knows how to convert.  This
 | 
				
			||||||
	// contains items like apiextensions.k8s.io/v1beta1 even if we don't serve that version.
 | 
						// contains items like apiextensions.k8s.io/v1beta1 even if we don't serve that version.
 | 
				
			||||||
	DirectlyDecodableVersions []schema.GroupVersion
 | 
						DirectlyDecodableVersions []schema.GroupVersion
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// ServedVersions holds a list of all versions of GroupResource that are served.  Note that a server may be able to
 | 
				
			||||||
 | 
						// decode a particular version, but still not serve it.
 | 
				
			||||||
 | 
						ServedVersions []string
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Manager records the resources whose StorageVersions need updates, and provides a method to update those StorageVersions.
 | 
					// Manager records the resources whose StorageVersions need updates, and provides a method to update those StorageVersions.
 | 
				
			||||||
@@ -52,7 +55,7 @@ type Manager interface {
 | 
				
			|||||||
	// AddResourceInfo records resources whose StorageVersions need updates
 | 
						// AddResourceInfo records resources whose StorageVersions need updates
 | 
				
			||||||
	AddResourceInfo(resources ...*ResourceInfo)
 | 
						AddResourceInfo(resources ...*ResourceInfo)
 | 
				
			||||||
	// UpdateStorageVersions tries to update the StorageVersions of the recorded resources
 | 
						// UpdateStorageVersions tries to update the StorageVersions of the recorded resources
 | 
				
			||||||
	UpdateStorageVersions(kubeAPIServerClientConfig *rest.Config, apiserverID string, apiResourceConfigSource serverstorage.APIResourceConfigSource)
 | 
						UpdateStorageVersions(kubeAPIServerClientConfig *rest.Config, apiserverID string)
 | 
				
			||||||
	// PendingUpdate returns true if the StorageVersion of the given resource is still pending update.
 | 
						// PendingUpdate returns true if the StorageVersion of the given resource is still pending update.
 | 
				
			||||||
	PendingUpdate(gr schema.GroupResource) bool
 | 
						PendingUpdate(gr schema.GroupResource) bool
 | 
				
			||||||
	// LastUpdateError returns the last error hit when updating the storage version of the given resource.
 | 
						// LastUpdateError returns the last error hit when updating the storage version of the given resource.
 | 
				
			||||||
@@ -112,7 +115,7 @@ func (s *defaultManager) addPendingManagedStatusLocked(r *ResourceInfo) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// UpdateStorageVersions tries to update the StorageVersions of the recorded resources
 | 
					// UpdateStorageVersions tries to update the StorageVersions of the recorded resources
 | 
				
			||||||
func (s *defaultManager) UpdateStorageVersions(kubeAPIServerClientConfig *rest.Config, serverID string, apiResourceConfigSource serverstorage.APIResourceConfigSource) {
 | 
					func (s *defaultManager) UpdateStorageVersions(kubeAPIServerClientConfig *rest.Config, serverID string) {
 | 
				
			||||||
	clientset, err := kubernetes.NewForConfig(kubeAPIServerClientConfig)
 | 
						clientset, err := kubernetes.NewForConfig(kubeAPIServerClientConfig)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		utilruntime.HandleError(fmt.Errorf("failed to get clientset: %v", err))
 | 
							utilruntime.HandleError(fmt.Errorf("failed to get clientset: %v", err))
 | 
				
			||||||
@@ -145,12 +148,8 @@ func (s *defaultManager) UpdateStorageVersions(kubeAPIServerClientConfig *rest.C
 | 
				
			|||||||
			gr.Group = "core"
 | 
								gr.Group = "core"
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		servedVersions := []string{}
 | 
							servedVersions := r.ServedVersions
 | 
				
			||||||
		for _, dv := range decodableVersions {
 | 
					
 | 
				
			||||||
			if apiResourceConfigSource.ResourceEnabled(gr.WithVersion(dv)) {
 | 
					 | 
				
			||||||
				servedVersions = append(servedVersions, dv)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if err := updateStorageVersionFor(sc, serverID, gr, r.EncodingVersion, decodableVersions, servedVersions); err != nil {
 | 
							if err := updateStorageVersionFor(sc, serverID, gr, r.EncodingVersion, decodableVersions, servedVersions); err != nil {
 | 
				
			||||||
			utilruntime.HandleError(fmt.Errorf("failed to update storage version for %v: %v", r.GroupResource, err))
 | 
								utilruntime.HandleError(fmt.Errorf("failed to update storage version for %v: %v", r.GroupResource, err))
 | 
				
			||||||
			s.recordStatusFailure(&r, err)
 | 
								s.recordStatusFailure(&r, err)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -367,7 +367,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
 | 
				
			|||||||
				// All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver)
 | 
									// All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver)
 | 
				
			||||||
				// share the same generic apiserver config. The same StorageVersion manager is used
 | 
									// share the same generic apiserver config. The same StorageVersion manager is used
 | 
				
			||||||
				// to register all built-in resources when the generic apiservers install APIs.
 | 
									// to register all built-in resources when the generic apiservers install APIs.
 | 
				
			||||||
				s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID, c.GenericConfig.MergedResourceConfig)
 | 
									s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID)
 | 
				
			||||||
				return false, nil
 | 
									return false, nil
 | 
				
			||||||
			}, hookContext.StopCh)
 | 
								}, hookContext.StopCh)
 | 
				
			||||||
			// Once the storage version updater finishes the first round of update,
 | 
								// Once the storage version updater finishes the first round of update,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -31,7 +31,6 @@ import (
 | 
				
			|||||||
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
						"k8s.io/apimachinery/pkg/runtime/schema"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
						"k8s.io/apimachinery/pkg/util/wait"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/features"
 | 
						"k8s.io/apiserver/pkg/features"
 | 
				
			||||||
	serverstorage "k8s.io/apiserver/pkg/server/storage"
 | 
					 | 
				
			||||||
	"k8s.io/apiserver/pkg/storageversion"
 | 
						"k8s.io/apiserver/pkg/storageversion"
 | 
				
			||||||
	utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
						utilfeature "k8s.io/apiserver/pkg/util/feature"
 | 
				
			||||||
	"k8s.io/client-go/dynamic"
 | 
						"k8s.io/client-go/dynamic"
 | 
				
			||||||
@@ -53,9 +52,9 @@ type wrappedStorageVersionManager struct {
 | 
				
			|||||||
	completed      <-chan struct{}
 | 
						completed      <-chan struct{}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (w *wrappedStorageVersionManager) UpdateStorageVersions(loopbackClientConfig *rest.Config, serverID string, apiResourceConfigSource serverstorage.APIResourceConfigSource) {
 | 
					func (w *wrappedStorageVersionManager) UpdateStorageVersions(loopbackClientConfig *rest.Config, serverID string) {
 | 
				
			||||||
	<-w.startUpdateSV
 | 
						<-w.startUpdateSV
 | 
				
			||||||
	w.Manager.UpdateStorageVersions(loopbackClientConfig, serverID, apiResourceConfigSource)
 | 
						w.Manager.UpdateStorageVersions(loopbackClientConfig, serverID)
 | 
				
			||||||
	close(w.updateFinished)
 | 
						close(w.updateFinished)
 | 
				
			||||||
	<-w.finishUpdateSV
 | 
						<-w.finishUpdateSV
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user