code cleanup in integration framework
This commit is contained in:
		@@ -21,9 +21,7 @@ import (
 | 
				
			|||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
	"net/http/httptest"
 | 
						"net/http/httptest"
 | 
				
			||||||
	"path"
 | 
						"path"
 | 
				
			||||||
	goruntime "runtime"
 | 
					 | 
				
			||||||
	"strconv"
 | 
						"strconv"
 | 
				
			||||||
	"sync"
 | 
					 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/go-openapi/spec"
 | 
						"github.com/go-openapi/spec"
 | 
				
			||||||
@@ -54,13 +52,10 @@ import (
 | 
				
			|||||||
	"k8s.io/client-go/informers"
 | 
						"k8s.io/client-go/informers"
 | 
				
			||||||
	clientset "k8s.io/client-go/kubernetes"
 | 
						clientset "k8s.io/client-go/kubernetes"
 | 
				
			||||||
	restclient "k8s.io/client-go/rest"
 | 
						restclient "k8s.io/client-go/rest"
 | 
				
			||||||
	"k8s.io/client-go/tools/record"
 | 
					 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api/legacyscheme"
 | 
						"k8s.io/kubernetes/pkg/api/legacyscheme"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api/testapi"
 | 
						"k8s.io/kubernetes/pkg/api/testapi"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/apis/batch"
 | 
						"k8s.io/kubernetes/pkg/apis/batch"
 | 
				
			||||||
	policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
 | 
						policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/controller"
 | 
					 | 
				
			||||||
	replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
 | 
					 | 
				
			||||||
	"k8s.io/kubernetes/pkg/generated/openapi"
 | 
						"k8s.io/kubernetes/pkg/generated/openapi"
 | 
				
			||||||
	kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
 | 
						kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/master"
 | 
						"k8s.io/kubernetes/pkg/master"
 | 
				
			||||||
@@ -68,37 +63,6 @@ import (
 | 
				
			|||||||
	"k8s.io/kubernetes/plugin/pkg/admission/admit"
 | 
						"k8s.io/kubernetes/plugin/pkg/admission/admit"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const (
 | 
					 | 
				
			||||||
	// Timeout used in benchmarks, to eg: scale an rc
 | 
					 | 
				
			||||||
	DefaultTimeout = 30 * time.Minute
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Rc manifest used to create pods for benchmarks.
 | 
					 | 
				
			||||||
	// TODO: Convert this to a full path?
 | 
					 | 
				
			||||||
	TestRCManifest = "benchmark-controller.json"
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// MasterComponents is a control struct for all master components started via NewMasterComponents.
 | 
					 | 
				
			||||||
// TODO: Include all master components (scheduler, nodecontroller).
 | 
					 | 
				
			||||||
// TODO: Reconcile with integration.go, currently the master used there doesn't understand
 | 
					 | 
				
			||||||
// how to restart cleanly, which is required for each iteration of a benchmark. The integration
 | 
					 | 
				
			||||||
// tests also don't make it easy to isolate and turn off components at will.
 | 
					 | 
				
			||||||
type MasterComponents struct {
 | 
					 | 
				
			||||||
	// Raw http server in front of the master
 | 
					 | 
				
			||||||
	ApiServer *httptest.Server
 | 
					 | 
				
			||||||
	// Kubernetes master, contains an embedded etcd storage
 | 
					 | 
				
			||||||
	KubeMaster *master.Master
 | 
					 | 
				
			||||||
	// Restclient used to talk to the kubernetes master
 | 
					 | 
				
			||||||
	ClientSet clientset.Interface
 | 
					 | 
				
			||||||
	// Replication controller manager
 | 
					 | 
				
			||||||
	ControllerManager *replicationcontroller.ReplicationManager
 | 
					 | 
				
			||||||
	// CloseFn shuts down the server
 | 
					 | 
				
			||||||
	CloseFn CloseFunc
 | 
					 | 
				
			||||||
	// Channel for stop signals to rc manager
 | 
					 | 
				
			||||||
	rcStopCh chan struct{}
 | 
					 | 
				
			||||||
	// Used to stop master components individually, and via MasterComponents.Stop
 | 
					 | 
				
			||||||
	once sync.Once
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Config is a struct of configuration directives for NewMasterComponents.
 | 
					// Config is a struct of configuration directives for NewMasterComponents.
 | 
				
			||||||
type Config struct {
 | 
					type Config struct {
 | 
				
			||||||
	// If nil, a default is used, partially filled configs will not get populated.
 | 
						// If nil, a default is used, partially filled configs will not get populated.
 | 
				
			||||||
@@ -111,33 +75,6 @@ type Config struct {
 | 
				
			|||||||
	// TODO: Add configs for endpoints controller, scheduler etc
 | 
						// TODO: Add configs for endpoints controller, scheduler etc
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewMasterComponents creates, initializes and starts master components based on the given config.
 | 
					 | 
				
			||||||
func NewMasterComponents(c *Config) *MasterComponents {
 | 
					 | 
				
			||||||
	m, s, closeFn := startMasterOrDie(c.MasterConfig, nil, nil)
 | 
					 | 
				
			||||||
	// TODO: Allow callers to pipe through a different master url and create a client/start components using it.
 | 
					 | 
				
			||||||
	glog.Infof("Master %+v", s.URL)
 | 
					 | 
				
			||||||
	// TODO: caesarxuchao: remove this client when the refactoring of client libraray is done.
 | 
					 | 
				
			||||||
	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}, QPS: c.QPS, Burst: c.Burst})
 | 
					 | 
				
			||||||
	rcStopCh := make(chan struct{})
 | 
					 | 
				
			||||||
	informerFactory := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
 | 
					 | 
				
			||||||
	controllerManager := replicationcontroller.NewReplicationManager(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().ReplicationControllers(), clientset, c.Burst)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// TODO: Support events once we can cleanly shutdown an event recorder.
 | 
					 | 
				
			||||||
	controllerManager.SetEventRecorder(&record.FakeRecorder{})
 | 
					 | 
				
			||||||
	if c.StartReplicationManager {
 | 
					 | 
				
			||||||
		informerFactory.Start(rcStopCh)
 | 
					 | 
				
			||||||
		go controllerManager.Run(goruntime.NumCPU(), rcStopCh)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return &MasterComponents{
 | 
					 | 
				
			||||||
		ApiServer:         s,
 | 
					 | 
				
			||||||
		KubeMaster:        m,
 | 
					 | 
				
			||||||
		ClientSet:         clientset,
 | 
					 | 
				
			||||||
		ControllerManager: controllerManager,
 | 
					 | 
				
			||||||
		CloseFn:           closeFn,
 | 
					 | 
				
			||||||
		rcStopCh:          rcStopCh,
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// alwaysAllow always allows an action
 | 
					// alwaysAllow always allows an action
 | 
				
			||||||
type alwaysAllow struct{}
 | 
					type alwaysAllow struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -375,22 +312,6 @@ func NewIntegrationTestMasterConfig() *master.Config {
 | 
				
			|||||||
	return masterConfig
 | 
						return masterConfig
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *MasterComponents) stopRCManager() {
 | 
					 | 
				
			||||||
	close(m.rcStopCh)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (m *MasterComponents) Stop(apiServer, rcManager bool) {
 | 
					 | 
				
			||||||
	glog.Infof("Stopping master components")
 | 
					 | 
				
			||||||
	if rcManager {
 | 
					 | 
				
			||||||
		// Ordering matters because the apiServer will only shutdown when pending
 | 
					 | 
				
			||||||
		// requests are done
 | 
					 | 
				
			||||||
		m.once.Do(m.stopRCManager)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if apiServer {
 | 
					 | 
				
			||||||
		m.CloseFn()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// CloseFunc can be called to cleanup the master
 | 
					// CloseFunc can be called to cleanup the master
 | 
				
			||||||
type CloseFunc func()
 | 
					type CloseFunc func()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -407,37 +328,6 @@ func RunAMasterUsingServer(masterConfig *master.Config, s *httptest.Server, mast
 | 
				
			|||||||
	return startMasterOrDie(masterConfig, s, masterReceiver)
 | 
						return startMasterOrDie(masterConfig, s, masterReceiver)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Task is a function passed to worker goroutines by RunParallel.
 | 
					 | 
				
			||||||
// The function needs to implement its own thread safety.
 | 
					 | 
				
			||||||
type Task func(id int) error
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// RunParallel spawns a goroutine per task in the given queue
 | 
					 | 
				
			||||||
func RunParallel(task Task, numTasks, numWorkers int) {
 | 
					 | 
				
			||||||
	start := time.Now()
 | 
					 | 
				
			||||||
	if numWorkers <= 0 {
 | 
					 | 
				
			||||||
		numWorkers = numTasks
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	defer func() {
 | 
					 | 
				
			||||||
		glog.Infof("RunParallel took %v for %d tasks and %d workers", time.Since(start), numTasks, numWorkers)
 | 
					 | 
				
			||||||
	}()
 | 
					 | 
				
			||||||
	var wg sync.WaitGroup
 | 
					 | 
				
			||||||
	semCh := make(chan struct{}, numWorkers)
 | 
					 | 
				
			||||||
	wg.Add(numTasks)
 | 
					 | 
				
			||||||
	for id := 0; id < numTasks; id++ {
 | 
					 | 
				
			||||||
		go func(id int) {
 | 
					 | 
				
			||||||
			semCh <- struct{}{}
 | 
					 | 
				
			||||||
			err := task(id)
 | 
					 | 
				
			||||||
			if err != nil {
 | 
					 | 
				
			||||||
				glog.Fatalf("Worker failed with %v", err)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			<-semCh
 | 
					 | 
				
			||||||
			wg.Done()
 | 
					 | 
				
			||||||
		}(id)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	wg.Wait()
 | 
					 | 
				
			||||||
	close(semCh)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// FindFreeLocalPort returns the number of an available port number on
 | 
					// FindFreeLocalPort returns the number of an available port number on
 | 
				
			||||||
// the loopback interface.  Useful for determining the port to launch
 | 
					// the loopback interface.  Useful for determining the port to launch
 | 
				
			||||||
// a server on.  Error handling required - there is a non-zero chance
 | 
					// a server on.  Error handling required - there is a non-zero chance
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user