Browse Source

Merge pull request #280 from prometheus/cleanup/rules-concurrency

Put RuleManager concurrency in hands of caller, fix races.
pull/281/head
juliusv 12 years ago
parent
commit
95adf1958b
  1. 1
      main.go
  2. 19
      rules/manager.go

1
main.go

@ -325,6 +325,7 @@ func main() {
if err != nil {
log.Fatalf("Error loading rule files: %v", err)
}
go ruleManager.Run()
go func() {
err := webService.ServeForever()

19
rules/manager.go

@ -29,10 +29,14 @@ type Result struct {
type RuleManager interface {
AddRulesFromConfig(config config.Config) error
Run()
}
type ruleManager struct {
// Protects the rules list.
sync.Mutex
rules []Rule
results chan *Result
done chan bool
interval time.Duration
@ -47,12 +51,10 @@ func NewRuleManager(results chan *Result, interval time.Duration, storage *metri
interval: interval,
storage: storage,
}
// BUG(julius): Extract this so that the caller manages concurrency.
go manager.run(results)
return manager
}
func (m *ruleManager) run(results chan *Result) {
func (m *ruleManager) Run() {
ticker := time.NewTicker(m.interval)
defer ticker.Stop()
@ -60,7 +62,7 @@ func (m *ruleManager) run(results chan *Result) {
select {
case <-ticker.C:
start := time.Now()
m.runIteration(results)
m.runIteration(m.results)
evalDurations.Add(map[string]string{intervalKey: m.interval.String()}, float64(time.Since(start)/time.Millisecond))
case <-m.done:
log.Printf("RuleManager exiting...")
@ -80,7 +82,12 @@ func (m *ruleManager) runIteration(results chan *Result) {
now := time.Now()
wg := sync.WaitGroup{}
for _, rule := range m.rules {
m.Lock()
rules := make([]Rule, len(m.rules))
copy(rules, m.rules)
m.Unlock()
for _, rule := range rules {
wg.Add(1)
// BUG(julius): Look at fixing thundering herd.
go func(rule Rule) {
@ -104,7 +111,9 @@ func (m *ruleManager) AddRulesFromConfig(config config.Config) error {
if err != nil {
return err
}
m.Lock()
m.rules = append(m.rules, newRules...)
m.Unlock()
}
return nil
}

Loading…
Cancel
Save