Fixes service controller update race condition
@@ -89,7 +89,6 @@ type serviceCache struct {
 type ServiceController struct {
 	cloud            cloudprovider.Interface
 	knownHosts       []*v1.Node
-	servicesToUpdate []*v1.Service
 	kubeClient       clientset.Interface
 	clusterName      string
 	balancer         cloudprovider.LoadBalancer
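The hunk above removes the shared state at the center of the race: servicesToUpdate was a plain slice written by the node sync goroutine and consumed on later sync passes, with nothing serializing it against the workers reconciling the same services. With it gone, every host change is funneled through the controller's work queue instead (see the nodeSyncLoop hunk below). A minimal sketch of that queue pattern, assuming client-go's workqueue package; the queue name and keys are illustrative, not the controller's actual wiring:

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Every trigger, watch event or node sync, lands in the same queue.
	queue := workqueue.NewNamed("service")
	defer queue.ShutDown()

	// nodeSyncLoop side: enqueue every cached service key instead of
	// mutating a shared servicesToUpdate slice.
	for _, key := range []string{"default/svc-a", "default/svc-b", "default/svc-a"} {
		queue.Add(key)
	}

	// Worker side: the duplicate key above was deduplicated on Add, so
	// this drains exactly two items.
	for queue.Len() > 0 {
		key, shutdown := queue.Get()
		if shutdown {
			return
		}
		fmt.Println("syncing", key)
		queue.Done(key)
	}
}

Get blocks until an item is available, so the real controller runs several such workers concurrently; the queue still guarantees a given key is never handed to two workers at once, which is what makes routing node-sync updates through it safe.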
@@ -241,6 +240,20 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
 			}
 		}
 	}
+
+	if cachedService.state != nil {
+		if !s.needsUpdate(cachedService.state, service) {
+			// The service does not require an update which means it was placed on the work queue
+			// by the node sync loop and indicates that the hosts need to be updated.
+			err := s.updateLoadBalancerHosts(service)
+			if err != nil {
+				return err, cachedService.nextRetryDelay()
+			}
+			cachedService.resetRetryDelay()
+			return nil, doNotRetry
+		}
+	}
+
 	// cache the service, we need the info for service deletion
 	cachedService.state = service
 	err, retry := s.createLoadBalancerIfNeeded(key, service)
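The new branch gives processServiceUpdate two distinct paths: if the cached spec still matches the incoming one, the key can only have been enqueued by the node sync loop, so only the load balancer's hosts are refreshed; any spec change falls through to the full createLoadBalancerIfNeeded reconcile. A self-contained sketch of that dispatch, with stand-in types (svc, needsUpdate, and dispatch are illustrative, not the controller's real signatures):

package main

import "fmt"

// svc is a pared-down stand-in for *v1.Service with just enough state
// for the sketch.
type svc struct {
	spec string
}

// needsUpdate stands in for ServiceController.needsUpdate: it reports
// spec-level changes only and, per the doc comment added in the next
// hunk, never looks at the cluster's hosts.
func needsUpdate(old, cur *svc) bool {
	return old.spec != cur.spec
}

// dispatch mirrors the new branch in processServiceUpdate: an unchanged
// spec means the key came from the node sync loop, so only the host
// list is refreshed; otherwise the full create/update path runs.
func dispatch(cached, cur *svc) string {
	if cached != nil && !needsUpdate(cached, cur) {
		return "updateLoadBalancerHosts (hosts only)"
	}
	return "createLoadBalancerIfNeeded (full reconcile)"
}

func main() {
	cached := &svc{spec: "port:80"}
	fmt.Println(dispatch(cached, &svc{spec: "port:80"}))  // node-sync path
	fmt.Println(dispatch(cached, &svc{spec: "port:443"})) // spec-change path
}

The invariant that makes this dispatch sound is exactly the one documented in the next hunk: needsUpdate must never report host changes, or the hosts-only path would be unreachable.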
@@ -435,6 +448,8 @@ func (s *serviceCache) delete(serviceName string) {
 	delete(s.serviceMap, serviceName)
 }
 
+// needsUpdate checks to see if there were any changes between the old and new service that would require a load balancer update.
+// This method does not and should not check if the hosts have changed.
 func (s *ServiceController) needsUpdate(oldService *v1.Service, newService *v1.Service) bool {
 	if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) {
 		return false
@@ -637,62 +652,45 @@ func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
 	}
 }
 
-// nodeSyncLoop handles updating the hosts pointed to by all load
-// balancers whenever the set of nodes in the cluster changes.
+// nodeSyncLoop handles adding all existing cached services to the work queue
+// to be reprocessed so that they can have their hosts updated, if any
+// host changes have occurred since the last sync loop.
 func (s *ServiceController) nodeSyncLoop() {
 	newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
 	if err != nil {
 		glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
 		return
 	}
 
 	if nodeSlicesEqualForLB(newHosts, s.knownHosts) {
-		// The set of nodes in the cluster hasn't changed, but we can retry
-		// updating any services that we failed to update last time around.
-		s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
+		// Nothing to do since the hosts have not changed.
 		return
 	}
 
-	glog.Infof("Detected change in list of current cluster nodes. New node set: %v",
-		nodeNames(newHosts))
+	glog.Infof("Detected change in list of current cluster nodes. New node set: %v", nodeNames(newHosts))
 
-	// Try updating all services, and save the ones that fail to try again next
-	// round.
-	s.servicesToUpdate = s.cache.allServices()
-	numServices := len(s.servicesToUpdate)
-	s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
-	glog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
-		numServices-len(s.servicesToUpdate), numServices)
+	for _, svc := range s.cache.allServices() {
+		s.enqueueService(svc)
+	}
 
 	// Update the known hosts so we can check next sync loop for changes.
 	s.knownHosts = newHosts
 }
 
-// updateLoadBalancerHosts updates all existing load balancers so that
-// they will match the list of hosts provided.
-// Returns the list of services that couldn't be updated.
-func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
-	for _, service := range services {
-		func() {
-			if service == nil {
-				return
-			}
-			if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
-				glog.Errorf("External error while updating load balancer: %v.", err)
-				servicesToRetry = append(servicesToRetry, service)
-			}
-		}()
-	}
-	return servicesToRetry
-}
-
-// Updates the load balancer of a service, assuming we hold the mutex
-// associated with the service.
-func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
+// updateLoadBalancerHosts updates the load balancer of the service with updated nodes ONLY.
+// This method will not trigger the cloud provider to create or full update a load balancer.
+func (s *ServiceController) updateLoadBalancerHosts(service *v1.Service) error {
 	if !wantsLoadBalancer(service) {
 		return nil
 	}
 
+	hosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
+	if err != nil {
+		return err
+	}
+
 	// This operation doesn't normally take very long (and happens pretty often), so we only record the final event
-	err := s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts)
+	err = s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts)
 	if err == nil {
 		// If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
 		if len(hosts) == 0 {
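The signature change in the last hunk is the heart of the fix: updateLoadBalancerHosts now takes a single service and lists nodes itself at call time, instead of receiving a hosts snapshot that nodeSyncLoop built and could mutate between calls. A sketch of that shape, using stand-in interfaces rather than the real cloudprovider.LoadBalancer and node lister (all names here are illustrative):

package main

import (
	"errors"
	"fmt"
)

type node struct{ name string }

// nodeLister stands in for the controller's informer-backed node lister,
// trimmed to one method.
type nodeLister interface {
	list() ([]*node, error)
}

// loadBalancer stands in for cloudprovider.LoadBalancer, trimmed to the
// one call this path uses.
type loadBalancer interface {
	updateLoadBalancer(clusterName, service string, hosts []*node) error
}

type controller struct {
	clusterName string
	nodes       nodeLister
	lb          loadBalancer
}

// updateLoadBalancerHosts mirrors the new signature: it takes only the
// service and fetches the node list itself, so no caller can hand it a
// stale snapshot that another goroutine is still mutating.
func (c *controller) updateLoadBalancerHosts(service string) error {
	hosts, err := c.nodes.list()
	if err != nil {
		return err
	}
	return c.lb.updateLoadBalancer(c.clusterName, service, hosts)
}

// staticNodes is a fixed-list lister for the demo.
type staticNodes []*node

func (s staticNodes) list() ([]*node, error) { return s, nil }

// fakeBalancer records the update instead of calling a cloud API.
type fakeBalancer struct{}

func (fakeBalancer) updateLoadBalancer(cluster, service string, hosts []*node) error {
	if len(hosts) == 0 {
		return errors.New("no eligible hosts")
	}
	fmt.Printf("updated %s in cluster %s with %d hosts\n", service, cluster, len(hosts))
	return nil
}

func main() {
	c := &controller{
		clusterName: "test-cluster",
		nodes:       staticNodes{{name: "node-1"}, {name: "node-2"}},
		lb:          fakeBalancer{},
	}
	if err := c.updateLoadBalancerHosts("default/svc-a"); err != nil {
		fmt.Println("will retry:", err)
	}
}

Listing from the informer-backed lister is cheap, so fetching the hosts per call trades a little repeated work for eliminating the shared mutable snapshot entirely.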