Make the service reconciller use the API, not a PodRegistry

pull/6/head
Brendan Burns 2014-07-18 13:22:26 -07:00
parent fda69bcca2
commit c6255afe37
5 changed files with 105 additions and 48 deletions

View File

@ -76,11 +76,13 @@ func main() {
Port: *minionPort, Port: *minionPort,
} }
client := client.New("http://localhost:8080", nil)
var m *master.Master var m *master.Master
if len(etcdServerList) > 0 { if len(etcdServerList) > 0 {
m = master.New(etcdServerList, machineList, podInfoGetter, cloud, *minionRegexp) m = master.New(etcdServerList, machineList, podInfoGetter, cloud, *minionRegexp, client)
} else { } else {
m = master.NewMemoryServer(machineList, podInfoGetter, cloud) m = master.NewMemoryServer(machineList, podInfoGetter, cloud, client)
} }
glog.Fatal(m.Run(net.JoinHostPort(*address, strconv.Itoa(int(*port))), *apiPrefix)) glog.Fatal(m.Run(net.JoinHostPort(*address, strconv.Itoa(int(*port))), *apiPrefix))

View File

@ -66,19 +66,33 @@ func (fakePodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) {
return c.GetPodInfo("localhost", podID) return c.GetPodInfo("localhost", podID)
} }
type delegateHandler struct {
delegate http.Handler
}
func (h *delegateHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if h.delegate != nil {
h.delegate.ServeHTTP(w, req)
}
w.WriteHeader(http.StatusNotFound)
}
func startComponents(manifestURL string) (apiServerURL string) { func startComponents(manifestURL string) (apiServerURL string) {
// Setup // Setup
servers := []string{"http://localhost:4001"} servers := []string{"http://localhost:4001"}
glog.Infof("Creating etcd client pointing to %v", servers) glog.Infof("Creating etcd client pointing to %v", servers)
machineList := []string{"localhost", "machine"} machineList := []string{"localhost", "machine"}
// Master handler := delegateHandler{}
m := master.New(servers, machineList, fakePodInfoGetter{}, nil, "") apiserver := httptest.NewServer(&handler)
apiserver := httptest.NewServer(m.ConstructHandler("/api/v1beta1"))
cl := client.New(apiserver.URL, nil) cl := client.New(apiserver.URL, nil)
cl.PollPeriod = time.Second * 1 cl.PollPeriod = time.Second * 1
cl.Sync = true cl.Sync = true
// Master
m := master.New(servers, machineList, fakePodInfoGetter{}, nil, "", cl)
handler.delegate = m.ConstructHandler("/api/v1beta1")
controllerManager := controller.MakeReplicationManager(etcd.NewClient(servers), cl) controllerManager := controller.MakeReplicationManager(etcd.NewClient(servers), cl)
controllerManager.Run(1 * time.Second) controllerManager.Run(1 * time.Second)

View File

@ -38,22 +38,24 @@ type Master struct {
serviceRegistry registry.ServiceRegistry serviceRegistry registry.ServiceRegistry
minionRegistry registry.MinionRegistry minionRegistry registry.MinionRegistry
storage map[string]apiserver.RESTStorage storage map[string]apiserver.RESTStorage
client *client.Client
} }
// NewMemoryServer returns a new instance of Master backed with memory (not etcd). // NewMemoryServer returns a new instance of Master backed with memory (not etcd).
func NewMemoryServer(minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface) *Master { func NewMemoryServer(minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface, client *client.Client) *Master {
m := &Master{ m := &Master{
podRegistry: registry.MakeMemoryRegistry(), podRegistry: registry.MakeMemoryRegistry(),
controllerRegistry: registry.MakeMemoryRegistry(), controllerRegistry: registry.MakeMemoryRegistry(),
serviceRegistry: registry.MakeMemoryRegistry(), serviceRegistry: registry.MakeMemoryRegistry(),
minionRegistry: registry.MakeMinionRegistry(minions), minionRegistry: registry.MakeMinionRegistry(minions),
client: client,
} }
m.init(cloud, podInfoGetter) m.init(cloud, podInfoGetter)
return m return m
} }
// New returns a new instance of Master connected to the given etcdServer. // New returns a new instance of Master connected to the given etcdServer.
func New(etcdServers, minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface, minionRegexp string) *Master { func New(etcdServers, minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface, minionRegexp string, client *client.Client) *Master {
etcdClient := etcd.NewClient(etcdServers) etcdClient := etcd.NewClient(etcdServers)
minionRegistry := minionRegistryMaker(minions, cloud, minionRegexp) minionRegistry := minionRegistryMaker(minions, cloud, minionRegexp)
m := &Master{ m := &Master{
@ -61,6 +63,7 @@ func New(etcdServers, minions []string, podInfoGetter client.PodInfoGetter, clou
controllerRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), controllerRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry),
serviceRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), serviceRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry),
minionRegistry: minionRegistry, minionRegistry: minionRegistry,
client: client,
} }
m.init(cloud, podInfoGetter) m.init(cloud, podInfoGetter)
return m return m
@ -92,7 +95,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf
// Run begins serving the Kubernetes API. It never returns. // Run begins serving the Kubernetes API. It never returns.
func (m *Master) Run(myAddress, apiPrefix string) error { func (m *Master) Run(myAddress, apiPrefix string) error {
endpoints := registry.MakeEndpointController(m.serviceRegistry, m.podRegistry) endpoints := registry.MakeEndpointController(m.serviceRegistry, m.client)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
s := &http.Server{ s := &http.Server{
@ -109,7 +112,7 @@ func (m *Master) Run(myAddress, apiPrefix string) error {
// Instead of calling Run, you can call this function to get a handler for your own server. // Instead of calling Run, you can call this function to get a handler for your own server.
// It is intended for testing. Only call once. // It is intended for testing. Only call once.
func (m *Master) ConstructHandler(apiPrefix string) http.Handler { func (m *Master) ConstructHandler(apiPrefix string) http.Handler {
endpoints := registry.MakeEndpointController(m.serviceRegistry, m.podRegistry) endpoints := registry.MakeEndpointController(m.serviceRegistry, m.client)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
return apiserver.New(m.storage, apiPrefix) return apiserver.New(m.storage, apiPrefix)

View File

@ -22,21 +22,22 @@ import (
"strconv" "strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog" "github.com/golang/glog"
) )
func MakeEndpointController(serviceRegistry ServiceRegistry, podRegistry PodRegistry) *EndpointController { func MakeEndpointController(serviceRegistry ServiceRegistry, client *client.Client) *EndpointController {
return &EndpointController{ return &EndpointController{
serviceRegistry: serviceRegistry, serviceRegistry: serviceRegistry,
podRegistry: podRegistry, client: client,
} }
} }
type EndpointController struct { type EndpointController struct {
serviceRegistry ServiceRegistry serviceRegistry ServiceRegistry
podRegistry PodRegistry client *client.Client
} }
func findPort(manifest *api.ContainerManifest, portName util.IntOrString) (int, error) { func findPort(manifest *api.ContainerManifest, portName util.IntOrString) (int, error) {
@ -62,18 +63,19 @@ func findPort(manifest *api.ContainerManifest, portName util.IntOrString) (int,
func (e *EndpointController) SyncServiceEndpoints() error { func (e *EndpointController) SyncServiceEndpoints() error {
services, err := e.serviceRegistry.ListServices() services, err := e.serviceRegistry.ListServices()
if err != nil { if err != nil {
glog.Errorf("Failed to list services!")
return err return err
} }
var resultErr error var resultErr error
for _, service := range services.Items { for _, service := range services.Items {
pods, err := e.podRegistry.ListPods(labels.Set(service.Selector).AsSelector()) pods, err := e.client.ListPods(labels.Set(service.Selector).AsSelector())
if err != nil { if err != nil {
glog.Errorf("Error syncing service: %#v, skipping.", service) glog.Errorf("Error syncing service: %#v, skipping.", service)
resultErr = err resultErr = err
continue continue
} }
endpoints := make([]string, len(pods)) endpoints := make([]string, len(pods.Items))
for ix, pod := range pods { for ix, pod := range pods.Items {
port, err := findPort(&pod.DesiredState.Manifest, service.ContainerPort) port, err := findPort(&pod.DesiredState.Manifest, service.ContainerPort)
if err != nil { if err != nil {
glog.Errorf("Failed to find port for service: %v, %v", service, err) glog.Errorf("Failed to find port for service: %v, %v", service, err)

View File

@ -17,13 +17,46 @@ limitations under the License.
package registry package registry
import ( import (
"encoding/json"
"fmt" "fmt"
"net/http/httptest"
"testing" "testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
func makePodList(count int) api.PodList {
pods := []api.Pod{}
for i := 0; i < count; i++ {
pods = append(pods, api.Pod{
JSONBase: api.JSONBase{
ID: fmt.Sprintf("pod%d", i),
},
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
{
Ports: []api.Port{
{
ContainerPort: 8080,
},
},
},
},
},
},
CurrentState: api.PodState{
PodIP: "1.2.3.4",
},
})
}
return api.PodList{
Items: pods,
}
}
func TestFindPort(t *testing.T) { func TestFindPort(t *testing.T) {
manifest := api.ContainerManifest{ manifest := api.ContainerManifest{
Containers: []api.Container{ Containers: []api.Container{
@ -78,21 +111,35 @@ func TestFindPort(t *testing.T) {
} }
func TestSyncEndpointsEmpty(t *testing.T) { func TestSyncEndpointsEmpty(t *testing.T) {
serviceRegistry := MockServiceRegistry{} body, _ := json.Marshal(makePodList(0))
podRegistry := MockPodRegistry{} fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
endpoints := MakeEndpointController(&serviceRegistry, &podRegistry) serviceRegistry := MockServiceRegistry{}
endpoints := MakeEndpointController(&serviceRegistry, client)
err := endpoints.SyncServiceEndpoints() err := endpoints.SyncServiceEndpoints()
expectNoError(t, err) expectNoError(t, err)
} }
func TestSyncEndpointsError(t *testing.T) { func TestSyncEndpointsError(t *testing.T) {
body, _ := json.Marshal(makePodList(0))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
serviceRegistry := MockServiceRegistry{ serviceRegistry := MockServiceRegistry{
err: fmt.Errorf("test error"), err: fmt.Errorf("test error"),
} }
podRegistry := MockPodRegistry{}
endpoints := MakeEndpointController(&serviceRegistry, &podRegistry) endpoints := MakeEndpointController(&serviceRegistry, client)
err := endpoints.SyncServiceEndpoints() err := endpoints.SyncServiceEndpoints()
if err != serviceRegistry.err { if err != serviceRegistry.err {
t.Errorf("Errors don't match: %#v %#v", err, serviceRegistry.err) t.Errorf("Errors don't match: %#v %#v", err, serviceRegistry.err)
@ -100,6 +147,14 @@ func TestSyncEndpointsError(t *testing.T) {
} }
func TestSyncEndpointsItems(t *testing.T) { func TestSyncEndpointsItems(t *testing.T) {
body, _ := json.Marshal(makePodList(1))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
serviceRegistry := MockServiceRegistry{ serviceRegistry := MockServiceRegistry{
list: api.ServiceList{ list: api.ServiceList{
Items: []api.Service{ Items: []api.Service{
@ -111,30 +166,8 @@ func TestSyncEndpointsItems(t *testing.T) {
}, },
}, },
} }
podRegistry := MockPodRegistry{
pods: []api.Pod{
{
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
{
Ports: []api.Port{
{
HostPort: 8080,
},
},
},
},
},
},
Labels: map[string]string{
"foo": "bar",
},
},
},
}
endpoints := MakeEndpointController(&serviceRegistry, &podRegistry) endpoints := MakeEndpointController(&serviceRegistry, client)
err := endpoints.SyncServiceEndpoints() err := endpoints.SyncServiceEndpoints()
expectNoError(t, err) expectNoError(t, err)
if len(serviceRegistry.endpoints.Endpoints) != 1 { if len(serviceRegistry.endpoints.Endpoints) != 1 {
@ -143,6 +176,12 @@ func TestSyncEndpointsItems(t *testing.T) {
} }
func TestSyncEndpointsPodError(t *testing.T) { func TestSyncEndpointsPodError(t *testing.T) {
fakeHandler := util.FakeHandler{
StatusCode: 500,
}
testServer := httptest.NewTLSServer(&fakeHandler)
client := client.New(testServer.URL, nil)
serviceRegistry := MockServiceRegistry{ serviceRegistry := MockServiceRegistry{
list: api.ServiceList{ list: api.ServiceList{
Items: []api.Service{ Items: []api.Service{
@ -154,11 +193,8 @@ func TestSyncEndpointsPodError(t *testing.T) {
}, },
}, },
} }
podRegistry := MockPodRegistry{
err: fmt.Errorf("test error."),
}
endpoints := MakeEndpointController(&serviceRegistry, &podRegistry) endpoints := MakeEndpointController(&serviceRegistry, client)
err := endpoints.SyncServiceEndpoints() err := endpoints.SyncServiceEndpoints()
if err == nil { if err == nil {
t.Error("Unexpected non-error") t.Error("Unexpected non-error")