Move the logic for reconciling the host targets of external load balancers

from the node controller to the service controller before impending changes
to the node controller make it not fit there anymore.
This commit is contained in:
Alex Robinson
2015-04-22 20:54:44 +00:00
parent f7831dcd93
commit 6ae8e40d3d
6 changed files with 215 additions and 150 deletions

View File

@@ -87,8 +87,6 @@ type NodeController struct {
// TODO: Change node status monitor to watch based.
nodeMonitorPeriod time.Duration
clusterName string
// Should external services be reconciled during syncing cloud nodes, even though the nodes were not changed.
reconcileServices bool
// Method for easy mocking in unittest.
lookupIP func(host string) ([]net.IP, error)
now func() util.Time
@@ -221,62 +219,6 @@ func (nc *NodeController) registerNodes(nodes *api.NodeList, retryCount int, ret
}
}
// reconcileExternalServices updates balancers for external services, so that they will match the nodes given.
// Returns true if something went wrong and we should call reconcile again.
func (nc *NodeController) reconcileExternalServices(nodes *api.NodeList) (shouldRetry bool) {
balancer, ok := nc.cloud.TCPLoadBalancer()
if !ok {
glog.Error("The cloud provider does not support external TCP load balancers.")
return false
}
zones, ok := nc.cloud.Zones()
if !ok {
glog.Error("The cloud provider does not support zone enumeration.")
return false
}
zone, err := zones.GetZone()
if err != nil {
glog.Errorf("Error while getting zone: %v", err)
return false
}
hosts := []string{}
for _, node := range nodes.Items {
hosts = append(hosts, node.Name)
}
services, err := nc.kubeClient.Services(api.NamespaceAll).List(labels.Everything())
if err != nil {
glog.Errorf("Error while listing services: %v", err)
return true
}
shouldRetry = false
for _, service := range services.Items {
if service.Spec.CreateExternalLoadBalancer {
nonTCPPort := false
for i := range service.Spec.Ports {
if service.Spec.Ports[i].Protocol != api.ProtocolTCP {
nonTCPPort = true
break
}
}
if nonTCPPort {
// TODO: Support UDP here.
glog.Errorf("External load balancers for non TCP services are not currently supported: %v.", service)
continue
}
name := cloudprovider.GetLoadBalancerName(&service)
err := balancer.UpdateTCPLoadBalancer(name, zone.Region, hosts)
if err != nil {
glog.Errorf("External error while updating TCP load balancer: %v.", err)
shouldRetry = true
}
}
}
return shouldRetry
}
// syncCloudNodes synchronizes the list of instances from cloudprovider to master server.
func (nc *NodeController) syncCloudNodes() error {
matches, err := nc.getCloudNodesWithSpec()
@@ -295,7 +237,6 @@ func (nc *NodeController) syncCloudNodes() error {
// Create nodes which have been created in cloud, but not in kubernetes cluster
// Skip nodes if we hit an error while trying to get their addresses.
nodesChanged := false
for _, node := range matches.Items {
if _, ok := nodeMap[node.Name]; !ok {
glog.V(3).Infof("Querying addresses for new node: %s", node.Name)
@@ -313,7 +254,6 @@ func (nc *NodeController) syncCloudNodes() error {
if err != nil {
glog.Errorf("Create node %s error: %v", node.Name, err)
}
nodesChanged = true
}
delete(nodeMap, node.Name)
}
@@ -326,15 +266,6 @@ func (nc *NodeController) syncCloudNodes() error {
glog.Errorf("Delete node %s error: %v", nodeID, err)
}
nc.deletePods(nodeID)
nodesChanged = true
}
// Make external services aware of nodes currently present in the cluster.
if nodesChanged || nc.reconcileServices {
nc.reconcileServices = nc.reconcileExternalServices(matches)
if nc.reconcileServices {
glog.Error("Reconcilation of external services failed and will be retried during the next sync.")
}
}
return nil