Merge pull request #26900 from Clarifai/aws-instance-cache
Automatic merge from submit-queue AWS: cache instances during service reload to avoid rate limiting on restart Fixes #25610 by reducing redundant calls to DescribeInstances() ```release-note * The AWS cloudprovider will cache results from DescribeInstances() if the set of nodes hasn't changed ``` Also move int/stringSlicesEqual from servicecontroller.go to pkg/util/slice
This commit is contained in:
		@@ -39,16 +39,15 @@ import (
 | 
				
			|||||||
	"github.com/aws/aws-sdk-go/service/autoscaling"
 | 
						"github.com/aws/aws-sdk-go/service/autoscaling"
 | 
				
			||||||
	"github.com/aws/aws-sdk-go/service/ec2"
 | 
						"github.com/aws/aws-sdk-go/service/ec2"
 | 
				
			||||||
	"github.com/aws/aws-sdk-go/service/elb"
 | 
						"github.com/aws/aws-sdk-go/service/elb"
 | 
				
			||||||
 | 
						"github.com/golang/glog"
 | 
				
			||||||
	"gopkg.in/gcfg.v1"
 | 
						"gopkg.in/gcfg.v1"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api"
 | 
						"k8s.io/kubernetes/pkg/api"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/api/service"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/api/unversioned"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/cloudprovider"
 | 
						"k8s.io/kubernetes/pkg/cloudprovider"
 | 
				
			||||||
	aws_credentials "k8s.io/kubernetes/pkg/credentialprovider/aws"
 | 
						aws_credentials "k8s.io/kubernetes/pkg/credentialprovider/aws"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/types"
 | 
						"k8s.io/kubernetes/pkg/types"
 | 
				
			||||||
 | 
					 | 
				
			||||||
	"github.com/golang/glog"
 | 
					 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api/service"
 | 
					 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api/unversioned"
 | 
					 | 
				
			||||||
	"k8s.io/kubernetes/pkg/util/sets"
 | 
						"k8s.io/kubernetes/pkg/util/sets"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -263,6 +262,8 @@ type AWSCloud struct {
 | 
				
			|||||||
	selfAWSInstance *awsInstance
 | 
						selfAWSInstance *awsInstance
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	mutex                    sync.Mutex
 | 
						mutex                    sync.Mutex
 | 
				
			||||||
 | 
						lastNodeNames            sets.String
 | 
				
			||||||
 | 
						lastInstancesByNodeNames []*ec2.Instance
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var _ Volumes = &AWSCloud{}
 | 
					var _ Volumes = &AWSCloud{}
 | 
				
			||||||
@@ -2237,7 +2238,8 @@ func (s *AWSCloud) EnsureLoadBalancer(apiService *api.Service, hosts []string) (
 | 
				
			|||||||
		return nil, fmt.Errorf("LoadBalancerIP cannot be specified for AWS ELB")
 | 
							return nil, fmt.Errorf("LoadBalancerIP cannot be specified for AWS ELB")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	instances, err := s.getInstancesByNodeNames(hosts)
 | 
						hostSet := sets.NewString(hosts...)
 | 
				
			||||||
 | 
						instances, err := s.getInstancesByNodeNamesCached(hostSet)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -2675,7 +2677,8 @@ func (s *AWSCloud) EnsureLoadBalancerDeleted(service *api.Service) error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// UpdateLoadBalancer implements LoadBalancer.UpdateLoadBalancer
 | 
					// UpdateLoadBalancer implements LoadBalancer.UpdateLoadBalancer
 | 
				
			||||||
func (s *AWSCloud) UpdateLoadBalancer(service *api.Service, hosts []string) error {
 | 
					func (s *AWSCloud) UpdateLoadBalancer(service *api.Service, hosts []string) error {
 | 
				
			||||||
	instances, err := s.getInstancesByNodeNames(hosts)
 | 
						hostSet := sets.NewString(hosts...)
 | 
				
			||||||
 | 
						instances, err := s.getInstancesByNodeNamesCached(hostSet)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -2747,10 +2750,21 @@ func (a *AWSCloud) getInstancesByIDs(instanceIDs []*string) (map[string]*ec2.Ins
 | 
				
			|||||||
	return instancesByID, nil
 | 
						return instancesByID, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Fetches instances by node names; returns an error if any cannot be found.
 | 
					// Fetches and caches instances by node names; returns an error if any cannot be found.
 | 
				
			||||||
// This is implemented with a multi value filter on the node names, fetching the desired instances with a single query.
 | 
					// This is implemented with a multi value filter on the node names, fetching the desired instances with a single query.
 | 
				
			||||||
func (a *AWSCloud) getInstancesByNodeNames(nodeNames []string) ([]*ec2.Instance, error) {
 | 
					// TODO(therc): make all the caching more rational during the 1.4 timeframe
 | 
				
			||||||
	names := aws.StringSlice(nodeNames)
 | 
					func (a *AWSCloud) getInstancesByNodeNamesCached(nodeNames sets.String) ([]*ec2.Instance, error) {
 | 
				
			||||||
 | 
						a.mutex.Lock()
 | 
				
			||||||
 | 
						defer a.mutex.Unlock()
 | 
				
			||||||
 | 
						if nodeNames.Equal(a.lastNodeNames) {
 | 
				
			||||||
 | 
							if len(a.lastInstancesByNodeNames) > 0 {
 | 
				
			||||||
 | 
								// We assume that if the list of nodes is the same, the underlying
 | 
				
			||||||
 | 
								// instances have not changed. Later we might guard this with TTLs.
 | 
				
			||||||
 | 
								glog.V(2).Infof("Returning cached instances for %v", nodeNames)
 | 
				
			||||||
 | 
								return a.lastInstancesByNodeNames, nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						names := aws.StringSlice(nodeNames.List())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	nodeNameFilter := &ec2.Filter{
 | 
						nodeNameFilter := &ec2.Filter{
 | 
				
			||||||
		Name:   aws.String("private-dns-name"),
 | 
							Name:   aws.String("private-dns-name"),
 | 
				
			||||||
@@ -2778,6 +2792,9 @@ func (a *AWSCloud) getInstancesByNodeNames(nodeNames []string) ([]*ec2.Instance,
 | 
				
			|||||||
		return nil, nil
 | 
							return nil, nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						glog.V(2).Infof("Caching instances for %v", nodeNames)
 | 
				
			||||||
 | 
						a.lastNodeNames = nodeNames
 | 
				
			||||||
 | 
						a.lastInstancesByNodeNames = instances
 | 
				
			||||||
	return instances, nil
 | 
						return instances, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1093,7 +1093,7 @@ func TestFindInstanceByNodeNameExcludesTerminatedInstances(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestFindInstancesByNodeName(t *testing.T) {
 | 
					func TestFindInstancesByNodeNameCached(t *testing.T) {
 | 
				
			||||||
	awsServices := NewFakeAWSServices()
 | 
						awsServices := NewFakeAWSServices()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	nodeNameOne := "my-dns.internal"
 | 
						nodeNameOne := "my-dns.internal"
 | 
				
			||||||
@@ -1132,8 +1132,8 @@ func TestFindInstancesByNodeName(t *testing.T) {
 | 
				
			|||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	nodeNames := []string{nodeNameOne}
 | 
						nodeNames := sets.NewString(nodeNameOne)
 | 
				
			||||||
	returnedInstances, errr := c.getInstancesByNodeNames(nodeNames)
 | 
						returnedInstances, errr := c.getInstancesByNodeNamesCached(nodeNames)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if errr != nil {
 | 
						if errr != nil {
 | 
				
			||||||
		t.Errorf("Failed to find instance: %v", err)
 | 
							t.Errorf("Failed to find instance: %v", err)
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user