Update master service ports and type via controller.
This commit is contained in:
@@ -86,7 +86,8 @@ func (c *Controller) Start() {
|
||||
// If we fail to repair node ports apiserver is useless. We should restart and retry.
|
||||
glog.Fatalf("Unable to perform initial service nodePort check: %v", err)
|
||||
}
|
||||
if err := c.UpdateKubernetesService(); err != nil {
|
||||
// Service definition is reconciled during first run to correct port and type per expectations.
|
||||
if err := c.UpdateKubernetesService(true); err != nil {
|
||||
glog.Errorf("Unable to perform initial Kubernetes service initialization: %v", err)
|
||||
}
|
||||
|
||||
@@ -97,14 +98,17 @@ func (c *Controller) Start() {
|
||||
// RunKubernetesService periodically updates the kubernetes service
|
||||
func (c *Controller) RunKubernetesService(ch chan struct{}) {
|
||||
util.Until(func() {
|
||||
if err := c.UpdateKubernetesService(); err != nil {
|
||||
// Service definition is not reconciled after first
|
||||
// run, ports and type will be corrected only during
|
||||
// start.
|
||||
if err := c.UpdateKubernetesService(false); err != nil {
|
||||
util.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err))
|
||||
}
|
||||
}, c.EndpointInterval, ch)
|
||||
}
|
||||
|
||||
// UpdateKubernetesService attempts to update the default Kube service.
|
||||
func (c *Controller) UpdateKubernetesService() error {
|
||||
func (c *Controller) UpdateKubernetesService(reconcile bool) error {
|
||||
// Update service & endpoint records.
|
||||
// TODO: when it becomes possible to change this stuff,
|
||||
// stop polling and start watching.
|
||||
@@ -114,11 +118,11 @@ func (c *Controller) UpdateKubernetesService() error {
|
||||
}
|
||||
if c.ServiceIP != nil {
|
||||
servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.KubernetesServiceNodePort, "https", c.ExtraServicePorts)
|
||||
if err := c.CreateMasterServiceIfNeeded("kubernetes", c.ServiceIP, servicePorts, serviceType); err != nil {
|
||||
if err := c.CreateOrUpdateMasterServiceIfNeeded("kubernetes", c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
|
||||
return err
|
||||
}
|
||||
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
|
||||
if err := c.SetEndpoints("kubernetes", c.PublicIP, endpointPorts); err != nil {
|
||||
if err := c.ReconcileEndpoints("kubernetes", c.PublicIP, endpointPorts, reconcile); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -179,10 +183,17 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string, extraEndp
|
||||
|
||||
// CreateMasterServiceIfNeeded will create the specified service if it
|
||||
// doesn't already exist.
|
||||
func (c *Controller) CreateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []api.ServicePort, serviceType api.ServiceType) error {
|
||||
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []api.ServicePort, serviceType api.ServiceType, reconcile bool) error {
|
||||
ctx := api.NewDefaultContext()
|
||||
if _, err := c.ServiceRegistry.GetService(ctx, serviceName); err == nil {
|
||||
if s, err := c.ServiceRegistry.GetService(ctx, serviceName); err == nil {
|
||||
// The service already exists.
|
||||
if reconcile {
|
||||
if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
|
||||
glog.Warningf("Resetting master service %q to %#v", serviceName, svc)
|
||||
_, err := c.ServiceRegistry.UpdateService(ctx, svc)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
svc := &api.Service{
|
||||
@@ -211,20 +222,20 @@ func (c *Controller) CreateMasterServiceIfNeeded(serviceName string, serviceIP n
|
||||
return err
|
||||
}
|
||||
|
||||
// SetEndpoints sets the endpoints for the given apiserver service (ro or rw).
|
||||
// SetEndpoints expects that the endpoints objects it manages will all be
|
||||
// managed only by SetEndpoints; therefore, to understand this, you need only
|
||||
// ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw).
|
||||
// ReconcileEndpoints expects that the endpoints objects it manages will all be
|
||||
// managed only by ReconcileEndpoints; therefore, to understand this, you need only
|
||||
// understand the requirements and the body of this function.
|
||||
//
|
||||
// Requirements:
|
||||
// * All apiservers MUST use the same ports for their {rw, ro} services.
|
||||
// * All apiservers MUST use SetEndpoints and only SetEndpoints to manage the
|
||||
// * All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the
|
||||
// endpoints for their {rw, ro} services.
|
||||
// * All apiservers MUST know and agree on the number of apiservers expected
|
||||
// to be running (c.masterCount).
|
||||
// * SetEndpoints is called periodically from all apiservers.
|
||||
// * ReconcileEndpoints is called periodically from all apiservers.
|
||||
//
|
||||
func (c *Controller) SetEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error {
|
||||
func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
|
||||
ctx := api.NewDefaultContext()
|
||||
e, err := c.EndpointRegistry.GetEndpoints(ctx, serviceName)
|
||||
if err != nil {
|
||||
@@ -238,7 +249,7 @@ func (c *Controller) SetEndpoints(serviceName string, ip net.IP, endpointPorts [
|
||||
|
||||
// First, determine if the endpoint is in the format we expect (one
|
||||
// subset, ports matching endpointPorts, N IP addresses).
|
||||
formatCorrect, ipCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, c.MasterCount)
|
||||
formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, c.MasterCount, reconcilePorts)
|
||||
if !formatCorrect {
|
||||
// Something is egregiously wrong, just re-make the endpoints record.
|
||||
e.Subsets = []api.EndpointSubset{{
|
||||
@@ -247,7 +258,11 @@ func (c *Controller) SetEndpoints(serviceName string, ip net.IP, endpointPorts [
|
||||
}}
|
||||
glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
|
||||
return c.EndpointRegistry.UpdateEndpoints(ctx, e)
|
||||
} else if !ipCorrect {
|
||||
}
|
||||
if ipCorrect && portsCorrect {
|
||||
return nil
|
||||
}
|
||||
if !ipCorrect {
|
||||
// We *always* add our own IP address.
|
||||
e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, api.EndpointAddress{IP: ip.String()})
|
||||
|
||||
@@ -271,38 +286,83 @@ func (c *Controller) SetEndpoints(serviceName string, ip net.IP, endpointPorts [
|
||||
}
|
||||
}
|
||||
}
|
||||
return c.EndpointRegistry.UpdateEndpoints(ctx, e)
|
||||
}
|
||||
// We didn't make any changes, no need to actually call update.
|
||||
return nil
|
||||
if !portsCorrect {
|
||||
// Reset ports.
|
||||
e.Subsets[0].Ports = endpointPorts
|
||||
}
|
||||
glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
|
||||
return c.EndpointRegistry.UpdateEndpoints(ctx, e)
|
||||
}
|
||||
|
||||
// Determine if the endpoint is in the format SetEndpoints expect (one subset,
|
||||
// Determine if the endpoint is in the format ReconcileEndpoints expect (one subset,
|
||||
// correct ports, N IP addresses); and if the specified IP address is present and
|
||||
// the correct number of ip addresses are found.
|
||||
func checkEndpointSubsetFormat(e *api.Endpoints, ip string, ports []api.EndpointPort, count int) (formatCorrect, ipCorrect bool) {
|
||||
func checkEndpointSubsetFormat(e *api.Endpoints, ip string, ports []api.EndpointPort, count int, reconcilePorts bool) (formatCorrect bool, ipCorrect bool, portsCorrect bool) {
|
||||
if len(e.Subsets) != 1 {
|
||||
return false, false
|
||||
return false, false, false
|
||||
}
|
||||
sub := &e.Subsets[0]
|
||||
if len(sub.Ports) != len(ports) {
|
||||
return false, false
|
||||
}
|
||||
for _, port := range ports {
|
||||
contains := false
|
||||
for _, subPort := range sub.Ports {
|
||||
if port == subPort {
|
||||
contains = true
|
||||
}
|
||||
portsCorrect = true
|
||||
if reconcilePorts {
|
||||
if len(sub.Ports) != len(ports) {
|
||||
portsCorrect = false
|
||||
}
|
||||
if !contains {
|
||||
return false, false
|
||||
for i, port := range ports {
|
||||
if len(sub.Ports) <= i || port != sub.Ports[i] {
|
||||
portsCorrect = false
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, addr := range sub.Addresses {
|
||||
if addr.IP == ip {
|
||||
return true, len(sub.Addresses) == count
|
||||
ipCorrect = len(sub.Addresses) == count
|
||||
break
|
||||
}
|
||||
}
|
||||
return true, false
|
||||
return true, ipCorrect, portsCorrect
|
||||
}
|
||||
|
||||
// * getMasterServiceUpdateIfNeeded sets service attributes for the
|
||||
// given apiserver service.
|
||||
// * getMasterServiceUpdateIfNeeded expects that the service object it
|
||||
// manages will be managed only by getMasterServiceUpdateIfNeeded;
|
||||
// therefore, to understand this, you need only understand the
|
||||
// requirements and the body of this function.
|
||||
// * getMasterServiceUpdateIfNeeded ensures that the correct ports are
|
||||
// are set.
|
||||
//
|
||||
// Requirements:
|
||||
// * All apiservers MUST use getMasterServiceUpdateIfNeeded and only
|
||||
// getMasterServiceUpdateIfNeeded to manage service attributes
|
||||
// * updateMasterService is called periodically from all apiservers.
|
||||
func getMasterServiceUpdateIfNeeded(svc *api.Service, servicePorts []api.ServicePort, serviceType api.ServiceType) (s *api.Service, updated bool) {
|
||||
// Determine if the service is in the format we expect
|
||||
// (servicePorts are present and service type matches)
|
||||
formatCorrect := checkServiceFormat(svc, servicePorts, serviceType)
|
||||
if formatCorrect {
|
||||
return svc, false
|
||||
}
|
||||
svc.Spec.Ports = servicePorts
|
||||
svc.Spec.Type = serviceType
|
||||
return svc, true
|
||||
}
|
||||
|
||||
// Determine if the service is in the correct format
|
||||
// getMasterServiceUpdateIfNeeded expects (servicePorts are correct
|
||||
// and service type matches).
|
||||
func checkServiceFormat(s *api.Service, ports []api.ServicePort, serviceType api.ServiceType) (formatCorrect bool) {
|
||||
if s.Spec.Type != serviceType {
|
||||
return false
|
||||
}
|
||||
if len(ports) != len(s.Spec.Ports) {
|
||||
return false
|
||||
}
|
||||
for i, port := range ports {
|
||||
if port != s.Spec.Ports[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
Reference in New Issue
Block a user