Make LevelDB operating modes invocation-time conf.

Presently our use of LevelDB and its operating modes are hardcoded
into the storage stack.  This pull request decouples this and
re-exposes this through flags.  We can now perform benchmarking
and remedial tuning.
pull/52/head
Matt T. Proud 2013-01-27 20:28:37 +01:00
parent 567e2948d3
commit 79ba248bbe
2 changed files with 30 additions and 13 deletions

View File

@ -14,12 +14,23 @@
package leveldb package leveldb
import ( import (
"flag"
index "github.com/prometheus/prometheus/storage/raw/index/leveldb" index "github.com/prometheus/prometheus/storage/raw/index/leveldb"
storage "github.com/prometheus/prometheus/storage/raw/leveldb" storage "github.com/prometheus/prometheus/storage/raw/leveldb"
"io" "io"
"log" "log"
) )
var (
// These flag values are back of the envelope, though they seem sensible.
// Please re-evaluate based on your own needs.
fingerprintsToLabelPairCacheSize = flag.Int("fingerprintsToLabelPairCacheSizeBytes", 100*1024*1024, "The size for the fingerprint to label pair index (bytes).")
samplesByFingerprintCacheSize = flag.Int("samplesByFingerprintCacheSizeBytes", 500*1024*1024, "The size for the samples database (bytes).")
labelNameToFingerprintsCacheSize = flag.Int("labelNameToFingerprintsCacheSizeBytes", 100*1024*1024, "The size for the label name to metric fingerprint index (bytes).")
labelPairToFingerprintsCacheSize = flag.Int("labelPairToFingerprintsCacheSizeBytes", 100*1024*1024, "The size for the label pair to metric fingerprint index (bytes).")
metricMembershipIndexCacheSize = flag.Int("metricMembershipCacheSizeBytes", 50*1024*1024, "The size for the metric membership index (bytes).")
)
type leveldbOpener func() type leveldbOpener func()
func (l *LevelDBMetricPersistence) Close() error { func (l *LevelDBMetricPersistence) Close() error {
@ -86,7 +97,7 @@ func (l *LevelDBMetricPersistence) Close() error {
return nil return nil
} }
func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistence, error) { func NewLevelDBMetricPersistence(baseDirectory string) (persistence *LevelDBMetricPersistence, err error) {
log.Printf("Opening LevelDBPersistence storage containers...") log.Printf("Opening LevelDBPersistence storage containers...")
errorChannel := make(chan error, 5) errorChannel := make(chan error, 5)
@ -101,7 +112,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Label Names and Value Pairs by Fingerprint", "Label Names and Value Pairs by Fingerprint",
func() { func() {
var err error var err error
emission.fingerprintToMetrics, err = storage.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10) emission.fingerprintToMetrics, err = storage.NewLevelDBPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", *fingerprintsToLabelPairCacheSize, 10)
errorChannel <- err errorChannel <- err
}, },
}, },
@ -109,7 +120,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Samples by Fingerprint", "Samples by Fingerprint",
func() { func() {
var err error var err error
emission.metricSamples, err = storage.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10) emission.metricSamples, err = storage.NewLevelDBPersistence(baseDirectory+"/samples_by_fingerprint", *samplesByFingerprintCacheSize, 10)
errorChannel <- err errorChannel <- err
}, },
}, },
@ -117,7 +128,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Fingerprints by Label Name", "Fingerprints by Label Name",
func() { func() {
var err error var err error
emission.labelNameToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10) emission.labelNameToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name", *labelNameToFingerprintsCacheSize, 10)
errorChannel <- err errorChannel <- err
}, },
}, },
@ -125,7 +136,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Fingerprints by Label Name and Value Pair", "Fingerprints by Label Name and Value Pair",
func() { func() {
var err error var err error
emission.labelSetToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10) emission.labelSetToFingerprints, err = storage.NewLevelDBPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", *labelPairToFingerprintsCacheSize, 10)
errorChannel <- err errorChannel <- err
}, },
}, },
@ -133,7 +144,7 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Metric Membership Index", "Metric Membership Index",
func() { func() {
var err error var err error
emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(baseDirectory+"/metric_membership_index", 1000000, 10) emission.metricMembershipIndex, err = index.NewLevelDBMembershipIndex(baseDirectory+"/metric_membership_index", *metricMembershipIndexCacheSize, 10)
errorChannel <- err errorChannel <- err
}, },
}, },
@ -149,16 +160,18 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
} }
for i := 0; i < cap(errorChannel); i++ { for i := 0; i < cap(errorChannel); i++ {
openingError := <-errorChannel err = <-errorChannel
if openingError != nil { if err != nil {
log.Printf("Could not open a LevelDBPersistence storage container: %q\n", openingError) log.Printf("Could not open a LevelDBPersistence storage container: %q\n", err)
return nil, openingError return
} }
} }
log.Printf("Successfully opened all LevelDBPersistence storage containers.\n") log.Printf("Successfully opened all LevelDBPersistence storage containers.\n")
return emission, nil persistence = emission
return
} }

View File

@ -14,12 +14,17 @@
package leveldb package leveldb
import ( import (
"flag"
"github.com/jmhodges/levigo" "github.com/jmhodges/levigo"
"github.com/prometheus/prometheus/coding" "github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/storage/raw" "github.com/prometheus/prometheus/storage/raw"
"io" "io"
) )
var (
leveldbFlushOnMutate = flag.Bool("leveldbFlushOnMutate", true, "Whether LevelDB should flush every operation to disk upon mutation before returning (bool).")
)
type LevelDBPersistence struct { type LevelDBPersistence struct {
cache *levigo.Cache cache *levigo.Cache
filterPolicy *levigo.FilterPolicy filterPolicy *levigo.FilterPolicy
@ -54,8 +59,7 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter
readOptions := levigo.NewReadOptions() readOptions := levigo.NewReadOptions()
writeOptions := levigo.NewWriteOptions() writeOptions := levigo.NewWriteOptions()
writeOptions.SetSync(true) writeOptions.SetSync(*leveldbFlushOnMutate)
p = &LevelDBPersistence{ p = &LevelDBPersistence{
cache: cache, cache: cache,
filterPolicy: filterPolicy, filterPolicy: filterPolicy,