switch controller manager to generated clientset
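Replace the monolithic client.Client that was threaded through StartControllers with the generated internalclientset. Run builds one clientset for itself, and StartControllers now takes only the kubeconfig: a small client(userAgent) helper hands each controller its own clientset stamped with a per-controller user agent. Along the way, the event broadcaster records through the typed core client (via EventSinkImpl), discovery moves to a dedicated controller-discovery client, and the horizontal pod autoscaler receives the typed Autoscaling() group client.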
@@ -38,11 +38,11 @@ import (
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/apis/batch"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
 	"k8s.io/kubernetes/pkg/client/leaderelection"
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/client/restclient"
 	"k8s.io/kubernetes/pkg/client/typed/dynamic"
-	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/controller"
@@ -133,8 +133,7 @@ func Run(s *options.CMServer) error {
 	// Override kubeconfig qps/burst settings from flags
 	kubeconfig.QPS = s.KubeAPIQPS
 	kubeconfig.Burst = int(s.KubeAPIBurst)
-
-	kubeClient, err := client.New(kubeconfig)
+	kubeClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "controller-manager"))
 	if err != nil {
 		glog.Fatalf("Invalid API configuration: %v", err)
 	}
@@ -159,11 +158,11 @@ func Run(s *options.CMServer) error {
 
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
-	eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
+	eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
 	recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controller-manager"})
 
 	run := func(stop <-chan struct{}) {
-		err := StartControllers(s, kubeClient, kubeconfig, stop, recorder)
+		err := StartControllers(s, kubeconfig, stop, recorder)
 		glog.Fatalf("error running controllers: %v", err)
 		panic("unreachable")
 	}
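Worth spelling out, since it is the only call-site change here that is not a pure constructor swap: the old client exposed Events("") directly, while the generated clientset nests events under the typed core group, so the broadcaster needs the EventSinkImpl adapter. A minimal sketch of the new wiring, reusing the packages from the import hunk (kubeClient is the clientset built above):

	// Sketch: recording events through the generated clientset.
	// EventSinkImpl adapts the typed core EventInterface to the
	// record.EventSink the broadcaster expects.
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(
		&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
	recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controller-manager"})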
@@ -183,12 +182,12 @@ func Run(s *options.CMServer) error {
 			Namespace: "kube-system",
 			Name:      "kube-controller-manager",
 		},
-		Client:        kubeClient,
-		Identity:      id,
-		EventRecorder: recorder,
-		LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
-		RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
-		RetryPeriod:   s.LeaderElection.RetryPeriod.Duration,
+		EndpointsClient: kubeClient,
+		Identity:        id,
+		EventRecorder:   recorder,
+		LeaseDuration:   s.LeaderElection.LeaseDuration.Duration,
+		RenewDeadline:   s.LeaderElection.RenewDeadline.Duration,
+		RetryPeriod:     s.LeaderElection.RetryPeriod.Duration,
 		Callbacks: leaderelection.LeaderCallbacks{
 			OnStartedLeading: run,
 			OnStoppedLeading: func() {
@@ -199,16 +198,20 @@ func Run(s *options.CMServer) error {
 	panic("unreachable")
 }
 
-func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}, recorder record.EventRecorder) error {
-	sharedInformers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "shared-informers")), ResyncPeriod(s)())
+func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, stop <-chan struct{}, recorder record.EventRecorder) error {
+	client := func(userAgent string) clientset.Interface {
+		return clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, userAgent))
+	}
+	discoveryClient := client("controller-discovery").Discovery()
+	sharedInformers := informers.NewSharedInformerFactory(client("shared-informers"), ResyncPeriod(s)())
 
-	go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))).
+	go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), client("endpoint-controller")).
 		Run(int(s.ConcurrentEndpointSyncs), wait.NeverStop)
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	go replicationcontroller.NewReplicationManager(
 		sharedInformers.Pods().Informer(),
-		clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")),
+		client("replication-controller"),
 		ResyncPeriod(s),
 		replicationcontroller.BurstReplicas,
 		int(s.LookupCacheSizeForRC),
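The client closure above is the heart of the cleanup: every controller that used to inline clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "...")) now asks the helper for a clientset stamped with its own user agent, which keeps per-controller traffic attributable in apiserver logs. A standalone sketch of the same pattern (clientBuilder is a hypothetical name; the commit itself uses a local closure):

	// Sketch: a factory handing each controller its own clientset,
	// tagged with a per-controller user agent.
	func clientBuilder(kubeconfig *restclient.Config) func(string) clientset.Interface {
		return func(userAgent string) clientset.Interface {
			// AddUserAgent stamps the config's user agent string;
			// NewForConfigOrDie builds the clientset or panics.
			return clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, userAgent))
		}
	}

	// Usage, mirroring the hunk above:
	//   client := clientBuilder(kubeconfig)
	//   endpointClient := client("endpoint-controller")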
@@ -217,7 +220,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	if s.TerminatedPodGCThreshold > 0 {
-		go podgc.New(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-garbage-collector")), ResyncPeriod(s), int(s.TerminatedPodGCThreshold)).
+		go podgc.New(client("pod-garbage-collector"), ResyncPeriod(s), int(s.TerminatedPodGCThreshold)).
 			Run(wait.NeverStop)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
@@ -235,7 +238,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 	if err != nil {
 		glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
 	}
-	nodeController, err := nodecontroller.NewNodeController(sharedInformers.Pods().Informer(), cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
+	nodeController, err := nodecontroller.NewNodeController(sharedInformers.Pods().Informer(), cloud, client("node-controller"),
 		s.PodEvictionTimeout.Duration, s.NodeEvictionRate, s.SecondaryNodeEvictionRate, s.LargeClusterSizeThreshold, s.UnhealthyZoneThreshold, s.NodeMonitorGracePeriod.Duration,
 		s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR,
 		int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
@@ -245,7 +248,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 	nodeController.Run()
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
-	serviceController, err := servicecontroller.New(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-controller")), s.ClusterName)
+	serviceController, err := servicecontroller.New(cloud, client("service-controller"), s.ClusterName)
 	if err != nil {
 		glog.Errorf("Failed to start service controller: %v", err)
 	} else {
@@ -259,7 +262,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 	} else if routes, ok := cloud.Routes(); !ok {
 		glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
 	} else {
-		routeController := routecontroller.New(routes, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "route-controller")), s.ClusterName, clusterCIDR)
+		routeController := routecontroller.New(routes, client("route-controller"), s.ClusterName, clusterCIDR)
 		routeController.Run(s.RouteReconciliationPeriod.Duration)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
@@ -267,7 +270,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes)
 	}
 
-	resourceQuotaControllerClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "resourcequota-controller"))
+	resourceQuotaControllerClient := client("resourcequota-controller")
 	resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient)
 	groupKindsToReplenish := []unversioned.GroupKind{
 		api.Kind("Pod"),
@@ -303,13 +306,13 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 	}
 	versions := &unversioned.APIVersions{Versions: versionStrings}
 
-	resourceMap, err := kubeClient.Discovery().ServerResources()
+	resourceMap, err := discoveryClient.ServerResources()
 	if err != nil {
 		glog.Fatalf("Failed to get supported resources from server: %v", err)
 	}
 
 	// Find the list of namespaced resources via discovery that the namespace controller must manage
-	namespaceKubeClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller"))
+	namespaceKubeClient := client("namespace-controller")
 	namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), dynamic.LegacyAPIPathResolverFunc)
 	groupVersionResources, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources()
 	if err != nil {
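Discovery likewise stops piggybacking on kubeClient: the ServerResources() result feeds the containsResource gates below, and ServerPreferredNamespacedResources() tells the namespace controller which resources it must manage. For reference, a plausible reconstruction of the containsResource helper this file relies on (hypothetical here, shown only to make the gating concrete):

	// Sketch: check whether discovery reported a named resource in a
	// group's APIResourceList, as the containsResource gates below do.
	func containsResource(resources *unversioned.APIResourceList, resourceName string) bool {
		for _, resource := range resources.APIResources {
			if resource.Name == resourceName {
				return true
			}
		}
		return false
	}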
@@ -326,7 +329,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		glog.Infof("Starting %s apis", groupVersion)
 		if containsResource(resources, "horizontalpodautoscalers") {
 			glog.Infof("Starting horizontal pod controller.")
-			hpaClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "horizontal-pod-autoscaler"))
+			hpaClient := client("horizontal-pod-autoscaler")
 			metricsClient := metrics.NewHeapsterMetricsClient(
 				hpaClient,
 				metrics.DefaultHeapsterNamespace,
@@ -334,35 +337,35 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 				metrics.DefaultHeapsterService,
 				metrics.DefaultHeapsterPort,
 			)
-			go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient, metricsClient, s.HorizontalPodAutoscalerSyncPeriod.Duration).
+			go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient.Autoscaling(), metricsClient, s.HorizontalPodAutoscalerSyncPeriod.Duration).
 				Run(wait.NeverStop)
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
 
 		if containsResource(resources, "daemonsets") {
 			glog.Infof("Starting daemon set controller")
-			go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), int(s.LookupCacheSizeForDaemonSet)).
+			go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), client("daemon-set-controller"), int(s.LookupCacheSizeForDaemonSet)).
 				Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop)
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
 
 		if containsResource(resources, "jobs") {
 			glog.Infof("Starting job controller")
-			go job.NewJobController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller"))).
+			go job.NewJobController(sharedInformers.Pods().Informer(), client("job-controller")).
 				Run(int(s.ConcurrentJobSyncs), wait.NeverStop)
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
 
 		if containsResource(resources, "deployments") {
 			glog.Infof("Starting deployment controller")
-			go deployment.NewDeploymentController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "deployment-controller")), ResyncPeriod(s)).
+			go deployment.NewDeploymentController(client("deployment-controller"), ResyncPeriod(s)).
 				Run(int(s.ConcurrentDeploymentSyncs), wait.NeverStop)
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
 
 		if containsResource(resources, "replicasets") {
 			glog.Infof("Starting ReplicaSet controller")
-			go replicaset.NewReplicaSetController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
+			go replicaset.NewReplicaSetController(sharedInformers.Pods().Informer(), client("replicaset-controller"), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
 				Run(int(s.ConcurrentRSSyncs), wait.NeverStop)
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
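One behavioral fix rides along with the mechanical swaps: the horizontal controller now receives the typed hpaClient.Autoscaling() group client instead of the whole clientset. The generated clientset is essentially a bundle of per-group typed clients plus discovery, so callers can take only the groups they touch; a minimal sketch using only the accessors this diff itself exercises:

	// Sketch: the generated clientset bundles typed per-group clients.
	cs := client("horizontal-pod-autoscaler")
	coreClient := cs.Core()               // core group (pods, events, ...)
	extensionsClient := cs.Extensions()   // extensions group
	autoscalingClient := cs.Autoscaling() // autoscaling group (HPAs)
	discovery := cs.Discovery()           // server resource discovery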
@@ -375,7 +378,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		glog.Infof("Starting %s apis", groupVersion)
 		if containsResource(resources, "poddisruptionbudgets") {
 			glog.Infof("Starting disruption controller")
-			go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), kubeClient).Run(wait.NeverStop)
+			go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), client("disruption-controller")).Run(wait.NeverStop)
 			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
 	}
@@ -390,8 +393,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		resyncPeriod := ResyncPeriod(s)()
 		go petset.NewPetSetController(
 			sharedInformers.Pods().Informer(),
-			// TODO: Switch to using clientset
-			kubeClient,
+			client("petset-controller"),
 			resyncPeriod,
 		).Run(1, wait.NeverStop)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
@@ -406,7 +408,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		glog.Infof("Starting scheduledjob controller")
 		// // TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset
 		kubeconfig.ContentConfig.GroupVersion = &unversioned.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}
-		go scheduledjob.NewScheduledJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "scheduledjob-controller"))).
+		go scheduledjob.NewScheduledJobController(client("scheduledjob-controller")).
 			Run(wait.NeverStop)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
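The scheduledjob hunk keeps the existing TODO: assigning kubeconfig.ContentConfig.GroupVersion before building the client is flagged in the source as a temporary fix so the controller can list v2alpha1 ScheduledJobs. Note, when reading the client helper above, that this assignment mutates the same shared kubeconfig the closure captures.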
@@ -420,7 +422,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		glog.Fatalf("An backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err)
 	}
 	volumeController := persistentvolumecontroller.NewPersistentVolumeController(
-		clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-binder")),
+		client("persistent-volume-binder"),
 		s.PVClaimBinderSyncPeriod.Duration,
 		alphaProvisioner,
 		ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration),
@@ -437,7 +439,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 
 	attachDetachController, attachDetachControllerErr :=
 		attachdetach.NewAttachDetachController(
-			clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "attachdetach-controller")),
+			client("attachdetach-controller"),
 			sharedInformers.Pods().Informer(),
 			sharedInformers.Nodes().Informer(),
 			sharedInformers.PersistentVolumeClaims().Informer(),
@@ -460,7 +462,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		glog.Infof("Starting certificate request controller")
 		resyncPeriod := ResyncPeriod(s)()
 		certController, err := certcontroller.NewCertificateController(
-			clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "certificate-controller")),
+			client("certificate-controller"),
 			resyncPeriod,
 			s.ClusterSigningCertFile,
 			s.ClusterSigningKeyFile,
@@ -495,7 +497,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		glog.Errorf("Error reading key for service account token controller: %v", err)
 	} else {
 		go serviceaccountcontroller.NewTokensController(
-			clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "tokens-controller")),
+			client("tokens-controller"),
 			serviceaccountcontroller.TokensControllerOptions{
 				TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
 				RootCA:         rootCA,
@@ -506,13 +508,13 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 	}
 
 	serviceaccountcontroller.NewServiceAccountsController(
-		clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-account-controller")),
+		client("service-account-controller"),
 		serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
 	).Run()
 	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	if s.EnableGarbageCollector {
-		gcClientset := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "generic-garbage-collector"))
+		gcClientset := client("generic-garbage-collector")
 		groupVersionResources, err := gcClientset.Discovery().ServerPreferredResources()
 		if err != nil {
 			glog.Fatalf("Failed to get supported resources from server: %v", err)