From cadc24e9fd7f2bccc972df4d67985aa33a4cd823 Mon Sep 17 00:00:00 2001 From: Ravi Gadde Date: Thu, 3 Sep 2015 23:50:14 -0700 Subject: [PATCH] Scheduler extension --- docs/design/scheduler_extender.md | 117 ++++++ examples/examples_test.go | 5 +- ...scheduler-policy-config-with-extender.json | 25 ++ .../algorithm/priorities/priorities.go | 27 +- .../algorithm/priorities/priorities_test.go | 53 +-- .../priorities/selector_spreading.go | 15 +- .../priorities/selector_spreading_test.go | 51 +-- .../algorithm/scheduler_interface.go | 17 +- plugin/pkg/scheduler/algorithm/types.go | 26 +- plugin/pkg/scheduler/api/types.go | 75 +++- plugin/pkg/scheduler/api/v1/types.go | 73 +++- .../scheduler/api/validation/validation.go | 5 + plugin/pkg/scheduler/extender.go | 172 +++++++++ plugin/pkg/scheduler/extender_test.go | 302 ++++++++++++++++ plugin/pkg/scheduler/factory/factory.go | 19 +- plugin/pkg/scheduler/factory/factory_test.go | 8 +- plugin/pkg/scheduler/generic_scheduler.go | 63 +++- .../pkg/scheduler/generic_scheduler_test.go | 29 +- plugin/pkg/scheduler/scheduler_test.go | 2 + test/integration/extender_test.go | 332 ++++++++++++++++++ 20 files changed, 1278 insertions(+), 138 deletions(-) create mode 100644 docs/design/scheduler_extender.md create mode 100644 examples/scheduler-policy-config-with-extender.json create mode 100644 plugin/pkg/scheduler/extender.go create mode 100644 plugin/pkg/scheduler/extender_test.go create mode 100644 test/integration/extender_test.go diff --git a/docs/design/scheduler_extender.md b/docs/design/scheduler_extender.md new file mode 100644 index 00000000000..0c10de595c5 --- /dev/null +++ b/docs/design/scheduler_extender.md @@ -0,0 +1,117 @@ + + + + +WARNING +WARNING +WARNING +WARNING +WARNING + +

PLEASE NOTE: This document applies to the HEAD of the source tree

+
+If you are using a released version of Kubernetes, you should
+refer to the docs that go with that version.
+
+The latest release of this document can be found
+[here](http://releases.k8s.io/release-1.1/docs/design/scheduler_extender.md).
+
+Documentation for other releases can be found at
+[releases.k8s.io](http://releases.k8s.io).
+
+# Scheduler extender
+
+There are three ways to add new scheduling rules (predicates and priority functions) to Kubernetes: (1) adding these rules to the scheduler and recompiling it (described in https://github.com/kubernetes/kubernetes/blob/master/docs/devel/scheduler.md), (2) implementing your own scheduler process that runs instead of, or alongside, the standard Kubernetes scheduler, or (3) implementing a "scheduler extender" process that the standard Kubernetes scheduler calls out to as a final pass when making scheduling decisions.
+
+This document describes the third approach. It is needed for use cases where scheduling decisions must be made on resources that are not directly managed by the standard Kubernetes scheduler; the extender helps make scheduling decisions based on such resources. (Note that the three approaches are not mutually exclusive.)
+
+When scheduling a pod, the extender allows an external process to filter and prioritize nodes. Two separate http/https calls are issued to the extender, one for the "filter" and one for the "prioritize" action. To use the extender, you must create a scheduler policy configuration file. The configuration specifies how to reach the extender, whether to use http or https, and the timeout.
+
+```go
+// Holds the parameters used to communicate with the extender. If a verb is unspecified/empty,
+// it is assumed that the extender chose not to provide that extension.
+type ExtenderConfig struct {
+	// URLPrefix at which the extender is available
+	URLPrefix string `json:"urlPrefix"`
+	// Verb for the filter call, empty if not supported. This verb is appended to the URLPrefix when issuing the filter call to extender.
+	FilterVerb string `json:"filterVerb,omitempty"`
+	// Verb for the prioritize call, empty if not supported. This verb is appended to the URLPrefix when issuing the prioritize call to extender.
+	PrioritizeVerb string `json:"prioritizeVerb,omitempty"`
+	// The numeric multiplier for the node scores that the prioritize call generates.
+	// The weight should be a positive integer
+	Weight int `json:"weight,omitempty"`
+	// EnableHttps specifies whether https should be used to communicate with the extender
+	EnableHttps bool `json:"enableHttps,omitempty"`
+	// TLSConfig specifies the transport layer security config
+	TLSConfig *client.TLSClientConfig `json:"tlsConfig,omitempty"`
+	// HTTPTimeout specifies the timeout duration for a call to the extender. Filter timeout fails the scheduling of the pod. Prioritize
+	// timeout is ignored, k8s/other extenders priorities are used to select the node.
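+	// Note: time.Duration is (un)marshalled by encoding/json as an integer number of nanoseconds,
+	// so in a JSON policy file e.g. 30000000000 corresponds to a 30-second timeout.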
+	HTTPTimeout time.Duration `json:"httpTimeout,omitempty"`
+}
+```
+
+A sample scheduler policy file with extender configuration:
+
+```json
+{
+  "predicates": [
+    {
+      "name": "HostName"
+    },
+    {
+      "name": "MatchNodeSelector"
+    },
+    {
+      "name": "PodFitsResources"
+    }
+  ],
+  "priorities": [
+    {
+      "name": "LeastRequestedPriority",
+      "weight": 1
+    }
+  ],
+  "extenders": [
+    {
+      "urlPrefix": "http://127.0.0.1:12345/api/scheduler",
+      "filterVerb": "filter",
+      "enableHttps": false
+    }
+  ]
+}
+```
+
+Arguments passed to the FilterVerb endpoint on the extender are the pod and the set of nodes already filtered through the k8s predicates. Arguments passed to the PrioritizeVerb endpoint on the extender are the pod and the set of nodes that passed both the k8s predicates and the extender predicates.
+
+```go
+// ExtenderArgs represents the arguments needed by the extender to filter/prioritize
+// nodes for a pod.
+type ExtenderArgs struct {
+	// Pod being scheduled
+	Pod api.Pod `json:"pod"`
+	// List of candidate nodes where the pod can be scheduled
+	Nodes api.NodeList `json:"nodes"`
+}
+```
+
+The "filter" call returns a list of nodes (api.NodeList). The "prioritize" call returns priorities for each node (schedulerapi.HostPriorityList).
+
+The "filter" call may prune the set of nodes based on its predicates. Scores returned by the "prioritize" call are added to the k8s scores (computed through its priority functions) and used for the final host selection.
+
+Multiple extenders can be configured in the scheduler policy.
+
+
+[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/docs/design/scheduler_extender.md?pixel)]()
+
diff --git a/examples/examples_test.go b/examples/examples_test.go
index c8ae44e8463..0c17137123f 100644
--- a/examples/examples_test.go
+++ b/examples/examples_test.go
@@ -235,7 +235,8 @@ func TestExampleObjectSchemas(t *testing.T) {
 			"daemon": &extensions.DaemonSet{},
 		},
 		"../examples": {
-			"scheduler-policy-config": &schedulerapi.Policy{},
+			"scheduler-policy-config":               &schedulerapi.Policy{},
+			"scheduler-policy-config-with-extender": &schedulerapi.Policy{},
 		},
 		"../examples/rbd/secret": {
 			"ceph-secret": &api.Secret{},
@@ -409,7 +410,7 @@ func TestExampleObjectSchemas(t *testing.T) {
 				t.Logf("skipping : %s/%s\n", path, name)
 				return
 			}
-			if name == "scheduler-policy-config" {
+			if strings.Contains(name, "scheduler-policy-config") {
 				if err := schedulerapilatest.Codec.DecodeInto(data, expectedType); err != nil {
 					t.Errorf("%s did not decode correctly: %v\n%s", path, err, string(data))
 					return
diff --git a/examples/scheduler-policy-config-with-extender.json b/examples/scheduler-policy-config-with-extender.json
new file mode 100644
index 00000000000..68b772f905e
--- /dev/null
+++ b/examples/scheduler-policy-config-with-extender.json
@@ -0,0 +1,25 @@
+{
+"kind" : "Policy",
+"apiVersion" : "v1",
+"predicates" : [
+	{"name" : "PodFitsPorts"},
+	{"name" : "PodFitsResources"},
+	{"name" : "NoDiskConflict"},
+	{"name" : "MatchNodeSelector"},
+	{"name" : "HostName"}
+	],
+"priorities" : [
+	{"name" : "LeastRequestedPriority", "weight" : 1},
+	{"name" : "BalancedResourceAllocation", "weight" : 1},
+	{"name" : "ServiceSpreadingPriority", "weight" : 1},
+	{"name" : "EqualPriority", "weight" : 1}
+	],
+"extenders" : [{
+	"urlPrefix": "http://127.0.0.1:12346/scheduler",
+	"filterVerb": "filter",
+	"prioritizeVerb": "prioritize",
+	"weight": 5,
+	"enableHttps": false
+	}
+    ]
+}
diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities.go 
b/plugin/pkg/scheduler/algorithm/priorities/priorities.go index ef29a694412..59335a6af40 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/priorities.go +++ b/plugin/pkg/scheduler/algorithm/priorities/priorities.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" + schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" ) // the unused capacity is calculated on a scale of 0-10 @@ -73,7 +74,7 @@ func getNonzeroRequests(requests *api.ResourceList) (int64, int64) { // Calculate the resource occupancy on a node. 'node' has information about the resources on the node. // 'pods' is a list of pods currently scheduled on the node. -func calculateResourceOccupancy(pod *api.Pod, node api.Node, pods []*api.Pod) algorithm.HostPriority { +func calculateResourceOccupancy(pod *api.Pod, node api.Node, pods []*api.Pod) schedulerapi.HostPriority { totalMilliCPU := int64(0) totalMemory := int64(0) capacityMilliCPU := node.Status.Capacity.Cpu().MilliValue() @@ -104,7 +105,7 @@ func calculateResourceOccupancy(pod *api.Pod, node api.Node, pods []*api.Pod) al cpuScore, memoryScore, ) - return algorithm.HostPriority{ + return schedulerapi.HostPriority{ Host: node.Name, Score: int((cpuScore + memoryScore) / 2), } @@ -114,14 +115,14 @@ func calculateResourceOccupancy(pod *api.Pod, node api.Node, pods []*api.Pod) al // It calculates the percentage of memory and CPU requested by pods scheduled on the node, and prioritizes // based on the minimum of the average of the fraction of requested to capacity. // Details: cpu((capacity - sum(requested)) * 10 / capacity) + memory((capacity - sum(requested)) * 10 / capacity) / 2 -func LeastRequestedPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (algorithm.HostPriorityList, error) { +func LeastRequestedPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { nodes, err := nodeLister.List() if err != nil { - return algorithm.HostPriorityList{}, err + return schedulerapi.HostPriorityList{}, err } podsToMachines, err := predicates.MapPodsToMachines(podLister) - list := algorithm.HostPriorityList{} + list := schedulerapi.HostPriorityList{} for _, node := range nodes.Items { list = append(list, calculateResourceOccupancy(pod, node, podsToMachines[node.Name])) } @@ -144,7 +145,7 @@ func NewNodeLabelPriority(label string, presence bool) algorithm.PriorityFunctio // CalculateNodeLabelPriority checks whether a particular label exists on a node or not, regardless of its value. // If presence is true, prioritizes nodes that have the specified label, regardless of value. // If presence is false, prioritizes nodes that do not have the specified label. 
-func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (algorithm.HostPriorityList, error) { +func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { var score int nodes, err := nodeLister.List() if err != nil { @@ -157,7 +158,7 @@ func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod *api.Pod, podListe labeledNodes[node.Name] = (exists && n.presence) || (!exists && !n.presence) } - result := []algorithm.HostPriority{} + result := []schedulerapi.HostPriority{} //score int - scale of 0-10 // 0 being the lowest priority and 10 being the highest for nodeName, success := range labeledNodes { @@ -166,7 +167,7 @@ func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod *api.Pod, podListe } else { score = 0 } - result = append(result, algorithm.HostPriority{Host: nodeName, Score: score}) + result = append(result, schedulerapi.HostPriority{Host: nodeName, Score: score}) } return result, nil } @@ -177,21 +178,21 @@ func (n *NodeLabelPrioritizer) CalculateNodeLabelPriority(pod *api.Pod, podListe // close the two metrics are to each other. // Detail: score = 10 - abs(cpuFraction-memoryFraction)*10. The algorithm is partly inspired by: // "Wei Huang et al. An Energy Efficient Virtual Machine Placement Algorithm with Balanced Resource Utilization" -func BalancedResourceAllocation(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (algorithm.HostPriorityList, error) { +func BalancedResourceAllocation(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { nodes, err := nodeLister.List() if err != nil { - return algorithm.HostPriorityList{}, err + return schedulerapi.HostPriorityList{}, err } podsToMachines, err := predicates.MapPodsToMachines(podLister) - list := algorithm.HostPriorityList{} + list := schedulerapi.HostPriorityList{} for _, node := range nodes.Items { list = append(list, calculateBalancedResourceAllocation(pod, node, podsToMachines[node.Name])) } return list, nil } -func calculateBalancedResourceAllocation(pod *api.Pod, node api.Node, pods []*api.Pod) algorithm.HostPriority { +func calculateBalancedResourceAllocation(pod *api.Pod, node api.Node, pods []*api.Pod) schedulerapi.HostPriority { totalMilliCPU := int64(0) totalMemory := int64(0) score := int(0) @@ -234,7 +235,7 @@ func calculateBalancedResourceAllocation(pod *api.Pod, node api.Node, pods []*ap score, ) - return algorithm.HostPriority{ + return schedulerapi.HostPriority{ Host: node.Name, Score: score, } diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go index 312b950496d..823e516dcf0 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/plugin/pkg/scheduler" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" ) func makeNode(node string, milliCPU, memory int64) api.Node { @@ -133,7 +134,7 @@ func TestZeroRequest(t *testing.T) { // plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go if you want // to test what's actually in production. 
[]algorithm.PriorityConfig{{Function: LeastRequestedPriority, Weight: 1}, {Function: BalancedResourceAllocation, Weight: 1}, {Function: NewSelectorSpreadPriority(algorithm.FakeServiceLister([]api.Service{}), algorithm.FakeControllerLister([]api.ReplicationController{})), Weight: 1}}, - algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})) + algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}), []algorithm.SchedulerExtender{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -217,7 +218,7 @@ func TestLeastRequested(t *testing.T) { pod *api.Pod pods []*api.Pod nodes []api.Node - expectedList algorithm.HostPriorityList + expectedList schedulerapi.HostPriorityList test string }{ { @@ -234,7 +235,7 @@ func TestLeastRequested(t *testing.T) { */ pod: &api.Pod{Spec: noResources}, nodes: []api.Node{makeNode("machine1", 4000, 10000), makeNode("machine2", 4000, 10000)}, - expectedList: []algorithm.HostPriority{{"machine1", 10}, {"machine2", 10}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 10}, {"machine2", 10}}, test: "nothing scheduled, nothing requested", }, { @@ -251,7 +252,7 @@ func TestLeastRequested(t *testing.T) { */ pod: &api.Pod{Spec: cpuAndMemory}, nodes: []api.Node{makeNode("machine1", 4000, 10000), makeNode("machine2", 6000, 10000)}, - expectedList: []algorithm.HostPriority{{"machine1", 3}, {"machine2", 5}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 3}, {"machine2", 5}}, test: "nothing scheduled, resources requested, differently sized machines", }, { @@ -268,7 +269,7 @@ func TestLeastRequested(t *testing.T) { */ pod: &api.Pod{Spec: noResources}, nodes: []api.Node{makeNode("machine1", 4000, 10000), makeNode("machine2", 4000, 10000)}, - expectedList: []algorithm.HostPriority{{"machine1", 10}, {"machine2", 10}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 10}, {"machine2", 10}}, test: "no resources requested, pods scheduled", pods: []*api.Pod{ {Spec: machine1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}, @@ -291,7 +292,7 @@ func TestLeastRequested(t *testing.T) { */ pod: &api.Pod{Spec: noResources}, nodes: []api.Node{makeNode("machine1", 10000, 20000), makeNode("machine2", 10000, 20000)}, - expectedList: []algorithm.HostPriority{{"machine1", 7}, {"machine2", 5}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 7}, {"machine2", 5}}, test: "no resources requested, pods scheduled with resources", pods: []*api.Pod{ {Spec: cpuOnly, ObjectMeta: api.ObjectMeta{Labels: labels2}}, @@ -314,7 +315,7 @@ func TestLeastRequested(t *testing.T) { */ pod: &api.Pod{Spec: cpuAndMemory}, nodes: []api.Node{makeNode("machine1", 10000, 20000), makeNode("machine2", 10000, 20000)}, - expectedList: []algorithm.HostPriority{{"machine1", 5}, {"machine2", 4}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 5}, {"machine2", 4}}, test: "resources requested, pods scheduled with resources", pods: []*api.Pod{ {Spec: cpuOnly}, @@ -335,7 +336,7 @@ func TestLeastRequested(t *testing.T) { */ pod: &api.Pod{Spec: cpuAndMemory}, nodes: []api.Node{makeNode("machine1", 10000, 20000), makeNode("machine2", 10000, 50000)}, - expectedList: []algorithm.HostPriority{{"machine1", 5}, {"machine2", 6}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 5}, {"machine2", 6}}, test: "resources requested, pods scheduled with resources, differently sized machines", pods: []*api.Pod{ {Spec: cpuOnly}, @@ -356,7 +357,7 @@ func TestLeastRequested(t *testing.T) { */ pod: &api.Pod{Spec: cpuOnly}, nodes: []api.Node{makeNode("machine1", 4000, 10000), 
makeNode("machine2", 4000, 10000)}, - expectedList: []algorithm.HostPriority{{"machine1", 5}, {"machine2", 2}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 5}, {"machine2", 2}}, test: "requested resources exceed node capacity", pods: []*api.Pod{ {Spec: cpuOnly}, @@ -366,7 +367,7 @@ func TestLeastRequested(t *testing.T) { { pod: &api.Pod{Spec: noResources}, nodes: []api.Node{makeNode("machine1", 0, 0), makeNode("machine2", 0, 0)}, - expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 0}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 0}}, test: "zero node resources, pods scheduled with resources", pods: []*api.Pod{ {Spec: cpuOnly}, @@ -394,7 +395,7 @@ func TestNewNodeLabelPriority(t *testing.T) { nodes []api.Node label string presence bool - expectedList algorithm.HostPriorityList + expectedList schedulerapi.HostPriorityList test string }{ { @@ -403,7 +404,7 @@ func TestNewNodeLabelPriority(t *testing.T) { {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, }, - expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 0}, {"machine3", 0}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 0}, {"machine3", 0}}, label: "baz", presence: true, test: "no match found, presence true", @@ -414,7 +415,7 @@ func TestNewNodeLabelPriority(t *testing.T) { {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, }, - expectedList: []algorithm.HostPriority{{"machine1", 10}, {"machine2", 10}, {"machine3", 10}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 10}, {"machine2", 10}, {"machine3", 10}}, label: "baz", presence: false, test: "no match found, presence false", @@ -425,7 +426,7 @@ func TestNewNodeLabelPriority(t *testing.T) { {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, }, - expectedList: []algorithm.HostPriority{{"machine1", 10}, {"machine2", 0}, {"machine3", 0}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 10}, {"machine2", 0}, {"machine3", 0}}, label: "foo", presence: true, test: "one match found, presence true", @@ -436,7 +437,7 @@ func TestNewNodeLabelPriority(t *testing.T) { {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, }, - expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 10}, {"machine3", 10}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 10}, {"machine3", 10}}, label: "foo", presence: false, test: "one match found, presence false", @@ -447,7 +448,7 @@ func TestNewNodeLabelPriority(t *testing.T) { {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, }, - expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 10}, {"machine3", 10}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 10}, {"machine3", 10}}, label: "bar", presence: true, test: "two matches found, presence true", @@ -458,7 +459,7 @@ func TestNewNodeLabelPriority(t *testing.T) { {ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: label2}}, {ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: label3}}, }, - expectedList: []algorithm.HostPriority{{"machine1", 10}, {"machine2", 0}, {"machine3", 0}}, + expectedList: 
[]schedulerapi.HostPriority{{"machine1", 10}, {"machine2", 0}, {"machine3", 0}}, label: "bar", presence: false, test: "two matches found, presence false", @@ -549,7 +550,7 @@ func TestBalancedResourceAllocation(t *testing.T) { pod *api.Pod pods []*api.Pod nodes []api.Node - expectedList algorithm.HostPriorityList + expectedList schedulerapi.HostPriorityList test string }{ { @@ -566,7 +567,7 @@ func TestBalancedResourceAllocation(t *testing.T) { */ pod: &api.Pod{Spec: noResources}, nodes: []api.Node{makeNode("machine1", 4000, 10000), makeNode("machine2", 4000, 10000)}, - expectedList: []algorithm.HostPriority{{"machine1", 10}, {"machine2", 10}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 10}, {"machine2", 10}}, test: "nothing scheduled, nothing requested", }, { @@ -583,7 +584,7 @@ func TestBalancedResourceAllocation(t *testing.T) { */ pod: &api.Pod{Spec: cpuAndMemory}, nodes: []api.Node{makeNode("machine1", 4000, 10000), makeNode("machine2", 6000, 10000)}, - expectedList: []algorithm.HostPriority{{"machine1", 7}, {"machine2", 10}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 7}, {"machine2", 10}}, test: "nothing scheduled, resources requested, differently sized machines", }, { @@ -600,7 +601,7 @@ func TestBalancedResourceAllocation(t *testing.T) { */ pod: &api.Pod{Spec: noResources}, nodes: []api.Node{makeNode("machine1", 4000, 10000), makeNode("machine2", 4000, 10000)}, - expectedList: []algorithm.HostPriority{{"machine1", 10}, {"machine2", 10}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 10}, {"machine2", 10}}, test: "no resources requested, pods scheduled", pods: []*api.Pod{ {Spec: machine1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}, @@ -623,7 +624,7 @@ func TestBalancedResourceAllocation(t *testing.T) { */ pod: &api.Pod{Spec: noResources}, nodes: []api.Node{makeNode("machine1", 10000, 20000), makeNode("machine2", 10000, 20000)}, - expectedList: []algorithm.HostPriority{{"machine1", 4}, {"machine2", 6}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 4}, {"machine2", 6}}, test: "no resources requested, pods scheduled with resources", pods: []*api.Pod{ {Spec: cpuOnly, ObjectMeta: api.ObjectMeta{Labels: labels2}}, @@ -646,7 +647,7 @@ func TestBalancedResourceAllocation(t *testing.T) { */ pod: &api.Pod{Spec: cpuAndMemory}, nodes: []api.Node{makeNode("machine1", 10000, 20000), makeNode("machine2", 10000, 20000)}, - expectedList: []algorithm.HostPriority{{"machine1", 6}, {"machine2", 9}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 6}, {"machine2", 9}}, test: "resources requested, pods scheduled with resources", pods: []*api.Pod{ {Spec: cpuOnly}, @@ -667,7 +668,7 @@ func TestBalancedResourceAllocation(t *testing.T) { */ pod: &api.Pod{Spec: cpuAndMemory}, nodes: []api.Node{makeNode("machine1", 10000, 20000), makeNode("machine2", 10000, 50000)}, - expectedList: []algorithm.HostPriority{{"machine1", 6}, {"machine2", 6}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 6}, {"machine2", 6}}, test: "resources requested, pods scheduled with resources, differently sized machines", pods: []*api.Pod{ {Spec: cpuOnly}, @@ -688,7 +689,7 @@ func TestBalancedResourceAllocation(t *testing.T) { */ pod: &api.Pod{Spec: cpuOnly}, nodes: []api.Node{makeNode("machine1", 4000, 10000), makeNode("machine2", 4000, 10000)}, - expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 0}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 0}}, test: "requested resources exceed node capacity", pods: 
[]*api.Pod{ {Spec: cpuOnly}, @@ -698,7 +699,7 @@ func TestBalancedResourceAllocation(t *testing.T) { { pod: &api.Pod{Spec: noResources}, nodes: []api.Node{makeNode("machine1", 0, 0), makeNode("machine2", 0, 0)}, - expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 0}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 0}}, test: "zero node resources, pods scheduled with resources", pods: []*api.Pod{ {Spec: cpuOnly}, diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go index c5a2a5aa532..fcc66eb0f48 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go @@ -21,6 +21,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" ) type SelectorSpread struct { @@ -39,7 +40,7 @@ func NewSelectorSpreadPriority(serviceLister algorithm.ServiceLister, controller // CalculateSpreadPriority spreads pods by minimizing the number of pods belonging to the same service or replication controller. It counts number of pods that run under // Services or RCs as the pod being scheduled and tries to minimize the number of conflicts. I.e. pushes scheduler towards a Node where there's a smallest number of // pods which match the same selectors of Services and RCs as current pod. -func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (algorithm.HostPriorityList, error) { +func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { var maxCount int var nsPods []*api.Pod @@ -95,7 +96,7 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, podLister algorit } } - result := []algorithm.HostPriority{} + result := []schedulerapi.HostPriority{} //score int - scale of 0-10 // 0 being the lowest priority and 10 being the highest for _, node := range nodes.Items { @@ -104,7 +105,7 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, podLister algorit if maxCount > 0 { fScore = 10 * (float32(maxCount-counts[node.Name]) / float32(maxCount)) } - result = append(result, algorithm.HostPriority{Host: node.Name, Score: int(fScore)}) + result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)}) glog.V(10).Infof( "%v -> %v: SelectorSpreadPriority, Score: (%d)", pod.Name, node.Name, int(fScore), ) @@ -128,7 +129,7 @@ func NewServiceAntiAffinityPriority(serviceLister algorithm.ServiceLister, label // CalculateAntiAffinityPriority spreads pods by minimizing the number of pods belonging to the same service // on machines with the same value for a particular label. // The label to be considered is provided to the struct (ServiceAntiAffinity). 
-func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (algorithm.HostPriorityList, error) { +func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { var nsServicePods []*api.Pod services, err := s.serviceLister.GetPodServices(pod) @@ -175,7 +176,7 @@ func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, podLis } numServicePods := len(nsServicePods) - result := []algorithm.HostPriority{} + result := []schedulerapi.HostPriority{} //score int - scale of 0-10 // 0 being the lowest priority and 10 being the highest for node := range labeledNodes { @@ -184,11 +185,11 @@ func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, podLis if numServicePods > 0 { fScore = 10 * (float32(numServicePods-podCounts[labeledNodes[node]]) / float32(numServicePods)) } - result = append(result, algorithm.HostPriority{Host: node, Score: int(fScore)}) + result = append(result, schedulerapi.HostPriority{Host: node, Score: int(fScore)}) } // add the open nodes with a score of 0 for _, node := range otherNodes { - result = append(result, algorithm.HostPriority{Host: node, Score: 0}) + result = append(result, schedulerapi.HostPriority{Host: node, Score: 0}) } return result, nil diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go index 0d45d4a178b..93084308554 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" ) func TestSelectorSpreadPriority(t *testing.T) { @@ -46,20 +47,20 @@ func TestSelectorSpreadPriority(t *testing.T) { nodes []string rcs []api.ReplicationController services []api.Service - expectedList algorithm.HostPriorityList + expectedList schedulerapi.HostPriorityList test string }{ { pod: new(api.Pod), nodes: []string{"machine1", "machine2"}, - expectedList: []algorithm.HostPriority{{"machine1", 10}, {"machine2", 10}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 10}, {"machine2", 10}}, test: "nothing scheduled", }, { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []*api.Pod{{Spec: zone1Spec}}, nodes: []string{"machine1", "machine2"}, - expectedList: []algorithm.HostPriority{{"machine1", 10}, {"machine2", 10}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 10}, {"machine2", 10}}, test: "no services", }, { @@ -67,7 +68,7 @@ func TestSelectorSpreadPriority(t *testing.T) { pods: []*api.Pod{{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}}, nodes: []string{"machine1", "machine2"}, services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}}, - expectedList: []algorithm.HostPriority{{"machine1", 10}, {"machine2", 10}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 10}, {"machine2", 10}}, test: "different services", }, { @@ -78,7 +79,7 @@ func TestSelectorSpreadPriority(t *testing.T) { }, nodes: []string{"machine1", "machine2"}, services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, - expectedList: []algorithm.HostPriority{{"machine1", 10}, {"machine2", 0}}, + expectedList: 
[]schedulerapi.HostPriority{{"machine1", 10}, {"machine2", 0}}, test: "two pods, one service pod", }, { @@ -92,7 +93,7 @@ func TestSelectorSpreadPriority(t *testing.T) { }, nodes: []string{"machine1", "machine2"}, services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, - expectedList: []algorithm.HostPriority{{"machine1", 10}, {"machine2", 0}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 10}, {"machine2", 0}}, test: "five pods, one service pod in no namespace", }, { @@ -105,7 +106,7 @@ func TestSelectorSpreadPriority(t *testing.T) { }, nodes: []string{"machine1", "machine2"}, services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}}, - expectedList: []algorithm.HostPriority{{"machine1", 10}, {"machine2", 0}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 10}, {"machine2", 0}}, test: "four pods, one service pod in default namespace", }, { @@ -119,7 +120,7 @@ func TestSelectorSpreadPriority(t *testing.T) { }, nodes: []string{"machine1", "machine2"}, services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}}, - expectedList: []algorithm.HostPriority{{"machine1", 10}, {"machine2", 0}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 10}, {"machine2", 0}}, test: "five pods, one service pod in specific namespace", }, { @@ -131,7 +132,7 @@ func TestSelectorSpreadPriority(t *testing.T) { }, nodes: []string{"machine1", "machine2"}, services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, - expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 0}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 0}}, test: "three pods, two service pods on different machines", }, { @@ -144,7 +145,7 @@ func TestSelectorSpreadPriority(t *testing.T) { }, nodes: []string{"machine1", "machine2"}, services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, - expectedList: []algorithm.HostPriority{{"machine1", 5}, {"machine2", 0}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 5}, {"machine2", 0}}, test: "four pods, three service pods", }, { @@ -156,7 +157,7 @@ func TestSelectorSpreadPriority(t *testing.T) { }, nodes: []string{"machine1", "machine2"}, services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, - expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 5}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}}, test: "service with partial pod label matches", }, { @@ -171,7 +172,7 @@ func TestSelectorSpreadPriority(t *testing.T) { rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}, // "baz=blah" matches both labels1 and labels2, and "foo=bar" matches only labels 1. This means that we assume that we want to // do spreading between all pods. The result should be exactly as above. 
- expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 5}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}}, test: "service with partial pod label matches with service and replication controller", }, { @@ -185,7 +186,7 @@ func TestSelectorSpreadPriority(t *testing.T) { services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}}, rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}, // Taken together Service and Replication Controller should match all Pods, hence result should be equal to one above. - expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 5}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}}, test: "disjoined service and replication controller should be treated equally", }, { @@ -198,7 +199,7 @@ func TestSelectorSpreadPriority(t *testing.T) { nodes: []string{"machine1", "machine2"}, rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}, // Both Nodes have one pod from the given RC, hence both get 0 score. - expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 0}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 0}}, test: "Replication controller with partial pod label matches", }, { @@ -210,7 +211,7 @@ func TestSelectorSpreadPriority(t *testing.T) { }, nodes: []string{"machine1", "machine2"}, rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"baz": "blah"}}}}, - expectedList: []algorithm.HostPriority{{"machine1", 0}, {"machine2", 5}}, + expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}}, test: "Replication controller with partial pod label matches", }, } @@ -264,13 +265,13 @@ func TestZoneSpreadPriority(t *testing.T) { pods []*api.Pod nodes map[string]map[string]string services []api.Service - expectedList algorithm.HostPriorityList + expectedList schedulerapi.HostPriorityList test string }{ { pod: new(api.Pod), nodes: labeledNodes, - expectedList: []algorithm.HostPriority{{"machine11", 10}, {"machine12", 10}, + expectedList: []schedulerapi.HostPriority{{"machine11", 10}, {"machine12", 10}, {"machine21", 10}, {"machine22", 10}, {"machine01", 0}, {"machine02", 0}}, test: "nothing scheduled", @@ -279,7 +280,7 @@ func TestZoneSpreadPriority(t *testing.T) { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []*api.Pod{{Spec: zone1Spec}}, nodes: labeledNodes, - expectedList: []algorithm.HostPriority{{"machine11", 10}, {"machine12", 10}, + expectedList: []schedulerapi.HostPriority{{"machine11", 10}, {"machine12", 10}, {"machine21", 10}, {"machine22", 10}, {"machine01", 0}, {"machine02", 0}}, test: "no services", @@ -289,7 +290,7 @@ func TestZoneSpreadPriority(t *testing.T) { pods: []*api.Pod{{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}}, nodes: labeledNodes, services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}}, - expectedList: []algorithm.HostPriority{{"machine11", 10}, {"machine12", 10}, + expectedList: []schedulerapi.HostPriority{{"machine11", 10}, {"machine12", 10}, {"machine21", 10}, {"machine22", 10}, {"machine01", 0}, {"machine02", 0}}, test: "different services", @@ -303,7 +304,7 @@ func TestZoneSpreadPriority(t *testing.T) { }, nodes: labeledNodes, services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, - expectedList: 
[]algorithm.HostPriority{{"machine11", 10}, {"machine12", 10}, + expectedList: []schedulerapi.HostPriority{{"machine11", 10}, {"machine12", 10}, {"machine21", 0}, {"machine22", 0}, {"machine01", 0}, {"machine02", 0}}, test: "three pods, one service pod", @@ -317,7 +318,7 @@ func TestZoneSpreadPriority(t *testing.T) { }, nodes: labeledNodes, services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, - expectedList: []algorithm.HostPriority{{"machine11", 5}, {"machine12", 5}, + expectedList: []schedulerapi.HostPriority{{"machine11", 5}, {"machine12", 5}, {"machine21", 5}, {"machine22", 5}, {"machine01", 0}, {"machine02", 0}}, test: "three pods, two service pods on different machines", @@ -332,7 +333,7 @@ func TestZoneSpreadPriority(t *testing.T) { }, nodes: labeledNodes, services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}}, - expectedList: []algorithm.HostPriority{{"machine11", 0}, {"machine12", 0}, + expectedList: []schedulerapi.HostPriority{{"machine11", 0}, {"machine12", 0}, {"machine21", 10}, {"machine22", 10}, {"machine01", 0}, {"machine02", 0}}, test: "three service label match pods in different namespaces", @@ -347,7 +348,7 @@ func TestZoneSpreadPriority(t *testing.T) { }, nodes: labeledNodes, services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, - expectedList: []algorithm.HostPriority{{"machine11", 6}, {"machine12", 6}, + expectedList: []schedulerapi.HostPriority{{"machine11", 6}, {"machine12", 6}, {"machine21", 3}, {"machine22", 3}, {"machine01", 0}, {"machine02", 0}}, test: "four pods, three service pods", @@ -361,7 +362,7 @@ func TestZoneSpreadPriority(t *testing.T) { }, nodes: labeledNodes, services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, - expectedList: []algorithm.HostPriority{{"machine11", 3}, {"machine12", 3}, + expectedList: []schedulerapi.HostPriority{{"machine11", 3}, {"machine12", 3}, {"machine21", 6}, {"machine22", 6}, {"machine01", 0}, {"machine02", 0}}, test: "service with partial pod label matches", @@ -376,7 +377,7 @@ func TestZoneSpreadPriority(t *testing.T) { }, nodes: labeledNodes, services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}}, - expectedList: []algorithm.HostPriority{{"machine11", 7}, {"machine12", 7}, + expectedList: []schedulerapi.HostPriority{{"machine11", 7}, {"machine12", 7}, {"machine21", 5}, {"machine22", 5}, {"machine01", 0}, {"machine02", 0}}, test: "service pod on non-zoned node", diff --git a/plugin/pkg/scheduler/algorithm/scheduler_interface.go b/plugin/pkg/scheduler/algorithm/scheduler_interface.go index 4b50600c278..b47f50e23f7 100644 --- a/plugin/pkg/scheduler/algorithm/scheduler_interface.go +++ b/plugin/pkg/scheduler/algorithm/scheduler_interface.go @@ -18,9 +18,24 @@ package algorithm import ( "k8s.io/kubernetes/pkg/api" + schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" ) -// Scheduler is an interface implemented by things that know how to schedule pods +// SchedulerExtender is an interface for external processes to influence scheduling +// decisions made by Kubernetes. This is typically needed for resources not directly +// managed by Kubernetes. +type SchedulerExtender interface { + // Filter based on extender-implemented predicate functions. The filtered list is + // expected to be a subset of the supplied list. + Filter(pod *api.Pod, nodes *api.NodeList) (filteredNodes *api.NodeList, err error) + + // Prioritize based on extender-implemented priority functions. 
The returned scores & weight + // are used to compute the weighted score for an extender. The weighted scores are added to + // the scores computed by Kubernetes scheduler. The total scores are used to do the host selection. + Prioritize(pod *api.Pod, nodes *api.NodeList) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error) +} + +// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods // onto machines. type ScheduleAlgorithm interface { Schedule(*api.Pod, NodeLister) (selectedMachine string, err error) diff --git a/plugin/pkg/scheduler/algorithm/types.go b/plugin/pkg/scheduler/algorithm/types.go index 1f340e25504..bca143c79d7 100644 --- a/plugin/pkg/scheduler/algorithm/types.go +++ b/plugin/pkg/scheduler/algorithm/types.go @@ -18,35 +18,13 @@ package algorithm import ( "k8s.io/kubernetes/pkg/api" + schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" ) // FitPredicate is a function that indicates if a pod fits into an existing node. type FitPredicate func(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) -// HostPriority represents the priority of scheduling to a particular host, lower priority is better. -type HostPriority struct { - Host string - Score int -} - -type HostPriorityList []HostPriority - -func (h HostPriorityList) Len() int { - return len(h) -} - -func (h HostPriorityList) Less(i, j int) bool { - if h[i].Score == h[j].Score { - return h[i].Host < h[j].Host - } - return h[i].Score < h[j].Score -} - -func (h HostPriorityList) Swap(i, j int) { - h[i], h[j] = h[j], h[i] -} - -type PriorityFunction func(pod *api.Pod, podLister PodLister, nodeLister NodeLister) (HostPriorityList, error) +type PriorityFunction func(pod *api.Pod, podLister PodLister, nodeLister NodeLister) (schedulerapi.HostPriorityList, error) type PriorityConfig struct { Function PriorityFunction diff --git a/plugin/pkg/scheduler/api/types.go b/plugin/pkg/scheduler/api/types.go index 93bb557a7cf..36ca11029ae 100644 --- a/plugin/pkg/scheduler/api/types.go +++ b/plugin/pkg/scheduler/api/types.go @@ -16,7 +16,13 @@ limitations under the License. package api -import "k8s.io/kubernetes/pkg/api/unversioned" +import ( + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + client "k8s.io/kubernetes/pkg/client/unversioned" +) type Policy struct { unversioned.TypeMeta `json:",inline"` @@ -24,6 +30,8 @@ type Policy struct { Predicates []PredicatePolicy `json:"predicates"` // Holds the information to configure the priority functions Priorities []PriorityPolicy `json:"priorities"` + // Holds the information to communicate with the extender(s) + ExtenderConfigs []ExtenderConfig `json:"extenders"` } type PredicatePolicy struct { @@ -41,7 +49,7 @@ type PriorityPolicy struct { // For the Kubernetes provided priority functions, the name is the identifier of the pre-defined priority function Name string `json:"name"` // The numeric multiplier for the node scores that the priority function generates - // The weight should be non-zero and can be a positive or a negative integer + // The weight should be a positive integer Weight int `json:"weight"` // Holds the parameters to configure the given priority function Argument *PriorityArgument `json:"argument"` @@ -100,3 +108,66 @@ type LabelPreference struct { // If false, higher priority is given to nodes that do not have the label Presence bool `json:"presence"` } + +// Holds the parameters used to communicate with the extender. 
If a verb is unspecified/empty, +// it is assumed that the extender chose not to provide that extension. +type ExtenderConfig struct { + // URLPrefix at which the extender is available + URLPrefix string `json:"urlPrefix"` + // Verb for the filter call, empty if not supported. This verb is appended to the URLPrefix when issuing the filter call to extender. + FilterVerb string `json:"filterVerb,omitempty"` + // Verb for the prioritize call, empty if not supported. This verb is appended to the URLPrefix when issuing the prioritize call to extender. + PrioritizeVerb string `json:"prioritizeVerb,omitempty"` + // The numeric multiplier for the node scores that the prioritize call generates. + // The weight should be a positive integer + Weight int `json:"weight,omitempty"` + // EnableHttps specifies whether https should be used to communicate with the extender + EnableHttps bool `json:"enableHttps,omitempty"` + // TLSConfig specifies the transport layer security config + TLSConfig *client.TLSClientConfig `json:"tlsConfig,omitempty"` + // HTTPTimeout specifies the timeout duration for a call to the extender. Filter timeout fails the scheduling of the pod. Prioritize + // timeout is ignored, k8s/other extenders priorities are used to select the node. + HTTPTimeout time.Duration `json:"httpTimeout,omitempty"` +} + +// ExtenderArgs represents the arguments needed by the extender to filter/prioritize +// nodes for a pod. +type ExtenderArgs struct { + // Pod being scheduled + Pod api.Pod `json:"pod"` + // List of candidate nodes where the pod can be scheduled + Nodes api.NodeList `json:"nodes"` +} + +// ExtenderFilterResult represents the results of a filter call to an extender +type ExtenderFilterResult struct { + // Filtered set of nodes where the pod can be scheduled + Nodes api.NodeList `json:"nodes,omitempty"` + // Error message indicating failure + Error string `json:"error,omitempty"` +} + +// HostPriority represents the priority of scheduling to a particular host, higher priority is better. +type HostPriority struct { + // Name of the host + Host string `json:"host"` + // Score associated with the host + Score int `json:"score"` +} + +type HostPriorityList []HostPriority + +func (h HostPriorityList) Len() int { + return len(h) +} + +func (h HostPriorityList) Less(i, j int) bool { + if h[i].Score == h[j].Score { + return h[i].Host < h[j].Host + } + return h[i].Score < h[j].Score +} + +func (h HostPriorityList) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} diff --git a/plugin/pkg/scheduler/api/v1/types.go b/plugin/pkg/scheduler/api/v1/types.go index b7c0c395f80..ec67a90c8ee 100644 --- a/plugin/pkg/scheduler/api/v1/types.go +++ b/plugin/pkg/scheduler/api/v1/types.go @@ -16,7 +16,13 @@ limitations under the License. 
package v1 -import "k8s.io/kubernetes/pkg/api/unversioned" +import ( + "time" + + "k8s.io/kubernetes/pkg/api/unversioned" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + client "k8s.io/kubernetes/pkg/client/unversioned" +) type Policy struct { unversioned.TypeMeta `json:",inline"` @@ -24,6 +30,8 @@ type Policy struct { Predicates []PredicatePolicy `json:"predicates"` // Holds the information to configure the priority functions Priorities []PriorityPolicy `json:"priorities"` + // Holds the information to communicate with the extender(s) + ExtenderConfigs []ExtenderConfig `json:"extenders"` } type PredicatePolicy struct { @@ -100,3 +108,66 @@ type LabelPreference struct { // If false, higher priority is given to nodes that do not have the label Presence bool `json:"presence"` } + +// Holds the parameters used to communicate with the extender. If a verb is unspecified/empty, +// it is assumed that the extender chose not to provide that extension. +type ExtenderConfig struct { + // URLPrefix at which the extender is available + URLPrefix string `json:"urlPrefix"` + // Verb for the filter call, empty if not supported. This verb is appended to the URLPrefix when issuing the filter call to extender. + FilterVerb string `json:"filterVerb,omitempty"` + // Verb for the prioritize call, empty if not supported. This verb is appended to the URLPrefix when issuing the prioritize call to extender. + PrioritizeVerb string `json:"prioritizeVerb,omitempty"` + // The numeric multiplier for the node scores that the prioritize call generates. + // The weight should be a positive integer + Weight int `json:"weight,omitempty"` + // EnableHttps specifies whether https should be used to communicate with the extender + EnableHttps bool `json:"enableHttps,omitempty"` + // TLSConfig specifies the transport layer security config + TLSConfig *client.TLSClientConfig `json:"tlsConfig,omitempty"` + // HTTPTimeout specifies the timeout duration for a call to the extender. Filter timeout fails the scheduling of the pod. Prioritize + // timeout is ignored, k8s/other extenders priorities are used to select the node. + HTTPTimeout time.Duration `json:"httpTimeout,omitempty"` +} + +// ExtenderArgs represents the arguments needed by the extender to filter/prioritize +// nodes for a pod. +type ExtenderArgs struct { + // Pod being scheduled + Pod apiv1.Pod `json:"pod"` + // List of candidate nodes where the pod can be scheduled + Nodes apiv1.NodeList `json:"nodes"` +} + +// ExtenderFilterResult represents the results of a filter call to an extender +type ExtenderFilterResult struct { + // Filtered set of nodes where the pod can be scheduled + Nodes apiv1.NodeList `json:"nodes,omitempty"` + // Error message indicating failure + Error string `json:"error,omitempty"` +} + +// HostPriority represents the priority of scheduling to a particular host, higher priority is better. 
+type HostPriority struct { + // Name of the host + Host string `json:"host"` + // Score associated with the host + Score int `json:"score"` +} + +type HostPriorityList []HostPriority + +func (h HostPriorityList) Len() int { + return len(h) +} + +func (h HostPriorityList) Less(i, j int) bool { + if h[i].Score == h[j].Score { + return h[i].Host < h[j].Host + } + return h[i].Score < h[j].Score +} + +func (h HostPriorityList) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} diff --git a/plugin/pkg/scheduler/api/validation/validation.go b/plugin/pkg/scheduler/api/validation/validation.go index 26c46714f82..0ca60c6128e 100644 --- a/plugin/pkg/scheduler/api/validation/validation.go +++ b/plugin/pkg/scheduler/api/validation/validation.go @@ -34,5 +34,10 @@ func ValidatePolicy(policy schedulerapi.Policy) error { } } + for _, extender := range policy.ExtenderConfigs { + if extender.Weight < 0 { + validationErrors = append(validationErrors, fmt.Errorf("Priority for extender %s should have a non negative weight applied to it", extender.URLPrefix)) + } + } return utilerrors.NewAggregate(validationErrors) } diff --git a/plugin/pkg/scheduler/extender.go b/plugin/pkg/scheduler/extender.go new file mode 100644 index 00000000000..fafa5b18299 --- /dev/null +++ b/plugin/pkg/scheduler/extender.go @@ -0,0 +1,172 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "time" + + "k8s.io/kubernetes/pkg/api" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" +) + +const ( + DefaultExtenderTimeout = 5 * time.Second +) + +// HTTPExtender implements the algorithm.SchedulerExtender interface. 
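+// Filter and prioritize requests are POSTed as JSON-encoded ExtenderArgs to
+// <extenderURL>/<apiVersion>/<verb>, and the JSON response is decoded into the result type.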
+type HTTPExtender struct { + extenderURL string + filterVerb string + prioritizeVerb string + weight int + apiVersion string + client *http.Client +} + +func makeTransport(config *schedulerapi.ExtenderConfig) (http.RoundTripper, error) { + var cfg client.Config + if config.TLSConfig != nil { + cfg.TLSClientConfig = *config.TLSConfig + } + if config.EnableHttps { + hasCA := len(cfg.CAFile) > 0 || len(cfg.CAData) > 0 + if !hasCA { + cfg.Insecure = true + } + } + tlsConfig, err := client.TLSConfigFor(&cfg) + if err != nil { + return nil, err + } + if tlsConfig != nil { + return &http.Transport{ + TLSClientConfig: tlsConfig, + }, nil + } + return http.DefaultTransport, nil +} + +func NewHTTPExtender(config *schedulerapi.ExtenderConfig, apiVersion string) (algorithm.SchedulerExtender, error) { + if config.HTTPTimeout.Nanoseconds() == 0 { + config.HTTPTimeout = time.Duration(DefaultExtenderTimeout) + } + + transport, err := makeTransport(config) + if err != nil { + return nil, err + } + client := &http.Client{ + Transport: transport, + Timeout: config.HTTPTimeout, + } + return &HTTPExtender{ + extenderURL: config.URLPrefix, + apiVersion: apiVersion, + filterVerb: config.FilterVerb, + prioritizeVerb: config.PrioritizeVerb, + weight: config.Weight, + client: client, + }, nil +} + +// Filter based on extender implemented predicate functions. The filtered list is +// expected to be a subset of the supplied list. +func (h *HTTPExtender) Filter(pod *api.Pod, nodes *api.NodeList) (*api.NodeList, error) { + var result schedulerapi.ExtenderFilterResult + + if h.filterVerb == "" { + return nodes, nil + } + + args := schedulerapi.ExtenderArgs{ + Pod: *pod, + Nodes: *nodes, + } + + if err := h.send(h.filterVerb, &args, &result); err != nil { + return nil, err + } + if result.Error != "" { + return nil, fmt.Errorf(result.Error) + } + return &result.Nodes, nil +} + +// Prioritize based on extender implemented priority functions. Weight*priority is added +// up for each such priority function. The returned score is added to the score computed +// by Kubernetes scheduler. The total score is used to do the host selection. 
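+// When no prioritize verb is configured, every candidate node is returned with a score of 0
+// and the reported weight is 0, so this extender does not influence host selection.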
+func (h *HTTPExtender) Prioritize(pod *api.Pod, nodes *api.NodeList) (*schedulerapi.HostPriorityList, int, error) { + var result schedulerapi.HostPriorityList + + if h.prioritizeVerb == "" { + result := schedulerapi.HostPriorityList{} + for _, node := range nodes.Items { + result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: 0}) + } + return &result, 0, nil + } + + args := schedulerapi.ExtenderArgs{ + Pod: *pod, + Nodes: *nodes, + } + + if err := h.send(h.prioritizeVerb, &args, &result); err != nil { + return nil, 0, err + } + return &result, h.weight, nil +} + +// Helper function to send messages to the extender +func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error { + out, err := json.Marshal(args) + if err != nil { + return err + } + + url := h.extenderURL + "/" + h.apiVersion + "/" + action + + req, err := http.NewRequest("POST", url, bytes.NewReader(out)) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/json") + + resp, err := h.client.Do(req) + if err != nil { + return err + } + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + if err := json.Unmarshal(body, result); err != nil { + return err + } + return nil +} diff --git a/plugin/pkg/scheduler/extender_test.go b/plugin/pkg/scheduler/extender_test.go new file mode 100644 index 00000000000..dd182395a84 --- /dev/null +++ b/plugin/pkg/scheduler/extender_test.go @@ -0,0 +1,302 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scheduler + +import ( + "fmt" + "math/rand" + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" +) + +type fitPredicate func(pod *api.Pod, node *api.Node) (bool, error) +type priorityFunc func(pod *api.Pod, nodes *api.NodeList) (*schedulerapi.HostPriorityList, error) + +type priorityConfig struct { + function priorityFunc + weight int +} + +func errorPredicateExtender(pod *api.Pod, node *api.Node) (bool, error) { + return false, fmt.Errorf("Some error") +} + +func falsePredicateExtender(pod *api.Pod, node *api.Node) (bool, error) { + return false, nil +} + +func truePredicateExtender(pod *api.Pod, node *api.Node) (bool, error) { + return true, nil +} + +func machine1PredicateExtender(pod *api.Pod, node *api.Node) (bool, error) { + if node.Name == "machine1" { + return true, nil + } + return false, nil +} + +func machine2PredicateExtender(pod *api.Pod, node *api.Node) (bool, error) { + if node.Name == "machine2" { + return true, nil + } + return false, nil +} + +func errorPrioritizerExtender(pod *api.Pod, nodes *api.NodeList) (*schedulerapi.HostPriorityList, error) { + return &schedulerapi.HostPriorityList{}, fmt.Errorf("Some error") +} + +func machine1PrioritizerExtender(pod *api.Pod, nodes *api.NodeList) (*schedulerapi.HostPriorityList, error) { + result := schedulerapi.HostPriorityList{} + for _, node := range nodes.Items { + score := 1 + if node.Name == "machine1" { + score = 10 + } + result = append(result, schedulerapi.HostPriority{node.Name, score}) + } + return &result, nil +} + +func machine2PrioritizerExtender(pod *api.Pod, nodes *api.NodeList) (*schedulerapi.HostPriorityList, error) { + result := schedulerapi.HostPriorityList{} + for _, node := range nodes.Items { + score := 1 + if node.Name == "machine2" { + score = 10 + } + result = append(result, schedulerapi.HostPriority{node.Name, score}) + } + return &result, nil +} + +func machine2Prioritizer(_ *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { + nodes, err := nodeLister.List() + if err != nil { + return []schedulerapi.HostPriority{}, err + } + + result := []schedulerapi.HostPriority{} + for _, node := range nodes.Items { + score := 1 + if node.Name == "machine2" { + score = 10 + } + result = append(result, schedulerapi.HostPriority{node.Name, score}) + } + return result, nil +} + +type FakeExtender struct { + predicates []fitPredicate + prioritizers []priorityConfig + weight int +} + +func (f *FakeExtender) Filter(pod *api.Pod, nodes *api.NodeList) (*api.NodeList, error) { + filtered := []api.Node{} + for _, node := range nodes.Items { + fits := true + for _, predicate := range f.predicates { + fit, err := predicate(pod, &node) + if err != nil { + return &api.NodeList{}, err + } + if !fit { + fits = false + break + } + } + if fits { + filtered = append(filtered, node) + } + } + return &api.NodeList{Items: filtered}, nil +} + +func (f *FakeExtender) Prioritize(pod *api.Pod, nodes *api.NodeList) (*schedulerapi.HostPriorityList, int, error) { + result := schedulerapi.HostPriorityList{} + combinedScores := map[string]int{} + for _, prioritizer := range f.prioritizers { + weight := prioritizer.weight + if weight == 0 { + continue + } + priorityFunc := prioritizer.function + prioritizedList, err := priorityFunc(pod, nodes) + if err != nil { + return &schedulerapi.HostPriorityList{}, 0, err + } + for _, hostEntry := range *prioritizedList { + 
combinedScores[hostEntry.Host] += hostEntry.Score * weight + } + } + for host, score := range combinedScores { + result = append(result, schedulerapi.HostPriority{Host: host, Score: score}) + } + return &result, f.weight, nil +} + +func TestGenericSchedulerWithExtenders(t *testing.T) { + tests := []struct { + name string + predicates map[string]algorithm.FitPredicate + prioritizers []algorithm.PriorityConfig + extenders []FakeExtender + extenderPredicates []fitPredicate + extenderPrioritizers []priorityConfig + nodes []string + pod *api.Pod + pods []*api.Pod + expectedHost string + expectsErr bool + }{ + { + predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, + prioritizers: []algorithm.PriorityConfig{{EqualPriority, 1}}, + extenders: []FakeExtender{ + { + predicates: []fitPredicate{truePredicateExtender}, + }, + { + predicates: []fitPredicate{errorPredicateExtender}, + }, + }, + nodes: []string{"machine1", "machine2"}, + expectsErr: true, + name: "test 1", + }, + { + predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, + prioritizers: []algorithm.PriorityConfig{{EqualPriority, 1}}, + extenders: []FakeExtender{ + { + predicates: []fitPredicate{truePredicateExtender}, + }, + { + predicates: []fitPredicate{falsePredicateExtender}, + }, + }, + nodes: []string{"machine1", "machine2"}, + expectsErr: true, + name: "test 2", + }, + { + predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, + prioritizers: []algorithm.PriorityConfig{{EqualPriority, 1}}, + extenders: []FakeExtender{ + { + predicates: []fitPredicate{truePredicateExtender}, + }, + { + predicates: []fitPredicate{machine1PredicateExtender}, + }, + }, + nodes: []string{"machine1", "machine2"}, + expectedHost: "machine1", + name: "test 3", + }, + { + predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, + prioritizers: []algorithm.PriorityConfig{{EqualPriority, 1}}, + extenders: []FakeExtender{ + { + predicates: []fitPredicate{machine2PredicateExtender}, + }, + { + predicates: []fitPredicate{machine1PredicateExtender}, + }, + }, + nodes: []string{"machine1", "machine2"}, + expectsErr: true, + name: "test 4", + }, + { + predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, + prioritizers: []algorithm.PriorityConfig{{EqualPriority, 1}}, + extenders: []FakeExtender{ + { + predicates: []fitPredicate{truePredicateExtender}, + prioritizers: []priorityConfig{{errorPrioritizerExtender, 10}}, + weight: 1, + }, + }, + nodes: []string{"machine1"}, + expectedHost: "machine1", + name: "test 5", + }, + { + predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, + prioritizers: []algorithm.PriorityConfig{{EqualPriority, 1}}, + extenders: []FakeExtender{ + { + predicates: []fitPredicate{truePredicateExtender}, + prioritizers: []priorityConfig{{machine1PrioritizerExtender, 10}}, + weight: 1, + }, + { + predicates: []fitPredicate{truePredicateExtender}, + prioritizers: []priorityConfig{{machine2PrioritizerExtender, 10}}, + weight: 5, + }, + }, + nodes: []string{"machine1", "machine2"}, + expectedHost: "machine2", + name: "test 6", + }, + { + predicates: map[string]algorithm.FitPredicate{"true": truePredicate}, + prioritizers: []algorithm.PriorityConfig{{machine2Prioritizer, 20}}, + extenders: []FakeExtender{ + { + predicates: []fitPredicate{truePredicateExtender}, + prioritizers: []priorityConfig{{machine1PrioritizerExtender, 10}}, + weight: 1, + }, + }, + nodes: []string{"machine1", "machine2"}, + expectedHost: "machine2", // machine2 has higher score + name: 
"test 7", + }, + } + + for _, test := range tests { + random := rand.New(rand.NewSource(0)) + extenders := []algorithm.SchedulerExtender{} + for ii := range test.extenders { + extenders = append(extenders, &test.extenders[ii]) + } + scheduler := NewGenericScheduler(test.predicates, test.prioritizers, extenders, algorithm.FakePodLister(test.pods), random) + machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes))) + if test.expectsErr { + if err == nil { + t.Errorf("Unexpected non-error for %s, machine %s", test.name, machine) + } + } else { + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if test.expectedHost != machine { + t.Errorf("Failed : %s, Expected: %s, Saw: %s", test.name, test.expectedHost, machine) + } + } + } +} diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 7de2e57d8a9..d3452726848 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -129,7 +129,7 @@ func (f *ConfigFactory) CreateFromProvider(providerName string) (*scheduler.Conf return nil, err } - return f.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys) + return f.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{}) } // Creates a scheduler from the configuration file @@ -153,11 +153,22 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler priorityKeys.Insert(RegisterCustomPriorityFunction(priority)) } - return f.CreateFromKeys(predicateKeys, priorityKeys) + extenders := make([]algorithm.SchedulerExtender, 0) + if len(policy.ExtenderConfigs) != 0 { + for ii := range policy.ExtenderConfigs { + glog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii]) + if extender, err := scheduler.NewHTTPExtender(&policy.ExtenderConfigs[ii], policy.APIVersion); err != nil { + return nil, err + } else { + extenders = append(extenders, extender) + } + } + } + return f.CreateFromKeys(predicateKeys, priorityKeys, extenders) } // Creates a scheduler from a set of registered fit predicate keys and priority keys. 
-func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String) (*scheduler.Config, error) { +func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) { glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys) pluginArgs := PluginFactoryArgs{ PodLister: f.PodLister, @@ -199,7 +210,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String) r := rand.New(rand.NewSource(time.Now().UnixNano())) - algo := scheduler.NewGenericScheduler(predicateFuncs, priorityConfigs, f.PodLister, r) + algo := scheduler.NewGenericScheduler(predicateFuncs, priorityConfigs, extenders, f.PodLister, r) podBackoff := podBackoff{ perPodBackoff: map[types.NamespacedName]*backoffEntry{}, diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index b996bb2ccbe..fa89390c692 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -124,12 +124,12 @@ func PredicateTwo(pod *api.Pod, existingPods []*api.Pod, node string) (bool, err return true, nil } -func PriorityOne(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (algorithm.HostPriorityList, error) { - return []algorithm.HostPriority{}, nil +func PriorityOne(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { + return []schedulerapi.HostPriority{}, nil } -func PriorityTwo(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (algorithm.HostPriorityList, error) { - return []algorithm.HostPriority{}, nil +func PriorityTwo(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { + return []schedulerapi.HostPriority{}, nil } func TestDefaultErrorFunc(t *testing.T) { diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go index 41a615e010e..f348d4e5ff2 100644 --- a/plugin/pkg/scheduler/generic_scheduler.go +++ b/plugin/pkg/scheduler/generic_scheduler.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" + schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" ) type FailedPredicateMap map[string]sets.String @@ -55,6 +56,7 @@ func (f *FitError) Error() string { type genericScheduler struct { predicates map[string]algorithm.FitPredicate prioritizers []algorithm.PriorityConfig + extenders []algorithm.SchedulerExtender pods algorithm.PodLister random *rand.Rand randomLock sync.Mutex @@ -69,12 +71,12 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe return "", ErrNoNodesAvailable } - filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.pods, g.predicates, nodes) + filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.pods, g.predicates, nodes, g.extenders) if err != nil { return "", err } - priorityList, err := PrioritizeNodes(pod, g.pods, g.prioritizers, algorithm.FakeNodeLister(filteredNodes)) + priorityList, err := PrioritizeNodes(pod, g.pods, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders) if err != nil { return "", err } @@ -90,7 +92,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe // This method takes a prioritized list 
of nodes and sorts them in reverse order based on scores // and then picks one randomly from the nodes that had the highest score -func (g *genericScheduler) selectHost(priorityList algorithm.HostPriorityList) (string, error) { +func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) { if len(priorityList) == 0 { return "", fmt.Errorf("empty priorityList") } @@ -106,7 +108,7 @@ func (g *genericScheduler) selectHost(priorityList algorithm.HostPriorityList) ( // Filters the nodes to find the ones that fit based on the given predicate functions // Each node is passed through the predicate functions to determine if it is a fit -func findNodesThatFit(pod *api.Pod, podLister algorithm.PodLister, predicateFuncs map[string]algorithm.FitPredicate, nodes api.NodeList) (api.NodeList, FailedPredicateMap, error) { +func findNodesThatFit(pod *api.Pod, podLister algorithm.PodLister, predicateFuncs map[string]algorithm.FitPredicate, nodes api.NodeList, extenders []algorithm.SchedulerExtender) (api.NodeList, FailedPredicateMap, error) { filtered := []api.Node{} machineToPods, err := predicates.MapPodsToMachines(podLister) failedPredicateMap := FailedPredicateMap{} @@ -138,6 +140,18 @@ func findNodesThatFit(pod *api.Pod, podLister algorithm.PodLister, predicateFunc filtered = append(filtered, node) } } + if len(filtered) > 0 && len(extenders) != 0 { + for _, extender := range extenders { + filteredList, err := extender.Filter(pod, &api.NodeList{Items: filtered}) + if err != nil { + return api.NodeList{}, FailedPredicateMap{}, err + } + filtered = filteredList.Items + if len(filtered) == 0 { + break + } + } + } return api.NodeList{Items: filtered}, failedPredicateMap, nil } @@ -147,12 +161,12 @@ func findNodesThatFit(pod *api.Pod, podLister algorithm.PodLister, predicateFunc // Each priority function can also have its own weight // The node scores returned by the priority function are multiplied by the weights to get weighted scores // All scores are finally combined (added) to get the total weighted scores of all nodes -func PrioritizeNodes(pod *api.Pod, podLister algorithm.PodLister, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister) (algorithm.HostPriorityList, error) { - result := algorithm.HostPriorityList{} +func PrioritizeNodes(pod *api.Pod, podLister algorithm.PodLister, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister, extenders []algorithm.SchedulerExtender) (schedulerapi.HostPriorityList, error) { + result := schedulerapi.HostPriorityList{} // If no priority configs are provided, then the EqualPriority function is applied // This is required to generate the priority list in the required format - if len(priorityConfigs) == 0 { + if len(priorityConfigs) == 0 && len(extenders) == 0 { return EqualPriority(pod, podLister, nodeLister) } @@ -166,20 +180,38 @@ func PrioritizeNodes(pod *api.Pod, podLister algorithm.PodLister, priorityConfig priorityFunc := priorityConfig.Function prioritizedList, err := priorityFunc(pod, podLister, nodeLister) if err != nil { - return algorithm.HostPriorityList{}, err + return schedulerapi.HostPriorityList{}, err } for _, hostEntry := range prioritizedList { combinedScores[hostEntry.Host] += hostEntry.Score * weight } } + if len(extenders) != 0 && nodeLister != nil { + nodes, err := nodeLister.List() + if err != nil { + return schedulerapi.HostPriorityList{}, err + } + + for _, extender := range extenders { + prioritizedList, weight, err := extender.Prioritize(pod, &nodes) + if 
err != nil { + // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities + continue + } + + for _, hostEntry := range *prioritizedList { + combinedScores[hostEntry.Host] += hostEntry.Score * weight + } + } + } for host, score := range combinedScores { glog.V(10).Infof("Host %s Score %d", host, score) - result = append(result, algorithm.HostPriority{Host: host, Score: score}) + result = append(result, schedulerapi.HostPriority{Host: host, Score: score}) } return result, nil } -func getBestHosts(list algorithm.HostPriorityList) []string { +func getBestHosts(list schedulerapi.HostPriorityList) []string { result := []string{} for _, hostEntry := range list { if hostEntry.Score == list[0].Score { @@ -192,16 +224,16 @@ func getBestHosts(list algorithm.HostPriorityList) []string { } // EqualPriority is a prioritizer function that gives an equal weight of one to all nodes -func EqualPriority(_ *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (algorithm.HostPriorityList, error) { +func EqualPriority(_ *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { nodes, err := nodeLister.List() if err != nil { glog.Errorf("Failed to list nodes: %v", err) - return []algorithm.HostPriority{}, err + return []schedulerapi.HostPriority{}, err } - result := []algorithm.HostPriority{} + result := []schedulerapi.HostPriority{} for _, node := range nodes.Items { - result = append(result, algorithm.HostPriority{ + result = append(result, schedulerapi.HostPriority{ Host: node.Name, Score: 1, }) @@ -209,10 +241,11 @@ func EqualPriority(_ *api.Pod, podLister algorithm.PodLister, nodeLister algorit return result, nil } -func NewGenericScheduler(predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, pods algorithm.PodLister, random *rand.Rand) algorithm.ScheduleAlgorithm { +func NewGenericScheduler(predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender, pods algorithm.PodLister, random *rand.Rand) algorithm.ScheduleAlgorithm { return &genericScheduler{ predicates: predicates, prioritizers: prioritizers, + extenders: extenders, pods: pods, random: random, } diff --git a/plugin/pkg/scheduler/generic_scheduler_test.go b/plugin/pkg/scheduler/generic_scheduler_test.go index ee70a7a4fb2..68e1683d9fb 100644 --- a/plugin/pkg/scheduler/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/generic_scheduler_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" ) func falsePredicate(pod *api.Pod, existingPods []*api.Pod, node string) (bool, error) { @@ -44,9 +45,9 @@ func hasNoPodsPredicate(pod *api.Pod, existingPods []*api.Pod, node string) (boo return len(existingPods) == 0, nil } -func numericPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (algorithm.HostPriorityList, error) { +func numericPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { nodes, err := nodeLister.List() - result := []algorithm.HostPriority{} + result := []schedulerapi.HostPriority{} if err != nil { return nil, fmt.Errorf("failed to list nodes: %v", err) @@ -56,7 +57,7 @@ func numericPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister alg if err != nil { 
return nil, err } - result = append(result, algorithm.HostPriority{ + result = append(result, schedulerapi.HostPriority{ Host: node.Name, Score: score, }) @@ -64,10 +65,10 @@ func numericPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister alg return result, nil } -func reverseNumericPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (algorithm.HostPriorityList, error) { +func reverseNumericPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) { var maxScore float64 minScore := math.MaxFloat64 - reverseResult := []algorithm.HostPriority{} + reverseResult := []schedulerapi.HostPriority{} result, err := numericPriority(pod, podLister, nodeLister) if err != nil { return nil, err @@ -78,7 +79,7 @@ func reverseNumericPriority(pod *api.Pod, podLister algorithm.PodLister, nodeLis minScore = math.Min(minScore, float64(hostPriority.Score)) } for _, hostPriority := range result { - reverseResult = append(reverseResult, algorithm.HostPriority{ + reverseResult = append(reverseResult, schedulerapi.HostPriority{ Host: hostPriority.Host, Score: int(maxScore + minScore - float64(hostPriority.Score)), }) @@ -100,12 +101,12 @@ func makeNodeList(nodeNames []string) api.NodeList { func TestSelectHost(t *testing.T) { scheduler := genericScheduler{random: rand.New(rand.NewSource(0))} tests := []struct { - list algorithm.HostPriorityList + list schedulerapi.HostPriorityList possibleHosts sets.String expectsErr bool }{ { - list: []algorithm.HostPriority{ + list: []schedulerapi.HostPriority{ {Host: "machine1.1", Score: 1}, {Host: "machine2.1", Score: 2}, }, @@ -114,7 +115,7 @@ func TestSelectHost(t *testing.T) { }, // equal scores { - list: []algorithm.HostPriority{ + list: []schedulerapi.HostPriority{ {Host: "machine1.1", Score: 1}, {Host: "machine1.2", Score: 2}, {Host: "machine1.3", Score: 2}, @@ -125,7 +126,7 @@ func TestSelectHost(t *testing.T) { }, // out of order scores { - list: []algorithm.HostPriority{ + list: []schedulerapi.HostPriority{ {Host: "machine1.1", Score: 3}, {Host: "machine1.2", Score: 3}, {Host: "machine2.1", Score: 2}, @@ -137,7 +138,7 @@ func TestSelectHost(t *testing.T) { }, // empty priorityList { - list: []algorithm.HostPriority{}, + list: []schedulerapi.HostPriority{}, possibleHosts: sets.NewString(), expectsErr: true, }, @@ -287,7 +288,7 @@ func TestGenericScheduler(t *testing.T) { for _, test := range tests { random := rand.New(rand.NewSource(0)) - scheduler := NewGenericScheduler(test.predicates, test.prioritizers, algorithm.FakePodLister(test.pods), random) + scheduler := NewGenericScheduler(test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}, algorithm.FakePodLister(test.pods), random) machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes))) if test.expectsErr { if err == nil { @@ -307,7 +308,7 @@ func TestGenericScheduler(t *testing.T) { func TestFindFitAllError(t *testing.T) { nodes := []string{"3", "2", "1"} predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "false": falsePredicate} - _, predicateMap, err := findNodesThatFit(&api.Pod{}, algorithm.FakePodLister([]*api.Pod{}), predicates, makeNodeList(nodes)) + _, predicateMap, err := findNodesThatFit(&api.Pod{}, algorithm.FakePodLister([]*api.Pod{}), predicates, makeNodeList(nodes), nil) if err != nil { t.Errorf("unexpected error: %v", err) @@ -332,7 +333,7 @@ func TestFindFitSomeError(t *testing.T) { nodes := []string{"3", "2", "1"} 
predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "match": matchesPredicate} pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "1"}} - _, predicateMap, err := findNodesThatFit(pod, algorithm.FakePodLister([]*api.Pod{}), predicates, makeNodeList(nodes)) + _, predicateMap, err := findNodesThatFit(pod, algorithm.FakePodLister([]*api.Pod{}), predicates, makeNodeList(nodes), nil) if err != nil { t.Errorf("unexpected error: %v", err) diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 6449e6b4665..1a7bfdff58f 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -190,6 +190,7 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) { algo := NewGenericScheduler( map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}, []algorithm.PriorityConfig{}, + []algorithm.SchedulerExtender{}, modeler.PodLister(), rand.New(rand.NewSource(time.Now().UnixNano()))) @@ -322,6 +323,7 @@ func TestSchedulerRateLimitsBinding(t *testing.T) { algo := NewGenericScheduler( map[string]algorithm.FitPredicate{}, []algorithm.PriorityConfig{}, + []algorithm.SchedulerExtender{}, modeler.PodLister(), rand.New(rand.NewSource(time.Now().UnixNano()))) diff --git a/test/integration/extender_test.go b/test/integration/extender_test.go new file mode 100644 index 00000000000..d6874a3ce8f --- /dev/null +++ b/test/integration/extender_test.go @@ -0,0 +1,332 @@ +// +build integration,!no-etcd + +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +// This file tests scheduler extender. 
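+// It brings up two extender HTTP servers with different predicates and prioritizers,
+// registers them through a scheduler Policy with weighted ExtenderConfigs, and checks
+// that the pod is bound to the node favored by the combined extender scores (machine3).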
+ +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apiserver" + "k8s.io/kubernetes/pkg/client/record" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/master" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/plugin/pkg/admission/admit" + "k8s.io/kubernetes/plugin/pkg/scheduler" + _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" + schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/factory" + "k8s.io/kubernetes/test/integration/framework" +) + +const ( + filter = "filter" + prioritize = "prioritize" +) + +type fitPredicate func(pod *api.Pod, node *api.Node) (bool, error) +type priorityFunc func(pod *api.Pod, nodes *api.NodeList) (*schedulerapi.HostPriorityList, error) + +type priorityConfig struct { + function priorityFunc + weight int +} + +type Extender struct { + name string + predicates []fitPredicate + prioritizers []priorityConfig +} + +func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Request) { + var args schedulerapi.ExtenderArgs + + decoder := json.NewDecoder(req.Body) + defer req.Body.Close() + + if err := decoder.Decode(&args); err != nil { + http.Error(w, "Decode error", http.StatusBadRequest) + return + } + + encoder := json.NewEncoder(w) + + if strings.Contains(req.URL.Path, filter) { + resp := &schedulerapi.ExtenderFilterResult{} + nodes, err := e.Filter(&args.Pod, &args.Nodes) + if err != nil { + resp.Error = err.Error() + } else { + resp.Nodes = *nodes + } + + if err := encoder.Encode(resp); err != nil { + t.Fatalf("Failed to encode %+v", resp) + } + } else if strings.Contains(req.URL.Path, prioritize) { + // Prioritize errors are ignored. Default k8s priorities or another extender's + // priorities may be applied. 
+ priorities, _ := e.Prioritize(&args.Pod, &args.Nodes) + + if err := encoder.Encode(priorities); err != nil { + t.Fatalf("Failed to encode %+v", priorities) + } + } else { + http.Error(w, "Unknown method", http.StatusNotFound) + } +} + +func (e *Extender) Filter(pod *api.Pod, nodes *api.NodeList) (*api.NodeList, error) { + filtered := []api.Node{} + for _, node := range nodes.Items { + fits := true + for _, predicate := range e.predicates { + fit, err := predicate(pod, &node) + if err != nil { + return &api.NodeList{}, err + } + if !fit { + fits = false + break + } + } + if fits { + filtered = append(filtered, node) + } + } + return &api.NodeList{Items: filtered}, nil +} + +func (e *Extender) Prioritize(pod *api.Pod, nodes *api.NodeList) (*schedulerapi.HostPriorityList, error) { + result := schedulerapi.HostPriorityList{} + combinedScores := map[string]int{} + for _, prioritizer := range e.prioritizers { + weight := prioritizer.weight + if weight == 0 { + continue + } + priorityFunc := prioritizer.function + prioritizedList, err := priorityFunc(pod, nodes) + if err != nil { + return &schedulerapi.HostPriorityList{}, err + } + for _, hostEntry := range *prioritizedList { + combinedScores[hostEntry.Host] += hostEntry.Score * weight + } + } + for host, score := range combinedScores { + result = append(result, schedulerapi.HostPriority{Host: host, Score: score}) + } + return &result, nil +} + +func machine_1_2_3_Predicate(pod *api.Pod, node *api.Node) (bool, error) { + if node.Name == "machine1" || node.Name == "machine2" || node.Name == "machine3" { + return true, nil + } + return false, nil +} + +func machine_2_3_5_Predicate(pod *api.Pod, node *api.Node) (bool, error) { + if node.Name == "machine2" || node.Name == "machine3" || node.Name == "machine5" { + return true, nil + } + return false, nil +} + +func machine_2_Prioritizer(pod *api.Pod, nodes *api.NodeList) (*schedulerapi.HostPriorityList, error) { + result := schedulerapi.HostPriorityList{} + for _, node := range nodes.Items { + score := 1 + if node.Name == "machine2" { + score = 10 + } + result = append(result, schedulerapi.HostPriority{node.Name, score}) + } + return &result, nil +} + +func machine_3_Prioritizer(pod *api.Pod, nodes *api.NodeList) (*schedulerapi.HostPriorityList, error) { + result := schedulerapi.HostPriorityList{} + for _, node := range nodes.Items { + score := 1 + if node.Name == "machine3" { + score = 10 + } + result = append(result, schedulerapi.HostPriority{node.Name, score}) + } + return &result, nil +} + +func TestSchedulerExtender(t *testing.T) { + etcdStorage, err := framework.NewEtcdStorage() + if err != nil { + t.Fatalf("Couldn't create etcd storage: %v", err) + } + expEtcdStorage, err := framework.NewExtensionsEtcdStorage(nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + storageDestinations := master.NewStorageDestinations() + storageDestinations.AddAPIGroup("", etcdStorage) + storageDestinations.AddAPIGroup("extensions", expEtcdStorage) + + storageVersions := make(map[string]string) + storageVersions[""] = testapi.Default.Version() + storageVersions["extensions"] = testapi.Extensions.GroupAndVersion() + + framework.DeleteAllEtcdKeys() + + var m *master.Master + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + m.Handler.ServeHTTP(w, req) + })) + defer s.Close() + + m = master.New(&master.Config{ + StorageDestinations: storageDestinations, + KubeletClient: client.FakeKubeletClient{}, + EnableCoreControllers: true, + EnableLogsSupport: false, + 
EnableUISupport: false, + EnableIndex: true, + APIPrefix: "/api", + Authorizer: apiserver.NewAlwaysAllowAuthorizer(), + AdmissionControl: admit.NewAlwaysAdmit(), + StorageVersions: storageVersions, + }) + + restClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()}) + + extender1 := &Extender{ + name: "extender1", + predicates: []fitPredicate{machine_1_2_3_Predicate}, + prioritizers: []priorityConfig{{machine_2_Prioritizer, 1}}, + } + es1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + extender1.serveHTTP(t, w, req) + })) + defer es1.Close() + + extender2 := &Extender{ + name: "extender2", + predicates: []fitPredicate{machine_2_3_5_Predicate}, + prioritizers: []priorityConfig{{machine_3_Prioritizer, 1}}, + } + es2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + extender2.serveHTTP(t, w, req) + })) + defer es2.Close() + + policy := schedulerapi.Policy{ + ExtenderConfigs: []schedulerapi.ExtenderConfig{ + { + URLPrefix: es1.URL, + FilterVerb: filter, + PrioritizeVerb: prioritize, + Weight: 3, + EnableHttps: false, + }, + { + URLPrefix: es2.URL, + FilterVerb: filter, + PrioritizeVerb: prioritize, + Weight: 4, + EnableHttps: false, + }, + }, + } + policy.APIVersion = testapi.Default.Version() + + schedulerConfigFactory := factory.NewConfigFactory(restClient, nil) + schedulerConfig, err := schedulerConfigFactory.CreateFromConfig(policy) + if err != nil { + t.Fatalf("Couldn't create scheduler config: %v", err) + } + eventBroadcaster := record.NewBroadcaster() + schedulerConfig.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}) + eventBroadcaster.StartRecordingToSink(restClient.Events("")) + scheduler.New(schedulerConfig).Run() + + defer close(schedulerConfig.StopEverything) + + DoTestPodScheduling(t, restClient) +} + +func DoTestPodScheduling(t *testing.T, restClient *client.Client) { + goodCondition := api.NodeCondition{ + Type: api.NodeReady, + Status: api.ConditionTrue, + Reason: fmt.Sprintf("schedulable condition"), + LastHeartbeatTime: unversioned.Time{time.Now()}, + } + node := &api.Node{ + Spec: api.NodeSpec{Unschedulable: false}, + Status: api.NodeStatus{ + Capacity: api.ResourceList{ + api.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI), + }, + Conditions: []api.NodeCondition{goodCondition}, + }, + } + + for ii := 0; ii < 5; ii++ { + node.Name = fmt.Sprintf("machine%d", ii+1) + if _, err := restClient.Nodes().Create(node); err != nil { + t.Fatalf("Failed to create nodes: %v", err) + } + } + + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "extender-test-pod"}, + Spec: api.PodSpec{ + Containers: []api.Container{{Name: "container", Image: "kubernetes/pause:go"}}, + }, + } + + myPod, err := restClient.Pods(api.NamespaceDefault).Create(pod) + if err != nil { + t.Fatalf("Failed to create pod: %v", err) + } + + err = wait.Poll(time.Second, util.ForeverTestTimeout, podScheduled(restClient, myPod.Namespace, myPod.Name)) + if err != nil { + t.Fatalf("Failed to schedule pod: %v", err) + } + + if myPod, err := restClient.Pods(api.NamespaceDefault).Get(myPod.Name); err != nil { + t.Fatalf("Failed to get pod: %v", err) + } else if myPod.Spec.NodeName != "machine3" { + t.Fatalf("Failed to schedule using extender, expected machine3, got %v", myPod.Spec.NodeName) + } + t.Logf("Scheduled pod using extenders") +}
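
For reference, the wire protocol that `HTTPExtender.send` implements is plain JSON over POST: the scheduler posts an `ExtenderArgs` body to `<urlPrefix>/<apiVersion>/<verb>` and expects an `ExtenderFilterResult` (filter) or a `HostPriorityList` (prioritize) in reply. A minimal standalone extender could be sketched roughly as below; this is illustrative only, it uses hand-rolled stand-in types rather than the real `schedulerapi` structs (the JSON field names are assumptions), assumes the policy `apiVersion` is `v1`, and the preferred node name and listen port are arbitrary.

```go
package main

import (
	"encoding/json"
	"log"
	"net/http"
)

// Stand-in wire types; field names are assumed to match the scheduler's JSON encoding.
type nodeList struct {
	Items []struct {
		Metadata struct {
			Name string `json:"name"`
		} `json:"metadata"`
	} `json:"items"`
}

type extenderArgs struct {
	Pod   json.RawMessage `json:"pod"`
	Nodes nodeList        `json:"nodes"`
}

type extenderFilterResult struct {
	Nodes nodeList `json:"nodes"`
	Error string   `json:"error,omitempty"`
}

type hostPriority struct {
	Host  string `json:"host"`
	Score int    `json:"score"`
}

func main() {
	// Filter verb: keep every node the scheduler proposed.
	http.HandleFunc("/v1/filter", func(w http.ResponseWriter, r *http.Request) {
		var args extenderArgs
		if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		_ = json.NewEncoder(w).Encode(extenderFilterResult{Nodes: args.Nodes})
	})

	// Prioritize verb: favor one (arbitrary) node name, score the rest equally.
	http.HandleFunc("/v1/prioritize", func(w http.ResponseWriter, r *http.Request) {
		var args extenderArgs
		if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		priorities := []hostPriority{}
		for _, node := range args.Nodes.Items {
			score := 1
			if node.Metadata.Name == "machine2" { // hypothetical preferred node
				score = 10
			}
			priorities = append(priorities, hostPriority{Host: node.Metadata.Name, Score: score})
		}
		_ = json.NewEncoder(w).Encode(priorities)
	})

	log.Fatal(http.ListenAndServe(":8888", nil)) // arbitrary port for illustration
}
```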