simplify Run in controllermanager
This commit is contained in:
		@@ -116,47 +116,15 @@ func Run(s *options.CMServer) error {
 | 
				
			|||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		glog.Errorf("unable to register configz: %s", err)
 | 
							glog.Errorf("unable to register configz: %s", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
 | 
					
 | 
				
			||||||
 | 
						kubeClient, leaderElectionClient, kubeconfig, err := createClients(s)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	kubeconfig.ContentConfig.ContentType = s.ContentType
 | 
						go startHTTP(s)
 | 
				
			||||||
	// Override kubeconfig qps/burst settings from flags
 | 
					 | 
				
			||||||
	kubeconfig.QPS = s.KubeAPIQPS
 | 
					 | 
				
			||||||
	kubeconfig.Burst = int(s.KubeAPIBurst)
 | 
					 | 
				
			||||||
	kubeClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "controller-manager"))
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		glog.Fatalf("Invalid API configuration: %v", err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	leaderElectionClient := kubernetes.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "leader-election"))
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go func() {
 | 
						recorder := createRecorder(kubeClient)
 | 
				
			||||||
		mux := http.NewServeMux()
 | 
					 | 
				
			||||||
		healthz.InstallHandler(mux)
 | 
					 | 
				
			||||||
		if s.EnableProfiling {
 | 
					 | 
				
			||||||
			mux.HandleFunc("/debug/pprof/", pprof.Index)
 | 
					 | 
				
			||||||
			mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
 | 
					 | 
				
			||||||
			mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
 | 
					 | 
				
			||||||
			mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
 | 
					 | 
				
			||||||
			if s.EnableContentionProfiling {
 | 
					 | 
				
			||||||
				goruntime.SetBlockProfileRate(1)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		configz.InstallHandler(mux)
 | 
					 | 
				
			||||||
		mux.Handle("/metrics", prometheus.Handler())
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		server := &http.Server{
 | 
					 | 
				
			||||||
			Addr:    net.JoinHostPort(s.Address, strconv.Itoa(int(s.Port))),
 | 
					 | 
				
			||||||
			Handler: mux,
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		glog.Fatal(server.ListenAndServe())
 | 
					 | 
				
			||||||
	}()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	eventBroadcaster := record.NewBroadcaster()
 | 
					 | 
				
			||||||
	eventBroadcaster.StartLogging(glog.Infof)
 | 
					 | 
				
			||||||
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
 | 
					 | 
				
			||||||
	recorder := eventBroadcaster.NewRecorder(api.Scheme, v1.EventSource{Component: "controller-manager"})
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	run := func(stop <-chan struct{}) {
 | 
						run := func(stop <-chan struct{}) {
 | 
				
			||||||
		rootClientBuilder := controller.SimpleControllerClientBuilder{
 | 
							rootClientBuilder := controller.SimpleControllerClientBuilder{
 | 
				
			||||||
@@ -230,6 +198,53 @@ func Run(s *options.CMServer) error {
 | 
				
			|||||||
	panic("unreachable")
 | 
						panic("unreachable")
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func startHTTP(s *options.CMServer) {
 | 
				
			||||||
 | 
						mux := http.NewServeMux()
 | 
				
			||||||
 | 
						healthz.InstallHandler(mux)
 | 
				
			||||||
 | 
						if s.EnableProfiling {
 | 
				
			||||||
 | 
							mux.HandleFunc("/debug/pprof/", pprof.Index)
 | 
				
			||||||
 | 
							mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
 | 
				
			||||||
 | 
							mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
 | 
				
			||||||
 | 
							mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
 | 
				
			||||||
 | 
							if s.EnableContentionProfiling {
 | 
				
			||||||
 | 
								goruntime.SetBlockProfileRate(1)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						configz.InstallHandler(mux)
 | 
				
			||||||
 | 
						mux.Handle("/metrics", prometheus.Handler())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						server := &http.Server{
 | 
				
			||||||
 | 
							Addr:    net.JoinHostPort(s.Address, strconv.Itoa(int(s.Port))),
 | 
				
			||||||
 | 
							Handler: mux,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						glog.Fatal(server.ListenAndServe())
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func createRecorder(kubeClient *clientset.Clientset) record.EventRecorder {
 | 
				
			||||||
 | 
						eventBroadcaster := record.NewBroadcaster()
 | 
				
			||||||
 | 
						eventBroadcaster.StartLogging(glog.Infof)
 | 
				
			||||||
 | 
						eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
 | 
				
			||||||
 | 
						return eventBroadcaster.NewRecorder(api.Scheme, v1.EventSource{Component: "controller-manager"})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func createClients(s *options.CMServer) (*clientset.Clientset, *clientset.Clientset, *restclient.Config, error) {
 | 
				
			||||||
 | 
						kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, nil, nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						kubeconfig.ContentConfig.ContentType = s.ContentType
 | 
				
			||||||
 | 
						// Override kubeconfig qps/burst settings from flags
 | 
				
			||||||
 | 
						kubeconfig.QPS = s.KubeAPIQPS
 | 
				
			||||||
 | 
						kubeconfig.Burst = int(s.KubeAPIBurst)
 | 
				
			||||||
 | 
						kubeClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "controller-manager"))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							glog.Fatalf("Invalid API configuration: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						leaderElectionClient := kubernetes.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "leader-election"))
 | 
				
			||||||
 | 
						return kubeClient, leaderElectionClient, kubeconfig, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type ControllerContext struct {
 | 
					type ControllerContext struct {
 | 
				
			||||||
	// ClientBuilder will provide a client for this controller to use
 | 
						// ClientBuilder will provide a client for this controller to use
 | 
				
			||||||
	ClientBuilder controller.ControllerClientBuilder
 | 
						ClientBuilder controller.ControllerClientBuilder
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user