diff --git a/examples/scheduler-policy-config-with-extender.json b/examples/scheduler-policy-config-with-extender.json index 071b08d2bf..996e6efc82 100644 --- a/examples/scheduler-policy-config-with-extender.json +++ b/examples/scheduler-policy-config-with-extender.json @@ -19,6 +19,7 @@ "urlPrefix": "http://127.0.0.1:12346/scheduler", "apiVersion": "v1beta1", "filterVerb": "filter", + "bindVerb": "bind", "prioritizeVerb": "prioritize", "weight": 5, "enableHttps": false, diff --git a/plugin/pkg/scheduler/algorithm/scheduler_interface.go b/plugin/pkg/scheduler/algorithm/scheduler_interface.go index ecdf935f85..e55e4b5365 100644 --- a/plugin/pkg/scheduler/algorithm/scheduler_interface.go +++ b/plugin/pkg/scheduler/algorithm/scheduler_interface.go @@ -35,6 +35,12 @@ type SchedulerExtender interface { // are used to compute the weighted score for an extender. The weighted scores are added to // the scores computed by Kubernetes scheduler. The total scores are used to do the host selection. Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error) + + // Bind delegates the action of binding a pod to a node to the extender. + Bind(binding *v1.Binding) error + + // IsBinder returns whether this extender is configured for the Bind method. + IsBinder() bool } // ScheduleAlgorithm is an interface implemented by things that know how to schedule pods diff --git a/plugin/pkg/scheduler/api/BUILD b/plugin/pkg/scheduler/api/BUILD index 234215e8cf..962a334edc 100644 --- a/plugin/pkg/scheduler/api/BUILD +++ b/plugin/pkg/scheduler/api/BUILD @@ -19,6 +19,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", ], ) diff --git a/plugin/pkg/scheduler/api/types.go b/plugin/pkg/scheduler/api/types.go index 6498088dd8..7296a51c2e 100644 --- a/plugin/pkg/scheduler/api/types.go +++ b/plugin/pkg/scheduler/api/types.go @@ -20,6 +20,7 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" restclient "k8s.io/client-go/rest" "k8s.io/kubernetes/pkg/api/v1" ) @@ -133,6 +134,10 @@ type ExtenderConfig struct { // The numeric multiplier for the node scores that the prioritize call generates. // The weight should be a positive integer Weight int + // Verb for the bind call, empty if not supported. This verb is appended to the URLPrefix when issuing the bind call to extender. + // If this method is implemented by the extender, it is the extender's responsibility to bind the pod to apiserver. Only one extender + // can implement this function. + BindVerb string // EnableHttps specifies whether https should be used to communicate with the extender EnableHttps bool // TLSConfig specifies the transport layer security config @@ -176,6 +181,24 @@ type ExtenderFilterResult struct { Error string } +// ExtenderBindingArgs represents the arguments to an extender for binding a pod to a node. +type ExtenderBindingArgs struct { + // PodName is the name of the pod being bound + PodName string + // PodNamespace is the namespace of the pod being bound + PodNamespace string + // PodUID is the UID of the pod being bound + PodUID types.UID + // Node selected by the scheduler + Node string +} + +// ExtenderBindingResult represents the result of binding of a pod to a node from an extender. +type ExtenderBindingResult struct { + // Error message indicating failure + Error string +} + // HostPriority represents the priority of scheduling to a particular host, higher priority is better. type HostPriority struct { // Name of the host diff --git a/plugin/pkg/scheduler/api/v1/BUILD b/plugin/pkg/scheduler/api/v1/BUILD index dce7c7b3c0..b438b8c4d7 100644 --- a/plugin/pkg/scheduler/api/v1/BUILD +++ b/plugin/pkg/scheduler/api/v1/BUILD @@ -20,6 +20,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", ], ) diff --git a/plugin/pkg/scheduler/api/v1/types.go b/plugin/pkg/scheduler/api/v1/types.go index d8c6f50748..20f109b898 100644 --- a/plugin/pkg/scheduler/api/v1/types.go +++ b/plugin/pkg/scheduler/api/v1/types.go @@ -20,6 +20,7 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" restclient "k8s.io/client-go/rest" apiv1 "k8s.io/kubernetes/pkg/api/v1" ) @@ -125,6 +126,10 @@ type ExtenderConfig struct { // The numeric multiplier for the node scores that the prioritize call generates. // The weight should be a positive integer Weight int `json:"weight,omitempty"` + // Verb for the bind call, empty if not supported. This verb is appended to the URLPrefix when issuing the bind call to extender. + // If this method is implemented by the extender, it is the extender's responsibility to bind the pod to apiserver. Only one extender + // can implement this function. + BindVerb string // EnableHttps specifies whether https should be used to communicate with the extender EnableHttps bool `json:"enableHttps,omitempty"` // TLSConfig specifies the transport layer security config @@ -168,6 +173,24 @@ type ExtenderFilterResult struct { Error string `json:"error,omitempty"` } +// ExtenderBindingArgs represents the arguments to an extender for binding a pod to a node. +type ExtenderBindingArgs struct { + // PodName is the name of the pod being bound + PodName string + // PodNamespace is the namespace of the pod being bound + PodNamespace string + // PodUID is the UID of the pod being bound + PodUID types.UID + // Node selected by the scheduler + Node string +} + +// ExtenderBindingResult represents the result of binding of a pod to a node from an extender. +type ExtenderBindingResult struct { + // Error message indicating failure + Error string +} + // HostPriority represents the priority of scheduling to a particular host, higher priority is better. type HostPriority struct { // Name of the host diff --git a/plugin/pkg/scheduler/api/validation/validation.go b/plugin/pkg/scheduler/api/validation/validation.go index 9eb616745f..620a0343ed 100644 --- a/plugin/pkg/scheduler/api/validation/validation.go +++ b/plugin/pkg/scheduler/api/validation/validation.go @@ -34,10 +34,17 @@ func ValidatePolicy(policy schedulerapi.Policy) error { } } + binders := 0 for _, extender := range policy.ExtenderConfigs { if extender.Weight <= 0 { validationErrors = append(validationErrors, fmt.Errorf("Priority for extender %s should have a positive weight applied to it", extender.URLPrefix)) } + if extender.BindVerb != "" { + binders++ + } + } + if binders > 1 { + validationErrors = append(validationErrors, fmt.Errorf("Only one extender can implement bind, found %v", binders)) } return utilerrors.NewAggregate(validationErrors) } diff --git a/plugin/pkg/scheduler/api/validation/validation_test.go b/plugin/pkg/scheduler/api/validation/validation_test.go index afc684dca6..1e5f44ec91 100644 --- a/plugin/pkg/scheduler/api/validation/validation_test.go +++ b/plugin/pkg/scheduler/api/validation/validation_test.go @@ -72,3 +72,15 @@ func TestValidateExtenderWithNegativeWeight(t *testing.T) { t.Errorf("Expected error about priority weight for extender not being positive") } } + +func TestValidateMultipleExtendersWithBind(t *testing.T) { + extenderPolicy := api.Policy{ + ExtenderConfigs: []api.ExtenderConfig{ + {URLPrefix: "http://127.0.0.1:8081/extender", BindVerb: "bind"}, + {URLPrefix: "http://127.0.0.1:8082/extender", BindVerb: "bind"}, + }, + } + if ValidatePolicy(extenderPolicy) == nil { + t.Errorf("Expected failure when multiple extenders with bind") + } +} diff --git a/plugin/pkg/scheduler/core/extender.go b/plugin/pkg/scheduler/core/extender.go index bcc42d774c..1894e1b3ac 100644 --- a/plugin/pkg/scheduler/core/extender.go +++ b/plugin/pkg/scheduler/core/extender.go @@ -41,6 +41,7 @@ type HTTPExtender struct { extenderURL string filterVerb string prioritizeVerb string + bindVerb string weight int client *http.Client nodeCacheCapable bool @@ -86,6 +87,7 @@ func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerEx extenderURL: config.URLPrefix, filterVerb: config.FilterVerb, prioritizeVerb: config.PrioritizeVerb, + bindVerb: config.BindVerb, weight: config.Weight, client: client, nodeCacheCapable: config.NodeCacheCapable, @@ -193,6 +195,33 @@ func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi. return &result, h.weight, nil } +// Bind delegates the action of binding a pod to a node to the extender. +func (h *HTTPExtender) Bind(binding *v1.Binding) error { + var result schedulerapi.ExtenderBindingResult + if !h.IsBinder() { + // This shouldn't happen as this extender wouldn't have become a Binder. + return fmt.Errorf("Unexpected empty bindVerb in extender") + } + req := &schedulerapi.ExtenderBindingArgs{ + PodName: binding.Name, + PodNamespace: binding.Namespace, + PodUID: binding.UID, + Node: binding.Target.Name, + } + if err := h.send(h.bindVerb, &req, &result); err != nil { + return err + } + if result.Error != "" { + return fmt.Errorf(result.Error) + } + return nil +} + +// IsBinder returns whether this extender is configured for the Bind method. +func (h *HTTPExtender) IsBinder() bool { + return h.bindVerb != "" +} + // Helper function to send messages to the extender func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error { out, err := json.Marshal(args) @@ -214,6 +243,10 @@ func (h *HTTPExtender) send(action string, args interface{}, result interface{}) return err } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("Failed %v with extender at URL %v, code %v", action, h.extenderURL, resp.StatusCode) + } + defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { diff --git a/plugin/pkg/scheduler/core/extender_test.go b/plugin/pkg/scheduler/core/extender_test.go index d9d5b83fe8..dd01ae2484 100644 --- a/plugin/pkg/scheduler/core/extender_test.go +++ b/plugin/pkg/scheduler/core/extender_test.go @@ -109,6 +109,7 @@ type FakeExtender struct { prioritizers []priorityConfig weight int nodeCacheCapable bool + filteredNodes []*v1.Node } func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) ([]*v1.Node, schedulerapi.FailedNodesMap, error) { @@ -133,6 +134,7 @@ func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[ } } + f.filteredNodes = filtered if f.nodeCacheCapable { return filtered, failedNodesMap, nil } @@ -162,6 +164,25 @@ func (f *FakeExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi. return &result, f.weight, nil } +func (f *FakeExtender) Bind(binding *v1.Binding) error { + if len(f.filteredNodes) != 0 { + for _, node := range f.filteredNodes { + if node.Name == binding.Target.Name { + f.filteredNodes = nil + return nil + } + } + err := fmt.Errorf("Node %v not in filtered nodes %v", binding.Target.Name, f.filteredNodes) + f.filteredNodes = nil + return err + } + return nil +} + +func (f *FakeExtender) IsBinder() bool { + return true +} + func TestGenericSchedulerWithExtenders(t *testing.T) { tests := []struct { name string diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 5a540622d8..1862c2e547 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -386,6 +386,16 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler return f.CreateFromKeys(predicateKeys, priorityKeys, extenders) } +// getBinder returns an extender that supports bind or a default binder. +func (f *ConfigFactory) getBinder(extenders []algorithm.SchedulerExtender) scheduler.Binder { + for i := range extenders { + if extenders[i].IsBinder() { + return extenders[i] + } + } + return &binder{f.client} +} + // Creates a scheduler from a set of registered fit predicate keys and priority keys. func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) { glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys) @@ -422,7 +432,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, // The scheduler only needs to consider schedulable nodes. NodeLister: &nodePredicateLister{f.nodeLister}, Algorithm: algo, - Binder: &binder{f.client}, + Binder: f.getBinder(extenders), PodConditionUpdater: &podConditionUpdater{f.client}, WaitForCacheSync: func() bool { return cache.WaitForCacheSync(f.StopEverything, f.scheduledPodsHasSynced) diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index ed5f2a61d1..f13f2f5c9e 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -271,7 +271,7 @@ func (sched *Scheduler) scheduleOne() { // bind the pod to its host asynchronously (we can do this b/c of the assumption step above). go func() { err := sched.bind(pod, &v1.Binding{ - ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name}, + ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID}, Target: v1.ObjectReference{ Kind: "Node", Name: suggestedHost, diff --git a/test/integration/scheduler/extender_test.go b/test/integration/scheduler/extender_test.go index 1c758a9bc5..cc362a7f48 100644 --- a/test/integration/scheduler/extender_test.go +++ b/test/integration/scheduler/extender_test.go @@ -49,6 +49,7 @@ import ( const ( filter = "filter" prioritize = "prioritize" + bind = "bind" ) type fitPredicate func(pod *v1.Pod, node *v1.Node) (bool, error) @@ -64,39 +65,59 @@ type Extender struct { predicates []fitPredicate prioritizers []priorityConfig nodeCacheCapable bool + Client clientset.Interface } func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Request) { - var args schedulerapi.ExtenderArgs - decoder := json.NewDecoder(req.Body) defer req.Body.Close() - if err := decoder.Decode(&args); err != nil { - http.Error(w, "Decode error", http.StatusBadRequest) - return - } - encoder := json.NewEncoder(w) - if strings.Contains(req.URL.Path, filter) { - resp := &schedulerapi.ExtenderFilterResult{} - resp, err := e.Filter(&args) - if err != nil { + if strings.Contains(req.URL.Path, filter) || strings.Contains(req.URL.Path, prioritize) { + var args schedulerapi.ExtenderArgs + + if err := decoder.Decode(&args); err != nil { + http.Error(w, "Decode error", http.StatusBadRequest) + return + } + + if strings.Contains(req.URL.Path, filter) { + resp := &schedulerapi.ExtenderFilterResult{} + resp, err := e.Filter(&args) + if err != nil { + resp.Error = err.Error() + } + + if err := encoder.Encode(resp); err != nil { + t.Fatalf("Failed to encode %v", resp) + } + } else if strings.Contains(req.URL.Path, prioritize) { + // Prioritize errors are ignored. Default k8s priorities or another extender's + // priorities may be applied. + priorities, _ := e.Prioritize(&args) + + if err := encoder.Encode(priorities); err != nil { + t.Fatalf("Failed to encode %+v", priorities) + } + } + } else if strings.Contains(req.URL.Path, bind) { + var args schedulerapi.ExtenderBindingArgs + + if err := decoder.Decode(&args); err != nil { + http.Error(w, "Decode error", http.StatusBadRequest) + return + } + + resp := &schedulerapi.ExtenderBindingResult{} + + if err := e.Bind(&args); err != nil { resp.Error = err.Error() } if err := encoder.Encode(resp); err != nil { t.Fatalf("Failed to encode %+v", resp) } - } else if strings.Contains(req.URL.Path, prioritize) { - // Prioritize errors are ignored. Default k8s priorities or another extender's - // priorities may be applied. - priorities, _ := e.Prioritize(&args) - - if err := encoder.Encode(priorities); err != nil { - t.Fatalf("Failed to encode %+v", priorities) - } } else { http.Error(w, "Unknown method", http.StatusNotFound) } @@ -209,6 +230,18 @@ func (e *Extender) Prioritize(args *schedulerapi.ExtenderArgs) (*schedulerapi.Ho return &result, nil } +func (e *Extender) Bind(binding *schedulerapi.ExtenderBindingArgs) error { + b := &v1.Binding{ + ObjectMeta: metav1.ObjectMeta{Namespace: binding.PodNamespace, Name: binding.PodName, UID: binding.PodUID}, + Target: v1.ObjectReference{ + Kind: "Node", + Name: binding.Node, + }, + } + + return e.Client.CoreV1().Pods(b.Namespace).Bind(b) +} + func machine_1_2_3_Predicate(pod *v1.Pod, node *v1.Node) (bool, error) { if node.Name == "machine1" || node.Name == "machine2" || node.Name == "machine3" { return true, nil @@ -276,6 +309,7 @@ func TestSchedulerExtender(t *testing.T) { name: "extender2", predicates: []fitPredicate{machine_2_3_5_Predicate}, prioritizers: []priorityConfig{{machine_3_Prioritizer, 1}}, + Client: clientSet, } es2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { extender2.serveHTTP(t, w, req) @@ -306,6 +340,7 @@ func TestSchedulerExtender(t *testing.T) { URLPrefix: es2.URL, FilterVerb: filter, PrioritizeVerb: prioritize, + BindVerb: bind, Weight: 4, EnableHttps: false, }, @@ -402,5 +437,13 @@ func DoTestPodScheduling(ns *v1.Namespace, t *testing.T, cs clientset.Interface) } else if myPod.Spec.NodeName != "machine2" { t.Fatalf("Failed to schedule using extender, expected machine2, got %v", myPod.Spec.NodeName) } + var gracePeriod int64 + if err := cs.Core().Pods(ns.Name).Delete(myPod.Name, &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}); err != nil { + t.Fatalf("Failed to delete pod: %v", err) + } + _, err = cs.Core().Pods(ns.Name).Get(myPod.Name, metav1.GetOptions{}) + if err == nil { + t.Fatalf("Failed to delete pod: %v", err) + } t.Logf("Scheduled pod using extenders") }