Compact everything to the same sample group size.

Change-Id: Ibb4f3a5d76173d64de916ef1eb41ab5d7900c97b
changes/22/122/1
Julius Volz 2014-02-19 16:03:30 +01:00
parent 682cf6fc51
commit 2279fcbac4
1 changed files with 12 additions and 56 deletions

68
main.go
View File

@ -52,17 +52,9 @@ var (
diskAppendQueueCapacity = flag.Int("storage.queue.diskAppendCapacity", 1000000, "The size of the queue for items that are pending writing to disk.")
memoryAppendQueueCapacity = flag.Int("storage.queue.memoryAppendCapacity", 10000, "The size of the queue for items that are pending writing to memory.")
headCompactInterval = flag.Duration("compact.headInterval", 3*time.Hour, "The amount of time between head compactions.")
bodyCompactInterval = flag.Duration("compact.bodyInterval", 5*time.Hour, "The amount of time between body compactions.")
tailCompactInterval = flag.Duration("compact.tailInterval", 7*time.Hour, "The amount of time between tail compactions.")
headGroupSize = flag.Int("compact.headGroupSize", 500, "The minimum group size for head samples.")
bodyGroupSize = flag.Int("compact.bodyGroupSize", 5000, "The minimum group size for body samples.")
tailGroupSize = flag.Int("compact.tailGroupSize", 10000, "The minimum group size for tail samples.")
headAge = flag.Duration("compact.headAgeInclusiveness", 5*time.Minute, "The relative inclusiveness of head samples.")
bodyAge = flag.Duration("compact.bodyAgeInclusiveness", time.Hour, "The relative inclusiveness of body samples.")
tailAge = flag.Duration("compact.tailAgeInclusiveness", 24*time.Hour, "The relative inclusiveness of tail samples.")
compactInterval = flag.Duration("compact.interval", 3*time.Hour, "The amount of time between compactions.")
compactGroupSize = flag.Int("compact.groupSize", 500, "The minimum group size for compacted samples.")
compactAgeInclusiveness = flag.Duration("compact.ageInclusiveness", 5*time.Minute, "The age beyond which samples should be compacted.")
deleteInterval = flag.Duration("delete.interval", 11*time.Hour, "The amount of time between deletion of old values.")
@ -79,9 +71,7 @@ var (
)
type prometheus struct {
headCompactionTimer *time.Ticker
bodyCompactionTimer *time.Ticker
tailCompactionTimer *time.Ticker
compactionTimer *time.Ticker
deletionTimer *time.Ticker
curationSema chan bool
@ -168,14 +158,8 @@ func (p *prometheus) close() {
default:
}
if p.headCompactionTimer != nil {
p.headCompactionTimer.Stop()
}
if p.bodyCompactionTimer != nil {
p.bodyCompactionTimer.Stop()
}
if p.tailCompactionTimer != nil {
p.tailCompactionTimer.Stop()
if p.compactionTimer != nil {
p.compactionTimer.Stop()
}
if p.deletionTimer != nil {
p.deletionTimer.Stop()
@ -238,10 +222,8 @@ func main() {
Ingester: retrieval.ChannelIngester(unwrittenSamples),
}
// Coprime numbers, fool!
headCompactionTimer := time.NewTicker(*headCompactInterval)
bodyCompactionTimer := time.NewTicker(*bodyCompactInterval)
tailCompactionTimer := time.NewTicker(*tailCompactInterval)
compactionTimer := time.NewTicker(*compactInterval)
deletionTimer := time.NewTicker(*deleteInterval)
// Queue depth will need to be exposed
@ -304,9 +286,7 @@ func main() {
}
prometheus := &prometheus{
bodyCompactionTimer: bodyCompactionTimer,
headCompactionTimer: headCompactionTimer,
tailCompactionTimer: tailCompactionTimer,
compactionTimer: compactionTimer,
deletionTimer: deletionTimer,
@ -332,33 +312,9 @@ func main() {
go prometheus.interruptHandler()
go func() {
for _ = range prometheus.headCompactionTimer.C {
glog.Info("Starting head compaction...")
err := prometheus.compact(*headAge, *headGroupSize)
if err != nil {
glog.Error("could not compact: ", err)
}
glog.Info("Done")
}
}()
go func() {
for _ = range prometheus.bodyCompactionTimer.C {
glog.Info("Starting body compaction...")
err := prometheus.compact(*bodyAge, *bodyGroupSize)
if err != nil {
glog.Error("could not compact: ", err)
}
glog.Info("Done")
}
}()
go func() {
for _ = range prometheus.tailCompactionTimer.C {
glog.Info("Starting tail compaction...")
err := prometheus.compact(*tailAge, *tailGroupSize)
for _ = range prometheus.compactionTimer.C {
glog.Info("Starting compaction...")
err := prometheus.compact(*compactAgeInclusiveness, *compactGroupSize)
if err != nil {
glog.Error("could not compact: ", err)