From 1551b483479c64668723f8caca0057bab6091a20 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Fri, 26 Sep 2014 16:28:30 -0700 Subject: [PATCH] Add a resource fit scheduler predicate. Set sensible defaults. --- cmd/apiserver/apiserver.go | 11 +++++ examples/guestbook/frontend-controller.json | 1 + pkg/cloudprovider/aws/aws.go | 5 +++ pkg/cloudprovider/cloud.go | 4 ++ pkg/cloudprovider/fake/fake.go | 18 +++++--- pkg/cloudprovider/gce/gce.go | 45 ++++++++++++++++++++ pkg/cloudprovider/ovirt/ovirt.go | 5 +++ pkg/cloudprovider/vagrant/vagrant.go | 5 +++ pkg/master/master.go | 6 ++- pkg/registry/minion/caching_registry.go | 20 +++++---- pkg/registry/minion/caching_registry_test.go | 22 +++++----- pkg/registry/minion/cloud_registry.go | 43 ++++++++++++++----- pkg/registry/minion/cloud_registry_test.go | 11 ++--- pkg/registry/minion/healthy_registry.go | 11 ++--- pkg/registry/minion/healthy_registry_test.go | 4 +- pkg/registry/minion/minion.go | 27 +++++++----- pkg/registry/minion/minion_test.go | 18 ++++++-- pkg/registry/minion/rest.go | 10 +---- pkg/registry/minion/rest_test.go | 6 +-- pkg/registry/registrytest/minion.go | 42 ++++++++++++------ pkg/registry/service/rest.go | 10 ++++- pkg/registry/service/rest_test.go | 16 +++---- pkg/scheduler/predicates.go | 24 ++++++++++- pkg/scheduler/predicates_test.go | 5 ++- plugin/pkg/scheduler/factory/factory.go | 14 +++++- 25 files changed, 280 insertions(+), 103 deletions(-) diff --git a/cmd/apiserver/apiserver.go b/cmd/apiserver/apiserver.go index 3fbd579a31..f190241f59 100644 --- a/cmd/apiserver/apiserver.go +++ b/cmd/apiserver/apiserver.go @@ -27,11 +27,13 @@ import ( "strings" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" + "github.com/GoogleCloudPlatform/kubernetes/pkg/resources" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag" "github.com/golang/glog" @@ -52,6 +54,9 @@ var ( machineList util.StringList corsAllowedOriginList util.StringList allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow privileged containers.") + // TODO: Discover these by pinging the host machines, and rip out these flags. + nodeMilliCPU = flag.Int("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node") + nodeMemory = flag.Int("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node") ) func init() { @@ -150,6 +155,12 @@ func main() { MinionCacheTTL: *minionCacheTTL, MinionRegexp: *minionRegexp, PodInfoGetter: podInfoGetter, + NodeResources: api.NodeResources{ + Capacity: api.ResourceList{ + resources.CPU: util.NewIntOrStringFromInt(*nodeMilliCPU), + resources.Memory: util.NewIntOrStringFromInt(*nodeMemory), + }, + }, }) mux := http.NewServeMux() diff --git a/examples/guestbook/frontend-controller.json b/examples/guestbook/frontend-controller.json index 5116f3d721..aa0b0d92a3 100644 --- a/examples/guestbook/frontend-controller.json +++ b/examples/guestbook/frontend-controller.json @@ -13,6 +13,7 @@ "containers": [{ "name": "php-redis", "image": "brendanburns/php-redis", + "memory": 10000000, "ports": [{"containerPort": 80, "hostPort": 8000}] }] } diff --git a/pkg/cloudprovider/aws/aws.go b/pkg/cloudprovider/aws/aws.go index 438b263077..b69feaae25 100644 --- a/pkg/cloudprovider/aws/aws.go +++ b/pkg/cloudprovider/aws/aws.go @@ -26,6 +26,7 @@ import ( "github.com/mitchellh/goamz/aws" "github.com/mitchellh/goamz/ec2" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" ) @@ -179,3 +180,7 @@ func (aws *AWSCloud) List(filter string) ([]string, error) { // TODO: Should really use tag query. No need to go regexp. return aws.getInstancesByRegex(filter) } + +func (v *AWSCloud) GetNodeResources(name string) (*api.NodeResources, error) { + return nil, nil +} diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index a8814fa382..4dfd79d417 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -18,6 +18,8 @@ package cloudprovider import ( "net" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) // Interface is an abstract, pluggable interface for cloud providers. @@ -49,6 +51,8 @@ type Instances interface { IPAddress(name string) (net.IP, error) // List lists instances that match 'filter' which is a regular expression which must match the entire instance name (fqdn) List(filter string) ([]string, error) + // GetNodeResources gets the resources for a particular node + GetNodeResources(name string) (*api.NodeResources, error) } // Zone represents the location of a particular machine. diff --git a/pkg/cloudprovider/fake/fake.go b/pkg/cloudprovider/fake/fake.go index 3b4fe7c1f4..196e3f4873 100644 --- a/pkg/cloudprovider/fake/fake.go +++ b/pkg/cloudprovider/fake/fake.go @@ -20,16 +20,19 @@ import ( "net" "regexp" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" ) // FakeCloud is a test-double implementation of Interface, TCPLoadBalancer and Instances. It is useful for testing. type FakeCloud struct { - Exists bool - Err error - Calls []string - IP net.IP - Machines []string + Exists bool + Err error + Calls []string + IP net.IP + Machines []string + NodeResources *api.NodeResources + cloudprovider.Zone } @@ -110,3 +113,8 @@ func (f *FakeCloud) GetZone() (cloudprovider.Zone, error) { f.addCall("get-zone") return f.Zone, f.Err } + +func (f *FakeCloud) GetNodeResources(name string) (*api.NodeResources, error) { + f.addCall("get-node-resources") + return f.NodeResources, f.Err +} diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index 3fb12720af..4012b17e00 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -29,7 +29,10 @@ import ( "code.google.com/p/goauth2/compute/serviceaccount" compute "code.google.com/p/google-api-go-client/compute/v1" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" + "github.com/GoogleCloudPlatform/kubernetes/pkg/resources" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" ) @@ -254,6 +257,48 @@ func (gce *GCECloud) List(filter string) ([]string, error) { return instances, nil } +func makeResources(cpu float32, memory float32) *api.NodeResources { + return &api.NodeResources{ + Capacity: api.ResourceList{ + resources.CPU: util.NewIntOrStringFromInt(int(cpu * 1000)), + resources.Memory: util.NewIntOrStringFromInt(int(memory * 1024 * 1024 * 1024)), + }, + } +} + +func canonicalizeMachineType(machineType string) string { + ix := strings.LastIndex(machineType, "/") + return machineType[ix+1:] +} + +func (gce *GCECloud) GetNodeResources(name string) (*api.NodeResources, error) { + instance := canonicalizeInstanceName(name) + instanceCall := gce.service.Instances.Get(gce.projectID, gce.zone, instance) + res, err := instanceCall.Do() + if err != nil { + return nil, err + } + switch canonicalizeMachineType(res.MachineType) { + case "f1-micro": + return makeResources(1, 0.6), nil + case "g1-small": + return makeResources(1, 1.70), nil + case "n1-standard-1": + return makeResources(1, 3.75), nil + case "n1-standard-2": + return makeResources(2, 7.5), nil + case "n1-standard-4": + return makeResources(4, 15), nil + case "n1-standard-8": + return makeResources(8, 30), nil + case "n1-standard-16": + return makeResources(16, 30), nil + default: + glog.Errorf("unknown machine: %s", res.MachineType) + return nil, nil + } +} + func (gce *GCECloud) GetZone() (cloudprovider.Zone, error) { region, err := getGceRegion(gce.zone) if err != nil { diff --git a/pkg/cloudprovider/ovirt/ovirt.go b/pkg/cloudprovider/ovirt/ovirt.go index a63ab080c3..99b0ec0cf6 100644 --- a/pkg/cloudprovider/ovirt/ovirt.go +++ b/pkg/cloudprovider/ovirt/ovirt.go @@ -28,6 +28,7 @@ import ( "strings" "code.google.com/p/gcfg" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" ) @@ -154,3 +155,7 @@ func (v *OVirtCloud) List(filter string) ([]string, error) { return getInstancesFromXml(response.Body) } + +func (v *OVirtCloud) GetNodeResources(name string) (*api.NodeResources, error) { + return nil, nil +} diff --git a/pkg/cloudprovider/vagrant/vagrant.go b/pkg/cloudprovider/vagrant/vagrant.go index cdf1b5c14f..a6aa1881f8 100644 --- a/pkg/cloudprovider/vagrant/vagrant.go +++ b/pkg/cloudprovider/vagrant/vagrant.go @@ -27,6 +27,7 @@ import ( neturl "net/url" "sort" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" ) @@ -209,3 +210,7 @@ func (v *VagrantCloud) List(filter string) ([]string, error) { return instances, nil } + +func (v *VagrantCloud) GetNodeResources(name string) (*api.NodeResources, error) { + return nil, nil +} diff --git a/pkg/master/master.go b/pkg/master/master.go index 08bc3f34bb..1286fbc43f 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -20,6 +20,7 @@ import ( "net/http" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2" @@ -52,6 +53,7 @@ type Config struct { MinionCacheTTL time.Duration MinionRegexp string PodInfoGetter client.PodInfoGetter + NodeResources api.NodeResources } // Master contains state for a Kubernetes cluster master/api server. @@ -104,13 +106,13 @@ func makeMinionRegistry(c *Config) minion.Registry { var minionRegistry minion.Registry if c.Cloud != nil && len(c.MinionRegexp) > 0 { var err error - minionRegistry, err = minion.NewCloudRegistry(c.Cloud, c.MinionRegexp) + minionRegistry, err = minion.NewCloudRegistry(c.Cloud, c.MinionRegexp, &c.NodeResources) if err != nil { glog.Errorf("Failed to initalize cloud minion registry reverting to static registry (%#v)", err) } } if minionRegistry == nil { - minionRegistry = minion.NewRegistry(c.Minions) + minionRegistry = minion.NewRegistry(c.Minions, c.NodeResources) } if c.HealthCheckMinions { minionRegistry = minion.NewHealthyRegistry(minionRegistry, &http.Client{}) diff --git a/pkg/registry/minion/caching_registry.go b/pkg/registry/minion/caching_registry.go index ec04939b21..604b67171b 100644 --- a/pkg/registry/minion/caching_registry.go +++ b/pkg/registry/minion/caching_registry.go @@ -20,6 +20,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) type Clock interface { @@ -35,7 +37,7 @@ func (SystemClock) Now() time.Time { type CachingRegistry struct { delegate Registry ttl time.Duration - minions []string + nodes *api.MinionList lastUpdate int64 lock sync.RWMutex clock Clock @@ -49,13 +51,13 @@ func NewCachingRegistry(delegate Registry, ttl time.Duration) (Registry, error) return &CachingRegistry{ delegate: delegate, ttl: ttl, - minions: list, + nodes: list, lastUpdate: time.Now().Unix(), clock: SystemClock{}, }, nil } -func (r *CachingRegistry) Contains(minion string) (bool, error) { +func (r *CachingRegistry) Contains(nodeID string) (bool, error) { if r.expired() { if err := r.refresh(false); err != nil { return false, err @@ -64,8 +66,8 @@ func (r *CachingRegistry) Contains(minion string) (bool, error) { // block updates in the middle of a contains. r.lock.RLock() defer r.lock.RUnlock() - for _, name := range r.minions { - if name == minion { + for _, node := range r.nodes.Items { + if node.ID == nodeID { return true, nil } } @@ -86,13 +88,13 @@ func (r *CachingRegistry) Insert(minion string) error { return r.refresh(true) } -func (r *CachingRegistry) List() ([]string, error) { +func (r *CachingRegistry) List() (*api.MinionList, error) { if r.expired() { if err := r.refresh(false); err != nil { - return r.minions, err + return r.nodes, err } } - return r.minions, nil + return r.nodes, nil } func (r *CachingRegistry) expired() bool { @@ -108,7 +110,7 @@ func (r *CachingRegistry) refresh(force bool) error { defer r.lock.Unlock() if force || r.expired() { var err error - r.minions, err = r.delegate.List() + r.nodes, err = r.delegate.List() time := r.clock.Now() atomic.SwapInt64(&r.lastUpdate, time.Unix()) return err diff --git a/pkg/registry/minion/caching_registry_test.go b/pkg/registry/minion/caching_registry_test.go index d09ccd7743..ddef3cb6b9 100644 --- a/pkg/registry/minion/caching_registry_test.go +++ b/pkg/registry/minion/caching_registry_test.go @@ -37,13 +37,13 @@ func TestCachingHit(t *testing.T) { now: time.Unix(0, 0), } fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}) - expected := []string{"m1", "m2", "m3"} + expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}) cache := CachingRegistry{ delegate: fakeRegistry, ttl: 1 * time.Second, clock: &fakeClock, lastUpdate: fakeClock.Now().Unix(), - minions: expected, + nodes: expected, } list, err := cache.List() if err != nil { @@ -59,20 +59,20 @@ func TestCachingMiss(t *testing.T) { now: time.Unix(0, 0), } fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}) - expected := []string{"m1", "m2", "m3"} + expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}) cache := CachingRegistry{ delegate: fakeRegistry, ttl: 1 * time.Second, clock: &fakeClock, lastUpdate: fakeClock.Now().Unix(), - minions: expected, + nodes: expected, } fakeClock.now = time.Unix(3, 0) list, err := cache.List() if err != nil { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(list, fakeRegistry.Minions) { + if !reflect.DeepEqual(list, &fakeRegistry.Minions) { t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list) } } @@ -82,13 +82,13 @@ func TestCachingInsert(t *testing.T) { now: time.Unix(0, 0), } fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}) - expected := []string{"m1", "m2", "m3"} + expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}) cache := CachingRegistry{ delegate: fakeRegistry, ttl: 1 * time.Second, clock: &fakeClock, lastUpdate: fakeClock.Now().Unix(), - minions: expected, + nodes: expected, } err := cache.Insert("foo") if err != nil { @@ -98,7 +98,7 @@ func TestCachingInsert(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(list, fakeRegistry.Minions) { + if !reflect.DeepEqual(list, &fakeRegistry.Minions) { t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list) } } @@ -108,13 +108,13 @@ func TestCachingDelete(t *testing.T) { now: time.Unix(0, 0), } fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}) - expected := []string{"m1", "m2", "m3"} + expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}) cache := CachingRegistry{ delegate: fakeRegistry, ttl: 1 * time.Second, clock: &fakeClock, lastUpdate: fakeClock.Now().Unix(), - minions: expected, + nodes: expected, } err := cache.Delete("m2") if err != nil { @@ -124,7 +124,7 @@ func TestCachingDelete(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(list, fakeRegistry.Minions) { + if !reflect.DeepEqual(list, &fakeRegistry.Minions) { t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list) } } diff --git a/pkg/registry/minion/cloud_registry.go b/pkg/registry/minion/cloud_registry.go index 9d7688ce18..61ffede347 100644 --- a/pkg/registry/minion/cloud_registry.go +++ b/pkg/registry/minion/cloud_registry.go @@ -19,28 +19,31 @@ package minion import ( "fmt" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" ) type CloudRegistry struct { - cloud cloudprovider.Interface - matchRE string + cloud cloudprovider.Interface + matchRE string + staticResources *api.NodeResources } -func NewCloudRegistry(cloud cloudprovider.Interface, matchRE string) (*CloudRegistry, error) { +func NewCloudRegistry(cloud cloudprovider.Interface, matchRE string, staticResources *api.NodeResources) (*CloudRegistry, error) { return &CloudRegistry{ - cloud: cloud, - matchRE: matchRE, + cloud: cloud, + matchRE: matchRE, + staticResources: staticResources, }, nil } -func (r *CloudRegistry) Contains(minion string) (bool, error) { +func (r *CloudRegistry) Contains(nodeID string) (bool, error) { instances, err := r.List() if err != nil { return false, err } - for _, name := range instances { - if name == minion { + for _, node := range instances.Items { + if node.ID == nodeID { return true, nil } } @@ -55,10 +58,30 @@ func (r CloudRegistry) Insert(minion string) error { return fmt.Errorf("unsupported") } -func (r *CloudRegistry) List() ([]string, error) { +func (r *CloudRegistry) List() (*api.MinionList, error) { instances, ok := r.cloud.Instances() if !ok { return nil, fmt.Errorf("cloud doesn't support instances") } - return instances.List(r.matchRE) + matches, err := instances.List(r.matchRE) + if err != nil { + return nil, err + } + result := &api.MinionList{ + Items: make([]api.Minion, len(matches)), + } + for ix := range matches { + result.Items[ix].ID = matches[ix] + resources, err := instances.GetNodeResources(matches[ix]) + if err != nil { + return nil, err + } + if resources == nil { + resources = r.staticResources + } + if resources != nil { + result.Items[ix].NodeResources = *resources + } + } + return result, err } diff --git a/pkg/registry/minion/cloud_registry_test.go b/pkg/registry/minion/cloud_registry_test.go index 8f5a8894fe..457b8c5f3a 100644 --- a/pkg/registry/minion/cloud_registry_test.go +++ b/pkg/registry/minion/cloud_registry_test.go @@ -21,6 +21,7 @@ import ( "testing" fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) func TestCloudList(t *testing.T) { @@ -28,7 +29,7 @@ func TestCloudList(t *testing.T) { fakeCloud := fake_cloud.FakeCloud{ Machines: instances, } - registry, err := NewCloudRegistry(&fakeCloud, ".*") + registry, err := NewCloudRegistry(&fakeCloud, ".*", nil) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -38,7 +39,7 @@ func TestCloudList(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(list, instances) { + if !reflect.DeepEqual(list, registrytest.MakeMinionList(instances)) { t.Errorf("Unexpected inequality: %#v, %#v", list, instances) } } @@ -48,7 +49,7 @@ func TestCloudContains(t *testing.T) { fakeCloud := fake_cloud.FakeCloud{ Machines: instances, } - registry, err := NewCloudRegistry(&fakeCloud, ".*") + registry, err := NewCloudRegistry(&fakeCloud, ".*", nil) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -77,7 +78,7 @@ func TestCloudListRegexp(t *testing.T) { fakeCloud := fake_cloud.FakeCloud{ Machines: instances, } - registry, err := NewCloudRegistry(&fakeCloud, "m[0-9]+") + registry, err := NewCloudRegistry(&fakeCloud, "m[0-9]+", nil) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -87,7 +88,7 @@ func TestCloudListRegexp(t *testing.T) { t.Errorf("unexpected error: %v", err) } - expectedList := []string{"m1", "m2"} + expectedList := registrytest.MakeMinionList([]string{"m1", "m2"}) if !reflect.DeepEqual(list, expectedList) { t.Errorf("Unexpected inequality: %#v, %#v", list, expectedList) } diff --git a/pkg/registry/minion/healthy_registry.go b/pkg/registry/minion/healthy_registry.go index 004540223a..28dfbaf4e0 100644 --- a/pkg/registry/minion/healthy_registry.go +++ b/pkg/registry/minion/healthy_registry.go @@ -20,6 +20,7 @@ import ( "fmt" "net/http" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/golang/glog" @@ -65,20 +66,20 @@ func (r *HealthyRegistry) Insert(minion string) error { return r.delegate.Insert(minion) } -func (r *HealthyRegistry) List() (currentMinions []string, err error) { - var result []string +func (r *HealthyRegistry) List() (currentMinions *api.MinionList, err error) { + result := &api.MinionList{} list, err := r.delegate.List() if err != nil { return result, err } - for _, minion := range list { - status, err := health.DoHTTPCheck(r.makeMinionURL(minion), r.client) + for _, minion := range list.Items { + status, err := health.DoHTTPCheck(r.makeMinionURL(minion.ID), r.client) if err != nil { glog.Errorf("%s failed health check with error: %s", minion, err) continue } if status == health.Healthy { - result = append(result, minion) + result.Items = append(result.Items, minion) } else { glog.Errorf("%s failed a health check, ignoring.", minion) } diff --git a/pkg/registry/minion/healthy_registry_test.go b/pkg/registry/minion/healthy_registry_test.go index cebcf31f1d..b8635018e1 100644 --- a/pkg/registry/minion/healthy_registry_test.go +++ b/pkg/registry/minion/healthy_registry_test.go @@ -49,7 +49,7 @@ func TestBasicDelegation(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(list, mockMinionRegistry.Minions) { + if !reflect.DeepEqual(list, &mockMinionRegistry.Minions) { t.Errorf("Expected %v, Got %v", mockMinionRegistry.Minions, list) } err = healthy.Insert("foo") @@ -96,7 +96,7 @@ func TestFiltering(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if !reflect.DeepEqual(list, expected) { + if !reflect.DeepEqual(list, registrytest.MakeMinionList(expected)) { t.Errorf("Expected %v, Got %v", expected, list) } ok, err := healthy.Contains("m1") diff --git a/pkg/registry/minion/minion.go b/pkg/registry/minion/minion.go index 75cf75b1a4..8bdc0392bf 100644 --- a/pkg/registry/minion/minion.go +++ b/pkg/registry/minion/minion.go @@ -18,9 +18,9 @@ package minion import ( "fmt" - "sort" "sync" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -28,16 +28,17 @@ var ErrDoesNotExist = fmt.Errorf("The requested resource does not exist.") // Registry keeps track of a set of minions. Safe for concurrent reading/writing. type Registry interface { - List() (currentMinions []string, err error) + List() (currentMinions *api.MinionList, err error) Insert(minion string) error Delete(minion string) error Contains(minion string) (bool, error) } // NewRegistry initializes a minion registry with a list of minions. -func NewRegistry(minions []string) Registry { +func NewRegistry(minions []string, nodeResources api.NodeResources) Registry { m := &minionList{ - minions: util.StringSet{}, + minions: util.StringSet{}, + nodeResources: nodeResources, } for _, minion := range minions { m.minions.Insert(minion) @@ -46,8 +47,9 @@ func NewRegistry(minions []string) Registry { } type minionList struct { - minions util.StringSet - lock sync.Mutex + minions util.StringSet + lock sync.Mutex + nodeResources api.NodeResources } func (m *minionList) Contains(minion string) (bool, error) { @@ -70,12 +72,17 @@ func (m *minionList) Insert(newMinion string) error { return nil } -func (m *minionList) List() (currentMinions []string, err error) { +func (m *minionList) List() (currentMinions *api.MinionList, err error) { m.lock.Lock() defer m.lock.Unlock() + minions := []api.Minion{} for minion := range m.minions { - currentMinions = append(currentMinions, minion) + minions = append(minions, api.Minion{ + JSONBase: api.JSONBase{ID: minion}, + NodeResources: m.nodeResources, + }) } - sort.StringSlice(currentMinions).Sort() - return + return &api.MinionList{ + Items: minions, + }, nil } diff --git a/pkg/registry/minion/minion_test.go b/pkg/registry/minion/minion_test.go index f4dd2bfa5b..d4df16730f 100644 --- a/pkg/registry/minion/minion_test.go +++ b/pkg/registry/minion/minion_test.go @@ -17,12 +17,13 @@ limitations under the License. package minion import ( - "reflect" "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) func TestRegistry(t *testing.T) { - m := NewRegistry([]string{"foo", "bar"}) + m := NewRegistry([]string{"foo", "bar"}, api.NodeResources{}) if has, err := m.Contains("foo"); !has || err != nil { t.Errorf("missing expected object") } @@ -48,7 +49,16 @@ func TestRegistry(t *testing.T) { if err != nil { t.Errorf("got error calling List") } - if !reflect.DeepEqual(list, []string{"baz", "foo"}) { - t.Errorf("Unexpected list value: %#v", list) + if len(list.Items) != 2 || !contains(list, "foo") || !contains(list, "baz") { + t.Errorf("unexpected %v", list) } } + +func contains(nodes *api.MinionList, nodeID string) bool { + for _, node := range nodes.Items { + if node.ID == nodeID { + return true + } + } + return false +} diff --git a/pkg/registry/minion/rest.go b/pkg/registry/minion/rest.go index c06f8d6246..f47910e3e2 100644 --- a/pkg/registry/minion/rest.go +++ b/pkg/registry/minion/rest.go @@ -87,15 +87,7 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { } func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) { - nameList, err := rs.registry.List() - if err != nil { - return nil, err - } - var list api.MinionList - for _, name := range nameList { - list.Items = append(list.Items, *rs.toApiMinion(name)) - } - return &list, nil + return rs.registry.List() } func (*REST) New() runtime.Object { diff --git a/pkg/registry/minion/rest_test.go b/pkg/registry/minion/rest_test.go index 0fa0672f6b..c01ecb50fe 100644 --- a/pkg/registry/minion/rest_test.go +++ b/pkg/registry/minion/rest_test.go @@ -17,7 +17,6 @@ limitations under the License. package minion import ( - "reflect" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -25,7 +24,7 @@ import ( ) func TestMinionREST(t *testing.T) { - m := NewRegistry([]string{"foo", "bar"}) + m := NewRegistry([]string{"foo", "bar"}, api.NodeResources{}) ms := NewREST(m) ctx := api.NewContext() if obj, err := ms.Get(ctx, "foo"); err != nil || obj.(*api.Minion).ID != "foo" { @@ -78,7 +77,8 @@ func TestMinionREST(t *testing.T) { JSONBase: api.JSONBase{ID: "foo"}, }, } - if !reflect.DeepEqual(list.(*api.MinionList).Items, expect) { + nodeList := list.(*api.MinionList) + if len(expect) != len(nodeList.Items) || !contains(nodeList, "foo") || !contains(nodeList, "baz") { t.Errorf("Unexpected list value: %#v", list) } } diff --git a/pkg/registry/registrytest/minion.go b/pkg/registry/registrytest/minion.go index f6ca412295..0cbf2fa37a 100644 --- a/pkg/registry/registrytest/minion.go +++ b/pkg/registry/registrytest/minion.go @@ -16,40 +16,54 @@ limitations under the License. package registrytest -import "sync" +import ( + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" +) type MinionRegistry struct { Err error Minion string - Minions []string + Minions api.MinionList sync.Mutex } +func MakeMinionList(minions []string) *api.MinionList { + list := api.MinionList{ + Items: make([]api.Minion, len(minions)), + } + for i := range minions { + list.Items[i].ID = minions[i] + } + return &list +} + func NewMinionRegistry(minions []string) *MinionRegistry { return &MinionRegistry{ - Minions: minions, + Minions: *MakeMinionList(minions), } } -func (r *MinionRegistry) List() ([]string, error) { +func (r *MinionRegistry) List() (*api.MinionList, error) { r.Lock() defer r.Unlock() - return r.Minions, r.Err + return &r.Minions, r.Err } func (r *MinionRegistry) Insert(minion string) error { r.Lock() defer r.Unlock() r.Minion = minion - r.Minions = append(r.Minions, minion) + r.Minions.Items = append(r.Minions.Items, api.Minion{JSONBase: api.JSONBase{ID: minion}}) return r.Err } -func (r *MinionRegistry) Contains(minion string) (bool, error) { +func (r *MinionRegistry) Contains(nodeID string) (bool, error) { r.Lock() defer r.Unlock() - for _, name := range r.Minions { - if name == minion { + for _, node := range r.Minions.Items { + if node.ID == nodeID { return true, r.Err } } @@ -59,12 +73,12 @@ func (r *MinionRegistry) Contains(minion string) (bool, error) { func (r *MinionRegistry) Delete(minion string) error { r.Lock() defer r.Unlock() - var newList []string - for _, name := range r.Minions { - if name != minion { - newList = append(newList, name) + var newList []api.Minion + for _, node := range r.Minions.Items { + if node.ID != minion { + newList = append(newList, api.Minion{JSONBase: api.JSONBase{ID: minion}}) } } - r.Minions = newList + r.Minions.Items = newList return r.Err } diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 702c3c48db..ae98cb9435 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -84,7 +84,7 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje if err != nil { return nil, err } - err = balancer.CreateTCPLoadBalancer(srv.ID, zone.Region, srv.Port, hosts) + err = balancer.CreateTCPLoadBalancer(srv.ID, zone.Region, srv.Port, hostsFromMinionList(hosts)) if err != nil { return nil, err } @@ -97,6 +97,14 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje }), nil } +func hostsFromMinionList(list *api.MinionList) []string { + result := make([]string, len(list.Items)) + for ix := range list.Items { + result[ix] = list.Items[ix].ID + } + return result +} + func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) { service, err := rs.registry.GetService(ctx, id) if err != nil { diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index 33f3706151..2261598578 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -34,7 +34,7 @@ func TestServiceRegistryCreate(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines)) + storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{})) svc := &api.Service{ Port: 6502, JSONBase: api.JSONBase{ID: "foo"}, @@ -156,7 +156,7 @@ func TestServiceRegistryExternalService(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines)) + storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{})) svc := &api.Service{ Port: 6502, JSONBase: api.JSONBase{ID: "foo"}, @@ -183,7 +183,7 @@ func TestServiceRegistryExternalServiceError(t *testing.T) { Err: fmt.Errorf("test error"), } machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines)) + storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{})) svc := &api.Service{ Port: 6502, JSONBase: api.JSONBase{ID: "foo"}, @@ -206,7 +206,7 @@ func TestServiceRegistryDelete(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines)) + storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{})) svc := &api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -227,7 +227,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines)) + storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{})) svc := &api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -314,7 +314,7 @@ func TestServiceRegistryGet(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines)) + storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{})) registry.CreateService(ctx, &api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -334,7 +334,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) { registry.Endpoints = api.Endpoints{Endpoints: []string{"foo:80"}} fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines)) + storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{})) registry.CreateService(ctx, &api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -363,7 +363,7 @@ func TestServiceRegistryList(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines)) + storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{})) registry.CreateService(ctx, &api.Service{ JSONBase: api.JSONBase{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, diff --git a/pkg/scheduler/predicates.go b/pkg/scheduler/predicates.go index 1ab2ac786d..3b4e881f0f 100644 --- a/pkg/scheduler/predicates.go +++ b/pkg/scheduler/predicates.go @@ -17,6 +17,8 @@ limitations under the License. package scheduler import ( + "fmt" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/resources" @@ -24,7 +26,20 @@ import ( ) type NodeInfo interface { - GetNodeInfo(nodeName string) (api.Minion, error) + GetNodeInfo(nodeID string) (*api.Minion, error) +} + +type StaticNodeInfo struct { + *api.MinionList +} + +func (nodes StaticNodeInfo) GetNodeInfo(nodeID string) (*api.Minion, error) { + for ix := range nodes.Items { + if nodes.Items[ix].ID == nodeID { + return &nodes.Items[ix], nil + } + } + return nil, fmt.Errorf("failed to find node: %s", nodeID) } type ResourceFit struct { @@ -75,6 +90,13 @@ func (r *ResourceFit) PodFitsResources(pod api.Pod, existingPods []api.Pod, node return fitsCPU && fitsMemory, nil } +func NewResourceFitPredicate(info NodeInfo) FitPredicate { + fit := &ResourceFit{ + info: info, + } + return fit.PodFitsResources +} + func PodFitsPorts(pod api.Pod, existingPods []api.Pod, node string) (bool, error) { for _, scheduledPod := range existingPods { for _, container := range pod.DesiredState.Manifest.Containers { diff --git a/pkg/scheduler/predicates_test.go b/pkg/scheduler/predicates_test.go index 388b611ee1..9f4cc0ada6 100644 --- a/pkg/scheduler/predicates_test.go +++ b/pkg/scheduler/predicates_test.go @@ -26,8 +26,9 @@ import ( type FakeNodeInfo api.Minion -func (n FakeNodeInfo) GetNodeInfo(nodeName string) (api.Minion, error) { - return api.Minion(n), nil +func (n FakeNodeInfo) GetNodeInfo(nodeName string) (*api.Minion, error) { + node := api.Minion(n) + return &node, nil } func makeResources(milliCPU int, memory int) api.NodeResources { diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index d1331e5ab6..c504a6a5a3 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -62,9 +62,19 @@ func (factory *ConfigFactory) Create() *scheduler.Config { } r := rand.New(rand.NewSource(time.Now().UnixNano())) + + nodes, err := factory.Client.ListMinions() + if err != nil { + glog.Errorf("failed to obtain minion information, aborting") + return nil + } algo := algorithm.NewGenericScheduler( - // Fit is defined based on the absence of port conflicts. - []algorithm.FitPredicate{algorithm.PodFitsPorts}, + []algorithm.FitPredicate{ + // Fit is defined based on the absence of port conflicts. + algorithm.PodFitsPorts, + // Fit is determined by resource availability + algorithm.NewResourceFitPredicate(algorithm.StaticNodeInfo{nodes}), + }, // All nodes where things fit are equally likely (Random) algorithm.EqualPriority, &storeToPodLister{podCache}, r)