Added optional delays to starting controller managers. #22669
@@ -75,6 +75,11 @@ import (
 	"github.com/spf13/pflag"
 )
 
+const (
+	// Jitter used when starting controller managers
+	ControllerStartJitter = 1.0
+)
+
 // NewControllerManagerCommand creates a *cobra.Command object with default parameters
 func NewControllerManagerCommand() *cobra.Command {
 	s := options.NewCMServer()
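A note on the jitter constant: ControllerStartJitter = 1.0 is passed as wait.Jitter's maxFactor, and wait.Jitter returns the base duration plus a random extra of up to maxFactor times that duration, so each delay lands somewhere in [interval, 2*interval). A minimal, self-contained sketch of that behavior (the jitter helper below is an illustrative stand-in, not the library source):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitter sketches wait.Jitter's documented behavior: the base duration
// plus a random extra of up to maxFactor*duration.
func jitter(duration time.Duration, maxFactor float64) time.Duration {
	return duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
}

func main() {
	interval := 2 * time.Second // hypothetical --controller-start-interval value
	for i := 0; i < 3; i++ {
		fmt.Println(jitter(interval, 1.0)) // each printed value falls in [2s, 4s)
	}
}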
@@ -186,6 +191,7 @@ func Run(s *options.CMServer) error {
 func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error {
 	go endpointcontroller.NewEndpointController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller")), ResyncPeriod(s)).
 		Run(s.ConcurrentEndpointSyncs, wait.NeverStop)
+	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	go replicationcontroller.NewReplicationManager(
 		clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")),
@@ -193,10 +199,12 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		replicationcontroller.BurstReplicas,
 		s.LookupCacheSizeForRC,
 	).Run(s.ConcurrentRCSyncs, wait.NeverStop)
+	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	if s.TerminatedPodGCThreshold > 0 {
 		go gc.New(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "garbage-collector")), ResyncPeriod(s), s.TerminatedPodGCThreshold).
 			Run(wait.NeverStop)
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
 	cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
@@ -211,11 +219,13 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
 		s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, s.AllocateNodeCIDRs)
 	nodeController.Run(s.NodeSyncPeriod.Duration)
+	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	serviceController := servicecontroller.New(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-controller")), s.ClusterName)
 	if err := serviceController.Run(s.ServiceSyncPeriod.Duration, s.NodeSyncPeriod.Duration); err != nil {
 		glog.Errorf("Failed to start service controller: %v", err)
 	}
+	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	if s.AllocateNodeCIDRs {
 		if cloud == nil {
@@ -225,6 +235,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		} else {
 			routeController := routecontroller.New(routes, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "route-controller")), s.ClusterName, clusterCIDR)
 			routeController.Run(s.NodeSyncPeriod.Duration)
+			time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 		}
 	} else {
 		glog.Infof("allocate-node-cidrs set to %v, node controller not creating routes", s.AllocateNodeCIDRs)
@@ -249,6 +260,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		GroupKindsToReplenish: groupKindsToReplenish,
 	}
 	go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(s.ConcurrentResourceQuotaSyncs, wait.NeverStop)
+	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	// If apiserver is not running we should wait for some time and fail only then. This is particularly
 	// important when we start apiserver and controller manager at the same time.
@@ -279,6 +291,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 	}
 	namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, groupVersionResources, s.NamespaceSyncPeriod.Duration, api.FinalizerKubernetes)
 	go namespaceController.Run(s.ConcurrentNamespaceSyncs, wait.NeverStop)
+	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	groupVersion := "extensions/v1beta1"
 	resources, found := resourceMap[groupVersion]
@@ -297,30 +310,35 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		)
 		go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient, metricsClient, s.HorizontalPodAutoscalerSyncPeriod.Duration).
 			Run(wait.NeverStop)
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
 	if containsResource(resources, "daemonsets") {
 		glog.Infof("Starting daemon set controller")
 		go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), s.LookupCacheSizeForDaemonSet).
 			Run(s.ConcurrentDaemonSetSyncs, wait.NeverStop)
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
 	if containsResource(resources, "jobs") {
 		glog.Infof("Starting job controller")
 		go job.NewJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller")), ResyncPeriod(s)).
 			Run(s.ConcurrentJobSyncs, wait.NeverStop)
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
 	if containsResource(resources, "deployments") {
 		glog.Infof("Starting deployment controller")
 		go deployment.NewDeploymentController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "deployment-controller")), ResyncPeriod(s)).
 			Run(s.ConcurrentDeploymentSyncs, wait.NeverStop)
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
 	if containsResource(resources, "replicasets") {
 		glog.Infof("Starting ReplicaSet controller")
 		go replicaset.NewReplicaSetController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, s.LookupCacheSizeForRS).
 			Run(s.ConcurrentRSSyncs, wait.NeverStop)
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 }
 
@@ -332,6 +350,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 
 	pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-binder")), s.PVClaimBinderSyncPeriod.Duration)
 	pvclaimBinder.Run()
+	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(
 		clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-recycler")),
@@ -344,6 +363,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
 	}
 	pvRecycler.Run()
+	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	if provisioner != nil {
 		pvController, err := persistentvolumecontroller.NewPersistentVolumeProvisionerController(persistentvolumecontroller.NewControllerClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-provisioner"))), s.PVClaimBinderSyncPeriod.Duration, s.ClusterName, volumePlugins, provisioner, cloud)
@@ -351,6 +371,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 			glog.Fatalf("Failed to start persistent volume provisioner controller: %+v", err)
 		}
 		pvController.Run()
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
 	var rootCA []byte
@@ -379,6 +400,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 				RootCA: rootCA,
 			},
 		).Run()
+		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 }
 
@@ -386,6 +408,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-account-controller")),
 		serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
 	).Run()
+	time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 
 	select {}
 }
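The pattern repeated throughout StartControllers above is: launch a controller in its own goroutine, then sleep a jittered interval before launching the next, so the controllers' initial LIST/WATCH requests against the apiserver are spread out rather than arriving all at once. A hypothetical helper capturing the shape of it (startStaggered, startFns, and the local jitter function are illustrative names, not part of this commit):

package controllerstart

import (
	"math/rand"
	"time"
)

// jitter mirrors the behavior used above: the base interval plus a
// random extra of up to maxFactor*interval.
func jitter(d time.Duration, maxFactor float64) time.Duration {
	return d + time.Duration(rand.Float64()*maxFactor*float64(d))
}

// startStaggered launches each controller in its own goroutine, then
// pauses a jittered interval before starting the next one.
func startStaggered(startFns []func(stop <-chan struct{}), interval time.Duration, stop <-chan struct{}) {
	for _, start := range startFns {
		go start(stop)
		time.Sleep(jitter(interval, 1.0))
	}
}

The second file in the commit wires up the new configuration field and its flag: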
@@ -81,9 +81,10 @@ func NewCMServer() *CMServer {
 				IncrementTimeoutHostPath: 30,
 			},
 		},
-		KubeAPIQPS:     20.0,
-		KubeAPIBurst:   30,
-		LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(),
+		KubeAPIQPS:              20.0,
+		KubeAPIBurst:            30,
+		LeaderElection:          leaderelection.DefaultLeaderElectionConfiguration(),
+		ControllerStartInterval: unversioned.Duration{0 * time.Second},
 	},
 }
 return &s
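The default of unversioned.Duration{0 * time.Second} keeps the old behavior: a random fraction of a zero interval is still zero, so no delay is introduced unless the operator sets the flag. The unkeyed literal works because the wrapper type embeds time.Duration; a rough sketch of the type as used here (the real type also carries JSON/YAML (un)marshalling, omitted; field layout assumed from usage):

package sketch

import "time"

// Duration approximates unversioned.Duration: embedding time.Duration lets
// callers write Duration{0 * time.Second} and read d.Duration directly, as
// the diff does with s.ControllerStartInterval.Duration.
type Duration struct {
	time.Duration
}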
@@ -146,5 +147,6 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 	fs.StringVar(&s.RootCAFile, "root-ca-file", s.RootCAFile, "If set, this root certificate authority will be included in service account's token secret. This must be a valid PEM-encoded CA bundle.")
 	fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
 	fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
+	fs.DurationVar(&s.ControllerStartInterval.Duration, "controller-start-interval", s.ControllerStartInterval.Duration, "Interval between starting controller managers.")
 	leaderelection.BindFlags(&s.LeaderElection, fs)
 }
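Because the flag is registered with fs.DurationVar, it accepts any Go duration string (for example 2s, or 500ms). A minimal sketch of how such a flag parses with pflag (the flag-set name and local variable are illustrative, not from the commit):

package main

import (
	"fmt"
	"time"

	"github.com/spf13/pflag"
)

func main() {
	// Hypothetical stand-in for CMServer.ControllerStartInterval.Duration.
	var startInterval time.Duration
	fs := pflag.NewFlagSet("kube-controller-manager", pflag.ExitOnError)
	fs.DurationVar(&startInterval, "controller-start-interval", 0, "Interval between starting controller managers.")

	// Simulate passing the flag on the command line.
	fs.Parse([]string{"--controller-start-interval=2s"})
	fmt.Println(startInterval) // prints: 2s
}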