GCE: Refactor firewalls/backendservices api; other small changes
This commit is contained in:
		@@ -29,6 +29,7 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	"gopkg.in/gcfg.v1"
 | 
						"gopkg.in/gcfg.v1"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/util/sets"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
						"k8s.io/apimachinery/pkg/util/wait"
 | 
				
			||||||
	"k8s.io/client-go/util/flowcontrol"
 | 
						"k8s.io/client-go/util/flowcontrol"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/cloudprovider"
 | 
						"k8s.io/kubernetes/pkg/cloudprovider"
 | 
				
			||||||
@@ -77,19 +78,25 @@ const (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
 | 
					// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
 | 
				
			||||||
type GCECloud struct {
 | 
					type GCECloud struct {
 | 
				
			||||||
 | 
						ClusterID ClusterID
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	service                  *compute.Service
 | 
						service                  *compute.Service
 | 
				
			||||||
	serviceBeta              *computebeta.Service
 | 
						serviceBeta              *computebeta.Service
 | 
				
			||||||
	containerService         *container.Service
 | 
						containerService         *container.Service
 | 
				
			||||||
	clientBuilder            controller.ControllerClientBuilder
 | 
						clientBuilder            controller.ControllerClientBuilder
 | 
				
			||||||
	ClusterID                ClusterID
 | 
					 | 
				
			||||||
	projectID                string
 | 
						projectID                string
 | 
				
			||||||
	region                   string
 | 
						region                   string
 | 
				
			||||||
	localZone                string   // The zone in which we are running
 | 
						localZone                string   // The zone in which we are running
 | 
				
			||||||
	managedZones             []string // List of zones we are spanning (for multi-AZ clusters, primarily when running on master)
 | 
						managedZones             []string // List of zones we are spanning (for multi-AZ clusters, primarily when running on master)
 | 
				
			||||||
	networkURL               string
 | 
						networkURL               string
 | 
				
			||||||
	subnetworkURL            string
 | 
						subnetworkURL            string
 | 
				
			||||||
	nodeTags                 []string // List of tags to use on firewall rules for load balancers
 | 
						networkProjectID         string
 | 
				
			||||||
	nodeInstancePrefix       string   // If non-"", an advisory prefix for all nodes in the cluster
 | 
						onXPN                    bool
 | 
				
			||||||
 | 
						nodeTags                 []string    // List of tags to use on firewall rules for load balancers
 | 
				
			||||||
 | 
						lastComputedNodeTags     []string    // List of node tags calculated in GetHostTags()
 | 
				
			||||||
 | 
						lastKnownNodeNames       sets.String // List of hostnames used to calculate lastComputedHostTags in GetHostTags(names)
 | 
				
			||||||
 | 
						computeNodeTagLock       sync.Mutex  // Lock for computing and setting node tags
 | 
				
			||||||
 | 
						nodeInstancePrefix       string      // If non-"", an advisory prefix for all nodes in the cluster
 | 
				
			||||||
	useMetadataServer        bool
 | 
						useMetadataServer        bool
 | 
				
			||||||
	operationPollRateLimiter flowcontrol.RateLimiter
 | 
						operationPollRateLimiter flowcontrol.RateLimiter
 | 
				
			||||||
	manager                  ServiceManager
 | 
						manager                  ServiceManager
 | 
				
			||||||
@@ -243,6 +250,12 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
 | 
				
			|||||||
		networkURL = gceNetworkURL(projectID, networkName)
 | 
							networkURL = gceNetworkURL(projectID, networkName)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						networkProjectID, err := getProjectIDInURL(networkURL)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						onXPN := networkProjectID != projectID
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if len(managedZones) == 0 {
 | 
						if len(managedZones) == 0 {
 | 
				
			||||||
		managedZones, err = getZonesForRegion(service, projectID, region)
 | 
							managedZones, err = getZonesForRegion(service, projectID, region)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
@@ -260,6 +273,8 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
 | 
				
			|||||||
		serviceBeta:              serviceBeta,
 | 
							serviceBeta:              serviceBeta,
 | 
				
			||||||
		containerService:         containerService,
 | 
							containerService:         containerService,
 | 
				
			||||||
		projectID:                projectID,
 | 
							projectID:                projectID,
 | 
				
			||||||
 | 
							networkProjectID:         networkProjectID,
 | 
				
			||||||
 | 
							onXPN:                    onXPN,
 | 
				
			||||||
		region:                   region,
 | 
							region:                   region,
 | 
				
			||||||
		localZone:                zone,
 | 
							localZone:                zone,
 | 
				
			||||||
		managedZones:             managedZones,
 | 
							managedZones:             managedZones,
 | 
				
			||||||
@@ -311,6 +326,26 @@ func (gce *GCECloud) ProviderName() string {
 | 
				
			|||||||
	return ProviderName
 | 
						return ProviderName
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Region returns the region
 | 
				
			||||||
 | 
					func (gce *GCECloud) Region() string {
 | 
				
			||||||
 | 
						return gce.region
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// OnXPN returns true if the cluster is running on a cross project network (XPN)
 | 
				
			||||||
 | 
					func (gce *GCECloud) OnXPN() bool {
 | 
				
			||||||
 | 
						return gce.onXPN
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// NetworkURL returns the network url
 | 
				
			||||||
 | 
					func (gce *GCECloud) NetworkURL() string {
 | 
				
			||||||
 | 
						return gce.networkURL
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// SubnetworkURL returns the subnetwork url
 | 
				
			||||||
 | 
					func (gce *GCECloud) SubnetworkURL() string {
 | 
				
			||||||
 | 
						return gce.subnetworkURL
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Known-useless DNS search path.
 | 
					// Known-useless DNS search path.
 | 
				
			||||||
var uselessDNSSearchRE = regexp.MustCompile(`^[0-9]+.google.internal.$`)
 | 
					var uselessDNSSearchRE = regexp.MustCompile(`^[0-9]+.google.internal.$`)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -336,6 +371,20 @@ func gceSubnetworkURL(project, region, subnetwork string) string {
 | 
				
			|||||||
	return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/subnetworks/%s", project, region, subnetwork)
 | 
						return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/subnetworks/%s", project, region, subnetwork)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// getProjectIDInURL parses typical full resource URLS and shorter URLS
 | 
				
			||||||
 | 
					// https://www.googleapis.com/compute/v1/projects/myproject/global/networks/mycustom
 | 
				
			||||||
 | 
					// projects/myproject/global/networks/mycustom
 | 
				
			||||||
 | 
					// All return "myproject"
 | 
				
			||||||
 | 
					func getProjectIDInURL(urlStr string) (string, error) {
 | 
				
			||||||
 | 
						fields := strings.Split(urlStr, "/")
 | 
				
			||||||
 | 
						for i, v := range fields {
 | 
				
			||||||
 | 
							if v == "projects" && i < len(fields)-1 {
 | 
				
			||||||
 | 
								return fields[i+1], nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return "", fmt.Errorf("could not find project field in url: %v", urlStr)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func getNetworkNameViaMetadata() (string, error) {
 | 
					func getNetworkNameViaMetadata() (string, error) {
 | 
				
			||||||
	result, err := metadata.Get("instance/network-interfaces/0/network")
 | 
						result, err := metadata.Get("instance/network-interfaces/0/network")
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -30,15 +30,15 @@ func newBackendServiceMetricContext(request string) *metricContext {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GetBackendService retrieves a backend by name.
 | 
					// GetGlobalBackendService retrieves a backend by name.
 | 
				
			||||||
func (gce *GCECloud) GetBackendService(name string) (*compute.BackendService, error) {
 | 
					func (gce *GCECloud) GetGlobalBackendService(name string) (*compute.BackendService, error) {
 | 
				
			||||||
	mc := newBackendServiceMetricContext("get")
 | 
						mc := newBackendServiceMetricContext("get")
 | 
				
			||||||
	v, err := gce.service.BackendServices.Get(gce.projectID, name).Do()
 | 
						v, err := gce.service.BackendServices.Get(gce.projectID, name).Do()
 | 
				
			||||||
	return v, mc.Observe(err)
 | 
						return v, mc.Observe(err)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// UpdateBackendService applies the given BackendService as an update to an existing service.
 | 
					// UpdateGlobalBackendService applies the given BackendService as an update to an existing service.
 | 
				
			||||||
func (gce *GCECloud) UpdateBackendService(bg *compute.BackendService) error {
 | 
					func (gce *GCECloud) UpdateGlobalBackendService(bg *compute.BackendService) error {
 | 
				
			||||||
	mc := newBackendServiceMetricContext("update")
 | 
						mc := newBackendServiceMetricContext("update")
 | 
				
			||||||
	op, err := gce.service.BackendServices.Update(gce.projectID, bg.Name, bg).Do()
 | 
						op, err := gce.service.BackendServices.Update(gce.projectID, bg.Name, bg).Do()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@@ -48,8 +48,8 @@ func (gce *GCECloud) UpdateBackendService(bg *compute.BackendService) error {
 | 
				
			|||||||
	return gce.waitForGlobalOp(op, mc)
 | 
						return gce.waitForGlobalOp(op, mc)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// DeleteBackendService deletes the given BackendService by name.
 | 
					// DeleteGlobalBackendService deletes the given BackendService by name.
 | 
				
			||||||
func (gce *GCECloud) DeleteBackendService(name string) error {
 | 
					func (gce *GCECloud) DeleteGlobalBackendService(name string) error {
 | 
				
			||||||
	mc := newBackendServiceMetricContext("delete")
 | 
						mc := newBackendServiceMetricContext("delete")
 | 
				
			||||||
	op, err := gce.service.BackendServices.Delete(gce.projectID, name).Do()
 | 
						op, err := gce.service.BackendServices.Delete(gce.projectID, name).Do()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@@ -62,8 +62,8 @@ func (gce *GCECloud) DeleteBackendService(name string) error {
 | 
				
			|||||||
	return gce.waitForGlobalOp(op, mc)
 | 
						return gce.waitForGlobalOp(op, mc)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// CreateBackendService creates the given BackendService.
 | 
					// CreateGlobalBackendService creates the given BackendService.
 | 
				
			||||||
func (gce *GCECloud) CreateBackendService(bg *compute.BackendService) error {
 | 
					func (gce *GCECloud) CreateGlobalBackendService(bg *compute.BackendService) error {
 | 
				
			||||||
	mc := newBackendServiceMetricContext("create")
 | 
						mc := newBackendServiceMetricContext("create")
 | 
				
			||||||
	op, err := gce.service.BackendServices.Insert(gce.projectID, bg).Do()
 | 
						op, err := gce.service.BackendServices.Insert(gce.projectID, bg).Do()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@@ -73,16 +73,81 @@ func (gce *GCECloud) CreateBackendService(bg *compute.BackendService) error {
 | 
				
			|||||||
	return gce.waitForGlobalOp(op, mc)
 | 
						return gce.waitForGlobalOp(op, mc)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// ListBackendServices lists all backend services in the project.
 | 
					// ListGlobalBackendServices lists all backend services in the project.
 | 
				
			||||||
func (gce *GCECloud) ListBackendServices() (*compute.BackendServiceList, error) {
 | 
					func (gce *GCECloud) ListGlobalBackendServices() (*compute.BackendServiceList, error) {
 | 
				
			||||||
 | 
						mc := newBackendServiceMetricContext("list")
 | 
				
			||||||
	// TODO: use PageToken to list all not just the first 500
 | 
						// TODO: use PageToken to list all not just the first 500
 | 
				
			||||||
	return gce.service.BackendServices.List(gce.projectID).Do()
 | 
						v, err := gce.service.BackendServices.List(gce.projectID).Do()
 | 
				
			||||||
 | 
						return v, mc.Observe(err)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GetHealth returns the health of the BackendService identified by the given
 | 
					// GetGlobalBackendServiceHealth returns the health of the BackendService identified by the given
 | 
				
			||||||
// name, in the given instanceGroup. The instanceGroupLink is the fully
 | 
					// name, in the given instanceGroup. The instanceGroupLink is the fully
 | 
				
			||||||
// qualified self link of an instance group.
 | 
					// qualified self link of an instance group.
 | 
				
			||||||
func (gce *GCECloud) GetHealth(name string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) {
 | 
					func (gce *GCECloud) GetGlobalBackendServiceHealth(name string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) {
 | 
				
			||||||
 | 
						mc := newBackendServiceMetricContext("get_health")
 | 
				
			||||||
	groupRef := &compute.ResourceGroupReference{Group: instanceGroupLink}
 | 
						groupRef := &compute.ResourceGroupReference{Group: instanceGroupLink}
 | 
				
			||||||
	return gce.service.BackendServices.GetHealth(gce.projectID, name, groupRef).Do()
 | 
						v, err := gce.service.BackendServices.GetHealth(gce.projectID, name, groupRef).Do()
 | 
				
			||||||
 | 
						return v, mc.Observe(err)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GetRegionBackendService retrieves a backend by name.
 | 
				
			||||||
 | 
					func (gce *GCECloud) GetRegionBackendService(name, region string) (*compute.BackendService, error) {
 | 
				
			||||||
 | 
						mc := newBackendServiceMetricContext("get")
 | 
				
			||||||
 | 
						v, err := gce.service.RegionBackendServices.Get(gce.projectID, region, name).Do()
 | 
				
			||||||
 | 
						return v, mc.Observe(err)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// UpdateRegionBackendService applies the given BackendService as an update to an existing service.
 | 
				
			||||||
 | 
					func (gce *GCECloud) UpdateRegionBackendService(bg *compute.BackendService) error {
 | 
				
			||||||
 | 
						mc := newBackendServiceMetricContext("update")
 | 
				
			||||||
 | 
						op, err := gce.service.RegionBackendServices.Update(gce.projectID, bg.Region, bg.Name, bg).Do()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return mc.Observe(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return gce.waitForRegionOp(op, bg.Region, mc)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// DeleteRegionBackendService deletes the given BackendService by name.
 | 
				
			||||||
 | 
					func (gce *GCECloud) DeleteRegionBackendService(name, region string) error {
 | 
				
			||||||
 | 
						mc := newBackendServiceMetricContext("delete")
 | 
				
			||||||
 | 
						op, err := gce.service.RegionBackendServices.Delete(gce.projectID, region, name).Do()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							if isHTTPErrorCode(err, http.StatusNotFound) {
 | 
				
			||||||
 | 
								return nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return mc.Observe(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return gce.waitForRegionOp(op, region, mc)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// CreateRegionBackendService creates the given BackendService.
 | 
				
			||||||
 | 
					func (gce *GCECloud) CreateRegionBackendService(bg *compute.BackendService) error {
 | 
				
			||||||
 | 
						mc := newBackendServiceMetricContext("create")
 | 
				
			||||||
 | 
						op, err := gce.service.RegionBackendServices.Insert(gce.projectID, bg.Region, bg).Do()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return mc.Observe(err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return gce.waitForRegionOp(op, bg.Region, mc)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// ListRegionBackendServices lists all backend services in the project.
 | 
				
			||||||
 | 
					func (gce *GCECloud) ListRegionBackendServices(region string) (*compute.BackendServiceList, error) {
 | 
				
			||||||
 | 
						mc := newBackendServiceMetricContext("list")
 | 
				
			||||||
 | 
						// TODO: use PageToken to list all not just the first 500
 | 
				
			||||||
 | 
						v, err := gce.service.RegionBackendServices.List(gce.projectID, region).Do()
 | 
				
			||||||
 | 
						return v, mc.Observe(err)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GetRegionalBackendServiceHealth returns the health of the BackendService identified by the given
 | 
				
			||||||
 | 
					// name, in the given instanceGroup. The instanceGroupLink is the fully
 | 
				
			||||||
 | 
					// qualified self link of an instance group.
 | 
				
			||||||
 | 
					func (gce *GCECloud) GetRegionalBackendServiceHealth(name, region string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) {
 | 
				
			||||||
 | 
						mc := newBackendServiceMetricContext("get_health")
 | 
				
			||||||
 | 
						groupRef := &compute.ResourceGroupReference{Group: instanceGroupLink}
 | 
				
			||||||
 | 
						v, err := gce.service.RegionBackendServices.GetHealth(gce.projectID, region, name, groupRef).Do()
 | 
				
			||||||
 | 
						return v, mc.Observe(err)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -19,96 +19,51 @@ package gce
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api/v1"
 | 
					 | 
				
			||||||
	netsets "k8s.io/kubernetes/pkg/util/net/sets"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	compute "google.golang.org/api/compute/v1"
 | 
						compute "google.golang.org/api/compute/v1"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func newFirewallMetricContext(request string, region string) *metricContext {
 | 
					func newFirewallMetricContext(request string) *metricContext {
 | 
				
			||||||
	return &metricContext{
 | 
						return &metricContext{
 | 
				
			||||||
		start:      time.Now(),
 | 
							start:      time.Now(),
 | 
				
			||||||
		attributes: []string{"firewall_" + request, region, unusedMetricLabel},
 | 
							attributes: []string{"firewall_" + request, unusedMetricLabel, unusedMetricLabel},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GetFirewall returns the Firewall by name.
 | 
					// GetFirewall returns the Firewall by name.
 | 
				
			||||||
func (gce *GCECloud) GetFirewall(name string) (*compute.Firewall, error) {
 | 
					func (gce *GCECloud) GetFirewall(name string) (*compute.Firewall, error) {
 | 
				
			||||||
	mc := newFirewallMetricContext("get", "")
 | 
						mc := newFirewallMetricContext("get")
 | 
				
			||||||
	v, err := gce.service.Firewalls.Get(gce.projectID, name).Do()
 | 
						v, err := gce.service.Firewalls.Get(gce.projectID, name).Do()
 | 
				
			||||||
	return v, mc.Observe(err)
 | 
						return v, mc.Observe(err)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// CreateFirewall creates the given firewall rule.
 | 
					// CreateFirewall creates the passed firewall
 | 
				
			||||||
func (gce *GCECloud) CreateFirewall(name, desc string, sourceRanges netsets.IPNet, ports []int64, hostNames []string) error {
 | 
					func (gce *GCECloud) CreateFirewall(f *compute.Firewall) error {
 | 
				
			||||||
	region, err := GetGCERegion(gce.localZone)
 | 
						mc := newFirewallMetricContext("create")
 | 
				
			||||||
 | 
						op, err := gce.service.Firewalls.Insert(gce.projectID, f).Do()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return err
 | 
							return mc.Observe(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	mc := newFirewallMetricContext("create", region)
 | 
						return gce.waitForGlobalOp(op, mc)
 | 
				
			||||||
	// TODO: This completely breaks modularity in the cloudprovider but
 | 
					 | 
				
			||||||
	// the methods shared with the TCPLoadBalancer take v1.ServicePorts.
 | 
					 | 
				
			||||||
	svcPorts := []v1.ServicePort{}
 | 
					 | 
				
			||||||
	// TODO: Currently the only consumer of this method is the GCE L7
 | 
					 | 
				
			||||||
	// loadbalancer controller, which never needs a protocol other than
 | 
					 | 
				
			||||||
	// TCP.  We should pipe through a mapping of port:protocol and
 | 
					 | 
				
			||||||
	// default to TCP if UDP ports are required. This means the method
 | 
					 | 
				
			||||||
	// signature will change forcing downstream clients to refactor
 | 
					 | 
				
			||||||
	// interfaces.
 | 
					 | 
				
			||||||
	for _, p := range ports {
 | 
					 | 
				
			||||||
		svcPorts = append(svcPorts, v1.ServicePort{Port: int32(p), Protocol: v1.ProtocolTCP})
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	hosts, err := gce.getInstancesByNames(hostNames)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		mc.Observe(err)
 | 
					 | 
				
			||||||
		return err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return mc.Observe(gce.createFirewall(name, region, desc, sourceRanges, svcPorts, hosts))
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// DeleteFirewall deletes the given firewall rule.
 | 
					// DeleteFirewall deletes the given firewall rule.
 | 
				
			||||||
func (gce *GCECloud) DeleteFirewall(name string) error {
 | 
					func (gce *GCECloud) DeleteFirewall(name string) error {
 | 
				
			||||||
	region, err := GetGCERegion(gce.localZone)
 | 
						mc := newFirewallMetricContext("delete")
 | 
				
			||||||
 | 
						op, err := gce.service.Firewalls.Delete(gce.projectID, name).Do()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return err
 | 
							return mc.Observe(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						return gce.waitForGlobalOp(op, mc)
 | 
				
			||||||
	mc := newFirewallMetricContext("delete", region)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return mc.Observe(gce.deleteFirewall(name, region))
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// UpdateFirewall applies the given firewall rule as an update to an
 | 
					// UpdateFirewall applies the given firewall as an update to an existing service.
 | 
				
			||||||
// existing firewall rule with the same name.
 | 
					func (gce *GCECloud) UpdateFirewall(f *compute.Firewall) error {
 | 
				
			||||||
func (gce *GCECloud) UpdateFirewall(name, desc string, sourceRanges netsets.IPNet, ports []int64, hostNames []string) error {
 | 
						mc := newFirewallMetricContext("update")
 | 
				
			||||||
 | 
						op, err := gce.service.Firewalls.Update(gce.projectID, f.Name, f).Do()
 | 
				
			||||||
	region, err := GetGCERegion(gce.localZone)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return err
 | 
							return mc.Observe(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	mc := newFirewallMetricContext("update", region)
 | 
						return gce.waitForGlobalOp(op, mc)
 | 
				
			||||||
	// TODO: This completely breaks modularity in the cloudprovider but
 | 
					 | 
				
			||||||
	// the methods shared with the TCPLoadBalancer take v1.ServicePorts.
 | 
					 | 
				
			||||||
	svcPorts := []v1.ServicePort{}
 | 
					 | 
				
			||||||
	// TODO: Currently the only consumer of this method is the GCE L7
 | 
					 | 
				
			||||||
	// loadbalancer controller, which never needs a protocol other than
 | 
					 | 
				
			||||||
	// TCP.  We should pipe through a mapping of port:protocol and
 | 
					 | 
				
			||||||
	// default to TCP if UDP ports are required. This means the method
 | 
					 | 
				
			||||||
	// signature will change, forcing downstream clients to refactor
 | 
					 | 
				
			||||||
	// interfaces.
 | 
					 | 
				
			||||||
	for _, p := range ports {
 | 
					 | 
				
			||||||
		svcPorts = append(svcPorts, v1.ServicePort{Port: int32(p), Protocol: v1.ProtocolTCP})
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	hosts, err := gce.getInstancesByNames(hostNames)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		mc.Observe(err)
 | 
					 | 
				
			||||||
		return err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return mc.Observe(gce.updateFirewall(name, region, desc, sourceRanges, svcPorts, hosts))
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -29,11 +29,22 @@ import (
 | 
				
			|||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
	minNodesHealthCheckVersion = "1.7.0"
 | 
						nodesHealthCheckPath   = "/healthz"
 | 
				
			||||||
	nodesHealthCheckPath       = "/healthz"
 | 
						lbNodesHealthCheckPort = ports.ProxyHealthzPort
 | 
				
			||||||
	lbNodesHealthCheckPort     = ports.ProxyHealthzPort
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var (
 | 
				
			||||||
 | 
						minNodesHealthCheckVersion *utilversion.Version
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func init() {
 | 
				
			||||||
 | 
						if v, err := utilversion.ParseGeneric("1.7.0"); err != nil {
 | 
				
			||||||
 | 
							panic(err)
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							minNodesHealthCheckVersion = v
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func newHealthcheckMetricContext(request string) *metricContext {
 | 
					func newHealthcheckMetricContext(request string) *metricContext {
 | 
				
			||||||
	return &metricContext{
 | 
						return &metricContext{
 | 
				
			||||||
		start:      time.Now(),
 | 
							start:      time.Now(),
 | 
				
			||||||
@@ -212,28 +223,22 @@ func makeNodesHealthCheckName(clusterID string) string {
 | 
				
			|||||||
// MakeHealthCheckFirewallName returns the firewall name used by the GCE load
 | 
					// MakeHealthCheckFirewallName returns the firewall name used by the GCE load
 | 
				
			||||||
// balancers (l4) for performing health checks.
 | 
					// balancers (l4) for performing health checks.
 | 
				
			||||||
func MakeHealthCheckFirewallName(clusterID, hcName string, isNodesHealthCheck bool) string {
 | 
					func MakeHealthCheckFirewallName(clusterID, hcName string, isNodesHealthCheck bool) string {
 | 
				
			||||||
	// TODO: Change below fwName to match the proposed schema: k8s-{clusteriD}-{namespace}-{name}-{shortid}-hc.
 | 
					 | 
				
			||||||
	fwName := "k8s-" + hcName + "-http-hc"
 | 
					 | 
				
			||||||
	if isNodesHealthCheck {
 | 
						if isNodesHealthCheck {
 | 
				
			||||||
		fwName = makeNodesHealthCheckName(clusterID) + "-http-hc"
 | 
							// TODO: Change below fwName to match the proposed schema: k8s-{clusteriD}-{namespace}-{name}-{shortid}-hc.
 | 
				
			||||||
 | 
							return makeNodesHealthCheckName(clusterID) + "-http-hc"
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return fwName
 | 
						return "k8s-" + hcName + "-http-hc"
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// isAtLeastMinNodesHealthCheckVersion checks if a version is higher than
 | 
					// isAtLeastMinNodesHealthCheckVersion checks if a version is higher than
 | 
				
			||||||
// `minNodesHealthCheckVersion`.
 | 
					// `minNodesHealthCheckVersion`.
 | 
				
			||||||
func isAtLeastMinNodesHealthCheckVersion(vstring string) bool {
 | 
					func isAtLeastMinNodesHealthCheckVersion(vstring string) bool {
 | 
				
			||||||
	minVersion, err := utilversion.ParseGeneric(minNodesHealthCheckVersion)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		glog.Errorf("MinNodesHealthCheckVersion (%s) is not a valid version string: %v", minNodesHealthCheckVersion, err)
 | 
					 | 
				
			||||||
		return false
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	version, err := utilversion.ParseGeneric(vstring)
 | 
						version, err := utilversion.ParseGeneric(vstring)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		glog.Errorf("vstring (%s) is not a valid version string: %v", vstring, err)
 | 
							glog.Errorf("vstring (%s) is not a valid version string: %v", vstring, err)
 | 
				
			||||||
		return false
 | 
							return false
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return version.AtLeast(minVersion)
 | 
						return version.AtLeast(minNodesHealthCheckVersion)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// supportsNodesHealthCheck returns false if anyone of the nodes has version
 | 
					// supportsNodesHealthCheck returns false if anyone of the nodes has version
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -17,12 +17,8 @@ limitations under the License.
 | 
				
			|||||||
package gce
 | 
					package gce
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"fmt"
 | 
					 | 
				
			||||||
	"net/http"
 | 
					 | 
				
			||||||
	"strings"
 | 
					 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/golang/glog"
 | 
					 | 
				
			||||||
	compute "google.golang.org/api/compute/v1"
 | 
						compute "google.golang.org/api/compute/v1"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -37,7 +33,6 @@ func newInstanceGroupMetricContext(request string, zone string) *metricContext {
 | 
				
			|||||||
// instances. It is the callers responsibility to add named ports.
 | 
					// instances. It is the callers responsibility to add named ports.
 | 
				
			||||||
func (gce *GCECloud) CreateInstanceGroup(name string, zone string) (*compute.InstanceGroup, error) {
 | 
					func (gce *GCECloud) CreateInstanceGroup(name string, zone string) (*compute.InstanceGroup, error) {
 | 
				
			||||||
	mc := newInstanceGroupMetricContext("create", zone)
 | 
						mc := newInstanceGroupMetricContext("create", zone)
 | 
				
			||||||
 | 
					 | 
				
			||||||
	op, err := gce.service.InstanceGroups.Insert(
 | 
						op, err := gce.service.InstanceGroups.Insert(
 | 
				
			||||||
		gce.projectID, zone, &compute.InstanceGroup{Name: name}).Do()
 | 
							gce.projectID, zone, &compute.InstanceGroup{Name: name}).Do()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@@ -55,12 +50,10 @@ func (gce *GCECloud) CreateInstanceGroup(name string, zone string) (*compute.Ins
 | 
				
			|||||||
// DeleteInstanceGroup deletes an instance group.
 | 
					// DeleteInstanceGroup deletes an instance group.
 | 
				
			||||||
func (gce *GCECloud) DeleteInstanceGroup(name string, zone string) error {
 | 
					func (gce *GCECloud) DeleteInstanceGroup(name string, zone string) error {
 | 
				
			||||||
	mc := newInstanceGroupMetricContext("delete", zone)
 | 
						mc := newInstanceGroupMetricContext("delete", zone)
 | 
				
			||||||
 | 
					 | 
				
			||||||
	op, err := gce.service.InstanceGroups.Delete(
 | 
						op, err := gce.service.InstanceGroups.Delete(
 | 
				
			||||||
		gce.projectID, zone, name).Do()
 | 
							gce.projectID, zone, name).Do()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		mc.Observe(err)
 | 
							return mc.Observe(err)
 | 
				
			||||||
		return err
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return gce.waitForZoneOp(op, zone, mc)
 | 
						return gce.waitForZoneOp(op, zone, mc)
 | 
				
			||||||
@@ -103,10 +96,8 @@ func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, insta
 | 
				
			|||||||
		&compute.InstanceGroupsAddInstancesRequest{
 | 
							&compute.InstanceGroupsAddInstancesRequest{
 | 
				
			||||||
			Instances: instances,
 | 
								Instances: instances,
 | 
				
			||||||
		}).Do()
 | 
							}).Do()
 | 
				
			||||||
 | 
					 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		mc.Observe(err)
 | 
							return mc.Observe(err)
 | 
				
			||||||
		return err
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return gce.waitForZoneOp(op, zone, mc)
 | 
						return gce.waitForZoneOp(op, zone, mc)
 | 
				
			||||||
@@ -130,55 +121,24 @@ func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, zone string,
 | 
				
			|||||||
		&compute.InstanceGroupsRemoveInstancesRequest{
 | 
							&compute.InstanceGroupsRemoveInstancesRequest{
 | 
				
			||||||
			Instances: instances,
 | 
								Instances: instances,
 | 
				
			||||||
		}).Do()
 | 
							}).Do()
 | 
				
			||||||
 | 
					 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		if isHTTPErrorCode(err, http.StatusNotFound) {
 | 
							return mc.Observe(err)
 | 
				
			||||||
			mc.Observe(nil)
 | 
					 | 
				
			||||||
			return nil
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		mc.Observe(err)
 | 
					 | 
				
			||||||
		return err
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return gce.waitForZoneOp(op, zone, mc)
 | 
						return gce.waitForZoneOp(op, zone, mc)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// AddPortToInstanceGroup adds a port to the given instance group.
 | 
					// SetNamedPortsOfInstanceGroup sets the list of named ports on a given instance group
 | 
				
			||||||
func (gce *GCECloud) AddPortToInstanceGroup(ig *compute.InstanceGroup, port int64) (*compute.NamedPort, error) {
 | 
					func (gce *GCECloud) SetNamedPortsOfInstanceGroup(igName, zone string, namedPorts []*compute.NamedPort) error {
 | 
				
			||||||
	mc := newInstanceGroupMetricContext("add_port", ig.Zone)
 | 
						mc := newInstanceGroupMetricContext("set_namedports", zone)
 | 
				
			||||||
	for _, np := range ig.NamedPorts {
 | 
					 | 
				
			||||||
		if np.Port == port {
 | 
					 | 
				
			||||||
			glog.V(3).Infof("Instance group %v already has named port %+v", ig.Name, np)
 | 
					 | 
				
			||||||
			return np, nil
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	glog.Infof("Adding port %v to instance group %v with %d ports", port, ig.Name, len(ig.NamedPorts))
 | 
					 | 
				
			||||||
	namedPort := compute.NamedPort{Name: fmt.Sprintf("port%v", port), Port: port}
 | 
					 | 
				
			||||||
	ig.NamedPorts = append(ig.NamedPorts, &namedPort)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// setNamedPorts is a zonal endpoint, meaning we invoke it by re-creating a URL like:
 | 
					 | 
				
			||||||
	// {project}/zones/{zone}/instanceGroups/{instanceGroup}/setNamedPorts, so the "zone"
 | 
					 | 
				
			||||||
	// parameter given to SetNamedPorts must not be the entire zone URL.
 | 
					 | 
				
			||||||
	zoneURLParts := strings.Split(ig.Zone, "/")
 | 
					 | 
				
			||||||
	zone := zoneURLParts[len(zoneURLParts)-1]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	op, err := gce.service.InstanceGroups.SetNamedPorts(
 | 
						op, err := gce.service.InstanceGroups.SetNamedPorts(
 | 
				
			||||||
		gce.projectID, zone, ig.Name,
 | 
							gce.projectID, zone, igName,
 | 
				
			||||||
		&compute.InstanceGroupsSetNamedPortsRequest{
 | 
							&compute.InstanceGroupsSetNamedPortsRequest{NamedPorts: namedPorts}).Do()
 | 
				
			||||||
			NamedPorts: ig.NamedPorts}).Do()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		mc.Observe(err)
 | 
							return mc.Observe(err)
 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err = gce.waitForZoneOp(op, zone, mc); err != nil {
 | 
						return gce.waitForZoneOp(op, zone, mc)
 | 
				
			||||||
		return nil, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return &namedPort, nil
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GetInstanceGroup returns an instance group by name.
 | 
					// GetInstanceGroup returns an instance group by name.
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -443,3 +443,117 @@ func (gce *GCECloud) isCurrentInstance(instanceID string) bool {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	return currentInstanceID == canonicalizeInstanceName(instanceID)
 | 
						return currentInstanceID == canonicalizeInstanceName(instanceID)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// ComputeHostTags grabs all tags from all instances being added to the pool.
 | 
				
			||||||
 | 
					// * The longest tag that is a prefix of the instance name is used
 | 
				
			||||||
 | 
					// * If any instance has no matching prefix tag, return error
 | 
				
			||||||
 | 
					// Invoking this method to get host tags is risky since it depends on the format
 | 
				
			||||||
 | 
					// of the host names in the cluster. Only use it as a fallback if gce.nodeTags
 | 
				
			||||||
 | 
					// is unspecified
 | 
				
			||||||
 | 
					func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) {
 | 
				
			||||||
 | 
						// TODO: We could store the tags in gceInstance, so we could have already fetched it
 | 
				
			||||||
 | 
						hostNamesByZone := make(map[string]map[string]bool) // map of zones -> map of names -> bool (for easy lookup)
 | 
				
			||||||
 | 
						nodeInstancePrefix := gce.nodeInstancePrefix
 | 
				
			||||||
 | 
						for _, host := range hosts {
 | 
				
			||||||
 | 
							if !strings.HasPrefix(host.Name, gce.nodeInstancePrefix) {
 | 
				
			||||||
 | 
								glog.Warningf("instance '%s' does not conform to prefix '%s', ignoring filter", host, gce.nodeInstancePrefix)
 | 
				
			||||||
 | 
								nodeInstancePrefix = ""
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							z, ok := hostNamesByZone[host.Zone]
 | 
				
			||||||
 | 
							if !ok {
 | 
				
			||||||
 | 
								z = make(map[string]bool)
 | 
				
			||||||
 | 
								hostNamesByZone[host.Zone] = z
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							z[host.Name] = true
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						tags := sets.NewString()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for zone, hostNames := range hostNamesByZone {
 | 
				
			||||||
 | 
							pageToken := ""
 | 
				
			||||||
 | 
							page := 0
 | 
				
			||||||
 | 
							for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
 | 
				
			||||||
 | 
								listCall := gce.service.Instances.List(gce.projectID, zone)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								if nodeInstancePrefix != "" {
 | 
				
			||||||
 | 
									// Add the filter for hosts
 | 
				
			||||||
 | 
									listCall = listCall.Filter("name eq " + nodeInstancePrefix + ".*")
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// Add the fields we want
 | 
				
			||||||
 | 
								// TODO(zmerlynn): Internal bug 29524655
 | 
				
			||||||
 | 
								// listCall = listCall.Fields("items(name,tags)")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								if pageToken != "" {
 | 
				
			||||||
 | 
									listCall = listCall.PageToken(pageToken)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								res, err := listCall.Do()
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									return nil, err
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								pageToken = res.NextPageToken
 | 
				
			||||||
 | 
								for _, instance := range res.Items {
 | 
				
			||||||
 | 
									if !hostNames[instance.Name] {
 | 
				
			||||||
 | 
										continue
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									longest_tag := ""
 | 
				
			||||||
 | 
									for _, tag := range instance.Tags.Items {
 | 
				
			||||||
 | 
										if strings.HasPrefix(instance.Name, tag) && len(tag) > len(longest_tag) {
 | 
				
			||||||
 | 
											longest_tag = tag
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									if len(longest_tag) > 0 {
 | 
				
			||||||
 | 
										tags.Insert(longest_tag)
 | 
				
			||||||
 | 
									} else {
 | 
				
			||||||
 | 
										return nil, fmt.Errorf("Could not find any tag that is a prefix of instance name for instance %s", instance.Name)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if page >= maxPages {
 | 
				
			||||||
 | 
								glog.Errorf("computeHostTags exceeded maxPages=%d for Instances.List: truncating.", maxPages)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(tags) == 0 {
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("No instances found")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return tags.List(), nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GetNodeTags will first try returning the list of tags specified in GCE cloud Configuration.
 | 
				
			||||||
 | 
					// If they weren't provided, it'll compute the host tags with the given hostnames. If the list
 | 
				
			||||||
 | 
					// of hostnames has not changed, a cached set of nodetags are returned.
 | 
				
			||||||
 | 
					func (gce *GCECloud) GetNodeTags(nodeNames []string) ([]string, error) {
 | 
				
			||||||
 | 
						// If nodeTags were specified through configuration, use them
 | 
				
			||||||
 | 
						if len(gce.nodeTags) > 0 {
 | 
				
			||||||
 | 
							return gce.nodeTags, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						gce.computeNodeTagLock.Lock()
 | 
				
			||||||
 | 
						defer gce.computeNodeTagLock.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Early return if hosts have not changed
 | 
				
			||||||
 | 
						hosts := sets.NewString(nodeNames...)
 | 
				
			||||||
 | 
						if hosts.Equal(gce.lastKnownNodeNames) {
 | 
				
			||||||
 | 
							return gce.lastComputedNodeTags, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Get GCE instance data by hostname
 | 
				
			||||||
 | 
						instances, err := gce.getInstancesByNames(nodeNames)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Determine list of host tags
 | 
				
			||||||
 | 
						tags, err := gce.computeHostTags(instances)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Save the list of tags
 | 
				
			||||||
 | 
						gce.lastKnownNodeNames = hosts
 | 
				
			||||||
 | 
						gce.lastComputedNodeTags = tags
 | 
				
			||||||
 | 
						return tags, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1010,7 +1010,7 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
 | 
					func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
 | 
				
			||||||
	mc := newFirewallMetricContext("create", region)
 | 
						mc := newFirewallMetricContext("create")
 | 
				
			||||||
	firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
 | 
						firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return mc.Observe(err)
 | 
							return mc.Observe(err)
 | 
				
			||||||
@@ -1029,7 +1029,7 @@ func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges nets
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
 | 
					func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
 | 
				
			||||||
	mc := newFirewallMetricContext("update", region)
 | 
						mc := newFirewallMetricContext("update")
 | 
				
			||||||
	firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
 | 
						firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return mc.Observe(err)
 | 
							return mc.Observe(err)
 | 
				
			||||||
@@ -1083,84 +1083,6 @@ func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges nets
 | 
				
			|||||||
	return firewall, nil
 | 
						return firewall, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// ComputeHostTags grabs all tags from all instances being added to the pool.
 | 
					 | 
				
			||||||
// * The longest tag that is a prefix of the instance name is used
 | 
					 | 
				
			||||||
// * If any instance has no matching prefix tag, return error
 | 
					 | 
				
			||||||
// Invoking this method to get host tags is risky since it depends on the format
 | 
					 | 
				
			||||||
// of the host names in the cluster. Only use it as a fallback if gce.nodeTags
 | 
					 | 
				
			||||||
// is unspecified
 | 
					 | 
				
			||||||
func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) {
 | 
					 | 
				
			||||||
	// TODO: We could store the tags in gceInstance, so we could have already fetched it
 | 
					 | 
				
			||||||
	hostNamesByZone := make(map[string]map[string]bool) // map of zones -> map of names -> bool (for easy lookup)
 | 
					 | 
				
			||||||
	nodeInstancePrefix := gce.nodeInstancePrefix
 | 
					 | 
				
			||||||
	for _, host := range hosts {
 | 
					 | 
				
			||||||
		if !strings.HasPrefix(host.Name, gce.nodeInstancePrefix) {
 | 
					 | 
				
			||||||
			glog.Warningf("instance '%s' does not conform to prefix '%s', ignoring filter", host, gce.nodeInstancePrefix)
 | 
					 | 
				
			||||||
			nodeInstancePrefix = ""
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		z, ok := hostNamesByZone[host.Zone]
 | 
					 | 
				
			||||||
		if !ok {
 | 
					 | 
				
			||||||
			z = make(map[string]bool)
 | 
					 | 
				
			||||||
			hostNamesByZone[host.Zone] = z
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		z[host.Name] = true
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	tags := sets.NewString()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	for zone, hostNames := range hostNamesByZone {
 | 
					 | 
				
			||||||
		pageToken := ""
 | 
					 | 
				
			||||||
		page := 0
 | 
					 | 
				
			||||||
		for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
 | 
					 | 
				
			||||||
			listCall := gce.service.Instances.List(gce.projectID, zone)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			if nodeInstancePrefix != "" {
 | 
					 | 
				
			||||||
				// Add the filter for hosts
 | 
					 | 
				
			||||||
				listCall = listCall.Filter("name eq " + nodeInstancePrefix + ".*")
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			// Add the fields we want
 | 
					 | 
				
			||||||
			// TODO(zmerlynn): Internal bug 29524655
 | 
					 | 
				
			||||||
			// listCall = listCall.Fields("items(name,tags)")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			if pageToken != "" {
 | 
					 | 
				
			||||||
				listCall = listCall.PageToken(pageToken)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			res, err := listCall.Do()
 | 
					 | 
				
			||||||
			if err != nil {
 | 
					 | 
				
			||||||
				return nil, err
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			pageToken = res.NextPageToken
 | 
					 | 
				
			||||||
			for _, instance := range res.Items {
 | 
					 | 
				
			||||||
				if !hostNames[instance.Name] {
 | 
					 | 
				
			||||||
					continue
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				longest_tag := ""
 | 
					 | 
				
			||||||
				for _, tag := range instance.Tags.Items {
 | 
					 | 
				
			||||||
					if strings.HasPrefix(instance.Name, tag) && len(tag) > len(longest_tag) {
 | 
					 | 
				
			||||||
						longest_tag = tag
 | 
					 | 
				
			||||||
					}
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				if len(longest_tag) > 0 {
 | 
					 | 
				
			||||||
					tags.Insert(longest_tag)
 | 
					 | 
				
			||||||
				} else {
 | 
					 | 
				
			||||||
					return nil, fmt.Errorf("Could not find any tag that is a prefix of instance name for instance %s", instance.Name)
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if page >= maxPages {
 | 
					 | 
				
			||||||
			glog.Errorf("computeHostTags exceeded maxPages=%d for Instances.List: truncating.", maxPages)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if len(tags) == 0 {
 | 
					 | 
				
			||||||
		return nil, fmt.Errorf("No instances found")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return tags.List(), nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) (bool, error) {
 | 
					func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) (bool, error) {
 | 
				
			||||||
	pageToken := ""
 | 
						pageToken := ""
 | 
				
			||||||
	page := 0
 | 
						page := 0
 | 
				
			||||||
@@ -1232,7 +1154,7 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (gce *GCECloud) deleteFirewall(name, region string) error {
 | 
					func (gce *GCECloud) deleteFirewall(name, region string) error {
 | 
				
			||||||
	mc := newFirewallMetricContext("delete", region)
 | 
						mc := newFirewallMetricContext("delete")
 | 
				
			||||||
	op, err := gce.service.Firewalls.Delete(gce.projectID, name).Do()
 | 
						op, err := gce.service.Firewalls.Delete(gce.projectID, name).Do()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
 | 
						if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -149,16 +149,6 @@ func TestScrubDNS(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestCreateFirewallFails(t *testing.T) {
 | 
					 | 
				
			||||||
	name := "loadbalancer"
 | 
					 | 
				
			||||||
	region := "us-central1"
 | 
					 | 
				
			||||||
	desc := "description"
 | 
					 | 
				
			||||||
	gce := &GCECloud{}
 | 
					 | 
				
			||||||
	if err := gce.createFirewall(name, region, desc, nil, nil, nil); err == nil {
 | 
					 | 
				
			||||||
		t.Errorf("error expected when creating firewall without any tags found")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func TestSplitProviderID(t *testing.T) {
 | 
					func TestSplitProviderID(t *testing.T) {
 | 
				
			||||||
	providers := []struct {
 | 
						providers := []struct {
 | 
				
			||||||
		providerID string
 | 
							providerID string
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -478,7 +478,7 @@ func (cont *GCEIngressController) deleteURLMap(del bool) (msg string) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func (cont *GCEIngressController) deleteBackendService(del bool) (msg string) {
 | 
					func (cont *GCEIngressController) deleteBackendService(del bool) (msg string) {
 | 
				
			||||||
	gceCloud := cont.Cloud.Provider.(*gcecloud.GCECloud)
 | 
						gceCloud := cont.Cloud.Provider.(*gcecloud.GCECloud)
 | 
				
			||||||
	beList, err := gceCloud.ListBackendServices()
 | 
						beList, err := gceCloud.ListGlobalBackendServices()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		if cont.isHTTPErrorCode(err, http.StatusNotFound) {
 | 
							if cont.isHTTPErrorCode(err, http.StatusNotFound) {
 | 
				
			||||||
			return msg
 | 
								return msg
 | 
				
			||||||
@@ -495,7 +495,7 @@ func (cont *GCEIngressController) deleteBackendService(del bool) (msg string) {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
		if del {
 | 
							if del {
 | 
				
			||||||
			Logf("Deleting backed-service: %s", be.Name)
 | 
								Logf("Deleting backed-service: %s", be.Name)
 | 
				
			||||||
			if err := gceCloud.DeleteBackendService(be.Name); err != nil &&
 | 
								if err := gceCloud.DeleteGlobalBackendService(be.Name); err != nil &&
 | 
				
			||||||
				!cont.isHTTPErrorCode(err, http.StatusNotFound) {
 | 
									!cont.isHTTPErrorCode(err, http.StatusNotFound) {
 | 
				
			||||||
				msg += fmt.Sprintf("Failed to delete backend service %v: %v\n", be.Name, err)
 | 
									msg += fmt.Sprintf("Failed to delete backend service %v: %v\n", be.Name, err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user