Merge pull request #1483 from prometheus/beorn7/storage

Accumulated merge, already reviewed.
pull/1478/merge
Björn Rabenstein 2016-03-09 17:18:05 +01:00
commit 583b1f3753
20 changed files with 1272 additions and 653 deletions

View File

@ -41,14 +41,20 @@ type Analyzer struct {
// fingerprints. One of these structs is collected for each offset by the query // fingerprints. One of these structs is collected for each offset by the query
// analyzer. // analyzer.
type preloadTimes struct { type preloadTimes struct {
// Instants require single samples to be loaded along the entire query // Ranges require loading a range of samples. They can be triggered by
// range, with intervals between the samples corresponding to the query // two type of expressions: First a range expression AKA matrix
// resolution. // selector, where the Duration in the ranges map is the length of the
instants map[model.Fingerprint]struct{} // range in the range expression. Second an instant expression AKA
// Ranges require loading a range of samples at each resolution step, // vector selector, where the Duration in the ranges map is the
// stretching backwards from the current evaluation timestamp. The length of // StalenessDelta. In preloading, both types of expressions result in
// the range into the past is given by the duration, as in "foo[5m]". // the same effect: Preload everything between the specified start time
// minus the Duration in the ranges map up to the specified end time.
ranges map[model.Fingerprint]time.Duration ranges map[model.Fingerprint]time.Duration
// Instants require a single sample to be loaded. This only happens for
// instant expressions AKA vector selectors iff the specified start ond
// end time are the same, Thus, instants is only populated if start and
// end time are the same.
instants map[model.Fingerprint]struct{}
} }
// Analyze the provided expression and attach metrics and fingerprints to data-selecting // Analyze the provided expression and attach metrics and fingerprints to data-selecting
@ -57,13 +63,15 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
a.offsetPreloadTimes = map[time.Duration]preloadTimes{} a.offsetPreloadTimes = map[time.Duration]preloadTimes{}
getPreloadTimes := func(offset time.Duration) preloadTimes { getPreloadTimes := func(offset time.Duration) preloadTimes {
if _, ok := a.offsetPreloadTimes[offset]; !ok { if pt, ok := a.offsetPreloadTimes[offset]; ok {
a.offsetPreloadTimes[offset] = preloadTimes{ return pt
instants: map[model.Fingerprint]struct{}{},
ranges: map[model.Fingerprint]time.Duration{},
}
} }
return a.offsetPreloadTimes[offset] pt := preloadTimes{
instants: map[model.Fingerprint]struct{}{},
ranges: map[model.Fingerprint]time.Duration{},
}
a.offsetPreloadTimes[offset] = pt
return pt
} }
// Retrieve fingerprints and metrics for the required time range for // Retrieve fingerprints and metrics for the required time range for
@ -76,11 +84,14 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
pt := getPreloadTimes(n.Offset) pt := getPreloadTimes(n.Offset)
for fp := range n.metrics { for fp := range n.metrics {
// Only add the fingerprint to the instants if not yet present in the r, alreadyInRanges := pt.ranges[fp]
// ranges. Ranges always contain more points and span more time than if a.Start.Equal(a.End) && !alreadyInRanges {
// instants for the same offset. // A true instant, we only need one value.
if _, alreadyInRanges := pt.ranges[fp]; !alreadyInRanges {
pt.instants[fp] = struct{}{} pt.instants[fp] = struct{}{}
continue
}
if r < StalenessDelta {
pt.ranges[fp] = StalenessDelta
} }
} }
case *MatrixSelector: case *MatrixSelector:
@ -135,35 +146,13 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
if err = contextDone(ctx, env); err != nil { if err = contextDone(ctx, env); err != nil {
return nil, err return nil, err
} }
startOfRange := start.Add(-rangeDuration) itersForDuration[fp] = p.PreloadRange(fp, start.Add(-rangeDuration), end)
if StalenessDelta > rangeDuration {
// Cover a weird corner case: The expression
// mixes up instants and ranges for the same
// series. We'll handle that over-all as
// range. But if the rangeDuration is smaller
// than the StalenessDelta, the range wouldn't
// cover everything potentially needed for the
// instant, so we have to extend startOfRange.
startOfRange = start.Add(-StalenessDelta)
}
iter, err := p.PreloadRange(fp, startOfRange, end)
if err != nil {
return nil, err
}
itersForDuration[fp] = iter
} }
for fp := range pt.instants { for fp := range pt.instants {
if err = contextDone(ctx, env); err != nil { if err = contextDone(ctx, env); err != nil {
return nil, err return nil, err
} }
// Need to look backwards by StalenessDelta but not itersForDuration[fp] = p.PreloadInstant(fp, start, StalenessDelta)
// forward because we always return the closest sample
// _before_ the reference time.
iter, err := p.PreloadRange(fp, start.Add(-StalenessDelta), end)
if err != nil {
return nil, err
}
itersForDuration[fp] = iter
} }
} }

View File

@ -575,15 +575,6 @@ func (ev *evaluator) evalMatrix(e Expr) matrix {
return mat return mat
} }
// evalMatrixBounds attempts to evaluate e to matrix boundaries and errors otherwise.
func (ev *evaluator) evalMatrixBounds(e Expr) matrix {
ms, ok := e.(*MatrixSelector)
if !ok {
ev.errorf("matrix bounds can only be evaluated for matrix selectors, got %T", e)
}
return ev.matrixSelectorBounds(ms)
}
// evalString attempts to evaluate e to a string value and errors otherwise. // evalString attempts to evaluate e to a string value and errors otherwise.
func (ev *evaluator) evalString(e Expr) *model.String { func (ev *evaluator) evalString(e Expr) *model.String {
val := ev.eval(e) val := ev.eval(e)
@ -731,29 +722,6 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) matrix {
return matrix(sampleStreams) return matrix(sampleStreams)
} }
// matrixSelectorBounds evaluates the boundaries of a *MatrixSelector.
func (ev *evaluator) matrixSelectorBounds(node *MatrixSelector) matrix {
interval := metric.Interval{
OldestInclusive: ev.Timestamp.Add(-node.Range - node.Offset),
NewestInclusive: ev.Timestamp.Add(-node.Offset),
}
sampleStreams := make([]*sampleStream, 0, len(node.iterators))
for fp, it := range node.iterators {
samplePairs := it.BoundaryValues(interval)
if len(samplePairs) == 0 {
continue
}
ss := &sampleStream{
Metric: node.metrics[fp],
Values: samplePairs,
}
sampleStreams = append(sampleStreams, ss)
}
return matrix(sampleStreams)
}
func (ev *evaluator) vectorAnd(lhs, rhs vector, matching *VectorMatching) vector { func (ev *evaluator) vectorAnd(lhs, rhs vector, matching *VectorMatching) vector {
if matching.Card != CardManyToMany { if matching.Card != CardManyToMany {
panic("logical operations must always be many-to-many matching") panic("logical operations must always be many-to-many matching")

View File

@ -524,10 +524,37 @@ func funcLog10(ev *evaluator, args Expressions) model.Value {
return vector return vector
} }
// linearRegression performs a least-square linear regression analysis on the
// provided SamplePairs. It returns the slope, and the intercept value at the
// provided time.
func linearRegression(samples []model.SamplePair, interceptTime model.Time) (slope, intercept model.SampleValue) {
var (
n model.SampleValue
sumX, sumY model.SampleValue
sumXY, sumX2 model.SampleValue
)
for _, sample := range samples {
x := model.SampleValue(
model.Time(sample.Timestamp-interceptTime).UnixNano(),
) / 1e9
n += 1.0
sumY += sample.Value
sumX += x
sumXY += x * sample.Value
sumX2 += x * x
}
covXY := sumXY - sumX*sumY/n
varX := sumX2 - sumX*sumX/n
slope = covXY / varX
intercept = sumY/n - slope*sumX/n
return slope, intercept
}
// === deriv(node model.ValMatrix) Vector === // === deriv(node model.ValMatrix) Vector ===
func funcDeriv(ev *evaluator, args Expressions) model.Value { func funcDeriv(ev *evaluator, args Expressions) model.Value {
resultVector := vector{}
mat := ev.evalMatrix(args[0]) mat := ev.evalMatrix(args[0])
resultVector := make(vector, 0, len(mat))
for _, samples := range mat { for _, samples := range mat {
// No sense in trying to compute a derivative without at least two points. // No sense in trying to compute a derivative without at least two points.
@ -535,29 +562,10 @@ func funcDeriv(ev *evaluator, args Expressions) model.Value {
if len(samples.Values) < 2 { if len(samples.Values) < 2 {
continue continue
} }
slope, _ := linearRegression(samples.Values, 0)
// Least squares.
var (
n model.SampleValue
sumX, sumY model.SampleValue
sumXY, sumX2 model.SampleValue
)
for _, sample := range samples.Values {
x := model.SampleValue(sample.Timestamp.UnixNano() / 1e9)
n += 1.0
sumY += sample.Value
sumX += x
sumXY += x * sample.Value
sumX2 += x * x
}
numerator := sumXY - sumX*sumY/n
denominator := sumX2 - (sumX*sumX)/n
resultValue := numerator / denominator
resultSample := &sample{ resultSample := &sample{
Metric: samples.Metric, Metric: samples.Metric,
Value: resultValue, Value: slope,
Timestamp: ev.Timestamp, Timestamp: ev.Timestamp,
} }
resultSample.Metric.Del(model.MetricNameLabel) resultSample.Metric.Del(model.MetricNameLabel)
@ -568,44 +576,26 @@ func funcDeriv(ev *evaluator, args Expressions) model.Value {
// === predict_linear(node model.ValMatrix, k model.ValScalar) Vector === // === predict_linear(node model.ValMatrix, k model.ValScalar) Vector ===
func funcPredictLinear(ev *evaluator, args Expressions) model.Value { func funcPredictLinear(ev *evaluator, args Expressions) model.Value {
vec := funcDeriv(ev, args[0:1]).(vector) mat := ev.evalMatrix(args[0])
duration := model.SampleValue(model.SampleValue(ev.evalFloat(args[1]))) resultVector := make(vector, 0, len(mat))
duration := model.SampleValue(ev.evalFloat(args[1]))
excludedLabels := map[model.LabelName]struct{}{ for _, samples := range mat {
model.MetricNameLabel: {}, // No sense in trying to predict anything without at least two points.
} // Drop this vector element.
// Calculate predicted delta over the duration.
signatureToDelta := map[uint64]model.SampleValue{}
for _, el := range vec {
signature := model.SignatureWithoutLabels(el.Metric.Metric, excludedLabels)
signatureToDelta[signature] = el.Value * duration
}
// add predicted delta to last value.
// TODO(beorn7): This is arguably suboptimal. The funcDeriv above has
// given us an estimate over the range. So we should add the delta to
// the value predicted for the end of the range. Also, once this has
// been rectified, we are not using BoundaryValues anywhere anymore, so
// we can kick out a whole lot of code.
matrixBounds := ev.evalMatrixBounds(args[0])
outVec := make(vector, 0, len(signatureToDelta))
for _, samples := range matrixBounds {
if len(samples.Values) < 2 { if len(samples.Values) < 2 {
continue continue
} }
signature := model.SignatureWithoutLabels(samples.Metric.Metric, excludedLabels) slope, intercept := linearRegression(samples.Values, ev.Timestamp)
delta, ok := signatureToDelta[signature] resultSample := &sample{
if ok { Metric: samples.Metric,
samples.Metric.Del(model.MetricNameLabel) Value: slope*duration + intercept,
outVec = append(outVec, &sample{ Timestamp: ev.Timestamp,
Metric: samples.Metric,
Value: delta + samples.Values[1].Value,
Timestamp: ev.Timestamp,
})
} }
resultSample.Metric.Del(model.MetricNameLabel)
resultVector = append(resultVector, resultSample)
} }
return outVec return resultVector
} }
// === histogram_quantile(k model.ValScalar, vector model.ValVector) Vector === // === histogram_quantile(k model.ValScalar, vector model.ValVector) Vector ===

View File

@ -102,16 +102,28 @@ eval instant at 50m deriv(testcounter_reset_middle[100m])
{} 0.010606060606060607 {} 0.010606060606060607
# predict_linear should return correct result. # predict_linear should return correct result.
# X/s = [ 0, 300, 600, 900,1200,1500,1800,2100,2400,2700,3000]
# Y = [ 0, 10, 20, 30, 40, 0, 10, 20, 30, 40, 50]
# sumX = 16500
# sumY = 250
# sumXY = 480000
# sumX2 = 34650000
# n = 11
# covXY = 105000
# varX = 9900000
# slope = 0.010606060606060607
# intercept at t=0: 6.818181818181818
# intercept at t=3000: 38.63636363636364
# intercept at t=3000+3600: 76.81818181818181
eval instant at 50m predict_linear(testcounter_reset_middle[100m], 3600) eval instant at 50m predict_linear(testcounter_reset_middle[100m], 3600)
{} 88.181818181818185200 {} 76.81818181818181
# predict_linear is syntactic sugar around deriv. # With http_requests, there is a sample value exactly at the end of
# the range, and it has exactly the predicted value, so predict_linear
# can be emulated with deriv.
eval instant at 50m predict_linear(http_requests[50m], 3600) - (http_requests + deriv(http_requests[50m]) * 3600) eval instant at 50m predict_linear(http_requests[50m], 3600) - (http_requests + deriv(http_requests[50m]) * 3600)
{group="canary", instance="1", job="app-server"} 0 {group="canary", instance="1", job="app-server"} 0
eval instant at 50m predict_linear(testcounter_reset_middle[100m], 3600) - (testcounter_reset_middle + deriv(testcounter_reset_middle[100m]) * 3600)
{} 0
clear clear
# Tests for label_replace. # Tests for label_replace.

View File

@ -81,13 +81,6 @@ const (
// is populated upon creation of a chunkDesc, so it is alway safe to call // is populated upon creation of a chunkDesc, so it is alway safe to call
// firstTime. The firstTime method is arguably not needed and only there for // firstTime. The firstTime method is arguably not needed and only there for
// consistency with lastTime. // consistency with lastTime.
//
// Yet another (deprecated) case is lastSamplePair. It's used in federation and
// must be callable without pinning. Locking the fingerprint of the series is
// still required (to avoid concurrent appends to the chunk). The call is
// relatively expensive because of the required acquisition of the evict
// mutex. It will go away, though, once tracking the lastSamplePair has been
// moved into the series object.
type chunkDesc struct { type chunkDesc struct {
sync.Mutex // Protects pinning. sync.Mutex // Protects pinning.
c chunk // nil if chunk is evicted. c chunk // nil if chunk is evicted.
@ -119,7 +112,7 @@ func newChunkDesc(c chunk, firstTime model.Time) *chunkDesc {
// add adds a sample pair to the underlying chunk. For safe concurrent access, // add adds a sample pair to the underlying chunk. For safe concurrent access,
// The chunk must be pinned, and the caller must have locked the fingerprint of // The chunk must be pinned, and the caller must have locked the fingerprint of
// the series. // the series.
func (cd *chunkDesc) add(s *model.SamplePair) []chunk { func (cd *chunkDesc) add(s model.SamplePair) ([]chunk, error) {
return cd.c.add(s) return cd.c.add(s)
} }
@ -176,9 +169,9 @@ func (cd *chunkDesc) firstTime() model.Time {
// lastTime returns the timestamp of the last sample in the chunk. For safe // lastTime returns the timestamp of the last sample in the chunk. For safe
// concurrent access, this method requires the fingerprint of the time series to // concurrent access, this method requires the fingerprint of the time series to
// be locked. // be locked.
func (cd *chunkDesc) lastTime() model.Time { func (cd *chunkDesc) lastTime() (model.Time, error) {
if cd.chunkLastTime != model.Earliest || cd.c == nil { if cd.chunkLastTime != model.Earliest || cd.c == nil {
return cd.chunkLastTime return cd.chunkLastTime, nil
} }
return cd.c.newIterator().lastTimestamp() return cd.c.newIterator().lastTimestamp()
} }
@ -188,28 +181,15 @@ func (cd *chunkDesc) lastTime() model.Time {
// last sample to a chunk or after closing a head chunk due to age. For safe // last sample to a chunk or after closing a head chunk due to age. For safe
// concurrent access, the chunk must be pinned, and the caller must have locked // concurrent access, the chunk must be pinned, and the caller must have locked
// the fingerprint of the series. // the fingerprint of the series.
func (cd *chunkDesc) maybePopulateLastTime() { func (cd *chunkDesc) maybePopulateLastTime() error {
if cd.chunkLastTime == model.Earliest && cd.c != nil { if cd.chunkLastTime == model.Earliest && cd.c != nil {
cd.chunkLastTime = cd.c.newIterator().lastTimestamp() t, err := cd.c.newIterator().lastTimestamp()
} if err != nil {
} return err
}
// lastSamplePair returns the last sample pair of the underlying chunk, or nil cd.chunkLastTime = t
// if the chunk is evicted. For safe concurrent access, this method requires the
// fingerprint of the time series to be locked.
// TODO(beorn7): Move up into memorySeries.
func (cd *chunkDesc) lastSamplePair() *model.SamplePair {
cd.Lock()
defer cd.Unlock()
if cd.c == nil {
return nil
}
it := cd.c.newIterator()
return &model.SamplePair{
Timestamp: it.lastTimestamp(),
Value: it.lastSampleValue(),
} }
return nil
} }
// isEvicted returns whether the chunk is evicted. For safe concurrent access, // isEvicted returns whether the chunk is evicted. For safe concurrent access,
@ -266,14 +246,14 @@ type chunk interface {
// any. The first chunk returned might be the same as the original one // any. The first chunk returned might be the same as the original one
// or a newly allocated version. In any case, take the returned chunk as // or a newly allocated version. In any case, take the returned chunk as
// the relevant one and discard the original chunk. // the relevant one and discard the original chunk.
add(sample *model.SamplePair) []chunk add(sample model.SamplePair) ([]chunk, error)
clone() chunk clone() chunk
firstTime() model.Time firstTime() model.Time
newIterator() chunkIterator newIterator() chunkIterator
marshal(io.Writer) error marshal(io.Writer) error
marshalToBuf([]byte) error marshalToBuf([]byte) error
unmarshal(io.Reader) error unmarshal(io.Reader) error
unmarshalFromBuf([]byte) unmarshalFromBuf([]byte) error
encoding() chunkEncoding encoding() chunkEncoding
} }
@ -284,57 +264,73 @@ type chunkIterator interface {
// length returns the number of samples in the chunk. // length returns the number of samples in the chunk.
length() int length() int
// Gets the timestamp of the n-th sample in the chunk. // Gets the timestamp of the n-th sample in the chunk.
timestampAtIndex(int) model.Time timestampAtIndex(int) (model.Time, error)
// Gets the last timestamp in the chunk. // Gets the last timestamp in the chunk.
lastTimestamp() model.Time lastTimestamp() (model.Time, error)
// Gets the sample value of the n-th sample in the chunk. // Gets the sample value of the n-th sample in the chunk.
sampleValueAtIndex(int) model.SampleValue sampleValueAtIndex(int) (model.SampleValue, error)
// Gets the last sample value in the chunk. // Gets the last sample value in the chunk.
lastSampleValue() model.SampleValue lastSampleValue() (model.SampleValue, error)
// Gets the value that is closest before the given time. In case a value // Gets the value that is closest before the given time. In case a value
// exists at precisely the given time, that value is returned. If no // exists at precisely the given time, that value is returned. If no
// applicable value exists, a SamplePair with timestamp model.Earliest // applicable value exists, ZeroSamplePair is returned.
// and value 0.0 is returned. valueAtOrBeforeTime(model.Time) (model.SamplePair, error)
valueAtOrBeforeTime(model.Time) model.SamplePair
// Gets all values contained within a given interval. // Gets all values contained within a given interval.
rangeValues(metric.Interval) []model.SamplePair rangeValues(metric.Interval) ([]model.SamplePair, error)
// Whether a given timestamp is contained between first and last value // Whether a given timestamp is contained between first and last value
// in the chunk. // in the chunk.
contains(model.Time) bool contains(model.Time) (bool, error)
// values returns a channel, from which all sample values in the chunk // values returns a channel, from which all sample values in the chunk
// can be received in order. The channel is closed after the last // can be received in order. The channel is closed after the last
// one. It is generally not safe to mutate the chunk while the channel // one. It is generally not safe to mutate the chunk while the channel
// is still open. // is still open. If a value is returned with error!=nil, no further
values() <-chan *model.SamplePair // values will be returned and the channel is closed.
values() <-chan struct {
model.SamplePair
error
}
} }
func transcodeAndAdd(dst chunk, src chunk, s *model.SamplePair) []chunk { func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) {
chunkOps.WithLabelValues(transcode).Inc() chunkOps.WithLabelValues(transcode).Inc()
head := dst head := dst
body := []chunk{} body := []chunk{}
for v := range src.newIterator().values() { for v := range src.newIterator().values() {
newChunks := head.add(v) if v.error != nil {
return nil, v.error
}
newChunks, err := head.add(v.SamplePair)
if err != nil {
return nil, err
}
body = append(body, newChunks[:len(newChunks)-1]...) body = append(body, newChunks[:len(newChunks)-1]...)
head = newChunks[len(newChunks)-1] head = newChunks[len(newChunks)-1]
} }
newChunks := head.add(s) newChunks, err := head.add(s)
return append(body, newChunks...) if err != nil {
return nil, err
}
return append(body, newChunks...), nil
} }
// newChunk creates a new chunk according to the encoding set by the // newChunk creates a new chunk according to the encoding set by the
// defaultChunkEncoding flag. // defaultChunkEncoding flag.
func newChunk() chunk { func newChunk() chunk {
return newChunkForEncoding(DefaultChunkEncoding) chunk, err := newChunkForEncoding(DefaultChunkEncoding)
if err != nil {
panic(err)
}
return chunk
} }
func newChunkForEncoding(encoding chunkEncoding) chunk { func newChunkForEncoding(encoding chunkEncoding) (chunk, error) {
switch encoding { switch encoding {
case delta: case delta:
return newDeltaEncodedChunk(d1, d0, true, chunkLen) return newDeltaEncodedChunk(d1, d0, true, chunkLen), nil
case doubleDelta: case doubleDelta:
return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen) return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen), nil
default: default:
panic(fmt.Errorf("unknown chunk encoding: %v", encoding)) return nil, fmt.Errorf("unknown chunk encoding: %v", encoding)
} }
} }

View File

@ -14,10 +14,11 @@
package local package local
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"os" "os"
"path" "path/filepath"
"strings" "strings"
"sync/atomic" "sync/atomic"
@ -52,7 +53,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint
log.Info("Scanning files.") log.Info("Scanning files.")
for i := 0; i < 1<<(seriesDirNameLen*4); i++ { for i := 0; i < 1<<(seriesDirNameLen*4); i++ {
dirname := path.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i)) dirname := filepath.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i))
dir, err := os.Open(dirname) dir, err := os.Open(dirname)
if os.IsNotExist(err) { if os.IsNotExist(err) {
continue continue
@ -139,7 +140,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint
} }
} }
p.setDirty(false) p.setDirty(false, nil)
log.Warn("Crash recovery complete.") log.Warn("Crash recovery complete.")
return nil return nil
} }
@ -175,36 +176,46 @@ func (p *persistence) sanitizeSeries(
fingerprintToSeries map[model.Fingerprint]*memorySeries, fingerprintToSeries map[model.Fingerprint]*memorySeries,
fpm fpMappings, fpm fpMappings,
) (model.Fingerprint, bool) { ) (model.Fingerprint, bool) {
filename := path.Join(dirname, fi.Name()) var (
fp model.Fingerprint
err error
filename = filepath.Join(dirname, fi.Name())
s *memorySeries
)
purge := func() { purge := func() {
var err error if fp != 0 {
defer func() { var metric model.Metric
if err != nil { if s != nil {
log.Errorf("Failed to move lost series file %s to orphaned directory, deleting it instead. Error was: %s", filename, err) metric = s.metric
if err = os.Remove(filename); err != nil {
log.Errorf("Even deleting file %s did not work: %s", filename, err)
}
} }
}() if err = p.quarantineSeriesFile(
orphanedDir := path.Join(p.basePath, "orphaned", path.Base(dirname)) fp, errors.New("purge during crash recovery"), metric,
if err = os.MkdirAll(orphanedDir, 0700); err != nil { ); err == nil {
return return
}
log.
With("file", filename).
With("error", err).
Error("Failed to move lost series file to orphaned directory.")
} }
if err = os.Rename(filename, path.Join(orphanedDir, fi.Name())); err != nil { // If we are here, we are either purging an incorrectly named
return // file, or quarantining has failed. So simply delete the file.
if err = os.Remove(filename); err != nil {
log.
With("file", filename).
With("error", err).
Error("Failed to delete lost series file.")
} }
} }
var fp model.Fingerprint
var err error
if len(fi.Name()) != fpLen-seriesDirNameLen+len(seriesFileSuffix) || if len(fi.Name()) != fpLen-seriesDirNameLen+len(seriesFileSuffix) ||
!strings.HasSuffix(fi.Name(), seriesFileSuffix) { !strings.HasSuffix(fi.Name(), seriesFileSuffix) {
log.Warnf("Unexpected series file name %s.", filename) log.Warnf("Unexpected series file name %s.", filename)
purge() purge()
return fp, false return fp, false
} }
if fp, err = model.FingerprintFromString(path.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]); err != nil { if fp, err = model.FingerprintFromString(filepath.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]); err != nil {
log.Warnf("Error parsing file name %s: %s", filename, err) log.Warnf("Error parsing file name %s: %s", filename, err)
purge() purge()
return fp, false return fp, false
@ -274,7 +285,15 @@ func (p *persistence) sanitizeSeries(
s.chunkDescs = cds s.chunkDescs = cds
s.chunkDescsOffset = 0 s.chunkDescsOffset = 0
s.savedFirstTime = cds[0].firstTime() s.savedFirstTime = cds[0].firstTime()
s.lastTime = cds[len(cds)-1].lastTime() s.lastTime, err = cds[len(cds)-1].lastTime()
if err != nil {
log.Errorf(
"Failed to determine time of the last sample for metric %v, fingerprint %v: %s",
s.metric, fp, err,
)
purge()
return fp, false
}
s.persistWatermark = len(cds) s.persistWatermark = len(cds)
s.modTime = modTime s.modTime = modTime
return fp, true return fp, true
@ -304,7 +323,15 @@ func (p *persistence) sanitizeSeries(
s.savedFirstTime = cds[0].firstTime() s.savedFirstTime = cds[0].firstTime()
s.modTime = modTime s.modTime = modTime
lastTime := cds[len(cds)-1].lastTime() lastTime, err := cds[len(cds)-1].lastTime()
if err != nil {
log.Errorf(
"Failed to determine time of the last sample for metric %v, fingerprint %v: %s",
s.metric, fp, err,
)
purge()
return fp, false
}
keepIdx := -1 keepIdx := -1
for i, cd := range s.chunkDescs { for i, cd := range s.chunkDescs {
if cd.firstTime() >= lastTime { if cd.firstTime() >= lastTime {
@ -414,7 +441,10 @@ func (p *persistence) cleanUpArchiveIndexes(
if err != nil { if err != nil {
return err return err
} }
series := newMemorySeries(model.Metric(m), cds, p.seriesFileModTime(model.Fingerprint(fp))) series, err := newMemorySeries(model.Metric(m), cds, p.seriesFileModTime(model.Fingerprint(fp)))
if err != nil {
return err
}
fpToSeries[model.Fingerprint(fp)] = series fpToSeries[model.Fingerprint(fp)] = series
return nil return nil
}); err != nil { }); err != nil {

View File

@ -76,7 +76,7 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncod
} }
// add implements chunk. // add implements chunk.
func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
if c.len() == 0 { if c.len() == 0 {
c = c[:deltaHeaderBytes] c = c[:deltaHeaderBytes]
binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp)) binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp))
@ -89,14 +89,17 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Do we generally have space for another sample in this chunk? If not, // Do we generally have space for another sample in this chunk? If not,
// overflow into a new one. // overflow into a new one.
if remainingBytes < sampleSize { if remainingBytes < sampleSize {
overflowChunks := newChunk().add(s) overflowChunks, err := newChunk().add(s)
return []chunk{&c, overflowChunks[0]} if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
} }
baseValue := c.baseValue() baseValue := c.baseValue()
dt := s.Timestamp - c.baseTime() dt := s.Timestamp - c.baseTime()
if dt < 0 { if dt < 0 {
panic("time delta is less than zero") return nil, fmt.Errorf("time delta is less than zero: %v", dt)
} }
dv := s.Value - baseValue dv := s.Value - baseValue
@ -130,8 +133,11 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
} }
// Chunk is already half full. Better create a new one and save the transcoding efforts. // Chunk is already half full. Better create a new one and save the transcoding efforts.
overflowChunks := newChunk().add(s) overflowChunks, err := newChunk().add(s)
return []chunk{&c, overflowChunks[0]} if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
} }
offset := len(c) offset := len(c)
@ -148,7 +154,7 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Store the absolute value (no delta) in case of d8. // Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
default: default:
panic("invalid number of bytes for time delta") return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb)
} }
offset += int(tb) offset += int(tb)
@ -165,7 +171,7 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
binary.LittleEndian.PutUint32(c[offset:], uint32(int32(dv))) binary.LittleEndian.PutUint32(c[offset:], uint32(int32(dv)))
// d8 must not happen. Those samples are encoded as float64. // d8 must not happen. Those samples are encoded as float64.
default: default:
panic("invalid number of bytes for integer delta") return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb)
} }
} else { } else {
switch vb { switch vb {
@ -175,10 +181,10 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Store the absolute value (no delta) in case of d8. // Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
default: default:
panic("invalid number of bytes for floating point delta") return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb)
} }
} }
return []chunk{&c} return []chunk{&c}, nil
} }
// clone implements chunk. // clone implements chunk.
@ -243,15 +249,24 @@ func (c *deltaEncodedChunk) unmarshal(r io.Reader) error {
if _, err := io.ReadFull(r, *c); err != nil { if _, err := io.ReadFull(r, *c); err != nil {
return err return err
} }
*c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])
if int(l) > cap(*c) {
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
}
*c = (*c)[:l]
return nil return nil
} }
// unmarshalFromBuf implements chunk. // unmarshalFromBuf implements chunk.
func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) { func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) error {
*c = (*c)[:cap(*c)] *c = (*c)[:cap(*c)]
copy(*c, buf) copy(*c, buf)
*c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])
if int(l) > cap(*c) {
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
}
*c = (*c)[:l]
return nil
} }
// encoding implements chunk. // encoding implements chunk.
@ -302,57 +317,108 @@ type deltaEncodedChunkIterator struct {
func (it *deltaEncodedChunkIterator) length() int { return it.len } func (it *deltaEncodedChunkIterator) length() int { return it.len }
// valueAtOrBeforeTime implements chunkIterator. // valueAtOrBeforeTime implements chunkIterator.
func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair { func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
var lastErr error
i := sort.Search(it.len, func(i int) bool { i := sort.Search(it.len, func(i int) bool {
return it.timestampAtIndex(i).After(t) ts, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return ts.After(t)
}) })
if i == 0 { if i == 0 || lastErr != nil {
return model.SamplePair{Timestamp: model.Earliest} return ZeroSamplePair, lastErr
} }
return model.SamplePair{ ts, err := it.timestampAtIndex(i - 1)
Timestamp: it.timestampAtIndex(i - 1), if err != nil {
Value: it.sampleValueAtIndex(i - 1), return ZeroSamplePair, err
} }
v, err := it.sampleValueAtIndex(i - 1)
if err != nil {
return ZeroSamplePair, err
}
return model.SamplePair{Timestamp: ts, Value: v}, nil
} }
// rangeValues implements chunkIterator. // rangeValues implements chunkIterator.
func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) []model.SamplePair { func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
var lastErr error
oldest := sort.Search(it.len, func(i int) bool { oldest := sort.Search(it.len, func(i int) bool {
return !it.timestampAtIndex(i).Before(in.OldestInclusive) t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return !t.Before(in.OldestInclusive)
}) })
newest := sort.Search(it.len, func(i int) bool { newest := sort.Search(it.len, func(i int) bool {
return it.timestampAtIndex(i).After(in.NewestInclusive) t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return t.After(in.NewestInclusive)
}) })
if oldest == it.len { if oldest == it.len || lastErr != nil {
return nil return nil, lastErr
} }
result := make([]model.SamplePair, 0, newest-oldest) result := make([]model.SamplePair, 0, newest-oldest)
for i := oldest; i < newest; i++ { for i := oldest; i < newest; i++ {
result = append(result, model.SamplePair{ t, err := it.timestampAtIndex(i)
Timestamp: it.timestampAtIndex(i), if err != nil {
Value: it.sampleValueAtIndex(i), return nil, err
}) }
v, err := it.sampleValueAtIndex(i)
if err != nil {
return nil, err
}
result = append(result, model.SamplePair{Timestamp: t, Value: v})
} }
return result return result, nil
} }
// contains implements chunkIterator. // contains implements chunkIterator.
func (it *deltaEncodedChunkIterator) contains(t model.Time) bool { func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)) lastT, err := it.timestampAtIndex(it.len - 1)
if err != nil {
return false, err
}
return !t.Before(it.baseT) && !t.After(lastT), nil
} }
// values implements chunkIterator. // values implements chunkIterator.
func (it *deltaEncodedChunkIterator) values() <-chan *model.SamplePair { func (it *deltaEncodedChunkIterator) values() <-chan struct {
valuesChan := make(chan *model.SamplePair) model.SamplePair
error
} {
valuesChan := make(chan struct {
model.SamplePair
error
})
go func() { go func() {
for i := 0; i < it.len; i++ { for i := 0; i < it.len; i++ {
valuesChan <- &model.SamplePair{ t, err := it.timestampAtIndex(i)
Timestamp: it.timestampAtIndex(i), if err != nil {
Value: it.sampleValueAtIndex(i), valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
} }
v, err := it.sampleValueAtIndex(i)
if err != nil {
valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
}
valuesChan <- struct {
model.SamplePair
error
}{model.SamplePair{Timestamp: t, Value: v}, nil}
} }
close(valuesChan) close(valuesChan)
}() }()
@ -360,61 +426,61 @@ func (it *deltaEncodedChunkIterator) values() <-chan *model.SamplePair {
} }
// timestampAtIndex implements chunkIterator. // timestampAtIndex implements chunkIterator.
func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) {
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes)
switch it.tBytes { switch it.tBytes {
case d1: case d1:
return it.baseT + model.Time(uint8(it.c[offset])) return it.baseT + model.Time(uint8(it.c[offset])), nil
case d2: case d2:
return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])) return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])), nil
case d4: case d4:
return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])) return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])), nil
case d8: case d8:
// Take absolute value for d8. // Take absolute value for d8.
return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil
default: default:
panic("invalid number of bytes for time delta") return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes)
} }
} }
// lastTimestamp implements chunkIterator. // lastTimestamp implements chunkIterator.
func (it *deltaEncodedChunkIterator) lastTimestamp() model.Time { func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) {
return it.timestampAtIndex(it.len - 1) return it.timestampAtIndex(it.len - 1)
} }
// sampleValueAtIndex implements chunkIterator. // sampleValueAtIndex implements chunkIterator.
func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) {
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes) offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes)
if it.isInt { if it.isInt {
switch it.vBytes { switch it.vBytes {
case d0: case d0:
return it.baseV return it.baseV, nil
case d1: case d1:
return it.baseV + model.SampleValue(int8(it.c[offset])) return it.baseV + model.SampleValue(int8(it.c[offset])), nil
case d2: case d2:
return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil
case d4: case d4:
return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil
// No d8 for ints. // No d8 for ints.
default: default:
panic("invalid number of bytes for integer delta") return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes)
} }
} else { } else {
switch it.vBytes { switch it.vBytes {
case d4: case d4:
return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil
case d8: case d8:
// Take absolute value for d8. // Take absolute value for d8.
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil
default: default:
panic("invalid number of bytes for floating point delta") return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes)
} }
} }
} }
// lastSampleValue implements chunkIterator. // lastSampleValue implements chunkIterator.
func (it *deltaEncodedChunkIterator) lastSampleValue() model.SampleValue { func (it *deltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) {
return it.sampleValueAtIndex(it.len - 1) return it.sampleValueAtIndex(it.len - 1)
} }

View File

@ -83,9 +83,9 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub
} }
// add implements chunk. // add implements chunk.
func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
if c.len() == 0 { if c.len() == 0 {
return c.addFirstSample(s) return c.addFirstSample(s), nil
} }
tb := c.timeBytes() tb := c.timeBytes()
@ -101,8 +101,11 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Do we generally have space for another sample in this chunk? If not, // Do we generally have space for another sample in this chunk? If not,
// overflow into a new one. // overflow into a new one.
if remainingBytes < sampleSize { if remainingBytes < sampleSize {
overflowChunks := newChunk().add(s) overflowChunks, err := newChunk().add(s)
return []chunk{&c, overflowChunks[0]} if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
} }
projectedTime := c.baseTime() + model.Time(c.len())*c.baseTimeDelta() projectedTime := c.baseTime() + model.Time(c.len())*c.baseTimeDelta()
@ -136,8 +139,11 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
} }
// Chunk is already half full. Better create a new one and save the transcoding efforts. // Chunk is already half full. Better create a new one and save the transcoding efforts.
overflowChunks := newChunk().add(s) overflowChunks, err := newChunk().add(s)
return []chunk{&c, overflowChunks[0]} if err != nil {
return nil, err
}
return []chunk{&c, overflowChunks[0]}, nil
} }
offset := len(c) offset := len(c)
@ -154,7 +160,7 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Store the absolute value (no delta) in case of d8. // Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
default: default:
panic("invalid number of bytes for time delta") return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb)
} }
offset += int(tb) offset += int(tb)
@ -171,7 +177,7 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
binary.LittleEndian.PutUint32(c[offset:], uint32(int32(ddv))) binary.LittleEndian.PutUint32(c[offset:], uint32(int32(ddv)))
// d8 must not happen. Those samples are encoded as float64. // d8 must not happen. Those samples are encoded as float64.
default: default:
panic("invalid number of bytes for integer delta") return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb)
} }
} else { } else {
switch vb { switch vb {
@ -181,10 +187,10 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk {
// Store the absolute value (no delta) in case of d8. // Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
default: default:
panic("invalid number of bytes for floating point delta") return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb)
} }
} }
return []chunk{&c} return []chunk{&c}, nil
} }
// clone implements chunk. // clone implements chunk.
@ -251,15 +257,24 @@ func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error {
if _, err := io.ReadFull(r, *c); err != nil { if _, err := io.ReadFull(r, *c); err != nil {
return err return err
} }
*c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])
if int(l) > cap(*c) {
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
}
*c = (*c)[:l]
return nil return nil
} }
// unmarshalFromBuf implements chunk. // unmarshalFromBuf implements chunk.
func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) { func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) error {
*c = (*c)[:cap(*c)] *c = (*c)[:cap(*c)]
copy(*c, buf) copy(*c, buf)
*c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])
if int(l) > cap(*c) {
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
}
*c = (*c)[:l]
return nil
} }
// encoding implements chunk. // encoding implements chunk.
@ -335,7 +350,7 @@ func (c doubleDeltaEncodedChunk) isInt() bool {
// addFirstSample is a helper method only used by c.add(). It adds timestamp and // addFirstSample is a helper method only used by c.add(). It adds timestamp and
// value as base time and value. // value as base time and value.
func (c doubleDeltaEncodedChunk) addFirstSample(s *model.SamplePair) []chunk { func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []chunk {
c = c[:doubleDeltaHeaderBaseValueOffset+8] c = c[:doubleDeltaHeaderBaseValueOffset+8]
binary.LittleEndian.PutUint64( binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseTimeOffset:], c[doubleDeltaHeaderBaseTimeOffset:],
@ -350,10 +365,10 @@ func (c doubleDeltaEncodedChunk) addFirstSample(s *model.SamplePair) []chunk {
// addSecondSample is a helper method only used by c.add(). It calculates the // addSecondSample is a helper method only used by c.add(). It calculates the
// base delta from the provided sample and adds it to the chunk. // base delta from the provided sample and adds it to the chunk.
func (c doubleDeltaEncodedChunk) addSecondSample(s *model.SamplePair, tb, vb deltaBytes) []chunk { func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]chunk, error) {
baseTimeDelta := s.Timestamp - c.baseTime() baseTimeDelta := s.Timestamp - c.baseTime()
if baseTimeDelta < 0 { if baseTimeDelta < 0 {
panic("base time delta is less than zero") return nil, fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta)
} }
c = c[:doubleDeltaHeaderBytes] c = c[:doubleDeltaHeaderBytes]
if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 { if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 {
@ -391,7 +406,7 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s *model.SamplePair, tb, vb del
math.Float64bits(float64(baseValueDelta)), math.Float64bits(float64(baseValueDelta)),
) )
} }
return []chunk{&c} return []chunk{&c}, nil
} }
// doubleDeltaEncodedChunkIterator implements chunkIterator. // doubleDeltaEncodedChunkIterator implements chunkIterator.
@ -408,57 +423,108 @@ type doubleDeltaEncodedChunkIterator struct {
func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len } func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len }
// valueAtOrBeforeTime implements chunkIterator. // valueAtOrBeforeTime implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair { func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) {
var lastErr error
i := sort.Search(it.len, func(i int) bool { i := sort.Search(it.len, func(i int) bool {
return it.timestampAtIndex(i).After(t) ts, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return ts.After(t)
}) })
if i == 0 { if i == 0 || lastErr != nil {
return model.SamplePair{Timestamp: model.Earliest} return ZeroSamplePair, lastErr
} }
return model.SamplePair{ ts, err := it.timestampAtIndex(i - 1)
Timestamp: it.timestampAtIndex(i - 1), if err != nil {
Value: it.sampleValueAtIndex(i - 1), return ZeroSamplePair, err
} }
v, err := it.sampleValueAtIndex(i - 1)
if err != nil {
return ZeroSamplePair, err
}
return model.SamplePair{Timestamp: ts, Value: v}, nil
} }
// rangeValues implements chunkIterator. // rangeValues implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) []model.SamplePair { func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) {
var lastErr error
oldest := sort.Search(it.len, func(i int) bool { oldest := sort.Search(it.len, func(i int) bool {
return !it.timestampAtIndex(i).Before(in.OldestInclusive) t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return !t.Before(in.OldestInclusive)
}) })
newest := sort.Search(it.len, func(i int) bool { newest := sort.Search(it.len, func(i int) bool {
return it.timestampAtIndex(i).After(in.NewestInclusive) t, err := it.timestampAtIndex(i)
if err != nil {
lastErr = err
}
return t.After(in.NewestInclusive)
}) })
if oldest == it.len { if oldest == it.len || lastErr != nil {
return nil return nil, lastErr
} }
result := make([]model.SamplePair, 0, newest-oldest) result := make([]model.SamplePair, 0, newest-oldest)
for i := oldest; i < newest; i++ { for i := oldest; i < newest; i++ {
result = append(result, model.SamplePair{ t, err := it.timestampAtIndex(i)
Timestamp: it.timestampAtIndex(i), if err != nil {
Value: it.sampleValueAtIndex(i), return nil, err
}) }
v, err := it.sampleValueAtIndex(i)
if err != nil {
return nil, err
}
result = append(result, model.SamplePair{Timestamp: t, Value: v})
} }
return result return result, nil
} }
// contains implements chunkIterator. // contains implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) bool { func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) {
return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)) lastT, err := it.timestampAtIndex(it.len - 1)
if err != nil {
return false, err
}
return !t.Before(it.baseT) && !t.After(lastT), nil
} }
// values implements chunkIterator. // values implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) values() <-chan *model.SamplePair { func (it *doubleDeltaEncodedChunkIterator) values() <-chan struct {
valuesChan := make(chan *model.SamplePair) model.SamplePair
error
} {
valuesChan := make(chan struct {
model.SamplePair
error
})
go func() { go func() {
for i := 0; i < it.len; i++ { for i := 0; i < it.len; i++ {
valuesChan <- &model.SamplePair{ t, err := it.timestampAtIndex(i)
Timestamp: it.timestampAtIndex(i), if err != nil {
Value: it.sampleValueAtIndex(i), valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
} }
v, err := it.sampleValueAtIndex(i)
if err != nil {
valuesChan <- struct {
model.SamplePair
error
}{ZeroSamplePair, err}
break
}
valuesChan <- struct {
model.SamplePair
error
}{model.SamplePair{Timestamp: t, Value: v}, nil}
} }
close(valuesChan) close(valuesChan)
}() }()
@ -466,17 +532,17 @@ func (it *doubleDeltaEncodedChunkIterator) values() <-chan *model.SamplePair {
} }
// timestampAtIndex implements chunkIterator. // timestampAtIndex implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) {
if idx == 0 { if idx == 0 {
return it.baseT return it.baseT, nil
} }
if idx == 1 { if idx == 1 {
// If time bytes are at d8, the time is saved directly rather // If time bytes are at d8, the time is saved directly rather
// than as a difference. // than as a difference.
if it.tBytes == d8 { if it.tBytes == d8 {
return it.baseΔT return it.baseΔT, nil
} }
return it.baseT + it.baseΔT return it.baseT + it.baseΔT, nil
} }
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes)
@ -485,40 +551,40 @@ func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time
case d1: case d1:
return it.baseT + return it.baseT +
model.Time(idx)*it.baseΔT + model.Time(idx)*it.baseΔT +
model.Time(int8(it.c[offset])) model.Time(int8(it.c[offset])), nil
case d2: case d2:
return it.baseT + return it.baseT +
model.Time(idx)*it.baseΔT + model.Time(idx)*it.baseΔT +
model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))) model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil
case d4: case d4:
return it.baseT + return it.baseT +
model.Time(idx)*it.baseΔT + model.Time(idx)*it.baseΔT +
model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))) model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil
case d8: case d8:
// Take absolute value for d8. // Take absolute value for d8.
return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil
default: default:
panic("invalid number of bytes for time delta") return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes)
} }
} }
// lastTimestamp implements chunkIterator. // lastTimestamp implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() model.Time { func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) {
return it.timestampAtIndex(it.len - 1) return it.timestampAtIndex(it.len - 1)
} }
// sampleValueAtIndex implements chunkIterator. // sampleValueAtIndex implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) {
if idx == 0 { if idx == 0 {
return it.baseV return it.baseV, nil
} }
if idx == 1 { if idx == 1 {
// If value bytes are at d8, the value is saved directly rather // If value bytes are at d8, the value is saved directly rather
// than as a difference. // than as a difference.
if it.vBytes == d8 { if it.vBytes == d8 {
return it.baseΔV return it.baseΔV, nil
} }
return it.baseV + it.baseΔV return it.baseV + it.baseΔV, nil
} }
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes) offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes)
@ -527,39 +593,39 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.Sam
switch it.vBytes { switch it.vBytes {
case d0: case d0:
return it.baseV + return it.baseV +
model.SampleValue(idx)*it.baseΔV model.SampleValue(idx)*it.baseΔV, nil
case d1: case d1:
return it.baseV + return it.baseV +
model.SampleValue(idx)*it.baseΔV + model.SampleValue(idx)*it.baseΔV +
model.SampleValue(int8(it.c[offset])) model.SampleValue(int8(it.c[offset])), nil
case d2: case d2:
return it.baseV + return it.baseV +
model.SampleValue(idx)*it.baseΔV + model.SampleValue(idx)*it.baseΔV +
model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil
case d4: case d4:
return it.baseV + return it.baseV +
model.SampleValue(idx)*it.baseΔV + model.SampleValue(idx)*it.baseΔV +
model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil
// No d8 for ints. // No d8 for ints.
default: default:
panic("invalid number of bytes for integer delta") return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes)
} }
} else { } else {
switch it.vBytes { switch it.vBytes {
case d4: case d4:
return it.baseV + return it.baseV +
model.SampleValue(idx)*it.baseΔV + model.SampleValue(idx)*it.baseΔV +
model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil
case d8: case d8:
// Take absolute value for d8. // Take absolute value for d8.
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil
default: default:
panic("invalid number of bytes for floating point delta") return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes)
} }
} }
} }
// lastSampleValue implements chunkIterator. // lastSampleValue implements chunkIterator.
func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() model.SampleValue { func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) {
return it.sampleValueAtIndex(it.len - 1) return it.sampleValueAtIndex(it.len - 1)
} }

View File

@ -107,6 +107,8 @@ func (hs *headsScanner) scan() bool {
firstTime int64 firstTime int64
lastTime int64 lastTime int64
encoding byte encoding byte
ch chunk
lastTimeHead model.Time
) )
if seriesFlags, hs.err = hs.r.ReadByte(); hs.err != nil { if seriesFlags, hs.err = hs.r.ReadByte(); hs.err != nil {
return false return false
@ -174,11 +176,13 @@ func (hs *headsScanner) scan() bool {
if encoding, hs.err = hs.r.ReadByte(); hs.err != nil { if encoding, hs.err = hs.r.ReadByte(); hs.err != nil {
return false return false
} }
chunk := newChunkForEncoding(chunkEncoding(encoding)) if ch, hs.err = newChunkForEncoding(chunkEncoding(encoding)); hs.err != nil {
if hs.err = chunk.unmarshal(hs.r); hs.err != nil {
return false return false
} }
cd := newChunkDesc(chunk, chunk.firstTime()) if hs.err = ch.unmarshal(hs.r); hs.err != nil {
return false
}
cd := newChunkDesc(ch, ch.firstTime())
if i < numChunkDescs-1 { if i < numChunkDescs-1 {
// This is NOT the head chunk. So it's a chunk // This is NOT the head chunk. So it's a chunk
// to be persisted, and we need to populate lastTime. // to be persisted, and we need to populate lastTime.
@ -189,6 +193,10 @@ func (hs *headsScanner) scan() bool {
} }
} }
if lastTimeHead, hs.err = chunkDescs[len(chunkDescs)-1].lastTime(); hs.err != nil {
return false
}
hs.series = &memorySeries{ hs.series = &memorySeries{
metric: model.Metric(metric), metric: model.Metric(metric),
chunkDescs: chunkDescs, chunkDescs: chunkDescs,
@ -196,7 +204,7 @@ func (hs *headsScanner) scan() bool {
modTime: modTime, modTime: modTime,
chunkDescsOffset: int(chunkDescsOffset), chunkDescsOffset: int(chunkDescsOffset),
savedFirstTime: model.Time(savedFirstTime), savedFirstTime: model.Time(savedFirstTime),
lastTime: chunkDescs[len(chunkDescs)-1].lastTime(), lastTime: lastTimeHead,
headChunkClosed: headChunkClosed, headChunkClosed: headChunkClosed,
} }
hs.seriesCurrent++ hs.seriesCurrent++

View File

@ -19,6 +19,7 @@ package index
import ( import (
"os" "os"
"path" "path"
"path/filepath"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
@ -95,7 +96,7 @@ func (i *FingerprintMetricIndex) Lookup(fp model.Fingerprint) (metric model.Metr
// ready to use. // ready to use.
func NewFingerprintMetricIndex(basePath string) (*FingerprintMetricIndex, error) { func NewFingerprintMetricIndex(basePath string) (*FingerprintMetricIndex, error) {
fingerprintToMetricDB, err := NewLevelDB(LevelDBOptions{ fingerprintToMetricDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, fingerprintToMetricDir), Path: filepath.Join(basePath, fingerprintToMetricDir),
CacheSizeBytes: FingerprintMetricCacheSize, CacheSizeBytes: FingerprintMetricCacheSize,
}) })
if err != nil { if err != nil {
@ -167,7 +168,7 @@ func (i *LabelNameLabelValuesIndex) LookupSet(l model.LabelName) (values map[mod
// LabelNameLabelValuesIndex ready to use. // LabelNameLabelValuesIndex ready to use.
func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex, error) { func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex, error) {
labelNameToLabelValuesDB, err := NewLevelDB(LevelDBOptions{ labelNameToLabelValuesDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, labelNameToLabelValuesDir), Path: filepath.Join(basePath, labelNameToLabelValuesDir),
CacheSizeBytes: LabelNameLabelValuesCacheSize, CacheSizeBytes: LabelNameLabelValuesCacheSize,
}) })
if err != nil { if err != nil {
@ -245,7 +246,7 @@ func (i *LabelPairFingerprintIndex) LookupSet(p model.LabelPair) (fps map[model.
// LabelPairFingerprintIndex ready to use. // LabelPairFingerprintIndex ready to use.
func NewLabelPairFingerprintIndex(basePath string) (*LabelPairFingerprintIndex, error) { func NewLabelPairFingerprintIndex(basePath string) (*LabelPairFingerprintIndex, error) {
labelPairToFingerprintsDB, err := NewLevelDB(LevelDBOptions{ labelPairToFingerprintsDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, labelPairToFingerprintsDir), Path: filepath.Join(basePath, labelPairToFingerprintsDir),
CacheSizeBytes: LabelPairFingerprintsCacheSize, CacheSizeBytes: LabelPairFingerprintsCacheSize,
}) })
if err != nil { if err != nil {
@ -283,7 +284,7 @@ func (i *FingerprintTimeRangeIndex) Lookup(fp model.Fingerprint) (firstTime, las
// FingerprintTimeRangeIndex ready to use. // FingerprintTimeRangeIndex ready to use.
func NewFingerprintTimeRangeIndex(basePath string) (*FingerprintTimeRangeIndex, error) { func NewFingerprintTimeRangeIndex(basePath string) (*FingerprintTimeRangeIndex, error) {
fingerprintTimeRangeDB, err := NewLevelDB(LevelDBOptions{ fingerprintTimeRangeDB, err := NewLevelDB(LevelDBOptions{
Path: path.Join(basePath, fingerprintTimeRangeDir), Path: filepath.Join(basePath, fingerprintTimeRangeDir),
CacheSizeBytes: FingerprintTimeRangeCacheSize, CacheSizeBytes: FingerprintTimeRangeCacheSize,
}) })
if err != nil { if err != nil {

View File

@ -60,6 +60,9 @@ const (
requestedPurge = "purge_on_request" requestedPurge = "purge_on_request"
memoryMaintenance = "maintenance_in_memory" memoryMaintenance = "maintenance_in_memory"
archiveMaintenance = "maintenance_in_archive" archiveMaintenance = "maintenance_in_archive"
completedQurantine = "quarantine_completed"
droppedQuarantine = "quarantine_dropped"
failedQuarantine = "quarantine_failed"
// Op-types for chunkOps. // Op-types for chunkOps.
createAndPin = "create" // A chunkDesc creation with refCount=1. createAndPin = "create" // A chunkDesc creation with refCount=1.

View File

@ -14,6 +14,8 @@
package local package local
import ( import (
"time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
@ -42,10 +44,12 @@ type Storage interface {
// label matchers. At least one label matcher must be specified that does not // label matchers. At least one label matcher must be specified that does not
// match the empty string. // match the empty string.
MetricsForLabelMatchers(...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric MetricsForLabelMatchers(...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric
// LastSamplePairForFingerprint returns the last sample pair for the // LastSamplePairForFingerprint returns the last sample pair that has
// provided fingerprint. If the respective time series does not exist or // been ingested for the provided fingerprint. If this instance of the
// has an evicted head chunk, nil is returned. // Storage has never ingested a sample for the provided fingerprint (or
LastSamplePairForFingerprint(model.Fingerprint) *model.SamplePair // the last ingestion is so long ago that the series has been archived),
// ZeroSamplePair is returned.
LastSamplePairForFingerprint(model.Fingerprint) model.SamplePair
// Get all of the label values that are associated with a given label name. // Get all of the label values that are associated with a given label name.
LabelValuesForLabelName(model.LabelName) model.LabelValues LabelValuesForLabelName(model.LabelName) model.LabelValues
// Get the metric associated with the provided fingerprint. // Get the metric associated with the provided fingerprint.
@ -69,16 +73,12 @@ type Storage interface {
// methods are not goroutine-safe. A SeriesIterator iterates over a snapshot of // methods are not goroutine-safe. A SeriesIterator iterates over a snapshot of
// a series, i.e. it is safe to continue using a SeriesIterator after or during // a series, i.e. it is safe to continue using a SeriesIterator after or during
// modifying the corresponding series, but the iterator will represent the state // modifying the corresponding series, but the iterator will represent the state
// of the series prior the modification. // of the series prior to the modification.
type SeriesIterator interface { type SeriesIterator interface {
// Gets the value that is closest before the given time. In case a value // Gets the value that is closest before the given time. In case a value
// exists at precisely the given time, that value is returned. If no // exists at precisely the given time, that value is returned. If no
// applicable value exists, a SamplePair with timestamp model.Earliest // applicable value exists, ZeroSamplePair is returned.
// and value 0.0 is returned.
ValueAtOrBeforeTime(model.Time) model.SamplePair ValueAtOrBeforeTime(model.Time) model.SamplePair
// Gets the boundary values of an interval: the first and last value
// within a given interval.
BoundaryValues(metric.Interval) []model.SamplePair
// Gets all values contained within a given interval. // Gets all values contained within a given interval.
RangeValues(metric.Interval) []model.SamplePair RangeValues(metric.Interval) []model.SamplePair
} }
@ -90,7 +90,18 @@ type Preloader interface {
PreloadRange( PreloadRange(
fp model.Fingerprint, fp model.Fingerprint,
from model.Time, through model.Time, from model.Time, through model.Time,
) (SeriesIterator, error) ) SeriesIterator
PreloadInstant(
fp model.Fingerprint,
timestamp model.Time, stalenessDelta time.Duration,
) SeriesIterator
// Close unpins any previously requested series data from memory. // Close unpins any previously requested series data from memory.
Close() Close()
} }
// ZeroSamplePair is the pseudo zero-value of model.SamplePair used by the local
// package to signal a non-existing sample. It is a SamplePair with timestamp
// model.Earliest and value 0.0. Note that the natural zero value of SamplePair
// has a timestamp of 0, which is possible to appear in a real SamplePair and
// thus not suitable to signal a non-existing SamplePair.
var ZeroSamplePair = model.SamplePair{Timestamp: model.Earliest}

View File

@ -20,7 +20,6 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"path"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
@ -46,6 +45,7 @@ const (
seriesFileSuffix = ".db" seriesFileSuffix = ".db"
seriesTempFileSuffix = ".db.tmp" seriesTempFileSuffix = ".db.tmp"
seriesDirNameLen = 2 // How many bytes of the fingerprint in dir name. seriesDirNameLen = 2 // How many bytes of the fingerprint in dir name.
hintFileSuffix = ".hint"
mappingsFileName = "mappings.db" mappingsFileName = "mappings.db"
mappingsTempFileName = "mappings.db.tmp" mappingsTempFileName = "mappings.db.tmp"
@ -315,8 +315,9 @@ func (p *persistence) isDirty() bool {
// setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag was // setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag was
// set to true with this method, it cannot be set to false again. (If we became // set to true with this method, it cannot be set to false again. (If we became
// dirty during our runtime, there is no way back. If we were dirty from the // dirty during our runtime, there is no way back. If we were dirty from the
// start, a clean-up might make us clean again.) // start, a clean-up might make us clean again.) The provided error will be
func (p *persistence) setDirty(dirty bool) { // logged as a reason if dirty is true.
func (p *persistence) setDirty(dirty bool, err error) {
if dirty { if dirty {
p.dirtyCounter.Inc() p.dirtyCounter.Inc()
} }
@ -328,7 +329,7 @@ func (p *persistence) setDirty(dirty bool) {
p.dirty = dirty p.dirty = dirty
if dirty { if dirty {
p.becameDirty = true p.becameDirty = true
log.Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.") log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.")
} }
} }
@ -365,8 +366,7 @@ func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelVa
func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index int, err error) { func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index int, err error) {
defer func() { defer func() {
if err != nil { if err != nil {
log.Error("Error persisting chunks: ", err) p.setDirty(true, fmt.Errorf("error in method persistChunks: %s", err))
p.setDirty(true)
} }
}() }()
@ -435,8 +435,13 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse
return nil, err return nil, err
} }
for c := 0; c < batchSize; c++ { for c := 0; c < batchSize; c++ {
chunk := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset])) chunk, err := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]) if err != nil {
return nil, err
}
if err := chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
return nil, err
}
chunks = append(chunks, chunk) chunks = append(chunks, chunk)
} }
} }
@ -464,7 +469,7 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
return nil, err return nil, err
} }
if fi.Size()%int64(chunkLenWithHeader) != 0 { if fi.Size()%int64(chunkLenWithHeader) != 0 {
p.setDirty(true) // The returned error will bubble up and lead to quarantining of the whole series.
return nil, fmt.Errorf( return nil, fmt.Errorf(
"size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d", "size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d",
fp, fi.Size(), chunkLenWithHeader, fp, fi.Size(), chunkLenWithHeader,
@ -642,7 +647,11 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
if _, err = codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil { if _, err = codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil {
return return
} }
if _, err = codable.EncodeVarint(w, int64(chunkDesc.lastTime())); err != nil { lt, err := chunkDesc.lastTime()
if err != nil {
return
}
if _, err = codable.EncodeVarint(w, int64(lt)); err != nil {
return return
} }
} else { } else {
@ -748,8 +757,7 @@ func (p *persistence) dropAndPersistChunks(
// please handle with care! // please handle with care!
defer func() { defer func() {
if err != nil { if err != nil {
log.Error("Error dropping and/or persisting chunks: ", err) p.setDirty(true, fmt.Errorf("error in method dropAndPersistChunks: %s", err))
p.setDirty(true)
} }
}() }()
@ -758,7 +766,15 @@ func (p *persistence) dropAndPersistChunks(
// too old. If that's the case, the chunks in the series file // too old. If that's the case, the chunks in the series file
// are all too old, too. // are all too old, too.
i := 0 i := 0
for ; i < len(chunks) && chunks[i].newIterator().lastTimestamp().Before(beforeTime); i++ { for ; i < len(chunks); i++ {
var lt model.Time
lt, err = chunks[i].newIterator().lastTimestamp()
if err != nil {
return
}
if !lt.Before(beforeTime) {
break
}
} }
if i < len(chunks) { if i < len(chunks) {
firstTimeNotDropped = chunks[i].firstTime() firstTimeNotDropped = chunks[i].firstTime()
@ -911,6 +927,44 @@ func (p *persistence) deleteSeriesFile(fp model.Fingerprint) (int, error) {
return numChunks, nil return numChunks, nil
} }
// quarantineSeriesFile moves a series file to the orphaned directory. It also
// writes a hint file with the provided quarantine reason and, if series is
// non-nil, the string representation of the metric.
func (p *persistence) quarantineSeriesFile(fp model.Fingerprint, quarantineReason error, metric model.Metric) error {
var (
oldName = p.fileNameForFingerprint(fp)
orphanedDir = filepath.Join(p.basePath, "orphaned", filepath.Base(filepath.Dir(oldName)))
newName = filepath.Join(orphanedDir, filepath.Base(oldName))
hintName = newName[:len(newName)-len(seriesFileSuffix)] + hintFileSuffix
)
renameErr := os.MkdirAll(orphanedDir, 0700)
if renameErr != nil {
return renameErr
}
renameErr = os.Rename(oldName, newName)
if os.IsNotExist(renameErr) {
// Source file dosn't exist. That's normal.
renameErr = nil
}
// Write hint file even if the rename ended in an error. At least try...
// And ignore errors writing the hint file. It's best effort.
if f, err := os.Create(hintName); err == nil {
if metric != nil {
f.WriteString(metric.String() + "\n")
} else {
f.WriteString("[UNKNOWN METRIC]\n")
}
if quarantineReason != nil {
f.WriteString(quarantineReason.Error() + "\n")
} else {
f.WriteString("[UNKNOWN REASON]\n")
}
f.Close()
}
return renameErr
}
// seriesFileModTime returns the modification time of the series file belonging // seriesFileModTime returns the modification time of the series file belonging
// to the provided fingerprint. In case of an error, the zero value of time.Time // to the provided fingerprint. In case of an error, the zero value of time.Time
// is returned. // is returned.
@ -962,11 +1016,11 @@ func (p *persistence) archiveMetric(
fp model.Fingerprint, m model.Metric, first, last model.Time, fp model.Fingerprint, m model.Metric, first, last model.Time,
) error { ) error {
if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil { if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil {
p.setDirty(true) p.setDirty(true, err)
return err return err
} }
if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil { if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil {
p.setDirty(true) p.setDirty(true, err)
return err return err
} }
return nil return nil
@ -979,6 +1033,9 @@ func (p *persistence) hasArchivedMetric(fp model.Fingerprint) (
hasMetric bool, firstTime, lastTime model.Time, err error, hasMetric bool, firstTime, lastTime model.Time, err error,
) { ) {
firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp) firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp)
if err != nil {
p.setDirty(true, err)
}
return return
} }
@ -1027,7 +1084,7 @@ func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error)
func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) { func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) {
defer func() { defer func() {
if err != nil { if err != nil {
p.setDirty(true) p.setDirty(true, fmt.Errorf("error in method purgeArchivedMetric: %s", err))
} }
}() }()
@ -1058,12 +1115,8 @@ func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) {
// was actually deleted, the method returns true and the first time and last // was actually deleted, the method returns true and the first time and last
// time of the deleted metric. The caller must have locked the fingerprint. // time of the deleted metric. The caller must have locked the fingerprint.
func (p *persistence) unarchiveMetric(fp model.Fingerprint) (deletedAnything bool, err error) { func (p *persistence) unarchiveMetric(fp model.Fingerprint) (deletedAnything bool, err error) {
defer func() { // An error returned here will bubble up and lead to quarantining of the
if err != nil { // series, so no setDirty required.
p.setDirty(true)
}
}()
deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp)) deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp))
if err != nil || !deleted { if err != nil || !deleted {
return false, err return false, err
@ -1119,17 +1172,17 @@ func (p *persistence) close() error {
func (p *persistence) dirNameForFingerprint(fp model.Fingerprint) string { func (p *persistence) dirNameForFingerprint(fp model.Fingerprint) string {
fpStr := fp.String() fpStr := fp.String()
return path.Join(p.basePath, fpStr[0:seriesDirNameLen]) return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen])
} }
func (p *persistence) fileNameForFingerprint(fp model.Fingerprint) string { func (p *persistence) fileNameForFingerprint(fp model.Fingerprint) string {
fpStr := fp.String() fpStr := fp.String()
return path.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix) return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix)
} }
func (p *persistence) tempFileNameForFingerprint(fp model.Fingerprint) string { func (p *persistence) tempFileNameForFingerprint(fp model.Fingerprint) string {
fpStr := fp.String() fpStr := fp.String()
return path.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix) return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix)
} }
func (p *persistence) openChunkFileForWriting(fp model.Fingerprint) (*os.File, error) { func (p *persistence) openChunkFileForWriting(fp model.Fingerprint) (*os.File, error) {
@ -1162,19 +1215,19 @@ func (p *persistence) openChunkFileForReading(fp model.Fingerprint) (*os.File, e
} }
func (p *persistence) headsFileName() string { func (p *persistence) headsFileName() string {
return path.Join(p.basePath, headsFileName) return filepath.Join(p.basePath, headsFileName)
} }
func (p *persistence) headsTempFileName() string { func (p *persistence) headsTempFileName() string {
return path.Join(p.basePath, headsTempFileName) return filepath.Join(p.basePath, headsTempFileName)
} }
func (p *persistence) mappingsFileName() string { func (p *persistence) mappingsFileName() string {
return path.Join(p.basePath, mappingsFileName) return filepath.Join(p.basePath, mappingsFileName)
} }
func (p *persistence) mappingsTempFileName() string { func (p *persistence) mappingsTempFileName() string {
return path.Join(p.basePath, mappingsTempFileName) return filepath.Join(p.basePath, mappingsTempFileName)
} }
func (p *persistence) processIndexingQueue() { func (p *persistence) processIndexingQueue() {
@ -1456,7 +1509,9 @@ func (p *persistence) writeChunks(w io.Writer, chunks []chunk) error {
b = b[:writeSize] b = b[:writeSize]
for i, chunk := range chunks[:batchSize] { for i, chunk := range chunks[:batchSize] {
writeChunkHeader(b[i*chunkLenWithHeader:], chunk) if err := writeChunkHeader(b[i*chunkLenWithHeader:], chunk); err != nil {
return err
}
if err := chunk.marshalToBuf(b[i*chunkLenWithHeader+chunkHeaderLen:]); err != nil { if err := chunk.marshalToBuf(b[i*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
return err return err
} }
@ -1482,14 +1537,19 @@ func chunkIndexForOffset(offset int64) (int, error) {
return int(offset) / chunkLenWithHeader, nil return int(offset) / chunkLenWithHeader, nil
} }
func writeChunkHeader(header []byte, c chunk) { func writeChunkHeader(header []byte, c chunk) error {
header[chunkHeaderTypeOffset] = byte(c.encoding()) header[chunkHeaderTypeOffset] = byte(c.encoding())
binary.LittleEndian.PutUint64( binary.LittleEndian.PutUint64(
header[chunkHeaderFirstTimeOffset:], header[chunkHeaderFirstTimeOffset:],
uint64(c.firstTime()), uint64(c.firstTime()),
) )
lt, err := c.newIterator().lastTimestamp()
if err != nil {
return err
}
binary.LittleEndian.PutUint64( binary.LittleEndian.PutUint64(
header[chunkHeaderLastTimeOffset:], header[chunkHeaderLastTimeOffset:],
uint64(c.newIterator().lastTimestamp()), uint64(lt),
) )
return nil
} }

View File

@ -14,6 +14,10 @@
package local package local
import ( import (
"bufio"
"errors"
"os"
"path/filepath"
"reflect" "reflect"
"sync" "sync"
"testing" "testing"
@ -49,7 +53,7 @@ func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, tes
}) })
} }
func buildTestChunks(encoding chunkEncoding) map[model.Fingerprint][]chunk { func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint][]chunk {
fps := model.Fingerprints{ fps := model.Fingerprints{
m1.FastFingerprint(), m1.FastFingerprint(),
m2.FastFingerprint(), m2.FastFingerprint(),
@ -60,10 +64,18 @@ func buildTestChunks(encoding chunkEncoding) map[model.Fingerprint][]chunk {
for _, fp := range fps { for _, fp := range fps {
fpToChunks[fp] = make([]chunk, 0, 10) fpToChunks[fp] = make([]chunk, 0, 10)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
fpToChunks[fp] = append(fpToChunks[fp], newChunkForEncoding(encoding).add(&model.SamplePair{ ch, err := newChunkForEncoding(encoding)
if err != nil {
t.Fatal(err)
}
chs, err := ch.add(model.SamplePair{
Timestamp: model.Time(i), Timestamp: model.Time(i),
Value: model.SampleValue(fp), Value: model.SampleValue(fp),
})[0]) })
if err != nil {
t.Fatal(err)
}
fpToChunks[fp] = append(fpToChunks[fp], chs[0])
} }
} }
return fpToChunks return fpToChunks
@ -73,7 +85,7 @@ func chunksEqual(c1, c2 chunk) bool {
values2 := c2.newIterator().values() values2 := c2.newIterator().values()
for v1 := range c1.newIterator().values() { for v1 := range c1.newIterator().values() {
v2 := <-values2 v2 := <-values2
if !v1.Equal(v2) { if !(v1 == v2) {
return false return false
} }
} }
@ -84,7 +96,7 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
p, closer := newTestPersistence(t, encoding) p, closer := newTestPersistence(t, encoding)
defer closer.Close() defer closer.Close()
fpToChunks := buildTestChunks(encoding) fpToChunks := buildTestChunks(t, encoding)
for fp, chunks := range fpToChunks { for fp, chunks := range fpToChunks {
firstTimeNotDropped, offset, numDropped, allDropped, err := firstTimeNotDropped, offset, numDropped, allDropped, err :=
@ -126,10 +138,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10) t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10)
} }
for i, cd := range actualChunkDescs { for i, cd := range actualChunkDescs {
if cd.firstTime() != model.Time(i) || cd.lastTime() != model.Time(i) { lastTime, err := cd.lastTime()
if err != nil {
t.Fatal(err)
}
if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) {
t.Errorf( t.Errorf(
"Want ts=%v, got firstTime=%v, lastTime=%v.", "Want ts=%v, got firstTime=%v, lastTime=%v.",
i, cd.firstTime(), cd.lastTime(), i, cd.firstTime(), lastTime,
) )
} }
@ -140,10 +156,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 5) t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 5)
} }
for i, cd := range actualChunkDescs { for i, cd := range actualChunkDescs {
if cd.firstTime() != model.Time(i) || cd.lastTime() != model.Time(i) { lastTime, err := cd.lastTime()
if err != nil {
t.Fatal(err)
}
if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) {
t.Errorf( t.Errorf(
"Want ts=%v, got firstTime=%v, lastTime=%v.", "Want ts=%v, got firstTime=%v, lastTime=%v.",
i, cd.firstTime(), cd.lastTime(), i, cd.firstTime(), lastTime,
) )
} }
@ -433,21 +453,21 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
fpLocker := newFingerprintLocker(10) fpLocker := newFingerprintLocker(10)
sm := newSeriesMap() sm := newSeriesMap()
s1 := newMemorySeries(m1, nil, time.Time{}) s1, _ := newMemorySeries(m1, nil, time.Time{})
s2 := newMemorySeries(m2, nil, time.Time{}) s2, _ := newMemorySeries(m2, nil, time.Time{})
s3 := newMemorySeries(m3, nil, time.Time{}) s3, _ := newMemorySeries(m3, nil, time.Time{})
s4 := newMemorySeries(m4, nil, time.Time{}) s4, _ := newMemorySeries(m4, nil, time.Time{})
s5 := newMemorySeries(m5, nil, time.Time{}) s5, _ := newMemorySeries(m5, nil, time.Time{})
s1.add(&model.SamplePair{Timestamp: 1, Value: 3.14}) s1.add(model.SamplePair{Timestamp: 1, Value: 3.14})
s3.add(&model.SamplePair{Timestamp: 2, Value: 2.7}) s3.add(model.SamplePair{Timestamp: 2, Value: 2.7})
s3.headChunkClosed = true s3.headChunkClosed = true
s3.persistWatermark = 1 s3.persistWatermark = 1
for i := 0; i < 10000; i++ { for i := 0; i < 10000; i++ {
s4.add(&model.SamplePair{ s4.add(model.SamplePair{
Timestamp: model.Time(i), Timestamp: model.Time(i),
Value: model.SampleValue(i) / 2, Value: model.SampleValue(i) / 2,
}) })
s5.add(&model.SamplePair{ s5.add(model.SamplePair{
Timestamp: model.Time(i), Timestamp: model.Time(i),
Value: model.SampleValue(i * i), Value: model.SampleValue(i * i),
}) })
@ -552,10 +572,14 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
} }
continue continue
} }
if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() { lastTime, err := cd.c.newIterator().lastTimestamp()
if err != nil {
t.Fatal(err)
}
if cd.chunkLastTime != lastTime {
t.Errorf( t.Errorf(
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d", "chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d",
i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime, i, lastTime, cd.chunkLastTime,
) )
} }
} }
@ -605,10 +629,14 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
} }
continue continue
} }
if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() { lastTime, err := cd.c.newIterator().lastTimestamp()
if err != nil {
t.Fatal(err)
}
if cd.chunkLastTime != lastTime {
t.Errorf( t.Errorf(
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d", "chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d",
i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime, i, cd.chunkLastTime, lastTime,
) )
} }
} }
@ -1051,6 +1079,108 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
} }
} }
func TestQuranatineSeriesFile(t *testing.T) {
p, closer := newTestPersistence(t, 1)
defer closer.Close()
verify := func(fp model.Fingerprint, seriesFileShouldExist bool, contentHintFile ...string) {
var (
fpStr = fp.String()
originalFile = p.fileNameForFingerprint(fp)
quarantinedFile = filepath.Join(p.basePath, "orphaned", fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix)
hintFile = filepath.Join(p.basePath, "orphaned", fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+hintFileSuffix)
)
if _, err := os.Stat(originalFile); !os.IsNotExist(err) {
t.Errorf("Expected file %q to not exist.", originalFile)
}
if _, err := os.Stat(quarantinedFile); (os.IsNotExist(err) && seriesFileShouldExist) || (err == nil && !seriesFileShouldExist) {
t.Errorf("Unexpected state of quarantined file %q. Expected it to exist: %t. os.Stat returned: %s.", quarantinedFile, seriesFileShouldExist, err)
}
f, err := os.Open(hintFile)
defer f.Close()
if err != nil {
t.Errorf("Could not open hint file %q: %s", hintFile, err)
return
}
scanner := bufio.NewScanner(f)
for _, want := range contentHintFile {
if !scanner.Scan() {
t.Errorf("Unexpected end of hint file %q.", hintFile)
return
}
got := scanner.Text()
if want != got {
t.Errorf("Want hint line %q, got %q.", want, got)
}
}
if scanner.Scan() {
t.Errorf("Unexpected spurious content in hint file %q: %q", hintFile, scanner.Text())
}
}
if err := p.quarantineSeriesFile(0, nil, nil); err != nil {
t.Error(err)
}
verify(0, false, "[UNKNOWN METRIC]", "[UNKNOWN REASON]")
if err := p.quarantineSeriesFile(
1, errors.New("file does not exist"),
nil,
); err != nil {
t.Error(err)
}
verify(1, false, "[UNKNOWN METRIC]", "file does not exist")
if err := p.quarantineSeriesFile(
2, errors.New("file does not exist"),
model.Metric{"foo": "bar", "dings": "bums"},
); err != nil {
t.Error(err)
}
verify(2, false, `{dings="bums", foo="bar"}`, "file does not exist")
if err := p.quarantineSeriesFile(
3, nil,
model.Metric{"foo": "bar", "dings": "bums"},
); err != nil {
t.Error(err)
}
verify(3, false, `{dings="bums", foo="bar"}`, "[UNKNOWN REASON]")
err := os.Mkdir(filepath.Join(p.basePath, "00"), os.ModePerm)
if err != nil {
t.Fatal(err)
}
f, err := os.Create(p.fileNameForFingerprint(4))
if err != nil {
t.Fatal(err)
}
f.Close()
if err := p.quarantineSeriesFile(
4, errors.New("file exists"),
model.Metric{"sound": "cloud"},
); err != nil {
t.Error(err)
}
verify(4, true, `{sound="cloud"}`, "file exists")
if err := p.quarantineSeriesFile(4, nil, nil); err != nil {
t.Error(err)
}
// Overwrites hint file but leaves series file intact.
verify(4, true, "[UNKNOWN METRIC]", "[UNKNOWN REASON]")
if err := p.quarantineSeriesFile(
4, errors.New("file exists"),
model.Metric{"sound": "cloud"},
); err != nil {
t.Error(err)
}
// Overwrites everything.
verify(4, true, `{sound="cloud"}`, "file exists")
}
var fpStrings = []string{ var fpStrings = []string{
"b004b821ca50ba26", "b004b821ca50ba26",
"b037c21e884e4fc5", "b037c21e884e4fc5",

View File

@ -13,7 +13,11 @@
package local package local
import "github.com/prometheus/common/model" import (
"time"
"github.com/prometheus/common/model"
)
// memorySeriesPreloader is a Preloader for the memorySeriesStorage. // memorySeriesPreloader is a Preloader for the memorySeriesStorage.
type memorySeriesPreloader struct { type memorySeriesPreloader struct {
@ -25,13 +29,20 @@ type memorySeriesPreloader struct {
func (p *memorySeriesPreloader) PreloadRange( func (p *memorySeriesPreloader) PreloadRange(
fp model.Fingerprint, fp model.Fingerprint,
from model.Time, through model.Time, from model.Time, through model.Time,
) (SeriesIterator, error) { ) SeriesIterator {
cds, iter, err := p.storage.preloadChunksForRange(fp, from, through) cds, iter := p.storage.preloadChunksForRange(fp, from, through)
if err != nil {
return nil, err
}
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
return iter, nil return iter
}
// PreloadInstant implements Preloader
func (p *memorySeriesPreloader) PreloadInstant(
fp model.Fingerprint,
timestamp model.Time, stalenessDelta time.Duration,
) SeriesIterator {
cds, iter := p.storage.preloadChunksForInstant(fp, timestamp.Add(-stalenessDelta), timestamp)
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
return iter
} }
// Close implements Preloader. // Close implements Preloader.

View File

@ -162,9 +162,15 @@ type memorySeries struct {
// first chunk before its chunk desc is evicted. In doubt, this field is // first chunk before its chunk desc is evicted. In doubt, this field is
// just set to the oldest possible timestamp. // just set to the oldest possible timestamp.
savedFirstTime model.Time savedFirstTime model.Time
// The timestamp of the last sample in this series. Needed for fast access to // The timestamp of the last sample in this series. Needed for fast
// ensure timestamp monotonicity during ingestion. // access for federation and to ensure timestamp monotonicity during
// ingestion.
lastTime model.Time lastTime model.Time
// The last ingested sample value. Needed for fast access for
// federation.
lastSampleValue model.SampleValue
// Whether lastSampleValue has been set already.
lastSampleValueSet bool
// Whether the current head chunk has already been finished. If true, // Whether the current head chunk has already been finished. If true,
// the current head chunk must not be modified anymore. // the current head chunk must not be modified anymore.
headChunkClosed bool headChunkClosed bool
@ -185,12 +191,15 @@ type memorySeries struct {
// set to model.Earliest. The zero value for modTime can be used if the // set to model.Earliest. The zero value for modTime can be used if the
// modification time of the series file is unknown (e.g. if this is a genuinely // modification time of the series file is unknown (e.g. if this is a genuinely
// new series). // new series).
func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) *memorySeries { func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) (*memorySeries, error) {
var err error
firstTime := model.Earliest firstTime := model.Earliest
lastTime := model.Earliest lastTime := model.Earliest
if len(chunkDescs) > 0 { if len(chunkDescs) > 0 {
firstTime = chunkDescs[0].firstTime() firstTime = chunkDescs[0].firstTime()
lastTime = chunkDescs[len(chunkDescs)-1].lastTime() if lastTime, err = chunkDescs[len(chunkDescs)-1].lastTime(); err != nil {
return nil, err
}
} }
return &memorySeries{ return &memorySeries{
metric: m, metric: m,
@ -200,14 +209,14 @@ func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time)
lastTime: lastTime, lastTime: lastTime,
persistWatermark: len(chunkDescs), persistWatermark: len(chunkDescs),
modTime: modTime, modTime: modTime,
} }, nil
} }
// add adds a sample pair to the series. It returns the number of newly // add adds a sample pair to the series. It returns the number of newly
// completed chunks (which are now eligible for persistence). // completed chunks (which are now eligible for persistence).
// //
// The caller must have locked the fingerprint of the series. // The caller must have locked the fingerprint of the series.
func (s *memorySeries) add(v *model.SamplePair) int { func (s *memorySeries) add(v model.SamplePair) (int, error) {
if len(s.chunkDescs) == 0 || s.headChunkClosed { if len(s.chunkDescs) == 0 || s.headChunkClosed {
newHead := newChunkDesc(newChunk(), v.Timestamp) newHead := newChunkDesc(newChunk(), v.Timestamp)
s.chunkDescs = append(s.chunkDescs, newHead) s.chunkDescs = append(s.chunkDescs, newHead)
@ -229,7 +238,10 @@ func (s *memorySeries) add(v *model.SamplePair) int {
s.headChunkUsedByIterator = false s.headChunkUsedByIterator = false
} }
chunks := s.head().add(v) chunks, err := s.head().add(v)
if err != nil {
return 0, err
}
s.head().c = chunks[0] s.head().c = chunks[0]
for _, c := range chunks[1:] { for _, c := range chunks[1:] {
@ -242,7 +254,9 @@ func (s *memorySeries) add(v *model.SamplePair) int {
} }
s.lastTime = v.Timestamp s.lastTime = v.Timestamp
return len(chunks) - 1 s.lastSampleValue = v.Value
s.lastSampleValueSet = true
return len(chunks) - 1, nil
} }
// maybeCloseHeadChunk closes the head chunk if it has not been touched for the // maybeCloseHeadChunk closes the head chunk if it has not been touched for the
@ -287,10 +301,14 @@ func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) {
// dropChunks removes chunkDescs older than t. The caller must have locked the // dropChunks removes chunkDescs older than t. The caller must have locked the
// fingerprint of the series. // fingerprint of the series.
func (s *memorySeries) dropChunks(t model.Time) { func (s *memorySeries) dropChunks(t model.Time) error {
keepIdx := len(s.chunkDescs) keepIdx := len(s.chunkDescs)
for i, cd := range s.chunkDescs { for i, cd := range s.chunkDescs {
if !cd.lastTime().Before(t) { lt, err := cd.lastTime()
if err != nil {
return err
}
if !lt.Before(t) {
keepIdx = i keepIdx = i
break break
} }
@ -310,6 +328,7 @@ func (s *memorySeries) dropChunks(t model.Time) {
numMemChunkDescs.Sub(float64(keepIdx)) numMemChunkDescs.Sub(float64(keepIdx))
s.dirty = true s.dirty = true
} }
return nil
} }
// preloadChunks is an internal helper method. // preloadChunks is an internal helper method.
@ -350,8 +369,12 @@ func (s *memorySeries) preloadChunks(
s.headChunkUsedByIterator = true s.headChunkUsedByIterator = true
} }
curriedQuarantineSeries := func(err error) {
mss.quarantineSeries(fp, s.metric, err)
}
iter := &boundedIterator{ iter := &boundedIterator{
it: s.newIterator(pinnedChunkDescs), it: s.newIterator(pinnedChunkDescs, curriedQuarantineSeries),
start: model.Now().Add(-mss.dropAfter), start: model.Now().Add(-mss.dropAfter),
} }
@ -359,9 +382,10 @@ func (s *memorySeries) preloadChunks(
} }
// newIterator returns a new SeriesIterator for the provided chunkDescs (which // newIterator returns a new SeriesIterator for the provided chunkDescs (which
// must be pinned). The caller must have locked the fingerprint of the // must be pinned).
// memorySeries. //
func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator { // The caller must have locked the fingerprint of the memorySeries.
func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc, quarantine func(error)) SeriesIterator {
chunks := make([]chunk, 0, len(pinnedChunkDescs)) chunks := make([]chunk, 0, len(pinnedChunkDescs))
for _, cd := range pinnedChunkDescs { for _, cd := range pinnedChunkDescs {
// It's OK to directly access cd.c here (without locking) as the // It's OK to directly access cd.c here (without locking) as the
@ -369,16 +393,45 @@ func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator
chunks = append(chunks, cd.c) chunks = append(chunks, cd.c)
} }
return &memorySeriesIterator{ return &memorySeriesIterator{
chunks: chunks, chunks: chunks,
chunkIts: make([]chunkIterator, len(chunks)), chunkIts: make([]chunkIterator, len(chunks)),
quarantine: quarantine,
} }
} }
// preloadChunksForInstant preloads chunks for the latest value in the given
// range. If the last sample saved in the memorySeries itself is the latest
// value in the given range, it will in fact preload zero chunks and just take
// that value.
func (s *memorySeries) preloadChunksForInstant(
fp model.Fingerprint,
from model.Time, through model.Time,
mss *memorySeriesStorage,
) ([]*chunkDesc, SeriesIterator, error) {
// If we have a lastSamplePair in the series, and thas last samplePair
// is in the interval, just take it in a singleSampleSeriesIterator. No
// need to pin or load anything.
lastSample := s.lastSamplePair()
if !through.Before(lastSample.Timestamp) &&
!from.After(lastSample.Timestamp) &&
lastSample != ZeroSamplePair {
iter := &boundedIterator{
it: &singleSampleSeriesIterator{samplePair: lastSample},
start: model.Now().Add(-mss.dropAfter),
}
return nil, iter, nil
}
// If we are here, we are out of luck and have to delegate to the more
// expensive method.
return s.preloadChunksForRange(fp, from, through, mss)
}
// preloadChunksForRange loads chunks for the given range from the persistence. // preloadChunksForRange loads chunks for the given range from the persistence.
// The caller must have locked the fingerprint of the series. // The caller must have locked the fingerprint of the series.
func (s *memorySeries) preloadChunksForRange( func (s *memorySeries) preloadChunksForRange(
fp model.Fingerprint,
from model.Time, through model.Time, from model.Time, through model.Time,
fp model.Fingerprint, mss *memorySeriesStorage, mss *memorySeriesStorage,
) ([]*chunkDesc, SeriesIterator, error) { ) ([]*chunkDesc, SeriesIterator, error) {
firstChunkDescTime := model.Latest firstChunkDescTime := model.Latest
if len(s.chunkDescs) > 0 { if len(s.chunkDescs) > 0 {
@ -410,7 +463,11 @@ func (s *memorySeries) preloadChunksForRange(
if fromIdx == len(s.chunkDescs) { if fromIdx == len(s.chunkDescs) {
// Even the last chunk starts before "from". Find out if the // Even the last chunk starts before "from". Find out if the
// series ends before "from" and we don't need to do anything. // series ends before "from" and we don't need to do anything.
if s.chunkDescs[len(s.chunkDescs)-1].lastTime().Before(from) { lt, err := s.chunkDescs[len(s.chunkDescs)-1].lastTime()
if err != nil {
return nil, nopIter, err
}
if lt.Before(from) {
return nil, nopIter, nil return nil, nopIter, nil
} }
} }
@ -435,8 +492,9 @@ func (s *memorySeries) head() *chunkDesc {
return s.chunkDescs[len(s.chunkDescs)-1] return s.chunkDescs[len(s.chunkDescs)-1]
} }
// firstTime returns the timestamp of the first sample in the series. The caller // firstTime returns the timestamp of the first sample in the series.
// must have locked the fingerprint of the memorySeries. //
// The caller must have locked the fingerprint of the memorySeries.
func (s *memorySeries) firstTime() model.Time { func (s *memorySeries) firstTime() model.Time {
if s.chunkDescsOffset == 0 && len(s.chunkDescs) > 0 { if s.chunkDescsOffset == 0 && len(s.chunkDescs) > 0 {
return s.chunkDescs[0].firstTime() return s.chunkDescs[0].firstTime()
@ -444,6 +502,23 @@ func (s *memorySeries) firstTime() model.Time {
return s.savedFirstTime return s.savedFirstTime
} }
// lastSamplePair returns the last ingested SamplePair. It returns
// ZeroSamplePair if this memorySeries has never received a sample (via the add
// method), which is the case for freshly unarchived series or newly created
// ones and also for all series after a server restart. However, in that case,
// series will most likely be considered stale anyway.
//
// The caller must have locked the fingerprint of the memorySeries.
func (s *memorySeries) lastSamplePair() model.SamplePair {
if !s.lastSampleValueSet {
return ZeroSamplePair
}
return model.SamplePair{
Timestamp: s.lastTime,
Value: s.lastSampleValue,
}
}
// chunksToPersist returns a slice of chunkDescs eligible for persistence. It's // chunksToPersist returns a slice of chunkDescs eligible for persistence. It's
// the caller's responsibility to actually persist the returned chunks // the caller's responsibility to actually persist the returned chunks
// afterwards. The method sets the persistWatermark and the dirty flag // afterwards. The method sets the persistWatermark and the dirty flag
@ -466,20 +541,33 @@ func (s *memorySeries) chunksToPersist() []*chunkDesc {
// memorySeriesIterator implements SeriesIterator. // memorySeriesIterator implements SeriesIterator.
type memorySeriesIterator struct { type memorySeriesIterator struct {
chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime. chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime.
chunkIts []chunkIterator // Caches chunkIterators. chunkIts []chunkIterator // Caches chunkIterators.
chunks []chunk chunks []chunk
quarantine func(error) // Call to quarantine the series this iterator belongs to.
} }
// ValueAtOrBeforeTime implements SeriesIterator. // ValueAtOrBeforeTime implements SeriesIterator.
func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
// The most common case. We are iterating through a chunk. // The most common case. We are iterating through a chunk.
if it.chunkIt != nil && it.chunkIt.contains(t) { if it.chunkIt != nil {
return it.chunkIt.valueAtOrBeforeTime(t) containsT, err := it.chunkIt.contains(t)
if err != nil {
it.quarantine(err)
return ZeroSamplePair
}
if containsT {
value, err := it.chunkIt.valueAtOrBeforeTime(t)
if err != nil {
it.quarantine(err)
return ZeroSamplePair
}
return value
}
} }
if len(it.chunks) == 0 { if len(it.chunks) == 0 {
return model.SamplePair{Timestamp: model.Earliest} return ZeroSamplePair
} }
// Find the last chunk where firstTime() is before or equal to t. // Find the last chunk where firstTime() is before or equal to t.
@ -489,75 +577,15 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa
}) })
if i == len(it.chunks) { if i == len(it.chunks) {
// Even the first chunk starts after t. // Even the first chunk starts after t.
return model.SamplePair{Timestamp: model.Earliest} return ZeroSamplePair
} }
it.chunkIt = it.chunkIterator(l - i) it.chunkIt = it.chunkIterator(l - i)
return it.chunkIt.valueAtOrBeforeTime(t) value, err := it.chunkIt.valueAtOrBeforeTime(t)
} if err != nil {
it.quarantine(err)
// BoundaryValues implements SeriesIterator. return ZeroSamplePair
func (it *memorySeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair {
// Find the first chunk for which the first sample is within the interval.
i := sort.Search(len(it.chunks), func(i int) bool {
return !it.chunks[i].firstTime().Before(in.OldestInclusive)
})
// Only now check the last timestamp of the previous chunk (which is
// fairly expensive).
if i > 0 && !it.chunkIterator(i-1).lastTimestamp().Before(in.OldestInclusive) {
i--
} }
return value
values := make([]model.SamplePair, 0, 2)
for j, c := range it.chunks[i:] {
if c.firstTime().After(in.NewestInclusive) {
if len(values) == 1 {
// We found the first value before but are now
// already past the last value. The value we
// want must be the last value of the previous
// chunk. So backtrack...
chunkIt := it.chunkIterator(i + j - 1)
values = append(values, model.SamplePair{
Timestamp: chunkIt.lastTimestamp(),
Value: chunkIt.lastSampleValue(),
})
}
break
}
chunkIt := it.chunkIterator(i + j)
if len(values) == 0 {
for s := range chunkIt.values() {
if len(values) == 0 && !s.Timestamp.Before(in.OldestInclusive) {
values = append(values, *s)
// We cannot just break out here as we have to consume all
// the values to not leak a goroutine. This could obviously
// be made much neater with more suitable methods in the chunk
// interface. But currently, BoundaryValues is only used by
// `predict_linear` so we would pollute the chunk interface
// unduly just for one single corner case. Plus, even that use
// of BoundaryValues is suboptimal and should be replaced.
}
}
}
if chunkIt.lastTimestamp().After(in.NewestInclusive) {
s := chunkIt.valueAtOrBeforeTime(in.NewestInclusive)
if s.Timestamp != model.Earliest {
values = append(values, s)
}
break
}
}
if len(values) == 1 {
// We found exactly one value. In that case, add the most recent we know.
chunkIt := it.chunkIterator(len(it.chunks) - 1)
values = append(values, model.SamplePair{
Timestamp: chunkIt.lastTimestamp(),
Value: chunkIt.lastSampleValue(),
})
}
if len(values) == 2 && values[0].Equal(&values[1]) {
return values[:1]
}
return values
} }
// RangeValues implements SeriesIterator. // RangeValues implements SeriesIterator.
@ -568,8 +596,15 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa
}) })
// Only now check the last timestamp of the previous chunk (which is // Only now check the last timestamp of the previous chunk (which is
// fairly expensive). // fairly expensive).
if i > 0 && !it.chunkIterator(i-1).lastTimestamp().Before(in.OldestInclusive) { if i > 0 {
i-- lt, err := it.chunkIterator(i - 1).lastTimestamp()
if err != nil {
it.quarantine(err)
return nil
}
if !lt.Before(in.OldestInclusive) {
i--
}
} }
values := []model.SamplePair{} values := []model.SamplePair{}
@ -577,7 +612,12 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa
if c.firstTime().After(in.NewestInclusive) { if c.firstTime().After(in.NewestInclusive) {
break break
} }
values = append(values, it.chunkIterator(i+j).rangeValues(in)...) chValues, err := it.chunkIterator(i + j).rangeValues(in)
if err != nil {
it.quarantine(err)
return nil
}
values = append(values, chValues...)
} }
return values return values
} }
@ -593,17 +633,36 @@ func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator {
return chunkIt return chunkIt
} }
// singleSampleSeriesIterator implements Series Iterator. It is a "shortcut
// iterator" that returns a single samplee only. The sample is saved in the
// iterator itself, so no chunks need to be pinned.
type singleSampleSeriesIterator struct {
samplePair model.SamplePair
}
// ValueAtTime implements SeriesIterator.
func (it *singleSampleSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
if it.samplePair.Timestamp.After(t) {
return ZeroSamplePair
}
return it.samplePair
}
// RangeValues implements SeriesIterator.
func (it *singleSampleSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair {
if it.samplePair.Timestamp.After(in.NewestInclusive) ||
it.samplePair.Timestamp.Before(in.OldestInclusive) {
return []model.SamplePair{}
}
return []model.SamplePair{it.samplePair}
}
// nopSeriesIterator implements Series Iterator. It never returns any values. // nopSeriesIterator implements Series Iterator. It never returns any values.
type nopSeriesIterator struct{} type nopSeriesIterator struct{}
// ValueAtTime implements SeriesIterator. // ValueAtTime implements SeriesIterator.
func (i nopSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { func (i nopSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
return model.SamplePair{Timestamp: model.Earliest} return ZeroSamplePair
}
// BoundaryValues implements SeriesIterator.
func (i nopSeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair {
return []model.SamplePair{}
} }
// RangeValues implements SeriesIterator. // RangeValues implements SeriesIterator.

View File

@ -30,8 +30,9 @@ import (
) )
const ( const (
evictRequestsCap = 1024 evictRequestsCap = 1024
chunkLen = 1024 quarantineRequestsCap = 1024
chunkLen = 1024
// See waitForNextFP. // See waitForNextFP.
fpMaxSweepTime = 6 * time.Hour fpMaxSweepTime = 6 * time.Hour
@ -77,6 +78,12 @@ type evictRequest struct {
evict bool evict bool
} }
type quarantineRequest struct {
fp model.Fingerprint
metric model.Metric
reason error
}
// SyncStrategy is an enum to select a sync strategy for series files. // SyncStrategy is an enum to select a sync strategy for series files.
type SyncStrategy int type SyncStrategy int
@ -147,6 +154,9 @@ type memorySeriesStorage struct {
evictRequests chan evictRequest evictRequests chan evictRequest
evictStopping, evictStopped chan struct{} evictStopping, evictStopped chan struct{}
quarantineRequests chan quarantineRequest
quarantineStopping, quarantineStopped chan struct{}
persistErrors prometheus.Counter persistErrors prometheus.Counter
numSeries prometheus.Gauge numSeries prometheus.Gauge
seriesOps *prometheus.CounterVec seriesOps *prometheus.CounterVec
@ -198,6 +208,10 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
evictStopping: make(chan struct{}), evictStopping: make(chan struct{}),
evictStopped: make(chan struct{}), evictStopped: make(chan struct{}),
quarantineRequests: make(chan quarantineRequest, quarantineRequestsCap),
quarantineStopping: make(chan struct{}),
quarantineStopped: make(chan struct{}),
persistErrors: prometheus.NewCounter(prometheus.CounterOpts{ persistErrors: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
@ -312,6 +326,7 @@ func (s *memorySeriesStorage) Start() (err error) {
} }
go s.handleEvictList() go s.handleEvictList()
go s.handleQuarantine()
go s.logThrottling() go s.logThrottling()
go s.loop() go s.loop()
@ -326,6 +341,10 @@ func (s *memorySeriesStorage) Stop() error {
close(s.loopStopping) close(s.loopStopping)
<-s.loopStopped <-s.loopStopped
log.Info("Stopping series quarantining...")
close(s.quarantineStopping)
<-s.quarantineStopped
log.Info("Stopping chunk eviction...") log.Info("Stopping chunk eviction...")
close(s.evictStopping) close(s.evictStopping)
<-s.evictStopped <-s.evictStopped
@ -348,15 +367,15 @@ func (s *memorySeriesStorage) WaitForIndexing() {
} }
// LastSampleForFingerprint implements Storage. // LastSampleForFingerprint implements Storage.
func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint) *model.SamplePair { func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint) model.SamplePair {
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp) defer s.fpLocker.Unlock(fp)
series, ok := s.fpToSeries.get(fp) series, ok := s.fpToSeries.get(fp)
if !ok { if !ok {
return nil return ZeroSamplePair
} }
return series.head().lastSamplePair() return series.lastSamplePair()
} }
// boundedIterator wraps a SeriesIterator and does not allow fetching // boundedIterator wraps a SeriesIterator and does not allow fetching
@ -369,22 +388,11 @@ type boundedIterator struct {
// ValueAtOrBeforeTime implements the SeriesIterator interface. // ValueAtOrBeforeTime implements the SeriesIterator interface.
func (bit *boundedIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair { func (bit *boundedIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair {
if ts < bit.start { if ts < bit.start {
return model.SamplePair{Timestamp: model.Earliest} return ZeroSamplePair
} }
return bit.it.ValueAtOrBeforeTime(ts) return bit.it.ValueAtOrBeforeTime(ts)
} }
// BoundaryValues implements the SeriesIterator interface.
func (bit *boundedIterator) BoundaryValues(interval metric.Interval) []model.SamplePair {
if interval.NewestInclusive < bit.start {
return []model.SamplePair{}
}
if interval.OldestInclusive < bit.start {
interval.OldestInclusive = bit.start
}
return bit.it.BoundaryValues(interval)
}
// RangeValues implements the SeriesIterator interface. // RangeValues implements the SeriesIterator interface.
func (bit *boundedIterator) RangeValues(interval metric.Interval) []model.SamplePair { func (bit *boundedIterator) RangeValues(interval metric.Interval) []model.SamplePair {
if interval.NewestInclusive < bit.start { if interval.NewestInclusive < bit.start {
@ -532,22 +540,7 @@ func (s *memorySeriesStorage) MetricForFingerprint(fp model.Fingerprint) metric.
// DropMetric implements Storage. // DropMetric implements Storage.
func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprint) { func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprint) {
for _, fp := range fps { for _, fp := range fps {
s.fpLocker.Lock(fp) s.purgeSeries(fp, nil, nil)
if series, ok := s.fpToSeries.get(fp); ok {
s.fpToSeries.del(fp)
s.numSeries.Dec()
s.persistence.unindexMetric(fp, series.metric)
} else if err := s.persistence.purgeArchivedMetric(fp); err != nil {
log.Errorf("Error purging metric with fingerprint %v: %v", fp, err)
}
// Attempt to delete series file in any case.
if _, err := s.persistence.deleteSeriesFile(fp); err != nil {
log.Errorf("Error deleting series file for %v: %v", fp, err)
}
s.fpLocker.Unlock(fp)
s.seriesOps.WithLabelValues(requestedPurge).Inc()
} }
} }
@ -565,34 +558,44 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error {
rawFP := sample.Metric.FastFingerprint() rawFP := sample.Metric.FastFingerprint()
s.fpLocker.Lock(rawFP) s.fpLocker.Lock(rawFP)
fp, err := s.mapper.mapFP(rawFP, sample.Metric) fp, err := s.mapper.mapFP(rawFP, sample.Metric)
defer func() {
s.fpLocker.Unlock(fp)
}() // Func wrapper because fp might change below.
if err != nil { if err != nil {
log.Errorf("Error while mapping fingerprint %v: %v", rawFP, err) s.persistence.setDirty(true, fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err))
s.persistence.setDirty(true) return err
} }
if fp != rawFP { if fp != rawFP {
// Switch locks. // Switch locks.
s.fpLocker.Unlock(rawFP) s.fpLocker.Unlock(rawFP)
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
} }
series := s.getOrCreateSeries(fp, sample.Metric) series, err := s.getOrCreateSeries(fp, sample.Metric)
if err != nil {
return err // getOrCreateSeries took care of quarantining already.
}
if sample.Timestamp <= series.lastTime { if sample.Timestamp <= series.lastTime {
s.fpLocker.Unlock(fp) // Don't report "no-op appends", i.e. where timestamp and sample
// Don't log and track equal timestamps, as they are a common occurrence // value are the same as for the last append, as they are a
// when using client-side timestamps (e.g. Pushgateway or federation). // common occurrence when using client-side timestamps
// It would be even better to also compare the sample values here, but // (e.g. Pushgateway or federation).
// we don't have efficient access to a series's last value. if sample.Timestamp == series.lastTime &&
if sample.Timestamp != series.lastTime { series.lastSampleValueSet &&
s.outOfOrderSamplesCount.Inc() sample.Value == series.lastSampleValue {
return ErrOutOfOrderSample return nil
} }
return nil s.outOfOrderSamplesCount.Inc()
return ErrOutOfOrderSample // Caused by the caller.
} }
completedChunksCount := series.add(&model.SamplePair{ completedChunksCount, err := series.add(model.SamplePair{
Value: sample.Value, Value: sample.Value,
Timestamp: sample.Timestamp, Timestamp: sample.Timestamp,
}) })
s.fpLocker.Unlock(fp) if err != nil {
s.quarantineSeries(fp, sample.Metric, err)
return err
}
s.ingestedSamplesCount.Inc() s.ingestedSamplesCount.Inc()
s.incNumChunksToPersist(completedChunksCount) s.incNumChunksToPersist(completedChunksCount)
@ -653,7 +656,7 @@ func (s *memorySeriesStorage) logThrottling() {
} }
} }
func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) *memorySeries { func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) {
series, ok := s.fpToSeries.get(fp) series, ok := s.fpToSeries.get(fp)
if !ok { if !ok {
var cds []*chunkDesc var cds []*chunkDesc
@ -661,6 +664,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
unarchived, err := s.persistence.unarchiveMetric(fp) unarchived, err := s.persistence.unarchiveMetric(fp)
if err != nil { if err != nil {
log.Errorf("Error unarchiving fingerprint %v (metric %v): %v", fp, m, err) log.Errorf("Error unarchiving fingerprint %v (metric %v): %v", fp, m, err)
return nil, err
} }
if unarchived { if unarchived {
s.seriesOps.WithLabelValues(unarchive).Inc() s.seriesOps.WithLabelValues(unarchive).Inc()
@ -671,7 +675,8 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
// appear as archived or purged). // appear as archived or purged).
cds, err = s.loadChunkDescs(fp, 0) cds, err = s.loadChunkDescs(fp, 0)
if err != nil { if err != nil {
log.Errorf("Error loading chunk descs for fingerprint %v (metric %v): %v", fp, m, err) s.quarantineSeries(fp, m, err)
return nil, err
} }
modTime = s.persistence.seriesFileModTime(fp) modTime = s.persistence.seriesFileModTime(fp)
} else { } else {
@ -679,41 +684,87 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
s.persistence.indexMetric(fp, m) s.persistence.indexMetric(fp, m)
s.seriesOps.WithLabelValues(create).Inc() s.seriesOps.WithLabelValues(create).Inc()
} }
series = newMemorySeries(m, cds, modTime) series, err = newMemorySeries(m, cds, modTime)
if err != nil {
s.quarantineSeries(fp, m, err)
return nil, err
}
s.fpToSeries.put(fp, series) s.fpToSeries.put(fp, series)
s.numSeries.Inc() s.numSeries.Inc()
} }
return series, nil
}
// getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant.
func (s *memorySeriesStorage) getSeriesForRange(
fp model.Fingerprint,
from model.Time, through model.Time,
) *memorySeries {
series, ok := s.fpToSeries.get(fp)
if ok {
return series
}
has, first, last, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.")
return nil
}
if !has {
s.invalidPreloadRequestsCount.Inc()
return nil
}
if last.Before(from) || first.After(through) {
return nil
}
metric, err := s.persistence.archivedMetric(fp)
if err != nil {
log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.")
return nil
}
series, err = s.getOrCreateSeries(fp, metric)
if err != nil {
// getOrCreateSeries took care of quarantining already.
return nil
}
return series return series
} }
func (s *memorySeriesStorage) preloadChunksForRange( func (s *memorySeriesStorage) preloadChunksForRange(
fp model.Fingerprint, fp model.Fingerprint,
from model.Time, through model.Time, from model.Time, through model.Time,
) ([]*chunkDesc, SeriesIterator, error) { ) ([]*chunkDesc, SeriesIterator) {
s.fpLocker.Lock(fp) s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp) defer s.fpLocker.Unlock(fp)
series, ok := s.fpToSeries.get(fp) series := s.getSeriesForRange(fp, from, through)
if !ok { if series == nil {
has, first, last, err := s.persistence.hasArchivedMetric(fp) return nil, nopIter
if err != nil {
return nil, nopIter, err
}
if !has {
s.invalidPreloadRequestsCount.Inc()
return nil, nopIter, nil
}
if from.Before(last) && through.After(first) {
metric, err := s.persistence.archivedMetric(fp)
if err != nil {
return nil, nopIter, err
}
series = s.getOrCreateSeries(fp, metric)
} else {
return nil, nopIter, nil
}
} }
return series.preloadChunksForRange(from, through, fp, s) cds, iter, err := series.preloadChunksForRange(fp, from, through, s)
if err != nil {
s.quarantineSeries(fp, series.metric, err)
return nil, nopIter
}
return cds, iter
}
func (s *memorySeriesStorage) preloadChunksForInstant(
fp model.Fingerprint,
from model.Time, through model.Time,
) ([]*chunkDesc, SeriesIterator) {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series := s.getSeriesForRange(fp, from, through)
if series == nil {
return nil, nopIter
}
cds, iter, err := series.preloadChunksForInstant(fp, from, through, s)
if err != nil {
s.quarantineSeries(fp, series.metric, err)
return nil, nopIter
}
return cds, iter
} }
func (s *memorySeriesStorage) handleEvictList() { func (s *memorySeriesStorage) handleEvictList() {
@ -1129,7 +1180,10 @@ func (s *memorySeriesStorage) writeMemorySeries(
s.persistErrors.Inc() s.persistErrors.Inc()
return false return false
} }
series.dropChunks(beforeTime) if err := series.dropChunks(beforeTime); err != nil {
s.persistErrors.Inc()
return false
}
if len(series.chunkDescs) == 0 && allDroppedFromPersistence { if len(series.chunkDescs) == 0 && allDroppedFromPersistence {
// All chunks dropped from both memory and persistence. Delete the series for good. // All chunks dropped from both memory and persistence. Delete the series for good.
s.fpToSeries.del(fp) s.fpToSeries.del(fp)
@ -1144,8 +1198,7 @@ func (s *memorySeriesStorage) writeMemorySeries(
} else { } else {
series.chunkDescsOffset -= numDroppedFromPersistence series.chunkDescsOffset -= numDroppedFromPersistence
if series.chunkDescsOffset < 0 { if series.chunkDescsOffset < 0 {
log.Errorf("Dropped more chunks from persistence than from memory for fingerprint %v, series %v.", fp, series) s.persistence.setDirty(true, fmt.Errorf("dropped more chunks from persistence than from memory for fingerprint %v, series %v", fp, series))
s.persistence.setDirty(true)
series.chunkDescsOffset = -1 // Makes sure it will be looked at during crash recovery. series.chunkDescsOffset = -1 // Makes sure it will be looked at during crash recovery.
} }
} }
@ -1299,6 +1352,122 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
return score return score
} }
// quarantineSeries registers the provided fingerprint for quarantining. It
// always returns immediately. Quarantine requests are processed
// asynchronously. If there are too many requests queued, they are simply
// dropped.
//
// Quarantining means that the series file is moved to the orphaned directory,
// and all its traces are removed from indices. Call this method if an
// unrecoverable error is detected while dealing with a series, and pass in the
// encountered error. It will be saved as a hint in the orphaned directory.
func (s *memorySeriesStorage) quarantineSeries(fp model.Fingerprint, metric model.Metric, err error) {
req := quarantineRequest{fp: fp, metric: metric, reason: err}
select {
case s.quarantineRequests <- req:
// Request submitted.
default:
log.
With("fingerprint", fp).
With("metric", metric).
With("reason", err).
Warn("Quarantine queue full. Dropped quarantine request.")
s.seriesOps.WithLabelValues(droppedQuarantine).Inc()
}
}
func (s *memorySeriesStorage) handleQuarantine() {
for {
select {
case req := <-s.quarantineRequests:
s.purgeSeries(req.fp, req.metric, req.reason)
log.
With("fingerprint", req.fp).
With("metric", req.metric).
With("reason", req.reason).
Warn("Series quarantined.")
case <-s.quarantineStopping:
log.Info("Series quarantining stopped.")
close(s.quarantineStopped)
return
}
}
}
// purgeSeries removes all traces of a series. If a non-nil quarantine reason is
// provided, the series file will not be deleted completely, but moved to the
// orphaned directory with the reason and the metric in a hint file. The
// provided metric might be nil if unknown.
func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, quarantineReason error) {
s.fpLocker.Lock(fp)
var (
series *memorySeries
ok bool
)
if series, ok = s.fpToSeries.get(fp); ok {
s.fpToSeries.del(fp)
s.numSeries.Dec()
m = series.metric
// Adjust s.numChunksToPersist and numMemChunks down by
// the number of chunks in this series that are not
// persisted yet. Persisted chunks will be deducted from
// numMemChunks upon eviction.
numChunksNotYetPersisted := len(series.chunkDescs) - series.persistWatermark
atomic.AddInt64(&numMemChunks, int64(-numChunksNotYetPersisted))
if !series.headChunkClosed {
// Head chunk wasn't counted as waiting for persistence yet.
// (But it was counted as a chunk in memory.)
numChunksNotYetPersisted--
}
s.incNumChunksToPersist(-numChunksNotYetPersisted)
} else {
if err := s.persistence.purgeArchivedMetric(fp); err != nil {
log.
With("fingerprint", fp).
With("metric", m).
With("error", err).
Error("Error purging metric from archive.")
}
}
if m != nil {
// If we know a metric now, unindex it in any case.
// purgeArchivedMetric might have done so already, but we cannot
// be sure. Unindexing in idempotent, though.
s.persistence.unindexMetric(fp, m)
}
// Attempt to delete/quarantine the series file in any case.
if quarantineReason == nil {
// No reason stated, simply delete the file.
if _, err := s.persistence.deleteSeriesFile(fp); err != nil {
log.
With("fingerprint", fp).
With("metric", m).
With("error", err).
Error("Error deleting series file.")
}
s.seriesOps.WithLabelValues(requestedPurge).Inc()
} else {
if err := s.persistence.quarantineSeriesFile(fp, quarantineReason, m); err == nil {
s.seriesOps.WithLabelValues(completedQurantine).Inc()
} else {
s.seriesOps.WithLabelValues(failedQuarantine).Inc()
log.
With("fingerprint", fp).
With("metric", m).
With("reason", quarantineReason).
With("error", err).
Error("Error quarantining series file.")
}
}
s.fpLocker.Unlock(fp)
}
// Describe implements prometheus.Collector. // Describe implements prometheus.Collector.
func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
s.persistence.Describe(ch) s.persistence.Describe(ch)

View File

@ -405,10 +405,7 @@ func TestRetentionCutoff(t *testing.T) {
defer pl.Close() defer pl.Close()
// Preload everything. // Preload everything.
it, err := pl.PreloadRange(fp, insertStart, now) it := pl.PreloadRange(fp, insertStart, now)
if err != nil {
t.Fatalf("Error preloading outdated chunks: %s", err)
}
val := it.ValueAtOrBeforeTime(now.Add(-61 * time.Minute)) val := it.ValueAtOrBeforeTime(now.Add(-61 * time.Minute))
if val.Timestamp != model.Earliest { if val.Timestamp != model.Earliest {
@ -424,14 +421,6 @@ func TestRetentionCutoff(t *testing.T) {
if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt { if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt {
t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time()) t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time())
} }
vals = it.BoundaryValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now})
if len(vals) != 2 {
t.Errorf("expected 2 values but got %d", len(vals))
}
if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt {
t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time())
}
} }
func TestDropMetrics(t *testing.T) { func TestDropMetrics(t *testing.T) {
@ -500,18 +489,12 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps2)) t.Errorf("unexpected number of fingerprints: %d", len(fps2))
} }
_, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) _, it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N {
t.Errorf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
@ -533,18 +516,12 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps3)) t.Errorf("unexpected number of fingerprints: %d", len(fps3))
} }
_, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) _, it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Errorf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
@ -557,6 +534,95 @@ func TestDropMetrics(t *testing.T) {
} }
} }
func TestQuarantineMetric(t *testing.T) {
now := model.Now()
insertStart := now.Add(-2 * time.Hour)
s, closer := NewTestStorage(t, 1)
defer closer.Close()
chunkFileExists := func(fp model.Fingerprint) (bool, error) {
f, err := s.persistence.openChunkFileForReading(fp)
if err == nil {
f.Close()
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
m1 := model.Metric{model.MetricNameLabel: "test", "n1": "v1"}
m2 := model.Metric{model.MetricNameLabel: "test", "n1": "v2"}
m3 := model.Metric{model.MetricNameLabel: "test", "n1": "v3"}
N := 120000
for j, m := range []model.Metric{m1, m2, m3} {
for i := 0; i < N; i++ {
smpl := &model.Sample{
Metric: m,
Timestamp: insertStart.Add(time.Duration(i) * time.Millisecond), // 1 millisecond intervals.
Value: model.SampleValue(j),
}
s.Append(smpl)
}
}
s.WaitForIndexing()
// Archive m3, but first maintain it so that at least something is written to disk.
fpToBeArchived := m3.FastFingerprint()
s.maintainMemorySeries(fpToBeArchived, 0)
s.fpLocker.Lock(fpToBeArchived)
s.fpToSeries.del(fpToBeArchived)
if err := s.persistence.archiveMetric(
fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond),
); err != nil {
t.Error(err)
}
s.fpLocker.Unlock(fpToBeArchived)
// Corrupt the series file for m3.
f, err := os.Create(s.persistence.fileNameForFingerprint(fpToBeArchived))
if err != nil {
t.Fatal(err)
}
if _, err := f.WriteString("This is clearly not the content of a series file."); err != nil {
t.Fatal(err)
}
if f.Close(); err != nil {
t.Fatal(err)
}
fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"})
if len(fps) != 3 {
t.Errorf("unexpected number of fingerprints: %d", len(fps))
}
pl := s.NewPreloader()
// This will access the corrupt file and lead to quarantining.
pl.PreloadInstant(fpToBeArchived, now.Add(-2*time.Hour), time.Minute)
pl.Close()
time.Sleep(time.Second) // Give time to quarantine. TODO(beorn7): Find a better way to wait.
s.WaitForIndexing()
fps2 := s.fingerprintsForLabelPairs(model.LabelPair{
Name: model.MetricNameLabel, Value: "test",
})
if len(fps2) != 2 {
t.Errorf("unexpected number of fingerprints: %d", len(fps2))
}
exists, err := chunkFileExists(fpToBeArchived)
if err != nil {
t.Fatal(err)
}
if exists {
t.Errorf("chunk file exists for fp=%v", fpToBeArchived)
}
}
// TestLoop is just a smoke test for the loop method, if we can switch it on and // TestLoop is just a smoke test for the loop method, if we can switch it on and
// off without disaster. // off without disaster.
func TestLoop(t *testing.T) { func TestLoop(t *testing.T) {
@ -627,7 +693,10 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
continue continue
} }
for sample := range cd.c.newIterator().values() { for sample := range cd.c.newIterator().values() {
values = append(values, *sample) if sample.error != nil {
t.Error(sample.error)
}
values = append(values, sample.SamplePair)
} }
} }
@ -670,10 +739,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint() fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
// #1 Exactly on a sample. // #1 Exactly on a sample.
for i, expected := range samples { for i, expected := range samples {
@ -747,10 +813,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint() fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
b.Fatalf("Error preloading everything: %s", err)
}
b.ResetTimer() b.ResetTimer()
@ -828,10 +891,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint() fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
// #1 Zero length interval at sample. // #1 Zero length interval at sample.
for i, expected := range samples { for i, expected := range samples {
@ -983,10 +1043,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint() fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
b.Fatalf("Error preloading everything: %s", err)
}
b.ResetTimer() b.ResetTimer()
@ -1032,32 +1089,26 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Drop ~half of the chunks. // Drop ~half of the chunks.
s.maintainMemorySeries(fp, 10000) s.maintainMemorySeries(fp, 10000)
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil { actual := it.RangeValues(metric.Interval{
t.Fatalf("Error preloading everything: %s", err)
}
actual := it.BoundaryValues(metric.Interval{
OldestInclusive: 0, OldestInclusive: 0,
NewestInclusive: 100000, NewestInclusive: 100000,
}) })
if len(actual) != 2 { if len(actual) < 4000 {
t.Fatal("expected two results after purging half of series") t.Fatalf("expected more than %d results after purging half of series, got %d", 4000, len(actual))
} }
if actual[0].Timestamp < 6000 || actual[0].Timestamp > 10000 { if actual[0].Timestamp < 6000 || actual[0].Timestamp > 10000 {
t.Errorf("1st timestamp out of expected range: %v", actual[0].Timestamp) t.Errorf("1st timestamp out of expected range: %v", actual[0].Timestamp)
} }
want := model.Time(19998) want := model.Time(19998)
if actual[1].Timestamp != want { if actual[len(actual)-1].Timestamp != want {
t.Errorf("2nd timestamp: want %v, got %v", want, actual[1].Timestamp) t.Errorf("2nd timestamp: want %v, got %v", want, actual[1].Timestamp)
} }
// Drop everything. // Drop everything.
s.maintainMemorySeries(fp, 100000) s.maintainMemorySeries(fp, 100000)
_, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest) _, it = s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil { actual = it.RangeValues(metric.Interval{
t.Fatalf("Error preloading everything: %s", err)
}
actual = it.BoundaryValues(metric.Interval{
OldestInclusive: 0, OldestInclusive: 0,
NewestInclusive: 100000, NewestInclusive: 100000,
}) })
@ -1082,8 +1133,12 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Archive metrics. // Archive metrics.
s.fpToSeries.del(fp) s.fpToSeries.del(fp)
lastTime, err := series.head().lastTime()
if err != nil {
t.Fatal(err)
}
if err := s.persistence.archiveMetric( if err := s.persistence.archiveMetric(
fp, series.metric, series.firstTime(), series.head().lastTime(), fp, series.metric, series.firstTime(), lastTime,
); err != nil { ); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1133,8 +1188,12 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Archive metrics. // Archive metrics.
s.fpToSeries.del(fp) s.fpToSeries.del(fp)
lastTime, err = series.head().lastTime()
if err != nil {
t.Fatal(err)
}
if err := s.persistence.archiveMetric( if err := s.persistence.archiveMetric(
fp, series.metric, series.firstTime(), series.head().lastTime(), fp, series.metric, series.firstTime(), lastTime,
); err != nil { ); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -1528,10 +1587,7 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples model.Samples,
t.Fatal(err) t.Fatal(err)
} }
p := s.NewPreloader() p := s.NewPreloader()
it, err := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp) it := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp)
if err != nil {
t.Fatal(err)
}
found := it.ValueAtOrBeforeTime(sample.Timestamp) found := it.ValueAtOrBeforeTime(sample.Timestamp)
if found.Timestamp == model.Earliest { if found.Timestamp == model.Earliest {
t.Errorf("Sample %#v: Expected sample not found.", sample) t.Errorf("Sample %#v: Expected sample not found.", sample)
@ -1575,10 +1631,7 @@ func TestAppendOutOfOrder(t *testing.T) {
pl := s.NewPreloader() pl := s.NewPreloader()
defer pl.Close() defer pl.Close()
it, err := pl.PreloadRange(fp, 0, 2) it := pl.PreloadRange(fp, 0, 2)
if err != nil {
t.Fatalf("Error preloading chunks: %s", err)
}
want := []model.SamplePair{ want := []model.SamplePair{
{ {

View File

@ -29,7 +29,6 @@ const (
ResultAppendTime ResultAppendTime
QueryAnalysisTime QueryAnalysisTime
GetValueAtTimeTime GetValueAtTimeTime
GetBoundaryValuesTime
GetRangeValuesTime GetRangeValuesTime
ExecQueueTime ExecQueueTime
ViewDiskPreparationTime ViewDiskPreparationTime
@ -60,8 +59,6 @@ func (s QueryTiming) String() string {
return "Query analysis time" return "Query analysis time"
case GetValueAtTimeTime: case GetValueAtTimeTime:
return "GetValueAtTime() time" return "GetValueAtTime() time"
case GetBoundaryValuesTime:
return "GetBoundaryValues() time"
case GetRangeValuesTime: case GetRangeValuesTime:
return "GetRangeValues() time" return "GetRangeValues() time"
case ExecQueueTime: case ExecQueueTime:

View File

@ -67,7 +67,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
sp := h.storage.LastSamplePairForFingerprint(fp) sp := h.storage.LastSamplePairForFingerprint(fp)
// Discard if sample does not exist or lays before the staleness interval. // Discard if sample does not exist or lays before the staleness interval.
if sp == nil || sp.Timestamp.Before(minTimestamp) { if sp.Timestamp.Before(minTimestamp) {
continue continue
} }