Updating federation service controller to support cascading deletion
This commit is contained in:
		| @@ -28,6 +28,9 @@ import ( | |||||||
| 	federationcache "k8s.io/kubernetes/federation/client/cache" | 	federationcache "k8s.io/kubernetes/federation/client/cache" | ||||||
| 	fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" | 	fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" | ||||||
| 	"k8s.io/kubernetes/federation/pkg/dnsprovider" | 	"k8s.io/kubernetes/federation/pkg/dnsprovider" | ||||||
|  | 	"k8s.io/kubernetes/federation/pkg/federation-controller/util" | ||||||
|  | 	fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" | ||||||
|  | 	"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" | ||||||
| 	"k8s.io/kubernetes/pkg/api" | 	"k8s.io/kubernetes/pkg/api" | ||||||
| 	"k8s.io/kubernetes/pkg/api/errors" | 	"k8s.io/kubernetes/pkg/api/errors" | ||||||
| 	v1 "k8s.io/kubernetes/pkg/api/v1" | 	v1 "k8s.io/kubernetes/pkg/api/v1" | ||||||
| @@ -36,6 +39,7 @@ import ( | |||||||
| 	kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" | 	kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" | ||||||
| 	"k8s.io/kubernetes/pkg/client/record" | 	"k8s.io/kubernetes/pkg/client/record" | ||||||
| 	"k8s.io/kubernetes/pkg/controller" | 	"k8s.io/kubernetes/pkg/controller" | ||||||
|  | 	"k8s.io/kubernetes/pkg/conversion" | ||||||
| 	pkgruntime "k8s.io/kubernetes/pkg/runtime" | 	pkgruntime "k8s.io/kubernetes/pkg/runtime" | ||||||
| 	"k8s.io/kubernetes/pkg/util/runtime" | 	"k8s.io/kubernetes/pkg/util/runtime" | ||||||
| 	"k8s.io/kubernetes/pkg/util/sets" | 	"k8s.io/kubernetes/pkg/util/sets" | ||||||
| @@ -68,6 +72,10 @@ const ( | |||||||
| 	KubeAPIBurst  = 30 | 	KubeAPIBurst  = 30 | ||||||
|  |  | ||||||
| 	maxNoOfClusters = 100 | 	maxNoOfClusters = 100 | ||||||
|  |  | ||||||
|  | 	updateTimeout         = 30 * time.Second | ||||||
|  | 	allClustersKey        = "ALL_CLUSTERS" | ||||||
|  | 	clusterAvailableDelay = time.Second * 20 | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type cachedService struct { | type cachedService struct { | ||||||
| @@ -114,6 +122,7 @@ type ServiceController struct { | |||||||
| 	serviceStore cache.StoreToServiceLister | 	serviceStore cache.StoreToServiceLister | ||||||
| 	// Watches changes to all services | 	// Watches changes to all services | ||||||
| 	serviceController *cache.Controller | 	serviceController *cache.Controller | ||||||
|  | 	federatedInformer fedutil.FederatedInformer | ||||||
| 	// A store of services, populated by the serviceController | 	// A store of services, populated by the serviceController | ||||||
| 	clusterStore federationcache.StoreToClusterLister | 	clusterStore federationcache.StoreToClusterLister | ||||||
| 	// Watches changes to all services | 	// Watches changes to all services | ||||||
| @@ -133,6 +142,12 @@ type ServiceController struct { | |||||||
| 	serviceWorkerMap map[string]bool | 	serviceWorkerMap map[string]bool | ||||||
| 	// channel for worker to signal that it is going out of existence | 	// channel for worker to signal that it is going out of existence | ||||||
| 	serviceWorkerDoneChan chan string | 	serviceWorkerDoneChan chan string | ||||||
|  |  | ||||||
|  | 	// For triggering all services reconciliation. This is used when | ||||||
|  | 	// a new cluster becomes available. | ||||||
|  | 	clusterDeliverer *util.DelayingDeliverer | ||||||
|  |  | ||||||
|  | 	deletionHelper *deletionhelper.DeletionHelper | ||||||
| } | } | ||||||
|  |  | ||||||
| // New returns a new service controller to keep DNS provider service resources | // New returns a new service controller to keep DNS provider service resources | ||||||
| @@ -162,6 +177,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, | |||||||
| 		queue:            workqueue.New(), | 		queue:            workqueue.New(), | ||||||
| 		knownClusterSet:  make(sets.String), | 		knownClusterSet:  make(sets.String), | ||||||
| 	} | 	} | ||||||
|  | 	s.clusterDeliverer = util.NewDelayingDeliverer() | ||||||
| 	s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer( | 	s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer( | ||||||
| 		&cache.ListWatch{ | 		&cache.ListWatch{ | ||||||
| 			ListFunc: func(options v1.ListOptions) (pkgruntime.Object, error) { | 			ListFunc: func(options v1.ListOptions) (pkgruntime.Object, error) { | ||||||
| @@ -224,6 +240,66 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, | |||||||
| 		}, | 		}, | ||||||
| 	) | 	) | ||||||
|  |  | ||||||
|  | 	clusterLifecycle := fedutil.ClusterLifecycleHandlerFuncs{ | ||||||
|  | 		ClusterAvailable: func(cluster *v1beta1.Cluster) { | ||||||
|  | 			s.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay) | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	fedInformerFactory := func(cluster *v1beta1.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) { | ||||||
|  | 		return cache.NewInformer( | ||||||
|  | 			&cache.ListWatch{ | ||||||
|  | 				ListFunc: func(options v1.ListOptions) (pkgruntime.Object, error) { | ||||||
|  | 					return targetClient.Core().Services(v1.NamespaceAll).List(options) | ||||||
|  | 				}, | ||||||
|  | 				WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { | ||||||
|  | 					return targetClient.Core().Services(v1.NamespaceAll).Watch(options) | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			&v1.Service{}, | ||||||
|  | 			controller.NoResyncPeriodFunc(), | ||||||
|  | 			// Trigger reconciliation whenever something in federated cluster is changed. In most cases it | ||||||
|  | 			// would be just confirmation that some service operation succeeded. | ||||||
|  | 			util.NewTriggerOnAllChanges( | ||||||
|  | 				func(obj pkgruntime.Object) { | ||||||
 |  | 					// TODO: Use this to enqueue services. | ||||||
|  | 				}, | ||||||
|  | 			)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	s.federatedInformer = fedutil.NewFederatedInformer(federationClient, fedInformerFactory, &clusterLifecycle) | ||||||
|  |  | ||||||
|  | 	federatedUpdater := fedutil.NewFederatedUpdater(s.federatedInformer, | ||||||
|  | 		func(client kubeclientset.Interface, obj pkgruntime.Object) error { | ||||||
|  | 			svc := obj.(*v1.Service) | ||||||
|  | 			_, err := client.Core().Services(svc.Namespace).Create(svc) | ||||||
|  | 			return err | ||||||
|  | 		}, | ||||||
|  | 		func(client kubeclientset.Interface, obj pkgruntime.Object) error { | ||||||
|  | 			svc := obj.(*v1.Service) | ||||||
|  | 			_, err := client.Core().Services(svc.Namespace).Update(svc) | ||||||
|  | 			return err | ||||||
|  | 		}, | ||||||
|  | 		func(client kubeclientset.Interface, obj pkgruntime.Object) error { | ||||||
|  | 			svc := obj.(*v1.Service) | ||||||
|  | 			err := client.Core().Services(svc.Namespace).Delete(svc.Name, &v1.DeleteOptions{}) | ||||||
|  | 			return err | ||||||
|  | 		}) | ||||||
|  |  | ||||||
|  | 	s.deletionHelper = deletionhelper.NewDeletionHelper( | ||||||
|  | 		s.hasFinalizerFunc, | ||||||
|  | 		s.removeFinalizerFunc, | ||||||
|  | 		s.addFinalizerFunc, | ||||||
|  | 		// objNameFunc | ||||||
|  | 		func(obj pkgruntime.Object) string { | ||||||
|  | 			service := obj.(*v1.Service) | ||||||
|  | 			return service.Name | ||||||
|  | 		}, | ||||||
|  | 		updateTimeout, | ||||||
|  | 		s.eventRecorder, | ||||||
|  | 		s.federatedInformer, | ||||||
|  | 		federatedUpdater, | ||||||
|  | 	) | ||||||
|  |  | ||||||
| 	s.endpointWorkerMap = make(map[string]bool) | 	s.endpointWorkerMap = make(map[string]bool) | ||||||
| 	s.serviceWorkerMap = make(map[string]bool) | 	s.serviceWorkerMap = make(map[string]bool) | ||||||
| 	s.endpointWorkerDoneChan = make(chan string, maxNoOfClusters) | 	s.endpointWorkerDoneChan = make(chan string, maxNoOfClusters) | ||||||
| @@ -231,6 +307,54 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface, | |||||||
| 	return s | 	return s | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Returns true if the given object has the given finalizer in its ObjectMeta. | ||||||
|  | func (s *ServiceController) hasFinalizerFunc(obj pkgruntime.Object, finalizer string) bool { | ||||||
|  | 	service := obj.(*v1.Service) | ||||||
|  | 	for i := range service.ObjectMeta.Finalizers { | ||||||
|  | 		if string(service.ObjectMeta.Finalizers[i]) == finalizer { | ||||||
|  | 			return true | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return false | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Removes the finalizer from the given objects ObjectMeta. | ||||||
|  | // Assumes that the given object is a service. | ||||||
|  | func (s *ServiceController) removeFinalizerFunc(obj pkgruntime.Object, finalizer string) (pkgruntime.Object, error) { | ||||||
|  | 	service := obj.(*v1.Service) | ||||||
|  | 	newFinalizers := []string{} | ||||||
|  | 	hasFinalizer := false | ||||||
|  | 	for i := range service.ObjectMeta.Finalizers { | ||||||
|  | 		if string(service.ObjectMeta.Finalizers[i]) != finalizer { | ||||||
|  | 			newFinalizers = append(newFinalizers, service.ObjectMeta.Finalizers[i]) | ||||||
|  | 		} else { | ||||||
|  | 			hasFinalizer = true | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	if !hasFinalizer { | ||||||
|  | 		// Nothing to do. | ||||||
|  | 		return obj, nil | ||||||
|  | 	} | ||||||
|  | 	service.ObjectMeta.Finalizers = newFinalizers | ||||||
|  | 	service, err := s.federationClient.Core().Services(service.Namespace).Update(service) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, fmt.Errorf("failed to remove finalizer %s from service %s: %v", finalizer, service.Name, err) | ||||||
|  | 	} | ||||||
|  | 	return service, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Adds the given finalizer to the given objects ObjectMeta. | ||||||
|  | // Assumes that the given object is a service. | ||||||
|  | func (s *ServiceController) addFinalizerFunc(obj pkgruntime.Object, finalizer string) (pkgruntime.Object, error) { | ||||||
|  | 	service := obj.(*v1.Service) | ||||||
|  | 	service.ObjectMeta.Finalizers = append(service.ObjectMeta.Finalizers, finalizer) | ||||||
|  | 	service, err := s.federationClient.Core().Services(service.Namespace).Update(service) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, fmt.Errorf("failed to add finalizer %s to service %s: %v", finalizer, service.Name, err) | ||||||
|  | 	} | ||||||
|  | 	return service, nil | ||||||
|  | } | ||||||
|  |  | ||||||
| // obj could be an *api.Service, or a DeletionFinalStateUnknown marker item. | // obj could be an *api.Service, or a DeletionFinalStateUnknown marker item. | ||||||
| func (s *ServiceController) enqueueService(obj interface{}) { | func (s *ServiceController) enqueueService(obj interface{}) { | ||||||
| 	key, err := controller.KeyFunc(obj) | 	key, err := controller.KeyFunc(obj) | ||||||
| @@ -257,6 +381,10 @@ func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error { | |||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	defer runtime.HandleCrash() | 	defer runtime.HandleCrash() | ||||||
|  | 	s.federatedInformer.Start() | ||||||
|  | 	s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { | ||||||
|  | 		// TODO: Use this new clusterDeliverer to reconcile services in new clusters. | ||||||
|  | 	}) | ||||||
| 	go s.serviceController.Run(stopCh) | 	go s.serviceController.Run(stopCh) | ||||||
| 	go s.clusterController.Run(stopCh) | 	go s.clusterController.Run(stopCh) | ||||||
| 	for i := 0; i < workers; i++ { | 	for i := 0; i < workers; i++ { | ||||||
| @@ -268,6 +396,7 @@ func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error { | |||||||
| 	<-stopCh | 	<-stopCh | ||||||
| 	glog.Infof("Shutting down Federation Service Controller") | 	glog.Infof("Shutting down Federation Service Controller") | ||||||
| 	s.queue.ShutDown() | 	s.queue.ShutDown() | ||||||
|  | 	s.federatedInformer.Stop() | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -361,20 +490,16 @@ func (s *ServiceController) processServiceForCluster(cachedService *cachedServic | |||||||
| // should be retried. | // should be retried. | ||||||
| func (s *ServiceController) updateFederationService(key string, cachedService *cachedService) (error, bool) { | func (s *ServiceController) updateFederationService(key string, cachedService *cachedService) (error, bool) { | ||||||
| 	// Clone federation service, and create them in underlying k8s cluster | 	// Clone federation service, and create them in underlying k8s cluster | ||||||
| 	clone, err := api.Scheme.DeepCopy(cachedService.lastState) | 	desiredService := &v1.Service{ | ||||||
| 	if err != nil { | 		ObjectMeta: util.DeepCopyRelevantObjectMeta(cachedService.lastState.ObjectMeta), | ||||||
| 		return err, !retryable | 		Spec:       *(util.DeepCopyApiTypeOrPanic(&cachedService.lastState.Spec).(*v1.ServiceSpec)), | ||||||
| 	} |  | ||||||
| 	service, ok := clone.(*v1.Service) |  | ||||||
| 	if !ok { |  | ||||||
| 		return fmt.Errorf("Unexpected service cast error : %v\n", service), !retryable |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// handle available clusters one by one | 	// handle available clusters one by one | ||||||
| 	var hasErr bool | 	var hasErr bool | ||||||
| 	for clusterName, cache := range s.clusterCache.clientMap { | 	for clusterName, cache := range s.clusterCache.clientMap { | ||||||
| 		go func(cache *clusterCache, clusterName string) { | 		go func(cache *clusterCache, clusterName string) { | ||||||
| 			err = s.processServiceForCluster(cachedService, clusterName, service, cache.clientset) | 			err := s.processServiceForCluster(cachedService, clusterName, desiredService, cache.clientset) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				hasErr = true | 				hasErr = true | ||||||
| 			} | 			} | ||||||
| @@ -382,7 +507,7 @@ func (s *ServiceController) updateFederationService(key string, cachedService *c | |||||||
| 	} | 	} | ||||||
| 	if hasErr { | 	if hasErr { | ||||||
| 		// detail error has been dumped inside the loop | 		// detail error has been dumped inside the loop | ||||||
| 		return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", service.Namespace, service.Name), retryable | 		return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", desiredService.Namespace, desiredService.Name), retryable | ||||||
| 	} | 	} | ||||||
| 	return nil, !retryable | 	return nil, !retryable | ||||||
| } | } | ||||||
| @@ -812,18 +937,25 @@ func (s *ServiceController) syncService(key string) error { | |||||||
| 		glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime)) | 		glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime)) | ||||||
| 	}() | 	}() | ||||||
| 	// obj holds the latest service info from apiserver | 	// obj holds the latest service info from apiserver | ||||||
| 	obj, exists, err := s.serviceStore.Indexer.GetByKey(key) | 	objFromStore, exists, err := s.serviceStore.Indexer.GetByKey(key) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		glog.Errorf("Unable to retrieve service %v from store: %v", key, err) | 		glog.Errorf("Unable to retrieve service %v from store: %v", key, err) | ||||||
| 		s.queue.Add(key) | 		s.queue.Add(key) | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if !exists { | 	if !exists { | ||||||
| 		// service absence in store means watcher caught the deletion, ensure LB info is cleaned | 		// service absence in store means watcher caught the deletion, ensure LB info is cleaned | ||||||
| 		glog.Infof("Service has been deleted %v", key) | 		glog.Infof("Service has been deleted %v", key) | ||||||
| 		err, retryDelay = s.processServiceDeletion(key) | 		err, retryDelay = s.processServiceDeletion(key) | ||||||
| 	} | 	} | ||||||
|  | 	// Create a copy before modifying the obj to prevent race condition with | ||||||
|  | 	// other readers of obj from store. | ||||||
|  | 	obj, err := conversion.NewCloner().DeepCopy(objFromStore) | ||||||
|  | 	if err != nil { | ||||||
|  | 		glog.Errorf("Error in deep copying service %v retrieved from store: %v", key, err) | ||||||
|  | 		s.queue.Add(key) | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	if exists { | 	if exists { | ||||||
| 		service, ok := obj.(*v1.Service) | 		service, ok := obj.(*v1.Service) | ||||||
| @@ -857,6 +989,29 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s | |||||||
| 	cachedService.rwlock.Lock() | 	cachedService.rwlock.Lock() | ||||||
| 	defer cachedService.rwlock.Unlock() | 	defer cachedService.rwlock.Unlock() | ||||||
|  |  | ||||||
|  | 	if service.DeletionTimestamp != nil { | ||||||
|  | 		if err := s.delete(service); err != nil { | ||||||
|  | 			glog.Errorf("Failed to delete %s: %v", service, err) | ||||||
|  | 			s.eventRecorder.Eventf(service, api.EventTypeNormal, "DeleteFailed", | ||||||
|  | 				"Service delete failed: %v", err) | ||||||
|  | 			return err, cachedService.nextRetryDelay() | ||||||
|  | 		} | ||||||
|  | 		return nil, doNotRetry | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for service: %s", | ||||||
|  | 		service.Name) | ||||||
|  | 	// Add the required finalizers before creating a service in underlying clusters. | ||||||
|  | 	updatedServiceObj, err := s.deletionHelper.EnsureFinalizers(service) | ||||||
|  | 	if err != nil { | ||||||
|  | 		glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in service %s: %v", | ||||||
|  | 			service.Name, err) | ||||||
|  | 		return err, cachedService.nextRetryDelay() | ||||||
|  | 	} | ||||||
|  | 	service = updatedServiceObj.(*v1.Service) | ||||||
|  |  | ||||||
|  | 	glog.V(3).Infof("Syncing service %s in underlying clusters", service.Name) | ||||||
|  |  | ||||||
| 	// Update the cached service (used above for populating synthetic deletes) | 	// Update the cached service (used above for populating synthetic deletes) | ||||||
| 	// alway trust service, which is retrieve from serviceStore, which keeps the latest service info getting from apiserver | 	// alway trust service, which is retrieve from serviceStore, which keeps the latest service info getting from apiserver | ||||||
| 	// if the same service is changed before this go routine finished, there will be another queue entry to handle that. | 	// if the same service is changed before this go routine finished, there will be another queue entry to handle that. | ||||||
| @@ -885,6 +1040,26 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s | |||||||
| 	return nil, doNotRetry | 	return nil, doNotRetry | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // delete deletes the given service or returns error if the deletion was not complete. | ||||||
|  | func (s *ServiceController) delete(service *v1.Service) error { | ||||||
|  | 	glog.V(3).Infof("Handling deletion of service: %v", *service) | ||||||
|  | 	_, err := s.deletionHelper.HandleObjectInUnderlyingClusters(service) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	err = s.federationClient.Core().Services(service.Namespace).Delete(service.Name, nil) | ||||||
|  | 	if err != nil { | ||||||
|  | 		// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything. | ||||||
|  | 		// This is expected when we are processing an update as a result of service finalizer deletion. | ||||||
|  | 		// The process that deleted the last finalizer is also going to delete the service and we do not have to do anything. | ||||||
|  | 		if !errors.IsNotFound(err) { | ||||||
|  | 			return fmt.Errorf("failed to delete service: %v", err) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
| // processServiceDeletion returns an error if processing the service deletion failed, along with a time.Duration | // processServiceDeletion returns an error if processing the service deletion failed, along with a time.Duration | ||||||
| // indicating whether processing should be retried; zero means no-retry; otherwise | // indicating whether processing should be retried; zero means no-retry; otherwise | ||||||
| // we should retry in that Duration. | // we should retry in that Duration. | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 nikhiljindal
					nikhiljindal