From 79ee5aa250a3b7e7297515ea219dd76e0c12a24d Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 19 Jun 2014 18:31:38 -0700 Subject: [PATCH] Implement minion registry. Minions now a first-class object. --- cmd/integration/integration.go | 2 +- pkg/master/master.go | 23 ++++++++++-------- pkg/registry/etcd_registry.go | 16 +++++++++---- pkg/registry/etcd_registry_test.go | 2 +- pkg/registry/minion_registry_test.go | 2 +- pkg/registry/scheduler.go | 34 ++++++++++++++++++--------- pkg/registry/scheduler_test.go | 12 +++++----- pkg/registry/service_registry.go | 12 ++++++---- pkg/registry/service_registry_test.go | 10 ++++---- 9 files changed, 70 insertions(+), 43 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 7a78ea10f2..9a5c49196c 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -39,7 +39,7 @@ func main() { servers := []string{"http://localhost:4001"} log.Printf("Creating etcd client pointing to %v", servers) etcdClient := etcd.NewClient(servers) - machineList := []string{"machine"} + machineList := registry.MakeMinionRegistry([]string{"machine"}) reg := registry.MakeEtcdRegistry(etcdClient, machineList) diff --git a/pkg/master/master.go b/pkg/master/master.go index 7150726322..d27cbbddfd 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -34,8 +34,8 @@ type Master struct { podRegistry registry.PodRegistry controllerRegistry registry.ControllerRegistry serviceRegistry registry.ServiceRegistry + minionRegistry registry.MinionRegistry - minions []string random *rand.Rand storage map[string]apiserver.RESTStorage } @@ -46,37 +46,40 @@ func NewMemoryServer(minions []string, cloud cloudprovider.Interface) *Master { podRegistry: registry.MakeMemoryRegistry(), controllerRegistry: registry.MakeMemoryRegistry(), serviceRegistry: registry.MakeMemoryRegistry(), + minionRegistry: registry.MakeMinionRegistry(minions), } - m.init(minions, cloud) + m.init(cloud) return m } // Returns a new apiserver. func New(etcdServers, minions []string, cloud cloudprovider.Interface) *Master { etcdClient := etcd.NewClient(etcdServers) + minionRegistry := registry.MakeMinionRegistry(minions) m := &Master{ - podRegistry: registry.MakeEtcdRegistry(etcdClient, minions), - controllerRegistry: registry.MakeEtcdRegistry(etcdClient, minions), - serviceRegistry: registry.MakeEtcdRegistry(etcdClient, minions), + podRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), + controllerRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), + serviceRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), + minionRegistry: minionRegistry, } - m.init(minions, cloud) + m.init(cloud) return m } -func (m *Master) init(minions []string, cloud cloudprovider.Interface) { +func (m *Master) init(cloud cloudprovider.Interface) { containerInfo := &client.HTTPContainerInfo{ Client: http.DefaultClient, Port: 10250, } - m.minions = minions m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) podCache := NewPodCache(containerInfo, m.podRegistry, time.Second*30) go podCache.Loop() m.storage = map[string]apiserver.RESTStorage{ - "pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, registry.MakeFirstFitScheduler(m.minions, m.podRegistry, m.random), cloud, podCache), + "pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, registry.MakeFirstFitScheduler(m.minionRegistry, m.podRegistry, m.random), cloud, podCache), "replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry), - "services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minions), + "services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry), + "minions": registry.MakeMinionRegistryStorage(m.minionRegistry), } } diff --git a/pkg/registry/etcd_registry.go b/pkg/registry/etcd_registry.go index a4df170c15..22a86e20ed 100644 --- a/pkg/registry/etcd_registry.go +++ b/pkg/registry/etcd_registry.go @@ -32,7 +32,7 @@ import ( // EtcdRegistry is an implementation of both ControllerRegistry and PodRegistry which is backed with etcd. type EtcdRegistry struct { etcdClient util.EtcdClient - machines []string + machines MinionRegistry manifestFactory ManifestFactory } @@ -40,7 +40,7 @@ type EtcdRegistry struct { // 'client' is the connection to etcd // 'machines' is the list of machines // 'scheduler' is the scheduling algorithm to use. -func MakeEtcdRegistry(client util.EtcdClient, machines []string) *EtcdRegistry { +func MakeEtcdRegistry(client util.EtcdClient, machines MinionRegistry) *EtcdRegistry { registry := &EtcdRegistry{ etcdClient: client, machines: machines, @@ -61,7 +61,11 @@ func (registry *EtcdRegistry) helper() *util.EtcdHelper { func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { pods := []api.Pod{} - for _, machine := range registry.machines { + machines, err := registry.machines.List() + if err != nil { + return nil, err + } + for _, machine := range machines { var machinePods []api.Pod err := registry.helper().ExtractList("/registry/hosts/"+machine+"/pods", &machinePods) if err != nil { @@ -175,7 +179,11 @@ func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.P } func (registry *EtcdRegistry) findPod(podID string) (api.Pod, string, error) { - for _, machine := range registry.machines { + machines, err := registry.machines.List() + if err != nil { + return api.Pod{}, "", err + } + for _, machine := range machines { pod, err := registry.getPodForMachine(machine, podID) if err == nil { return pod, machine, nil diff --git a/pkg/registry/etcd_registry_test.go b/pkg/registry/etcd_registry_test.go index 9e794bc683..29de298a38 100644 --- a/pkg/registry/etcd_registry_test.go +++ b/pkg/registry/etcd_registry_test.go @@ -28,7 +28,7 @@ import ( ) func MakeTestEtcdRegistry(client util.EtcdClient, machines []string) *EtcdRegistry { - registry := MakeEtcdRegistry(client, machines) + registry := MakeEtcdRegistry(client, MakeMinionRegistry(machines)) registry.manifestFactory = &BasicManifestFactory{ serviceRegistry: &MockServiceRegistry{}, } diff --git a/pkg/registry/minion_registry_test.go b/pkg/registry/minion_registry_test.go index 17b40a1bee..fbad64426f 100644 --- a/pkg/registry/minion_registry_test.go +++ b/pkg/registry/minion_registry_test.go @@ -92,7 +92,7 @@ func TestMinionRegistryStorage(t *testing.T) { if err != nil { t.Errorf("got error calling List") } - if !reflect.DeepEqual(list.(string), []string{"baz", "foo"}) { + if !reflect.DeepEqual(list.([]string), []string{"baz", "foo"}) { t.Errorf("Unexpected list value: %#v", list) } } diff --git a/pkg/registry/scheduler.go b/pkg/registry/scheduler.go index ba057407bc..e0518259cb 100644 --- a/pkg/registry/scheduler.go +++ b/pkg/registry/scheduler.go @@ -31,11 +31,11 @@ type Scheduler interface { // RandomScheduler choses machines uniformly at random. type RandomScheduler struct { - machines []string + machines MinionRegistry random rand.Rand } -func MakeRandomScheduler(machines []string, random rand.Rand) Scheduler { +func MakeRandomScheduler(machines MinionRegistry, random rand.Rand) Scheduler { return &RandomScheduler{ machines: machines, random: random, @@ -43,35 +43,43 @@ func MakeRandomScheduler(machines []string, random rand.Rand) Scheduler { } func (s *RandomScheduler) Schedule(pod api.Pod) (string, error) { - return s.machines[s.random.Int()%len(s.machines)], nil + machines, err := s.machines.List() + if err != nil { + return "", err + } + return machines[s.random.Int()%len(machines)], nil } // RoundRobinScheduler chooses machines in order. type RoundRobinScheduler struct { - machines []string + machines MinionRegistry currentIndex int } -func MakeRoundRobinScheduler(machines []string) Scheduler { +func MakeRoundRobinScheduler(machines MinionRegistry) Scheduler { return &RoundRobinScheduler{ machines: machines, - currentIndex: 0, + currentIndex: -1, } } func (s *RoundRobinScheduler) Schedule(pod api.Pod) (string, error) { - result := s.machines[s.currentIndex] - s.currentIndex = (s.currentIndex + 1) % len(s.machines) + machines, err := s.machines.List() + if err != nil { + return "", err + } + s.currentIndex = (s.currentIndex + 1) % len(machines) + result := machines[s.currentIndex] return result, nil } type FirstFitScheduler struct { - machines []string + machines MinionRegistry registry PodRegistry random *rand.Rand } -func MakeFirstFitScheduler(machines []string, registry PodRegistry, random *rand.Rand) Scheduler { +func MakeFirstFitScheduler(machines MinionRegistry, registry PodRegistry, random *rand.Rand) Scheduler { return &FirstFitScheduler{ machines: machines, registry: registry, @@ -91,6 +99,10 @@ func (s *FirstFitScheduler) containsPort(pod api.Pod, port api.Port) bool { } func (s *FirstFitScheduler) Schedule(pod api.Pod) (string, error) { + machines, err := s.machines.List() + if err != nil { + return "", err + } machineToPods := map[string][]api.Pod{} pods, err := s.registry.ListPods(labels.Everything()) if err != nil { @@ -101,7 +113,7 @@ func (s *FirstFitScheduler) Schedule(pod api.Pod) (string, error) { machineToPods[host] = append(machineToPods[host], scheduledPod) } var machineOptions []string - for _, machine := range s.machines { + for _, machine := range machines { podFits := true for _, scheduledPod := range machineToPods[machine] { for _, container := range pod.DesiredState.Manifest.Containers { diff --git a/pkg/registry/scheduler_test.go b/pkg/registry/scheduler_test.go index 82da7b4268..e32304a83b 100644 --- a/pkg/registry/scheduler_test.go +++ b/pkg/registry/scheduler_test.go @@ -32,7 +32,7 @@ func expectSchedule(scheduler Scheduler, pod api.Pod, expected string, t *testin } func TestRoundRobinScheduler(t *testing.T) { - scheduler := MakeRoundRobinScheduler([]string{"m1", "m2", "m3", "m4"}) + scheduler := MakeRoundRobinScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3", "m4"})) expectSchedule(scheduler, api.Pod{}, "m1", t) expectSchedule(scheduler, api.Pod{}, "m2", t) expectSchedule(scheduler, api.Pod{}, "m3", t) @@ -41,7 +41,7 @@ func TestRoundRobinScheduler(t *testing.T) { func TestRandomScheduler(t *testing.T) { random := rand.New(rand.NewSource(0)) - scheduler := MakeRandomScheduler([]string{"m1", "m2", "m3", "m4"}, *random) + scheduler := MakeRandomScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3", "m4"}), *random) _, err := scheduler.Schedule(api.Pod{}) expectNoError(t, err) } @@ -49,7 +49,7 @@ func TestRandomScheduler(t *testing.T) { func TestFirstFitSchedulerNothingScheduled(t *testing.T) { mockRegistry := MockPodRegistry{} r := rand.New(rand.NewSource(0)) - scheduler := MakeFirstFitScheduler([]string{"m1", "m2", "m3"}, &mockRegistry, r) + scheduler := MakeFirstFitScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3"}), &mockRegistry, r) expectSchedule(scheduler, api.Pod{}, "m3", t) } @@ -81,7 +81,7 @@ func TestFirstFitSchedulerFirstScheduled(t *testing.T) { }, } r := rand.New(rand.NewSource(0)) - scheduler := MakeFirstFitScheduler([]string{"m1", "m2", "m3"}, &mockRegistry, r) + scheduler := MakeFirstFitScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3"}), &mockRegistry, r) expectSchedule(scheduler, makePod("", 8080), "m3", t) } @@ -94,7 +94,7 @@ func TestFirstFitSchedulerFirstScheduledComplicated(t *testing.T) { }, } r := rand.New(rand.NewSource(0)) - scheduler := MakeFirstFitScheduler([]string{"m1", "m2", "m3"}, &mockRegistry, r) + scheduler := MakeFirstFitScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3"}), &mockRegistry, r) expectSchedule(scheduler, makePod("", 8080, 8081), "m3", t) } @@ -107,7 +107,7 @@ func TestFirstFitSchedulerFirstScheduledImpossible(t *testing.T) { }, } r := rand.New(rand.NewSource(0)) - scheduler := MakeFirstFitScheduler([]string{"m1", "m2", "m3"}, &mockRegistry, r) + scheduler := MakeFirstFitScheduler(MakeMinionRegistry([]string{"m1", "m2", "m3"}), &mockRegistry, r) _, err := scheduler.Schedule(makePod("", 8080, 8081)) if err == nil { t.Error("Unexpected non-error.") diff --git a/pkg/registry/service_registry.go b/pkg/registry/service_registry.go index b55912054d..419719869d 100644 --- a/pkg/registry/service_registry.go +++ b/pkg/registry/service_registry.go @@ -30,14 +30,14 @@ import ( type ServiceRegistryStorage struct { registry ServiceRegistry cloud cloudprovider.Interface - hosts []string + machines MinionRegistry } -func MakeServiceRegistryStorage(registry ServiceRegistry, cloud cloudprovider.Interface, hosts []string) apiserver.RESTStorage { +func MakeServiceRegistryStorage(registry ServiceRegistry, cloud cloudprovider.Interface, machines MinionRegistry) apiserver.RESTStorage { return &ServiceRegistryStorage{ registry: registry, cloud: cloud, - hosts: hosts, + machines: machines, } } @@ -117,7 +117,11 @@ func (sr *ServiceRegistryStorage) Create(obj interface{}) (<-chan interface{}, e balancer, ok = sr.cloud.TCPLoadBalancer() } if ok && balancer != nil { - err := balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, sr.hosts) + hosts, err := sr.machines.List() + if err != nil { + return nil, err + } + err = balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, hosts) if err != nil { return nil, err } diff --git a/pkg/registry/service_registry_test.go b/pkg/registry/service_registry_test.go index 2b6d09cb52..109a9b7f7c 100644 --- a/pkg/registry/service_registry_test.go +++ b/pkg/registry/service_registry_test.go @@ -29,7 +29,7 @@ func TestServiceRegistry(t *testing.T) { fakeCloud := &cloudprovider.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := MakeServiceRegistryStorage(memory, fakeCloud, machines) + storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines)) svc := api.Service{ JSONBase: api.JSONBase{ID: "foo"}, @@ -51,7 +51,7 @@ func TestServiceRegistryExternalService(t *testing.T) { fakeCloud := &cloudprovider.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := MakeServiceRegistryStorage(memory, fakeCloud, machines) + storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines)) svc := api.Service{ JSONBase: api.JSONBase{ID: "foo"}, @@ -76,7 +76,7 @@ func TestServiceRegistryExternalServiceError(t *testing.T) { } machines := []string{"foo", "bar", "baz"} - storage := MakeServiceRegistryStorage(memory, fakeCloud, machines) + storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines)) svc := api.Service{ JSONBase: api.JSONBase{ID: "foo"}, @@ -99,7 +99,7 @@ func TestServiceRegistryDelete(t *testing.T) { fakeCloud := &cloudprovider.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := MakeServiceRegistryStorage(memory, fakeCloud, machines) + storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines)) svc := api.Service{ JSONBase: api.JSONBase{ID: "foo"}, @@ -123,7 +123,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { fakeCloud := &cloudprovider.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := MakeServiceRegistryStorage(memory, fakeCloud, machines) + storage := MakeServiceRegistryStorage(memory, fakeCloud, MakeMinionRegistry(machines)) svc := api.Service{ JSONBase: api.JSONBase{ID: "foo"},