Extract interface for master endpoints reconciler.
Make the master endpoints reconciler an interface so its implementation can be overridden, if desired.
This commit is contained in:
@@ -44,8 +44,6 @@ import (
|
||||
type Controller struct {
|
||||
NamespaceRegistry namespace.Registry
|
||||
ServiceRegistry service.Registry
|
||||
// TODO: MasterCount is yucky
|
||||
MasterCount int
|
||||
|
||||
ServiceClusterIPRegistry service.RangeRegistry
|
||||
ServiceClusterIPInterval time.Duration
|
||||
@@ -55,8 +53,8 @@ type Controller struct {
|
||||
ServiceNodePortInterval time.Duration
|
||||
ServiceNodePortRange utilnet.PortRange
|
||||
|
||||
EndpointRegistry endpoint.Registry
|
||||
EndpointInterval time.Duration
|
||||
EndpointReconciler EndpointReconciler
|
||||
EndpointInterval time.Duration
|
||||
|
||||
SystemNamespaces []string
|
||||
SystemNamespacesInterval time.Duration
|
||||
@@ -140,7 +138,7 @@ func (c *Controller) UpdateKubernetesService(reconcile bool) error {
|
||||
return err
|
||||
}
|
||||
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
|
||||
if err := c.ReconcileEndpoints("kubernetes", c.PublicIP, endpointPorts, reconcile); err != nil {
|
||||
if err := c.EndpointReconciler.ReconcileEndpoints("kubernetes", c.PublicIP, endpointPorts, reconcile); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -240,6 +238,39 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser
|
||||
return err
|
||||
}
|
||||
|
||||
// EndpointReconciler knows how to reconcile the endpoints for the apiserver service.
|
||||
type EndpointReconciler interface {
|
||||
// ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw).
|
||||
// ReconcileEndpoints expects that the endpoints objects it manages will all be
|
||||
// managed only by ReconcileEndpoints; therefore, to understand this, you need only
|
||||
// understand the requirements.
|
||||
//
|
||||
// Requirements:
|
||||
// * All apiservers MUST use the same ports for their {rw, ro} services.
|
||||
// * All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the
|
||||
// endpoints for their {rw, ro} services.
|
||||
// * ReconcileEndpoints is called periodically from all apiservers.
|
||||
ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error
|
||||
}
|
||||
|
||||
// masterCountEndpointReconciler reconciles endpoints based on a specified expected number of
|
||||
// masters. masterCountEndpointReconciler implements EndpointReconciler.
|
||||
type masterCountEndpointReconciler struct {
|
||||
masterCount int
|
||||
endpointRegistry endpoint.Registry
|
||||
}
|
||||
|
||||
var _ EndpointReconciler = &masterCountEndpointReconciler{}
|
||||
|
||||
// NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a
|
||||
// specified expected number of masters.
|
||||
func NewMasterCountEndpointReconciler(masterCount int, endpointRegistry endpoint.Registry) *masterCountEndpointReconciler {
|
||||
return &masterCountEndpointReconciler{
|
||||
masterCount: masterCount,
|
||||
endpointRegistry: endpointRegistry,
|
||||
}
|
||||
}
|
||||
|
||||
// ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw).
|
||||
// ReconcileEndpoints expects that the endpoints objects it manages will all be
|
||||
// managed only by ReconcileEndpoints; therefore, to understand this, you need only
|
||||
@@ -252,10 +283,9 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser
|
||||
// * All apiservers MUST know and agree on the number of apiservers expected
|
||||
// to be running (c.masterCount).
|
||||
// * ReconcileEndpoints is called periodically from all apiservers.
|
||||
//
|
||||
func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
|
||||
func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
|
||||
ctx := api.NewDefaultContext()
|
||||
e, err := c.EndpointRegistry.GetEndpoints(ctx, serviceName)
|
||||
e, err := r.endpointRegistry.GetEndpoints(ctx, serviceName)
|
||||
if err != nil {
|
||||
e = &api.Endpoints{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
@@ -267,7 +297,7 @@ func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointP
|
||||
|
||||
// First, determine if the endpoint is in the format we expect (one
|
||||
// subset, ports matching endpointPorts, N IP addresses).
|
||||
formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, c.MasterCount, reconcilePorts)
|
||||
formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, r.masterCount, reconcilePorts)
|
||||
if !formatCorrect {
|
||||
// Something is egregiously wrong, just re-make the endpoints record.
|
||||
e.Subsets = []api.EndpointSubset{{
|
||||
@@ -275,7 +305,7 @@ func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointP
|
||||
Ports: endpointPorts,
|
||||
}}
|
||||
glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
|
||||
return c.EndpointRegistry.UpdateEndpoints(ctx, e)
|
||||
return r.endpointRegistry.UpdateEndpoints(ctx, e)
|
||||
}
|
||||
if ipCorrect && portsCorrect {
|
||||
return nil
|
||||
@@ -291,11 +321,11 @@ func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointP
|
||||
// own IP address. Given the requirements stated at the top of
|
||||
// this function, this should cause the list of IP addresses to
|
||||
// become eventually correct.
|
||||
if addrs := &e.Subsets[0].Addresses; len(*addrs) > c.MasterCount {
|
||||
if addrs := &e.Subsets[0].Addresses; len(*addrs) > r.masterCount {
|
||||
// addrs is a pointer because we're going to mutate it.
|
||||
for i, addr := range *addrs {
|
||||
if addr.IP == ip.String() {
|
||||
for len(*addrs) > c.MasterCount {
|
||||
for len(*addrs) > r.masterCount {
|
||||
// wrap around if necessary.
|
||||
remove := (i + 1) % len(*addrs)
|
||||
*addrs = append((*addrs)[:remove], (*addrs)[remove+1:]...)
|
||||
@@ -310,7 +340,7 @@ func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointP
|
||||
e.Subsets[0].Ports = endpointPorts
|
||||
}
|
||||
glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
|
||||
return c.EndpointRegistry.UpdateEndpoints(ctx, e)
|
||||
return r.endpointRegistry.UpdateEndpoints(ctx, e)
|
||||
}
|
||||
|
||||
// Determine if the endpoint is in the format ReconcileEndpoints expects.
|
||||
|
Reference in New Issue
Block a user