Merge pull request #31321 from anguslees/lb-nodes
Automatic merge from submit-queue (batch tested with PRs 37328, 38102, 37261, 31321, 38146) Pass full Node objects to provider LoadBalancer methods
Commit cffaf1b71b
@@ -85,13 +85,15 @@ type LoadBalancer interface {
 // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
 GetLoadBalancer(clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error)
 // EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
-// Implementations must treat the *v1.Service parameter as read-only and not modify it.
+// Implementations must treat the *v1.Service and *v1.Node
+// parameters as read-only and not modify them.
 // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
-EnsureLoadBalancer(clusterName string, service *v1.Service, nodeNames []string) (*v1.LoadBalancerStatus, error)
+EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error)
 // UpdateLoadBalancer updates hosts under the specified load balancer.
-// Implementations must treat the *v1.Service parameter as read-only and not modify it.
+// Implementations must treat the *v1.Service and *v1.Node
+// parameters as read-only and not modify them.
 // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
-UpdateLoadBalancer(clusterName string, service *v1.Service, nodeNames []string) error
+UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error
 // EnsureLoadBalancerDeleted deletes the specified load balancer if it
 // exists, returning nil if the load balancer specified either didn't exist or
 // was successfully deleted.
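Note: the sketch below is not part of the diff; it only illustrates what the updated interface asks of a provider. The `exampleProvider` type is hypothetical, and only the two changed methods are shown (the real interface also has GetLoadBalancer and EnsureLoadBalancerDeleted).

```go
package example

import "k8s.io/kubernetes/pkg/api/v1"

// exampleProvider is a hypothetical provider, shown only to illustrate the
// new signatures; every in-tree provider below is updated the same way.
type exampleProvider struct{}

// EnsureLoadBalancer now receives full *v1.Node objects instead of a
// []string of node names, so implementations can look at addresses,
// labels, and other node fields without an extra API round-trip.
func (p *exampleProvider) EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
	for _, node := range nodes {
		_ = node.Name // the old node-name behaviour is still one field access away
	}
	return &v1.LoadBalancerStatus{}, nil
}

// UpdateLoadBalancer follows the same pattern.
func (p *exampleProvider) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
	return nil
}
```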
@@ -2498,11 +2498,19 @@ func buildListener(port v1.ServicePort, annotations map[string]string, sslPorts
 return listener, nil
 }
 
+func nodeNames(nodes []*v1.Node) sets.String {
+ret := sets.String{}
+for _, node := range nodes {
+ret.Insert(node.Name)
+}
+return ret
+}
+
 // EnsureLoadBalancer implements LoadBalancer.EnsureLoadBalancer
-func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, hosts []string) (*v1.LoadBalancerStatus, error) {
+func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
 annotations := apiService.Annotations
 glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v, %v)",
-clusterName, apiService.Namespace, apiService.Name, c.region, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, annotations)
+clusterName, apiService.Namespace, apiService.Name, c.region, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, nodes, annotations)
 
 if apiService.Spec.SessionAffinity != v1.ServiceAffinityNone {
 // ELB supports sticky sessions, but only when configured for HTTP/HTTPS
@@ -2535,8 +2543,7 @@ func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, h
 return nil, fmt.Errorf("LoadBalancerIP cannot be specified for AWS ELB")
 }
 
-hostSet := sets.NewString(hosts...)
-instances, err := c.getInstancesByNodeNamesCached(hostSet)
+instances, err := c.getInstancesByNodeNamesCached(nodeNames(nodes))
 if err != nil {
 return nil, err
 }
@@ -3085,9 +3092,8 @@ func (c *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servic
 }
 
 // UpdateLoadBalancer implements LoadBalancer.UpdateLoadBalancer
-func (c *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, hosts []string) error {
-hostSet := sets.NewString(hosts...)
-instances, err := c.getInstancesByNodeNamesCached(hostSet)
+func (c *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
+instances, err := c.getInstancesByNodeNamesCached(nodeNames(nodes))
 if err != nil {
 return err
 }
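For reference, a small usage sketch of the sets-based helper the AWS provider adds above. The node literals and the main function are invented for illustration; the import paths are the ones this tree uses (pkg/api/v1 is visible in the diff, pkg/util/sets is assumed).

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/pkg/util/sets"
)

// nodeNames mirrors the helper added to the AWS provider: it flattens node
// objects back into the name set that getInstancesByNodeNamesCached expects.
func nodeNames(nodes []*v1.Node) sets.String {
	ret := sets.String{}
	for _, node := range nodes {
		ret.Insert(node.Name)
	}
	return ret
}

func main() {
	nodes := []*v1.Node{
		{ObjectMeta: v1.ObjectMeta{Name: "node0"}},
		{ObjectMeta: v1.ObjectMeta{Name: "node1"}},
	}
	fmt.Println(nodeNames(nodes).List()) // [node0 node1]
}
```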
@@ -1200,7 +1200,7 @@ func TestDescribeLoadBalancerOnUpdate(t *testing.T) {
 c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices)
 awsServices.elb.expectDescribeLoadBalancers("aid")
 
-c.UpdateLoadBalancer(TestClusterName, &v1.Service{ObjectMeta: v1.ObjectMeta{Name: "myservice", UID: "id"}}, []string{})
+c.UpdateLoadBalancer(TestClusterName, &v1.Service{ObjectMeta: v1.ObjectMeta{Name: "myservice", UID: "id"}}, []*v1.Node{})
 }
 
 func TestDescribeLoadBalancerOnGet(t *testing.T) {
@@ -1216,7 +1216,7 @@ func TestDescribeLoadBalancerOnEnsure(t *testing.T) {
 c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices)
 awsServices.elb.expectDescribeLoadBalancers("aid")
 
-c.EnsureLoadBalancer(TestClusterName, &v1.Service{ObjectMeta: v1.ObjectMeta{Name: "myservice", UID: "id"}}, []string{})
+c.EnsureLoadBalancer(TestClusterName, &v1.Service{ObjectMeta: v1.ObjectMeta{Name: "myservice", UID: "id"}}, []*v1.Node{})
 }
 
 func TestBuildListener(t *testing.T) {
@@ -62,7 +62,7 @@ func (az *Cloud) GetLoadBalancer(clusterName string, service *v1.Service) (statu
 }
 
 // EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
-func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nodeNames []string) (*v1.LoadBalancerStatus, error) {
+func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
 lbName := getLoadBalancerName(clusterName)
 pipName := getPublicIPName(clusterName, service)
 serviceName := getServiceName(service)
@@ -101,7 +101,7 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod
 }
 }
 
-lb, lbNeedsUpdate, err := az.reconcileLoadBalancer(lb, pip, clusterName, service, nodeNames)
+lb, lbNeedsUpdate, err := az.reconcileLoadBalancer(lb, pip, clusterName, service, nodes)
 if err != nil {
 return nil, err
 }
@@ -116,9 +116,9 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod
 // Add the machines to the backend pool if they're not already
 lbBackendName := getBackendPoolName(clusterName)
 lbBackendPoolID := az.getBackendPoolID(lbName, lbBackendName)
-hostUpdates := make([]func() error, len(nodeNames))
-for i, nodeName := range nodeNames {
-localNodeName := nodeName
+hostUpdates := make([]func() error, len(nodes))
+for i, node := range nodes {
+localNodeName := node.Name
 f := func() error {
 err := az.ensureHostInPool(serviceName, types.NodeName(localNodeName), lbBackendPoolID)
 if err != nil {
@@ -141,8 +141,8 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod
 }
 
 // UpdateLoadBalancer updates hosts under the specified load balancer.
-func (az *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodeNames []string) error {
-_, err := az.EnsureLoadBalancer(clusterName, service, nodeNames)
+func (az *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
+_, err := az.EnsureLoadBalancer(clusterName, service, nodes)
 return err
 }
 
@@ -167,7 +167,7 @@ func (az *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servi
 return err
 }
 if existsLb {
-lb, lbNeedsUpdate, reconcileErr := az.reconcileLoadBalancer(lb, nil, clusterName, service, []string{})
+lb, lbNeedsUpdate, reconcileErr := az.reconcileLoadBalancer(lb, nil, clusterName, service, []*v1.Node{})
 if reconcileErr != nil {
 return reconcileErr
 }
@@ -259,7 +259,7 @@ func (az *Cloud) ensurePublicIPDeleted(serviceName, pipName string) error {
 // This ensures load balancer exists and the frontend ip config is setup.
 // This also reconciles the Service's Ports with the LoadBalancer config.
 // This entails adding rules/probes for expected Ports and removing stale rules/ports.
-func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, pip *network.PublicIPAddress, clusterName string, service *v1.Service, nodeNames []string) (network.LoadBalancer, bool, error) {
+func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, pip *network.PublicIPAddress, clusterName string, service *v1.Service, nodes []*v1.Node) (network.LoadBalancer, bool, error) {
 lbName := getLoadBalancerName(clusterName)
 serviceName := getServiceName(service)
 lbFrontendIPConfigName := getFrontendIPConfigName(service)
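One detail worth calling out in the Azure hunk above: `localNodeName := node.Name` keeps a per-iteration copy before the closure is built. The standalone snippet below (not from the diff) shows why that copy matters for Go closures that run after the loop finishes.

```go
package main

import "fmt"

func main() {
	names := []string{"node0", "node1", "node2"}

	updates := make([]func(), len(names))
	for i, name := range names {
		localName := name // per-iteration copy, as in the Azure hostUpdates loop
		updates[i] = func() { fmt.Println(localName) }
	}

	// Each closure prints its own node; capturing the loop variable directly
	// would have printed the last element three times (pre-Go 1.22 semantics).
	for _, f := range updates {
		f()
	}
}
```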
@@ -38,9 +38,9 @@ func TestReconcileLoadBalancerAddPort(t *testing.T) {
 svc := getTestService("servicea", 80)
 pip := getTestPublicIP()
 lb := getTestLoadBalancer()
-hosts := []string{}
+nodes := []*v1.Node{}
 
-lb, updated, err := az.reconcileLoadBalancer(lb, &pip, testClusterName, &svc, hosts)
+lb, updated, err := az.reconcileLoadBalancer(lb, &pip, testClusterName, &svc, nodes)
 if err != nil {
 t.Errorf("Unexpected error: %q", err)
 }
@@ -67,9 +67,9 @@ func TestReconcileLoadBalancerNodeHealth(t *testing.T) {
 pip := getTestPublicIP()
 lb := getTestLoadBalancer()
 
-hosts := []string{}
+nodes := []*v1.Node{}
 
-lb, updated, err := az.reconcileLoadBalancer(lb, &pip, testClusterName, &svc, hosts)
+lb, updated, err := az.reconcileLoadBalancer(lb, &pip, testClusterName, &svc, nodes)
 if err != nil {
 t.Errorf("Unexpected error: %q", err)
 }
@@ -92,15 +92,15 @@ func TestReconcileLoadBalancerRemoveAllPortsRemovesFrontendConfig(t *testing.T)
 svc := getTestService("servicea", 80)
 lb := getTestLoadBalancer()
 pip := getTestPublicIP()
-hosts := []string{}
+nodes := []*v1.Node{}
 
-lb, updated, err := az.reconcileLoadBalancer(lb, &pip, testClusterName, &svc, hosts)
+lb, updated, err := az.reconcileLoadBalancer(lb, &pip, testClusterName, &svc, nodes)
 if err != nil {
 t.Errorf("Unexpected error: %q", err)
 }
 
 svcUpdated := getTestService("servicea")
-lb, updated, err = az.reconcileLoadBalancer(lb, nil, testClusterName, &svcUpdated, hosts)
+lb, updated, err = az.reconcileLoadBalancer(lb, nil, testClusterName, &svcUpdated, nodes)
 if err != nil {
 t.Errorf("Unexpected error: %q", err)
 }
@@ -122,12 +122,12 @@ func TestReconcileLoadBalancerRemovesPort(t *testing.T) {
 az := getTestCloud()
 svc := getTestService("servicea", 80, 443)
 pip := getTestPublicIP()
-hosts := []string{}
+nodes := []*v1.Node{}
 
 existingLoadBalancer := getTestLoadBalancer(svc)
 
 svcUpdated := getTestService("servicea", 80)
-updatedLoadBalancer, _, err := az.reconcileLoadBalancer(existingLoadBalancer, &pip, testClusterName, &svcUpdated, hosts)
+updatedLoadBalancer, _, err := az.reconcileLoadBalancer(existingLoadBalancer, &pip, testClusterName, &svcUpdated, nodes)
 if err != nil {
 t.Errorf("Unexpected error: %q", err)
 }
@@ -141,16 +141,16 @@ func TestReconcileLoadBalancerMultipleServices(t *testing.T) {
 svc1 := getTestService("servicea", 80, 443)
 svc2 := getTestService("serviceb", 80)
 pip := getTestPublicIP()
-hosts := []string{}
+nodes := []*v1.Node{}
 
 existingLoadBalancer := getTestLoadBalancer()
 
-updatedLoadBalancer, _, err := az.reconcileLoadBalancer(existingLoadBalancer, &pip, testClusterName, &svc1, hosts)
+updatedLoadBalancer, _, err := az.reconcileLoadBalancer(existingLoadBalancer, &pip, testClusterName, &svc1, nodes)
 if err != nil {
 t.Errorf("Unexpected error: %q", err)
 }
 
-updatedLoadBalancer, _, err = az.reconcileLoadBalancer(updatedLoadBalancer, &pip, testClusterName, &svc2, hosts)
+updatedLoadBalancer, _, err = az.reconcileLoadBalancer(updatedLoadBalancer, &pip, testClusterName, &svc2, nodes)
 if err != nil {
 t.Errorf("Unexpected error: %q", err)
 }
@@ -63,8 +63,8 @@ func (cs *CSCloud) GetLoadBalancer(clusterName string, service *v1.Service) (*v1
 }
 
 // EnsureLoadBalancer creates a new load balancer, or updates the existing one. Returns the status of the balancer.
-func (cs *CSCloud) EnsureLoadBalancer(clusterName string, service *v1.Service, hosts []string) (status *v1.LoadBalancerStatus, err error) {
-glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", clusterName, service.Namespace, service.Name, service.Spec.LoadBalancerIP, service.Spec.Ports, hosts)
+func (cs *CSCloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (status *v1.LoadBalancerStatus, err error) {
+glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", clusterName, service.Namespace, service.Name, service.Spec.LoadBalancerIP, service.Spec.Ports, nodes)
 
 if len(service.Spec.Ports) == 0 {
 return nil, fmt.Errorf("requested load balancer with no ports")
@@ -87,7 +87,7 @@ func (cs *CSCloud) EnsureLoadBalancer(clusterName string, service *v1.Service, h
 }
 
 // Verify that all the hosts belong to the same network, and retrieve their ID's.
-lb.hostIDs, lb.networkID, err = cs.verifyHosts(hosts)
+lb.hostIDs, lb.networkID, err = cs.verifyHosts(nodes)
 if err != nil {
 return nil, err
 }
@@ -165,8 +165,8 @@ func (cs *CSCloud) EnsureLoadBalancer(clusterName string, service *v1.Service, h
 }
 
 // UpdateLoadBalancer updates hosts under the specified load balancer.
-func (cs *CSCloud) UpdateLoadBalancer(clusterName string, service *v1.Service, hosts []string) error {
-glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v, %v)", clusterName, service.Namespace, service.Name, hosts)
+func (cs *CSCloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
+glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v, %v)", clusterName, service.Namespace, service.Name, nodes)
 
 // Get the load balancer details and existing rules.
 lb, err := cs.getLoadBalancer(service)
@@ -175,7 +175,7 @@ func (cs *CSCloud) UpdateLoadBalancer(clusterName string, service *v1.Service, h
 }
 
 // Verify that all the hosts belong to the same network, and retrieve their ID's.
-lb.hostIDs, _, err = cs.verifyHosts(hosts)
+lb.hostIDs, _, err = cs.verifyHosts(nodes)
 if err != nil {
 return err
 }
@@ -276,10 +276,10 @@ func (cs *CSCloud) getLoadBalancer(service *v1.Service) (*loadBalancer, error) {
 }
 
 // verifyHosts verifies if all hosts belong to the same network, and returns the host ID's and network ID.
-func (cs *CSCloud) verifyHosts(hosts []string) ([]string, string, error) {
+func (cs *CSCloud) verifyHosts(nodes []*v1.Node) ([]string, string, error) {
 hostNames := map[string]bool{}
-for _, host := range hosts {
-hostNames[host] = true
+for _, node := range nodes {
+hostNames[node.Name] = true
 }
 
 p := cs.client.VirtualMachine.NewListVirtualMachinesParams()
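A minimal sketch of the first half of the new CloudStack verifyHosts: collect node names into a set, then (omitted here) match the listed virtual machines against it. The node literals are invented for illustration.

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api/v1"
)

// hostNameSet mirrors the name-collection step of the updated verifyHosts;
// the CloudStack ListVirtualMachines call that consumes it is not shown.
func hostNameSet(nodes []*v1.Node) map[string]bool {
	hostNames := map[string]bool{}
	for _, node := range nodes {
		hostNames[node.Name] = true
	}
	return hostNames
}

func main() {
	nodes := []*v1.Node{
		{ObjectMeta: v1.ObjectMeta{Name: "worker-1"}},
		{ObjectMeta: v1.ObjectMeta{Name: "worker-2"}},
	}
	fmt.Println(hostNameSet(nodes)["worker-1"]) // true
}
```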
@@ -36,12 +36,12 @@ type FakeBalancer struct {
 Region string
 LoadBalancerIP string
 Ports []v1.ServicePort
-Hosts []string
+Hosts []*v1.Node
 }
 
 type FakeUpdateBalancerCall struct {
 Service *v1.Service
-Hosts []string
+Hosts []*v1.Node
 }
 
 // FakeCloud is a test-double implementation of Interface, LoadBalancer, Instances, and Routes. It is useful for testing.
@@ -131,7 +131,7 @@ func (f *FakeCloud) GetLoadBalancer(clusterName string, service *v1.Service) (*v
 
 // EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer.
 // It adds an entry "create" into the internal method call record.
-func (f *FakeCloud) EnsureLoadBalancer(clusterName string, service *v1.Service, hosts []string) (*v1.LoadBalancerStatus, error) {
+func (f *FakeCloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
 f.addCall("create")
 if f.Balancers == nil {
 f.Balancers = make(map[string]FakeBalancer)
@@ -146,7 +146,7 @@ func (f *FakeCloud) EnsureLoadBalancer(clusterName string, service *v1.Service,
 }
 region := zone.Region
 
-f.Balancers[name] = FakeBalancer{name, region, spec.LoadBalancerIP, spec.Ports, hosts}
+f.Balancers[name] = FakeBalancer{name, region, spec.LoadBalancerIP, spec.Ports, nodes}
 
 status := &v1.LoadBalancerStatus{}
 status.Ingress = []v1.LoadBalancerIngress{{IP: f.ExternalIP.String()}}
@@ -156,9 +156,9 @@ func (f *FakeCloud) EnsureLoadBalancer(clusterName string, service *v1.Service,
 
 // UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer.
 // It adds an entry "update" into the internal method call record.
-func (f *FakeCloud) UpdateLoadBalancer(clusterName string, service *v1.Service, hosts []string) error {
+func (f *FakeCloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
 f.addCall("update")
-f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{service, hosts})
+f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{service, nodes})
 return f.Err
 }
 
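The fake cloud's spy types now record []*v1.Node, which is what the service controller tests further down rely on. A hypothetical test (not part of this diff) using it might look like the sketch below; the import alias follows the one used in servicecontroller_test.go, and the exact package path is an assumption.

```go
package example

import (
	"reflect"
	"testing"

	"k8s.io/kubernetes/pkg/api/v1"
	fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
)

func TestFakeCloudRecordsNodes(t *testing.T) {
	f := &fakecloud.FakeCloud{}
	svc := &v1.Service{ObjectMeta: v1.ObjectMeta{Name: "s0"}}
	nodes := []*v1.Node{{ObjectMeta: v1.ObjectMeta{Name: "node0"}}}

	if err := f.UpdateLoadBalancer("test-cluster", svc, nodes); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// The spy now stores the node objects themselves, not their names.
	want := []fakecloud.FakeUpdateBalancerCall{{Service: svc, Hosts: nodes}}
	if !reflect.DeepEqual(f.UpdateCalls, want) {
		t.Errorf("expected %v, got %v", want, f.UpdateCalls)
	}
}
```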
@@ -547,6 +547,14 @@ func isHTTPErrorCode(err error, code int) bool {
 return ok && apiErr.Code == code
 }
 
+func nodeNames(nodes []*v1.Node) []string {
+ret := make([]string, len(nodes))
+for i, node := range nodes {
+ret[i] = node.Name
+}
+return ret
+}
+
 // EnsureLoadBalancer is an implementation of LoadBalancer.EnsureLoadBalancer.
 // Our load balancers in GCE consist of four separate GCE resources - a static
 // IP address, a firewall rule, a target pool, and a forwarding rule. This
@@ -554,11 +562,12 @@ func isHTTPErrorCode(err error, code int) bool {
 // Due to an interesting series of design decisions, this handles both creating
 // new load balancers and updating existing load balancers, recognizing when
 // each is needed.
-func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, hostNames []string) (*v1.LoadBalancerStatus, error) {
-if len(hostNames) == 0 {
+func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
+if len(nodes) == 0 {
 return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts")
 }
 
+hostNames := nodeNames(nodes)
 hosts, err := gce.getInstancesByNames(hostNames)
 if err != nil {
 return nil, err
@@ -1331,8 +1340,8 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
 }
 
 // UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer.
-func (gce *GCECloud) UpdateLoadBalancer(clusterName string, service *v1.Service, hostNames []string) error {
-hosts, err := gce.getInstancesByNames(hostNames)
+func (gce *GCECloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
+hosts, err := gce.getInstancesByNames(nodeNames(nodes))
 if err != nil {
 return err
 }
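GCE gets its own package-local nodeNames helper; unlike the AWS one it returns an ordered []string, since getInstancesByNames takes a slice. A quick illustrative sketch (node names invented):

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api/v1"
)

// nodeNames mirrors the GCE helper added above.
func nodeNames(nodes []*v1.Node) []string {
	ret := make([]string, len(nodes))
	for i, node := range nodes {
		ret[i] = node.Name
	}
	return ret
}

func main() {
	nodes := []*v1.Node{
		{ObjectMeta: v1.ObjectMeta{Name: "gce-node-a"}},
		{ObjectMeta: v1.ObjectMeta{Name: "gce-node-b"}},
	}
	fmt.Println(nodeNames(nodes)) // [gce-node-a gce-node-b]
}
```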
@@ -42,7 +42,6 @@ import (
 "k8s.io/kubernetes/pkg/api/v1"
 "k8s.io/kubernetes/pkg/api/v1/service"
 "k8s.io/kubernetes/pkg/cloudprovider"
-"k8s.io/kubernetes/pkg/types"
 )
 
 // Note: when creating a new Loadbalancer (VM), it can take some time before it is ready for use,
@@ -565,13 +564,32 @@ func (lbaas *LbaasV2) GetLoadBalancer(clusterName string, service *v1.Service) (
 return status, true, err
 }
 
+// The LB needs to be configured with instance addresses on the same
+// subnet as the LB (aka opts.SubnetId). Currently we're just
+// guessing that the node's InternalIP is the right address - and that
+// should be sufficient for all "normal" cases.
+func nodeAddressForLB(node *v1.Node) (string, error) {
+addrs := node.Status.Addresses
+if len(addrs) == 0 {
+return "", ErrNoAddressFound
+}
+
+for _, addr := range addrs {
+if addr.Type == v1.NodeInternalIP {
+return addr.Address, nil
+}
+}
+
+return addrs[0].Address, nil
+}
+
 // TODO: This code currently ignores 'region' and always creates a
 // loadbalancer in only the current OpenStack region. We should take
 // a list of regions (from config) and query/create loadbalancers in
 // each region.
 
-func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *v1.Service, nodeNames []string) (*v1.LoadBalancerStatus, error) {
-glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", clusterName, apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, nodeNames, apiService.Annotations)
+func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
+glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", clusterName, apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, nodes, apiService.Annotations)
 
 ports := apiService.Spec.Ports
 if len(ports) == 0 {
@@ -681,15 +699,15 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *v1.Serv
 if err != nil && !isNotFound(err) {
 return nil, fmt.Errorf("Error getting pool members %s: %v", pool.ID, err)
 }
-for _, nodeName := range nodeNames {
-addr, err := getAddressByName(lbaas.compute, types.NodeName(nodeName))
+for _, node := range nodes {
+addr, err := nodeAddressForLB(node)
 if err != nil {
 if err == ErrNotFound {
 // Node failure, do not create member
-glog.Warningf("Failed to create LB pool member for node %s: %v", nodeName, err)
+glog.Warningf("Failed to create LB pool member for node %s: %v", node.Name, err)
 continue
 } else {
-return nil, fmt.Errorf("Error getting address for node %s: %v", nodeName, err)
+return nil, fmt.Errorf("Error getting address for node %s: %v", node.Name, err)
 }
 }
 
@@ -701,7 +719,7 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *v1.Serv
 SubnetID: lbaas.opts.SubnetId,
 }).Extract()
 if err != nil {
-return nil, fmt.Errorf("Error creating LB pool member for node: %s, %v", nodeName, err)
+return nil, fmt.Errorf("Error creating LB pool member for node: %s, %v", node.Name, err)
 }
 
 waitLoadbalancerActiveProvisioningStatus(lbaas.network, loadbalancer.ID)
@@ -710,7 +728,7 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *v1.Serv
 members = popMember(members, addr, int(port.NodePort))
 }
 
-glog.V(4).Infof("Ensured pool %s has member for %s at %s", pool.ID, nodeName, addr)
+glog.V(4).Infof("Ensured pool %s has member for %s at %s", pool.ID, node.Name, addr)
 }
 
 // Delete obsolete members for this pool
@@ -939,9 +957,9 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *v1.Serv
 return status, nil
 }
 
-func (lbaas *LbaasV2) UpdateLoadBalancer(clusterName string, service *v1.Service, nodeNames []string) error {
+func (lbaas *LbaasV2) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
 loadBalancerName := cloudprovider.GetLoadBalancerName(service)
-glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", clusterName, loadBalancerName, nodeNames)
+glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", clusterName, loadBalancerName, nodes)
 
 ports := service.Spec.Ports
 if len(ports) == 0 {
@@ -1012,8 +1030,8 @@ func (lbaas *LbaasV2) UpdateLoadBalancer(clusterName string, service *v1.Service
 
 // Compose Set of member (addresses) that _should_ exist
 addrs := map[string]empty{}
-for _, nodeName := range nodeNames {
-addr, err := getAddressByName(lbaas.compute, types.NodeName(nodeName))
+for _, node := range nodes {
+addr, err := nodeAddressForLB(node)
 if err != nil {
 return err
 }
@@ -1277,8 +1295,8 @@ func (lb *LbaasV1) GetLoadBalancer(clusterName string, service *v1.Service) (*v1
 // a list of regions (from config) and query/create loadbalancers in
 // each region.
 
-func (lb *LbaasV1) EnsureLoadBalancer(clusterName string, apiService *v1.Service, nodeNames []string) (*v1.LoadBalancerStatus, error) {
-glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", clusterName, apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, nodeNames, apiService.Annotations)
+func (lb *LbaasV1) EnsureLoadBalancer(clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
+glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", clusterName, apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, nodes, apiService.Annotations)
 
 ports := apiService.Spec.Ports
 if len(ports) > 1 {
@@ -1343,8 +1361,8 @@ func (lb *LbaasV1) EnsureLoadBalancer(clusterName string, apiService *v1.Service
 return nil, err
 }
 
-for _, nodeName := range nodeNames {
-addr, err := getAddressByName(lb.compute, types.NodeName(nodeName))
+for _, node := range nodes {
+addr, err := nodeAddressForLB(node)
 if err != nil {
 return nil, err
 }
@@ -1426,9 +1444,9 @@ func (lb *LbaasV1) EnsureLoadBalancer(clusterName string, apiService *v1.Service
 
 }
 
-func (lb *LbaasV1) UpdateLoadBalancer(clusterName string, service *v1.Service, nodeNames []string) error {
+func (lb *LbaasV1) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
 loadBalancerName := cloudprovider.GetLoadBalancerName(service)
-glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", clusterName, loadBalancerName, nodeNames)
+glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", clusterName, loadBalancerName, nodes)
 
 vip, err := getVipByName(lb.network, loadBalancerName)
 if err != nil {
@@ -1437,8 +1455,8 @@ func (lb *LbaasV1) UpdateLoadBalancer(clusterName string, service *v1.Service, n
 
 // Set of member (addresses) that _should_ exist
 addrs := map[string]bool{}
-for _, nodeName := range nodeNames {
-addr, err := getAddressByName(lb.compute, types.NodeName(nodeName))
+for _, node := range nodes {
+addr, err := nodeAddressForLB(node)
 if err != nil {
 return err
 }
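The OpenStack change is the one place where having the full Node object pays off immediately: nodeAddressForLB reads the addresses straight from node.Status instead of calling back into the compute API via getAddressByName. The standalone sketch below mirrors that selection logic; errNoAddressFound stands in for the openstack package's ErrNoAddressFound, and the sample node is invented.

```go
package main

import (
	"errors"
	"fmt"

	"k8s.io/kubernetes/pkg/api/v1"
)

// errNoAddressFound stands in for the openstack package's ErrNoAddressFound.
var errNoAddressFound = errors.New("no address found for host")

// nodeAddressForLB mirrors the helper added above: prefer the InternalIP,
// fall back to the first reported address, error out if there are none.
func nodeAddressForLB(node *v1.Node) (string, error) {
	addrs := node.Status.Addresses
	if len(addrs) == 0 {
		return "", errNoAddressFound
	}
	for _, addr := range addrs {
		if addr.Type == v1.NodeInternalIP {
			return addr.Address, nil
		}
	}
	return addrs[0].Address, nil
}

func main() {
	node := &v1.Node{Status: v1.NodeStatus{Addresses: []v1.NodeAddress{
		{Type: v1.NodeExternalIP, Address: "203.0.113.10"},
		{Type: v1.NodeInternalIP, Address: "10.0.0.5"},
	}}}
	addr, _ := nodeAddressForLB(node)
	fmt.Println(addr) // 10.0.0.5 - the InternalIP wins over the earlier ExternalIP
}
```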
@@ -77,7 +77,7 @@ type serviceCache struct {
 
 type ServiceController struct {
 cloud cloudprovider.Interface
-knownHosts []string
+knownHosts []*v1.Node
 servicesToUpdate []*v1.Service
 kubeClient clientset.Interface
 clusterName string
@@ -108,7 +108,7 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
 
 s := &ServiceController{
 cloud: cloud,
-knownHosts: []string{},
+knownHosts: []*v1.Node{},
 kubeClient: kubeClient,
 clusterName: clusterName,
 cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
@@ -344,10 +344,17 @@ func (s *ServiceController) createLoadBalancer(service *v1.Service) error {
 return err
 }
 
+lbNodes := []*v1.Node{}
+for ix := range nodes.Items {
+if includeNodeFromNodeList(&nodes.Items[ix]) {
+lbNodes = append(lbNodes, &nodes.Items[ix])
+}
+}
+
 // - Only one protocol supported per service
 // - Not all cloud providers support all protocols and the next step is expected to return
 // an error for unsupported protocols
-status, err := s.balancer.EnsureLoadBalancer(s.clusterName, service, hostsFromNodeList(&nodes))
+status, err := s.balancer.EnsureLoadBalancer(s.clusterName, service, lbNodes)
 if err != nil {
 return err
 } else {
@@ -540,6 +547,21 @@ func portEqualForLB(x, y *v1.ServicePort) bool {
 return true
 }
 
+func nodeNames(nodes []*v1.Node) []string {
+ret := make([]string, len(nodes))
+for i, node := range nodes {
+ret[i] = node.Name
+}
+return ret
+}
+
+func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
+if len(x) != len(y) {
+return false
+}
+return stringSlicesEqual(nodeNames(x), nodeNames(y))
+}
+
 func intSlicesEqual(x, y []int) bool {
 if len(x) != len(y) {
 return false
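The service controller now tracks known hosts as []*v1.Node and compares them with nodeSlicesEqualForLB, which reduces the slices to names and defers to stringSlicesEqual. The snippet below (not from the diff) shows the helpers in isolation; the element-by-element comparison is a simplified stand-in for stringSlicesEqual.

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api/v1"
)

func nodeNames(nodes []*v1.Node) []string {
	ret := make([]string, len(nodes))
	for i, node := range nodes {
		ret[i] = node.Name
	}
	return ret
}

// nodeSlicesEqualForLB mirrors the controller helper: node slices are equal
// for load-balancer purposes when their name lists are equal.
func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
	if len(x) != len(y) {
		return false
	}
	xs, ys := nodeNames(x), nodeNames(y)
	for i := range xs { // simplified stand-in for stringSlicesEqual
		if xs[i] != ys[i] {
			return false
		}
	}
	return true
}

func main() {
	a := []*v1.Node{{ObjectMeta: v1.ObjectMeta{Name: "n1"}}, {ObjectMeta: v1.ObjectMeta{Name: "n2"}}}
	b := []*v1.Node{{ObjectMeta: v1.ObjectMeta{Name: "n1"}}, {ObjectMeta: v1.ObjectMeta{Name: "n2"}}}
	fmt.Println(nodeSlicesEqualForLB(a, b)) // true
}
```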
@@ -580,26 +602,6 @@ func includeNodeFromNodeList(node *v1.Node) bool {
 return !node.Spec.Unschedulable
 }
 
-func hostsFromNodeList(list *v1.NodeList) []string {
-result := []string{}
-for ix := range list.Items {
-if includeNodeFromNodeList(&list.Items[ix]) {
-result = append(result, list.Items[ix].Name)
-}
-}
-return result
-}
-
-func hostsFromNodeSlice(nodes []*v1.Node) []string {
-result := []string{}
-for _, node := range nodes {
-if includeNodeFromNodeList(node) {
-result = append(result, node.Name)
-}
-}
-return result
-}
-
 func getNodeConditionPredicate() cache.NodeConditionPredicate {
 return func(node *v1.Node) bool {
 // We add the master to the node list, but its unschedulable. So we use this to filter
@@ -627,19 +629,20 @@ func getNodeConditionPredicate() cache.NodeConditionPredicate {
 // nodeSyncLoop handles updating the hosts pointed to by all load
 // balancers whenever the set of nodes in the cluster changes.
 func (s *ServiceController) nodeSyncLoop() {
-nodes, err := s.nodeLister.NodeCondition(getNodeConditionPredicate()).List()
+newHosts, err := s.nodeLister.NodeCondition(getNodeConditionPredicate()).List()
 if err != nil {
 glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
 return
 }
-newHosts := hostsFromNodeSlice(nodes)
-if stringSlicesEqual(newHosts, s.knownHosts) {
+if nodeSlicesEqualForLB(newHosts, s.knownHosts) {
 // The set of nodes in the cluster hasn't changed, but we can retry
 // updating any services that we failed to update last time around.
 s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
 return
 }
-glog.Infof("Detected change in list of current cluster nodes. New node set: %v", newHosts)
+
+glog.Infof("Detected change in list of current cluster nodes. New node set: %v",
+nodeNames(newHosts))
 
 // Try updating all services, and save the ones that fail to try again next
 // round.
@@ -655,7 +658,7 @@ func (s *ServiceController) nodeSyncLoop() {
 // updateLoadBalancerHosts updates all existing load balancers so that
 // they will match the list of hosts provided.
 // Returns the list of services that couldn't be updated.
-func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []string) (servicesToRetry []*v1.Service) {
+func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
 for _, service := range services {
 func() {
 if service == nil {
@@ -672,7 +675,7 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, host
 
 // Updates the load balancer of a service, assuming we hold the mutex
 // associated with the service.
-func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []string) error {
+func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
 if !wantsLoadBalancer(service) {
 return nil
 }
@@ -691,7 +694,7 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, h
 return nil
 }
 
-s.eventRecorder.Eventf(service, v1.EventTypeWarning, "LoadBalancerUpdateFailed", "Error updating load balancer with new hosts %v: %v", hosts, err)
+s.eventRecorder.Eventf(service, v1.EventTypeWarning, "LoadBalancerUpdateFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err)
 return err
 }
 
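For the nodeSyncLoop change above, the shape of the retry logic is easier to see outside the controller: updateLoadBalancerHosts returns whatever failed, and that list is fed back in on the next sync. A simplified sketch with plain strings standing in for *v1.Service (everything here is invented for illustration):

```go
package main

import "fmt"

// updateAll stands in for updateLoadBalancerHosts: try every service and
// return the ones that need to be retried next round.
func updateAll(services []string, update func(string) error) (retry []string) {
	for _, svc := range services {
		if err := update(svc); err != nil {
			retry = append(retry, svc)
		}
	}
	return retry
}

func main() {
	servicesToUpdate := []string{"s0", "s1", "s2"}
	flaky := func(svc string) error {
		if svc == "s1" {
			return fmt.Errorf("simulated cloud error")
		}
		return nil
	}

	// After one pass only the failed service is kept for the next node sync.
	servicesToUpdate = updateAll(servicesToUpdate, flaky)
	fmt.Println(servicesToUpdate) // [s1]
}
```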
|
@ -145,7 +145,11 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
|
||||
|
||||
// TODO: Finish converting and update comments
|
||||
func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
||||
hosts := []string{"node0", "node1", "node73"}
|
||||
nodes := []*v1.Node{
|
||||
{ObjectMeta: v1.ObjectMeta{Name: "node0"}},
|
||||
{ObjectMeta: v1.ObjectMeta{Name: "node1"}},
|
||||
{ObjectMeta: v1.ObjectMeta{Name: "node73"}},
|
||||
}
|
||||
table := []struct {
|
||||
services []*v1.Service
|
||||
expectedUpdateCalls []fakecloud.FakeUpdateBalancerCall
|
||||
@ -169,7 +173,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
||||
newService("s0", "333", v1.ServiceTypeLoadBalancer),
|
||||
},
|
||||
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
|
||||
{newService("s0", "333", v1.ServiceTypeLoadBalancer), hosts},
|
||||
{newService("s0", "333", v1.ServiceTypeLoadBalancer), nodes},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -180,9 +184,9 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
||||
newService("s2", "666", v1.ServiceTypeLoadBalancer),
|
||||
},
|
||||
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
|
||||
{newService("s0", "444", v1.ServiceTypeLoadBalancer), hosts},
|
||||
{newService("s1", "555", v1.ServiceTypeLoadBalancer), hosts},
|
||||
{newService("s2", "666", v1.ServiceTypeLoadBalancer), hosts},
|
||||
{newService("s0", "444", v1.ServiceTypeLoadBalancer), nodes},
|
||||
{newService("s1", "555", v1.ServiceTypeLoadBalancer), nodes},
|
||||
{newService("s2", "666", v1.ServiceTypeLoadBalancer), nodes},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -194,8 +198,8 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
||||
newService("s4", "123", v1.ServiceTypeClusterIP),
|
||||
},
|
||||
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
|
||||
{newService("s1", "888", v1.ServiceTypeLoadBalancer), hosts},
|
||||
{newService("s3", "999", v1.ServiceTypeLoadBalancer), hosts},
|
||||
{newService("s1", "888", v1.ServiceTypeLoadBalancer), nodes},
|
||||
{newService("s3", "999", v1.ServiceTypeLoadBalancer), nodes},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -205,7 +209,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
||||
nil,
|
||||
},
|
||||
expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
|
||||
{newService("s0", "234", v1.ServiceTypeLoadBalancer), hosts},
|
||||
{newService("s0", "234", v1.ServiceTypeLoadBalancer), nodes},
|
||||
},
|
||||
},
|
||||
}
|
||||
@ -222,7 +226,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
||||
for _, service := range item.services {
|
||||
services = append(services, service)
|
||||
}
|
||||
if err := controller.updateLoadBalancerHosts(services, hosts); err != nil {
|
||||
if err := controller.updateLoadBalancerHosts(services, nodes); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) {
|
||||
@ -231,60 +235,6 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestHostsFromNodeList(t *testing.T) {
|
||||
tests := []struct {
|
||||
nodes *v1.NodeList
|
||||
expectedHosts []string
|
||||
}{
|
||||
{
|
||||
nodes: &v1.NodeList{},
|
||||
expectedHosts: []string{},
|
||||
},
|
||||
{
|
||||
nodes: &v1.NodeList{
|
||||
Items: []v1.Node{
|
||||
{
|
||||
ObjectMeta: v1.ObjectMeta{Name: "foo"},
|
||||
Status: v1.NodeStatus{Phase: v1.NodeRunning},
|
||||
},
|
||||
{
|
||||
ObjectMeta: v1.ObjectMeta{Name: "bar"},
|
||||
Status: v1.NodeStatus{Phase: v1.NodeRunning},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedHosts: []string{"foo", "bar"},
|
||||
},
|
||||
{
|
||||
nodes: &v1.NodeList{
|
||||
Items: []v1.Node{
|
||||
{
|
||||
ObjectMeta: v1.ObjectMeta{Name: "foo"},
|
||||
Status: v1.NodeStatus{Phase: v1.NodeRunning},
|
||||
},
|
||||
{
|
||||
ObjectMeta: v1.ObjectMeta{Name: "bar"},
|
||||
Status: v1.NodeStatus{Phase: v1.NodeRunning},
|
||||
},
|
||||
{
|
||||
ObjectMeta: v1.ObjectMeta{Name: "unschedulable"},
|
||||
Spec: v1.NodeSpec{Unschedulable: true},
|
||||
Status: v1.NodeStatus{Phase: v1.NodeRunning},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedHosts: []string{"foo", "bar"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
hosts := hostsFromNodeList(test.nodes)
|
||||
if !reflect.DeepEqual(hosts, test.expectedHosts) {
|
||||
t.Errorf("expected: %v, saw: %v", test.expectedHosts, hosts)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetNodeConditionPredicate(t *testing.T) {
|
||||
tests := []struct {
|
||||
node v1.Node