refactor controlplane to use just one client-go

This commit is contained in:
Antonio Ojea
2022-06-09 11:04:56 +02:00
parent 5b92e46b22
commit 586a3d4ac5
3 changed files with 20 additions and 35 deletions

View File

@@ -33,9 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/storage"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
@@ -55,10 +53,7 @@ const (
// "default", "kube-system" and "kube-public" namespaces, and provide the IP
// repair check on service IPs
type Controller struct {
ServiceClient corev1client.ServicesGetter
NamespaceClient corev1client.NamespacesGetter
EventClient eventsv1client.EventsV1Interface
readyzClient rest.Interface
client kubernetes.Interface
ServiceClusterIPRegistry rangeallocation.RangeRegistry
ServiceClusterIPRange net.IPNet
@@ -89,7 +84,7 @@ type Controller struct {
}
// NewBootstrapController returns a controller for watching the core capabilities of the master
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient eventsv1client.EventsV1Interface, readyzClient rest.Interface) (*Controller, error) {
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, client kubernetes.Interface) (*Controller, error) {
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
if err != nil {
return nil, fmt.Errorf("failed to get listener address: %w", err)
@@ -109,10 +104,7 @@ func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.Lega
systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic, corev1.NamespaceNodeLease}
return &Controller{
ServiceClient: serviceClient,
NamespaceClient: nsClient,
EventClient: eventClient,
readyzClient: readyzClient,
client: client,
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
@@ -167,8 +159,8 @@ func (c *Controller) Start() {
klog.Errorf("Error removing old endpoints from kubernetes service: %v", err)
}
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry)
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.client.CoreV1(), c.client.EventsV1(), &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry)
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.client.CoreV1(), c.client.EventsV1(), c.ServiceNodePortRange, c.ServiceNodePortRegistry)
// We start both repairClusterIPs and repairNodePorts to ensure repair
// loops of ClusterIPs and NodePorts.
@@ -238,7 +230,7 @@ func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
wait.Until(func() {
// Loop the system namespace list, and create them if they do not exist
for _, ns := range c.SystemNamespaces {
if err := createNamespaceIfNeeded(c.NamespaceClient, ns); err != nil {
if err := createNamespaceIfNeeded(c.client.CoreV1(), ns); err != nil {
runtime.HandleError(fmt.Errorf("unable to create required kubernetes system namespace %s: %v", ns, err))
}
}
@@ -250,7 +242,7 @@ func (c *Controller) RunKubernetesService(ch chan struct{}) {
// wait until process is ready
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
var code int
c.readyzClient.Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
c.client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
return code == http.StatusOK, nil
}, ch)
@@ -270,7 +262,7 @@ func (c *Controller) UpdateKubernetesService(reconcile bool) error {
// TODO: when it becomes possible to change this stuff,
// stop polling and start watching.
// TODO: add endpoints of all replicas, not just the elected master.
if err := createNamespaceIfNeeded(c.NamespaceClient, metav1.NamespaceDefault); err != nil {
if err := createNamespaceIfNeeded(c.client.CoreV1(), metav1.NamespaceDefault); err != nil {
return err
}
@@ -316,12 +308,12 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1.
// CreateOrUpdateMasterServiceIfNeeded will create the specified service if it
// doesn't already exist.
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
if s, err := c.ServiceClient.Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
if s, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
// The service already exists.
if reconcile {
if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
klog.Warningf("Resetting master service %q to %#v", serviceName, svc)
_, err := c.ServiceClient.Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
return err
}
}
@@ -345,7 +337,7 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser
},
}
_, err := c.ServiceClient.Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
if errors.IsAlreadyExists(err) {
return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile)
}