diff --git a/cmd/apiserver/apiserver.go b/cmd/apiserver/apiserver.go index 584f358148..db5a324881 100644 --- a/cmd/apiserver/apiserver.go +++ b/cmd/apiserver/apiserver.go @@ -76,11 +76,13 @@ func main() { Port: *minionPort, } + client := client.New("http://localhost:8080", nil) + var m *master.Master if len(etcdServerList) > 0 { - m = master.New(etcdServerList, machineList, podInfoGetter, cloud, *minionRegexp) + m = master.New(etcdServerList, machineList, podInfoGetter, cloud, *minionRegexp, client) } else { - m = master.NewMemoryServer(machineList, podInfoGetter, cloud) + m = master.NewMemoryServer(machineList, podInfoGetter, cloud, client) } glog.Fatal(m.Run(net.JoinHostPort(*address, strconv.Itoa(int(*port))), *apiPrefix)) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 0295b829c6..175c2ce9e4 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -66,19 +66,33 @@ func (fakePodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) { return c.GetPodInfo("localhost", podID) } +type delegateHandler struct { + delegate http.Handler +} + +func (h *delegateHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if h.delegate != nil { + h.delegate.ServeHTTP(w, req) + } + w.WriteHeader(http.StatusNotFound) +} + func startComponents(manifestURL string) (apiServerURL string) { // Setup servers := []string{"http://localhost:4001"} glog.Infof("Creating etcd client pointing to %v", servers) machineList := []string{"localhost", "machine"} - // Master - m := master.New(servers, machineList, fakePodInfoGetter{}, nil, "") - apiserver := httptest.NewServer(m.ConstructHandler("/api/v1beta1")) - + handler := delegateHandler{} + apiserver := httptest.NewServer(&handler) cl := client.New(apiserver.URL, nil) cl.PollPeriod = time.Second * 1 cl.Sync = true + + // Master + m := master.New(servers, machineList, fakePodInfoGetter{}, nil, "", cl) + handler.delegate = m.ConstructHandler("/api/v1beta1") + controllerManager := controller.MakeReplicationManager(etcd.NewClient(servers), cl) controllerManager.Run(1 * time.Second) diff --git a/pkg/master/master.go b/pkg/master/master.go index 1296ee500f..a26cbdd655 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -38,22 +38,24 @@ type Master struct { serviceRegistry registry.ServiceRegistry minionRegistry registry.MinionRegistry storage map[string]apiserver.RESTStorage + client *client.Client } // NewMemoryServer returns a new instance of Master backed with memory (not etcd). -func NewMemoryServer(minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface) *Master { +func NewMemoryServer(minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface, client *client.Client) *Master { m := &Master{ podRegistry: registry.MakeMemoryRegistry(), controllerRegistry: registry.MakeMemoryRegistry(), serviceRegistry: registry.MakeMemoryRegistry(), minionRegistry: registry.MakeMinionRegistry(minions), + client: client, } m.init(cloud, podInfoGetter) return m } // New returns a new instance of Master connected to the given etcdServer. -func New(etcdServers, minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface, minionRegexp string) *Master { +func New(etcdServers, minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface, minionRegexp string, client *client.Client) *Master { etcdClient := etcd.NewClient(etcdServers) minionRegistry := minionRegistryMaker(minions, cloud, minionRegexp) m := &Master{ @@ -61,6 +63,7 @@ func New(etcdServers, minions []string, podInfoGetter client.PodInfoGetter, clou controllerRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), serviceRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), minionRegistry: minionRegistry, + client: client, } m.init(cloud, podInfoGetter) return m @@ -92,7 +95,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf // Run begins serving the Kubernetes API. It never returns. func (m *Master) Run(myAddress, apiPrefix string) error { - endpoints := registry.MakeEndpointController(m.serviceRegistry, m.podRegistry) + endpoints := registry.MakeEndpointController(m.serviceRegistry, m.client) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) s := &http.Server{ @@ -109,7 +112,7 @@ func (m *Master) Run(myAddress, apiPrefix string) error { // Instead of calling Run, you can call this function to get a handler for your own server. // It is intended for testing. Only call once. func (m *Master) ConstructHandler(apiPrefix string) http.Handler { - endpoints := registry.MakeEndpointController(m.serviceRegistry, m.podRegistry) + endpoints := registry.MakeEndpointController(m.serviceRegistry, m.client) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) return apiserver.New(m.storage, apiPrefix) diff --git a/pkg/registry/endpoints.go b/pkg/registry/endpoints.go index 2e3e4e6fdf..3696685e74 100644 --- a/pkg/registry/endpoints.go +++ b/pkg/registry/endpoints.go @@ -22,21 +22,22 @@ import ( "strconv" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" ) -func MakeEndpointController(serviceRegistry ServiceRegistry, podRegistry PodRegistry) *EndpointController { +func MakeEndpointController(serviceRegistry ServiceRegistry, client *client.Client) *EndpointController { return &EndpointController{ serviceRegistry: serviceRegistry, - podRegistry: podRegistry, + client: client, } } type EndpointController struct { serviceRegistry ServiceRegistry - podRegistry PodRegistry + client *client.Client } func findPort(manifest *api.ContainerManifest, portName util.IntOrString) (int, error) { @@ -62,18 +63,19 @@ func findPort(manifest *api.ContainerManifest, portName util.IntOrString) (int, func (e *EndpointController) SyncServiceEndpoints() error { services, err := e.serviceRegistry.ListServices() if err != nil { + glog.Errorf("Failed to list services!") return err } var resultErr error for _, service := range services.Items { - pods, err := e.podRegistry.ListPods(labels.Set(service.Selector).AsSelector()) + pods, err := e.client.ListPods(labels.Set(service.Selector).AsSelector()) if err != nil { glog.Errorf("Error syncing service: %#v, skipping.", service) resultErr = err continue } - endpoints := make([]string, len(pods)) - for ix, pod := range pods { + endpoints := make([]string, len(pods.Items)) + for ix, pod := range pods.Items { port, err := findPort(&pod.DesiredState.Manifest, service.ContainerPort) if err != nil { glog.Errorf("Failed to find port for service: %v, %v", service, err) diff --git a/pkg/registry/endpoints_test.go b/pkg/registry/endpoints_test.go index 8581bbae70..6a0b696b12 100644 --- a/pkg/registry/endpoints_test.go +++ b/pkg/registry/endpoints_test.go @@ -17,13 +17,46 @@ limitations under the License. package registry import ( + "encoding/json" "fmt" + "net/http/httptest" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) +func makePodList(count int) api.PodList { + pods := []api.Pod{} + for i := 0; i < count; i++ { + pods = append(pods, api.Pod{ + JSONBase: api.JSONBase{ + ID: fmt.Sprintf("pod%d", i), + }, + DesiredState: api.PodState{ + Manifest: api.ContainerManifest{ + Containers: []api.Container{ + { + Ports: []api.Port{ + { + ContainerPort: 8080, + }, + }, + }, + }, + }, + }, + CurrentState: api.PodState{ + PodIP: "1.2.3.4", + }, + }) + } + return api.PodList{ + Items: pods, + } +} + func TestFindPort(t *testing.T) { manifest := api.ContainerManifest{ Containers: []api.Container{ @@ -78,21 +111,35 @@ func TestFindPort(t *testing.T) { } func TestSyncEndpointsEmpty(t *testing.T) { - serviceRegistry := MockServiceRegistry{} - podRegistry := MockPodRegistry{} + body, _ := json.Marshal(makePodList(0)) + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: string(body), + } + testServer := httptest.NewTLSServer(&fakeHandler) + client := client.New(testServer.URL, nil) - endpoints := MakeEndpointController(&serviceRegistry, &podRegistry) + serviceRegistry := MockServiceRegistry{} + + endpoints := MakeEndpointController(&serviceRegistry, client) err := endpoints.SyncServiceEndpoints() expectNoError(t, err) } func TestSyncEndpointsError(t *testing.T) { + body, _ := json.Marshal(makePodList(0)) + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: string(body), + } + testServer := httptest.NewTLSServer(&fakeHandler) + client := client.New(testServer.URL, nil) + serviceRegistry := MockServiceRegistry{ err: fmt.Errorf("test error"), } - podRegistry := MockPodRegistry{} - endpoints := MakeEndpointController(&serviceRegistry, &podRegistry) + endpoints := MakeEndpointController(&serviceRegistry, client) err := endpoints.SyncServiceEndpoints() if err != serviceRegistry.err { t.Errorf("Errors don't match: %#v %#v", err, serviceRegistry.err) @@ -100,6 +147,14 @@ func TestSyncEndpointsError(t *testing.T) { } func TestSyncEndpointsItems(t *testing.T) { + body, _ := json.Marshal(makePodList(1)) + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: string(body), + } + testServer := httptest.NewTLSServer(&fakeHandler) + client := client.New(testServer.URL, nil) + serviceRegistry := MockServiceRegistry{ list: api.ServiceList{ Items: []api.Service{ @@ -111,30 +166,8 @@ func TestSyncEndpointsItems(t *testing.T) { }, }, } - podRegistry := MockPodRegistry{ - pods: []api.Pod{ - { - DesiredState: api.PodState{ - Manifest: api.ContainerManifest{ - Containers: []api.Container{ - { - Ports: []api.Port{ - { - HostPort: 8080, - }, - }, - }, - }, - }, - }, - Labels: map[string]string{ - "foo": "bar", - }, - }, - }, - } - endpoints := MakeEndpointController(&serviceRegistry, &podRegistry) + endpoints := MakeEndpointController(&serviceRegistry, client) err := endpoints.SyncServiceEndpoints() expectNoError(t, err) if len(serviceRegistry.endpoints.Endpoints) != 1 { @@ -143,6 +176,12 @@ func TestSyncEndpointsItems(t *testing.T) { } func TestSyncEndpointsPodError(t *testing.T) { + fakeHandler := util.FakeHandler{ + StatusCode: 500, + } + testServer := httptest.NewTLSServer(&fakeHandler) + client := client.New(testServer.URL, nil) + serviceRegistry := MockServiceRegistry{ list: api.ServiceList{ Items: []api.Service{ @@ -154,11 +193,8 @@ func TestSyncEndpointsPodError(t *testing.T) { }, }, } - podRegistry := MockPodRegistry{ - err: fmt.Errorf("test error."), - } - endpoints := MakeEndpointController(&serviceRegistry, &podRegistry) + endpoints := MakeEndpointController(&serviceRegistry, client) err := endpoints.SyncServiceEndpoints() if err == nil { t.Error("Unexpected non-error")