Merge pull request #1536 from brendandburns/resource

Add in resource fit as a predicate.  Manually set the available resources.
pull/6/head
bgrant0607 2014-10-03 15:29:00 -07:00
commit bed5306434
25 changed files with 280 additions and 103 deletions

View File

@ -27,11 +27,13 @@ import (
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/resources"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
"github.com/golang/glog"
@ -52,6 +54,9 @@ var (
machineList util.StringList
corsAllowedOriginList util.StringList
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow privileged containers.")
// TODO: Discover these by pinging the host machines, and rip out these flags.
nodeMilliCPU = flag.Int("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node")
nodeMemory = flag.Int("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node")
)
func init() {
@ -150,6 +155,12 @@ func main() {
MinionCacheTTL: *minionCacheTTL,
MinionRegexp: *minionRegexp,
PodInfoGetter: podInfoGetter,
NodeResources: api.NodeResources{
Capacity: api.ResourceList{
resources.CPU: util.NewIntOrStringFromInt(*nodeMilliCPU),
resources.Memory: util.NewIntOrStringFromInt(*nodeMemory),
},
},
})
mux := http.NewServeMux()

View File

@ -13,6 +13,7 @@
"containers": [{
"name": "php-redis",
"image": "brendanburns/php-redis",
"memory": 10000000,
"ports": [{"containerPort": 80, "hostPort": 8000}]
}]
}

View File

@ -26,6 +26,7 @@ import (
"github.com/mitchellh/goamz/aws"
"github.com/mitchellh/goamz/ec2"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
)
@ -179,3 +180,7 @@ func (aws *AWSCloud) List(filter string) ([]string, error) {
// TODO: Should really use tag query. No need to go regexp.
return aws.getInstancesByRegex(filter)
}
func (v *AWSCloud) GetNodeResources(name string) (*api.NodeResources, error) {
return nil, nil
}

View File

@ -18,6 +18,8 @@ package cloudprovider
import (
"net"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// Interface is an abstract, pluggable interface for cloud providers.
@ -49,6 +51,8 @@ type Instances interface {
IPAddress(name string) (net.IP, error)
// List lists instances that match 'filter' which is a regular expression which must match the entire instance name (fqdn)
List(filter string) ([]string, error)
// GetNodeResources gets the resources for a particular node
GetNodeResources(name string) (*api.NodeResources, error)
}
// Zone represents the location of a particular machine.

View File

@ -20,6 +20,7 @@ import (
"net"
"regexp"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
)
@ -30,6 +31,8 @@ type FakeCloud struct {
Calls []string
IP net.IP
Machines []string
NodeResources *api.NodeResources
cloudprovider.Zone
}
@ -110,3 +113,8 @@ func (f *FakeCloud) GetZone() (cloudprovider.Zone, error) {
f.addCall("get-zone")
return f.Zone, f.Err
}
func (f *FakeCloud) GetNodeResources(name string) (*api.NodeResources, error) {
f.addCall("get-node-resources")
return f.NodeResources, f.Err
}

View File

@ -29,7 +29,10 @@ import (
"code.google.com/p/goauth2/compute/serviceaccount"
compute "code.google.com/p/google-api-go-client/compute/v1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/resources"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
@ -254,6 +257,48 @@ func (gce *GCECloud) List(filter string) ([]string, error) {
return instances, nil
}
func makeResources(cpu float32, memory float32) *api.NodeResources {
return &api.NodeResources{
Capacity: api.ResourceList{
resources.CPU: util.NewIntOrStringFromInt(int(cpu * 1000)),
resources.Memory: util.NewIntOrStringFromInt(int(memory * 1024 * 1024 * 1024)),
},
}
}
func canonicalizeMachineType(machineType string) string {
ix := strings.LastIndex(machineType, "/")
return machineType[ix+1:]
}
func (gce *GCECloud) GetNodeResources(name string) (*api.NodeResources, error) {
instance := canonicalizeInstanceName(name)
instanceCall := gce.service.Instances.Get(gce.projectID, gce.zone, instance)
res, err := instanceCall.Do()
if err != nil {
return nil, err
}
switch canonicalizeMachineType(res.MachineType) {
case "f1-micro":
return makeResources(1, 0.6), nil
case "g1-small":
return makeResources(1, 1.70), nil
case "n1-standard-1":
return makeResources(1, 3.75), nil
case "n1-standard-2":
return makeResources(2, 7.5), nil
case "n1-standard-4":
return makeResources(4, 15), nil
case "n1-standard-8":
return makeResources(8, 30), nil
case "n1-standard-16":
return makeResources(16, 30), nil
default:
glog.Errorf("unknown machine: %s", res.MachineType)
return nil, nil
}
}
func (gce *GCECloud) GetZone() (cloudprovider.Zone, error) {
region, err := getGceRegion(gce.zone)
if err != nil {

View File

@ -28,6 +28,7 @@ import (
"strings"
"code.google.com/p/gcfg"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
)
@ -154,3 +155,7 @@ func (v *OVirtCloud) List(filter string) ([]string, error) {
return getInstancesFromXml(response.Body)
}
func (v *OVirtCloud) GetNodeResources(name string) (*api.NodeResources, error) {
return nil, nil
}

View File

@ -27,6 +27,7 @@ import (
neturl "net/url"
"sort"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
)
@ -209,3 +210,7 @@ func (v *VagrantCloud) List(filter string) ([]string, error) {
return instances, nil
}
func (v *VagrantCloud) GetNodeResources(name string) (*api.NodeResources, error) {
return nil, nil
}

View File

@ -20,6 +20,7 @@ import (
"net/http"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
@ -52,6 +53,7 @@ type Config struct {
MinionCacheTTL time.Duration
MinionRegexp string
PodInfoGetter client.PodInfoGetter
NodeResources api.NodeResources
}
// Master contains state for a Kubernetes cluster master/api server.
@ -104,13 +106,13 @@ func makeMinionRegistry(c *Config) minion.Registry {
var minionRegistry minion.Registry
if c.Cloud != nil && len(c.MinionRegexp) > 0 {
var err error
minionRegistry, err = minion.NewCloudRegistry(c.Cloud, c.MinionRegexp)
minionRegistry, err = minion.NewCloudRegistry(c.Cloud, c.MinionRegexp, &c.NodeResources)
if err != nil {
glog.Errorf("Failed to initalize cloud minion registry reverting to static registry (%#v)", err)
}
}
if minionRegistry == nil {
minionRegistry = minion.NewRegistry(c.Minions)
minionRegistry = minion.NewRegistry(c.Minions, c.NodeResources)
}
if c.HealthCheckMinions {
minionRegistry = minion.NewHealthyRegistry(minionRegistry, &http.Client{})

View File

@ -20,6 +20,8 @@ import (
"sync"
"sync/atomic"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
type Clock interface {
@ -35,7 +37,7 @@ func (SystemClock) Now() time.Time {
type CachingRegistry struct {
delegate Registry
ttl time.Duration
minions []string
nodes *api.MinionList
lastUpdate int64
lock sync.RWMutex
clock Clock
@ -49,13 +51,13 @@ func NewCachingRegistry(delegate Registry, ttl time.Duration) (Registry, error)
return &CachingRegistry{
delegate: delegate,
ttl: ttl,
minions: list,
nodes: list,
lastUpdate: time.Now().Unix(),
clock: SystemClock{},
}, nil
}
func (r *CachingRegistry) Contains(minion string) (bool, error) {
func (r *CachingRegistry) Contains(nodeID string) (bool, error) {
if r.expired() {
if err := r.refresh(false); err != nil {
return false, err
@ -64,8 +66,8 @@ func (r *CachingRegistry) Contains(minion string) (bool, error) {
// block updates in the middle of a contains.
r.lock.RLock()
defer r.lock.RUnlock()
for _, name := range r.minions {
if name == minion {
for _, node := range r.nodes.Items {
if node.ID == nodeID {
return true, nil
}
}
@ -86,13 +88,13 @@ func (r *CachingRegistry) Insert(minion string) error {
return r.refresh(true)
}
func (r *CachingRegistry) List() ([]string, error) {
func (r *CachingRegistry) List() (*api.MinionList, error) {
if r.expired() {
if err := r.refresh(false); err != nil {
return r.minions, err
return r.nodes, err
}
}
return r.minions, nil
return r.nodes, nil
}
func (r *CachingRegistry) expired() bool {
@ -108,7 +110,7 @@ func (r *CachingRegistry) refresh(force bool) error {
defer r.lock.Unlock()
if force || r.expired() {
var err error
r.minions, err = r.delegate.List()
r.nodes, err = r.delegate.List()
time := r.clock.Now()
atomic.SwapInt64(&r.lastUpdate, time.Unix())
return err

View File

@ -37,13 +37,13 @@ func TestCachingHit(t *testing.T) {
now: time.Unix(0, 0),
}
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := []string{"m1", "m2", "m3"}
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"})
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
clock: &fakeClock,
lastUpdate: fakeClock.Now().Unix(),
minions: expected,
nodes: expected,
}
list, err := cache.List()
if err != nil {
@ -59,20 +59,20 @@ func TestCachingMiss(t *testing.T) {
now: time.Unix(0, 0),
}
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := []string{"m1", "m2", "m3"}
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"})
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
clock: &fakeClock,
lastUpdate: fakeClock.Now().Unix(),
minions: expected,
nodes: expected,
}
fakeClock.now = time.Unix(3, 0)
list, err := cache.List()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, fakeRegistry.Minions) {
if !reflect.DeepEqual(list, &fakeRegistry.Minions) {
t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list)
}
}
@ -82,13 +82,13 @@ func TestCachingInsert(t *testing.T) {
now: time.Unix(0, 0),
}
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := []string{"m1", "m2", "m3"}
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"})
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
clock: &fakeClock,
lastUpdate: fakeClock.Now().Unix(),
minions: expected,
nodes: expected,
}
err := cache.Insert("foo")
if err != nil {
@ -98,7 +98,7 @@ func TestCachingInsert(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, fakeRegistry.Minions) {
if !reflect.DeepEqual(list, &fakeRegistry.Minions) {
t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list)
}
}
@ -108,13 +108,13 @@ func TestCachingDelete(t *testing.T) {
now: time.Unix(0, 0),
}
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := []string{"m1", "m2", "m3"}
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"})
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
clock: &fakeClock,
lastUpdate: fakeClock.Now().Unix(),
minions: expected,
nodes: expected,
}
err := cache.Delete("m2")
if err != nil {
@ -124,7 +124,7 @@ func TestCachingDelete(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, fakeRegistry.Minions) {
if !reflect.DeepEqual(list, &fakeRegistry.Minions) {
t.Errorf("expected: %v, got %v", fakeRegistry.Minions, list)
}
}

View File

@ -19,28 +19,31 @@ package minion
import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
)
type CloudRegistry struct {
cloud cloudprovider.Interface
matchRE string
staticResources *api.NodeResources
}
func NewCloudRegistry(cloud cloudprovider.Interface, matchRE string) (*CloudRegistry, error) {
func NewCloudRegistry(cloud cloudprovider.Interface, matchRE string, staticResources *api.NodeResources) (*CloudRegistry, error) {
return &CloudRegistry{
cloud: cloud,
matchRE: matchRE,
staticResources: staticResources,
}, nil
}
func (r *CloudRegistry) Contains(minion string) (bool, error) {
func (r *CloudRegistry) Contains(nodeID string) (bool, error) {
instances, err := r.List()
if err != nil {
return false, err
}
for _, name := range instances {
if name == minion {
for _, node := range instances.Items {
if node.ID == nodeID {
return true, nil
}
}
@ -55,10 +58,30 @@ func (r CloudRegistry) Insert(minion string) error {
return fmt.Errorf("unsupported")
}
func (r *CloudRegistry) List() ([]string, error) {
func (r *CloudRegistry) List() (*api.MinionList, error) {
instances, ok := r.cloud.Instances()
if !ok {
return nil, fmt.Errorf("cloud doesn't support instances")
}
return instances.List(r.matchRE)
matches, err := instances.List(r.matchRE)
if err != nil {
return nil, err
}
result := &api.MinionList{
Items: make([]api.Minion, len(matches)),
}
for ix := range matches {
result.Items[ix].ID = matches[ix]
resources, err := instances.GetNodeResources(matches[ix])
if err != nil {
return nil, err
}
if resources == nil {
resources = r.staticResources
}
if resources != nil {
result.Items[ix].NodeResources = *resources
}
}
return result, err
}

View File

@ -21,6 +21,7 @@ import (
"testing"
fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
func TestCloudList(t *testing.T) {
@ -28,7 +29,7 @@ func TestCloudList(t *testing.T) {
fakeCloud := fake_cloud.FakeCloud{
Machines: instances,
}
registry, err := NewCloudRegistry(&fakeCloud, ".*")
registry, err := NewCloudRegistry(&fakeCloud, ".*", nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -38,7 +39,7 @@ func TestCloudList(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, instances) {
if !reflect.DeepEqual(list, registrytest.MakeMinionList(instances)) {
t.Errorf("Unexpected inequality: %#v, %#v", list, instances)
}
}
@ -48,7 +49,7 @@ func TestCloudContains(t *testing.T) {
fakeCloud := fake_cloud.FakeCloud{
Machines: instances,
}
registry, err := NewCloudRegistry(&fakeCloud, ".*")
registry, err := NewCloudRegistry(&fakeCloud, ".*", nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -77,7 +78,7 @@ func TestCloudListRegexp(t *testing.T) {
fakeCloud := fake_cloud.FakeCloud{
Machines: instances,
}
registry, err := NewCloudRegistry(&fakeCloud, "m[0-9]+")
registry, err := NewCloudRegistry(&fakeCloud, "m[0-9]+", nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -87,7 +88,7 @@ func TestCloudListRegexp(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
expectedList := []string{"m1", "m2"}
expectedList := registrytest.MakeMinionList([]string{"m1", "m2"})
if !reflect.DeepEqual(list, expectedList) {
t.Errorf("Unexpected inequality: %#v, %#v", list, expectedList)
}

View File

@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/golang/glog"
@ -65,20 +66,20 @@ func (r *HealthyRegistry) Insert(minion string) error {
return r.delegate.Insert(minion)
}
func (r *HealthyRegistry) List() (currentMinions []string, err error) {
var result []string
func (r *HealthyRegistry) List() (currentMinions *api.MinionList, err error) {
result := &api.MinionList{}
list, err := r.delegate.List()
if err != nil {
return result, err
}
for _, minion := range list {
status, err := health.DoHTTPCheck(r.makeMinionURL(minion), r.client)
for _, minion := range list.Items {
status, err := health.DoHTTPCheck(r.makeMinionURL(minion.ID), r.client)
if err != nil {
glog.Errorf("%s failed health check with error: %s", minion, err)
continue
}
if status == health.Healthy {
result = append(result, minion)
result.Items = append(result.Items, minion)
} else {
glog.Errorf("%s failed a health check, ignoring.", minion)
}

View File

@ -49,7 +49,7 @@ func TestBasicDelegation(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, mockMinionRegistry.Minions) {
if !reflect.DeepEqual(list, &mockMinionRegistry.Minions) {
t.Errorf("Expected %v, Got %v", mockMinionRegistry.Minions, list)
}
err = healthy.Insert("foo")
@ -96,7 +96,7 @@ func TestFiltering(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(list, expected) {
if !reflect.DeepEqual(list, registrytest.MakeMinionList(expected)) {
t.Errorf("Expected %v, Got %v", expected, list)
}
ok, err := healthy.Contains("m1")

View File

@ -18,9 +18,9 @@ package minion
import (
"fmt"
"sort"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
@ -28,16 +28,17 @@ var ErrDoesNotExist = fmt.Errorf("The requested resource does not exist.")
// Registry keeps track of a set of minions. Safe for concurrent reading/writing.
type Registry interface {
List() (currentMinions []string, err error)
List() (currentMinions *api.MinionList, err error)
Insert(minion string) error
Delete(minion string) error
Contains(minion string) (bool, error)
}
// NewRegistry initializes a minion registry with a list of minions.
func NewRegistry(minions []string) Registry {
func NewRegistry(minions []string, nodeResources api.NodeResources) Registry {
m := &minionList{
minions: util.StringSet{},
nodeResources: nodeResources,
}
for _, minion := range minions {
m.minions.Insert(minion)
@ -48,6 +49,7 @@ func NewRegistry(minions []string) Registry {
type minionList struct {
minions util.StringSet
lock sync.Mutex
nodeResources api.NodeResources
}
func (m *minionList) Contains(minion string) (bool, error) {
@ -70,12 +72,17 @@ func (m *minionList) Insert(newMinion string) error {
return nil
}
func (m *minionList) List() (currentMinions []string, err error) {
func (m *minionList) List() (currentMinions *api.MinionList, err error) {
m.lock.Lock()
defer m.lock.Unlock()
minions := []api.Minion{}
for minion := range m.minions {
currentMinions = append(currentMinions, minion)
minions = append(minions, api.Minion{
JSONBase: api.JSONBase{ID: minion},
NodeResources: m.nodeResources,
})
}
sort.StringSlice(currentMinions).Sort()
return
return &api.MinionList{
Items: minions,
}, nil
}

View File

@ -17,12 +17,13 @@ limitations under the License.
package minion
import (
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
func TestRegistry(t *testing.T) {
m := NewRegistry([]string{"foo", "bar"})
m := NewRegistry([]string{"foo", "bar"}, api.NodeResources{})
if has, err := m.Contains("foo"); !has || err != nil {
t.Errorf("missing expected object")
}
@ -48,7 +49,16 @@ func TestRegistry(t *testing.T) {
if err != nil {
t.Errorf("got error calling List")
}
if !reflect.DeepEqual(list, []string{"baz", "foo"}) {
t.Errorf("Unexpected list value: %#v", list)
if len(list.Items) != 2 || !contains(list, "foo") || !contains(list, "baz") {
t.Errorf("unexpected %v", list)
}
}
func contains(nodes *api.MinionList, nodeID string) bool {
for _, node := range nodes.Items {
if node.ID == nodeID {
return true
}
}
return false
}

View File

@ -87,15 +87,7 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
}
func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Object, error) {
nameList, err := rs.registry.List()
if err != nil {
return nil, err
}
var list api.MinionList
for _, name := range nameList {
list.Items = append(list.Items, *rs.toApiMinion(name))
}
return &list, nil
return rs.registry.List()
}
func (*REST) New() runtime.Object {

View File

@ -17,7 +17,6 @@ limitations under the License.
package minion
import (
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -25,7 +24,7 @@ import (
)
func TestMinionREST(t *testing.T) {
m := NewRegistry([]string{"foo", "bar"})
m := NewRegistry([]string{"foo", "bar"}, api.NodeResources{})
ms := NewREST(m)
ctx := api.NewContext()
if obj, err := ms.Get(ctx, "foo"); err != nil || obj.(*api.Minion).ID != "foo" {
@ -78,7 +77,8 @@ func TestMinionREST(t *testing.T) {
JSONBase: api.JSONBase{ID: "foo"},
},
}
if !reflect.DeepEqual(list.(*api.MinionList).Items, expect) {
nodeList := list.(*api.MinionList)
if len(expect) != len(nodeList.Items) || !contains(nodeList, "foo") || !contains(nodeList, "baz") {
t.Errorf("Unexpected list value: %#v", list)
}
}

View File

@ -16,40 +16,54 @@ limitations under the License.
package registrytest
import "sync"
import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
type MinionRegistry struct {
Err error
Minion string
Minions []string
Minions api.MinionList
sync.Mutex
}
func MakeMinionList(minions []string) *api.MinionList {
list := api.MinionList{
Items: make([]api.Minion, len(minions)),
}
for i := range minions {
list.Items[i].ID = minions[i]
}
return &list
}
func NewMinionRegistry(minions []string) *MinionRegistry {
return &MinionRegistry{
Minions: minions,
Minions: *MakeMinionList(minions),
}
}
func (r *MinionRegistry) List() ([]string, error) {
func (r *MinionRegistry) List() (*api.MinionList, error) {
r.Lock()
defer r.Unlock()
return r.Minions, r.Err
return &r.Minions, r.Err
}
func (r *MinionRegistry) Insert(minion string) error {
r.Lock()
defer r.Unlock()
r.Minion = minion
r.Minions = append(r.Minions, minion)
r.Minions.Items = append(r.Minions.Items, api.Minion{JSONBase: api.JSONBase{ID: minion}})
return r.Err
}
func (r *MinionRegistry) Contains(minion string) (bool, error) {
func (r *MinionRegistry) Contains(nodeID string) (bool, error) {
r.Lock()
defer r.Unlock()
for _, name := range r.Minions {
if name == minion {
for _, node := range r.Minions.Items {
if node.ID == nodeID {
return true, r.Err
}
}
@ -59,12 +73,12 @@ func (r *MinionRegistry) Contains(minion string) (bool, error) {
func (r *MinionRegistry) Delete(minion string) error {
r.Lock()
defer r.Unlock()
var newList []string
for _, name := range r.Minions {
if name != minion {
newList = append(newList, name)
var newList []api.Minion
for _, node := range r.Minions.Items {
if node.ID != minion {
newList = append(newList, api.Minion{JSONBase: api.JSONBase{ID: minion}})
}
}
r.Minions = newList
r.Minions.Items = newList
return r.Err
}

View File

@ -84,7 +84,7 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
if err != nil {
return nil, err
}
err = balancer.CreateTCPLoadBalancer(srv.ID, zone.Region, srv.Port, hosts)
err = balancer.CreateTCPLoadBalancer(srv.ID, zone.Region, srv.Port, hostsFromMinionList(hosts))
if err != nil {
return nil, err
}
@ -97,6 +97,14 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
}), nil
}
func hostsFromMinionList(list *api.MinionList) []string {
result := make([]string, len(list.Items))
for ix := range list.Items {
result[ix] = list.Items[ix].ID
}
return result
}
func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error) {
service, err := rs.registry.GetService(ctx, id)
if err != nil {

View File

@ -34,7 +34,7 @@ func TestServiceRegistryCreate(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines))
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{}))
svc := &api.Service{
Port: 6502,
JSONBase: api.JSONBase{ID: "foo"},
@ -156,7 +156,7 @@ func TestServiceRegistryExternalService(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines))
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{}))
svc := &api.Service{
Port: 6502,
JSONBase: api.JSONBase{ID: "foo"},
@ -183,7 +183,7 @@ func TestServiceRegistryExternalServiceError(t *testing.T) {
Err: fmt.Errorf("test error"),
}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines))
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{}))
svc := &api.Service{
Port: 6502,
JSONBase: api.JSONBase{ID: "foo"},
@ -206,7 +206,7 @@ func TestServiceRegistryDelete(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines))
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{}))
svc := &api.Service{
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
@ -227,7 +227,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines))
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{}))
svc := &api.Service{
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
@ -314,7 +314,7 @@ func TestServiceRegistryGet(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines))
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{}))
registry.CreateService(ctx, &api.Service{
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
@ -334,7 +334,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
registry.Endpoints = api.Endpoints{Endpoints: []string{"foo:80"}}
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines))
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{}))
registry.CreateService(ctx, &api.Service{
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
@ -363,7 +363,7 @@ func TestServiceRegistryList(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines))
storage := NewREST(registry, fakeCloud, minion.NewRegistry(machines, api.NodeResources{}))
registry.CreateService(ctx, &api.Service{
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{"bar": "baz"},

View File

@ -17,6 +17,8 @@ limitations under the License.
package scheduler
import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/resources"
@ -24,7 +26,20 @@ import (
)
type NodeInfo interface {
GetNodeInfo(nodeName string) (api.Minion, error)
GetNodeInfo(nodeID string) (*api.Minion, error)
}
type StaticNodeInfo struct {
*api.MinionList
}
func (nodes StaticNodeInfo) GetNodeInfo(nodeID string) (*api.Minion, error) {
for ix := range nodes.Items {
if nodes.Items[ix].ID == nodeID {
return &nodes.Items[ix], nil
}
}
return nil, fmt.Errorf("failed to find node: %s", nodeID)
}
type ResourceFit struct {
@ -75,6 +90,13 @@ func (r *ResourceFit) PodFitsResources(pod api.Pod, existingPods []api.Pod, node
return fitsCPU && fitsMemory, nil
}
func NewResourceFitPredicate(info NodeInfo) FitPredicate {
fit := &ResourceFit{
info: info,
}
return fit.PodFitsResources
}
func PodFitsPorts(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
for _, scheduledPod := range existingPods {
for _, container := range pod.DesiredState.Manifest.Containers {

View File

@ -26,8 +26,9 @@ import (
type FakeNodeInfo api.Minion
func (n FakeNodeInfo) GetNodeInfo(nodeName string) (api.Minion, error) {
return api.Minion(n), nil
func (n FakeNodeInfo) GetNodeInfo(nodeName string) (*api.Minion, error) {
node := api.Minion(n)
return &node, nil
}
func makeResources(milliCPU int, memory int) api.NodeResources {

View File

@ -62,9 +62,19 @@ func (factory *ConfigFactory) Create() *scheduler.Config {
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
nodes, err := factory.Client.ListMinions()
if err != nil {
glog.Errorf("failed to obtain minion information, aborting")
return nil
}
algo := algorithm.NewGenericScheduler(
[]algorithm.FitPredicate{
// Fit is defined based on the absence of port conflicts.
[]algorithm.FitPredicate{algorithm.PodFitsPorts},
algorithm.PodFitsPorts,
// Fit is determined by resource availability
algorithm.NewResourceFitPredicate(algorithm.StaticNodeInfo{nodes}),
},
// All nodes where things fit are equally likely (Random)
algorithm.EqualPriority,
&storeToPodLister{podCache}, r)