Merge pull request #116770 from alexzielenski/agg-discovery-err-sources
Fix aggregated discovery race when using CRD and Aggregated APIService under same group
This commit is contained in:
		| @@ -43,6 +43,7 @@ import ( | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	"k8s.io/apimachinery/pkg/version" | ||||
| 	"k8s.io/apiserver/pkg/endpoints/discovery" | ||||
| 	"k8s.io/apiserver/pkg/endpoints/discovery/aggregated" | ||||
| 	"k8s.io/apiserver/pkg/features" | ||||
| 	genericregistry "k8s.io/apiserver/pkg/registry/generic" | ||||
| 	"k8s.io/apiserver/pkg/registry/rest" | ||||
| @@ -210,7 +211,11 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) | ||||
| 	s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler) | ||||
| 	s.GenericAPIServer.RegisterDestroyFunc(crdHandler.destroy) | ||||
|  | ||||
| 	discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler, genericServer.AggregatedDiscoveryGroupManager) | ||||
| 	aggregatedDiscoveryManager := genericServer.AggregatedDiscoveryGroupManager | ||||
| 	if aggregatedDiscoveryManager != nil { | ||||
| 		aggregatedDiscoveryManager = aggregatedDiscoveryManager.WithSource(aggregated.CRDSource) | ||||
| 	} | ||||
| 	discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler, aggregatedDiscoveryManager) | ||||
| 	namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1()) | ||||
| 	nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1()) | ||||
| 	apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1()) | ||||
|   | ||||
| @@ -265,6 +265,7 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error { | ||||
| 	}) | ||||
| 	if c.resourceManager != nil { | ||||
| 		c.resourceManager.AddGroupVersion(version.Group, apidiscoveryv2beta1.APIVersionDiscovery{ | ||||
| 			Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent, | ||||
| 			Version:   version.Version, | ||||
| 			Resources: aggregatedApiResourcesForDiscovery, | ||||
| 		}) | ||||
|   | ||||
| @@ -127,6 +127,7 @@ var coolBarCRD = &v1.CustomResourceDefinition{ | ||||
|  | ||||
| var coolFooDiscovery apidiscoveryv2beta1.APIVersionDiscovery = apidiscoveryv2beta1.APIVersionDiscovery{ | ||||
| 	Version:   "v1", | ||||
| 	Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent, | ||||
| 	Resources: []apidiscoveryv2beta1.APIResourceDiscovery{ | ||||
| 		{ | ||||
| 			Resource:         "coolfoos", | ||||
| @@ -158,6 +159,7 @@ var coolFooDiscovery apidiscoveryv2beta1.APIVersionDiscovery = apidiscoveryv2bet | ||||
|  | ||||
| var mergedDiscovery apidiscoveryv2beta1.APIVersionDiscovery = apidiscoveryv2beta1.APIVersionDiscovery{ | ||||
| 	Version:   "v1", | ||||
| 	Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent, | ||||
| 	Resources: []apidiscoveryv2beta1.APIResourceDiscovery{ | ||||
| 		{ | ||||
| 			Resource:         "coolbars", | ||||
|   | ||||
| @@ -169,3 +169,7 @@ func (f *recorderResourceManager) WebService() *restful.WebService { | ||||
| func (f *recorderResourceManager) ServeHTTP(http.ResponseWriter, *http.Request) { | ||||
| 	panic("unimplemented") | ||||
| } | ||||
|  | ||||
| func (f *recorderResourceManager) WithSource(source Source) ResourceManager { | ||||
| 	panic("unimplemented") | ||||
| } | ||||
|   | ||||
| @@ -36,6 +36,15 @@ import ( | ||||
| 	"k8s.io/klog/v2" | ||||
| ) | ||||
|  | ||||
| type Source uint | ||||
|  | ||||
| // The GroupVersion from the lowest Source takes precedence | ||||
| const ( | ||||
| 	AggregatorSource Source = 0 | ||||
| 	BuiltinSource    Source = 100 | ||||
| 	CRDSource        Source = 200 | ||||
| ) | ||||
|  | ||||
| // This handler serves the /apis endpoint for an aggregated list of | ||||
| // api resources indexed by their group version. | ||||
| type ResourceManager interface { | ||||
| @@ -65,9 +74,55 @@ type ResourceManager interface { | ||||
| 	// Thread-Safe | ||||
| 	SetGroups([]apidiscoveryv2beta1.APIGroupDiscovery) | ||||
|  | ||||
| 	// Returns the same resource manager using a different source | ||||
| 	// The source is used to decide how to de-duplicate groups. | ||||
| 	// The group from the least-numbered source is used | ||||
| 	WithSource(source Source) ResourceManager | ||||
|  | ||||
| 	http.Handler | ||||
| } | ||||
|  | ||||
| type resourceManager struct { | ||||
| 	source Source | ||||
| 	*resourceDiscoveryManager | ||||
| } | ||||
|  | ||||
| func (rm resourceManager) AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { | ||||
| 	rm.resourceDiscoveryManager.AddGroupVersion(rm.source, groupName, value) | ||||
| } | ||||
| func (rm resourceManager) SetGroupVersionPriority(gv metav1.GroupVersion, grouppriority, versionpriority int) { | ||||
| 	rm.resourceDiscoveryManager.SetGroupVersionPriority(rm.source, gv, grouppriority, versionpriority) | ||||
| } | ||||
| func (rm resourceManager) RemoveGroup(groupName string) { | ||||
| 	rm.resourceDiscoveryManager.RemoveGroup(rm.source, groupName) | ||||
| } | ||||
| func (rm resourceManager) RemoveGroupVersion(gv metav1.GroupVersion) { | ||||
| 	rm.resourceDiscoveryManager.RemoveGroupVersion(rm.source, gv) | ||||
| } | ||||
| func (rm resourceManager) SetGroups(groups []apidiscoveryv2beta1.APIGroupDiscovery) { | ||||
| 	rm.resourceDiscoveryManager.SetGroups(rm.source, groups) | ||||
| } | ||||
|  | ||||
| func (rm resourceManager) WithSource(source Source) ResourceManager { | ||||
| 	return resourceManager{ | ||||
| 		source:                   source, | ||||
| 		resourceDiscoveryManager: rm.resourceDiscoveryManager, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type groupKey struct { | ||||
| 	name string | ||||
|  | ||||
| 	// Source identifies where this group came from and dictates which group | ||||
| 	// among duplicates is chosen to be used for discovery. | ||||
| 	source Source | ||||
| } | ||||
|  | ||||
| type groupVersionKey struct { | ||||
| 	metav1.GroupVersion | ||||
| 	source Source | ||||
| } | ||||
|  | ||||
| type resourceDiscoveryManager struct { | ||||
| 	serializer runtime.NegotiatedSerializer | ||||
| 	// cache is an atomic pointer to avoid the use of locks | ||||
| @@ -78,8 +133,8 @@ type resourceDiscoveryManager struct { | ||||
| 	// Writes protected by the lock. | ||||
| 	// List of all apigroups & resources indexed by the resource manager | ||||
| 	lock              sync.RWMutex | ||||
| 	apiGroups         map[string]*apidiscoveryv2beta1.APIGroupDiscovery | ||||
| 	versionPriorities map[metav1.GroupVersion]priorityInfo | ||||
| 	apiGroups         map[groupKey]*apidiscoveryv2beta1.APIGroupDiscovery | ||||
| 	versionPriorities map[groupVersionKey]priorityInfo | ||||
| } | ||||
|  | ||||
| type priorityInfo struct { | ||||
| @@ -93,7 +148,7 @@ func NewResourceManager(path string) ResourceManager { | ||||
| 	utilruntime.Must(apidiscoveryv2beta1.AddToScheme(scheme)) | ||||
| 	rdm := &resourceDiscoveryManager{ | ||||
| 		serializer:        codecs, | ||||
| 		versionPriorities: make(map[metav1.GroupVersion]priorityInfo), | ||||
| 		versionPriorities: make(map[groupVersionKey]priorityInfo), | ||||
| 	} | ||||
| 	rdm.serveHTTPFunc = metrics.InstrumentHandlerFunc("GET", | ||||
| 		/* group = */ "", | ||||
| @@ -105,20 +160,28 @@ func NewResourceManager(path string) ResourceManager { | ||||
| 		/* deprecated */ false, | ||||
| 		/* removedRelease */ "", | ||||
| 		rdm.serveHTTP) | ||||
| 	return rdm | ||||
| 	return resourceManager{ | ||||
| 		source:                   BuiltinSource, | ||||
| 		resourceDiscoveryManager: rdm, | ||||
| 	} | ||||
| } | ||||
| func (rdm *resourceDiscoveryManager) SetGroupVersionPriority(gv metav1.GroupVersion, groupPriorityMinimum, versionPriority int) { | ||||
|  | ||||
| func (rdm *resourceDiscoveryManager) SetGroupVersionPriority(source Source, gv metav1.GroupVersion, groupPriorityMinimum, versionPriority int) { | ||||
| 	rdm.lock.Lock() | ||||
| 	defer rdm.lock.Unlock() | ||||
|  | ||||
| 	rdm.versionPriorities[gv] = priorityInfo{ | ||||
| 	key := groupVersionKey{ | ||||
| 		GroupVersion: gv, | ||||
| 		source:       source, | ||||
| 	} | ||||
| 	rdm.versionPriorities[key] = priorityInfo{ | ||||
| 		GroupPriorityMinimum: groupPriorityMinimum, | ||||
| 		VersionPriority:      versionPriority, | ||||
| 	} | ||||
| 	rdm.cache.Store(nil) | ||||
| } | ||||
|  | ||||
| func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIGroupDiscovery) { | ||||
| func (rdm *resourceDiscoveryManager) SetGroups(source Source, groups []apidiscoveryv2beta1.APIGroupDiscovery) { | ||||
| 	rdm.lock.Lock() | ||||
| 	defer rdm.lock.Unlock() | ||||
|  | ||||
| @@ -127,13 +190,17 @@ func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIG | ||||
|  | ||||
| 	for _, group := range groups { | ||||
| 		for _, version := range group.Versions { | ||||
| 			rdm.addGroupVersionLocked(group.Name, version) | ||||
| 			rdm.addGroupVersionLocked(source, group.Name, version) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Filter unused out priority entries | ||||
| 	for gv := range rdm.versionPriorities { | ||||
| 		entry, exists := rdm.apiGroups[gv.Group] | ||||
| 		key := groupKey{ | ||||
| 			source: source, | ||||
| 			name:   gv.Group, | ||||
| 		} | ||||
| 		entry, exists := rdm.apiGroups[key] | ||||
| 		if !exists { | ||||
| 			delete(rdm.versionPriorities, gv) | ||||
| 			continue | ||||
| @@ -154,21 +221,26 @@ func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIG | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (rdm *resourceDiscoveryManager) AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { | ||||
| func (rdm *resourceDiscoveryManager) AddGroupVersion(source Source, groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { | ||||
| 	rdm.lock.Lock() | ||||
| 	defer rdm.lock.Unlock() | ||||
|  | ||||
| 	rdm.addGroupVersionLocked(groupName, value) | ||||
| 	rdm.addGroupVersionLocked(source, groupName, value) | ||||
| } | ||||
|  | ||||
| func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { | ||||
| func (rdm *resourceDiscoveryManager) addGroupVersionLocked(source Source, groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { | ||||
| 	klog.Infof("Adding GroupVersion %s %s to ResourceManager", groupName, value.Version) | ||||
|  | ||||
| 	if rdm.apiGroups == nil { | ||||
| 		rdm.apiGroups = make(map[string]*apidiscoveryv2beta1.APIGroupDiscovery) | ||||
| 		rdm.apiGroups = make(map[groupKey]*apidiscoveryv2beta1.APIGroupDiscovery) | ||||
| 	} | ||||
|  | ||||
| 	if existing, groupExists := rdm.apiGroups[groupName]; groupExists { | ||||
| 	key := groupKey{ | ||||
| 		source: source, | ||||
| 		name:   groupName, | ||||
| 	} | ||||
|  | ||||
| 	if existing, groupExists := rdm.apiGroups[key]; groupExists { | ||||
| 		// If this version already exists, replace it | ||||
| 		versionExists := false | ||||
|  | ||||
| @@ -181,6 +253,7 @@ func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, val | ||||
| 				if reflect.DeepEqual(existing.Versions[i], value) { | ||||
| 					return | ||||
| 				} | ||||
|  | ||||
| 				existing.Versions[i] = value | ||||
| 				versionExists = true | ||||
| 				break | ||||
| @@ -198,12 +271,16 @@ func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, val | ||||
| 			}, | ||||
| 			Versions: []apidiscoveryv2beta1.APIVersionDiscovery{value}, | ||||
| 		} | ||||
| 		rdm.apiGroups[groupName] = group | ||||
| 		rdm.apiGroups[key] = group | ||||
| 	} | ||||
|  | ||||
| 	gv := metav1.GroupVersion{Group: groupName, Version: value.Version} | ||||
| 	if _, ok := rdm.versionPriorities[gv]; !ok { | ||||
| 		rdm.versionPriorities[gv] = priorityInfo{ | ||||
| 	gvKey := groupVersionKey{ | ||||
| 		GroupVersion: gv, | ||||
| 		source:       source, | ||||
| 	} | ||||
| 	if _, ok := rdm.versionPriorities[gvKey]; !ok { | ||||
| 		rdm.versionPriorities[gvKey] = priorityInfo{ | ||||
| 			GroupPriorityMinimum: 1000, | ||||
| 			VersionPriority:      15, | ||||
| 		} | ||||
| @@ -213,10 +290,16 @@ func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, val | ||||
| 	rdm.cache.Store(nil) | ||||
| } | ||||
|  | ||||
| func (rdm *resourceDiscoveryManager) RemoveGroupVersion(apiGroup metav1.GroupVersion) { | ||||
| func (rdm *resourceDiscoveryManager) RemoveGroupVersion(source Source, apiGroup metav1.GroupVersion) { | ||||
| 	rdm.lock.Lock() | ||||
| 	defer rdm.lock.Unlock() | ||||
| 	group, exists := rdm.apiGroups[apiGroup.Group] | ||||
|  | ||||
| 	key := groupKey{ | ||||
| 		source: source, | ||||
| 		name:   apiGroup.Group, | ||||
| 	} | ||||
|  | ||||
| 	group, exists := rdm.apiGroups[key] | ||||
| 	if !exists { | ||||
| 		return | ||||
| 	} | ||||
| @@ -234,23 +317,33 @@ func (rdm *resourceDiscoveryManager) RemoveGroupVersion(apiGroup metav1.GroupVer | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	delete(rdm.versionPriorities, apiGroup) | ||||
| 	gvKey := groupVersionKey{ | ||||
| 		GroupVersion: apiGroup, | ||||
| 		source:       source, | ||||
| 	} | ||||
|  | ||||
| 	delete(rdm.versionPriorities, gvKey) | ||||
| 	if len(group.Versions) == 0 { | ||||
| 		delete(rdm.apiGroups, group.Name) | ||||
| 		delete(rdm.apiGroups, key) | ||||
| 	} | ||||
|  | ||||
| 	// Reset response document so it is recreated lazily | ||||
| 	rdm.cache.Store(nil) | ||||
| } | ||||
|  | ||||
| func (rdm *resourceDiscoveryManager) RemoveGroup(groupName string) { | ||||
| func (rdm *resourceDiscoveryManager) RemoveGroup(source Source, groupName string) { | ||||
| 	rdm.lock.Lock() | ||||
| 	defer rdm.lock.Unlock() | ||||
|  | ||||
| 	delete(rdm.apiGroups, groupName) | ||||
| 	key := groupKey{ | ||||
| 		source: source, | ||||
| 		name:   groupName, | ||||
| 	} | ||||
|  | ||||
| 	delete(rdm.apiGroups, key) | ||||
|  | ||||
| 	for k := range rdm.versionPriorities { | ||||
| 		if k.Group == groupName { | ||||
| 		if k.Group == groupName && k.source == source { | ||||
| 			delete(rdm.versionPriorities, k) | ||||
| 		} | ||||
| 	} | ||||
| @@ -265,17 +358,63 @@ func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2 | ||||
| 	regenerationCounter.Inc() | ||||
| 	// Re-order the apiGroups by their priority. | ||||
| 	groups := []apidiscoveryv2beta1.APIGroupDiscovery{} | ||||
| 	for _, group := range rdm.apiGroups { | ||||
| 		copied := *group.DeepCopy() | ||||
|  | ||||
| 	groupsToUse := map[string]apidiscoveryv2beta1.APIGroupDiscovery{} | ||||
| 	sourcesUsed := map[metav1.GroupVersion]Source{} | ||||
|  | ||||
| 	for key, group := range rdm.apiGroups { | ||||
| 		if existing, ok := groupsToUse[key.name]; ok { | ||||
| 			for _, v := range group.Versions { | ||||
| 				gv := metav1.GroupVersion{Group: key.name, Version: v.Version} | ||||
|  | ||||
| 				// Skip groupversions we've already seen before. Only DefaultSource | ||||
| 				// takes precedence | ||||
| 				if usedSource, seen := sourcesUsed[gv]; seen && key.source >= usedSource { | ||||
| 					continue | ||||
| 				} else if seen { | ||||
| 					// Find the index of the duplicate version and replace | ||||
| 					for i := 0; i < len(existing.Versions); i++ { | ||||
| 						if existing.Versions[i].Version == v.Version { | ||||
| 							existing.Versions[i] = v | ||||
| 							break | ||||
| 						} | ||||
| 					} | ||||
|  | ||||
| 				} else { | ||||
| 					// New group-version, just append | ||||
| 					existing.Versions = append(existing.Versions, v) | ||||
| 				} | ||||
|  | ||||
| 				sourcesUsed[gv] = key.source | ||||
| 				groupsToUse[key.name] = existing | ||||
| 			} | ||||
| 			// Check to see if we have overlapping versions. If we do, take the one | ||||
| 			// with highest source precedence | ||||
| 		} else { | ||||
| 			groupsToUse[key.name] = *group.DeepCopy() | ||||
| 			for _, v := range group.Versions { | ||||
| 				gv := metav1.GroupVersion{Group: key.name, Version: v.Version} | ||||
| 				sourcesUsed[gv] = key.source | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	for _, group := range groupsToUse { | ||||
|  | ||||
| 		// Re-order versions based on their priority. Use kube-aware string | ||||
| 		// comparison as a tie breaker | ||||
| 		sort.SliceStable(copied.Versions, func(i, j int) bool { | ||||
| 			iVersion := copied.Versions[i].Version | ||||
| 			jVersion := copied.Versions[j].Version | ||||
| 		sort.SliceStable(group.Versions, func(i, j int) bool { | ||||
| 			iVersion := group.Versions[i].Version | ||||
| 			jVersion := group.Versions[j].Version | ||||
|  | ||||
| 			iPriority := rdm.versionPriorities[metav1.GroupVersion{Group: group.Name, Version: iVersion}].VersionPriority | ||||
| 			jPriority := rdm.versionPriorities[metav1.GroupVersion{Group: group.Name, Version: jVersion}].VersionPriority | ||||
| 			iGV := metav1.GroupVersion{Group: group.Name, Version: iVersion} | ||||
| 			jGV := metav1.GroupVersion{Group: group.Name, Version: jVersion} | ||||
|  | ||||
| 			iSource := sourcesUsed[iGV] | ||||
| 			jSource := sourcesUsed[jGV] | ||||
|  | ||||
| 			iPriority := rdm.versionPriorities[groupVersionKey{iGV, iSource}].VersionPriority | ||||
| 			jPriority := rdm.versionPriorities[groupVersionKey{jGV, jSource}].VersionPriority | ||||
|  | ||||
| 			// Sort by version string comparator if priority is equal | ||||
| 			if iPriority == jPriority { | ||||
| @@ -286,13 +425,16 @@ func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2 | ||||
| 			return iPriority > jPriority | ||||
| 		}) | ||||
|  | ||||
| 		groups = append(groups, *copied.DeepCopy()) | ||||
|  | ||||
| 		groups = append(groups, group) | ||||
| 	} | ||||
|  | ||||
| 	// For each group, determine the highest minimum group priority and use that | ||||
| 	priorities := map[string]int{} | ||||
| 	for gv, info := range rdm.versionPriorities { | ||||
| 		if source := sourcesUsed[gv.GroupVersion]; source != gv.source { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		if existing, exists := priorities[gv.Group]; exists { | ||||
| 			if existing < info.GroupPriorityMinimum { | ||||
| 				priorities[gv.Group] = info.GroupPriorityMinimum | ||||
|   | ||||
| @@ -18,6 +18,7 @@ package aggregated_test | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"math/rand" | ||||
| 	"net/http" | ||||
| 	"net/http/httptest" | ||||
| @@ -365,6 +366,91 @@ func TestUpdateService(t *testing.T) { | ||||
| 	assert.NotEqual(t, secondDocument, initialDocument, "should have returned expected document") | ||||
| } | ||||
|  | ||||
| func TestMultipleSources(t *testing.T) { | ||||
| 	type pair struct { | ||||
| 		manager discoveryendpoint.ResourceManager | ||||
| 		apis    apidiscoveryv2beta1.APIGroupDiscoveryList | ||||
| 	} | ||||
|  | ||||
| 	pairs := []pair{} | ||||
|  | ||||
| 	defaultManager := discoveryendpoint.NewResourceManager("apis") | ||||
| 	for i := 0; i < 10; i++ { | ||||
| 		name := discoveryendpoint.Source(100 * i) | ||||
| 		manager := defaultManager.WithSource(name) | ||||
| 		apis := fuzzAPIGroups(1, 3, int64(15+i)) | ||||
|  | ||||
| 		// Give the groups deterministic names | ||||
| 		for i := range apis.Items { | ||||
| 			apis.Items[i].Name = fmt.Sprintf("%v.%v.com", i, name) | ||||
| 		} | ||||
|  | ||||
| 		pairs = append(pairs, pair{manager, apis}) | ||||
| 	} | ||||
|  | ||||
| 	expectedResult := []apidiscoveryv2beta1.APIGroupDiscovery{} | ||||
|  | ||||
| 	groupCounter := 0 | ||||
| 	for _, p := range pairs { | ||||
| 		for gi, g := range p.apis.Items { | ||||
| 			for vi, v := range g.Versions { | ||||
| 				p.manager.AddGroupVersion(g.Name, v) | ||||
|  | ||||
| 				// Use index for priority so we dont have to do any sorting | ||||
| 				// Use negative index since it is sorted descending | ||||
| 				p.manager.SetGroupVersionPriority(metav1.GroupVersion{Group: g.Name, Version: v.Version}, -gi-groupCounter, -vi) | ||||
| 			} | ||||
|  | ||||
| 			expectedResult = append(expectedResult, g) | ||||
| 		} | ||||
|  | ||||
| 		groupCounter += len(p.apis.Items) | ||||
| 	} | ||||
|  | ||||
| 	// Show discovery document is what we expect | ||||
| 	_, _, initialDocument := fetchPath(defaultManager, "application/json", discoveryPath, "") | ||||
|  | ||||
| 	require.Len(t, initialDocument.Items, len(expectedResult)) | ||||
| 	require.Equal(t, initialDocument.Items, expectedResult) | ||||
| } | ||||
|  | ||||
| // Shows that if you have multiple sources including Default source using | ||||
| // with the same group name the groups added by the "Default" source are used | ||||
| func TestSourcePrecedence(t *testing.T) { | ||||
| 	defaultManager := discoveryendpoint.NewResourceManager("apis") | ||||
| 	otherManager := defaultManager.WithSource(500) | ||||
| 	apis := fuzzAPIGroups(1, 3, int64(15)) | ||||
| 	for _, g := range apis.Items { | ||||
| 		for i, v := range g.Versions { | ||||
| 			v.Freshness = apidiscoveryv2beta1.DiscoveryFreshnessCurrent | ||||
| 			g.Versions[i] = v | ||||
| 			otherManager.AddGroupVersion(g.Name, v) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	_, _, initialDocument := fetchPath(defaultManager, "application/json", discoveryPath, "") | ||||
| 	require.Equal(t, apis.Items, initialDocument.Items) | ||||
|  | ||||
| 	// Add the first groupversion under default. | ||||
| 	// No versions should appear in discovery document except this one | ||||
| 	overrideVersion := initialDocument.Items[0].Versions[0] | ||||
| 	overrideVersion.Freshness = apidiscoveryv2beta1.DiscoveryFreshnessStale | ||||
| 	defaultManager.AddGroupVersion(initialDocument.Items[0].Name, overrideVersion) | ||||
|  | ||||
| 	_, _, maskedDocument := fetchPath(defaultManager, "application/json", discoveryPath, "") | ||||
| 	masked := initialDocument.DeepCopy() | ||||
| 	masked.Items[0].Versions[0].Freshness = apidiscoveryv2beta1.DiscoveryFreshnessStale | ||||
|  | ||||
| 	require.Equal(t, masked.Items, maskedDocument.Items) | ||||
|  | ||||
| 	// Wipe out default group. The other versions from the other group should now | ||||
| 	// appear since the group is not being overridden by defaults ource | ||||
| 	defaultManager.RemoveGroup(apis.Items[0].Name) | ||||
|  | ||||
| 	_, _, resetDocument := fetchPath(defaultManager, "application/json", discoveryPath, "") | ||||
| 	require.Equal(t, resetDocument.Items, initialDocument.Items) | ||||
| } | ||||
|  | ||||
| // Show the discovery manager is capable of serving requests to multiple users | ||||
| // with unchanging data | ||||
| func TestConcurrentRequests(t *testing.T) { | ||||
|   | ||||
| @@ -776,6 +776,7 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A | ||||
| 				s.AggregatedDiscoveryGroupManager.AddGroupVersion( | ||||
| 					groupVersion.Group, | ||||
| 					apidiscoveryv2beta1.APIVersionDiscovery{ | ||||
| 						Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent, | ||||
| 						Version:   groupVersion.Version, | ||||
| 						Resources: discoveryAPIResources, | ||||
| 					}, | ||||
| @@ -785,6 +786,7 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A | ||||
| 				s.AggregatedLegacyDiscoveryGroupManager.AddGroupVersion( | ||||
| 					groupVersion.Group, | ||||
| 					apidiscoveryv2beta1.APIVersionDiscovery{ | ||||
| 						Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent, | ||||
| 						Version:   groupVersion.Version, | ||||
| 						Resources: discoveryAPIResources, | ||||
| 					}, | ||||
|   | ||||
| @@ -397,7 +397,9 @@ func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) { | ||||
|  | ||||
| 	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) { | ||||
| 		s.discoveryAggregationController = NewDiscoveryManager( | ||||
| 			s.GenericAPIServer.AggregatedDiscoveryGroupManager, | ||||
| 			// Use aggregator as the source name to avoid overwriting native/CRD | ||||
| 			// groups | ||||
| 			s.GenericAPIServer.AggregatedDiscoveryGroupManager.WithSource(aggregated.AggregatorSource), | ||||
| 		) | ||||
|  | ||||
| 		// Setup discovery endpoint | ||||
|   | ||||
| @@ -421,7 +421,7 @@ func (dm *discoveryManager) Run(stopCh <-chan struct{}) { | ||||
| 	} | ||||
|  | ||||
| 	// Ensure that apiregistration.k8s.io is the first group in the discovery group. | ||||
| 	dm.mergedDiscoveryHandler.SetGroupVersionPriority(APIRegistrationGroupVersion, APIRegistrationGroupPriority, 0) | ||||
| 	dm.mergedDiscoveryHandler.WithSource(discoveryendpoint.BuiltinSource).SetGroupVersionPriority(APIRegistrationGroupVersion, APIRegistrationGroupPriority, 0) | ||||
|  | ||||
| 	wait.PollUntil(1*time.Minute, func() (done bool, err error) { | ||||
| 		dm.servicesLock.Lock() | ||||
|   | ||||
| @@ -255,13 +255,16 @@ func runTestCases(t *testing.T, cases []testCase) { | ||||
| 	for _, c := range cases { | ||||
| 		t.Run(c.Name, func(t *testing.T) { | ||||
| 			func() { | ||||
| 				for _, a := range c.Actions { | ||||
| 				testContext, testDone := context.WithCancel(ctx) | ||||
| 				defer testDone() | ||||
|  | ||||
| 				for i, a := range c.Actions { | ||||
| 					if cleaning, ok := a.(cleaningAction); ok { | ||||
| 						defer func() { | ||||
| 							require.NoError(t, cleaning.Cleanup(ctx, client)) | ||||
| 							require.NoError(t, cleaning.Cleanup(testContext, client), "cleanup after \"%T\" step %v", a, i) | ||||
| 						}() | ||||
| 					} | ||||
| 					require.NoError(t, a.Do(ctx, client)) | ||||
| 					require.NoError(t, a.Do(testContext, client), "running \"%T\" step %v", a, i) | ||||
| 				} | ||||
| 			}() | ||||
|  | ||||
| @@ -339,9 +342,10 @@ func TestCRD(t *testing.T) { | ||||
|  | ||||
| 				applyCRD(makeCRDSpec(stableGroup, "Bar", false, []string{"v1", "v2"})), | ||||
|  | ||||
| 				// only CRD has stable v2,  this will show that CRD has been synced | ||||
| 				waitForGroupVersionsV1([]metav1.GroupVersion{stableV2}), | ||||
| 				waitForGroupVersionsV2([]metav1.GroupVersion{stableV2}), | ||||
| 				// Show that we have v1 and v2 but v1 is stale | ||||
| 				waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV2}), | ||||
| 				waitForStaleGroupVersionsV2([]metav1.GroupVersion{stableV1}), | ||||
| 				waitForFreshGroupVersionsV2([]metav1.GroupVersion{stableV2}), | ||||
|  | ||||
| 				// Delete APIService shared by the aggregated apiservice and | ||||
| 				// CRD | ||||
| @@ -355,7 +359,160 @@ func TestCRD(t *testing.T) { | ||||
|  | ||||
| 				// Show that the groupversion is re-added back | ||||
| 				waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV2, stableV1alpha1}), | ||||
| 				waitForGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV2, stableV1alpha1}), | ||||
| 				waitForFreshGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV2, stableV1alpha1}), | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			// Show that if CRD and Aggregated APIservice share a groupversiom, | ||||
| 			// The aggregated apiservice's discovery information is shown in both | ||||
| 			// v1 and v2 discovery | ||||
| 			Name: "CRDAPIServiceSameGroupDifferentVersions", | ||||
| 			Actions: []testAction{ | ||||
| 				// Wait for CRD to apply | ||||
| 				applyCRD(makeCRDSpec(stableGroup, "Bar", false, []string{"v2", "v1alpha1"})), | ||||
| 				// Wait for GV to appear in both discovery documents | ||||
| 				waitForGroupVersionsV1([]metav1.GroupVersion{stableV2, stableV1alpha1}), | ||||
| 				waitForGroupVersionsV2([]metav1.GroupVersion{stableV2, stableV1alpha1}), | ||||
|  | ||||
| 				applyAPIService( | ||||
| 					apiregistrationv1.APIServiceSpec{ | ||||
| 						Group:                 stableGroup, | ||||
| 						Version:               "v1", | ||||
| 						InsecureSkipTLSVerify: true, | ||||
| 						GroupPriorityMinimum:  int32(1000), | ||||
| 						VersionPriority:       int32(100), | ||||
| 						Service: &apiregistrationv1.ServiceReference{ | ||||
| 							Name:      "unused", | ||||
| 							Namespace: "default", | ||||
| 						}, | ||||
| 					}, | ||||
| 				), | ||||
|  | ||||
| 				// We should now have stable v1 available | ||||
| 				waitForGroupVersionsV1([]metav1.GroupVersion{stableV1}), | ||||
| 				waitForGroupVersionsV2([]metav1.GroupVersion{stableV1}), | ||||
|  | ||||
| 				// The CRD group-versions not served by the aggregated | ||||
| 				// apiservice should still be availablee | ||||
| 				waitForGroupVersionsV1([]metav1.GroupVersion{stableV2, stableV1alpha1}), | ||||
| 				waitForGroupVersionsV2([]metav1.GroupVersion{stableV2, stableV1alpha1}), | ||||
|  | ||||
| 				// Remove API service. Show we have switched to CRD | ||||
| 				deleteObject{ | ||||
| 					GroupVersionResource: metav1.GroupVersionResource(apiregistrationv1.SchemeGroupVersion.WithResource("apiservices")), | ||||
| 					Name:                 "v1.stable.example.com", | ||||
| 				}, | ||||
|  | ||||
| 				// Show that we still have stable v1 since it is in the CRD | ||||
| 				waitForGroupVersionsV1([]metav1.GroupVersion{stableV2, stableV1alpha1}), | ||||
| 				waitForGroupVersionsV2([]metav1.GroupVersion{stableV2, stableV1alpha1}), | ||||
|  | ||||
| 				waitForAbsentGroupVersionsV1([]metav1.GroupVersion{stableV1}), | ||||
| 				waitForAbsentGroupVersionsV2([]metav1.GroupVersion{stableV1}), | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			// Show that if CRD and a builtin share a group version, | ||||
| 			// the builtin takes precedence in both versions of discovery | ||||
| 			Name: "CRDBuiltinOverlapPrecence", | ||||
| 			Actions: []testAction{ | ||||
| 				// Create CRD that overrides a builtin | ||||
| 				applyCRD(makeCRDSpec("apiextensions.k8s.io", "Bar", true, []string{"v1", "v2", "vfake"})), | ||||
|  | ||||
| 				waitForGroupVersionsV1([]metav1.GroupVersion{{Group: "apiextensions.k8s.io", Version: "vfake"}}), | ||||
| 				waitForGroupVersionsV2([]metav1.GroupVersion{{Group: "apiextensions.k8s.io", Version: "vfake"}}), | ||||
|  | ||||
| 				// Show that the builtin group-version is still used for V1 | ||||
| 				// By showing presence of v1.CustomResourceDefinition | ||||
| 				// and absence of v1.Bar | ||||
| 				waitForResourcesV1([]metav1.GroupVersionResource{ | ||||
| 					{ | ||||
| 						Group:    "apiextensions.k8s.io", | ||||
| 						Version:  "v1", | ||||
| 						Resource: "customresourcedefinitions", | ||||
| 					}, | ||||
| 					{ | ||||
| 						Group:    "apiextensions.k8s.io", | ||||
| 						Version:  "vfake", | ||||
| 						Resource: "bars", | ||||
| 					}, | ||||
| 				}), | ||||
| 				waitForResourcesV2([]metav1.GroupVersionResource{ | ||||
| 					{ | ||||
| 						Group:    "apiextensions.k8s.io", | ||||
| 						Version:  "v1", | ||||
| 						Resource: "customresourcedefinitions", | ||||
| 					}, | ||||
| 					{ | ||||
| 						Group:    "apiextensions.k8s.io", | ||||
| 						Version:  "vfake", | ||||
| 						Resource: "bars", | ||||
| 					}, | ||||
| 				}), | ||||
|  | ||||
| 				waitForResourcesAbsentV1([]metav1.GroupVersionResource{ | ||||
| 					{ | ||||
| 						Group:    "apiextensions.k8s.io", | ||||
| 						Version:  "v1", | ||||
| 						Resource: "bars", | ||||
| 					}, | ||||
| 				}), | ||||
| 				waitForResourcesAbsentV2([]metav1.GroupVersionResource{ | ||||
| 					{ | ||||
| 						Group:    "apiextensions.k8s.io", | ||||
| 						Version:  "v1", | ||||
| 						Resource: "bars", | ||||
| 					}, | ||||
| 				}), | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			// Tests that a race discovered during alpha phase of the feature is fixed. | ||||
| 			// Rare race would occur if a CRD was synced before the removal of an aggregated | ||||
| 			// APIService could be synced. | ||||
| 			// To test this we: | ||||
| 			//  1. Add CRD to apiserver | ||||
| 			// 	2. Wait for it to sync | ||||
| 			//  3. Add aggregated APIService with same groupversion | ||||
| 			//  4. Remove aggregated apiservice | ||||
| 			//  5. Check that we have CRD GVs in discovery document | ||||
| 			// Show that if CRD and APIService share a groupversion, and the | ||||
| 			// APIService is deleted, and CRD updated, the groupversion from | ||||
| 			// the CRD remains in discovery. | ||||
| 			Name: "Race", | ||||
| 			Actions: []testAction{ | ||||
| 				// Create CRD with the same GV as the aggregated APIService | ||||
| 				applyCRD(makeCRDSpec(stableGroup, "Bar", false, []string{"v1", "v2"})), | ||||
|  | ||||
| 				// only CRD has stable v2,  this will show that CRD has been synced | ||||
| 				waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV2}), | ||||
| 				waitForGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV2}), | ||||
|  | ||||
| 				// Add Aggregated APIService that overlaps the CRD. | ||||
| 				applyAPIService( | ||||
| 					apiregistrationv1.APIServiceSpec{ | ||||
| 						Group:                 stableGroup, | ||||
| 						Version:               "v1", | ||||
| 						InsecureSkipTLSVerify: true, | ||||
| 						GroupPriorityMinimum:  int32(1000), | ||||
| 						VersionPriority:       int32(100), | ||||
| 						Service: &apiregistrationv1.ServiceReference{ | ||||
| 							Name:      "fake", | ||||
| 							Namespace: "default", | ||||
| 						}, | ||||
| 					}, | ||||
| 				), | ||||
|  | ||||
| 				// Delete APIService shared by the aggregated apiservice and | ||||
| 				// CRD | ||||
| 				deleteObject{ | ||||
| 					GroupVersionResource: metav1.GroupVersionResource(apiregistrationv1.SchemeGroupVersion.WithResource("apiservices")), | ||||
| 					Name:                 "v1.stable.example.com", | ||||
| 				}, | ||||
|  | ||||
| 				// Show the CRD (with stablev2) is the one which is now advertised | ||||
| 				waitForGroupVersionsV1([]metav1.GroupVersion{stableV1, stableV2}), | ||||
| 				waitForGroupVersionsV2([]metav1.GroupVersion{stableV1, stableV2}), | ||||
| 			}, | ||||
| 		}, | ||||
| 	}) | ||||
|   | ||||
| @@ -21,6 +21,7 @@ import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"reflect" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" | ||||
| @@ -42,6 +43,8 @@ import ( | ||||
| const acceptV1JSON = "application/json" | ||||
| const acceptV2JSON = "application/json;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList" | ||||
|  | ||||
| const maxTimeout = 10 * time.Second | ||||
|  | ||||
| type testClient interface { | ||||
| 	kubernetes.Interface | ||||
| 	aggregator.Interface | ||||
| @@ -91,6 +94,15 @@ type waitForGroupVersionsV2 []metav1.GroupVersion | ||||
| // Wait for groupversions to disappear from v2 discovery | ||||
| type waitForAbsentGroupVersionsV2 []metav1.GroupVersion | ||||
|  | ||||
| type waitForStaleGroupVersionsV2 []metav1.GroupVersion | ||||
| type waitForFreshGroupVersionsV2 []metav1.GroupVersion | ||||
|  | ||||
| type waitForResourcesV1 []metav1.GroupVersionResource | ||||
| type waitForResourcesAbsentV1 []metav1.GroupVersionResource | ||||
|  | ||||
| type waitForResourcesV2 []metav1.GroupVersionResource | ||||
| type waitForResourcesAbsentV2 []metav1.GroupVersionResource | ||||
|  | ||||
| // Assert something about the current state of v2 discovery | ||||
| type inlineAction func(ctx context.Context, client testClient) error | ||||
|  | ||||
| @@ -133,7 +145,7 @@ func (a applyAPIService) Cleanup(ctx context.Context, client testClient) error { | ||||
| 	err = wait.PollWithContext( | ||||
| 		ctx, | ||||
| 		250*time.Millisecond, | ||||
| 		1*time.Second, | ||||
| 		maxTimeout, | ||||
| 		func(ctx context.Context) (done bool, err error) { | ||||
| 			_, err = client.ApiregistrationV1().APIServices().Get(ctx, name, metav1.GetOptions{}) | ||||
| 			if err == nil { | ||||
| @@ -165,6 +177,13 @@ func (a applyCRD) Do(ctx context.Context, client testClient) error { | ||||
| 		Spec: apiextensionsv1.CustomResourceDefinitionSpec(a), | ||||
| 	} | ||||
|  | ||||
| 	if strings.HasSuffix(obj.Name, ".k8s.io") { | ||||
| 		if obj.Annotations == nil { | ||||
| 			obj.Annotations = map[string]string{} | ||||
| 		} | ||||
| 		obj.Annotations["api-approved.kubernetes.io"] = "https://github.com/kubernetes/kubernetes/fake" | ||||
| 	} | ||||
|  | ||||
| 	unstructuredContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| @@ -194,7 +213,7 @@ func (a applyCRD) Cleanup(ctx context.Context, client testClient) error { | ||||
| 	err = wait.PollWithContext( | ||||
| 		ctx, | ||||
| 		250*time.Millisecond, | ||||
| 		1*time.Second, | ||||
| 		maxTimeout, | ||||
| 		func(ctx context.Context) (done bool, err error) { | ||||
| 			_, err = client.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, name, metav1.GetOptions{}) | ||||
| 			if err == nil { | ||||
| @@ -226,6 +245,40 @@ func (d deleteObject) Do(ctx context.Context, client testClient) error { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (w waitForStaleGroupVersionsV2) Do(ctx context.Context, client testClient) error { | ||||
| 	err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool { | ||||
| 		for _, gv := range w { | ||||
| 			if info := FindGroupVersionV2(result, gv); info == nil || info.Freshness != apidiscoveryv2beta1.DiscoveryFreshnessStale { | ||||
| 				return false | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		return true | ||||
| 	}) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("waiting for stale groupversions v2 (%v): %w", w, err) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (w waitForFreshGroupVersionsV2) Do(ctx context.Context, client testClient) error { | ||||
| 	err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool { | ||||
| 		for _, gv := range w { | ||||
| 			if info := FindGroupVersionV2(result, gv); info == nil || info.Freshness != apidiscoveryv2beta1.DiscoveryFreshnessCurrent { | ||||
| 				return false | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		return true | ||||
| 	}) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("waiting for fresh groupversions v2 (%v): %w", w, err) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (w waitForGroupVersionsV2) Do(ctx context.Context, client testClient) error { | ||||
| 	err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool { | ||||
| 		for _, gv := range w { | ||||
| @@ -294,6 +347,137 @@ func (w waitForAbsentGroupVersionsV1) Do(ctx context.Context, client testClient) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (w waitForResourcesV1) Do(ctx context.Context, client testClient) error { | ||||
| 	requiredResources := map[metav1.GroupVersion][]string{} | ||||
|  | ||||
| 	for _, gvr := range w { | ||||
| 		gv := metav1.GroupVersion{Group: gvr.Group, Version: gvr.Version} | ||||
| 		if existing, ok := requiredResources[gv]; ok { | ||||
| 			requiredResources[gv] = append(existing, gvr.Resource) | ||||
| 		} else { | ||||
| 			requiredResources[gv] = []string{gvr.Resource} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	for gv, resourceNames := range requiredResources { | ||||
| 		err := WaitForV1ResourcesWithCondition(ctx, client, gv, func(result metav1.APIResourceList) bool { | ||||
| 			for _, name := range resourceNames { | ||||
| 				found := false | ||||
|  | ||||
| 				for _, resultResource := range result.APIResources { | ||||
| 					if resultResource.Name == name { | ||||
| 						found = true | ||||
| 						break | ||||
| 					} | ||||
| 				} | ||||
|  | ||||
| 				if !found { | ||||
| 					return false | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			return true | ||||
| 		}) | ||||
|  | ||||
| 		if err != nil { | ||||
| 			if errors.IsNotFound(err) { | ||||
| 				return nil | ||||
| 			} | ||||
| 			return fmt.Errorf("waiting for resources v1 (%v): %w", w, err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (w waitForResourcesAbsentV1) Do(ctx context.Context, client testClient) error { | ||||
| 	requiredResources := map[metav1.GroupVersion][]string{} | ||||
|  | ||||
| 	for _, gvr := range w { | ||||
| 		gv := metav1.GroupVersion{Group: gvr.Group, Version: gvr.Version} | ||||
| 		if existing, ok := requiredResources[gv]; ok { | ||||
| 			requiredResources[gv] = append(existing, gvr.Resource) | ||||
| 		} else { | ||||
| 			requiredResources[gv] = []string{gvr.Resource} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	for gv, resourceNames := range requiredResources { | ||||
| 		err := WaitForV1ResourcesWithCondition(ctx, client, gv, func(result metav1.APIResourceList) bool { | ||||
| 			for _, name := range resourceNames { | ||||
| 				for _, resultResource := range result.APIResources { | ||||
| 					if resultResource.Name == name { | ||||
| 						return false | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 			return true | ||||
| 		}) | ||||
|  | ||||
| 		if err != nil { | ||||
| 			if errors.IsNotFound(err) { | ||||
| 				return nil | ||||
| 			} | ||||
| 			return fmt.Errorf("waiting for absent resources v1 (%v): %w", w, err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (w waitForResourcesV2) Do(ctx context.Context, client testClient) error { | ||||
| 	err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool { | ||||
| 		for _, gvr := range w { | ||||
| 			if info := FindGroupVersionV2(result, metav1.GroupVersion{Group: gvr.Group, Version: gvr.Version}); info == nil { | ||||
| 				return false | ||||
| 			} else { | ||||
| 				found := false | ||||
| 				for _, resultResoure := range info.Resources { | ||||
| 					if resultResoure.Resource == gvr.Resource { | ||||
| 						found = true | ||||
| 						break | ||||
| 					} | ||||
| 				} | ||||
|  | ||||
| 				if !found { | ||||
| 					return false | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		return true | ||||
| 	}) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("waiting for resources v2 (%v): %w", w, err) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (w waitForResourcesAbsentV2) Do(ctx context.Context, client testClient) error { | ||||
| 	err := WaitForResultWithCondition(ctx, client, func(result apidiscoveryv2beta1.APIGroupDiscoveryList) bool { | ||||
| 		for _, gvr := range w { | ||||
| 			if info := FindGroupVersionV2(result, metav1.GroupVersion{Group: gvr.Group, Version: gvr.Version}); info == nil { | ||||
| 				return false | ||||
| 			} else { | ||||
| 				for _, resultResoure := range info.Resources { | ||||
| 					if resultResoure.Resource == gvr.Resource { | ||||
| 						return false | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		return true | ||||
| 	}) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("waiting for absent resources v2 (%v): %w", w, err) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (i inlineAction) Do(ctx context.Context, client testClient) error { | ||||
| 	return i(ctx, client) | ||||
| } | ||||
| @@ -334,7 +518,7 @@ func FetchV1DiscoveryGroupsAtPath(ctx context.Context, client testClient, path s | ||||
| 		Discovery(). | ||||
| 		RESTClient(). | ||||
| 		Get(). | ||||
| 		AbsPath("/apis"). | ||||
| 		AbsPath(path). | ||||
| 		SetHeader("Accept", acceptV1JSON). | ||||
| 		Do(ctx). | ||||
| 		Raw() | ||||
| @@ -352,7 +536,7 @@ func FetchV1DiscoveryGroupsAtPath(ctx context.Context, client testClient, path s | ||||
| 	return groupList, nil | ||||
| } | ||||
|  | ||||
| func FetchV1DiscoveryResource(ctx context.Context, client testClient, gv metav1.GroupVersion) (metav1.APIGroupList, error) { | ||||
| func FetchV1DiscoveryResource(ctx context.Context, client testClient, gv metav1.GroupVersion) (metav1.APIResourceList, error) { | ||||
| 	result, err := client. | ||||
| 		Discovery(). | ||||
| 		RESTClient(). | ||||
| @@ -363,13 +547,13 @@ func FetchV1DiscoveryResource(ctx context.Context, client testClient, gv metav1. | ||||
| 		Raw() | ||||
|  | ||||
| 	if err != nil { | ||||
| 		return metav1.APIGroupList{}, err | ||||
| 		return metav1.APIResourceList{}, err | ||||
| 	} | ||||
|  | ||||
| 	groupList := metav1.APIGroupList{} | ||||
| 	groupList := metav1.APIResourceList{} | ||||
| 	err = json.Unmarshal(result, &groupList) | ||||
| 	if err != nil { | ||||
| 		return metav1.APIGroupList{}, err | ||||
| 		return metav1.APIResourceList{}, err | ||||
| 	} | ||||
|  | ||||
| 	return groupList, nil | ||||
| @@ -408,7 +592,7 @@ func WaitForResultWithCondition(ctx context.Context, client testClient, conditio | ||||
| 	return wait.PollWithContext( | ||||
| 		ctx, | ||||
| 		250*time.Millisecond, | ||||
| 		1*time.Second, | ||||
| 		maxTimeout, | ||||
| 		func(ctx context.Context) (done bool, err error) { | ||||
| 			groupList, err := FetchV2Discovery(ctx, client) | ||||
| 			if err != nil { | ||||
| @@ -429,7 +613,7 @@ func WaitForV1GroupsWithCondition(ctx context.Context, client testClient, condit | ||||
| 	return wait.PollWithContext( | ||||
| 		ctx, | ||||
| 		250*time.Millisecond, | ||||
| 		1*time.Second, | ||||
| 		maxTimeout, | ||||
| 		func(ctx context.Context) (done bool, err error) { | ||||
| 			groupList, err := FetchV1DiscoveryGroups(ctx, client) | ||||
|  | ||||
| @@ -445,6 +629,28 @@ func WaitForV1GroupsWithCondition(ctx context.Context, client testClient, condit | ||||
| 		}) | ||||
| } | ||||
|  | ||||
| func WaitForV1ResourcesWithCondition(ctx context.Context, client testClient, gv metav1.GroupVersion, condition func(result metav1.APIResourceList) bool) error { | ||||
| 	// Keep repeatedly fetching document from aggregator. | ||||
| 	// Check to see if it contains our service within a reasonable amount of time | ||||
| 	return wait.PollWithContext( | ||||
| 		ctx, | ||||
| 		250*time.Millisecond, | ||||
| 		maxTimeout, | ||||
| 		func(ctx context.Context) (done bool, err error) { | ||||
| 			resourceList, err := FetchV1DiscoveryResource(ctx, client, gv) | ||||
|  | ||||
| 			if err != nil { | ||||
| 				return false, err | ||||
| 			} | ||||
|  | ||||
| 			if condition(resourceList) { | ||||
| 				return true, nil | ||||
| 			} | ||||
|  | ||||
| 			return false, nil | ||||
| 		}) | ||||
| } | ||||
|  | ||||
| func FindGroupVersionV1(discovery metav1.APIGroupList, gv metav1.GroupVersion) bool { | ||||
| 	for _, documentGroup := range discovery.Groups { | ||||
| 		if documentGroup.Name != gv.Group { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot