Merge pull request #70353 from feiskyer/metadata-cache

Improve Azure instance metadata handling
This commit is contained in:
k8s-ci-robot 2018-10-29 10:53:00 -07:00 committed by GitHub
commit a8934ff6fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 158 additions and 232 deletions

View File

@ -159,7 +159,7 @@ type Cloud struct {
DisksClient DisksClient DisksClient DisksClient
FileClient FileClient FileClient FileClient
resourceRequestBackoff wait.Backoff resourceRequestBackoff wait.Backoff
metadata *InstanceMetadata metadata *InstanceMetadataService
vmSet VMSet vmSet VMSet
// Lock for access to node caches, includes nodeZones, nodeResourceGroups, and unmanagedNodes. // Lock for access to node caches, includes nodeZones, nodeResourceGroups, and unmanagedNodes.
@ -328,7 +328,10 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
az.CloudProviderBackoffJitter) az.CloudProviderBackoffJitter)
} }
az.metadata = NewInstanceMetadata() az.metadata, err = NewInstanceMetadataService(metadataURL)
if err != nil {
return nil, err
}
if az.MaximumLoadBalancerRuleCount == 0 { if az.MaximumLoadBalancerRuleCount == 0 {
az.MaximumLoadBalancerRuleCount = maximumLoadBalancerRuleCount az.MaximumLoadBalancerRuleCount = maximumLoadBalancerRuleCount

View File

@ -18,11 +18,17 @@ package azure
import ( import (
"encoding/json" "encoding/json"
"fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"time"
) )
const metadataURL = "http://169.254.169.254/metadata/" const (
metadataCacheTTL = time.Minute
metadataCacheKey = "InstanceMetadata"
metadataURL = "http://169.254.169.254/metadata/instance"
)
// NetworkMetadata contains metadata about an instance's network // NetworkMetadata contains metadata about an instance's network
type NetworkMetadata struct { type NetworkMetadata struct {
@ -54,67 +60,100 @@ type Subnet struct {
Prefix string `json:"prefix"` Prefix string `json:"prefix"`
} }
// InstanceMetadata knows how to query the Azure instance metadata server.
type InstanceMetadata struct {
baseURL string
}
// ComputeMetadata represents compute information // ComputeMetadata represents compute information
type ComputeMetadata struct { type ComputeMetadata struct {
Name string `json:"name,omitempty"` SKU string `json:"sku,omitempty"`
Zone string `json:"zone,omitempty"` Name string `json:"name,omitempty"`
VMSize string `json:"vmSize,omitempty"` Zone string `json:"zone,omitempty"`
VMSize string `json:"vmSize,omitempty"`
OSType string `json:"osType,omitempty"`
Location string `json:"location,omitempty"`
FaultDomain string `json:"platformFaultDomain,omitempty"`
UpdateDomain string `json:"platformUpdateDomain,omitempty"`
ResourceGroup string `json:"resourceGroupName,omitempty"`
VMScaleSetName string `json:"vmScaleSetName,omitempty"`
} }
// NewInstanceMetadata creates an instance of the InstanceMetadata accessor object. // InstanceMetadata represents instance information.
func NewInstanceMetadata() *InstanceMetadata { type InstanceMetadata struct {
return &InstanceMetadata{ Compute *ComputeMetadata `json:"compute,omitempty"`
baseURL: metadataURL, Network *NetworkMetadata `json:"network,omitempty"`
}
// InstanceMetadataService knows how to query the Azure instance metadata server.
type InstanceMetadataService struct {
metadataURL string
imsCache *timedCache
}
// NewInstanceMetadataService creates an instance of the InstanceMetadataService accessor object.
func NewInstanceMetadataService(metadataURL string) (*InstanceMetadataService, error) {
ims := &InstanceMetadataService{
metadataURL: metadataURL,
} }
}
// makeMetadataURL makes a complete metadata URL from the given path. imsCache, err := newTimedcache(metadataCacheTTL, ims.getInstanceMetadata)
func (i *InstanceMetadata) makeMetadataURL(path string) string {
return i.baseURL + path
}
// Object queries the metadata server and populates the passed in object
func (i *InstanceMetadata) Object(path string, obj interface{}) error {
data, err := i.queryMetadataBytes(path, "json")
if err != nil { if err != nil {
return err return nil, err
} }
return json.Unmarshal(data, obj)
ims.imsCache = imsCache
return ims, nil
} }
// Text queries the metadata server and returns the corresponding text func (ims *InstanceMetadataService) getInstanceMetadata(key string) (interface{}, error) {
func (i *InstanceMetadata) Text(path string) (string, error) { req, err := http.NewRequest("GET", ims.metadataURL, nil)
data, err := i.queryMetadataBytes(path, "text")
if err != nil {
return "", err
}
return string(data), err
}
func (i *InstanceMetadata) queryMetadataBytes(path, format string) ([]byte, error) {
client := &http.Client{}
req, err := http.NewRequest("GET", i.makeMetadataURL(path), nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
req.Header.Add("Metadata", "True") req.Header.Add("Metadata", "True")
req.Header.Add("User-Agent", "golang/kubernetes-cloud-provider")
q := req.URL.Query() q := req.URL.Query()
q.Add("format", format) q.Add("format", "json")
q.Add("api-version", "2017-12-01") q.Add("api-version", "2017-12-01")
req.URL.RawQuery = q.Encode() req.URL.RawQuery = q.Encode()
client := &http.Client{}
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer resp.Body.Close() defer resp.Body.Close()
return ioutil.ReadAll(resp.Body) if resp.StatusCode != 200 {
return nil, fmt.Errorf("failure of getting instance metadata with response %q", resp.Status)
}
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
obj := InstanceMetadata{}
err = json.Unmarshal(data, &obj)
if err != nil {
return nil, err
}
return &obj, nil
}
// GetMetadata gets instance metadata from cache.
func (ims *InstanceMetadataService) GetMetadata() (*InstanceMetadata, error) {
cache, err := ims.imsCache.Get(metadataCacheKey)
if err != nil {
return nil, err
}
// Cache shouldn't be nil, but added a check incase something wrong.
if cache == nil {
return nil, fmt.Errorf("failure of getting instance metadata")
}
if metadata, ok := cache.(*InstanceMetadata); ok {
return metadata, nil
}
return nil, fmt.Errorf("failure of getting instance metadata")
} }

View File

@ -18,6 +18,7 @@ package azure
import ( import (
"context" "context"
"fmt"
"os" "os"
"strings" "strings"
@ -67,12 +68,16 @@ func (az *Cloud) NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.N
} }
if az.UseInstanceMetadata { if az.UseInstanceMetadata {
computeMetadata, err := az.getComputeMetadata() metadata, err := az.metadata.GetMetadata()
if err != nil { if err != nil {
return nil, err return nil, err
} }
isLocalInstance, err := az.isCurrentInstance(name, computeMetadata.Name) if metadata.Compute == nil || metadata.Network == nil {
return nil, fmt.Errorf("failure of getting instance metadata")
}
isLocalInstance, err := az.isCurrentInstance(name, metadata.Compute.Name)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -82,30 +87,38 @@ func (az *Cloud) NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.N
return addressGetter(name) return addressGetter(name)
} }
ipAddress := IPAddress{} if len(metadata.Network.Interface) == 0 {
err = az.metadata.Object("instance/network/interface/0/ipv4/ipAddress/0", &ipAddress) return nil, fmt.Errorf("no interface is found for the instance")
if err != nil {
return nil, err
}
// Fall back to ARM API if the address is empty string.
// TODO: this is a workaround because IMDS is not stable enough.
// It should be removed after IMDS fixing the issue.
if strings.TrimSpace(ipAddress.PrivateIP) == "" {
return addressGetter(name)
} }
// Use ip address got from instance metadata. // Use ip address got from instance metadata.
ipAddress := metadata.Network.Interface[0]
addresses := []v1.NodeAddress{ addresses := []v1.NodeAddress{
{Type: v1.NodeInternalIP, Address: ipAddress.PrivateIP},
{Type: v1.NodeHostName, Address: string(name)}, {Type: v1.NodeHostName, Address: string(name)},
} }
if len(ipAddress.PublicIP) > 0 { for _, address := range ipAddress.IPV4.IPAddress {
addr := v1.NodeAddress{ addresses = append(addresses, v1.NodeAddress{
Type: v1.NodeExternalIP, Type: v1.NodeInternalIP,
Address: ipAddress.PublicIP, Address: address.PrivateIP,
})
if len(address.PublicIP) > 0 {
addresses = append(addresses, v1.NodeAddress{
Type: v1.NodeExternalIP,
Address: address.PublicIP,
})
}
}
for _, address := range ipAddress.IPV6.IPAddress {
addresses = append(addresses, v1.NodeAddress{
Type: v1.NodeInternalIP,
Address: address.PrivateIP,
})
if len(address.PublicIP) > 0 {
addresses = append(addresses, v1.NodeAddress{
Type: v1.NodeExternalIP,
Address: address.PublicIP,
})
} }
addresses = append(addresses, addr)
} }
return addresses, nil return addresses, nil
} }
@ -172,17 +185,6 @@ func (az *Cloud) InstanceShutdownByProviderID(ctx context.Context, providerID st
return strings.ToLower(powerStatus) == vmPowerStateStopped || strings.ToLower(powerStatus) == vmPowerStateDeallocated, nil return strings.ToLower(powerStatus) == vmPowerStateStopped || strings.ToLower(powerStatus) == vmPowerStateDeallocated, nil
} }
// getComputeMetadata gets compute information from instance metadata.
func (az *Cloud) getComputeMetadata() (*ComputeMetadata, error) {
computeInfo := ComputeMetadata{}
err := az.metadata.Object(computeMetadataURI, &computeInfo)
if err != nil {
return nil, err
}
return &computeInfo, nil
}
func (az *Cloud) isCurrentInstance(name types.NodeName, metadataVMName string) (bool, error) { func (az *Cloud) isCurrentInstance(name types.NodeName, metadataVMName string) (bool, error) {
var err error var err error
nodeName := mapNodeNameToVMName(name) nodeName := mapNodeNameToVMName(name)
@ -213,12 +215,16 @@ func (az *Cloud) InstanceID(ctx context.Context, name types.NodeName) (string, e
} }
if az.UseInstanceMetadata { if az.UseInstanceMetadata {
computeMetadata, err := az.getComputeMetadata() metadata, err := az.metadata.GetMetadata()
if err != nil { if err != nil {
return "", err return "", err
} }
isLocalInstance, err := az.isCurrentInstance(name, computeMetadata.Name) if metadata.Compute == nil {
return "", fmt.Errorf("failure of getting instance metadata")
}
isLocalInstance, err := az.isCurrentInstance(name, metadata.Compute.Name)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -229,10 +235,7 @@ func (az *Cloud) InstanceID(ctx context.Context, name types.NodeName) (string, e
} }
// Get resource group name. // Get resource group name.
resourceGroup, err := az.metadata.Text("instance/compute/resourceGroupName") resourceGroup := metadata.Compute.ResourceGroup
if err != nil {
return "", err
}
// Compose instanceID based on nodeName for standard instance. // Compose instanceID based on nodeName for standard instance.
if az.VMType == vmTypeStandard { if az.VMType == vmTypeStandard {
@ -240,7 +243,7 @@ func (az *Cloud) InstanceID(ctx context.Context, name types.NodeName) (string, e
} }
// Get scale set name and instanceID from vmName for vmss. // Get scale set name and instanceID from vmName for vmss.
ssName, instanceID, err := extractVmssVMName(computeMetadata.Name) ssName, instanceID, err := extractVmssVMName(metadata.Compute.Name)
if err != nil { if err != nil {
if err == ErrorNotVmssInstance { if err == ErrorNotVmssInstance {
// Compose machineID for standard Node. // Compose machineID for standard Node.
@ -289,18 +292,22 @@ func (az *Cloud) InstanceType(ctx context.Context, name types.NodeName) (string,
} }
if az.UseInstanceMetadata { if az.UseInstanceMetadata {
computeMetadata, err := az.getComputeMetadata() metadata, err := az.metadata.GetMetadata()
if err != nil { if err != nil {
return "", err return "", err
} }
isLocalInstance, err := az.isCurrentInstance(name, computeMetadata.Name) if metadata.Compute == nil {
return "", fmt.Errorf("failure of getting instance metadata")
}
isLocalInstance, err := az.isCurrentInstance(name, metadata.Compute.Name)
if err != nil { if err != nil {
return "", err return "", err
} }
if isLocalInstance { if isLocalInstance {
if computeMetadata.VMSize != "" { if metadata.Compute.VMSize != "" {
return computeMetadata.VMSize, nil return metadata.Compute.VMSize, nil
} }
} }
} }

View File

@ -66,7 +66,6 @@ func setTestVirtualMachines(c *Cloud, vmList map[string]string) {
func TestInstanceID(t *testing.T) { func TestInstanceID(t *testing.T) {
cloud := getTestCloud() cloud := getTestCloud()
cloud.metadata = &InstanceMetadata{}
testcases := []struct { testcases := []struct {
name string name string
@ -105,15 +104,18 @@ func TestInstanceID(t *testing.T) {
} }
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle("/instance/compute", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, fmt.Sprintf("{\"name\":\"%s\"}", test.metadataName)) fmt.Fprintf(w, fmt.Sprintf(`{"compute":{"name":"%s"}}`, test.metadataName))
})) }))
go func() { go func() {
http.Serve(listener, mux) http.Serve(listener, mux)
}() }()
defer listener.Close() defer listener.Close()
cloud.metadata.baseURL = "http://" + listener.Addr().String() + "/" cloud.metadata, err = NewInstanceMetadataService("http://" + listener.Addr().String() + "/")
if err != nil {
t.Errorf("Test [%s] unexpected error: %v", test.name, err)
}
vmListWithPowerState := make(map[string]string) vmListWithPowerState := make(map[string]string)
for _, vm := range test.vmList { for _, vm := range test.vmList {
vmListWithPowerState[vm] = "" vmListWithPowerState[vm] = ""

View File

@ -19,13 +19,10 @@ package azure
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/json"
"fmt" "fmt"
"math" "math"
"net" "net"
"net/http" "net/http"
"net/http/httptest"
"reflect"
"strings" "strings"
"testing" "testing"
@ -1682,7 +1679,6 @@ func TestGetZone(t *testing.T) {
Config: Config{ Config: Config{
Location: "eastus", Location: "eastus",
}, },
metadata: &InstanceMetadata{},
} }
testcases := []struct { testcases := []struct {
name string name string
@ -1715,18 +1711,19 @@ func TestGetZone(t *testing.T) {
} }
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle("/v1/InstanceInfo/FD", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, test.faultDomain) fmt.Fprintf(w, fmt.Sprintf(`{"compute":{"zone":"%s", "platformFaultDomain":"%s"}}`, test.zone, test.faultDomain))
}))
mux.Handle("/instance/compute", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, fmt.Sprintf("{\"zone\":\"%s\"}", test.zone))
})) }))
go func() { go func() {
http.Serve(listener, mux) http.Serve(listener, mux)
}() }()
defer listener.Close() defer listener.Close()
cloud.metadata.baseURL = "http://" + listener.Addr().String() + "/" cloud.metadata, err = NewInstanceMetadataService("http://" + listener.Addr().String() + "/")
if err != nil {
t.Errorf("Test [%s] unexpected error: %v", test.name, err)
}
zone, err := cloud.GetZone(context.Background()) zone, err := cloud.GetZone(context.Background())
if err != nil { if err != nil {
t.Errorf("Test [%s] unexpected error: %v", test.name, err) t.Errorf("Test [%s] unexpected error: %v", test.name, err)
@ -1740,29 +1737,6 @@ func TestGetZone(t *testing.T) {
} }
} }
func TestFetchFaultDomain(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "99")
}))
defer ts.Close()
cloud := &Cloud{}
cloud.metadata = &InstanceMetadata{
baseURL: ts.URL + "/",
}
faultDomain, err := cloud.fetchFaultDomain()
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if faultDomain == nil {
t.Errorf("Unexpected nil fault domain")
}
if *faultDomain != "99" {
t.Errorf("Expected '99', saw '%s'", *faultDomain)
}
}
func TestGetNodeNameByProviderID(t *testing.T) { func TestGetNodeNameByProviderID(t *testing.T) {
az := getTestCloud() az := getTestCloud()
providers := []struct { providers := []struct {
@ -1815,73 +1789,6 @@ func TestGetNodeNameByProviderID(t *testing.T) {
} }
} }
func TestMetadataURLGeneration(t *testing.T) {
metadata := NewInstanceMetadata()
fullPath := metadata.makeMetadataURL("some/path")
if fullPath != "http://169.254.169.254/metadata/some/path" {
t.Errorf("Expected http://169.254.169.254/metadata/some/path saw %s", fullPath)
}
}
func TestMetadataParsing(t *testing.T) {
data := `
{
"interface": [
{
"ipv4": {
"ipAddress": [
{
"privateIpAddress": "10.0.1.4",
"publicIpAddress": "X.X.X.X"
}
],
"subnet": [
{
"address": "10.0.1.0",
"prefix": "24"
}
]
},
"ipv6": {
"ipAddress": [
]
},
"macAddress": "002248020E1E"
}
]
}
`
network := NetworkMetadata{}
if err := json.Unmarshal([]byte(data), &network); err != nil {
t.Errorf("Unexpected error: %v", err)
}
ip := network.Interface[0].IPV4.IPAddress[0].PrivateIP
if ip != "10.0.1.4" {
t.Errorf("Unexpected value: %s, expected 10.0.1.4", ip)
}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, data)
}))
defer server.Close()
metadata := &InstanceMetadata{
baseURL: server.URL,
}
networkJSON := NetworkMetadata{}
if err := metadata.Object("/some/path", &networkJSON); err != nil {
t.Errorf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(network, networkJSON) {
t.Errorf("Unexpected inequality:\n%#v\nvs\n%#v", network, networkJSON)
}
}
func addTestSubnet(t *testing.T, az *Cloud, svc *v1.Service) { func addTestSubnet(t *testing.T, az *Cloud, svc *v1.Service) {
if svc.Annotations[ServiceAnnotationLoadBalancerInternal] != "true" { if svc.Annotations[ServiceAnnotationLoadBalancerInternal] != "true" {
t.Error("Subnet added to non-internal service") t.Error("Subnet added to non-internal service")

View File

@ -21,21 +21,12 @@ import (
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
"sync"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
cloudprovider "k8s.io/cloud-provider" cloudprovider "k8s.io/cloud-provider"
) )
const (
faultDomainURI = "v1/InstanceInfo/FD"
computeMetadataURI = "instance/compute"
)
var faultMutex = &sync.Mutex{}
var faultDomain *string
// makeZone returns the zone value in format of <region>-<zone-id>. // makeZone returns the zone value in format of <region>-<zone-id>.
func (az *Cloud) makeZone(zoneID int) string { func (az *Cloud) makeZone(zoneID int) string {
return fmt.Sprintf("%s-%d", strings.ToLower(az.Location), zoneID) return fmt.Sprintf("%s-%d", strings.ToLower(az.Location), zoneID)
@ -58,47 +49,33 @@ func (az *Cloud) GetZoneID(zoneLabel string) string {
// GetZone returns the Zone containing the current availability zone and locality region that the program is running in. // GetZone returns the Zone containing the current availability zone and locality region that the program is running in.
// If the node is not running with availability zones, then it will fall back to fault domain. // If the node is not running with availability zones, then it will fall back to fault domain.
func (az *Cloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) { func (az *Cloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
computeInfo := ComputeMetadata{} metadata, err := az.metadata.GetMetadata()
err := az.metadata.Object(computeMetadataURI, &computeInfo)
if err != nil { if err != nil {
return cloudprovider.Zone{}, err return cloudprovider.Zone{}, err
} }
if computeInfo.Zone == "" { if metadata.Compute == nil {
glog.V(3).Infof("Availability zone is not enabled for the node, falling back to fault domain") return cloudprovider.Zone{}, fmt.Errorf("failure of getting compute information from instance metadata")
return az.getZoneFromFaultDomain()
} }
zoneID, err := strconv.Atoi(computeInfo.Zone) zone := ""
if err != nil { if metadata.Compute.Zone != "" {
return cloudprovider.Zone{}, fmt.Errorf("failed to parse zone ID %q: %v", computeInfo.Zone, err) zoneID, err := strconv.Atoi(metadata.Compute.Zone)
if err != nil {
return cloudprovider.Zone{}, fmt.Errorf("failed to parse zone ID %q: %v", metadata.Compute.Zone, err)
}
zone = az.makeZone(zoneID)
} else {
glog.V(3).Infof("Availability zone is not enabled for the node, falling back to fault domain")
zone = metadata.Compute.FaultDomain
} }
return cloudprovider.Zone{ return cloudprovider.Zone{
FailureDomain: az.makeZone(zoneID), FailureDomain: zone,
Region: az.Location, Region: az.Location,
}, nil }, nil
} }
// getZoneFromFaultDomain gets fault domain for the instance.
// Fault domain is the fallback when availability zone is not enabled for the node.
func (az *Cloud) getZoneFromFaultDomain() (cloudprovider.Zone, error) {
faultMutex.Lock()
defer faultMutex.Unlock()
if faultDomain == nil {
var err error
faultDomain, err = az.fetchFaultDomain()
if err != nil {
return cloudprovider.Zone{}, err
}
}
zone := cloudprovider.Zone{
FailureDomain: *faultDomain,
Region: az.Location,
}
return zone, nil
}
// GetZoneByProviderID implements Zones.GetZoneByProviderID // GetZoneByProviderID implements Zones.GetZoneByProviderID
// This is particularly useful in external cloud providers where the kubelet // This is particularly useful in external cloud providers where the kubelet
// does not initialize node data. // does not initialize node data.
@ -133,12 +110,3 @@ func (az *Cloud) GetZoneByNodeName(ctx context.Context, nodeName types.NodeName)
return az.vmSet.GetZoneByNodeName(string(nodeName)) return az.vmSet.GetZoneByNodeName(string(nodeName))
} }
func (az *Cloud) fetchFaultDomain() (*string, error) {
faultDomain, err := az.metadata.Text(faultDomainURI)
if err != nil {
return nil, err
}
return &faultDomain, nil
}