For testability & reuse, move scheduler setup into its own package.

This commit is contained in:
Daniel Smith 2014-08-20 14:34:55 -07:00
parent 79f60da6c4
commit 8a9eaf911f
4 changed files with 416 additions and 137 deletions

View File

@ -19,6 +19,8 @@ package util
import (
"io/ioutil"
"net/http"
"net/url"
"reflect"
)
// TestInterface is a simple interface providing Errorf, to make injection for
@ -57,8 +59,15 @@ func (f *FakeHandler) ServeHTTP(response http.ResponseWriter, request *http.Requ
// ValidateRequest verifies that FakeHandler received a request with expected path, method, and body.
func (f FakeHandler) ValidateRequest(t TestInterface, expectedPath, expectedMethod string, body *string) {
if f.RequestReceived.URL.Path != expectedPath {
t.Errorf("Unexpected request path for request %#v, received: %q, expected: %q", f.RequestReceived, f.RequestReceived.URL.Path, expectedPath)
expectURL, err := url.Parse(expectedPath)
if err != nil {
t.Errorf("Couldn't parse %v as a URL.", expectedPath)
}
if f.RequestReceived.URL.Path != expectURL.Path {
t.Errorf("Unexpected request path for request %#v, received: %q, expected: %q", f.RequestReceived, f.RequestReceived.URL.Path, expectURL.Path)
}
if e, a := expectURL.Query(), f.RequestReceived.URL.Query(); !reflect.DeepEqual(e, a) {
t.Errorf("Unexpected query for request %#v, received: %q, expected: %q", f.RequestReceived, a, e)
}
if f.RequestReceived.Method != expectedMethod {
t.Errorf("Unexpected method: %q, expected: %q", f.RequestReceived.Method, expectedMethod)

View File

@ -18,80 +18,18 @@ package main
import (
"flag"
"math/rand"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
verflag "github.com/GoogleCloudPlatform/kubernetes/pkg/version/flag"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
"github.com/golang/glog"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
)
var (
master = flag.String("master", "", "The address of the Kubernetes API server")
)
// storeToMinionLister turns a store into a minion lister. The store must contain (only) minions.
type storeToMinionLister struct {
cache.Store
}
func (s *storeToMinionLister) List() (machines []string, err error) {
for _, m := range s.Store.List() {
machines = append(machines, m.(*api.Minion).ID)
}
return machines, nil
}
// storeToPodLister turns a store into a pod lister. The store must contain (only) pods.
type storeToPodLister struct {
cache.Store
}
func (s *storeToPodLister) ListPods(selector labels.Selector) (pods []api.Pod, err error) {
for _, m := range s.List() {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, *pod)
}
}
return pods, nil
}
// minionEnumerator allows a cache.Poller to enumerate items in an api.PodList
type minionEnumerator struct {
*api.MinionList
}
// Returns the number of items in the pod list.
func (me *minionEnumerator) Len() int {
if me.MinionList == nil {
return 0
}
return len(me.Items)
}
// Returns the item (and ID) with the particular index.
func (me *minionEnumerator) Get(index int) (string, interface{}) {
return me.Items[index].ID, &me.Items[index]
}
type binder struct {
*client.Client
}
// Bind just does a POST binding RPC.
func (b *binder) Bind(binding *api.Binding) error {
return b.Post().Path("bindings").Body(binding).Do().Error()
}
func main() {
flag.Parse()
util.InitLogs()
@ -99,81 +37,12 @@ func main() {
verflag.PrintAndExitIfRequested()
// This function is long because we inject all the dependencies into scheduler here.
// TODO: security story for plugins!
kubeClient := client.New("http://"+*master, nil)
// Watch and queue pods that need scheduling.
podQueue := cache.NewFIFO()
cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) {
// This query will only find pods with no assigned host.
return kubeClient.
Get().
Path("watch").
Path("pods").
SelectorParam("fields", labels.Set{"DesiredState.Host": ""}.AsSelector()).
UintParam("resourceVersion", resourceVersion).
Watch()
}, &api.Pod{}, podQueue).Run()
// Watch and cache all running pods. Scheduler needs to find all pods
// so it knows where it's safe to place a pod. Cache this locally.
podCache := cache.NewStore()
cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) {
// This query will only find pods that do have an assigned host.
return kubeClient.
Get().
Path("watch").
Path("pods").
ParseSelectorParam("fields", "DesiredState.Host!=").
UintParam("resourceVersion", resourceVersion).
Watch()
}, &api.Pod{}, podCache).Run()
// Watch minions.
// Minions may be listed frequently, so provide a local up-to-date cache.
minionCache := cache.NewStore()
if false {
// Disable this code until minions support watches.
cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) {
// This query will only find pods that do have an assigned host.
return kubeClient.
Get().
Path("watch").
Path("minions").
UintParam("resourceVersion", resourceVersion).
Watch()
}, &api.Minion{}, minionCache).Run()
} else {
cache.NewPoller(func() (cache.Enumerator, error) {
// This query will only find pods that do have an assigned host.
list := &api.MinionList{}
err := kubeClient.Get().Path("minions").Do().Into(list)
if err != nil {
return nil, err
}
return &minionEnumerator{list}, nil
}, 10*time.Second, minionCache).Run()
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
algo := algorithm.NewRandomFitScheduler(
&storeToPodLister{podCache}, r)
s := scheduler.New(&scheduler.Config{
MinionLister: &storeToMinionLister{minionCache},
Algorithm: algo,
Binder: &binder{kubeClient},
NextPod: func() *api.Pod {
return podQueue.Pop().(*api.Pod)
},
Error: func(pod *api.Pod, err error) {
glog.Errorf("Error scheduling %v: %v; retrying", pod.ID, err)
podQueue.Add(pod.ID, pod)
},
})
configFactory := &factory.ConfigFactory{Client: kubeClient}
config := configFactory.Create()
s := scheduler.New(config)
s.Run()
select {}

View File

@ -0,0 +1,177 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package factory can set up a scheduler. This code is here instead of
// plugin/cmd/scheduler for both testability and reuse.
package factory
import (
"math/rand"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
"github.com/golang/glog"
)
// ConfigFactory knows how to fill out a scheduler config with its support functions.
type ConfigFactory struct {
Client *client.Client
}
// Create creates a scheduler and all support functions.
func (factory *ConfigFactory) Create() *scheduler.Config {
// Watch and queue pods that need scheduling.
podQueue := cache.NewFIFO()
cache.NewReflector(factory.createUnassignedPodWatch, &api.Pod{}, podQueue).Run()
// Watch and cache all running pods. Scheduler needs to find all pods
// so it knows where it's safe to place a pod. Cache this locally.
podCache := cache.NewStore()
cache.NewReflector(factory.createAssignedPodWatch, &api.Pod{}, podCache).Run()
// Watch minions.
// Minions may be listed frequently, so provide a local up-to-date cache.
minionCache := cache.NewStore()
if false {
// Disable this code until minions support watches.
cache.NewReflector(factory.createMinionWatch, &api.Minion{}, minionCache).Run()
} else {
cache.NewPoller(factory.pollMinions, 10*time.Second, minionCache).Run()
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
algo := algorithm.NewRandomFitScheduler(
&storeToPodLister{podCache}, r)
return &scheduler.Config{
MinionLister: &storeToMinionLister{minionCache},
Algorithm: algo,
Binder: &binder{factory.Client},
NextPod: func() *api.Pod {
return podQueue.Pop().(*api.Pod)
},
Error: factory.defaultErrorFunc,
}
}
// createUnassignedPodWatch starts a watch that finds all pods that need to be
// scheduled.
func (factory *ConfigFactory) createUnassignedPodWatch(resourceVersion uint64) (watch.Interface, error) {
return factory.Client.
Get().
Path("watch").
Path("pods").
SelectorParam("fields", labels.Set{"DesiredState.Host": ""}.AsSelector()).
UintParam("resourceVersion", resourceVersion).
Watch()
}
// createUnassignedPodWatch starts a watch that finds all pods that are
// already scheduled.
func (factory *ConfigFactory) createAssignedPodWatch(resourceVersion uint64) (watch.Interface, error) {
return factory.Client.
Get().
Path("watch").
Path("pods").
ParseSelectorParam("fields", "DesiredState.Host!=").
UintParam("resourceVersion", resourceVersion).
Watch()
}
// createMinionWatch starts a watch that gets all changes to minions.
func (factory *ConfigFactory) createMinionWatch(resourceVersion uint64) (watch.Interface, error) {
return factory.Client.
Get().
Path("watch").
Path("minions").
UintParam("resourceVersion", resourceVersion).
Watch()
}
// pollMinions lists all minions and returns an enumerator for cache.Poller.
func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) {
list := &api.MinionList{}
err := factory.Client.Get().Path("minions").Do().Into(list)
if err != nil {
return nil, err
}
return &minionEnumerator{list}, nil
}
func (factory *ConfigFactory) defaultErrorFunc(pod *api.Pod, err error) {
glog.Errorf("Error scheduling %v: %v; retrying", pod.ID, err)
}
// storeToMinionLister turns a store into a minion lister. The store must contain (only) minions.
type storeToMinionLister struct {
cache.Store
}
func (s *storeToMinionLister) List() (machines []string, err error) {
for _, m := range s.Store.List() {
machines = append(machines, m.(*api.Minion).ID)
}
return machines, nil
}
// storeToPodLister turns a store into a pod lister. The store must contain (only) pods.
type storeToPodLister struct {
cache.Store
}
func (s *storeToPodLister) ListPods(selector labels.Selector) (pods []api.Pod, err error) {
for _, m := range s.List() {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, *pod)
}
}
return pods, nil
}
// minionEnumerator allows a cache.Poller to enumerate items in an api.PodList
type minionEnumerator struct {
*api.MinionList
}
// Returns the number of items in the pod list.
func (me *minionEnumerator) Len() int {
if me.MinionList == nil {
return 0
}
return len(me.Items)
}
// Returns the item (and ID) with the particular index.
func (me *minionEnumerator) Get(index int) (string, interface{}) {
return me.Items[index].ID, &me.Items[index]
}
type binder struct {
*client.Client
}
// Bind just does a POST binding RPC.
func (b *binder) Bind(binding *api.Binding) error {
return b.Post().Path("bindings").Body(binding).Do().Error()
}

View File

@ -0,0 +1,224 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import (
"net/http/httptest"
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
func TestCreate(t *testing.T) {
handler := util.FakeHandler{
StatusCode: 500,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
factory := ConfigFactory{client.New(server.URL, nil)}
factory.Create()
}
func TestCreateWatches(t *testing.T) {
factory := ConfigFactory{nil}
table := []struct {
rv uint64
location string
watchFactory func(rv uint64) (watch.Interface, error)
}{
// Minion watch
{
rv: 0,
location: "/api/v1beta1/watch/minions?resourceVersion=0",
watchFactory: factory.createMinionWatch,
}, {
rv: 42,
location: "/api/v1beta1/watch/minions?resourceVersion=42",
watchFactory: factory.createMinionWatch,
},
// Assigned pod watches
{
rv: 0,
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=0",
watchFactory: factory.createAssignedPodWatch,
}, {
rv: 42,
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42",
watchFactory: factory.createAssignedPodWatch,
},
// Unassigned pod watches
{
rv: 0,
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=0",
watchFactory: factory.createUnassignedPodWatch,
}, {
rv: 42,
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42",
watchFactory: factory.createUnassignedPodWatch,
},
}
for _, item := range table {
handler := util.FakeHandler{
StatusCode: 500,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
factory.Client = client.New(server.URL, nil)
// This test merely tests that the correct request is made.
item.watchFactory(item.rv)
handler.ValidateRequest(t, item.location, "GET", nil)
}
}
func TestPollMinions(t *testing.T) {
table := []struct {
minions []api.Minion
}{
{
minions: []api.Minion{
{JSONBase: api.JSONBase{ID: "foo"}},
{JSONBase: api.JSONBase{ID: "bar"}},
},
},
}
for _, item := range table {
ml := &api.MinionList{Items: item.minions}
handler := util.FakeHandler{
StatusCode: 200,
ResponseBody: api.EncodeOrDie(ml),
T: t,
}
server := httptest.NewServer(&handler)
cf := ConfigFactory{client.New(server.URL, nil)}
ce, err := cf.pollMinions()
if err != nil {
t.Errorf("Unexpected error: %v", err)
continue
}
handler.ValidateRequest(t, "/api/v1beta1/minions", "GET", nil)
if e, a := len(item.minions), ce.Len(); e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
}
func TestStoreToMinionLister(t *testing.T) {
store := cache.NewStore()
ids := util.NewStringSet("foo", "bar", "baz")
for id := range ids {
store.Add(id, &api.Minion{JSONBase: api.JSONBase{ID: id}})
}
sml := storeToMinionLister{store}
got, err := sml.List()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !ids.HasAll(got...) || len(got) != len(ids) {
t.Errorf("Expected %v, got %v", ids, got)
}
}
func TestStoreToPodLister(t *testing.T) {
store := cache.NewStore()
ids := []string{"foo", "bar", "baz"}
for _, id := range ids {
store.Add(id, &api.Pod{
JSONBase: api.JSONBase{ID: id},
Labels: map[string]string{"name": id},
})
}
spl := storeToPodLister{store}
for _, id := range ids {
got, err := spl.ListPods(labels.Set{"name": id}.AsSelector())
if err != nil {
t.Errorf("Unexpected error: %v", err)
continue
}
if e, a := 1, len(got); e != a {
t.Errorf("Expected %v, got %v", e, a)
continue
}
if e, a := id, got[0].ID; e != a {
t.Errorf("Expected %v, got %v", e, a)
continue
}
}
}
func TestMinionEnumerator(t *testing.T) {
testList := &api.MinionList{
Items: []api.Minion{
{JSONBase: api.JSONBase{ID: "foo"}},
{JSONBase: api.JSONBase{ID: "bar"}},
{JSONBase: api.JSONBase{ID: "baz"}},
},
}
me := minionEnumerator{testList}
if e, a := 3, me.Len(); e != a {
t.Fatalf("expected %v, got %v", e, a)
}
for i := range testList.Items {
gotID, gotObj := me.Get(i)
if e, a := testList.Items[i].ID, gotID; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := &testList.Items[i], gotObj; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %v#", e, a)
}
}
}
func TestBind(t *testing.T) {
table := []struct {
binding *api.Binding
}{
{binding: &api.Binding{PodID: "foo", Host: "foohost.kubernetes.mydomain.com"}},
}
for _, item := range table {
handler := util.FakeHandler{
StatusCode: 200,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
b := binder{client.New(server.URL, nil)}
err := b.Bind(item.binding)
if err != nil {
t.Errorf("Unexpected error: %v", err)
continue
}
expectedBody := api.EncodeOrDie(item.binding)
handler.ValidateRequest(t, "/api/v1beta1/bindings", "POST", &expectedBody)
}
}