Move run logic into package

pull/6/head
Daniel Smith 2014-06-17 16:42:29 -07:00
parent 65d6280936
commit e74ac01a62
2 changed files with 32 additions and 31 deletions

View File

@ -29,7 +29,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
) )
@ -53,7 +52,6 @@ func main() {
Host: "http://" + *master, Host: "http://" + *master,
}) })
go util.Forever(func() { controllerManager.Synchronize() }, 20*time.Second) controllerManager.Run(10 * time.Second)
go util.Forever(func() { controllerManager.WatchControllers() }, 20*time.Second)
select {} select {}
} }

View File

@ -84,7 +84,13 @@ func MakeReplicationManager(etcdClient *etcd.Client, kubeClient client.ClientInt
} }
} }
func (rm *ReplicationManager) WatchControllers() { // Begin watching and syncing.
func (rm *ReplicationManager) Run(period time.Duration) {
go util.Forever(func() { rm.synchronize() }, period)
go util.Forever(func() { rm.watchControllers() }, period)
}
func (rm *ReplicationManager) watchControllers() {
watchChannel := make(chan *etcd.Response) watchChannel := make(chan *etcd.Response)
go func() { go func() {
defer util.HandleCrash() defer util.HandleCrash()
@ -166,34 +172,31 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli
return nil return nil
} }
func (rm *ReplicationManager) Synchronize() { func (rm *ReplicationManager) synchronize() {
for { response, err := rm.etcdClient.Get("/registry/controllers", false, false)
response, err := rm.etcdClient.Get("/registry/controllers", false, false) if err != nil {
if err != nil { log.Printf("Synchronization error %#v", err)
log.Printf("Synchronization error %#v", err) }
} // TODO(bburns): There is a race here, if we get a version of the controllers, and then it is
// TODO(bburns): There is a race here, if we get a version of the controllers, and then it is // updated, its possible that the watch will pick up the change first, and then we will execute
// updated, its possible that the watch will pick up the change first, and then we will execute // using the old version of the controller.
// using the old version of the controller. // Probably the correct thing to do is to use the version number in etcd to detect when
// Probably the correct thing to do is to use the version number in etcd to detect when // we are stale.
// we are stale. // Punting on this for now, but this could lead to some nasty bugs, so we should really fix it
// Punting on this for now, but this could lead to some nasty bugs, so we should really fix it // sooner rather than later.
// sooner rather than later. if response != nil && response.Node != nil && response.Node.Nodes != nil {
if response != nil && response.Node != nil && response.Node.Nodes != nil { for _, value := range response.Node.Nodes {
for _, value := range response.Node.Nodes { var controllerSpec api.ReplicationController
var controllerSpec api.ReplicationController err := json.Unmarshal([]byte(value.Value), &controllerSpec)
err := json.Unmarshal([]byte(value.Value), &controllerSpec) if err != nil {
if err != nil { log.Printf("Unexpected error: %#v", err)
log.Printf("Unexpected error: %#v", err) continue
continue }
} log.Printf("Synchronizing %s\n", controllerSpec.ID)
log.Printf("Synchronizing %s\n", controllerSpec.ID) err = rm.syncReplicationController(controllerSpec)
err = rm.syncReplicationController(controllerSpec) if err != nil {
if err != nil { log.Printf("Error synchronizing: %#v", err)
log.Printf("Error synchronizing: %#v", err)
}
} }
} }
time.Sleep(10 * time.Second)
} }
} }