Merge pull request #20783 from mesosphere/jdef_fix_scheduler_clientset_impl
Auto commit by PR queue bot
This commit is contained in:
		| @@ -19,7 +19,7 @@ package executor | ||||
| import ( | ||||
| 	"k8s.io/kubernetes/contrib/mesos/pkg/node" | ||||
| 	"k8s.io/kubernetes/pkg/api" | ||||
| 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" | ||||
| 	core_unversioned "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" | ||||
| ) | ||||
|  | ||||
| type kubeAPI interface { | ||||
| @@ -33,11 +33,11 @@ type nodeAPI interface { | ||||
| // clientAPIWrapper implements kubeAPI and node API, which serve to isolate external dependencies | ||||
| // such that they're easier to mock in unit test. | ||||
| type clientAPIWrapper struct { | ||||
| 	client *clientset.Clientset | ||||
| 	client core_unversioned.CoreInterface | ||||
| } | ||||
|  | ||||
| func (cw *clientAPIWrapper) killPod(ns, name string) error { | ||||
| 	return cw.client.Core().Pods(ns).Delete(name, api.NewDeleteOptions(0)) | ||||
| 	return cw.client.Pods(ns).Delete(name, api.NewDeleteOptions(0)) | ||||
| } | ||||
|  | ||||
| func (cw *clientAPIWrapper) createOrUpdate(hostname string, slaveAttrLabels, annotations map[string]string) (*api.Node, error) { | ||||
|   | ||||
| @@ -23,7 +23,7 @@ import ( | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" | ||||
| 	core_unversioned "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" | ||||
|  | ||||
| 	log "github.com/golang/glog" | ||||
| 	mesos "github.com/mesos/mesos-go/mesosproto" | ||||
| @@ -42,7 +42,7 @@ const ( | ||||
| // Create creates a new node api object with the given hostname, | ||||
| // slave attribute labels and annotations | ||||
| func Create( | ||||
| 	client *clientset.Clientset, | ||||
| 	client core_unversioned.NodesGetter, | ||||
| 	hostName string, | ||||
| 	slaveAttrLabels, | ||||
| 	annotations map[string]string, | ||||
| @@ -88,7 +88,7 @@ func Create( | ||||
| // The updated node merges the given slave attribute labels | ||||
| // and annotations with the found api object. | ||||
| func Update( | ||||
| 	client *clientset.Clientset, | ||||
| 	client core_unversioned.NodesGetter, | ||||
| 	hostname string, | ||||
| 	slaveAttrLabels, | ||||
| 	annotations map[string]string, | ||||
| @@ -123,7 +123,7 @@ func Update( | ||||
|  | ||||
| // CreateOrUpdate creates a node api object or updates an existing one | ||||
| func CreateOrUpdate( | ||||
| 	client *clientset.Clientset, | ||||
| 	client core_unversioned.NodesGetter, | ||||
| 	hostname string, | ||||
| 	slaveAttrLabels, | ||||
| 	annotations map[string]string, | ||||
|   | ||||
| @@ -33,8 +33,6 @@ import ( | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" | ||||
|  | ||||
| 	etcd "github.com/coreos/etcd/client" | ||||
| 	"github.com/gogo/protobuf/proto" | ||||
| 	log "github.com/golang/glog" | ||||
| @@ -72,9 +70,11 @@ import ( | ||||
| 	"k8s.io/kubernetes/pkg/api" | ||||
| 	"k8s.io/kubernetes/pkg/api/resource" | ||||
| 	"k8s.io/kubernetes/pkg/client/cache" | ||||
| 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" | ||||
| 	"k8s.io/kubernetes/pkg/client/record" | ||||
| 	unversioned_core "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" | ||||
| 	client "k8s.io/kubernetes/pkg/client/unversioned" | ||||
| 	clientauth "k8s.io/kubernetes/pkg/client/unversioned/auth" | ||||
| 	"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" | ||||
| 	cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/mesos" | ||||
| 	controllerfw "k8s.io/kubernetes/pkg/controller/framework" | ||||
| 	"k8s.io/kubernetes/pkg/fields" | ||||
| @@ -104,7 +104,9 @@ type SchedulerServer struct { | ||||
| 	port                  int | ||||
| 	address               net.IP | ||||
| 	enableProfiling       bool | ||||
| 	authPath              string | ||||
| 	kubeconfig            string | ||||
| 	kubeAPIQPS            float32 | ||||
| 	kubeAPIBurst          int | ||||
| 	apiServerList         []string | ||||
| 	etcdServerList        []string | ||||
| 	allowPrivileged       bool | ||||
| @@ -194,6 +196,8 @@ func NewSchedulerServer() *SchedulerServer { | ||||
| 		address:           net.ParseIP("127.0.0.1"), | ||||
| 		failoverTimeout:   time.Duration((1 << 62) - 1).Seconds(), | ||||
| 		frameworkStoreURI: "etcd://", | ||||
| 		kubeAPIQPS:        50.0, | ||||
| 		kubeAPIBurst:      100, | ||||
|  | ||||
| 		runProxy:                 true, | ||||
| 		executorSuicideTimeout:   execcfg.DefaultSuicideTimeout, | ||||
| @@ -250,7 +254,9 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) { | ||||
| 	fs.IPVar(&s.address, "address", s.address, "The IP address to serve on (set to 0.0.0.0 for all interfaces)") | ||||
| 	fs.BoolVar(&s.enableProfiling, "profiling", s.enableProfiling, "Enable profiling via web interface host:port/debug/pprof/") | ||||
| 	fs.StringSliceVar(&s.apiServerList, "api-servers", s.apiServerList, "List of Kubernetes API servers for publishing events, and reading pods and services. (ip:port), comma separated.") | ||||
| 	fs.StringVar(&s.authPath, "auth-path", s.authPath, "Path to .kubernetes_auth file, specifying how to authenticate to API server.") | ||||
| 	fs.StringVar(&s.kubeconfig, "kubeconfig", s.kubeconfig, "Path to kubeconfig file with authorization and master location information.") | ||||
| 	fs.Float32Var(&s.kubeAPIQPS, "kube-api-qps", s.kubeAPIQPS, "QPS to use while talking with kubernetes apiserver") | ||||
| 	fs.IntVar(&s.kubeAPIBurst, "kube-api-burst", s.kubeAPIBurst, "Burst to use while talking with kubernetes apiserver") | ||||
| 	fs.StringSliceVar(&s.etcdServerList, "etcd-servers", s.etcdServerList, "List of etcd servers to watch (http://ip:port), comma separated.") | ||||
| 	fs.BoolVar(&s.allowPrivileged, "allow-privileged", s.allowPrivileged, "Enable privileged containers in the kubelet (compare the same flag in the apiserver).") | ||||
| 	fs.StringVar(&s.clusterDomain, "cluster-domain", s.clusterDomain, "Domain for this cluster.  If set, kubelet will configure all containers to search this domain in addition to the host's search domains") | ||||
| @@ -436,11 +442,11 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E | ||||
| 	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--conntrack-max=%d", s.conntrackMax)) | ||||
| 	ci.Arguments = append(ci.Arguments, fmt.Sprintf("--conntrack-tcp-timeout-established=%d", s.conntrackTCPTimeoutEstablished)) | ||||
|  | ||||
| 	if s.authPath != "" { | ||||
| 	if s.kubeconfig != "" { | ||||
| 		//TODO(jdef) should probably support non-local files, e.g. hdfs:///some/config/file | ||||
| 		uri, basename := s.serveFrameworkArtifact(s.authPath) | ||||
| 		uri, basename := s.serveFrameworkArtifact(s.kubeconfig) | ||||
| 		ci.Uris = append(ci.Uris, &mesos.CommandInfo_URI{Value: proto.String(uri)}) | ||||
| 		ci.Arguments = append(ci.Arguments, fmt.Sprintf("--auth-path=%s", basename)) | ||||
| 		ci.Arguments = append(ci.Arguments, fmt.Sprintf("--kubeconfig=%s", basename)) | ||||
| 	} | ||||
| 	appendOptional := func(name string, value string) { | ||||
| 		if value != "" { | ||||
| @@ -520,34 +526,17 @@ func (s *SchedulerServer) prepareStaticPods() (data []byte, staticPodCPUs, stati | ||||
| 	return | ||||
| } | ||||
|  | ||||
| // TODO(jdef): hacked from kubelet/server/server.go | ||||
| // TODO(k8s): replace this with clientcmd | ||||
| func (s *SchedulerServer) createAPIServerClient() (*clientset.Clientset, error) { | ||||
| 	authInfo, err := clientauth.LoadFromFile(s.authPath) | ||||
| 	if err != nil { | ||||
| 		log.Warningf("Could not load kubernetes auth path: %v. Continuing with defaults.", err) | ||||
| 	} | ||||
| 	if authInfo == nil { | ||||
| 		// authInfo didn't load correctly - continue with defaults. | ||||
| 		authInfo = &clientauth.Info{} | ||||
| 	} | ||||
| 	clientConfig, err := authInfo.MergeWithConfig(client.Config{}) | ||||
| // TODO(jdef): hacked from plugin/cmd/kube-scheduler/app/server.go | ||||
| func (s *SchedulerServer) createAPIServerClientConfig() (*client.Config, error) { | ||||
| 	kubeconfig, err := clientcmd.BuildConfigFromFlags(s.apiServerList[0], s.kubeconfig) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if len(s.apiServerList) < 1 { | ||||
| 		return nil, fmt.Errorf("no api servers specified") | ||||
| 	} | ||||
| 	// TODO: adapt Kube client to support LB over several servers | ||||
| 	if len(s.apiServerList) > 1 { | ||||
| 		log.Infof("Multiple api servers specified.  Picking first one") | ||||
| 	} | ||||
| 	clientConfig.Host = s.apiServerList[0] | ||||
| 	c, err := clientset.NewForConfig(&clientConfig) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return c, nil | ||||
|  | ||||
| 	// Override kubeconfig qps/burst settings from flags | ||||
| 	kubeconfig.QPS = s.kubeAPIQPS | ||||
| 	kubeconfig.Burst = s.kubeAPIBurst | ||||
| 	return kubeconfig, nil | ||||
| } | ||||
|  | ||||
| func (s *SchedulerServer) setDriver(driver bindings.SchedulerDriver) { | ||||
| @@ -691,11 +680,14 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config | ||||
| 		log.Fatal("No api servers specified.") | ||||
| 	} | ||||
|  | ||||
| 	client, err := s.createAPIServerClient() | ||||
| 	clientConfig, err := s.createAPIServerClientConfig() | ||||
| 	if err != nil { | ||||
| 		log.Fatalf("Unable to make apiserver client: %v", err) | ||||
| 		log.Fatalf("Unable to make apiserver client config: %v", err) | ||||
| 	} | ||||
| 	s.client, err = clientset.NewForConfig(clientConfig) | ||||
| 	if err != nil { | ||||
| 		log.Fatalf("Unable to make apiserver clientset: %v", err) | ||||
| 	} | ||||
| 	s.client = client | ||||
|  | ||||
| 	if s.reconcileCooldown < defaultReconcileCooldown { | ||||
| 		s.reconcileCooldown = defaultReconcileCooldown | ||||
| @@ -719,7 +711,8 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config | ||||
|  | ||||
| 	// mirror all nodes into the nodeStore | ||||
| 	var eiRegistry executorinfo.Registry | ||||
| 	nodesClient, err := s.createAPIServerClient() | ||||
| 	nodesClientConfig := *clientConfig | ||||
| 	nodesClient, err := clientset.NewForConfig(&nodesClientConfig) | ||||
| 	if err != nil { | ||||
| 		log.Fatalf("Cannot create client to watch nodes: %v", err) | ||||
| 	} | ||||
| @@ -760,7 +753,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config | ||||
| 	} | ||||
| 	framework := framework.New(framework.Config{ | ||||
| 		SchedulerConfig:   *sc, | ||||
| 		Client:            client, | ||||
| 		Client:            s.client, | ||||
| 		FailoverTimeout:   s.failoverTimeout, | ||||
| 		ReconcileInterval: s.reconcileInterval, | ||||
| 		ReconcileCooldown: s.reconcileCooldown, | ||||
| @@ -791,18 +784,23 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config | ||||
| 	} | ||||
|  | ||||
| 	// create event recorder sending events to the "" namespace of the apiserver | ||||
| 	eventsClientConfig := *clientConfig | ||||
| 	eventsClient, err := clientset.NewForConfig(&eventsClientConfig) | ||||
| 	if err != nil { | ||||
| 		log.Fatalf("Invalid API configuration: %v", err) | ||||
| 	} | ||||
| 	broadcaster := record.NewBroadcaster() | ||||
| 	recorder := broadcaster.NewRecorder(api.EventSource{Component: api.DefaultSchedulerName}) | ||||
| 	broadcaster.StartLogging(log.Infof) | ||||
| 	broadcaster.StartRecordingToSink(client.Events("")) | ||||
| 	broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{eventsClient.Events("")}) | ||||
|  | ||||
| 	// create scheduler core with all components arranged around it | ||||
| 	lw := cache.NewListWatchFromClient(client.CoreClient, "pods", api.NamespaceAll, fields.Everything()) | ||||
| 	lw := cache.NewListWatchFromClient(s.client.CoreClient, "pods", api.NamespaceAll, fields.Everything()) | ||||
| 	sched := components.New( | ||||
| 		sc, | ||||
| 		framework, | ||||
| 		fcfs, | ||||
| 		client, | ||||
| 		s.client, | ||||
| 		recorder, | ||||
| 		schedulerProcess.Terminal(), | ||||
| 		s.mux, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 k8s-merge-robot
					k8s-merge-robot