Expand aggregated API server integration test to include CRUD

This change updates TestAggregatedAPIServer and the related test
server wiring to exercise the full network path between the Kube API
server and the aggregated API server.  We now assert that the wardle
API service and Kube API server discovery endpoints are fully healthy.
CRUD operations are performed through the Kube API server to the
wardle API server.

Signed-off-by: Monis Khan <mok@microsoft.com>
This commit is contained in:
Monis Khan
2022-08-25 17:16:12 +00:00
parent 6dd8b86124
commit ec283e526b
4 changed files with 202 additions and 62 deletions

View File

@@ -22,21 +22,21 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"os"
"path"
"reflect"
"sort"
"testing"
"time"
"github.com/stretchr/testify/assert"
apierrors "k8s.io/apimachinery/pkg/api/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
genericapiserveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/client-go/discovery"
client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@@ -44,21 +44,34 @@ import (
"k8s.io/client-go/util/cert"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework"
wardlev1alpha1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1"
wardlev1beta1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1beta1"
sampleserver "k8s.io/sample-apiserver/pkg/cmd/server"
wardlev1alpha1client "k8s.io/sample-apiserver/pkg/generated/clientset/versioned/typed/wardle/v1alpha1"
netutils "k8s.io/utils/net"
)
func TestAggregatedAPIServer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
t.Cleanup(cancel)
// makes the kube-apiserver very responsive. it's normally a minute
dynamiccertificates.FileRefreshDuration = 1 * time.Second
stopCh := make(chan struct{})
defer close(stopCh)
// we need the wardle port information first to set up the service resolver
listener, wardlePort, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0", net.ListenConfig{})
if err != nil {
t.Fatal(err)
}
// endpoints cannot have loopback IPs so we need to override the resolver itself
t.Cleanup(app.SetServiceResolverForTests(staticURLServiceResolver(fmt.Sprintf("https://127.0.0.1:%d", wardlePort))))
testServer := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true}, nil, framework.SharedEtcd())
defer testServer.TearDownFn()
kubeClientConfig := rest.CopyConfig(testServer.ClientConfig)
@@ -67,18 +80,41 @@ func TestAggregatedAPIServer(t *testing.T) {
kubeClientConfig.AcceptContentTypes = ""
kubeClient := client.NewForConfigOrDie(kubeClientConfig)
aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeClientConfig)
wardleClient := wardlev1alpha1client.NewForConfigOrDie(kubeClientConfig)
// create the bare minimum resources required to be able to get the API service into an available state
_, err = kubeClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "kube-wardle",
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
_, err = kubeClient.CoreV1().Services("kube-wardle").Create(ctx, &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "api",
},
Spec: corev1.ServiceSpec{
ExternalName: "needs-to-be-non-empty",
Type: corev1.ServiceTypeExternalName,
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
// start the wardle server to prove we can aggregate it
wardleToKASKubeConfigFile := writeKubeConfigForWardleServerToKASConnection(t, rest.CopyConfig(kubeClientConfig))
defer os.Remove(wardleToKASKubeConfigFile)
wardleCertDir, _ := os.MkdirTemp("", "test-integration-wardle-server")
defer os.RemoveAll(wardleCertDir)
listener, wardlePort, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0", net.ListenConfig{})
if err != nil {
t.Fatal(err)
}
go func() {
o := sampleserver.NewWardleServerOptions(os.Stdout, os.Stderr)
// ensure this is a SAN on the generated cert for service FQDN
o.AlternateDNS = []string{
"api.kube-wardle.svc",
}
o.RecommendedOptions.SecureServing.Listener = listener
o.RecommendedOptions.SecureServing.BindAddress = netutils.ParseIPSloppy("127.0.0.1")
wardleCmd := sampleserver.NewCommandStartWardleServer(o, stopCh)
@@ -93,25 +129,22 @@ func TestAggregatedAPIServer(t *testing.T) {
t.Error(err)
}
}()
directWardleClientConfig, err := waitForWardleRunning(t, kubeClientConfig, wardleCertDir, wardlePort)
directWardleClientConfig, err := waitForWardleRunning(ctx, t, kubeClientConfig, wardleCertDir, wardlePort)
if err != nil {
t.Fatal(err)
}
// now we're finally ready to test. These are what's run by default now
wardleClient, err := client.NewForConfig(directWardleClientConfig)
if err != nil {
t.Fatal(err)
}
testAPIGroupList(t, wardleClient.Discovery().RESTClient())
testAPIGroup(t, wardleClient.Discovery().RESTClient())
testAPIResourceList(t, wardleClient.Discovery().RESTClient())
wardleDirectClient := client.NewForConfigOrDie(directWardleClientConfig)
testAPIGroupList(ctx, t, wardleDirectClient.Discovery().RESTClient())
testAPIGroup(ctx, t, wardleDirectClient.Discovery().RESTClient())
testAPIResourceList(ctx, t, wardleDirectClient.Discovery().RESTClient())
wardleCA, err := os.ReadFile(directWardleClientConfig.CAFile)
if err != nil {
t.Fatal(err)
}
_, err = aggregatorClient.ApiregistrationV1().APIServices().Create(context.TODO(), &apiregistrationv1.APIService{
_, err = aggregatorClient.ApiregistrationV1().APIServices().Create(ctx, &apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.wardle.example.com"},
Spec: apiregistrationv1.APIServiceSpec{
Service: &apiregistrationv1.ServiceReference{
@@ -129,16 +162,92 @@ func TestAggregatedAPIServer(t *testing.T) {
t.Fatal(err)
}
// wait for the unavailable API service to be processed with updated status
err = wait.Poll(1*time.Second, wait.ForeverTestTimeout, func() (done bool, err error) {
// wait for the API service to be available
err = wait.Poll(time.Second, wait.ForeverTestTimeout, func() (done bool, err error) {
apiService, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1alpha1.wardle.example.com", metav1.GetOptions{})
if err != nil {
return false, err
}
var available bool
for _, condition := range apiService.Status.Conditions {
if condition.Type == apiregistrationv1.Available && condition.Status == apiregistrationv1.ConditionTrue {
available = true
break
}
}
if !available {
t.Log("api service is not available", apiService.Status.Conditions)
return false, nil
}
// make sure discovery is healthy overall
_, _, err = kubeClient.Discovery().ServerGroupsAndResources()
hasExpectedError := checkWardleUnavailableDiscoveryError(t, err)
return hasExpectedError, nil
if err != nil {
t.Log("discovery failed", err)
return false, nil
}
// make sure we have the wardle resources in discovery
apiResources, err := kubeClient.Discovery().ServerResourcesForGroupVersion("wardle.example.com/v1alpha1")
if err != nil {
t.Log("wardle discovery failed", err)
return false, nil
}
if len(apiResources.APIResources) != 2 {
t.Log("wardle discovery has wrong resources", apiResources.APIResources)
return false, nil
}
resources := make([]string, 0, 2)
for _, resource := range apiResources.APIResources {
resource := resource
resources = append(resources, resource.Name)
}
sort.Strings(resources)
if !reflect.DeepEqual([]string{"fischers", "flunders"}, resources) {
return false, fmt.Errorf("unexpected resources: %v", resources)
}
return true, nil
})
if err != nil {
t.Fatal(err)
}
// TODO figure out how to turn on enough of services and dns to run more
// perform simple CRUD operations against the wardle resources
_, err = wardleClient.Fischers().Create(ctx, &wardlev1alpha1.Fischer{
ObjectMeta: metav1.ObjectMeta{
Name: "panda",
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
fischersList, err := wardleClient.Fischers().List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatal(err)
}
if len(fischersList.Items) != 1 {
t.Errorf("expected one fischer: %#v", fischersList.Items)
}
if len(fischersList.ResourceVersion) == 0 {
t.Error("expected non-empty resource version for fischer list")
}
_, err = wardleClient.Flunders(metav1.NamespaceSystem).Create(ctx, &wardlev1alpha1.Flunder{
ObjectMeta: metav1.ObjectMeta{
Name: "panda",
},
}, metav1.CreateOptions{})
flunderList, err := wardleClient.Flunders(metav1.NamespaceSystem).List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatal(err)
}
if len(flunderList.Items) != 1 {
t.Errorf("expected one flunder: %#v", flunderList.Items)
}
if len(flunderList.ResourceVersion) == 0 {
t.Error("expected non-empty resource version for flunder list")
}
// Since ClientCAs are provided by "client-ca::kube-system::extension-apiserver-authentication::client-ca-file" controller
// we need to wait until it picks up the configmap (via a lister) otherwise the response might contain an empty result.
@@ -224,10 +333,9 @@ func TestAggregatedAPIServer(t *testing.T) {
if numMatches != 4 {
t.Fatal("names don't match")
}
}
func waitForWardleRunning(t *testing.T, wardleToKASKubeConfig *rest.Config, wardleCertDir string, wardlePort int) (*rest.Config, error) {
func waitForWardleRunning(ctx context.Context, t *testing.T, wardleToKASKubeConfig *rest.Config, wardleCertDir string, wardlePort int) (*rest.Config, error) {
directWardleClientConfig := rest.AnonymousClientConfig(rest.CopyConfig(wardleToKASKubeConfig))
directWardleClientConfig.CAFile = path.Join(wardleCertDir, "apiserver.crt")
directWardleClientConfig.CAData = nil
@@ -249,7 +357,7 @@ func waitForWardleRunning(t *testing.T, wardleToKASKubeConfig *rest.Config, ward
return false, nil
}
healthStatus := 0
result := wardleClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).StatusCode(&healthStatus)
result := wardleClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&healthStatus)
lastHealthContent, lastHealthErr = result.Raw()
if healthStatus != http.StatusOK {
return false, nil
@@ -301,33 +409,6 @@ func writeKubeConfigForWardleServerToKASConnection(t *testing.T, kubeClientConfi
return wardleToKASKubeConfigFile.Name()
}
func checkWardleUnavailableDiscoveryError(t *testing.T, err error) bool {
if err == nil {
t.Log("Discovery call expected to return failed unavailable service")
return false
}
if !discovery.IsGroupDiscoveryFailedError(err) {
t.Logf("Unexpected error: %T, %v", err, err)
return false
}
discoveryErr := err.(*discovery.ErrGroupDiscoveryFailed)
if len(discoveryErr.Groups) != 1 {
t.Logf("Unexpected failed groups: %v", err)
return false
}
groupVersion := schema.GroupVersion{Group: "wardle.example.com", Version: "v1alpha1"}
groupVersionErr, ok := discoveryErr.Groups[groupVersion]
if !ok {
t.Logf("Unexpected failed group version: %v", err)
return false
}
if !apierrors.IsServiceUnavailable(groupVersionErr) {
t.Logf("Unexpected failed group version error: %v", err)
return false
}
return true
}
func createKubeConfig(clientCfg *rest.Config) *clientcmdapi.Config {
clusterNick := "cluster"
userNick := "user"
@@ -365,12 +446,12 @@ func createKubeConfig(clientCfg *rest.Config) *clientcmdapi.Config {
return config
}
func readResponse(client rest.Interface, location string) ([]byte, error) {
return client.Get().AbsPath(location).DoRaw(context.TODO())
func readResponse(ctx context.Context, client rest.Interface, location string) ([]byte, error) {
return client.Get().AbsPath(location).DoRaw(ctx)
}
func testAPIGroupList(t *testing.T, client rest.Interface) {
contents, err := readResponse(client, "/apis")
func testAPIGroupList(ctx context.Context, t *testing.T, client rest.Interface) {
contents, err := readResponse(ctx, client, "/apis")
if err != nil {
t.Fatalf("%v", err)
}
@@ -398,8 +479,8 @@ func testAPIGroupList(t *testing.T, client rest.Interface) {
assert.Equal(t, v1beta1, apiGroupList.Groups[0].PreferredVersion)
}
func testAPIGroup(t *testing.T, client rest.Interface) {
contents, err := readResponse(client, "/apis/wardle.example.com")
func testAPIGroup(ctx context.Context, t *testing.T, client rest.Interface) {
contents, err := readResponse(ctx, client, "/apis/wardle.example.com")
if err != nil {
t.Fatalf("%v", err)
}
@@ -416,8 +497,8 @@ func testAPIGroup(t *testing.T, client rest.Interface) {
assert.Equal(t, apiGroup.PreferredVersion, apiGroup.Versions[0])
}
func testAPIResourceList(t *testing.T, client rest.Interface) {
contents, err := readResponse(client, "/apis/wardle.example.com/v1alpha1")
func testAPIResourceList(ctx context.Context, t *testing.T, client rest.Interface) {
contents, err := readResponse(ctx, client, "/apis/wardle.example.com/v1alpha1")
if err != nil {
t.Fatalf("%v", err)
}
@@ -472,3 +553,9 @@ MnVCuBwfwDXCAiEAw/1TA+CjPq9JC5ek1ifR0FybTURjeQqYkKpve1dveps=
`)
)
type staticURLServiceResolver string
func (u staticURLServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
return url.Parse(string(u))
}