diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index c330646119..0fe9e0ab79 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -113,7 +113,7 @@ func startComponents(manifestURL string) (apiServerURL string) { // Kubelet (localhost) cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) - config.NewSourceEtcd(config.EtcdKeyForHost(machineList[0]), etcdClient, 30*time.Second, cfg1.Channel("etcd")) + config.NewSourceEtcd(config.EtcdKeyForHost(machineList[0]), etcdClient, cfg1.Channel("etcd")) config.NewSourceURL(manifestURL, 5*time.Second, cfg1.Channel("url")) myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], &fakeDocker1) go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0) @@ -125,7 +125,7 @@ func startComponents(manifestURL string) (apiServerURL string) { // Create a second kubelet so that the guestbook example's two redis slaves both // have a place they can schedule. cfg2 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates) - config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, 30*time.Second, cfg2.Channel("etcd")) + config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, cfg2.Channel("etcd")) otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], &fakeDocker2) go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0) go util.Forever(func() { diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index 7ac463858c..8d806e26df 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -138,7 +138,7 @@ func main() { if len(etcdServerList) > 0 { glog.Infof("Watching for etcd configs at %v", etcdServerList) etcdClient = etcd.NewClient(etcdServerList) - kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), etcdClient, 30*time.Second, cfg.Channel("etcd")) + kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), etcdClient, cfg.Channel("etcd")) } // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop diff --git a/pkg/kubelet/config/etcd.go b/pkg/kubelet/config/etcd.go index 57baa5bf69..1538f51775 100644 --- a/pkg/kubelet/config/etcd.go +++ b/pkg/kubelet/config/etcd.go @@ -41,20 +41,21 @@ type SourceEtcd struct { client tools.EtcdClient updates chan<- interface{} - waitDuration time.Duration + interval time.Duration + timeout time.Duration } // NewSourceEtcd creates a config source that watches and pulls from a key in etcd -func NewSourceEtcd(key string, client tools.EtcdClient, period time.Duration, updates chan<- interface{}) *SourceEtcd { +func NewSourceEtcd(key string, client tools.EtcdClient, updates chan<- interface{}) *SourceEtcd { config := &SourceEtcd{ key: key, client: client, updates: updates, - waitDuration: period, + timeout: 1 * time.Minute, } glog.Infof("Watching etcd for %s", key) - go util.Forever(config.run, period) + go util.Forever(config.run, time.Second) return config } @@ -81,7 +82,7 @@ func (s *SourceEtcd) fetchNextState(fromIndex uint64) (nextIndex uint64, err err if fromIndex == 0 { response, err = s.client.Get(s.key, true, false) } else { - response, err = s.client.Watch(s.key, fromIndex, false, nil, stopChannel(s.waitDuration)) + response, err = s.client.Watch(s.key, fromIndex, false, nil, stopChannel(s.timeout)) if tools.IsEtcdWatchStoppedByUser(err) { return fromIndex, nil } @@ -125,9 +126,13 @@ func responseToPods(response *etcd.Response) ([]kubelet.Pod, error) { return pods, nil } -// stopChannel creates a channel that is closed after a duration for use with etcd client API +// stopChannel creates a channel that is closed after a duration for use with etcd client API. +// If until is 0, the channel will never close. func stopChannel(until time.Duration) chan bool { stop := make(chan bool) + if until == 0 { + return stop + } go func() { select { case <-time.After(until): diff --git a/pkg/kubelet/config/etcd_test.go b/pkg/kubelet/config/etcd_test.go index 854e88bdbc..f0b025deaf 100644 --- a/pkg/kubelet/config/etcd_test.go +++ b/pkg/kubelet/config/etcd_test.go @@ -43,7 +43,7 @@ func TestGetEtcdData(t *testing.T) { }, E: nil, } - NewSourceEtcd("/registry/hosts/machine/kubelet", fakeClient, time.Millisecond, ch) + NewSourceEtcd("/registry/hosts/machine/kubelet", fakeClient, ch) //TODO: update FakeEtcdClient.Watch to handle receiver=nil with a given index //returns an infinite stream of updates