Merge pull request #25726 from jszczepkowski/hpa-testfix
Automatic merge from submit-queue Rework of resource consumer.
This commit is contained in:
		| @@ -70,6 +70,9 @@ configure-cbr0 | ||||
| configure-cloud-routes | ||||
| conntrack-max | ||||
| conntrack-tcp-timeout-established | ||||
| consumer-port | ||||
| consumer-service-name | ||||
| consumer-service-namespace | ||||
| contain-pod-resources | ||||
| container-port | ||||
| container-runtime | ||||
| @@ -463,3 +466,4 @@ watch-only | ||||
| whitelist-override-label | ||||
| windows-line-endings | ||||
| www-prefix | ||||
|  | ||||
|   | ||||
| @@ -40,7 +40,8 @@ const ( | ||||
| 	timeoutRC                       = 120 * time.Second | ||||
| 	startServiceTimeout             = time.Minute | ||||
| 	startServiceInterval            = 5 * time.Second | ||||
| 	resourceConsumerImage           = "gcr.io/google_containers/resource_consumer:beta2" | ||||
| 	resourceConsumerImage           = "gcr.io/google_containers/resource_consumer:beta4" | ||||
| 	resourceConsumerControllerImage = "gcr.io/google_containers/resource_consumer/controller:beta4" | ||||
| 	rcIsNil                         = "ERROR: replicationController = nil" | ||||
| 	deploymentIsNil                 = "ERROR: deployment = nil" | ||||
| 	rsIsNil                         = "ERROR: replicaset = nil" | ||||
| @@ -58,6 +59,7 @@ rc.ConsumeCPU(300) | ||||
| */ | ||||
| type ResourceConsumer struct { | ||||
| 	name                     string | ||||
| 	controllerName           string | ||||
| 	kind                     string | ||||
| 	framework                *framework.Framework | ||||
| 	cpu                      chan int | ||||
| @@ -97,6 +99,7 @@ func newResourceConsumer(name, kind string, replicas, initCPUTotal, initMemoryTo | ||||
| 	runServiceAndWorkloadForResourceConsumer(f.Client, f.Namespace.Name, name, kind, replicas, cpuLimit, memLimit) | ||||
| 	rc := &ResourceConsumer{ | ||||
| 		name:                     name, | ||||
| 		controllerName:           name + "-controller", | ||||
| 		kind:                     kind, | ||||
| 		framework:                f, | ||||
| 		cpu:                      make(chan int), | ||||
| @@ -111,8 +114,10 @@ func newResourceConsumer(name, kind string, replicas, initCPUTotal, initMemoryTo | ||||
| 		requestSizeInMegabytes:   requestSizeInMegabytes, | ||||
| 		requestSizeCustomMetric:  requestSizeCustomMetric, | ||||
| 	} | ||||
|  | ||||
| 	go rc.makeConsumeCPURequests() | ||||
| 	rc.ConsumeCPU(initCPUTotal) | ||||
|  | ||||
| 	go rc.makeConsumeMemRequests() | ||||
| 	rc.ConsumeMem(initMemoryTotal) | ||||
| 	go rc.makeConsumeCustomMetric() | ||||
| @@ -140,25 +145,15 @@ func (rc *ResourceConsumer) ConsumeCustomMetric(amount int) { | ||||
|  | ||||
| func (rc *ResourceConsumer) makeConsumeCPURequests() { | ||||
| 	defer GinkgoRecover() | ||||
| 	var count int | ||||
| 	var rest int | ||||
| 	sleepTime := time.Duration(0) | ||||
| 	millicores := 0 | ||||
| 	for { | ||||
| 		select { | ||||
| 		case millicores := <-rc.cpu: | ||||
| 			framework.Logf("RC %s: consume %v millicores in total", rc.name, millicores) | ||||
| 			if rc.requestSizeInMillicores != 0 { | ||||
| 				count = millicores / rc.requestSizeInMillicores | ||||
| 			} | ||||
| 			rest = millicores - count*rc.requestSizeInMillicores | ||||
| 		case millicores = <-rc.cpu: | ||||
| 			framework.Logf("RC %s: setting consumption to %v millicores in total", rc.name, millicores) | ||||
| 		case <-time.After(sleepTime): | ||||
| 			framework.Logf("RC %s: sending %v requests to consume %v millicores each and 1 request to consume %v millicores", rc.name, count, rc.requestSizeInMillicores, rest) | ||||
| 			if count > 0 { | ||||
| 				rc.sendConsumeCPURequests(count, rc.requestSizeInMillicores, rc.consumptionTimeInSeconds) | ||||
| 			} | ||||
| 			if rest > 0 { | ||||
| 				go rc.sendOneConsumeCPURequest(rest, rc.consumptionTimeInSeconds) | ||||
| 			} | ||||
| 			framework.Logf("RC %s: sending request to consume %d millicores", rc.name, millicores) | ||||
| 			rc.sendConsumeCPURequest(millicores) | ||||
| 			sleepTime = rc.sleepTime | ||||
| 		case <-rc.stopCPU: | ||||
| 			return | ||||
| @@ -168,25 +163,15 @@ func (rc *ResourceConsumer) makeConsumeCPURequests() { | ||||
|  | ||||
| func (rc *ResourceConsumer) makeConsumeMemRequests() { | ||||
| 	defer GinkgoRecover() | ||||
| 	var count int | ||||
| 	var rest int | ||||
| 	sleepTime := time.Duration(0) | ||||
| 	megabytes := 0 | ||||
| 	for { | ||||
| 		select { | ||||
| 		case megabytes := <-rc.mem: | ||||
| 			framework.Logf("RC %s: consume %v MB in total", rc.name, megabytes) | ||||
| 			if rc.requestSizeInMegabytes != 0 { | ||||
| 				count = megabytes / rc.requestSizeInMegabytes | ||||
| 			} | ||||
| 			rest = megabytes - count*rc.requestSizeInMegabytes | ||||
| 		case megabytes = <-rc.mem: | ||||
| 			framework.Logf("RC %s: setting consumption to %v MB in total", rc.name, megabytes) | ||||
| 		case <-time.After(sleepTime): | ||||
| 			framework.Logf("RC %s: sending %v requests to consume %v MB each and 1 request to consume %v MB", rc.name, count, rc.requestSizeInMegabytes, rest) | ||||
| 			if count > 0 { | ||||
| 				rc.sendConsumeMemRequests(count, rc.requestSizeInMegabytes, rc.consumptionTimeInSeconds) | ||||
| 			} | ||||
| 			if rest > 0 { | ||||
| 				go rc.sendOneConsumeMemRequest(rest, rc.consumptionTimeInSeconds) | ||||
| 			} | ||||
| 			framework.Logf("RC %s: sending request to consume %d MB", rc.name, megabytes) | ||||
| 			rc.sendConsumeMemRequest(megabytes) | ||||
| 			sleepTime = rc.sleepTime | ||||
| 		case <-rc.stopMem: | ||||
| 			return | ||||
| @@ -196,26 +181,15 @@ func (rc *ResourceConsumer) makeConsumeMemRequests() { | ||||
|  | ||||
| func (rc *ResourceConsumer) makeConsumeCustomMetric() { | ||||
| 	defer GinkgoRecover() | ||||
| 	var count int | ||||
| 	var rest int | ||||
| 	sleepTime := time.Duration(0) | ||||
| 	delta := 0 | ||||
| 	for { | ||||
| 		select { | ||||
| 		case total := <-rc.customMetric: | ||||
| 			framework.Logf("RC %s: consume custom metric %v in total", rc.name, total) | ||||
| 			if rc.requestSizeInMegabytes != 0 { | ||||
| 				count = total / rc.requestSizeCustomMetric | ||||
| 			} | ||||
| 			rest = total - count*rc.requestSizeCustomMetric | ||||
| 		case delta := <-rc.customMetric: | ||||
| 			framework.Logf("RC %s: setting bump of metric %s to %d in total", rc.name, customMetricName, delta) | ||||
| 		case <-time.After(sleepTime): | ||||
| 			framework.Logf("RC %s: sending %v requests to consume %v custom metric each and 1 request to consume %v", | ||||
| 				rc.name, count, rc.requestSizeCustomMetric, rest) | ||||
| 			if count > 0 { | ||||
| 				rc.sendConsumeCustomMetric(count, rc.requestSizeCustomMetric, rc.consumptionTimeInSeconds) | ||||
| 			} | ||||
| 			if rest > 0 { | ||||
| 				go rc.sendOneConsumeCustomMetric(rest, rc.consumptionTimeInSeconds) | ||||
| 			} | ||||
| 			framework.Logf("RC %s: sending request to consume %d of custom metric %s", rc.name, delta, customMetricName) | ||||
| 			rc.sendConsumeCustomMetric(delta) | ||||
| 			sleepTime = rc.sleepTime | ||||
| 		case <-rc.stopCustomMetric: | ||||
| 			return | ||||
| @@ -223,64 +197,48 @@ func (rc *ResourceConsumer) makeConsumeCustomMetric() { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (rc *ResourceConsumer) sendConsumeCPURequests(requests, millicores, durationSec int) { | ||||
| 	for i := 0; i < requests; i++ { | ||||
| 		go rc.sendOneConsumeCPURequest(millicores, durationSec) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (rc *ResourceConsumer) sendConsumeMemRequests(requests, megabytes, durationSec int) { | ||||
| 	for i := 0; i < requests; i++ { | ||||
| 		go rc.sendOneConsumeMemRequest(megabytes, durationSec) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (rc *ResourceConsumer) sendConsumeCustomMetric(requests, delta, durationSec int) { | ||||
| 	for i := 0; i < requests; i++ { | ||||
| 		go rc.sendOneConsumeCustomMetric(delta, durationSec) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // sendOneConsumeCPURequest sends POST request for cpu consumption | ||||
| func (rc *ResourceConsumer) sendOneConsumeCPURequest(millicores int, durationSec int) { | ||||
| 	defer GinkgoRecover() | ||||
| func (rc *ResourceConsumer) sendConsumeCPURequest(millicores int) { | ||||
| 	proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.Client, rc.framework.Client.Post()) | ||||
| 	framework.ExpectNoError(err) | ||||
| 	_, err = proxyRequest.Namespace(rc.framework.Namespace.Name). | ||||
| 		Name(rc.name). | ||||
| 	req := proxyRequest.Namespace(rc.framework.Namespace.Name). | ||||
| 		Name(rc.controllerName). | ||||
| 		Suffix("ConsumeCPU"). | ||||
| 		Param("millicores", strconv.Itoa(millicores)). | ||||
| 		Param("durationSec", strconv.Itoa(durationSec)). | ||||
| 		DoRaw() | ||||
| 		Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)). | ||||
| 		Param("requestSizeMillicores", strconv.Itoa(rc.requestSizeInMillicores)) | ||||
| 	framework.Logf("URL: %v", *req.URL()) | ||||
| 	_, err = req.DoRaw() | ||||
| 	framework.ExpectNoError(err) | ||||
| } | ||||
|  | ||||
| // sendOneConsumeMemRequest sends POST request for memory consumption | ||||
| func (rc *ResourceConsumer) sendOneConsumeMemRequest(megabytes int, durationSec int) { | ||||
| 	defer GinkgoRecover() | ||||
| // sendConsumeMemRequest sends POST request for memory consumption | ||||
| func (rc *ResourceConsumer) sendConsumeMemRequest(megabytes int) { | ||||
| 	proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.Client, rc.framework.Client.Post()) | ||||
| 	framework.ExpectNoError(err) | ||||
| 	_, err = proxyRequest.Namespace(rc.framework.Namespace.Name). | ||||
| 		Name(rc.name). | ||||
| 	req := proxyRequest.Namespace(rc.framework.Namespace.Name). | ||||
| 		Name(rc.controllerName). | ||||
| 		Suffix("ConsumeMem"). | ||||
| 		Param("megabytes", strconv.Itoa(megabytes)). | ||||
| 		Param("durationSec", strconv.Itoa(durationSec)). | ||||
| 		DoRaw() | ||||
| 		Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)). | ||||
| 		Param("requestSizeMegabytes", strconv.Itoa(rc.requestSizeInMegabytes)) | ||||
| 	framework.Logf("URL: %v", *req.URL()) | ||||
| 	_, err = req.DoRaw() | ||||
| 	framework.ExpectNoError(err) | ||||
| } | ||||
|  | ||||
| // sendOneConsumeCustomMetric sends POST request for custom metric consumption | ||||
| func (rc *ResourceConsumer) sendOneConsumeCustomMetric(delta int, durationSec int) { | ||||
| 	defer GinkgoRecover() | ||||
| // sendConsumeCustomMetric sends POST request for custom metric consumption | ||||
| func (rc *ResourceConsumer) sendConsumeCustomMetric(delta int) { | ||||
| 	proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.Client, rc.framework.Client.Post()) | ||||
| 	framework.ExpectNoError(err) | ||||
| 	_, err = proxyRequest.Namespace(rc.framework.Namespace.Name). | ||||
| 		Name(rc.name). | ||||
| 	req := proxyRequest.Namespace(rc.framework.Namespace.Name). | ||||
| 		Name(rc.controllerName). | ||||
| 		Suffix("BumpMetric"). | ||||
| 		Param("metric", customMetricName). | ||||
| 		Param("delta", strconv.Itoa(delta)). | ||||
| 		Param("durationSec", strconv.Itoa(durationSec)). | ||||
| 		DoRaw() | ||||
| 		Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)). | ||||
| 		Param("requestSizeMetrics", strconv.Itoa(rc.requestSizeCustomMetric)) | ||||
| 	framework.Logf("URL: %v", *req.URL()) | ||||
| 	_, err = req.DoRaw() | ||||
| 	framework.ExpectNoError(err) | ||||
| } | ||||
|  | ||||
| @@ -346,6 +304,8 @@ func (rc *ResourceConsumer) CleanUp() { | ||||
| 	time.Sleep(10 * time.Second) | ||||
| 	framework.ExpectNoError(framework.DeleteRC(rc.framework.Client, rc.framework.Namespace.Name, rc.name)) | ||||
| 	framework.ExpectNoError(rc.framework.Client.Services(rc.framework.Namespace.Name).Delete(rc.name)) | ||||
| 	framework.ExpectNoError(framework.DeleteRC(rc.framework.Client, rc.framework.Namespace.Name, rc.controllerName)) | ||||
| 	framework.ExpectNoError(rc.framework.Client.Services(rc.framework.Namespace.Name).Delete(rc.controllerName)) | ||||
| } | ||||
|  | ||||
| func runServiceAndWorkloadForResourceConsumer(c *client.Client, ns, name, kind string, replicas int, cpuLimitMillis, memLimitMb int64) { | ||||
| @@ -400,6 +360,38 @@ func runServiceAndWorkloadForResourceConsumer(c *client.Client, ns, name, kind s | ||||
| 		framework.Failf(invalidKind) | ||||
| 	} | ||||
|  | ||||
| 	By(fmt.Sprintf("Running controller")) | ||||
| 	controllerName := name + "-controller" | ||||
| 	_, err = c.Services(ns).Create(&api.Service{ | ||||
| 		ObjectMeta: api.ObjectMeta{ | ||||
| 			Name: controllerName, | ||||
| 		}, | ||||
| 		Spec: api.ServiceSpec{ | ||||
| 			Ports: []api.ServicePort{{ | ||||
| 				Port:       port, | ||||
| 				TargetPort: intstr.FromInt(targetPort), | ||||
| 			}}, | ||||
|  | ||||
| 			Selector: map[string]string{ | ||||
| 				"name": controllerName, | ||||
| 			}, | ||||
| 		}, | ||||
| 	}) | ||||
| 	framework.ExpectNoError(err) | ||||
|  | ||||
| 	dnsClusterFirst := api.DNSClusterFirst | ||||
| 	controllerRcConfig := framework.RCConfig{ | ||||
| 		Client:    c, | ||||
| 		Image:     resourceConsumerControllerImage, | ||||
| 		Name:      controllerName, | ||||
| 		Namespace: ns, | ||||
| 		Timeout:   timeoutRC, | ||||
| 		Replicas:  1, | ||||
| 		Command:   []string{"/controller", "--consumer-service-name=" + name, "--consumer-service-namespace=" + ns, "--consumer-port=80"}, | ||||
| 		DNSPolicy: &dnsClusterFirst, | ||||
| 	} | ||||
| 	framework.ExpectNoError(framework.RunRC(controllerRcConfig)) | ||||
|  | ||||
| 	// Make sure endpoints are propagated. | ||||
| 	// TODO(piosz): replace sleep with endpoints watch. | ||||
| 	time.Sleep(10 * time.Second) | ||||
|   | ||||
| @@ -271,6 +271,7 @@ type RCConfig struct { | ||||
| 	MemRequest     int64 // bytes | ||||
| 	MemLimit       int64 // bytes | ||||
| 	ReadinessProbe *api.Probe | ||||
| 	DNSPolicy      *api.DNSPolicy | ||||
|  | ||||
| 	// Env vars, set the same for every pod. | ||||
| 	Env map[string]string | ||||
| @@ -2184,6 +2185,10 @@ func RunRC(config RCConfig) error { | ||||
|  | ||||
| func (config *RCConfig) create() error { | ||||
| 	By(fmt.Sprintf("creating replication controller %s in namespace %s", config.Name, config.Namespace)) | ||||
| 	dnsDefault := api.DNSDefault | ||||
| 	if config.DNSPolicy == nil { | ||||
| 		config.DNSPolicy = &dnsDefault | ||||
| 	} | ||||
| 	rc := &api.ReplicationController{ | ||||
| 		ObjectMeta: api.ObjectMeta{ | ||||
| 			Name: config.Name, | ||||
| @@ -2207,7 +2212,7 @@ func (config *RCConfig) create() error { | ||||
| 							ReadinessProbe: config.ReadinessProbe, | ||||
| 						}, | ||||
| 					}, | ||||
| 					DNSPolicy: api.DNSDefault, | ||||
| 					DNSPolicy: *config.DNSPolicy, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
|   | ||||
| @@ -12,7 +12,7 @@ | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
|  | ||||
| TAG = beta2 | ||||
| TAG = beta4 | ||||
| PREFIX = gcr.io/google_containers | ||||
|  | ||||
| all: clean consumer | ||||
| @@ -20,18 +20,22 @@ all: clean consumer | ||||
| consumer: | ||||
| 	CGO_ENABLED=0 go build -a -installsuffix cgo --ldflags '-w' -o consume-cpu/consume-cpu ./consume-cpu/consume_cpu.go | ||||
| 	CGO_ENABLED=0 go build -a -installsuffix cgo --ldflags '-w' -o consumer . | ||||
| 	CGO_ENABLED=0 go build -a -installsuffix cgo --ldflags '-w' -o controller/controller ./controller/controller.go | ||||
|  | ||||
| container: image | ||||
|  | ||||
| image: | ||||
| 	sudo docker build -t $(PREFIX)/resource_consumer:$(TAG) . | ||||
| 	sudo docker build -t $(PREFIX)/resource_consumer/controller:$(TAG) controller | ||||
|  | ||||
| run_container: | ||||
| 	docker run --publish=8080:8080 $(PREFIX)/resource_consumer:$(TAG) | ||||
|  | ||||
| push: | ||||
| 	@echo "This image is not meant to be pushed." | ||||
| 	gcloud docker push ${PREFIX}/resource_consumer:${TAG} | ||||
| 	gcloud docker push ${PREFIX}/resource_consumer/controller:${TAG} | ||||
|  | ||||
| clean: | ||||
| 	rm -f consumer | ||||
| 	rm -f consume-cpu/consume-cpu | ||||
| 	rm -f controller/controller | ||||
|   | ||||
							
								
								
									
										41
									
								
								test/images/resource-consumer/common/common.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										41
									
								
								test/images/resource-consumer/common/common.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,41 @@ | ||||
| /* | ||||
| Copyright 2016 The Kubernetes Authors All rights reserved. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package common | ||||
|  | ||||
| const ( | ||||
| 	ConsumeCPUAddress       = "/ConsumeCPU" | ||||
| 	ConsumeMemAddress       = "/ConsumeMem" | ||||
| 	BumpMetricAddress       = "/BumpMetric" | ||||
| 	GetCurrentStatusAddress = "/GetCurrentStatus" | ||||
| 	MetricsAddress          = "/Metrics" | ||||
|  | ||||
| 	MillicoresQuery              = "millicores" | ||||
| 	MegabytesQuery               = "megabytes" | ||||
| 	MetricNameQuery              = "metric" | ||||
| 	DeltaQuery                   = "delta" | ||||
| 	DurationSecQuery             = "durationSec" | ||||
| 	RequestSizeInMillicoresQuery = "requestSizeMillicores" | ||||
| 	RequestSizeInMegabytesQuery  = "requestSizeMegabytes" | ||||
| 	RequestSizeCustomMetricQuery = "requestSizeMetrics" | ||||
|  | ||||
| 	BadRequest                = "Bad request. Not a POST request" | ||||
| 	UnknownFunction           = "unknown function" | ||||
| 	IncorrectFunctionArgument = "incorrect function argument" | ||||
| 	NotGivenFunctionArgument  = "not given function argument" | ||||
|  | ||||
| 	FrameworkName = "horizontal-pod-autoscaling" | ||||
| ) | ||||
							
								
								
									
										19
									
								
								test/images/resource-consumer/controller/Dockerfile
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										19
									
								
								test/images/resource-consumer/controller/Dockerfile
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,19 @@ | ||||
| # Copyright 2016 The Kubernetes Authors All rights reserved. | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
|  | ||||
| FROM busybox | ||||
| MAINTAINER Jerzy Szczepkowski <jsz@google.com> | ||||
| ADD controller /controller | ||||
| EXPOSE 8080 | ||||
| ENTRYPOINT ["/controller"] | ||||
							
								
								
									
										243
									
								
								test/images/resource-consumer/controller/controller.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										243
									
								
								test/images/resource-consumer/controller/controller.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,243 @@ | ||||
| /* | ||||
| Copyright 2016 The Kubernetes Authors All rights reserved. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package main | ||||
|  | ||||
| import ( | ||||
| 	"flag" | ||||
| 	"fmt" | ||||
| 	"log" | ||||
| 	"net/http" | ||||
| 	"net/url" | ||||
| 	"strconv" | ||||
| 	"sync" | ||||
|  | ||||
| 	. "k8s.io/kubernetes/test/images/resource-consumer/common" | ||||
| ) | ||||
|  | ||||
| var port = flag.Int("port", 8080, "Port number.") | ||||
| var consumerPort = flag.Int("consumer-port", 8080, "Port number of consumers.") | ||||
| var consumerServiceName = flag.String("consumer-service-name", "resource-consumer", "Name of service containing resource consumers.") | ||||
| var consumerServiceNamespace = flag.String("consumer-service-namespace", "default", "Namespace of service containing resource consumers.") | ||||
|  | ||||
| func main() { | ||||
| 	flag.Parse() | ||||
| 	mgr := NewController() | ||||
| 	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *port), mgr)) | ||||
| } | ||||
|  | ||||
| type Controller struct { | ||||
| 	responseWriterLock sync.Mutex | ||||
| 	waitGroup          sync.WaitGroup | ||||
| } | ||||
|  | ||||
| func NewController() *Controller { | ||||
| 	c := &Controller{} | ||||
| 	return c | ||||
| } | ||||
|  | ||||
| func (handler *Controller) ServeHTTP(w http.ResponseWriter, req *http.Request) { | ||||
| 	if req.Method != "POST" { | ||||
| 		http.Error(w, BadRequest, http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
| 	// parsing POST request data and URL data | ||||
| 	if err := req.ParseForm(); err != nil { | ||||
| 		http.Error(w, err.Error(), http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
| 	// handle consumeCPU | ||||
| 	if req.URL.Path == ConsumeCPUAddress { | ||||
| 		handler.handleConsumeCPU(w, req.Form) | ||||
| 		return | ||||
| 	} | ||||
| 	// handle consumeMem | ||||
| 	if req.URL.Path == ConsumeMemAddress { | ||||
| 		handler.handleConsumeMem(w, req.Form) | ||||
| 		return | ||||
| 	} | ||||
| 	// handle bumpMetric | ||||
| 	if req.URL.Path == BumpMetricAddress { | ||||
| 		handler.handleBumpMetric(w, req.Form) | ||||
| 		return | ||||
| 	} | ||||
| 	http.Error(w, UnknownFunction, http.StatusNotFound) | ||||
| } | ||||
|  | ||||
| func (handler *Controller) handleConsumeCPU(w http.ResponseWriter, query url.Values) { | ||||
| 	// geting string data for consumeCPU | ||||
| 	durationSecString := query.Get(DurationSecQuery) | ||||
| 	millicoresString := query.Get(MillicoresQuery) | ||||
| 	requestSizeInMillicoresString := query.Get(RequestSizeInMillicoresQuery) | ||||
| 	if durationSecString == "" || millicoresString == "" || requestSizeInMillicoresString == "" { | ||||
| 		http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// convert data (strings to ints) for consumeCPU | ||||
| 	durationSec, durationSecError := strconv.Atoi(durationSecString) | ||||
| 	millicores, millicoresError := strconv.Atoi(millicoresString) | ||||
| 	requestSizeInMillicores, requestSizeInMillicoresError := strconv.Atoi(requestSizeInMillicoresString) | ||||
| 	if durationSecError != nil || millicoresError != nil || requestSizeInMillicoresError != nil || requestSizeInMillicores <= 0 { | ||||
| 		http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	count := millicores / requestSizeInMillicores | ||||
| 	rest := millicores - count*requestSizeInMillicores | ||||
| 	fmt.Fprintf(w, "RC manager: sending %v requests to consume %v millicores each and 1 request to consume %v millicores\n", count, requestSizeInMillicores, rest) | ||||
| 	if count > 0 { | ||||
| 		handler.waitGroup.Add(count) | ||||
| 		handler.sendConsumeCPURequests(w, count, requestSizeInMillicores, durationSec) | ||||
| 	} | ||||
| 	if rest > 0 { | ||||
| 		handler.waitGroup.Add(1) | ||||
| 		go handler.sendOneConsumeCPURequest(w, rest, durationSec) | ||||
| 	} | ||||
| 	handler.waitGroup.Wait() | ||||
| } | ||||
|  | ||||
| func (handler *Controller) handleConsumeMem(w http.ResponseWriter, query url.Values) { | ||||
| 	// geting string data for consumeMem | ||||
| 	durationSecString := query.Get(DurationSecQuery) | ||||
| 	megabytesString := query.Get(MegabytesQuery) | ||||
| 	requestSizeInMegabytesString := query.Get(RequestSizeInMegabytesQuery) | ||||
| 	if durationSecString == "" || megabytesString == "" || requestSizeInMegabytesString == "" { | ||||
| 		http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// convert data (strings to ints) for consumeMem | ||||
| 	durationSec, durationSecError := strconv.Atoi(durationSecString) | ||||
| 	megabytes, megabytesError := strconv.Atoi(megabytesString) | ||||
| 	requestSizeInMegabytes, requestSizeInMegabytesError := strconv.Atoi(requestSizeInMegabytesString) | ||||
| 	if durationSecError != nil || megabytesError != nil || requestSizeInMegabytesError != nil || requestSizeInMegabytes <= 0 { | ||||
| 		http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	count := megabytes / requestSizeInMegabytes | ||||
| 	rest := megabytes - count*requestSizeInMegabytes | ||||
| 	fmt.Fprintf(w, "RC manager: sending %v requests to consume %v MB each and 1 request to consume %v MB\n", count, requestSizeInMegabytes, rest) | ||||
| 	if count > 0 { | ||||
| 		handler.waitGroup.Add(count) | ||||
| 		handler.sendConsumeMemRequests(w, count, requestSizeInMegabytes, durationSec) | ||||
| 	} | ||||
| 	if rest > 0 { | ||||
| 		handler.waitGroup.Add(1) | ||||
| 		go handler.sendOneConsumeMemRequest(w, rest, durationSec) | ||||
| 	} | ||||
| 	handler.waitGroup.Wait() | ||||
| } | ||||
|  | ||||
| func (handler *Controller) handleBumpMetric(w http.ResponseWriter, query url.Values) { | ||||
| 	// geting string data for handleBumpMetric | ||||
| 	metric := query.Get(MetricNameQuery) | ||||
| 	deltaString := query.Get(DeltaQuery) | ||||
| 	durationSecString := query.Get(DurationSecQuery) | ||||
| 	requestSizeCustomMetricString := query.Get(RequestSizeCustomMetricQuery) | ||||
| 	if durationSecString == "" || metric == "" || deltaString == "" || requestSizeCustomMetricString == "" { | ||||
| 		http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// convert data (strings to ints/floats) for handleBumpMetric | ||||
| 	durationSec, durationSecError := strconv.Atoi(durationSecString) | ||||
| 	delta, deltaError := strconv.Atoi(deltaString) | ||||
| 	requestSizeCustomMetric, requestSizeCustomMetricError := strconv.Atoi(requestSizeCustomMetricString) | ||||
| 	if durationSecError != nil || deltaError != nil || requestSizeCustomMetricError != nil || requestSizeCustomMetric <= 0 { | ||||
| 		http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	count := delta / requestSizeCustomMetric | ||||
| 	rest := delta - count*requestSizeCustomMetric | ||||
| 	fmt.Fprintf(w, "RC manager: sending %v requests to bump custom metric by %v each and 1 request to bump by %v\n", count, requestSizeCustomMetric, rest) | ||||
| 	if count > 0 { | ||||
| 		handler.waitGroup.Add(count) | ||||
| 		handler.sendConsumeCustomMetric(w, metric, count, requestSizeCustomMetric, durationSec) | ||||
| 	} | ||||
| 	if rest > 0 { | ||||
| 		handler.waitGroup.Add(1) | ||||
| 		go handler.sendOneConsumeCustomMetric(w, metric, rest, durationSec) | ||||
| 	} | ||||
| 	handler.waitGroup.Wait() | ||||
| } | ||||
|  | ||||
| func (manager *Controller) sendConsumeCPURequests(w http.ResponseWriter, requests, millicores, durationSec int) { | ||||
| 	for i := 0; i < requests; i++ { | ||||
| 		go manager.sendOneConsumeCPURequest(w, millicores, durationSec) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (manager *Controller) sendConsumeMemRequests(w http.ResponseWriter, requests, megabytes, durationSec int) { | ||||
| 	for i := 0; i < requests; i++ { | ||||
| 		go manager.sendOneConsumeMemRequest(w, megabytes, durationSec) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (manager *Controller) sendConsumeCustomMetric(w http.ResponseWriter, metric string, requests, delta, durationSec int) { | ||||
| 	for i := 0; i < requests; i++ { | ||||
| 		go manager.sendOneConsumeCustomMetric(w, metric, delta, durationSec) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func createConsumerURL(suffix string) string { | ||||
| 	return fmt.Sprintf("http://%s.%s.svc.cluster.local:%d%s", *consumerServiceName, *consumerServiceNamespace, *consumerPort, suffix) | ||||
| } | ||||
|  | ||||
| // sendOneConsumeCPURequest sends POST request for cpu consumption | ||||
| func (c *Controller) sendOneConsumeCPURequest(w http.ResponseWriter, millicores int, durationSec int) { | ||||
| 	defer c.waitGroup.Done() | ||||
| 	query := createConsumerURL(ConsumeCPUAddress) | ||||
| 	_, err := http.PostForm(query, url.Values{MillicoresQuery: {strconv.Itoa(millicores)}, DurationSecQuery: {strconv.Itoa(durationSec)}}) | ||||
| 	c.responseWriterLock.Lock() | ||||
| 	defer c.responseWriterLock.Unlock() | ||||
| 	if err != nil { | ||||
| 		fmt.Fprintf(w, "Failed to connect to consumer: %v\n", err) | ||||
| 		return | ||||
| 	} | ||||
| 	fmt.Fprintf(w, "Consumed %d millicores\n", millicores) | ||||
| } | ||||
|  | ||||
| // sendOneConsumeMemRequest sends POST request for memory consumption | ||||
| func (c *Controller) sendOneConsumeMemRequest(w http.ResponseWriter, megabytes int, durationSec int) { | ||||
| 	defer c.waitGroup.Done() | ||||
| 	query := createConsumerURL(ConsumeMemAddress) | ||||
| 	_, err := http.PostForm(query, url.Values{MegabytesQuery: {strconv.Itoa(megabytes)}, DurationSecQuery: {strconv.Itoa(durationSec)}}) | ||||
| 	c.responseWriterLock.Lock() | ||||
| 	defer c.responseWriterLock.Unlock() | ||||
| 	if err != nil { | ||||
| 		fmt.Fprintf(w, "Failed to connect to consumer: %v\n", err) | ||||
| 		return | ||||
| 	} | ||||
| 	fmt.Fprintf(w, "Consumed %d megabytes\n", megabytes) | ||||
| } | ||||
|  | ||||
| // sendOneConsumeCustomMetric sends POST request for custom metric consumption | ||||
| func (c *Controller) sendOneConsumeCustomMetric(w http.ResponseWriter, customMetricName string, delta int, durationSec int) { | ||||
| 	defer c.waitGroup.Done() | ||||
| 	query := createConsumerURL(BumpMetricAddress) | ||||
| 	_, err := http.PostForm(query, | ||||
| 		url.Values{MetricNameQuery: {customMetricName}, DurationSecQuery: {strconv.Itoa(durationSec)}, DeltaQuery: {strconv.Itoa(delta)}}) | ||||
| 	c.responseWriterLock.Lock() | ||||
| 	defer c.responseWriterLock.Unlock() | ||||
| 	if err != nil { | ||||
| 		fmt.Fprintf(w, "Failed to connect to consumer: %v\n", err) | ||||
| 		return | ||||
| 	} | ||||
| 	fmt.Fprintf(w, "Bumped metric %s by %d\n", customMetricName, delta) | ||||
| } | ||||
| @@ -23,23 +23,8 @@ import ( | ||||
| 	"strconv" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	badRequest                = "Bad request. Not a POST request" | ||||
| 	unknownFunction           = "unknown function" | ||||
| 	incorrectFunctionArgument = "incorrect function argument" | ||||
| 	notGivenFunctionArgument  = "not given function argument" | ||||
| 	consumeCPUAddress         = "/ConsumeCPU" | ||||
| 	consumeMemAddress         = "/ConsumeMem" | ||||
| 	bumpMetricAddress         = "/BumpMetric" | ||||
| 	getCurrentStatusAddress   = "/GetCurrentStatus" | ||||
| 	metricsAddress            = "/metrics" | ||||
| 	millicoresQuery           = "millicores" | ||||
| 	megabytesQuery            = "megabytes" | ||||
| 	metricNameQuery           = "metric" | ||||
| 	deltaQuery                = "delta" | ||||
| 	durationSecQuery          = "durationSec" | ||||
| 	. "k8s.io/kubernetes/test/images/resource-consumer/common" | ||||
| ) | ||||
|  | ||||
| type ResourceConsumerHandler struct { | ||||
| @@ -53,12 +38,12 @@ func NewResourceConsumerHandler() *ResourceConsumerHandler { | ||||
|  | ||||
| func (handler *ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { | ||||
| 	// handle exposing metrics in Prometheus format (both GET & POST) | ||||
| 	if req.URL.Path == metricsAddress { | ||||
| 	if req.URL.Path == MetricsAddress { | ||||
| 		handler.handleMetrics(w) | ||||
| 		return | ||||
| 	} | ||||
| 	if req.Method != "POST" { | ||||
| 		http.Error(w, badRequest, http.StatusBadRequest) | ||||
| 		http.Error(w, BadRequest, http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
| 	// parsing POST request data and URL data | ||||
| @@ -67,34 +52,34 @@ func (handler *ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *ht | ||||
| 		return | ||||
| 	} | ||||
| 	// handle consumeCPU | ||||
| 	if req.URL.Path == consumeCPUAddress { | ||||
| 	if req.URL.Path == ConsumeCPUAddress { | ||||
| 		handler.handleConsumeCPU(w, req.Form) | ||||
| 		return | ||||
| 	} | ||||
| 	// handle consumeMem | ||||
| 	if req.URL.Path == consumeMemAddress { | ||||
| 	if req.URL.Path == ConsumeMemAddress { | ||||
| 		handler.handleConsumeMem(w, req.Form) | ||||
| 		return | ||||
| 	} | ||||
| 	// handle getCurrentStatus | ||||
| 	if req.URL.Path == getCurrentStatusAddress { | ||||
| 	if req.URL.Path == GetCurrentStatusAddress { | ||||
| 		handler.handleGetCurrentStatus(w) | ||||
| 		return | ||||
| 	} | ||||
| 	// handle bumpMetric | ||||
| 	if req.URL.Path == bumpMetricAddress { | ||||
| 	if req.URL.Path == BumpMetricAddress { | ||||
| 		handler.handleBumpMetric(w, req.Form) | ||||
| 		return | ||||
| 	} | ||||
| 	http.Error(w, unknownFunction, http.StatusNotFound) | ||||
| 	http.Error(w, fmt.Sprintf("%s: %s", UnknownFunction, req.URL.Path), http.StatusNotFound) | ||||
| } | ||||
|  | ||||
| func (handler *ResourceConsumerHandler) handleConsumeCPU(w http.ResponseWriter, query url.Values) { | ||||
| 	// geting string data for consumeCPU | ||||
| 	durationSecString := query.Get(durationSecQuery) | ||||
| 	millicoresString := query.Get(millicoresQuery) | ||||
| 	durationSecString := query.Get(DurationSecQuery) | ||||
| 	millicoresString := query.Get(MillicoresQuery) | ||||
| 	if durationSecString == "" || millicoresString == "" { | ||||
| 		http.Error(w, notGivenFunctionArgument, http.StatusBadRequest) | ||||
| 		http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| @@ -102,22 +87,22 @@ func (handler *ResourceConsumerHandler) handleConsumeCPU(w http.ResponseWriter, | ||||
| 	durationSec, durationSecError := strconv.Atoi(durationSecString) | ||||
| 	millicores, millicoresError := strconv.Atoi(millicoresString) | ||||
| 	if durationSecError != nil || millicoresError != nil { | ||||
| 		http.Error(w, incorrectFunctionArgument, http.StatusBadRequest) | ||||
| 		http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	go ConsumeCPU(millicores, durationSec) | ||||
| 	fmt.Fprintln(w, consumeCPUAddress[1:]) | ||||
| 	fmt.Fprintln(w, millicores, millicoresQuery) | ||||
| 	fmt.Fprintln(w, durationSec, durationSecQuery) | ||||
| 	fmt.Fprintln(w, ConsumeCPUAddress[1:]) | ||||
| 	fmt.Fprintln(w, millicores, MillicoresQuery) | ||||
| 	fmt.Fprintln(w, durationSec, DurationSecQuery) | ||||
| } | ||||
|  | ||||
| func (handler *ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter, query url.Values) { | ||||
| 	// geting string data for consumeMem | ||||
| 	durationSecString := query.Get(durationSecQuery) | ||||
| 	megabytesString := query.Get(megabytesQuery) | ||||
| 	durationSecString := query.Get(DurationSecQuery) | ||||
| 	megabytesString := query.Get(MegabytesQuery) | ||||
| 	if durationSecString == "" || megabytesString == "" { | ||||
| 		http.Error(w, notGivenFunctionArgument, http.StatusBadRequest) | ||||
| 		http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| @@ -125,20 +110,20 @@ func (handler *ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter, | ||||
| 	durationSec, durationSecError := strconv.Atoi(durationSecString) | ||||
| 	megabytes, megabytesError := strconv.Atoi(megabytesString) | ||||
| 	if durationSecError != nil || megabytesError != nil { | ||||
| 		http.Error(w, incorrectFunctionArgument, http.StatusBadRequest) | ||||
| 		http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	go ConsumeMem(megabytes, durationSec) | ||||
| 	fmt.Fprintln(w, consumeMemAddress[1:]) | ||||
| 	fmt.Fprintln(w, megabytes, megabytesQuery) | ||||
| 	fmt.Fprintln(w, durationSec, durationSecQuery) | ||||
| 	fmt.Fprintln(w, ConsumeMemAddress[1:]) | ||||
| 	fmt.Fprintln(w, megabytes, MegabytesQuery) | ||||
| 	fmt.Fprintln(w, durationSec, DurationSecQuery) | ||||
| } | ||||
|  | ||||
| func (handler *ResourceConsumerHandler) handleGetCurrentStatus(w http.ResponseWriter) { | ||||
| 	GetCurrentStatus() | ||||
| 	fmt.Fprintln(w, "Warning: not implemented!") | ||||
| 	fmt.Fprint(w, getCurrentStatusAddress[1:]) | ||||
| 	fmt.Fprint(w, GetCurrentStatusAddress[1:]) | ||||
| } | ||||
|  | ||||
| func (handler *ResourceConsumerHandler) handleMetrics(w http.ResponseWriter) { | ||||
| @@ -169,11 +154,11 @@ func (handler *ResourceConsumerHandler) bumpMetric(metric string, delta float64, | ||||
|  | ||||
| func (handler *ResourceConsumerHandler) handleBumpMetric(w http.ResponseWriter, query url.Values) { | ||||
| 	// geting string data for handleBumpMetric | ||||
| 	metric := query.Get(metricNameQuery) | ||||
| 	deltaString := query.Get(deltaQuery) | ||||
| 	durationSecString := query.Get(durationSecQuery) | ||||
| 	metric := query.Get(MetricNameQuery) | ||||
| 	deltaString := query.Get(DeltaQuery) | ||||
| 	durationSecString := query.Get(DurationSecQuery) | ||||
| 	if durationSecString == "" || metric == "" || deltaString == "" { | ||||
| 		http.Error(w, notGivenFunctionArgument, http.StatusBadRequest) | ||||
| 		http.Error(w, NotGivenFunctionArgument, http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| @@ -181,13 +166,13 @@ func (handler *ResourceConsumerHandler) handleBumpMetric(w http.ResponseWriter, | ||||
| 	durationSec, durationSecError := strconv.Atoi(durationSecString) | ||||
| 	delta, deltaError := strconv.ParseFloat(deltaString, 64) | ||||
| 	if durationSecError != nil || deltaError != nil { | ||||
| 		http.Error(w, incorrectFunctionArgument, http.StatusBadRequest) | ||||
| 		http.Error(w, IncorrectFunctionArgument, http.StatusBadRequest) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	go handler.bumpMetric(metric, delta, time.Duration(durationSec)*time.Second) | ||||
| 	fmt.Fprintln(w, bumpMetricAddress[1:]) | ||||
| 	fmt.Fprintln(w, metric, metricNameQuery) | ||||
| 	fmt.Fprintln(w, delta, deltaQuery) | ||||
| 	fmt.Fprintln(w, durationSec, durationSecQuery) | ||||
| 	fmt.Fprintln(w, BumpMetricAddress[1:]) | ||||
| 	fmt.Fprintln(w, metric, MetricNameQuery) | ||||
| 	fmt.Fprintln(w, delta, DeltaQuery) | ||||
| 	fmt.Fprintln(w, durationSec, DurationSecQuery) | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 k8s-merge-robot
					k8s-merge-robot