Refactor service controller to common controller pattern

Josh Horwitz
2017-11-25 13:44:39 -05:00
parent f302487942
commit ffba27d72e
2 changed files with 73 additions and 147 deletions
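This change replaces the service controller's hand-rolled retry machinery (a DelayingInterface plus per-service lastRetryDelay bookkeeping) with the common controller pattern: a rate-limiting workqueue whose per-key exponential backoff is driven by AddRateLimited on failure and Forget on success, drained by a worker loop through processNextWorkItem. For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of that loop against client-go's workqueue package; the controller/newController names, the stand-in sync function, and the 5s/300s backoff bounds are illustrative assumptions, not part of this commit.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/util/workqueue"
)

// Assumed values; the diff references minRetryDelay/maxRetryDelay
// without showing what they are set to.
const (
	minRetryDelay = 5 * time.Second
	maxRetryDelay = 300 * time.Second
)

type controller struct {
	queue workqueue.RateLimitingInterface
	sync  func(key string) error // stand-in for the real syncService
}

func newController(sync func(key string) error) *controller {
	return &controller{
		// Per-key exponential backoff between minRetryDelay and
		// maxRetryDelay, mirroring the queue constructed in New() below.
		queue: workqueue.NewNamedRateLimitingQueue(
			workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay),
			"service"),
		sync: sync,
	}
}

func (c *controller) worker() {
	for c.processNextWorkItem() {
	}
}

func (c *controller) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	// Done tells the queue this key is no longer being processed; an Add
	// that raced with us re-enqueues the key instead of being dropped.
	defer c.queue.Done(key)

	if err := c.sync(key.(string)); err != nil {
		runtime.HandleError(fmt.Errorf("error processing %v (will retry): %v", key, err))
		// Re-enqueue after the limiter's current backoff for this key.
		c.queue.AddRateLimited(key)
		return true
	}
	// Success: reset the limiter's failure count for this key.
	c.queue.Forget(key)
	return true
}

func main() {
	c := newController(func(key string) error {
		fmt.Println("synced", key)
		return nil
	})
	c.queue.Add("default/my-service")
	go func() {
		time.Sleep(100 * time.Millisecond)
		c.queue.ShutDown()
	}()
	c.worker()
}

Calling Done for every key returned by Get is what lets the queue guarantee that a given key is never processed by two workers at once, which is why syncService needs no locking of its own.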


@@ -60,11 +60,6 @@ const (
 	clientRetryCount = 5
 	clientRetryInterval = 5 * time.Second
-	retryable = true
-	notRetryable = false
-	doNotRetry = time.Duration(0)
 	// LabelNodeRoleMaster specifies that a node is a master
 	// It's copied over to kubeadm until it's merged in core: https://github.com/kubernetes/kubernetes/pull/39112
 	LabelNodeRoleMaster = "node-role.kubernetes.io/master"
@@ -77,8 +72,6 @@ const (
 type cachedService struct {
 	// The cached state of the service
 	state *v1.Service
-	// Controls error back-off
-	lastRetryDelay time.Duration
 }

 type serviceCache struct {
@@ -86,6 +79,8 @@ type serviceCache struct {
 	serviceMap map[string]*cachedService
 }

+// ServiceController keeps cloud provider service resources
+// (like load balancers) in sync with the registry.
 type ServiceController struct {
 	cloud cloudprovider.Interface
 	knownHosts []*v1.Node
@@ -101,7 +96,7 @@ type ServiceController struct {
 	nodeLister corelisters.NodeLister
 	nodeListerSynced cache.InformerSynced
 	// services that need to be synced
-	workingQueue workqueue.DelayingInterface
+	queue workqueue.RateLimitingInterface
 }

 // New returns a new service controller to keep cloud provider service resources
@@ -134,7 +129,7 @@ func New(
 		eventRecorder: recorder,
 		nodeLister: nodeInformer.Lister(),
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
-		workingQueue: workqueue.NewNamedDelayingQueue("service"),
+		queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
 	}

 	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -167,7 +162,7 @@ func (s *ServiceController) enqueueService(obj interface{}) {
glog.Errorf("Couldn't get key for object %#v: %v", obj, err)
return
}
s.workingQueue.Add(key)
s.queue.Add(key)
}
// Run starts a background goroutine that watches for changes to services that
@@ -182,7 +177,7 @@ func (s *ServiceController) enqueueService(obj interface{}) {
 // object.
 func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
 	defer runtime.HandleCrash()
-	defer s.workingQueue.ShutDown()
+	defer s.queue.ShutDown()

 	glog.Info("Starting service controller")
 	defer glog.Info("Shutting down service controller")
@@ -203,21 +198,28 @@ func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (s *ServiceController) worker() {
-	for {
-		func() {
-			key, quit := s.workingQueue.Get()
-			if quit {
-				return
-			}
-			defer s.workingQueue.Done(key)
-			err := s.syncService(key.(string))
-			if err != nil {
-				glog.Errorf("Error syncing service: %v", err)
-			}
-		}()
+	for s.processNextWorkItem() {
 	}
 }

+func (s *ServiceController) processNextWorkItem() bool {
+	key, quit := s.queue.Get()
+	if quit {
+		return false
+	}
+	defer s.queue.Done(key)
+
+	err := s.syncService(key.(string))
+	if err == nil {
+		s.queue.Forget(key)
+		return true
+	}
+
+	runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err))
+	s.queue.AddRateLimited(key)
+
+	return true
+}
+
 func (s *ServiceController) init() error {
 	if s.cloud == nil {
 		return fmt.Errorf("WARNING: no cloud provider provided, services of type LoadBalancer will fail")
@@ -235,31 +237,21 @@ func (s *ServiceController) init() error {
 // Returns an error if processing the service update failed, along with a time.Duration
 // indicating whether processing should be retried; zero means no-retry; otherwise
 // we should retry in that Duration.
-func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) (error, time.Duration) {
+func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) error {
 	if cachedService.state != nil {
 		if cachedService.state.UID != service.UID {
-			err, retry := s.processLoadBalancerDelete(cachedService, key)
+			err := s.processLoadBalancerDelete(cachedService, key)
 			if err != nil {
-				return err, retry
+				return err
 			}
 		}
 	}
 	// cache the service, we need the info for service deletion
 	cachedService.state = service
-	err, retry := s.createLoadBalancerIfNeeded(key, service)
+	err := s.createLoadBalancerIfNeeded(key, service)
 	if err != nil {
-		message := "Error creating load balancer"
-		var retryToReturn time.Duration
-		if retry {
-			message += " (will retry): "
-			retryToReturn = cachedService.nextRetryDelay()
-		} else {
-			message += " (will not retry): "
-			retryToReturn = doNotRetry
-		}
-		message += err.Error()
-		s.eventRecorder.Event(service, v1.EventTypeWarning, "CreatingLoadBalancerFailed", message)
-		return err, retryToReturn
+		s.eventRecorder.Eventf(service, v1.EventTypeWarning, "CreatingLoadBalancerFailed", "Error creating load balancer (will retry): %v", err)
+		return err
 	}
 	// Always update the cache upon success.
 	// NOTE: Since we update the cached service if and only if we successfully
@@ -267,13 +259,12 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
 	// been successfully processed.
 	s.cache.set(key, cachedService)
-	cachedService.resetRetryDelay()
-	return nil, doNotRetry
+	return nil
 }

 // Returns whatever error occurred along with a boolean indicator of whether it
 // should be retried.
-func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.Service) (error, bool) {
+func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.Service) error {
 	// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
 	// which may involve service interruption. Also, we would like user-friendly events.
@@ -285,13 +276,13 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
 	if !wantsLoadBalancer(service) {
 		_, exists, err := s.balancer.GetLoadBalancer(s.clusterName, service)
 		if err != nil {
-			return fmt.Errorf("error getting LB for service %s: %v", key, err), retryable
+			return fmt.Errorf("error getting LB for service %s: %v", key, err)
 		}
 		if exists {
 			glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", key)
 			s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
 			if err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service); err != nil {
-				return err, retryable
+				return err
 			}
 			s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
 		}
@@ -305,7 +296,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
 		s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuringLoadBalancer", "Ensuring load balancer")
 		newState, err = s.ensureLoadBalancer(service)
 		if err != nil {
-			return fmt.Errorf("failed to ensure load balancer for service %s: %v", key, err), retryable
+			return fmt.Errorf("failed to ensure load balancer for service %s: %v", key, err)
 		}
 		s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer")
 	}
@@ -320,13 +311,14 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
 		service.Status.LoadBalancer = *newState
 		if err := s.persistUpdate(service); err != nil {
-			return fmt.Errorf("failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
+			runtime.HandleError(fmt.Errorf("failed to persist service %q updated status to apiserver, even after retries. Giving up: %v", key, err))
+			return nil
 		}
 	} else {
 		glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus for service %s to registry.", key)
 	}

-	return nil, notRetryable
+	return nil
 }

 func (s *ServiceController) persistUpdate(service *v1.Service) error {
@@ -721,31 +713,12 @@ func loadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
 	return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
 }

-// Computes the next retry, using exponential backoff
-// mutex must be held.
-func (s *cachedService) nextRetryDelay() time.Duration {
-	s.lastRetryDelay = s.lastRetryDelay * 2
-	if s.lastRetryDelay < minRetryDelay {
-		s.lastRetryDelay = minRetryDelay
-	}
-	if s.lastRetryDelay > maxRetryDelay {
-		s.lastRetryDelay = maxRetryDelay
-	}
-	return s.lastRetryDelay
-}
-
-// Resets the retry exponential backoff. mutex must be held.
-func (s *cachedService) resetRetryDelay() {
-	s.lastRetryDelay = time.Duration(0)
-}
-
 // syncService will sync the Service with the given key if it has had its expectations fulfilled,
 // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
 // invoked concurrently with the same key.
 func (s *ServiceController) syncService(key string) error {
 	startTime := time.Now()
 	var cachedService *cachedService
-	var retryDelay time.Duration
 	defer func() {
 		glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(startTime))
 	}()
@@ -760,59 +733,44 @@ func (s *ServiceController) syncService(key string) error {
 	switch {
 	case errors.IsNotFound(err):
 		// service absence in store means watcher caught the deletion, ensure LB info is cleaned
-		glog.Infof("Service has been deleted %v", key)
-		err, retryDelay = s.processServiceDeletion(key)
+		glog.Infof("Service has been deleted %v. Attempting to cleanup load balancer resources", key)
+		err = s.processServiceDeletion(key)
 	case err != nil:
 		glog.Infof("Unable to retrieve service %v from store: %v", key, err)
-		s.workingQueue.Add(key)
+		return err
 	default:
 		cachedService = s.cache.getOrCreate(key)
-		err, retryDelay = s.processServiceUpdate(cachedService, service, key)
+		err = s.processServiceUpdate(cachedService, service, key)
 	}

-	if retryDelay != 0 {
-		// Add the failed service back to the queue so we'll retry it.
-		glog.Errorf("Failed to process service %v. Retrying in %s: %v", key, retryDelay, err)
-		go func(obj interface{}, delay time.Duration) {
-			// put back the service key to working queue, it is possible that more entries of the service
-			// were added into the queue during the delay, but it does not mess as when handling the retry,
-			// it always get the last service info from service store
-			s.workingQueue.AddAfter(obj, delay)
-		}(key, retryDelay)
-	} else if err != nil {
-		runtime.HandleError(fmt.Errorf("failed to process service %v. Not retrying: %v", key, err))
-	}
-	return nil
+	return err
 }
 // Returns an error if processing the service deletion failed, along with a time.Duration
 // indicating whether processing should be retried; zero means no-retry; otherwise
 // we should retry after that Duration.
-func (s *ServiceController) processServiceDeletion(key string) (error, time.Duration) {
+func (s *ServiceController) processServiceDeletion(key string) error {
 	cachedService, ok := s.cache.get(key)
 	if !ok {
-		return fmt.Errorf("service %s not in cache even though the watcher thought it was. Ignoring the deletion", key), doNotRetry
+		glog.Errorf("service %s not in cache even though the watcher thought it was. Ignoring the deletion", key)
+		return nil
 	}
 	return s.processLoadBalancerDelete(cachedService, key)
 }

-func (s *ServiceController) processLoadBalancerDelete(cachedService *cachedService, key string) (error, time.Duration) {
+func (s *ServiceController) processLoadBalancerDelete(cachedService *cachedService, key string) error {
 	service := cachedService.state
 	// delete load balancer info only if the service type is LoadBalancer
 	if !wantsLoadBalancer(service) {
-		return nil, doNotRetry
+		return nil
 	}
 	s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
 	err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service)
 	if err != nil {
-		message := "Error deleting load balancer (will retry): " + err.Error()
-		s.eventRecorder.Event(service, v1.EventTypeWarning, "DeletingLoadBalancerFailed", message)
-		return err, cachedService.nextRetryDelay()
+		s.eventRecorder.Eventf(service, v1.EventTypeWarning, "DeletingLoadBalancerFailed", "Error deleting load balancer (will retry): %v", err)
+		return err
 	}
 	s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
 	s.cache.delete(key)
-	cachedService.resetRetryDelay()
-	return nil, doNotRetry
+	return nil
 }
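The deleted nextRetryDelay/resetRetryDelay helpers implemented exactly the doubling-with-clamp backoff that ItemExponentialFailureRateLimiter provides out of the box: each AddRateLimited for a key doubles its delay from a base up to a cap, and Forget resets it. Below is a small demonstration of that behavior; the 5-second base and 300-second cap are assumed values, since the constants' definitions are not shown in this diff.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Assumed bounds standing in for minRetryDelay/maxRetryDelay.
	rl := workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 300*time.Second)

	for i := 0; i < 8; i++ {
		// Each call to When records one more failure for the key and
		// returns base * 2^failures, capped at the maximum delay:
		// 5s, 10s, 20s, 40s, 80s, 160s, 300s, 300s, ...
		fmt.Printf("failure %d -> retry in %v\n",
			rl.NumRequeues("default/my-service"), rl.When("default/my-service"))
	}

	rl.Forget("default/my-service") // a successful sync resets the backoff
	fmt.Println("after Forget:", rl.When("default/my-service"))
}

The practical difference from the deleted code is that the backoff state now lives in the queue rather than in cachedService, so it needs no cache mutex and is shared by every code path that re-enqueues a key.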