Merge pull request #180 from lavalamp/master

Make minions first class citizens
pull/6/head
brendandburns 2014-06-23 19:52:00 -07:00
commit 974d3f3999
12 changed files with 352 additions and 42 deletions

View File

@ -39,7 +39,7 @@ func main() {
servers := []string{"http://localhost:4001"}
log.Printf("Creating etcd client pointing to %v", servers)
etcdClient := etcd.NewClient(servers)
machineList := []string{"machine"}
machineList := registry.MakeMinionRegistry([]string{"machine"})
reg := registry.MakeEtcdRegistry(etcdClient, machineList)

View File

@ -34,6 +34,8 @@ func init() {
ReplicationController{},
ServiceList{},
Service{},
MinionList{},
Minion{},
Status{},
)
}

View File

@ -175,6 +175,20 @@ type Endpoints struct {
Endpoints []string
}
// Information about a single Minion; the name of the minion according to etcd
// is in JSONBase.ID.
type Minion struct {
JSONBase `json:",inline" yaml:",inline"`
// Queried from cloud provider, if available.
HostIP string `json:"hostIP,omitempty" yaml:"hostIP,omitempty"`
}
// A list of minions.
type MinionList struct {
JSONBase `json:",inline" yaml:",inline"`
Minions []Minion `json:"minions,omitempty" yaml:"minions,omitempty"`
}
// Status is a return value for calls that don't return other objects.
// Arguably, this could go in apiserver, but I'm including it here so clients needn't
// import both.

View File

@ -34,8 +34,8 @@ type Master struct {
podRegistry registry.PodRegistry
controllerRegistry registry.ControllerRegistry
serviceRegistry registry.ServiceRegistry
minionRegistry registry.MinionRegistry
minions []string
random *rand.Rand
storage map[string]apiserver.RESTStorage
}
@ -46,37 +46,40 @@ func NewMemoryServer(minions []string, cloud cloudprovider.Interface) *Master {
podRegistry: registry.MakeMemoryRegistry(),
controllerRegistry: registry.MakeMemoryRegistry(),
serviceRegistry: registry.MakeMemoryRegistry(),
minionRegistry: registry.MakeMinionRegistry(minions),
}
m.init(minions, cloud)
m.init(cloud)
return m
}
// Returns a new apiserver.
func New(etcdServers, minions []string, cloud cloudprovider.Interface) *Master {
etcdClient := etcd.NewClient(etcdServers)
minionRegistry := registry.MakeMinionRegistry(minions)
m := &Master{
podRegistry: registry.MakeEtcdRegistry(etcdClient, minions),
controllerRegistry: registry.MakeEtcdRegistry(etcdClient, minions),
serviceRegistry: registry.MakeEtcdRegistry(etcdClient, minions),
podRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry),
controllerRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry),
serviceRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry),
minionRegistry: minionRegistry,
}
m.init(minions, cloud)
m.init(cloud)
return m
}
func (m *Master) init(minions []string, cloud cloudprovider.Interface) {
func (m *Master) init(cloud cloudprovider.Interface) {
containerInfo := &client.HTTPContainerInfo{
Client: http.DefaultClient,
Port: 10250,
}
m.minions = minions
m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
podCache := NewPodCache(containerInfo, m.podRegistry, time.Second*30)
go podCache.Loop()
m.storage = map[string]apiserver.RESTStorage{
"pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, registry.MakeFirstFitScheduler(m.minions, m.podRegistry, m.random), cloud, podCache),
"pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, registry.MakeFirstFitScheduler(m.minionRegistry, m.podRegistry, m.random), cloud, podCache),
"replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry),
"services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minions),
"services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry),
"minions": registry.MakeMinionRegistryStorage(m.minionRegistry),
}
}

View File

@ -32,7 +32,7 @@ import (
// EtcdRegistry is an implementation of both ControllerRegistry and PodRegistry which is backed with etcd.
type EtcdRegistry struct {
etcdClient util.EtcdClient
machines []string
machines MinionRegistry
manifestFactory ManifestFactory
}
@ -40,7 +40,7 @@ type EtcdRegistry struct {
// 'client' is the connection to etcd
// 'machines' is the list of machines
// 'scheduler' is the scheduling algorithm to use.
func MakeEtcdRegistry(client util.EtcdClient, machines []string) *EtcdRegistry {
func MakeEtcdRegistry(client util.EtcdClient, machines MinionRegistry) *EtcdRegistry {
registry := &EtcdRegistry{
etcdClient: client,
machines: machines,
@ -61,7 +61,11 @@ func (registry *EtcdRegistry) helper() *util.EtcdHelper {
func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) {
pods := []api.Pod{}
for _, machine := range registry.machines {
machines, err := registry.machines.List()
if err != nil {
return nil, err
}
for _, machine := range machines {
var machinePods []api.Pod
err := registry.helper().ExtractList("/registry/hosts/"+machine+"/pods", &machinePods)
if err != nil {
@ -175,7 +179,11 @@ func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.P
}
func (registry *EtcdRegistry) findPod(podID string) (api.Pod, string, error) {
for _, machine := range registry.machines {
machines, err := registry.machines.List()
if err != nil {
return api.Pod{}, "", err
}
for _, machine := range machines {
pod, err := registry.getPodForMachine(machine, podID)
if err == nil {
return pod, machine, nil

View File

@ -28,7 +28,7 @@ import (
)
func MakeTestEtcdRegistry(client util.EtcdClient, machines []string) *EtcdRegistry {
registry := MakeEtcdRegistry(client, machines)
registry := MakeEtcdRegistry(client, MakeMinionRegistry(machines))
registry.manifestFactory = &BasicManifestFactory{
serviceRegistry: &MockServiceRegistry{},
}

View File

@ -0,0 +1,160 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
import (
"fmt"
"sort"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
)
var ErrDoesNotExist = fmt.Errorf("The requested resource does not exist.")
// Keep track of a set of minions. Safe for concurrent reading/writing.
type MinionRegistry interface {
List() (currentMinions []string, err error)
Insert(minion string) error
Delete(minion string) error
Contains(minion string) (bool, error)
}
// Initialize a minion registry with a list of minions.
func MakeMinionRegistry(minions []string) MinionRegistry {
m := &minionList{
minions: stringSet{},
}
for _, minion := range minions {
m.minions.insert(minion)
}
return m
}
type empty struct{}
type stringSet map[string]empty
func (s stringSet) insert(item string) {
s[item] = empty{}
}
func (s stringSet) delete(item string) {
delete(s, item)
}
func (s stringSet) has(item string) bool {
_, contained := s[item]
return contained
}
type minionList struct {
minions stringSet
lock sync.Mutex
}
func (m *minionList) List() (currentMinions []string, err error) {
m.lock.Lock()
defer m.lock.Unlock()
// Convert from map to []string
for minion := range m.minions {
currentMinions = append(currentMinions, minion)
}
sort.StringSlice(currentMinions).Sort()
return
}
func (m *minionList) Insert(newMinion string) error {
m.lock.Lock()
defer m.lock.Unlock()
m.minions.insert(newMinion)
return nil
}
func (m *minionList) Delete(minion string) error {
m.lock.Lock()
defer m.lock.Unlock()
m.minions.delete(minion)
return nil
}
func (m *minionList) Contains(minion string) (bool, error) {
m.lock.Lock()
defer m.lock.Unlock()
return m.minions.has(minion), nil
}
// MinionRegistryStorage implements the RESTStorage interface, backed by a MinionRegistry.
type MinionRegistryStorage struct {
registry MinionRegistry
}
func MakeMinionRegistryStorage(m MinionRegistry) apiserver.RESTStorage {
return &MinionRegistryStorage{
registry: m,
}
}
func (storage *MinionRegistryStorage) toApiMinion(name string) api.Minion {
return api.Minion{JSONBase: api.JSONBase{ID: name}}
}
func (storage *MinionRegistryStorage) List(selector labels.Selector) (interface{}, error) {
nameList, err := storage.registry.List()
if err != nil {
return nil, err
}
var list api.MinionList
for _, name := range nameList {
list.Minions = append(list.Minions, storage.toApiMinion(name))
}
return list, nil
}
func (storage *MinionRegistryStorage) Get(id string) (interface{}, error) {
exists, err := storage.registry.Contains(id)
if !exists {
return nil, ErrDoesNotExist
}
return storage.toApiMinion(id), err
}
func (storage *MinionRegistryStorage) Extract(body []byte) (interface{}, error) {
var minion api.Minion
err := api.DecodeInto(body, &minion)
return minion, err
}
func (storage *MinionRegistryStorage) Create(minion interface{}) (<-chan interface{}, error) {
return apiserver.MakeAsync(func() interface{} { return minion }), storage.registry.Insert(minion.(api.Minion).ID)
}
func (storage *MinionRegistryStorage) Update(minion interface{}) (<-chan interface{}, error) {
return nil, fmt.Errorf("Minions can only be created (inserted) and deleted.")
}
func (storage *MinionRegistryStorage) Delete(id string) (<-chan interface{}, error) {
exists, err := storage.registry.Contains(id)
if !exists {
return nil, ErrDoesNotExist
}
if err != nil {
return nil, err
}
return apiserver.MakeAsync(func() interface{} { return api.Status{Status: api.StatusSuccess} }), storage.registry.Delete(id)
}

View File

@ -0,0 +1,107 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
import (
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
)
func TestMinionRegistry(t *testing.T) {
m := MakeMinionRegistry([]string{"foo", "bar"})
if has, err := m.Contains("foo"); !has || err != nil {
t.Errorf("missing expected object")
}
if has, err := m.Contains("bar"); !has || err != nil {
t.Errorf("missing expected object")
}
if has, err := m.Contains("baz"); has || err != nil {
t.Errorf("has unexpected object")
}
if err := m.Insert("baz"); err != nil {
t.Errorf("insert failed")
}
if has, err := m.Contains("baz"); !has || err != nil {
t.Errorf("insert didn't actually insert")
}
if err := m.Delete("bar"); err != nil {
t.Errorf("delete failed")
}
if has, err := m.Contains("bar"); has || err != nil {
t.Errorf("delete didn't actually delete")
}
list, err := m.List()
if err != nil {
t.Errorf("got error calling List")
}
if !reflect.DeepEqual(list, []string{"baz", "foo"}) {
t.Errorf("Unexpected list value: %#v", list)
}
}
func TestMinionRegistryStorage(t *testing.T) {
m := MakeMinionRegistry([]string{"foo", "bar"})
ms := MakeMinionRegistryStorage(m)
if obj, err := ms.Get("foo"); err != nil || obj.(api.Minion).ID != "foo" {
t.Errorf("missing expected object")
}
if obj, err := ms.Get("bar"); err != nil || obj.(api.Minion).ID != "bar" {
t.Errorf("missing expected object")
}
if _, err := ms.Get("baz"); err != ErrDoesNotExist {
t.Errorf("has unexpected object")
}
if _, err := ms.Create(api.Minion{JSONBase: api.JSONBase{ID: "baz"}}); err != nil {
t.Errorf("insert failed")
}
if obj, err := ms.Get("baz"); err != nil || obj.(api.Minion).ID != "baz" {
t.Errorf("insert didn't actually insert")
}
if _, err := ms.Delete("bar"); err != nil {
t.Errorf("delete failed")
}
if _, err := ms.Get("bar"); err != ErrDoesNotExist {
t.Errorf("delete didn't actually delete")
}
if _, err := ms.Delete("bar"); err != ErrDoesNotExist {
t.Errorf("delete returned wrong error")
}
list, err := ms.List(labels.Everything())
if err != nil {
t.Errorf("got error calling List")
}
expect := []api.Minion{
{
JSONBase: api.JSONBase{ID: "baz"},
}, {
JSONBase: api.JSONBase{ID: "foo"},
},
}
if !reflect.DeepEqual(list.(api.MinionList).Minions, expect) {
t.Errorf("Unexpected list value: %#v", list)
}
}

View File

@ -31,11 +31,11 @@ type Scheduler interface {
// RandomScheduler choses machines uniformly at random.
type RandomScheduler struct {
machines []string
machines MinionRegistry
random rand.Rand
}
func MakeRandomScheduler(machines []string, random rand.Rand) Scheduler {
func MakeRandomScheduler(machines MinionRegistry, random rand.Rand) Scheduler {
return &RandomScheduler{
machines: machines,
random: random,
@ -43,35 +43,43 @@ func MakeRandomScheduler(machines []string, random rand.Rand) Scheduler {
}
func (s *RandomScheduler) Schedule(pod api.Pod) (string, error) {
return s.machines[s.random.Int()%len(s.machines)], nil
machines, err := s.machines.List()
if err != nil {
return "", err
}
return machines[s.random.Int()%len(machines)], nil
}
// RoundRobinScheduler chooses machines in order.
type RoundRobinScheduler struct {
machines []string
machines MinionRegistry
currentIndex int
}
func MakeRoundRobinScheduler(machines []string) Scheduler {
func MakeRoundRobinScheduler(machines MinionRegistry) Scheduler {
return &RoundRobinScheduler{
machines: machines,
currentIndex: 0,
currentIndex: -1,
}
}
func (s *RoundRobinScheduler) Schedule(pod api.Pod) (string, error) {
result := s.machines[s.currentIndex]
s.currentIndex = (s.currentIndex + 1) % len(s.machines)
machines, err := s.machines.List()
if err != nil {
return "", err
}
s.currentIndex = (s.currentIndex + 1) % len(machines)
result := machines[s.currentIndex]
return result, nil
}
type FirstFitScheduler struct {
machines []string
machines MinionRegistry
registry PodRegistry
random *rand.Rand
}
func MakeFirstFitScheduler(machines []string, registry PodRegistry, random *rand.Rand) Scheduler {
func MakeFirstFitScheduler(machines MinionRegistry, registry PodRegistry, random *rand.Rand) Scheduler {
return &FirstFitScheduler{
machines: machines,
registry: registry,
@ -91,6 +99,10 @@ func (s *FirstFitScheduler) containsPort(pod api.Pod, port api.Port) bool {
}
func (s *FirstFitScheduler) Schedule(pod api.Pod) (string, error) {
machines, err := s.machines.List()
if err != nil {
return "", err
}
machineToPods := map[string][]api.Pod{}
pods, err := s.registry.ListPods(labels.Everything())
if err != nil {
@ -101,7 +113,7 @@ func (s *FirstFitScheduler) Schedule(pod api.Pod) (string, error) {
machineToPods[host] = append(machineToPods[host], scheduledPod)
}
var machineOptions []string
for _, machine := range s.machines {
for _, machine := range machines {
podFits := true
for _, scheduledPod := range machineToPods[machine] {
for _, container := range pod.DesiredState.Manifest.Containers {

View File

@ -32,7 +32,7 @@ func expectSchedule(scheduler Scheduler, pod api.Pod, expected string, t *testin
}
func TestRoundRobinScheduler(t *testing.T) {
scheduler := MakeRoundRobinScheduler([]string{"m1", "m2", "m3", "m4"})
scheduler := MakeRoundRobinScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3", "m4"}))
expectSchedule(scheduler, api.Pod{}, "m1", t)
expectSchedule(scheduler, api.Pod{}, "m2", t)
expectSchedule(scheduler, api.Pod{}, "m3", t)
@ -41,7 +41,7 @@ func TestRoundRobinScheduler(t *testing.T) {
func TestRandomScheduler(t *testing.T) {
random := rand.New(rand.NewSource(0))
scheduler := MakeRandomScheduler([]string{"m1", "m2", "m3", "m4"}, *random)
scheduler := MakeRandomScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3", "m4"}), *random)
_, err := scheduler.Schedule(api.Pod{})
expectNoError(t, err)
}
@ -49,7 +49,7 @@ func TestRandomScheduler(t *testing.T) {
func TestFirstFitSchedulerNothingScheduled(t *testing.T) {
mockRegistry := MockPodRegistry{}
r := rand.New(rand.NewSource(0))
scheduler := MakeFirstFitScheduler([]string{"m1", "m2", "m3"}, &mockRegistry, r)
scheduler := MakeFirstFitScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3"}), &mockRegistry, r)
expectSchedule(scheduler, api.Pod{}, "m3", t)
}
@ -81,7 +81,7 @@ func TestFirstFitSchedulerFirstScheduled(t *testing.T) {
},
}
r := rand.New(rand.NewSource(0))
scheduler := MakeFirstFitScheduler([]string{"m1", "m2", "m3"}, &mockRegistry, r)
scheduler := MakeFirstFitScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3"}), &mockRegistry, r)
expectSchedule(scheduler, makePod("", 8080), "m3", t)
}
@ -94,7 +94,7 @@ func TestFirstFitSchedulerFirstScheduledComplicated(t *testing.T) {
},
}
r := rand.New(rand.NewSource(0))
scheduler := MakeFirstFitScheduler([]string{"m1", "m2", "m3"}, &mockRegistry, r)
scheduler := MakeFirstFitScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3"}), &mockRegistry, r)
expectSchedule(scheduler, makePod("", 8080, 8081), "m3", t)
}
@ -107,7 +107,7 @@ func TestFirstFitSchedulerFirstScheduledImpossible(t *testing.T) {
},
}
r := rand.New(rand.NewSource(0))
scheduler := MakeFirstFitScheduler([]string{"m1", "m2", "m3"}, &mockRegistry, r)
scheduler := MakeFirstFitScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3"}), &mockRegistry, r)
_, err := scheduler.Schedule(makePod("", 8080, 8081))
if err == nil {
t.Error("Unexpected non-error.")

View File

@ -30,14 +30,14 @@ import (
type ServiceRegistryStorage struct {
registry ServiceRegistry
cloud cloudprovider.Interface
hosts []string
machines MinionRegistry
}
func MakeServiceRegistryStorage(registry ServiceRegistry, cloud cloudprovider.Interface, hosts []string) apiserver.RESTStorage {
func MakeServiceRegistryStorage(registry ServiceRegistry, cloud cloudprovider.Interface, machines MinionRegistry) apiserver.RESTStorage {
return &ServiceRegistryStorage{
registry: registry,
cloud: cloud,
hosts: hosts,
machines: machines,
}
}
@ -117,7 +117,11 @@ func (sr *ServiceRegistryStorage) Create(obj interface{}) (<-chan interface{}, e
balancer, ok = sr.cloud.TCPLoadBalancer()
}
if ok && balancer != nil {
err := balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, sr.hosts)
hosts, err := sr.machines.List()
if err != nil {
return nil, err
}
err = balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, hosts)
if err != nil {
return nil, err
}

View File

@ -29,7 +29,7 @@ func TestServiceRegistry(t *testing.T) {
fakeCloud := &cloudprovider.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := MakeServiceRegistryStorage(memory, fakeCloud, machines)
storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines))
svc := api.Service{
JSONBase: api.JSONBase{ID: "foo"},
@ -51,7 +51,7 @@ func TestServiceRegistryExternalService(t *testing.T) {
fakeCloud := &cloudprovider.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := MakeServiceRegistryStorage(memory, fakeCloud, machines)
storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines))
svc := api.Service{
JSONBase: api.JSONBase{ID: "foo"},
@ -76,7 +76,7 @@ func TestServiceRegistryExternalServiceError(t *testing.T) {
}
machines := []string{"foo", "bar", "baz"}
storage := MakeServiceRegistryStorage(memory, fakeCloud, machines)
storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines))
svc := api.Service{
JSONBase: api.JSONBase{ID: "foo"},
@ -99,7 +99,7 @@ func TestServiceRegistryDelete(t *testing.T) {
fakeCloud := &cloudprovider.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := MakeServiceRegistryStorage(memory, fakeCloud, machines)
storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines))
svc := api.Service{
JSONBase: api.JSONBase{ID: "foo"},
@ -123,7 +123,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
fakeCloud := &cloudprovider.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := MakeServiceRegistryStorage(memory, fakeCloud, machines)
storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines))
svc := api.Service{
JSONBase: api.JSONBase{ID: "foo"},