diff --git a/cmd/controller-manager/controller-manager.go b/cmd/controller-manager/controller-manager.go index bf388e912d..efbee00538 100644 --- a/cmd/controller-manager/controller-manager.go +++ b/cmd/controller-manager/controller-manager.go @@ -14,11 +14,10 @@ See the License for the specific language governing permissions and limitations under the License. */ -// The controller manager is responsible for monitoring replication controllers, and creating corresponding -// pods to achieve the desired state. It listens for new controllers in etcd, and it sends requests to the -// master to create/delete pods. -// -// TODO: Refactor the etcd watch code so that it is a pluggable interface. +// The controller manager is responsible for monitoring replication +// controllers, and creating corresponding pods to achieve the desired +// state. It uses the API to listen for new controllers and to create/delete +// pods. package main import ( @@ -29,19 +28,13 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" verflag "github.com/GoogleCloudPlatform/kubernetes/pkg/version/flag" - "github.com/coreos/go-etcd/etcd" "github.com/golang/glog" ) var ( - etcdServerList util.StringList - master = flag.String("master", "", "The address of the Kubernetes API server") + master = flag.String("master", "", "The address of the Kubernetes API server") ) -func init() { - flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated") -} - func main() { flag.Parse() util.InitLogs() @@ -49,15 +42,11 @@ func main() { verflag.PrintAndExitIfRequested() - if len(etcdServerList) == 0 || len(*master) == 0 { - glog.Fatal("usage: controller-manager -etcd_servers -master ") + if len(*master) == 0 { + glog.Fatal("usage: controller-manager -master ") } - // Set up logger for etcd client - etcd.SetLogger(util.NewLogger("etcd ")) - controllerManager := controller.MakeReplicationManager( - etcd.NewClient(etcdServerList), client.New("http://"+*master, nil)) controllerManager.Run(10 * time.Second) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index b983d6053a..137b4e8ac6 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -103,7 +103,7 @@ func startComponents(manifestURL string) (apiServerURL string) { }) handler.delegate = m.ConstructHandler("/api/v1beta1") - controllerManager := controller.MakeReplicationManager(etcdClient, cl) + controllerManager := controller.MakeReplicationManager(cl) // Prove that controllerManager's watch works by making it not sync until after this // test is over. (Hopefully we don't take 10 minutes!) diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index 143c53bb2b..d9845c59d1 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -23,18 +23,14 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) -// ReplicationManager is responsible for synchronizing ReplicationController objects stored in etcd -// with actual running pods. -// TODO: Allow choice of switching between etcd/apiserver watching, or remove etcd references -// from this file completely. +// ReplicationManager is responsible for synchronizing ReplicationController objects stored +// in the system with actual running pods. type ReplicationManager struct { - etcdHelper tools.EtcdHelper kubeClient client.Interface podControl PodControlInterface syncTime <-chan time.Time @@ -81,10 +77,9 @@ func (r RealPodControl) deletePod(podID string) error { } // MakeReplicationManager creates a new ReplicationManager. -func MakeReplicationManager(etcdClient tools.EtcdClient, kubeClient client.Interface) *ReplicationManager { +func MakeReplicationManager(kubeClient client.Interface) *ReplicationManager { rm := &ReplicationManager{ kubeClient: kubeClient, - etcdHelper: tools.EtcdHelper{etcdClient, api.Encoding, api.Versioning}, podControl: RealPodControl{ kubeClient: kubeClient, }, @@ -100,11 +95,6 @@ func (rm *ReplicationManager) Run(period time.Duration) { go util.Forever(func() { rm.watchControllers() }, period) } -// makeEtcdWatch starts watching via etcd. -func (rm *ReplicationManager) makeEtcdWatch() (watch.Interface, error) { - return rm.etcdHelper.WatchList("/registry/controllers", tools.Everything) -} - // makeAPIWatch starts watching via the apiserver. func (rm *ReplicationManager) makeAPIWatch() (watch.Interface, error) { // TODO: Fix this ugly type assertion. @@ -190,12 +180,15 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli } func (rm *ReplicationManager) synchronize() { + // TODO: remove this method completely and rely on the watch. + // Add resource version tracking to watch to make this work. var controllerSpecs []api.ReplicationController - err := rm.etcdHelper.ExtractList("/registry/controllers", &controllerSpecs) + list, err := rm.kubeClient.ListReplicationControllers(labels.Everything()) if err != nil { glog.Errorf("Synchronization error: %v (%#v)", err, err) return } + controllerSpecs = list.Items wg := sync.WaitGroup{} wg.Add(len(controllerSpecs)) for ix := range controllerSpecs { diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index e2845f852a..bebaa94820 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -19,6 +19,7 @@ package controller import ( "encoding/json" "fmt" + "net/http" "net/http/httptest" "reflect" "sync" @@ -116,7 +117,7 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) { fakePodControl := FakePodControl{} - manager := MakeReplicationManager(nil, client) + manager := MakeReplicationManager(client) manager.podControl = &fakePodControl controllerSpec := makeReplicationController(2) @@ -136,7 +137,7 @@ func TestSyncReplicationControllerDeletes(t *testing.T) { fakePodControl := FakePodControl{} - manager := MakeReplicationManager(nil, client) + manager := MakeReplicationManager(client) manager.podControl = &fakePodControl controllerSpec := makeReplicationController(1) @@ -156,7 +157,7 @@ func TestSyncReplicationControllerCreates(t *testing.T) { fakePodControl := FakePodControl{} - manager := MakeReplicationManager(nil, client) + manager := MakeReplicationManager(client) manager.podControl = &fakePodControl controllerSpec := makeReplicationController(2) @@ -282,14 +283,31 @@ func TestSyncronize(t *testing.T) { }, } - fakeHandler := util.FakeHandler{ + fakePodHandler := util.FakeHandler{ StatusCode: 200, ResponseBody: "{\"apiVersion\": \"v1beta1\", \"kind\": \"PodList\"}", T: t, } - testServer := httptest.NewTLSServer(&fakeHandler) + fakeControllerHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: api.EncodeOrDie(&api.ReplicationControllerList{ + Items: []api.ReplicationController{ + controllerSpec1, + controllerSpec2, + }, + }), + T: t, + } + mux := http.NewServeMux() + mux.Handle("/api/v1beta1/pods/", &fakePodHandler) + mux.Handle("/api/v1beta1/replicationControllers/", &fakeControllerHandler) + mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusNotFound) + t.Errorf("Unexpected request for %v", req.RequestURI) + }) + testServer := httptest.NewTLSServer(mux) client := client.New(testServer.URL, nil) - manager := MakeReplicationManager(fakeEtcd, client) + manager := MakeReplicationManager(client) fakePodControl := FakePodControl{} manager.podControl = &fakePodControl @@ -299,9 +317,8 @@ func TestSyncronize(t *testing.T) { } func TestWatchControllers(t *testing.T) { - fakeEtcd := tools.MakeFakeEtcdClient(t) fakeWatcher := watch.NewFake() - manager := MakeReplicationManager(fakeEtcd, nil) + manager := MakeReplicationManager(nil) manager.watchMaker = func() (watch.Interface, error) { return fakeWatcher, nil }