diff --git a/main.go b/main.go index bd95b8eca..3f2d522c2 100644 --- a/main.go +++ b/main.go @@ -25,6 +25,7 @@ import ( "log" "os" "os/signal" + "sync" "time" ) @@ -38,16 +39,35 @@ var ( concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.") diskAppendQueueCapacity = flag.Int("queue.diskAppendCapacity", 1000000, "The size of the queue for items that are pending writing to disk.") memoryAppendQueueCapacity = flag.Int("queue.memoryAppendCapacity", 10000, "The size of the queue for items that are pending writing to memory.") + + headCompactInterval = flag.Duration("compact.headInterval", 10*3*time.Minute, "The amount of time between head compactions.") + bodyCompactInterval = flag.Duration("compact.bodyInterval", 10*5*time.Minute, "The amount of time between body compactions.") + tailCompactInterval = flag.Duration("compact.tailInterval", 10*7*time.Minute, "The amount of time between tail compactions.") + + headGroupSize = flag.Int("compact.headGroupSize", 50, "The minimum group size for head samples.") + bodyGroupSize = flag.Int("compact.bodyGroupSize", 250, "The minimum group size for body samples.") + tailGroupSize = flag.Int("compact.tailGroupSize", 5000, "The minimum group size for tail samples.") + + headAge = flag.Duration("compact.headAgeInclusiveness", 5*time.Minute, "The relative inclusiveness of head samples.") + bodyAge = flag.Duration("compact.bodyAgeInclusiveness", time.Hour, "The relative inclusiveness of body samples.") + tailAge = flag.Duration("compact.tailAgeInclusiveness", 24*time.Hour, "The relative inclusiveness of tail samples.") ) type prometheus struct { - curationState chan metric.CurationState + headCompactionTimer *time.Ticker + bodyCompactionTimer *time.Ticker + tailCompactionTimer *time.Ticker + compactionMutex sync.Mutex + curationState chan metric.CurationState + stopBackgroundOperations chan bool + ruleResults chan *rules.Result - storage metric.TieredStorage scrapeResults chan format.Result + + storage *metric.TieredStorage } -func (p prometheus) interruptHandler() { +func (p *prometheus) interruptHandler() { notifier := make(chan os.Signal) signal.Notify(notifier, os.Interrupt) @@ -58,9 +78,42 @@ func (p prometheus) interruptHandler() { os.Exit(0) } -func (p prometheus) close() { - close(p.curationState) +func (p *prometheus) compact(olderThan time.Duration, groupSize int) error { + p.compactionMutex.Lock() + defer p.compactionMutex.Unlock() + + processor := &metric.CompactionProcessor{ + MaximumMutationPoolBatch: groupSize * 3, + MinimumGroupSize: groupSize, + } + + curator := metric.Curator{ + Stop: p.stopBackgroundOperations, + } + + return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState) +} + +func (p *prometheus) close() { + if p.headCompactionTimer != nil { + p.headCompactionTimer.Stop() + } + if p.bodyCompactionTimer != nil { + p.bodyCompactionTimer.Stop() + } + if p.tailCompactionTimer != nil { + p.tailCompactionTimer.Stop() + } + + if len(p.stopBackgroundOperations) == 0 { + p.stopBackgroundOperations <- true + } + + p.compactionMutex.Lock() + p.storage.Close() + close(p.stopBackgroundOperations) + close(p.curationState) } func main() { @@ -91,6 +144,10 @@ func main() { scrapeResults := make(chan format.Result, *scrapeResultsQueueCapacity) ruleResults := make(chan *rules.Result, *ruleResultsQueueCapacity) curationState := make(chan metric.CurationState, 1) + // Coprime numbers, fool! + headCompactionTimer := time.NewTicker(*headCompactInterval) + bodyCompactionTimer := time.NewTicker(*bodyCompactInterval) + tailCompactionTimer := time.NewTicker(*tailCompactInterval) // Queue depth will need to be exposed targetManager := retrieval.NewTargetManager(scrapeResults, *concurrentRetrievalAllowance) @@ -120,16 +177,58 @@ func main() { } prometheus := prometheus{ - curationState: curationState, - ruleResults: ruleResults, - scrapeResults: scrapeResults, - storage: *ts, + bodyCompactionTimer: bodyCompactionTimer, + curationState: curationState, + headCompactionTimer: headCompactionTimer, + ruleResults: ruleResults, + scrapeResults: scrapeResults, + stopBackgroundOperations: make(chan bool, 1), + storage: ts, + tailCompactionTimer: tailCompactionTimer, } defer prometheus.close() go ts.Serve() go prometheus.interruptHandler() + go func() { + for _ = range prometheus.headCompactionTimer.C { + log.Println("Starting head compaction...") + err := prometheus.compact(*headAge, *headGroupSize) + + if err != nil { + log.Printf("could not compact due to %s", err) + } + log.Println("Done") + } + }() + + go func() { + for _ = range prometheus.bodyCompactionTimer.C { + log.Println("Starting body compaction...") + err := prometheus.compact(*bodyAge, *bodyGroupSize) + + if err != nil { + log.Printf("could not compact due to %s", err) + } + log.Println("Done") + } + }() + + go func() { + for _ = range prometheus.tailCompactionTimer.C { + log.Println("Starting tail compaction...") + err := prometheus.compact(*tailAge, *tailGroupSize) + + if err != nil { + log.Printf("could not compact due to %s", err) + } + log.Println("Done") + } + }() + + // Queue depth will need to be exposed + ruleManager := rules.NewRuleManager(ruleResults, conf.EvaluationInterval(), ts) err = ruleManager.AddRulesFromConfig(conf) if err != nil { diff --git a/storage/metric/curator.go b/storage/metric/curator.go index 771dd2757..9b6330915 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -103,7 +103,7 @@ type watermarkOperator struct { // how much progress has been made. func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples, watermarks *leveldb.LevelDBPersistence, status chan CurationState) (err error) { defer func(t time.Time) { - duration := float64(time.Since(t)) + duration := float64(time.Since(t) / time.Millisecond) labels := map[string]string{ cutOff: fmt.Sprint(ignoreYoungerThan), diff --git a/storage/metric/instrumentation.go b/storage/metric/instrumentation.go index 6c07f4db0..71e95238d 100644 --- a/storage/metric/instrumentation.go +++ b/storage/metric/instrumentation.go @@ -91,4 +91,6 @@ func init() { prometheus.Register("prometheus_storage_operation_time_total_microseconds", "The total time spent performing a given storage operation.", prometheus.NilLabels, storageOperationDurations) prometheus.Register("prometheus_storage_queue_sizes_total", "The various sizes and capacities of the storage queues.", prometheus.NilLabels, queueSizes) prometheus.Register("curation_filter_operations_total", "The number of curation filter operations completed.", prometheus.NilLabels, curationFilterOperations) + prometheus.Register("curation_duration_ms_total", "The total time spent in curation (ms).", prometheus.NilLabels, curationDuration) + prometheus.Register("curation_durations_ms", "Histogram of time spent in curation (ms).", prometheus.NilLabels, curationDurations) } diff --git a/web/templates/status.html b/web/templates/status.html index 337a7f0ba..9b08cb32a 100644 --- a/web/templates/status.html +++ b/web/templates/status.html @@ -51,6 +51,7 @@