Merge pull request #13081 from mwielgus/hpa_heapster_query
Scale subresource and Heapster querying in HorizontalPodAutoscaler
This commit is contained in:
		| @@ -272,7 +272,7 @@ func (s *CMServer) Run(_ []string) error { | |||||||
| 	// if err != nil { | 	// if err != nil { | ||||||
| 	// 	glog.Fatalf("Invalid API configuration: %v", err) | 	// 	glog.Fatalf("Invalid API configuration: %v", err) | ||||||
| 	// } | 	// } | ||||||
| 	// horizontalPodAutoscalerController := autoscalercontroller.New(expClient) | 	// horizontalPodAutoscalerController := autoscalercontroller.New(kubeClient, expClient) | ||||||
| 	// horizontalPodAutoscalerController.Run(s.NodeSyncPeriod) | 	// horizontalPodAutoscalerController.Run(s.NodeSyncPeriod) | ||||||
|  |  | ||||||
| 	select {} | 	select {} | ||||||
|   | |||||||
| @@ -33,6 +33,7 @@ import ( | |||||||
| type ExperimentalInterface interface { | type ExperimentalInterface interface { | ||||||
| 	VersionInterface | 	VersionInterface | ||||||
| 	HorizontalPodAutoscalersNamespacer | 	HorizontalPodAutoscalersNamespacer | ||||||
|  | 	ScaleNamespacer | ||||||
| } | } | ||||||
|  |  | ||||||
| // ExperimentalClient is used to interact with experimental Kubernetes features. | // ExperimentalClient is used to interact with experimental Kubernetes features. | ||||||
| @@ -75,6 +76,10 @@ func (c *ExperimentalClient) HorizontalPodAutoscalers(namespace string) Horizont | |||||||
| 	return newHorizontalPodAutoscalers(c, namespace) | 	return newHorizontalPodAutoscalers(c, namespace) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (c *ExperimentalClient) Scales(namespace string) ScaleInterface { | ||||||
|  | 	return newScales(c, namespace) | ||||||
|  | } | ||||||
|  |  | ||||||
| // NewExperimental creates a new ExperimentalClient for the given config. This client | // NewExperimental creates a new ExperimentalClient for the given config. This client | ||||||
| // provides access to experimental Kubernetes features. | // provides access to experimental Kubernetes features. | ||||||
| // Experimental features are not supported and may be changed or removed in | // Experimental features are not supported and may be changed or removed in | ||||||
|   | |||||||
							
								
								
									
										59
									
								
								pkg/client/unversioned/scale.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										59
									
								
								pkg/client/unversioned/scale.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,59 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2015 The Kubernetes Authors All rights reserved. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package unversioned | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"strings" | ||||||
|  |  | ||||||
|  | 	"k8s.io/kubernetes/pkg/expapi" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type ScaleNamespacer interface { | ||||||
|  | 	Scales(namespace string) ScaleInterface | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // ScaleInterface has methods to work with Scale (sub)resources. | ||||||
|  | type ScaleInterface interface { | ||||||
|  | 	Get(string, string) (*expapi.Scale, error) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // horizontalPodAutoscalers implements HorizontalPodAutoscalersNamespacer interface | ||||||
|  | type scales struct { | ||||||
|  | 	client *ExperimentalClient | ||||||
|  | 	ns     string | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // newHorizontalPodAutoscalers returns a horizontalPodAutoscalers | ||||||
|  | func newScales(c *ExperimentalClient, namespace string) *scales { | ||||||
|  | 	return &scales{ | ||||||
|  | 		client: c, | ||||||
|  | 		ns:     namespace, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Get takes the reference to scale subresource and returns the subresource or error, if one occurs. | ||||||
|  | func (c *scales) Get(kind string, name string) (result *expapi.Scale, err error) { | ||||||
|  | 	result = &expapi.Scale{} | ||||||
|  | 	if strings.ToLower(kind) == "replicationcontroller" { | ||||||
|  | 		kind = "replicationControllers" | ||||||
|  | 		err = c.client.Get().Namespace(c.ns).Resource(kind).Name(name).SubResource("scale").Do().Into(result) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	err = fmt.Errorf("Kind not supported: %s", kind) | ||||||
|  | 	return | ||||||
|  | } | ||||||
| @@ -18,23 +18,37 @@ package autoscalercontroller | |||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"strings" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/golang/glog" | 	"github.com/golang/glog" | ||||||
| 	"k8s.io/kubernetes/pkg/api" | 	"k8s.io/kubernetes/pkg/api" | ||||||
| 	"k8s.io/kubernetes/pkg/client/unversioned" | 	client "k8s.io/kubernetes/pkg/client/unversioned" | ||||||
| 	"k8s.io/kubernetes/pkg/fields" | 	"k8s.io/kubernetes/pkg/fields" | ||||||
| 	"k8s.io/kubernetes/pkg/labels" | 	"k8s.io/kubernetes/pkg/labels" | ||||||
| 	"k8s.io/kubernetes/pkg/util" | 	"k8s.io/kubernetes/pkg/util" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	heapsterNamespace = "kube-system" | ||||||
|  | 	heapsterService   = "monitoring-heapster" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | var resourceToMetric = map[api.ResourceName]string{ | ||||||
|  | 	api.ResourceCPU: "cpu-usage", | ||||||
|  | } | ||||||
|  | var heapsterQueryStart, _ = time.ParseDuration("-20m") | ||||||
|  |  | ||||||
| type HorizontalPodAutoscalerController struct { | type HorizontalPodAutoscalerController struct { | ||||||
| 	kubeClient unversioned.ExperimentalInterface | 	client    *client.Client | ||||||
|  | 	expClient client.ExperimentalInterface | ||||||
| } | } | ||||||
|  |  | ||||||
| func New(kubeClient unversioned.ExperimentalInterface) *HorizontalPodAutoscalerController { | func New(client *client.Client, expClient client.ExperimentalInterface) *HorizontalPodAutoscalerController { | ||||||
|  | 	//TODO: switch to client.Interface | ||||||
| 	return &HorizontalPodAutoscalerController{ | 	return &HorizontalPodAutoscalerController{ | ||||||
| 		kubeClient: kubeClient, | 		client:    client, | ||||||
|  | 		expClient: expClient, | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -48,11 +62,58 @@ func (a *HorizontalPodAutoscalerController) Run(syncPeriod time.Duration) { | |||||||
|  |  | ||||||
| func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error { | func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error { | ||||||
| 	ns := api.NamespaceAll | 	ns := api.NamespaceAll | ||||||
| 	list, err := a.kubeClient.HorizontalPodAutoscalers(ns).List(labels.Everything(), fields.Everything()) | 	list, err := a.expClient.HorizontalPodAutoscalers(ns).List(labels.Everything(), fields.Everything()) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("error listing nodes: %v", err) | 		return fmt.Errorf("error listing nodes: %v", err) | ||||||
| 	} | 	} | ||||||
| 	// TODO: implement! | 	for _, hpa := range list.Items { | ||||||
| 	glog.Info("autoscalers: %v", list) | 		reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleRef.Kind, hpa.Spec.ScaleRef.Namespace, hpa.Spec.ScaleRef.Name) | ||||||
|  |  | ||||||
|  | 		scale, err := a.expClient.Scales(hpa.Spec.ScaleRef.Namespace).Get(hpa.Spec.ScaleRef.Kind, hpa.Spec.ScaleRef.Name) | ||||||
|  | 		if err != nil { | ||||||
|  | 			glog.Warningf("Failed to query scale subresource for %s: %v", reference, err) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		podList, err := a.client.Pods(hpa.Spec.ScaleRef.Namespace). | ||||||
|  | 			List(labels.SelectorFromSet(labels.Set(scale.Status.Selector)), fields.Everything()) | ||||||
|  |  | ||||||
|  | 		if err != nil { | ||||||
|  | 			glog.Warningf("Failed to get pod list for %s: %v", reference, err) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		podNames := []string{} | ||||||
|  | 		for _, pod := range podList.Items { | ||||||
|  | 			podNames = append(podNames, pod.Name) | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		metric, metricDefined := resourceToMetric[hpa.Spec.Target.Resource] | ||||||
|  | 		if !metricDefined { | ||||||
|  | 			glog.Warningf("Heapster metric not defined for %s %v", reference, hpa.Spec.Target.Resource) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		startTime := time.Now().Add(heapsterQueryStart) | ||||||
|  | 		metricPath := fmt.Sprintf("/api/v1/model/namespaces/%s/pod-list/%s/metrics/%s", | ||||||
|  | 			hpa.Spec.ScaleRef.Namespace, | ||||||
|  | 			strings.Join(podNames, ","), | ||||||
|  | 			metric) | ||||||
|  |  | ||||||
|  | 		resultRaw, err := a.client. | ||||||
|  | 			Get(). | ||||||
|  | 			Prefix("proxy"). | ||||||
|  | 			Resource("services"). | ||||||
|  | 			Namespace(heapsterNamespace). | ||||||
|  | 			Name(heapsterService). | ||||||
|  | 			Suffix(metricPath). | ||||||
|  | 			Param("start", startTime.Format(time.RFC3339)). | ||||||
|  | 			Do(). | ||||||
|  | 			Raw() | ||||||
|  |  | ||||||
|  | 		if err != nil { | ||||||
|  | 			glog.Warningf("Failed to get pods metrics for %s: %v", reference, err) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		glog.Infof("Metrics available for %s: %s", reference, string(resultRaw)) | ||||||
|  | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|   | |||||||
| @@ -0,0 +1,154 @@ | |||||||
|  | /* | ||||||
|  | Copyright 2014 The Kubernetes Authors All rights reserved. | ||||||
|  |  | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  |  | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | package autoscalercontroller | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"fmt" | ||||||
|  | 	"net/http" | ||||||
|  | 	"net/http/httptest" | ||||||
|  | 	"testing" | ||||||
|  |  | ||||||
|  | 	"k8s.io/kubernetes/pkg/api" | ||||||
|  | 	_ "k8s.io/kubernetes/pkg/api/latest" | ||||||
|  | 	"k8s.io/kubernetes/pkg/api/resource" | ||||||
|  | 	"k8s.io/kubernetes/pkg/api/testapi" | ||||||
|  | 	client "k8s.io/kubernetes/pkg/client/unversioned" | ||||||
|  | 	"k8s.io/kubernetes/pkg/expapi" | ||||||
|  | 	"k8s.io/kubernetes/pkg/runtime" | ||||||
|  | 	"k8s.io/kubernetes/pkg/util" | ||||||
|  |  | ||||||
|  | 	"github.com/golang/glog" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	namespace    = api.NamespaceDefault | ||||||
|  | 	rcName       = "app-rc" | ||||||
|  | 	podNameLabel = "app" | ||||||
|  | 	podName      = "p1" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | var target = expapi.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse("0.8")} | ||||||
|  |  | ||||||
|  | type serverResponse struct { | ||||||
|  | 	statusCode int | ||||||
|  | 	obj        interface{} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func makeTestServer(t *testing.T, hpaResponse serverResponse, | ||||||
|  | 	scaleResponse serverResponse, podListResponse serverResponse, | ||||||
|  | 	heapsterResponse serverResponse) (*httptest.Server, []*util.FakeHandler) { | ||||||
|  |  | ||||||
|  | 	handlers := []*util.FakeHandler{} | ||||||
|  | 	mux := http.NewServeMux() | ||||||
|  |  | ||||||
|  | 	mkHandler := func(url string, response serverResponse) *util.FakeHandler { | ||||||
|  | 		handler := util.FakeHandler{ | ||||||
|  | 			StatusCode:   response.statusCode, | ||||||
|  | 			ResponseBody: runtime.EncodeOrDie(testapi.Codec(), response.obj.(runtime.Object)), | ||||||
|  | 		} | ||||||
|  | 		mux.Handle(url, &handler) | ||||||
|  | 		glog.Infof("Will handle %s", url) | ||||||
|  | 		return &handler | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	mkRawHandler := func(url string, response serverResponse) *util.FakeHandler { | ||||||
|  | 		handler := util.FakeHandler{ | ||||||
|  | 			StatusCode:   response.statusCode, | ||||||
|  | 			ResponseBody: *response.obj.(*string), | ||||||
|  | 		} | ||||||
|  | 		mux.Handle(url, &handler) | ||||||
|  | 		glog.Infof("Will handle %s", url) | ||||||
|  | 		return &handler | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	handlers = append(handlers, mkHandler("/experimental/v1/horizontalpodautoscalers", hpaResponse)) | ||||||
|  | 	handlers = append(handlers, mkHandler( | ||||||
|  | 		fmt.Sprintf("/experimental/v1/namespaces/%s/replicationcontrollers/%s/scale", namespace, rcName), scaleResponse)) | ||||||
|  | 	handlers = append(handlers, mkHandler(fmt.Sprintf("/api/v1/namespaces/%s/pods", namespace), podListResponse)) | ||||||
|  | 	handlers = append(handlers, mkRawHandler( | ||||||
|  | 		fmt.Sprintf("/api/v1/proxy/namespaces/kube-system/services/monitoring-heapster/api/v1/model/namespaces/%s/pod-list/%s/metrics/cpu-usage", | ||||||
|  | 			namespace, podName), heapsterResponse)) | ||||||
|  |  | ||||||
|  | 	mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { | ||||||
|  | 		t.Errorf("unexpected request: %v", req.RequestURI) | ||||||
|  | 		res.WriteHeader(http.StatusNotFound) | ||||||
|  | 	}) | ||||||
|  | 	return httptest.NewServer(mux), handlers | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { | ||||||
|  |  | ||||||
|  | 	hpaResponse := serverResponse{http.StatusOK, &expapi.HorizontalPodAutoscalerList{ | ||||||
|  | 		Items: []expapi.HorizontalPodAutoscaler{ | ||||||
|  | 			{ | ||||||
|  | 				ObjectMeta: api.ObjectMeta{ | ||||||
|  | 					Name:      "foo", | ||||||
|  | 					Namespace: namespace, | ||||||
|  | 				}, | ||||||
|  | 				Spec: expapi.HorizontalPodAutoscalerSpec{ | ||||||
|  | 					ScaleRef: &expapi.SubresourceReference{ | ||||||
|  | 						Kind:        "replicationController", | ||||||
|  | 						Name:        rcName, | ||||||
|  | 						Namespace:   namespace, | ||||||
|  | 						Subresource: "scale", | ||||||
|  | 					}, | ||||||
|  | 					MinCount: 1, | ||||||
|  | 					MaxCount: 5, | ||||||
|  | 					Target:   target, | ||||||
|  | 				}, | ||||||
|  | 			}}}} | ||||||
|  |  | ||||||
|  | 	scaleResponse := serverResponse{http.StatusOK, &expapi.Scale{ | ||||||
|  | 		ObjectMeta: api.ObjectMeta{ | ||||||
|  | 			Name:      "rcName", | ||||||
|  | 			Namespace: namespace, | ||||||
|  | 		}, | ||||||
|  | 		Spec: expapi.ScaleSpec{ | ||||||
|  | 			Replicas: 5, | ||||||
|  | 		}, | ||||||
|  | 		Status: expapi.ScaleStatus{ | ||||||
|  | 			Replicas: 2, | ||||||
|  | 			Selector: map[string]string{"name": podNameLabel}, | ||||||
|  | 		}, | ||||||
|  | 	}} | ||||||
|  |  | ||||||
|  | 	podListResponse := serverResponse{http.StatusOK, &api.PodList{ | ||||||
|  | 		Items: []api.Pod{ | ||||||
|  | 			{ | ||||||
|  | 				ObjectMeta: api.ObjectMeta{ | ||||||
|  | 					Name:      podName, | ||||||
|  | 					Namespace: namespace, | ||||||
|  | 				}, | ||||||
|  | 			}}}} | ||||||
|  |  | ||||||
|  | 	heapsterRawResponse := "UPADTE ME" | ||||||
|  | 	heapsterResponse := serverResponse{http.StatusOK, &heapsterRawResponse} | ||||||
|  |  | ||||||
|  | 	testServer, handlers := makeTestServer(t, hpaResponse, scaleResponse, podListResponse, heapsterResponse) | ||||||
|  | 	defer testServer.Close() | ||||||
|  | 	kubeClient := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) | ||||||
|  | 	expClient := client.NewExperimentalOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) | ||||||
|  |  | ||||||
|  | 	hpaController := New(kubeClient, expClient) | ||||||
|  | 	err := hpaController.reconcileAutoscalers() | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatal("Failed to reconcile %v", err) | ||||||
|  | 	} | ||||||
|  | 	for _, h := range handlers { | ||||||
|  | 		h.ValidateRequestCount(t, 1) | ||||||
|  | 	} | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user
	 Nikhil Jindal
					Nikhil Jindal