From 875407a450892355744441cb550cc13d16a1cacc Mon Sep 17 00:00:00 2001 From: yue9944882 <291271447@qq.com> Date: Fri, 20 Mar 2020 01:46:19 +0800 Subject: [PATCH] add flowcontrol integration test to import whitelist --- .../src/k8s.io/component-base/metrics/BUILD | 1 + test/integration/apiserver/flowcontrol/BUILD | 1 - .../apiserver/flowcontrol/concurrency_test.go | 83 ++++++++++--------- 3 files changed, 44 insertions(+), 41 deletions(-) diff --git a/staging/src/k8s.io/component-base/metrics/BUILD b/staging/src/k8s.io/component-base/metrics/BUILD index 4da26f4d462..99d652ae35a 100644 --- a/staging/src/k8s.io/component-base/metrics/BUILD +++ b/staging/src/k8s.io/component-base/metrics/BUILD @@ -93,6 +93,7 @@ package_group( "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics", "//staging/src/k8s.io/component-base/metrics/...", "//test/e2e_node", + "//test/integration/apiserver/flowcontrol", "//vendor/...", ], ) diff --git a/test/integration/apiserver/flowcontrol/BUILD b/test/integration/apiserver/flowcontrol/BUILD index bc8b2c2f3a4..6bc8be792a5 100644 --- a/test/integration/apiserver/flowcontrol/BUILD +++ b/test/integration/apiserver/flowcontrol/BUILD @@ -21,7 +21,6 @@ go_test( "//test/integration/framework:go_default_library", "//vendor/github.com/prometheus/common/expfmt:go_default_library", "//vendor/github.com/prometheus/common/model:go_default_library", - "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", ], ) diff --git a/test/integration/apiserver/flowcontrol/concurrency_test.go b/test/integration/apiserver/flowcontrol/concurrency_test.go index 3471d8a7eea..08ca660adc7 100644 --- a/test/integration/apiserver/flowcontrol/concurrency_test.go +++ b/test/integration/apiserver/flowcontrol/concurrency_test.go @@ -22,13 +22,11 @@ import ( "io" "net/http/httptest" "strings" - "sync" "testing" "time" "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" flowcontrolv1alpha1 "k8s.io/api/flowcontrol/v1alpha1" @@ -78,32 +76,39 @@ func TestPriorityLevelIsolation(t *testing.T) { noxu1Client := getClientFor(loopbackConfig, "noxu1") noxu2Client := getClientFor(loopbackConfig, "noxu2") - priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser(loopbackClient, "noxu1") + queueLength := 50 + concurrencyShares := 1 + + priorityLevelNoxu1, _, err := createPriorityLevelAndBindingFlowSchemaForUser( + loopbackClient, "noxu1", concurrencyShares, queueLength) require.NoError(t, err) - priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser(loopbackClient, "noxu2") + priorityLevelNoxu2, _, err := createPriorityLevelAndBindingFlowSchemaForUser( + loopbackClient, "noxu2", concurrencyShares, queueLength) require.NoError(t, err) - wg := &sync.WaitGroup{} + stopCh := make(chan struct{}) + defer close(stopCh) // "elephant" - streamRequests(wg, 10, 100, func() { - _, err := noxu1Client.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{}) + streamRequests(concurrencyShares+queueLength, func() { + _, err := noxu1Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{}) require.NoError(t, err) - }) - - streamRequests(nil, 1, 100, func() { - _, err := noxu2Client.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{}) + }, stopCh) + // "mouse" + streamRequests(1, func() { + _, err := noxu2Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{}) require.NoError(t, err) - }) + }, stopCh) - wg.Wait() + time.Sleep(time.Second * 10) // running in background for a while - dispatchedCountNoxu1, err := getRequestCountOfPriorityLevel(loopbackClient, priorityLevelNoxu1.Name) - require.NoError(t, err) - dispatchedCountNoxu2, err := getRequestCountOfPriorityLevel(loopbackClient, priorityLevelNoxu2.Name) - require.NoError(t, err) + reqCounts, err := getRequestCountOfPriorityLevel(loopbackClient) - assert.Equal(t, 1000, dispatchedCountNoxu1) - assert.Equal(t, 100, dispatchedCountNoxu2) + noxu1RequestCount := reqCounts[priorityLevelNoxu1.Name] + noxu2RequestCount := reqCounts[priorityLevelNoxu2.Name] + + if (noxu1RequestCount / 2) > noxu2RequestCount { + t.Errorf("total requests made by noxu2 should at least half of noxu1: (%d:%d)", noxu1RequestCount, noxu2RequestCount) + } } func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interface { @@ -118,14 +123,14 @@ func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interf return clientset.NewForConfigOrDie(config) } -func getRequestCountOfPriorityLevel(c clientset.Interface, priorityLevelName string) (int, error) { +func getRequestCountOfPriorityLevel(c clientset.Interface) (map[string]int, error) { resp, err := c.CoreV1(). RESTClient(). Get(). RequestURI("/metrics"). - DoRaw(context.TODO()) + DoRaw(context.Background()) if err != nil { - return 0, err + return nil, err } dec := expfmt.NewDecoder(strings.NewReader(string(resp)), expfmt.FmtText) @@ -134,41 +139,40 @@ func getRequestCountOfPriorityLevel(c clientset.Interface, priorityLevelName str Opts: &expfmt.DecodeOptions{}, } + reqCounts := make(map[string]int) for { var v model.Vector if err := decoder.Decode(&v); err != nil { if err == io.EOF { // Expected loop termination condition. - return 0, fmt.Errorf("no dispatched-count metrics found for priorityLevel %v", priorityLevelName) + return reqCounts, nil } - return 0, fmt.Errorf("failed decoding metrics: %v", err) + return nil, fmt.Errorf("failed decoding metrics: %v", err) } for _, metric := range v { switch name := string(metric.Metric[model.MetricNameLabel]); name { case dispatchedRequestCountMetricsName: - if priorityLevelName == string(metric.Metric[dispatchedRequestCountMetricsLabelPriorityLevel]) { - return int(metric.Value), nil - } + reqCounts[string(metric.Metric[dispatchedRequestCountMetricsLabelPriorityLevel])] = int(metric.Value) } } } } -func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, username string) (*flowcontrolv1alpha1.PriorityLevelConfiguration, *flowcontrolv1alpha1.FlowSchema, error) { - pl, err := c.FlowcontrolV1alpha1().PriorityLevelConfigurations().Create(context.TODO(), &flowcontrolv1alpha1.PriorityLevelConfiguration{ +func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, username string, concurrencyShares, queuelength int) (*flowcontrolv1alpha1.PriorityLevelConfiguration, *flowcontrolv1alpha1.FlowSchema, error) { + pl, err := c.FlowcontrolV1alpha1().PriorityLevelConfigurations().Create(context.Background(), &flowcontrolv1alpha1.PriorityLevelConfiguration{ ObjectMeta: metav1.ObjectMeta{ Name: username, }, Spec: flowcontrolv1alpha1.PriorityLevelConfigurationSpec{ Type: flowcontrolv1alpha1.PriorityLevelEnablementLimited, Limited: &flowcontrolv1alpha1.LimitedPriorityLevelConfiguration{ - AssuredConcurrencyShares: 10, + AssuredConcurrencyShares: int32(concurrencyShares), LimitResponse: flowcontrolv1alpha1.LimitResponse{ Type: flowcontrolv1alpha1.LimitResponseTypeQueue, Queuing: &flowcontrolv1alpha1.QueuingConfiguration{ Queues: 100, HandSize: 1, - QueueLengthLimit: 10, + QueueLengthLimit: int32(queuelength), }, }, }, @@ -232,17 +236,16 @@ func createPriorityLevelAndBindingFlowSchemaForUser(c clientset.Interface, usern }) } -func streamRequests(wg *sync.WaitGroup, parallel, times int, request func()) { +func streamRequests(parallel int, request func(), stopCh <-chan struct{}) { for i := 0; i < parallel; i++ { - if wg != nil { - wg.Add(1) - } go func() { - for j := 0; j < times; j++ { - request() - } - if wg != nil { - wg.Done() + for { + select { + case <-stopCh: + return + default: + request() + } } }() }