Schedule the background compactors to run.

This commit introduces three background compactors, which compact
sparse samples together.

1. Samples older than five minutes are grouped together into chunks of 50
   every 30 minutes.

2. Samples older than 60 minutes are grouped together into chunks of 250
   every 50 minutes.

3. Samples older than one day are grouped together into chunks of 5000
   every 70 minutes.
pull/228/head
Matt Proud 12 years ago
parent 6551356af4
commit 7f0d816574

@ -25,6 +25,7 @@ import (
"log" "log"
"os" "os"
"os/signal" "os/signal"
"sync"
"time" "time"
) )
@ -38,16 +39,35 @@ var (
concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.") concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.")
diskAppendQueueCapacity = flag.Int("queue.diskAppendCapacity", 1000000, "The size of the queue for items that are pending writing to disk.") diskAppendQueueCapacity = flag.Int("queue.diskAppendCapacity", 1000000, "The size of the queue for items that are pending writing to disk.")
memoryAppendQueueCapacity = flag.Int("queue.memoryAppendCapacity", 10000, "The size of the queue for items that are pending writing to memory.") memoryAppendQueueCapacity = flag.Int("queue.memoryAppendCapacity", 10000, "The size of the queue for items that are pending writing to memory.")
headCompactInterval = flag.Duration("compact.headInterval", 10*3*time.Minute, "The amount of time between head compactions.")
bodyCompactInterval = flag.Duration("compact.bodyInterval", 10*5*time.Minute, "The amount of time between body compactions.")
tailCompactInterval = flag.Duration("compact.tailInterval", 10*7*time.Minute, "The amount of time between tail compactions.")
headGroupSize = flag.Int("compact.headGroupSize", 50, "The minimum group size for head samples.")
bodyGroupSize = flag.Int("compact.bodyGroupSize", 250, "The minimum group size for body samples.")
tailGroupSize = flag.Int("compact.tailGroupSize", 5000, "The minimum group size for tail samples.")
headAge = flag.Duration("compact.headAgeInclusiveness", 5*time.Minute, "The relative inclusiveness of head samples.")
bodyAge = flag.Duration("compact.bodyAgeInclusiveness", time.Hour, "The relative inclusiveness of body samples.")
tailAge = flag.Duration("compact.tailAgeInclusiveness", 24*time.Hour, "The relative inclusiveness of tail samples.")
) )
type prometheus struct { type prometheus struct {
curationState chan metric.CurationState headCompactionTimer *time.Ticker
bodyCompactionTimer *time.Ticker
tailCompactionTimer *time.Ticker
compactionMutex sync.Mutex
curationState chan metric.CurationState
stopBackgroundOperations chan bool
ruleResults chan *rules.Result ruleResults chan *rules.Result
storage metric.TieredStorage
scrapeResults chan format.Result scrapeResults chan format.Result
storage *metric.TieredStorage
} }
func (p prometheus) interruptHandler() { func (p *prometheus) interruptHandler() {
notifier := make(chan os.Signal) notifier := make(chan os.Signal)
signal.Notify(notifier, os.Interrupt) signal.Notify(notifier, os.Interrupt)
@ -58,9 +78,42 @@ func (p prometheus) interruptHandler() {
os.Exit(0) os.Exit(0)
} }
func (p prometheus) close() { func (p *prometheus) compact(olderThan time.Duration, groupSize int) error {
close(p.curationState) p.compactionMutex.Lock()
defer p.compactionMutex.Unlock()
processor := &metric.CompactionProcessor{
MaximumMutationPoolBatch: groupSize * 3,
MinimumGroupSize: groupSize,
}
curator := metric.Curator{
Stop: p.stopBackgroundOperations,
}
return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
}
func (p *prometheus) close() {
if p.headCompactionTimer != nil {
p.headCompactionTimer.Stop()
}
if p.bodyCompactionTimer != nil {
p.bodyCompactionTimer.Stop()
}
if p.tailCompactionTimer != nil {
p.tailCompactionTimer.Stop()
}
if len(p.stopBackgroundOperations) == 0 {
p.stopBackgroundOperations <- true
}
p.compactionMutex.Lock()
p.storage.Close() p.storage.Close()
close(p.stopBackgroundOperations)
close(p.curationState)
} }
func main() { func main() {
@ -91,6 +144,10 @@ func main() {
scrapeResults := make(chan format.Result, *scrapeResultsQueueCapacity) scrapeResults := make(chan format.Result, *scrapeResultsQueueCapacity)
ruleResults := make(chan *rules.Result, *ruleResultsQueueCapacity) ruleResults := make(chan *rules.Result, *ruleResultsQueueCapacity)
curationState := make(chan metric.CurationState, 1) curationState := make(chan metric.CurationState, 1)
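// The default intervals are 10 minutes times the pairwise-coprime factors 3, 5 and 7 (presumably so the three compactors seldom fire at the same moment).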
headCompactionTimer := time.NewTicker(*headCompactInterval)
bodyCompactionTimer := time.NewTicker(*bodyCompactInterval)
tailCompactionTimer := time.NewTicker(*tailCompactInterval)
// Queue depth will need to be exposed // Queue depth will need to be exposed
targetManager := retrieval.NewTargetManager(scrapeResults, *concurrentRetrievalAllowance) targetManager := retrieval.NewTargetManager(scrapeResults, *concurrentRetrievalAllowance)
@ -120,16 +177,58 @@ func main() {
} }
prometheus := prometheus{ prometheus := prometheus{
curationState: curationState, bodyCompactionTimer: bodyCompactionTimer,
ruleResults: ruleResults, curationState: curationState,
scrapeResults: scrapeResults, headCompactionTimer: headCompactionTimer,
storage: *ts, ruleResults: ruleResults,
scrapeResults: scrapeResults,
stopBackgroundOperations: make(chan bool, 1),
storage: ts,
tailCompactionTimer: tailCompactionTimer,
} }
defer prometheus.close() defer prometheus.close()
go ts.Serve() go ts.Serve()
go prometheus.interruptHandler() go prometheus.interruptHandler()
go func() {
for _ = range prometheus.headCompactionTimer.C {
log.Println("Starting head compaction...")
err := prometheus.compact(*headAge, *headGroupSize)
if err != nil {
log.Printf("could not compact due to %s", err)
}
log.Println("Done")
}
}()
go func() {
for _ = range prometheus.bodyCompactionTimer.C {
log.Println("Starting body compaction...")
err := prometheus.compact(*bodyAge, *bodyGroupSize)
if err != nil {
log.Printf("could not compact due to %s", err)
}
log.Println("Done")
}
}()
go func() {
for _ = range prometheus.tailCompactionTimer.C {
log.Println("Starting tail compaction...")
err := prometheus.compact(*tailAge, *tailGroupSize)
if err != nil {
log.Printf("could not compact due to %s", err)
}
log.Println("Done")
}
}()
// Queue depth will need to be exposed
ruleManager := rules.NewRuleManager(ruleResults, conf.EvaluationInterval(), ts) ruleManager := rules.NewRuleManager(ruleResults, conf.EvaluationInterval(), ts)
err = ruleManager.AddRulesFromConfig(conf) err = ruleManager.AddRulesFromConfig(conf)
if err != nil { if err != nil {

@ -103,7 +103,7 @@ type watermarkOperator struct {
// how much progress has been made. // how much progress has been made.
func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples, watermarks *leveldb.LevelDBPersistence, status chan CurationState) (err error) { func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples, watermarks *leveldb.LevelDBPersistence, status chan CurationState) (err error) {
defer func(t time.Time) { defer func(t time.Time) {
duration := float64(time.Since(t)) duration := float64(time.Since(t) / time.Millisecond)
labels := map[string]string{ labels := map[string]string{
cutOff: fmt.Sprint(ignoreYoungerThan), cutOff: fmt.Sprint(ignoreYoungerThan),

@ -91,4 +91,6 @@ func init() {
prometheus.Register("prometheus_storage_operation_time_total_microseconds", "The total time spent performing a given storage operation.", prometheus.NilLabels, storageOperationDurations) prometheus.Register("prometheus_storage_operation_time_total_microseconds", "The total time spent performing a given storage operation.", prometheus.NilLabels, storageOperationDurations)
prometheus.Register("prometheus_storage_queue_sizes_total", "The various sizes and capacities of the storage queues.", prometheus.NilLabels, queueSizes) prometheus.Register("prometheus_storage_queue_sizes_total", "The various sizes and capacities of the storage queues.", prometheus.NilLabels, queueSizes)
prometheus.Register("curation_filter_operations_total", "The number of curation filter operations completed.", prometheus.NilLabels, curationFilterOperations) prometheus.Register("curation_filter_operations_total", "The number of curation filter operations completed.", prometheus.NilLabels, curationFilterOperations)
prometheus.Register("curation_duration_ms_total", "The total time spent in curation (ms).", prometheus.NilLabels, curationDuration)
prometheus.Register("curation_durations_ms", "Histogram of time spent in curation (ms).", prometheus.NilLabels, curationDurations)
} }

@ -51,6 +51,7 @@
<th>Active</th> <th>Active</th>
<td>{{.Curation.Active}}</td> <td>{{.Curation.Active}}</td>
</tr> </tr>
{{if .Curation.Active}}
<tr> <tr>
<th>Processor Name</th> <th>Processor Name</th>
<td>{{.Curation.Name}}</td> <td>{{.Curation.Name}}</td>
@ -59,7 +60,6 @@
<th>Recency Limit</th> <th>Recency Limit</th>
<td>{{.Curation.Limit}}</td> <td>{{.Curation.Limit}}</td>
</tr> </tr>
{{if .Curation.Fingerprint}}
<tr> <tr>
<th>Current Fingerprint</th> <th>Current Fingerprint</th>
<td>{{.Curation.Fingerprint}}</td> <td>{{.Curation.Fingerprint}}</td>

Loading…
Cancel
Save