retrofit the scheduler with the leader election client.
Signed-off-by: Mike Danese <mikedanese@google.com>
This commit is contained in:
parent
a47c170377
commit
f71657d9a6
@ -62,6 +62,10 @@ kube-scheduler
|
|||||||
--kube-api-burst=100: Burst to use while talking with kubernetes apiserver
|
--kube-api-burst=100: Burst to use while talking with kubernetes apiserver
|
||||||
--kube-api-qps=50: QPS to use while talking with kubernetes apiserver
|
--kube-api-qps=50: QPS to use while talking with kubernetes apiserver
|
||||||
--kubeconfig="": Path to kubeconfig file with authorization and master location information.
|
--kubeconfig="": Path to kubeconfig file with authorization and master location information.
|
||||||
|
--leader-elect[=false]: Start a leader election client and gain leadership before executing scheduler loop. Enable this when running replicated schedulers.
|
||||||
|
--leader-elect-lease-duration=15s: The duration that non-leader candidates will wait after observing a leadership renewal until attempting to acquire leadership of a led but unrenewed leader slot. This is effectively the maximum duration that a leader can be stopped before it is replaced by another candidate. This is only applicable if leader election is enabled.
|
||||||
|
--leader-elect-renew-deadline=10s: The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to the lease duration. This is only applicable if leader election is enabled.
|
||||||
|
--leader-elect-retry-period=2s: The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled.
|
||||||
--log-flush-frequency=5s: Maximum number of seconds between log flushes
|
--log-flush-frequency=5s: Maximum number of seconds between log flushes
|
||||||
--master="": The address of the Kubernetes API server (overrides any value in kubeconfig)
|
--master="": The address of the Kubernetes API server (overrides any value in kubeconfig)
|
||||||
--policy-config-file="": File with scheduler policy configuration
|
--policy-config-file="": File with scheduler policy configuration
|
||||||
@ -70,7 +74,7 @@ kube-scheduler
|
|||||||
--scheduler-name="default-scheduler": Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'
|
--scheduler-name="default-scheduler": Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'
|
||||||
```
|
```
|
||||||
|
|
||||||
###### Auto generated by spf13/cobra on 14-Dec-2015
|
###### Auto generated by spf13/cobra on 12-Jan-2016
|
||||||
|
|
||||||
|
|
||||||
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
|
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
|
||||||
|
@ -356,3 +356,7 @@ www-prefix
|
|||||||
clientset-name
|
clientset-name
|
||||||
clientset-only
|
clientset-only
|
||||||
clientset-path
|
clientset-path
|
||||||
|
leader-elect
|
||||||
|
leader-elect-lease-duration
|
||||||
|
leader-elect-renew-deadline
|
||||||
|
leader-elect-retry-period
|
||||||
|
@ -54,8 +54,6 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/errors"
|
"k8s.io/kubernetes/pkg/api/errors"
|
||||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
@ -63,12 +61,19 @@ import (
|
|||||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/util"
|
"k8s.io/kubernetes/pkg/util"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
"github.com/spf13/pflag"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
JitterFactor = 1.2
|
JitterFactor = 1.2
|
||||||
|
|
||||||
LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"
|
LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader"
|
||||||
|
|
||||||
|
DefaultLeaseDuration = 15 * time.Second
|
||||||
|
DefaultRenewDeadline = 10 * time.Second
|
||||||
|
DefaultRetryPeriod = 2 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewLeadereElector creates a LeaderElector from a LeaderElecitionConfig
|
// NewLeadereElector creates a LeaderElector from a LeaderElecitionConfig
|
||||||
@ -173,6 +178,16 @@ func (le *LeaderElector) Run() {
|
|||||||
close(stop)
|
close(stop)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RunOrDie starts a client with the provided config or panics if the config
|
||||||
|
// fails to validate.
|
||||||
|
func RunOrDie(lec LeaderElectionConfig) {
|
||||||
|
le, err := NewLeaderElector(lec)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
le.Run()
|
||||||
|
}
|
||||||
|
|
||||||
// GetLeader returns the identity of the last observed leader or returns the empty string if
|
// GetLeader returns the identity of the last observed leader or returns the empty string if
|
||||||
// no leader has yet been observed.
|
// no leader has yet been observed.
|
||||||
func (le *LeaderElector) GetLeader() string {
|
func (le *LeaderElector) GetLeader() string {
|
||||||
@ -315,3 +330,42 @@ func (l *LeaderElector) maybeReportTransition() {
|
|||||||
go l.config.Callbacks.OnNewLeader(l.reportedLeader)
|
go l.config.Callbacks.OnNewLeader(l.reportedLeader)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func DefaultLeaderElectionCLIConfig() LeaderElectionCLIConfig {
|
||||||
|
return LeaderElectionCLIConfig{
|
||||||
|
LeaderElect: false,
|
||||||
|
LeaseDuration: DefaultLeaseDuration,
|
||||||
|
RenewDeadline: DefaultRenewDeadline,
|
||||||
|
RetryPeriod: DefaultRetryPeriod,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// LeaderElectionCLIConfig is useful for embedding into component configuration objects
|
||||||
|
// to maintain consistent command line flags.
|
||||||
|
type LeaderElectionCLIConfig struct {
|
||||||
|
LeaderElect bool
|
||||||
|
LeaseDuration time.Duration
|
||||||
|
RenewDeadline time.Duration
|
||||||
|
RetryPeriod time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// BindFlags binds the common LeaderElectionCLIConfig flags to a flagset
|
||||||
|
func (l *LeaderElectionCLIConfig) BindFlags(fs *pflag.FlagSet) {
|
||||||
|
fs.BoolVar(&l.LeaderElect, "leader-elect", l.LeaderElect, ""+
|
||||||
|
"Start a leader election client and gain leadership before "+
|
||||||
|
"executing scheduler loop. Enable this when running replicated "+
|
||||||
|
"schedulers.")
|
||||||
|
fs.DurationVar(&l.LeaseDuration, "leader-elect-lease-duration", l.LeaseDuration, ""+
|
||||||
|
"The duration that non-leader candidates will wait after observing a leadership"+
|
||||||
|
"renewal until attempting to acquire leadership of a led but unrenewed leader "+
|
||||||
|
"slot. This is effectively the maximum duration that a leader can be stopped "+
|
||||||
|
"before it is replaced by another candidate. This is only applicable if leader "+
|
||||||
|
"election is enabled.")
|
||||||
|
fs.DurationVar(&l.RenewDeadline, "leader-elect-renew-deadline", l.RenewDeadline, ""+
|
||||||
|
"The interval between attempts by the acting master to renew a leadership slot "+
|
||||||
|
"before it stops leading. This must be less than or equal to the lease duration. "+
|
||||||
|
"This is only applicable if leader election is enabled.")
|
||||||
|
fs.DurationVar(&l.RetryPeriod, "leader-elect-retry-period", l.RetryPeriod, ""+
|
||||||
|
"The duration the clients should wait between attempting acquisition and renewal "+
|
||||||
|
"of a leadership. This is only applicable if leader election is enabled.")
|
||||||
|
}
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/client/leaderelection"
|
||||||
"k8s.io/kubernetes/pkg/master/ports"
|
"k8s.io/kubernetes/pkg/master/ports"
|
||||||
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
|
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
|
||||||
|
|
||||||
@ -41,6 +42,7 @@ type SchedulerServer struct {
|
|||||||
KubeAPIQPS float32
|
KubeAPIQPS float32
|
||||||
KubeAPIBurst int
|
KubeAPIBurst int
|
||||||
SchedulerName string
|
SchedulerName string
|
||||||
|
LeaderElection leaderelection.LeaderElectionCLIConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSchedulerServer creates a new SchedulerServer with default parameters
|
// NewSchedulerServer creates a new SchedulerServer with default parameters
|
||||||
@ -54,6 +56,7 @@ func NewSchedulerServer() *SchedulerServer {
|
|||||||
KubeAPIQPS: 50.0,
|
KubeAPIQPS: 50.0,
|
||||||
KubeAPIBurst: 100,
|
KubeAPIBurst: 100,
|
||||||
SchedulerName: api.DefaultSchedulerName,
|
SchedulerName: api.DefaultSchedulerName,
|
||||||
|
LeaderElection: leaderelection.DefaultLeaderElectionCLIConfig(),
|
||||||
}
|
}
|
||||||
return &s
|
return &s
|
||||||
}
|
}
|
||||||
@ -72,4 +75,5 @@ func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) {
|
|||||||
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
|
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
|
||||||
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
|
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
|
||||||
fs.StringVar(&s.SchedulerName, "scheduler-name", s.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'")
|
fs.StringVar(&s.SchedulerName, "scheduler-name", s.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'")
|
||||||
|
s.LeaderElection.BindFlags(fs)
|
||||||
}
|
}
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/client/leaderelection"
|
||||||
"k8s.io/kubernetes/pkg/client/record"
|
"k8s.io/kubernetes/pkg/client/record"
|
||||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
|
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
|
||||||
@ -110,9 +111,44 @@ func Run(s *options.SchedulerServer) error {
|
|||||||
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
|
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
|
||||||
|
|
||||||
sched := scheduler.New(config)
|
sched := scheduler.New(config)
|
||||||
sched.Run()
|
|
||||||
|
|
||||||
select {}
|
run := func(_ <-chan struct{}) {
|
||||||
|
sched.Run()
|
||||||
|
select {}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !s.LeaderElection.LeaderElect {
|
||||||
|
run(nil)
|
||||||
|
glog.Fatal("this statement is unreachable")
|
||||||
|
panic("unreachable")
|
||||||
|
}
|
||||||
|
|
||||||
|
id, err := os.Hostname()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
|
||||||
|
EndpointsMeta: api.ObjectMeta{
|
||||||
|
Namespace: "kube-system",
|
||||||
|
Name: "kube-scheduler",
|
||||||
|
},
|
||||||
|
Client: kubeClient,
|
||||||
|
Identity: id,
|
||||||
|
EventRecorder: config.Recorder,
|
||||||
|
LeaseDuration: s.LeaderElection.LeaseDuration,
|
||||||
|
RenewDeadline: s.LeaderElection.RenewDeadline,
|
||||||
|
RetryPeriod: s.LeaderElection.RetryPeriod,
|
||||||
|
Callbacks: leaderelection.LeaderCallbacks{
|
||||||
|
OnStartedLeading: run,
|
||||||
|
OnStoppedLeading: func() {
|
||||||
|
glog.Fatalf("lost master")
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
glog.Fatal("this statement is unreachable")
|
||||||
|
panic("unreachable")
|
||||||
}
|
}
|
||||||
|
|
||||||
func createConfig(s *options.SchedulerServer, configFactory *factory.ConfigFactory) (*scheduler.Config, error) {
|
func createConfig(s *options.SchedulerServer, configFactory *factory.ConfigFactory) (*scheduler.Config, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user