Merge pull request #1405 from prometheus/beorn7/storage

Streamline series iterator creation
pull/1465/head
Björn Rabenstein 2016-03-07 13:30:56 +01:00
commit 2a2cc52828
14 changed files with 450 additions and 470 deletions

View File

@ -125,26 +125,45 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
}() }()
// Preload all analyzed ranges. // Preload all analyzed ranges.
iters := map[time.Duration]map[model.Fingerprint]local.SeriesIterator{}
for offset, pt := range a.offsetPreloadTimes { for offset, pt := range a.offsetPreloadTimes {
itersForDuration := map[model.Fingerprint]local.SeriesIterator{}
iters[offset] = itersForDuration
start := a.Start.Add(-offset) start := a.Start.Add(-offset)
end := a.End.Add(-offset) end := a.End.Add(-offset)
for fp, rangeDuration := range pt.ranges { for fp, rangeDuration := range pt.ranges {
if err = contextDone(ctx, env); err != nil { if err = contextDone(ctx, env); err != nil {
return nil, err return nil, err
} }
err = p.PreloadRange(fp, start.Add(-rangeDuration), end, StalenessDelta) startOfRange := start.Add(-rangeDuration)
if StalenessDelta > rangeDuration {
// Cover a weird corner case: The expression
// mixes up instants and ranges for the same
// series. We'll handle that over-all as
// range. But if the rangeDuration is smaller
// than the StalenessDelta, the range wouldn't
// cover everything potentially needed for the
// instant, so we have to extend startOfRange.
startOfRange = start.Add(-StalenessDelta)
}
iter, err := p.PreloadRange(fp, startOfRange, end)
if err != nil { if err != nil {
return nil, err return nil, err
} }
itersForDuration[fp] = iter
} }
for fp := range pt.instants { for fp := range pt.instants {
if err = contextDone(ctx, env); err != nil { if err = contextDone(ctx, env); err != nil {
return nil, err return nil, err
} }
err = p.PreloadRange(fp, start, end, StalenessDelta) // Need to look backwards by StalenessDelta but not
// forward because we always return the closest sample
// _before_ the reference time.
iter, err := p.PreloadRange(fp, start.Add(-StalenessDelta), end)
if err != nil { if err != nil {
return nil, err return nil, err
} }
itersForDuration[fp] = iter
} }
} }
@ -153,11 +172,11 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
switch n := node.(type) { switch n := node.(type) {
case *VectorSelector: case *VectorSelector:
for fp := range n.metrics { for fp := range n.metrics {
n.iterators[fp] = a.Storage.NewIterator(fp) n.iterators[fp] = iters[n.Offset][fp]
} }
case *MatrixSelector: case *MatrixSelector:
for fp := range n.metrics { for fp := range n.metrics {
n.iterators[fp] = a.Storage.NewIterator(fp) n.iterators[fp] = iters[n.Offset][fp]
} }
} }
return true return true

View File

@ -688,16 +688,17 @@ func (ev *evaluator) eval(expr Expr) model.Value {
func (ev *evaluator) vectorSelector(node *VectorSelector) vector { func (ev *evaluator) vectorSelector(node *VectorSelector) vector {
vec := vector{} vec := vector{}
for fp, it := range node.iterators { for fp, it := range node.iterators {
sampleCandidates := it.ValueAtTime(ev.Timestamp.Add(-node.Offset)) refTime := ev.Timestamp.Add(-node.Offset)
samplePair := chooseClosestBefore(sampleCandidates, ev.Timestamp.Add(-node.Offset)) samplePair := it.ValueAtOrBeforeTime(refTime)
if samplePair != nil { if samplePair.Timestamp.Before(refTime.Add(-StalenessDelta)) {
continue // Sample outside of staleness policy window.
}
vec = append(vec, &sample{ vec = append(vec, &sample{
Metric: node.metrics[fp], Metric: node.metrics[fp],
Value: samplePair.Value, Value: samplePair.Value,
Timestamp: ev.Timestamp, Timestamp: ev.Timestamp,
}) })
} }
}
return vec return vec
} }
@ -1168,23 +1169,6 @@ func shouldDropMetricName(op itemType) bool {
// series is considered stale. // series is considered stale.
var StalenessDelta = 5 * time.Minute var StalenessDelta = 5 * time.Minute
// chooseClosestBefore chooses the closest sample of a list of samples
// before or at a given target time.
func chooseClosestBefore(samples []model.SamplePair, timestamp model.Time) *model.SamplePair {
for _, candidate := range samples {
delta := candidate.Timestamp.Sub(timestamp)
// Samples before or at target time.
if delta <= 0 {
// Ignore samples outside of staleness policy window.
if -delta > StalenessDelta {
continue
}
return &candidate
}
}
return nil
}
// A queryGate controls the maximum number of concurrently running and waiting queries. // A queryGate controls the maximum number of concurrently running and waiting queries.
type queryGate struct { type queryGate struct {
ch chan struct{} ch chan struct{}

View File

@ -583,6 +583,11 @@ func funcPredictLinear(ev *evaluator, args Expressions) model.Value {
} }
// add predicted delta to last value. // add predicted delta to last value.
// TODO(beorn7): This is arguably suboptimal. The funcDeriv above has
// given us an estimate over the range. So we should add the delta to
// the value predicted for the end of the range. Also, once this has
// been rectified, we are not using BoundaryValues anywhere anymore, so
// we can kick out a whole lot of code.
matrixBounds := ev.evalMatrixBounds(args[0]) matrixBounds := ev.evalMatrixBounds(args[0])
outVec := make(vector, 0, len(signatureToDelta)) outVec := make(vector, 0, len(signatureToDelta))
for _, samples := range matrixBounds { for _, samples := range matrixBounds {

37
promql/testdata/selectors.test vendored Normal file
View File

@ -0,0 +1,37 @@
load 10s
http_requests{job="api-server", instance="0", group="production"} 0+10x1000 100+30x1000
http_requests{job="api-server", instance="1", group="production"} 0+20x1000 200+30x1000
http_requests{job="api-server", instance="0", group="canary"} 0+30x1000 300+80x1000
http_requests{job="api-server", instance="1", group="canary"} 0+40x2000
eval instant at 8000s rate(http_requests[1m])
{job="api-server", instance="0", group="production"} 1
{job="api-server", instance="1", group="production"} 2
{job="api-server", instance="0", group="canary"} 3
{job="api-server", instance="1", group="canary"} 4
eval instant at 18000s rate(http_requests[1m])
{job="api-server", instance="0", group="production"} 3
{job="api-server", instance="1", group="production"} 3
{job="api-server", instance="0", group="canary"} 8
{job="api-server", instance="1", group="canary"} 4
eval instant at 8000s rate(http_requests{group=~"pro.*"}[1m])
{job="api-server", instance="0", group="production"} 1
{job="api-server", instance="1", group="production"} 2
eval instant at 18000s rate(http_requests{group=~".*ry", instance="1"}[1m])
{job="api-server", instance="1", group="canary"} 4
eval instant at 18000s rate(http_requests{instance!="3"}[1m] offset 10000s)
{job="api-server", instance="0", group="production"} 1
{job="api-server", instance="1", group="production"} 2
{job="api-server", instance="0", group="canary"} 3
{job="api-server", instance="1", group="canary"} 4
eval instant at 18000s rate(http_requests[40s]) - rate(http_requests[1m] offset 10000s)
{job="api-server", instance="0", group="production"} 2
{job="api-server", instance="1", group="production"} 1
{job="api-server", instance="0", group="canary"} 5
{job="api-server", instance="1", group="canary"} 0

View File

@ -53,13 +53,46 @@ const (
doubleDelta doubleDelta
) )
// chunkDesc contains meta-data for a chunk. Many of its methods are // chunkDesc contains meta-data for a chunk. Pay special attention to the
// goroutine-safe proxies for chunk methods. // documented requirements for calling its methods concurrently (WRT pinning and
// locking). The doc comments spell out the requirements for each method, but
// here is an overview and general explanation:
//
// Everything that changes the pinning of the underlying chunk or deals with its
// eviction is protected by a mutex. This affects the following methods: pin,
// unpin, refCount, isEvicted, maybeEvict. These methods can be called at any
// time without further prerequisites.
//
// Another group of methods acts on (or sets) the underlying chunk. These
// methods involve no locking. They may only be called if the caller has pinned
// the chunk (to guarantee the chunk is not evicted concurrently). Also, the
// caller must make sure nobody else will call these methods concurrently,
// either by holding the sole reference to the chunkDesc (usually during loading
// or creation) or by locking the fingerprint of the series the chunkDesc
// belongs to. The affected methods are: add, maybePopulateLastTime, setChunk.
//
// Finally, there are the special cases firstTime and lastTime. lastTime requires
// to have locked the fingerprint of the series but the chunk does not need to
// be pinned. That's because the chunkLastTime field in chunkDesc gets populated
// upon completion of the chunk (when it is still pinned, and which happens
// while the series's fingerprint is locked). Once that has happened, calling
// lastTime does not require the chunk to be loaded anymore. Before that has
// happened, the chunk is pinned anyway. The chunkFirstTime field in chunkDesc
// is populated upon creation of a chunkDesc, so it is alway safe to call
// firstTime. The firstTime method is arguably not needed and only there for
// consistency with lastTime.
//
// Yet another (deprecated) case is lastSamplePair. It's used in federation and
// must be callable without pinning. Locking the fingerprint of the series is
// still required (to avoid concurrent appends to the chunk). The call is
// relatively expensive because of the required acquisition of the evict
// mutex. It will go away, though, once tracking the lastSamplePair has been
// moved into the series object.
type chunkDesc struct { type chunkDesc struct {
sync.Mutex // TODO(beorn7): Try out if an RWMutex would help here. sync.Mutex // Protects pinning.
c chunk // nil if chunk is evicted. c chunk // nil if chunk is evicted.
rCnt int rCnt int
chunkFirstTime model.Time // Populated at creation. chunkFirstTime model.Time // Populated at creation. Immutable.
chunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset. chunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset.
// evictListElement is nil if the chunk is not in the evict list. // evictListElement is nil if the chunk is not in the evict list.
@ -83,16 +116,17 @@ func newChunkDesc(c chunk, firstTime model.Time) *chunkDesc {
} }
} }
// add adds a sample pair to the underlying chunk. For safe concurrent access,
// The chunk must be pinned, and the caller must have locked the fingerprint of
// the series.
func (cd *chunkDesc) add(s *model.SamplePair) []chunk { func (cd *chunkDesc) add(s *model.SamplePair) []chunk {
cd.Lock()
defer cd.Unlock()
return cd.c.add(s) return cd.c.add(s)
} }
// pin increments the refCount by one. Upon increment from 0 to 1, this // pin increments the refCount by one. Upon increment from 0 to 1, this
// chunkDesc is removed from the evict list. To enable the latter, the // chunkDesc is removed from the evict list. To enable the latter, the
// evictRequests channel has to be provided. // evictRequests channel has to be provided. This method can be called
// concurrently at any time.
func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) { func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) {
cd.Lock() cd.Lock()
defer cd.Unlock() defer cd.Unlock()
@ -106,7 +140,8 @@ func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) {
// unpin decrements the refCount by one. Upon decrement from 1 to 0, this // unpin decrements the refCount by one. Upon decrement from 1 to 0, this
// chunkDesc is added to the evict list. To enable the latter, the evictRequests // chunkDesc is added to the evict list. To enable the latter, the evictRequests
// channel has to be provided. // channel has to be provided. This method can be called concurrently at any
// time.
func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) { func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) {
cd.Lock() cd.Lock()
defer cd.Unlock() defer cd.Unlock()
@ -121,6 +156,8 @@ func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) {
} }
} }
// refCount returns the number of pins. This method can be called concurrently
// at any time.
func (cd *chunkDesc) refCount() int { func (cd *chunkDesc) refCount() int {
cd.Lock() cd.Lock()
defer cd.Unlock() defer cd.Unlock()
@ -128,30 +165,39 @@ func (cd *chunkDesc) refCount() int {
return cd.rCnt return cd.rCnt
} }
// firstTime returns the timestamp of the first sample in the chunk. This method
// can be called concurrently at any time. It only returns the immutable
// cd.chunkFirstTime without any locking. Arguably, this method is
// useless. However, it provides consistency with the lastTime method.
func (cd *chunkDesc) firstTime() model.Time { func (cd *chunkDesc) firstTime() model.Time {
// No lock required, will never be modified.
return cd.chunkFirstTime return cd.chunkFirstTime
} }
// lastTime returns the timestamp of the last sample in the chunk. For safe
// concurrent access, this method requires the fingerprint of the time series to
// be locked.
func (cd *chunkDesc) lastTime() model.Time { func (cd *chunkDesc) lastTime() model.Time {
cd.Lock()
defer cd.Unlock()
if cd.chunkLastTime != model.Earliest || cd.c == nil { if cd.chunkLastTime != model.Earliest || cd.c == nil {
return cd.chunkLastTime return cd.chunkLastTime
} }
return cd.c.newIterator().lastTimestamp() return cd.c.newIterator().lastTimestamp()
} }
// maybePopulateLastTime populates the chunkLastTime from the underlying chunk
// if it has not yet happened. Call this method directly after having added the
// last sample to a chunk or after closing a head chunk due to age. For safe
// concurrent access, the chunk must be pinned, and the caller must have locked
// the fingerprint of the series.
func (cd *chunkDesc) maybePopulateLastTime() { func (cd *chunkDesc) maybePopulateLastTime() {
cd.Lock()
defer cd.Unlock()
if cd.chunkLastTime == model.Earliest && cd.c != nil { if cd.chunkLastTime == model.Earliest && cd.c != nil {
cd.chunkLastTime = cd.c.newIterator().lastTimestamp() cd.chunkLastTime = cd.c.newIterator().lastTimestamp()
} }
} }
// lastSamplePair returns the last sample pair of the underlying chunk, or nil
// if the chunk is evicted. For safe concurrent access, this method requires the
// fingerprint of the time series to be locked.
// TODO(beorn7): Move up into memorySeries.
func (cd *chunkDesc) lastSamplePair() *model.SamplePair { func (cd *chunkDesc) lastSamplePair() *model.SamplePair {
cd.Lock() cd.Lock()
defer cd.Unlock() defer cd.Unlock()
@ -166,28 +212,22 @@ func (cd *chunkDesc) lastSamplePair() *model.SamplePair {
} }
} }
// isEvicted returns whether the chunk is evicted. For safe concurrent access,
// the caller must have locked the fingerprint of the series.
func (cd *chunkDesc) isEvicted() bool { func (cd *chunkDesc) isEvicted() bool {
// Locking required here because we do not want the caller to force
// pinning the chunk first, so it could be evicted while this method is
// called.
cd.Lock() cd.Lock()
defer cd.Unlock() defer cd.Unlock()
return cd.c == nil return cd.c == nil
} }
func (cd *chunkDesc) contains(t model.Time) bool { // setChunk sets the underlying chunk. The caller must have locked the
return !t.Before(cd.firstTime()) && !t.After(cd.lastTime()) // fingerprint of the series and must have "pre-pinned" the chunk (i.e. first
} // call pin and then set the chunk).
func (cd *chunkDesc) chunk() chunk {
cd.Lock()
defer cd.Unlock()
return cd.c
}
func (cd *chunkDesc) setChunk(c chunk) { func (cd *chunkDesc) setChunk(c chunk) {
cd.Lock()
defer cd.Unlock()
if cd.c != nil { if cd.c != nil {
panic("chunk already set") panic("chunk already set")
} }
@ -196,7 +236,7 @@ func (cd *chunkDesc) setChunk(c chunk) {
// maybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk // maybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk
// is now evicted, which includes the case that the chunk was evicted even // is now evicted, which includes the case that the chunk was evicted even
// before this method was called. // before this method was called. It can be called concurrently at any time.
func (cd *chunkDesc) maybeEvict() bool { func (cd *chunkDesc) maybeEvict() bool {
cd.Lock() cd.Lock()
defer cd.Unlock() defer cd.Unlock()
@ -207,9 +247,9 @@ func (cd *chunkDesc) maybeEvict() bool {
if cd.rCnt != 0 { if cd.rCnt != 0 {
return false return false
} }
// Last opportunity to populate chunkLastTime.
if cd.chunkLastTime == model.Earliest { if cd.chunkLastTime == model.Earliest {
cd.chunkLastTime = cd.c.newIterator().lastTimestamp() // This must never happen.
panic("chunkLastTime not populated for evicted chunk")
} }
cd.c = nil cd.c = nil
chunkOps.WithLabelValues(evict).Inc() chunkOps.WithLabelValues(evict).Inc()
@ -251,12 +291,11 @@ type chunkIterator interface {
sampleValueAtIndex(int) model.SampleValue sampleValueAtIndex(int) model.SampleValue
// Gets the last sample value in the chunk. // Gets the last sample value in the chunk.
lastSampleValue() model.SampleValue lastSampleValue() model.SampleValue
// Gets the two values that are immediately adjacent to a given time. In // Gets the value that is closest before the given time. In case a value
// case a value exist at precisely the given time, only that single // exists at precisely the given time, that value is returned. If no
// value is returned. Only the first or last value is returned (as a // applicable value exists, a SamplePair with timestamp model.Earliest
// single value), if the given time is before or after the first or last // and value 0.0 is returned.
// value, respectively. valueAtOrBeforeTime(model.Time) model.SamplePair
valueAtTime(model.Time) []model.SamplePair
// Gets all values contained within a given interval. // Gets all values contained within a given interval.
rangeValues(metric.Interval) []model.SamplePair rangeValues(metric.Interval) []model.SamplePair
// Whether a given timestamp is contained between first and last value // Whether a given timestamp is contained between first and last value

View File

@ -301,41 +301,17 @@ type deltaEncodedChunkIterator struct {
// length implements chunkIterator. // length implements chunkIterator.
func (it *deltaEncodedChunkIterator) length() int { return it.len } func (it *deltaEncodedChunkIterator) length() int { return it.len }
// valueAtTime implements chunkIterator. // valueAtOrBeforeTime implements chunkIterator.
func (it *deltaEncodedChunkIterator) valueAtTime(t model.Time) []model.SamplePair { func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair {
i := sort.Search(it.len, func(i int) bool { i := sort.Search(it.len, func(i int) bool {
return !it.timestampAtIndex(i).Before(t) return it.timestampAtIndex(i).After(t)
}) })
if i == 0 {
switch i { return model.SamplePair{Timestamp: model.Earliest}
case 0:
return []model.SamplePair{{
Timestamp: it.timestampAtIndex(0),
Value: it.sampleValueAtIndex(0),
}}
case it.len:
return []model.SamplePair{{
Timestamp: it.timestampAtIndex(it.len - 1),
Value: it.sampleValueAtIndex(it.len - 1),
}}
default:
ts := it.timestampAtIndex(i)
if ts.Equal(t) {
return []model.SamplePair{{
Timestamp: ts,
Value: it.sampleValueAtIndex(i),
}}
} }
return []model.SamplePair{ return model.SamplePair{
{
Timestamp: it.timestampAtIndex(i - 1), Timestamp: it.timestampAtIndex(i - 1),
Value: it.sampleValueAtIndex(i - 1), Value: it.sampleValueAtIndex(i - 1),
},
{
Timestamp: ts,
Value: it.sampleValueAtIndex(i),
},
}
} }
} }

View File

@ -407,41 +407,17 @@ type doubleDeltaEncodedChunkIterator struct {
// length implements chunkIterator. // length implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len } func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len }
// valueAtTime implements chunkIterator. // valueAtOrBeforeTime implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) valueAtTime(t model.Time) []model.SamplePair { func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair {
i := sort.Search(it.len, func(i int) bool { i := sort.Search(it.len, func(i int) bool {
return !it.timestampAtIndex(i).Before(t) return it.timestampAtIndex(i).After(t)
}) })
if i == 0 {
switch i { return model.SamplePair{Timestamp: model.Earliest}
case 0:
return []model.SamplePair{{
Timestamp: it.timestampAtIndex(0),
Value: it.sampleValueAtIndex(0),
}}
case it.len:
return []model.SamplePair{{
Timestamp: it.timestampAtIndex(it.len - 1),
Value: it.sampleValueAtIndex(it.len - 1),
}}
default:
ts := it.timestampAtIndex(i)
if ts.Equal(t) {
return []model.SamplePair{{
Timestamp: ts,
Value: it.sampleValueAtIndex(i),
}}
} }
return []model.SamplePair{ return model.SamplePair{
{
Timestamp: it.timestampAtIndex(i - 1), Timestamp: it.timestampAtIndex(i - 1),
Value: it.sampleValueAtIndex(i - 1), Value: it.sampleValueAtIndex(i - 1),
},
{
Timestamp: ts,
Value: it.sampleValueAtIndex(i),
},
}
} }
} }

View File

@ -14,8 +14,6 @@
package local package local
import ( import (
"time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
@ -52,10 +50,6 @@ type Storage interface {
LabelValuesForLabelName(model.LabelName) model.LabelValues LabelValuesForLabelName(model.LabelName) model.LabelValues
// Get the metric associated with the provided fingerprint. // Get the metric associated with the provided fingerprint.
MetricForFingerprint(model.Fingerprint) metric.Metric MetricForFingerprint(model.Fingerprint) metric.Metric
// Construct an iterator for a given fingerprint.
// The iterator will never return samples older than retention time,
// relative to the time NewIterator was called.
NewIterator(model.Fingerprint) SeriesIterator
// Drop all time series associated with the given fingerprints. // Drop all time series associated with the given fingerprints.
DropMetricsForFingerprints(...model.Fingerprint) DropMetricsForFingerprints(...model.Fingerprint)
// Run the various maintenance loops in goroutines. Returns when the // Run the various maintenance loops in goroutines. Returns when the
@ -77,12 +71,11 @@ type Storage interface {
// modifying the corresponding series, but the iterator will represent the state // modifying the corresponding series, but the iterator will represent the state
// of the series prior the modification. // of the series prior the modification.
type SeriesIterator interface { type SeriesIterator interface {
// Gets the two values that are immediately adjacent to a given time. In // Gets the value that is closest before the given time. In case a value
// case a value exist at precisely the given time, only that single // exists at precisely the given time, that value is returned. If no
// value is returned. Only the first or last value is returned (as a // applicable value exists, a SamplePair with timestamp model.Earliest
// single value), if the given time is before or after the first or last // and value 0.0 is returned.
// value, respectively. ValueAtOrBeforeTime(model.Time) model.SamplePair
ValueAtTime(model.Time) []model.SamplePair
// Gets the boundary values of an interval: the first and last value // Gets the boundary values of an interval: the first and last value
// within a given interval. // within a given interval.
BoundaryValues(metric.Interval) []model.SamplePair BoundaryValues(metric.Interval) []model.SamplePair
@ -90,15 +83,14 @@ type SeriesIterator interface {
RangeValues(metric.Interval) []model.SamplePair RangeValues(metric.Interval) []model.SamplePair
} }
// A Preloader preloads series data necessary for a query into memory and pins // A Preloader preloads series data necessary for a query into memory, pins it
// them until released via Close(). Its methods are generally not // until released via Close(), and returns an iterator for the pinned data. Its
// goroutine-safe. // methods are generally not goroutine-safe.
type Preloader interface { type Preloader interface {
PreloadRange( PreloadRange(
fp model.Fingerprint, fp model.Fingerprint,
from model.Time, through model.Time, from model.Time, through model.Time,
stalenessDelta time.Duration, ) (SeriesIterator, error)
) error
// Close unpins any previously requested series data from memory. // Close unpins any previously requested series data from memory.
Close() Close()
} }

View File

@ -823,6 +823,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
} }
} }
headChunkClosed := true // Initial assumption.
for i := int64(0); i < numChunkDescs; i++ { for i := int64(0); i < numChunkDescs; i++ {
if i < persistWatermark { if i < persistWatermark {
firstTime, err := binary.ReadVarint(r) firstTime, err := binary.ReadVarint(r)
@ -844,6 +845,9 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
chunkDescsTotal++ chunkDescsTotal++
} else { } else {
// Non-persisted chunk. // Non-persisted chunk.
// If there are non-persisted chunks at all, we consider
// the head chunk not to be closed yet.
headChunkClosed = false
encoding, err := r.ReadByte() encoding, err := r.ReadByte()
if err != nil { if err != nil {
log.Warn("Could not decode chunk type:", err) log.Warn("Could not decode chunk type:", err)
@ -856,15 +860,15 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
p.dirty = true p.dirty = true
return sm, chunksToPersist, nil return sm, chunksToPersist, nil
} }
chunkDescs[i] = newChunkDesc(chunk, chunk.firstTime()) cd := newChunkDesc(chunk, chunk.firstTime())
if i < numChunkDescs-1 {
// This is NOT the head chunk. So it's a chunk
// to be persisted, and we need to populate lastTime.
chunksToPersist++ chunksToPersist++
cd.maybePopulateLastTime()
} }
chunkDescs[i] = cd
} }
headChunkClosed := persistWatermark >= numChunkDescs
if !headChunkClosed {
// Head chunk is not ready for persisting yet.
chunksToPersist--
} }
fingerprintToSeries[model.Fingerprint(fp)] = &memorySeries{ fingerprintToSeries[model.Fingerprint(fp)] = &memorySeries{

View File

@ -485,6 +485,12 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
if loadedS1.headChunkClosed { if loadedS1.headChunkClosed {
t.Error("headChunkClosed is true") t.Error("headChunkClosed is true")
} }
if loadedS1.head().chunkFirstTime != 1 {
t.Errorf("want chunkFirstTime in head chunk to be 1, got %d", loadedS1.head().chunkFirstTime)
}
if loadedS1.head().chunkLastTime != model.Earliest {
t.Error("want chunkLastTime in head chunk to be unset")
}
} else { } else {
t.Errorf("couldn't find %v in loaded map", m1) t.Errorf("couldn't find %v in loaded map", m1)
} }
@ -501,6 +507,12 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
if !loadedS3.headChunkClosed { if !loadedS3.headChunkClosed {
t.Error("headChunkClosed is false") t.Error("headChunkClosed is false")
} }
if loadedS3.head().chunkFirstTime != 2 {
t.Errorf("want chunkFirstTime in head chunk to be 2, got %d", loadedS3.head().chunkFirstTime)
}
if loadedS3.head().chunkLastTime != 2 {
t.Errorf("want chunkLastTime in head chunk to be 2, got %d", loadedS3.head().chunkLastTime)
}
} else { } else {
t.Errorf("couldn't find %v in loaded map", m3) t.Errorf("couldn't find %v in loaded map", m3)
} }
@ -526,6 +538,27 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
if loadedS4.headChunkClosed { if loadedS4.headChunkClosed {
t.Error("headChunkClosed is true") t.Error("headChunkClosed is true")
} }
for i, cd := range loadedS4.chunkDescs {
if cd.chunkFirstTime != cd.c.firstTime() {
t.Errorf(
"chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d",
i, cd.c.firstTime(), cd.chunkFirstTime,
)
}
if i == len(loadedS4.chunkDescs)-1 {
// Head chunk.
if cd.chunkLastTime != model.Earliest {
t.Error("want chunkLastTime in head chunk to be unset")
}
continue
}
if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() {
t.Errorf(
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d",
i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime,
)
}
}
} else { } else {
t.Errorf("couldn't find %v in loaded map", m4) t.Errorf("couldn't find %v in loaded map", m4)
} }
@ -551,6 +584,34 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
if loadedS5.headChunkClosed { if loadedS5.headChunkClosed {
t.Error("headChunkClosed is true") t.Error("headChunkClosed is true")
} }
for i, cd := range loadedS5.chunkDescs {
if i < 3 {
// Evicted chunks.
if cd.chunkFirstTime == model.Earliest {
t.Errorf("chunkDesc[%d]: chunkLastTime not set", i)
}
continue
}
if cd.chunkFirstTime != cd.c.firstTime() {
t.Errorf(
"chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d",
i, cd.c.firstTime(), cd.chunkFirstTime,
)
}
if i == len(loadedS5.chunkDescs)-1 {
// Head chunk.
if cd.chunkLastTime != model.Earliest {
t.Error("want chunkLastTime in head chunk to be unset")
}
continue
}
if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() {
t.Errorf(
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d",
i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime,
)
}
}
} else { } else {
t.Errorf("couldn't find %v in loaded map", m5) t.Errorf("couldn't find %v in loaded map", m5)
} }

View File

@ -13,11 +13,7 @@
package local package local
import ( import "github.com/prometheus/common/model"
"time"
"github.com/prometheus/common/model"
)
// memorySeriesPreloader is a Preloader for the memorySeriesStorage. // memorySeriesPreloader is a Preloader for the memorySeriesStorage.
type memorySeriesPreloader struct { type memorySeriesPreloader struct {
@ -29,74 +25,15 @@ type memorySeriesPreloader struct {
func (p *memorySeriesPreloader) PreloadRange( func (p *memorySeriesPreloader) PreloadRange(
fp model.Fingerprint, fp model.Fingerprint,
from model.Time, through model.Time, from model.Time, through model.Time,
stalenessDelta time.Duration, ) (SeriesIterator, error) {
) error { cds, iter, err := p.storage.preloadChunksForRange(fp, from, through)
cds, err := p.storage.preloadChunksForRange(fp, from, through, stalenessDelta)
if err != nil { if err != nil {
return err return nil, err
} }
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
return nil return iter, nil
} }
/*
// MetricAtTime implements Preloader.
func (p *memorySeriesPreloader) MetricAtTime(fp model.Fingerprint, t model.Time) error {
cds, err := p.storage.preloadChunks(fp, &timeSelector{
from: t,
through: t,
})
if err != nil {
return err
}
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
return nil
}
// MetricAtInterval implements Preloader.
func (p *memorySeriesPreloader) MetricAtInterval(fp model.Fingerprint, from, through model.Time, interval time.Duration) error {
cds, err := p.storage.preloadChunks(fp, &timeSelector{
from: from,
through: through,
interval: interval,
})
if err != nil {
return err
}
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
return
}
// MetricRange implements Preloader.
func (p *memorySeriesPreloader) MetricRange(fp model.Fingerprint, t model.Time, rangeDuration time.Duration) error {
cds, err := p.storage.preloadChunks(fp, &timeSelector{
from: t,
through: t,
rangeDuration: through.Sub(from),
})
if err != nil {
return err
}
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
return
}
// MetricRangeAtInterval implements Preloader.
func (p *memorySeriesPreloader) MetricRangeAtInterval(fp model.Fingerprint, from, through model.Time, interval, rangeDuration time.Duration) error {
cds, err := p.storage.preloadChunks(fp, &timeSelector{
from: from,
through: through,
interval: interval,
rangeDuration: rangeDuration,
})
if err != nil {
return err
}
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
return
}
*/
// Close implements Preloader. // Close implements Preloader.
func (p *memorySeriesPreloader) Close() { func (p *memorySeriesPreloader) Close() {
for _, cd := range p.pinnedChunkDescs { for _, cd := range p.pinnedChunkDescs {

View File

@ -315,7 +315,7 @@ func (s *memorySeries) dropChunks(t model.Time) {
// preloadChunks is an internal helper method. // preloadChunks is an internal helper method.
func (s *memorySeries) preloadChunks( func (s *memorySeries) preloadChunks(
indexes []int, fp model.Fingerprint, mss *memorySeriesStorage, indexes []int, fp model.Fingerprint, mss *memorySeriesStorage,
) ([]*chunkDesc, error) { ) ([]*chunkDesc, SeriesIterator, error) {
loadIndexes := []int{} loadIndexes := []int{}
pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes)) pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes))
for _, idx := range indexes { for _, idx := range indexes {
@ -339,52 +339,47 @@ func (s *memorySeries) preloadChunks(
cd.unpin(mss.evictRequests) cd.unpin(mss.evictRequests)
} }
chunkOps.WithLabelValues(unpin).Add(float64(len(pinnedChunkDescs))) chunkOps.WithLabelValues(unpin).Add(float64(len(pinnedChunkDescs)))
return nil, err return nil, nopIter, err
} }
for i, c := range chunks { for i, c := range chunks {
s.chunkDescs[loadIndexes[i]].setChunk(c) s.chunkDescs[loadIndexes[i]].setChunk(c)
} }
} }
return pinnedChunkDescs, nil
if !s.headChunkClosed && indexes[len(indexes)-1] == len(s.chunkDescs)-1 {
s.headChunkUsedByIterator = true
} }
/* iter := &boundedIterator{
func (s *memorySeries) preloadChunksAtTime(t model.Time, p *persistence) (chunkDescs, error) { it: s.newIterator(pinnedChunkDescs),
s.mtx.Lock() start: model.Now().Add(-mss.dropAfter),
defer s.mtx.Unlock()
if len(s.chunkDescs) == 0 {
return nil, nil
} }
var pinIndexes []int return pinnedChunkDescs, iter, nil
// Find first chunk where lastTime() is after or equal to t.
i := sort.Search(len(s.chunkDescs), func(i int) bool {
return !s.chunkDescs[i].lastTime().Before(t)
})
switch i {
case 0:
pinIndexes = []int{0}
case len(s.chunkDescs):
pinIndexes = []int{i - 1}
default:
if s.chunkDescs[i].contains(t) {
pinIndexes = []int{i}
} else {
pinIndexes = []int{i - 1, i}
}
} }
return s.preloadChunks(pinIndexes, p) // newIterator returns a new SeriesIterator for the provided chunkDescs (which
// must be pinned). The caller must have locked the fingerprint of the
// memorySeries.
func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator {
chunks := make([]chunk, 0, len(pinnedChunkDescs))
for _, cd := range pinnedChunkDescs {
// It's OK to directly access cd.c here (without locking) as the
// series FP is locked and the chunk is pinned.
chunks = append(chunks, cd.c)
}
return &memorySeriesIterator{
chunks: chunks,
chunkIts: make([]chunkIterator, len(chunks)),
}
} }
*/
// preloadChunksForRange loads chunks for the given range from the persistence. // preloadChunksForRange loads chunks for the given range from the persistence.
// The caller must have locked the fingerprint of the series. // The caller must have locked the fingerprint of the series.
func (s *memorySeries) preloadChunksForRange( func (s *memorySeries) preloadChunksForRange(
from model.Time, through model.Time, from model.Time, through model.Time,
fp model.Fingerprint, mss *memorySeriesStorage, fp model.Fingerprint, mss *memorySeriesStorage,
) ([]*chunkDesc, error) { ) ([]*chunkDesc, SeriesIterator, error) {
firstChunkDescTime := model.Latest firstChunkDescTime := model.Latest
if len(s.chunkDescs) > 0 { if len(s.chunkDescs) > 0 {
firstChunkDescTime = s.chunkDescs[0].firstTime() firstChunkDescTime = s.chunkDescs[0].firstTime()
@ -392,15 +387,16 @@ func (s *memorySeries) preloadChunksForRange(
if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) { if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) {
cds, err := mss.loadChunkDescs(fp, s.persistWatermark) cds, err := mss.loadChunkDescs(fp, s.persistWatermark)
if err != nil { if err != nil {
return nil, err return nil, nopIter, err
} }
s.chunkDescs = append(cds, s.chunkDescs...) s.chunkDescs = append(cds, s.chunkDescs...)
s.chunkDescsOffset = 0 s.chunkDescsOffset = 0
s.persistWatermark += len(cds) s.persistWatermark += len(cds)
firstChunkDescTime = s.chunkDescs[0].firstTime()
} }
if len(s.chunkDescs) == 0 { if len(s.chunkDescs) == 0 || through.Before(firstChunkDescTime) {
return nil, nil return nil, nopIter, nil
} }
// Find first chunk with start time after "from". // Find first chunk with start time after "from".
@ -411,6 +407,13 @@ func (s *memorySeries) preloadChunksForRange(
throughIdx := sort.Search(len(s.chunkDescs), func(i int) bool { throughIdx := sort.Search(len(s.chunkDescs), func(i int) bool {
return s.chunkDescs[i].firstTime().After(through) return s.chunkDescs[i].firstTime().After(through)
}) })
if fromIdx == len(s.chunkDescs) {
// Even the last chunk starts before "from". Find out if the
// series ends before "from" and we don't need to do anything.
if s.chunkDescs[len(s.chunkDescs)-1].lastTime().Before(from) {
return nil, nopIter, nil
}
}
if fromIdx > 0 { if fromIdx > 0 {
fromIdx-- fromIdx--
} }
@ -425,25 +428,6 @@ func (s *memorySeries) preloadChunksForRange(
return s.preloadChunks(pinIndexes, fp, mss) return s.preloadChunks(pinIndexes, fp, mss)
} }
// newIterator returns a new SeriesIterator. The caller must have locked the
// fingerprint of the memorySeries.
func (s *memorySeries) newIterator() SeriesIterator {
chunks := make([]chunk, 0, len(s.chunkDescs))
for i, cd := range s.chunkDescs {
if chunk := cd.chunk(); chunk != nil {
if i == len(s.chunkDescs)-1 && !s.headChunkClosed {
s.headChunkUsedByIterator = true
}
chunks = append(chunks, chunk)
}
}
return &memorySeriesIterator{
chunks: chunks,
chunkIts: make([]chunkIterator, len(chunks)),
}
}
// head returns a pointer to the head chunk descriptor. The caller must have // head returns a pointer to the head chunk descriptor. The caller must have
// locked the fingerprint of the memorySeries. This method will panic if this // locked the fingerprint of the memorySeries. This method will panic if this
// series has no chunk descriptors. // series has no chunk descriptors.
@ -482,70 +466,33 @@ func (s *memorySeries) chunksToPersist() []*chunkDesc {
// memorySeriesIterator implements SeriesIterator. // memorySeriesIterator implements SeriesIterator.
type memorySeriesIterator struct { type memorySeriesIterator struct {
chunkIt chunkIterator // Last chunkIterator used by ValueAtTime. chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime.
chunkIts []chunkIterator // Caches chunkIterators. chunkIts []chunkIterator // Caches chunkIterators.
chunks []chunk chunks []chunk
} }
// ValueAtTime implements SeriesIterator. // ValueAtOrBeforeTime implements SeriesIterator.
func (it *memorySeriesIterator) ValueAtTime(t model.Time) []model.SamplePair { func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
// The most common case. We are iterating through a chunk. // The most common case. We are iterating through a chunk.
if it.chunkIt != nil && it.chunkIt.contains(t) { if it.chunkIt != nil && it.chunkIt.contains(t) {
return it.chunkIt.valueAtTime(t) return it.chunkIt.valueAtOrBeforeTime(t)
} }
if len(it.chunks) == 0 { if len(it.chunks) == 0 {
return nil return model.SamplePair{Timestamp: model.Earliest}
} }
// Before or exactly on the first sample of the series. // Find the last chunk where firstTime() is before or equal to t.
it.chunkIt = it.chunkIterator(0)
ts := it.chunkIt.timestampAtIndex(0)
if !t.After(ts) {
// return first value of first chunk
return []model.SamplePair{{
Timestamp: ts,
Value: it.chunkIt.sampleValueAtIndex(0),
}}
}
// After or exactly on the last sample of the series.
it.chunkIt = it.chunkIterator(len(it.chunks) - 1)
ts = it.chunkIt.lastTimestamp()
if !t.Before(ts) {
// return last value of last chunk
return []model.SamplePair{{
Timestamp: ts,
Value: it.chunkIt.sampleValueAtIndex(it.chunkIt.length() - 1),
}}
}
// Find last chunk where firstTime() is before or equal to t.
l := len(it.chunks) - 1 l := len(it.chunks) - 1
i := sort.Search(len(it.chunks), func(i int) bool { i := sort.Search(len(it.chunks), func(i int) bool {
return !it.chunks[l-i].firstTime().After(t) return !it.chunks[l-i].firstTime().After(t)
}) })
if i == len(it.chunks) { if i == len(it.chunks) {
panic("out of bounds") // Even the first chunk starts after t.
return model.SamplePair{Timestamp: model.Earliest}
} }
it.chunkIt = it.chunkIterator(l - i) it.chunkIt = it.chunkIterator(l - i)
ts = it.chunkIt.lastTimestamp() return it.chunkIt.valueAtOrBeforeTime(t)
if t.After(ts) {
// We ended up between two chunks.
sp1 := model.SamplePair{
Timestamp: ts,
Value: it.chunkIt.sampleValueAtIndex(it.chunkIt.length() - 1),
}
it.chunkIt = it.chunkIterator(l - i + 1)
return []model.SamplePair{
sp1,
{
Timestamp: it.chunkIt.timestampAtIndex(0),
Value: it.chunkIt.sampleValueAtIndex(0),
},
}
}
return it.chunkIt.valueAtTime(t)
} }
// BoundaryValues implements SeriesIterator. // BoundaryValues implements SeriesIterator.
@ -578,18 +525,24 @@ func (it *memorySeriesIterator) BoundaryValues(in metric.Interval) []model.Sampl
} }
chunkIt := it.chunkIterator(i + j) chunkIt := it.chunkIterator(i + j)
if len(values) == 0 { if len(values) == 0 {
firstValues := chunkIt.valueAtTime(in.OldestInclusive) for s := range chunkIt.values() {
switch len(firstValues) { if len(values) == 0 && !s.Timestamp.Before(in.OldestInclusive) {
case 2: values = append(values, *s)
values = append(values, firstValues[1]) // We cannot just break out here as we have to consume all
case 1: // the values to not leak a goroutine. This could obviously
values = firstValues // be made much neater with more suitable methods in the chunk
default: // interface. But currently, BoundaryValues is only used by
panic("unexpected return from valueAtTime") // `predict_linear` so we would pollute the chunk interface
// unduly just for one single corner case. Plus, even that use
// of BoundaryValues is suboptimal and should be replaced.
}
} }
} }
if chunkIt.lastTimestamp().After(in.NewestInclusive) { if chunkIt.lastTimestamp().After(in.NewestInclusive) {
values = append(values, chunkIt.valueAtTime(in.NewestInclusive)[0]) s := chunkIt.valueAtOrBeforeTime(in.NewestInclusive)
if s.Timestamp != model.Earliest {
values = append(values, s)
}
break break
} }
} }
@ -644,8 +597,8 @@ func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator {
type nopSeriesIterator struct{} type nopSeriesIterator struct{}
// ValueAtTime implements SeriesIterator. // ValueAtTime implements SeriesIterator.
func (i nopSeriesIterator) ValueAtTime(t model.Time) []model.SamplePair { func (i nopSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
return []model.SamplePair{} return model.SamplePair{Timestamp: model.Earliest}
} }
// BoundaryValues implements SeriesIterator. // BoundaryValues implements SeriesIterator.
@ -657,3 +610,5 @@ func (i nopSeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair
func (i nopSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair { func (i nopSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair {
return []model.SamplePair{} return []model.SamplePair{}
} }
var nopIter nopSeriesIterator // A nopSeriesIterator for convenience. Can be shared.

View File

@ -347,26 +347,6 @@ func (s *memorySeriesStorage) WaitForIndexing() {
s.persistence.waitForIndexing() s.persistence.waitForIndexing()
} }
// NewIterator implements Storage.
func (s *memorySeriesStorage) NewIterator(fp model.Fingerprint) SeriesIterator {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fpToSeries.get(fp)
if !ok {
// Oops, no series for fp found. That happens if, after
// preloading is done, the whole series is identified as old
// enough for purging and hence purged for good. As there is no
// data left to iterate over, return an iterator that will never
// return any values.
return nopSeriesIterator{}
}
return &boundedIterator{
it: series.newIterator(),
start: model.Now().Add(-s.dropAfter),
}
}
// LastSampleForFingerprint implements Storage. // LastSampleForFingerprint implements Storage.
func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint) *model.SamplePair { func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint) *model.SamplePair {
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
@ -386,12 +366,12 @@ type boundedIterator struct {
start model.Time start model.Time
} }
// ValueAtTime implements the SeriesIterator interface. // ValueAtOrBeforeTime implements the SeriesIterator interface.
func (bit *boundedIterator) ValueAtTime(ts model.Time) []model.SamplePair { func (bit *boundedIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair {
if ts < bit.start { if ts < bit.start {
return []model.SamplePair{} return model.SamplePair{Timestamp: model.Earliest}
} }
return bit.it.ValueAtTime(ts) return bit.it.ValueAtOrBeforeTime(ts)
} }
// BoundaryValues implements the SeriesIterator interface. // BoundaryValues implements the SeriesIterator interface.
@ -571,6 +551,8 @@ func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprin
} }
} }
// ErrOutOfOrderSample is returned if a sample has a timestamp before the latest
// timestamp in the series it is appended to.
var ErrOutOfOrderSample = fmt.Errorf("sample timestamp out of order") var ErrOutOfOrderSample = fmt.Errorf("sample timestamp out of order")
// Append implements Storage. // Append implements Storage.
@ -707,8 +689,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
func (s *memorySeriesStorage) preloadChunksForRange( func (s *memorySeriesStorage) preloadChunksForRange(
fp model.Fingerprint, fp model.Fingerprint,
from model.Time, through model.Time, from model.Time, through model.Time,
stalenessDelta time.Duration, ) ([]*chunkDesc, SeriesIterator, error) {
) ([]*chunkDesc, error) {
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp) defer s.fpLocker.Unlock(fp)
@ -716,20 +697,20 @@ func (s *memorySeriesStorage) preloadChunksForRange(
if !ok { if !ok {
has, first, last, err := s.persistence.hasArchivedMetric(fp) has, first, last, err := s.persistence.hasArchivedMetric(fp)
if err != nil { if err != nil {
return nil, err return nil, nopIter, err
} }
if !has { if !has {
s.invalidPreloadRequestsCount.Inc() s.invalidPreloadRequestsCount.Inc()
return nil, nil return nil, nopIter, nil
} }
if from.Add(-stalenessDelta).Before(last) && through.Add(stalenessDelta).After(first) { if from.Before(last) && through.After(first) {
metric, err := s.persistence.archivedMetric(fp) metric, err := s.persistence.archivedMetric(fp)
if err != nil { if err != nil {
return nil, err return nil, nopIter, err
} }
series = s.getOrCreateSeries(fp, metric) series = s.getOrCreateSeries(fp, metric)
} else { } else {
return nil, nil return nil, nopIter, nil
} }
} }
return series.preloadChunksForRange(from, through, fp, s) return series.preloadChunksForRange(from, through, fp, s)

View File

@ -405,19 +405,17 @@ func TestRetentionCutoff(t *testing.T) {
defer pl.Close() defer pl.Close()
// Preload everything. // Preload everything.
err := pl.PreloadRange(fp, insertStart, now, 5*time.Minute) it, err := pl.PreloadRange(fp, insertStart, now)
if err != nil { if err != nil {
t.Fatalf("Error preloading outdated chunks: %s", err) t.Fatalf("Error preloading outdated chunks: %s", err)
} }
it := s.NewIterator(fp) val := it.ValueAtOrBeforeTime(now.Add(-61 * time.Minute))
if val.Timestamp != model.Earliest {
vals := it.ValueAtTime(now.Add(-61 * time.Minute))
if len(vals) != 0 {
t.Errorf("unexpected result for timestamp before retention period") t.Errorf("unexpected result for timestamp before retention period")
} }
vals = it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}) vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now})
// We get 59 values here because the model.Now() is slightly later // We get 59 values here because the model.Now() is slightly later
// than our now. // than our now.
if len(vals) != 59 { if len(vals) != 59 {
@ -502,11 +500,18 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps2)) t.Errorf("unexpected number of fingerprints: %d", len(fps2))
} }
it := s.NewIterator(fpList[0]) _, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
it = s.NewIterator(fpList[1])
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N {
t.Errorf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
@ -528,11 +533,18 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps3)) t.Errorf("unexpected number of fingerprints: %d", len(fps3))
} }
it = s.NewIterator(fpList[0]) _, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
it = s.NewIterator(fpList[1])
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
@ -640,7 +652,7 @@ func TestChunkType1(t *testing.T) {
testChunk(t, 1) testChunk(t, 1)
} }
func testValueAtTime(t *testing.T, encoding chunkEncoding) { func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) {
samples := make(model.Samples, 10000) samples := make(model.Samples, 10000)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -658,82 +670,66 @@ func testValueAtTime(t *testing.T, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint() fp := model.Metric{}.FastFingerprint()
it := s.NewIterator(fp) _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
// #1 Exactly on a sample. // #1 Exactly on a sample.
for i, expected := range samples { for i, expected := range samples {
actual := it.ValueAtTime(expected.Timestamp) actual := it.ValueAtOrBeforeTime(expected.Timestamp)
if len(actual) != 1 { if expected.Timestamp != actual.Timestamp {
t.Fatalf("1.%d. Expected exactly one result, got %d.", i, len(actual)) t.Errorf("1.%d. Got %v; want %v", i, actual.Timestamp, expected.Timestamp)
} }
if expected.Timestamp != actual[0].Timestamp { if expected.Value != actual.Value {
t.Errorf("1.%d. Got %v; want %v", i, actual[0].Timestamp, expected.Timestamp) t.Errorf("1.%d. Got %v; want %v", i, actual.Value, expected.Value)
}
if expected.Value != actual[0].Value {
t.Errorf("1.%d. Got %v; want %v", i, actual[0].Value, expected.Value)
} }
} }
// #2 Between samples. // #2 Between samples.
for i, expected1 := range samples { for i, expected := range samples {
if i == len(samples)-1 { if i == len(samples)-1 {
continue continue
} }
expected2 := samples[i+1] actual := it.ValueAtOrBeforeTime(expected.Timestamp + 1)
actual := it.ValueAtTime(expected1.Timestamp + 1)
if len(actual) != 2 { if expected.Timestamp != actual.Timestamp {
t.Fatalf("2.%d. Expected exactly 2 results, got %d.", i, len(actual)) t.Errorf("2.%d. Got %v; want %v", i, actual.Timestamp, expected.Timestamp)
} }
if expected1.Timestamp != actual[0].Timestamp { if expected.Value != actual.Value {
t.Errorf("2.%d. Got %v; want %v", i, actual[0].Timestamp, expected1.Timestamp) t.Errorf("2.%d. Got %v; want %v", i, actual.Value, expected.Value)
}
if expected1.Value != actual[0].Value {
t.Errorf("2.%d. Got %v; want %v", i, actual[0].Value, expected1.Value)
}
if expected2.Timestamp != actual[1].Timestamp {
t.Errorf("2.%d. Got %v; want %v", i, actual[1].Timestamp, expected1.Timestamp)
}
if expected2.Value != actual[1].Value {
t.Errorf("2.%d. Got %v; want %v", i, actual[1].Value, expected1.Value)
} }
} }
// #3 Corner cases: Just before the first sample, just after the last. // #3 Corner cases: Just before the first sample, just after the last.
expected := samples[0] expected := &model.Sample{Timestamp: model.Earliest}
actual := it.ValueAtTime(expected.Timestamp - 1) actual := it.ValueAtOrBeforeTime(samples[0].Timestamp - 1)
if len(actual) != 1 { if expected.Timestamp != actual.Timestamp {
t.Fatalf("3.1. Expected exactly one result, got %d.", len(actual)) t.Errorf("3.1. Got %v; want %v", actual.Timestamp, expected.Timestamp)
} }
if expected.Timestamp != actual[0].Timestamp { if expected.Value != actual.Value {
t.Errorf("3.1. Got %v; want %v", actual[0].Timestamp, expected.Timestamp) t.Errorf("3.1. Got %v; want %v", actual.Value, expected.Value)
}
if expected.Value != actual[0].Value {
t.Errorf("3.1. Got %v; want %v", actual[0].Value, expected.Value)
} }
expected = samples[len(samples)-1] expected = samples[len(samples)-1]
actual = it.ValueAtTime(expected.Timestamp + 1) actual = it.ValueAtOrBeforeTime(expected.Timestamp + 1)
if len(actual) != 1 { if expected.Timestamp != actual.Timestamp {
t.Fatalf("3.2. Expected exactly one result, got %d.", len(actual)) t.Errorf("3.2. Got %v; want %v", actual.Timestamp, expected.Timestamp)
} }
if expected.Timestamp != actual[0].Timestamp { if expected.Value != actual.Value {
t.Errorf("3.2. Got %v; want %v", actual[0].Timestamp, expected.Timestamp) t.Errorf("3.2. Got %v; want %v", actual.Value, expected.Value)
}
if expected.Value != actual[0].Value {
t.Errorf("3.2. Got %v; want %v", actual[0].Value, expected.Value)
} }
} }
func TestValueAtTimeChunkType0(t *testing.T) { func TestValueAtTimeChunkType0(t *testing.T) {
testValueAtTime(t, 0) testValueAtOrBeforeTime(t, 0)
} }
func TestValueAtTimeChunkType1(t *testing.T) { func TestValueAtTimeChunkType1(t *testing.T) {
testValueAtTime(t, 1) testValueAtOrBeforeTime(t, 1)
} }
func benchmarkValueAtTime(b *testing.B, encoding chunkEncoding) { func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) {
samples := make(model.Samples, 10000) samples := make(model.Samples, 10000)
for i := range samples { for i := range samples {
samples[i] = &model.Sample{ samples[i] = &model.Sample{
@ -751,59 +747,67 @@ func benchmarkValueAtTime(b *testing.B, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint() fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
b.Fatalf("Error preloading everything: %s", err)
}
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
it := s.NewIterator(fp)
// #1 Exactly on a sample. // #1 Exactly on a sample.
for i, expected := range samples { for i, expected := range samples {
actual := it.ValueAtTime(expected.Timestamp) actual := it.ValueAtOrBeforeTime(expected.Timestamp)
if len(actual) != 1 { if expected.Timestamp != actual.Timestamp {
b.Fatalf("1.%d. Expected exactly one result, got %d.", i, len(actual)) b.Errorf("1.%d. Got %v; want %v", i, actual.Timestamp, expected.Timestamp)
} }
if expected.Timestamp != actual[0].Timestamp { if expected.Value != actual.Value {
b.Errorf("1.%d. Got %v; want %v", i, actual[0].Timestamp, expected.Timestamp) b.Errorf("1.%d. Got %v; want %v", i, actual.Value, expected.Value)
}
if expected.Value != actual[0].Value {
b.Errorf("1.%d. Got %v; want %v", i, actual[0].Value, expected.Value)
} }
} }
// #2 Between samples. // #2 Between samples.
for i, expected1 := range samples { for i, expected := range samples {
if i == len(samples)-1 { if i == len(samples)-1 {
continue continue
} }
expected2 := samples[i+1] actual := it.ValueAtOrBeforeTime(expected.Timestamp + 1)
actual := it.ValueAtTime(expected1.Timestamp + 1)
if len(actual) != 2 { if expected.Timestamp != actual.Timestamp {
b.Fatalf("2.%d. Expected exactly 2 results, got %d.", i, len(actual)) b.Errorf("2.%d. Got %v; want %v", i, actual.Timestamp, expected.Timestamp)
} }
if expected1.Timestamp != actual[0].Timestamp { if expected.Value != actual.Value {
b.Errorf("2.%d. Got %v; want %v", i, actual[0].Timestamp, expected1.Timestamp) b.Errorf("2.%d. Got %v; want %v", i, actual.Value, expected.Value)
} }
if expected1.Value != actual[0].Value {
b.Errorf("2.%d. Got %v; want %v", i, actual[0].Value, expected1.Value)
} }
if expected2.Timestamp != actual[1].Timestamp {
b.Errorf("2.%d. Got %v; want %v", i, actual[1].Timestamp, expected1.Timestamp) // #3 Corner cases: Just before the first sample, just after the last.
expected := &model.Sample{Timestamp: model.Earliest}
actual := it.ValueAtOrBeforeTime(samples[0].Timestamp - 1)
if expected.Timestamp != actual.Timestamp {
b.Errorf("3.1. Got %v; want %v", actual.Timestamp, expected.Timestamp)
} }
if expected2.Value != actual[1].Value { if expected.Value != actual.Value {
b.Errorf("2.%d. Got %v; want %v", i, actual[1].Value, expected1.Value) b.Errorf("3.1. Got %v; want %v", actual.Value, expected.Value)
} }
expected = samples[len(samples)-1]
actual = it.ValueAtOrBeforeTime(expected.Timestamp + 1)
if expected.Timestamp != actual.Timestamp {
b.Errorf("3.2. Got %v; want %v", actual.Timestamp, expected.Timestamp)
}
if expected.Value != actual.Value {
b.Errorf("3.2. Got %v; want %v", actual.Value, expected.Value)
} }
} }
} }
func BenchmarkValueAtTimeChunkType0(b *testing.B) { func BenchmarkValueAtOrBeforeTimeChunkType0(b *testing.B) {
benchmarkValueAtTime(b, 0) benchmarkValueAtOrBeforeTime(b, 0)
} }
func BenchmarkValueAtTimeChunkType1(b *testing.B) { func BenchmarkValueAtTimeChunkType1(b *testing.B) {
benchmarkValueAtTime(b, 1) benchmarkValueAtOrBeforeTime(b, 1)
} }
func testRangeValues(t *testing.T, encoding chunkEncoding) { func testRangeValues(t *testing.T, encoding chunkEncoding) {
@ -824,7 +828,10 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint() fp := model.Metric{}.FastFingerprint()
it := s.NewIterator(fp) _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
// #1 Zero length interval at sample. // #1 Zero length interval at sample.
for i, expected := range samples { for i, expected := range samples {
@ -976,12 +983,14 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint() fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
b.Fatalf("Error preloading everything: %s", err)
}
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
it := s.NewIterator(fp)
for _, sample := range samples { for _, sample := range samples {
actual := it.RangeValues(metric.Interval{ actual := it.RangeValues(metric.Interval{
OldestInclusive: sample.Timestamp - 20, OldestInclusive: sample.Timestamp - 20,
@ -1023,7 +1032,10 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Drop ~half of the chunks. // Drop ~half of the chunks.
s.maintainMemorySeries(fp, 10000) s.maintainMemorySeries(fp, 10000)
it := s.NewIterator(fp) _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
actual := it.BoundaryValues(metric.Interval{ actual := it.BoundaryValues(metric.Interval{
OldestInclusive: 0, OldestInclusive: 0,
NewestInclusive: 100000, NewestInclusive: 100000,
@ -1041,7 +1053,10 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Drop everything. // Drop everything.
s.maintainMemorySeries(fp, 100000) s.maintainMemorySeries(fp, 100000)
it = s.NewIterator(fp) _, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
actual = it.BoundaryValues(metric.Interval{ actual = it.BoundaryValues(metric.Interval{
OldestInclusive: 0, OldestInclusive: 0,
NewestInclusive: 100000, NewestInclusive: 100000,
@ -1215,7 +1230,7 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding chunkEncoding) {
// Load everything back. // Load everything back.
p := s.NewPreloader() p := s.NewPreloader()
p.PreloadRange(fp, 0, 100000, time.Hour) p.PreloadRange(fp, 0, 100000)
if oldLen != len(series.chunkDescs) { if oldLen != len(series.chunkDescs) {
t.Errorf("Expected number of chunkDescs to have reached old value again, old number %d, current number %d.", oldLen, len(series.chunkDescs)) t.Errorf("Expected number of chunkDescs to have reached old value again, old number %d, current number %d.", oldLen, len(series.chunkDescs))
@ -1513,20 +1528,21 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples model.Samples,
t.Fatal(err) t.Fatal(err)
} }
p := s.NewPreloader() p := s.NewPreloader()
p.PreloadRange(fp, sample.Timestamp, sample.Timestamp, time.Hour) it, err := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp)
found := s.NewIterator(fp).ValueAtTime(sample.Timestamp) if err != nil {
if len(found) != 1 { t.Fatal(err)
t.Errorf("Sample %#v: Expected exactly one value, found %d.", sample, len(found)) }
found := it.ValueAtOrBeforeTime(sample.Timestamp)
if found.Timestamp == model.Earliest {
t.Errorf("Sample %#v: Expected sample not found.", sample)
result = false result = false
p.Close() p.Close()
continue continue
} }
want := sample.Value if sample.Value != found.Value || sample.Timestamp != found.Timestamp {
got := found[0].Value
if want != got || sample.Timestamp != found[0].Timestamp {
t.Errorf( t.Errorf(
"Value (or timestamp) mismatch, want %f (at time %v), got %f (at time %v).", "Value (or timestamp) mismatch, want %f (at time %v), got %f (at time %v).",
want, sample.Timestamp, got, found[0].Timestamp, sample.Value, sample.Timestamp, found.Value, found.Timestamp,
) )
result = false result = false
} }
@ -1559,13 +1575,11 @@ func TestAppendOutOfOrder(t *testing.T) {
pl := s.NewPreloader() pl := s.NewPreloader()
defer pl.Close() defer pl.Close()
err = pl.PreloadRange(fp, 0, 2, 5*time.Minute) it, err := pl.PreloadRange(fp, 0, 2)
if err != nil { if err != nil {
t.Fatalf("Error preloading chunks: %s", err) t.Fatalf("Error preloading chunks: %s", err)
} }
it := s.NewIterator(fp)
want := []model.SamplePair{ want := []model.SamplePair{
{ {
Timestamp: 0, Timestamp: 0,