Merge pull request #21262 from gmarek/backoff

Fix log gatherer
pull/6/head
Marek Grabowski 2016-02-15 17:06:26 +01:00
commit b1db8d3e9b
1 changed files with 11 additions and 9 deletions

View File

@ -218,6 +218,15 @@ func (g *LogSizeGatherer) Run() {
} }
} }
func (g *LogSizeGatherer) pushWorkItem(workItem WorkItem) {
select {
case <-time.After(time.Duration(workItem.backoffMultiplier) * pollingPeriod):
g.workChannel <- workItem
case <-g.stopChannel:
return
}
}
// Work does a single unit of work: tries to take out a WorkItem from the queue, ssh-es into a given machine, // Work does a single unit of work: tries to take out a WorkItem from the queue, ssh-es into a given machine,
// gathers data, writes it to the shared <data> map, and creates a gorouting which reinserts work item into // gathers data, writes it to the shared <data> map, and creates a gorouting which reinserts work item into
// the queue with a <pollingPeriod> delay. Returns false if worker should exit. // the queue with a <pollingPeriod> delay. Returns false if worker should exit.
@ -239,7 +248,7 @@ func (g *LogSizeGatherer) Work() bool {
if workItem.backoffMultiplier < 128 { if workItem.backoffMultiplier < 128 {
workItem.backoffMultiplier *= 2 workItem.backoffMultiplier *= 2
} }
g.workChannel <- workItem go g.pushWorkItem(workItem)
return true return true
} }
workItem.backoffMultiplier = 1 workItem.backoffMultiplier = 1
@ -255,13 +264,6 @@ func (g *LogSizeGatherer) Work() bool {
} }
g.data.AddNewData(workItem.ip, path, now, size) g.data.AddNewData(workItem.ip, path, now, size)
} }
go func() { go g.pushWorkItem(workItem)
select {
case <-time.After(time.Duration(workItem.backoffMultiplier) * pollingPeriod):
g.workChannel <- workItem
case <-g.stopChannel:
return
}
}()
return true return true
} }