Segregate DNS related code to separate controller
This commit is contained in:
		@@ -35,7 +35,6 @@ import (
 | 
			
		||||
	"k8s.io/client-go/tools/clientcmd"
 | 
			
		||||
	federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
 | 
			
		||||
	"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
 | 
			
		||||
	"k8s.io/kubernetes/federation/pkg/dnsprovider"
 | 
			
		||||
	"k8s.io/kubernetes/federation/pkg/federatedtypes"
 | 
			
		||||
	clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
 | 
			
		||||
	deploymentcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/deployment"
 | 
			
		||||
@@ -137,17 +136,20 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
 | 
			
		||||
	clustercontroller.StartClusterController(restClientCfg, stopChan, s.ClusterMonitorPeriod.Duration)
 | 
			
		||||
 | 
			
		||||
	if controllerEnabled(s.Controllers, serverResources, servicecontroller.ControllerName, servicecontroller.RequiredResources, true) {
 | 
			
		||||
		dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			glog.Fatalf("Cloud provider could not be initialized: %v", err)
 | 
			
		||||
		if controllerEnabled(s.Controllers, serverResources, servicecontroller.DNSControllerName, servicecontroller.RequiredResources, true) {
 | 
			
		||||
			serviceDNScontrollerClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.DNSUserAgentName))
 | 
			
		||||
			serviceDNSController, err := servicecontroller.NewServiceDNSController(serviceDNScontrollerClientset, s.DnsProvider, s.DnsConfigFile, s.FederationName, s.ServiceDnsSuffix, s.ZoneName, s.ZoneID)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				glog.Fatalf("Failed to start service dns controller: %v", err)
 | 
			
		||||
			} else {
 | 
			
		||||
				go serviceDNSController.DNSControllerRun(s.ConcurrentServiceSyncs, wait.NeverStop)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		glog.V(3).Infof("Loading client config for service controller %q", servicecontroller.UserAgentName)
 | 
			
		||||
		scClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName))
 | 
			
		||||
		servicecontroller := servicecontroller.New(scClientset, dns, s.FederationName, s.ServiceDnsSuffix, s.ZoneName, s.ZoneID)
 | 
			
		||||
		glog.V(3).Infof("Running service controller")
 | 
			
		||||
		if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil {
 | 
			
		||||
			glog.Fatalf("Failed to start service controller: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		serviceController := servicecontroller.New(scClientset)
 | 
			
		||||
		go serviceController.Run(s.ConcurrentServiceSyncs, wait.NeverStop)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if controllerEnabled(s.Controllers, serverResources, namespacecontroller.ControllerName, namespacecontroller.RequiredResources, true) {
 | 
			
		||||
 
 | 
			
		||||
@@ -19,24 +19,210 @@ package service
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"net"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
 | 
			
		||||
	"strings"
 | 
			
		||||
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	pkgruntime "k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/watch"
 | 
			
		||||
	"k8s.io/client-go/tools/cache"
 | 
			
		||||
	"k8s.io/client-go/util/workqueue"
 | 
			
		||||
	fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
 | 
			
		||||
	"k8s.io/kubernetes/federation/pkg/dnsprovider"
 | 
			
		||||
	"k8s.io/kubernetes/federation/pkg/dnsprovider/rrstype"
 | 
			
		||||
	"k8s.io/kubernetes/federation/pkg/federation-controller/util"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/v1"
 | 
			
		||||
	corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	// minDnsTtl is the minimum safe DNS TTL value to use (in seconds).  We use this as the TTL for all DNS records.
 | 
			
		||||
	minDnsTtl = 180
 | 
			
		||||
	DNSControllerName = "service-dns"
 | 
			
		||||
 | 
			
		||||
	DNSUserAgentName = "federation-service-dns-controller"
 | 
			
		||||
 | 
			
		||||
	// minDNSTTL is the minimum safe DNS TTL value to use (in seconds).  We use this as the TTL for all DNS records.
 | 
			
		||||
	minDNSTTL = 180
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type ServiceDNSController struct {
 | 
			
		||||
	// Client to federation api server
 | 
			
		||||
	federationClient fedclientset.Interface
 | 
			
		||||
	dns              dnsprovider.Interface
 | 
			
		||||
	federationName   string
 | 
			
		||||
	// serviceDNSSuffix is the DNS suffix we use when publishing service DNS names
 | 
			
		||||
	serviceDNSSuffix string
 | 
			
		||||
	// zoneName and zoneID are used to identify the zone in which to put records
 | 
			
		||||
	zoneName string
 | 
			
		||||
	zoneID   string
 | 
			
		||||
	dnsZones dnsprovider.Zones
 | 
			
		||||
	// each federation should be configured with a single zone (e.g. "mycompany.com")
 | 
			
		||||
	dnsZone dnsprovider.Zone
 | 
			
		||||
	// Informer Store for federated services
 | 
			
		||||
	serviceStore corelisters.ServiceLister
 | 
			
		||||
	// Informer controller for federated services
 | 
			
		||||
	serviceController cache.Controller
 | 
			
		||||
	workQueue         workqueue.Interface
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewServiceDNSController returns a new service dns controller to manage DNS records for federated services
 | 
			
		||||
func NewServiceDNSController(client fedclientset.Interface, dnsProvider, dnsProviderConfig, federationName,
 | 
			
		||||
	serviceDNSSuffix, zoneName, zoneID string) (*ServiceDNSController, error) {
 | 
			
		||||
	dns, err := dnsprovider.InitDnsProvider(dnsProvider, dnsProviderConfig)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		runtime.HandleError(fmt.Errorf("DNS provider could not be initialized: %v", err))
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	d := &ServiceDNSController{
 | 
			
		||||
		federationClient: client,
 | 
			
		||||
		dns:              dns,
 | 
			
		||||
		federationName:   federationName,
 | 
			
		||||
		serviceDNSSuffix: serviceDNSSuffix,
 | 
			
		||||
		zoneName:         zoneName,
 | 
			
		||||
		zoneID:           zoneID,
 | 
			
		||||
		workQueue:        workqueue.New(),
 | 
			
		||||
	}
 | 
			
		||||
	if err := d.validateConfig(); err != nil {
 | 
			
		||||
		runtime.HandleError(fmt.Errorf("Invalid configuration passed to DNS provider: %v", err))
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	if err := d.retrieveOrCreateDNSZone(); err != nil {
 | 
			
		||||
		runtime.HandleError(fmt.Errorf("Failed to retrieve DNS zone: %v", err))
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Start informer in federated API servers on federated services
 | 
			
		||||
	var serviceIndexer cache.Indexer
 | 
			
		||||
	serviceIndexer, d.serviceController = cache.NewIndexerInformer(
 | 
			
		||||
		&cache.ListWatch{
 | 
			
		||||
			ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
 | 
			
		||||
				return client.Core().Services(metav1.NamespaceAll).List(options)
 | 
			
		||||
			},
 | 
			
		||||
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 | 
			
		||||
				return client.Core().Services(metav1.NamespaceAll).Watch(options)
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		&v1.Service{},
 | 
			
		||||
		serviceSyncPeriod,
 | 
			
		||||
		util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) { d.workQueue.Add(obj) }),
 | 
			
		||||
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
 | 
			
		||||
	)
 | 
			
		||||
	d.serviceStore = corelisters.NewServiceLister(serviceIndexer)
 | 
			
		||||
 | 
			
		||||
	return d, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *ServiceDNSController) DNSControllerRun(workers int, stopCh <-chan struct{}) {
 | 
			
		||||
	defer runtime.HandleCrash()
 | 
			
		||||
	defer s.workQueue.ShutDown()
 | 
			
		||||
 | 
			
		||||
	glog.Infof("Starting federation service dns controller")
 | 
			
		||||
	defer glog.Infof("Stopping federation service dns controller")
 | 
			
		||||
 | 
			
		||||
	go s.serviceController.Run(stopCh)
 | 
			
		||||
 | 
			
		||||
	for i := 0; i < workers; i++ {
 | 
			
		||||
		go wait.Until(s.worker, time.Second, stopCh)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	<-stopCh
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func wantsDNSRecords(service *v1.Service) bool {
 | 
			
		||||
	return service.Spec.Type == v1.ServiceTypeLoadBalancer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *ServiceDNSController) workerFunction() bool {
 | 
			
		||||
	item, quit := s.workQueue.Get()
 | 
			
		||||
	if quit {
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
	defer s.workQueue.Done(item)
 | 
			
		||||
 | 
			
		||||
	service := item.(*v1.Service)
 | 
			
		||||
 | 
			
		||||
	if !wantsDNSRecords(service) {
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ingress, err := ParseFederatedServiceIngress(service)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		runtime.HandleError(fmt.Errorf("Error in parsing lb ingress for service %s/%s: %v", service.Namespace, service.Name, err))
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
	for _, clusterIngress := range ingress.Items {
 | 
			
		||||
		s.ensureDNSRecords(clusterIngress.Cluster, service)
 | 
			
		||||
	}
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *ServiceDNSController) worker() {
 | 
			
		||||
	for {
 | 
			
		||||
		if quit := s.workerFunction(); quit {
 | 
			
		||||
			glog.Infof("service dns controller worker queue shutting down")
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *ServiceDNSController) validateConfig() error {
 | 
			
		||||
	if s.federationName == "" {
 | 
			
		||||
		return fmt.Errorf("DNSController should not be run without federationName")
 | 
			
		||||
	}
 | 
			
		||||
	if s.zoneName == "" && s.zoneID == "" {
 | 
			
		||||
		return fmt.Errorf("DNSController must be run with either zoneName or zoneID")
 | 
			
		||||
	}
 | 
			
		||||
	if s.serviceDNSSuffix == "" {
 | 
			
		||||
		if s.zoneName == "" {
 | 
			
		||||
			return fmt.Errorf("DNSController must be run with zoneName, if serviceDnsSuffix is not set")
 | 
			
		||||
		}
 | 
			
		||||
		s.serviceDNSSuffix = s.zoneName
 | 
			
		||||
	}
 | 
			
		||||
	if s.dns == nil {
 | 
			
		||||
		return fmt.Errorf("DNSController should not be run without a dnsprovider")
 | 
			
		||||
	}
 | 
			
		||||
	zones, ok := s.dns.Zones()
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return fmt.Errorf("the dns provider does not support zone enumeration, which is required for creating dns records")
 | 
			
		||||
	}
 | 
			
		||||
	s.dnsZones = zones
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *ServiceDNSController) retrieveOrCreateDNSZone() error {
 | 
			
		||||
	matchingZones, err := getDNSZones(s.zoneName, s.zoneID, s.dnsZones)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("error querying for DNS zones: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	switch len(matchingZones) {
 | 
			
		||||
	case 0: // No matching zones for s.zoneName, so create one
 | 
			
		||||
		if s.zoneName == "" {
 | 
			
		||||
			return fmt.Errorf("DNSController must be run with zoneName to create zone automatically")
 | 
			
		||||
		}
 | 
			
		||||
		glog.Infof("DNS zone %q not found.  Creating DNS zone %q.", s.zoneName, s.zoneName)
 | 
			
		||||
		managedZone, err := s.dnsZones.New(s.zoneName)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		zone, err := s.dnsZones.Add(managedZone)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		glog.Infof("DNS zone %q successfully created.  Note that DNS resolution will not work until you have registered this name with "+
 | 
			
		||||
			"a DNS registrar and they have changed the authoritative name servers for your domain to point to your DNS provider", zone.Name())
 | 
			
		||||
	case 1: // s.zoneName matches exactly one DNS zone
 | 
			
		||||
		s.dnsZone = matchingZones[0]
 | 
			
		||||
	default: // s.zoneName matches more than one DNS zone
 | 
			
		||||
		return fmt.Errorf("Multiple matching DNS zones found for %q; please specify zoneID", s.zoneName)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getHealthyEndpoints returns the hostnames and/or IP addresses of healthy endpoints for the service, at a zone, region and global level (or an error)
 | 
			
		||||
func (s *ServiceController) getHealthyEndpoints(clusterName string, service *v1.Service) (zoneEndpoints, regionEndpoints, globalEndpoints []string, err error) {
 | 
			
		||||
func (s *ServiceDNSController) getHealthyEndpoints(clusterName string, service *v1.Service) (zoneEndpoints, regionEndpoints, globalEndpoints []string, err error) {
 | 
			
		||||
	var (
 | 
			
		||||
		zoneNames  []string
 | 
			
		||||
		regionName string
 | 
			
		||||
@@ -70,7 +256,7 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, service *v1.
 | 
			
		||||
				address = ingress.Hostname
 | 
			
		||||
			}
 | 
			
		||||
			if len(address) <= 0 {
 | 
			
		||||
				return nil, nil, nil, fmt.Errorf("Service %s/%s in cluster %s has neither LoadBalancerStatus.ingress.ip nor LoadBalancerStatus.ingress.hostname. Cannot use it as endpoint for federated service.",
 | 
			
		||||
				return nil, nil, nil, fmt.Errorf("Service %s/%s in cluster %s has neither LoadBalancerStatus.ingress.ip nor LoadBalancerStatus.ingress.hostname. Cannot use it as endpoint for federated service",
 | 
			
		||||
					service.Name, service.Namespace, clusterName)
 | 
			
		||||
			}
 | 
			
		||||
			for _, lbZoneName := range lbZoneNames {
 | 
			
		||||
@@ -90,7 +276,7 @@ func (s *ServiceController) getHealthyEndpoints(clusterName string, service *v1.
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getClusterZoneNames returns the name of the zones (and the region) where the specified cluster exists (e.g. zones "us-east1-c" on GCE, or "us-east-1b" on AWS)
 | 
			
		||||
func (s *ServiceController) getClusterZoneNames(clusterName string) ([]string, string, error) {
 | 
			
		||||
func (s *ServiceDNSController) getClusterZoneNames(clusterName string) ([]string, string, error) {
 | 
			
		||||
	cluster, err := s.federationClient.Federation().Clusters().Get(clusterName, metav1.GetOptions{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, "", err
 | 
			
		||||
@@ -98,13 +284,8 @@ func (s *ServiceController) getClusterZoneNames(clusterName string) ([]string, s
 | 
			
		||||
	return cluster.Status.Zones, cluster.Status.Region, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getServiceDnsSuffix returns the DNS suffix to use when creating federated-service DNS records
 | 
			
		||||
func (s *ServiceController) getServiceDnsSuffix() (string, error) {
 | 
			
		||||
	return s.serviceDnsSuffix, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getDnsZones returns the DNS zones matching dnsZoneName and dnsZoneID (if specified)
 | 
			
		||||
func getDnsZones(dnsZoneName string, dnsZoneID string, dnsZonesInterface dnsprovider.Zones) ([]dnsprovider.Zone, error) {
 | 
			
		||||
// getDNSZones returns the DNS zones matching dnsZoneName and dnsZoneID (if specified)
 | 
			
		||||
func getDNSZones(dnsZoneName string, dnsZoneID string, dnsZonesInterface dnsprovider.Zones) ([]dnsprovider.Zone, error) {
 | 
			
		||||
	// TODO: We need query-by-name and query-by-id functions
 | 
			
		||||
	dnsZones, err := dnsZonesInterface.List()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -130,30 +311,6 @@ func getDnsZones(dnsZoneName string, dnsZoneID string, dnsZonesInterface dnsprov
 | 
			
		||||
	return matches, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// getDnsZone returns the DNS zone, as identified by dnsZoneName and dnsZoneID
 | 
			
		||||
// This is similar to getDnsZones, but returns an error if there are zero or multiple matching zones.
 | 
			
		||||
func getDnsZone(dnsZoneName string, dnsZoneID string, dnsZonesInterface dnsprovider.Zones) (dnsprovider.Zone, error) {
 | 
			
		||||
	dnsZones, err := getDnsZones(dnsZoneName, dnsZoneID, dnsZonesInterface)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(dnsZones) == 1 {
 | 
			
		||||
		return dnsZones[0], nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	name := dnsZoneName
 | 
			
		||||
	if dnsZoneID != "" {
 | 
			
		||||
		name += "/" + dnsZoneID
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(dnsZones) == 0 {
 | 
			
		||||
		return nil, fmt.Errorf("DNS zone %s not found", name)
 | 
			
		||||
	} else {
 | 
			
		||||
		return nil, fmt.Errorf("DNS zone %s is ambiguous (please specify zoneID)", name)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NOTE: that if the named resource record set does not exist, but no
 | 
			
		||||
// error occurred, the returned list will be empty, and the error will
 | 
			
		||||
// be nil
 | 
			
		||||
@@ -193,13 +350,13 @@ func getResolvedEndpoints(endpoints []string) ([]string, error) {
 | 
			
		||||
	return resolvedEndpoints, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/* ensureDnsRrsets ensures (idempotently, and with minimum mutations) that all of the DNS resource record sets for dnsName are consistent with endpoints.
 | 
			
		||||
/* ensureDNSRrsets ensures (idempotently, and with minimum mutations) that all of the DNS resource record sets for dnsName are consistent with endpoints.
 | 
			
		||||
   if endpoints is nil or empty, a CNAME record to uplevelCname is ensured.
 | 
			
		||||
*/
 | 
			
		||||
func (s *ServiceController) ensureDnsRrsets(dnsZone dnsprovider.Zone, dnsName string, endpoints []string, uplevelCname string) error {
 | 
			
		||||
func (s *ServiceDNSController) ensureDNSRrsets(dnsZone dnsprovider.Zone, dnsName string, endpoints []string, uplevelCname string) error {
 | 
			
		||||
	rrsets, supported := dnsZone.ResourceRecordSets()
 | 
			
		||||
	if !supported {
 | 
			
		||||
		return fmt.Errorf("Failed to ensure DNS records for %s. DNS provider does not support the ResourceRecordSets interface.", dnsName)
 | 
			
		||||
		return fmt.Errorf("Failed to ensure DNS records for %s. DNS provider does not support the ResourceRecordSets interface", dnsName)
 | 
			
		||||
	}
 | 
			
		||||
	rrsetList, err := getRrset(dnsName, rrsets) // TODO: rrsets.Get(dnsName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -211,7 +368,7 @@ func (s *ServiceController) ensureDnsRrsets(dnsZone dnsprovider.Zone, dnsName st
 | 
			
		||||
			glog.V(4).Infof("There are no healthy endpoint addresses at level %q, so CNAME to %q, if provided", dnsName, uplevelCname)
 | 
			
		||||
			if uplevelCname != "" {
 | 
			
		||||
				glog.V(4).Infof("Creating CNAME to %q for %q", uplevelCname, dnsName)
 | 
			
		||||
				newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
 | 
			
		||||
				newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDNSTTL, rrstype.CNAME)
 | 
			
		||||
				glog.V(4).Infof("Adding recordset %v", newRrset)
 | 
			
		||||
				err = rrsets.StartChangeset().Add(newRrset).Apply()
 | 
			
		||||
				if err != nil {
 | 
			
		||||
@@ -230,7 +387,7 @@ func (s *ServiceController) ensureDnsRrsets(dnsZone dnsprovider.Zone, dnsName st
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve.
 | 
			
		||||
			}
 | 
			
		||||
			newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
 | 
			
		||||
			newRrset := rrsets.New(dnsName, resolvedEndpoints, minDNSTTL, rrstype.A)
 | 
			
		||||
			glog.V(4).Infof("Adding recordset %v", newRrset)
 | 
			
		||||
			err = rrsets.StartChangeset().Add(newRrset).Apply()
 | 
			
		||||
			if err != nil {
 | 
			
		||||
@@ -243,7 +400,7 @@ func (s *ServiceController) ensureDnsRrsets(dnsZone dnsprovider.Zone, dnsName st
 | 
			
		||||
		glog.V(4).Infof("Recordset %v already exists. Ensuring that it is correct.", rrsetList)
 | 
			
		||||
		if len(endpoints) < 1 {
 | 
			
		||||
			// Need an appropriate CNAME record.  Check that we have it.
 | 
			
		||||
			newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDnsTtl, rrstype.CNAME)
 | 
			
		||||
			newRrset := rrsets.New(dnsName, []string{uplevelCname}, minDNSTTL, rrstype.CNAME)
 | 
			
		||||
			glog.V(4).Infof("No healthy endpoints for %s. Have recordsets %v. Need recordset %v", dnsName, rrsetList, newRrset)
 | 
			
		||||
			found := findRrset(rrsetList, newRrset)
 | 
			
		||||
			if found != nil {
 | 
			
		||||
@@ -279,7 +436,7 @@ func (s *ServiceController) ensureDnsRrsets(dnsZone dnsprovider.Zone, dnsName st
 | 
			
		||||
			if err != nil { // Some invalid addresses or otherwise unresolvable DNS names.
 | 
			
		||||
				return err // TODO: We could potentially add the ones we did get back, even if some of them failed to resolve.
 | 
			
		||||
			}
 | 
			
		||||
			newRrset := rrsets.New(dnsName, resolvedEndpoints, minDnsTtl, rrstype.A)
 | 
			
		||||
			newRrset := rrsets.New(dnsName, resolvedEndpoints, minDNSTTL, rrstype.A)
 | 
			
		||||
			glog.V(4).Infof("Have recordset %v. Need recordset %v", rrsetList, newRrset)
 | 
			
		||||
			found := findRrset(rrsetList, newRrset)
 | 
			
		||||
			if found != nil {
 | 
			
		||||
@@ -305,13 +462,13 @@ func (s *ServiceController) ensureDnsRrsets(dnsZone dnsprovider.Zone, dnsName st
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/* ensureDnsRecords ensures (idempotently, and with minimum mutations) that all of the DNS records for a service in a given cluster are correct,
 | 
			
		||||
/* ensureDNSRecords ensures (idempotently, and with minimum mutations) that all of the DNS records for a service in a given cluster are correct,
 | 
			
		||||
given the current state of that service in that cluster.  This should be called every time the state of a service might have changed
 | 
			
		||||
(either w.r.t. its loadbalancer address, or if the number of healthy backend endpoints for that service transitioned from zero to non-zero
 | 
			
		||||
(or vice versa).  Only shards of the service which have both a loadbalancer ingress IP address or hostname AND at least one healthy backend endpoint
 | 
			
		||||
are included in DNS records for that service (at all of zone, region and global levels). All other addresses are removed.  Also, if no shards exist
 | 
			
		||||
in the zone or region of the cluster, a CNAME reference to the next higher level is ensured to exist. */
 | 
			
		||||
func (s *ServiceController) ensureDnsRecords(clusterName string, service *v1.Service) error {
 | 
			
		||||
func (s *ServiceDNSController) ensureDNSRecords(clusterName string, service *v1.Service) error {
 | 
			
		||||
	// Quinton: Pseudocode....
 | 
			
		||||
	// See https://github.com/kubernetes/kubernetes/pull/25107#issuecomment-218026648
 | 
			
		||||
	// For each service we need the following DNS names:
 | 
			
		||||
@@ -329,15 +486,6 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, service *v1.Ser
 | 
			
		||||
	// So generate the DNS records based on the current state and ensure those desired DNS records match the
 | 
			
		||||
	// actual DNS records (add new records, remove deleted records, and update changed records).
 | 
			
		||||
	//
 | 
			
		||||
	if s == nil {
 | 
			
		||||
		return fmt.Errorf("nil ServiceController passed to ServiceController.ensureDnsRecords(clusterName: %s, service: %v)", clusterName, service)
 | 
			
		||||
	}
 | 
			
		||||
	if s.dns == nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	if service == nil {
 | 
			
		||||
		return fmt.Errorf("nil service passed to ServiceController.ensureDnsRecords(clusterName: %s, service: %v)", clusterName, service)
 | 
			
		||||
	}
 | 
			
		||||
	serviceName := service.Name
 | 
			
		||||
	namespaceName := service.Namespace
 | 
			
		||||
	zoneNames, regionName, err := s.getClusterZoneNames(clusterName)
 | 
			
		||||
@@ -347,10 +495,6 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, service *v1.Ser
 | 
			
		||||
	if zoneNames == nil {
 | 
			
		||||
		return fmt.Errorf("failed to get cluster zone names")
 | 
			
		||||
	}
 | 
			
		||||
	serviceDnsSuffix, err := s.getServiceDnsSuffix()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	zoneEndpoints, regionEndpoints, globalEndpoints, err := s.getHealthyEndpoints(clusterName, service)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
@@ -358,21 +502,16 @@ func (s *ServiceController) ensureDnsRecords(clusterName string, service *v1.Ser
 | 
			
		||||
	commonPrefix := serviceName + "." + namespaceName + "." + s.federationName + ".svc"
 | 
			
		||||
	// dnsNames is the path up the DNS search tree, starting at the leaf
 | 
			
		||||
	dnsNames := []string{
 | 
			
		||||
		commonPrefix + "." + zoneNames[0] + "." + regionName + "." + serviceDnsSuffix, // zone level - TODO might need other zone names for multi-zone clusters
 | 
			
		||||
		commonPrefix + "." + regionName + "." + serviceDnsSuffix,                      // region level, one up from zone level
 | 
			
		||||
		commonPrefix + "." + serviceDnsSuffix,                                         // global level, one up from region level
 | 
			
		||||
		strings.Join([]string{commonPrefix, zoneNames[0], regionName, s.serviceDNSSuffix}, "."), // zone level - TODO might need other zone names for multi-zone clusters
 | 
			
		||||
		strings.Join([]string{commonPrefix, regionName, s.serviceDNSSuffix}, "."),               // region level, one up from zone level
 | 
			
		||||
		strings.Join([]string{commonPrefix, s.serviceDNSSuffix}, "."),                           // global level, one up from region level
 | 
			
		||||
		"", // nowhere to go up from global level
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	endpoints := [][]string{zoneEndpoints, regionEndpoints, globalEndpoints}
 | 
			
		||||
 | 
			
		||||
	dnsZone, err := getDnsZone(s.zoneName, s.zoneID, s.dnsZones)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for i, endpoint := range endpoints {
 | 
			
		||||
		if err = s.ensureDnsRrsets(dnsZone, dnsNames[i], endpoint, dnsNames[i+1]); err != nil {
 | 
			
		||||
		if err = s.ensureDNSRrsets(s.dnsZone, dnsNames[i], endpoint, dnsNames[i+1]); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -50,13 +50,11 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
 | 
			
		||||
		{
 | 
			
		||||
			name: "ServiceWithSingleLBIngress",
 | 
			
		||||
			service: v1.Service{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name:      "servicename",
 | 
			
		||||
					Namespace: "servicenamespace",
 | 
			
		||||
					Annotations: map[string]string{
 | 
			
		||||
						FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
 | 
			
		||||
							AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
 | 
			
		||||
							String()},
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{
 | 
			
		||||
					FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
 | 
			
		||||
						AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
 | 
			
		||||
						AddEndpoints(cluster2Name, []string{}).
 | 
			
		||||
						String()},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			expected: []string{
 | 
			
		||||
@@ -73,10 +71,7 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
 | 
			
		||||
			{
 | 
			
		||||
				name: "withname",
 | 
			
		||||
				service: v1.Service{
 | 
			
		||||
					ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
						Name: "servicename",
 | 
			
		||||
						Namespace: "servicenamespace",
 | 
			
		||||
					},
 | 
			
		||||
					ObjectMeta: metav1.ObjectMeta{},
 | 
			
		||||
				},
 | 
			
		||||
				expected: []string{
 | 
			
		||||
					"example.com:"+globalDNSName+":A:180:[198.51.100.1]",
 | 
			
		||||
@@ -88,9 +83,11 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
 | 
			
		||||
		{
 | 
			
		||||
			name: "ServiceWithNoLBIngress",
 | 
			
		||||
			service: v1.Service{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name:      "servicename",
 | 
			
		||||
					Namespace: "servicenamespace",
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{
 | 
			
		||||
					FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
 | 
			
		||||
						AddEndpoints(cluster1Name, []string{}).
 | 
			
		||||
						AddEndpoints(cluster2Name, []string{}).
 | 
			
		||||
						String()},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			expected: []string{
 | 
			
		||||
@@ -103,14 +100,11 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
 | 
			
		||||
		{
 | 
			
		||||
			name: "ServiceWithMultipleLBIngress",
 | 
			
		||||
			service: v1.Service{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name:      "servicename",
 | 
			
		||||
					Namespace: "servicenamespace",
 | 
			
		||||
					Annotations: map[string]string{
 | 
			
		||||
						FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
 | 
			
		||||
							AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
 | 
			
		||||
							AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
 | 
			
		||||
							String()},
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{
 | 
			
		||||
					FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
 | 
			
		||||
						AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
 | 
			
		||||
						AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
 | 
			
		||||
						String()},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			expected: []string{
 | 
			
		||||
@@ -124,14 +118,12 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
 | 
			
		||||
		{
 | 
			
		||||
			name: "ServiceWithLBIngressAndServiceDeleted",
 | 
			
		||||
			service: v1.Service{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name:              "servicename",
 | 
			
		||||
					Namespace:         "servicenamespace",
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{
 | 
			
		||||
					FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
 | 
			
		||||
						AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
 | 
			
		||||
						AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
 | 
			
		||||
						String()},
 | 
			
		||||
					DeletionTimestamp: &metav1.Time{Time: time.Now()},
 | 
			
		||||
					Annotations: map[string]string{
 | 
			
		||||
						FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
 | 
			
		||||
							AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
 | 
			
		||||
							String()},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			expected: []string{
 | 
			
		||||
@@ -145,15 +137,12 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
 | 
			
		||||
		{
 | 
			
		||||
			name: "ServiceWithMultipleLBIngressAndOneLBIngressGettingRemoved",
 | 
			
		||||
			service: v1.Service{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name:      "servicename",
 | 
			
		||||
					Namespace: "servicenamespace",
 | 
			
		||||
					Annotations: map[string]string{
 | 
			
		||||
						FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
 | 
			
		||||
							AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
 | 
			
		||||
							AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
 | 
			
		||||
							RemoveEndpoint(cluster2Name, "198.51.200.1").
 | 
			
		||||
							String()},
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{
 | 
			
		||||
					FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
 | 
			
		||||
						AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
 | 
			
		||||
						AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
 | 
			
		||||
						RemoveEndpoint(cluster2Name, "198.51.200.1").
 | 
			
		||||
						String()},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			expected: []string{
 | 
			
		||||
@@ -167,16 +156,13 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
 | 
			
		||||
		{
 | 
			
		||||
			name: "ServiceWithMultipleLBIngressAndAllLBIngressGettingRemoved",
 | 
			
		||||
			service: v1.Service{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name:      "servicename",
 | 
			
		||||
					Namespace: "servicenamespace",
 | 
			
		||||
					Annotations: map[string]string{
 | 
			
		||||
						FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
 | 
			
		||||
							AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
 | 
			
		||||
							AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
 | 
			
		||||
							RemoveEndpoint(cluster1Name, "198.51.100.1").
 | 
			
		||||
							RemoveEndpoint(cluster2Name, "198.51.200.1").
 | 
			
		||||
							String()},
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{
 | 
			
		||||
					FederatedServiceIngressAnnotation: NewFederatedServiceIngress().
 | 
			
		||||
						AddEndpoints(cluster1Name, []string{"198.51.100.1"}).
 | 
			
		||||
						AddEndpoints(cluster2Name, []string{"198.51.200.1"}).
 | 
			
		||||
						RemoveEndpoint(cluster1Name, "198.51.100.1").
 | 
			
		||||
						RemoveEndpoint(cluster2Name, "198.51.200.1").
 | 
			
		||||
						String()},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			expected: []string{
 | 
			
		||||
@@ -195,22 +181,30 @@ func TestServiceController_ensureDnsRecords(t *testing.T) {
 | 
			
		||||
		}
 | 
			
		||||
		fakeClient := &fakefedclientset.Clientset{}
 | 
			
		||||
		RegisterFakeClusterGet(&fakeClient.Fake, &v1beta1.ClusterList{Items: []v1beta1.Cluster{*cluster1, *cluster2}})
 | 
			
		||||
		serviceController := ServiceController{
 | 
			
		||||
		d := ServiceDNSController{
 | 
			
		||||
			federationClient: fakeClient,
 | 
			
		||||
			dns:              fakedns,
 | 
			
		||||
			dnsZones:         fakednsZones,
 | 
			
		||||
			serviceDnsSuffix: "federation.example.com",
 | 
			
		||||
			serviceDNSSuffix: "federation.example.com",
 | 
			
		||||
			zoneName:         "example.com",
 | 
			
		||||
			federationName:   "myfederation",
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		err := serviceController.ensureDnsRecords(cluster1Name, &test.service)
 | 
			
		||||
		dnsZones, err := getDNSZones(d.zoneName, d.zoneID, d.dnsZones)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Errorf("Test failed for %s, unexpected error %v", test.name, err)
 | 
			
		||||
			t.Errorf("Test failed for %s, Get DNS Zones failed: %v", test.name, err)
 | 
			
		||||
		}
 | 
			
		||||
		err = serviceController.ensureDnsRecords(cluster2Name, &test.service)
 | 
			
		||||
		d.dnsZone = dnsZones[0]
 | 
			
		||||
		test.service.Name = "servicename"
 | 
			
		||||
		test.service.Namespace = "servicenamespace"
 | 
			
		||||
 | 
			
		||||
		ingress, err := ParseFederatedServiceIngress(&test.service)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Errorf("Test failed for %s, unexpected error %v", test.name, err)
 | 
			
		||||
			t.Errorf("Error in parsing lb ingress for service %s/%s: %v", test.service.Namespace, test.service.Name, err)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		for _, clusterIngress := range ingress.Items {
 | 
			
		||||
			d.ensureDNSRecords(clusterIngress.Cluster, &test.service)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		zones, err := fakednsZones.List()
 | 
			
		||||
 
 | 
			
		||||
@@ -40,10 +40,7 @@ import (
 | 
			
		||||
	"k8s.io/client-go/util/workqueue"
 | 
			
		||||
	fedapi "k8s.io/kubernetes/federation/apis/federation"
 | 
			
		||||
	v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
 | 
			
		||||
	federationcache "k8s.io/kubernetes/federation/client/cache"
 | 
			
		||||
	fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
 | 
			
		||||
	"k8s.io/kubernetes/federation/pkg/dnsprovider"
 | 
			
		||||
	"k8s.io/kubernetes/federation/pkg/federation-controller/util"
 | 
			
		||||
	fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
 | 
			
		||||
	"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
@@ -70,25 +67,12 @@ var (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type ServiceController struct {
 | 
			
		||||
	dns              dnsprovider.Interface
 | 
			
		||||
	federationClient fedclientset.Interface
 | 
			
		||||
	federationName   string
 | 
			
		||||
	// serviceDnsSuffix is the DNS suffix we use when publishing service DNS names
 | 
			
		||||
	serviceDnsSuffix string
 | 
			
		||||
	// zoneName and zoneID are used to identify the zone in which to put records
 | 
			
		||||
	zoneName string
 | 
			
		||||
	zoneID   string
 | 
			
		||||
	// each federation should be configured with a single zone (e.g. "mycompany.com")
 | 
			
		||||
	dnsZones dnsprovider.Zones
 | 
			
		||||
	// A store of services, populated by the serviceController
 | 
			
		||||
	serviceStore corelisters.ServiceLister
 | 
			
		||||
	// Watches changes to all services
 | 
			
		||||
	serviceController cache.Controller
 | 
			
		||||
	federatedInformer fedutil.FederatedInformer
 | 
			
		||||
	// A store of services, populated by the serviceController
 | 
			
		||||
	clusterStore federationcache.StoreToClusterLister
 | 
			
		||||
	// Watches changes to all services
 | 
			
		||||
	clusterController cache.Controller
 | 
			
		||||
	eventBroadcaster  record.EventBroadcaster
 | 
			
		||||
	eventRecorder     record.EventRecorder
 | 
			
		||||
	// services that need to be synced
 | 
			
		||||
@@ -96,7 +80,7 @@ type ServiceController struct {
 | 
			
		||||
 | 
			
		||||
	// For triggering all services reconciliation. This is used when
 | 
			
		||||
	// a new cluster becomes available.
 | 
			
		||||
	clusterDeliverer *util.DelayingDeliverer
 | 
			
		||||
	clusterDeliverer *fedutil.DelayingDeliverer
 | 
			
		||||
 | 
			
		||||
	deletionHelper *deletionhelper.DeletionHelper
 | 
			
		||||
 | 
			
		||||
@@ -106,26 +90,20 @@ type ServiceController struct {
 | 
			
		||||
 | 
			
		||||
	endpointFederatedInformer fedutil.FederatedInformer
 | 
			
		||||
	federatedUpdater          fedutil.FederatedUpdater
 | 
			
		||||
	objectDeliverer           *util.DelayingDeliverer
 | 
			
		||||
	objectDeliverer           *fedutil.DelayingDeliverer
 | 
			
		||||
	flowcontrolBackoff        *flowcontrol.Backoff
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// New returns a new service controller to keep DNS provider service resources
 | 
			
		||||
// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry.
 | 
			
		||||
func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
 | 
			
		||||
	federationName, serviceDnsSuffix, zoneName string, zoneID string) *ServiceController {
 | 
			
		||||
func New(federationClient fedclientset.Interface) *ServiceController {
 | 
			
		||||
	broadcaster := record.NewBroadcaster()
 | 
			
		||||
	// federationClient event is not supported yet
 | 
			
		||||
	// broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
 | 
			
		||||
	recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName})
 | 
			
		||||
 | 
			
		||||
	s := &ServiceController{
 | 
			
		||||
		dns:                   dns,
 | 
			
		||||
		federationClient:      federationClient,
 | 
			
		||||
		federationName:        federationName,
 | 
			
		||||
		serviceDnsSuffix:      serviceDnsSuffix,
 | 
			
		||||
		zoneName:              zoneName,
 | 
			
		||||
		zoneID:                zoneID,
 | 
			
		||||
		eventBroadcaster:      broadcaster,
 | 
			
		||||
		eventRecorder:         recorder,
 | 
			
		||||
		queue:                 workqueue.New(),
 | 
			
		||||
@@ -134,8 +112,8 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
 | 
			
		||||
		updateTimeout:         updateTimeout,
 | 
			
		||||
		flowcontrolBackoff:    flowcontrol.NewBackOff(5*time.Second, time.Minute),
 | 
			
		||||
	}
 | 
			
		||||
	s.objectDeliverer = util.NewDelayingDeliverer()
 | 
			
		||||
	s.clusterDeliverer = util.NewDelayingDeliverer()
 | 
			
		||||
	s.objectDeliverer = fedutil.NewDelayingDeliverer()
 | 
			
		||||
	s.clusterDeliverer = fedutil.NewDelayingDeliverer()
 | 
			
		||||
	var serviceIndexer cache.Indexer
 | 
			
		||||
	serviceIndexer, s.serviceController = cache.NewIndexerInformer(
 | 
			
		||||
		&cache.ListWatch{
 | 
			
		||||
@@ -148,7 +126,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
 | 
			
		||||
		},
 | 
			
		||||
		&v1.Service{},
 | 
			
		||||
		serviceSyncPeriod,
 | 
			
		||||
		util.NewTriggerOnAllChanges(func(obj pkgruntime.Object) {
 | 
			
		||||
		fedutil.NewTriggerOnAllChanges(func(obj pkgruntime.Object) {
 | 
			
		||||
			glog.V(5).Infof("Delivering notification from federation: %v", obj)
 | 
			
		||||
			s.deliverObject(obj, 0, false)
 | 
			
		||||
		}),
 | 
			
		||||
@@ -175,7 +153,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
 | 
			
		||||
			controller.NoResyncPeriodFunc(),
 | 
			
		||||
			// Trigger reconciliation whenever something in federated cluster is changed. In most cases it
 | 
			
		||||
			// would be just confirmation that some service operation succeeded.
 | 
			
		||||
			util.NewTriggerOnAllChanges(
 | 
			
		||||
			fedutil.NewTriggerOnAllChanges(
 | 
			
		||||
				func(obj pkgruntime.Object) {
 | 
			
		||||
					glog.V(5).Infof("Delivering service notification from federated cluster %s: %v", cluster.Name, obj)
 | 
			
		||||
					s.deliverObject(obj, s.reviewDelay, false)
 | 
			
		||||
@@ -267,17 +245,16 @@ func (s *ServiceController) updateService(obj pkgruntime.Object) (pkgruntime.Obj
 | 
			
		||||
 | 
			
		||||
// It's an error to call Run() more than once for a given ServiceController
 | 
			
		||||
// object.
 | 
			
		||||
func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error {
 | 
			
		||||
	if err := s.init(); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) {
 | 
			
		||||
	glog.Infof("Starting federation service controller")
 | 
			
		||||
 | 
			
		||||
	defer runtime.HandleCrash()
 | 
			
		||||
	s.federatedInformer.Start()
 | 
			
		||||
	s.endpointFederatedInformer.Start()
 | 
			
		||||
	s.objectDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
 | 
			
		||||
	s.objectDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) {
 | 
			
		||||
		s.queue.Add(item.Value.(string))
 | 
			
		||||
	})
 | 
			
		||||
	s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
 | 
			
		||||
	s.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) {
 | 
			
		||||
		s.deliverServicesOnClusterChange()
 | 
			
		||||
	})
 | 
			
		||||
	fedutil.StartBackoffGC(s.flowcontrolBackoff, stopCh)
 | 
			
		||||
@@ -295,55 +272,6 @@ func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) error {
 | 
			
		||||
		s.objectDeliverer.Stop()
 | 
			
		||||
		s.clusterDeliverer.Stop()
 | 
			
		||||
	}()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *ServiceController) init() error {
 | 
			
		||||
	if s.federationName == "" {
 | 
			
		||||
		return fmt.Errorf("ServiceController should not be run without federationName.")
 | 
			
		||||
	}
 | 
			
		||||
	if s.zoneName == "" && s.zoneID == "" {
 | 
			
		||||
		return fmt.Errorf("ServiceController must be run with either zoneName or zoneID.")
 | 
			
		||||
	}
 | 
			
		||||
	if s.serviceDnsSuffix == "" {
 | 
			
		||||
		// TODO: Is this the right place to do defaulting?
 | 
			
		||||
		if s.zoneName == "" {
 | 
			
		||||
			return fmt.Errorf("ServiceController must be run with zoneName, if serviceDnsSuffix is not set.")
 | 
			
		||||
		}
 | 
			
		||||
		s.serviceDnsSuffix = s.zoneName
 | 
			
		||||
	}
 | 
			
		||||
	if s.dns == nil {
 | 
			
		||||
		return fmt.Errorf("ServiceController should not be run without a dnsprovider.")
 | 
			
		||||
	}
 | 
			
		||||
	zones, ok := s.dns.Zones()
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return fmt.Errorf("the dns provider does not support zone enumeration, which is required for creating dns records")
 | 
			
		||||
	}
 | 
			
		||||
	s.dnsZones = zones
 | 
			
		||||
	matchingZones, err := getDnsZones(s.zoneName, s.zoneID, s.dnsZones)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("error querying for DNS zones: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if len(matchingZones) == 0 {
 | 
			
		||||
		if s.zoneName == "" {
 | 
			
		||||
			return fmt.Errorf("ServiceController must be run with zoneName to create zone automatically.")
 | 
			
		||||
		}
 | 
			
		||||
		glog.Infof("DNS zone %q not found.  Creating DNS zone %q.", s.zoneName, s.zoneName)
 | 
			
		||||
		managedZone, err := s.dnsZones.New(s.zoneName)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		zone, err := s.dnsZones.Add(managedZone)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		glog.Infof("DNS zone %q successfully created.  Note that DNS resolution will not work until you have registered this name with "+
 | 
			
		||||
			"a DNS registrar and they have changed the authoritative name servers for your domain to point to your DNS provider.", zone.Name())
 | 
			
		||||
	}
 | 
			
		||||
	if len(matchingZones) > 1 {
 | 
			
		||||
		return fmt.Errorf("Multiple matching DNS zones found for %q; please specify zoneID", s.zoneName)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type reconciliationStatus string
 | 
			
		||||
@@ -355,38 +283,39 @@ const (
 | 
			
		||||
	statusNotSynced           = reconciliationStatus("NOSYNC")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func (s *ServiceController) workerFunction() bool {
 | 
			
		||||
	key, quit := s.queue.Get()
 | 
			
		||||
	if quit {
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
	defer s.queue.Done(key)
 | 
			
		||||
 | 
			
		||||
	service := key.(string)
 | 
			
		||||
	status := s.reconcileService(service)
 | 
			
		||||
	switch status {
 | 
			
		||||
	case statusAllOk:
 | 
			
		||||
	// do nothing, reconcile is successful.
 | 
			
		||||
	case statusNotSynced:
 | 
			
		||||
		glog.V(5).Infof("Delivering notification for %q after clusterAvailableDelay", service)
 | 
			
		||||
		s.deliverService(service, s.clusterAvailableDelay, false)
 | 
			
		||||
	case statusRecoverableError:
 | 
			
		||||
		s.deliverService(service, 0, true)
 | 
			
		||||
	case statusNonRecoverableError:
 | 
			
		||||
		// do nothing, error is already logged.
 | 
			
		||||
	}
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// fedServiceWorker runs a worker thread that just dequeues items, processes them, and marks them done.
 | 
			
		||||
func (s *ServiceController) fedServiceWorker() {
 | 
			
		||||
	for {
 | 
			
		||||
		func() {
 | 
			
		||||
			key, quit := s.queue.Get()
 | 
			
		||||
			if quit {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			defer s.queue.Done(key)
 | 
			
		||||
			service := key.(string)
 | 
			
		||||
			status := s.reconcileService(service)
 | 
			
		||||
			switch status {
 | 
			
		||||
			case statusAllOk:
 | 
			
		||||
				break
 | 
			
		||||
			case statusNotSynced:
 | 
			
		||||
				glog.V(5).Infof("Delivering notification for %q after clusterAvailableDelay", service)
 | 
			
		||||
				s.deliverService(service, s.clusterAvailableDelay, false)
 | 
			
		||||
			case statusRecoverableError:
 | 
			
		||||
				s.deliverService(service, 0, true)
 | 
			
		||||
			case statusNonRecoverableError:
 | 
			
		||||
				// error is already logged, do nothing
 | 
			
		||||
			default:
 | 
			
		||||
				// unreachable
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
		if quit := s.workerFunction(); quit {
 | 
			
		||||
			glog.Infof("service controller worker queue shutting down")
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func wantsDNSRecords(service *v1.Service) bool {
 | 
			
		||||
	return service.Spec.Type == v1.ServiceTypeLoadBalancer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// delete deletes the given service or returns error if the deletion was not complete.
 | 
			
		||||
func (s *ServiceController) delete(service *v1.Service) error {
 | 
			
		||||
	glog.V(3).Infof("Handling deletion of service: %v", *service)
 | 
			
		||||
@@ -395,24 +324,6 @@ func (s *ServiceController) delete(service *v1.Service) error {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Ensure DNS records are removed for service
 | 
			
		||||
	if wantsDNSRecords(service) {
 | 
			
		||||
		key := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
 | 
			
		||||
		serviceIngress, err := ParseFederatedServiceIngress(service)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			runtime.HandleError(fmt.Errorf("Failed to parse endpoint annotations for service %s: %v", key, err))
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		for _, ingress := range serviceIngress.Items {
 | 
			
		||||
			err := s.ensureDnsRecords(ingress.Cluster, service)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				glog.V(4).Infof("Error ensuring DNS Records for service %s on cluster %s: %v", key, ingress.Cluster, err)
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
			glog.V(4).Infof("Ensured DNS records for Service %s in cluster %q", key, ingress.Cluster)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = s.federationClient.Core().Services(service.Namespace).Delete(service.Name, nil)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
 | 
			
		||||
@@ -789,17 +700,6 @@ func (s *ServiceController) updateFederatedService(fedService *v1.Service, newLB
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Ensure DNS records based on Annotations in federated service for all federated clusters
 | 
			
		||||
	if needUpdate && wantsDNSRecords(fedService) {
 | 
			
		||||
		for _, ingress := range newServiceIngress.Items {
 | 
			
		||||
			err := s.ensureDnsRecords(ingress.Cluster, fedService)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				runtime.HandleError(fmt.Errorf("Error ensuring DNS Records for service %s on cluster %q: %v", key, ingress.Cluster, err))
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
			glog.V(4).Infof("Ensured DNS records for Service %s in cluster %q", key, ingress.Cluster)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -32,7 +32,6 @@ import (
 | 
			
		||||
	"k8s.io/client-go/tools/cache"
 | 
			
		||||
	"k8s.io/kubernetes/federation/apis/federation/v1beta1"
 | 
			
		||||
	fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
 | 
			
		||||
	"k8s.io/kubernetes/federation/pkg/dnsprovider/providers/google/clouddns" // Only for unit testing purposes.
 | 
			
		||||
	fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
 | 
			
		||||
	. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api/v1"
 | 
			
		||||
@@ -101,8 +100,7 @@ func TestServiceController(t *testing.T) {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fakedns, _ := clouddns.NewFakeInterface()
 | 
			
		||||
	sc := New(fedClient, fakedns, "myfederation", "federation.example.com", "example.com", "")
 | 
			
		||||
	sc := New(fedClient)
 | 
			
		||||
	ToFederatedInformerForTestOnly(sc.federatedInformer).SetClientFactory(fedInformerClientFactory)
 | 
			
		||||
	ToFederatedInformerForTestOnly(sc.endpointFederatedInformer).SetClientFactory(fedInformerClientFactory)
 | 
			
		||||
	sc.clusterAvailableDelay = 100 * time.Millisecond
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user