make minion registry not intolerably slow
This commit is contained in:
		| @@ -125,6 +125,7 @@ type Master struct { | ||||
| 	admissionControl      admission.Interface | ||||
| 	masterCount           int | ||||
| 	v1beta3               bool | ||||
| 	nodeIPCache           IPGetter | ||||
|  | ||||
| 	readOnlyServer  string | ||||
| 	readWriteServer string | ||||
| @@ -256,6 +257,7 @@ func New(c *Config) *Master { | ||||
| 		authorizer:            c.Authorizer, | ||||
| 		admissionControl:      c.AdmissionControl, | ||||
| 		v1beta3:               c.EnableV1Beta3, | ||||
| 		nodeIPCache:           NewIPCache(c.Cloud, util.RealClock{}, 30*time.Second), | ||||
|  | ||||
| 		masterCount:     c.MasterCount, | ||||
| 		readOnlyServer:  net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadOnlyPort))), | ||||
| @@ -319,8 +321,9 @@ func logStackOnRecover(panicReason interface{}, httpWriter http.ResponseWriter) | ||||
|  | ||||
| func makeMinionRegistry(c *Config) minion.Registry { | ||||
| 	var minionRegistry minion.Registry = etcd.NewRegistry(c.EtcdHelper, nil) | ||||
| 	// TODO: plumb in nodeIPCache here | ||||
| 	if c.HealthCheckMinions { | ||||
| 		minionRegistry = minion.NewHealthyRegistry(minionRegistry, c.KubeletClient) | ||||
| 		minionRegistry = minion.NewHealthyRegistry(minionRegistry, c.KubeletClient, util.RealClock{}, 20*time.Second) | ||||
| 	} | ||||
| 	return minionRegistry | ||||
| } | ||||
| @@ -331,9 +334,8 @@ func (m *Master) init(c *Config) { | ||||
| 	var authenticator = c.Authenticator | ||||
|  | ||||
| 	nodeRESTStorage := minion.NewREST(m.minionRegistry) | ||||
| 	ipCache := NewIPCache(c.Cloud, util.RealClock{}, 30*time.Second) | ||||
| 	podCache := NewPodCache( | ||||
| 		ipCache, | ||||
| 		m.nodeIPCache, | ||||
| 		c.KubeletClient, | ||||
| 		RESTStorageToNodes(nodeRESTStorage).Nodes(), | ||||
| 		m.podRegistry, | ||||
|   | ||||
| @@ -17,23 +17,30 @@ limitations under the License. | ||||
| package minion | ||||
|  | ||||
| import ( | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/health" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" | ||||
| ) | ||||
|  | ||||
| type HealthyRegistry struct { | ||||
| 	delegate Registry | ||||
| 	client   client.KubeletHealthChecker | ||||
| 	cache    util.TimeCache | ||||
| } | ||||
|  | ||||
| func NewHealthyRegistry(delegate Registry, client client.KubeletHealthChecker) Registry { | ||||
| 	return &HealthyRegistry{ | ||||
| func NewHealthyRegistry(delegate Registry, client client.KubeletHealthChecker, clock util.Clock, ttl time.Duration) Registry { | ||||
| 	h := &HealthyRegistry{ | ||||
| 		delegate: delegate, | ||||
| 		client:   client, | ||||
| 	} | ||||
| 	h.cache = util.NewTimeCache(clock, ttl, h.doCheck) | ||||
| 	return h | ||||
| } | ||||
|  | ||||
| func (r *HealthyRegistry) GetMinion(ctx api.Context, minionID string) (*api.Node, error) { | ||||
| @@ -61,9 +68,17 @@ func (r *HealthyRegistry) ListMinions(ctx api.Context) (currentMinions *api.Node | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	// In case the cache is empty, health check in parallel instead of serially. | ||||
| 	var wg sync.WaitGroup | ||||
| 	wg.Add(len(list.Items)) | ||||
| 	for i := range list.Items { | ||||
| 		list.Items[i] = *r.checkMinion(&list.Items[i]) | ||||
| 		go func(i int) { | ||||
| 			list.Items[i] = *r.checkMinion(&list.Items[i]) | ||||
| 			wg.Done() | ||||
| 		}(i) | ||||
| 	} | ||||
| 	wg.Wait() | ||||
| 	return list, nil | ||||
| } | ||||
|  | ||||
| @@ -81,13 +96,7 @@ func (r *HealthyRegistry) WatchMinions(ctx api.Context, label, field labels.Sele | ||||
| } | ||||
|  | ||||
| func (r *HealthyRegistry) checkMinion(node *api.Node) *api.Node { | ||||
| 	condition := api.ConditionFull | ||||
| 	switch status, err := r.client.HealthCheck(node.Name); { | ||||
| 	case err != nil: | ||||
| 		condition = api.ConditionUnknown | ||||
| 	case status == health.Unhealthy: | ||||
| 		condition = api.ConditionNone | ||||
| 	} | ||||
| 	condition := r.cache.Get(node.Name).(api.NodeConditionStatus) | ||||
| 	// TODO: distinguish other conditions like Reachable/Live, and begin storing this | ||||
| 	// data on nodes directly via sync loops. | ||||
| 	node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{ | ||||
| @@ -96,3 +105,15 @@ func (r *HealthyRegistry) checkMinion(node *api.Node) *api.Node { | ||||
| 	}) | ||||
| 	return node | ||||
| } | ||||
|  | ||||
| // This is called to fill the cache. | ||||
| func (r *HealthyRegistry) doCheck(key string) util.T { | ||||
| 	switch status, err := r.client.HealthCheck(key); { | ||||
| 	case err != nil: | ||||
| 		return api.ConditionUnknown | ||||
| 	case status == health.Unhealthy: | ||||
| 		return api.ConditionNone | ||||
| 	default: | ||||
| 		return api.ConditionFull | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -19,10 +19,12 @@ package minion | ||||
| import ( | ||||
| 	"reflect" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/health" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util" | ||||
| ) | ||||
|  | ||||
| type alwaysYes struct{} | ||||
| @@ -34,10 +36,12 @@ func (alwaysYes) HealthCheck(host string) (health.Status, error) { | ||||
| func TestBasicDelegation(t *testing.T) { | ||||
| 	ctx := api.NewContext() | ||||
| 	mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}, api.NodeResources{}) | ||||
| 	healthy := HealthyRegistry{ | ||||
| 		delegate: mockMinionRegistry, | ||||
| 		client:   alwaysYes{}, | ||||
| 	} | ||||
| 	healthy := NewHealthyRegistry( | ||||
| 		mockMinionRegistry, | ||||
| 		alwaysYes{}, | ||||
| 		&util.FakeClock{}, | ||||
| 		60*time.Second, | ||||
| 	) | ||||
| 	list, err := healthy.ListMinions(ctx) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| @@ -82,10 +86,12 @@ func (n *notMinion) HealthCheck(host string) (health.Status, error) { | ||||
| func TestFiltering(t *testing.T) { | ||||
| 	ctx := api.NewContext() | ||||
| 	mockMinionRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2", "m3"}, api.NodeResources{}) | ||||
| 	healthy := HealthyRegistry{ | ||||
| 		delegate: mockMinionRegistry, | ||||
| 		client:   ¬Minion{minion: "m1"}, | ||||
| 	} | ||||
| 	healthy := NewHealthyRegistry( | ||||
| 		mockMinionRegistry, | ||||
| 		¬Minion{minion: "m1"}, | ||||
| 		&util.FakeClock{}, | ||||
| 		60*time.Second, | ||||
| 	) | ||||
| 	expected := []string{"m1", "m2", "m3"} | ||||
| 	list, err := healthy.ListMinions(ctx) | ||||
| 	if err != nil { | ||||
|   | ||||
| @@ -18,11 +18,13 @@ package minion | ||||
|  | ||||
| import ( | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" | ||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util" | ||||
| ) | ||||
|  | ||||
| func TestMinionRegistryREST(t *testing.T) { | ||||
| @@ -89,12 +91,14 @@ func TestMinionRegistryREST(t *testing.T) { | ||||
|  | ||||
| func TestMinionRegistryHealthCheck(t *testing.T) { | ||||
| 	minionRegistry := registrytest.NewMinionRegistry([]string{}, api.NodeResources{}) | ||||
| 	minionHealthRegistry := HealthyRegistry{ | ||||
| 		delegate: minionRegistry, | ||||
| 		client:   ¬Minion{minion: "m1"}, | ||||
| 	} | ||||
| 	minionHealthRegistry := NewHealthyRegistry( | ||||
| 		minionRegistry, | ||||
| 		¬Minion{minion: "m1"}, | ||||
| 		&util.FakeClock{}, | ||||
| 		60*time.Second, | ||||
| 	) | ||||
|  | ||||
| 	ms := NewREST(&minionHealthRegistry) | ||||
| 	ms := NewREST(minionHealthRegistry) | ||||
| 	ctx := api.NewContext() | ||||
|  | ||||
| 	c, err := ms.Create(ctx, &api.Node{ObjectMeta: api.ObjectMeta{Name: "m1"}}) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Daniel Smith
					Daniel Smith