You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
prometheus/storage/raw/leveldb/leveldb.go

275 lines
6.1 KiB

// Copyright 2012 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"flag"
"github.com/jmhodges/levigo"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"io"
)
var (
leveldbFlushOnMutate = flag.Bool("leveldbFlushOnMutate", true, "Whether LevelDB should flush every operation to disk upon mutation before returning (bool).")
leveldbUseSnappy = flag.Bool("leveldbUseSnappy", true, "Whether LevelDB attempts to use Snappy for compressing elements (bool).")
leveldbUseParanoidChecks = flag.Bool("leveldbUseParanoidChecks", true, "Whether LevelDB uses expensive checks (bool).")
)
type LevelDBPersistence struct {
cache *levigo.Cache
filterPolicy *levigo.FilterPolicy
options *levigo.Options
storage *levigo.DB
readOptions *levigo.ReadOptions
writeOptions *levigo.WriteOptions
}
type iteratorCloser struct {
iterator *levigo.Iterator
readOptions *levigo.ReadOptions
snapshot *levigo.Snapshot
storage *levigo.DB
}
func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (p *LevelDBPersistence, err error) {
options := levigo.NewOptions()
options.SetCreateIfMissing(true)
options.SetParanoidChecks(*leveldbUseParanoidChecks)
compression := levigo.NoCompression
if *leveldbUseSnappy {
compression = levigo.SnappyCompression
}
options.SetCompression(compression)
cache := levigo.NewLRUCache(cacheCapacity)
options.SetCache(cache)
filterPolicy := levigo.NewBloomFilter(bitsPerBloomFilterEncoded)
options.SetFilterPolicy(filterPolicy)
storage, err := levigo.Open(storageRoot, options)
if err != nil {
return
}
readOptions := levigo.NewReadOptions()
writeOptions := levigo.NewWriteOptions()
writeOptions.SetSync(*leveldbFlushOnMutate)
p = &LevelDBPersistence{
cache: cache,
filterPolicy: filterPolicy,
options: options,
readOptions: readOptions,
storage: storage,
writeOptions: writeOptions,
}
return
}
func (l *LevelDBPersistence) Close() (err error) {
// These are deferred to take advantage of forced closing in case of stack
// unwinding due to anomalies.
defer func() {
if l.storage != nil {
l.storage.Close()
}
}()
defer func() {
if l.filterPolicy != nil {
l.filterPolicy.Close()
}
}()
defer func() {
if l.cache != nil {
l.cache.Close()
}
}()
defer func() {
if l.options != nil {
l.options.Close()
}
}()
defer func() {
if l.readOptions != nil {
l.readOptions.Close()
}
}()
defer func() {
if l.writeOptions != nil {
l.writeOptions.Close()
}
}()
return
}
func (l *LevelDBPersistence) Get(value coding.Encoder) (b []byte, err error) {
key, err := value.Encode()
if err != nil {
return
}
return l.storage.Get(l.readOptions, key)
}
func (l *LevelDBPersistence) Has(value coding.Encoder) (h bool, err error) {
raw, err := l.Get(value)
if err != nil {
return
}
h = raw != nil
return
}
func (l *LevelDBPersistence) Drop(value coding.Encoder) (err error) {
key, err := value.Encode()
if err != nil {
return
}
err = l.storage.Delete(l.writeOptions, key)
return
}
func (l *LevelDBPersistence) Put(key, value coding.Encoder) (err error) {
keyEncoded, err := key.Encode()
if err != nil {
return
}
valueEncoded, err := value.Encode()
if err != nil {
return
}
err = l.storage.Put(l.writeOptions, keyEncoded, valueEncoded)
return
}
func (l *LevelDBPersistence) GetAll() (pairs []raw.Pair, err error) {
snapshot := l.storage.NewSnapshot()
defer l.storage.ReleaseSnapshot(snapshot)
readOptions := levigo.NewReadOptions()
defer readOptions.Close()
readOptions.SetSnapshot(snapshot)
iterator := l.storage.NewIterator(readOptions)
defer iterator.Close()
iterator.SeekToFirst()
for iterator := iterator; iterator.Valid(); iterator.Next() {
pairs = append(pairs, raw.Pair{Left: iterator.Key(), Right: iterator.Value()})
err = iterator.GetError()
if err != nil {
return
}
}
return
}
func (i *iteratorCloser) Close() (err error) {
defer func() {
if i.storage != nil {
if i.snapshot != nil {
i.storage.ReleaseSnapshot(i.snapshot)
}
}
}()
defer func() {
if i.iterator != nil {
i.iterator.Close()
}
}()
defer func() {
if i.readOptions != nil {
i.readOptions.Close()
}
}()
return
}
func (l *LevelDBPersistence) GetIterator() (i *levigo.Iterator, c io.Closer, err error) {
snapshot := l.storage.NewSnapshot()
readOptions := levigo.NewReadOptions()
readOptions.SetSnapshot(snapshot)
i = l.storage.NewIterator(readOptions)
c = &iteratorCloser{
iterator: i,
readOptions: readOptions,
snapshot: snapshot,
storage: l.storage,
}
return
}
func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) {
iterator, closer, err := l.GetIterator()
if err != nil {
return
}
defer closer.Close()
for iterator.SeekToFirst(); iterator.Valid(); iterator.Next() {
err = iterator.GetError()
if err != nil {
return
}
decodedKey, decodeErr := decoder.DecodeKey(iterator.Key())
if decodeErr != nil {
continue
}
decodedValue, decodeErr := decoder.DecodeValue(iterator.Value())
if decodeErr != nil {
continue
}
switch filter.Filter(decodedKey, decodedValue) {
case storage.STOP:
return
case storage.SKIP:
continue
case storage.ACCEPT:
opErr := operator.Operate(decodedKey, decodedValue)
if opErr != nil {
if opErr.Continuable {
continue
}
break
}
}
}
scannedEntireCorpus = true
return
}