From e74ac01a627e9c775b8e085f179501b99f48b3fb Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 17 Jun 2014 16:42:29 -0700 Subject: [PATCH] Move run logic into package --- cmd/controller-manager/controller-manager.go | 4 +- pkg/controller/replication_controller.go | 59 ++++++++++---------- 2 files changed, 32 insertions(+), 31 deletions(-) diff --git a/cmd/controller-manager/controller-manager.go b/cmd/controller-manager/controller-manager.go index 82175e67f8..7b24f1a429 100644 --- a/cmd/controller-manager/controller-manager.go +++ b/cmd/controller-manager/controller-manager.go @@ -29,7 +29,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" ) @@ -53,7 +52,6 @@ func main() { Host: "http://" + *master, }) - go util.Forever(func() { controllerManager.Synchronize() }, 20*time.Second) - go util.Forever(func() { controllerManager.WatchControllers() }, 20*time.Second) + controllerManager.Run(10 * time.Second) select {} } diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index 6e6e1f68db..df4efad36f 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -84,7 +84,13 @@ func MakeReplicationManager(etcdClient *etcd.Client, kubeClient client.ClientInt } } -func (rm *ReplicationManager) WatchControllers() { +// Begin watching and syncing. +func (rm *ReplicationManager) Run(period time.Duration) { + go util.Forever(func() { rm.synchronize() }, period) + go util.Forever(func() { rm.watchControllers() }, period) +} + +func (rm *ReplicationManager) watchControllers() { watchChannel := make(chan *etcd.Response) go func() { defer util.HandleCrash() @@ -166,34 +172,31 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli return nil } -func (rm *ReplicationManager) Synchronize() { - for { - response, err := rm.etcdClient.Get("/registry/controllers", false, false) - if err != nil { - log.Printf("Synchronization error %#v", err) - } - // TODO(bburns): There is a race here, if we get a version of the controllers, and then it is - // updated, its possible that the watch will pick up the change first, and then we will execute - // using the old version of the controller. - // Probably the correct thing to do is to use the version number in etcd to detect when - // we are stale. - // Punting on this for now, but this could lead to some nasty bugs, so we should really fix it - // sooner rather than later. - if response != nil && response.Node != nil && response.Node.Nodes != nil { - for _, value := range response.Node.Nodes { - var controllerSpec api.ReplicationController - err := json.Unmarshal([]byte(value.Value), &controllerSpec) - if err != nil { - log.Printf("Unexpected error: %#v", err) - continue - } - log.Printf("Synchronizing %s\n", controllerSpec.ID) - err = rm.syncReplicationController(controllerSpec) - if err != nil { - log.Printf("Error synchronizing: %#v", err) - } +func (rm *ReplicationManager) synchronize() { + response, err := rm.etcdClient.Get("/registry/controllers", false, false) + if err != nil { + log.Printf("Synchronization error %#v", err) + } + // TODO(bburns): There is a race here, if we get a version of the controllers, and then it is + // updated, its possible that the watch will pick up the change first, and then we will execute + // using the old version of the controller. + // Probably the correct thing to do is to use the version number in etcd to detect when + // we are stale. + // Punting on this for now, but this could lead to some nasty bugs, so we should really fix it + // sooner rather than later. + if response != nil && response.Node != nil && response.Node.Nodes != nil { + for _, value := range response.Node.Nodes { + var controllerSpec api.ReplicationController + err := json.Unmarshal([]byte(value.Value), &controllerSpec) + if err != nil { + log.Printf("Unexpected error: %#v", err) + continue + } + log.Printf("Synchronizing %s\n", controllerSpec.ID) + err = rm.syncReplicationController(controllerSpec) + if err != nil { + log.Printf("Error synchronizing: %#v", err) } } - time.Sleep(10 * time.Second) } }