mirror of https://github.com/prometheus/prometheus
Wrap LevelDB iterator operations behind interface.
The LevelDB storage types return an interface type now that wraps around the underlying iterator. This both enhances testability but improves upon, in my opinion, the interface design for the LevelDB iterator. Secondarily, the resource reaping behaviors for the LevelDB iterators have been improved by dropping the externalized io.Closer object. Finally, the iterator provisioning methods provide the option for indicating whether one wants a snapshotted iterator or not.pull/96/head
parent
f2a30cf20c
commit
b2e4c88b80
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/prometheus/prometheus/coding/indexable"
|
||||
"github.com/prometheus/prometheus/model"
|
||||
dto "github.com/prometheus/prometheus/model/generated"
|
||||
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -42,9 +43,9 @@ func (f *diskFrontier) ContainsFingerprint(fingerprint model.Fingerprint) bool {
|
|||
return !(fingerprint.Less(f.firstFingerprint) || f.lastFingerprint.Less(fingerprint))
|
||||
}
|
||||
|
||||
func newDiskFrontier(i iterator) (d *diskFrontier, err error) {
|
||||
i.SeekToLast()
|
||||
if !i.Valid() || i.Key() == nil {
|
||||
func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, err error) {
|
||||
|
||||
if !i.SeekToLast() || i.Key() == nil {
|
||||
return
|
||||
}
|
||||
lastKey, err := extractSampleKey(i)
|
||||
|
@ -85,7 +86,7 @@ func (f seriesFrontier) String() string {
|
|||
// newSeriesFrontier furnishes a populated diskFrontier for a given
|
||||
// fingerprint. A nil diskFrontier will be returned if the series cannot
|
||||
// be found in the store.
|
||||
func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i iterator) (s *seriesFrontier, err error) {
|
||||
func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i leveldb.Iterator) (s *seriesFrontier, err error) {
|
||||
var (
|
||||
lowerSeek = firstSupertime
|
||||
upperSeek = lastSupertime
|
||||
|
@ -129,7 +130,7 @@ func newSeriesFrontier(f model.Fingerprint, d diskFrontier, i iterator) (s *seri
|
|||
//
|
||||
//
|
||||
if !retrievedFingerprint.Equal(f) {
|
||||
i.Prev()
|
||||
i.Previous()
|
||||
|
||||
retrievedKey, err = extractSampleKey(i)
|
||||
if err != nil {
|
||||
|
|
|
@ -1,22 +0,0 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metric
|
||||
|
||||
type Iterator interface {
|
||||
Seek(key interface{}) (ok bool)
|
||||
Next() (ok bool)
|
||||
Previous() (ok bool)
|
||||
Key() interface{}
|
||||
Value() interface{}
|
||||
}
|
|
@ -683,7 +683,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
|
|||
return
|
||||
}
|
||||
|
||||
func extractSampleKey(i iterator) (k *dto.SampleKey, err error) {
|
||||
func extractSampleKey(i leveldb.Iterator) (k *dto.SampleKey, err error) {
|
||||
if i == nil {
|
||||
panic("nil iterator")
|
||||
}
|
||||
|
@ -698,7 +698,7 @@ func extractSampleKey(i iterator) (k *dto.SampleKey, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func extractSampleValues(i iterator) (v *dto.SampleValueSeries, err error) {
|
||||
func extractSampleValues(i leveldb.Iterator) (v *dto.SampleValueSeries, err error) {
|
||||
if i == nil {
|
||||
panic("nil iterator")
|
||||
}
|
||||
|
@ -937,18 +937,6 @@ func interpolate(x1, x2 time.Time, y1, y2 float32, e time.Time) float32 {
|
|||
return y1 + (offset * dDt)
|
||||
}
|
||||
|
||||
type iterator interface {
|
||||
Close()
|
||||
Key() []byte
|
||||
Next()
|
||||
Prev()
|
||||
Seek([]byte)
|
||||
SeekToFirst()
|
||||
SeekToLast()
|
||||
Valid() bool
|
||||
Value() []byte
|
||||
}
|
||||
|
||||
func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.Time, s StalenessPolicy) (sample *model.Sample, err error) {
|
||||
begin := time.Now()
|
||||
|
||||
|
@ -975,15 +963,10 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T
|
|||
return
|
||||
}
|
||||
|
||||
iterator, closer, err := l.metricSamples.GetIterator()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
iterator := l.metricSamples.NewIterator(true)
|
||||
defer iterator.Close()
|
||||
|
||||
defer closer.Close()
|
||||
|
||||
iterator.Seek(e)
|
||||
if !iterator.Valid() {
|
||||
if !iterator.Seek(e) {
|
||||
/*
|
||||
* Two cases for this:
|
||||
* 1.) Corruption in LevelDB.
|
||||
|
@ -994,13 +977,10 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T
|
|||
* database is sufficient for our purposes. This is, in all reality, a
|
||||
* corner case but one that could bring down the system.
|
||||
*/
|
||||
iterator, closer, err = l.metricSamples.GetIterator()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer closer.Close()
|
||||
iterator.SeekToLast()
|
||||
if !iterator.Valid() {
|
||||
iterator = l.metricSamples.NewIterator(true)
|
||||
defer iterator.Close()
|
||||
|
||||
if !iterator.SeekToLast() {
|
||||
/*
|
||||
* For whatever reason, the LevelDB cannot be recovered.
|
||||
*/
|
||||
|
@ -1048,8 +1028,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T
|
|||
|
||||
firstTime := indexable.DecodeTime(firstKey.Timestamp)
|
||||
if t.Before(firstTime) || peekAhead {
|
||||
iterator.Prev()
|
||||
if !iterator.Valid() {
|
||||
if !iterator.Previous() {
|
||||
/*
|
||||
* Two cases for this:
|
||||
* 1.) Corruption in LevelDB.
|
||||
|
@ -1106,8 +1085,7 @@ func (l *LevelDBMetricPersistence) GetValueAtTime(fp model.Fingerprint, t time.T
|
|||
return
|
||||
}
|
||||
|
||||
iterator.Next()
|
||||
if !iterator.Valid() {
|
||||
if !iterator.Next() {
|
||||
/*
|
||||
* Two cases for this:
|
||||
* 1.) Corruption in LevelDB.
|
||||
|
@ -1188,17 +1166,12 @@ func (l *LevelDBMetricPersistence) GetRangeValues(fp model.Fingerprint, i model.
|
|||
return
|
||||
}
|
||||
|
||||
iterator, closer, err := l.metricSamples.GetIterator()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer closer.Close()
|
||||
|
||||
iterator.Seek(e)
|
||||
iterator := l.metricSamples.NewIterator(true)
|
||||
defer iterator.Close()
|
||||
|
||||
predicate := keyIsOlderThan(i.NewestInclusive)
|
||||
|
||||
for ; iterator.Valid(); iterator.Next() {
|
||||
for valid := iterator.Seek(e); valid; valid = iterator.Next() {
|
||||
retrievedKey := &dto.SampleKey{}
|
||||
|
||||
retrievedKey, err = extractSampleKey(iterator)
|
||||
|
|
|
@ -15,12 +15,12 @@ package metric
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/jmhodges/levigo"
|
||||
"github.com/prometheus/prometheus/coding"
|
||||
"github.com/prometheus/prometheus/coding/indexable"
|
||||
"github.com/prometheus/prometheus/model"
|
||||
dto "github.com/prometheus/prometheus/model/generated"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -139,13 +139,10 @@ func (t *tieredStorage) rebuildDiskFrontier() (err error) {
|
|||
|
||||
recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure})
|
||||
}()
|
||||
i, closer, err := t.diskStorage.metricSamples.GetIterator()
|
||||
if closer != nil {
|
||||
defer closer.Close()
|
||||
}
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
i := t.diskStorage.metricSamples.NewIterator(true)
|
||||
defer i.Close()
|
||||
|
||||
t.diskFrontier, err = newDiskFrontier(i)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -365,13 +362,8 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
|
|||
}
|
||||
|
||||
// Get a single iterator that will be used for all data extraction below.
|
||||
iterator, closer, err := t.diskStorage.metricSamples.GetIterator()
|
||||
if closer != nil {
|
||||
defer closer.Close()
|
||||
}
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
iterator := t.diskStorage.metricSamples.NewIterator(true)
|
||||
defer iterator.Close()
|
||||
|
||||
for _, scanJob := range scans {
|
||||
seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator)
|
||||
|
@ -442,7 +434,7 @@ func (t *tieredStorage) renderView(viewJob viewJob) {
|
|||
return
|
||||
}
|
||||
|
||||
func (t *tieredStorage) loadChunkAroundTime(iterator *levigo.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk []model.SamplePair) {
|
||||
func (t *tieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint model.Fingerprint, ts time.Time) (chunk []model.SamplePair) {
|
||||
var (
|
||||
targetKey = &dto.SampleKey{
|
||||
Fingerprint: fingerprint.ToDTO(),
|
||||
|
@ -481,7 +473,7 @@ func (t *tieredStorage) loadChunkAroundTime(iterator *levigo.Iterator, frontier
|
|||
rewound := false
|
||||
firstTime := indexable.DecodeTime(foundKey.Timestamp)
|
||||
if ts.Before(firstTime) && !frontier.firstSupertime.After(ts) {
|
||||
iterator.Prev()
|
||||
iterator.Previous()
|
||||
rewound = true
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package leveldb
|
||||
|
||||
// TODO: Evaluate whether to use coding.Encoder for the key and values instead
|
||||
// raw bytes for consistency reasons.
|
||||
|
||||
// Iterator wraps Levigo and LevelDB's iterator behaviors in a manner that is
|
||||
// conducive to IO-free testing.
|
||||
//
|
||||
// It borrows some of the operational assumptions from goskiplist, which
|
||||
// functions very similarly, in that it uses no separate Valid method to
|
||||
// determine health. All methods that have a return signature of (ok bool)
|
||||
// assume in the real LevelDB case that if ok == false that the iterator
|
||||
// must be disposed of at this given instance and recreated if future
|
||||
// work is desired. This is a quirk of LevelDB itself!
|
||||
type Iterator interface {
|
||||
// GetError reports low-level errors, if available. This should not indicate
|
||||
// that the iterator is necessarily unhealthy but maybe that the underlying
|
||||
// table is corrupted itself. See the notes above for (ok bool) return
|
||||
// signatures to determine iterator health.
|
||||
GetError() error
|
||||
Key() []byte
|
||||
Next() (ok bool)
|
||||
Previous() (ok bool)
|
||||
Seek(key []byte) (ok bool)
|
||||
SeekToFirst() (ok bool)
|
||||
SeekToLast() (ok bool)
|
||||
Value() []byte
|
||||
}
|
|
@ -19,7 +19,6 @@ import (
|
|||
"github.com/prometheus/prometheus/coding"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/storage/raw"
|
||||
"io"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -38,13 +37,94 @@ type LevelDBPersistence struct {
|
|||
writeOptions *levigo.WriteOptions
|
||||
}
|
||||
|
||||
// LevelDB iterators have a number of resources that need to be closed.
|
||||
// iteratorCloser encapsulates the various ones.
|
||||
type iteratorCloser struct {
|
||||
// levigoIterator wraps the LevelDB resources in a convenient manner for uniform
|
||||
// resource access and closing through the raw.Iterator protocol.
|
||||
type levigoIterator struct {
|
||||
// iterator is the receiver of most proxied operation calls.
|
||||
iterator *levigo.Iterator
|
||||
// readOptions is only set if the iterator is a snapshot of an underlying
|
||||
// database. This signals that it needs to be explicitly reaped upon the
|
||||
// end of this iterator's life.
|
||||
readOptions *levigo.ReadOptions
|
||||
// snapshot is only set if the iterator is a snapshot of an underlying
|
||||
// database. This signals that it needs to be explicitly reaped upon the
|
||||
// end of this this iterator's life.
|
||||
snapshot *levigo.Snapshot
|
||||
// storage is only set if the iterator is a snapshot of an underlying
|
||||
// database. This signals that it needs to be explicitly reaped upon the
|
||||
// end of this this iterator's life. The snapshot must be freed in the
|
||||
// context of an actual database.
|
||||
storage *levigo.DB
|
||||
// closed indicates whether the iterator has been closed before.
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (i *levigoIterator) Close() (err error) {
|
||||
if i.closed {
|
||||
return
|
||||
}
|
||||
|
||||
if i.iterator != nil {
|
||||
i.iterator.Close()
|
||||
}
|
||||
if i.readOptions != nil {
|
||||
i.readOptions.Close()
|
||||
}
|
||||
if i.snapshot != nil {
|
||||
i.storage.ReleaseSnapshot(i.snapshot)
|
||||
}
|
||||
|
||||
// Explicitly dereference the pointers to prevent cycles, however unlikely.
|
||||
i.iterator = nil
|
||||
i.readOptions = nil
|
||||
i.snapshot = nil
|
||||
i.storage = nil
|
||||
|
||||
i.closed = true
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (i levigoIterator) Seek(key []byte) (ok bool) {
|
||||
i.iterator.Seek(key)
|
||||
|
||||
return i.iterator.Valid()
|
||||
}
|
||||
|
||||
func (i levigoIterator) SeekToFirst() (ok bool) {
|
||||
i.iterator.SeekToFirst()
|
||||
|
||||
return i.iterator.Valid()
|
||||
}
|
||||
|
||||
func (i levigoIterator) SeekToLast() (ok bool) {
|
||||
i.iterator.SeekToLast()
|
||||
|
||||
return i.iterator.Valid()
|
||||
}
|
||||
|
||||
func (i levigoIterator) Next() (ok bool) {
|
||||
i.iterator.Next()
|
||||
|
||||
return i.iterator.Valid()
|
||||
}
|
||||
|
||||
func (i levigoIterator) Previous() (ok bool) {
|
||||
i.iterator.Prev()
|
||||
|
||||
return i.iterator.Valid()
|
||||
}
|
||||
|
||||
func (i levigoIterator) Key() (key []byte) {
|
||||
return i.iterator.Key()
|
||||
}
|
||||
|
||||
func (i levigoIterator) Value() (value []byte) {
|
||||
return i.iterator.Value()
|
||||
}
|
||||
|
||||
func (i levigoIterator) GetError() (err error) {
|
||||
return i.iterator.GetError()
|
||||
}
|
||||
|
||||
func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (p *LevelDBPersistence, err error) {
|
||||
|
@ -68,8 +148,11 @@ func NewLevelDBPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilter
|
|||
return
|
||||
}
|
||||
|
||||
readOptions := levigo.NewReadOptions()
|
||||
writeOptions := levigo.NewWriteOptions()
|
||||
var (
|
||||
readOptions = levigo.NewReadOptions()
|
||||
writeOptions = levigo.NewWriteOptions()
|
||||
)
|
||||
|
||||
writeOptions.SetSync(*leveldbFlushOnMutate)
|
||||
p = &LevelDBPersistence{
|
||||
cache: cache,
|
||||
|
@ -185,56 +268,53 @@ func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {
|
|||
return l.storage.Write(l.writeOptions, batch.batch)
|
||||
}
|
||||
|
||||
func (i *iteratorCloser) Close() (err error) {
|
||||
defer func() {
|
||||
if i.storage != nil {
|
||||
if i.snapshot != nil {
|
||||
i.storage.ReleaseSnapshot(i.snapshot)
|
||||
}
|
||||
}
|
||||
}()
|
||||
// NewIterator creates a new levigoIterator, which follows the Iterator
|
||||
// interface.
|
||||
//
|
||||
// Important notes:
|
||||
//
|
||||
// For each of the iterator methods that have a return signature of (ok bool),
|
||||
// if ok == false, the iterator may not be used any further and must be closed.
|
||||
// Further work with the database requires the creation of a new iterator. This
|
||||
// is due to LevelDB and Levigo design. Please refer to Jeff and Sanjay's notes
|
||||
// in the LevelDB documentation for this behavior's rationale.
|
||||
//
|
||||
// The returned iterator must explicitly be closed; otherwise non-managed memory
|
||||
// will be leaked.
|
||||
//
|
||||
// The iterator is optionally snapshotable.
|
||||
func (l *LevelDBPersistence) NewIterator(snapshotted bool) levigoIterator {
|
||||
var (
|
||||
snapshot *levigo.Snapshot
|
||||
readOptions *levigo.ReadOptions
|
||||
iterator *levigo.Iterator
|
||||
)
|
||||
|
||||
defer func() {
|
||||
if i.iterator != nil {
|
||||
i.iterator.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
defer func() {
|
||||
if i.readOptions != nil {
|
||||
i.readOptions.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (l *LevelDBPersistence) GetIterator() (i *levigo.Iterator, c io.Closer, err error) {
|
||||
snapshot := l.storage.NewSnapshot()
|
||||
readOptions := levigo.NewReadOptions()
|
||||
if snapshotted {
|
||||
snapshot = l.storage.NewSnapshot()
|
||||
readOptions = levigo.NewReadOptions()
|
||||
readOptions.SetSnapshot(snapshot)
|
||||
i = l.storage.NewIterator(readOptions)
|
||||
iterator = l.storage.NewIterator(readOptions)
|
||||
} else {
|
||||
iterator = l.storage.NewIterator(l.readOptions)
|
||||
}
|
||||
|
||||
// TODO: Kill the return of an additional io.Closer and just use a decorated
|
||||
// iterator interface.
|
||||
c = &iteratorCloser{
|
||||
iterator: i,
|
||||
return levigoIterator{
|
||||
iterator: iterator,
|
||||
readOptions: readOptions,
|
||||
snapshot: snapshot,
|
||||
storage: l.storage,
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) {
|
||||
iterator, closer, err := l.GetIterator()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer closer.Close()
|
||||
var (
|
||||
iterator = l.NewIterator(true)
|
||||
valid bool
|
||||
)
|
||||
defer iterator.Close()
|
||||
|
||||
for iterator.SeekToFirst(); iterator.Valid(); iterator.Next() {
|
||||
for valid = iterator.SeekToFirst(); valid; valid = iterator.Next() {
|
||||
err = iterator.GetError()
|
||||
if err != nil {
|
||||
return
|
||||
|
|
Loading…
Reference in New Issue