Merge pull request #66242 from feiskyer/instance-az
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Add initial availability zones support for Azure nodes **What this PR does / why we need it**: The first part of [Azure Availability Zone feature](https://github.com/kubernetes/features/issues/586). This PR adds initial availability zone (AZ) support for Azure nodes. With this PR, Azure nodes with AZ will have label `failure-domain.beta.kubernetes.io/zone=<region>-<zoneID>`, e.g. `southeastasia-1`. It also updates instance metadata api-version to 2017-12-01, which is required for AZ. **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes # **Special notes for your reviewer**: VirtualMachineScaleSetVM doesn't have AZ info yet. It will be supported later after new Azure Go SDK releases. **Release note**: ```release-note Azure nodes with availability zone now will have label `failure-domain.beta.kubernetes.io/zone=<region>-<zoneID>`. ``` /kind feature /sig azure /assign @brendandburns @khenidak @andyzhangx
This commit is contained in:
commit
b68c9440da
@ -100,7 +100,7 @@ func (i *InstanceMetadata) queryMetadataBytes(path, format string) ([]byte, erro
|
|||||||
|
|
||||||
q := req.URL.Query()
|
q := req.URL.Query()
|
||||||
q.Add("format", format)
|
q.Add("format", format)
|
||||||
q.Add("api-version", "2017-04-02")
|
q.Add("api-version", "2017-12-01")
|
||||||
req.URL.RawQuery = q.Encode()
|
req.URL.RawQuery = q.Encode()
|
||||||
|
|
||||||
resp, err := client.Do(req)
|
resp, err := client.Do(req)
|
||||||
|
@ -408,14 +408,29 @@ func (as *availabilitySet) GetInstanceTypeByNodeName(name string) (string, error
|
|||||||
return string(machine.HardwareProfile.VMSize), nil
|
return string(machine.HardwareProfile.VMSize), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetZoneByNodeName gets zone from instance view.
|
// GetZoneByNodeName gets availability zone for the specified node. If the node is not running
|
||||||
|
// with availability zone, then it returns fault domain.
|
||||||
func (as *availabilitySet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
|
func (as *availabilitySet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
|
||||||
vm, err := as.getVirtualMachine(types.NodeName(name))
|
vm, err := as.getVirtualMachine(types.NodeName(name))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cloudprovider.Zone{}, err
|
return cloudprovider.Zone{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
failureDomain := strconv.Itoa(int(*vm.VirtualMachineProperties.InstanceView.PlatformFaultDomain))
|
var failureDomain string
|
||||||
|
if vm.Zones != nil && len(*vm.Zones) > 0 {
|
||||||
|
// Get availability zone for the node.
|
||||||
|
zones := *vm.Zones
|
||||||
|
zoneID, err := strconv.Atoi(zones[0])
|
||||||
|
if err != nil {
|
||||||
|
return cloudprovider.Zone{}, fmt.Errorf("failed to parse zone %q: %v", zones, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
failureDomain = as.makeZone(zoneID)
|
||||||
|
} else {
|
||||||
|
// Availability zone is not used for the node, falling back to fault domain.
|
||||||
|
failureDomain = strconv.Itoa(int(*vm.VirtualMachineProperties.InstanceView.PlatformFaultDomain))
|
||||||
|
}
|
||||||
|
|
||||||
zone := cloudprovider.Zone{
|
zone := cloudprovider.Zone{
|
||||||
FailureDomain: failureDomain,
|
FailureDomain: failureDomain,
|
||||||
Region: *(vm.Location),
|
Region: *(vm.Location),
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"reflect"
|
"reflect"
|
||||||
@ -1667,35 +1668,82 @@ func validateEmptyConfig(t *testing.T, config string) {
|
|||||||
t.Errorf("got incorrect value for CloudProviderRateLimit")
|
t.Errorf("got incorrect value for CloudProviderRateLimit")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetZone(t *testing.T) {
|
func TestGetZone(t *testing.T) {
|
||||||
data := `{"ID":"_azdev","UD":"0","FD":"99"}`
|
cloud := &Cloud{
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
Config: Config{
|
||||||
fmt.Fprintln(w, data)
|
Location: "eastus",
|
||||||
}))
|
},
|
||||||
defer ts.Close()
|
metadata: &InstanceMetadata{},
|
||||||
|
|
||||||
cloud := &Cloud{}
|
|
||||||
cloud.Location = "eastus"
|
|
||||||
|
|
||||||
zone, err := cloud.getZoneFromURL(ts.URL)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error: %v", err)
|
|
||||||
}
|
}
|
||||||
if zone.FailureDomain != "99" {
|
testcases := []struct {
|
||||||
t.Errorf("Unexpected value: %s, expected '99'", zone.FailureDomain)
|
name string
|
||||||
|
zone string
|
||||||
|
faultDomain string
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "GetZone should get real zone if only node's zone is set",
|
||||||
|
zone: "1",
|
||||||
|
expected: "eastus-1",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "GetZone should get real zone if both node's zone and FD are set",
|
||||||
|
zone: "1",
|
||||||
|
faultDomain: "99",
|
||||||
|
expected: "eastus-1",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "GetZone should get faultDomain if node's zone isn't set",
|
||||||
|
faultDomain: "99",
|
||||||
|
expected: "99",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
if zone.Region != cloud.Location {
|
|
||||||
t.Errorf("Expected: %s, saw: %s", cloud.Location, zone.Region)
|
for _, test := range testcases {
|
||||||
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Test [%s] unexpected error: %v", test.name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
mux.Handle("/v1/InstanceInfo/FD", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
fmt.Fprintf(w, test.faultDomain)
|
||||||
|
}))
|
||||||
|
mux.Handle("/instance/compute/zone", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
fmt.Fprintf(w, test.zone)
|
||||||
|
}))
|
||||||
|
go func() {
|
||||||
|
http.Serve(listener, mux)
|
||||||
|
}()
|
||||||
|
defer listener.Close()
|
||||||
|
|
||||||
|
cloud.metadata.baseURL = "http://" + listener.Addr().String() + "/"
|
||||||
|
zone, err := cloud.GetZone(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Test [%s] unexpected error: %v", test.name, err)
|
||||||
|
}
|
||||||
|
if zone.FailureDomain != test.expected {
|
||||||
|
t.Errorf("Test [%s] unexpected zone: %s, expected %q", test.name, zone.FailureDomain, test.expected)
|
||||||
|
}
|
||||||
|
if zone.Region != cloud.Location {
|
||||||
|
t.Errorf("Test [%s] unexpected region: %s, expected: %s", test.name, zone.Region, cloud.Location)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFetchFaultDomain(t *testing.T) {
|
func TestFetchFaultDomain(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
fmt.Fprintln(w, `{"ID":"_azdev","UD":"0","FD":"99"}`)
|
fmt.Fprint(w, "99")
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
faultDomain, err := fetchFaultDomain(ts.URL)
|
cloud := &Cloud{}
|
||||||
|
cloud.metadata = &InstanceMetadata{
|
||||||
|
baseURL: ts.URL + "/",
|
||||||
|
}
|
||||||
|
|
||||||
|
faultDomain, err := cloud.fetchFaultDomain()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -1707,23 +1755,6 @@ func TestFetchFaultDomain(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDecodeInstanceInfo(t *testing.T) {
|
|
||||||
response := `{"ID":"_azdev","UD":"0","FD":"99"}`
|
|
||||||
|
|
||||||
faultDomain, err := readFaultDomain(strings.NewReader(response))
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected error in ReadFaultDomain: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if faultDomain == nil {
|
|
||||||
t.Error("Fault domain was unexpectedly nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
if *faultDomain != "99" {
|
|
||||||
t.Error("got incorrect fault domain")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetNodeNameByProviderID(t *testing.T) {
|
func TestGetNodeNameByProviderID(t *testing.T) {
|
||||||
az := getTestCloud()
|
az := getTestCloud()
|
||||||
providers := []struct {
|
providers := []struct {
|
||||||
|
@ -211,7 +211,11 @@ func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
|
|||||||
return "", nil
|
return "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetZoneByNodeName gets cloudprovider.Zone by node name.
|
// GetZoneByNodeName gets availability zone for the specified node. If the node is not running
|
||||||
|
// with availability zone, then it returns fault domain.
|
||||||
|
// TODO(feiskyer): Add availability zone support of VirtualMachineScaleSetVM
|
||||||
|
// after it is released in Azure Go SDK.
|
||||||
|
// Refer https://github.com/Azure/azure-sdk-for-go/pull/2224.
|
||||||
func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
|
func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
|
||||||
managedByAS, err := ss.isNodeManagedByAvailabilitySet(name)
|
managedByAS, err := ss.isNodeManagedByAvailabilitySet(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -18,39 +18,61 @@ package azure
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"fmt"
|
||||||
"io"
|
"strconv"
|
||||||
"io/ioutil"
|
"strings"
|
||||||
"net/http"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||||
)
|
)
|
||||||
|
|
||||||
const instanceInfoURL = "http://169.254.169.254/metadata/v1/InstanceInfo"
|
const (
|
||||||
|
faultDomainURI = "v1/InstanceInfo/FD"
|
||||||
|
zoneMetadataURI = "instance/compute/zone"
|
||||||
|
)
|
||||||
|
|
||||||
var faultMutex = &sync.Mutex{}
|
var faultMutex = &sync.Mutex{}
|
||||||
var faultDomain *string
|
var faultDomain *string
|
||||||
|
|
||||||
type instanceInfo struct {
|
// makeZone returns the zone value in format of <region>-<zone-id>.
|
||||||
ID string `json:"ID"`
|
func (az *Cloud) makeZone(zoneID int) string {
|
||||||
UpdateDomain string `json:"UD"`
|
return fmt.Sprintf("%s-%d", strings.ToLower(az.Location), zoneID)
|
||||||
FaultDomain string `json:"FD"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetZone returns the Zone containing the current failure zone and locality region that the program is running in
|
// GetZone returns the Zone containing the current availability zone and locality region that the program is running in.
|
||||||
|
// If the node is not running with availability zones, then it will fall back to fault domain.
|
||||||
func (az *Cloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
|
func (az *Cloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
|
||||||
return az.getZoneFromURL(instanceInfoURL)
|
zone, err := az.metadata.Text(zoneMetadataURI)
|
||||||
|
if err != nil {
|
||||||
|
return cloudprovider.Zone{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if zone == "" {
|
||||||
|
glog.V(3).Infof("Availability zone is not enabled for the node, falling back to fault domain")
|
||||||
|
return az.getZoneFromFaultDomain()
|
||||||
|
}
|
||||||
|
|
||||||
|
zoneID, err := strconv.Atoi(zone)
|
||||||
|
if err != nil {
|
||||||
|
return cloudprovider.Zone{}, fmt.Errorf("failed to parse zone ID %q: %v", zone, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return cloudprovider.Zone{
|
||||||
|
FailureDomain: az.makeZone(zoneID),
|
||||||
|
Region: az.Location,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is injectable for testing.
|
// getZoneFromFaultDomain gets fault domain for the instance.
|
||||||
func (az *Cloud) getZoneFromURL(url string) (cloudprovider.Zone, error) {
|
// Fault domain is the fallback when availability zone is not enabled for the node.
|
||||||
|
func (az *Cloud) getZoneFromFaultDomain() (cloudprovider.Zone, error) {
|
||||||
faultMutex.Lock()
|
faultMutex.Lock()
|
||||||
defer faultMutex.Unlock()
|
defer faultMutex.Unlock()
|
||||||
if faultDomain == nil {
|
if faultDomain == nil {
|
||||||
var err error
|
var err error
|
||||||
faultDomain, err = fetchFaultDomain(url)
|
faultDomain, err = az.fetchFaultDomain()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cloudprovider.Zone{}, err
|
return cloudprovider.Zone{}, err
|
||||||
}
|
}
|
||||||
@ -81,24 +103,11 @@ func (az *Cloud) GetZoneByNodeName(ctx context.Context, nodeName types.NodeName)
|
|||||||
return az.vmSet.GetZoneByNodeName(string(nodeName))
|
return az.vmSet.GetZoneByNodeName(string(nodeName))
|
||||||
}
|
}
|
||||||
|
|
||||||
func fetchFaultDomain(url string) (*string, error) {
|
func (az *Cloud) fetchFaultDomain() (*string, error) {
|
||||||
resp, err := http.Get(url)
|
faultDomain, err := az.metadata.Text(faultDomainURI)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
|
||||||
return readFaultDomain(resp.Body)
|
|
||||||
}
|
|
||||||
|
|
||||||
func readFaultDomain(reader io.Reader) (*string, error) {
|
return &faultDomain, nil
|
||||||
var instanceInfo instanceInfo
|
|
||||||
body, err := ioutil.ReadAll(reader)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
err = json.Unmarshal(body, &instanceInfo)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &instanceInfo.FaultDomain, nil
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user