prometheus/storage/merge.go

855 lines
25 KiB
Go
Raw Normal View History

// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"bytes"
"container/heap"
"context"
"fmt"
"math"
"sync"
"golang.org/x/exp/slices"
Style cleanup of all the changes in sparsehistogram so far A lot of this code was hacked together, literally during a hackathon. This commit intends not to change the code substantially, but just make the code obey the usual style practices. A (possibly incomplete) list of areas: * Generally address linter warnings. * The `pgk` directory is deprecated as per dev-summit. No new packages should be added to it. I moved the new `pkg/histogram` package to `model` anticipating what's proposed in #9478. * Make the naming of the Sparse Histogram more consistent. Including abbreviations, there were just too many names for it: SparseHistogram, Histogram, Histo, hist, his, shs, h. The idea is to call it "Histogram" in general. Only add "Sparse" if it is needed to avoid confusion with conventional Histograms (which is rare because the TSDB really has no notion of conventional Histograms). Use abbreviations only in local scope, and then really abbreviate (not just removing three out of seven letters like in "Histo"). This is in the spirit of https://github.com/golang/go/wiki/CodeReviewComments#variable-names * Several other minor name changes. * A lot of formatting of doc comments. For one, following https://github.com/golang/go/wiki/CodeReviewComments#comment-sentences , but also layout question, anticipating how things will look like when rendered by `godoc` (even where `godoc` doesn't render them right now because they are for unexported types or not a doc comment at all but just a normal code comment - consistency is queen!). * Re-enabled `TestQueryLog` and `TestEndopints` (they pass now, leaving them disabled was presumably an oversight). * Bucket iterator for histogram.Histogram is now created with a method. * HistogramChunk.iterator now allows iterator recycling. (I think @dieterbe only commented it out because he was confused by the question in the comment.) * HistogramAppender.Append panics now because we decided to treat staleness marker differently. Signed-off-by: beorn7 <beorn@grafana.com>
2021-10-09 13:57:07 +00:00
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/util/annotations"
)
type mergeGenericQuerier struct {
queriers []genericQuerier
// mergeFn is used when we see series from different queriers Selects with the same labels.
mergeFn genericSeriesMergeFunc
// TODO(bwplotka): Remove once remote queries are asynchronous. False by default.
concurrentSelect bool
}
// NewMergeQuerier returns a new Querier that merges results of given primary and secondary queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier {
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
for _, q := range primaries {
if _, ok := q.(noopQuerier); !ok && q != nil {
queriers = append(queriers, newGenericQuerierFrom(q))
}
}
for _, q := range secondaries {
if _, ok := q.(noopQuerier); !ok && q != nil {
queriers = append(queriers, newSecondaryQuerierFrom(q))
}
}
concurrentSelect := false
if len(secondaries) > 0 {
concurrentSelect = true
}
return &querierAdapter{&mergeGenericQuerier{
mergeFn: (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFn}).Merge,
queriers: queriers,
concurrentSelect: concurrentSelect,
}}
}
// NewMergeChunkQuerier returns a new Chunk Querier that merges results of given primary and secondary chunk queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier {
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
for _, q := range primaries {
if _, ok := q.(noopChunkQuerier); !ok && q != nil {
queriers = append(queriers, newGenericQuerierFromChunk(q))
}
}
for _, querier := range secondaries {
if _, ok := querier.(noopChunkQuerier); !ok && querier != nil {
queriers = append(queriers, newSecondaryQuerierFromChunk(querier))
}
}
concurrentSelect := false
if len(secondaries) > 0 {
concurrentSelect = true
}
return &chunkQuerierAdapter{&mergeGenericQuerier{
mergeFn: (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFn}).Merge,
queriers: queriers,
concurrentSelect: concurrentSelect,
}}
}
// Select returns a set of series that matches the given label matchers.
func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
if len(q.queriers) == 0 {
return noopGenericSeriesSet{}
}
if len(q.queriers) == 1 {
return q.queriers[0].Select(ctx, sortSeries, hints, matchers...)
}
seriesSets := make([]genericSeriesSet, 0, len(q.queriers))
if !q.concurrentSelect {
for _, querier := range q.queriers {
// We need to sort for merge to work.
seriesSets = append(seriesSets, querier.Select(ctx, true, hints, matchers...))
}
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
return s, s.Next()
}}
}
var (
wg sync.WaitGroup
seriesSetChan = make(chan genericSeriesSet)
)
// Schedule all Selects for all queriers we know about.
for _, querier := range q.queriers {
wg.Add(1)
go func(qr genericQuerier) {
defer wg.Done()
// We need to sort for NewMergeSeriesSet to work.
seriesSetChan <- qr.Select(ctx, true, hints, matchers...)
}(querier)
}
go func() {
wg.Wait()
close(seriesSetChan)
}()
for r := range seriesSetChan {
seriesSets = append(seriesSets, r)
}
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
return s, s.Next()
}}
}
type labelGenericQueriers []genericQuerier
func (l labelGenericQueriers) Len() int { return len(l) }
func (l labelGenericQueriers) Get(i int) LabelQuerier { return l[i] }
func (l labelGenericQueriers) SplitByHalf() (labelGenericQueriers, labelGenericQueriers) {
i := len(l) / 2
return l[:i], l[i:]
}
// LabelValues returns all potential values for a label name.
Add matchers to LabelValues() call (#8400) * Accept matchers in querier LabelValues() Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * create matcher to only select metrics which have searched label Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * test case for merge querier with matchers Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * test LabelValues with matchers on head Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * add test for LabelValues on block Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * formatting fix Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * Add comments Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * add missing lock release Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * remove unused parameter Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * Benchmarks for LabelValues() methods on block/head Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * Better comment Co-authored-by: Julien Pivotto <roidelapluie@gmail.com> Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * update comment Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * minor refactor make code cleaner Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * better comments Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * fix expected errors in test Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * Deleting parameter which can only be empty Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * fix comments Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * remove unnecessary lock Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * only lookup label value if label name was looked up Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * Return error when there is one Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * Call .Get() on decoder before checking errors Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * only lock head.symMtx when necessary Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * remove unnecessary delete() Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * re-use code instead of duplicating it Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * Consistently return error from LabelValueFor() Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * move helper func from util.go to querier.go Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * Fix test expectation Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * ensure result de-duplication and sorting works Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> * return named error from LabelValueFor() Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com> Co-authored-by: Julien Pivotto <roidelapluie@gmail.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
2021-02-09 17:38:35 +00:00
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
func (q *mergeGenericQuerier) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
res, ws, err := q.lvals(ctx, q.queriers, name, matchers...)
if err != nil {
return nil, nil, fmt.Errorf("LabelValues() from merge generic querier for label %s: %w", name, err)
}
return res, ws, nil
}
// lvals performs merge sort for LabelValues from multiple queriers.
func (q *mergeGenericQuerier) lvals(ctx context.Context, lq labelGenericQueriers, n string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
if lq.Len() == 0 {
return nil, nil, nil
}
if lq.Len() == 1 {
return lq.Get(0).LabelValues(ctx, n, matchers...)
}
a, b := lq.SplitByHalf()
var ws annotations.Annotations
s1, w, err := q.lvals(ctx, a, n, matchers...)
ws.Merge(w)
if err != nil {
return nil, ws, err
}
s2, ws, err := q.lvals(ctx, b, n, matchers...)
ws.Merge(w)
if err != nil {
return nil, ws, err
}
return mergeStrings(s1, s2), ws, nil
}
func mergeStrings(a, b []string) []string {
maxl := len(a)
if len(b) > len(a) {
maxl = len(b)
}
res := make([]string, 0, maxl*10/9)
for len(a) > 0 && len(b) > 0 {
switch {
case a[0] == b[0]:
res = append(res, a[0])
a, b = a[1:], b[1:]
case a[0] < b[0]:
res = append(res, a[0])
a = a[1:]
default:
res = append(res, b[0])
b = b[1:]
}
}
// Append all remaining elements.
res = append(res, a...)
res = append(res, b...)
return res
}
// LabelNames returns all the unique label names present in all queriers in sorted order.
func (q *mergeGenericQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
var (
labelNamesMap = make(map[string]struct{})
warnings annotations.Annotations
)
for _, querier := range q.queriers {
names, wrn, err := querier.LabelNames(ctx, matchers...)
if wrn != nil {
// TODO(bwplotka): We could potentially wrap warnings.
warnings.Merge(wrn)
}
if err != nil {
return nil, nil, fmt.Errorf("LabelNames() from merge generic querier: %w", err)
}
for _, name := range names {
labelNamesMap[name] = struct{}{}
}
}
if len(labelNamesMap) == 0 {
return nil, warnings, nil
}
labelNames := make([]string, 0, len(labelNamesMap))
for name := range labelNamesMap {
labelNames = append(labelNames, name)
}
slices.Sort(labelNames)
return labelNames, warnings, nil
}
// Close releases the resources of the generic querier.
func (q *mergeGenericQuerier) Close() error {
errs := tsdb_errors.NewMulti()
for _, querier := range q.queriers {
if err := querier.Close(); err != nil {
errs.Add(err)
}
}
return errs.Err()
}
// VerticalSeriesMergeFunc returns merged series implementation that merges series with same labels together.
// It has to handle time-overlapped series as well.
type VerticalSeriesMergeFunc func(...Series) Series
// NewMergeSeriesSet returns a new SeriesSet that merges many SeriesSets together.
func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) SeriesSet {
genericSets := make([]genericSeriesSet, 0, len(sets))
for _, s := range sets {
genericSets = append(genericSets, &genericSeriesSetAdapter{s})
}
return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)}
}
// VerticalChunkSeriesMergeFunc returns merged chunk series implementation that merges potentially time-overlapping
// chunk series with the same labels into single ChunkSeries.
//
// NOTE: It's up to implementation how series are vertically merged (if chunks are sorted, re-encoded etc).
type VerticalChunkSeriesMergeFunc func(...ChunkSeries) ChunkSeries
// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges many SeriesSet together.
func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet {
genericSets := make([]genericSeriesSet, 0, len(sets))
for _, s := range sets {
genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s})
}
return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)}
}
// genericMergeSeriesSet implements genericSeriesSet.
type genericMergeSeriesSet struct {
currentLabels labels.Labels
mergeFunc genericSeriesMergeFunc
heap genericSeriesSetHeap
sets []genericSeriesSet
currentSets []genericSeriesSet
}
// newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates)
// series returned by the series sets when iterating.
// Each series set must return its series in labels order, otherwise
// merged series set will be incorrect.
// Overlapped situations are merged using provided mergeFunc.
func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
if len(sets) == 1 {
return sets[0]
}
// We are pre-advancing sets, so we can introspect the label of the
// series under the cursor.
var h genericSeriesSetHeap
for _, set := range sets {
if set == nil {
continue
}
if set.Next() {
heap.Push(&h, set)
}
if err := set.Err(); err != nil {
return errorOnlySeriesSet{err}
}
}
return &genericMergeSeriesSet{
mergeFunc: mergeFunc,
sets: sets,
heap: h,
}
}
func (c *genericMergeSeriesSet) Next() bool {
// Run in a loop because the "next" series sets may not be valid anymore.
// If, for the current label set, all the next series sets come from
// failed remote storage sources, we want to keep trying with the next label set.
for {
// Firstly advance all the current series sets. If any of them have run out,
// we can drop them, otherwise they should be inserted back into the heap.
for _, set := range c.currentSets {
if set.Next() {
heap.Push(&c.heap, set)
}
}
if len(c.heap) == 0 {
return false
}
// Now, pop items of the heap that have equal label sets.
c.currentSets = nil
c.currentLabels = c.heap[0].At().Labels()
for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
set := heap.Pop(&c.heap).(genericSeriesSet)
c.currentSets = append(c.currentSets, set)
}
// As long as the current set contains at least 1 set,
// then it should return true.
if len(c.currentSets) != 0 {
break
}
}
return true
}
func (c *genericMergeSeriesSet) At() Labels {
if len(c.currentSets) == 1 {
return c.currentSets[0].At()
}
series := make([]Labels, 0, len(c.currentSets))
for _, seriesSet := range c.currentSets {
series = append(series, seriesSet.At())
}
return c.mergeFunc(series...)
}
func (c *genericMergeSeriesSet) Err() error {
for _, set := range c.sets {
if err := set.Err(); err != nil {
return err
}
}
return nil
}
func (c *genericMergeSeriesSet) Warnings() annotations.Annotations {
var ws annotations.Annotations
for _, set := range c.sets {
ws.Merge(set.Warnings())
}
return ws
}
type genericSeriesSetHeap []genericSeriesSet
func (h genericSeriesSetHeap) Len() int { return len(h) }
func (h genericSeriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h genericSeriesSetHeap) Less(i, j int) bool {
a, b := h[i].At().Labels(), h[j].At().Labels()
return labels.Compare(a, b) < 0
}
func (h *genericSeriesSetHeap) Push(x interface{}) {
*h = append(*h, x.(genericSeriesSet))
}
func (h *genericSeriesSetHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// ChainedSeriesMerge returns single series from many same, potentially overlapping series by chaining samples together.
// If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
// timestamp are dropped.
//
// This works the best with replicated series, where data from two series are exactly the same. This does not work well
// with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective
// this never happens.
//
// It's optimized for non-overlap cases as well.
func ChainedSeriesMerge(series ...Series) Series {
if len(series) == 0 {
return nil
}
return &SeriesEntry{
Lset: series[0].Labels(),
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
return ChainSampleIteratorFromSeries(it, series)
},
}
}
// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps
// order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
// timestamp are dropped. It's optimized for non-overlap cases as well.
type chainSampleIterator struct {
iterators []chunkenc.Iterator
h samplesIteratorHeap
curr chunkenc.Iterator
lastT int64
// Whether the previous and the current sample are direct neighbors
// within the same base iterator.
consecutive bool
}
// Return a chainSampleIterator initialized for length entries, re-using the memory from it if possible.
func getChainSampleIterator(it chunkenc.Iterator, length int) *chainSampleIterator {
csi, ok := it.(*chainSampleIterator)
if !ok {
csi = &chainSampleIterator{}
}
if cap(csi.iterators) < length {
csi.iterators = make([]chunkenc.Iterator, length)
} else {
csi.iterators = csi.iterators[:length]
}
csi.h = nil
csi.lastT = math.MinInt64
return csi
}
func ChainSampleIteratorFromSeries(it chunkenc.Iterator, series []Series) chunkenc.Iterator {
csi := getChainSampleIterator(it, len(series))
for i, s := range series {
csi.iterators[i] = s.Iterator(csi.iterators[i])
}
return csi
}
func ChainSampleIteratorFromMetas(it chunkenc.Iterator, chunks []chunks.Meta) chunkenc.Iterator {
csi := getChainSampleIterator(it, len(chunks))
for i, c := range chunks {
csi.iterators[i] = c.Chunk.Iterator(csi.iterators[i])
}
return csi
}
func ChainSampleIteratorFromIterators(it chunkenc.Iterator, iterators []chunkenc.Iterator) chunkenc.Iterator {
csi := getChainSampleIterator(it, 0)
csi.iterators = iterators
return csi
}
func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType {
// No-op check.
if c.curr != nil && c.lastT >= t {
return c.curr.Seek(c.lastT)
}
// Don't bother to find out if the next sample is consecutive. Callers
// of Seek usually aren't interested anyway.
c.consecutive = false
c.h = samplesIteratorHeap{}
for _, iter := range c.iterators {
if iter.Seek(t) != chunkenc.ValNone {
heap.Push(&c.h, iter)
}
}
if len(c.h) > 0 {
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
c.lastT = c.curr.AtT()
return c.curr.Seek(c.lastT)
}
c.curr = nil
return chunkenc.ValNone
}
func (c *chainSampleIterator) At() (t int64, v float64) {
if c.curr == nil {
panic("chainSampleIterator.At called before first .Next or after .Next returned false.")
}
return c.curr.At()
}
func (c *chainSampleIterator) AtHistogram() (int64, *histogram.Histogram) {
if c.curr == nil {
panic("chainSampleIterator.AtHistogram called before first .Next or after .Next returned false.")
}
t, h := c.curr.AtHistogram()
// If the current sample is not consecutive with the previous one, we
// cannot be sure anymore about counter resets for counter histograms.
// TODO(beorn7): If a `NotCounterReset` sample is followed by a
// non-consecutive `CounterReset` sample, we could keep the hint as
// `CounterReset`. But then we needed to track the previous sample
// in more detail, which might not be worth it.
if !c.consecutive && h.CounterResetHint != histogram.GaugeType {
h.CounterResetHint = histogram.UnknownCounterReset
}
return t, h
}
func (c *chainSampleIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
if c.curr == nil {
panic("chainSampleIterator.AtFloatHistogram called before first .Next or after .Next returned false.")
}
t, fh := c.curr.AtFloatHistogram()
// If the current sample is not consecutive with the previous one, we
// cannot be sure anymore about counter resets for counter histograms.
// TODO(beorn7): If a `NotCounterReset` sample is followed by a
// non-consecutive `CounterReset` sample, we could keep the hint as
// `CounterReset`. But then we needed to track the previous sample
// in more detail, which might not be worth it.
if !c.consecutive && fh.CounterResetHint != histogram.GaugeType {
fh.CounterResetHint = histogram.UnknownCounterReset
}
return t, fh
}
func (c *chainSampleIterator) AtT() int64 {
if c.curr == nil {
panic("chainSampleIterator.AtT called before first .Next or after .Next returned false.")
}
return c.curr.AtT()
}
func (c *chainSampleIterator) Next() chunkenc.ValueType {
var (
currT int64
currValueType chunkenc.ValueType
iteratorChanged bool
)
if c.h == nil {
iteratorChanged = true
c.h = samplesIteratorHeap{}
// We call c.curr.Next() as the first thing below.
// So, we don't call Next() on it here.
c.curr = c.iterators[0]
for _, iter := range c.iterators[1:] {
if iter.Next() != chunkenc.ValNone {
heap.Push(&c.h, iter)
}
}
}
if c.curr == nil {
return chunkenc.ValNone
}
for {
currValueType = c.curr.Next()
if currValueType != chunkenc.ValNone {
currT = c.curr.AtT()
if currT == c.lastT {
// Ignoring sample for the same timestamp.
continue
}
if len(c.h) == 0 {
// curr is the only iterator remaining,
// no need to check with the heap.
break
}
// Check current iterator with the top of the heap.
nextT := c.h[0].AtT()
if currT < nextT {
// Current iterator has smaller timestamp than the heap.
break
}
// Current iterator does not hold the smallest timestamp.
heap.Push(&c.h, c.curr)
} else if len(c.h) == 0 {
// No iterator left to iterate.
c.curr = nil
return chunkenc.ValNone
}
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
iteratorChanged = true
currT = c.curr.AtT()
currValueType = c.curr.Seek(currT)
if currT != c.lastT {
break
}
}
c.consecutive = !iteratorChanged
c.lastT = currT
return currValueType
}
func (c *chainSampleIterator) Err() error {
errs := tsdb_errors.NewMulti()
for _, iter := range c.iterators {
errs.Add(iter.Err())
}
return errs.Err()
}
type samplesIteratorHeap []chunkenc.Iterator
func (h samplesIteratorHeap) Len() int { return len(h) }
func (h samplesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h samplesIteratorHeap) Less(i, j int) bool {
return h[i].AtT() < h[j].AtT()
}
func (h *samplesIteratorHeap) Push(x interface{}) {
*h = append(*h, x.(chunkenc.Iterator))
}
func (h *samplesIteratorHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// NewCompactingChunkSeriesMerger returns VerticalChunkSeriesMergeFunc that merges the same chunk series into single chunk series.
// In case of the chunk overlaps, it compacts those into one or more time-ordered non-overlapping chunks with merged data.
// Samples from overlapped chunks are merged using series vertical merge func.
// It expects the same labels for each given series.
//
// NOTE: Use the returned merge function only when you see potentially overlapping series, as this introduces small a overhead
// to handle overlaps between series.
func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalChunkSeriesMergeFunc {
return func(series ...ChunkSeries) ChunkSeries {
if len(series) == 0 {
return nil
}
return &ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
iterators := make([]chunks.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator(nil))
}
return &compactChunkIterator{
mergeFunc: mergeFunc,
iterators: iterators,
}
},
}
}
}
// compactChunkIterator is responsible to compact chunks from different iterators of the same time series into single chainSeries.
// If time-overlapping chunks are found, they are encoded and passed to series merge and encoded again into one bigger chunk.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
type compactChunkIterator struct {
mergeFunc VerticalSeriesMergeFunc
iterators []chunks.Iterator
h chunkIteratorHeap
err error
curr chunks.Meta
}
func (c *compactChunkIterator) At() chunks.Meta {
return c.curr
}
func (c *compactChunkIterator) Next() bool {
if c.h == nil {
for _, iter := range c.iterators {
if iter.Next() {
heap.Push(&c.h, iter)
}
}
}
if len(c.h) == 0 {
return false
}
iter := heap.Pop(&c.h).(chunks.Iterator)
c.curr = iter.At()
if iter.Next() {
heap.Push(&c.h, iter)
}
var (
overlapping []Series
oMaxTime = c.curr.MaxTime
prev = c.curr
)
// Detect overlaps to compact. Be smart about it and deduplicate on the fly if chunks are identical.
for len(c.h) > 0 {
// Get the next oldest chunk by min, then max time.
next := c.h[0].At()
if next.MinTime > oMaxTime {
// No overlap with current one.
break
}
// Only do something if it is not a perfect duplicate.
if next.MinTime != prev.MinTime ||
next.MaxTime != prev.MaxTime ||
!bytes.Equal(next.Chunk.Bytes(), prev.Chunk.Bytes()) {
// We operate on same series, so labels do not matter here.
overlapping = append(overlapping, newChunkToSeriesDecoder(labels.EmptyLabels(), next))
if next.MaxTime > oMaxTime {
oMaxTime = next.MaxTime
}
prev = next
}
iter := heap.Pop(&c.h).(chunks.Iterator)
if iter.Next() {
heap.Push(&c.h, iter)
}
}
if len(overlapping) == 0 {
return true
}
// Add last as it's not yet included in overlap. We operate on same series, so labels does not matter here.
iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(labels.EmptyLabels(), c.curr))...)).Iterator(nil)
if !iter.Next() {
if c.err = iter.Err(); c.err != nil {
return false
}
panic("unexpected seriesToChunkEncoder lack of iterations")
}
c.curr = iter.At()
if iter.Next() {
heap.Push(&c.h, iter)
}
return true
}
func (c *compactChunkIterator) Err() error {
errs := tsdb_errors.NewMulti()
for _, iter := range c.iterators {
errs.Add(iter.Err())
}
errs.Add(c.err)
return errs.Err()
}
type chunkIteratorHeap []chunks.Iterator
func (h chunkIteratorHeap) Len() int { return len(h) }
func (h chunkIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h chunkIteratorHeap) Less(i, j int) bool {
at := h[i].At()
bt := h[j].At()
if at.MinTime == bt.MinTime {
return at.MaxTime < bt.MaxTime
}
return at.MinTime < bt.MinTime
}
func (h *chunkIteratorHeap) Push(x interface{}) {
*h = append(*h, x.(chunks.Iterator))
}
func (h *chunkIteratorHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
Add out-of-order sample support to the TSDB (#11075) * Introduce out-of-order TSDB support This implementation is based on this design doc: https://docs.google.com/document/d/1Kppm7qL9C-BJB1j6yb6-9ObG3AbdZnFUBYPNNWwDBYM/edit?usp=sharing This commit adds support to accept out-of-order ("OOO") sample into the TSDB up to a configurable time allowance. If OOO is enabled, overlapping querying are automatically enabled. Most of the additions have been borrowed from https://github.com/grafana/mimir-prometheus/ Here is the list ist of the original commits cherry picked from mimir-prometheus into this branch: - 4b2198d7ec47d50989b7c2df66b7b207c32f7f6e - 2836e5513f1bc591535a859f5d41154a75e7c6bc - 00b379c3a5b1ec3799699b6242f300a2b3ea30f0 - ff0dc757587cada63ca948d2d5eb00bf090d63e0 - a632c73352a7e39d60b445700beb47d691549c3e - c6f3d4ab339ab80bbbce74c9946237ced01f0509 - 5e8406a1d4a50d0052bbee83e28ca3b3371408aa - abde1e0ba128936b9eb0224ee1551e56216ebd4a - e70e7698897bb03860bee0467c733fa44e14c9bd - df59320886e03a555d379ac4b0b3130f661407e0 Co-authored-by: Jesus Vazquez <jesus.vazquez@grafana.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Dieter Plaetinck <dieter@grafana.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * gofumpt files Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Add license header to missing files Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix OOO tests due to existing chunk disk mapper implementation Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix truncate int overflow Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Add Sync method to the WAL and update tests Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * remove useless sync Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Update minOOOTime after truncating Head * Update minOOOTime after truncating Head Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix lint Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Add a unit test Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Load OutOfOrderTimeWindow only once per appender Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix OOO Head LabelValues and PostingsForMatchers Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix replay of OOO mmap chunks Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Remove unnecessary err check Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Prevent panic with ApplyConfig Signed-off-by: Ganesh Vernekar 15064823+codesome@users.noreply.github.com Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Run OOO compaction after restart if there is OOO data from WBL Signed-off-by: Ganesh Vernekar 15064823+codesome@users.noreply.github.com Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Apply Bartek's suggestions Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Refactor OOO compaction Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Address comments and TODOs - Added a comment explaining why we need the allow overlapping compaction toggle - Clarified TSDBConfig OutOfOrderTimeWindow doc - Added an owner to all the TODOs in the code Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Run go format Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix remaining review comments Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix tests Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Change wbl reference when truncating ooo in TestHeadMinOOOTimeUpdate Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix TestWBLAndMmapReplay test failure on windows Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Address most of the feedback Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Refactor the block meta for out of order Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix windows error Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix review comments Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Ganesh Vernekar 15064823+codesome@users.noreply.github.com Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Dieter Plaetinck <dieter@grafana.com> Co-authored-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
2022-09-20 17:05:50 +00:00
// NewConcatenatingChunkSeriesMerger returns a VerticalChunkSeriesMergeFunc that simply concatenates the
// chunks from the series. The resultant stream of chunks for a series might be overlapping and unsorted.
func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc {
return func(series ...ChunkSeries) ChunkSeries {
if len(series) == 0 {
return nil
}
return &ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
Add out-of-order sample support to the TSDB (#11075) * Introduce out-of-order TSDB support This implementation is based on this design doc: https://docs.google.com/document/d/1Kppm7qL9C-BJB1j6yb6-9ObG3AbdZnFUBYPNNWwDBYM/edit?usp=sharing This commit adds support to accept out-of-order ("OOO") sample into the TSDB up to a configurable time allowance. If OOO is enabled, overlapping querying are automatically enabled. Most of the additions have been borrowed from https://github.com/grafana/mimir-prometheus/ Here is the list ist of the original commits cherry picked from mimir-prometheus into this branch: - 4b2198d7ec47d50989b7c2df66b7b207c32f7f6e - 2836e5513f1bc591535a859f5d41154a75e7c6bc - 00b379c3a5b1ec3799699b6242f300a2b3ea30f0 - ff0dc757587cada63ca948d2d5eb00bf090d63e0 - a632c73352a7e39d60b445700beb47d691549c3e - c6f3d4ab339ab80bbbce74c9946237ced01f0509 - 5e8406a1d4a50d0052bbee83e28ca3b3371408aa - abde1e0ba128936b9eb0224ee1551e56216ebd4a - e70e7698897bb03860bee0467c733fa44e14c9bd - df59320886e03a555d379ac4b0b3130f661407e0 Co-authored-by: Jesus Vazquez <jesus.vazquez@grafana.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Dieter Plaetinck <dieter@grafana.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * gofumpt files Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Add license header to missing files Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix OOO tests due to existing chunk disk mapper implementation Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix truncate int overflow Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Add Sync method to the WAL and update tests Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * remove useless sync Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Update minOOOTime after truncating Head * Update minOOOTime after truncating Head Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix lint Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Add a unit test Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Load OutOfOrderTimeWindow only once per appender Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix OOO Head LabelValues and PostingsForMatchers Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix replay of OOO mmap chunks Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Remove unnecessary err check Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Prevent panic with ApplyConfig Signed-off-by: Ganesh Vernekar 15064823+codesome@users.noreply.github.com Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Run OOO compaction after restart if there is OOO data from WBL Signed-off-by: Ganesh Vernekar 15064823+codesome@users.noreply.github.com Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Apply Bartek's suggestions Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Refactor OOO compaction Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Address comments and TODOs - Added a comment explaining why we need the allow overlapping compaction toggle - Clarified TSDBConfig OutOfOrderTimeWindow doc - Added an owner to all the TODOs in the code Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Run go format Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix remaining review comments Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix tests Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Change wbl reference when truncating ooo in TestHeadMinOOOTimeUpdate Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix TestWBLAndMmapReplay test failure on windows Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Address most of the feedback Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Refactor the block meta for out of order Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix windows error Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix review comments Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Ganesh Vernekar 15064823+codesome@users.noreply.github.com Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Dieter Plaetinck <dieter@grafana.com> Co-authored-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
2022-09-20 17:05:50 +00:00
iterators := make([]chunks.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator(nil))
Add out-of-order sample support to the TSDB (#11075) * Introduce out-of-order TSDB support This implementation is based on this design doc: https://docs.google.com/document/d/1Kppm7qL9C-BJB1j6yb6-9ObG3AbdZnFUBYPNNWwDBYM/edit?usp=sharing This commit adds support to accept out-of-order ("OOO") sample into the TSDB up to a configurable time allowance. If OOO is enabled, overlapping querying are automatically enabled. Most of the additions have been borrowed from https://github.com/grafana/mimir-prometheus/ Here is the list ist of the original commits cherry picked from mimir-prometheus into this branch: - 4b2198d7ec47d50989b7c2df66b7b207c32f7f6e - 2836e5513f1bc591535a859f5d41154a75e7c6bc - 00b379c3a5b1ec3799699b6242f300a2b3ea30f0 - ff0dc757587cada63ca948d2d5eb00bf090d63e0 - a632c73352a7e39d60b445700beb47d691549c3e - c6f3d4ab339ab80bbbce74c9946237ced01f0509 - 5e8406a1d4a50d0052bbee83e28ca3b3371408aa - abde1e0ba128936b9eb0224ee1551e56216ebd4a - e70e7698897bb03860bee0467c733fa44e14c9bd - df59320886e03a555d379ac4b0b3130f661407e0 Co-authored-by: Jesus Vazquez <jesus.vazquez@grafana.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Dieter Plaetinck <dieter@grafana.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * gofumpt files Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Add license header to missing files Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix OOO tests due to existing chunk disk mapper implementation Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix truncate int overflow Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Add Sync method to the WAL and update tests Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * remove useless sync Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Update minOOOTime after truncating Head * Update minOOOTime after truncating Head Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix lint Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Add a unit test Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Load OutOfOrderTimeWindow only once per appender Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix OOO Head LabelValues and PostingsForMatchers Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix replay of OOO mmap chunks Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Remove unnecessary err check Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Prevent panic with ApplyConfig Signed-off-by: Ganesh Vernekar 15064823+codesome@users.noreply.github.com Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Run OOO compaction after restart if there is OOO data from WBL Signed-off-by: Ganesh Vernekar 15064823+codesome@users.noreply.github.com Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Apply Bartek's suggestions Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Refactor OOO compaction Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Address comments and TODOs - Added a comment explaining why we need the allow overlapping compaction toggle - Clarified TSDBConfig OutOfOrderTimeWindow doc - Added an owner to all the TODOs in the code Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Run go format Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix remaining review comments Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix tests Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Change wbl reference when truncating ooo in TestHeadMinOOOTimeUpdate Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> * Fix TestWBLAndMmapReplay test failure on windows Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Address most of the feedback Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Refactor the block meta for out of order Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix windows error Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix review comments Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com> Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Ganesh Vernekar 15064823+codesome@users.noreply.github.com Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Dieter Plaetinck <dieter@grafana.com> Co-authored-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
2022-09-20 17:05:50 +00:00
}
return &concatenatingChunkIterator{
iterators: iterators,
}
},
}
}
}
type concatenatingChunkIterator struct {
iterators []chunks.Iterator
idx int
curr chunks.Meta
}
func (c *concatenatingChunkIterator) At() chunks.Meta {
return c.curr
}
func (c *concatenatingChunkIterator) Next() bool {
if c.idx >= len(c.iterators) {
return false
}
if c.iterators[c.idx].Next() {
c.curr = c.iterators[c.idx].At()
return true
}
c.idx++
return c.Next()
}
func (c *concatenatingChunkIterator) Err() error {
errs := tsdb_errors.NewMulti()
for _, iter := range c.iterators {
errs.Add(iter.Err())
}
return errs.Err()
}