diff --git a/promql/analyzer.go b/promql/analyzer.go
index 6e052656d..bad5fbd92 100644
--- a/promql/analyzer.go
+++ b/promql/analyzer.go
@@ -41,14 +41,20 @@ type Analyzer struct {
 // fingerprints. One of these structs is collected for each offset by the query
 // analyzer.
 type preloadTimes struct {
-	// Instants require single samples to be loaded along the entire query
-	// range, with intervals between the samples corresponding to the query
-	// resolution.
-	instants map[model.Fingerprint]struct{}
-	// Ranges require loading a range of samples at each resolution step,
-	// stretching backwards from the current evaluation timestamp. The length of
-	// the range into the past is given by the duration, as in "foo[5m]".
+	// Ranges require loading a range of samples. They can be triggered by
+	// two types of expressions: First, a range expression, AKA matrix
+	// selector, where the Duration in the ranges map is the length of the
+	// range in the range expression. Second, an instant expression, AKA
+	// vector selector, where the Duration in the ranges map is the
+	// StalenessDelta. In preloading, both types of expressions result in
+	// the same effect: Preload everything from the specified start time
+	// minus the Duration in the ranges map up to the specified end time.
 	ranges map[model.Fingerprint]time.Duration
+	// Instants require a single sample to be loaded. This only happens for
+	// instant expressions, AKA vector selectors, iff the specified start
+	// and end time are the same. Thus, instants is only populated if start
+	// and end time are the same.
+	instants map[model.Fingerprint]struct{}
 }
 
 // Analyze the provided expression and attach metrics and fingerprints to data-selecting
@@ -57,13 +63,15 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
 	a.offsetPreloadTimes = map[time.Duration]preloadTimes{}
 
 	getPreloadTimes := func(offset time.Duration) preloadTimes {
-		if _, ok := a.offsetPreloadTimes[offset]; !ok {
-			a.offsetPreloadTimes[offset] = preloadTimes{
-				instants: map[model.Fingerprint]struct{}{},
-				ranges:   map[model.Fingerprint]time.Duration{},
-			}
+		if pt, ok := a.offsetPreloadTimes[offset]; ok {
+			return pt
 		}
-		return a.offsetPreloadTimes[offset]
+		pt := preloadTimes{
+			instants: map[model.Fingerprint]struct{}{},
+			ranges:   map[model.Fingerprint]time.Duration{},
+		}
+		a.offsetPreloadTimes[offset] = pt
+		return pt
 	}
 
 	// Retrieve fingerprints and metrics for the required time range for
@@ -76,11 +84,14 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
 			pt := getPreloadTimes(n.Offset)
 
 			for fp := range n.metrics {
-				// Only add the fingerprint to the instants if not yet present in the
-				// ranges. Ranges always contain more points and span more time than
-				// instants for the same offset.
-				if _, alreadyInRanges := pt.ranges[fp]; !alreadyInRanges {
+				r, alreadyInRanges := pt.ranges[fp]
+				if a.Start.Equal(a.End) && !alreadyInRanges {
+					// A true instant; we only need one value.
 					pt.instants[fp] = struct{}{}
+					continue
+				}
+				if r < StalenessDelta {
+					pt.ranges[fp] = StalenessDelta
 				}
 			}
 		case *MatrixSelector:
@@ -135,35 +146,13 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
 			if err = contextDone(ctx, env); err != nil {
 				return nil, err
 			}
-			startOfRange := start.Add(-rangeDuration)
-			if StalenessDelta > rangeDuration {
-				// Cover a weird corner case: The expression
-				// mixes up instants and ranges for the same
-				// series. We'll handle that over-all as
-				// range. But if the rangeDuration is smaller
-				// than the StalenessDelta, the range wouldn't
-				// cover everything potentially needed for the
-				// instant, so we have to extend startOfRange.
-				startOfRange = start.Add(-StalenessDelta)
-			}
-			iter, err := p.PreloadRange(fp, startOfRange, end)
-			if err != nil {
-				return nil, err
-			}
-			itersForDuration[fp] = iter
+			itersForDuration[fp] = p.PreloadRange(fp, start.Add(-rangeDuration), end)
 		}
 		for fp := range pt.instants {
 			if err = contextDone(ctx, env); err != nil {
 				return nil, err
 			}
-			// Need to look backwards by StalenessDelta but not
-			// forward because we always return the closest sample
-			// _before_ the reference time.
-			iter, err := p.PreloadRange(fp, start.Add(-StalenessDelta), end)
-			if err != nil {
-				return nil, err
-			}
-			itersForDuration[fp] = iter
+			itersForDuration[fp] = p.PreloadInstant(fp, start, StalenessDelta)
 		}
 	}
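A minimal, self-contained sketch of the preload rule described in the comments above (not code from this change: the helper preloadWindow and its flattened parameters are invented for illustration, and the per-fingerprint bookkeeping is omitted):

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"
)

// preloadWindow restates the rule: a vector selector in an instant query
// (start == end, no range duration) needs exactly one sample; every other
// case is preloaded as a range reaching back by the larger of the selector's
// range duration and the staleness delta.
func preloadWindow(start, end model.Time, rangeDuration, stalenessDelta time.Duration) (from model.Time, instant bool) {
	if start.Equal(end) && rangeDuration == 0 {
		return start, true
	}
	d := rangeDuration
	if d < stalenessDelta {
		d = stalenessDelta
	}
	return start.Add(-d), false
}

func main() {
	now := model.Now()
	from, instant := preloadWindow(now, now, 0, 5*time.Minute)
	fmt.Println(from, instant) // single-sample lookup at "now"
	from, instant = preloadWindow(now.Add(-time.Hour), now, 0, 5*time.Minute)
	fmt.Println(from, instant) // range query: preload [now-1h-5m, now]
}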
diff --git a/promql/engine.go b/promql/engine.go
index 599486cae..836710181 100644
--- a/promql/engine.go
+++ b/promql/engine.go
@@ -575,15 +575,6 @@ func (ev *evaluator) evalMatrix(e Expr) matrix {
 	return mat
 }
 
-// evalMatrixBounds attempts to evaluate e to matrix boundaries and errors otherwise.
-func (ev *evaluator) evalMatrixBounds(e Expr) matrix {
-	ms, ok := e.(*MatrixSelector)
-	if !ok {
-		ev.errorf("matrix bounds can only be evaluated for matrix selectors, got %T", e)
-	}
-	return ev.matrixSelectorBounds(ms)
-}
-
 // evalString attempts to evaluate e to a string value and errors otherwise.
 func (ev *evaluator) evalString(e Expr) *model.String {
 	val := ev.eval(e)
@@ -731,29 +722,6 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) matrix {
 	return matrix(sampleStreams)
 }
 
-// matrixSelectorBounds evaluates the boundaries of a *MatrixSelector.
-func (ev *evaluator) matrixSelectorBounds(node *MatrixSelector) matrix {
-	interval := metric.Interval{
-		OldestInclusive: ev.Timestamp.Add(-node.Range - node.Offset),
-		NewestInclusive: ev.Timestamp.Add(-node.Offset),
-	}
-
-	sampleStreams := make([]*sampleStream, 0, len(node.iterators))
-	for fp, it := range node.iterators {
-		samplePairs := it.BoundaryValues(interval)
-		if len(samplePairs) == 0 {
-			continue
-		}
-
-		ss := &sampleStream{
-			Metric: node.metrics[fp],
-			Values: samplePairs,
-		}
-		sampleStreams = append(sampleStreams, ss)
-	}
-	return matrix(sampleStreams)
-}
-
 func (ev *evaluator) vectorAnd(lhs, rhs vector, matching *VectorMatching) vector {
 	if matching.Card != CardManyToMany {
 		panic("logical operations must always be many-to-many matching")
diff --git a/promql/functions.go b/promql/functions.go
index 4728b1e96..433c06f02 100644
--- a/promql/functions.go
+++ b/promql/functions.go
@@ -524,10 +524,37 @@ func funcLog10(ev *evaluator, args Expressions) model.Value {
 	return vector
 }
 
+// linearRegression performs a least-squares linear regression analysis on the
+// provided SamplePairs. It returns the slope and the intercept value at the
+// provided time.
+func linearRegression(samples []model.SamplePair, interceptTime model.Time) (slope, intercept model.SampleValue) { + var ( + n model.SampleValue + sumX, sumY model.SampleValue + sumXY, sumX2 model.SampleValue + ) + for _, sample := range samples { + x := model.SampleValue( + model.Time(sample.Timestamp-interceptTime).UnixNano(), + ) / 1e9 + n += 1.0 + sumY += sample.Value + sumX += x + sumXY += x * sample.Value + sumX2 += x * x + } + covXY := sumXY - sumX*sumY/n + varX := sumX2 - sumX*sumX/n + + slope = covXY / varX + intercept = sumY/n - slope*sumX/n + return slope, intercept +} + // === deriv(node model.ValMatrix) Vector === func funcDeriv(ev *evaluator, args Expressions) model.Value { - resultVector := vector{} mat := ev.evalMatrix(args[0]) + resultVector := make(vector, 0, len(mat)) for _, samples := range mat { // No sense in trying to compute a derivative without at least two points. @@ -535,29 +562,10 @@ func funcDeriv(ev *evaluator, args Expressions) model.Value { if len(samples.Values) < 2 { continue } - - // Least squares. - var ( - n model.SampleValue - sumX, sumY model.SampleValue - sumXY, sumX2 model.SampleValue - ) - for _, sample := range samples.Values { - x := model.SampleValue(sample.Timestamp.UnixNano() / 1e9) - n += 1.0 - sumY += sample.Value - sumX += x - sumXY += x * sample.Value - sumX2 += x * x - } - numerator := sumXY - sumX*sumY/n - denominator := sumX2 - (sumX*sumX)/n - - resultValue := numerator / denominator - + slope, _ := linearRegression(samples.Values, 0) resultSample := &sample{ Metric: samples.Metric, - Value: resultValue, + Value: slope, Timestamp: ev.Timestamp, } resultSample.Metric.Del(model.MetricNameLabel) @@ -568,44 +576,26 @@ func funcDeriv(ev *evaluator, args Expressions) model.Value { // === predict_linear(node model.ValMatrix, k model.ValScalar) Vector === func funcPredictLinear(ev *evaluator, args Expressions) model.Value { - vec := funcDeriv(ev, args[0:1]).(vector) - duration := model.SampleValue(model.SampleValue(ev.evalFloat(args[1]))) + mat := ev.evalMatrix(args[0]) + resultVector := make(vector, 0, len(mat)) + duration := model.SampleValue(ev.evalFloat(args[1])) - excludedLabels := map[model.LabelName]struct{}{ - model.MetricNameLabel: {}, - } - - // Calculate predicted delta over the duration. - signatureToDelta := map[uint64]model.SampleValue{} - for _, el := range vec { - signature := model.SignatureWithoutLabels(el.Metric.Metric, excludedLabels) - signatureToDelta[signature] = el.Value * duration - } - - // add predicted delta to last value. - // TODO(beorn7): This is arguably suboptimal. The funcDeriv above has - // given us an estimate over the range. So we should add the delta to - // the value predicted for the end of the range. Also, once this has - // been rectified, we are not using BoundaryValues anywhere anymore, so - // we can kick out a whole lot of code. - matrixBounds := ev.evalMatrixBounds(args[0]) - outVec := make(vector, 0, len(signatureToDelta)) - for _, samples := range matrixBounds { + for _, samples := range mat { + // No sense in trying to predict anything without at least two points. + // Drop this vector element. 
if len(samples.Values) < 2 { continue } - signature := model.SignatureWithoutLabels(samples.Metric.Metric, excludedLabels) - delta, ok := signatureToDelta[signature] - if ok { - samples.Metric.Del(model.MetricNameLabel) - outVec = append(outVec, &sample{ - Metric: samples.Metric, - Value: delta + samples.Values[1].Value, - Timestamp: ev.Timestamp, - }) + slope, intercept := linearRegression(samples.Values, ev.Timestamp) + resultSample := &sample{ + Metric: samples.Metric, + Value: slope*duration + intercept, + Timestamp: ev.Timestamp, } + resultSample.Metric.Del(model.MetricNameLabel) + resultVector = append(resultVector, resultSample) } - return outVec + return resultVector } // === histogram_quantile(k model.ValScalar, vector model.ValVector) Vector === diff --git a/promql/testdata/functions.test b/promql/testdata/functions.test index bccec5a1a..28233033e 100644 --- a/promql/testdata/functions.test +++ b/promql/testdata/functions.test @@ -102,16 +102,28 @@ eval instant at 50m deriv(testcounter_reset_middle[100m]) {} 0.010606060606060607 # predict_linear should return correct result. +# X/s = [ 0, 300, 600, 900,1200,1500,1800,2100,2400,2700,3000] +# Y = [ 0, 10, 20, 30, 40, 0, 10, 20, 30, 40, 50] +# sumX = 16500 +# sumY = 250 +# sumXY = 480000 +# sumX2 = 34650000 +# n = 11 +# covXY = 105000 +# varX = 9900000 +# slope = 0.010606060606060607 +# intercept at t=0: 6.818181818181818 +# intercept at t=3000: 38.63636363636364 +# intercept at t=3000+3600: 76.81818181818181 eval instant at 50m predict_linear(testcounter_reset_middle[100m], 3600) - {} 88.181818181818185200 + {} 76.81818181818181 -# predict_linear is syntactic sugar around deriv. +# With http_requests, there is a sample value exactly at the end of +# the range, and it has exactly the predicted value, so predict_linear +# can be emulated with deriv. eval instant at 50m predict_linear(http_requests[50m], 3600) - (http_requests + deriv(http_requests[50m]) * 3600) {group="canary", instance="1", job="app-server"} 0 -eval instant at 50m predict_linear(testcounter_reset_middle[100m], 3600) - (testcounter_reset_middle + deriv(testcounter_reset_middle[100m]) * 3600) - {} 0 - clear # Tests for label_replace. diff --git a/storage/local/chunk.go b/storage/local/chunk.go index e0be10b14..d8b1ae2b8 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -81,13 +81,6 @@ const ( // is populated upon creation of a chunkDesc, so it is alway safe to call // firstTime. The firstTime method is arguably not needed and only there for // consistency with lastTime. -// -// Yet another (deprecated) case is lastSamplePair. It's used in federation and -// must be callable without pinning. Locking the fingerprint of the series is -// still required (to avoid concurrent appends to the chunk). The call is -// relatively expensive because of the required acquisition of the evict -// mutex. It will go away, though, once tracking the lastSamplePair has been -// moved into the series object. type chunkDesc struct { sync.Mutex // Protects pinning. c chunk // nil if chunk is evicted. @@ -119,7 +112,7 @@ func newChunkDesc(c chunk, firstTime model.Time) *chunkDesc { // add adds a sample pair to the underlying chunk. For safe concurrent access, // The chunk must be pinned, and the caller must have locked the fingerprint of // the series. 
-func (cd *chunkDesc) add(s *model.SamplePair) []chunk { +func (cd *chunkDesc) add(s model.SamplePair) ([]chunk, error) { return cd.c.add(s) } @@ -176,9 +169,9 @@ func (cd *chunkDesc) firstTime() model.Time { // lastTime returns the timestamp of the last sample in the chunk. For safe // concurrent access, this method requires the fingerprint of the time series to // be locked. -func (cd *chunkDesc) lastTime() model.Time { +func (cd *chunkDesc) lastTime() (model.Time, error) { if cd.chunkLastTime != model.Earliest || cd.c == nil { - return cd.chunkLastTime + return cd.chunkLastTime, nil } return cd.c.newIterator().lastTimestamp() } @@ -188,28 +181,15 @@ func (cd *chunkDesc) lastTime() model.Time { // last sample to a chunk or after closing a head chunk due to age. For safe // concurrent access, the chunk must be pinned, and the caller must have locked // the fingerprint of the series. -func (cd *chunkDesc) maybePopulateLastTime() { +func (cd *chunkDesc) maybePopulateLastTime() error { if cd.chunkLastTime == model.Earliest && cd.c != nil { - cd.chunkLastTime = cd.c.newIterator().lastTimestamp() - } -} - -// lastSamplePair returns the last sample pair of the underlying chunk, or nil -// if the chunk is evicted. For safe concurrent access, this method requires the -// fingerprint of the time series to be locked. -// TODO(beorn7): Move up into memorySeries. -func (cd *chunkDesc) lastSamplePair() *model.SamplePair { - cd.Lock() - defer cd.Unlock() - - if cd.c == nil { - return nil - } - it := cd.c.newIterator() - return &model.SamplePair{ - Timestamp: it.lastTimestamp(), - Value: it.lastSampleValue(), + t, err := cd.c.newIterator().lastTimestamp() + if err != nil { + return err + } + cd.chunkLastTime = t } + return nil } // isEvicted returns whether the chunk is evicted. For safe concurrent access, @@ -266,14 +246,14 @@ type chunk interface { // any. The first chunk returned might be the same as the original one // or a newly allocated version. In any case, take the returned chunk as // the relevant one and discard the original chunk. - add(sample *model.SamplePair) []chunk + add(sample model.SamplePair) ([]chunk, error) clone() chunk firstTime() model.Time newIterator() chunkIterator marshal(io.Writer) error marshalToBuf([]byte) error unmarshal(io.Reader) error - unmarshalFromBuf([]byte) + unmarshalFromBuf([]byte) error encoding() chunkEncoding } @@ -284,57 +264,73 @@ type chunkIterator interface { // length returns the number of samples in the chunk. length() int // Gets the timestamp of the n-th sample in the chunk. - timestampAtIndex(int) model.Time + timestampAtIndex(int) (model.Time, error) // Gets the last timestamp in the chunk. - lastTimestamp() model.Time + lastTimestamp() (model.Time, error) // Gets the sample value of the n-th sample in the chunk. - sampleValueAtIndex(int) model.SampleValue + sampleValueAtIndex(int) (model.SampleValue, error) // Gets the last sample value in the chunk. - lastSampleValue() model.SampleValue + lastSampleValue() (model.SampleValue, error) // Gets the value that is closest before the given time. In case a value // exists at precisely the given time, that value is returned. If no - // applicable value exists, a SamplePair with timestamp model.Earliest - // and value 0.0 is returned. - valueAtOrBeforeTime(model.Time) model.SamplePair + // applicable value exists, ZeroSamplePair is returned. + valueAtOrBeforeTime(model.Time) (model.SamplePair, error) // Gets all values contained within a given interval. 
- rangeValues(metric.Interval) []model.SamplePair + rangeValues(metric.Interval) ([]model.SamplePair, error) // Whether a given timestamp is contained between first and last value // in the chunk. - contains(model.Time) bool + contains(model.Time) (bool, error) // values returns a channel, from which all sample values in the chunk // can be received in order. The channel is closed after the last // one. It is generally not safe to mutate the chunk while the channel - // is still open. - values() <-chan *model.SamplePair + // is still open. If a value is returned with error!=nil, no further + // values will be returned and the channel is closed. + values() <-chan struct { + model.SamplePair + error + } } -func transcodeAndAdd(dst chunk, src chunk, s *model.SamplePair) []chunk { +func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) { chunkOps.WithLabelValues(transcode).Inc() head := dst body := []chunk{} for v := range src.newIterator().values() { - newChunks := head.add(v) + if v.error != nil { + return nil, v.error + } + newChunks, err := head.add(v.SamplePair) + if err != nil { + return nil, err + } body = append(body, newChunks[:len(newChunks)-1]...) head = newChunks[len(newChunks)-1] } - newChunks := head.add(s) - return append(body, newChunks...) + newChunks, err := head.add(s) + if err != nil { + return nil, err + } + return append(body, newChunks...), nil } // newChunk creates a new chunk according to the encoding set by the // defaultChunkEncoding flag. func newChunk() chunk { - return newChunkForEncoding(DefaultChunkEncoding) + chunk, err := newChunkForEncoding(DefaultChunkEncoding) + if err != nil { + panic(err) + } + return chunk } -func newChunkForEncoding(encoding chunkEncoding) chunk { +func newChunkForEncoding(encoding chunkEncoding) (chunk, error) { switch encoding { case delta: - return newDeltaEncodedChunk(d1, d0, true, chunkLen) + return newDeltaEncodedChunk(d1, d0, true, chunkLen), nil case doubleDelta: - return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen) + return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen), nil default: - panic(fmt.Errorf("unknown chunk encoding: %v", encoding)) + return nil, fmt.Errorf("unknown chunk encoding: %v", encoding) } } diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index d4c9a4fa0..f51e54e7b 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -14,10 +14,11 @@ package local import ( + "errors" "fmt" "io" "os" - "path" + "path/filepath" "strings" "sync/atomic" @@ -52,7 +53,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint log.Info("Scanning files.") for i := 0; i < 1<<(seriesDirNameLen*4); i++ { - dirname := path.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i)) + dirname := filepath.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i)) dir, err := os.Open(dirname) if os.IsNotExist(err) { continue @@ -139,7 +140,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint } } - p.setDirty(false) + p.setDirty(false, nil) log.Warn("Crash recovery complete.") return nil } @@ -175,36 +176,46 @@ func (p *persistence) sanitizeSeries( fingerprintToSeries map[model.Fingerprint]*memorySeries, fpm fpMappings, ) (model.Fingerprint, bool) { - filename := path.Join(dirname, fi.Name()) + var ( + fp model.Fingerprint + err error + filename = filepath.Join(dirname, fi.Name()) + s *memorySeries + ) + purge := func() { - var err error - defer func() { - if err != nil { - log.Errorf("Failed to move lost 
series file %s to orphaned directory, deleting it instead. Error was: %s", filename, err) - if err = os.Remove(filename); err != nil { - log.Errorf("Even deleting file %s did not work: %s", filename, err) - } + if fp != 0 { + var metric model.Metric + if s != nil { + metric = s.metric } - }() - orphanedDir := path.Join(p.basePath, "orphaned", path.Base(dirname)) - if err = os.MkdirAll(orphanedDir, 0700); err != nil { - return + if err = p.quarantineSeriesFile( + fp, errors.New("purge during crash recovery"), metric, + ); err == nil { + return + } + log. + With("file", filename). + With("error", err). + Error("Failed to move lost series file to orphaned directory.") } - if err = os.Rename(filename, path.Join(orphanedDir, fi.Name())); err != nil { - return + // If we are here, we are either purging an incorrectly named + // file, or quarantining has failed. So simply delete the file. + if err = os.Remove(filename); err != nil { + log. + With("file", filename). + With("error", err). + Error("Failed to delete lost series file.") } } - var fp model.Fingerprint - var err error - if len(fi.Name()) != fpLen-seriesDirNameLen+len(seriesFileSuffix) || !strings.HasSuffix(fi.Name(), seriesFileSuffix) { log.Warnf("Unexpected series file name %s.", filename) purge() return fp, false } - if fp, err = model.FingerprintFromString(path.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]); err != nil { + if fp, err = model.FingerprintFromString(filepath.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]); err != nil { log.Warnf("Error parsing file name %s: %s", filename, err) purge() return fp, false @@ -274,7 +285,15 @@ func (p *persistence) sanitizeSeries( s.chunkDescs = cds s.chunkDescsOffset = 0 s.savedFirstTime = cds[0].firstTime() - s.lastTime = cds[len(cds)-1].lastTime() + s.lastTime, err = cds[len(cds)-1].lastTime() + if err != nil { + log.Errorf( + "Failed to determine time of the last sample for metric %v, fingerprint %v: %s", + s.metric, fp, err, + ) + purge() + return fp, false + } s.persistWatermark = len(cds) s.modTime = modTime return fp, true @@ -304,7 +323,15 @@ func (p *persistence) sanitizeSeries( s.savedFirstTime = cds[0].firstTime() s.modTime = modTime - lastTime := cds[len(cds)-1].lastTime() + lastTime, err := cds[len(cds)-1].lastTime() + if err != nil { + log.Errorf( + "Failed to determine time of the last sample for metric %v, fingerprint %v: %s", + s.metric, fp, err, + ) + purge() + return fp, false + } keepIdx := -1 for i, cd := range s.chunkDescs { if cd.firstTime() >= lastTime { @@ -414,7 +441,10 @@ func (p *persistence) cleanUpArchiveIndexes( if err != nil { return err } - series := newMemorySeries(model.Metric(m), cds, p.seriesFileModTime(model.Fingerprint(fp))) + series, err := newMemorySeries(model.Metric(m), cds, p.seriesFileModTime(model.Fingerprint(fp))) + if err != nil { + return err + } fpToSeries[model.Fingerprint(fp)] = series return nil }); err != nil { diff --git a/storage/local/delta.go b/storage/local/delta.go index 7222c5a15..c78702072 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -76,7 +76,7 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncod } // add implements chunk. 
-func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { +func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { if c.len() == 0 { c = c[:deltaHeaderBytes] binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp)) @@ -89,14 +89,17 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { // Do we generally have space for another sample in this chunk? If not, // overflow into a new one. if remainingBytes < sampleSize { - overflowChunks := newChunk().add(s) - return []chunk{&c, overflowChunks[0]} + overflowChunks, err := newChunk().add(s) + if err != nil { + return nil, err + } + return []chunk{&c, overflowChunks[0]}, nil } baseValue := c.baseValue() dt := s.Timestamp - c.baseTime() if dt < 0 { - panic("time delta is less than zero") + return nil, fmt.Errorf("time delta is less than zero: %v", dt) } dv := s.Value - baseValue @@ -130,8 +133,11 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) } // Chunk is already half full. Better create a new one and save the transcoding efforts. - overflowChunks := newChunk().add(s) - return []chunk{&c, overflowChunks[0]} + overflowChunks, err := newChunk().add(s) + if err != nil { + return nil, err + } + return []chunk{&c, overflowChunks[0]}, nil } offset := len(c) @@ -148,7 +154,7 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { // Store the absolute value (no delta) in case of d8. binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) default: - panic("invalid number of bytes for time delta") + return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb) } offset += int(tb) @@ -165,7 +171,7 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { binary.LittleEndian.PutUint32(c[offset:], uint32(int32(dv))) // d8 must not happen. Those samples are encoded as float64. default: - panic("invalid number of bytes for integer delta") + return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb) } } else { switch vb { @@ -175,10 +181,10 @@ func (c deltaEncodedChunk) add(s *model.SamplePair) []chunk { // Store the absolute value (no delta) in case of d8. binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) default: - panic("invalid number of bytes for floating point delta") + return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb) } } - return []chunk{&c} + return []chunk{&c}, nil } // clone implements chunk. @@ -243,15 +249,24 @@ func (c *deltaEncodedChunk) unmarshal(r io.Reader) error { if _, err := io.ReadFull(r, *c); err != nil { return err } - *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] + l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:]) + if int(l) > cap(*c) { + return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l) + } + *c = (*c)[:l] return nil } // unmarshalFromBuf implements chunk. -func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) { +func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) error { *c = (*c)[:cap(*c)] copy(*c, buf) - *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] + l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:]) + if int(l) > cap(*c) { + return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l) + } + *c = (*c)[:l] + return nil } // encoding implements chunk. 
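Both unmarshal fixes above share one defensive pattern: the on-disk length word is validated against the buffer's capacity before re-slicing, so corrupted chunk data surfaces as an error (which ultimately quarantines the series) instead of causing a slice-bounds panic. A standalone sketch with invented names (checkedResize, lenOffset):

package main

import (
	"encoding/binary"
	"fmt"
)

// checkedResize trusts the stored length word only after bounds-checking it
// against the buffer's capacity.
func checkedResize(c []byte, lenOffset int) ([]byte, error) {
	l := binary.LittleEndian.Uint16(c[lenOffset:])
	if int(l) > cap(c) {
		return nil, fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
	}
	return c[:l], nil
}

func main() {
	buf := make([]byte, 16, 1024)
	binary.LittleEndian.PutUint16(buf[2:], 4096) // a corrupted length field
	if _, err := checkedResize(buf, 2); err != nil {
		fmt.Println(err) // chunk length exceeded during unmarshaling: 4096
	}
}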
@@ -302,57 +317,108 @@ type deltaEncodedChunkIterator struct { func (it *deltaEncodedChunkIterator) length() int { return it.len } // valueAtOrBeforeTime implements chunkIterator. -func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair { +func (it *deltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { + var lastErr error i := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(t) + ts, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return ts.After(t) }) - if i == 0 { - return model.SamplePair{Timestamp: model.Earliest} + if i == 0 || lastErr != nil { + return ZeroSamplePair, lastErr } - return model.SamplePair{ - Timestamp: it.timestampAtIndex(i - 1), - Value: it.sampleValueAtIndex(i - 1), + ts, err := it.timestampAtIndex(i - 1) + if err != nil { + return ZeroSamplePair, err } + v, err := it.sampleValueAtIndex(i - 1) + if err != nil { + return ZeroSamplePair, err + } + return model.SamplePair{Timestamp: ts, Value: v}, nil } // rangeValues implements chunkIterator. -func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) []model.SamplePair { +func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { + var lastErr error + oldest := sort.Search(it.len, func(i int) bool { - return !it.timestampAtIndex(i).Before(in.OldestInclusive) + t, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return !t.Before(in.OldestInclusive) }) newest := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(in.NewestInclusive) + t, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return t.After(in.NewestInclusive) }) - if oldest == it.len { - return nil + if oldest == it.len || lastErr != nil { + return nil, lastErr } result := make([]model.SamplePair, 0, newest-oldest) for i := oldest; i < newest; i++ { - result = append(result, model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), - }) + t, err := it.timestampAtIndex(i) + if err != nil { + return nil, err + } + v, err := it.sampleValueAtIndex(i) + if err != nil { + return nil, err + } + result = append(result, model.SamplePair{Timestamp: t, Value: v}) } - return result + return result, nil } // contains implements chunkIterator. -func (it *deltaEncodedChunkIterator) contains(t model.Time) bool { - return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)) +func (it *deltaEncodedChunkIterator) contains(t model.Time) (bool, error) { + lastT, err := it.timestampAtIndex(it.len - 1) + if err != nil { + return false, err + } + return !t.Before(it.baseT) && !t.After(lastT), nil } // values implements chunkIterator. 
-func (it *deltaEncodedChunkIterator) values() <-chan *model.SamplePair { - valuesChan := make(chan *model.SamplePair) +func (it *deltaEncodedChunkIterator) values() <-chan struct { + model.SamplePair + error +} { + valuesChan := make(chan struct { + model.SamplePair + error + }) go func() { for i := 0; i < it.len; i++ { - valuesChan <- &model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), + t, err := it.timestampAtIndex(i) + if err != nil { + valuesChan <- struct { + model.SamplePair + error + }{ZeroSamplePair, err} + break } + v, err := it.sampleValueAtIndex(i) + if err != nil { + valuesChan <- struct { + model.SamplePair + error + }{ZeroSamplePair, err} + break + } + valuesChan <- struct { + model.SamplePair + error + }{model.SamplePair{Timestamp: t, Value: v}, nil} } close(valuesChan) }() @@ -360,61 +426,61 @@ func (it *deltaEncodedChunkIterator) values() <-chan *model.SamplePair { } // timestampAtIndex implements chunkIterator. -func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { +func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) { offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) switch it.tBytes { case d1: - return it.baseT + model.Time(uint8(it.c[offset])) + return it.baseT + model.Time(uint8(it.c[offset])), nil case d2: - return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])) + return it.baseT + model.Time(binary.LittleEndian.Uint16(it.c[offset:])), nil case d4: - return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])) + return it.baseT + model.Time(binary.LittleEndian.Uint32(it.c[offset:])), nil case d8: // Take absolute value for d8. - return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) + return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil default: - panic("invalid number of bytes for time delta") + return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) } } // lastTimestamp implements chunkIterator. -func (it *deltaEncodedChunkIterator) lastTimestamp() model.Time { +func (it *deltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { return it.timestampAtIndex(it.len - 1) } // sampleValueAtIndex implements chunkIterator. -func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { +func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) { offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes) if it.isInt { switch it.vBytes { case d0: - return it.baseV + return it.baseV, nil case d1: - return it.baseV + model.SampleValue(int8(it.c[offset])) + return it.baseV + model.SampleValue(int8(it.c[offset])), nil case d2: - return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + return it.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil case d4: - return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + return it.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil // No d8 for ints. 
default: - panic("invalid number of bytes for integer delta") + return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) } } else { switch it.vBytes { case d4: - return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) + return it.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil case d8: // Take absolute value for d8. - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil default: - panic("invalid number of bytes for floating point delta") + return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) } } } // lastSampleValue implements chunkIterator. -func (it *deltaEncodedChunkIterator) lastSampleValue() model.SampleValue { +func (it *deltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) { return it.sampleValueAtIndex(it.len - 1) } diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index e0e0856ce..257c84544 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -83,9 +83,9 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub } // add implements chunk. -func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { +func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { if c.len() == 0 { - return c.addFirstSample(s) + return c.addFirstSample(s), nil } tb := c.timeBytes() @@ -101,8 +101,11 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { // Do we generally have space for another sample in this chunk? If not, // overflow into a new one. if remainingBytes < sampleSize { - overflowChunks := newChunk().add(s) - return []chunk{&c, overflowChunks[0]} + overflowChunks, err := newChunk().add(s) + if err != nil { + return nil, err + } + return []chunk{&c, overflowChunks[0]}, nil } projectedTime := c.baseTime() + model.Time(c.len())*c.baseTimeDelta() @@ -136,8 +139,11 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s) } // Chunk is already half full. Better create a new one and save the transcoding efforts. - overflowChunks := newChunk().add(s) - return []chunk{&c, overflowChunks[0]} + overflowChunks, err := newChunk().add(s) + if err != nil { + return nil, err + } + return []chunk{&c, overflowChunks[0]}, nil } offset := len(c) @@ -154,7 +160,7 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { // Store the absolute value (no delta) in case of d8. binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp)) default: - panic("invalid number of bytes for time delta") + return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb) } offset += int(tb) @@ -171,7 +177,7 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { binary.LittleEndian.PutUint32(c[offset:], uint32(int32(ddv))) // d8 must not happen. Those samples are encoded as float64. default: - panic("invalid number of bytes for integer delta") + return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb) } } else { switch vb { @@ -181,10 +187,10 @@ func (c doubleDeltaEncodedChunk) add(s *model.SamplePair) []chunk { // Store the absolute value (no delta) in case of d8. 
binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value))) default: - panic("invalid number of bytes for floating point delta") + return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb) } } - return []chunk{&c} + return []chunk{&c}, nil } // clone implements chunk. @@ -251,15 +257,24 @@ func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error { if _, err := io.ReadFull(r, *c); err != nil { return err } - *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] + l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:]) + if int(l) > cap(*c) { + return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l) + } + *c = (*c)[:l] return nil } // unmarshalFromBuf implements chunk. -func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) { +func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) error { *c = (*c)[:cap(*c)] copy(*c, buf) - *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] + l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:]) + if int(l) > cap(*c) { + return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l) + } + *c = (*c)[:l] + return nil } // encoding implements chunk. @@ -335,7 +350,7 @@ func (c doubleDeltaEncodedChunk) isInt() bool { // addFirstSample is a helper method only used by c.add(). It adds timestamp and // value as base time and value. -func (c doubleDeltaEncodedChunk) addFirstSample(s *model.SamplePair) []chunk { +func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []chunk { c = c[:doubleDeltaHeaderBaseValueOffset+8] binary.LittleEndian.PutUint64( c[doubleDeltaHeaderBaseTimeOffset:], @@ -350,10 +365,10 @@ func (c doubleDeltaEncodedChunk) addFirstSample(s *model.SamplePair) []chunk { // addSecondSample is a helper method only used by c.add(). It calculates the // base delta from the provided sample and adds it to the chunk. -func (c doubleDeltaEncodedChunk) addSecondSample(s *model.SamplePair, tb, vb deltaBytes) []chunk { +func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]chunk, error) { baseTimeDelta := s.Timestamp - c.baseTime() if baseTimeDelta < 0 { - panic("base time delta is less than zero") + return nil, fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta) } c = c[:doubleDeltaHeaderBytes] if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 { @@ -391,7 +406,7 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s *model.SamplePair, tb, vb del math.Float64bits(float64(baseValueDelta)), ) } - return []chunk{&c} + return []chunk{&c}, nil } // doubleDeltaEncodedChunkIterator implements chunkIterator. @@ -408,57 +423,108 @@ type doubleDeltaEncodedChunkIterator struct { func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len } // valueAtOrBeforeTime implements chunkIterator. 
-func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) model.SamplePair { +func (it *doubleDeltaEncodedChunkIterator) valueAtOrBeforeTime(t model.Time) (model.SamplePair, error) { + var lastErr error i := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(t) + ts, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return ts.After(t) }) - if i == 0 { - return model.SamplePair{Timestamp: model.Earliest} + if i == 0 || lastErr != nil { + return ZeroSamplePair, lastErr } - return model.SamplePair{ - Timestamp: it.timestampAtIndex(i - 1), - Value: it.sampleValueAtIndex(i - 1), + ts, err := it.timestampAtIndex(i - 1) + if err != nil { + return ZeroSamplePair, err } + v, err := it.sampleValueAtIndex(i - 1) + if err != nil { + return ZeroSamplePair, err + } + return model.SamplePair{Timestamp: ts, Value: v}, nil } // rangeValues implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) []model.SamplePair { +func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) ([]model.SamplePair, error) { + var lastErr error + oldest := sort.Search(it.len, func(i int) bool { - return !it.timestampAtIndex(i).Before(in.OldestInclusive) + t, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return !t.Before(in.OldestInclusive) }) newest := sort.Search(it.len, func(i int) bool { - return it.timestampAtIndex(i).After(in.NewestInclusive) + t, err := it.timestampAtIndex(i) + if err != nil { + lastErr = err + } + return t.After(in.NewestInclusive) }) - if oldest == it.len { - return nil + if oldest == it.len || lastErr != nil { + return nil, lastErr } result := make([]model.SamplePair, 0, newest-oldest) for i := oldest; i < newest; i++ { - result = append(result, model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), - }) + t, err := it.timestampAtIndex(i) + if err != nil { + return nil, err + } + v, err := it.sampleValueAtIndex(i) + if err != nil { + return nil, err + } + result = append(result, model.SamplePair{Timestamp: t, Value: v}) } - return result + return result, nil } // contains implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) bool { - return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1)) +func (it *doubleDeltaEncodedChunkIterator) contains(t model.Time) (bool, error) { + lastT, err := it.timestampAtIndex(it.len - 1) + if err != nil { + return false, err + } + return !t.Before(it.baseT) && !t.After(lastT), nil } // values implements chunkIterator. 
-func (it *doubleDeltaEncodedChunkIterator) values() <-chan *model.SamplePair { - valuesChan := make(chan *model.SamplePair) +func (it *doubleDeltaEncodedChunkIterator) values() <-chan struct { + model.SamplePair + error +} { + valuesChan := make(chan struct { + model.SamplePair + error + }) go func() { for i := 0; i < it.len; i++ { - valuesChan <- &model.SamplePair{ - Timestamp: it.timestampAtIndex(i), - Value: it.sampleValueAtIndex(i), + t, err := it.timestampAtIndex(i) + if err != nil { + valuesChan <- struct { + model.SamplePair + error + }{ZeroSamplePair, err} + break } + v, err := it.sampleValueAtIndex(i) + if err != nil { + valuesChan <- struct { + model.SamplePair + error + }{ZeroSamplePair, err} + break + } + valuesChan <- struct { + model.SamplePair + error + }{model.SamplePair{Timestamp: t, Value: v}, nil} } close(valuesChan) }() @@ -466,17 +532,17 @@ func (it *doubleDeltaEncodedChunkIterator) values() <-chan *model.SamplePair { } // timestampAtIndex implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time { +func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) (model.Time, error) { if idx == 0 { - return it.baseT + return it.baseT, nil } if idx == 1 { // If time bytes are at d8, the time is saved directly rather // than as a difference. if it.tBytes == d8 { - return it.baseΔT + return it.baseΔT, nil } - return it.baseT + it.baseΔT + return it.baseT + it.baseΔT, nil } offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) @@ -485,40 +551,40 @@ func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) model.Time case d1: return it.baseT + model.Time(idx)*it.baseΔT + - model.Time(int8(it.c[offset])) + model.Time(int8(it.c[offset])), nil case d2: return it.baseT + model.Time(idx)*it.baseΔT + - model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + model.Time(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil case d4: return it.baseT + model.Time(idx)*it.baseΔT + - model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + model.Time(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil case d8: // Take absolute value for d8. - return model.Time(binary.LittleEndian.Uint64(it.c[offset:])) + return model.Time(binary.LittleEndian.Uint64(it.c[offset:])), nil default: - panic("invalid number of bytes for time delta") + return 0, fmt.Errorf("invalid number of bytes for time delta: %d", it.tBytes) } } // lastTimestamp implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() model.Time { +func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() (model.Time, error) { return it.timestampAtIndex(it.len - 1) } // sampleValueAtIndex implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.SampleValue { +func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) (model.SampleValue, error) { if idx == 0 { - return it.baseV + return it.baseV, nil } if idx == 1 { // If value bytes are at d8, the value is saved directly rather // than as a difference. 
if it.vBytes == d8 { - return it.baseΔV + return it.baseΔV, nil } - return it.baseV + it.baseΔV + return it.baseV + it.baseΔV, nil } offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes) @@ -527,39 +593,39 @@ func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) model.Sam switch it.vBytes { case d0: return it.baseV + - model.SampleValue(idx)*it.baseΔV + model.SampleValue(idx)*it.baseΔV, nil case d1: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int8(it.c[offset])) + model.SampleValue(int8(it.c[offset])), nil case d2: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + model.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))), nil case d4: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + model.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))), nil // No d8 for ints. default: - panic("invalid number of bytes for integer delta") + return 0, fmt.Errorf("invalid number of bytes for integer delta: %d", it.vBytes) } } else { switch it.vBytes { case d4: return it.baseV + model.SampleValue(idx)*it.baseΔV + - model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))), nil case d8: // Take absolute value for d8. - return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) + return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))), nil default: - panic("invalid number of bytes for floating point delta") + return 0, fmt.Errorf("invalid number of bytes for floating point delta: %d", it.vBytes) } } } // lastSampleValue implements chunkIterator. -func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() model.SampleValue { +func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() (model.SampleValue, error) { return it.sampleValueAtIndex(it.len - 1) } diff --git a/storage/local/heads.go b/storage/local/heads.go index 49a142fbd..60176a1fc 100644 --- a/storage/local/heads.go +++ b/storage/local/heads.go @@ -107,6 +107,8 @@ func (hs *headsScanner) scan() bool { firstTime int64 lastTime int64 encoding byte + ch chunk + lastTimeHead model.Time ) if seriesFlags, hs.err = hs.r.ReadByte(); hs.err != nil { return false @@ -174,11 +176,13 @@ func (hs *headsScanner) scan() bool { if encoding, hs.err = hs.r.ReadByte(); hs.err != nil { return false } - chunk := newChunkForEncoding(chunkEncoding(encoding)) - if hs.err = chunk.unmarshal(hs.r); hs.err != nil { + if ch, hs.err = newChunkForEncoding(chunkEncoding(encoding)); hs.err != nil { return false } - cd := newChunkDesc(chunk, chunk.firstTime()) + if hs.err = ch.unmarshal(hs.r); hs.err != nil { + return false + } + cd := newChunkDesc(ch, ch.firstTime()) if i < numChunkDescs-1 { // This is NOT the head chunk. So it's a chunk // to be persisted, and we need to populate lastTime. 
@@ -189,6 +193,10 @@
 		}
 	}
 
+	if lastTimeHead, hs.err = chunkDescs[len(chunkDescs)-1].lastTime(); hs.err != nil {
+		return false
+	}
+
 	hs.series = &memorySeries{
 		metric:     model.Metric(metric),
 		chunkDescs: chunkDescs,
@@ -196,7 +204,7 @@
 		modTime:          modTime,
 		chunkDescsOffset: int(chunkDescsOffset),
 		savedFirstTime:   model.Time(savedFirstTime),
-		lastTime:         chunkDescs[len(chunkDescs)-1].lastTime(),
+		lastTime:         lastTimeHead,
 		headChunkClosed:  headChunkClosed,
 	}
 	hs.seriesCurrent++
diff --git a/storage/local/index/index.go b/storage/local/index/index.go
index 14200092a..e189004cc 100644
--- a/storage/local/index/index.go
+++ b/storage/local/index/index.go
@@ -19,6 +19,7 @@ package index
 import (
 	"os"
 	"path"
+	"path/filepath"
 
 	"github.com/prometheus/common/model"
 
@@ -95,7 +96,7 @@ func (i *FingerprintMetricIndex) Lookup(fp model.Fingerprint) (metric model.Metr
 // ready to use.
 func NewFingerprintMetricIndex(basePath string) (*FingerprintMetricIndex, error) {
 	fingerprintToMetricDB, err := NewLevelDB(LevelDBOptions{
-		Path:           path.Join(basePath, fingerprintToMetricDir),
+		Path:           filepath.Join(basePath, fingerprintToMetricDir),
 		CacheSizeBytes: FingerprintMetricCacheSize,
 	})
 	if err != nil {
@@ -167,7 +168,7 @@ func (i *LabelNameLabelValuesIndex) LookupSet(l model.LabelName) (values map[mod
 // LabelNameLabelValuesIndex ready to use.
 func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex, error) {
 	labelNameToLabelValuesDB, err := NewLevelDB(LevelDBOptions{
-		Path:           path.Join(basePath, labelNameToLabelValuesDir),
+		Path:           filepath.Join(basePath, labelNameToLabelValuesDir),
 		CacheSizeBytes: LabelNameLabelValuesCacheSize,
 	})
 	if err != nil {
@@ -245,7 +246,7 @@ func (i *LabelPairFingerprintIndex) LookupSet(p model.LabelPair) (fps map[model.
 // LabelPairFingerprintIndex ready to use.
 func NewLabelPairFingerprintIndex(basePath string) (*LabelPairFingerprintIndex, error) {
 	labelPairToFingerprintsDB, err := NewLevelDB(LevelDBOptions{
-		Path:           path.Join(basePath, labelPairToFingerprintsDir),
+		Path:           filepath.Join(basePath, labelPairToFingerprintsDir),
 		CacheSizeBytes: LabelPairFingerprintsCacheSize,
 	})
 	if err != nil {
@@ -283,7 +284,7 @@ func (i *FingerprintTimeRangeIndex) Lookup(fp model.Fingerprint) (firstTime, las
 // FingerprintTimeRangeIndex ready to use.
 func NewFingerprintTimeRangeIndex(basePath string) (*FingerprintTimeRangeIndex, error) {
 	fingerprintTimeRangeDB, err := NewLevelDB(LevelDBOptions{
-		Path:           path.Join(basePath, fingerprintTimeRangeDir),
+		Path:           filepath.Join(basePath, fingerprintTimeRangeDir),
 		CacheSizeBytes: FingerprintTimeRangeCacheSize,
 	})
 	if err != nil {
diff --git a/storage/local/instrumentation.go b/storage/local/instrumentation.go
index 85a7aa5e0..6d43ebd39 100644
--- a/storage/local/instrumentation.go
+++ b/storage/local/instrumentation.go
@@ -60,6 +60,9 @@ const (
 	requestedPurge     = "purge_on_request"
 	memoryMaintenance  = "maintenance_in_memory"
 	archiveMaintenance = "maintenance_in_archive"
+	completedQuarantine = "quarantine_completed"
+	droppedQuarantine   = "quarantine_dropped"
+	failedQuarantine    = "quarantine_failed"
 
 	// Op-types for chunkOps.
 	createAndPin = "create" // A chunkDesc creation with refCount=1.
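A side note on the recurring path to path/filepath swap throughout this change: path.Join always produces slash-separated paths, while filepath.Join uses the operating system's path separator, which is what actual file system access needs, e.g. on Windows. A trivial illustration:

package main

import (
	"fmt"
	"path"
	"path/filepath"
)

func main() {
	fmt.Println(path.Join("data", "01", "abcd.db"))     // data/01/abcd.db on every OS
	fmt.Println(filepath.Join("data", "01", "abcd.db")) // data\01\abcd.db on Windows
}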
diff --git a/storage/local/interface.go b/storage/local/interface.go
index 005b39726..d9dbc4f21 100644
--- a/storage/local/interface.go
+++ b/storage/local/interface.go
@@ -14,6 +14,8 @@ package local
 import (
+	"time"
+
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 
@@ -42,10 +44,12 @@ type Storage interface {
 	// label matchers. At least one label matcher must be specified that does not
 	// match the empty string.
 	MetricsForLabelMatchers(...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric
-	// LastSamplePairForFingerprint returns the last sample pair for the
-	// provided fingerprint. If the respective time series does not exist or
-	// has an evicted head chunk, nil is returned.
-	LastSamplePairForFingerprint(model.Fingerprint) *model.SamplePair
+	// LastSamplePairForFingerprint returns the last sample pair that has
+	// been ingested for the provided fingerprint. If this instance of the
+	// Storage has never ingested a sample for the provided fingerprint (or
+	// the last ingestion is so long ago that the series has been archived),
+	// ZeroSamplePair is returned.
+	LastSamplePairForFingerprint(model.Fingerprint) model.SamplePair
 	// Get all of the label values that are associated with a given label name.
 	LabelValuesForLabelName(model.LabelName) model.LabelValues
 	// Get the metric associated with the provided fingerprint.
@@ -69,16 +73,12 @@ type Storage interface {
 // methods are not goroutine-safe. A SeriesIterator iterates over a snapshot of
 // a series, i.e. it is safe to continue using a SeriesIterator after or during
 // modifying the corresponding series, but the iterator will represent the state
-// of the series prior the modification.
+// of the series prior to the modification.
 type SeriesIterator interface {
 	// Gets the value that is closest before the given time. In case a value
 	// exists at precisely the given time, that value is returned. If no
-	// applicable value exists, a SamplePair with timestamp model.Earliest
-	// and value 0.0 is returned.
+	// applicable value exists, ZeroSamplePair is returned.
 	ValueAtOrBeforeTime(model.Time) model.SamplePair
-	// Gets the boundary values of an interval: the first and last value
-	// within a given interval.
-	BoundaryValues(metric.Interval) []model.SamplePair
 	// Gets all values contained within a given interval.
 	RangeValues(metric.Interval) []model.SamplePair
 }
@@ -90,7 +90,18 @@ type Preloader interface {
 	PreloadRange(
 		fp model.Fingerprint,
 		from model.Time, through model.Time,
-	) (SeriesIterator, error)
+	) SeriesIterator
+	PreloadInstant(
+		fp model.Fingerprint,
+		timestamp model.Time, stalenessDelta time.Duration,
+	) SeriesIterator
 	// Close unpins any previously requested series data from memory.
 	Close()
 }
+
+// ZeroSamplePair is the pseudo zero-value of model.SamplePair used by the local
+// package to signal a non-existing sample. It is a SamplePair with timestamp
+// model.Earliest and value 0.0. Note that the natural zero value of SamplePair
+// has a timestamp of 0, which can appear in a real SamplePair and is thus not
+// suitable to signal a non-existing SamplePair.
+var ZeroSamplePair = model.SamplePair{Timestamp: model.Earliest}
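A standalone sketch of how callers are meant to test against the new ZeroSamplePair sentinel (the sentinel is re-declared here only to keep the example self-contained):

package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

// ZeroSamplePair re-declares the sentinel from above for a runnable example.
var ZeroSamplePair = model.SamplePair{Timestamp: model.Earliest}

func main() {
	var sp model.SamplePair // natural zero value: Timestamp == 0, a valid timestamp
	fmt.Println(sp == ZeroSamplePair) // false: the zero value is NOT the sentinel
	sp = ZeroSamplePair
	if sp.Timestamp == model.Earliest {
		fmt.Println("no sample exists") // how a caller detects a non-existing sample
	}
}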
diff --git a/storage/local/persistence.go b/storage/local/persistence.go
index 725673cb1..e713feea7 100644
--- a/storage/local/persistence.go
+++ b/storage/local/persistence.go
@@ -20,7 +20,6 @@ import (
 	"io"
 	"io/ioutil"
 	"os"
-	"path"
 	"path/filepath"
 	"strconv"
 	"strings"
@@ -46,6 +45,7 @@ const (
 	seriesFileSuffix     = ".db"
 	seriesTempFileSuffix = ".db.tmp"
 	seriesDirNameLen     = 2 // How many bytes of the fingerprint in dir name.
+	hintFileSuffix       = ".hint"
 
 	mappingsFileName     = "mappings.db"
 	mappingsTempFileName = "mappings.db.tmp"
@@ -315,8 +315,9 @@ func (p *persistence) isDirty() bool {
 // setDirty sets the dirty flag in a goroutine-safe way. Once the dirty flag was
 // set to true with this method, it cannot be set to false again. (If we became
 // dirty during our runtime, there is no way back. If we were dirty from the
-// start, a clean-up might make us clean again.)
-func (p *persistence) setDirty(dirty bool) {
+// start, a clean-up might make us clean again.) The provided error will be
+// logged as a reason if dirty is true.
+func (p *persistence) setDirty(dirty bool, err error) {
 	if dirty {
 		p.dirtyCounter.Inc()
 	}
@@ -328,7 +329,7 @@ func (p *persistence) setDirty(dirty bool) {
 	p.dirty = dirty
 	if dirty {
 		p.becameDirty = true
-		log.Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.")
+		log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.")
 	}
 }
 
@@ -365,8 +366,7 @@ func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelVa
 func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index int, err error) {
 	defer func() {
 		if err != nil {
-			log.Error("Error persisting chunks: ", err)
-			p.setDirty(true)
+			p.setDirty(true, fmt.Errorf("error in method persistChunks: %s", err))
 		}
 	}()
 
@@ -435,8 +435,13 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse
 			return nil, err
 		}
 		for c := 0; c < batchSize; c++ {
-			chunk := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
-			chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:])
+			chunk, err := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
+			if err != nil {
+				return nil, err
+			}
+			if err := chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
+				return nil, err
+			}
 			chunks = append(chunks, chunk)
 		}
 	}
@@ -464,7 +469,7 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
 		return nil, err
 	}
 	if fi.Size()%int64(chunkLenWithHeader) != 0 {
-		p.setDirty(true)
+		// The returned error will bubble up and lead to quarantining of the whole series.
 		return nil, fmt.Errorf(
 			"size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d",
 			fp, fi.Size(), chunkLenWithHeader,
@@ -642,7 +647,11 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
 				if _, err = codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil {
 					return
 				}
-				if _, err = codable.EncodeVarint(w, int64(chunkDesc.lastTime())); err != nil {
+				var lt model.Time
+				if lt, err = chunkDesc.lastTime(); err != nil {
+					return
+				}
+				if _, err = codable.EncodeVarint(w, int64(lt)); err != nil {
 					return
 				}
 			} else {
@@ -748,8 +757,7 @@ func (p *persistence) dropAndPersistChunks(
 	// please handle with care!
defer func() { if err != nil { - log.Error("Error dropping and/or persisting chunks: ", err) - p.setDirty(true) + p.setDirty(true, fmt.Errorf("error in method dropAndPersistChunks: %s", err)) } }() @@ -758,7 +766,15 @@ // too old. If that's the case, the chunks in the series file // are all too old, too. i := 0 - for ; i < len(chunks) && chunks[i].newIterator().lastTimestamp().Before(beforeTime); i++ { + for ; i < len(chunks); i++ { + var lt model.Time + lt, err = chunks[i].newIterator().lastTimestamp() + if err != nil { + return + } + if !lt.Before(beforeTime) { + break + } } if i < len(chunks) { firstTimeNotDropped = chunks[i].firstTime() @@ -911,6 +927,44 @@ func (p *persistence) deleteSeriesFile(fp model.Fingerprint) (int, error) { return numChunks, nil } +// quarantineSeriesFile moves a series file to the orphaned directory. It also +// writes a hint file with the provided quarantine reason and, if metric is +// non-nil, the string representation of the metric. +func (p *persistence) quarantineSeriesFile(fp model.Fingerprint, quarantineReason error, metric model.Metric) error { + var ( + oldName = p.fileNameForFingerprint(fp) + orphanedDir = filepath.Join(p.basePath, "orphaned", filepath.Base(filepath.Dir(oldName))) + newName = filepath.Join(orphanedDir, filepath.Base(oldName)) + hintName = newName[:len(newName)-len(seriesFileSuffix)] + hintFileSuffix + ) + + renameErr := os.MkdirAll(orphanedDir, 0700) + if renameErr != nil { + return renameErr + } + renameErr = os.Rename(oldName, newName) + if os.IsNotExist(renameErr) { + // Source file doesn't exist. That's normal. + renameErr = nil + } + // Write the hint file even if the rename ended in an error. At least try... + // And ignore errors writing the hint file. It's best effort. + if f, err := os.Create(hintName); err == nil { + if metric != nil { + f.WriteString(metric.String() + "\n") + } else { + f.WriteString("[UNKNOWN METRIC]\n") + } + if quarantineReason != nil { + f.WriteString(quarantineReason.Error() + "\n") + } else { + f.WriteString("[UNKNOWN REASON]\n") + } + f.Close() + } + return renameErr +} + // seriesFileModTime returns the modification time of the series file belonging // to the provided fingerprint. In case of an error, the zero value of time.Time // is returned.
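The hint file written by quarantineSeriesFile above is deliberately simple: two plain-text lines, the metric (or "[UNKNOWN METRIC]") followed by the quarantine reason (or "[UNKNOWN REASON]"). A minimal sketch of reading one back, assuming only that two-line layout; readHint and the example path are illustrative, not part of this patch:

package main

import (
	"bufio"
	"fmt"
	"os"
)

// readHint returns the metric string and quarantine reason recorded in a
// .hint file, assuming the two-line layout written by quarantineSeriesFile.
func readHint(hintName string) (metric, reason string, err error) {
	f, err := os.Open(hintName)
	if err != nil {
		return "", "", err
	}
	defer f.Close()
	s := bufio.NewScanner(f)
	if s.Scan() {
		metric = s.Text() // E.g. {foo="bar"} or [UNKNOWN METRIC].
	}
	if s.Scan() {
		reason = s.Text() // E.g. "file does not exist" or [UNKNOWN REASON].
	}
	return metric, reason, s.Err()
}

func main() {
	// Hypothetical path below an orphaned directory.
	m, r, err := readHint("data/orphaned/00/0123456789abcdef.hint")
	if err != nil {
		fmt.Println("cannot read hint:", err)
		return
	}
	fmt.Printf("quarantined metric %s, reason: %s\n", m, r)
}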
@@ -962,11 +1016,11 @@ func (p *persistence) archiveMetric( fp model.Fingerprint, m model.Metric, first, last model.Time, ) error { if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil { - p.setDirty(true) + p.setDirty(true, err) return err } if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil { - p.setDirty(true) + p.setDirty(true, err) return err } return nil @@ -979,6 +1033,9 @@ func (p *persistence) hasArchivedMetric(fp model.Fingerprint) ( hasMetric bool, firstTime, lastTime model.Time, err error, ) { firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp) + if err != nil { + p.setDirty(true, err) + } return } @@ -1027,7 +1084,7 @@ func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) { defer func() { if err != nil { - p.setDirty(true) + p.setDirty(true, fmt.Errorf("error in method purgeArchivedMetric: %s", err)) } }() @@ -1058,12 +1115,8 @@ func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) { // was actually deleted, the method returns true and the first time and last // time of the deleted metric. The caller must have locked the fingerprint. func (p *persistence) unarchiveMetric(fp model.Fingerprint) (deletedAnything bool, err error) { - defer func() { - if err != nil { - p.setDirty(true) - } - }() - + // An error returned here will bubble up and lead to quarantining of the + // series, so no setDirty required. deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp)) if err != nil || !deleted { return false, err @@ -1119,17 +1172,17 @@ func (p *persistence) close() error { func (p *persistence) dirNameForFingerprint(fp model.Fingerprint) string { fpStr := fp.String() - return path.Join(p.basePath, fpStr[0:seriesDirNameLen]) + return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen]) } func (p *persistence) fileNameForFingerprint(fp model.Fingerprint) string { fpStr := fp.String() - return path.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix) + return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix) } func (p *persistence) tempFileNameForFingerprint(fp model.Fingerprint) string { fpStr := fp.String() - return path.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix) + return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix) } func (p *persistence) openChunkFileForWriting(fp model.Fingerprint) (*os.File, error) { @@ -1162,19 +1215,19 @@ func (p *persistence) openChunkFileForReading(fp model.Fingerprint) (*os.File, e } func (p *persistence) headsFileName() string { - return path.Join(p.basePath, headsFileName) + return filepath.Join(p.basePath, headsFileName) } func (p *persistence) headsTempFileName() string { - return path.Join(p.basePath, headsTempFileName) + return filepath.Join(p.basePath, headsTempFileName) } func (p *persistence) mappingsFileName() string { - return path.Join(p.basePath, mappingsFileName) + return filepath.Join(p.basePath, mappingsFileName) } func (p *persistence) mappingsTempFileName() string { - return path.Join(p.basePath, mappingsTempFileName) + return filepath.Join(p.basePath, mappingsTempFileName) } func (p *persistence) processIndexingQueue() { @@ -1456,7 +1509,9 @@ func (p *persistence) 
writeChunks(w io.Writer, chunks []chunk) error { b = b[:writeSize] for i, chunk := range chunks[:batchSize] { - writeChunkHeader(b[i*chunkLenWithHeader:], chunk) + if err := writeChunkHeader(b[i*chunkLenWithHeader:], chunk); err != nil { + return err + } if err := chunk.marshalToBuf(b[i*chunkLenWithHeader+chunkHeaderLen:]); err != nil { return err } @@ -1482,14 +1537,19 @@ func chunkIndexForOffset(offset int64) (int, error) { return int(offset) / chunkLenWithHeader, nil } -func writeChunkHeader(header []byte, c chunk) { +func writeChunkHeader(header []byte, c chunk) error { header[chunkHeaderTypeOffset] = byte(c.encoding()) binary.LittleEndian.PutUint64( header[chunkHeaderFirstTimeOffset:], uint64(c.firstTime()), ) + lt, err := c.newIterator().lastTimestamp() + if err != nil { + return err + } binary.LittleEndian.PutUint64( header[chunkHeaderLastTimeOffset:], - uint64(c.newIterator().lastTimestamp()), + uint64(lt), ) + return nil } diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 47cf9078a..e1894032a 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -14,6 +14,10 @@ package local import ( + "bufio" + "errors" + "os" + "path/filepath" "reflect" "sync" "testing" @@ -49,7 +53,7 @@ func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, tes }) } -func buildTestChunks(encoding chunkEncoding) map[model.Fingerprint][]chunk { +func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint][]chunk { fps := model.Fingerprints{ m1.FastFingerprint(), m2.FastFingerprint(), @@ -60,10 +64,18 @@ func buildTestChunks(encoding chunkEncoding) map[model.Fingerprint][]chunk { for _, fp := range fps { fpToChunks[fp] = make([]chunk, 0, 10) for i := 0; i < 10; i++ { - fpToChunks[fp] = append(fpToChunks[fp], newChunkForEncoding(encoding).add(&model.SamplePair{ + ch, err := newChunkForEncoding(encoding) + if err != nil { + t.Fatal(err) + } + chs, err := ch.add(model.SamplePair{ Timestamp: model.Time(i), Value: model.SampleValue(fp), - })[0]) + }) + if err != nil { + t.Fatal(err) + } + fpToChunks[fp] = append(fpToChunks[fp], chs[0]) } } return fpToChunks @@ -73,7 +85,7 @@ func chunksEqual(c1, c2 chunk) bool { values2 := c2.newIterator().values() for v1 := range c1.newIterator().values() { v2 := <-values2 - if !v1.Equal(v2) { + if !(v1 == v2) { return false } } @@ -84,7 +96,7 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { p, closer := newTestPersistence(t, encoding) defer closer.Close() - fpToChunks := buildTestChunks(encoding) + fpToChunks := buildTestChunks(t, encoding) for fp, chunks := range fpToChunks { firstTimeNotDropped, offset, numDropped, allDropped, err := @@ -126,10 +138,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 10) } for i, cd := range actualChunkDescs { - if cd.firstTime() != model.Time(i) || cd.lastTime() != model.Time(i) { + lastTime, err := cd.lastTime() + if err != nil { + t.Fatal(err) + } + if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) { t.Errorf( "Want ts=%v, got firstTime=%v, lastTime=%v.", - i, cd.firstTime(), cd.lastTime(), + i, cd.firstTime(), lastTime, ) } @@ -140,10 +156,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { t.Errorf("Got %d chunkDescs, want %d.", len(actualChunkDescs), 5) } for i, cd := range actualChunkDescs { - if cd.firstTime() != model.Time(i) || cd.lastTime() != model.Time(i) { + lastTime, 
err := cd.lastTime() + if err != nil { + t.Fatal(err) + } + if cd.firstTime() != model.Time(i) || lastTime != model.Time(i) { t.Errorf( "Want ts=%v, got firstTime=%v, lastTime=%v.", - i, cd.firstTime(), cd.lastTime(), + i, cd.firstTime(), lastTime, ) } @@ -433,21 +453,21 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding fpLocker := newFingerprintLocker(10) sm := newSeriesMap() - s1 := newMemorySeries(m1, nil, time.Time{}) - s2 := newMemorySeries(m2, nil, time.Time{}) - s3 := newMemorySeries(m3, nil, time.Time{}) - s4 := newMemorySeries(m4, nil, time.Time{}) - s5 := newMemorySeries(m5, nil, time.Time{}) - s1.add(&model.SamplePair{Timestamp: 1, Value: 3.14}) - s3.add(&model.SamplePair{Timestamp: 2, Value: 2.7}) + s1, _ := newMemorySeries(m1, nil, time.Time{}) + s2, _ := newMemorySeries(m2, nil, time.Time{}) + s3, _ := newMemorySeries(m3, nil, time.Time{}) + s4, _ := newMemorySeries(m4, nil, time.Time{}) + s5, _ := newMemorySeries(m5, nil, time.Time{}) + s1.add(model.SamplePair{Timestamp: 1, Value: 3.14}) + s3.add(model.SamplePair{Timestamp: 2, Value: 2.7}) s3.headChunkClosed = true s3.persistWatermark = 1 for i := 0; i < 10000; i++ { - s4.add(&model.SamplePair{ + s4.add(model.SamplePair{ Timestamp: model.Time(i), Value: model.SampleValue(i) / 2, }) - s5.add(&model.SamplePair{ + s5.add(model.SamplePair{ Timestamp: model.Time(i), Value: model.SampleValue(i * i), }) @@ -552,10 +572,14 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding } continue } - if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() { + lastTime, err := cd.c.newIterator().lastTimestamp() + if err != nil { + t.Fatal(err) + } + if cd.chunkLastTime != lastTime { t.Errorf( "chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d", - i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime, + i, lastTime, cd.chunkLastTime, ) } } @@ -605,10 +629,14 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding } continue } - if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() { + lastTime, err := cd.c.newIterator().lastTimestamp() + if err != nil { + t.Fatal(err) + } + if cd.chunkLastTime != lastTime { t.Errorf( "chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d", - i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime, + i, lastTime, cd.chunkLastTime, ) } } @@ -1051,6 +1079,108 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet } } +func TestQuarantineSeriesFile(t *testing.T) { + p, closer := newTestPersistence(t, 1) + defer closer.Close() + + verify := func(fp model.Fingerprint, seriesFileShouldExist bool, contentHintFile ...string) { + var ( + fpStr = fp.String() + originalFile = p.fileNameForFingerprint(fp) + quarantinedFile = filepath.Join(p.basePath, "orphaned", fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix) + hintFile = filepath.Join(p.basePath, "orphaned", fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+hintFileSuffix) + ) + if _, err := os.Stat(originalFile); !os.IsNotExist(err) { + t.Errorf("Expected file %q to not exist.", originalFile) + } + if _, err := os.Stat(quarantinedFile); (os.IsNotExist(err) && seriesFileShouldExist) || (err == nil && !seriesFileShouldExist) { + t.Errorf("Unexpected state of quarantined file %q. Expected it to exist: %t.
os.Stat returned: %s.", quarantinedFile, seriesFileShouldExist, err) + } + f, err := os.Open(hintFile) + if err != nil { + t.Errorf("Could not open hint file %q: %s", hintFile, err) + return + } + defer f.Close() + scanner := bufio.NewScanner(f) + for _, want := range contentHintFile { + if !scanner.Scan() { + t.Errorf("Unexpected end of hint file %q.", hintFile) + return + } + got := scanner.Text() + if want != got { + t.Errorf("Want hint line %q, got %q.", want, got) + } + } + if scanner.Scan() { + t.Errorf("Unexpected spurious content in hint file %q: %q", hintFile, scanner.Text()) + } + } + + if err := p.quarantineSeriesFile(0, nil, nil); err != nil { + t.Error(err) + } + verify(0, false, "[UNKNOWN METRIC]", "[UNKNOWN REASON]") + + if err := p.quarantineSeriesFile( + 1, errors.New("file does not exist"), + nil, + ); err != nil { + t.Error(err) + } + verify(1, false, "[UNKNOWN METRIC]", "file does not exist") + + if err := p.quarantineSeriesFile( + 2, errors.New("file does not exist"), + model.Metric{"foo": "bar", "dings": "bums"}, + ); err != nil { + t.Error(err) + } + verify(2, false, `{dings="bums", foo="bar"}`, "file does not exist") + + if err := p.quarantineSeriesFile( + 3, nil, + model.Metric{"foo": "bar", "dings": "bums"}, + ); err != nil { + t.Error(err) + } + verify(3, false, `{dings="bums", foo="bar"}`, "[UNKNOWN REASON]") + + err := os.Mkdir(filepath.Join(p.basePath, "00"), os.ModePerm) + if err != nil { + t.Fatal(err) + } + f, err := os.Create(p.fileNameForFingerprint(4)) + if err != nil { + t.Fatal(err) + } + f.Close() + + if err := p.quarantineSeriesFile( + 4, errors.New("file exists"), + model.Metric{"sound": "cloud"}, + ); err != nil { + t.Error(err) + } + verify(4, true, `{sound="cloud"}`, "file exists") + + if err := p.quarantineSeriesFile(4, nil, nil); err != nil { + t.Error(err) + } + // Overwrites hint file but leaves series file intact. + verify(4, true, "[UNKNOWN METRIC]", "[UNKNOWN REASON]") + + if err := p.quarantineSeriesFile( + 4, errors.New("file exists"), + model.Metric{"sound": "cloud"}, + ); err != nil { + t.Error(err) + } + // Overwrites everything. + verify(4, true, `{sound="cloud"}`, "file exists") +} + var fpStrings = []string{ "b004b821ca50ba26", "b037c21e884e4fc5", diff --git a/storage/local/preload.go b/storage/local/preload.go index 08a88875f..fb6a21f64 100644 --- a/storage/local/preload.go +++ b/storage/local/preload.go @@ -13,7 +13,11 @@ package local -import "github.com/prometheus/common/model" +import ( + "time" + + "github.com/prometheus/common/model" +) // memorySeriesPreloader is a Preloader for the memorySeriesStorage. type memorySeriesPreloader struct { @@ -25,13 +29,20 @@ type memorySeriesPreloader struct { func (p *memorySeriesPreloader) PreloadRange( fp model.Fingerprint, from model.Time, through model.Time, -) (SeriesIterator, error) { - cds, iter, err := p.storage.preloadChunksForRange(fp, from, through) - if err != nil { - return nil, err - } +) SeriesIterator { + cds, iter := p.storage.preloadChunksForRange(fp, from, through) p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return iter, nil + return iter +} + +// PreloadInstant implements Preloader. +func (p *memorySeriesPreloader) PreloadInstant( + fp model.Fingerprint, + timestamp model.Time, stalenessDelta time.Duration, +) SeriesIterator { + cds, iter := p.storage.preloadChunksForInstant(fp, timestamp.Add(-stalenessDelta), timestamp) + p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) + return iter } // Close implements Preloader.
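Seen from the caller's side, the Preloader now mirrors the two cases the query analyzer distinguishes: a range that pins all overlapping chunks, and an instant that looks back by the staleness delta and may pin no chunks at all. A usage sketch against the interface above; preloadBoth and the five-minute durations are illustrative stand-ins, not names from this patch:

package example

import (
	"time"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/storage/local"
	"github.com/prometheus/prometheus/storage/metric"
)

// preloadBoth sketches the two preload paths for one fingerprint.
func preloadBoth(p local.Preloader, fp model.Fingerprint, ts model.Time) {
	defer p.Close() // Unpin everything when done.

	// Range selector like foo[5m]: pin all chunks overlapping the window,
	// then read every sample in it.
	rangeDuration := 5 * time.Minute
	it := p.PreloadRange(fp, ts.Add(-rangeDuration), ts)
	_ = it.RangeValues(metric.Interval{
		OldestInclusive: ts.Add(-rangeDuration),
		NewestInclusive: ts,
	})

	// Instant selector: the preloader looks back by the staleness delta
	// itself and may serve the value straight from the series' last
	// ingested sample, pinning no chunks at all.
	it = p.PreloadInstant(fp, ts, 5*time.Minute)
	if sp := it.ValueAtOrBeforeTime(ts); sp == local.ZeroSamplePair {
		return // No sample within the staleness window.
	}
}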
diff --git a/storage/local/series.go b/storage/local/series.go index 6943e925a..51ad865a9 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -162,9 +162,15 @@ type memorySeries struct { // first chunk before its chunk desc is evicted. In doubt, this field is // just set to the oldest possible timestamp. savedFirstTime model.Time - // The timestamp of the last sample in this series. Needed for fast access to - // ensure timestamp monotonicity during ingestion. + // The timestamp of the last sample in this series. Needed for fast + // access for federation and to ensure timestamp monotonicity during + // ingestion. lastTime model.Time + // The last ingested sample value. Needed for fast access for + // federation. + lastSampleValue model.SampleValue + // Whether lastSampleValue has been set already. + lastSampleValueSet bool // Whether the current head chunk has already been finished. If true, // the current head chunk must not be modified anymore. headChunkClosed bool @@ -185,12 +191,15 @@ type memorySeries struct { // set to model.Earliest. The zero value for modTime can be used if the // modification time of the series file is unknown (e.g. if this is a genuinely // new series). -func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) *memorySeries { +func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) (*memorySeries, error) { + var err error firstTime := model.Earliest lastTime := model.Earliest if len(chunkDescs) > 0 { firstTime = chunkDescs[0].firstTime() - lastTime = chunkDescs[len(chunkDescs)-1].lastTime() + if lastTime, err = chunkDescs[len(chunkDescs)-1].lastTime(); err != nil { + return nil, err + } } return &memorySeries{ metric: m, @@ -200,14 +209,14 @@ func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) lastTime: lastTime, persistWatermark: len(chunkDescs), modTime: modTime, - } + }, nil } // add adds a sample pair to the series. It returns the number of newly // completed chunks (which are now eligible for persistence). // // The caller must have locked the fingerprint of the series. -func (s *memorySeries) add(v *model.SamplePair) int { +func (s *memorySeries) add(v model.SamplePair) (int, error) { if len(s.chunkDescs) == 0 || s.headChunkClosed { newHead := newChunkDesc(newChunk(), v.Timestamp) s.chunkDescs = append(s.chunkDescs, newHead) @@ -229,7 +238,10 @@ func (s *memorySeries) add(v *model.SamplePair) int { s.headChunkUsedByIterator = false } - chunks := s.head().add(v) + chunks, err := s.head().add(v) + if err != nil { + return 0, err + } s.head().c = chunks[0] for _, c := range chunks[1:] { @@ -242,7 +254,9 @@ func (s *memorySeries) add(v *model.SamplePair) int { } s.lastTime = v.Timestamp - return len(chunks) - 1 + s.lastSampleValue = v.Value + s.lastSampleValueSet = true + return len(chunks) - 1, nil } // maybeCloseHeadChunk closes the head chunk if it has not been touched for the @@ -287,10 +301,14 @@ func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) { // dropChunks removes chunkDescs older than t. The caller must have locked the // fingerprint of the series. 
-func (s *memorySeries) dropChunks(t model.Time) { +func (s *memorySeries) dropChunks(t model.Time) error { keepIdx := len(s.chunkDescs) for i, cd := range s.chunkDescs { - if !cd.lastTime().Before(t) { + lt, err := cd.lastTime() + if err != nil { + return err + } + if !lt.Before(t) { keepIdx = i break } } @@ -310,6 +328,7 @@ numMemChunkDescs.Sub(float64(keepIdx)) s.dirty = true } + return nil } // preloadChunks is an internal helper method. @@ -350,8 +369,12 @@ s.headChunkUsedByIterator = true } + curriedQuarantineSeries := func(err error) { + mss.quarantineSeries(fp, s.metric, err) + } + iter := &boundedIterator{ - it: s.newIterator(pinnedChunkDescs), + it: s.newIterator(pinnedChunkDescs, curriedQuarantineSeries), start: model.Now().Add(-mss.dropAfter), } @@ -359,9 +382,10 @@ } // newIterator returns a new SeriesIterator for the provided chunkDescs (which -// must be pinned). The caller must have locked the fingerprint of the -// memorySeries. -func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator { +// must be pinned). +// +// The caller must have locked the fingerprint of the memorySeries. +func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc, quarantine func(error)) SeriesIterator { chunks := make([]chunk, 0, len(pinnedChunkDescs)) for _, cd := range pinnedChunkDescs { // It's OK to directly access cd.c here (without locking) as the @@ -369,16 +393,45 @@ chunks = append(chunks, cd.c) } return &memorySeriesIterator{ - chunks: chunks, - chunkIts: make([]chunkIterator, len(chunks)), + chunks: chunks, + chunkIts: make([]chunkIterator, len(chunks)), + quarantine: quarantine, } } +// preloadChunksForInstant preloads chunks for the latest value in the given +// range. If the last sample saved in the memorySeries itself is the latest +// value in the given range, it will in fact preload zero chunks and just take +// that value. +func (s *memorySeries) preloadChunksForInstant( + fp model.Fingerprint, + from model.Time, through model.Time, + mss *memorySeriesStorage, +) ([]*chunkDesc, SeriesIterator, error) { + // If we have a lastSamplePair in the series, and that last SamplePair + // is in the interval, just take it in a singleSampleSeriesIterator. No + // need to pin or load anything. + lastSample := s.lastSamplePair() + if !through.Before(lastSample.Timestamp) && + !from.After(lastSample.Timestamp) && + lastSample != ZeroSamplePair { + iter := &boundedIterator{ + it: &singleSampleSeriesIterator{samplePair: lastSample}, + start: model.Now().Add(-mss.dropAfter), + } + return nil, iter, nil + } + // If we are here, we are out of luck and have to delegate to the more + // expensive method. + return s.preloadChunksForRange(fp, from, through, mss) +} + // preloadChunksForRange loads chunks for the given range from the persistence. // The caller must have locked the fingerprint of the series. func (s *memorySeries) preloadChunksForRange( + fp model.Fingerprint, from model.Time, through model.Time, - fp model.Fingerprint, mss *memorySeriesStorage, + mss *memorySeriesStorage, ) ([]*chunkDesc, SeriesIterator, error) { firstChunkDescTime := model.Latest if len(s.chunkDescs) > 0 { @@ -410,7 +463,11 @@ if fromIdx == len(s.chunkDescs) { // Even the last chunk starts before "from".
Find out if the // series ends before "from" and we don't need to do anything. - if s.chunkDescs[len(s.chunkDescs)-1].lastTime().Before(from) { + lt, err := s.chunkDescs[len(s.chunkDescs)-1].lastTime() + if err != nil { + return nil, nopIter, err + } + if lt.Before(from) { return nil, nopIter, nil } } @@ -435,8 +492,9 @@ func (s *memorySeries) head() *chunkDesc { return s.chunkDescs[len(s.chunkDescs)-1] } -// firstTime returns the timestamp of the first sample in the series. The caller -// must have locked the fingerprint of the memorySeries. +// firstTime returns the timestamp of the first sample in the series. +// +// The caller must have locked the fingerprint of the memorySeries. func (s *memorySeries) firstTime() model.Time { if s.chunkDescsOffset == 0 && len(s.chunkDescs) > 0 { return s.chunkDescs[0].firstTime() @@ -444,6 +502,23 @@ func (s *memorySeries) firstTime() model.Time { return s.savedFirstTime } +// lastSamplePair returns the last ingested SamplePair. It returns +// ZeroSamplePair if this memorySeries has never received a sample (via the add +// method), which is the case for freshly unarchived series or newly created +// ones and also for all series after a server restart. However, in that case, +// series will most likely be considered stale anyway. +// +// The caller must have locked the fingerprint of the memorySeries. +func (s *memorySeries) lastSamplePair() model.SamplePair { + if !s.lastSampleValueSet { + return ZeroSamplePair + } + return model.SamplePair{ + Timestamp: s.lastTime, + Value: s.lastSampleValue, + } +} + // chunksToPersist returns a slice of chunkDescs eligible for persistence. It's // the caller's responsibility to actually persist the returned chunks // afterwards. The method sets the persistWatermark and the dirty flag @@ -466,20 +541,33 @@ func (s *memorySeries) chunksToPersist() []*chunkDesc { // memorySeriesIterator implements SeriesIterator. type memorySeriesIterator struct { - chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime. - chunkIts []chunkIterator // Caches chunkIterators. - chunks []chunk + chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime. + chunkIts []chunkIterator // Caches chunkIterators. + chunks []chunk + quarantine func(error) // Call to quarantine the series this iterator belongs to. } // ValueAtOrBeforeTime implements SeriesIterator. func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { // The most common case. We are iterating through a chunk. - if it.chunkIt != nil && it.chunkIt.contains(t) { - return it.chunkIt.valueAtOrBeforeTime(t) + if it.chunkIt != nil { + containsT, err := it.chunkIt.contains(t) + if err != nil { + it.quarantine(err) + return ZeroSamplePair + } + if containsT { + value, err := it.chunkIt.valueAtOrBeforeTime(t) + if err != nil { + it.quarantine(err) + return ZeroSamplePair + } + return value + } } if len(it.chunks) == 0 { - return model.SamplePair{Timestamp: model.Earliest} + return ZeroSamplePair } // Find the last chunk where firstTime() is before or equal to t. @@ -489,75 +577,15 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa }) if i == len(it.chunks) { // Even the first chunk starts after t. - return model.SamplePair{Timestamp: model.Earliest} + return ZeroSamplePair } it.chunkIt = it.chunkIterator(l - i) - return it.chunkIt.valueAtOrBeforeTime(t) -} - -// BoundaryValues implements SeriesIterator. 
-func (it *memorySeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair { - // Find the first chunk for which the first sample is within the interval. - i := sort.Search(len(it.chunks), func(i int) bool { - return !it.chunks[i].firstTime().Before(in.OldestInclusive) - }) - // Only now check the last timestamp of the previous chunk (which is - // fairly expensive). - if i > 0 && !it.chunkIterator(i-1).lastTimestamp().Before(in.OldestInclusive) { - i-- + value, err := it.chunkIt.valueAtOrBeforeTime(t) + if err != nil { + it.quarantine(err) + return ZeroSamplePair } - - values := make([]model.SamplePair, 0, 2) - for j, c := range it.chunks[i:] { - if c.firstTime().After(in.NewestInclusive) { - if len(values) == 1 { - // We found the first value before but are now - // already past the last value. The value we - // want must be the last value of the previous - // chunk. So backtrack... - chunkIt := it.chunkIterator(i + j - 1) - values = append(values, model.SamplePair{ - Timestamp: chunkIt.lastTimestamp(), - Value: chunkIt.lastSampleValue(), - }) - } - break - } - chunkIt := it.chunkIterator(i + j) - if len(values) == 0 { - for s := range chunkIt.values() { - if len(values) == 0 && !s.Timestamp.Before(in.OldestInclusive) { - values = append(values, *s) - // We cannot just break out here as we have to consume all - // the values to not leak a goroutine. This could obviously - // be made much neater with more suitable methods in the chunk - // interface. But currently, BoundaryValues is only used by - // `predict_linear` so we would pollute the chunk interface - // unduly just for one single corner case. Plus, even that use - // of BoundaryValues is suboptimal and should be replaced. - } - } - } - if chunkIt.lastTimestamp().After(in.NewestInclusive) { - s := chunkIt.valueAtOrBeforeTime(in.NewestInclusive) - if s.Timestamp != model.Earliest { - values = append(values, s) - } - break - } - } - if len(values) == 1 { - // We found exactly one value. In that case, add the most recent we know. - chunkIt := it.chunkIterator(len(it.chunks) - 1) - values = append(values, model.SamplePair{ - Timestamp: chunkIt.lastTimestamp(), - Value: chunkIt.lastSampleValue(), - }) - } - if len(values) == 2 && values[0].Equal(&values[1]) { - return values[:1] - } - return values + return value } // RangeValues implements SeriesIterator. @@ -568,8 +596,15 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa }) // Only now check the last timestamp of the previous chunk (which is // fairly expensive). - if i > 0 && !it.chunkIterator(i-1).lastTimestamp().Before(in.OldestInclusive) { - i-- + if i > 0 { + lt, err := it.chunkIterator(i - 1).lastTimestamp() + if err != nil { + it.quarantine(err) + return nil + } + if !lt.Before(in.OldestInclusive) { + i-- + } } values := []model.SamplePair{} @@ -577,7 +612,12 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa if c.firstTime().After(in.NewestInclusive) { break } - values = append(values, it.chunkIterator(i+j).rangeValues(in)...) + chValues, err := it.chunkIterator(i + j).rangeValues(in) + if err != nil { + it.quarantine(err) + return nil + } + values = append(values, chValues...) } return values } @@ -593,17 +633,36 @@ func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator { return chunkIt } +// singleSampleSeriesIterator implements SeriesIterator. It is a "shortcut +// iterator" that returns a single sample only.
The sample is saved in the + iterator itself, so no chunks need to be pinned. +type singleSampleSeriesIterator struct { + samplePair model.SamplePair +} + +// ValueAtOrBeforeTime implements SeriesIterator. +func (it *singleSampleSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { + if it.samplePair.Timestamp.After(t) { + return ZeroSamplePair + } + return it.samplePair +} + +// RangeValues implements SeriesIterator. +func (it *singleSampleSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair { + if it.samplePair.Timestamp.After(in.NewestInclusive) || + it.samplePair.Timestamp.Before(in.OldestInclusive) { + return []model.SamplePair{} + } + return []model.SamplePair{it.samplePair} +} + // nopSeriesIterator implements SeriesIterator. It never returns any values. type nopSeriesIterator struct{} // ValueAtOrBeforeTime implements SeriesIterator. func (i nopSeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { - return model.SamplePair{Timestamp: model.Earliest} -} - -// BoundaryValues implements SeriesIterator. -func (i nopSeriesIterator) BoundaryValues(in metric.Interval) []model.SamplePair { - return []model.SamplePair{} + return ZeroSamplePair } // RangeValues implements SeriesIterator. diff --git a/storage/local/storage.go b/storage/local/storage.go index 36cef69ea..ede1bfc61 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -30,8 +30,9 @@ import ( ) const ( - evictRequestsCap = 1024 - chunkLen = 1024 + evictRequestsCap = 1024 + quarantineRequestsCap = 1024 + chunkLen = 1024 // See waitForNextFP. fpMaxSweepTime = 6 * time.Hour @@ -77,6 +78,12 @@ type evictRequest struct { evict bool } +type quarantineRequest struct { + fp model.Fingerprint + metric model.Metric + reason error +} + // SyncStrategy is an enum to select a sync strategy for series files. type SyncStrategy int @@ -147,6 +154,9 @@ type memorySeriesStorage struct { evictRequests chan evictRequest evictStopping, evictStopped chan struct{} + quarantineRequests chan quarantineRequest + quarantineStopping, quarantineStopped chan struct{} + persistErrors prometheus.Counter numSeries prometheus.Gauge seriesOps *prometheus.CounterVec @@ -198,6 +208,10 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { evictStopping: make(chan struct{}), evictStopped: make(chan struct{}), + quarantineRequests: make(chan quarantineRequest, quarantineRequestsCap), + quarantineStopping: make(chan struct{}), + quarantineStopped: make(chan struct{}), + persistErrors: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -312,6 +326,7 @@ func (s *memorySeriesStorage) Start() (err error) { } go s.handleEvictList() + go s.handleQuarantine() go s.logThrottling() go s.loop() @@ -326,6 +341,10 @@ func (s *memorySeriesStorage) Stop() error { close(s.loopStopping) <-s.loopStopped + log.Info("Stopping series quarantining...") + close(s.quarantineStopping) + <-s.quarantineStopped + log.Info("Stopping chunk eviction...") close(s.evictStopping) <-s.evictStopped @@ -348,15 +367,15 @@ func (s *memorySeriesStorage) WaitForIndexing() { } // LastSamplePairForFingerprint implements Storage.
-func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint) *model.SamplePair { +func (s *memorySeriesStorage) LastSamplePairForFingerprint(fp model.Fingerprint) model.SamplePair { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) series, ok := s.fpToSeries.get(fp) if !ok { - return nil + return ZeroSamplePair } - return series.head().lastSamplePair() + return series.lastSamplePair() } // boundedIterator wraps a SeriesIterator and does not allow fetching @@ -369,22 +388,11 @@ type boundedIterator struct { // ValueAtOrBeforeTime implements the SeriesIterator interface. func (bit *boundedIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair { if ts < bit.start { - return model.SamplePair{Timestamp: model.Earliest} + return ZeroSamplePair } return bit.it.ValueAtOrBeforeTime(ts) } -// BoundaryValues implements the SeriesIterator interface. -func (bit *boundedIterator) BoundaryValues(interval metric.Interval) []model.SamplePair { - if interval.NewestInclusive < bit.start { - return []model.SamplePair{} - } - if interval.OldestInclusive < bit.start { - interval.OldestInclusive = bit.start - } - return bit.it.BoundaryValues(interval) -} - // RangeValues implements the SeriesIterator interface. func (bit *boundedIterator) RangeValues(interval metric.Interval) []model.SamplePair { if interval.NewestInclusive < bit.start { @@ -532,22 +540,7 @@ func (s *memorySeriesStorage) MetricForFingerprint(fp model.Fingerprint) metric. // DropMetric implements Storage. func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprint) { for _, fp := range fps { - s.fpLocker.Lock(fp) - - if series, ok := s.fpToSeries.get(fp); ok { - s.fpToSeries.del(fp) - s.numSeries.Dec() - s.persistence.unindexMetric(fp, series.metric) - } else if err := s.persistence.purgeArchivedMetric(fp); err != nil { - log.Errorf("Error purging metric with fingerprint %v: %v", fp, err) - } - // Attempt to delete series file in any case. - if _, err := s.persistence.deleteSeriesFile(fp); err != nil { - log.Errorf("Error deleting series file for %v: %v", fp, err) - } - - s.fpLocker.Unlock(fp) - s.seriesOps.WithLabelValues(requestedPurge).Inc() + s.purgeSeries(fp, nil, nil) } } @@ -565,34 +558,44 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { rawFP := sample.Metric.FastFingerprint() s.fpLocker.Lock(rawFP) fp, err := s.mapper.mapFP(rawFP, sample.Metric) + defer func() { + s.fpLocker.Unlock(fp) + }() // Func wrapper because fp might change below. if err != nil { - log.Errorf("Error while mapping fingerprint %v: %v", rawFP, err) - s.persistence.setDirty(true) + s.persistence.setDirty(true, fmt.Errorf("error while mapping fingerprint %v: %s", rawFP, err)) + return err } if fp != rawFP { // Switch locks. s.fpLocker.Unlock(rawFP) s.fpLocker.Lock(fp) } - series := s.getOrCreateSeries(fp, sample.Metric) + series, err := s.getOrCreateSeries(fp, sample.Metric) + if err != nil { + return err // getOrCreateSeries took care of quarantining already. + } if sample.Timestamp <= series.lastTime { - s.fpLocker.Unlock(fp) - // Don't log and track equal timestamps, as they are a common occurrence - // when using client-side timestamps (e.g. Pushgateway or federation). - // It would be even better to also compare the sample values here, but - // we don't have efficient access to a series's last value. - if sample.Timestamp != series.lastTime { - s.outOfOrderSamplesCount.Inc() - return ErrOutOfOrderSample + // Don't report "no-op appends", i.e. 
where timestamp and sample + // value are the same as for the last append, as they are a + // common occurrence when using client-side timestamps + // (e.g. Pushgateway or federation). + if sample.Timestamp == series.lastTime && + series.lastSampleValueSet && + sample.Value == series.lastSampleValue { + return nil } - return nil + s.outOfOrderSamplesCount.Inc() + return ErrOutOfOrderSample // Caused by the caller. } - completedChunksCount := series.add(&model.SamplePair{ + completedChunksCount, err := series.add(model.SamplePair{ Value: sample.Value, Timestamp: sample.Timestamp, }) - s.fpLocker.Unlock(fp) + if err != nil { + s.quarantineSeries(fp, sample.Metric, err) + return err + } s.ingestedSamplesCount.Inc() s.incNumChunksToPersist(completedChunksCount) @@ -653,7 +656,7 @@ func (s *memorySeriesStorage) logThrottling() { } } -func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) *memorySeries { +func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) { series, ok := s.fpToSeries.get(fp) if !ok { var cds []*chunkDesc @@ -661,6 +664,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me unarchived, err := s.persistence.unarchiveMetric(fp) if err != nil { log.Errorf("Error unarchiving fingerprint %v (metric %v): %v", fp, m, err) + return nil, err } if unarchived { s.seriesOps.WithLabelValues(unarchive).Inc() @@ -671,7 +675,8 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me // appear as archived or purged). cds, err = s.loadChunkDescs(fp, 0) if err != nil { - log.Errorf("Error loading chunk descs for fingerprint %v (metric %v): %v", fp, m, err) + s.quarantineSeries(fp, m, err) + return nil, err } modTime = s.persistence.seriesFileModTime(fp) } else { @@ -679,41 +684,87 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me s.persistence.indexMetric(fp, m) s.seriesOps.WithLabelValues(create).Inc() } - series = newMemorySeries(m, cds, modTime) + series, err = newMemorySeries(m, cds, modTime) + if err != nil { + s.quarantineSeries(fp, m, err) + return nil, err + } s.fpToSeries.put(fp, series) s.numSeries.Inc() } + return series, nil +} + +// getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. +func (s *memorySeriesStorage) getSeriesForRange( + fp model.Fingerprint, + from model.Time, through model.Time, +) *memorySeries { + series, ok := s.fpToSeries.get(fp) + if ok { + return series + } + has, first, last, err := s.persistence.hasArchivedMetric(fp) + if err != nil { + log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") + return nil + } + if !has { + s.invalidPreloadRequestsCount.Inc() + return nil + } + if last.Before(from) || first.After(through) { + return nil + } + metric, err := s.persistence.archivedMetric(fp) + if err != nil { + log.With("fingerprint", fp).With("error", err).Error("Archive index error while preloading chunks.") + return nil + } + series, err = s.getOrCreateSeries(fp, metric) + if err != nil { + // getOrCreateSeries took care of quarantining already. 
+ return nil + } return series } func (s *memorySeriesStorage) preloadChunksForRange( fp model.Fingerprint, from model.Time, through model.Time, -) ([]*chunkDesc, SeriesIterator, error) { +) ([]*chunkDesc, SeriesIterator) { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) - series, ok := s.fpToSeries.get(fp) - if !ok { - has, first, last, err := s.persistence.hasArchivedMetric(fp) - if err != nil { - return nil, nopIter, err - } - if !has { - s.invalidPreloadRequestsCount.Inc() - return nil, nopIter, nil - } - if from.Before(last) && through.After(first) { - metric, err := s.persistence.archivedMetric(fp) - if err != nil { - return nil, nopIter, err - } - series = s.getOrCreateSeries(fp, metric) - } else { - return nil, nopIter, nil - } + series := s.getSeriesForRange(fp, from, through) + if series == nil { + return nil, nopIter } - return series.preloadChunksForRange(from, through, fp, s) + cds, iter, err := series.preloadChunksForRange(fp, from, through, s) + if err != nil { + s.quarantineSeries(fp, series.metric, err) + return nil, nopIter + } + return cds, iter +} + +func (s *memorySeriesStorage) preloadChunksForInstant( + fp model.Fingerprint, + from model.Time, through model.Time, +) ([]*chunkDesc, SeriesIterator) { + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) + + series := s.getSeriesForRange(fp, from, through) + if series == nil { + return nil, nopIter + } + cds, iter, err := series.preloadChunksForInstant(fp, from, through, s) + if err != nil { + s.quarantineSeries(fp, series.metric, err) + return nil, nopIter + } + return cds, iter } func (s *memorySeriesStorage) handleEvictList() { @@ -1129,7 +1180,10 @@ func (s *memorySeriesStorage) writeMemorySeries( s.persistErrors.Inc() return false } - series.dropChunks(beforeTime) + if err := series.dropChunks(beforeTime); err != nil { + s.persistErrors.Inc() + return false + } if len(series.chunkDescs) == 0 && allDroppedFromPersistence { // All chunks dropped from both memory and persistence. Delete the series for good. s.fpToSeries.del(fp) @@ -1144,8 +1198,7 @@ func (s *memorySeriesStorage) writeMemorySeries( } else { series.chunkDescsOffset -= numDroppedFromPersistence if series.chunkDescsOffset < 0 { - log.Errorf("Dropped more chunks from persistence than from memory for fingerprint %v, series %v.", fp, series) - s.persistence.setDirty(true) + s.persistence.setDirty(true, fmt.Errorf("dropped more chunks from persistence than from memory for fingerprint %v, series %v", fp, series)) series.chunkDescsOffset = -1 // Makes sure it will be looked at during crash recovery. } } @@ -1299,6 +1352,122 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 { return score } +// quarantineSeries registers the provided fingerprint for quarantining. It +// always returns immediately. Quarantine requests are processed +// asynchronously. If there are too many requests queued, they are simply +// dropped. +// +// Quarantining means that the series file is moved to the orphaned directory, +// and all its traces are removed from indices. Call this method if an +// unrecoverable error is detected while dealing with a series, and pass in the +// encountered error. It will be saved as a hint in the orphaned directory. +func (s *memorySeriesStorage) quarantineSeries(fp model.Fingerprint, metric model.Metric, err error) { + req := quarantineRequest{fp: fp, metric: metric, reason: err} + select { + case s.quarantineRequests <- req: + // Request submitted. + default: + log. + With("fingerprint", fp). + With("metric", metric). 
+ With("reason", err). + Warn("Quarantine queue full. Dropped quarantine request.") + s.seriesOps.WithLabelValues(droppedQuarantine).Inc() + } +} + +func (s *memorySeriesStorage) handleQuarantine() { + for { + select { + case req := <-s.quarantineRequests: + s.purgeSeries(req.fp, req.metric, req.reason) + log. + With("fingerprint", req.fp). + With("metric", req.metric). + With("reason", req.reason). + Warn("Series quarantined.") + case <-s.quarantineStopping: + log.Info("Series quarantining stopped.") + close(s.quarantineStopped) + return + } + } + +} + +// purgeSeries removes all traces of a series. If a non-nil quarantine reason is +// provided, the series file will not be deleted completely, but moved to the +// orphaned directory with the reason and the metric in a hint file. The +// provided metric might be nil if unknown. +func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, quarantineReason error) { + s.fpLocker.Lock(fp) + + var ( + series *memorySeries + ok bool + ) + + if series, ok = s.fpToSeries.get(fp); ok { + s.fpToSeries.del(fp) + s.numSeries.Dec() + m = series.metric + + // Adjust s.numChunksToPersist and numMemChunks down by + // the number of chunks in this series that are not + // persisted yet. Persisted chunks will be deducted from + // numMemChunks upon eviction. + numChunksNotYetPersisted := len(series.chunkDescs) - series.persistWatermark + atomic.AddInt64(&numMemChunks, int64(-numChunksNotYetPersisted)) + if !series.headChunkClosed { + // Head chunk wasn't counted as waiting for persistence yet. + // (But it was counted as a chunk in memory.) + numChunksNotYetPersisted-- + } + s.incNumChunksToPersist(-numChunksNotYetPersisted) + + } else { + if err := s.persistence.purgeArchivedMetric(fp); err != nil { + log. + With("fingerprint", fp). + With("metric", m). + With("error", err). + Error("Error purging metric from archive.") + } + } + if m != nil { + // If we know a metric now, unindex it in any case. + // purgeArchivedMetric might have done so already, but we cannot + // be sure. Unindexing in idempotent, though. + s.persistence.unindexMetric(fp, m) + } + // Attempt to delete/quarantine the series file in any case. + if quarantineReason == nil { + // No reason stated, simply delete the file. + if _, err := s.persistence.deleteSeriesFile(fp); err != nil { + log. + With("fingerprint", fp). + With("metric", m). + With("error", err). + Error("Error deleting series file.") + } + s.seriesOps.WithLabelValues(requestedPurge).Inc() + } else { + if err := s.persistence.quarantineSeriesFile(fp, quarantineReason, m); err == nil { + s.seriesOps.WithLabelValues(completedQurantine).Inc() + } else { + s.seriesOps.WithLabelValues(failedQuarantine).Inc() + log. + With("fingerprint", fp). + With("metric", m). + With("reason", quarantineReason). + With("error", err). + Error("Error quarantining series file.") + } + } + + s.fpLocker.Unlock(fp) +} + // Describe implements prometheus.Collector. func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { s.persistence.Describe(ch) diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 4c7034f64..33d1a9918 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -405,10 +405,7 @@ func TestRetentionCutoff(t *testing.T) { defer pl.Close() // Preload everything. 
- it, err := pl.PreloadRange(fp, insertStart, now) - if err != nil { - t.Fatalf("Error preloading outdated chunks: %s", err) - } + it := pl.PreloadRange(fp, insertStart, now) val := it.ValueAtOrBeforeTime(now.Add(-61 * time.Minute)) if val.Timestamp != model.Earliest { @@ -424,14 +421,6 @@ func TestRetentionCutoff(t *testing.T) { if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt { t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time()) } - - vals = it.BoundaryValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}) - if len(vals) != 2 { - t.Errorf("expected 2 values but got %d", len(vals)) - } - if expt := now.Add(-1 * time.Hour).Add(time.Minute); vals[0].Timestamp != expt { - t.Errorf("unexpected timestamp for first sample: %v, expected %v", vals[0].Timestamp.Time(), expt.Time()) - } } func TestDropMetrics(t *testing.T) { @@ -500,18 +489,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps2)) } - _, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - _, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -533,18 +516,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps3)) } - _, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - _, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -557,6 +534,95 @@ func TestDropMetrics(t *testing.T) { } } +func TestQuarantineMetric(t *testing.T) { + now := model.Now() + insertStart := now.Add(-2 * time.Hour) + + s, closer := NewTestStorage(t, 1) + defer closer.Close() + + chunkFileExists := func(fp model.Fingerprint) (bool, error) { + f, err := s.persistence.openChunkFileForReading(fp) + if err == nil { + f.Close() + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + + m1 := model.Metric{model.MetricNameLabel: "test", "n1": "v1"} + m2 := model.Metric{model.MetricNameLabel: "test", "n1": "v2"} + m3 := model.Metric{model.MetricNameLabel: "test", "n1": "v3"} + + N := 120000 + + for j, m := range []model.Metric{m1, m2, m3} { + for i := 0; i < N; i++ { + smpl := 
&model.Sample{ + Metric: m, + Timestamp: insertStart.Add(time.Duration(i) * time.Millisecond), // 1 millisecond intervals. + Value: model.SampleValue(j), + } + s.Append(smpl) + } + } + s.WaitForIndexing() + + // Archive m3, but first maintain it so that at least something is written to disk. + fpToBeArchived := m3.FastFingerprint() + s.maintainMemorySeries(fpToBeArchived, 0) + s.fpLocker.Lock(fpToBeArchived) + s.fpToSeries.del(fpToBeArchived) + if err := s.persistence.archiveMetric( + fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond), + ); err != nil { + t.Error(err) + } + s.fpLocker.Unlock(fpToBeArchived) + + // Corrupt the series file for m3. + f, err := os.Create(s.persistence.fileNameForFingerprint(fpToBeArchived)) + if err != nil { + t.Fatal(err) + } + if _, err := f.WriteString("This is clearly not the content of a series file."); err != nil { + t.Fatal(err) + } + if f.Close(); err != nil { + t.Fatal(err) + } + + fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"}) + if len(fps) != 3 { + t.Errorf("unexpected number of fingerprints: %d", len(fps)) + } + + pl := s.NewPreloader() + // This will access the corrupt file and lead to quarantining. + pl.PreloadInstant(fpToBeArchived, now.Add(-2*time.Hour), time.Minute) + pl.Close() + time.Sleep(time.Second) // Give time to quarantine. TODO(beorn7): Find a better way to wait. + s.WaitForIndexing() + + fps2 := s.fingerprintsForLabelPairs(model.LabelPair{ + Name: model.MetricNameLabel, Value: "test", + }) + if len(fps2) != 2 { + t.Errorf("unexpected number of fingerprints: %d", len(fps2)) + } + + exists, err := chunkFileExists(fpToBeArchived) + if err != nil { + t.Fatal(err) + } + if exists { + t.Errorf("chunk file exists for fp=%v", fpToBeArchived) + } +} + // TestLoop is just a smoke test for the loop method, if we can switch it on and // off without disaster. func TestLoop(t *testing.T) { @@ -627,7 +693,10 @@ func testChunk(t *testing.T, encoding chunkEncoding) { continue } for sample := range cd.c.newIterator().values() { - values = append(values, *sample) + if sample.error != nil { + t.Error(sample.error) + } + values = append(values, sample.SamplePair) } } @@ -670,10 +739,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) // #1 Exactly on a sample. for i, expected := range samples { @@ -747,10 +813,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) - if err != nil { - b.Fatalf("Error preloading everything: %s", err) - } + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) b.ResetTimer() @@ -828,10 +891,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) // #1 Zero length interval at sample. 
for i, expected := range samples { @@ -983,10 +1043,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) - if err != nil { - b.Fatalf("Error preloading everything: %s", err) - } + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) b.ResetTimer() @@ -1032,32 +1089,26 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop ~half of the chunks. s.maintainMemorySeries(fp, 10000) - _, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } - actual := it.BoundaryValues(metric.Interval{ + _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + actual := it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, }) - if len(actual) != 2 { - t.Fatal("expected two results after purging half of series") + if len(actual) < 4000 { + t.Fatalf("expected more than %d results after purging half of series, got %d", 4000, len(actual)) } if actual[0].Timestamp < 6000 || actual[0].Timestamp > 10000 { t.Errorf("1st timestamp out of expected range: %v", actual[0].Timestamp) } want := model.Time(19998) - if actual[1].Timestamp != want { + if actual[len(actual)-1].Timestamp != want { t.Errorf("last timestamp: want %v, got %v", want, actual[len(actual)-1].Timestamp) } // Drop everything. s.maintainMemorySeries(fp, 100000) - _, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest) - if err != nil { - t.Fatalf("Error preloading everything: %s", err) - } - actual = it.BoundaryValues(metric.Interval{ + _, it = s.preloadChunksForRange(fp, model.Earliest, model.Latest) + actual = it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, }) @@ -1082,8 +1133,12 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Archive metrics. s.fpToSeries.del(fp) + lastTime, err := series.head().lastTime() + if err != nil { + t.Fatal(err) + } if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), series.head().lastTime(), + fp, series.metric, series.firstTime(), lastTime, ); err != nil { t.Fatal(err) } @@ -1133,8 +1188,12 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Archive metrics.
s.fpToSeries.del(fp) + lastTime, err = series.head().lastTime() + if err != nil { + t.Fatal(err) + } if err := s.persistence.archiveMetric( - fp, series.metric, series.firstTime(), series.head().lastTime(), + fp, series.metric, series.firstTime(), lastTime, ); err != nil { t.Fatal(err) } @@ -1528,10 +1587,7 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples model.Samples, t.Fatal(err) } p := s.NewPreloader() - it, err := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp) - if err != nil { - t.Fatal(err) - } + it := p.PreloadRange(fp, sample.Timestamp, sample.Timestamp) found := it.ValueAtOrBeforeTime(sample.Timestamp) if found.Timestamp == model.Earliest { t.Errorf("Sample %#v: Expected sample not found.", sample) @@ -1575,10 +1631,7 @@ func TestAppendOutOfOrder(t *testing.T) { pl := s.NewPreloader() defer pl.Close() - it, err := pl.PreloadRange(fp, 0, 2) - if err != nil { - t.Fatalf("Error preloading chunks: %s", err) - } + it := pl.PreloadRange(fp, 0, 2) want := []model.SamplePair{ { diff --git a/util/stats/query_stats.go b/util/stats/query_stats.go index e739142b4..6d92361a0 100644 --- a/util/stats/query_stats.go +++ b/util/stats/query_stats.go @@ -29,7 +29,6 @@ const ( ResultAppendTime QueryAnalysisTime GetValueAtTimeTime - GetBoundaryValuesTime GetRangeValuesTime ExecQueueTime ViewDiskPreparationTime @@ -60,8 +59,6 @@ func (s QueryTiming) String() string { return "Query analysis time" case GetValueAtTimeTime: return "GetValueAtTime() time" - case GetBoundaryValuesTime: - return "GetBoundaryValues() time" case GetRangeValuesTime: return "GetRangeValues() time" case ExecQueueTime: diff --git a/web/federate.go b/web/federate.go index 81f39302e..d9baf676b 100644 --- a/web/federate.go +++ b/web/federate.go @@ -67,7 +67,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { sp := h.storage.LastSamplePairForFingerprint(fp) // Discard if sample does not exist or lies before the staleness interval. - if sp == nil || sp.Timestamp.Before(minTimestamp) { + if sp.Timestamp.Before(minTimestamp) { continue }
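The simplification in federate.go works because ZeroSamplePair carries model.Earliest as its timestamp, which sorts before any plausible minTimestamp, so the single Before check covers both a missing sample and a stale one. A sketch spelling the two cases out explicitly; freshEnough is an illustrative helper under that assumption, not part of the patch:

package example

import (
	"time"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/storage/local"
)

// freshEnough reports whether sp should be federated. The explicit
// sentinel comparison is redundant with the timestamp check (ZeroSamplePair
// has timestamp model.Earliest), but it makes both cases visible.
func freshEnough(sp model.SamplePair, now model.Time, staleness time.Duration) bool {
	if sp == local.ZeroSamplePair {
		return false // Series never ingested a sample, or was archived.
	}
	minTimestamp := now.Add(-staleness)
	return !sp.Timestamp.Before(minTimestamp)
}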