From dddad888b5e01c9f397484477cef9a391ea4b681 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Sun, 3 Aug 2014 00:01:28 -0700 Subject: [PATCH] Begin scheduler plugin --- hack/build-go.sh | 2 +- pkg/scheduler/listers.go | 1 + pkg/scheduler/randomfit.go | 1 + plugin/cmd/scheduler/scheduler.go | 141 +++++++++++++++++++++++++ plugin/pkg/scheduler/scheduler.go | 80 ++++++++++++++ plugin/pkg/scheduler/scheduler_test.go | 110 +++++++++++++++++++ 6 files changed, 334 insertions(+), 1 deletion(-) create mode 100644 plugin/cmd/scheduler/scheduler.go create mode 100644 plugin/pkg/scheduler/scheduler.go create mode 100644 plugin/pkg/scheduler/scheduler_test.go diff --git a/hack/build-go.sh b/hack/build-go.sh index 6a5def2016..b2044ae288 100755 --- a/hack/build-go.sh +++ b/hack/build-go.sh @@ -33,7 +33,7 @@ cd "${KUBE_REPO_ROOT}" if [[ $# == 0 ]]; then # Update $@ with the default list of targets to build. - set -- cmd/proxy cmd/apiserver cmd/controller-manager cmd/kubelet cmd/kubecfg + set -- cmd/proxy cmd/apiserver cmd/controller-manager cmd/kubelet cmd/kubecfg plugin/cmd/scheduler fi binaries=() diff --git a/pkg/scheduler/listers.go b/pkg/scheduler/listers.go index 96b216d8b4..e48e9ad7c6 100644 --- a/pkg/scheduler/listers.go +++ b/pkg/scheduler/listers.go @@ -36,6 +36,7 @@ func (f FakeMinionLister) List() ([]string, error) { // PodLister interface represents anything that can list pods for a scheduler type PodLister interface { + // TODO: make this exactly the same as client's ListPods() method... ListPods(labels.Selector) ([]api.Pod, error) } diff --git a/pkg/scheduler/randomfit.go b/pkg/scheduler/randomfit.go index ce32d741c6..92e66dd5cc 100644 --- a/pkg/scheduler/randomfit.go +++ b/pkg/scheduler/randomfit.go @@ -57,6 +57,7 @@ func (s *RandomFitScheduler) Schedule(pod api.Pod, minionLister MinionLister) (s return "", err } machineToPods := map[string][]api.Pod{} + // TODO: perform more targeted query... 
pods, err := s.podLister.ListPods(labels.Everything()) if err != nil { return "", err diff --git a/plugin/cmd/scheduler/scheduler.go b/plugin/cmd/scheduler/scheduler.go new file mode 100644 index 0000000000..c2725ca539 --- /dev/null +++ b/plugin/cmd/scheduler/scheduler.go @@ -0,0 +1,141 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "flag" + "math/rand" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + verflag "github.com/GoogleCloudPlatform/kubernetes/pkg/version/flag" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" +) + +var ( + master = flag.String("master", "", "The address of the Kubernetes API server") +) + +// storeToMinionLister turns a store into a minion lister. The store must contain (only) *api.Minion values; List uses an unchecked type assertion and panics on any other element type. +type storeToMinionLister struct { + s cache.Store +} + +func (s storeToMinionLister) List() (machines []string, err error) { + for _, m := range s.s.List() { + machines = append(machines, m.(*api.Minion).ID) + } + return machines, nil +} + +// storeToPodLister turns a store into a pod lister. 
The store must contain (only) pods. +type storeToPodLister struct { + s cache.Store +} + +func (s storeToPodLister) ListPods(selector labels.Selector) (pods []api.Pod, err error) { + for _, m := range s.s.List() { + pod := m.(*api.Pod) + if selector.Matches(labels.Set(pod.Labels)) { + pods = append(pods, *pod) + } + } + return pods, nil +} + +type binder struct { + kubeClient *client.Client +} + +// Bind just does a POST binding RPC. +func (b binder) Bind(binding *api.Binding) error { + return b.kubeClient.Post().Path("bindings").Body(binding).Do().Error() +} + +func main() { + flag.Parse() + util.InitLogs() + defer util.FlushLogs() + + verflag.PrintAndExitIfRequested() + + // This function is long because we inject all the dependencies into scheduler here. NOTE(review): the Config built below never sets Error, yet scheduleOne calls it on every scheduling or binding failure; confirm a handler gets wired in. + + // TODO: security story for plugins! + kubeClient := client.New("http://"+*master, nil) + + // Watch and queue pods that need scheduling. + podQueue := cache.NewFIFO() + cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) { + // This query will only find pods with no assigned host. + return kubeClient. + Get(). + Path("pods"). + Path("watch"). + SelectorParam("fields", labels.Set{"DesiredState.Host": ""}.AsSelector()). + UintParam("resourceVersion", resourceVersion). + Watch() + }, &api.Pod{}, podQueue).Run() + + // Watch and cache all running pods. Scheduler needs to find all pods + // so it knows where it's safe to place a pod. Cache this locally. + podCache := cache.NewStore() + cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) { + // This query will only find pods that do have an assigned host. + return kubeClient. + Get(). + Path("pods"). + Path("watch"). + ParseSelectorParam("fields", "DesiredState.Host!="). + UintParam("resourceVersion", resourceVersion). + Watch() + }, &api.Pod{}, podCache).Run() + + // Watch minions. + // Minions may be listed frequently, so provide a local up-to-date cache. 
+ minionCache := cache.NewStore() + cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) { + // This query watches all minions; no field selector is applied. + return kubeClient. + Get(). + Path("minions"). + Path("watch"). + UintParam("resourceVersion", resourceVersion). + Watch() + }, &api.Minion{}, minionCache).Run() + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + algo := algorithm.NewRandomFitScheduler( + storeToPodLister{podCache}, r) + + s := scheduler.New(&scheduler.Config{ + MinionLister: storeToMinionLister{minionCache}, + Algorithm: algo, + NextPod: func() *api.Pod { return podQueue.Pop().(*api.Pod) }, + Binder: binder{kubeClient}, + }) + + s.Run() + + select {} +} diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go new file mode 100644 index 0000000000..0b5234ec70 --- /dev/null +++ b/plugin/pkg/scheduler/scheduler.go @@ -0,0 +1,80 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + // TODO: move everything from pkg/scheduler into this package. Remove references from registry. + "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// Binder knows how to write a binding. +type Binder interface { + Bind(binding *api.Binding) error +} + +// Scheduler watches for new unscheduled pods. 
It attempts to find +// minions that they fit on and writes bindings back to the api server. +type Scheduler struct { + c *Config +} + +type Config struct { + MinionLister scheduler.MinionLister + Algorithm scheduler.Scheduler + Binder Binder + + // NextPod should be a function that blocks until the next pod + // is available. We don't use a channel for this, because scheduling + // a pod may take some amount of time and we don't want pods to get + // stale while they sit in a channel. + NextPod func() *api.Pod + + // Error is called if there is an error. It is passed the pod in + // question, and the error. It must be non-nil: scheduleOne invokes it without a nil check. + Error func(*api.Pod, error) +} + +// New returns a new Scheduler that uses c; the Config is kept by pointer, not copied. +func New(c *Config) *Scheduler { + s := &Scheduler{ + c: c, + } + return s +} + +// Run begins watching and scheduling. It hands scheduleOne to util.Forever in a new goroutine and returns immediately. +func (s *Scheduler) Run() { + go util.Forever(s.scheduleOne, 0) +} + +func (s *Scheduler) scheduleOne() { + pod := s.c.NextPod() + dest, err := s.c.Algorithm.Schedule(*pod, s.c.MinionLister) + if err != nil { + s.c.Error(pod, err) + return + } + b := &api.Binding{ + PodID: pod.ID, + Host: dest, + } + if err := s.c.Binder.Bind(b); err != nil { + s.c.Error(pod, err) + } +} diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go new file mode 100644 index 0000000000..f6c597abb8 --- /dev/null +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -0,0 +1,110 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scheduler + +import ( + "errors" + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" +) + +type fakeBinder struct { + b func(binding *api.Binding) error +} + +func (fb fakeBinder) Bind(binding *api.Binding) error { return fb.b(binding) } + +func podWithID(id string) *api.Pod { + return &api.Pod{JSONBase: api.JSONBase{ID: id}} +} + +type mockScheduler struct { + machine string + err error +} + +func (es mockScheduler) Schedule(pod api.Pod, ml scheduler.MinionLister) (string, error) { + return es.machine, es.err +} + +func TestScheduler(t *testing.T) { + // Table-driven: each case feeds one pod through scheduleOne and checks the reported error pod, error, and binding. + errS := errors.New("scheduler") + errB := errors.New("binder") + + table := []struct { + injectBindError error + sendPod *api.Pod + algo scheduler.Scheduler + expectErrorPod *api.Pod + expectError error + expectBind *api.Binding + }{ + { + sendPod: podWithID("foo"), + algo: mockScheduler{"machine1", nil}, + expectBind: &api.Binding{PodID: "foo", Host: "machine1"}, + }, { + sendPod: podWithID("foo"), + algo: mockScheduler{"machine1", errS}, + expectError: errS, + expectErrorPod: podWithID("foo"), + }, { + sendPod: podWithID("foo"), + algo: mockScheduler{"machine1", nil}, + expectBind: &api.Binding{PodID: "foo", Host: "machine1"}, + injectBindError: errB, + expectError: errB, + expectErrorPod: podWithID("foo"), + }, + } + + for i, item := range table { + var gotError error + var gotPod *api.Pod + var gotBinding *api.Binding + c := &Config{ + MinionLister: scheduler.FakeMinionLister{"machine1"}, + Algorithm: item.algo, + Binder: fakeBinder{func(b *api.Binding) error { + gotBinding = b + return item.injectBindError + }}, + Error: func(p *api.Pod, err error) { + gotPod = p + gotError = err + }, + NextPod: func() *api.Pod { + return item.sendPod + }, + } + s := New(c) + s.scheduleOne() + if e, a := item.expectErrorPod, gotPod; !reflect.DeepEqual(e, a) { + t.Errorf("%v: error pod: wanted %v, got %v", i, e, a) + } + if 
e, a := item.expectError, gotError; !reflect.DeepEqual(e, a) { + t.Errorf("%v: error: wanted %v, got %v", i, e, a) + } + if e, a := item.expectBind, gotBinding; !reflect.DeepEqual(e, a) { + t.Errorf("%v: binding: wanted %v, got %v", i, e, a) + } + } +}