AWS: Maintain a cache of all instances for ELB
We maintain a cache of all instances, and we invalidate the cache whenever we see a new instance. For ELBs that should be sufficient, because our usage is limited to instance ids and security groups, which should not change. Fix #45050
This commit is contained in:
		| @@ -377,9 +377,7 @@ type Cloud struct { | ||||
| 	// Note that we cache some state in awsInstance (mountpoints), so we must preserve the instance | ||||
| 	selfAWSInstance *awsInstance | ||||
|  | ||||
| 	mutex                    sync.Mutex | ||||
| 	lastNodeNames            sets.String | ||||
| 	lastInstancesByNodeNames []*ec2.Instance | ||||
| 	instanceCache instanceCache | ||||
|  | ||||
| 	// We keep an active list of devices we have assigned but not yet | ||||
| 	// attached, to avoid a race condition where we assign a device mapping | ||||
| @@ -862,6 +860,7 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) { | ||||
| 		attaching:        make(map[types.NodeName]map[mountDevice]awsVolumeID), | ||||
| 		deviceAllocators: make(map[types.NodeName]DeviceAllocator), | ||||
| 	} | ||||
| 	awsCloud.instanceCache.cloud = awsCloud | ||||
|  | ||||
| 	if cfg.Global.VPC != "" && cfg.Global.SubnetID != "" && (cfg.Global.KubernetesClusterTag != "" || cfg.Global.KubernetesClusterID != "") { | ||||
| 		// When the master is running on a different AWS account, cloud provider or on-premise | ||||
| @@ -2556,14 +2555,6 @@ func buildListener(port v1.ServicePort, annotations map[string]string, sslPorts | ||||
| 	return listener, nil | ||||
| } | ||||
|  | ||||
| func nodeNames(nodes []*v1.Node) sets.String { | ||||
| 	ret := sets.String{} | ||||
| 	for _, node := range nodes { | ||||
| 		ret.Insert(node.Name) | ||||
| 	} | ||||
| 	return ret | ||||
| } | ||||
|  | ||||
| // EnsureLoadBalancer implements LoadBalancer.EnsureLoadBalancer | ||||
| func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) { | ||||
| 	annotations := apiService.Annotations | ||||
| @@ -2601,7 +2592,7 @@ func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, n | ||||
| 		return nil, fmt.Errorf("LoadBalancerIP cannot be specified for AWS ELB") | ||||
| 	} | ||||
|  | ||||
| 	instances, err := c.getInstancesByNodeNamesCached(nodeNames(nodes), "running") | ||||
| 	instances, err := c.findInstancesForELB(nodes) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @@ -2955,7 +2946,7 @@ func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error) | ||||
|  | ||||
| // Open security group ingress rules on the instances so that the load balancer can talk to them | ||||
| // Will also remove any security groups ingress rules for the load balancer that are _not_ needed for allInstances | ||||
| func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancerDescription, allInstances []*ec2.Instance) error { | ||||
| func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancerDescription, instances map[awsInstanceID]*ec2.Instance) error { | ||||
| 	if c.cfg.Global.DisableSecurityGroupIngress { | ||||
| 		return nil | ||||
| 	} | ||||
| @@ -3010,7 +3001,7 @@ func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancer | ||||
| 	instanceSecurityGroupIds := map[string]bool{} | ||||
|  | ||||
| 	// Scan instances for groups we want open | ||||
| 	for _, instance := range allInstances { | ||||
| 	for _, instance := range instances { | ||||
| 		securityGroup, err := findSecurityGroupForInstance(instance, taggedSecurityGroups) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| @@ -3188,7 +3179,7 @@ func (c *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servic | ||||
|  | ||||
| // UpdateLoadBalancer implements LoadBalancer.UpdateLoadBalancer | ||||
| func (c *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error { | ||||
| 	instances, err := c.getInstancesByNodeNamesCached(nodeNames(nodes), "running") | ||||
| 	instances, err := c.findInstancesForELB(nodes) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -3203,7 +3194,7 @@ func (c *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, node | ||||
| 		return fmt.Errorf("Load balancer not found") | ||||
| 	} | ||||
|  | ||||
| 	err = c.ensureLoadBalancerInstances(orEmpty(lb.LoadBalancerName), lb.Instances, instances) | ||||
| 	err = c.ensureLoadBalancerInstances(aws.StringValue(lb.LoadBalancerName), lb.Instances, instances) | ||||
| 	if err != nil { | ||||
| 		return nil | ||||
| 	} | ||||
| @@ -3260,37 +3251,6 @@ func (c *Cloud) getInstancesByIDs(instanceIDs []*string) (map[string]*ec2.Instan | ||||
| 	return instancesByID, nil | ||||
| } | ||||
|  | ||||
| // Fetches and caches instances in the given state, by node names; returns an error if any cannot be found. If no states | ||||
| // are given, no state filter is used and instances of all states are fetched. | ||||
| // This is implemented with a multi value filter on the node names, fetching the desired instances with a single query. | ||||
| // TODO(therc): make all the caching more rational during the 1.4 timeframe | ||||
| func (c *Cloud) getInstancesByNodeNamesCached(nodeNames sets.String, states ...string) ([]*ec2.Instance, error) { | ||||
| 	c.mutex.Lock() | ||||
| 	defer c.mutex.Unlock() | ||||
| 	if nodeNames.Equal(c.lastNodeNames) { | ||||
| 		if len(c.lastInstancesByNodeNames) > 0 { | ||||
| 			// We assume that if the list of nodes is the same, the underlying | ||||
| 			// instances have not changed. Later we might guard this with TTLs. | ||||
| 			glog.V(2).Infof("Returning cached instances for %v", nodeNames) | ||||
| 			return c.lastInstancesByNodeNames, nil | ||||
| 		} | ||||
| 	} | ||||
| 	instances, err := c.getInstancesByNodeNames(nodeNames.List(), states...) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	if len(instances) == 0 { | ||||
| 		return nil, nil | ||||
| 	} | ||||
|  | ||||
| 	glog.V(2).Infof("Caching instances for %v", nodeNames) | ||||
| 	c.lastNodeNames = nodeNames | ||||
| 	c.lastInstancesByNodeNames = instances | ||||
| 	return instances, nil | ||||
| } | ||||
|  | ||||
| func (c *Cloud) getInstancesByNodeNames(nodeNames []string, states ...string) ([]*ec2.Instance, error) { | ||||
| 	names := aws.StringSlice(nodeNames) | ||||
| 	ec2Instances := []*ec2.Instance{} | ||||
| @@ -3328,6 +3288,7 @@ func (c *Cloud) getInstancesByNodeNames(nodeNames []string, states ...string) ([ | ||||
| 	return ec2Instances, nil | ||||
| } | ||||
|  | ||||
| // TODO: Move to instanceCache | ||||
| func (c *Cloud) describeInstances(filters []*ec2.Filter) ([]*ec2.Instance, error) { | ||||
| 	filters = c.tagging.addFilters(filters) | ||||
| 	request := &ec2.DescribeInstancesInput{ | ||||
|   | ||||
| @@ -28,6 +28,7 @@ import ( | ||||
| 	"github.com/golang/glog" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/apimachinery/pkg/util/sets" | ||||
| 	"k8s.io/kubernetes/pkg/api/v1" | ||||
| ) | ||||
|  | ||||
| const ProxyProtocolPolicyName = "k8s-proxyprotocol-enabled" | ||||
| @@ -417,10 +418,10 @@ func (c *Cloud) ensureLoadBalancerHealthCheck(loadBalancer *elb.LoadBalancerDesc | ||||
| } | ||||
|  | ||||
| // Makes sure that exactly the specified hosts are registered as instances with the load balancer | ||||
| func (c *Cloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstances []*elb.Instance, instances []*ec2.Instance) error { | ||||
| func (c *Cloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstances []*elb.Instance, instanceIDs map[awsInstanceID]*ec2.Instance) error { | ||||
| 	expected := sets.NewString() | ||||
| 	for _, instance := range instances { | ||||
| 		expected.Insert(orEmpty(instance.InstanceId)) | ||||
| 	for id := range instanceIDs { | ||||
| 		expected.Insert(string(id)) | ||||
| 	} | ||||
|  | ||||
| 	actual := sets.NewString() | ||||
| @@ -519,3 +520,25 @@ func proxyProtocolEnabled(backend *elb.BackendServerDescription) bool { | ||||
|  | ||||
| 	return false | ||||
| } | ||||
|  | ||||
| // findInstancesForELB gets the EC2 instances corresponding to the Nodes, for setting up an ELB | ||||
| // We ignore Nodes (with a log message) where the instanceid cannot be determined from the provider, | ||||
| // and we ignore instances which are not found | ||||
| func (c *Cloud) findInstancesForELB(nodes []*v1.Node) (map[awsInstanceID]*ec2.Instance, error) { | ||||
| 	// Map to instance ids ignoring Nodes where we cannot find the id (but logging) | ||||
| 	instanceIDs := mapToAWSInstanceIDsTolerant(nodes) | ||||
|  | ||||
| 	cacheCriteria := cacheCriteria{ | ||||
| 		// MaxAge not required, because we only care about security groups, which should not change | ||||
| 		HasInstances: instanceIDs, // Refresh if any of the instance ids are missing | ||||
| 	} | ||||
| 	snapshot, err := c.instanceCache.describeAllInstancesCached(cacheCriteria) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	instances := snapshot.FindInstances(instanceIDs) | ||||
| 	// We ignore instances that cannot be found | ||||
|  | ||||
| 	return instances, nil | ||||
| } | ||||
|   | ||||
| @@ -1012,62 +1012,6 @@ func TestFindInstanceByNodeNameExcludesTerminatedInstances(t *testing.T) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestFindInstancesByNodeNameCached(t *testing.T) { | ||||
| 	awsServices := NewFakeAWSServices() | ||||
|  | ||||
| 	nodeNameOne := "my-dns.internal" | ||||
| 	nodeNameTwo := "my-dns-two.internal" | ||||
|  | ||||
| 	var tag ec2.Tag | ||||
| 	tag.Key = aws.String(TagNameKubernetesClusterPrefix + TestClusterId) | ||||
| 	tag.Value = aws.String("") | ||||
| 	tags := []*ec2.Tag{&tag} | ||||
|  | ||||
| 	var runningInstance ec2.Instance | ||||
| 	runningInstance.InstanceId = aws.String("i-running") | ||||
| 	runningInstance.PrivateDnsName = aws.String(nodeNameOne) | ||||
| 	runningInstance.State = &ec2.InstanceState{Code: aws.Int64(16), Name: aws.String("running")} | ||||
| 	runningInstance.Tags = tags | ||||
|  | ||||
| 	var secondInstance ec2.Instance | ||||
|  | ||||
| 	secondInstance.InstanceId = aws.String("i-running") | ||||
| 	secondInstance.PrivateDnsName = aws.String(nodeNameTwo) | ||||
| 	secondInstance.State = &ec2.InstanceState{Code: aws.Int64(48), Name: aws.String("running")} | ||||
| 	secondInstance.Tags = tags | ||||
|  | ||||
| 	var terminatedInstance ec2.Instance | ||||
| 	terminatedInstance.InstanceId = aws.String("i-terminated") | ||||
| 	terminatedInstance.PrivateDnsName = aws.String(nodeNameOne) | ||||
| 	terminatedInstance.State = &ec2.InstanceState{Code: aws.Int64(48), Name: aws.String("terminated")} | ||||
| 	terminatedInstance.Tags = tags | ||||
|  | ||||
| 	instances := []*ec2.Instance{&secondInstance, &runningInstance, &terminatedInstance} | ||||
| 	awsServices.instances = append(awsServices.instances, instances...) | ||||
|  | ||||
| 	c, err := newAWSCloud(strings.NewReader("[global]"), awsServices) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("Error building aws cloud: %v", err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	nodeNames := sets.NewString(nodeNameOne) | ||||
| 	returnedInstances, errr := c.getInstancesByNodeNamesCached(nodeNames, "running") | ||||
|  | ||||
| 	if errr != nil { | ||||
| 		t.Errorf("Failed to find instance: %v", err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if len(returnedInstances) != 1 { | ||||
| 		t.Errorf("Expected a single isntance but found: %v", returnedInstances) | ||||
| 	} | ||||
|  | ||||
| 	if *returnedInstances[0].PrivateDnsName != nodeNameOne { | ||||
| 		t.Errorf("Expected node name %v but got %v", nodeNameOne, returnedInstances[0].PrivateDnsName) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestGetInstanceByNodeNameBatching(t *testing.T) { | ||||
| 	awsServices := NewFakeAWSServices() | ||||
| 	c, err := newAWSCloud(strings.NewReader("[global]"), awsServices) | ||||
|   | ||||
| @@ -23,6 +23,10 @@ import ( | ||||
|  | ||||
| 	"github.com/aws/aws-sdk-go/aws" | ||||
| 	"github.com/aws/aws-sdk-go/service/ec2" | ||||
| 	"github.com/golang/glog" | ||||
| 	"k8s.io/kubernetes/pkg/api/v1" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| // awsInstanceID represents the ID of the instance in the AWS API, e.g. i-12345678 | ||||
| @@ -80,6 +84,42 @@ func (name kubernetesInstanceID) mapToAWSInstanceID() (awsInstanceID, error) { | ||||
| 	return awsInstanceID(awsID), nil | ||||
| } | ||||
|  | ||||
| // mapToAWSInstanceID extracts the awsInstanceIDs from the Nodes, returning an error if a Node cannot be mapped | ||||
| func mapToAWSInstanceIDs(nodes []*v1.Node) ([]awsInstanceID, error) { | ||||
| 	var instanceIDs []awsInstanceID | ||||
| 	for _, node := range nodes { | ||||
| 		if node.Spec.ProviderID == "" { | ||||
| 			return nil, fmt.Errorf("node %q did not have ProviderID set", node.Name) | ||||
| 		} | ||||
| 		instanceID, err := kubernetesInstanceID(node.Spec.ProviderID).mapToAWSInstanceID() | ||||
| 		if err != nil { | ||||
| 			return nil, fmt.Errorf("unable to parse ProviderID %q for node %q", node.Spec.ProviderID, node.Name) | ||||
| 		} | ||||
| 		instanceIDs = append(instanceIDs, instanceID) | ||||
| 	} | ||||
|  | ||||
| 	return instanceIDs, nil | ||||
| } | ||||
|  | ||||
| // mapToAWSInstanceIDsTolerant extracts the awsInstanceIDs from the Nodes, skipping Nodes that cannot be mapped | ||||
| func mapToAWSInstanceIDsTolerant(nodes []*v1.Node) []awsInstanceID { | ||||
| 	var instanceIDs []awsInstanceID | ||||
| 	for _, node := range nodes { | ||||
| 		if node.Spec.ProviderID == "" { | ||||
| 			glog.Warningf("node %q did not have ProviderID set", node.Name) | ||||
| 			continue | ||||
| 		} | ||||
| 		instanceID, err := kubernetesInstanceID(node.Spec.ProviderID).mapToAWSInstanceID() | ||||
| 		if err != nil { | ||||
| 			glog.Warningf("unable to parse ProviderID %q for node %q", node.Spec.ProviderID, node.Name) | ||||
| 			continue | ||||
| 		} | ||||
| 		instanceIDs = append(instanceIDs, instanceID) | ||||
| 	} | ||||
|  | ||||
| 	return instanceIDs | ||||
| } | ||||
|  | ||||
| // Gets the full information about this instance from the EC2 API | ||||
| func describeInstance(ec2Client EC2, instanceID awsInstanceID) (*ec2.Instance, error) { | ||||
| 	request := &ec2.DescribeInstancesInput{ | ||||
| @@ -98,3 +138,132 @@ func describeInstance(ec2Client EC2, instanceID awsInstanceID) (*ec2.Instance, e | ||||
| 	} | ||||
| 	return instances[0], nil | ||||
| } | ||||
|  | ||||
| // instanceCache manages the cache of DescribeInstances | ||||
| type instanceCache struct { | ||||
| 	// TODO: Get rid of this field, send all calls through the instanceCache | ||||
| 	cloud *Cloud | ||||
|  | ||||
| 	mutex    sync.Mutex | ||||
| 	snapshot *allInstancesSnapshot | ||||
| } | ||||
|  | ||||
| // Gets the full information about these instance from the EC2 API | ||||
| func (c *instanceCache) describeAllInstancesUncached() (*allInstancesSnapshot, error) { | ||||
| 	now := time.Now() | ||||
|  | ||||
| 	glog.V(4).Infof("EC2 DescribeInstances - fetching all instances") | ||||
|  | ||||
| 	filters := []*ec2.Filter{} | ||||
| 	instances, err := c.cloud.describeInstances(filters) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	m := make(map[awsInstanceID]*ec2.Instance) | ||||
| 	for _, i := range instances { | ||||
| 		id := awsInstanceID(aws.StringValue(i.InstanceId)) | ||||
| 		m[id] = i | ||||
| 	} | ||||
|  | ||||
| 	snapshot := &allInstancesSnapshot{now, m} | ||||
|  | ||||
| 	c.mutex.Lock() | ||||
| 	defer c.mutex.Unlock() | ||||
|  | ||||
| 	if c.snapshot != nil && snapshot.olderThan(c.snapshot) { | ||||
| 		// If this happens a lot, we could run this function in a mutex and only return one result | ||||
| 		glog.Infof("Not caching concurrent AWS DescribeInstances results") | ||||
| 	} else { | ||||
| 		c.snapshot = snapshot | ||||
| 	} | ||||
|  | ||||
| 	return snapshot, nil | ||||
| } | ||||
|  | ||||
| // cacheCriteria holds criteria that must hold to use a cached snapshot | ||||
| type cacheCriteria struct { | ||||
| 	// MaxAge indicates the maximum age of a cached snapshot we can accept. | ||||
| 	// If set to 0 (i.e. unset), cached values will not time out because of age. | ||||
| 	MaxAge time.Duration | ||||
|  | ||||
| 	// HasInstances is a list of awsInstanceIDs that must be in a cached snapshot for it to be considered valid. | ||||
| 	// If an instance is not found in the cached snapshot, the snapshot be ignored and we will re-fetch. | ||||
| 	HasInstances []awsInstanceID | ||||
| } | ||||
|  | ||||
| // describeAllInstancesCached returns all instances, using cached results if applicable | ||||
| func (c *instanceCache) describeAllInstancesCached(criteria cacheCriteria) (*allInstancesSnapshot, error) { | ||||
| 	var err error | ||||
| 	snapshot := c.getSnapshot() | ||||
| 	if snapshot != nil && !snapshot.MeetsCriteria(criteria) { | ||||
| 		snapshot = nil | ||||
| 	} | ||||
|  | ||||
| 	if snapshot == nil { | ||||
| 		snapshot, err = c.describeAllInstancesUncached() | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} else { | ||||
| 		glog.V(6).Infof("EC2 DescribeInstances - using cached results") | ||||
| 	} | ||||
|  | ||||
| 	return snapshot, nil | ||||
| } | ||||
|  | ||||
| // getSnapshot returns a snapshot if one exists | ||||
| func (c *instanceCache) getSnapshot() *allInstancesSnapshot { | ||||
| 	c.mutex.Lock() | ||||
| 	defer c.mutex.Unlock() | ||||
|  | ||||
| 	return c.snapshot | ||||
| } | ||||
|  | ||||
| // olderThan is a simple helper to encapsulate timestamp comparison | ||||
| func (s *allInstancesSnapshot) olderThan(other *allInstancesSnapshot) bool { | ||||
| 	// After() is technically broken by time changes until we have monotonic time | ||||
| 	return other.timestamp.After(s.timestamp) | ||||
| } | ||||
|  | ||||
| // MeetsCriteria returns true if the snapshot meets the criteria in cacheCriteria | ||||
| func (s *allInstancesSnapshot) MeetsCriteria(criteria cacheCriteria) bool { | ||||
| 	if criteria.MaxAge > 0 { | ||||
| 		// Sub() is technically broken by time changes until we have monotonic time | ||||
| 		now := time.Now() | ||||
| 		if now.Sub(s.timestamp) > criteria.MaxAge { | ||||
| 			glog.V(6).Infof("instanceCache snapshot cannot be used as is older than MaxAge=%s", criteria.MaxAge) | ||||
| 			return false | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if len(criteria.HasInstances) != 0 { | ||||
| 		for _, id := range criteria.HasInstances { | ||||
| 			if nil == s.instances[id] { | ||||
| 				glog.V(6).Infof("instanceCache snapshot cannot be used as does not contain instance %s", id) | ||||
| 				return false | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return true | ||||
| } | ||||
|  | ||||
| // allInstancesSnapshot holds the results from querying for all instances, | ||||
| // along with the timestamp for cache-invalidation purposes | ||||
| type allInstancesSnapshot struct { | ||||
| 	timestamp time.Time | ||||
| 	instances map[awsInstanceID]*ec2.Instance | ||||
| } | ||||
|  | ||||
| // FindInstances returns the instances corresponding to the specified ids.  If an id is not found, it is ignored. | ||||
| func (s *allInstancesSnapshot) FindInstances(ids []awsInstanceID) map[awsInstanceID]*ec2.Instance { | ||||
| 	m := make(map[awsInstanceID]*ec2.Instance) | ||||
| 	for _, id := range ids { | ||||
| 		instance := s.instances[id] | ||||
| 		if instance != nil { | ||||
| 			m[id] = instance | ||||
| 		} | ||||
| 	} | ||||
| 	return m | ||||
| } | ||||
|   | ||||
| @@ -17,7 +17,12 @@ limitations under the License. | ||||
| package aws | ||||
|  | ||||
| import ( | ||||
| 	"github.com/aws/aws-sdk-go/aws" | ||||
| 	"github.com/aws/aws-sdk-go/service/ec2" | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| 	"k8s.io/kubernetes/pkg/api/v1" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| func TestParseInstance(t *testing.T) { | ||||
| @@ -86,4 +91,109 @@ func TestParseInstance(t *testing.T) { | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	for _, test := range tests { | ||||
| 		node := &v1.Node{} | ||||
| 		node.Spec.ProviderID = string(test.Kubernetes) | ||||
|  | ||||
| 		awsInstanceIds, err := mapToAWSInstanceIDs([]*v1.Node{node}) | ||||
| 		if err != nil { | ||||
| 			if !test.ExpectError { | ||||
| 				t.Errorf("unexpected error parsing %s: %v", test.Kubernetes, err) | ||||
| 			} | ||||
| 		} else { | ||||
| 			if test.ExpectError { | ||||
| 				t.Errorf("expected error parsing %s", test.Kubernetes) | ||||
| 			} else if len(awsInstanceIds) != 1 { | ||||
| 				t.Errorf("unexpected value parsing %s, got %s", test.Kubernetes, awsInstanceIds) | ||||
| 			} else if awsInstanceIds[0] != test.Aws { | ||||
| 				t.Errorf("unexpected value parsing %s, got %s", test.Kubernetes, awsInstanceIds) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		awsInstanceIds = mapToAWSInstanceIDsTolerant([]*v1.Node{node}) | ||||
| 		if test.ExpectError { | ||||
| 			if len(awsInstanceIds) != 0 { | ||||
| 				t.Errorf("unexpected results parsing %s: %s", test.Kubernetes, awsInstanceIds) | ||||
| 			} | ||||
| 		} else { | ||||
| 			if len(awsInstanceIds) != 1 { | ||||
| 				t.Errorf("unexpected value parsing %s, got %s", test.Kubernetes, awsInstanceIds) | ||||
| 			} else if awsInstanceIds[0] != test.Aws { | ||||
| 				t.Errorf("unexpected value parsing %s, got %s", test.Kubernetes, awsInstanceIds) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestSnapshotMeetsCriteria(t *testing.T) { | ||||
| 	snapshot := &allInstancesSnapshot{timestamp: time.Now().Add(-3601 * time.Second)} | ||||
|  | ||||
| 	if !snapshot.MeetsCriteria(cacheCriteria{}) { | ||||
| 		t.Errorf("Snapshot should always meet empty criteria") | ||||
| 	} | ||||
|  | ||||
| 	if snapshot.MeetsCriteria(cacheCriteria{MaxAge: time.Hour}) { | ||||
| 		t.Errorf("Snapshot did not honor MaxAge") | ||||
| 	} | ||||
|  | ||||
| 	if snapshot.MeetsCriteria(cacheCriteria{HasInstances: []awsInstanceID{awsInstanceID("i-12345678")}}) { | ||||
| 		t.Errorf("Snapshot did not honor HasInstances with missing instances") | ||||
| 	} | ||||
|  | ||||
| 	snapshot.instances = make(map[awsInstanceID]*ec2.Instance) | ||||
| 	snapshot.instances[awsInstanceID("i-12345678")] = &ec2.Instance{} | ||||
|  | ||||
| 	if !snapshot.MeetsCriteria(cacheCriteria{HasInstances: []awsInstanceID{awsInstanceID("i-12345678")}}) { | ||||
| 		t.Errorf("Snapshot did not honor HasInstances with matching instances") | ||||
| 	} | ||||
|  | ||||
| 	if snapshot.MeetsCriteria(cacheCriteria{HasInstances: []awsInstanceID{awsInstanceID("i-12345678"), awsInstanceID("i-00000000")}}) { | ||||
| 		t.Errorf("Snapshot did not honor HasInstances with partially matching instances") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestOlderThan(t *testing.T) { | ||||
| 	t1 := time.Now() | ||||
| 	t2 := t1.Add(time.Second) | ||||
|  | ||||
| 	s1 := &allInstancesSnapshot{timestamp: t1} | ||||
| 	s2 := &allInstancesSnapshot{timestamp: t2} | ||||
|  | ||||
| 	assert.True(t, s1.olderThan(s2), "s1 should be olderThan s2") | ||||
| 	assert.False(t, s2.olderThan(s1), "s2 not should be olderThan s1") | ||||
| 	assert.False(t, s1.olderThan(s1), "s1 not should be olderThan itself") | ||||
| } | ||||
|  | ||||
| func TestSnapshotFindInstances(t *testing.T) { | ||||
| 	snapshot := &allInstancesSnapshot{} | ||||
|  | ||||
| 	snapshot.instances = make(map[awsInstanceID]*ec2.Instance) | ||||
| 	{ | ||||
| 		id := awsInstanceID("i-12345678") | ||||
| 		snapshot.instances[id] = &ec2.Instance{InstanceId: id.awsString()} | ||||
| 	} | ||||
| 	{ | ||||
| 		id := awsInstanceID("i-23456789") | ||||
| 		snapshot.instances[id] = &ec2.Instance{InstanceId: id.awsString()} | ||||
| 	} | ||||
|  | ||||
| 	instances := snapshot.FindInstances([]awsInstanceID{awsInstanceID("i-12345678"), awsInstanceID("i-23456789"), awsInstanceID("i-00000000")}) | ||||
| 	if len(instances) != 2 { | ||||
| 		t.Errorf("findInstances returned %d results, expected 2", len(instances)) | ||||
| 	} | ||||
|  | ||||
| 	for _, id := range []awsInstanceID{awsInstanceID("i-12345678"), awsInstanceID("i-23456789")} { | ||||
| 		i := instances[id] | ||||
| 		if i == nil { | ||||
| 			t.Errorf("findInstances did not return %s", id) | ||||
| 			continue | ||||
| 		} | ||||
| 		if aws.StringValue(i.InstanceId) != string(id) { | ||||
| 			t.Errorf("findInstances did not return expected instanceId for %s", id) | ||||
| 		} | ||||
| 		if i != snapshot.instances[id] { | ||||
| 			t.Errorf("findInstances did not return expected instance (reference equality) for %s", id) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Justin Santa Barbara
					Justin Santa Barbara