remove unused service keys from aggregated discovery
This commit is contained in:
		
				
					committed by
					
						
						Jefftree
					
				
			
			
				
	
			
			
			
						parent
						
							d171e6733a
						
					
				
				
					commit
					aefaf66d2b
				
			@@ -27,6 +27,7 @@ import (
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/apiserver/pkg/authentication/user"
 | 
			
		||||
	"k8s.io/apiserver/pkg/endpoints"
 | 
			
		||||
@@ -489,6 +490,36 @@ func (dm *discoveryManager) Run(stopCh <-chan struct{}, discoverySyncedCh chan<-
 | 
			
		||||
	}, stopCh)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Takes a snapshot of all currently used services by known APIServices and
 | 
			
		||||
// purges the cache entries of those not present in the snapshot.
 | 
			
		||||
func (dm *discoveryManager) removeUnusedServices() {
 | 
			
		||||
	usedServiceKeys := sets.Set[serviceKey]{}
 | 
			
		||||
 | 
			
		||||
	func() {
 | 
			
		||||
		dm.servicesLock.Lock()
 | 
			
		||||
		defer dm.servicesLock.Unlock()
 | 
			
		||||
 | 
			
		||||
		// Mark all non-local APIServices as dirty
 | 
			
		||||
		for _, info := range dm.apiServices {
 | 
			
		||||
			usedServiceKeys.Insert(info.service)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// Avoids double lock. It is okay if a service is added/removed between these
 | 
			
		||||
	// functions. This is just a cache and that should be infrequent.
 | 
			
		||||
 | 
			
		||||
	func() {
 | 
			
		||||
		dm.resultsLock.Lock()
 | 
			
		||||
		defer dm.resultsLock.Unlock()
 | 
			
		||||
 | 
			
		||||
		for key := range dm.cachedResults {
 | 
			
		||||
			if !usedServiceKeys.Has(key) {
 | 
			
		||||
				delete(dm.cachedResults, key)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Adds an APIService to be tracked by the discovery manager. If the APIService
 | 
			
		||||
// is already known
 | 
			
		||||
func (dm *discoveryManager) AddAPIService(apiService *apiregistrationv1.APIService, handler http.Handler) {
 | 
			
		||||
@@ -506,12 +537,14 @@ func (dm *discoveryManager) AddAPIService(apiService *apiregistrationv1.APIServi
 | 
			
		||||
		lastMarkedDirty: time.Now(),
 | 
			
		||||
		service:         newServiceKey(*apiService.Spec.Service),
 | 
			
		||||
	})
 | 
			
		||||
	dm.removeUnusedServices()
 | 
			
		||||
	dm.dirtyAPIServiceQueue.Add(apiService.Name)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (dm *discoveryManager) RemoveAPIService(apiServiceName string) {
 | 
			
		||||
	if dm.setInfoForAPIService(apiServiceName, nil) != nil {
 | 
			
		||||
		// mark dirty if there was actually something deleted
 | 
			
		||||
		dm.removeUnusedServices()
 | 
			
		||||
		dm.dirtyAPIServiceQueue.Add(apiServiceName)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -296,6 +296,58 @@ func TestInitialRunHasAllAPIServices(t *testing.T) {
 | 
			
		||||
	checkAPIGroups(t, apiGroup, parsed)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestServiceGC(t *testing.T) {
 | 
			
		||||
	service := discoveryendpoint.NewResourceManager("apis")
 | 
			
		||||
 | 
			
		||||
	aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
 | 
			
		||||
	aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
 | 
			
		||||
	testCtx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
	defer cancel()
 | 
			
		||||
 | 
			
		||||
	go aggregatedManager.Run(testCtx.Done(), nil)
 | 
			
		||||
 | 
			
		||||
	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name: "v1.stable.example.com",
 | 
			
		||||
		},
 | 
			
		||||
		Spec: apiregistrationv1.APIServiceSpec{
 | 
			
		||||
			Group:   "stable.example.com",
 | 
			
		||||
			Version: "v1",
 | 
			
		||||
			Service: &apiregistrationv1.ServiceReference{
 | 
			
		||||
				Name: "test-service",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}, service)
 | 
			
		||||
 | 
			
		||||
	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
 | 
			
		||||
 | 
			
		||||
	// Lookup size of cache
 | 
			
		||||
	getCacheLen := func() int {
 | 
			
		||||
		aggregatedManager.resultsLock.Lock()
 | 
			
		||||
		defer aggregatedManager.resultsLock.Unlock()
 | 
			
		||||
		return len(aggregatedManager.cachedResults)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	require.Equal(t, 1, getCacheLen())
 | 
			
		||||
 | 
			
		||||
	// Change the service of the same APIService a bit to create duplicate entry
 | 
			
		||||
	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name: "v1.stable.example.com",
 | 
			
		||||
		},
 | 
			
		||||
		Spec: apiregistrationv1.APIServiceSpec{
 | 
			
		||||
			Group:   "stable.example.com",
 | 
			
		||||
			Version: "v1",
 | 
			
		||||
			Service: &apiregistrationv1.ServiceReference{
 | 
			
		||||
				Name: "test-service-changed",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}, service)
 | 
			
		||||
 | 
			
		||||
	require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
 | 
			
		||||
	require.Equal(t, 1, getCacheLen())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Test that a handler associated with an APIService gets pinged after the
 | 
			
		||||
// APIService has been marked as dirty
 | 
			
		||||
func TestDirty(t *testing.T) {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user