Add a simple master benchmark and a wrapper to run it.
This commit is contained in:
77
test/integration/framework/etcd_utils.go
Normal file
77
test/integration/framework/etcd_utils.go
Normal file
@@ -0,0 +1,77 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// +build integration
|
||||
|
||||
package framework
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// If you need to start an etcd instance by hand, you also need to insert a key
|
||||
// for this check to pass (*any* key will do, eg:
|
||||
//curl -L http://127.0.0.1:4001/v2/keys/message -XPUT -d value="Hello world").
|
||||
func init() {
|
||||
RequireEtcd()
|
||||
}
|
||||
|
||||
func NewEtcdClient() *etcd.Client {
|
||||
return etcd.NewClient([]string{})
|
||||
}
|
||||
|
||||
func NewHelper() (tools.EtcdHelper, error) {
|
||||
return master.NewEtcdHelper(NewEtcdClient(), testapi.Version(), etcdtest.PathPrefix())
|
||||
}
|
||||
|
||||
func RequireEtcd() {
|
||||
if _, err := NewEtcdClient().Get("/", false, false); err != nil {
|
||||
glog.Fatalf("unable to connect to etcd for testing: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func WithEtcdKey(f func(string)) {
|
||||
prefix := fmt.Sprintf("/test-%d", rand.Int63())
|
||||
defer NewEtcdClient().Delete(prefix, true)
|
||||
f(prefix)
|
||||
}
|
||||
|
||||
// DeleteAllEtcdKeys deletes all keys from etcd.
|
||||
// TODO: Instead of sprinkling calls to this throughout the code, adjust the
|
||||
// prefix in etcdtest package; then just delete everything once at the end
|
||||
// of the test run.
|
||||
func DeleteAllEtcdKeys() {
|
||||
glog.Infof("Deleting all etcd keys")
|
||||
client := NewEtcdClient()
|
||||
keys, err := client.Get("/", false, false)
|
||||
if err != nil {
|
||||
glog.Fatalf("Unable to list root etcd keys: %v", err)
|
||||
}
|
||||
for _, node := range keys.Node.Nodes {
|
||||
if _, err := client.Delete(node.Key, true); err != nil {
|
||||
glog.Fatalf("Unable delete key: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
313
test/integration/framework/master_utils.go
Normal file
313
test/integration/framework/master_utils.go
Normal file
@@ -0,0 +1,313 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package framework
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"runtime"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/admit"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
const (
|
||||
// Timeout used in benchmarks, to eg: resize an rc
|
||||
DefaultTimeout = 30 * time.Minute
|
||||
|
||||
// Rc manifest used to create pods for benchmarks.
|
||||
// TODO: Convert this to a full path?
|
||||
TestRCManifest = "benchmark-controller.json"
|
||||
|
||||
// Test Namspace, for pods and rcs.
|
||||
TestNS = "test"
|
||||
)
|
||||
|
||||
// MasterComponents is a control struct for all master components started via NewMasterComponents.
|
||||
// TODO: Include all master components (scheduler, nodecontroller).
|
||||
// TODO: Reconcile with integration.go, currently the master used there doesn't understand
|
||||
// how to restart cleanly, which is required for each iteration of a benchmark. The integration
|
||||
// tests also don't make it easy to isolate and turn off components at will.
|
||||
type MasterComponents struct {
|
||||
// Raw http server in front of the master
|
||||
ApiServer *httptest.Server
|
||||
// Kubernetes master, contains an embedded etcd helper
|
||||
KubeMaster *master.Master
|
||||
// Restclient used to talk to the kubernetes master
|
||||
RestClient *client.Client
|
||||
// Replication controller manager
|
||||
ControllerManager *controller.ReplicationManager
|
||||
// Channel for stop signals to rc manager
|
||||
rcStopCh chan struct{}
|
||||
// Used to stop master components individually, and via MasterComponents.Stop
|
||||
once sync.Once
|
||||
// Kubernetes etcd helper, has embedded etcd client
|
||||
EtcdHelper *tools.EtcdHelper
|
||||
}
|
||||
|
||||
// Config is a struct of configuration directives for NewMasterComponents.
|
||||
type Config struct {
|
||||
// If nil, a default is used, partially filled configs will not get populated.
|
||||
MasterConfig *master.Config
|
||||
StartReplicationManager bool
|
||||
// If true, all existing etcd keys are purged before starting master components
|
||||
DeleteEtcdKeys bool
|
||||
// Client throttling qps
|
||||
QPS float32
|
||||
// Client burst qps, also burst replicas allowed in rc manager
|
||||
Burst int
|
||||
// TODO: Add configs for endpoints controller, scheduler etc
|
||||
}
|
||||
|
||||
// NewMasterComponents creates, initializes and starts master components based on the given config.
|
||||
func NewMasterComponents(c *Config) *MasterComponents {
|
||||
m, s, h := startMasterOrDie(c.MasterConfig)
|
||||
// TODO: Allow callers to pipe through a different master url and create a client/start components using it.
|
||||
glog.Infof("Master %+v", s.URL)
|
||||
if c.DeleteEtcdKeys {
|
||||
DeleteAllEtcdKeys()
|
||||
}
|
||||
restClient := client.NewOrDie(&client.Config{Host: s.URL, Version: "v1beta3", QPS: c.QPS, Burst: c.Burst})
|
||||
rcStopCh := make(chan struct{})
|
||||
controllerManager := controller.NewReplicationManager(restClient, c.Burst)
|
||||
|
||||
// TODO: Support events once we can cleanly shutdown an event recorder.
|
||||
controllerManager.SetEventRecorder(&record.FakeRecorder{})
|
||||
if c.StartReplicationManager {
|
||||
go controllerManager.Run(runtime.NumCPU(), rcStopCh)
|
||||
}
|
||||
var once sync.Once
|
||||
return &MasterComponents{
|
||||
ApiServer: s,
|
||||
KubeMaster: m,
|
||||
RestClient: restClient,
|
||||
ControllerManager: controllerManager,
|
||||
rcStopCh: rcStopCh,
|
||||
EtcdHelper: h,
|
||||
once: once,
|
||||
}
|
||||
}
|
||||
|
||||
// startMasterOrDie starts a kubernetes master and an httpserver to handle api requests
|
||||
func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Server, *tools.EtcdHelper) {
|
||||
var m *master.Master
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
m.Handler.ServeHTTP(w, req)
|
||||
}))
|
||||
|
||||
var helper tools.EtcdHelper
|
||||
var err error
|
||||
if masterConfig == nil {
|
||||
helper, err = master.NewEtcdHelper(NewEtcdClient(), "", etcdtest.PathPrefix())
|
||||
if err != nil {
|
||||
glog.Fatalf("Failed to create etcd helper for master %v", err)
|
||||
}
|
||||
masterConfig = &master.Config{
|
||||
EtcdHelper: helper,
|
||||
KubeletClient: client.FakeKubeletClient{},
|
||||
EnableLogsSupport: false,
|
||||
EnableProfiling: true,
|
||||
EnableUISupport: false,
|
||||
APIPrefix: "/api",
|
||||
Authorizer: apiserver.NewAlwaysAllowAuthorizer(),
|
||||
AdmissionControl: admit.NewAlwaysAdmit(),
|
||||
}
|
||||
} else {
|
||||
helper = masterConfig.EtcdHelper
|
||||
}
|
||||
m = master.New(masterConfig)
|
||||
return m, s, &helper
|
||||
}
|
||||
|
||||
func (m *MasterComponents) stopRCManager() {
|
||||
close(m.rcStopCh)
|
||||
}
|
||||
|
||||
func (m *MasterComponents) Stop(apiServer, rcManager bool) {
|
||||
glog.Infof("Stopping master components")
|
||||
if rcManager {
|
||||
// Ordering matters because the apiServer will only shutdown when pending
|
||||
// requests are done
|
||||
m.once.Do(m.stopRCManager)
|
||||
}
|
||||
if apiServer {
|
||||
m.ApiServer.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// RCFromManifest reads a .json file and returns the rc in it.
|
||||
func RCFromManifest(fileName string) *api.ReplicationController {
|
||||
data, err := ioutil.ReadFile(fileName)
|
||||
if err != nil {
|
||||
glog.Fatalf("Unexpected error reading rc manifest %v", err)
|
||||
}
|
||||
var controller api.ReplicationController
|
||||
if err := api.Scheme.DecodeInto(data, &controller); err != nil {
|
||||
glog.Fatalf("Unexpected error reading rc manifest %v", err)
|
||||
}
|
||||
return &controller
|
||||
}
|
||||
|
||||
// StopRC stops the rc via kubectl's stop library
|
||||
func StopRC(rc *api.ReplicationController, restClient *client.Client) error {
|
||||
reaper, err := kubectl.ReaperFor("ReplicationController", restClient)
|
||||
if err != nil || reaper == nil {
|
||||
return err
|
||||
}
|
||||
_, err = reaper.Stop(rc.Namespace, rc.Name, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResizeRC resizes the given rc to the given replicas.
|
||||
func ResizeRC(name, ns string, replicas int, restClient *client.Client) (*api.ReplicationController, error) {
|
||||
resizer, err := kubectl.ResizerFor("ReplicationController", kubectl.NewResizerClient(restClient))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
retry := &kubectl.RetryParams{50 * time.Millisecond, DefaultTimeout}
|
||||
waitForReplicas := &kubectl.RetryParams{50 * time.Millisecond, DefaultTimeout}
|
||||
err = resizer.Resize(ns, name, uint(replicas), nil, retry, waitForReplicas)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resized, err := restClient.ReplicationControllers(ns).Get(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resized, nil
|
||||
}
|
||||
|
||||
// StartRC creates given rc if it doesn't already exist, then updates it via kubectl's resizer.
|
||||
func StartRC(controller *api.ReplicationController, restClient *client.Client) (*api.ReplicationController, error) {
|
||||
created, err := restClient.ReplicationControllers(controller.Namespace).Get(controller.Name)
|
||||
if err != nil {
|
||||
glog.Infof("Rc %v doesn't exist, creating", controller.Name)
|
||||
created, err = restClient.ReplicationControllers(controller.Namespace).Create(controller)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
// If we just created an rc, wait till it creates its replicas.
|
||||
return ResizeRC(created.Name, created.Namespace, controller.Spec.Replicas, restClient)
|
||||
}
|
||||
|
||||
// StartPods check for numPods in TestNS. If they exist, it no-ops, otherwise it starts up
|
||||
// a temp rc, resizes it to match numPods, then deletes the rc leaving behind the pods.
|
||||
func StartPods(numPods int, host string, restClient *client.Client) error {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
glog.Infof("StartPods took %v with numPods %d", time.Since(start), numPods)
|
||||
}()
|
||||
hostField := fields.OneTermEqualSelector(client.PodHost, host)
|
||||
pods, err := restClient.Pods(TestNS).List(labels.Everything(), hostField)
|
||||
if err != nil || len(pods.Items) == numPods {
|
||||
return err
|
||||
}
|
||||
glog.Infof("Found %d pods that match host %v, require %d", len(pods.Items), hostField, numPods)
|
||||
// For the sake of simplicity, assume all pods in TestNS have selectors matching TestRCManifest.
|
||||
controller := RCFromManifest(TestRCManifest)
|
||||
|
||||
// Make the rc unique to the given host.
|
||||
controller.Spec.Replicas = numPods
|
||||
controller.Spec.Template.Spec.Host = host
|
||||
controller.Name = controller.Name + host
|
||||
controller.Spec.Selector["host"] = host
|
||||
controller.Spec.Template.Labels["host"] = host
|
||||
|
||||
if rc, err := StartRC(controller, restClient); err != nil {
|
||||
return err
|
||||
} else {
|
||||
// Delete the rc, otherwise when we restart master components for the next benchmark
|
||||
// the rc controller will race with the pods controller in the rc manager.
|
||||
return restClient.ReplicationControllers(TestNS).Delete(rc.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Merge this into startMasterOrDie.
|
||||
func RunAMaster(t *testing.T) (*master.Master, *httptest.Server) {
|
||||
helper, err := master.NewEtcdHelper(NewEtcdClient(), testapi.Version(), etcdtest.PathPrefix())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
m := master.New(&master.Config{
|
||||
EtcdHelper: helper,
|
||||
KubeletClient: client.FakeKubeletClient{},
|
||||
EnableLogsSupport: false,
|
||||
EnableProfiling: true,
|
||||
EnableUISupport: false,
|
||||
APIPrefix: "/api",
|
||||
Authorizer: apiserver.NewAlwaysAllowAuthorizer(),
|
||||
AdmissionControl: admit.NewAlwaysAdmit(),
|
||||
})
|
||||
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
m.Handler.ServeHTTP(w, req)
|
||||
}))
|
||||
|
||||
return m, s
|
||||
}
|
||||
|
||||
// Task is a function passed to worker goroutines by RunParallel.
|
||||
// The function needs to implement its own thread safety.
|
||||
type Task func(id int) error
|
||||
|
||||
// RunParallel spawns a goroutine per task in the given queue
|
||||
func RunParallel(task Task, numTasks, numWorkers int) {
|
||||
start := time.Now()
|
||||
if numWorkers <= 0 {
|
||||
numWorkers = numTasks
|
||||
}
|
||||
defer func() {
|
||||
glog.Infof("RunParallel took %v for %d tasks and %d workers", time.Since(start), numTasks, numWorkers)
|
||||
}()
|
||||
var wg sync.WaitGroup
|
||||
semCh := make(chan struct{}, numWorkers)
|
||||
wg.Add(numTasks)
|
||||
for id := 0; id < numTasks; id++ {
|
||||
go func(id int) {
|
||||
semCh <- struct{}{}
|
||||
err := task(id)
|
||||
if err != nil {
|
||||
glog.Fatalf("Worker failed with %v", err)
|
||||
}
|
||||
<-semCh
|
||||
wg.Done()
|
||||
}(id)
|
||||
}
|
||||
wg.Wait()
|
||||
close(semCh)
|
||||
}
|
Reference in New Issue
Block a user