Remove RunAnAPIServer from integration tests

This commit is contained in:
Wojciech Tyczyński
2022-07-25 12:18:44 +02:00
parent c633f7124d
commit 5b042f0bf4
8 changed files with 46 additions and 404 deletions

View File

@@ -17,48 +17,19 @@ limitations under the License.
package framework
import (
"context"
"flag"
"net"
"net/http"
"net/http/httptest"
"path"
"strconv"
"time"
"github.com/google/uuid"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
authauthenticator "k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/authenticatorfactory"
authenticatorunion "k8s.io/apiserver/pkg/authentication/request/union"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/authorization/authorizerfactory"
authorizerunion "k8s.io/apiserver/pkg/authorization/union"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericfeatures "k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/options"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
utilopenapi "k8s.io/apiserver/pkg/util/openapi"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/component-base/version"
"k8s.io/klog/v2"
openapicommon "k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/validation/spec"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/kubeapiserver"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
netutils "k8s.io/utils/net"
)
const (
@@ -72,56 +43,6 @@ const (
// plane or choose some different minimum verbosity.
var MinVerbosity = 4
// Config is a struct of configuration directives for NewControlPlaneComponents.
type Config struct {
// If nil, a default is used, partially filled configs will not get populated.
InstanceConfig *controlplane.Config
StartReplicationManager bool
// Client throttling qps
QPS float32
// Client burst qps, also burst replicas allowed in rc manager
Burst int
// TODO: Add configs for endpoints controller, scheduler etc
}
// alwaysAllow always allows an action
type alwaysAllow struct{}
func (alwaysAllow) Authorize(ctx context.Context, requestAttributes authorizer.Attributes) (authorizer.Decision, string, error) {
return authorizer.DecisionAllow, "always allow", nil
}
// unsecuredUser simulates requests to the unsecured endpoint for old tests
func unsecuredUser(req *http.Request) (*authauthenticator.Response, bool, error) {
auth := req.Header.Get("Authorization")
if len(auth) != 0 {
return nil, false, nil
}
return &authauthenticator.Response{
User: &user.DefaultInfo{
Name: "system:unsecured",
Groups: []string{user.SystemPrivilegedGroup, user.AllAuthenticated},
},
}, true, nil
}
// APIServerReceiver can be used to provide the API server to a custom incoming server function
type APIServerReceiver interface {
SetAPIServer(m *controlplane.Instance)
}
// APIServerHolder implements
type APIServerHolder struct {
Initialized chan struct{}
M *controlplane.Instance
}
// SetAPIServer assigns the current API server.
func (h *APIServerHolder) SetAPIServer(m *controlplane.Instance) {
h.M = m
close(h.Initialized)
}
// DefaultOpenAPIConfig returns an openapicommon.Config initialized to default values.
func DefaultOpenAPIConfig() *openapicommon.Config {
openAPIConfig := genericapiserver.DefaultOpenAPIConfig(openapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme))
@@ -160,180 +81,6 @@ func DefaultOpenAPIV3Config() *openapicommon.Config {
return openAPIConfig
}
// startAPIServerOrDie starts a kubernetes API server and an httpserver to handle api requests
func startAPIServerOrDie(controlPlaneConfig *controlplane.Config, incomingServer *httptest.Server, apiServerReceiver APIServerReceiver) (*controlplane.Instance, *httptest.Server, CloseFunc) {
var m *controlplane.Instance
var s *httptest.Server
// Ensure we log at least at the desired level
v := flag.Lookup("v").Value
level, _ := strconv.Atoi(v.String())
if level < MinVerbosity {
v.Set(strconv.Itoa(MinVerbosity))
}
if incomingServer != nil {
s = incomingServer
} else {
s = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
m.GenericAPIServer.Handler.ServeHTTP(w, req)
}))
}
stopCh := make(chan struct{})
// the APIServer implements logic to handle the shutdown process, taking care of draining
// the connections, closing the listener socket, running the preShutdown hooks, stopping the postStartHooks, ...
// In the integration framework we don't have that logic so we try to emulate a similar shutdown process.
// Ref: staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
closeFn := func() {
if m != nil {
m.GenericAPIServer.RunPreShutdownHooks()
}
// Signal RunPostStartHooks to finish
close(stopCh)
// Clean up APIServer resources
m.GenericAPIServer.Destroy()
// At this point the APIserver was already "destroyed", new requests will not be processed,
// however, the httptest.Server.Close() method will block if there are active connections.
// To avoid that any spurious connection keeps the test hanging, we forcefully close the
// connections before shuting down the server. There is a small window where new connections
// can be initiated but is unlikely those move to active, hanging the server shutdown.
s.CloseClientConnections()
s.Close()
}
if controlPlaneConfig == nil {
controlPlaneConfig = NewControlPlaneConfig()
controlPlaneConfig.GenericConfig.OpenAPIConfig = DefaultOpenAPIConfig()
}
// set the loopback client config
if controlPlaneConfig.GenericConfig.LoopbackClientConfig == nil {
controlPlaneConfig.GenericConfig.LoopbackClientConfig = &restclient.Config{QPS: 50, Burst: 100, ContentConfig: restclient.ContentConfig{NegotiatedSerializer: legacyscheme.Codecs}}
}
controlPlaneConfig.GenericConfig.LoopbackClientConfig.Host = s.URL
privilegedLoopbackToken := uuid.New().String()
// wrap any available authorizer
tokens := make(map[string]*user.DefaultInfo)
tokens[privilegedLoopbackToken] = &user.DefaultInfo{
Name: user.APIServerUser,
UID: uuid.New().String(),
Groups: []string{user.SystemPrivilegedGroup, user.AllAuthenticated},
}
tokens[UnprivilegedUserToken] = &user.DefaultInfo{
Name: "unprivileged",
UID: uuid.New().String(),
Groups: []string{user.AllAuthenticated},
}
tokenAuthenticator := authenticatorfactory.NewFromTokens(tokens, controlPlaneConfig.GenericConfig.Authentication.APIAudiences)
if controlPlaneConfig.GenericConfig.Authentication.Authenticator == nil {
controlPlaneConfig.GenericConfig.Authentication.Authenticator = authenticatorunion.New(tokenAuthenticator, authauthenticator.RequestFunc(unsecuredUser))
} else {
controlPlaneConfig.GenericConfig.Authentication.Authenticator = authenticatorunion.New(tokenAuthenticator, controlPlaneConfig.GenericConfig.Authentication.Authenticator)
}
if controlPlaneConfig.GenericConfig.Authorization.Authorizer != nil {
tokenAuthorizer := authorizerfactory.NewPrivilegedGroups(user.SystemPrivilegedGroup)
controlPlaneConfig.GenericConfig.Authorization.Authorizer = authorizerunion.New(tokenAuthorizer, controlPlaneConfig.GenericConfig.Authorization.Authorizer)
} else {
controlPlaneConfig.GenericConfig.Authorization.Authorizer = alwaysAllow{}
}
controlPlaneConfig.GenericConfig.LoopbackClientConfig.BearerToken = privilegedLoopbackToken
clientset, err := clientset.NewForConfig(controlPlaneConfig.GenericConfig.LoopbackClientConfig)
if err != nil {
closeFn()
klog.Fatal(err)
}
controlPlaneConfig.ExtraConfig.VersionedInformers = informers.NewSharedInformerFactory(clientset, controlPlaneConfig.GenericConfig.LoopbackClientConfig.Timeout)
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) {
controlPlaneConfig.GenericConfig.FlowControl = utilflowcontrol.New(
controlPlaneConfig.ExtraConfig.VersionedInformers,
clientset.FlowcontrolV1beta2(),
controlPlaneConfig.GenericConfig.MaxRequestsInFlight+controlPlaneConfig.GenericConfig.MaxMutatingRequestsInFlight,
controlPlaneConfig.GenericConfig.RequestTimeout/4,
)
}
if controlPlaneConfig.ExtraConfig.ServiceIPRange.IP == nil {
controlPlaneConfig.ExtraConfig.ServiceIPRange = net.IPNet{IP: netutils.ParseIPSloppy("10.0.0.0"), Mask: net.CIDRMask(24, 32)}
}
m, err = controlPlaneConfig.Complete().New(genericapiserver.NewEmptyDelegate())
if err != nil {
// We log the error first so that even if closeFn crashes, the error is shown
klog.Errorf("error in bringing up the apiserver: %v", err)
closeFn()
klog.Fatalf("error in bringing up the apiserver: %v", err)
}
if apiServerReceiver != nil {
apiServerReceiver.SetAPIServer(m)
}
// TODO have this start method actually use the normal start sequence for the API server
// this method never actually calls the `Run` method for the API server
// fire the post hooks ourselves
m.GenericAPIServer.PrepareRun()
m.GenericAPIServer.RunPostStartHooks(stopCh)
cfg := *controlPlaneConfig.GenericConfig.LoopbackClientConfig
cfg.ContentConfig.GroupVersion = &schema.GroupVersion{}
privilegedClient, err := restclient.RESTClientFor(&cfg)
if err != nil {
closeFn()
klog.Fatal(err)
}
var lastHealthContent []byte
err = wait.PollImmediate(100*time.Millisecond, 30*time.Second, func() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
result := privilegedClient.Get().AbsPath("/healthz").Do(ctx)
status := 0
result.StatusCode(&status)
if status == 200 {
return true, nil
}
lastHealthContent, _ = result.Raw()
return false, nil
})
if err != nil {
closeFn()
klog.Errorf("last health content: %q", string(lastHealthContent))
klog.Fatal(err)
}
return m, s, closeFn
}
// NewIntegrationTestControlPlaneConfig returns the control plane config appropriate for most integration tests.
func NewIntegrationTestControlPlaneConfig() *controlplane.Config {
return NewIntegrationTestControlPlaneConfigWithOptions(&ControlPlaneConfigOptions{})
}
// NewIntegrationTestControlPlaneConfigWithOptions returns the control plane config appropriate for most integration tests
// configured with the provided options.
func NewIntegrationTestControlPlaneConfigWithOptions(opts *ControlPlaneConfigOptions) *controlplane.Config {
controlPlaneConfig := NewControlPlaneConfigWithOptions(opts)
controlPlaneConfig.GenericConfig.PublicAddress = netutils.ParseIPSloppy("192.168.10.4")
controlPlaneConfig.ExtraConfig.APIResourceConfigSource = controlplane.DefaultAPIResourceConfigSource()
// TODO: get rid of these tests or port them to secure serving
controlPlaneConfig.GenericConfig.SecureServing = &genericapiserver.SecureServingInfo{Listener: fakeLocalhost443Listener{}}
return controlPlaneConfig
}
// ControlPlaneConfigOptions are the configurable options for a new integration test control plane config.
type ControlPlaneConfigOptions struct {
EtcdOptions *options.EtcdOptions
}
// DefaultEtcdOptions are the default EtcdOptions for use with integration tests.
func DefaultEtcdOptions() *options.EtcdOptions {
// This causes the integration tests to exercise the etcd
@@ -344,101 +91,9 @@ func DefaultEtcdOptions() *options.EtcdOptions {
return etcdOptions
}
// NewControlPlaneConfig returns a basic control plane config.
func NewControlPlaneConfig() *controlplane.Config {
return NewControlPlaneConfigWithOptions(&ControlPlaneConfigOptions{})
}
// NewControlPlaneConfigWithOptions returns a basic control plane config configured with the provided options.
func NewControlPlaneConfigWithOptions(opts *ControlPlaneConfigOptions) *controlplane.Config {
etcdOptions := DefaultEtcdOptions()
if opts.EtcdOptions != nil {
etcdOptions = opts.EtcdOptions
}
storageConfig := kubeapiserver.NewStorageFactoryConfig()
storageConfig.APIResourceConfig = serverstorage.NewResourceConfig()
completedStorageConfig, err := storageConfig.Complete(etcdOptions)
if err != nil {
panic(err)
}
storageFactory, err := completedStorageConfig.New()
if err != nil {
panic(err)
}
genericConfig := genericapiserver.NewConfig(legacyscheme.Codecs)
kubeVersion := version.Get()
if len(kubeVersion.Major) == 0 {
kubeVersion.Major = "1"
}
if len(kubeVersion.Minor) == 0 {
kubeVersion.Minor = "22"
}
genericConfig.Version = &kubeVersion
genericConfig.Authorization.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer()
// TODO: get rid of these tests or port them to secure serving
genericConfig.SecureServing = &genericapiserver.SecureServingInfo{Listener: fakeLocalhost443Listener{}}
// if using endpoint reconciler the service subnet IP family must match the Public address
genericConfig.PublicAddress = netutils.ParseIPSloppy("10.1.1.1")
err = etcdOptions.ApplyWithStorageFactoryTo(storageFactory, genericConfig)
if err != nil {
panic(err)
}
return &controlplane.Config{
GenericConfig: genericConfig,
ExtraConfig: controlplane.ExtraConfig{
APIResourceConfigSource: controlplane.DefaultAPIResourceConfigSource(),
StorageFactory: storageFactory,
KubeletClientConfig: kubeletclient.KubeletClientConfig{Port: 10250},
APIServerServicePort: 443,
MasterCount: 1,
},
}
}
// CloseFunc can be called to cleanup the API server
type CloseFunc func()
// DEPRECATED: Use StartTestServer or directly StartTestServer directly
// from cmd/kube-apiserver/app/testing.
//
// RunAnAPIServer starts a API server with the provided config.
func RunAnAPIServer(controlPlaneConfig *controlplane.Config) (*controlplane.Instance, *httptest.Server, CloseFunc) {
if controlPlaneConfig == nil {
controlPlaneConfig = NewControlPlaneConfig()
controlPlaneConfig.GenericConfig.EnableProfiling = true
}
return startAPIServerOrDie(controlPlaneConfig, nil, nil)
}
// RunAnAPIServerUsingServer starts up an instance using the provided config on the specified server.
func RunAnAPIServerUsingServer(controlPlaneConfig *controlplane.Config, s *httptest.Server, apiServerReceiver APIServerReceiver) (*controlplane.Instance, *httptest.Server, CloseFunc) {
return startAPIServerOrDie(controlPlaneConfig, s, apiServerReceiver)
}
// SharedEtcd creates a storage config for a shared etcd instance, with a unique prefix.
func SharedEtcd() *storagebackend.Config {
cfg := storagebackend.NewDefaultConfig(path.Join(uuid.New().String(), "registry"), nil)
cfg.Transport.ServerList = []string{GetEtcdURL()}
return cfg
}
type fakeLocalhost443Listener struct{}
func (fakeLocalhost443Listener) Accept() (net.Conn, error) {
return nil, nil
}
func (fakeLocalhost443Listener) Close() error {
return nil
}
func (fakeLocalhost443Listener) Addr() net.Addr {
return &net.TCPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: 443,
}
}