Merge pull request #26915 from ncdc/master-endpoint-reconciler-interface
Automatic merge from submit-queue Extract interface for master endpoints reconciler. Make the master endpoints reconciler an interface so its implementation can be overridden, if desired. xref #20975 #26574 cc @kubernetes/sig-api-machinery @lavalamp @smarterclayton @pmorie @DirectXMan12 @wojtek-t @kubernetes/rh-cluster-infra
This commit is contained in:
		@@ -44,8 +44,6 @@ import (
 | 
				
			|||||||
type Controller struct {
 | 
					type Controller struct {
 | 
				
			||||||
	NamespaceRegistry namespace.Registry
 | 
						NamespaceRegistry namespace.Registry
 | 
				
			||||||
	ServiceRegistry   service.Registry
 | 
						ServiceRegistry   service.Registry
 | 
				
			||||||
	// TODO: MasterCount is yucky
 | 
					 | 
				
			||||||
	MasterCount int
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ServiceClusterIPRegistry service.RangeRegistry
 | 
						ServiceClusterIPRegistry service.RangeRegistry
 | 
				
			||||||
	ServiceClusterIPInterval time.Duration
 | 
						ServiceClusterIPInterval time.Duration
 | 
				
			||||||
@@ -55,7 +53,7 @@ type Controller struct {
 | 
				
			|||||||
	ServiceNodePortInterval time.Duration
 | 
						ServiceNodePortInterval time.Duration
 | 
				
			||||||
	ServiceNodePortRange    utilnet.PortRange
 | 
						ServiceNodePortRange    utilnet.PortRange
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	EndpointRegistry endpoint.Registry
 | 
						EndpointReconciler EndpointReconciler
 | 
				
			||||||
	EndpointInterval   time.Duration
 | 
						EndpointInterval   time.Duration
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	SystemNamespaces         []string
 | 
						SystemNamespaces         []string
 | 
				
			||||||
@@ -140,7 +138,7 @@ func (c *Controller) UpdateKubernetesService(reconcile bool) error {
 | 
				
			|||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
 | 
							endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
 | 
				
			||||||
		if err := c.ReconcileEndpoints("kubernetes", c.PublicIP, endpointPorts, reconcile); err != nil {
 | 
							if err := c.EndpointReconciler.ReconcileEndpoints("kubernetes", c.PublicIP, endpointPorts, reconcile); err != nil {
 | 
				
			||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -240,6 +238,39 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser
 | 
				
			|||||||
	return err
 | 
						return err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// EndpointReconciler knows how to reconcile the endpoints for the apiserver service.
 | 
				
			||||||
 | 
					type EndpointReconciler interface {
 | 
				
			||||||
 | 
						// ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw).
 | 
				
			||||||
 | 
						// ReconcileEndpoints expects that the endpoints objects it manages will all be
 | 
				
			||||||
 | 
						// managed only by ReconcileEndpoints; therefore, to understand this, you need only
 | 
				
			||||||
 | 
						// understand the requirements.
 | 
				
			||||||
 | 
						//
 | 
				
			||||||
 | 
						// Requirements:
 | 
				
			||||||
 | 
						//  * All apiservers MUST use the same ports for their {rw, ro} services.
 | 
				
			||||||
 | 
						//  * All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the
 | 
				
			||||||
 | 
						//      endpoints for their {rw, ro} services.
 | 
				
			||||||
 | 
						//  * ReconcileEndpoints is called periodically from all apiservers.
 | 
				
			||||||
 | 
						ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// masterCountEndpointReconciler reconciles endpoints based on a specified expected number of
 | 
				
			||||||
 | 
					// masters. masterCountEndpointReconciler implements EndpointReconciler.
 | 
				
			||||||
 | 
					type masterCountEndpointReconciler struct {
 | 
				
			||||||
 | 
						masterCount      int
 | 
				
			||||||
 | 
						endpointRegistry endpoint.Registry
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var _ EndpointReconciler = &masterCountEndpointReconciler{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a
 | 
				
			||||||
 | 
					// specified expected number of masters.
 | 
				
			||||||
 | 
					func NewMasterCountEndpointReconciler(masterCount int, endpointRegistry endpoint.Registry) *masterCountEndpointReconciler {
 | 
				
			||||||
 | 
						return &masterCountEndpointReconciler{
 | 
				
			||||||
 | 
							masterCount:      masterCount,
 | 
				
			||||||
 | 
							endpointRegistry: endpointRegistry,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw).
 | 
					// ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw).
 | 
				
			||||||
// ReconcileEndpoints expects that the endpoints objects it manages will all be
 | 
					// ReconcileEndpoints expects that the endpoints objects it manages will all be
 | 
				
			||||||
// managed only by ReconcileEndpoints; therefore, to understand this, you need only
 | 
					// managed only by ReconcileEndpoints; therefore, to understand this, you need only
 | 
				
			||||||
@@ -252,10 +283,9 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser
 | 
				
			|||||||
//  * All apiservers MUST know and agree on the number of apiservers expected
 | 
					//  * All apiservers MUST know and agree on the number of apiservers expected
 | 
				
			||||||
//      to be running (c.masterCount).
 | 
					//      to be running (c.masterCount).
 | 
				
			||||||
//  * ReconcileEndpoints is called periodically from all apiservers.
 | 
					//  * ReconcileEndpoints is called periodically from all apiservers.
 | 
				
			||||||
//
 | 
					func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
 | 
				
			||||||
func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
 | 
					 | 
				
			||||||
	ctx := api.NewDefaultContext()
 | 
						ctx := api.NewDefaultContext()
 | 
				
			||||||
	e, err := c.EndpointRegistry.GetEndpoints(ctx, serviceName)
 | 
						e, err := r.endpointRegistry.GetEndpoints(ctx, serviceName)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		e = &api.Endpoints{
 | 
							e = &api.Endpoints{
 | 
				
			||||||
			ObjectMeta: api.ObjectMeta{
 | 
								ObjectMeta: api.ObjectMeta{
 | 
				
			||||||
@@ -267,7 +297,7 @@ func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointP
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// First, determine if the endpoint is in the format we expect (one
 | 
						// First, determine if the endpoint is in the format we expect (one
 | 
				
			||||||
	// subset, ports matching endpointPorts, N IP addresses).
 | 
						// subset, ports matching endpointPorts, N IP addresses).
 | 
				
			||||||
	formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, c.MasterCount, reconcilePorts)
 | 
						formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, r.masterCount, reconcilePorts)
 | 
				
			||||||
	if !formatCorrect {
 | 
						if !formatCorrect {
 | 
				
			||||||
		// Something is egregiously wrong, just re-make the endpoints record.
 | 
							// Something is egregiously wrong, just re-make the endpoints record.
 | 
				
			||||||
		e.Subsets = []api.EndpointSubset{{
 | 
							e.Subsets = []api.EndpointSubset{{
 | 
				
			||||||
@@ -275,7 +305,7 @@ func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointP
 | 
				
			|||||||
			Ports:     endpointPorts,
 | 
								Ports:     endpointPorts,
 | 
				
			||||||
		}}
 | 
							}}
 | 
				
			||||||
		glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
 | 
							glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
 | 
				
			||||||
		return c.EndpointRegistry.UpdateEndpoints(ctx, e)
 | 
							return r.endpointRegistry.UpdateEndpoints(ctx, e)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if ipCorrect && portsCorrect {
 | 
						if ipCorrect && portsCorrect {
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
@@ -291,11 +321,11 @@ func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointP
 | 
				
			|||||||
		// own IP address.  Given the requirements stated at the top of
 | 
							// own IP address.  Given the requirements stated at the top of
 | 
				
			||||||
		// this function, this should cause the list of IP addresses to
 | 
							// this function, this should cause the list of IP addresses to
 | 
				
			||||||
		// become eventually correct.
 | 
							// become eventually correct.
 | 
				
			||||||
		if addrs := &e.Subsets[0].Addresses; len(*addrs) > c.MasterCount {
 | 
							if addrs := &e.Subsets[0].Addresses; len(*addrs) > r.masterCount {
 | 
				
			||||||
			// addrs is a pointer because we're going to mutate it.
 | 
								// addrs is a pointer because we're going to mutate it.
 | 
				
			||||||
			for i, addr := range *addrs {
 | 
								for i, addr := range *addrs {
 | 
				
			||||||
				if addr.IP == ip.String() {
 | 
									if addr.IP == ip.String() {
 | 
				
			||||||
					for len(*addrs) > c.MasterCount {
 | 
										for len(*addrs) > r.masterCount {
 | 
				
			||||||
						// wrap around if necessary.
 | 
											// wrap around if necessary.
 | 
				
			||||||
						remove := (i + 1) % len(*addrs)
 | 
											remove := (i + 1) % len(*addrs)
 | 
				
			||||||
						*addrs = append((*addrs)[:remove], (*addrs)[remove+1:]...)
 | 
											*addrs = append((*addrs)[:remove], (*addrs)[remove+1:]...)
 | 
				
			||||||
@@ -310,7 +340,7 @@ func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointP
 | 
				
			|||||||
		e.Subsets[0].Ports = endpointPorts
 | 
							e.Subsets[0].Ports = endpointPorts
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
 | 
						glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
 | 
				
			||||||
	return c.EndpointRegistry.UpdateEndpoints(ctx, e)
 | 
						return r.endpointRegistry.UpdateEndpoints(ctx, e)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Determine if the endpoint is in the format ReconcileEndpoints expects.
 | 
					// Determine if the endpoint is in the format ReconcileEndpoints expects.
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -371,12 +371,11 @@ func TestReconcileEndpoints(t *testing.T) {
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	for _, test := range reconcile_tests {
 | 
						for _, test := range reconcile_tests {
 | 
				
			||||||
		master := Controller{MasterCount: test.additionalMasters + 1}
 | 
					 | 
				
			||||||
		registry := ®istrytest.EndpointRegistry{
 | 
							registry := ®istrytest.EndpointRegistry{
 | 
				
			||||||
			Endpoints: test.endpoints,
 | 
								Endpoints: test.endpoints,
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		master.EndpointRegistry = registry
 | 
							reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, registry)
 | 
				
			||||||
		err := master.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true)
 | 
							err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			t.Errorf("case %q: unexpected error: %v", test.testName, err)
 | 
								t.Errorf("case %q: unexpected error: %v", test.testName, err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -461,12 +460,11 @@ func TestReconcileEndpoints(t *testing.T) {
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	for _, test := range non_reconcile_tests {
 | 
						for _, test := range non_reconcile_tests {
 | 
				
			||||||
		master := Controller{MasterCount: test.additionalMasters + 1}
 | 
					 | 
				
			||||||
		registry := ®istrytest.EndpointRegistry{
 | 
							registry := ®istrytest.EndpointRegistry{
 | 
				
			||||||
			Endpoints: test.endpoints,
 | 
								Endpoints: test.endpoints,
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		master.EndpointRegistry = registry
 | 
							reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, registry)
 | 
				
			||||||
		err := master.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false)
 | 
							err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			t.Errorf("case %q: unexpected error: %v", test.testName, err)
 | 
								t.Errorf("case %q: unexpected error: %v", test.testName, err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -519,7 +517,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	for _, test := range create_tests {
 | 
						for _, test := range create_tests {
 | 
				
			||||||
		master := Controller{MasterCount: 1}
 | 
							master := Controller{}
 | 
				
			||||||
		registry := ®istrytest.ServiceRegistry{
 | 
							registry := ®istrytest.ServiceRegistry{
 | 
				
			||||||
			Err: errors.New("unable to get svc"),
 | 
								Err: errors.New("unable to get svc"),
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -794,7 +792,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	for _, test := range reconcile_tests {
 | 
						for _, test := range reconcile_tests {
 | 
				
			||||||
		master := Controller{MasterCount: 1}
 | 
							master := Controller{}
 | 
				
			||||||
		registry := ®istrytest.ServiceRegistry{
 | 
							registry := ®istrytest.ServiceRegistry{
 | 
				
			||||||
			Service: test.service,
 | 
								Service: test.service,
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -846,7 +844,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	for _, test := range non_reconcile_tests {
 | 
						for _, test := range non_reconcile_tests {
 | 
				
			||||||
		master := Controller{MasterCount: 1}
 | 
							master := Controller{}
 | 
				
			||||||
		registry := ®istrytest.ServiceRegistry{
 | 
							registry := ®istrytest.ServiceRegistry{
 | 
				
			||||||
			Service: test.service,
 | 
								Service: test.service,
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -590,9 +590,8 @@ func (m *Master) NewBootstrapController() *Controller {
 | 
				
			|||||||
	return &Controller{
 | 
						return &Controller{
 | 
				
			||||||
		NamespaceRegistry: m.namespaceRegistry,
 | 
							NamespaceRegistry: m.namespaceRegistry,
 | 
				
			||||||
		ServiceRegistry:   m.serviceRegistry,
 | 
							ServiceRegistry:   m.serviceRegistry,
 | 
				
			||||||
		MasterCount:       m.MasterCount,
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		EndpointRegistry: m.endpointRegistry,
 | 
							EndpointReconciler: NewMasterCountEndpointReconciler(m.MasterCount, m.endpointRegistry),
 | 
				
			||||||
		EndpointInterval:   10 * time.Second,
 | 
							EndpointInterval:   10 * time.Second,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		SystemNamespaces:         []string{api.NamespaceSystem},
 | 
							SystemNamespaces:         []string{api.NamespaceSystem},
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -257,10 +257,9 @@ func TestNewBootstrapController(t *testing.T) {
 | 
				
			|||||||
	controller := master.NewBootstrapController()
 | 
						controller := master.NewBootstrapController()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	assert.Equal(controller.NamespaceRegistry, master.namespaceRegistry)
 | 
						assert.Equal(controller.NamespaceRegistry, master.namespaceRegistry)
 | 
				
			||||||
	assert.Equal(controller.EndpointRegistry, master.endpointRegistry)
 | 
						assert.Equal(controller.EndpointReconciler, NewMasterCountEndpointReconciler(master.MasterCount, master.endpointRegistry))
 | 
				
			||||||
	assert.Equal(controller.ServiceRegistry, master.serviceRegistry)
 | 
						assert.Equal(controller.ServiceRegistry, master.serviceRegistry)
 | 
				
			||||||
	assert.Equal(controller.ServiceNodePortRange, portRange)
 | 
						assert.Equal(controller.ServiceNodePortRange, portRange)
 | 
				
			||||||
	assert.Equal(controller.MasterCount, master.MasterCount)
 | 
					 | 
				
			||||||
	assert.Equal(controller.ServicePort, master.ServiceReadWritePort)
 | 
						assert.Equal(controller.ServicePort, master.ServiceReadWritePort)
 | 
				
			||||||
	assert.Equal(controller.PublicServicePort, master.PublicReadWritePort)
 | 
						assert.Equal(controller.PublicServicePort, master.PublicReadWritePort)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user