Add integration test
This commit is contained in:
		| @@ -18,6 +18,7 @@ package testing | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"crypto/rsa" | ||||
| 	"crypto/x509" | ||||
| 	"fmt" | ||||
| 	"net" | ||||
| @@ -38,12 +39,15 @@ import ( | ||||
| 	serveroptions "k8s.io/apiserver/pkg/server/options" | ||||
| 	"k8s.io/apiserver/pkg/storage/storagebackend" | ||||
| 	"k8s.io/apiserver/pkg/storageversion" | ||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||
| 	"k8s.io/client-go/kubernetes" | ||||
| 	restclient "k8s.io/client-go/rest" | ||||
| 	clientgotransport "k8s.io/client-go/transport" | ||||
| 	"k8s.io/client-go/util/cert" | ||||
| 	logsapi "k8s.io/component-base/logs/api/v1" | ||||
| 	"k8s.io/klog/v2" | ||||
| 	"k8s.io/kube-aggregator/pkg/apiserver" | ||||
| 	"k8s.io/kubernetes/pkg/features" | ||||
|  | ||||
| 	"k8s.io/kubernetes/cmd/kube-apiserver/app" | ||||
| 	"k8s.io/kubernetes/cmd/kube-apiserver/app/options" | ||||
| @@ -77,6 +81,14 @@ type TestServerInstanceOptions struct { | ||||
| 	EnableCertAuth bool | ||||
| 	// Wrap the storage version interface of the created server's generic server. | ||||
| 	StorageVersionWrapFunc func(storageversion.Manager) storageversion.Manager | ||||
| 	// CA file used for requestheader authn during communication between: | ||||
| 	// 1. kube-apiserver and peer when the local apiserver is not able to serve the request due | ||||
| 	// to version skew | ||||
| 	// 2. kube-apiserver and aggregated apiserver | ||||
|  | ||||
| 	// We specify this as on option to pass a common proxyCA to multiple apiservers to simulate | ||||
| 	// an apiserver version skew scenario where all apiservers use the same proxyCA to verify client connections. | ||||
| 	ProxyCA *ProxyCA | ||||
| } | ||||
|  | ||||
| // TestServer return values supplied by kube-test-ApiServer | ||||
| @@ -95,6 +107,16 @@ type Logger interface { | ||||
| 	Errorf(format string, args ...interface{}) | ||||
| 	Fatalf(format string, args ...interface{}) | ||||
| 	Logf(format string, args ...interface{}) | ||||
| 	Cleanup(func()) | ||||
| } | ||||
|  | ||||
| // ProxyCA contains the certificate authority certificate and key which is used to verify client connections | ||||
| // to kube-apiservers. The clients can be : | ||||
| // 1. aggregated apiservers | ||||
| // 2. peer kube-apiservers | ||||
| type ProxyCA struct { | ||||
| 	ProxySigningCert *x509.Certificate | ||||
| 	ProxySigningKey  *rsa.PrivateKey | ||||
| } | ||||
|  | ||||
| // NewDefaultTestServerOptions Default options for TestServer instances | ||||
| @@ -161,14 +183,24 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo | ||||
| 		reqHeaders := serveroptions.NewDelegatingAuthenticationOptions() | ||||
| 		s.Authentication.RequestHeader = &reqHeaders.RequestHeader | ||||
|  | ||||
| 		// create certificates for aggregation and client-cert auth | ||||
| 		proxySigningKey, err := testutil.NewPrivateKey() | ||||
| 		if err != nil { | ||||
| 			return result, err | ||||
| 		} | ||||
| 		proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey) | ||||
| 		if err != nil { | ||||
| 			return result, err | ||||
| 		var proxySigningKey *rsa.PrivateKey | ||||
| 		var proxySigningCert *x509.Certificate | ||||
|  | ||||
| 		if instanceOptions.ProxyCA != nil { | ||||
| 			// use provided proxyCA | ||||
| 			proxySigningKey = instanceOptions.ProxyCA.ProxySigningKey | ||||
| 			proxySigningCert = instanceOptions.ProxyCA.ProxySigningCert | ||||
|  | ||||
| 		} else { | ||||
| 			// create certificates for aggregation and client-cert auth | ||||
| 			proxySigningKey, err = testutil.NewPrivateKey() | ||||
| 			if err != nil { | ||||
| 				return result, err | ||||
| 			} | ||||
| 			proxySigningCert, err = cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey) | ||||
| 			if err != nil { | ||||
| 				return result, err | ||||
| 			} | ||||
| 		} | ||||
| 		proxyCACertFile := filepath.Join(s.SecureServing.ServerCert.CertDirectory, "proxy-ca.crt") | ||||
| 		if err := os.WriteFile(proxyCACertFile, testutil.EncodeCertPEM(proxySigningCert), 0644); err != nil { | ||||
| @@ -213,6 +245,15 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo | ||||
| 			return result, err | ||||
| 		} | ||||
| 		s.Authentication.ClientCert.ClientCA = clientCACertFile | ||||
| 		if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) { | ||||
| 			// TODO: set up a general clean up for testserver | ||||
| 			if clientgotransport.DialerStopCh == wait.NeverStop { | ||||
| 				ctx, cancel := context.WithTimeout(context.Background(), time.Hour) | ||||
| 				t.Cleanup(cancel) | ||||
| 				clientgotransport.DialerStopCh = ctx.Done() | ||||
| 			} | ||||
| 			s.PeerCAFile = filepath.Join(s.SecureServing.ServerCert.CertDirectory, s.SecureServing.ServerCert.PairName+".crt") | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	s.SecureServing.ExternalAddress = s.SecureServing.Listener.Addr().(*net.TCPAddr).IP // use listener addr although it is a loopback device | ||||
|   | ||||
							
								
								
									
										27
									
								
								test/integration/apiserver/peerproxy/main_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								test/integration/apiserver/peerproxy/main_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,27 @@ | ||||
| /* | ||||
| Copyright 2023 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package peerproxy | ||||
|  | ||||
| import ( | ||||
| 	"testing" | ||||
|  | ||||
| 	"k8s.io/kubernetes/test/integration/framework" | ||||
| ) | ||||
|  | ||||
| func TestMain(m *testing.M) { | ||||
| 	framework.EtcdMain(m.Run) | ||||
| } | ||||
							
								
								
									
										244
									
								
								test/integration/apiserver/peerproxy/peer_proxy_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										244
									
								
								test/integration/apiserver/peerproxy/peer_proxy_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,244 @@ | ||||
| /* | ||||
| Copyright 2023 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package peerproxy | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| 	"github.com/stretchr/testify/require" | ||||
| 	v1 "k8s.io/api/batch/v1" | ||||
| 	corev1 "k8s.io/api/core/v1" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	"k8s.io/apiserver/pkg/features" | ||||
| 	"k8s.io/apiserver/pkg/server" | ||||
| 	utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||
| 	"k8s.io/client-go/informers" | ||||
| 	"k8s.io/client-go/kubernetes" | ||||
| 	"k8s.io/client-go/transport" | ||||
| 	"k8s.io/client-go/util/cert" | ||||
| 	featuregatetesting "k8s.io/component-base/featuregate/testing" | ||||
| 	"k8s.io/klog/v2" | ||||
| 	kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" | ||||
| 	"k8s.io/kubernetes/pkg/controller/storageversiongc" | ||||
| 	"k8s.io/kubernetes/pkg/controlplane" | ||||
| 	kubefeatures "k8s.io/kubernetes/pkg/features" | ||||
|  | ||||
| 	"k8s.io/kubernetes/test/integration/framework" | ||||
| 	testutil "k8s.io/kubernetes/test/utils" | ||||
| 	"k8s.io/kubernetes/test/utils/ktesting" | ||||
| ) | ||||
|  | ||||
| func TestPeerProxiedRequest(t *testing.T) { | ||||
|  | ||||
| 	ktesting.SetDefaultVerbosity(1) | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) | ||||
| 	t.Cleanup(cancel) | ||||
|  | ||||
| 	// ensure to stop cert reloading after shutdown | ||||
| 	transport.DialerStopCh = ctx.Done() | ||||
|  | ||||
| 	// enable feature flags | ||||
| 	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)() | ||||
| 	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)() | ||||
| 	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)() | ||||
|  | ||||
| 	// create sharedetcd | ||||
| 	etcd := framework.SharedEtcd() | ||||
|  | ||||
| 	// create certificates for aggregation and client-cert auth | ||||
| 	proxyCA, err := createProxyCertContent() | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	// start test server with all APIs enabled | ||||
| 	// override hostname to ensure unique ips | ||||
| 	server.SetHostnameFuncForTests("test-server-a") | ||||
| 	serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{ | ||||
| 		EnableCertAuth: true, | ||||
| 		ProxyCA:        &proxyCA}, | ||||
| 		[]string{}, etcd) | ||||
| 	defer serverA.TearDownFn() | ||||
|  | ||||
| 	// start another test server with some api disabled | ||||
| 	// override hostname to ensure unique ips | ||||
| 	server.SetHostnameFuncForTests("test-server-b") | ||||
| 	serverB := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{ | ||||
| 		EnableCertAuth: true, | ||||
| 		ProxyCA:        &proxyCA}, | ||||
| 		[]string{fmt.Sprintf("--runtime-config=%s", "batch/v1=false")}, etcd) | ||||
| 	defer serverB.TearDownFn() | ||||
|  | ||||
| 	kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	// create jobs resource using serverA | ||||
| 	job := createJobResource() | ||||
| 	_, err = kubeClientSetA.BatchV1().Jobs("default").Create(context.Background(), job, metav1.CreateOptions{}) | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	klog.Infof("\nServerA has created jobs\n") | ||||
|  | ||||
| 	// List jobs using ServerB | ||||
| 	// This request should be proxied to ServerA since ServerB does not have batch API enabled | ||||
| 	jobsB, err := kubeClientSetB.BatchV1().Jobs("default").List(context.Background(), metav1.ListOptions{}) | ||||
| 	klog.Infof("\nServerB has retrieved jobs list of length %v \n\n", len(jobsB.Items)) | ||||
| 	require.NoError(t, err) | ||||
| 	assert.NotEmpty(t, jobsB) | ||||
| 	assert.Equal(t, job.Name, jobsB.Items[0].Name) | ||||
| } | ||||
|  | ||||
| func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) { | ||||
|  | ||||
| 	ktesting.SetDefaultVerbosity(1) | ||||
| 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) | ||||
| 	t.Cleanup(cancel) | ||||
|  | ||||
| 	// ensure to stop cert reloading after shutdown | ||||
| 	transport.DialerStopCh = ctx.Done() | ||||
|  | ||||
| 	// enable feature flags | ||||
| 	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)() | ||||
| 	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)() | ||||
| 	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)() | ||||
|  | ||||
| 	// create sharedetcd | ||||
| 	etcd := framework.SharedEtcd() | ||||
|  | ||||
| 	// create certificates for aggregation and client-cert auth | ||||
| 	proxyCA, err := createProxyCertContent() | ||||
| 	require.NoError(t, err) | ||||
|  | ||||
| 	// set lease duration to 1s for serverA to ensure that storageversions for serverA are updated | ||||
| 	// once it is shutdown | ||||
| 	controlplane.IdentityLeaseDurationSeconds = 10 | ||||
| 	controlplane.IdentityLeaseGCPeriod = time.Second | ||||
| 	controlplane.IdentityLeaseRenewIntervalPeriod = 10 * time.Second | ||||
|  | ||||
| 	// start serverA with all APIs enabled | ||||
| 	// override hostname to ensure unique ips | ||||
| 	server.SetHostnameFuncForTests("test-server-a") | ||||
| 	serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{}, etcd) | ||||
| 	kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig) | ||||
| 	require.NoError(t, err) | ||||
| 	// ensure storageversion garbage collector ctlr is set up | ||||
| 	informersA := informers.NewSharedInformerFactory(kubeClientSetA, time.Second) | ||||
| 	setupStorageVersionGC(ctx, kubeClientSetA, informersA) | ||||
| 	// reset lease duration to default value for serverB and serverC since we will not be | ||||
| 	// shutting these down | ||||
| 	controlplane.IdentityLeaseDurationSeconds = 3600 | ||||
|  | ||||
| 	// start serverB with some api disabled | ||||
| 	// override hostname to ensure unique ips | ||||
| 	server.SetHostnameFuncForTests("test-server-b") | ||||
| 	serverB := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{ | ||||
| 		fmt.Sprintf("--runtime-config=%v", "batch/v1=false")}, etcd) | ||||
| 	defer serverB.TearDownFn() | ||||
| 	kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig) | ||||
| 	require.NoError(t, err) | ||||
| 	// ensure storageversion garbage collector ctlr is set up | ||||
| 	informersB := informers.NewSharedInformerFactory(kubeClientSetB, time.Second) | ||||
| 	setupStorageVersionGC(ctx, kubeClientSetB, informersB) | ||||
|  | ||||
| 	// start serverC with all APIs enabled | ||||
| 	// override hostname to ensure unique ips | ||||
| 	server.SetHostnameFuncForTests("test-server-c") | ||||
| 	serverC := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{}, etcd) | ||||
| 	defer serverC.TearDownFn() | ||||
|  | ||||
| 	// create jobs resource using serverA | ||||
| 	job := createJobResource() | ||||
| 	_, err = kubeClientSetA.BatchV1().Jobs("default").Create(context.Background(), job, metav1.CreateOptions{}) | ||||
| 	require.NoError(t, err) | ||||
| 	klog.Infof("\nServerA has created jobs\n") | ||||
|  | ||||
| 	// shutdown serverA | ||||
| 	serverA.TearDownFn() | ||||
|  | ||||
| 	var jobsB *v1.JobList | ||||
| 	// list jobs using ServerB which it should proxy to ServerC and get back valid response | ||||
| 	err = wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) { | ||||
| 		jobsB, err = kubeClientSetB.BatchV1().Jobs("default").List(context.Background(), metav1.ListOptions{}) | ||||
| 		if err != nil { | ||||
| 			return false, nil | ||||
| 		} | ||||
| 		if jobsB != nil { | ||||
| 			return true, nil | ||||
| 		} | ||||
| 		return false, nil | ||||
| 	}) | ||||
| 	klog.Infof("\nServerB has retrieved jobs list of length %v \n\n", len(jobsB.Items)) | ||||
| 	require.NoError(t, err) | ||||
| 	assert.NotEmpty(t, jobsB) | ||||
| 	assert.Equal(t, job.Name, jobsB.Items[0].Name) | ||||
| } | ||||
|  | ||||
| func setupStorageVersionGC(ctx context.Context, kubeClientSet *kubernetes.Clientset, informers informers.SharedInformerFactory) { | ||||
| 	leaseInformer := informers.Coordination().V1().Leases() | ||||
| 	storageVersionInformer := informers.Internal().V1alpha1().StorageVersions() | ||||
| 	go leaseInformer.Informer().Run(ctx.Done()) | ||||
| 	go storageVersionInformer.Informer().Run(ctx.Done()) | ||||
|  | ||||
| 	controller := storageversiongc.NewStorageVersionGC(ctx, kubeClientSet, leaseInformer, storageVersionInformer) | ||||
| 	go controller.Run(ctx) | ||||
| } | ||||
|  | ||||
| func createProxyCertContent() (kastesting.ProxyCA, error) { | ||||
| 	result := kastesting.ProxyCA{} | ||||
| 	proxySigningKey, err := testutil.NewPrivateKey() | ||||
| 	if err != nil { | ||||
| 		return result, err | ||||
| 	} | ||||
| 	proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey) | ||||
| 	if err != nil { | ||||
| 		return result, err | ||||
| 	} | ||||
|  | ||||
| 	result = kastesting.ProxyCA{ | ||||
| 		ProxySigningCert: proxySigningCert, | ||||
| 		ProxySigningKey:  proxySigningKey, | ||||
| 	} | ||||
| 	return result, nil | ||||
| } | ||||
|  | ||||
| func createJobResource() *v1.Job { | ||||
| 	return &v1.Job{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name:      "test-job", | ||||
| 			Namespace: "default", | ||||
| 		}, | ||||
| 		Spec: v1.JobSpec{ | ||||
| 			Template: corev1.PodTemplateSpec{ | ||||
| 				Spec: corev1.PodSpec{ | ||||
| 					Containers: []corev1.Container{ | ||||
| 						{ | ||||
| 							Name:  "test", | ||||
| 							Image: "test", | ||||
| 						}, | ||||
| 					}, | ||||
| 					RestartPolicy: corev1.RestartPolicyNever, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| } | ||||
		Reference in New Issue
	
	Block a user
	 Richa Banker
					Richa Banker