FIXUP: pkg/controlplane: move bootstrap controller to controllers/kubernetesservice

This commit is contained in:
Dr. Stefan Schimanski
2023-07-03 20:23:20 +02:00
parent 7b62d000a4
commit 616c959988
5 changed files with 141 additions and 118 deletions

View File

@@ -31,20 +31,18 @@ import (
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
corerest "k8s.io/kubernetes/pkg/registry/core/rest"
servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
"k8s.io/kubernetes/pkg/util/async"
netutils "k8s.io/utils/net"
)
const (
@@ -55,91 +53,59 @@ const (
// controller loops, which manage creating the "kubernetes" service and
// provide the IP repair check on service IPs
type Controller struct {
client kubernetes.Interface
informers informers.SharedInformerFactory
Config
RangeRegistries
runner *async.Runner
}
type RangeRegistries struct {
ServiceClusterIPRegistry rangeallocation.RangeRegistry
ServiceClusterIPRange net.IPNet
SecondaryServiceClusterIPRegistry rangeallocation.RangeRegistry
SecondaryServiceClusterIPRange net.IPNet
ServiceNodePortRegistry rangeallocation.RangeRegistry
}
ServiceClusterIPInterval time.Duration
type Config struct {
Client kubernetes.Interface
Informers informers.SharedInformerFactory
ServiceNodePortRegistry rangeallocation.RangeRegistry
ServiceNodePortInterval time.Duration
ServiceNodePortRange utilnet.PortRange
KubernetesService
ClusterIP
NodePort
}
type KubernetesService struct {
PublicIP net.IP
EndpointReconciler reconcilers.EndpointReconciler
EndpointInterval time.Duration
PublicIP net.IP
// ServiceIP indicates where the kubernetes service will live. It may not be nil.
ServiceIP net.IP
ServicePort int
PublicServicePort int
KubernetesServiceNodePort int
runner *async.Runner
}
// NewBootstrapController returns a controller for watching the core capabilities of the master
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, client kubernetes.Interface) (*Controller, error) {
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
if err != nil {
return nil, fmt.Errorf("failed to get listener address: %w", err)
}
type ClusterIP struct {
ServiceClusterIPRange net.IPNet
SecondaryServiceClusterIPRange net.IPNet
ServiceClusterIPInterval time.Duration
}
// The "kubernetes.default" Service is SingleStack based on the configured ServiceIPRange.
// If the bootstrap controller reconcile the kubernetes.default Service and Endpoints, it must
// guarantee that the Service ClusterIP and the associated Endpoints have the same IP family, or
// it will not work for clients because of the IP family mismatch.
// TODO: revisit for dual-stack https://github.com/kubernetes/enhancements/issues/2438
if c.ExtraConfig.EndpointReconcilerType != reconcilers.NoneEndpointReconcilerType {
if netutils.IsIPv4CIDR(&c.ExtraConfig.ServiceIPRange) != netutils.IsIPv4(c.GenericConfig.PublicAddress) {
return nil, fmt.Errorf("service IP family %q must match public address family %q", c.ExtraConfig.ServiceIPRange.String(), c.GenericConfig.PublicAddress.String())
}
}
type NodePort struct {
ServiceNodePortInterval time.Duration
ServiceNodePortRange utilnet.PortRange
}
// New returns a controller for watching the kubernetes service endpoints.
func New(config Config, rangeRegistries RangeRegistries) (*Controller, error) {
return &Controller{
client: client,
informers: c.ExtraConfig.VersionedInformers,
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
ServiceClusterIPRegistry: legacyRESTStorage.ServiceClusterIPAllocator,
ServiceClusterIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryServiceClusterIPRegistry: legacyRESTStorage.SecondaryServiceClusterIPAllocator,
SecondaryServiceClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,
ServiceClusterIPInterval: c.ExtraConfig.RepairServicesInterval,
ServiceNodePortRegistry: legacyRESTStorage.ServiceNodePortAllocator,
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
ServiceNodePortInterval: c.ExtraConfig.RepairServicesInterval,
PublicIP: c.GenericConfig.PublicAddress,
ServiceIP: c.ExtraConfig.APIServerServiceIP,
ServicePort: c.ExtraConfig.APIServerServicePort,
PublicServicePort: publicServicePort,
KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
Config: config,
RangeRegistries: rangeRegistries,
}, nil
}
// PostStartHook initiates the core controller loops that must exist for bootstrapping.
func (c *Controller) PostStartHook(hookContext genericapiserver.PostStartHookContext) error {
c.Start()
return nil
}
// PreShutdownHook triggers the actions needed to shut down the API Server cleanly.
func (c *Controller) PreShutdownHook() error {
c.Stop()
return nil
}
// Start begins the core controller loops that must exist for bootstrapping
// a cluster.
func (c *Controller) Start() {
@@ -155,7 +121,7 @@ func (c *Controller) Start() {
klog.Errorf("Error removing old endpoints from kubernetes service: %v", err)
}
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.client.CoreV1(), c.client.EventsV1(), c.ServiceNodePortRange, c.ServiceNodePortRegistry)
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.Client.CoreV1(), c.Client.EventsV1(), c.ServiceNodePortRange, c.ServiceNodePortRegistry)
// We start both repairClusterIPs and repairNodePorts to ensure repair
// loops of ClusterIPs and NodePorts.
@@ -177,8 +143,8 @@ func (c *Controller) Start() {
var runRepairClusterIPs func(stopCh chan struct{})
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval,
c.client.CoreV1(),
c.client.EventsV1(),
c.Client.CoreV1(),
c.Client.EventsV1(),
&c.ServiceClusterIPRange,
c.ServiceClusterIPRegistry,
&c.SecondaryServiceClusterIPRange,
@@ -188,11 +154,11 @@ func (c *Controller) Start() {
}
} else {
repairClusterIPs := servicecontroller.NewRepairIPAddress(c.ServiceClusterIPInterval,
c.client,
c.Client,
&c.ServiceClusterIPRange,
&c.SecondaryServiceClusterIPRange,
c.informers.Core().V1().Services(),
c.informers.Networking().V1alpha1().IPAddresses(),
c.Informers.Core().V1().Services(),
c.Informers.Networking().V1alpha1().IPAddresses(),
)
runRepairClusterIPs = func(stopCh chan struct{}) {
repairClusterIPs.RunUntil(wg.Done, stopCh)
@@ -247,7 +213,7 @@ func (c *Controller) RunKubernetesService(ch chan struct{}) {
// wait until process is ready
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
var code int
c.client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
c.Client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
return code == http.StatusOK, nil
}, ch)
@@ -267,8 +233,15 @@ func (c *Controller) UpdateKubernetesService(reconcile bool) error {
// TODO: when it becomes possible to change this stuff,
// stop polling and start watching.
// TODO: add endpoints of all replicas, not just the elected master.
if err := createNamespaceIfNeeded(c.client.CoreV1(), metav1.NamespaceDefault); err != nil {
return err
if _, err := c.Client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceDefault, metav1.GetOptions{}); err != nil {
if _, err := c.Client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: metav1.NamespaceDefault,
Namespace: "",
},
}, metav1.CreateOptions{}); err != nil && !errors.IsAlreadyExists(err) {
return err
}
}
servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.PublicServicePort, c.KubernetesServiceNodePort, "https")
@@ -313,12 +286,12 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1.
// CreateOrUpdateMasterServiceIfNeeded will create the specified service if it
// doesn't already exist.
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
if s, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
if s, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
// The service already exists.
if reconcile {
if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
klog.Warningf("Resetting master service %q to %#v", serviceName, svc)
_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
_, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
return err
}
}
@@ -342,7 +315,7 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser
},
}
_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
_, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
if errors.IsAlreadyExists(err) {
return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile)
}

View File

@@ -67,7 +67,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
for _, test := range createTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset()
master.client = fakeClient
master.Client = fakeClient
master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
creates := []core.CreateAction{}
for _, action := range fakeClient.Actions() {
@@ -349,7 +349,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
for _, test := range reconcileTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
master.client = fakeClient
master.Client = fakeClient
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
@@ -408,7 +408,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
for _, test := range nonReconcileTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
master.client = fakeClient
master.Client = fakeClient
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)