resourcequota: use singleflight.Group to reduce apiserver load
relates to #22422 and #123806
This commit is contained in:
		@@ -21,6 +21,7 @@ import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"golang.org/x/sync/singleflight"
 | 
			
		||||
	corev1 "k8s.io/api/core/v1"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/labels"
 | 
			
		||||
@@ -51,6 +52,7 @@ type quotaAccessor struct {
 | 
			
		||||
	// This lets us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results.
 | 
			
		||||
	// We track the lookup result here so that for repeated requests, we don't look it up very often.
 | 
			
		||||
	liveLookupCache *lru.Cache
 | 
			
		||||
	group           singleflight.Group
 | 
			
		||||
	liveTTL         time.Duration
 | 
			
		||||
	// updatedQuotas holds a cache of quotas that we've updated.  This is used to pull the "really latest" during back to
 | 
			
		||||
	// back quota evaluations that touch the same quota doc.  This only works because we can compare etcd resourceVersions
 | 
			
		||||
@@ -114,11 +116,9 @@ func (e *quotaAccessor) GetQuotas(namespace string) ([]corev1.ResourceQuota, err
 | 
			
		||||
	if len(items) == 0 {
 | 
			
		||||
		lruItemObj, ok := e.liveLookupCache.Get(namespace)
 | 
			
		||||
		if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
 | 
			
		||||
			// TODO: If there are multiple operations at the same time and cache has just expired,
 | 
			
		||||
			// this may cause multiple List operations being issued at the same time.
 | 
			
		||||
			// If there is already in-flight List() for a given namespace, we should wait until
 | 
			
		||||
			// it is finished and cache is updated instead of doing the same, also to avoid
 | 
			
		||||
			// throttling - see #22422 for details.
 | 
			
		||||
			// use singleflight.Group to avoid flooding the apiserver with repeated
 | 
			
		||||
			// requests. See #22422 for details.
 | 
			
		||||
			lruItemObj, err, _ = e.group.Do(namespace, func() (interface{}, error) {
 | 
			
		||||
				liveList, err := e.client.CoreV1().ResourceQuotas(namespace).List(context.TODO(), metav1.ListOptions{})
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return nil, err
 | 
			
		||||
@@ -128,7 +128,11 @@ func (e *quotaAccessor) GetQuotas(namespace string) ([]corev1.ResourceQuota, err
 | 
			
		||||
					newEntry.items = append(newEntry.items, &liveList.Items[i])
 | 
			
		||||
				}
 | 
			
		||||
				e.liveLookupCache.Add(namespace, newEntry)
 | 
			
		||||
			lruItemObj = newEntry
 | 
			
		||||
				return newEntry, nil
 | 
			
		||||
			})
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		lruEntry := lruItemObj.(liveLookupEntry)
 | 
			
		||||
		for i := range lruEntry.items {
 | 
			
		||||
 
 | 
			
		||||
@@ -17,7 +17,10 @@ limitations under the License.
 | 
			
		||||
package resourcequota
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
@@ -27,6 +30,7 @@ import (
 | 
			
		||||
	corev1 "k8s.io/api/core/v1"
 | 
			
		||||
	"k8s.io/client-go/informers"
 | 
			
		||||
	"k8s.io/client-go/kubernetes/fake"
 | 
			
		||||
	core "k8s.io/client-go/testing"
 | 
			
		||||
	"k8s.io/utils/lru"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -121,5 +125,122 @@ func TestLRUCacheLookup(t *testing.T) {
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TestGetQuotas ensures we do not have multiple LIST calls to the apiserver
 | 
			
		||||
// in-flight at any one time. This is to ensure the issue described in #22422 do
 | 
			
		||||
// not happen again.
 | 
			
		||||
func TestGetQuotas(t *testing.T) {
 | 
			
		||||
	var (
 | 
			
		||||
		testNamespace1              = "test-a"
 | 
			
		||||
		testNamespace2              = "test-b"
 | 
			
		||||
		listCallCountTestNamespace1 int64
 | 
			
		||||
		listCallCountTestNamespace2 int64
 | 
			
		||||
	)
 | 
			
		||||
	resourceQuota := &corev1.ResourceQuota{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			Name: "foo",
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	resourceQuotas := []*corev1.ResourceQuota{resourceQuota}
 | 
			
		||||
 | 
			
		||||
	kubeClient := &fake.Clientset{}
 | 
			
		||||
	informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
 | 
			
		||||
 | 
			
		||||
	accessor, _ := newQuotaAccessor()
 | 
			
		||||
	accessor.client = kubeClient
 | 
			
		||||
	accessor.lister = informerFactory.Core().V1().ResourceQuotas().Lister()
 | 
			
		||||
 | 
			
		||||
	kubeClient.AddReactor("list", "resourcequotas", func(action core.Action) (bool, runtime.Object, error) {
 | 
			
		||||
		switch action.GetNamespace() {
 | 
			
		||||
		case testNamespace1:
 | 
			
		||||
			atomic.AddInt64(&listCallCountTestNamespace1, 1)
 | 
			
		||||
		case testNamespace2:
 | 
			
		||||
			atomic.AddInt64(&listCallCountTestNamespace2, 1)
 | 
			
		||||
		default:
 | 
			
		||||
			t.Error("unexpected namespace")
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		resourceQuotaList := &corev1.ResourceQuotaList{
 | 
			
		||||
			ListMeta: metav1.ListMeta{
 | 
			
		||||
				ResourceVersion: fmt.Sprintf("%d", len(resourceQuotas)),
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
		for i, quota := range resourceQuotas {
 | 
			
		||||
			quota.ResourceVersion = fmt.Sprintf("%d", i)
 | 
			
		||||
			quota.Namespace = action.GetNamespace()
 | 
			
		||||
			resourceQuotaList.Items = append(resourceQuotaList.Items, *quota)
 | 
			
		||||
		}
 | 
			
		||||
		// make the handler slow so concurrent calls exercise the singleflight
 | 
			
		||||
		time.Sleep(time.Second)
 | 
			
		||||
		return true, resourceQuotaList, nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	wg := sync.WaitGroup{}
 | 
			
		||||
	for i := 0; i < 10; i++ {
 | 
			
		||||
		wg.Add(2)
 | 
			
		||||
		// simulating concurrent calls after a cache failure
 | 
			
		||||
		go func() {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
			quotas, err := accessor.GetQuotas(testNamespace1)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Errorf("unexpected error: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			if len(quotas) != len(resourceQuotas) {
 | 
			
		||||
				t.Errorf("Expected %d resource quotas, got %d", len(resourceQuotas), len(quotas))
 | 
			
		||||
			}
 | 
			
		||||
			for _, q := range quotas {
 | 
			
		||||
				if q.Namespace != testNamespace1 {
 | 
			
		||||
					t.Errorf("Expected %s namespace, got %s", testNamespace1, q.Namespace)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
 | 
			
		||||
		// simulation of different namespaces is a call for a different group key, but not shared with the first namespace
 | 
			
		||||
		go func() {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
			quotas, err := accessor.GetQuotas(testNamespace2)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Errorf("unexpected error: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			if len(quotas) != len(resourceQuotas) {
 | 
			
		||||
				t.Errorf("Expected %d resource quotas, got %d", len(resourceQuotas), len(quotas))
 | 
			
		||||
			}
 | 
			
		||||
			for _, q := range quotas {
 | 
			
		||||
				if q.Namespace != testNamespace2 {
 | 
			
		||||
					t.Errorf("Expected %s namespace, got %s", testNamespace2, q.Namespace)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// and here we wait for all the goroutines
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
	// since all the calls with the same namespace will be held, they must
 | 
			
		||||
	// be caught on the singleflight group. there are two different sets of
 | 
			
		||||
	// namespace calls hence only 2.
 | 
			
		||||
	if listCallCountTestNamespace1 != 1 {
 | 
			
		||||
		t.Errorf("Expected 1 resource quota call, got %d", listCallCountTestNamespace1)
 | 
			
		||||
	}
 | 
			
		||||
	if listCallCountTestNamespace2 != 1 {
 | 
			
		||||
		t.Errorf("Expected 1 resource quota call, got %d", listCallCountTestNamespace2)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// invalidate the cache
 | 
			
		||||
	accessor.liveLookupCache.Remove(testNamespace1)
 | 
			
		||||
	quotas, err := accessor.GetQuotas(testNamespace1)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unexpected error: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if len(quotas) != len(resourceQuotas) {
 | 
			
		||||
		t.Errorf("Expected %d resource quotas, got %d", len(resourceQuotas), len(quotas))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if listCallCountTestNamespace1 != 2 {
 | 
			
		||||
		t.Errorf("Expected 2 resource quota call, got %d", listCallCountTestNamespace1)
 | 
			
		||||
	}
 | 
			
		||||
	if listCallCountTestNamespace2 != 1 {
 | 
			
		||||
		t.Errorf("Expected 1 resource quota call, got %d", listCallCountTestNamespace2)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user