Merge pull request #1578 from brendandburns/controller

Extract the service controller from the apiserver.
pull/6/head
Daniel Smith 2014-10-06 12:44:41 -07:00
commit cc086908aa
5 changed files with 18 additions and 35 deletions

View File

@ -31,6 +31,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
masterPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
"github.com/golang/glog"
@ -64,7 +65,11 @@ func main() {
go http.ListenAndServe(net.JoinHostPort(*address, strconv.Itoa(*port)), nil)
endpoints := service.NewEndpointController(kubeClient)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
controllerManager := controller.NewReplicationManager(kubeClient)
controllerManager.Run(10 * time.Second)
select {}
}

View File

@ -41,6 +41,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
@ -136,6 +137,9 @@ func startComponents(manifestURL string) (apiServerURL string) {
// Scheduler
scheduler.New((&factory.ConfigFactory{cl}).Create()).Run()
endpoints := service.NewEndpointController(cl)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
controllerManager := controller.NewReplicationManager(cl)
// Prove that controllerManager's watch works by making it not sync until after this

View File

@ -35,7 +35,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
servicecontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -132,9 +131,6 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf
podCache := NewPodCache(podInfoGetter, m.podRegistry)
go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30)
endpoints := servicecontroller.NewEndpointController(m.serviceRegistry, m.client)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
m.storage = map[string]apiserver.RESTStorage{
"pods": pod.NewREST(&pod.RESTConfig{
CloudProvider: cloud,

View File

@ -25,7 +25,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
@ -34,13 +33,11 @@ import (
// EndpointController manages service endpoints.
type EndpointController struct {
client *client.Client
serviceRegistry service.Registry
}
// NewEndpointController returns a new *EndpointController.
func NewEndpointController(serviceRegistry service.Registry, client *client.Client) *EndpointController {
func NewEndpointController(client *client.Client) *EndpointController {
return &EndpointController{
serviceRegistry: serviceRegistry,
client: client,
}
}

View File

@ -26,7 +26,6 @@ import (
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
@ -160,8 +159,7 @@ func TestSyncEndpointsEmpty(t *testing.T) {
serverResponse{http.StatusOK, api.ServiceList{}},
serverResponse{http.StatusOK, api.Endpoints{}})
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
serviceRegistry := registrytest.ServiceRegistry{}
endpoints := NewEndpointController(&serviceRegistry, client)
endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -173,10 +171,7 @@ func TestSyncEndpointsError(t *testing.T) {
serverResponse{http.StatusInternalServerError, api.ServiceList{}},
serverResponse{http.StatusOK, api.Endpoints{}})
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
serviceRegistry := registrytest.ServiceRegistry{
Err: fmt.Errorf("test error"),
}
endpoints := NewEndpointController(&serviceRegistry, client)
endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err == nil {
t.Errorf("unexpected non-error")
}
@ -204,8 +199,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
Endpoints: []string{"6.7.8.9:1000"},
}})
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
serviceRegistry := registrytest.ServiceRegistry{}
endpoints := NewEndpointController(&serviceRegistry, client)
endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -240,8 +234,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
Endpoints: []string{"1.2.3.4:8080"},
}})
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
serviceRegistry := registrytest.ServiceRegistry{}
endpoints := NewEndpointController(&serviceRegistry, client)
endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -264,8 +257,7 @@ func TestSyncEndpointsItems(t *testing.T) {
serverResponse{http.StatusOK, serviceList},
serverResponse{http.StatusOK, api.Endpoints{}})
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
serviceRegistry := registrytest.ServiceRegistry{}
endpoints := NewEndpointController(&serviceRegistry, client)
endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -293,18 +285,7 @@ func TestSyncEndpointsPodError(t *testing.T) {
serverResponse{http.StatusOK, serviceList},
serverResponse{http.StatusOK, api.Endpoints{}})
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
serviceRegistry := registrytest.ServiceRegistry{
List: api.ServiceList{
Items: []api.Service{
{
Selector: map[string]string{
"foo": "bar",
},
},
},
},
}
endpoints := NewEndpointController(&serviceRegistry, client)
endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err == nil {
t.Error("Unexpected non-error")
}