Added Bind method to Scheduler Extender
- only one extender can support the bind method - if an extender supports bind, scheduler delegates the pod binding to the extender
This commit is contained in:
		| @@ -19,6 +19,7 @@ | ||||
|         "urlPrefix": "http://127.0.0.1:12346/scheduler", | ||||
|         "apiVersion": "v1beta1", | ||||
|         "filterVerb": "filter", | ||||
|         "bindVerb": "bind", | ||||
|         "prioritizeVerb": "prioritize", | ||||
|         "weight": 5, | ||||
|         "enableHttps": false, | ||||
|   | ||||
| @@ -35,6 +35,12 @@ type SchedulerExtender interface { | ||||
| 	// are used to compute the weighted score for an extender. The weighted scores are added to | ||||
| 	// the scores computed  by Kubernetes scheduler. The total scores are used to do the host selection. | ||||
| 	Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error) | ||||
|  | ||||
| 	// Bind delegates the action of binding a pod to a node to the extender. | ||||
| 	Bind(binding *v1.Binding) error | ||||
|  | ||||
| 	// IsBinder returns whether this extender is configured for the Bind method. | ||||
| 	IsBinder() bool | ||||
| } | ||||
|  | ||||
| // ScheduleAlgorithm is an interface implemented by things that know how to schedule pods | ||||
|   | ||||
| @@ -19,6 +19,7 @@ go_library( | ||||
|         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", | ||||
|         "//vendor/k8s.io/client-go/rest:go_default_library", | ||||
|     ], | ||||
| ) | ||||
|   | ||||
| @@ -20,6 +20,7 @@ import ( | ||||
| 	"time" | ||||
|  | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	restclient "k8s.io/client-go/rest" | ||||
| 	"k8s.io/kubernetes/pkg/api/v1" | ||||
| ) | ||||
| @@ -133,6 +134,10 @@ type ExtenderConfig struct { | ||||
| 	// The numeric multiplier for the node scores that the prioritize call generates. | ||||
| 	// The weight should be a positive integer | ||||
| 	Weight int | ||||
| 	// Verb for the bind call, empty if not supported. This verb is appended to the URLPrefix when issuing the bind call to extender. | ||||
| 	// If this method is implemented by the extender, it is the extender's responsibility to bind the pod to apiserver. Only one extender | ||||
| 	// can implement this function. | ||||
| 	BindVerb string | ||||
| 	// EnableHttps specifies whether https should be used to communicate with the extender | ||||
| 	EnableHttps bool | ||||
| 	// TLSConfig specifies the transport layer security config | ||||
| @@ -176,6 +181,24 @@ type ExtenderFilterResult struct { | ||||
| 	Error string | ||||
| } | ||||
|  | ||||
| // ExtenderBindingArgs represents the arguments to an extender for binding a pod to a node. | ||||
| type ExtenderBindingArgs struct { | ||||
| 	// PodName is the name of the pod being bound | ||||
| 	PodName string | ||||
| 	// PodNamespace is the namespace of the pod being bound | ||||
| 	PodNamespace string | ||||
| 	// PodUID is the UID of the pod being bound | ||||
| 	PodUID types.UID | ||||
| 	// Node selected by the scheduler | ||||
| 	Node string | ||||
| } | ||||
|  | ||||
| // ExtenderBindingResult represents the result of binding of a pod to a node from an extender. | ||||
| type ExtenderBindingResult struct { | ||||
| 	// Error message indicating failure | ||||
| 	Error string | ||||
| } | ||||
|  | ||||
| // HostPriority represents the priority of scheduling to a particular host, higher priority is better. | ||||
| type HostPriority struct { | ||||
| 	// Name of the host | ||||
|   | ||||
| @@ -20,6 +20,7 @@ go_library( | ||||
|         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", | ||||
|         "//vendor/k8s.io/client-go/rest:go_default_library", | ||||
|     ], | ||||
| ) | ||||
|   | ||||
| @@ -20,6 +20,7 @@ import ( | ||||
| 	"time" | ||||
|  | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	restclient "k8s.io/client-go/rest" | ||||
| 	apiv1 "k8s.io/kubernetes/pkg/api/v1" | ||||
| ) | ||||
| @@ -125,6 +126,10 @@ type ExtenderConfig struct { | ||||
| 	// The numeric multiplier for the node scores that the prioritize call generates. | ||||
| 	// The weight should be a positive integer | ||||
| 	Weight int `json:"weight,omitempty"` | ||||
| 	// Verb for the bind call, empty if not supported. This verb is appended to the URLPrefix when issuing the bind call to extender. | ||||
| 	// If this method is implemented by the extender, it is the extender's responsibility to bind the pod to apiserver. Only one extender | ||||
| 	// can implement this function. | ||||
| 	BindVerb string | ||||
| 	// EnableHttps specifies whether https should be used to communicate with the extender | ||||
| 	EnableHttps bool `json:"enableHttps,omitempty"` | ||||
| 	// TLSConfig specifies the transport layer security config | ||||
| @@ -168,6 +173,24 @@ type ExtenderFilterResult struct { | ||||
| 	Error string `json:"error,omitempty"` | ||||
| } | ||||
|  | ||||
| // ExtenderBindingArgs represents the arguments to an extender for binding a pod to a node. | ||||
| type ExtenderBindingArgs struct { | ||||
| 	// PodName is the name of the pod being bound | ||||
| 	PodName string | ||||
| 	// PodNamespace is the namespace of the pod being bound | ||||
| 	PodNamespace string | ||||
| 	// PodUID is the UID of the pod being bound | ||||
| 	PodUID types.UID | ||||
| 	// Node selected by the scheduler | ||||
| 	Node string | ||||
| } | ||||
|  | ||||
| // ExtenderBindingResult represents the result of binding of a pod to a node from an extender. | ||||
| type ExtenderBindingResult struct { | ||||
| 	// Error message indicating failure | ||||
| 	Error string | ||||
| } | ||||
|  | ||||
| // HostPriority represents the priority of scheduling to a particular host, higher priority is better. | ||||
| type HostPriority struct { | ||||
| 	// Name of the host | ||||
|   | ||||
| @@ -34,10 +34,17 @@ func ValidatePolicy(policy schedulerapi.Policy) error { | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	binders := 0 | ||||
| 	for _, extender := range policy.ExtenderConfigs { | ||||
| 		if extender.Weight <= 0 { | ||||
| 			validationErrors = append(validationErrors, fmt.Errorf("Priority for extender %s should have a positive weight applied to it", extender.URLPrefix)) | ||||
| 		} | ||||
| 		if extender.BindVerb != "" { | ||||
| 			binders++ | ||||
| 		} | ||||
| 	} | ||||
| 	if binders > 1 { | ||||
| 		validationErrors = append(validationErrors, fmt.Errorf("Only one extender can implement bind, found %v", binders)) | ||||
| 	} | ||||
| 	return utilerrors.NewAggregate(validationErrors) | ||||
| } | ||||
|   | ||||
| @@ -72,3 +72,15 @@ func TestValidateExtenderWithNegativeWeight(t *testing.T) { | ||||
| 		t.Errorf("Expected error about priority weight for extender not being positive") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestValidateMultipleExtendersWithBind(t *testing.T) { | ||||
| 	extenderPolicy := api.Policy{ | ||||
| 		ExtenderConfigs: []api.ExtenderConfig{ | ||||
| 			{URLPrefix: "http://127.0.0.1:8081/extender", BindVerb: "bind"}, | ||||
| 			{URLPrefix: "http://127.0.0.1:8082/extender", BindVerb: "bind"}, | ||||
| 		}, | ||||
| 	} | ||||
| 	if ValidatePolicy(extenderPolicy) == nil { | ||||
| 		t.Errorf("Expected failure when multiple extenders with bind") | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -41,6 +41,7 @@ type HTTPExtender struct { | ||||
| 	extenderURL      string | ||||
| 	filterVerb       string | ||||
| 	prioritizeVerb   string | ||||
| 	bindVerb         string | ||||
| 	weight           int | ||||
| 	client           *http.Client | ||||
| 	nodeCacheCapable bool | ||||
| @@ -86,6 +87,7 @@ func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerEx | ||||
| 		extenderURL:      config.URLPrefix, | ||||
| 		filterVerb:       config.FilterVerb, | ||||
| 		prioritizeVerb:   config.PrioritizeVerb, | ||||
| 		bindVerb:         config.BindVerb, | ||||
| 		weight:           config.Weight, | ||||
| 		client:           client, | ||||
| 		nodeCacheCapable: config.NodeCacheCapable, | ||||
| @@ -193,6 +195,33 @@ func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi. | ||||
| 	return &result, h.weight, nil | ||||
| } | ||||
|  | ||||
| // Bind delegates the action of binding a pod to a node to the extender. | ||||
| func (h *HTTPExtender) Bind(binding *v1.Binding) error { | ||||
| 	var result schedulerapi.ExtenderBindingResult | ||||
| 	if !h.IsBinder() { | ||||
| 		// This shouldn't happen as this extender wouldn't have become a Binder. | ||||
| 		return fmt.Errorf("Unexpected empty bindVerb in extender") | ||||
| 	} | ||||
| 	req := &schedulerapi.ExtenderBindingArgs{ | ||||
| 		PodName:      binding.Name, | ||||
| 		PodNamespace: binding.Namespace, | ||||
| 		PodUID:       binding.UID, | ||||
| 		Node:         binding.Target.Name, | ||||
| 	} | ||||
| 	if err := h.send(h.bindVerb, &req, &result); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if result.Error != "" { | ||||
| 		return fmt.Errorf(result.Error) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // IsBinder returns whether this extender is configured for the Bind method. | ||||
| func (h *HTTPExtender) IsBinder() bool { | ||||
| 	return h.bindVerb != "" | ||||
| } | ||||
|  | ||||
| // Helper function to send messages to the extender | ||||
| func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error { | ||||
| 	out, err := json.Marshal(args) | ||||
| @@ -214,6 +243,10 @@ func (h *HTTPExtender) send(action string, args interface{}, result interface{}) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if resp.StatusCode != http.StatusOK { | ||||
| 		return fmt.Errorf("Failed %v with extender at URL %v, code %v", action, h.extenderURL, resp.StatusCode) | ||||
| 	} | ||||
|  | ||||
| 	defer resp.Body.Close() | ||||
| 	body, err := ioutil.ReadAll(resp.Body) | ||||
| 	if err != nil { | ||||
|   | ||||
| @@ -109,6 +109,7 @@ type FakeExtender struct { | ||||
| 	prioritizers     []priorityConfig | ||||
| 	weight           int | ||||
| 	nodeCacheCapable bool | ||||
| 	filteredNodes    []*v1.Node | ||||
| } | ||||
|  | ||||
| func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) ([]*v1.Node, schedulerapi.FailedNodesMap, error) { | ||||
| @@ -133,6 +134,7 @@ func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[ | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	f.filteredNodes = filtered | ||||
| 	if f.nodeCacheCapable { | ||||
| 		return filtered, failedNodesMap, nil | ||||
| 	} | ||||
| @@ -162,6 +164,25 @@ func (f *FakeExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi. | ||||
| 	return &result, f.weight, nil | ||||
| } | ||||
|  | ||||
| func (f *FakeExtender) Bind(binding *v1.Binding) error { | ||||
| 	if len(f.filteredNodes) != 0 { | ||||
| 		for _, node := range f.filteredNodes { | ||||
| 			if node.Name == binding.Target.Name { | ||||
| 				f.filteredNodes = nil | ||||
| 				return nil | ||||
| 			} | ||||
| 		} | ||||
| 		err := fmt.Errorf("Node %v not in filtered nodes %v", binding.Target.Name, f.filteredNodes) | ||||
| 		f.filteredNodes = nil | ||||
| 		return err | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (f *FakeExtender) IsBinder() bool { | ||||
| 	return true | ||||
| } | ||||
|  | ||||
| func TestGenericSchedulerWithExtenders(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		name                 string | ||||
|   | ||||
| @@ -386,6 +386,16 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler | ||||
| 	return f.CreateFromKeys(predicateKeys, priorityKeys, extenders) | ||||
| } | ||||
|  | ||||
| // getBinder returns an extender that supports bind or a default binder. | ||||
| func (f *ConfigFactory) getBinder(extenders []algorithm.SchedulerExtender) scheduler.Binder { | ||||
| 	for i := range extenders { | ||||
| 		if extenders[i].IsBinder() { | ||||
| 			return extenders[i] | ||||
| 		} | ||||
| 	} | ||||
| 	return &binder{f.client} | ||||
| } | ||||
|  | ||||
| // Creates a scheduler from a set of registered fit predicate keys and priority keys. | ||||
| func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) { | ||||
| 	glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys) | ||||
| @@ -422,7 +432,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, | ||||
| 		// The scheduler only needs to consider schedulable nodes. | ||||
| 		NodeLister:          &nodePredicateLister{f.nodeLister}, | ||||
| 		Algorithm:           algo, | ||||
| 		Binder:              &binder{f.client}, | ||||
| 		Binder:              f.getBinder(extenders), | ||||
| 		PodConditionUpdater: &podConditionUpdater{f.client}, | ||||
| 		WaitForCacheSync: func() bool { | ||||
| 			return cache.WaitForCacheSync(f.StopEverything, f.scheduledPodsHasSynced) | ||||
|   | ||||
| @@ -271,7 +271,7 @@ func (sched *Scheduler) scheduleOne() { | ||||
| 	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above). | ||||
| 	go func() { | ||||
| 		err := sched.bind(pod, &v1.Binding{ | ||||
| 			ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name}, | ||||
| 			ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID}, | ||||
| 			Target: v1.ObjectReference{ | ||||
| 				Kind: "Node", | ||||
| 				Name: suggestedHost, | ||||
|   | ||||
| @@ -49,6 +49,7 @@ import ( | ||||
| const ( | ||||
| 	filter     = "filter" | ||||
| 	prioritize = "prioritize" | ||||
| 	bind       = "bind" | ||||
| ) | ||||
|  | ||||
| type fitPredicate func(pod *v1.Pod, node *v1.Node) (bool, error) | ||||
| @@ -64,21 +65,23 @@ type Extender struct { | ||||
| 	predicates       []fitPredicate | ||||
| 	prioritizers     []priorityConfig | ||||
| 	nodeCacheCapable bool | ||||
| 	Client           clientset.Interface | ||||
| } | ||||
|  | ||||
| func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Request) { | ||||
| 	var args schedulerapi.ExtenderArgs | ||||
|  | ||||
| 	decoder := json.NewDecoder(req.Body) | ||||
| 	defer req.Body.Close() | ||||
|  | ||||
| 	encoder := json.NewEncoder(w) | ||||
|  | ||||
| 	if strings.Contains(req.URL.Path, filter) || strings.Contains(req.URL.Path, prioritize) { | ||||
| 		var args schedulerapi.ExtenderArgs | ||||
|  | ||||
| 		if err := decoder.Decode(&args); err != nil { | ||||
| 			http.Error(w, "Decode error", http.StatusBadRequest) | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 	encoder := json.NewEncoder(w) | ||||
|  | ||||
| 		if strings.Contains(req.URL.Path, filter) { | ||||
| 			resp := &schedulerapi.ExtenderFilterResult{} | ||||
| 			resp, err := e.Filter(&args) | ||||
| @@ -87,7 +90,7 @@ func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Requ | ||||
| 			} | ||||
|  | ||||
| 			if err := encoder.Encode(resp); err != nil { | ||||
| 			t.Fatalf("Failed to encode %+v", resp) | ||||
| 				t.Fatalf("Failed to encode %v", resp) | ||||
| 			} | ||||
| 		} else if strings.Contains(req.URL.Path, prioritize) { | ||||
| 			// Prioritize errors are ignored. Default k8s priorities or another extender's | ||||
| @@ -97,6 +100,24 @@ func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Requ | ||||
| 			if err := encoder.Encode(priorities); err != nil { | ||||
| 				t.Fatalf("Failed to encode %+v", priorities) | ||||
| 			} | ||||
| 		} | ||||
| 	} else if strings.Contains(req.URL.Path, bind) { | ||||
| 		var args schedulerapi.ExtenderBindingArgs | ||||
|  | ||||
| 		if err := decoder.Decode(&args); err != nil { | ||||
| 			http.Error(w, "Decode error", http.StatusBadRequest) | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		resp := &schedulerapi.ExtenderBindingResult{} | ||||
|  | ||||
| 		if err := e.Bind(&args); err != nil { | ||||
| 			resp.Error = err.Error() | ||||
| 		} | ||||
|  | ||||
| 		if err := encoder.Encode(resp); err != nil { | ||||
| 			t.Fatalf("Failed to encode %+v", resp) | ||||
| 		} | ||||
| 	} else { | ||||
| 		http.Error(w, "Unknown method", http.StatusNotFound) | ||||
| 	} | ||||
| @@ -209,6 +230,18 @@ func (e *Extender) Prioritize(args *schedulerapi.ExtenderArgs) (*schedulerapi.Ho | ||||
| 	return &result, nil | ||||
| } | ||||
|  | ||||
| func (e *Extender) Bind(binding *schedulerapi.ExtenderBindingArgs) error { | ||||
| 	b := &v1.Binding{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{Namespace: binding.PodNamespace, Name: binding.PodName, UID: binding.PodUID}, | ||||
| 		Target: v1.ObjectReference{ | ||||
| 			Kind: "Node", | ||||
| 			Name: binding.Node, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	return e.Client.CoreV1().Pods(b.Namespace).Bind(b) | ||||
| } | ||||
|  | ||||
| func machine_1_2_3_Predicate(pod *v1.Pod, node *v1.Node) (bool, error) { | ||||
| 	if node.Name == "machine1" || node.Name == "machine2" || node.Name == "machine3" { | ||||
| 		return true, nil | ||||
| @@ -276,6 +309,7 @@ func TestSchedulerExtender(t *testing.T) { | ||||
| 		name:         "extender2", | ||||
| 		predicates:   []fitPredicate{machine_2_3_5_Predicate}, | ||||
| 		prioritizers: []priorityConfig{{machine_3_Prioritizer, 1}}, | ||||
| 		Client:       clientSet, | ||||
| 	} | ||||
| 	es2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { | ||||
| 		extender2.serveHTTP(t, w, req) | ||||
| @@ -306,6 +340,7 @@ func TestSchedulerExtender(t *testing.T) { | ||||
| 				URLPrefix:      es2.URL, | ||||
| 				FilterVerb:     filter, | ||||
| 				PrioritizeVerb: prioritize, | ||||
| 				BindVerb:       bind, | ||||
| 				Weight:         4, | ||||
| 				EnableHttps:    false, | ||||
| 			}, | ||||
| @@ -402,5 +437,13 @@ func DoTestPodScheduling(ns *v1.Namespace, t *testing.T, cs clientset.Interface) | ||||
| 	} else if myPod.Spec.NodeName != "machine2" { | ||||
| 		t.Fatalf("Failed to schedule using extender, expected machine2, got %v", myPod.Spec.NodeName) | ||||
| 	} | ||||
| 	var gracePeriod int64 | ||||
| 	if err := cs.Core().Pods(ns.Name).Delete(myPod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}); err != nil { | ||||
| 		t.Fatalf("Failed to delete pod: %v", err) | ||||
| 	} | ||||
| 	_, err = cs.Core().Pods(ns.Name).Get(myPod.Name, metav1.GetOptions{}) | ||||
| 	if err == nil { | ||||
| 		t.Fatalf("Failed to delete pod: %v", err) | ||||
| 	} | ||||
| 	t.Logf("Scheduled pod using extenders") | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Ravi Gadde
					Ravi Gadde