Reserve internal address for ILBs
This commit is contained in:
parent
ad6c85ca2e
commit
644aa69da7
@ -11,6 +11,7 @@ go_library(
|
||||
srcs = [
|
||||
"doc.go",
|
||||
"gce.go",
|
||||
"gce_address_manager.go",
|
||||
"gce_addresses.go",
|
||||
"gce_addresses_fakes.go",
|
||||
"gce_alpha.go",
|
||||
@ -83,6 +84,7 @@ go_library(
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"gce_address_manager_test.go",
|
||||
"gce_annotations_test.go",
|
||||
"gce_disks_test.go",
|
||||
"gce_healthchecks_test.go",
|
||||
@ -95,6 +97,7 @@ go_test(
|
||||
"//pkg/cloudprovider:go_default_library",
|
||||
"//pkg/kubelet/apis:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/require:go_default_library",
|
||||
"//vendor/golang.org/x/oauth2/google:go_default_library",
|
||||
"//vendor/google.golang.org/api/compute/v0.alpha:go_default_library",
|
||||
"//vendor/google.golang.org/api/compute/v0.beta:go_default_library",
|
||||
|
198
pkg/cloudprovider/providers/gce/gce_address_manager.go
Normal file
198
pkg/cloudprovider/providers/gce/gce_address_manager.go
Normal file
@ -0,0 +1,198 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package gce
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
computebeta "google.golang.org/api/compute/v0.beta"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
type addressManager struct {
|
||||
logPrefix string
|
||||
svc CloudAddressService
|
||||
name string
|
||||
serviceName string
|
||||
targetIP string
|
||||
addressType lbScheme
|
||||
region string
|
||||
subnetURL string
|
||||
tryRelease bool
|
||||
}
|
||||
|
||||
func newAddressManager(svc CloudAddressService, serviceName, region, subnetURL, name, targetIP string, addressType lbScheme) *addressManager {
|
||||
return &addressManager{
|
||||
svc: svc,
|
||||
logPrefix: fmt.Sprintf("AddressManager(%q)", name),
|
||||
region: region,
|
||||
serviceName: serviceName,
|
||||
name: name,
|
||||
targetIP: targetIP,
|
||||
addressType: addressType,
|
||||
tryRelease: true,
|
||||
subnetURL: subnetURL,
|
||||
}
|
||||
}
|
||||
|
||||
// HoldAddress will ensure that the IP is reserved with an address - either owned by the controller
|
||||
// or by a user. If the address is not the addressManager.name, then it's assumed to be a user's address.
|
||||
// The string returned is the reserved IP address.
|
||||
func (am *addressManager) HoldAddress() (string, error) {
|
||||
// HoldAddress starts with retrieving the address that we use for this load balancer (by name).
|
||||
// Retrieving an address by IP will indicate if the IP is reserved and if reserved by the user
|
||||
// or the controller, but won't tell us the current state of the controller's IP. The address
|
||||
// could be reserving another address; therefore, it would need to be deleted. In the normal
|
||||
// case of using a controller address, retrieving the address by name results in the fewest API
|
||||
// calls since it indicates whether a Delete is necessary before Reserve.
|
||||
glog.V(4).Infof("%v: attempting hold of IP %q Type %q", am.logPrefix, am.targetIP, am.addressType)
|
||||
// Get the address in case it was orphaned earlier
|
||||
addr, err := am.svc.GetBetaRegionAddress(am.name, am.region)
|
||||
if err != nil && !isNotFound(err) {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if addr != nil {
|
||||
// If address exists, check if the address had the expected attributes.
|
||||
validationError := am.validateAddress(addr)
|
||||
if validationError == nil {
|
||||
glog.V(4).Infof("%v: address %q already reserves IP %q Type %q. No further action required.", am.logPrefix, addr.Name, addr.Address, addr.AddressType)
|
||||
return addr.Address, nil
|
||||
}
|
||||
|
||||
glog.V(2).Infof("%v: deleting existing address because %v", am.logPrefix, validationError)
|
||||
err := am.svc.DeleteRegionAddress(addr.Name, am.region)
|
||||
if err != nil {
|
||||
if isNotFound(err) {
|
||||
glog.V(4).Infof("%v: address %q was not found. Ignoring.", am.logPrefix, addr.Name)
|
||||
} else {
|
||||
return "", err
|
||||
}
|
||||
} else {
|
||||
glog.V(4).Infof("%v: successfully deleted previous address %q", am.logPrefix, addr.Name)
|
||||
}
|
||||
}
|
||||
|
||||
return am.ensureAddressReservation()
|
||||
}
|
||||
|
||||
// ReleaseAddress will release the address if it's owned by the controller.
|
||||
func (am *addressManager) ReleaseAddress() error {
|
||||
if !am.tryRelease {
|
||||
glog.V(4).Infof("%v: not attempting release of address %q.", am.logPrefix, am.targetIP)
|
||||
return nil
|
||||
}
|
||||
|
||||
glog.V(4).Infof("%v: releasing address %q named %q", am.logPrefix, am.targetIP, am.name)
|
||||
// Controller only ever tries to unreserve the address named with the load balancer's name.
|
||||
err := am.svc.DeleteRegionAddress(am.name, am.region)
|
||||
if err != nil {
|
||||
if isNotFound(err) {
|
||||
glog.Warningf("%v: address %q was not found. Ignoring.", am.logPrefix, am.name)
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
glog.V(4).Infof("%v: successfully released IP %q named %q", am.logPrefix, am.targetIP, am.name)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (am *addressManager) ensureAddressReservation() (string, error) {
|
||||
// Try reserving the IP with controller-owned address name
|
||||
// If am.targetIP is an empty string, a new IP will be created.
|
||||
newAddr := &computebeta.Address{
|
||||
Name: am.name,
|
||||
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, am.serviceName),
|
||||
Address: am.targetIP,
|
||||
AddressType: string(am.addressType),
|
||||
Subnetwork: am.subnetURL,
|
||||
}
|
||||
|
||||
err := am.svc.ReserveBetaRegionAddress(newAddr, am.region)
|
||||
if err == nil {
|
||||
if newAddr.Address != "" {
|
||||
glog.V(4).Infof("%v: successfully reserved IP %q with name %q", am.logPrefix, newAddr.Address, newAddr.Name)
|
||||
return newAddr.Address, nil
|
||||
}
|
||||
|
||||
addr, err := am.svc.GetRegionAddress(newAddr.Name, am.region)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
glog.V(4).Infof("%v: successfully created address %q which reserved IP %q", am.logPrefix, addr.Name, addr.Address)
|
||||
return addr.Address, nil
|
||||
} else if !isHTTPErrorCode(err, http.StatusConflict) && !isHTTPErrorCode(err, http.StatusBadRequest) {
|
||||
// If the IP is already reserved:
|
||||
// by an internal address: a StatusConflict is returned
|
||||
// by an external address: a BadRequest is returned
|
||||
return "", err
|
||||
}
|
||||
|
||||
// If the target IP was empty, we cannot try to find which IP caused a conflict.
|
||||
// If the name was already used, then the next sync will attempt deletion of that address.
|
||||
if am.targetIP == "" {
|
||||
return "", fmt.Errorf("failed to reserve address %q, err: %v", am.name, err)
|
||||
}
|
||||
|
||||
// Reserving the address failed due to a conflict or bad request. The address manager just checked that no address
|
||||
// exists with the name, so it may belong to the user.
|
||||
addr, err := am.svc.GetBetaRegionAddressByIP(am.region, am.targetIP)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("could not find address with IP %q after getting conflict error while creating address: %q", am.targetIP, err)
|
||||
}
|
||||
|
||||
// Check that the address attributes are as required.
|
||||
if err := am.validateAddress(addr); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if am.isManagedAddress(addr) {
|
||||
// The address with this name is checked at the beginning of 'HoldAddress()', but for some reason
|
||||
// it was re-created by this point. May be possible that two controllers are running.
|
||||
glog.Warning("%v: address %q unexpectedly existed with IP %q.", am.logPrefix, addr.Name, am.targetIP)
|
||||
} else {
|
||||
// If the retrieved address is not named with the loadbalancer name, then the controller does not own it.
|
||||
glog.V(4).Infof("%v: address %q was already reserved with name: %q, description: %q", am.logPrefix, am.targetIP, addr.Name, addr.Description)
|
||||
am.tryRelease = false
|
||||
}
|
||||
|
||||
return addr.Address, nil
|
||||
}
|
||||
|
||||
func (am *addressManager) validateAddress(addr *computebeta.Address) error {
|
||||
if am.targetIP != "" && am.targetIP != addr.Address {
|
||||
return fmt.Errorf("address %q does not have the expected IP %q, actual: %q", addr.Name, am.targetIP, addr.Address)
|
||||
}
|
||||
if addr.AddressType != string(am.addressType) {
|
||||
return fmt.Errorf("address %q does not have the expected address type %q, actual: %q", addr.Name, am.addressType, addr.AddressType)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (am *addressManager) isManagedAddress(addr *computebeta.Address) bool {
|
||||
return addr.Name == am.name
|
||||
}
|
||||
|
||||
func ensureAddressDeleted(svc CloudAddressService, name, region string) error {
|
||||
return ignoreNotFound(svc.DeleteRegionAddress(name, region))
|
||||
}
|
137
pkg/cloudprovider/providers/gce/gce_address_manager_test.go
Normal file
137
pkg/cloudprovider/providers/gce/gce_address_manager_test.go
Normal file
@ -0,0 +1,137 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package gce
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
computebeta "google.golang.org/api/compute/v0.beta"
|
||||
)
|
||||
|
||||
const testSvcName = "my-service"
|
||||
const testRegion = "us-central1"
|
||||
const testSubnet = "/projects/x/testRegions/us-central1/testSubnetworks/customsub"
|
||||
const testLBName = "a111111111111111"
|
||||
|
||||
// TestAddressManagerNoRequestedIP tests the typical case of passing in no requested IP
|
||||
func TestAddressManagerNoRequestedIP(t *testing.T) {
|
||||
svc := NewFakeCloudAddressService()
|
||||
targetIP := ""
|
||||
|
||||
mgr := newAddressManager(svc, testSvcName, testRegion, testSubnet, testLBName, targetIP, schemeInternal)
|
||||
testHoldAddress(t, mgr, svc, testLBName, testRegion, targetIP, string(schemeInternal))
|
||||
testReleaseAddress(t, mgr, svc, testLBName, testRegion)
|
||||
}
|
||||
|
||||
// TestAddressManagerBasic tests the typical case of reserving and unreserving an address.
|
||||
func TestAddressManagerBasic(t *testing.T) {
|
||||
svc := NewFakeCloudAddressService()
|
||||
targetIP := "1.1.1.1"
|
||||
|
||||
mgr := newAddressManager(svc, testSvcName, testRegion, testSubnet, testLBName, targetIP, schemeInternal)
|
||||
testHoldAddress(t, mgr, svc, testLBName, testRegion, targetIP, string(schemeInternal))
|
||||
testReleaseAddress(t, mgr, svc, testLBName, testRegion)
|
||||
}
|
||||
|
||||
// TestAddressManagerOrphaned tests the case where the address exists with the IP being equal
|
||||
// to the requested address (forwarding rule or loadbalancer IP).
|
||||
func TestAddressManagerOrphaned(t *testing.T) {
|
||||
svc := NewFakeCloudAddressService()
|
||||
targetIP := "1.1.1.1"
|
||||
|
||||
addr := &computebeta.Address{Name: testLBName, Address: targetIP, AddressType: string(schemeInternal)}
|
||||
err := svc.ReserveBetaRegionAddress(addr, testRegion)
|
||||
require.NoError(t, err)
|
||||
|
||||
mgr := newAddressManager(svc, testSvcName, testRegion, testSubnet, testLBName, targetIP, schemeInternal)
|
||||
testHoldAddress(t, mgr, svc, testLBName, testRegion, targetIP, string(schemeInternal))
|
||||
testReleaseAddress(t, mgr, svc, testLBName, testRegion)
|
||||
}
|
||||
|
||||
// TestAddressManagerOutdatedOrphan tests the case where an address exists but points to
|
||||
// an IP other than the forwarding rule or loadbalancer IP.
|
||||
func TestAddressManagerOutdatedOrphan(t *testing.T) {
|
||||
svc := NewFakeCloudAddressService()
|
||||
previousAddress := "1.1.0.0"
|
||||
targetIP := "1.1.1.1"
|
||||
|
||||
addr := &computebeta.Address{Name: testLBName, Address: previousAddress, AddressType: string(schemeExternal)}
|
||||
err := svc.ReserveBetaRegionAddress(addr, testRegion)
|
||||
require.NoError(t, err)
|
||||
|
||||
mgr := newAddressManager(svc, testSvcName, testRegion, testSubnet, testLBName, targetIP, schemeInternal)
|
||||
testHoldAddress(t, mgr, svc, testLBName, testRegion, targetIP, string(schemeInternal))
|
||||
testReleaseAddress(t, mgr, svc, testLBName, testRegion)
|
||||
}
|
||||
|
||||
// TestAddressManagerExternallyOwned tests the case where the address exists but isn't
|
||||
// owned by the controller.
|
||||
func TestAddressManagerExternallyOwned(t *testing.T) {
|
||||
svc := NewFakeCloudAddressService()
|
||||
targetIP := "1.1.1.1"
|
||||
|
||||
addr := &computebeta.Address{Name: "my-important-address", Address: targetIP, AddressType: string(schemeInternal)}
|
||||
err := svc.ReserveBetaRegionAddress(addr, testRegion)
|
||||
require.NoError(t, err)
|
||||
|
||||
mgr := newAddressManager(svc, testSvcName, testRegion, testSubnet, testLBName, targetIP, schemeInternal)
|
||||
ipToUse, err := mgr.HoldAddress()
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, ipToUse)
|
||||
|
||||
_, err = svc.GetRegionAddress(testLBName, testRegion)
|
||||
assert.True(t, isNotFound(err))
|
||||
|
||||
testReleaseAddress(t, mgr, svc, testLBName, testRegion)
|
||||
}
|
||||
|
||||
// TestAddressManagerExternallyOwned tests the case where the address exists but isn't
|
||||
// owned by the controller. However, this address has the wrong type.
|
||||
func TestAddressManagerBadExternallyOwned(t *testing.T) {
|
||||
svc := NewFakeCloudAddressService()
|
||||
targetIP := "1.1.1.1"
|
||||
|
||||
addr := &computebeta.Address{Name: "my-important-address", Address: targetIP, AddressType: string(schemeExternal)}
|
||||
err := svc.ReserveBetaRegionAddress(addr, testRegion)
|
||||
require.NoError(t, err)
|
||||
|
||||
mgr := newAddressManager(svc, testSvcName, testRegion, testSubnet, testLBName, targetIP, schemeInternal)
|
||||
_, err = mgr.HoldAddress()
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func testHoldAddress(t *testing.T, mgr *addressManager, svc CloudAddressService, name, region, targetIP, scheme string) {
|
||||
ipToUse, err := mgr.HoldAddress()
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, ipToUse)
|
||||
|
||||
addr, err := svc.GetBetaRegionAddress(name, region)
|
||||
require.NoError(t, err)
|
||||
if targetIP != "" {
|
||||
assert.EqualValues(t, targetIP, addr.Address)
|
||||
}
|
||||
assert.EqualValues(t, scheme, addr.AddressType)
|
||||
}
|
||||
|
||||
func testReleaseAddress(t *testing.T, mgr *addressManager, svc CloudAddressService, name, region string) {
|
||||
err := mgr.ReleaseAddress()
|
||||
require.NoError(t, err)
|
||||
_, err = svc.GetBetaRegionAddress(name, region)
|
||||
assert.True(t, isNotFound(err))
|
||||
}
|
@ -20,7 +20,9 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
computealpha "google.golang.org/api/compute/v0.alpha"
|
||||
computebeta "google.golang.org/api/compute/v0.beta"
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
)
|
||||
|
||||
@ -82,6 +84,16 @@ func (gce *GCECloud) ReserveAlphaRegionAddress(addr *computealpha.Address, regio
|
||||
return gce.waitForRegionOp(op, region, mc)
|
||||
}
|
||||
|
||||
// ReserveBetaRegionAddress creates a beta region address
|
||||
func (gce *GCECloud) ReserveBetaRegionAddress(addr *computebeta.Address, region string) error {
|
||||
mc := newAddressMetricContextWithVersion("reserve", region, computeBetaVersion)
|
||||
op, err := gce.serviceBeta.Addresses.Insert(gce.projectID, region, addr).Do()
|
||||
if err != nil {
|
||||
return mc.Observe(err)
|
||||
}
|
||||
return gce.waitForRegionOp(op, region, mc)
|
||||
}
|
||||
|
||||
// DeleteRegionAddress deletes a region address by name.
|
||||
func (gce *GCECloud) DeleteRegionAddress(name, region string) error {
|
||||
mc := newAddressMetricContext("delete", region)
|
||||
@ -106,8 +118,14 @@ func (gce *GCECloud) GetAlphaRegionAddress(name, region string) (*computealpha.A
|
||||
return v, mc.Observe(err)
|
||||
}
|
||||
|
||||
// GetRegionAddressByIP returns the regional address matching the given IP
|
||||
// address.
|
||||
// GetBetaRegionAddress returns the beta region address by name
|
||||
func (gce *GCECloud) GetBetaRegionAddress(name, region string) (*computebeta.Address, error) {
|
||||
mc := newAddressMetricContextWithVersion("get", region, computeBetaVersion)
|
||||
v, err := gce.serviceBeta.Addresses.Get(gce.projectID, region, name).Do()
|
||||
return v, mc.Observe(err)
|
||||
}
|
||||
|
||||
// GetRegionAddressByIP returns the regional address matching the given IP address.
|
||||
func (gce *GCECloud) GetRegionAddressByIP(region, ipAddress string) (*compute.Address, error) {
|
||||
mc := newAddressMetricContext("list", region)
|
||||
addrs, err := gce.service.Addresses.List(gce.projectID, region).Filter("address eq " + ipAddress).Do()
|
||||
@ -132,3 +150,29 @@ func (gce *GCECloud) GetRegionAddressByIP(region, ipAddress string) (*compute.Ad
|
||||
}
|
||||
return nil, makeGoogleAPINotFoundError(fmt.Sprintf("Address with IP %q was not found in region %q", ipAddress, region))
|
||||
}
|
||||
|
||||
// GetBetaRegionAddressByIP returns the beta regional address matching the given IP address.
|
||||
func (gce *GCECloud) GetBetaRegionAddressByIP(region, ipAddress string) (*computebeta.Address, error) {
|
||||
mc := newAddressMetricContext("list", region)
|
||||
addrs, err := gce.serviceBeta.Addresses.List(gce.projectID, region).Filter("address eq " + ipAddress).Do()
|
||||
// Record the metrics for the call.
|
||||
mc.Observe(err)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(addrs.Items) > 1 {
|
||||
// We don't expect more than one match.
|
||||
addrsToPrint := []computebeta.Address{}
|
||||
for _, addr := range addrs.Items {
|
||||
addrsToPrint = append(addrsToPrint, *addr)
|
||||
}
|
||||
glog.Errorf("More than one addresses matching the IP %q: %+v", ipAddress, addrsToPrint)
|
||||
}
|
||||
for _, addr := range addrs.Items {
|
||||
if addr.Address == ipAddress {
|
||||
return addr, nil
|
||||
}
|
||||
}
|
||||
return nil, makeGoogleAPINotFoundError(fmt.Sprintf("Address with IP %q was not found in region %q", ipAddress, region))
|
||||
}
|
||||
|
@ -17,13 +17,14 @@ limitations under the License.
|
||||
package gce
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
computealpha "google.golang.org/api/compute/v0.alpha"
|
||||
computebeta "google.golang.org/api/compute/v0.beta"
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
"google.golang.org/api/googleapi"
|
||||
)
|
||||
|
||||
type FakeCloudAddressService struct {
|
||||
@ -64,8 +65,12 @@ func (cas *FakeCloudAddressService) ReserveAlphaRegionAddress(addr *computealpha
|
||||
cas.count++
|
||||
}
|
||||
|
||||
if addr.AddressType == "" {
|
||||
addr.AddressType = string(schemeExternal)
|
||||
}
|
||||
|
||||
if cas.reservedAddrs[addr.Address] {
|
||||
return &googleapi.Error{Code: http.StatusConflict}
|
||||
return makeGoogleAPIError(http.StatusConflict, "IP in use")
|
||||
}
|
||||
|
||||
if _, exists := cas.addrsByRegionAndName[region]; !exists {
|
||||
@ -73,7 +78,7 @@ func (cas *FakeCloudAddressService) ReserveAlphaRegionAddress(addr *computealpha
|
||||
}
|
||||
|
||||
if _, exists := cas.addrsByRegionAndName[region][addr.Name]; exists {
|
||||
return &googleapi.Error{Code: http.StatusConflict}
|
||||
return makeGoogleAPIError(http.StatusConflict, "name in use")
|
||||
}
|
||||
|
||||
cas.addrsByRegionAndName[region][addr.Name] = addr
|
||||
@ -81,6 +86,11 @@ func (cas *FakeCloudAddressService) ReserveAlphaRegionAddress(addr *computealpha
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cas *FakeCloudAddressService) ReserveBetaRegionAddress(addr *computebeta.Address, region string) error {
|
||||
alphaAddr := convertToAlphaAddress(addr)
|
||||
return cas.ReserveAlphaRegionAddress(alphaAddr, region)
|
||||
}
|
||||
|
||||
func (cas *FakeCloudAddressService) ReserveRegionAddress(addr *compute.Address, region string) error {
|
||||
alphaAddr := convertToAlphaAddress(addr)
|
||||
return cas.ReserveAlphaRegionAddress(alphaAddr, region)
|
||||
@ -98,6 +108,14 @@ func (cas *FakeCloudAddressService) GetAlphaRegionAddress(name, region string) (
|
||||
}
|
||||
}
|
||||
|
||||
func (cas *FakeCloudAddressService) GetBetaRegionAddress(name, region string) (*computebeta.Address, error) {
|
||||
addr, err := cas.GetAlphaRegionAddress(name, region)
|
||||
if addr != nil {
|
||||
return convertToBetaAddress(addr), err
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (cas *FakeCloudAddressService) GetRegionAddress(name, region string) (*compute.Address, error) {
|
||||
addr, err := cas.GetAlphaRegionAddress(name, region)
|
||||
if addr != nil {
|
||||
@ -111,26 +129,44 @@ func (cas *FakeCloudAddressService) DeleteRegionAddress(name, region string) err
|
||||
return makeGoogleAPINotFoundError("")
|
||||
}
|
||||
|
||||
if _, exists := cas.addrsByRegionAndName[region][name]; !exists {
|
||||
addr, exists := cas.addrsByRegionAndName[region][name]
|
||||
if !exists {
|
||||
return makeGoogleAPINotFoundError("")
|
||||
}
|
||||
delete(cas.reservedAddrs, addr.Address)
|
||||
delete(cas.addrsByRegionAndName[region], name)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cas *FakeCloudAddressService) GetRegionAddressByIP(region, ipAddress string) (*compute.Address, error) {
|
||||
func (cas *FakeCloudAddressService) GetAlphaRegionAddressByIP(region, ipAddress string) (*computealpha.Address, error) {
|
||||
if _, exists := cas.addrsByRegionAndName[region]; !exists {
|
||||
return nil, makeGoogleAPINotFoundError("")
|
||||
}
|
||||
|
||||
for _, addr := range cas.addrsByRegionAndName[region] {
|
||||
if addr.Address == ipAddress {
|
||||
return convertToV1Address(addr), nil
|
||||
return addr, nil
|
||||
}
|
||||
}
|
||||
return nil, makeGoogleAPINotFoundError("")
|
||||
}
|
||||
|
||||
func (cas *FakeCloudAddressService) GetBetaRegionAddressByIP(name, region string) (*computebeta.Address, error) {
|
||||
addr, err := cas.GetAlphaRegionAddressByIP(name, region)
|
||||
if addr != nil {
|
||||
return convertToBetaAddress(addr), nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (cas *FakeCloudAddressService) GetRegionAddressByIP(name, region string) (*compute.Address, error) {
|
||||
addr, err := cas.GetAlphaRegionAddressByIP(name, region)
|
||||
if addr != nil {
|
||||
return convertToV1Address(addr), nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func convertToV1Address(object gceObject) *compute.Address {
|
||||
enc, err := object.MarshalJSON()
|
||||
if err != nil {
|
||||
@ -154,3 +190,26 @@ func convertToAlphaAddress(object gceObject) *computealpha.Address {
|
||||
}
|
||||
return &addr
|
||||
}
|
||||
|
||||
func convertToBetaAddress(object gceObject) *computebeta.Address {
|
||||
enc, err := object.MarshalJSON()
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Failed to encode to json: %v", err))
|
||||
}
|
||||
var addr computebeta.Address
|
||||
if err := json.Unmarshal(enc, &addr); err != nil {
|
||||
panic(fmt.Sprintf("Failed to convert GCE apiObject %v to beta address: %v", object, err))
|
||||
}
|
||||
return &addr
|
||||
}
|
||||
|
||||
func (cas *FakeCloudAddressService) String() string {
|
||||
var b bytes.Buffer
|
||||
for region, regAddresses := range cas.addrsByRegionAndName {
|
||||
b.WriteString(fmt.Sprintf("%v:\n", region))
|
||||
for name, addr := range regAddresses {
|
||||
b.WriteString(fmt.Sprintf(" %v: %v\n", name, addr.Address))
|
||||
}
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
|
@ -18,13 +18,14 @@ package gce
|
||||
|
||||
import (
|
||||
computealpha "google.golang.org/api/compute/v0.alpha"
|
||||
computebeta "google.golang.org/api/compute/v0.beta"
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
)
|
||||
|
||||
// CloudAddressService is an interface for managing addresses
|
||||
type CloudAddressService interface {
|
||||
ReserveRegionAddress(*compute.Address, string) error
|
||||
GetRegionAddress(string, string) (*compute.Address, error)
|
||||
ReserveRegionAddress(address *compute.Address, region string) error
|
||||
GetRegionAddress(name string, region string) (*compute.Address, error)
|
||||
GetRegionAddressByIP(region, ipAddress string) (*compute.Address, error)
|
||||
DeleteRegionAddress(name, region string) error
|
||||
// TODO: Mock Global endpoints
|
||||
@ -32,6 +33,11 @@ type CloudAddressService interface {
|
||||
// Alpha API.
|
||||
GetAlphaRegionAddress(name, region string) (*computealpha.Address, error)
|
||||
ReserveAlphaRegionAddress(addr *computealpha.Address, region string) error
|
||||
|
||||
// Beta API
|
||||
ReserveBetaRegionAddress(address *computebeta.Address, region string) error
|
||||
GetBetaRegionAddress(name string, region string) (*computebeta.Address, error)
|
||||
GetBetaRegionAddressByIP(region, ipAddress string) (*computebeta.Address, error)
|
||||
}
|
||||
|
||||
// CloudForwardingRuleService is an interface for managing forwarding rules.
|
||||
|
@ -137,6 +137,9 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, svc *v1.Service, nod
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Assume the ensureDeleted function successfully deleted the forwarding rule.
|
||||
existingFwdRule = nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -79,11 +79,21 @@ func (gce *GCECloud) ensureInternalLoadBalancer(clusterName, clusterID string, s
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Determine IP which will be used for this LB. If no forwarding rule has been established
|
||||
// or specified in the Service spec, then requestedIP = "".
|
||||
requestedIP := determineRequestedIP(svc, existingFwdRule)
|
||||
addrMgr := newAddressManager(gce, nm.String(), gce.Region(), gce.getInternalSubnetURL(), loadBalancerName, requestedIP, schemeInternal)
|
||||
ipToUse, err := addrMgr.HoldAddress()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.V(2).Infof("ensureInternalLoadBalancer(%v): reserved IP %q for the forwarding rule", loadBalancerName, ipToUse)
|
||||
|
||||
// Ensure firewall rules if necessary
|
||||
if gce.OnXPN() {
|
||||
glog.V(2).Infof("ensureInternalLoadBalancer: cluster is on a cross-project network (XPN) network project %v, compute project %v - skipping firewall creation", gce.networkProjectID, gce.projectID)
|
||||
} else {
|
||||
if err = gce.ensureInternalFirewalls(loadBalancerName, clusterID, nm, svc, strconv.Itoa(int(hcPort)), sharedHealthCheck, nodes); err != nil {
|
||||
if err = gce.ensureInternalFirewalls(loadBalancerName, ipToUse, clusterID, nm, svc, strconv.Itoa(int(hcPort)), sharedHealthCheck, nodes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@ -91,7 +101,7 @@ func (gce *GCECloud) ensureInternalLoadBalancer(clusterName, clusterID string, s
|
||||
expectedFwdRule := &compute.ForwardingRule{
|
||||
Name: loadBalancerName,
|
||||
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, nm.String()),
|
||||
IPAddress: svc.Spec.LoadBalancerIP,
|
||||
IPAddress: ipToUse,
|
||||
BackendService: backendServiceLink,
|
||||
Ports: ports,
|
||||
IPProtocol: string(protocol),
|
||||
@ -126,6 +136,7 @@ func (gce *GCECloud) ensureInternalLoadBalancer(clusterName, clusterID string, s
|
||||
if err = gce.CreateRegionForwardingRule(expectedFwdRule, gce.region); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.V(2).Infof("ensureInternalLoadBalancer(%v): created forwarding rule", loadBalancerName)
|
||||
}
|
||||
|
||||
// Delete the previous internal load balancer resources if necessary
|
||||
@ -133,14 +144,13 @@ func (gce *GCECloud) ensureInternalLoadBalancer(clusterName, clusterID string, s
|
||||
gce.clearPreviousInternalResources(loadBalancerName, existingBackendService, backendServiceName, hcName)
|
||||
}
|
||||
|
||||
// Get the most recent forwarding rule for the new address.
|
||||
existingFwdRule, err = gce.GetRegionForwardingRule(loadBalancerName, gce.region)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// Now that the controller knows the forwarding rule exists, we can release the address.
|
||||
if err := addrMgr.ReleaseAddress(); err != nil {
|
||||
glog.Errorf("ensureInternalLoadBalancer: failed to release address reservation, possibly causing an orphan: %v", err)
|
||||
}
|
||||
|
||||
status := &v1.LoadBalancerStatus{}
|
||||
status.Ingress = []v1.LoadBalancerIngress{{IP: existingFwdRule.IPAddress}}
|
||||
status.Ingress = []v1.LoadBalancerIngress{{IP: ipToUse}}
|
||||
return status, nil
|
||||
}
|
||||
|
||||
@ -198,6 +208,9 @@ func (gce *GCECloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID st
|
||||
gce.sharedResourceLock.Lock()
|
||||
defer gce.sharedResourceLock.Unlock()
|
||||
|
||||
glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): attempting delete of region internal address", loadBalancerName)
|
||||
ensureAddressDeleted(gce, loadBalancerName, gce.region)
|
||||
|
||||
glog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region internal forwarding rule", loadBalancerName)
|
||||
if err := gce.DeleteRegionForwardingRule(loadBalancerName, gce.region); err != nil && !isNotFound(err) {
|
||||
return err
|
||||
@ -306,9 +319,9 @@ func (gce *GCECloud) ensureInternalFirewall(fwName, fwDesc string, sourceRanges
|
||||
return gce.UpdateFirewall(expectedFirewall)
|
||||
}
|
||||
|
||||
func (gce *GCECloud) ensureInternalFirewalls(loadBalancerName, clusterID string, nm types.NamespacedName, svc *v1.Service, healthCheckPort string, sharedHealthCheck bool, nodes []*v1.Node) error {
|
||||
func (gce *GCECloud) ensureInternalFirewalls(loadBalancerName, ipAddress, clusterID string, nm types.NamespacedName, svc *v1.Service, healthCheckPort string, sharedHealthCheck bool, nodes []*v1.Node) error {
|
||||
// First firewall is for ingress traffic
|
||||
fwDesc := makeFirewallDescription(nm.String(), svc.Spec.LoadBalancerIP)
|
||||
fwDesc := makeFirewallDescription(nm.String(), ipAddress)
|
||||
ports, protocol := getPortsAndProtocol(svc.Spec.Ports)
|
||||
sourceRanges, err := v1_service.GetLoadBalancerSourceRanges(svc)
|
||||
if err != nil {
|
||||
@ -632,6 +645,20 @@ func (gce *GCECloud) getBackendServiceLink(name string) string {
|
||||
return gce.service.BasePath + strings.Join([]string{gce.projectID, "regions", gce.region, "backendServices", name}, "/")
|
||||
}
|
||||
|
||||
// getInternalSubnetURL first attempts to return the configured SubnetURL.
|
||||
// If subnetwork-name was not specified, then a best-effort generation is made.
|
||||
// Note subnet names might not be the network name for some auto networks.
|
||||
func (gce *GCECloud) getInternalSubnetURL() string {
|
||||
if gce.SubnetworkURL() != "" {
|
||||
return gce.SubnetworkURL()
|
||||
}
|
||||
|
||||
networkName := getNameFromLink(gce.NetworkURL())
|
||||
v := gceSubnetworkURL("", gce.NetworkProjectID(), gce.Region(), networkName)
|
||||
glog.Warningf("Generating subnetwork URL based off network since subnet name/URL was not configured: %q", v)
|
||||
return v
|
||||
}
|
||||
|
||||
func getNameFromLink(link string) string {
|
||||
if link == "" {
|
||||
return ""
|
||||
@ -640,3 +667,15 @@ func getNameFromLink(link string) string {
|
||||
fields := strings.Split(link, "/")
|
||||
return fields[len(fields)-1]
|
||||
}
|
||||
|
||||
func determineRequestedIP(svc *v1.Service, fwdRule *compute.ForwardingRule) string {
|
||||
if svc.Spec.LoadBalancerIP != "" {
|
||||
return svc.Spec.LoadBalancerIP
|
||||
}
|
||||
|
||||
if fwdRule != nil {
|
||||
return fwdRule.IPAddress
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
@ -153,3 +153,7 @@ func isNotFoundOrInUse(err error) bool {
|
||||
func makeGoogleAPINotFoundError(message string) error {
|
||||
return &googleapi.Error{Code: http.StatusNotFound, Message: message}
|
||||
}
|
||||
|
||||
func makeGoogleAPIError(code int, message string) error {
|
||||
return &googleapi.Error{Code: code, Message: message}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user