Merge pull request #110604 from wojtek-t/fix_leaking_goroutines_9

Fix leaking goroutines in multiple integration tests
This commit is contained in:
Kubernetes Prow Robot 2022-06-15 09:01:18 -07:00 committed by GitHub
commit 2b51b2595a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 46 additions and 56 deletions

View File

@ -20,7 +20,6 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"net/http/httptest"
"strings" "strings"
"sync" "sync"
"testing" "testing"
@ -31,14 +30,13 @@ import (
flowcontrol "k8s.io/api/flowcontrol/v1beta2" flowcontrol "k8s.io/api/flowcontrol/v1beta2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
genericfeatures "k8s.io/apiserver/pkg/features" genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
) )
@ -50,33 +48,27 @@ const (
timeout = time.Second * 10 timeout = time.Second * 10
) )
func setup(t testing.TB, maxReadonlyRequestsInFlight, MaxMutatingRequestsInFlight int) (*httptest.Server, *rest.Config, framework.CloseFunc) { func setup(t testing.TB, maxReadonlyRequestsInFlight, MaxMutatingRequestsInFlight int) (*rest.Config, framework.TearDownFunc) {
opts := framework.ControlPlaneConfigOptions{EtcdOptions: framework.DefaultEtcdOptions()} _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
opts.EtcdOptions.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf" ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfigWithOptions(&opts) // Ensure all clients are allowed to send requests.
resourceConfig := controlplane.DefaultAPIResourceConfigSource() opts.Authorization.Modes = []string{"AlwaysAllow"}
resourceConfig.EnableVersions(schema.GroupVersion{ opts.GenericServerRunOptions.MaxRequestsInFlight = maxReadonlyRequestsInFlight
Group: "flowcontrol.apiserver.k8s.io", opts.GenericServerRunOptions.MaxMutatingRequestsInFlight = MaxMutatingRequestsInFlight
Version: "v1alpha1", },
}) })
controlPlaneConfig.GenericConfig.MaxRequestsInFlight = maxReadonlyRequestsInFlight return kubeConfig, tearDownFn
controlPlaneConfig.GenericConfig.MaxMutatingRequestsInFlight = MaxMutatingRequestsInFlight
controlPlaneConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig()
controlPlaneConfig.ExtraConfig.APIResourceConfigSource = resourceConfig
_, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig)
return s, controlPlaneConfig.GenericConfig.LoopbackClientConfig, closeFn
} }
func TestPriorityLevelIsolation(t *testing.T) { func TestPriorityLevelIsolation(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)()
// NOTE: disabling the feature should fail the test // NOTE: disabling the feature should fail the test
_, loopbackConfig, closeFn := setup(t, 1, 1) kubeConfig, closeFn := setup(t, 1, 1)
defer closeFn() defer closeFn()
loopbackClient := clientset.NewForConfigOrDie(loopbackConfig) loopbackClient := clientset.NewForConfigOrDie(kubeConfig)
noxu1Client := getClientFor(loopbackConfig, "noxu1") noxu1Client := getClientFor(kubeConfig, "noxu1")
noxu2Client := getClientFor(loopbackConfig, "noxu2") noxu2Client := getClientFor(kubeConfig, "noxu2")
queueLength := 50 queueLength := 50
concurrencyShares := 1 concurrencyShares := 1
@ -153,13 +145,9 @@ func TestPriorityLevelIsolation(t *testing.T) {
} }
func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interface { func getClientFor(loopbackConfig *rest.Config, username string) clientset.Interface {
config := &rest.Config{ config := rest.CopyConfig(loopbackConfig)
Host: loopbackConfig.Host, config.Impersonate = rest.ImpersonationConfig{
QPS: -1, UserName: username,
BearerToken: loopbackConfig.BearerToken,
Impersonate: rest.ImpersonationConfig{
UserName: username,
},
} }
return clientset.NewForConfigOrDie(config) return clientset.NewForConfigOrDie(config)
} }

View File

@ -170,10 +170,10 @@ func (ft *fightTest) evaluate(tBeforeCreate, tAfterCreate time.Time) {
} }
func TestConfigConsumerFight(t *testing.T) { func TestConfigConsumerFight(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)()
_, loopbackConfig, closeFn := setup(t, 100, 100) kubeConfig, closeFn := setup(t, 100, 100)
defer closeFn() defer closeFn()
const teamSize = 3 const teamSize = 3
ft := newFightTest(t, loopbackConfig, teamSize) ft := newFightTest(t, kubeConfig, teamSize)
tBeforeCreate := time.Now() tBeforeCreate := time.Now()
ft.createMainInformer() ft.createMainInformer()
ft.foreach(ft.createController) ft.foreach(ft.createController)

View File

@ -38,10 +38,10 @@ import (
func TestConditionIsolation(t *testing.T) { func TestConditionIsolation(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.APIPriorityAndFairness, true)()
// NOTE: disabling the feature should fail the test // NOTE: disabling the feature should fail the test
_, loopbackConfig, closeFn := setup(t, 10, 10) kubeConfig, closeFn := setup(t, 10, 10)
defer closeFn() defer closeFn()
loopbackClient := clientset.NewForConfigOrDie(loopbackConfig) loopbackClient := clientset.NewForConfigOrDie(kubeConfig)
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)

View File

@ -28,6 +28,7 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kube-openapi/pkg/common" "k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/validation/spec" "k8s.io/kube-openapi/pkg/validation/spec"
"k8s.io/kubernetes/pkg/controlplane"
generated "k8s.io/kubernetes/pkg/generated/openapi" generated "k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
) )
@ -55,9 +56,7 @@ func TestEnablingOpenAPIEnumTypes(t *testing.T) {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.OpenAPIEnums, tc.featureEnabled)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.OpenAPIEnums, tc.featureEnabled)()
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfigWithOptions(&framework.ControlPlaneConfigOptions{}) getDefinitionsFn := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(func(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition {
controlPlaneConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig()
controlPlaneConfig.GenericConfig.OpenAPIConfig.GetDefinitions = openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(func(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition {
defs := generated.GetOpenAPIDefinitions(ref) defs := generated.GetOpenAPIDefinitions(ref)
def := defs[typeToAddEnum] def := defs[typeToAddEnum]
// replace protocol to add the would-be enum field. // replace protocol to add the would-be enum field.
@ -74,15 +73,20 @@ func TestEnablingOpenAPIEnumTypes(t *testing.T) {
return defs return defs
}) })
instanceConfig, _, closeFn := framework.RunAnAPIServer(controlPlaneConfig) _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
defer closeFn() ModifyServerConfig: func(config *controlplane.Config) {
config.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig()
config.GenericConfig.OpenAPIConfig.GetDefinitions = getDefinitionsFn
},
})
defer tearDownFn()
rt, err := restclient.TransportFor(instanceConfig.GenericAPIServer.LoopbackClientConfig) rt, err := restclient.TransportFor(kubeConfig)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
req, err := http.NewRequest("GET", instanceConfig.GenericAPIServer.LoopbackClientConfig.Host+"/openapi/v2", nil) req, err := http.NewRequest("GET", kubeConfig.Host+"/openapi/v2", nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -46,11 +46,10 @@ import (
func TestOpenAPIV3SpecRoundTrip(t *testing.T) { func TestOpenAPIV3SpecRoundTrip(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.OpenAPIV3, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.OpenAPIV3, true)()
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfigWithOptions(&framework.ControlPlaneConfigOptions{})
controlPlaneConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig() _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{})
controlPlaneConfig.GenericConfig.OpenAPIV3Config = framework.DefaultOpenAPIV3Config() defer tearDownFn()
instanceConfig, _, closeFn := framework.RunAnAPIServer(controlPlaneConfig)
defer closeFn()
paths := []string{ paths := []string{
"/apis/apps/v1", "/apis/apps/v1",
"/apis/authentication.k8s.io/v1", "/apis/authentication.k8s.io/v1",
@ -60,12 +59,12 @@ func TestOpenAPIV3SpecRoundTrip(t *testing.T) {
} }
for _, path := range paths { for _, path := range paths {
t.Run(path, func(t *testing.T) { t.Run(path, func(t *testing.T) {
rt, err := restclient.TransportFor(instanceConfig.GenericAPIServer.LoopbackClientConfig) rt, err := restclient.TransportFor(kubeConfig)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// attempt to fetch and unmarshal // attempt to fetch and unmarshal
url := instanceConfig.GenericAPIServer.LoopbackClientConfig.Host + "/openapi/v3" + path url := kubeConfig.Host + "/openapi/v3" + path
req, err := http.NewRequest("GET", url, nil) req, err := http.NewRequest("GET", url, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -191,17 +190,16 @@ func TestOpenAPIV3ProtoRoundtrip(t *testing.T) {
// See https://github.com/kubernetes/kubernetes/issues/106387 for more details // See https://github.com/kubernetes/kubernetes/issues/106387 for more details
t.Skip("Skipping OpenAPI V3 Proto roundtrip test") t.Skip("Skipping OpenAPI V3 Proto roundtrip test")
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.OpenAPIV3, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.OpenAPIV3, true)()
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfigWithOptions(&framework.ControlPlaneConfigOptions{})
controlPlaneConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig() _, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{})
controlPlaneConfig.GenericConfig.OpenAPIV3Config = framework.DefaultOpenAPIV3Config() defer tearDownFn()
instanceConfig, _, closeFn := framework.RunAnAPIServer(controlPlaneConfig)
defer closeFn() rt, err := restclient.TransportFor(kubeConfig)
rt, err := restclient.TransportFor(instanceConfig.GenericAPIServer.LoopbackClientConfig)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// attempt to fetch and unmarshal // attempt to fetch and unmarshal
req, err := http.NewRequest("GET", instanceConfig.GenericAPIServer.LoopbackClientConfig.Host+"/openapi/v3/apis/apps/v1", nil) req, err := http.NewRequest("GET", kubeConfig.Host+"/openapi/v3/apis/apps/v1", nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -220,7 +218,7 @@ func TestOpenAPIV3ProtoRoundtrip(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
protoReq, err := http.NewRequest("GET", instanceConfig.GenericAPIServer.LoopbackClientConfig.Host+"/openapi/v3/apis/apps/v1", nil) protoReq, err := http.NewRequest("GET", kubeConfig.Host+"/openapi/v3/apis/apps/v1", nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -59,7 +59,7 @@ type TestServerSetup struct {
type TearDownFunc func() type TearDownFunc func()
// StartTestServer runs a kube-apiserver, optionally calling out to the setup.ModifyServerRunOptions and setup.ModifyServerConfig functions // StartTestServer runs a kube-apiserver, optionally calling out to the setup.ModifyServerRunOptions and setup.ModifyServerConfig functions
func StartTestServer(t *testing.T, setup TestServerSetup) (client.Interface, *rest.Config, TearDownFunc) { func StartTestServer(t testing.TB, setup TestServerSetup) (client.Interface, *rest.Config, TearDownFunc) {
certDir, err := os.MkdirTemp("", "test-integration-"+strings.ReplaceAll(t.Name(), "/", "_")) certDir, err := os.MkdirTemp("", "test-integration-"+strings.ReplaceAll(t.Name(), "/", "_"))
if err != nil { if err != nil {
t.Fatalf("Couldn't create temp dir: %v", err) t.Fatalf("Couldn't create temp dir: %v", err)