tsdb: Moved code merge series and iterators to differen files; cleanup. No functional changes just move! (#7714)

I did not want to move those in previous PR to make it easier to review. Now small cleanup time for readability. (:

## Changes

* Merge series goes to `storage/merge.go` leaving `fanout.go` for just fanout code.
* Moved `fanout test` code from weird separate package to storage.
* Unskiped one test: TestFanout_SelectSorted/chunk_querier
* Moved block series set codes responsible for querying blocks to `querier.go` from `compact.go`



Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
pull/7733/head
Bartlomiej Plotka 2020-08-03 11:32:56 +01:00 committed by GitHub
parent e6f2b8ad60
commit 28c5cfaf0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 2338 additions and 2319 deletions

View File

@ -14,20 +14,12 @@
package storage
import (
"bytes"
"container/heap"
"context"
"sort"
"strings"
"sync"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
)
@ -208,661 +200,3 @@ func (f *fanoutAppender) Rollback() (err error) {
}
return nil
}
type mergeGenericQuerier struct {
queriers []genericQuerier
// mergeFn is used when we see series from different queriers Selects with the same labels.
mergeFn genericSeriesMergeFunc
// TODO(bwplotka): Remove once remote queries are asynchronous. False by default.
concurrentSelect bool
}
// NewMergeQuerier returns a new Querier that merges results of given primary and secondary queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
func NewMergeQuerier(primaries []Querier, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier {
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
for _, q := range primaries {
if _, ok := q.(noopQuerier); !ok && q != nil {
queriers = append(queriers, newGenericQuerierFrom(q))
}
}
for _, q := range secondaries {
if _, ok := q.(noopQuerier); !ok && q != nil {
queriers = append(queriers, newSecondaryQuerierFrom(q))
}
}
concurrentSelect := false
if len(secondaries) > 0 {
concurrentSelect = true
}
return &querierAdapter{&mergeGenericQuerier{
mergeFn: (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFn}).Merge,
queriers: queriers,
concurrentSelect: concurrentSelect,
}}
}
// NewMergeChunkQuerier returns a new Chunk Querier that merges results of given primary and secondary chunk queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
func NewMergeChunkQuerier(primaries []ChunkQuerier, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier {
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
for _, q := range primaries {
if _, ok := q.(noopChunkQuerier); !ok && q != nil {
queriers = append(queriers, newGenericQuerierFromChunk(q))
}
}
for _, querier := range secondaries {
if _, ok := querier.(noopChunkQuerier); !ok && querier != nil {
queriers = append(queriers, newSecondaryQuerierFromChunk(querier))
}
}
concurrentSelect := false
if len(secondaries) > 0 {
concurrentSelect = true
}
return &chunkQuerierAdapter{&mergeGenericQuerier{
mergeFn: (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFn}).Merge,
queriers: queriers,
concurrentSelect: concurrentSelect,
}}
}
// Select returns a set of series that matches the given label matchers.
func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
if len(q.queriers) == 0 {
return noopGenericSeriesSet{}
}
if len(q.queriers) == 1 {
return q.queriers[0].Select(sortSeries, hints, matchers...)
}
var seriesSets = make([]genericSeriesSet, 0, len(q.queriers))
if !q.concurrentSelect {
for _, querier := range q.queriers {
// We need to sort for merge to work.
seriesSets = append(seriesSets, querier.Select(true, hints, matchers...))
}
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
return s, s.Next()
}}
}
var (
wg sync.WaitGroup
seriesSetChan = make(chan genericSeriesSet)
)
// Schedule all Selects for all queriers we know about.
for _, querier := range q.queriers {
wg.Add(1)
go func(qr genericQuerier) {
defer wg.Done()
// We need to sort for NewMergeSeriesSet to work.
seriesSetChan <- qr.Select(true, hints, matchers...)
}(querier)
}
go func() {
wg.Wait()
close(seriesSetChan)
}()
for r := range seriesSetChan {
seriesSets = append(seriesSets, r)
}
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
return s, s.Next()
}}
}
type labelGenericQueriers []genericQuerier
func (l labelGenericQueriers) Len() int { return len(l) }
func (l labelGenericQueriers) Get(i int) LabelQuerier { return l[i] }
func (l labelGenericQueriers) SplitByHalf() (labelGenericQueriers, labelGenericQueriers) {
i := len(l) / 2
return l[:i], l[i:]
}
// LabelValues returns all potential values for a label name.
func (q *mergeGenericQuerier) LabelValues(name string) ([]string, Warnings, error) {
res, ws, err := q.lvals(q.queriers, name)
if err != nil {
return nil, nil, errors.Wrapf(err, "LabelValues() from merge generic querier for label %s", name)
}
return res, ws, nil
}
// lvals performs merge sort for LabelValues from multiple queriers.
func (q *mergeGenericQuerier) lvals(lq labelGenericQueriers, n string) ([]string, Warnings, error) {
if lq.Len() == 0 {
return nil, nil, nil
}
if lq.Len() == 1 {
return lq.Get(0).LabelValues(n)
}
a, b := lq.SplitByHalf()
var ws Warnings
s1, w, err := q.lvals(a, n)
ws = append(ws, w...)
if err != nil {
return nil, ws, err
}
s2, ws, err := q.lvals(b, n)
ws = append(ws, w...)
if err != nil {
return nil, ws, err
}
return mergeStrings(s1, s2), ws, nil
}
func mergeStrings(a, b []string) []string {
maxl := len(a)
if len(b) > len(a) {
maxl = len(b)
}
res := make([]string, 0, maxl*10/9)
for len(a) > 0 && len(b) > 0 {
d := strings.Compare(a[0], b[0])
if d == 0 {
res = append(res, a[0])
a, b = a[1:], b[1:]
} else if d < 0 {
res = append(res, a[0])
a = a[1:]
} else if d > 0 {
res = append(res, b[0])
b = b[1:]
}
}
// Append all remaining elements.
res = append(res, a...)
res = append(res, b...)
return res
}
// LabelNames returns all the unique label names present in all queriers in sorted order.
func (q *mergeGenericQuerier) LabelNames() ([]string, Warnings, error) {
var (
labelNamesMap = make(map[string]struct{})
warnings Warnings
)
for _, querier := range q.queriers {
names, wrn, err := querier.LabelNames()
if wrn != nil {
// TODO(bwplotka): We could potentially wrap warnings.
warnings = append(warnings, wrn...)
}
if err != nil {
return nil, nil, errors.Wrap(err, "LabelNames() from merge generic querier")
}
for _, name := range names {
labelNamesMap[name] = struct{}{}
}
}
if len(labelNamesMap) == 0 {
return nil, warnings, nil
}
labelNames := make([]string, 0, len(labelNamesMap))
for name := range labelNamesMap {
labelNames = append(labelNames, name)
}
sort.Strings(labelNames)
return labelNames, warnings, nil
}
// Close releases the resources of the generic querier.
func (q *mergeGenericQuerier) Close() error {
errs := tsdb_errors.MultiError{}
for _, querier := range q.queriers {
if err := querier.Close(); err != nil {
errs.Add(err)
}
}
return errs.Err()
}
// VerticalSeriesMergeFunc returns merged series implementation that merges series with same labels together.
// It has to handle time-overlapped series as well.
type VerticalSeriesMergeFunc func(...Series) Series
// NewMergeSeriesSet returns a new SeriesSet that merges many SeriesSets together.
func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) SeriesSet {
genericSets := make([]genericSeriesSet, 0, len(sets))
for _, s := range sets {
genericSets = append(genericSets, &genericSeriesSetAdapter{s})
}
return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)}
}
// VerticalChunkSeriesMergeFunc returns merged chunk series implementation that merges potentially time-overlapping
// chunk series with the same labels into single ChunkSeries.
//
// NOTE: It's up to implementation how series are vertically merged (if chunks are sorted, re-encoded etc).
type VerticalChunkSeriesMergeFunc func(...ChunkSeries) ChunkSeries
// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges many SeriesSet together.
func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet {
genericSets := make([]genericSeriesSet, 0, len(sets))
for _, s := range sets {
genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s})
}
return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)}
}
// genericMergeSeriesSet implements genericSeriesSet.
type genericMergeSeriesSet struct {
currentLabels labels.Labels
mergeFunc genericSeriesMergeFunc
heap genericSeriesSetHeap
sets []genericSeriesSet
currentSets []genericSeriesSet
}
// newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates)
// series returned by the series sets when iterating.
// Each series set must return its series in labels order, otherwise
// merged series set will be incorrect.
// Overlapped situations are merged using provided mergeFunc.
func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
if len(sets) == 1 {
return sets[0]
}
// We are pre-advancing sets, so we can introspect the label of the
// series under the cursor.
var h genericSeriesSetHeap
for _, set := range sets {
if set == nil {
continue
}
if set.Next() {
heap.Push(&h, set)
}
if err := set.Err(); err != nil {
return errorOnlySeriesSet{err}
}
}
return &genericMergeSeriesSet{
mergeFunc: mergeFunc,
sets: sets,
heap: h,
}
}
func (c *genericMergeSeriesSet) Next() bool {
// Run in a loop because the "next" series sets may not be valid anymore.
// If, for the current label set, all the next series sets come from
// failed remote storage sources, we want to keep trying with the next label set.
for {
// Firstly advance all the current series sets. If any of them have run out
// we can drop them, otherwise they should be inserted back into the heap.
for _, set := range c.currentSets {
if set.Next() {
heap.Push(&c.heap, set)
}
}
if len(c.heap) == 0 {
return false
}
// Now, pop items of the heap that have equal label sets.
c.currentSets = nil
c.currentLabels = c.heap[0].At().Labels()
for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
set := heap.Pop(&c.heap).(genericSeriesSet)
c.currentSets = append(c.currentSets, set)
}
// As long as the current set contains at least 1 set,
// then it should return true.
if len(c.currentSets) != 0 {
break
}
}
return true
}
func (c *genericMergeSeriesSet) At() Labels {
if len(c.currentSets) == 1 {
return c.currentSets[0].At()
}
series := make([]Labels, 0, len(c.currentSets))
for _, seriesSet := range c.currentSets {
series = append(series, seriesSet.At())
}
return c.mergeFunc(series...)
}
func (c *genericMergeSeriesSet) Err() error {
for _, set := range c.sets {
if err := set.Err(); err != nil {
return err
}
}
return nil
}
func (c *genericMergeSeriesSet) Warnings() Warnings {
var ws Warnings
for _, set := range c.sets {
ws = append(ws, set.Warnings()...)
}
return ws
}
type genericSeriesSetHeap []genericSeriesSet
func (h genericSeriesSetHeap) Len() int { return len(h) }
func (h genericSeriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h genericSeriesSetHeap) Less(i, j int) bool {
a, b := h[i].At().Labels(), h[j].At().Labels()
return labels.Compare(a, b) < 0
}
func (h *genericSeriesSetHeap) Push(x interface{}) {
*h = append(*h, x.(genericSeriesSet))
}
func (h *genericSeriesSetHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// ChainedSeriesMerge returns single series from many same, potentially overlapping series by chaining samples together.
// If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
// timestamp are dropped.
//
// This works the best with replicated series, where data from two series are exactly the same. This does not work well
// with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective
// this never happens.
//
// NOTE: Use this merge function only when you see potentially overlapping series, as this introduces a small overhead
// to handle overlaps between series.
func ChainedSeriesMerge(series ...Series) Series {
if len(series) == 0 {
return nil
}
return &SeriesEntry{
Lset: series[0].Labels(),
SampleIteratorFn: func() chunkenc.Iterator {
iterators := make([]chunkenc.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator())
}
return newChainSampleIterator(iterators)
},
}
}
// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps
// order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
// timestamp are dropped.
type chainSampleIterator struct {
iterators []chunkenc.Iterator
h samplesIteratorHeap
}
func newChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator {
return &chainSampleIterator{
iterators: iterators,
h: nil,
}
}
func (c *chainSampleIterator) Seek(t int64) bool {
c.h = samplesIteratorHeap{}
for _, iter := range c.iterators {
if iter.Seek(t) {
heap.Push(&c.h, iter)
}
}
return len(c.h) > 0
}
func (c *chainSampleIterator) At() (t int64, v float64) {
if len(c.h) == 0 {
panic("chainSampleIterator.At() called after .Next() returned false.")
}
return c.h[0].At()
}
func (c *chainSampleIterator) Next() bool {
if c.h == nil {
for _, iter := range c.iterators {
if iter.Next() {
heap.Push(&c.h, iter)
}
}
return len(c.h) > 0
}
if len(c.h) == 0 {
return false
}
currt, _ := c.At()
for len(c.h) > 0 {
nextt, _ := c.h[0].At()
// All but one of the overlapping samples will be dropped.
if nextt != currt {
break
}
iter := heap.Pop(&c.h).(chunkenc.Iterator)
if iter.Next() {
heap.Push(&c.h, iter)
}
}
return len(c.h) > 0
}
func (c *chainSampleIterator) Err() error {
var errs tsdb_errors.MultiError
for _, iter := range c.iterators {
if err := iter.Err(); err != nil {
errs.Add(err)
}
}
return errs.Err()
}
type samplesIteratorHeap []chunkenc.Iterator
func (h samplesIteratorHeap) Len() int { return len(h) }
func (h samplesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h samplesIteratorHeap) Less(i, j int) bool {
at, _ := h[i].At()
bt, _ := h[j].At()
return at < bt
}
func (h *samplesIteratorHeap) Push(x interface{}) {
*h = append(*h, x.(chunkenc.Iterator))
}
func (h *samplesIteratorHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// NewCompactingChunkSeriesMerger returns VerticalChunkSeriesMergeFunc that merges the same chunk series into single chunk series.
// In case of the chunk overlaps, it compacts those into one or more time-ordered non-overlapping chunks with merged data.
// Samples from overlapped chunks are merged using series vertical merge func.
// It expects the same labels for each given series.
//
// NOTE: Use the returned merge function only when you see potentially overlapping series, as this introduces small a overhead
// to handle overlaps between series.
func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalChunkSeriesMergeFunc {
return func(series ...ChunkSeries) ChunkSeries {
if len(series) == 0 {
return nil
}
return &ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: func() chunks.Iterator {
iterators := make([]chunks.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator())
}
return &compactChunkIterator{
mergeFunc: mergeFunc,
iterators: iterators,
}
},
}
}
}
// compactChunkIterator is responsible to compact chunks from different iterators of the same time series into single chainSeries.
// If time-overlapping chunks are found, they are encoded and passed to series merge and encoded again into one bigger chunk.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
type compactChunkIterator struct {
mergeFunc VerticalSeriesMergeFunc
iterators []chunks.Iterator
h chunkIteratorHeap
err error
curr chunks.Meta
}
func (c *compactChunkIterator) At() chunks.Meta {
return c.curr
}
func (c *compactChunkIterator) Next() bool {
if c.h == nil {
for _, iter := range c.iterators {
if iter.Next() {
heap.Push(&c.h, iter)
}
}
}
if len(c.h) == 0 {
return false
}
iter := heap.Pop(&c.h).(chunks.Iterator)
c.curr = iter.At()
if iter.Next() {
heap.Push(&c.h, iter)
}
var (
overlapping []Series
oMaxTime = c.curr.MaxTime
prev = c.curr
)
// Detect overlaps to compact. Be smart about it and deduplicate on the fly if chunks are identical.
for len(c.h) > 0 {
// Get the next oldest chunk by min, then max time.
next := c.h[0].At()
if next.MinTime > oMaxTime {
// No overlap with current one.
break
}
if next.MinTime == prev.MinTime &&
next.MaxTime == prev.MaxTime &&
bytes.Equal(next.Chunk.Bytes(), prev.Chunk.Bytes()) {
// 1:1 duplicates, skip it.
} else {
// We operate on same series, so labels does not matter here.
overlapping = append(overlapping, newChunkToSeriesDecoder(nil, next))
if next.MaxTime > oMaxTime {
oMaxTime = next.MaxTime
}
prev = next
}
iter := heap.Pop(&c.h).(chunks.Iterator)
if iter.Next() {
heap.Push(&c.h, iter)
}
}
if len(overlapping) == 0 {
return true
}
// Add last as it's not yet included in overlap. We operate on same series, so labels does not matter here.
iter = (&seriesToChunkEncoder{Series: c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(nil, c.curr))...)}).Iterator()
if !iter.Next() {
if c.err = iter.Err(); c.err != nil {
return false
}
panic("unexpected seriesToChunkEncoder lack of iterations")
}
c.curr = iter.At()
if iter.Next() {
heap.Push(&c.h, iter)
}
return true
}
func (c *compactChunkIterator) Err() error {
var errs tsdb_errors.MultiError
for _, iter := range c.iterators {
if err := iter.Err(); err != nil {
errs.Add(err)
}
}
errs.Add(c.err)
return errs.Err()
}
type chunkIteratorHeap []chunks.Iterator
func (h chunkIteratorHeap) Len() int { return len(h) }
func (h chunkIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h chunkIteratorHeap) Less(i, j int) bool {
at := h[i].At()
bt := h[j].At()
if at.MinTime == bt.MinTime {
return at.MaxTime < bt.MaxTime
}
return at.MinTime < bt.MinTime
}
func (h *chunkIteratorHeap) Push(x interface{}) {
*h = append(*h, x.(chunks.Iterator))
}
func (h *chunkIteratorHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

View File

@ -1,246 +0,0 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"testing"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/prometheus/prometheus/util/testutil"
)
func TestSelectSorted(t *testing.T) {
inputLabel := labels.FromStrings(model.MetricNameLabel, "a")
outputLabel := labels.FromStrings(model.MetricNameLabel, "a")
inputTotalSize := 0
ctx := context.Background()
priStorage := teststorage.New(t)
defer priStorage.Close()
app1 := priStorage.Appender(ctx)
app1.Add(inputLabel, 0, 0)
inputTotalSize++
app1.Add(inputLabel, 1000, 1)
inputTotalSize++
app1.Add(inputLabel, 2000, 2)
inputTotalSize++
err := app1.Commit()
testutil.Ok(t, err)
remoteStorage1 := teststorage.New(t)
defer remoteStorage1.Close()
app2 := remoteStorage1.Appender(ctx)
app2.Add(inputLabel, 3000, 3)
inputTotalSize++
app2.Add(inputLabel, 4000, 4)
inputTotalSize++
app2.Add(inputLabel, 5000, 5)
inputTotalSize++
err = app2.Commit()
testutil.Ok(t, err)
remoteStorage2 := teststorage.New(t)
defer remoteStorage2.Close()
app3 := remoteStorage2.Appender(ctx)
app3.Add(inputLabel, 6000, 6)
inputTotalSize++
app3.Add(inputLabel, 7000, 7)
inputTotalSize++
app3.Add(inputLabel, 8000, 8)
inputTotalSize++
err = app3.Commit()
testutil.Ok(t, err)
fanoutStorage := storage.NewFanout(nil, priStorage, remoteStorage1, remoteStorage2)
t.Run("querier", func(t *testing.T) {
querier, err := fanoutStorage.Querier(context.Background(), 0, 8000)
testutil.Ok(t, err)
defer querier.Close()
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
testutil.Ok(t, err)
seriesSet := querier.Select(true, nil, matcher)
result := make(map[int64]float64)
var labelsResult labels.Labels
for seriesSet.Next() {
series := seriesSet.At()
seriesLabels := series.Labels()
labelsResult = seriesLabels
iterator := series.Iterator()
for iterator.Next() {
timestamp, value := iterator.At()
result[timestamp] = value
}
}
testutil.Equals(t, labelsResult, outputLabel)
testutil.Equals(t, inputTotalSize, len(result))
})
t.Run("chunk querier", func(t *testing.T) {
t.Skip("TODO(bwplotka: Unskip when db will implement ChunkQuerier.")
querier, err := fanoutStorage.ChunkQuerier(ctx, 0, 8000)
testutil.Ok(t, err)
defer querier.Close()
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
testutil.Ok(t, err)
seriesSet := storage.NewSeriesSetFromChunkSeriesSet(querier.Select(true, nil, matcher))
result := make(map[int64]float64)
var labelsResult labels.Labels
for seriesSet.Next() {
series := seriesSet.At()
seriesLabels := series.Labels()
labelsResult = seriesLabels
iterator := series.Iterator()
for iterator.Next() {
timestamp, value := iterator.At()
result[timestamp] = value
}
}
testutil.Ok(t, seriesSet.Err())
testutil.Equals(t, labelsResult, outputLabel)
testutil.Equals(t, inputTotalSize, len(result))
})
}
func TestFanoutErrors(t *testing.T) {
workingStorage := teststorage.New(t)
defer workingStorage.Close()
cases := []struct {
primary storage.Storage
secondary storage.Storage
warning error
err error
}{
{
primary: workingStorage,
secondary: errStorage{},
warning: errSelect,
err: nil,
},
{
primary: errStorage{},
secondary: workingStorage,
warning: nil,
err: errSelect,
},
}
for _, tc := range cases {
fanoutStorage := storage.NewFanout(nil, tc.primary, tc.secondary)
t.Run("samples", func(t *testing.T) {
querier, err := fanoutStorage.Querier(context.Background(), 0, 8000)
testutil.Ok(t, err)
defer querier.Close()
matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b")
ss := querier.Select(true, nil, matcher)
// Exhaust.
for ss.Next() {
ss.At()
}
if tc.err != nil {
testutil.NotOk(t, ss.Err())
testutil.Equals(t, tc.err.Error(), ss.Err().Error())
}
if tc.warning != nil {
testutil.Assert(t, len(ss.Warnings()) > 0, "warnings expected")
testutil.NotOk(t, ss.Warnings()[0])
testutil.Equals(t, tc.warning.Error(), ss.Warnings()[0].Error())
}
})
t.Run("chunks", func(t *testing.T) {
t.Skip("enable once TestStorage and TSDB implements ChunkQuerier")
querier, err := fanoutStorage.ChunkQuerier(context.Background(), 0, 8000)
testutil.Ok(t, err)
defer querier.Close()
matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b")
ss := querier.Select(true, nil, matcher)
// Exhaust.
for ss.Next() {
ss.At()
}
if tc.err != nil {
testutil.NotOk(t, ss.Err())
testutil.Equals(t, tc.err.Error(), ss.Err().Error())
}
if tc.warning != nil {
testutil.Assert(t, len(ss.Warnings()) > 0, "warnings expected")
testutil.NotOk(t, ss.Warnings()[0])
testutil.Equals(t, tc.warning.Error(), ss.Warnings()[0].Error())
}
})
}
}
var errSelect = errors.New("select error")
type errStorage struct{}
type errQuerier struct{}
func (errStorage) Querier(_ context.Context, _, _ int64) (storage.Querier, error) {
return errQuerier{}, nil
}
type errChunkQuerier struct{ errQuerier }
func (errStorage) ChunkQuerier(_ context.Context, _, _ int64) (storage.ChunkQuerier, error) {
return errChunkQuerier{}, nil
}
func (errStorage) Appender(_ context.Context) storage.Appender { return nil }
func (errStorage) StartTime() (int64, error) { return 0, nil }
func (errStorage) Close() error { return nil }
func (errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
return storage.ErrSeriesSet(errSelect)
}
func (errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
return nil, nil, errors.New("label values error")
}
func (errQuerier) LabelNames() ([]string, storage.Warnings, error) {
return nil, nil, errors.New("label names error")
}
func (errQuerier) Close() error { return nil }
func (errChunkQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.ChunkSeriesSet {
return storage.ErrChunkSeriesSet(errSelect)
}

File diff suppressed because it is too large Load Diff

686
storage/merge.go Normal file
View File

@ -0,0 +1,686 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"bytes"
"container/heap"
"sort"
"strings"
"sync"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
)
type mergeGenericQuerier struct {
queriers []genericQuerier
// mergeFn is used when we see series from different queriers Selects with the same labels.
mergeFn genericSeriesMergeFunc
// TODO(bwplotka): Remove once remote queries are asynchronous. False by default.
concurrentSelect bool
}
// NewMergeQuerier returns a new Querier that merges results of given primary and secondary queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
func NewMergeQuerier(primaries []Querier, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier {
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
for _, q := range primaries {
if _, ok := q.(noopQuerier); !ok && q != nil {
queriers = append(queriers, newGenericQuerierFrom(q))
}
}
for _, q := range secondaries {
if _, ok := q.(noopQuerier); !ok && q != nil {
queriers = append(queriers, newSecondaryQuerierFrom(q))
}
}
concurrentSelect := false
if len(secondaries) > 0 {
concurrentSelect = true
}
return &querierAdapter{&mergeGenericQuerier{
mergeFn: (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFn}).Merge,
queriers: queriers,
concurrentSelect: concurrentSelect,
}}
}
// NewMergeChunkQuerier returns a new Chunk Querier that merges results of given primary and secondary chunk queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
func NewMergeChunkQuerier(primaries []ChunkQuerier, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier {
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
for _, q := range primaries {
if _, ok := q.(noopChunkQuerier); !ok && q != nil {
queriers = append(queriers, newGenericQuerierFromChunk(q))
}
}
for _, querier := range secondaries {
if _, ok := querier.(noopChunkQuerier); !ok && querier != nil {
queriers = append(queriers, newSecondaryQuerierFromChunk(querier))
}
}
concurrentSelect := false
if len(secondaries) > 0 {
concurrentSelect = true
}
return &chunkQuerierAdapter{&mergeGenericQuerier{
mergeFn: (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFn}).Merge,
queriers: queriers,
concurrentSelect: concurrentSelect,
}}
}
// Select returns a set of series that matches the given label matchers.
func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
if len(q.queriers) == 0 {
return noopGenericSeriesSet{}
}
if len(q.queriers) == 1 {
return q.queriers[0].Select(sortSeries, hints, matchers...)
}
var seriesSets = make([]genericSeriesSet, 0, len(q.queriers))
if !q.concurrentSelect {
for _, querier := range q.queriers {
// We need to sort for merge to work.
seriesSets = append(seriesSets, querier.Select(true, hints, matchers...))
}
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
return s, s.Next()
}}
}
var (
wg sync.WaitGroup
seriesSetChan = make(chan genericSeriesSet)
)
// Schedule all Selects for all queriers we know about.
for _, querier := range q.queriers {
wg.Add(1)
go func(qr genericQuerier) {
defer wg.Done()
// We need to sort for NewMergeSeriesSet to work.
seriesSetChan <- qr.Select(true, hints, matchers...)
}(querier)
}
go func() {
wg.Wait()
close(seriesSetChan)
}()
for r := range seriesSetChan {
seriesSets = append(seriesSets, r)
}
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
return s, s.Next()
}}
}
type labelGenericQueriers []genericQuerier
func (l labelGenericQueriers) Len() int { return len(l) }
func (l labelGenericQueriers) Get(i int) LabelQuerier { return l[i] }
func (l labelGenericQueriers) SplitByHalf() (labelGenericQueriers, labelGenericQueriers) {
i := len(l) / 2
return l[:i], l[i:]
}
// LabelValues returns all potential values for a label name.
func (q *mergeGenericQuerier) LabelValues(name string) ([]string, Warnings, error) {
res, ws, err := q.lvals(q.queriers, name)
if err != nil {
return nil, nil, errors.Wrapf(err, "LabelValues() from merge generic querier for label %s", name)
}
return res, ws, nil
}
// lvals performs merge sort for LabelValues from multiple queriers.
func (q *mergeGenericQuerier) lvals(lq labelGenericQueriers, n string) ([]string, Warnings, error) {
if lq.Len() == 0 {
return nil, nil, nil
}
if lq.Len() == 1 {
return lq.Get(0).LabelValues(n)
}
a, b := lq.SplitByHalf()
var ws Warnings
s1, w, err := q.lvals(a, n)
ws = append(ws, w...)
if err != nil {
return nil, ws, err
}
s2, ws, err := q.lvals(b, n)
ws = append(ws, w...)
if err != nil {
return nil, ws, err
}
return mergeStrings(s1, s2), ws, nil
}
func mergeStrings(a, b []string) []string {
maxl := len(a)
if len(b) > len(a) {
maxl = len(b)
}
res := make([]string, 0, maxl*10/9)
for len(a) > 0 && len(b) > 0 {
d := strings.Compare(a[0], b[0])
if d == 0 {
res = append(res, a[0])
a, b = a[1:], b[1:]
} else if d < 0 {
res = append(res, a[0])
a = a[1:]
} else if d > 0 {
res = append(res, b[0])
b = b[1:]
}
}
// Append all remaining elements.
res = append(res, a...)
res = append(res, b...)
return res
}
// LabelNames returns all the unique label names present in all queriers in sorted order.
func (q *mergeGenericQuerier) LabelNames() ([]string, Warnings, error) {
var (
labelNamesMap = make(map[string]struct{})
warnings Warnings
)
for _, querier := range q.queriers {
names, wrn, err := querier.LabelNames()
if wrn != nil {
// TODO(bwplotka): We could potentially wrap warnings.
warnings = append(warnings, wrn...)
}
if err != nil {
return nil, nil, errors.Wrap(err, "LabelNames() from merge generic querier")
}
for _, name := range names {
labelNamesMap[name] = struct{}{}
}
}
if len(labelNamesMap) == 0 {
return nil, warnings, nil
}
labelNames := make([]string, 0, len(labelNamesMap))
for name := range labelNamesMap {
labelNames = append(labelNames, name)
}
sort.Strings(labelNames)
return labelNames, warnings, nil
}
// Close releases the resources of the generic querier.
func (q *mergeGenericQuerier) Close() error {
errs := tsdb_errors.MultiError{}
for _, querier := range q.queriers {
if err := querier.Close(); err != nil {
errs.Add(err)
}
}
return errs.Err()
}
// VerticalSeriesMergeFunc returns merged series implementation that merges series with same labels together.
// It has to handle time-overlapped series as well.
type VerticalSeriesMergeFunc func(...Series) Series
// NewMergeSeriesSet returns a new SeriesSet that merges many SeriesSets together.
func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) SeriesSet {
genericSets := make([]genericSeriesSet, 0, len(sets))
for _, s := range sets {
genericSets = append(genericSets, &genericSeriesSetAdapter{s})
}
return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)}
}
// VerticalChunkSeriesMergeFunc returns merged chunk series implementation that merges potentially time-overlapping
// chunk series with the same labels into single ChunkSeries.
//
// NOTE: It's up to implementation how series are vertically merged (if chunks are sorted, re-encoded etc).
type VerticalChunkSeriesMergeFunc func(...ChunkSeries) ChunkSeries
// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges many SeriesSet together.
func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet {
genericSets := make([]genericSeriesSet, 0, len(sets))
for _, s := range sets {
genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s})
}
return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)}
}
// genericMergeSeriesSet implements genericSeriesSet.
type genericMergeSeriesSet struct {
currentLabels labels.Labels
mergeFunc genericSeriesMergeFunc
heap genericSeriesSetHeap
sets []genericSeriesSet
currentSets []genericSeriesSet
}
// newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates)
// series returned by the series sets when iterating.
// Each series set must return its series in labels order, otherwise
// merged series set will be incorrect.
// Overlapped situations are merged using provided mergeFunc.
func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
if len(sets) == 1 {
return sets[0]
}
// We are pre-advancing sets, so we can introspect the label of the
// series under the cursor.
var h genericSeriesSetHeap
for _, set := range sets {
if set == nil {
continue
}
if set.Next() {
heap.Push(&h, set)
}
if err := set.Err(); err != nil {
return errorOnlySeriesSet{err}
}
}
return &genericMergeSeriesSet{
mergeFunc: mergeFunc,
sets: sets,
heap: h,
}
}
func (c *genericMergeSeriesSet) Next() bool {
// Run in a loop because the "next" series sets may not be valid anymore.
// If, for the current label set, all the next series sets come from
// failed remote storage sources, we want to keep trying with the next label set.
for {
// Firstly advance all the current series sets. If any of them have run out
// we can drop them, otherwise they should be inserted back into the heap.
for _, set := range c.currentSets {
if set.Next() {
heap.Push(&c.heap, set)
}
}
if len(c.heap) == 0 {
return false
}
// Now, pop items of the heap that have equal label sets.
c.currentSets = nil
c.currentLabels = c.heap[0].At().Labels()
for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
set := heap.Pop(&c.heap).(genericSeriesSet)
c.currentSets = append(c.currentSets, set)
}
// As long as the current set contains at least 1 set,
// then it should return true.
if len(c.currentSets) != 0 {
break
}
}
return true
}
func (c *genericMergeSeriesSet) At() Labels {
if len(c.currentSets) == 1 {
return c.currentSets[0].At()
}
series := make([]Labels, 0, len(c.currentSets))
for _, seriesSet := range c.currentSets {
series = append(series, seriesSet.At())
}
return c.mergeFunc(series...)
}
func (c *genericMergeSeriesSet) Err() error {
for _, set := range c.sets {
if err := set.Err(); err != nil {
return err
}
}
return nil
}
func (c *genericMergeSeriesSet) Warnings() Warnings {
var ws Warnings
for _, set := range c.sets {
ws = append(ws, set.Warnings()...)
}
return ws
}
type genericSeriesSetHeap []genericSeriesSet
func (h genericSeriesSetHeap) Len() int { return len(h) }
func (h genericSeriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h genericSeriesSetHeap) Less(i, j int) bool {
a, b := h[i].At().Labels(), h[j].At().Labels()
return labels.Compare(a, b) < 0
}
func (h *genericSeriesSetHeap) Push(x interface{}) {
*h = append(*h, x.(genericSeriesSet))
}
func (h *genericSeriesSetHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// ChainedSeriesMerge returns single series from many same, potentially overlapping series by chaining samples together.
// If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
// timestamp are dropped.
//
// This works the best with replicated series, where data from two series are exactly the same. This does not work well
// with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective
// this never happens.
//
// NOTE: Use this merge function only when you see potentially overlapping series, as this introduces a small overhead
// to handle overlaps between series.
func ChainedSeriesMerge(series ...Series) Series {
if len(series) == 0 {
return nil
}
return &SeriesEntry{
Lset: series[0].Labels(),
SampleIteratorFn: func() chunkenc.Iterator {
iterators := make([]chunkenc.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator())
}
return newChainSampleIterator(iterators)
},
}
}
// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps
// order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
// timestamp are dropped.
type chainSampleIterator struct {
iterators []chunkenc.Iterator
h samplesIteratorHeap
}
func newChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator {
return &chainSampleIterator{
iterators: iterators,
h: nil,
}
}
func (c *chainSampleIterator) Seek(t int64) bool {
c.h = samplesIteratorHeap{}
for _, iter := range c.iterators {
if iter.Seek(t) {
heap.Push(&c.h, iter)
}
}
return len(c.h) > 0
}
func (c *chainSampleIterator) At() (t int64, v float64) {
if len(c.h) == 0 {
panic("chainSampleIterator.At() called after .Next() returned false.")
}
return c.h[0].At()
}
func (c *chainSampleIterator) Next() bool {
if c.h == nil {
for _, iter := range c.iterators {
if iter.Next() {
heap.Push(&c.h, iter)
}
}
return len(c.h) > 0
}
if len(c.h) == 0 {
return false
}
currt, _ := c.At()
for len(c.h) > 0 {
nextt, _ := c.h[0].At()
// All but one of the overlapping samples will be dropped.
if nextt != currt {
break
}
iter := heap.Pop(&c.h).(chunkenc.Iterator)
if iter.Next() {
heap.Push(&c.h, iter)
}
}
return len(c.h) > 0
}
func (c *chainSampleIterator) Err() error {
var errs tsdb_errors.MultiError
for _, iter := range c.iterators {
if err := iter.Err(); err != nil {
errs.Add(err)
}
}
return errs.Err()
}
type samplesIteratorHeap []chunkenc.Iterator
func (h samplesIteratorHeap) Len() int { return len(h) }
func (h samplesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h samplesIteratorHeap) Less(i, j int) bool {
at, _ := h[i].At()
bt, _ := h[j].At()
return at < bt
}
func (h *samplesIteratorHeap) Push(x interface{}) {
*h = append(*h, x.(chunkenc.Iterator))
}
func (h *samplesIteratorHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// NewCompactingChunkSeriesMerger returns VerticalChunkSeriesMergeFunc that merges the same chunk series into single chunk series.
// In case of the chunk overlaps, it compacts those into one or more time-ordered non-overlapping chunks with merged data.
// Samples from overlapped chunks are merged using series vertical merge func.
// It expects the same labels for each given series.
//
// NOTE: Use the returned merge function only when you see potentially overlapping series, as this introduces small a overhead
// to handle overlaps between series.
func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalChunkSeriesMergeFunc {
return func(series ...ChunkSeries) ChunkSeries {
if len(series) == 0 {
return nil
}
return &ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: func() chunks.Iterator {
iterators := make([]chunks.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator())
}
return &compactChunkIterator{
mergeFunc: mergeFunc,
iterators: iterators,
}
},
}
}
}
// compactChunkIterator is responsible to compact chunks from different iterators of the same time series into single chainSeries.
// If time-overlapping chunks are found, they are encoded and passed to series merge and encoded again into one bigger chunk.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
type compactChunkIterator struct {
mergeFunc VerticalSeriesMergeFunc
iterators []chunks.Iterator
h chunkIteratorHeap
err error
curr chunks.Meta
}
func (c *compactChunkIterator) At() chunks.Meta {
return c.curr
}
func (c *compactChunkIterator) Next() bool {
if c.h == nil {
for _, iter := range c.iterators {
if iter.Next() {
heap.Push(&c.h, iter)
}
}
}
if len(c.h) == 0 {
return false
}
iter := heap.Pop(&c.h).(chunks.Iterator)
c.curr = iter.At()
if iter.Next() {
heap.Push(&c.h, iter)
}
var (
overlapping []Series
oMaxTime = c.curr.MaxTime
prev = c.curr
)
// Detect overlaps to compact. Be smart about it and deduplicate on the fly if chunks are identical.
for len(c.h) > 0 {
// Get the next oldest chunk by min, then max time.
next := c.h[0].At()
if next.MinTime > oMaxTime {
// No overlap with current one.
break
}
if next.MinTime == prev.MinTime &&
next.MaxTime == prev.MaxTime &&
bytes.Equal(next.Chunk.Bytes(), prev.Chunk.Bytes()) {
// 1:1 duplicates, skip it.
} else {
// We operate on same series, so labels does not matter here.
overlapping = append(overlapping, newChunkToSeriesDecoder(nil, next))
if next.MaxTime > oMaxTime {
oMaxTime = next.MaxTime
}
prev = next
}
iter := heap.Pop(&c.h).(chunks.Iterator)
if iter.Next() {
heap.Push(&c.h, iter)
}
}
if len(overlapping) == 0 {
return true
}
// Add last as it's not yet included in overlap. We operate on same series, so labels does not matter here.
iter = (&seriesToChunkEncoder{Series: c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(nil, c.curr))...)}).Iterator()
if !iter.Next() {
if c.err = iter.Err(); c.err != nil {
return false
}
panic("unexpected seriesToChunkEncoder lack of iterations")
}
c.curr = iter.At()
if iter.Next() {
heap.Push(&c.h, iter)
}
return true
}
func (c *compactChunkIterator) Err() error {
var errs tsdb_errors.MultiError
for _, iter := range c.iterators {
if err := iter.Err(); err != nil {
errs.Add(err)
}
}
errs.Add(c.err)
return errs.Err()
}
type chunkIteratorHeap []chunks.Iterator
func (h chunkIteratorHeap) Len() int { return len(h) }
func (h chunkIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h chunkIteratorHeap) Less(i, j int) bool {
at := h[i].At()
bt := h[j].At()
if at.MinTime == bt.MinTime {
return at.MaxTime < bt.MaxTime
}
return at.MinTime < bt.MinTime
}
func (h *chunkIteratorHeap) Push(x interface{}) {
*h = append(*h, x.(chunks.Iterator))
}
func (h *chunkIteratorHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

972
storage/merge_test.go Normal file
View File

@ -0,0 +1,972 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"fmt"
"math"
"sort"
"sync"
"testing"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/testutil"
)
func TestMergeQuerierWithChainMerger(t *testing.T) {
for _, tc := range []struct {
name string
primaryQuerierSeries []Series
querierSeries [][]Series
extraQueriers []Querier
expected SeriesSet
}{
{
name: "one primary querier with no series",
primaryQuerierSeries: []Series{},
expected: NewMockSeriesSet(),
},
{
name: "one secondary querier with no series",
querierSeries: [][]Series{{}},
expected: NewMockSeriesSet(),
},
{
name: "many secondary queriers with no series",
querierSeries: [][]Series{{}, {}, {}, {}, {}, {}, {}},
expected: NewMockSeriesSet(),
},
{
name: "mix of queriers with no series",
primaryQuerierSeries: []Series{},
querierSeries: [][]Series{{}, {}, {}, {}, {}, {}, {}},
expected: NewMockSeriesSet(),
},
// Test rest of cases on secondary queriers as the different between primary vs secondary is just error handling.
{
name: "one querier, two series",
querierSeries: [][]Series{{
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
}},
expected: NewMockSeriesSet(
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
),
},
{
name: "two queriers, one different series each",
querierSeries: [][]Series{{
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
}, {
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
}},
expected: NewMockSeriesSet(
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
),
},
{
name: "two time unsorted queriers, two series each",
querierSeries: [][]Series{{
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}, sample{6, 6}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}, sample{4, 4}}),
}},
expected: NewMockSeriesSet(
NewListSeries(
labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}, sample{6, 6}},
),
NewListSeries(
labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}},
),
),
},
{
name: "five queriers, only two queriers have two time unsorted series each",
querierSeries: [][]Series{{}, {}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}, sample{6, 6}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}, sample{4, 4}}),
}, {}},
expected: NewMockSeriesSet(
NewListSeries(
labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}, sample{6, 6}},
),
NewListSeries(
labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}},
),
),
},
{
name: "two queriers, only two queriers have two time unsorted series each, with 3 noop and one nil querier together",
querierSeries: [][]Series{{}, {}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}, sample{6, 6}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}, sample{4, 4}}),
}, {}},
extraQueriers: []Querier{NoopQuerier(), NoopQuerier(), nil, NoopQuerier()},
expected: NewMockSeriesSet(
NewListSeries(
labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}, sample{6, 6}},
),
NewListSeries(
labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}},
),
),
},
{
name: "two queriers, with two series, one is overlapping",
querierSeries: [][]Series{{}, {}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 21}, sample{3, 31}, sample{5, 5}, sample{6, 6}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 22}, sample{3, 32}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}, sample{4, 4}}),
}, {}},
expected: NewMockSeriesSet(
NewListSeries(
labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 21}, sample{3, 31}, sample{5, 5}, sample{6, 6}},
),
NewListSeries(
labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}},
),
),
},
{
name: "two queries, one with NaN samples series",
querierSeries: [][]Series{{
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}}),
}, {
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{1, 1}}),
}},
expected: NewMockSeriesSet(
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}, sample{1, 1}}),
),
},
} {
t.Run(tc.name, func(t *testing.T) {
var p Querier
if tc.primaryQuerierSeries != nil {
p = &mockQuerier{toReturn: tc.primaryQuerierSeries}
}
var qs []Querier
for _, in := range tc.querierSeries {
qs = append(qs, &mockQuerier{toReturn: in})
}
qs = append(qs, tc.extraQueriers...)
mergedQuerier := NewMergeQuerier([]Querier{p}, qs, ChainedSeriesMerge).Select(false, nil)
// Get all merged series upfront to make sure there are no incorrectly retained shared
// buffers causing bugs.
var mergedSeries []Series
for mergedQuerier.Next() {
mergedSeries = append(mergedSeries, mergedQuerier.At())
}
testutil.Ok(t, mergedQuerier.Err())
for _, actualSeries := range mergedSeries {
testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true")
expectedSeries := tc.expected.At()
testutil.Equals(t, expectedSeries.Labels(), actualSeries.Labels())
expSmpl, expErr := ExpandSamples(expectedSeries.Iterator(), nil)
actSmpl, actErr := ExpandSamples(actualSeries.Iterator(), nil)
testutil.Equals(t, expErr, actErr)
testutil.Equals(t, expSmpl, actSmpl)
}
testutil.Assert(t, !tc.expected.Next(), "Expected Next() to be false")
})
}
}
func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
for _, tc := range []struct {
name string
primaryChkQuerierSeries []ChunkSeries
chkQuerierSeries [][]ChunkSeries
extraQueriers []ChunkQuerier
expected ChunkSeriesSet
}{
{
name: "one primary querier with no series",
primaryChkQuerierSeries: []ChunkSeries{},
expected: NewMockChunkSeriesSet(),
},
{
name: "one secondary querier with no series",
chkQuerierSeries: [][]ChunkSeries{{}},
expected: NewMockChunkSeriesSet(),
},
{
name: "many secondary queriers with no series",
chkQuerierSeries: [][]ChunkSeries{{}, {}, {}, {}, {}, {}, {}},
expected: NewMockChunkSeriesSet(),
},
{
name: "mix of queriers with no series",
primaryChkQuerierSeries: []ChunkSeries{},
chkQuerierSeries: [][]ChunkSeries{{}, {}, {}, {}, {}, {}, {}},
expected: NewMockChunkSeriesSet(),
},
// Test rest of cases on secondary queriers as the different between primary vs secondary is just error handling.
{
name: "one querier, two series",
chkQuerierSeries: [][]ChunkSeries{{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}},
expected: NewMockChunkSeriesSet(
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
),
},
{
name: "two secondaries, one different series each",
chkQuerierSeries: [][]ChunkSeries{{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
}, {
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}},
expected: NewMockChunkSeriesSet(
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
),
},
{
name: "two secondaries, two not in time order series each",
chkQuerierSeries: [][]ChunkSeries{{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}, {
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}}, []tsdbutil.Sample{sample{4, 4}}),
}},
expected: NewMockChunkSeriesSet(
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{5, 5}},
[]tsdbutil.Sample{sample{6, 6}},
),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}},
[]tsdbutil.Sample{sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{4, 4}},
),
),
},
{
name: "five secondaries, only two have two not in time order series each",
chkQuerierSeries: [][]ChunkSeries{{}, {}, {
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}, {
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}}, []tsdbutil.Sample{sample{4, 4}}),
}, {}},
expected: NewMockChunkSeriesSet(
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{5, 5}},
[]tsdbutil.Sample{sample{6, 6}},
),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}},
[]tsdbutil.Sample{sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{4, 4}},
),
),
},
{
name: "two secondaries, with two not in time order series each, with 3 noop queries and one nil together",
chkQuerierSeries: [][]ChunkSeries{{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}, {
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}}, []tsdbutil.Sample{sample{4, 4}}),
}},
extraQueriers: []ChunkQuerier{NoopChunkedQuerier(), NoopChunkedQuerier(), nil, NoopChunkedQuerier()},
expected: NewMockChunkSeriesSet(
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{5, 5}},
[]tsdbutil.Sample{sample{6, 6}},
),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}},
[]tsdbutil.Sample{sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{4, 4}},
),
),
},
{
name: "two queries, one with NaN samples series",
chkQuerierSeries: [][]ChunkSeries{{
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}}),
}, {
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{1, 1}}),
}},
expected: NewMockChunkSeriesSet(
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}}, []tsdbutil.Sample{sample{1, 1}}),
),
},
} {
t.Run(tc.name, func(t *testing.T) {
var p ChunkQuerier
if tc.primaryChkQuerierSeries != nil {
p = &mockChunkQurier{toReturn: tc.primaryChkQuerierSeries}
}
var qs []ChunkQuerier
for _, in := range tc.chkQuerierSeries {
qs = append(qs, &mockChunkQurier{toReturn: in})
}
qs = append(qs, tc.extraQueriers...)
merged := NewMergeChunkQuerier([]ChunkQuerier{p}, qs, NewCompactingChunkSeriesMerger(nil)).Select(false, nil)
for merged.Next() {
testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true")
actualSeries := merged.At()
expectedSeries := tc.expected.At()
testutil.Equals(t, expectedSeries.Labels(), actualSeries.Labels())
expChks, expErr := ExpandChunks(expectedSeries.Iterator())
actChks, actErr := ExpandChunks(actualSeries.Iterator())
testutil.Equals(t, expErr, actErr)
testutil.Equals(t, expChks, actChks)
}
testutil.Ok(t, merged.Err())
testutil.Assert(t, !tc.expected.Next(), "Expected Next() to be false")
})
}
}
func TestCompactingChunkSeriesMerger(t *testing.T) {
m := NewCompactingChunkSeriesMerger(ChainedSeriesMerge)
for _, tc := range []struct {
name string
input []ChunkSeries
expected ChunkSeries
}{
{
name: "single empty series",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil),
},
{
name: "single series",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
},
{
name: "two empty series",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), nil),
},
{
name: "two non overlapping",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}, []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}),
},
{
name: "two overlapping",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{7, 7}, sample{8, 8}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}),
},
{
name: "two duplicated",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{5, 5}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
},
{
name: "three overlapping",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{6, 6}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{4, 4}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}, sample{6, 6}}),
},
{
name: "three in chained overlap",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{4, 4}, sample{6, 6}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{6, 6}, sample{10, 10}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}, sample{6, 6}, sample{10, 10}}),
},
{
name: "three in chained overlap complex",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, []tsdbutil.Sample{sample{10, 10}, sample{15, 15}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{20, 20}}, []tsdbutil.Sample{sample{25, 25}, sample{30, 30}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{18, 18}, sample{26, 26}}, []tsdbutil.Sample{sample{31, 31}, sample{35, 35}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{0, 0}, sample{2, 2}, sample{5, 5}, sample{10, 10}, sample{15, 15}, sample{18, 18}, sample{20, 20}, sample{25, 25}, sample{26, 26}, sample{30, 30}},
[]tsdbutil.Sample{sample{31, 31}, sample{35, 35}},
),
},
} {
t.Run(tc.name, func(t *testing.T) {
merged := m(tc.input...)
testutil.Equals(t, tc.expected.Labels(), merged.Labels())
actChks, actErr := ExpandChunks(merged.Iterator())
expChks, expErr := ExpandChunks(tc.expected.Iterator())
testutil.Equals(t, expErr, actErr)
testutil.Equals(t, expChks, actChks)
})
}
}
type mockQuerier struct {
LabelQuerier
toReturn []Series
}
type seriesByLabel []Series
func (a seriesByLabel) Len() int { return len(a) }
func (a seriesByLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a seriesByLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 }
func (m *mockQuerier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) SeriesSet {
cpy := make([]Series, len(m.toReturn))
copy(cpy, m.toReturn)
if sortSeries {
sort.Sort(seriesByLabel(cpy))
}
return NewMockSeriesSet(cpy...)
}
type mockChunkQurier struct {
LabelQuerier
toReturn []ChunkSeries
}
type chunkSeriesByLabel []ChunkSeries
func (a chunkSeriesByLabel) Len() int { return len(a) }
func (a chunkSeriesByLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a chunkSeriesByLabel) Less(i, j int) bool {
return labels.Compare(a[i].Labels(), a[j].Labels()) < 0
}
func (m *mockChunkQurier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) ChunkSeriesSet {
cpy := make([]ChunkSeries, len(m.toReturn))
copy(cpy, m.toReturn)
if sortSeries {
sort.Sort(chunkSeriesByLabel(cpy))
}
return NewMockChunkSeriesSet(cpy...)
}
type mockSeriesSet struct {
idx int
series []Series
}
func NewMockSeriesSet(series ...Series) SeriesSet {
return &mockSeriesSet{
idx: -1,
series: series,
}
}
func (m *mockSeriesSet) Next() bool {
m.idx++
return m.idx < len(m.series)
}
func (m *mockSeriesSet) At() Series { return m.series[m.idx] }
func (m *mockSeriesSet) Err() error { return nil }
func (m *mockSeriesSet) Warnings() Warnings { return nil }
type mockChunkSeriesSet struct {
idx int
series []ChunkSeries
}
func NewMockChunkSeriesSet(series ...ChunkSeries) ChunkSeriesSet {
return &mockChunkSeriesSet{
idx: -1,
series: series,
}
}
func (m *mockChunkSeriesSet) Next() bool {
m.idx++
return m.idx < len(m.series)
}
func (m *mockChunkSeriesSet) At() ChunkSeries { return m.series[m.idx] }
func (m *mockChunkSeriesSet) Err() error { return nil }
func (m *mockChunkSeriesSet) Warnings() Warnings { return nil }
func TestChainSampleIterator(t *testing.T) {
for _, tc := range []struct {
input []chunkenc.Iterator
expected []tsdbutil.Sample
}{
{
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sample{0, 0}, sample{1, 1}}),
},
expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}},
},
{
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sample{0, 0}, sample{1, 1}}),
NewListSeriesIterator(samples{sample{2, 2}, sample{3, 3}}),
},
expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}},
},
{
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sample{0, 0}, sample{3, 3}}),
NewListSeriesIterator(samples{sample{1, 1}, sample{4, 4}}),
NewListSeriesIterator(samples{sample{2, 2}, sample{5, 5}}),
},
expected: []tsdbutil.Sample{
sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}},
},
// Overlap.
{
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sample{0, 0}, sample{1, 1}}),
NewListSeriesIterator(samples{sample{0, 0}, sample{2, 2}}),
NewListSeriesIterator(samples{sample{2, 2}, sample{3, 3}}),
NewListSeriesIterator(samples{}),
NewListSeriesIterator(samples{}),
NewListSeriesIterator(samples{}),
},
expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}},
},
} {
merged := newChainSampleIterator(tc.input)
actual, err := ExpandSamples(merged, nil)
testutil.Ok(t, err)
testutil.Equals(t, tc.expected, actual)
}
}
func TestChainSampleIteratorSeek(t *testing.T) {
for _, tc := range []struct {
input []chunkenc.Iterator
seek int64
expected []tsdbutil.Sample
}{
{
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
},
seek: 1,
expected: []tsdbutil.Sample{sample{1, 1}, sample{2, 2}},
},
{
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sample{0, 0}, sample{1, 1}}),
NewListSeriesIterator(samples{sample{2, 2}, sample{3, 3}}),
},
seek: 2,
expected: []tsdbutil.Sample{sample{2, 2}, sample{3, 3}},
},
{
input: []chunkenc.Iterator{
NewListSeriesIterator(samples{sample{0, 0}, sample{3, 3}}),
NewListSeriesIterator(samples{sample{1, 1}, sample{4, 4}}),
NewListSeriesIterator(samples{sample{2, 2}, sample{5, 5}}),
},
seek: 2,
expected: []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}},
},
} {
merged := newChainSampleIterator(tc.input)
actual := []tsdbutil.Sample{}
if merged.Seek(tc.seek) {
t, v := merged.At()
actual = append(actual, sample{t, v})
}
s, err := ExpandSamples(merged, nil)
testutil.Ok(t, err)
actual = append(actual, s...)
testutil.Equals(t, tc.expected, actual)
}
}
var result []tsdbutil.Sample
func makeSeriesSet(numSeries, numSamples int) SeriesSet {
series := []Series{}
for j := 0; j < numSeries; j++ {
labels := labels.Labels{{Name: "foo", Value: fmt.Sprintf("bar%d", j)}}
samples := []tsdbutil.Sample{}
for k := 0; k < numSamples; k++ {
samples = append(samples, sample{t: int64(k), v: float64(k)})
}
series = append(series, NewListSeries(labels, samples))
}
return NewMockSeriesSet(series...)
}
func makeMergeSeriesSet(numSeriesSets, numSeries, numSamples int) SeriesSet {
seriesSets := []genericSeriesSet{}
for i := 0; i < numSeriesSets; i++ {
seriesSets = append(seriesSets, &genericSeriesSetAdapter{makeSeriesSet(numSeries, numSamples)})
}
return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)}
}
func benchmarkDrain(seriesSet SeriesSet, b *testing.B) {
var err error
for n := 0; n < b.N; n++ {
for seriesSet.Next() {
result, err = ExpandSamples(seriesSet.At().Iterator(), nil)
testutil.Ok(b, err)
}
}
}
func BenchmarkNoMergeSeriesSet_100_100(b *testing.B) {
seriesSet := makeSeriesSet(100, 100)
benchmarkDrain(seriesSet, b)
}
func BenchmarkMergeSeriesSet(b *testing.B) {
for _, bm := range []struct {
numSeriesSets, numSeries, numSamples int
}{
{1, 100, 100},
{10, 100, 100},
{100, 100, 100},
} {
seriesSet := makeMergeSeriesSet(bm.numSeriesSets, bm.numSeries, bm.numSamples)
b.Run(fmt.Sprintf("%d_%d_%d", bm.numSeriesSets, bm.numSeries, bm.numSamples), func(b *testing.B) {
benchmarkDrain(seriesSet, b)
})
}
}
type mockGenericQuerier struct {
mtx sync.Mutex
closed bool
labelNamesCalls int
labelNamesRequested []string
sortedSeriesRequested []bool
resp []string
warnings Warnings
err error
}
func (m *mockGenericQuerier) Select(b bool, _ *SelectHints, _ ...*labels.Matcher) genericSeriesSet {
m.mtx.Lock()
m.sortedSeriesRequested = append(m.sortedSeriesRequested, b)
m.mtx.Unlock()
return &mockGenericSeriesSet{resp: m.resp, warnings: m.warnings, err: m.err}
}
func (m *mockGenericQuerier) LabelValues(name string) ([]string, Warnings, error) {
m.mtx.Lock()
m.labelNamesRequested = append(m.labelNamesRequested, name)
m.mtx.Unlock()
return m.resp, m.warnings, m.err
}
func (m *mockGenericQuerier) LabelNames() ([]string, Warnings, error) {
m.mtx.Lock()
m.labelNamesCalls++
m.mtx.Unlock()
return m.resp, m.warnings, m.err
}
func (m *mockGenericQuerier) Close() error {
m.closed = true
return nil
}
type mockGenericSeriesSet struct {
resp []string
warnings Warnings
err error
curr int
}
func (m *mockGenericSeriesSet) Next() bool {
if m.err != nil {
return false
}
if m.curr >= len(m.resp) {
return false
}
m.curr++
return true
}
func (m *mockGenericSeriesSet) Err() error { return m.err }
func (m *mockGenericSeriesSet) Warnings() Warnings { return m.warnings }
func (m *mockGenericSeriesSet) At() Labels {
return mockLabels(m.resp[m.curr-1])
}
type mockLabels string
func (l mockLabels) Labels() labels.Labels {
return labels.FromStrings("test", string(l))
}
func unwrapMockGenericQuerier(t *testing.T, qr genericQuerier) *mockGenericQuerier {
m, ok := qr.(*mockGenericQuerier)
if !ok {
s, ok := qr.(*secondaryQuerier)
testutil.Assert(t, ok, "expected secondaryQuerier got something else")
m, ok = s.genericQuerier.(*mockGenericQuerier)
testutil.Assert(t, ok, "expected mockGenericQuerier got something else")
}
return m
}
func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
var (
errStorage = errors.New("storage error")
warnStorage = errors.New("storage warning")
)
for _, tcase := range []struct {
name string
queriers []genericQuerier
expectedSelectsSeries []labels.Labels
expectedLabels []string
expectedWarnings [3]Warnings
expectedErrs [3]error
}{
{},
{
name: "one successful primary querier",
queriers: []genericQuerier{&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
labels.FromStrings("test", "b"),
},
expectedLabels: []string{"a", "b"},
},
{
name: "multiple successful primary queriers",
queriers: []genericQuerier{
&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil},
&mockGenericQuerier{resp: []string{"b", "c"}, warnings: nil, err: nil},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
labels.FromStrings("test", "b"),
labels.FromStrings("test", "c"),
},
expectedLabels: []string{"a", "b", "c"},
},
{
name: "one failed primary querier",
queriers: []genericQuerier{&mockGenericQuerier{warnings: nil, err: errStorage}},
expectedErrs: [3]error{errStorage, errStorage, errStorage},
},
{
name: "one successful primary querier with successful secondaries",
queriers: []genericQuerier{
&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
labels.FromStrings("test", "b"),
labels.FromStrings("test", "c"),
},
expectedLabels: []string{"a", "b", "c"},
},
{
name: "one successful primary querier with empty response and successful secondaries",
queriers: []genericQuerier{
&mockGenericQuerier{resp: []string{}, warnings: nil, err: nil},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "b"),
labels.FromStrings("test", "c"),
},
expectedLabels: []string{"b", "c"},
},
{
name: "one failed primary querier with successful secondaries",
queriers: []genericQuerier{
&mockGenericQuerier{warnings: nil, err: errStorage},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}},
},
expectedErrs: [3]error{errStorage, errStorage, errStorage},
},
{
name: "one successful primary querier with failed secondaries",
queriers: []genericQuerier{
&mockGenericQuerier{resp: []string{"a"}, warnings: nil, err: nil},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: errStorage}},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: errStorage}},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
},
expectedLabels: []string{"a"},
expectedWarnings: [3]Warnings{
[]error{errStorage, errStorage},
[]error{errStorage, errStorage},
[]error{errStorage, errStorage},
},
},
{
name: "successful queriers with warnings",
queriers: []genericQuerier{
&mockGenericQuerier{resp: []string{"a"}, warnings: []error{warnStorage}, err: nil},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: []error{warnStorage}, err: nil}},
},
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
labels.FromStrings("test", "b"),
},
expectedLabels: []string{"a", "b"},
expectedWarnings: [3]Warnings{
[]error{warnStorage, warnStorage},
[]error{warnStorage, warnStorage},
[]error{warnStorage, warnStorage},
},
},
} {
t.Run(tcase.name, func(t *testing.T) {
q := &mergeGenericQuerier{
queriers: tcase.queriers,
mergeFn: func(l ...Labels) Labels { return l[0] },
}
t.Run("Select", func(t *testing.T) {
res := q.Select(false, nil)
var lbls []labels.Labels
for res.Next() {
lbls = append(lbls, res.At().Labels())
}
testutil.Equals(t, tcase.expectedWarnings[0], res.Warnings())
testutil.Equals(t, tcase.expectedErrs[0], res.Err())
testutil.Assert(t, errors.Is(res.Err(), tcase.expectedErrs[0]), "expected error doesn't match")
testutil.Equals(t, tcase.expectedSelectsSeries, lbls)
for _, qr := range q.queriers {
m := unwrapMockGenericQuerier(t, qr)
exp := []bool{true}
if len(q.queriers) == 1 {
exp[0] = false
}
testutil.Equals(t, exp, m.sortedSeriesRequested)
}
})
t.Run("LabelNames", func(t *testing.T) {
res, w, err := q.LabelNames()
testutil.Equals(t, tcase.expectedWarnings[1], w)
testutil.Assert(t, errors.Is(err, tcase.expectedErrs[1]), "expected error doesn't match")
testutil.Equals(t, tcase.expectedLabels, res)
if err != nil {
return
}
for _, qr := range q.queriers {
m := unwrapMockGenericQuerier(t, qr)
testutil.Equals(t, 1, m.labelNamesCalls)
}
})
t.Run("LabelValues", func(t *testing.T) {
res, w, err := q.LabelValues("test")
testutil.Equals(t, tcase.expectedWarnings[2], w)
testutil.Assert(t, errors.Is(err, tcase.expectedErrs[2]), "expected error doesn't match")
testutil.Equals(t, tcase.expectedLabels, res)
if err != nil {
return
}
for _, qr := range q.queriers {
m := unwrapMockGenericQuerier(t, qr)
testutil.Equals(t, []string{"test"}, m.labelNamesRequested)
}
})
})
}
}

View File

@ -29,7 +29,6 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
@ -787,397 +786,3 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return nil
}
// blockBaseSeriesSet allows to iterate over all series in the single block.
// Iterated series are trimmed with given min and max time as well as tombstones.
// See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating.
type blockBaseSeriesSet struct {
p index.Postings
index IndexReader
chunks ChunkReader
tombstones tombstones.Reader
mint, maxt int64
currIterFn func() *populateWithDelGenericSeriesIterator
currLabels labels.Labels
bufChks []chunks.Meta
err error
}
func (b *blockBaseSeriesSet) Next() bool {
var lbls labels.Labels
for b.p.Next() {
if err := b.index.Series(b.p.At(), &lbls, &b.bufChks); err != nil {
// Postings may be stale. Skip if no underlying series exists.
if errors.Cause(err) == storage.ErrNotFound {
continue
}
b.err = errors.Wrapf(err, "get series %d", b.p.At())
return false
}
if len(b.bufChks) == 0 {
continue
}
intervals, err := b.tombstones.Get(b.p.At())
if err != nil {
b.err = errors.Wrap(err, "get tombstones")
return false
}
// NOTE:
// * block time range is half-open: [meta.MinTime, meta.MaxTime).
// * chunks are both closed: [chk.MinTime, chk.MaxTime].
// * requested time ranges are closed: [req.Start, req.End].
var trimFront, trimBack bool
// Copy chunks as iteratables are reusable.
chks := make([]chunks.Meta, 0, len(b.bufChks))
// Prefilter chunks and pick those which are not entirely deleted or totally outside of the requested range.
for _, chk := range b.bufChks {
if chk.MaxTime < b.mint {
continue
}
if chk.MinTime > b.maxt {
continue
}
if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(intervals)) {
chks = append(chks, chk)
}
// If still not entirely deleted, check if trim is needed based on requested time range.
if chk.MinTime < b.mint {
trimFront = true
}
if chk.MaxTime > b.maxt {
trimBack = true
}
}
if len(chks) == 0 {
continue
}
if trimFront {
intervals = intervals.Add(tombstones.Interval{Mint: math.MinInt64, Maxt: b.mint - 1})
}
if trimBack {
intervals = intervals.Add(tombstones.Interval{Mint: b.maxt + 1, Maxt: math.MaxInt64})
}
b.currLabels = lbls
b.currIterFn = func() *populateWithDelGenericSeriesIterator {
return newPopulateWithDelGenericSeriesIterator(b.chunks, chks, intervals)
}
return true
}
return false
}
func (b *blockBaseSeriesSet) Err() error {
if b.err != nil {
return b.err
}
return b.p.Err()
}
func (b *blockBaseSeriesSet) Warnings() storage.Warnings { return nil }
// populateWithDelGenericSeriesIterator allows to iterate over given chunk metas. In each iteration it ensures
// that chunks are trimmed based on given tombstones interval if any.
//
// populateWithDelGenericSeriesIterator assumes that chunks that would be fully removed by intervals are filtered out in previous phase.
//
// On each iteration currChkMeta is available. If currDelIter is not nil, it means that chunk iterator in currChkMeta
// is invalid and chunk rewrite is needed, currDelIter should be used.
type populateWithDelGenericSeriesIterator struct {
chunks ChunkReader
// chks are expected to be sorted by minTime and should be related to the same, single series.
chks []chunks.Meta
i int
err error
bufIter *deletedIterator
intervals tombstones.Intervals
currDelIter chunkenc.Iterator
currChkMeta chunks.Meta
}
func newPopulateWithDelGenericSeriesIterator(
chunks ChunkReader,
chks []chunks.Meta,
intervals tombstones.Intervals,
) *populateWithDelGenericSeriesIterator {
return &populateWithDelGenericSeriesIterator{
chunks: chunks,
chks: chks,
i: -1,
bufIter: &deletedIterator{},
intervals: intervals,
}
}
func (p *populateWithDelGenericSeriesIterator) next() bool {
if p.err != nil || p.i >= len(p.chks)-1 {
return false
}
p.i++
p.currChkMeta = p.chks[p.i]
p.currChkMeta.Chunk, p.err = p.chunks.Chunk(p.currChkMeta.Ref)
if p.err != nil {
p.err = errors.Wrapf(p.err, "cannot populate chunk %d", p.currChkMeta.Ref)
return false
}
p.bufIter.intervals = p.bufIter.intervals[:0]
for _, interval := range p.intervals {
if p.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) {
p.bufIter.intervals = p.bufIter.intervals.Add(interval)
}
}
// Re-encode head chunks that are still open (being appended to) or
// outside the compacted MaxTime range.
// The chunk.Bytes() method is not safe for open chunks hence the re-encoding.
// This happens when snapshotting the head block or just fetching chunks from TSDB.
//
// TODO think how to avoid the typecasting to verify when it is head block.
_, isSafeChunk := p.currChkMeta.Chunk.(*safeChunk)
if len(p.bufIter.intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) {
// If there are no overlap with deletion intervals AND it's NOT an "open" head chunk, we can take chunk as it is.
p.currDelIter = nil
return true
}
// We don't want full chunk or it's potentially still opened, take just part of it.
p.bufIter.it = p.currChkMeta.Chunk.Iterator(nil)
p.currDelIter = p.bufIter
return true
}
func (p *populateWithDelGenericSeriesIterator) Err() error { return p.err }
func (p *populateWithDelGenericSeriesIterator) toSeriesIterator() chunkenc.Iterator {
return &populateWithDelSeriesIterator{populateWithDelGenericSeriesIterator: p}
}
func (p *populateWithDelGenericSeriesIterator) toChunkSeriesIterator() chunks.Iterator {
return &populateWithDelChunkSeriesIterator{populateWithDelGenericSeriesIterator: p}
}
// populateWithDelSeriesIterator allows to iterate over samples for the single series.
type populateWithDelSeriesIterator struct {
*populateWithDelGenericSeriesIterator
curr chunkenc.Iterator
}
func (p *populateWithDelSeriesIterator) Next() bool {
if p.curr != nil && p.curr.Next() {
return true
}
for p.next() {
if p.currDelIter != nil {
p.curr = p.currDelIter
} else {
p.curr = p.currChkMeta.Chunk.Iterator(nil)
}
if p.curr.Next() {
return true
}
}
return false
}
func (p *populateWithDelSeriesIterator) Seek(t int64) bool {
if p.curr != nil && p.curr.Seek(t) {
return true
}
for p.Next() {
if p.curr.Seek(t) {
return true
}
}
return false
}
func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.curr.At() }
func (p *populateWithDelSeriesIterator) Err() error {
if err := p.populateWithDelGenericSeriesIterator.Err(); err != nil {
return err
}
if p.curr != nil {
return p.curr.Err()
}
return nil
}
type populateWithDelChunkSeriesIterator struct {
*populateWithDelGenericSeriesIterator
curr chunks.Meta
}
func (p *populateWithDelChunkSeriesIterator) Next() bool {
if !p.next() {
return false
}
p.curr = p.currChkMeta
if p.currDelIter == nil {
return true
}
// Re-encode the chunk if iterator is provider. This means that it has some samples to be deleted or chunk is opened.
newChunk := chunkenc.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
p.err = err
return false
}
if !p.currDelIter.Next() {
if err := p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
return false
}
// Empty chunk, this should not happen, as we assume full deletions being filtered before this iterator.
p.err = errors.Wrap(err, "populateWithDelChunkSeriesIterator: unexpected empty chunk found while rewriting chunk")
return false
}
t, v := p.currDelIter.At()
p.curr.MinTime = t
app.Append(t, v)
for p.currDelIter.Next() {
t, v = p.currDelIter.At()
app.Append(t, v)
}
if err := p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
return false
}
p.curr.Chunk = newChunk
p.curr.MaxTime = t
return true
}
func (p *populateWithDelChunkSeriesIterator) At() chunks.Meta { return p.curr }
// blockSeriesSet allows to iterate over sorted, populated series with applied tombstones.
// Series with all deleted chunks are still present as Series with no samples.
// Samples from chunks are also trimmed to requested min and max time.
type blockSeriesSet struct {
blockBaseSeriesSet
}
func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.SeriesSet {
return &blockSeriesSet{
blockBaseSeriesSet{
index: i,
chunks: c,
tombstones: t,
p: p,
mint: mint,
maxt: maxt,
},
}
}
func (b *blockSeriesSet) At() storage.Series {
// At can be looped over before iterating, so save the current value locally.
currIterFn := b.currIterFn
return &storage.SeriesEntry{
Lset: b.currLabels,
SampleIteratorFn: func() chunkenc.Iterator {
return currIterFn().toSeriesIterator()
},
}
}
// blockChunkSeriesSet allows to iterate over sorted, populated series with applied tombstones.
// Series with all deleted chunks are still present as Labelled iterator with no chunks.
// Chunks are also trimmed to requested [min and max] (keeping samples with min and max timestamps).
type blockChunkSeriesSet struct {
blockBaseSeriesSet
}
func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.ChunkSeriesSet {
return &blockChunkSeriesSet{
blockBaseSeriesSet{
index: i,
chunks: c,
tombstones: t,
p: p,
mint: mint,
maxt: maxt,
},
}
}
func (b *blockChunkSeriesSet) At() storage.ChunkSeries {
// At can be looped over before iterating, so save the current value locally.
currIterFn := b.currIterFn
return &storage.ChunkSeriesEntry{
Lset: b.currLabels,
ChunkIteratorFn: func() chunks.Iterator {
return currIterFn().toChunkSeriesIterator()
},
}
}
func newMergedStringIter(a index.StringIter, b index.StringIter) index.StringIter {
return &mergedStringIter{a: a, b: b, aok: a.Next(), bok: b.Next()}
}
type mergedStringIter struct {
a index.StringIter
b index.StringIter
aok, bok bool
cur string
}
func (m *mergedStringIter) Next() bool {
if (!m.aok && !m.bok) || (m.Err() != nil) {
return false
}
if !m.aok {
m.cur = m.b.At()
m.bok = m.b.Next()
} else if !m.bok {
m.cur = m.a.At()
m.aok = m.a.Next()
} else if m.b.At() > m.a.At() {
m.cur = m.a.At()
m.aok = m.a.Next()
} else if m.a.At() > m.b.At() {
m.cur = m.b.At()
m.bok = m.b.Next()
} else { // Equal.
m.cur = m.b.At()
m.aok = m.a.Next()
m.bok = m.b.Next()
}
return true
}
func (m mergedStringIter) At() string { return m.cur }
func (m mergedStringIter) Err() error {
if m.a.Err() != nil {
return m.a.Err()
}
return m.b.Err()
}

View File

@ -31,7 +31,6 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/util/testutil"
)
@ -1307,98 +1306,3 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
})
}
}
func TestBlockBaseSeriesSet(t *testing.T) {
type refdSeries struct {
lset labels.Labels
chunks []chunks.Meta
ref uint64
}
cases := []struct {
series []refdSeries
// Postings should be in the sorted order of the series
postings []uint64
expIdxs []int
}{
{
series: []refdSeries{
{
lset: labels.New([]labels.Label{{Name: "a", Value: "a"}}...),
chunks: []chunks.Meta{
{Ref: 29}, {Ref: 45}, {Ref: 245}, {Ref: 123}, {Ref: 4232}, {Ref: 5344},
{Ref: 121},
},
ref: 12,
},
{
lset: labels.New([]labels.Label{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}}...),
chunks: []chunks.Meta{
{Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26},
},
ref: 10,
},
{
lset: labels.New([]labels.Label{{Name: "b", Value: "c"}}...),
chunks: []chunks.Meta{{Ref: 8282}},
ref: 1,
},
{
lset: labels.New([]labels.Label{{Name: "b", Value: "b"}}...),
chunks: []chunks.Meta{
{Ref: 829}, {Ref: 239}, {Ref: 2349}, {Ref: 659}, {Ref: 269},
},
ref: 108,
},
},
postings: []uint64{12, 13, 10, 108}, // 13 doesn't exist and should just be skipped over.
expIdxs: []int{0, 1, 3},
},
{
series: []refdSeries{
{
lset: labels.New([]labels.Label{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}}...),
chunks: []chunks.Meta{
{Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26},
},
ref: 10,
},
{
lset: labels.New([]labels.Label{{Name: "b", Value: "c"}}...),
chunks: []chunks.Meta{{Ref: 8282}},
ref: 3,
},
},
postings: []uint64{},
expIdxs: []int{},
},
}
for _, tc := range cases {
mi := newMockIndex()
for _, s := range tc.series {
testutil.Ok(t, mi.AddSeries(s.ref, s.lset, s.chunks...))
}
bcs := &blockBaseSeriesSet{
p: index.NewListPostings(tc.postings),
index: mi,
tombstones: tombstones.NewMemTombstones(),
}
i := 0
for bcs.Next() {
chks := bcs.currIterFn().chks
idx := tc.expIdxs[i]
testutil.Equals(t, tc.series[idx].lset, bcs.currLabels)
testutil.Equals(t, tc.series[idx].chunks, chks)
i++
}
testutil.Equals(t, len(tc.expIdxs), i)
testutil.Ok(t, bcs.Err())
}
}

View File

@ -14,6 +14,7 @@
package tsdb
import (
"math"
"sort"
"strings"
"unicode/utf8"
@ -22,6 +23,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"
@ -359,6 +361,400 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting
return ix.Postings(m.Name, res...)
}
// blockBaseSeriesSet allows to iterate over all series in the single block.
// Iterated series are trimmed with given min and max time as well as tombstones.
// See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating.
type blockBaseSeriesSet struct {
p index.Postings
index IndexReader
chunks ChunkReader
tombstones tombstones.Reader
mint, maxt int64
currIterFn func() *populateWithDelGenericSeriesIterator
currLabels labels.Labels
bufChks []chunks.Meta
err error
}
func (b *blockBaseSeriesSet) Next() bool {
var lbls labels.Labels
for b.p.Next() {
if err := b.index.Series(b.p.At(), &lbls, &b.bufChks); err != nil {
// Postings may be stale. Skip if no underlying series exists.
if errors.Cause(err) == storage.ErrNotFound {
continue
}
b.err = errors.Wrapf(err, "get series %d", b.p.At())
return false
}
if len(b.bufChks) == 0 {
continue
}
intervals, err := b.tombstones.Get(b.p.At())
if err != nil {
b.err = errors.Wrap(err, "get tombstones")
return false
}
// NOTE:
// * block time range is half-open: [meta.MinTime, meta.MaxTime).
// * chunks are both closed: [chk.MinTime, chk.MaxTime].
// * requested time ranges are closed: [req.Start, req.End].
var trimFront, trimBack bool
// Copy chunks as iteratables are reusable.
chks := make([]chunks.Meta, 0, len(b.bufChks))
// Prefilter chunks and pick those which are not entirely deleted or totally outside of the requested range.
for _, chk := range b.bufChks {
if chk.MaxTime < b.mint {
continue
}
if chk.MinTime > b.maxt {
continue
}
if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(intervals)) {
chks = append(chks, chk)
}
// If still not entirely deleted, check if trim is needed based on requested time range.
if chk.MinTime < b.mint {
trimFront = true
}
if chk.MaxTime > b.maxt {
trimBack = true
}
}
if len(chks) == 0 {
continue
}
if trimFront {
intervals = intervals.Add(tombstones.Interval{Mint: math.MinInt64, Maxt: b.mint - 1})
}
if trimBack {
intervals = intervals.Add(tombstones.Interval{Mint: b.maxt + 1, Maxt: math.MaxInt64})
}
b.currLabels = lbls
b.currIterFn = func() *populateWithDelGenericSeriesIterator {
return newPopulateWithDelGenericSeriesIterator(b.chunks, chks, intervals)
}
return true
}
return false
}
func (b *blockBaseSeriesSet) Err() error {
if b.err != nil {
return b.err
}
return b.p.Err()
}
func (b *blockBaseSeriesSet) Warnings() storage.Warnings { return nil }
// populateWithDelGenericSeriesIterator allows to iterate over given chunk metas. In each iteration it ensures
// that chunks are trimmed based on given tombstones interval if any.
//
// populateWithDelGenericSeriesIterator assumes that chunks that would be fully removed by intervals are filtered out in previous phase.
//
// On each iteration currChkMeta is available. If currDelIter is not nil, it means that chunk iterator in currChkMeta
// is invalid and chunk rewrite is needed, currDelIter should be used.
type populateWithDelGenericSeriesIterator struct {
chunks ChunkReader
// chks are expected to be sorted by minTime and should be related to the same, single series.
chks []chunks.Meta
i int
err error
bufIter *deletedIterator
intervals tombstones.Intervals
currDelIter chunkenc.Iterator
currChkMeta chunks.Meta
}
func newPopulateWithDelGenericSeriesIterator(
chunks ChunkReader,
chks []chunks.Meta,
intervals tombstones.Intervals,
) *populateWithDelGenericSeriesIterator {
return &populateWithDelGenericSeriesIterator{
chunks: chunks,
chks: chks,
i: -1,
bufIter: &deletedIterator{},
intervals: intervals,
}
}
func (p *populateWithDelGenericSeriesIterator) next() bool {
if p.err != nil || p.i >= len(p.chks)-1 {
return false
}
p.i++
p.currChkMeta = p.chks[p.i]
p.currChkMeta.Chunk, p.err = p.chunks.Chunk(p.currChkMeta.Ref)
if p.err != nil {
p.err = errors.Wrapf(p.err, "cannot populate chunk %d", p.currChkMeta.Ref)
return false
}
p.bufIter.intervals = p.bufIter.intervals[:0]
for _, interval := range p.intervals {
if p.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) {
p.bufIter.intervals = p.bufIter.intervals.Add(interval)
}
}
// Re-encode head chunks that are still open (being appended to) or
// outside the compacted MaxTime range.
// The chunk.Bytes() method is not safe for open chunks hence the re-encoding.
// This happens when snapshotting the head block or just fetching chunks from TSDB.
//
// TODO think how to avoid the typecasting to verify when it is head block.
_, isSafeChunk := p.currChkMeta.Chunk.(*safeChunk)
if len(p.bufIter.intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) {
// If there are no overlap with deletion intervals AND it's NOT an "open" head chunk, we can take chunk as it is.
p.currDelIter = nil
return true
}
// We don't want full chunk or it's potentially still opened, take just part of it.
p.bufIter.it = p.currChkMeta.Chunk.Iterator(nil)
p.currDelIter = p.bufIter
return true
}
func (p *populateWithDelGenericSeriesIterator) Err() error { return p.err }
func (p *populateWithDelGenericSeriesIterator) toSeriesIterator() chunkenc.Iterator {
return &populateWithDelSeriesIterator{populateWithDelGenericSeriesIterator: p}
}
func (p *populateWithDelGenericSeriesIterator) toChunkSeriesIterator() chunks.Iterator {
return &populateWithDelChunkSeriesIterator{populateWithDelGenericSeriesIterator: p}
}
// populateWithDelSeriesIterator allows to iterate over samples for the single series.
type populateWithDelSeriesIterator struct {
*populateWithDelGenericSeriesIterator
curr chunkenc.Iterator
}
func (p *populateWithDelSeriesIterator) Next() bool {
if p.curr != nil && p.curr.Next() {
return true
}
for p.next() {
if p.currDelIter != nil {
p.curr = p.currDelIter
} else {
p.curr = p.currChkMeta.Chunk.Iterator(nil)
}
if p.curr.Next() {
return true
}
}
return false
}
func (p *populateWithDelSeriesIterator) Seek(t int64) bool {
if p.curr != nil && p.curr.Seek(t) {
return true
}
for p.Next() {
if p.curr.Seek(t) {
return true
}
}
return false
}
func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.curr.At() }
func (p *populateWithDelSeriesIterator) Err() error {
if err := p.populateWithDelGenericSeriesIterator.Err(); err != nil {
return err
}
if p.curr != nil {
return p.curr.Err()
}
return nil
}
type populateWithDelChunkSeriesIterator struct {
*populateWithDelGenericSeriesIterator
curr chunks.Meta
}
func (p *populateWithDelChunkSeriesIterator) Next() bool {
if !p.next() {
return false
}
p.curr = p.currChkMeta
if p.currDelIter == nil {
return true
}
// Re-encode the chunk if iterator is provider. This means that it has some samples to be deleted or chunk is opened.
newChunk := chunkenc.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
p.err = err
return false
}
if !p.currDelIter.Next() {
if err := p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
return false
}
// Empty chunk, this should not happen, as we assume full deletions being filtered before this iterator.
p.err = errors.Wrap(err, "populateWithDelChunkSeriesIterator: unexpected empty chunk found while rewriting chunk")
return false
}
t, v := p.currDelIter.At()
p.curr.MinTime = t
app.Append(t, v)
for p.currDelIter.Next() {
t, v = p.currDelIter.At()
app.Append(t, v)
}
if err := p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
return false
}
p.curr.Chunk = newChunk
p.curr.MaxTime = t
return true
}
func (p *populateWithDelChunkSeriesIterator) At() chunks.Meta { return p.curr }
// blockSeriesSet allows to iterate over sorted, populated series with applied tombstones.
// Series with all deleted chunks are still present as Series with no samples.
// Samples from chunks are also trimmed to requested min and max time.
type blockSeriesSet struct {
blockBaseSeriesSet
}
func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.SeriesSet {
return &blockSeriesSet{
blockBaseSeriesSet{
index: i,
chunks: c,
tombstones: t,
p: p,
mint: mint,
maxt: maxt,
},
}
}
func (b *blockSeriesSet) At() storage.Series {
// At can be looped over before iterating, so save the current value locally.
currIterFn := b.currIterFn
return &storage.SeriesEntry{
Lset: b.currLabels,
SampleIteratorFn: func() chunkenc.Iterator {
return currIterFn().toSeriesIterator()
},
}
}
// blockChunkSeriesSet allows to iterate over sorted, populated series with applied tombstones.
// Series with all deleted chunks are still present as Labelled iterator with no chunks.
// Chunks are also trimmed to requested [min and max] (keeping samples with min and max timestamps).
type blockChunkSeriesSet struct {
blockBaseSeriesSet
}
func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.ChunkSeriesSet {
return &blockChunkSeriesSet{
blockBaseSeriesSet{
index: i,
chunks: c,
tombstones: t,
p: p,
mint: mint,
maxt: maxt,
},
}
}
func (b *blockChunkSeriesSet) At() storage.ChunkSeries {
// At can be looped over before iterating, so save the current value locally.
currIterFn := b.currIterFn
return &storage.ChunkSeriesEntry{
Lset: b.currLabels,
ChunkIteratorFn: func() chunks.Iterator {
return currIterFn().toChunkSeriesIterator()
},
}
}
func newMergedStringIter(a index.StringIter, b index.StringIter) index.StringIter {
return &mergedStringIter{a: a, b: b, aok: a.Next(), bok: b.Next()}
}
type mergedStringIter struct {
a index.StringIter
b index.StringIter
aok, bok bool
cur string
}
func (m *mergedStringIter) Next() bool {
if (!m.aok && !m.bok) || (m.Err() != nil) {
return false
}
if !m.aok {
m.cur = m.b.At()
m.bok = m.b.Next()
} else if !m.bok {
m.cur = m.a.At()
m.aok = m.a.Next()
} else if m.b.At() > m.a.At() {
m.cur = m.a.At()
m.aok = m.a.Next()
} else if m.a.At() > m.b.At() {
m.cur = m.b.At()
m.bok = m.b.Next()
} else { // Equal.
m.cur = m.b.At()
m.aok = m.a.Next()
m.bok = m.b.Next()
}
return true
}
func (m mergedStringIter) At() string { return m.cur }
func (m mergedStringIter) Err() error {
if m.a.Err() != nil {
return m.a.Err()
}
return m.b.Err()
}
// deletedIterator wraps an Iterator and makes sure any deleted metrics are not
// returned.
type deletedIterator struct {

View File

@ -2023,3 +2023,98 @@ func TestPostingsForMatcher(t *testing.T) {
}
}
}
func TestBlockBaseSeriesSet(t *testing.T) {
type refdSeries struct {
lset labels.Labels
chunks []chunks.Meta
ref uint64
}
cases := []struct {
series []refdSeries
// Postings should be in the sorted order of the series
postings []uint64
expIdxs []int
}{
{
series: []refdSeries{
{
lset: labels.New([]labels.Label{{Name: "a", Value: "a"}}...),
chunks: []chunks.Meta{
{Ref: 29}, {Ref: 45}, {Ref: 245}, {Ref: 123}, {Ref: 4232}, {Ref: 5344},
{Ref: 121},
},
ref: 12,
},
{
lset: labels.New([]labels.Label{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}}...),
chunks: []chunks.Meta{
{Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26},
},
ref: 10,
},
{
lset: labels.New([]labels.Label{{Name: "b", Value: "c"}}...),
chunks: []chunks.Meta{{Ref: 8282}},
ref: 1,
},
{
lset: labels.New([]labels.Label{{Name: "b", Value: "b"}}...),
chunks: []chunks.Meta{
{Ref: 829}, {Ref: 239}, {Ref: 2349}, {Ref: 659}, {Ref: 269},
},
ref: 108,
},
},
postings: []uint64{12, 13, 10, 108}, // 13 doesn't exist and should just be skipped over.
expIdxs: []int{0, 1, 3},
},
{
series: []refdSeries{
{
lset: labels.New([]labels.Label{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}}...),
chunks: []chunks.Meta{
{Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26},
},
ref: 10,
},
{
lset: labels.New([]labels.Label{{Name: "b", Value: "c"}}...),
chunks: []chunks.Meta{{Ref: 8282}},
ref: 3,
},
},
postings: []uint64{},
expIdxs: []int{},
},
}
for _, tc := range cases {
mi := newMockIndex()
for _, s := range tc.series {
testutil.Ok(t, mi.AddSeries(s.ref, s.lset, s.chunks...))
}
bcs := &blockBaseSeriesSet{
p: index.NewListPostings(tc.postings),
index: mi,
tombstones: tombstones.NewMemTombstones(),
}
i := 0
for bcs.Next() {
chks := bcs.currIterFn().chks
idx := tc.expIdxs[i]
testutil.Equals(t, tc.series[idx].lset, bcs.currLabels)
testutil.Equals(t, tc.series[idx].chunks, chks)
i++
}
testutil.Equals(t, len(tc.expIdxs), i)
testutil.Ok(t, bcs.Err())
}
}