Merge pull request #242 from prometheus/feature/storage/long-tail-deletion

Include long-tail data deletion mechanism.
pull/243/merge
Matt T. Proud 2013-05-13 02:05:44 -07:00
commit 5b183f6434
5 changed files with 64 additions and 80 deletions

View File

@ -91,9 +91,6 @@ Executing the following target will start up Prometheus for lazy users:
``$(ARGUMENTS)``. This is useful for quick one-off invocations and smoke
testing.
### Mac OS X
We have a handy [Getting started on Mac OS X](documentation/guides/getting-started-osx.md) guide.
### Problems
If at any point you run into an error with the ``make`` build system in terms of
its not properly scaffolding things on a given environment, please file a bug or

View File

@ -1,63 +0,0 @@
# Getting started
## Installation
### Go
First, create a `$HOME/mygo` directory and its src subdirectory:
```bash
mkdir -p $HOME/mygo/src # create a place to put source code
```
Next, set it as the GOPATH. You should also add the bin subdirectory to your PATH environment variable so that you can run the commands therein without specifying their full path. To do this, add the following lines to `$HOME/.profile` (or equivalent):
```bash
export GOPATH=$HOME/mygo
export PATH=$PATH:$HOME/mygo/bin
```
Now you can install Go:
```bash
brew install go
```
### Dependencies
Install leveldb and protobuf dependencies:
```bash
brew install leveldb protobuf
```
### Libraries
```bash
go get code.google.com/p/goprotobuf/{proto,protoc-gen-go}
go get github.com/jmhodges/levigo
go get code.google.com/p/gorest
go get github.com/prometheus/{prometheus,client_golang}
```
## Build
```bash
cd ${GOPATH}/src/github.com/prometheus/prometheus
make build
```
## Configure
```bash
cp prometheus.conf.example prometheus.conf
```
## Run
```bash
./prometheus
```

67
main.go
View File

@ -29,6 +29,10 @@ import (
"time"
)
const (
deletionBatchSize = 100
)
// Commandline flags.
var (
printVersion = flag.Bool("version", false, "print version information")
@ -51,13 +55,18 @@ var (
headAge = flag.Duration("compact.headAgeInclusiveness", 5*time.Minute, "The relative inclusiveness of head samples.")
bodyAge = flag.Duration("compact.bodyAgeInclusiveness", time.Hour, "The relative inclusiveness of body samples.")
tailAge = flag.Duration("compact.tailAgeInclusiveness", 24*time.Hour, "The relative inclusiveness of tail samples.")
deleteInterval = flag.Duration("delete.interval", 10*11*time.Minute, "The amount of time between deletion of old values.")
deleteAge = flag.Duration("delete.ageMaximum", 10*24*time.Hour, "The relative maximum age for values before they are deleted.")
)
type prometheus struct {
headCompactionTimer *time.Ticker
bodyCompactionTimer *time.Ticker
tailCompactionTimer *time.Ticker
compactionMutex sync.Mutex
deletionTimer *time.Ticker
curationMutex sync.Mutex
curationState chan metric.CurationState
stopBackgroundOperations chan bool
@ -79,8 +88,8 @@ func (p *prometheus) interruptHandler() {
}
func (p *prometheus) compact(olderThan time.Duration, groupSize int) error {
p.compactionMutex.Lock()
defer p.compactionMutex.Unlock()
p.curationMutex.Lock()
defer p.curationMutex.Unlock()
processor := &metric.CompactionProcessor{
MaximumMutationPoolBatch: groupSize * 3,
@ -94,6 +103,21 @@ func (p *prometheus) compact(olderThan time.Duration, groupSize int) error {
return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
}
func (p *prometheus) delete(olderThan time.Duration, batchSize int) error {
p.curationMutex.Lock()
defer p.curationMutex.Unlock()
processor := &metric.DeletionProcessor{
MaximumMutationPoolBatch: batchSize,
}
curator := metric.Curator{
Stop: p.stopBackgroundOperations,
}
return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
}
func (p *prometheus) close() {
if p.headCompactionTimer != nil {
p.headCompactionTimer.Stop()
@ -104,12 +128,15 @@ func (p *prometheus) close() {
if p.tailCompactionTimer != nil {
p.tailCompactionTimer.Stop()
}
if p.deletionTimer != nil {
p.deletionTimer.Stop()
}
if len(p.stopBackgroundOperations) == 0 {
p.stopBackgroundOperations <- true
}
p.compactionMutex.Lock()
p.curationMutex.Lock()
p.storage.Close()
close(p.stopBackgroundOperations)
@ -148,6 +175,7 @@ func main() {
headCompactionTimer := time.NewTicker(*headCompactInterval)
bodyCompactionTimer := time.NewTicker(*bodyCompactInterval)
tailCompactionTimer := time.NewTicker(*tailCompactInterval)
deletionTimer := time.NewTicker(*deleteInterval)
// Queue depth will need to be exposed
targetManager := retrieval.NewTargetManager(scrapeResults, *concurrentRetrievalAllowance)
@ -177,14 +205,19 @@ func main() {
}
prometheus := prometheus{
bodyCompactionTimer: bodyCompactionTimer,
curationState: curationState,
headCompactionTimer: headCompactionTimer,
ruleResults: ruleResults,
scrapeResults: scrapeResults,
bodyCompactionTimer: bodyCompactionTimer,
headCompactionTimer: headCompactionTimer,
tailCompactionTimer: tailCompactionTimer,
deletionTimer: deletionTimer,
curationState: curationState,
ruleResults: ruleResults,
scrapeResults: scrapeResults,
stopBackgroundOperations: make(chan bool, 1),
storage: ts,
tailCompactionTimer: tailCompactionTimer,
storage: ts,
}
defer prometheus.close()
@ -227,6 +260,18 @@ func main() {
}
}()
go func() {
for _ = range prometheus.deletionTimer.C {
log.Println("Starting deletion of stale values...")
err := prometheus.delete(*deleteAge, deletionBatchSize)
if err != nil {
log.Printf("could not delete due to %s", err)
}
log.Println("Done")
}
}()
// Queue depth will need to be exposed
ruleManager := rules.NewRuleManager(ruleResults, conf.EvaluationInterval(), ts)

View File

@ -118,8 +118,11 @@ func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, process
curationDurations.Add(labels, duration)
}(time.Now())
defer func() {
status <- CurationState{
select {
case status <- CurationState{
Active: false,
}:
default:
}
}()
@ -258,11 +261,14 @@ func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult)
}()
defer func() {
w.status <- CurationState{
select {
case w.status <- CurationState{
Active: true,
Name: w.processor.Name(),
Limit: w.ignoreYoungerThan,
Fingerprint: fingerprint,
}:
default:
}
}()

View File

@ -203,7 +203,6 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
}
pendingMutations++
default:
err = fmt.Errorf("Unhandled processing case.")
}