Implement multi-scheduler:

1. Name default scheduler with name `kube-scheduler`
2. The default scheduler only schedules the pods meeting the following condition:
	- the pod has no annotation "scheduler.alpha.kubernetes.io/name: <scheduler-name>"
	- the pod has annotation "scheduler.alpha.kubernetes.io/name: kube-scheduler"

update gofmt

update according to @david's review

run hack/test-integration.sh, hack/test-go.sh and local e2e.test
This commit is contained in:
HaiyangDING
2015-11-27 17:07:17 +08:00
parent 4b18fa553f
commit d9f3607292
13 changed files with 302 additions and 19 deletions

View File

@@ -58,6 +58,7 @@ type SchedulerServer struct {
BindPodsBurst int
KubeAPIQPS float32
KubeAPIBurst int
SchedulerName string
}
// NewSchedulerServer creates a new SchedulerServer with default parameters
@@ -70,6 +71,7 @@ func NewSchedulerServer() *SchedulerServer {
BindPodsBurst: 100,
KubeAPIQPS: 50.0,
KubeAPIBurst: 100,
SchedulerName: api.DefaultSchedulerName,
}
return &s
}
@@ -107,6 +109,7 @@ func (s *SchedulerServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.BindPodsBurst, "bind-pods-burst", s.BindPodsBurst, "Number of bindings per second scheduler is allowed to make during bursts")
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
fs.StringVar(&s.SchedulerName, "scheduler-name", s.SchedulerName, "Name of the scheduler, used to select which pods will be processed by this scheduler, based on pod's annotation with key 'scheduler.alpha.kubernetes.io/name'")
}
// Run runs the specified SchedulerServer. This should never exit.
@@ -142,14 +145,14 @@ func (s *SchedulerServer) Run(_ []string) error {
glog.Fatal(server.ListenAndServe())
}()
configFactory := factory.NewConfigFactory(kubeClient, util.NewTokenBucketRateLimiter(s.BindPodsQPS, s.BindPodsBurst))
configFactory := factory.NewConfigFactory(kubeClient, util.NewTokenBucketRateLimiter(s.BindPodsQPS, s.BindPodsBurst), s.SchedulerName)
config, err := s.createConfig(configFactory)
if err != nil {
glog.Fatalf("Failed to create scheduler configuration: %v", err)
}
eventBroadcaster := record.NewBroadcaster()
config.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"})
config.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: s.SchedulerName})
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))

View File

@@ -100,7 +100,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
if !reflect.DeepEqual(policy, tc.ExpectedPolicy) {
t.Errorf("%s: Expected:\n\t%#v\nGot:\n\t%#v", v, tc.ExpectedPolicy, policy)
}
_, err = factory.NewConfigFactory(nil, nil).CreateFromConfig(policy)
_, err = factory.NewConfigFactory(nil, nil, "some-scheduler-name").CreateFromConfig(policy)
if err != nil {
t.Errorf("%s: Error constructing: %v", v, err)
continue

View File

@@ -43,6 +43,10 @@ import (
"github.com/golang/glog"
)
const (
SchedulerAnnotationKey = "scheduler.alpha.kubernetes.io/name"
)
// ConfigFactory knows how to fill out a scheduler config with its support functions.
type ConfigFactory struct {
Client *client.Client
@@ -66,10 +70,15 @@ type ConfigFactory struct {
scheduledPodPopulator *framework.Controller
modeler scheduler.SystemModeler
// SchedulerName of a scheduler is used to select which pods will be
// processed by this scheduler, based on pods's annotation key:
// 'scheduler.alpha.kubernetes.io/name'
SchedulerName string
}
// Initializes the factory.
func NewConfigFactory(client *client.Client, rateLimiter util.RateLimiter) *ConfigFactory {
func NewConfigFactory(client *client.Client, rateLimiter util.RateLimiter, schedulerName string) *ConfigFactory {
c := &ConfigFactory{
Client: client,
PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
@@ -79,6 +88,7 @@ func NewConfigFactory(client *client.Client, rateLimiter util.RateLimiter) *Conf
ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ControllerLister: &cache.StoreToReplicationControllerLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
StopEverything: make(chan struct{}),
SchedulerName: schedulerName,
}
modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{Store: c.PodQueue}, c.ScheduledPodLister)
c.modeler = modeler
@@ -228,9 +238,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
Algorithm: algo,
Binder: &binder{f.Client},
NextPod: func() *api.Pod {
pod := f.PodQueue.Pop().(*api.Pod)
glog.V(2).Infof("About to try and schedule pod %v", pod.Name)
return pod
return f.getNextPod()
},
Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
BindPodsRateLimiter: f.BindPodsRateLimiter,
@@ -238,6 +246,24 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
}, nil
}
func (f *ConfigFactory) getNextPod() *api.Pod {
for {
pod := f.PodQueue.Pop().(*api.Pod)
if f.responsibleForPod(pod) {
glog.V(4).Infof("About to try and schedule pod %v", pod.Name)
return pod
}
}
}
func (f *ConfigFactory) responsibleForPod(pod *api.Pod) bool {
if f.SchedulerName == api.DefaultSchedulerName {
return pod.Annotations[SchedulerAnnotationKey] == f.SchedulerName || pod.Annotations[SchedulerAnnotationKey] == ""
} else {
return pod.Annotations[SchedulerAnnotationKey] == f.SchedulerName
}
}
func getNodeConditionPredicate() cache.NodeConditionPredicate {
return func(node api.Node) bool {
for _, cond := range node.Status.Conditions {

View File

@@ -45,7 +45,7 @@ func TestCreate(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
client := client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()})
factory := NewConfigFactory(client, nil)
factory := NewConfigFactory(client, nil, api.DefaultSchedulerName)
factory.Create()
}
@@ -63,7 +63,7 @@ func TestCreateFromConfig(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
client := client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()})
factory := NewConfigFactory(client, nil)
factory := NewConfigFactory(client, nil, api.DefaultSchedulerName)
// Pre-register some predicate and priority functions
RegisterFitPredicate("PredicateOne", PredicateOne)
@@ -105,7 +105,7 @@ func TestCreateFromEmptyConfig(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
client := client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()})
factory := NewConfigFactory(client, nil)
factory := NewConfigFactory(client, nil, api.DefaultSchedulerName)
configData = []byte(`{}`)
err := latestschedulerapi.Codec.DecodeInto(configData, &policy)
@@ -148,7 +148,7 @@ func TestDefaultErrorFunc(t *testing.T) {
mux.Handle(testapi.Default.ResourcePath("pods", "bar", "foo"), &handler)
server := httptest.NewServer(mux)
defer server.Close()
factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()}), nil)
factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()}), nil, api.DefaultSchedulerName)
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
podBackoff := podBackoff{
perPodBackoff: map[types.NamespacedName]*backoffEntry{},
@@ -302,3 +302,71 @@ func TestBackoff(t *testing.T) {
t.Errorf("expected: 1, got %s", duration.String())
}
}
// TestResponsibleForPod tests if a pod with an annotation that should cause it to
// be picked up by the default scheduler, is in fact picked by the default scheduler
// Two schedulers are made in the test: one is default scheduler and other scheduler
// is of name "foo-scheduler". A pod must be picked up by at most one of the two
// schedulers.
func TestResponsibleForPod(t *testing.T) {
handler := util.FakeHandler{
StatusCode: 500,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
defer server.Close()
client := client.NewOrDie(&client.Config{Host: server.URL, GroupVersion: testapi.Default.GroupVersion()})
// factory of "default-scheduler"
factoryDefaultScheduler := NewConfigFactory(client, nil, api.DefaultSchedulerName)
// factory of "foo-scheduler"
factoryFooScheduler := NewConfigFactory(client, nil, "foo-scheduler")
// scheduler annotaions to be tested
schedulerAnnotationFitsDefault := map[string]string{"scheduler.alpha.kubernetes.io/name": "default-scheduler"}
schedulerAnnotationFitsFoo := map[string]string{"scheduler.alpha.kubernetes.io/name": "foo-scheduler"}
schedulerAnnotationFitsNone := map[string]string{"scheduler.alpha.kubernetes.io/name": "bar-scheduler"}
tests := []struct {
pod *api.Pod
pickedByDefault bool
pickedByFoo bool
}{
{
// pod with no annotation "scheduler.alpha.kubernetes.io/name=<scheduler-name>" should be
// picked by the default scheduler, NOT by the one of name "foo-scheduler"
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "bar"}},
pickedByDefault: true,
pickedByFoo: false,
},
{
// pod with annotation "scheduler.alpha.kubernetes.io/name=default-scheduler" should be picked
// by the scheduler of name "default-scheduler", NOT by the one of name "foo-scheduler"
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "bar", Annotations: schedulerAnnotationFitsDefault}},
pickedByDefault: true,
pickedByFoo: false,
},
{
// pod with annotataion "scheduler.alpha.kubernetes.io/name=foo-scheduler" should be NOT
// be picked by the scheduler of name "default-scheduler", but by the one of name "foo-scheduler"
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "bar", Annotations: schedulerAnnotationFitsFoo}},
pickedByDefault: false,
pickedByFoo: true,
},
{
// pod with annotataion "scheduler.alpha.kubernetes.io/name=foo-scheduler" should be NOT
// be picked by niether the scheduler of name "default-scheduler" nor the one of name "foo-scheduler"
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "bar", Annotations: schedulerAnnotationFitsNone}},
pickedByDefault: false,
pickedByFoo: false,
},
}
for _, test := range tests {
podOfDefault := factoryDefaultScheduler.responsibleForPod(test.pod)
podOfFoo := factoryFooScheduler.responsibleForPod(test.pod)
results := []bool{podOfDefault, podOfFoo}
expected := []bool{test.pickedByDefault, test.pickedByFoo}
if !reflect.DeepEqual(results, expected) {
t.Errorf("expected: {%v, %v}, got {%v, %v}", test.pickedByDefault, test.pickedByFoo, podOfDefault, podOfFoo)
}
}
}