Merge pull request #757 from lavalamp/repCtl

Remove etcd dep from controller manager
pull/6/head
brendandburns 2014-08-05 11:47:31 -07:00
commit cc4300c3ec
4 changed files with 40 additions and 41 deletions

View File

@ -14,11 +14,10 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
// The controller manager is responsible for monitoring replication controllers, and creating corresponding // The controller manager is responsible for monitoring replication
// pods to achieve the desired state. It listens for new controllers in etcd, and it sends requests to the // controllers, and creating corresponding pods to achieve the desired
// master to create/delete pods. // state. It uses the API to listen for new controllers and to create/delete
// // pods.
// TODO: Refactor the etcd watch code so that it is a pluggable interface.
package main package main
import ( import (
@ -29,19 +28,13 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
verflag "github.com/GoogleCloudPlatform/kubernetes/pkg/version/flag" verflag "github.com/GoogleCloudPlatform/kubernetes/pkg/version/flag"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog" "github.com/golang/glog"
) )
var ( var (
etcdServerList util.StringList
master = flag.String("master", "", "The address of the Kubernetes API server") master = flag.String("master", "", "The address of the Kubernetes API server")
) )
func init() {
flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated")
}
func main() { func main() {
flag.Parse() flag.Parse()
util.InitLogs() util.InitLogs()
@ -49,15 +42,11 @@ func main() {
verflag.PrintAndExitIfRequested() verflag.PrintAndExitIfRequested()
if len(etcdServerList) == 0 || len(*master) == 0 { if len(*master) == 0 {
glog.Fatal("usage: controller-manager -etcd_servers <servers> -master <master>") glog.Fatal("usage: controller-manager -master <master>")
} }
// Set up logger for etcd client
etcd.SetLogger(util.NewLogger("etcd "))
controllerManager := controller.MakeReplicationManager( controllerManager := controller.MakeReplicationManager(
etcd.NewClient(etcdServerList),
client.New("http://"+*master, nil)) client.New("http://"+*master, nil))
controllerManager.Run(10 * time.Second) controllerManager.Run(10 * time.Second)

View File

@ -103,7 +103,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
}) })
handler.delegate = m.ConstructHandler("/api/v1beta1") handler.delegate = m.ConstructHandler("/api/v1beta1")
controllerManager := controller.MakeReplicationManager(etcdClient, cl) controllerManager := controller.MakeReplicationManager(cl)
// Prove that controllerManager's watch works by making it not sync until after this // Prove that controllerManager's watch works by making it not sync until after this
// test is over. (Hopefully we don't take 10 minutes!) // test is over. (Hopefully we don't take 10 minutes!)

View File

@ -23,18 +23,14 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog" "github.com/golang/glog"
) )
// ReplicationManager is responsible for synchronizing ReplicationController objects stored in etcd // ReplicationManager is responsible for synchronizing ReplicationController objects stored
// with actual running pods. // in the system with actual running pods.
// TODO: Allow choice of switching between etcd/apiserver watching, or remove etcd references
// from this file completely.
type ReplicationManager struct { type ReplicationManager struct {
etcdHelper tools.EtcdHelper
kubeClient client.Interface kubeClient client.Interface
podControl PodControlInterface podControl PodControlInterface
syncTime <-chan time.Time syncTime <-chan time.Time
@ -81,10 +77,9 @@ func (r RealPodControl) deletePod(podID string) error {
} }
// MakeReplicationManager creates a new ReplicationManager. // MakeReplicationManager creates a new ReplicationManager.
func MakeReplicationManager(etcdClient tools.EtcdClient, kubeClient client.Interface) *ReplicationManager { func MakeReplicationManager(kubeClient client.Interface) *ReplicationManager {
rm := &ReplicationManager{ rm := &ReplicationManager{
kubeClient: kubeClient, kubeClient: kubeClient,
etcdHelper: tools.EtcdHelper{etcdClient, api.Encoding, api.Versioning},
podControl: RealPodControl{ podControl: RealPodControl{
kubeClient: kubeClient, kubeClient: kubeClient,
}, },
@ -100,11 +95,6 @@ func (rm *ReplicationManager) Run(period time.Duration) {
go util.Forever(func() { rm.watchControllers() }, period) go util.Forever(func() { rm.watchControllers() }, period)
} }
// makeEtcdWatch starts watching via etcd.
func (rm *ReplicationManager) makeEtcdWatch() (watch.Interface, error) {
return rm.etcdHelper.WatchList("/registry/controllers", tools.Everything)
}
// makeAPIWatch starts watching via the apiserver. // makeAPIWatch starts watching via the apiserver.
func (rm *ReplicationManager) makeAPIWatch() (watch.Interface, error) { func (rm *ReplicationManager) makeAPIWatch() (watch.Interface, error) {
// TODO: Fix this ugly type assertion. // TODO: Fix this ugly type assertion.
@ -190,12 +180,15 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli
} }
func (rm *ReplicationManager) synchronize() { func (rm *ReplicationManager) synchronize() {
// TODO: remove this method completely and rely on the watch.
// Add resource version tracking to watch to make this work.
var controllerSpecs []api.ReplicationController var controllerSpecs []api.ReplicationController
err := rm.etcdHelper.ExtractList("/registry/controllers", &controllerSpecs) list, err := rm.kubeClient.ListReplicationControllers(labels.Everything())
if err != nil { if err != nil {
glog.Errorf("Synchronization error: %v (%#v)", err, err) glog.Errorf("Synchronization error: %v (%#v)", err, err)
return return
} }
controllerSpecs = list.Items
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(len(controllerSpecs)) wg.Add(len(controllerSpecs))
for ix := range controllerSpecs { for ix := range controllerSpecs {

View File

@ -19,6 +19,7 @@ package controller
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http"
"net/http/httptest" "net/http/httptest"
"reflect" "reflect"
"sync" "sync"
@ -116,7 +117,7 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) {
fakePodControl := FakePodControl{} fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, client) manager := MakeReplicationManager(client)
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
controllerSpec := makeReplicationController(2) controllerSpec := makeReplicationController(2)
@ -136,7 +137,7 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
fakePodControl := FakePodControl{} fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, client) manager := MakeReplicationManager(client)
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
controllerSpec := makeReplicationController(1) controllerSpec := makeReplicationController(1)
@ -156,7 +157,7 @@ func TestSyncReplicationControllerCreates(t *testing.T) {
fakePodControl := FakePodControl{} fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, client) manager := MakeReplicationManager(client)
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
controllerSpec := makeReplicationController(2) controllerSpec := makeReplicationController(2)
@ -282,14 +283,31 @@ func TestSyncronize(t *testing.T) {
}, },
} }
fakeHandler := util.FakeHandler{ fakePodHandler := util.FakeHandler{
StatusCode: 200, StatusCode: 200,
ResponseBody: "{\"apiVersion\": \"v1beta1\", \"kind\": \"PodList\"}", ResponseBody: "{\"apiVersion\": \"v1beta1\", \"kind\": \"PodList\"}",
T: t, T: t,
} }
testServer := httptest.NewTLSServer(&fakeHandler) fakeControllerHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: api.EncodeOrDie(&api.ReplicationControllerList{
Items: []api.ReplicationController{
controllerSpec1,
controllerSpec2,
},
}),
T: t,
}
mux := http.NewServeMux()
mux.Handle("/api/v1beta1/pods/", &fakePodHandler)
mux.Handle("/api/v1beta1/replicationControllers/", &fakeControllerHandler)
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusNotFound)
t.Errorf("Unexpected request for %v", req.RequestURI)
})
testServer := httptest.NewTLSServer(mux)
client := client.New(testServer.URL, nil) client := client.New(testServer.URL, nil)
manager := MakeReplicationManager(fakeEtcd, client) manager := MakeReplicationManager(client)
fakePodControl := FakePodControl{} fakePodControl := FakePodControl{}
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
@ -299,9 +317,8 @@ func TestSyncronize(t *testing.T) {
} }
func TestWatchControllers(t *testing.T) { func TestWatchControllers(t *testing.T) {
fakeEtcd := tools.MakeFakeEtcdClient(t)
fakeWatcher := watch.NewFake() fakeWatcher := watch.NewFake()
manager := MakeReplicationManager(fakeEtcd, nil) manager := MakeReplicationManager(nil)
manager.watchMaker = func() (watch.Interface, error) { manager.watchMaker = func() (watch.Interface, error) {
return fakeWatcher, nil return fakeWatcher, nil
} }