Merge pull request #1492 from prometheus/beorn7/storage4

Merging what has already been reviewed in other PRs.
pull/1497/head
Björn Rabenstein 2016-03-17 14:43:59 +01:00
commit e83f05fe93
14 changed files with 586 additions and 657 deletions

View File

@ -79,7 +79,10 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
Inspect(a.Expr, func(node Node) bool { Inspect(a.Expr, func(node Node) bool {
switch n := node.(type) { switch n := node.(type) {
case *VectorSelector: case *VectorSelector:
n.metrics = a.Storage.MetricsForLabelMatchers(n.LabelMatchers...) n.metrics = a.Storage.MetricsForLabelMatchers(
a.Start.Add(-n.Offset-StalenessDelta), a.End.Add(-n.Offset),
n.LabelMatchers...,
)
n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics)) n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics))
pt := getPreloadTimes(n.Offset) pt := getPreloadTimes(n.Offset)
@ -95,7 +98,10 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
} }
} }
case *MatrixSelector: case *MatrixSelector:
n.metrics = a.Storage.MetricsForLabelMatchers(n.LabelMatchers...) n.metrics = a.Storage.MetricsForLabelMatchers(
a.Start.Add(-n.Offset-n.Range), a.End.Add(-n.Offset),
n.LabelMatchers...,
)
n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics)) n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics))
pt := getPreloadTimes(n.Offset) pt := getPreloadTimes(n.Offset)

View File

@ -17,6 +17,7 @@ import (
"container/list" "container/list"
"fmt" "fmt"
"io" "io"
"sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -261,54 +262,72 @@ type chunk interface {
// generally not safe to use a chunkIterator concurrently with or after chunk // generally not safe to use a chunkIterator concurrently with or after chunk
// mutation. // mutation.
type chunkIterator interface { type chunkIterator interface {
// length returns the number of samples in the chunk.
length() int
// Gets the timestamp of the n-th sample in the chunk.
timestampAtIndex(int) (model.Time, error)
// Gets the last timestamp in the chunk. // Gets the last timestamp in the chunk.
lastTimestamp() (model.Time, error) lastTimestamp() (model.Time, error)
// Gets the sample value of the n-th sample in the chunk.
sampleValueAtIndex(int) (model.SampleValue, error)
// Gets the last sample value in the chunk.
lastSampleValue() (model.SampleValue, error)
// Gets the value that is closest before the given time. In case a value
// exists at precisely the given time, that value is returned. If no
// applicable value exists, ZeroSamplePair is returned.
valueAtOrBeforeTime(model.Time) (model.SamplePair, error)
// Gets all values contained within a given interval.
rangeValues(metric.Interval) ([]model.SamplePair, error)
// Whether a given timestamp is contained between first and last value // Whether a given timestamp is contained between first and last value
// in the chunk. // in the chunk.
contains(model.Time) (bool, error) contains(model.Time) (bool, error)
// values returns a channel, from which all sample values in the chunk // Scans the next value in the chunk. Directly after the iterator has
// can be received in order. The channel is closed after the last // been created, the next value is the first value in the
// one. It is generally not safe to mutate the chunk while the channel // chunk. Otherwise, it is the value following the last value scanned or
// is still open. If a value is returned with error!=nil, no further // found (by one of the find... methods). Returns false if either the
// values will be returned and the channel is closed. // end of the chunk is reached or an error has occurred.
values() <-chan struct { scan() bool
model.SamplePair // Finds the most recent value at or before the provided time. Returns
error // false if either the chunk contains no value at or before the provided
// time, or an error has occurred.
findAtOrBefore(model.Time) bool
// Finds the oldest value at or after the provided time. Returns false
// if either the chunk contains no value at or after the provided time,
// or an error has occurred.
findAtOrAfter(model.Time) bool
// Returns the last value scanned (by the scan method) or found (by one
// of the find... methods). It returns ZeroSamplePair before any of
// those methods were called.
value() model.SamplePair
// Returns the last error encountered. In general, an error signals data
// corruption in the chunk and requires quarantining.
err() error
} }
// rangeValues is a utility function that retrieves all values within the given
// range from a chunkIterator.
func rangeValues(it chunkIterator, in metric.Interval) ([]model.SamplePair, error) {
result := []model.SamplePair{}
if !it.findAtOrAfter(in.OldestInclusive) {
return result, it.err()
}
for !it.value().Timestamp.After(in.NewestInclusive) {
result = append(result, it.value())
if !it.scan() {
break
}
}
return result, it.err()
} }
func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) { func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) {
chunkOps.WithLabelValues(transcode).Inc() chunkOps.WithLabelValues(transcode).Inc()
head := dst var (
body := []chunk{} head = dst
for v := range src.newIterator().values() { body, newChunks []chunk
if v.error != nil { err error
return nil, v.error )
}
newChunks, err := head.add(v.SamplePair) it := src.newIterator()
if err != nil { for it.scan() {
if newChunks, err = head.add(it.value()); err != nil {
return nil, err return nil, err
} }
body = append(body, newChunks[:len(newChunks)-1]...) body = append(body, newChunks[:len(newChunks)-1]...)
head = newChunks[len(newChunks)-1] head = newChunks[len(newChunks)-1]
} }
newChunks, err := head.add(s) if it.err() != nil {
if err != nil { return nil, it.err()
}
if newChunks, err = head.add(s); err != nil {
return nil, err return nil, err
} }
return append(body, newChunks...), nil return append(body, newChunks...), nil
@ -334,3 +353,94 @@ func newChunkForEncoding(encoding chunkEncoding) (chunk, error) {
return nil, fmt.Errorf("unknown chunk encoding: %v", encoding) return nil, fmt.Errorf("unknown chunk encoding: %v", encoding)
} }
} }
// indexAccessor allows accesses to samples by index.
type indexAccessor interface {
timestampAtIndex(int) model.Time
sampleValueAtIndex(int) model.SampleValue
err() error
}
// indexAccessingChunkIterator is a chunk iterator for chunks for which an
// indexAccessor implementation exists.
type indexAccessingChunkIterator struct {
len int
pos int
lastValue model.SamplePair
acc indexAccessor
}
func newIndexAccessingChunkIterator(len int, acc indexAccessor) *indexAccessingChunkIterator {
return &indexAccessingChunkIterator{
len: len,
pos: -1,
lastValue: ZeroSamplePair,
acc: acc,
}
}
// lastTimestamp implements chunkIterator.
func (it *indexAccessingChunkIterator) lastTimestamp() (model.Time, error) {
return it.acc.timestampAtIndex(it.len - 1), it.acc.err()
}
// contains implements chunkIterator.
func (it *indexAccessingChunkIterator) contains(t model.Time) (bool, error) {
return !t.Before(it.acc.timestampAtIndex(0)) &&
!t.After(it.acc.timestampAtIndex(it.len-1)), it.acc.err()
}
// scan implements chunkIterator.
func (it *indexAccessingChunkIterator) scan() bool {
it.pos++
if it.pos >= it.len {
return false
}
it.lastValue = model.SamplePair{
Timestamp: it.acc.timestampAtIndex(it.pos),
Value: it.acc.sampleValueAtIndex(it.pos),
}
return it.acc.err() == nil
}
// findAtOrBefore implements chunkIterator.
func (it *indexAccessingChunkIterator) findAtOrBefore(t model.Time) bool {
i := sort.Search(it.len, func(i int) bool {
return it.acc.timestampAtIndex(i).After(t)
})
if i == 0 || it.acc.err() != nil {
return false
}
it.pos = i - 1
it.lastValue = model.SamplePair{
Timestamp: it.acc.timestampAtIndex(i - 1),
Value: it.acc.sampleValueAtIndex(i - 1),
}
return true
}
// findAtOrAfter implements chunkIterator.
func (it *indexAccessingChunkIterator) findAtOrAfter(t model.Time) bool {
i := sort.Search(it.len, func(i int) bool {
return !it.acc.timestampAtIndex(i).Before(t)
})
if i == it.len || it.acc.err() != nil {
return false
}
it.pos = i
it.lastValue = model.SamplePair{
Timestamp: it.acc.timestampAtIndex(i),
Value: it.acc.sampleValueAtIndex(i),
}
return true
}
// value implements chunkIterator.
func (it *indexAccessingChunkIterator) value() model.SamplePair {
return it.lastValue
}
// err implements chunkIterator.
func (it *indexAccessingChunkIterator) err() error {
return it.acc.err()
}

View File

@ -140,7 +140,13 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint
} }
} }
p.setDirty(false, nil) p.dirtyMtx.Lock()
// Only declare storage clean if it didn't become dirty during crash recovery.
if !p.becameDirty {
p.dirty = false
}
p.dirtyMtx.Unlock()
log.Warn("Crash recovery complete.") log.Warn("Crash recovery complete.")
return nil return nil
} }

View File

@ -18,11 +18,8 @@ import (
"fmt" "fmt"
"io" "io"
"math" "math"
"sort"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/metric"
) )
// The 21-byte header of a delta-encoded chunk looks like: // The 21-byte header of a delta-encoded chunk looks like:
@ -201,15 +198,14 @@ func (c deltaEncodedChunk) firstTime() model.Time {
// newIterator implements chunk. // newIterator implements chunk.
func (c *deltaEncodedChunk) newIterator() chunkIterator { func (c *deltaEncodedChunk) newIterator() chunkIterator {
return &deltaEncodedChunkIterator{ return newIndexAccessingChunkIterator(c.len(), &deltaEncodedIndexAccessor{
c: *c, c: *c,
len: c.len(),
baseT: c.baseTime(), baseT: c.baseTime(),
baseV: c.baseValue(), baseV: c.baseValue(),
tBytes: c.timeBytes(), tBytes: c.timeBytes(),
vBytes: c.valueBytes(), vBytes: c.valueBytes(),
isInt: c.isInt(), isInt: c.isInt(),
} })
} }
// marshal implements chunk. // marshal implements chunk.
@ -303,184 +299,67 @@ func (c deltaEncodedChunk) len() int {
return (len(c) - deltaHeaderBytes) / c.sampleSize() return (len(c) - deltaHeaderBytes) / c.sampleSize()
} }
// deltaEncodedChunkIterator implements chunkIterator. // deltaEncodedIndexAccessor implements indexAccessor.
type deltaEncodedChunkIterator struct { type deltaEncodedIndexAccessor struct {
c deltaEncodedChunk c deltaEncodedChunk
len int
baseT model.Time baseT model.Time
baseV model.SampleValue baseV model.SampleValue
tBytes, vBytes deltaBytes tBytes, vBytes deltaBytes
isInt bool isInt bool
lastErr error
} }
// length implements chunkIterator. func (acc *deltaEncodedIndexAccessor) err() error {
func (it *deltaEncodedChunkIterator) length() int { return it.len } return acc.lastErr
// valueAtOrBeforeTime implements chunkIterator.
func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
var lastErr error
i := sort.Search(it.len, func(i int) bool {
ts, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return ts.After(t)
})
if i == 0 || lastErr != nil {
return ZeroSamplePair, lastErr
}
ts, err := it.timestampAtIndex(i - 1)
if err != nil {
return ZeroSamplePair, err
}
v, err := it.sampleValueAtIndex(i - 1)
if err != nil {
return ZeroSamplePair, err
}
return model.SamplePair{Timestamp: ts, Value: v}, nil
} }
// rangeValues implements chunkIterator. func (acc *deltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time {
func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes)
var lastErr error
oldest := sort.Search(it.len, func(i int) bool { switch acc.tBytes {
t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return !t.Before(in.OldestInclusive)
})
newest := sort.Search(it.len, func(i int) bool {
t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return t.After(in.NewestInclusive)
})
if oldest == it.len || lastErr != nil {
return nil, lastErr
}
result := make([]model.SamplePair, 0, newest-oldest)
for i := oldest; i < newest; i++ {
t, err := it.timestampAtIndex(i)
if err != nil {
return nil, err
}
v, err := it.sampleValueAtIndex(i)
if err != nil {
return nil, err
}
result = append(result, model.SamplePair{Timestamp: t, Value: v})
}
return result, nil
}
// contains implements chunkIterator.
func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
lastT, err := it.timestampAtIndex(it.len - 1)
if err != nil {
return false, err
}
return !t.Before(it.baseT) && !t.After(lastT), nil
}
// values implements chunkIterator.
func (it *deltaEncodedChunkIterator) values() <-chan struct {
model.SamplePair
error
} {
valuesChan := make(chan struct {
model.SamplePair
error
})
go func() {
for i := 0; i < it.len; i++ {
t, err := it.timestampAtIndex(i)
if err != nil {
valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
}
v, err := it.sampleValueAtIndex(i)
if err != nil {
valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
}
valuesChan <- struct {
model.SamplePair
error
}{model.SamplePair{Timestamp: t, Value: v}, nil}
}
close(valuesChan)
}()
return valuesChan
}
// timestampAtIndex implements chunkIterator.
func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) {
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes)
switch it.tBytes {
case d1: case d1:
return it.baseT + model.Time(uint8(it.c[offset])), nil return acc.baseT + model.Time(uint8(acc.c[offset]))
case d2: case d2:
return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])), nil return acc.baseT + model.Time(binary.LittleEndian.Uint16(acc.c[offset:]))
case d4: case d4:
return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])), nil return acc.baseT + model.Time(binary.LittleEndian.Uint32(acc.c[offset:]))
case d8: case d8:
// Take absolute value for d8. // Take absolute value for d8.
return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil return model.Time(binary.LittleEndian.Uint64(acc.c[offset:]))
default: default:
return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes)
return model.Earliest
} }
} }
// lastTimestamp implements chunkIterator. func (acc *deltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue {
func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { offset := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes) + int(acc.tBytes)
return it.timestampAtIndex(it.len - 1)
}
// sampleValueAtIndex implements chunkIterator. if acc.isInt {
func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) { switch acc.vBytes {
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes)
if it.isInt {
switch it.vBytes {
case d0: case d0:
return it.baseV, nil return acc.baseV
case d1: case d1:
return it.baseV + model.SampleValue(int8(it.c[offset])), nil return acc.baseV + model.SampleValue(int8(acc.c[offset]))
case d2: case d2:
return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil return acc.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:])))
case d4: case d4:
return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil return acc.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:])))
// No d8 for ints. // No d8 for ints.
default: default:
return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes)
return 0
} }
} else { } else {
switch it.vBytes { switch acc.vBytes {
case d4: case d4:
return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil return acc.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:])))
case d8: case d8:
// Take absolute value for d8. // Take absolute value for d8.
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:])))
default: default:
return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes)
return 0
} }
} }
} }
// lastSampleValue implements chunkIterator.
func (it *deltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) {
return it.sampleValueAtIndex(it.len - 1)
}

View File

@ -18,11 +18,8 @@ import (
"fmt" "fmt"
"io" "io"
"math" "math"
"sort"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/metric"
) )
// The 37-byte header of a delta-encoded chunk looks like: // The 37-byte header of a delta-encoded chunk looks like:
@ -207,9 +204,8 @@ func (c doubleDeltaEncodedChunk) firstTime() model.Time {
// newIterator implements chunk. // newIterator implements chunk.
func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
return &doubleDeltaEncodedChunkIterator{ return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{
c: *c, c: *c,
len: c.len(),
baseT: c.baseTime(), baseT: c.baseTime(),
baseΔT: c.baseTimeDelta(), baseΔT: c.baseTimeDelta(),
baseV: c.baseValue(), baseV: c.baseValue(),
@ -217,7 +213,7 @@ func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
tBytes: c.timeBytes(), tBytes: c.timeBytes(),
vBytes: c.valueBytes(), vBytes: c.valueBytes(),
isInt: c.isInt(), isInt: c.isInt(),
} })
} }
// marshal implements chunk. // marshal implements chunk.
@ -409,223 +405,106 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb delt
return []chunk{&c}, nil return []chunk{&c}, nil
} }
// doubleDeltaEncodedChunkIterator implements chunkIterator. // doubleDeltaEncodedIndexAccessor implements indexAccessor.
type doubleDeltaEncodedChunkIterator struct { type doubleDeltaEncodedIndexAccessor struct {
c doubleDeltaEncodedChunk c doubleDeltaEncodedChunk
len int
baseT, baseΔT model.Time baseT, baseΔT model.Time
baseV, baseΔV model.SampleValue baseV, baseΔV model.SampleValue
tBytes, vBytes deltaBytes tBytes, vBytes deltaBytes
isInt bool isInt bool
lastErr error
} }
// length implements chunkIterator. func (acc *doubleDeltaEncodedIndexAccessor) err() error {
func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len } return acc.lastErr
// valueAtOrBeforeTime implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
var lastErr error
i := sort.Search(it.len, func(i int) bool {
ts, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return ts.After(t)
})
if i == 0 || lastErr != nil {
return ZeroSamplePair, lastErr
}
ts, err := it.timestampAtIndex(i - 1)
if err != nil {
return ZeroSamplePair, err
}
v, err := it.sampleValueAtIndex(i - 1)
if err != nil {
return ZeroSamplePair, err
}
return model.SamplePair{Timestamp: ts, Value: v}, nil
} }
// rangeValues implements chunkIterator. func (acc *doubleDeltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time {
func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
var lastErr error
oldest := sort.Search(it.len, func(i int) bool {
t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return !t.Before(in.OldestInclusive)
})
newest := sort.Search(it.len, func(i int) bool {
t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return t.After(in.NewestInclusive)
})
if oldest == it.len || lastErr != nil {
return nil, lastErr
}
result := make([]model.SamplePair, 0, newest-oldest)
for i := oldest; i < newest; i++ {
t, err := it.timestampAtIndex(i)
if err != nil {
return nil, err
}
v, err := it.sampleValueAtIndex(i)
if err != nil {
return nil, err
}
result = append(result, model.SamplePair{Timestamp: t, Value: v})
}
return result, nil
}
// contains implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
lastT, err := it.timestampAtIndex(it.len - 1)
if err != nil {
return false, err
}
return !t.Before(it.baseT) && !t.After(lastT), nil
}
// values implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) values() <-chan struct {
model.SamplePair
error
} {
valuesChan := make(chan struct {
model.SamplePair
error
})
go func() {
for i := 0; i < it.len; i++ {
t, err := it.timestampAtIndex(i)
if err != nil {
valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
}
v, err := it.sampleValueAtIndex(i)
if err != nil {
valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
}
valuesChan <- struct {
model.SamplePair
error
}{model.SamplePair{Timestamp: t, Value: v}, nil}
}
close(valuesChan)
}()
return valuesChan
}
// timestampAtIndex implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) {
if idx == 0 { if idx == 0 {
return it.baseT, nil return acc.baseT
} }
if idx == 1 { if idx == 1 {
// If time bytes are at d8, the time is saved directly rather // If time bytes are at d8, the time is saved directly rather
// than as a difference. // than as a difference.
if it.tBytes == d8 { if acc.tBytes == d8 {
return it.baseΔT, nil return acc.baseΔT
} }
return it.baseT + it.baseΔT, nil return acc.baseT + acc.baseΔT
} }
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes)
switch it.tBytes { switch acc.tBytes {
case d1: case d1:
return it.baseT + return acc.baseT +
model.Time(idx)*it.baseΔT + model.Time(idx)*acc.baseΔT +
model.Time(int8(it.c[offset])), nil model.Time(int8(acc.c[offset]))
case d2: case d2:
return it.baseT + return acc.baseT +
model.Time(idx)*it.baseΔT + model.Time(idx)*acc.baseΔT +
model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil model.Time(int16(binary.LittleEndian.Uint16(acc.c[offset:])))
case d4: case d4:
return it.baseT + return acc.baseT +
model.Time(idx)*it.baseΔT + model.Time(idx)*acc.baseΔT +
model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil model.Time(int32(binary.LittleEndian.Uint32(acc.c[offset:])))
case d8: case d8:
// Take absolute value for d8. // Take absolute value for d8.
return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil return model.Time(binary.LittleEndian.Uint64(acc.c[offset:]))
default: default:
return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes)
return model.Earliest
} }
} }
// lastTimestamp implements chunkIterator. func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue {
func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) {
return it.timestampAtIndex(it.len - 1)
}
// sampleValueAtIndex implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) {
if idx == 0 { if idx == 0 {
return it.baseV, nil return acc.baseV
} }
if idx == 1 { if idx == 1 {
// If value bytes are at d8, the value is saved directly rather // If value bytes are at d8, the value is saved directly rather
// than as a difference. // than as a difference.
if it.vBytes == d8 { if acc.vBytes == d8 {
return it.baseΔV, nil return acc.baseΔV
} }
return it.baseV + it.baseΔV, nil return acc.baseV + acc.baseΔV
} }
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes) offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes) + int(acc.tBytes)
if it.isInt { if acc.isInt {
switch it.vBytes { switch acc.vBytes {
case d0: case d0:
return it.baseV + return acc.baseV +
model.SampleValue(idx)*it.baseΔV, nil model.SampleValue(idx)*acc.baseΔV
case d1: case d1:
return it.baseV + return acc.baseV +
model.SampleValue(idx)*it.baseΔV + model.SampleValue(idx)*acc.baseΔV +
model.SampleValue(int8(it.c[offset])), nil model.SampleValue(int8(acc.c[offset]))
case d2: case d2:
return it.baseV + return acc.baseV +
model.SampleValue(idx)*it.baseΔV + model.SampleValue(idx)*acc.baseΔV +
model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:])))
case d4: case d4:
return it.baseV + return acc.baseV +
model.SampleValue(idx)*it.baseΔV + model.SampleValue(idx)*acc.baseΔV +
model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:])))
// No d8 for ints. // No d8 for ints.
default: default:
return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes)
return 0
} }
} else { } else {
switch it.vBytes { switch acc.vBytes {
case d4: case d4:
return it.baseV + return acc.baseV +
model.SampleValue(idx)*it.baseΔV + model.SampleValue(idx)*acc.baseΔV +
model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:])))
case d8: case d8:
// Take absolute value for d8. // Take absolute value for d8.
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:])))
default: default:
return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes)
return 0
} }
} }
} }
// lastSampleValue implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) {
return it.sampleValueAtIndex(it.len - 1)
}

View File

@ -40,20 +40,22 @@ type Storage interface {
// NewPreloader returns a new Preloader which allows preloading and pinning // NewPreloader returns a new Preloader which allows preloading and pinning
// series data into memory for use within a query. // series data into memory for use within a query.
NewPreloader() Preloader NewPreloader() Preloader
// MetricsForLabelMatchers returns the metrics from storage that satisfy the given // MetricsForLabelMatchers returns the metrics from storage that satisfy
// label matchers. At least one label matcher must be specified that does not // the given label matchers. At least one label matcher must be
// match the empty string. // specified that does not match the empty string. The times from and
MetricsForLabelMatchers(...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric // through are hints for the storage to optimize the search. The storage
// LastSamplePairForFingerprint returns the last sample pair that has // MAY exclude metrics that have no samples in the specified interval
// been ingested for the provided fingerprint. If this instance of the // from the returned map. In doubt, specify model.Earliest for from and
// model.Latest for through.
MetricsForLabelMatchers(from, through model.Time, matchers ...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric
// LastSampleForFingerprint returns the last sample that has been
// ingested for the provided fingerprint. If this instance of the
// Storage has never ingested a sample for the provided fingerprint (or // Storage has never ingested a sample for the provided fingerprint (or
// the last ingestion is so long ago that the series has been archived), // the last ingestion is so long ago that the series has been archived),
// ZeroSamplePair is returned. // ZeroSample is returned.
LastSamplePairForFingerprint(model.Fingerprint) model.SamplePair LastSampleForFingerprint(model.Fingerprint) model.Sample
// Get all of the label values that are associated with a given label name. // Get all of the label values that are associated with a given label name.
LabelValuesForLabelName(model.LabelName) model.LabelValues LabelValuesForLabelName(model.LabelName) model.LabelValues
// Get the metric associated with the provided fingerprint.
MetricForFingerprint(model.Fingerprint) metric.Metric
// Drop all time series associated with the given fingerprints. // Drop all time series associated with the given fingerprints.
DropMetricsForFingerprints(...model.Fingerprint) DropMetricsForFingerprints(...model.Fingerprint)
// Run the various maintenance loops in goroutines. Returns when the // Run the various maintenance loops in goroutines. Returns when the
@ -89,7 +91,7 @@ type SeriesIterator interface {
type Preloader interface { type Preloader interface {
PreloadRange( PreloadRange(
fp model.Fingerprint, fp model.Fingerprint,
from model.Time, through model.Time, from, through model.Time,
) SeriesIterator ) SeriesIterator
PreloadInstant( PreloadInstant(
fp model.Fingerprint, fp model.Fingerprint,
@ -100,8 +102,15 @@ type Preloader interface {
} }
// ZeroSamplePair is the pseudo zero-value of model.SamplePair used by the local // ZeroSamplePair is the pseudo zero-value of model.SamplePair used by the local
// package to signal a non-existing sample. It is a SamplePair with timestamp // package to signal a non-existing sample pair. It is a SamplePair with
// model.Earliest and value 0.0. Note that the natural zero value of SamplePair // timestamp model.Earliest and value 0.0. Note that the natural zero value of
// has a timestamp of 0, which is possible to appear in a real SamplePair and // SamplePair has a timestamp of 0, which is possible to appear in a real
// thus not suitable to signal a non-existing SamplePair. // SamplePair and thus not suitable to signal a non-existing SamplePair.
var ZeroSamplePair = model.SamplePair{Timestamp: model.Earliest} var ZeroSamplePair = model.SamplePair{Timestamp: model.Earliest}
// ZeroSample is the pseudo zero-value of model.Sample used by the local package
// to signal a non-existing sample. It is a Sample with timestamp
// model.Earliest, value 0.0, and metric nil. Note that the natural zero value
// of Sample has a timestamp of 0, which is possible to appear in a real
// Sample and thus not suitable to signal a non-existing Sample.
var ZeroSample = model.Sample{Timestamp: model.Earliest}

View File

@ -312,49 +312,44 @@ func (p *persistence) isDirty() bool {
return p.dirty return p.dirty
} }
// setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag was // setDirty flags the storage as dirty in a goroutine-safe way. The provided
// set to true with this method, it cannot be set to false again. (If we became // error will be logged as a reason the first time the storage is flagged as dirty.
// dirty during our runtime, there is no way back. If we were dirty from the func (p *persistence) setDirty(err error) {
// start, a clean-up might make us clean again.) The provided error will be
// logged as a reason if dirty is true.
func (p *persistence) setDirty(dirty bool, err error) {
if dirty {
p.dirtyCounter.Inc() p.dirtyCounter.Inc()
}
p.dirtyMtx.Lock() p.dirtyMtx.Lock()
defer p.dirtyMtx.Unlock() defer p.dirtyMtx.Unlock()
if p.becameDirty { if p.becameDirty {
return return
} }
p.dirty = dirty p.dirty = true
if dirty {
p.becameDirty = true p.becameDirty = true
log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.")
} }
}
// fingerprintsForLabelPair returns the fingerprints for the given label // fingerprintsForLabelPair returns the fingerprints for the given label
// pair. This method is goroutine-safe but take into account that metrics queued // pair. This method is goroutine-safe but take into account that metrics queued
// for indexing with IndexMetric might not have made it into the index // for indexing with IndexMetric might not have made it into the index
// yet. (Same applies correspondingly to UnindexMetric.) // yet. (Same applies correspondingly to UnindexMetric.)
func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) (model.Fingerprints, error) { func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) model.Fingerprints {
fps, _, err := p.labelPairToFingerprints.Lookup(lp) fps, _, err := p.labelPairToFingerprints.Lookup(lp)
if err != nil { if err != nil {
return nil, err p.setDirty(fmt.Errorf("error in method fingerprintsForLabelPair(%v): %s", lp, err))
return nil
} }
return fps, nil return fps
} }
// labelValuesForLabelName returns the label values for the given label // labelValuesForLabelName returns the label values for the given label
// name. This method is goroutine-safe but take into account that metrics queued // name. This method is goroutine-safe but take into account that metrics queued
// for indexing with IndexMetric might not have made it into the index // for indexing with IndexMetric might not have made it into the index
// yet. (Same applies correspondingly to UnindexMetric.) // yet. (Same applies correspondingly to UnindexMetric.)
func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelValues, error) { func (p *persistence) labelValuesForLabelName(ln model.LabelName) model.LabelValues {
lvs, _, err := p.labelNameToLabelValues.Lookup(ln) lvs, _, err := p.labelNameToLabelValues.Lookup(ln)
if err != nil { if err != nil {
return nil, err p.setDirty(fmt.Errorf("error in method labelValuesForLabelName(%v): %s", ln, err))
return nil
} }
return lvs, nil return lvs
} }
// persistChunks persists a number of consecutive chunks of a series. It is the // persistChunks persists a number of consecutive chunks of a series. It is the
@ -363,13 +358,10 @@ func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelVa
// the (zero-based) index of the first persisted chunk within the series // the (zero-based) index of the first persisted chunk within the series
// file. In case of an error, the returned index is -1 (to avoid the // file. In case of an error, the returned index is -1 (to avoid the
// misconception that the chunk was written at position 0). // misconception that the chunk was written at position 0).
//
// Returning an error signals problems with the series file. In this case, the
// caller should quarantine the series.
func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index int, err error) { func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index int, err error) {
defer func() {
if err != nil {
p.setDirty(true, fmt.Errorf("error in method persistChunks: %s", err))
}
}()
f, err := p.openChunkFileForWriting(fp) f, err := p.openChunkFileForWriting(fp)
if err != nil { if err != nil {
return -1, err return -1, err
@ -743,6 +735,9 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
// deleted (in which case the returned timestamp will be 0 and must be ignored). // deleted (in which case the returned timestamp will be 0 and must be ignored).
// It is the caller's responsibility to make sure nothing is persisted or loaded // It is the caller's responsibility to make sure nothing is persisted or loaded
// for the same fingerprint concurrently. // for the same fingerprint concurrently.
//
// Returning an error signals problems with the series file. In this case, the
// caller should quarantine the series.
func (p *persistence) dropAndPersistChunks( func (p *persistence) dropAndPersistChunks(
fp model.Fingerprint, beforeTime model.Time, chunks []chunk, fp model.Fingerprint, beforeTime model.Time, chunks []chunk,
) ( ) (
@ -755,12 +750,6 @@ func (p *persistence) dropAndPersistChunks(
// Style note: With the many return values, it was decided to use naked // Style note: With the many return values, it was decided to use naked
// returns in this method. They make the method more readable, but // returns in this method. They make the method more readable, but
// please handle with care! // please handle with care!
defer func() {
if err != nil {
p.setDirty(true, fmt.Errorf("error in method dropAndPersistChunks: %s", err))
}
}()
if len(chunks) > 0 { if len(chunks) > 0 {
// We have chunks to persist. First check if those are already // We have chunks to persist. First check if those are already
// too old. If that's the case, the chunks in the series file // too old. If that's the case, the chunks in the series file
@ -1014,29 +1003,28 @@ func (p *persistence) waitForIndexing() {
// the metric. The caller must have locked the fingerprint. // the metric. The caller must have locked the fingerprint.
func (p *persistence) archiveMetric( func (p *persistence) archiveMetric(
fp model.Fingerprint, m model.Metric, first, last model.Time, fp model.Fingerprint, m model.Metric, first, last model.Time,
) error { ) {
if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil { if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil {
p.setDirty(true, err) p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToMetrics: %s", fp, err))
return err return
} }
if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil { if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil {
p.setDirty(true, err) p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToTimeRange: %s", fp, err))
return err
} }
return nil
} }
// hasArchivedMetric returns whether the archived metric for the given // hasArchivedMetric returns whether the archived metric for the given
// fingerprint exists and if yes, what the first and last timestamp in the // fingerprint exists and if yes, what the first and last timestamp in the
// corresponding series is. This method is goroutine-safe. // corresponding series is. This method is goroutine-safe.
func (p *persistence) hasArchivedMetric(fp model.Fingerprint) ( func (p *persistence) hasArchivedMetric(fp model.Fingerprint) (
hasMetric bool, firstTime, lastTime model.Time, err error, hasMetric bool, firstTime, lastTime model.Time,
) { ) {
firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp) firstTime, lastTime, hasMetric, err := p.archivedFingerprintToTimeRange.Lookup(fp)
if err != nil { if err != nil {
p.setDirty(true, err) p.setDirty(fmt.Errorf("error in method hasArchivedMetric(%v): %s", fp, err))
hasMetric = false
} }
return return hasMetric, firstTime, lastTime
} }
// updateArchivedTimeRange updates an archived time range. The caller must make // updateArchivedTimeRange updates an archived time range. The caller must make
@ -1074,7 +1062,11 @@ func (p *persistence) fingerprintsModifiedBefore(beforeTime model.Time) ([]model
// method is goroutine-safe. // method is goroutine-safe.
func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) { func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) {
metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp) metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp)
return metric, err if err != nil {
p.setDirty(fmt.Errorf("error in method archivedMetric(%v): %s", fp, err))
return nil, err
}
return metric, nil
} }
// purgeArchivedMetric deletes an archived fingerprint and its corresponding // purgeArchivedMetric deletes an archived fingerprint and its corresponding
@ -1084,7 +1076,7 @@ func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error)
func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) { func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) {
defer func() { defer func() {
if err != nil { if err != nil {
p.setDirty(true, fmt.Errorf("error in method purgeArchivedMetric: %s", err)) p.setDirty(fmt.Errorf("error in method purgeArchivedMetric(%v): %s", fp, err))
} }
}() }()

View File

@ -82,14 +82,14 @@ func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint
} }
func chunksEqual(c1, c2 chunk) bool { func chunksEqual(c1, c2 chunk) bool {
values2 := c2.newIterator().values() it1 := c1.newIterator()
for v1 := range c1.newIterator().values() { it2 := c2.newIterator()
v2 := <-values2 for it1.scan() && it2.scan() {
if !(v1 == v2) { if !(it1.value() == it2.value()) {
return false return false
} }
} }
return true return it1.err() == nil && it2.err() == nil
} }
func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
@ -770,58 +770,46 @@ func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) {
p.indexMetric(2, m2) p.indexMetric(2, m2)
p.waitForIndexing() p.waitForIndexing()
outFPs, err := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) outFPs := p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"})
if err != nil {
t.Fatal(err)
}
want := model.Fingerprints{1} want := model.Fingerprints{1}
if !reflect.DeepEqual(outFPs, want) { if !reflect.DeepEqual(outFPs, want) {
t.Errorf("want %#v, got %#v", want, outFPs) t.Errorf("want %#v, got %#v", want, outFPs)
} }
outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"})
if err != nil {
t.Fatal(err)
}
want = model.Fingerprints{2} want = model.Fingerprints{2}
if !reflect.DeepEqual(outFPs, want) { if !reflect.DeepEqual(outFPs, want) {
t.Errorf("want %#v, got %#v", want, outFPs) t.Errorf("want %#v, got %#v", want, outFPs)
} }
if archived, _, _, err := p.hasArchivedMetric(1); err != nil || !archived { if archived, _, _ := p.hasArchivedMetric(1); !archived {
t.Error("want FP 1 archived") t.Error("want FP 1 archived")
} }
if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived { if archived, _, _ := p.hasArchivedMetric(2); !archived {
t.Error("want FP 2 archived") t.Error("want FP 2 archived")
} }
if err != p.purgeArchivedMetric(1) { if err := p.purgeArchivedMetric(1); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err != p.purgeArchivedMetric(3) { if err := p.purgeArchivedMetric(3); err != nil {
// Purging something that has not beet archived is not an error. // Purging something that has not beet archived is not an error.
t.Fatal(err) t.Fatal(err)
} }
p.waitForIndexing() p.waitForIndexing()
outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"}) outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n1", Value: "v1"})
if err != nil {
t.Fatal(err)
}
want = nil want = nil
if !reflect.DeepEqual(outFPs, want) { if !reflect.DeepEqual(outFPs, want) {
t.Errorf("want %#v, got %#v", want, outFPs) t.Errorf("want %#v, got %#v", want, outFPs)
} }
outFPs, err = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"}) outFPs = p.fingerprintsForLabelPair(model.LabelPair{Name: "n2", Value: "v2"})
if err != nil {
t.Fatal(err)
}
want = model.Fingerprints{2} want = model.Fingerprints{2}
if !reflect.DeepEqual(outFPs, want) { if !reflect.DeepEqual(outFPs, want) {
t.Errorf("want %#v, got %#v", want, outFPs) t.Errorf("want %#v, got %#v", want, outFPs)
} }
if archived, _, _, err := p.hasArchivedMetric(1); err != nil || archived { if archived, _, _ := p.hasArchivedMetric(1); archived {
t.Error("want FP 1 not archived") t.Error("want FP 1 not archived")
} }
if archived, _, _, err := p.hasArchivedMetric(2); err != nil || !archived { if archived, _, _ := p.hasArchivedMetric(2); !archived {
t.Error("want FP 2 archived") t.Error("want FP 2 archived")
} }
} }
@ -983,9 +971,7 @@ func testIndexing(t *testing.T, encoding chunkEncoding) {
for i, b := range batches { for i, b := range batches {
for fp, m := range b.fpToMetric { for fp, m := range b.fpToMetric {
p.indexMetric(fp, m) p.indexMetric(fp, m)
if err := p.archiveMetric(fp, m, 1, 2); err != nil { p.archiveMetric(fp, m, 1, 2)
t.Fatal(err)
}
indexedFpsToMetrics[fp] = m indexedFpsToMetrics[fp] = m
} }
verifyIndexedState(i, t, b, indexedFpsToMetrics, p) verifyIndexedState(i, t, b, indexedFpsToMetrics, p)
@ -1029,10 +1015,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
} }
// Check that archived metrics are in membership index. // Check that archived metrics are in membership index.
has, first, last, err := p.hasArchivedMetric(fp) has, first, last := p.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
if !has { if !has {
t.Errorf("%d. fingerprint %v not found", i, fp) t.Errorf("%d. fingerprint %v not found", i, fp)
} }
@ -1046,10 +1029,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
// Compare label name -> label values mappings. // Compare label name -> label values mappings.
for ln, lvs := range b.expectedLnToLvs { for ln, lvs := range b.expectedLnToLvs {
outLvs, err := p.labelValuesForLabelName(ln) outLvs := p.labelValuesForLabelName(ln)
if err != nil {
t.Fatal(err)
}
outSet := codable.LabelValueSet{} outSet := codable.LabelValueSet{}
for _, lv := range outLvs { for _, lv := range outLvs {
@ -1063,10 +1043,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
// Compare label pair -> fingerprints mappings. // Compare label pair -> fingerprints mappings.
for lp, fps := range b.expectedLpToFps { for lp, fps := range b.expectedLpToFps {
outFPs, err := p.fingerprintsForLabelPair(lp) outFPs := p.fingerprintsForLabelPair(lp)
if err != nil {
t.Fatal(err)
}
outSet := codable.FingerprintSet{} outSet := codable.FingerprintSet{}
for _, fp := range outFPs { for _, fp := range outFPs {

View File

@ -557,12 +557,13 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
return ZeroSamplePair return ZeroSamplePair
} }
if containsT { if containsT {
value, err := it.chunkIt.valueAtOrBeforeTime(t) if it.chunkIt.findAtOrBefore(t) {
if err != nil { return it.chunkIt.value()
it.quarantine(err)
return ZeroSamplePair
} }
return value if it.chunkIt.err() != nil {
it.quarantine(it.chunkIt.err())
}
return ZeroSamplePair
} }
} }
@ -580,12 +581,13 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
return ZeroSamplePair return ZeroSamplePair
} }
it.chunkIt = it.chunkIterator(l - i) it.chunkIt = it.chunkIterator(l - i)
value, err := it.chunkIt.valueAtOrBeforeTime(t) if it.chunkIt.findAtOrBefore(t) {
if err != nil { return it.chunkIt.value()
it.quarantine(err)
return ZeroSamplePair
} }
return value if it.chunkIt.err() != nil {
it.quarantine(it.chunkIt.err())
}
return ZeroSamplePair
} }
// RangeValues implements SeriesIterator. // RangeValues implements SeriesIterator.
@ -612,7 +614,7 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa
if c.firstTime().After(in.NewestInclusive) { if c.firstTime().After(in.NewestInclusive) {
break break
} }
chValues, err := it.chunkIterator(i + j).rangeValues(in) chValues, err := rangeValues(it.chunkIterator(i+j), in)
if err != nil { if err != nil {
it.quarantine(err) it.quarantine(err)
return nil return nil

View File

@ -16,6 +16,7 @@ package local
import ( import (
"container/list" "container/list"
"errors"
"fmt" "fmt"
"math" "math"
"sync" "sync"
@ -128,7 +129,8 @@ const (
type syncStrategy func() bool type syncStrategy func() bool
type memorySeriesStorage struct { type memorySeriesStorage struct {
// numChunksToPersist has to be aligned for atomic operations. // archiveHighWatermark and numChunksToPersist have to be aligned for atomic operations.
archiveHighWatermark model.Time // No archived series has samples after this time.
numChunksToPersist int64 // The number of chunks waiting for persistence. numChunksToPersist int64 // The number of chunks waiting for persistence.
maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled. maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled.
rushed bool // Whether the storage is in rushed mode. rushed bool // Whether the storage is in rushed mode.
@ -162,7 +164,7 @@ type memorySeriesStorage struct {
seriesOps *prometheus.CounterVec seriesOps *prometheus.CounterVec
ingestedSamplesCount prometheus.Counter ingestedSamplesCount prometheus.Counter
outOfOrderSamplesCount prometheus.Counter outOfOrderSamplesCount prometheus.Counter
invalidPreloadRequestsCount prometheus.Counter nonExistentSeriesMatchesCount prometheus.Counter
maintainSeriesDuration *prometheus.SummaryVec maintainSeriesDuration *prometheus.SummaryVec
persistenceUrgencyScore prometheus.Gauge persistenceUrgencyScore prometheus.Gauge
rushedMode prometheus.Gauge rushedMode prometheus.Gauge
@ -200,6 +202,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
dropAfter: o.PersistenceRetentionPeriod, dropAfter: o.PersistenceRetentionPeriod,
checkpointInterval: o.CheckpointInterval, checkpointInterval: o.CheckpointInterval,
checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit, checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
archiveHighWatermark: model.Now().Add(-headChunkTimeout),
maxChunksToPersist: o.MaxChunksToPersist, maxChunksToPersist: o.MaxChunksToPersist,
@ -245,11 +248,11 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
Name: "out_of_order_samples_total", Name: "out_of_order_samples_total",
Help: "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.", Help: "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.",
}), }),
invalidPreloadRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{ nonExistentSeriesMatchesCount: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "invalid_preload_requests_total", Name: "non_existent_series_matches_total",
Help: "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.", Help: "How often a non-existent series was referred to during label matching or chunk preloading. This is an indication of outdated label indexes.",
}), }),
maintainSeriesDuration: prometheus.NewSummaryVec( maintainSeriesDuration: prometheus.NewSummaryVec(
prometheus.SummaryOpts{ prometheus.SummaryOpts{
@ -367,15 +370,20 @@ func (s *memorySeriesStorage) WaitForIndexing() {
} }
// LastSampleForFingerprint implements Storage. // LastSampleForFingerprint implements Storage.
func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint) model.SamplePair { func (s *memorySeriesStorage) LastSampleForFingerprint(fp model.Fingerprint) model.Sample {
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp) defer s.fpLocker.Unlock(fp)
series, ok := s.fpToSeries.get(fp) series, ok := s.fpToSeries.get(fp)
if !ok { if !ok {
return ZeroSamplePair return ZeroSample
}
sp := series.lastSamplePair()
return model.Sample{
Metric: series.metric,
Value: sp.Value,
Timestamp: sp.Timestamp,
} }
return series.lastSamplePair()
} }
// boundedIterator wraps a SeriesIterator and does not allow fetching // boundedIterator wraps a SeriesIterator and does not allow fetching
@ -417,10 +425,7 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair
var result map[model.Fingerprint]struct{} var result map[model.Fingerprint]struct{}
for _, pair := range pairs { for _, pair := range pairs {
intersection := map[model.Fingerprint]struct{}{} intersection := map[model.Fingerprint]struct{}{}
fps, err := s.persistence.fingerprintsForLabelPair(pair) fps := s.persistence.fingerprintsForLabelPair(pair)
if err != nil {
log.Error("Error getting fingerprints for label pair: ", err)
}
if len(fps) == 0 { if len(fps) == 0 {
return nil return nil
} }
@ -438,7 +443,10 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair
} }
// MetricsForLabelMatchers implements Storage. // MetricsForLabelMatchers implements Storage.
func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric { func (s *memorySeriesStorage) MetricsForLabelMatchers(
from, through model.Time,
matchers ...*metric.LabelMatcher,
) map[model.Fingerprint]metric.Metric {
var ( var (
equals []model.LabelPair equals []model.LabelPair
filters []*metric.LabelMatcher filters []*metric.LabelMatcher
@ -490,9 +498,13 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelM
filters = remaining filters = remaining
} }
result := make(map[model.Fingerprint]metric.Metric, len(resFPs)) result := map[model.Fingerprint]metric.Metric{}
for fp := range resFPs { for fp := range resFPs {
result[fp] = s.MetricForFingerprint(fp) s.fpLocker.Lock(fp)
if met, _, ok := s.metricForRange(fp, from, through); ok {
result[fp] = metric.Metric{Metric: met}
}
s.fpLocker.Unlock(fp)
} }
for _, matcher := range filters { for _, matcher := range filters {
for fp, met := range result { for fp, met := range result {
@ -504,37 +516,55 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelM
return result return result
} }
// LabelValuesForLabelName implements Storage. // metricForRange returns the metric for the given fingerprint if the
func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues { // corresponding time series has samples between 'from' and 'through', together
lvs, err := s.persistence.labelValuesForLabelName(labelName) // with a pointer to the series if it is in memory already. For a series that
if err != nil { // does not have samples between 'from' and 'through', the returned bool is
log.Errorf("Error getting label values for label name %q: %v", labelName, err) // false. For an archived series that does contain samples between 'from' and
} // 'through', it returns (metric, nil, true).
return lvs //
} // The caller must have locked the fp.
func (s *memorySeriesStorage) metricForRange(
// MetricForFingerprint implements Storage. fp model.Fingerprint,
func (s *memorySeriesStorage) MetricForFingerprint(fp model.Fingerprint) metric.Metric { from, through model.Time,
s.fpLocker.Lock(fp) ) (model.Metric, *memorySeries, bool) {
defer s.fpLocker.Unlock(fp)
series, ok := s.fpToSeries.get(fp) series, ok := s.fpToSeries.get(fp)
if ok { if ok {
// Wrap the returned metric in a copy-on-write (COW) metric here because if series.lastTime.Before(from) || series.firstTime().After(through) {
// the caller might mutate it. return nil, nil, false
return metric.Metric{
Metric: series.metric,
} }
return series.metric, series, true
}
// From here on, we are only concerned with archived metrics.
// If the high watermark of archived series is before 'from', we are done.
watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark)))
if watermark < from {
return nil, nil, false
}
if from.After(model.Earliest) || through.Before(model.Latest) {
// The range lookup is relatively cheap, so let's do it first if
// we have a chance the archived metric is not in the range.
has, first, last := s.persistence.hasArchivedMetric(fp)
if !has {
s.nonExistentSeriesMatchesCount.Inc()
return nil, nil, false
}
if first.After(through) || last.Before(from) {
return nil, nil, false
} }
met, err := s.persistence.archivedMetric(fp)
if err != nil {
log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err)
} }
return metric.Metric{ metric, err := s.persistence.archivedMetric(fp)
Metric: met, if err != nil {
Copied: false, // archivedMetric has already flagged the storage as dirty in this case.
return nil, nil, false
} }
return metric, nil, true
}
// LabelValuesForLabelName implements Storage.
func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues {
return s.persistence.labelValuesForLabelName(labelName)
} }
// DropMetric implements Storage. // DropMetric implements Storage.
@ -562,7 +592,7 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error {
s.fpLocker.Unlock(fp) s.fpLocker.Unlock(fp)
}() // Func wrapper because fp might change below. }() // Func wrapper because fp might change below.
if err != nil { if err != nil {
s.persistence.setDirty(true, fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err)) s.persistence.setDirty(fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err))
return err return err
} }
if fp != rawFP { if fp != rawFP {
@ -695,36 +725,20 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
return series, nil return series, nil
} }
// getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. // seriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant.
func (s *memorySeriesStorage) getSeriesForRange( //
// The caller must have locked the fp.
func (s *memorySeriesStorage) seriesForRange(
fp model.Fingerprint, fp model.Fingerprint,
from model.Time, through model.Time, from model.Time, through model.Time,
) *memorySeries { ) *memorySeries {
series, ok := s.fpToSeries.get(fp) metric, series, ok := s.metricForRange(fp, from, through)
if ok { if !ok {
return series
}
has, first, last, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.")
return nil return nil
} }
if !has { if series == nil {
s.invalidPreloadRequestsCount.Inc() series, _ = s.getOrCreateSeries(fp, metric)
return nil // getOrCreateSeries took care of quarantining already, so ignore the error.
}
if last.Before(from) || first.After(through) {
return nil
}
metric, err := s.persistence.archivedMetric(fp)
if err != nil {
log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.")
return nil
}
series, err = s.getOrCreateSeries(fp, metric)
if err != nil {
// getOrCreateSeries took care of quarantining already.
return nil
} }
return series return series
} }
@ -736,7 +750,7 @@ func (s *memorySeriesStorage) preloadChunksForRange(
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp) defer s.fpLocker.Unlock(fp)
series := s.getSeriesForRange(fp, from, through) series := s.seriesForRange(fp, from, through)
if series == nil { if series == nil {
return nil, nopIter return nil, nopIter
} }
@ -755,7 +769,7 @@ func (s *memorySeriesStorage) preloadChunksForInstant(
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp) defer s.fpLocker.Unlock(fp)
series := s.getSeriesForRange(fp, from, through) series := s.seriesForRange(fp, from, through)
if series == nil { if series == nil {
return nil, nopIter return nil, nopIter
} }
@ -1106,17 +1120,22 @@ func (s *memorySeriesStorage) maintainMemorySeries(
} }
} }
// Archive if all chunks are evicted. // Archive if all chunks are evicted. Also make sure the last sample has
if iOldestNotEvicted == -1 { // an age of at least headChunkTimeout (which is very likely anyway).
if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout {
s.fpToSeries.del(fp) s.fpToSeries.del(fp)
s.numSeries.Dec() s.numSeries.Dec()
if err := s.persistence.archiveMetric( s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime)
fp, series.metric, series.firstTime(), series.lastTime,
); err != nil {
log.Errorf("Error archiving metric %v: %v", series.metric, err)
return
}
s.seriesOps.WithLabelValues(archive).Inc() s.seriesOps.WithLabelValues(archive).Inc()
oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark))
if oldWatermark < int64(series.lastTime) {
if !atomic.CompareAndSwapInt64(
(*int64)(&s.archiveHighWatermark),
oldWatermark, int64(series.lastTime),
) {
panic("s.archiveHighWatermark modified outside of maintainMemorySeries")
}
}
return return
} }
// If we are here, the series is not archived, so check for chunkDesc // If we are here, the series is not archived, so check for chunkDesc
@ -1138,8 +1157,20 @@ func (s *memorySeriesStorage) maintainMemorySeries(
func (s *memorySeriesStorage) writeMemorySeries( func (s *memorySeriesStorage) writeMemorySeries(
fp model.Fingerprint, series *memorySeries, beforeTime model.Time, fp model.Fingerprint, series *memorySeries, beforeTime model.Time,
) bool { ) bool {
cds := series.chunksToPersist() var (
persistErr error
cds = series.chunksToPersist()
)
defer func() { defer func() {
if persistErr != nil {
s.quarantineSeries(fp, series.metric, persistErr)
s.persistErrors.Inc()
}
// The following is done even in case of an error to ensure
// correct counter bookkeeping and to not pin chunks in memory
// that belong to a series that is scheduled for quarantine
// anyway.
for _, cd := range cds { for _, cd := range cds {
cd.unpin(s.evictRequests) cd.unpin(s.evictRequests)
} }
@ -1160,9 +1191,9 @@ func (s *memorySeriesStorage) writeMemorySeries(
if len(cds) == 0 { if len(cds) == 0 {
return false return false
} }
offset, err := s.persistence.persistChunks(fp, chunks) var offset int
if err != nil { offset, persistErr = s.persistence.persistChunks(fp, chunks)
s.persistErrors.Inc() if persistErr != nil {
return false return false
} }
if series.chunkDescsOffset == -1 { if series.chunkDescsOffset == -1 {
@ -1174,14 +1205,12 @@ func (s *memorySeriesStorage) writeMemorySeries(
return false return false
} }
newFirstTime, offset, numDroppedFromPersistence, allDroppedFromPersistence, err := newFirstTime, offset, numDroppedFromPersistence, allDroppedFromPersistence, persistErr :=
s.persistence.dropAndPersistChunks(fp, beforeTime, chunks) s.persistence.dropAndPersistChunks(fp, beforeTime, chunks)
if err != nil { if persistErr != nil {
s.persistErrors.Inc()
return false return false
} }
if err := series.dropChunks(beforeTime); err != nil { if persistErr = series.dropChunks(beforeTime); persistErr != nil {
s.persistErrors.Inc()
return false return false
} }
if len(series.chunkDescs) == 0 && allDroppedFromPersistence { if len(series.chunkDescs) == 0 && allDroppedFromPersistence {
@ -1198,8 +1227,8 @@ func (s *memorySeriesStorage) writeMemorySeries(
} else { } else {
series.chunkDescsOffset -= numDroppedFromPersistence series.chunkDescsOffset -= numDroppedFromPersistence
if series.chunkDescsOffset < 0 { if series.chunkDescsOffset < 0 {
s.persistence.setDirty(true, fmt.Errorf("dropped more chunks from persistence than from memory for fingerprint %v, series %v", fp, series)) persistErr = errors.New("dropped more chunks from persistence than from memory")
series.chunkDescsOffset = -1 // Makes sure it will be looked at during crash recovery. series.chunkDescsOffset = -1
} }
} }
return false return false
@ -1217,11 +1246,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp) defer s.fpLocker.Unlock(fp)
has, firstTime, lastTime, err := s.persistence.hasArchivedMetric(fp) has, firstTime, lastTime := s.persistence.hasArchivedMetric(fp)
if err != nil {
log.Error("Error looking up archived time range: ", err)
return
}
if !has || !firstTime.Before(beforeTime) { if !has || !firstTime.Before(beforeTime) {
// Oldest sample not old enough, or metric purged or unarchived in the meantime. // Oldest sample not old enough, or metric purged or unarchived in the meantime.
return return
@ -1234,10 +1259,7 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor
log.Error("Error dropping persisted chunks: ", err) log.Error("Error dropping persisted chunks: ", err)
} }
if allDropped { if allDropped {
if err := s.persistence.purgeArchivedMetric(fp); err != nil { s.persistence.purgeArchivedMetric(fp) // Ignoring error. Nothing we can do.
log.Errorf("Error purging archived metric for fingerprint %v: %v", fp, err)
return
}
s.seriesOps.WithLabelValues(archivePurge).Inc() s.seriesOps.WithLabelValues(archivePurge).Inc()
return return
} }
@ -1426,13 +1448,7 @@ func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric,
s.incNumChunksToPersist(-numChunksNotYetPersisted) s.incNumChunksToPersist(-numChunksNotYetPersisted)
} else { } else {
if err := s.persistence.purgeArchivedMetric(fp); err != nil { s.persistence.purgeArchivedMetric(fp) // Ignoring error. There is nothing we can do.
log.
With("fingerprint", fp).
With("metric", m).
With("error", err).
Error("Error purging metric from archive.")
}
} }
if m != nil { if m != nil {
// If we know a metric now, unindex it in any case. // If we know a metric now, unindex it in any case.
@ -1480,7 +1496,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
s.seriesOps.Describe(ch) s.seriesOps.Describe(ch)
ch <- s.ingestedSamplesCount.Desc() ch <- s.ingestedSamplesCount.Desc()
ch <- s.outOfOrderSamplesCount.Desc() ch <- s.outOfOrderSamplesCount.Desc()
ch <- s.invalidPreloadRequestsCount.Desc() ch <- s.nonExistentSeriesMatchesCount.Desc()
ch <- numMemChunksDesc ch <- numMemChunksDesc
s.maintainSeriesDuration.Describe(ch) s.maintainSeriesDuration.Describe(ch)
ch <- s.persistenceUrgencyScore.Desc() ch <- s.persistenceUrgencyScore.Desc()
@ -1507,7 +1523,7 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
s.seriesOps.Collect(ch) s.seriesOps.Collect(ch)
ch <- s.ingestedSamplesCount ch <- s.ingestedSamplesCount
ch <- s.outOfOrderSamplesCount ch <- s.outOfOrderSamplesCount
ch <- s.invalidPreloadRequestsCount ch <- s.nonExistentSeriesMatchesCount
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
numMemChunksDesc, numMemChunksDesc,
prometheus.GaugeValue, prometheus.GaugeValue,

View File

@ -34,6 +34,7 @@ func TestMatches(t *testing.T) {
storage, closer := NewTestStorage(t, 1) storage, closer := NewTestStorage(t, 1)
defer closer.Close() defer closer.Close()
storage.archiveHighWatermark = 90
samples := make([]*model.Sample, 100) samples := make([]*model.Sample, 100)
fingerprints := make(model.Fingerprints, 100) fingerprints := make(model.Fingerprints, 100)
@ -56,6 +57,20 @@ func TestMatches(t *testing.T) {
} }
storage.WaitForIndexing() storage.WaitForIndexing()
// Archive every tenth metric.
for i, fp := range fingerprints {
if i%10 != 0 {
continue
}
s, ok := storage.fpToSeries.get(fp)
if !ok {
t.Fatal("could not retrieve series for fp", fp)
}
storage.fpLocker.Lock(fp)
storage.persistence.archiveMetric(fp, s.metric, s.firstTime(), s.lastTime)
storage.fpLocker.Unlock(fp)
}
newMatcher := func(matchType metric.MatchType, name model.LabelName, value model.LabelValue) *metric.LabelMatcher { newMatcher := func(matchType metric.MatchType, name model.LabelName, value model.LabelValue) *metric.LabelMatcher {
lm, err := metric.NewLabelMatcher(matchType, name, value) lm, err := metric.NewLabelMatcher(matchType, name, value)
if err != nil { if err != nil {
@ -178,7 +193,10 @@ func TestMatches(t *testing.T) {
} }
for _, mt := range matcherTests { for _, mt := range matcherTests {
res := storage.MetricsForLabelMatchers(mt.matchers...) res := storage.MetricsForLabelMatchers(
model.Earliest, model.Latest,
mt.matchers...,
)
if len(mt.expected) != len(res) { if len(mt.expected) != len(res) {
t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(res)) t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(res))
} }
@ -194,6 +212,56 @@ func TestMatches(t *testing.T) {
t.Errorf("expected fingerprint %s for %q not in result", fp1, mt.matchers) t.Errorf("expected fingerprint %s for %q not in result", fp1, mt.matchers)
} }
} }
// Smoketest for from/through.
if len(storage.MetricsForLabelMatchers(
model.Earliest, -10000,
mt.matchers...,
)) > 0 {
t.Error("expected no matches with 'through' older than any sample")
}
if len(storage.MetricsForLabelMatchers(
10000, model.Latest,
mt.matchers...,
)) > 0 {
t.Error("expected no matches with 'from' newer than any sample")
}
// Now the tricky one, cut out something from the middle.
var (
from model.Time = 25
through model.Time = 75
)
res = storage.MetricsForLabelMatchers(
from, through,
mt.matchers...,
)
expected := model.Fingerprints{}
for _, fp := range mt.expected {
i := 0
for ; fingerprints[i] != fp && i < len(fingerprints); i++ {
}
if i == len(fingerprints) {
t.Fatal("expected fingerprint does not exist")
}
if !model.Time(i).Before(from) && !model.Time(i).After(through) {
expected = append(expected, fp)
}
}
if len(expected) != len(res) {
t.Errorf("expected %d range-limited matches for %q, found %d", len(expected), mt.matchers, len(res))
}
for fp1 := range res {
found := false
for _, fp2 := range expected {
if fp1 == fp2 {
found = true
break
}
}
if !found {
t.Errorf("expected fingerprint %s for %q not in range-limited result", fp1, mt.matchers)
}
}
} }
} }
@ -362,7 +430,10 @@ func BenchmarkLabelMatching(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
benchLabelMatchingRes = map[model.Fingerprint]metric.Metric{} benchLabelMatchingRes = map[model.Fingerprint]metric.Metric{}
for _, mt := range matcherTests { for _, mt := range matcherTests {
benchLabelMatchingRes = s.MetricsForLabelMatchers(mt...) benchLabelMatchingRes = s.MetricsForLabelMatchers(
model.Earliest, model.Latest,
mt...,
)
} }
} }
// Stop timer to not count the storage closing. // Stop timer to not count the storage closing.
@ -465,11 +536,7 @@ func TestDropMetrics(t *testing.T) {
s.maintainMemorySeries(fpToBeArchived, 0) s.maintainMemorySeries(fpToBeArchived, 0)
s.fpLocker.Lock(fpToBeArchived) s.fpLocker.Lock(fpToBeArchived)
s.fpToSeries.del(fpToBeArchived) s.fpToSeries.del(fpToBeArchived)
if err := s.persistence.archiveMetric( s.persistence.archiveMetric(fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond))
fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond),
); err != nil {
t.Error(err)
}
s.fpLocker.Unlock(fpToBeArchived) s.fpLocker.Unlock(fpToBeArchived)
fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"}) fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"})
@ -576,11 +643,7 @@ func TestQuarantineMetric(t *testing.T) {
s.maintainMemorySeries(fpToBeArchived, 0) s.maintainMemorySeries(fpToBeArchived, 0)
s.fpLocker.Lock(fpToBeArchived) s.fpLocker.Lock(fpToBeArchived)
s.fpToSeries.del(fpToBeArchived) s.fpToSeries.del(fpToBeArchived)
if err := s.persistence.archiveMetric( s.persistence.archiveMetric(fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond))
fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond),
); err != nil {
t.Error(err)
}
s.fpLocker.Unlock(fpToBeArchived) s.fpLocker.Unlock(fpToBeArchived)
// Corrupt the series file for m3. // Corrupt the series file for m3.
@ -692,11 +755,12 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
if cd.isEvicted() { if cd.isEvicted() {
continue continue
} }
for sample := range cd.c.newIterator().values() { it := cd.c.newIterator()
if sample.error != nil { for it.scan() {
t.Error(sample.error) values = append(values, it.value())
} }
values = append(values, sample.SamplePair) if it.err() != nil {
t.Error(it.err())
} }
} }
@ -1137,36 +1201,22 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := s.persistence.archiveMetric( s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime)
fp, series.metric, series.firstTime(), lastTime, archived, _, _ := s.persistence.hasArchivedMetric(fp)
); err != nil {
t.Fatal(err)
}
archived, _, _, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
if !archived { if !archived {
t.Fatal("not archived") t.Fatal("not archived")
} }
// Drop ~half of the chunks of an archived series. // Drop ~half of the chunks of an archived series.
s.maintainArchivedSeries(fp, 10000) s.maintainArchivedSeries(fp, 10000)
archived, _, _, err = s.persistence.hasArchivedMetric(fp) archived, _, _ = s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
if !archived { if !archived {
t.Fatal("archived series purged although only half of the chunks dropped") t.Fatal("archived series purged although only half of the chunks dropped")
} }
// Drop everything. // Drop everything.
s.maintainArchivedSeries(fp, 100000) s.maintainArchivedSeries(fp, 100000)
archived, _, _, err = s.persistence.hasArchivedMetric(fp) archived, _, _ = s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
if archived { if archived {
t.Fatal("archived series not dropped") t.Fatal("archived series not dropped")
} }
@ -1192,16 +1242,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := s.persistence.archiveMetric( s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime)
fp, series.metric, series.firstTime(), lastTime, archived, _, _ = s.persistence.hasArchivedMetric(fp)
); err != nil {
t.Fatal(err)
}
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
if !archived { if !archived {
t.Fatal("not archived") t.Fatal("not archived")
} }
@ -1213,24 +1255,25 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
if !ok { if !ok {
t.Fatal("could not find series") t.Fatal("could not find series")
} }
archived, _, _, err = s.persistence.hasArchivedMetric(fp) archived, _, _ = s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
if archived { if archived {
t.Fatal("archived") t.Fatal("archived")
} }
// Set archiveHighWatermark to a low value so that we can see it increase.
s.archiveHighWatermark = 42
// This will archive again, but must not drop it completely, despite the // This will archive again, but must not drop it completely, despite the
// memorySeries being empty. // memorySeries being empty.
s.maintainMemorySeries(fp, 10000) s.maintainMemorySeries(fp, 10000)
archived, _, _, err = s.persistence.hasArchivedMetric(fp) archived, _, _ = s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
if !archived { if !archived {
t.Fatal("series purged completely") t.Fatal("series purged completely")
} }
// archiveHighWatermark must have been set by maintainMemorySeries.
if want, got := model.Time(19998), s.archiveHighWatermark; want != got {
t.Errorf("want archiveHighWatermark %v, got %v", want, got)
}
} }
func TestEvictAndPurgeSeriesChunkType0(t *testing.T) { func TestEvictAndPurgeSeriesChunkType0(t *testing.T) {

View File

@ -21,6 +21,7 @@ package local
import ( import (
"time" "time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
) )
@ -51,6 +52,7 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage,
SyncStrategy: Adaptive, SyncStrategy: Adaptive,
} }
storage := NewMemorySeriesStorage(o) storage := NewMemorySeriesStorage(o)
storage.(*memorySeriesStorage).archiveHighWatermark = model.Latest
if err := storage.Start(); err != nil { if err := storage.Start(); err != nil {
directory.Close() directory.Close()
t.Fatalf("Error creating storage: %s", err) t.Fatalf("Error creating storage: %s", err)

View File

@ -226,7 +226,10 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err} return nil, &apiError{errorBadData, err}
} }
for fp, met := range api.Storage.MetricsForLabelMatchers(matchers...) { for fp, met := range api.Storage.MetricsForLabelMatchers(
model.Earliest, model.Latest, // Get every series.
matchers...,
) {
res[fp] = met res[fp] = met
} }
} }
@ -250,7 +253,10 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) {
if err != nil { if err != nil {
return nil, &apiError{errorBadData, err} return nil, &apiError{errorBadData, err}
} }
for fp := range api.Storage.MetricsForLabelMatchers(matchers...) { for fp := range api.Storage.MetricsForLabelMatchers(
model.Earliest, model.Latest, // Get every series.
matchers...,
) {
fps[fp] = struct{}{} fps[fp] = struct{}{}
} }
} }

View File

@ -19,7 +19,6 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
@ -33,7 +32,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
req.ParseForm() req.ParseForm()
metrics := map[model.Fingerprint]metric.Metric{} fps := map[model.Fingerprint]struct{}{}
for _, s := range req.Form["match[]"] { for _, s := range req.Form["match[]"] {
matchers, err := promql.ParseMetricSelector(s) matchers, err := promql.ParseMetricSelector(s)
@ -41,8 +40,11 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
} }
for fp, met := range h.storage.MetricsForLabelMatchers(matchers...) { for fp := range h.storage.MetricsForLabelMatchers(
metrics[fp] = met model.Now().Add(-promql.StalenessDelta), model.Latest,
matchers...,
) {
fps[fp] = struct{}{}
} }
} }
@ -62,19 +64,19 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
Type: dto.MetricType_UNTYPED.Enum(), Type: dto.MetricType_UNTYPED.Enum(),
} }
for fp, met := range metrics { for fp := range fps {
globalUsed := map[model.LabelName]struct{}{} globalUsed := map[model.LabelName]struct{}{}
sp := h.storage.LastSamplePairForFingerprint(fp) s := h.storage.LastSampleForFingerprint(fp)
// Discard if sample does not exist or lays before the staleness interval. // Discard if sample does not exist or lays before the staleness interval.
if sp.Timestamp.Before(minTimestamp) { if s.Timestamp.Before(minTimestamp) {
continue continue
} }
// Reset label slice. // Reset label slice.
protMetric.Label = protMetric.Label[:0] protMetric.Label = protMetric.Label[:0]
for ln, lv := range met.Metric { for ln, lv := range s.Metric {
if ln == model.MetricNameLabel { if ln == model.MetricNameLabel {
protMetricFam.Name = proto.String(string(lv)) protMetricFam.Name = proto.String(string(lv))
continue continue
@ -98,8 +100,8 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
} }
} }
protMetric.TimestampMs = proto.Int64(int64(sp.Timestamp)) protMetric.TimestampMs = proto.Int64(int64(s.Timestamp))
protMetric.Untyped.Value = proto.Float64(float64(sp.Value)) protMetric.Untyped.Value = proto.Float64(float64(s.Value))
if err := enc.Encode(protMetricFam); err != nil { if err := enc.Encode(protMetricFam); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)