storage: Added Chunks{Queryable/Querier/SeriesSet/Series/Iteratable. Added generic Merge{SeriesSet/Querier} implementation. (#7005)

* storage: Added Chunks{Queryable/Querier/SeriesSet/Series/Iteratable. Added generic Merge{SeriesSet/Querier} implementation.

## Rationales:

In many places (e.g. chunk Remote read, Thanos Receive fetching chunk from TSDB), we operate on encoded chunks not samples.
This means that we unnecessary decode/encode, wasting CPU, time and memory.
This PR adds chunk iterator interfaces and makes the merge code to be reused between both seriesSets

I will make the use of it in following PR inside tsdb itself. For now fanout implements it and mergers.

All merges now also allows passing series mergers. This opens doors for custom deduplications other than TSDB vertical ones (e.g. offline one we have in Thanos).

## Changes

* Added Chunk versions of all iterating methods. It all starts in Querier/ChunkQuerier. The plan is that
Storage will implement both chunked and samples.
* Added Seek to chunks.Iterator interface for iterating over chunks.
* NewMergeChunkQuerier was added; Both this and NewMergeQuerier are now using generigMergeQuerier to share the code. Generic code was added.
* Improved tests.
* Added some TODO for further simplifications in next PRs.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Addressed Brian's comments.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Moved s/Labeled/SeriesLabels as per Krasi suggestion.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Addressed Krasi's comments.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Second iteration of Krasi comments.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Another round of comments.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
pull/7046/head
Bartlomiej Plotka 2020-03-24 20:15:47 +00:00 committed by GitHub
parent fac7a4a050
commit d5c33877f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1268 additions and 355 deletions

View File

@ -136,6 +136,14 @@ type sample struct {
v float64
}
func (s sample) T() int64 {
return s.t
}
func (s sample) V() float64 {
return s.v
}
type sampleRing struct {
delta int64

View File

@ -15,11 +15,9 @@ package storage
import (
"math/rand"
"sort"
"testing"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/testutil"
)
@ -107,15 +105,15 @@ func TestBufferedSeriesIterator(t *testing.T) {
testutil.Equals(t, ev, v, "value mismatch")
}
it = NewBufferIterator(newListSeriesIterator([]sample{
{t: 1, v: 2},
{t: 2, v: 3},
{t: 3, v: 4},
{t: 4, v: 5},
{t: 5, v: 6},
{t: 99, v: 8},
{t: 100, v: 9},
{t: 101, v: 10},
it = NewBufferIterator(NewListSeriesIterator([]tsdbutil.Sample{
sample{t: 1, v: 2},
sample{t: 2, v: 3},
sample{t: 3, v: 4},
sample{t: 4, v: 5},
sample{t: 5, v: 6},
sample{t: 99, v: 8},
sample{t: 100, v: 9},
sample{t: 101, v: 10},
}), 2)
testutil.Assert(t, it.Seek(-123), "seek failed")
@ -189,61 +187,6 @@ func (m *mockSeriesIterator) At() (int64, float64) { return m.at() }
func (m *mockSeriesIterator) Next() bool { return m.next() }
func (m *mockSeriesIterator) Err() error { return m.err() }
type mockSeries struct {
labels func() labels.Labels
iterator func() chunkenc.Iterator
}
func newMockSeries(lset labels.Labels, samples []sample) Series {
return &mockSeries{
labels: func() labels.Labels {
return lset
},
iterator: func() chunkenc.Iterator {
return newListSeriesIterator(samples)
},
}
}
func (m *mockSeries) Labels() labels.Labels { return m.labels() }
func (m *mockSeries) Iterator() chunkenc.Iterator { return m.iterator() }
type listSeriesIterator struct {
list []sample
idx int
}
func newListSeriesIterator(list []sample) *listSeriesIterator {
return &listSeriesIterator{list: list, idx: -1}
}
func (it *listSeriesIterator) At() (int64, float64) {
s := it.list[it.idx]
return s.t, s.v
}
func (it *listSeriesIterator) Next() bool {
it.idx++
return it.idx < len(it.list)
}
func (it *listSeriesIterator) Seek(t int64) bool {
if it.idx == -1 {
it.idx = 0
}
// Do binary search between current position and end.
it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool {
s := it.list[i+it.idx]
return s.t >= t
})
return it.idx < len(it.list)
}
func (it *listSeriesIterator) Err() error {
return nil
}
type fakeSeriesIterator struct {
nsamples int64
step int64
@ -268,6 +211,4 @@ func (it *fakeSeriesIterator) Seek(t int64) bool {
return it.idx < it.nsamples
}
func (it *fakeSeriesIterator) Err() error {
return nil
}
func (it *fakeSeriesIterator) Err() error { return nil }

View File

@ -25,6 +25,8 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
)
type fanout struct {
@ -68,24 +70,27 @@ func (f *fanout) StartTime() (int64, error) {
func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
queriers := make([]Querier, 0, 1+len(f.secondaries))
// Add primary querier
// Add primary querier.
primaryQuerier, err := f.primary.Querier(ctx, mint, maxt)
if err != nil {
return nil, err
}
queriers = append(queriers, primaryQuerier)
// Add secondary queriers
// Add secondary queriers.
for _, storage := range f.secondaries {
querier, err := storage.Querier(ctx, mint, maxt)
if err != nil {
NewMergeQuerier(primaryQuerier, queriers).Close()
for _, q := range queriers {
// TODO(bwplotka): Log error.
_ = q.Close()
}
return nil, err
}
queriers = append(queriers, querier)
}
return NewMergeQuerier(primaryQuerier, queriers), nil
return NewMergeQuerier(primaryQuerier, queriers, ChainedSeriesMerge), nil
}
func (f *fanout) Appender() Appender {
@ -181,66 +186,96 @@ func (f *fanoutAppender) Rollback() (err error) {
return nil
}
// mergeQuerier implements Querier.
type mergeQuerier struct {
primaryQuerier Querier
queriers []Querier
type mergeGenericQuerier struct {
mergeFunc genericSeriesMergeFunc
failedQueriers map[Querier]struct{}
setQuerierMap map[SeriesSet]Querier
primaryQuerier genericQuerier
queriers []genericQuerier
failedQueriers map[genericQuerier]struct{}
setQuerierMap map[genericSeriesSet]genericQuerier
}
// NewMergeQuerier returns a new Querier that merges results of input queriers.
// NB NewMergeQuerier will return NoopQuerier if no queriers are passed to it,
// NewMergeQuerier returns a new Querier that merges results of chkQuerierSeries queriers.
// NewMergeQuerier will return NoopQuerier if no queriers are passed to it
// and will filter NoopQueriers from its arguments, in order to reduce overhead
// when only one querier is passed.
func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier {
filtered := make([]Querier, 0, len(queriers))
// The difference between primary and secondary is as follows: f the primaryQuerier returns an error, query fails.
// For secondaries it just return warnings.
func NewMergeQuerier(primaryQuerier Querier, queriers []Querier, mergeFunc VerticalSeriesMergeFunc) Querier {
filtered := make([]genericQuerier, 0, len(queriers))
for _, querier := range queriers {
if querier != NoopQuerier() {
filtered = append(filtered, querier)
if _, ok := querier.(noopQuerier); !ok && querier != nil {
filtered = append(filtered, newGenericQuerierFrom(querier))
}
}
setQuerierMap := make(map[SeriesSet]Querier)
failedQueriers := make(map[Querier]struct{})
if len(filtered) == 0 {
return primaryQuerier
}
switch len(filtered) {
case 0:
return NoopQuerier()
case 1:
return filtered[0]
default:
return &mergeQuerier{
primaryQuerier: primaryQuerier,
queriers: filtered,
failedQueriers: failedQueriers,
setQuerierMap: setQuerierMap,
if primaryQuerier == nil && len(filtered) == 1 {
return &querierAdapter{filtered[0]}
}
return &querierAdapter{&mergeGenericQuerier{
mergeFunc: (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge,
primaryQuerier: newGenericQuerierFrom(primaryQuerier),
queriers: filtered,
failedQueriers: make(map[genericQuerier]struct{}),
setQuerierMap: make(map[genericSeriesSet]genericQuerier),
}}
}
// NewMergeChunkQuerier returns a new ChunkQuerier that merges results of chkQuerierSeries chunk queriers.
// NewMergeChunkQuerier will return NoopChunkQuerier if no chunk queriers are passed to it,
// and will filter NoopQuerieNoopChunkQuerierrs from its arguments, in order to reduce overhead
// when only one chunk querier is passed.
func NewMergeChunkQuerier(primaryQuerier ChunkQuerier, queriers []ChunkQuerier, merger VerticalChunkSeriesMergerFunc) ChunkQuerier {
filtered := make([]genericQuerier, 0, len(queriers))
for _, querier := range queriers {
if _, ok := querier.(noopChunkQuerier); !ok && querier != nil {
filtered = append(filtered, newGenericQuerierFromChunk(querier))
}
}
if len(filtered) == 0 {
return primaryQuerier
}
if primaryQuerier == nil && len(filtered) == 1 {
return &chunkQuerierAdapter{filtered[0]}
}
return &chunkQuerierAdapter{&mergeGenericQuerier{
mergeFunc: (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergerFunc: merger}).Merge,
primaryQuerier: newGenericQuerierFromChunk(primaryQuerier),
queriers: filtered,
failedQueriers: make(map[genericQuerier]struct{}),
setQuerierMap: make(map[genericSeriesSet]genericQuerier),
}}
}
// Select returns a set of series that matches the given label matchers.
func (q *mergeQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (genericSeriesSet, Warnings, error) {
if len(q.queriers) == 1 {
return q.queriers[0].Select(sortSeries, hints, matchers...)
}
var (
seriesSets = make([]SeriesSet, 0, len(q.queriers))
seriesSets = make([]genericSeriesSet, 0, len(q.queriers))
warnings Warnings
priErr error
)
type queryResult struct {
qr Querier
set SeriesSet
qr genericQuerier
set genericSeriesSet
wrn Warnings
selectError error
}
queryResultChan := make(chan *queryResult)
for _, querier := range q.queriers {
go func(qr Querier) {
go func(qr genericQuerier) {
// We need to sort for NewMergeSeriesSet to work.
set, wrn, err := qr.Select(true, hints, matchers...)
queryResultChan <- &queryResult{qr: qr, set: set, wrn: wrn, selectError: err}
@ -267,16 +302,15 @@ func (q *mergeQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*
if priErr != nil {
return nil, nil, priErr
}
return NewMergeSeriesSet(seriesSets, q), warnings, nil
return newGenericMergeSeriesSet(seriesSets, q, q.mergeFunc), warnings, nil
}
// LabelValues returns all potential values for a label name.
func (q *mergeQuerier) LabelValues(name string) ([]string, Warnings, error) {
func (q *mergeGenericQuerier) LabelValues(name string) ([]string, Warnings, error) {
var results [][]string
var warnings Warnings
for _, querier := range q.queriers {
values, wrn, err := querier.LabelValues(name)
if wrn != nil {
warnings = append(warnings, wrn...)
}
@ -295,7 +329,7 @@ func (q *mergeQuerier) LabelValues(name string) ([]string, Warnings, error) {
return mergeStringSlices(results), warnings, nil
}
func (q *mergeQuerier) IsFailedSet(set SeriesSet) bool {
func (q *mergeGenericQuerier) IsFailedSet(set genericSeriesSet) bool {
_, isFailedQuerier := q.failedQueriers[q.setQuerierMap[set]]
return isFailedQuerier
}
@ -340,18 +374,19 @@ func mergeTwoStringSlices(a, b []string) []string {
}
// LabelNames returns all the unique label names present in the block in sorted order.
func (q *mergeQuerier) LabelNames() ([]string, Warnings, error) {
func (q *mergeGenericQuerier) LabelNames() ([]string, Warnings, error) {
labelNamesMap := make(map[string]struct{})
var warnings Warnings
for _, b := range q.queriers {
names, wrn, err := b.LabelNames()
for _, querier := range q.queriers {
names, wrn, err := querier.LabelNames()
if wrn != nil {
warnings = append(warnings, wrn...)
}
if err != nil {
// If the error source isn't the primary querier, return the error as a warning and continue.
if b != q.primaryQuerier {
q.failedQueriers[querier] = struct{}{}
// If the error source isn't the primaryQuerier querier, return the error as a warning and continue.
if querier != q.primaryQuerier {
warnings = append(warnings, err)
continue
} else {
@ -374,39 +409,70 @@ func (q *mergeQuerier) LabelNames() ([]string, Warnings, error) {
}
// Close releases the resources of the Querier.
func (q *mergeQuerier) Close() error {
// TODO return multiple errors?
var lastErr error
func (q *mergeGenericQuerier) Close() error {
var errs tsdb_errors.MultiError
for _, querier := range q.queriers {
if err := querier.Close(); err != nil {
lastErr = err
errs.Add(err)
}
}
return lastErr
return errs.Err()
}
// mergeSeriesSet implements SeriesSet
type mergeSeriesSet struct {
// genericMergeSeriesSet implements genericSeriesSet
type genericMergeSeriesSet struct {
currentLabels labels.Labels
currentSets []SeriesSet
heap seriesSetHeap
sets []SeriesSet
mergeFunc genericSeriesMergeFunc
querier *mergeQuerier
heap genericSeriesSetHeap
sets []genericSeriesSet
currentSets []genericSeriesSet
querier *mergeGenericQuerier
}
// NewMergeSeriesSet returns a new series set that merges (deduplicates)
// series returned by the input series sets when iterating.
// Each input series set must return its series in labels order, otherwise
// VerticalSeriesMergeFunc returns merged series implementation that merges series with same labels together.
// It has to handle time-overlapped series as well.
type VerticalSeriesMergeFunc func(...Series) Series
// VerticalChunkSeriesMergerFunc returns merged chunk series implementation that merges series with same labels together.
// It has to handle time-overlapped chunk series as well.
type VerticalChunkSeriesMergerFunc func(...ChunkSeries) ChunkSeries
// NewMergeSeriesSet returns a new SeriesSet that merges results of chkQuerierSeries SeriesSets.
func NewMergeSeriesSet(sets []SeriesSet, merger VerticalSeriesMergeFunc) SeriesSet {
genericSets := make([]genericSeriesSet, 0, len(sets))
for _, s := range sets {
genericSets = append(genericSets, &genericSeriesSetAdapter{s})
}
return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, nil, (&seriesMergerAdapter{VerticalSeriesMergeFunc: merger}).Merge)}
}
// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges results of chkQuerierSeries ChunkSeriesSets.
func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, merger VerticalChunkSeriesMergerFunc) ChunkSeriesSet {
genericSets := make([]genericSeriesSet, 0, len(sets))
for _, s := range sets {
genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s})
}
return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, nil, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergerFunc: merger}).Merge)}
}
// newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates)
// series returned by the chkQuerierSeries series sets when iterating.
// Each chkQuerierSeries series set must return its series in labels order, otherwise
// merged series set will be incorrect.
func NewMergeSeriesSet(sets []SeriesSet, querier *mergeQuerier) SeriesSet {
// Argument 'querier' is optional and can be nil. Pass Querier if you want to retry query in case of failing series set.
// Overlapped situations are merged using provided mergeFunc.
func newGenericMergeSeriesSet(sets []genericSeriesSet, querier *mergeGenericQuerier, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
if len(sets) == 1 {
return sets[0]
}
// Sets need to be pre-advanced, so we can introspect the label of the
// series under the cursor.
var h seriesSetHeap
var h genericSeriesSetHeap
for _, set := range sets {
if set == nil {
continue
@ -415,14 +481,15 @@ func NewMergeSeriesSet(sets []SeriesSet, querier *mergeQuerier) SeriesSet {
heap.Push(&h, set)
}
}
return &mergeSeriesSet{
heap: h,
sets: sets,
querier: querier,
return &genericMergeSeriesSet{
mergeFunc: mergeFunc,
heap: h,
sets: sets,
querier: querier,
}
}
func (c *mergeSeriesSet) Next() bool {
func (c *genericMergeSeriesSet) Next() bool {
// Run in a loop because the "next" series sets may not be valid anymore.
// If a remote querier fails, we discard all series sets from that querier.
// If, for the current label set, all the next series sets come from
@ -443,7 +510,7 @@ func (c *mergeSeriesSet) Next() bool {
c.currentSets = nil
c.currentLabels = c.heap[0].At().Labels()
for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
set := heap.Pop(&c.heap).(SeriesSet)
set := heap.Pop(&c.heap).(genericSeriesSet)
if c.querier != nil && c.querier.IsFailedSet(set) {
continue
}
@ -459,21 +526,18 @@ func (c *mergeSeriesSet) Next() bool {
return true
}
func (c *mergeSeriesSet) At() Series {
func (c *genericMergeSeriesSet) At() Labels {
if len(c.currentSets) == 1 {
return c.currentSets[0].At()
}
series := []Series{}
series := make([]Labels, 0, len(c.currentSets))
for _, seriesSet := range c.currentSets {
series = append(series, seriesSet.At())
}
return &mergeSeries{
labels: c.currentLabels,
series: series,
}
return c.mergeFunc(series...)
}
func (c *mergeSeriesSet) Err() error {
func (c *genericMergeSeriesSet) Err() error {
for _, set := range c.sets {
if err := set.Err(); err != nil {
return err
@ -482,21 +546,21 @@ func (c *mergeSeriesSet) Err() error {
return nil
}
type seriesSetHeap []SeriesSet
type genericSeriesSetHeap []genericSeriesSet
func (h seriesSetHeap) Len() int { return len(h) }
func (h seriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h genericSeriesSetHeap) Len() int { return len(h) }
func (h genericSeriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h seriesSetHeap) Less(i, j int) bool {
func (h genericSeriesSetHeap) Less(i, j int) bool {
a, b := h[i].At().Labels(), h[j].At().Labels()
return labels.Compare(a, b) < 0
}
func (h *seriesSetHeap) Push(x interface{}) {
*h = append(*h, x.(SeriesSet))
func (h *genericSeriesSetHeap) Push(x interface{}) {
*h = append(*h, x.(genericSeriesSet))
}
func (h *seriesSetHeap) Pop() interface{} {
func (h *genericSeriesSetHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
@ -504,37 +568,53 @@ func (h *seriesSetHeap) Pop() interface{} {
return x
}
type mergeSeries struct {
// ChainedSeriesMerge returns single series from many same series by chaining samples together.
// In case of the timestamp overlap, the first overlapped sample is kept and the rest samples with the same timestamps
// are dropped. We expect the same labels for each given series.
// TODO(bwplotka): This has the same logic as tsdb.verticalChainedSeries. Remove this in favor of ChainedSeriesMerge in next PRs.
func ChainedSeriesMerge(s ...Series) Series {
if len(s) == 0 {
return nil
}
return &chainSeries{
labels: s[0].Labels(),
series: s,
}
}
type chainSeries struct {
labels labels.Labels
series []Series
}
func (m *mergeSeries) Labels() labels.Labels {
func (m *chainSeries) Labels() labels.Labels {
return m.labels
}
func (m *mergeSeries) Iterator() chunkenc.Iterator {
func (m *chainSeries) Iterator() chunkenc.Iterator {
iterators := make([]chunkenc.Iterator, 0, len(m.series))
for _, s := range m.series {
iterators = append(iterators, s.Iterator())
}
return newMergeIterator(iterators)
return newChainSampleIterator(iterators)
}
type mergeIterator struct {
// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series.
// If one or more samples overlap, the first one is kept and all others with the same timestamp are dropped.
type chainSampleIterator struct {
iterators []chunkenc.Iterator
h seriesIteratorHeap
h samplesIteratorHeap
}
func newMergeIterator(iterators []chunkenc.Iterator) chunkenc.Iterator {
return &mergeIterator{
func newChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator {
return &chainSampleIterator{
iterators: iterators,
h: nil,
}
}
func (c *mergeIterator) Seek(t int64) bool {
c.h = seriesIteratorHeap{}
func (c *chainSampleIterator) Seek(t int64) bool {
c.h = samplesIteratorHeap{}
for _, iter := range c.iterators {
if iter.Seek(t) {
heap.Push(&c.h, iter)
@ -543,15 +623,15 @@ func (c *mergeIterator) Seek(t int64) bool {
return len(c.h) > 0
}
func (c *mergeIterator) At() (t int64, v float64) {
func (c *chainSampleIterator) At() (t int64, v float64) {
if len(c.h) == 0 {
panic("mergeIterator.At() called after .Next() returned false.")
panic("chainSampleIterator.At() called after .Next() returned false.")
}
return c.h[0].At()
}
func (c *mergeIterator) Next() bool {
func (c *chainSampleIterator) Next() bool {
if c.h == nil {
for _, iter := range c.iterators {
if iter.Next() {
@ -569,6 +649,7 @@ func (c *mergeIterator) Next() bool {
currt, _ := c.At()
for len(c.h) > 0 {
nextt, _ := c.h[0].At()
// All but one of the overlapping samples will be dropped.
if nextt != currt {
break
}
@ -582,7 +663,7 @@ func (c *mergeIterator) Next() bool {
return len(c.h) > 0
}
func (c *mergeIterator) Err() error {
func (c *chainSampleIterator) Err() error {
for _, iter := range c.iterators {
if err := iter.Err(); err != nil {
return err
@ -591,22 +672,163 @@ func (c *mergeIterator) Err() error {
return nil
}
type seriesIteratorHeap []chunkenc.Iterator
type samplesIteratorHeap []chunkenc.Iterator
func (h seriesIteratorHeap) Len() int { return len(h) }
func (h seriesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h samplesIteratorHeap) Len() int { return len(h) }
func (h samplesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h seriesIteratorHeap) Less(i, j int) bool {
func (h samplesIteratorHeap) Less(i, j int) bool {
at, _ := h[i].At()
bt, _ := h[j].At()
return at < bt
}
func (h *seriesIteratorHeap) Push(x interface{}) {
func (h *samplesIteratorHeap) Push(x interface{}) {
*h = append(*h, x.(chunkenc.Iterator))
}
func (h *seriesIteratorHeap) Pop() interface{} {
func (h *samplesIteratorHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// VerticalChunkMergeFunc represents a function that merges multiple time overlapping chunks.
// Passed chunks:
// * have to be sorted by MinTime.
// * have to be part of exactly the same timeseries.
// * have to be populated.
type VerticalChunksMergeFunc func(chks ...chunks.Meta) chunks.Iterator
type verticalChunkSeriesMerger struct {
verticalChunksMerger VerticalChunksMergeFunc
labels labels.Labels
series []ChunkSeries
}
// NewVerticalChunkSeriesMerger returns VerticalChunkSeriesMerger that merges the same chunk series into one or more chunks.
// In case of the chunk overlap, given VerticalChunkMergeFunc will be used.
// It expects the same labels for each given series.
func NewVerticalChunkSeriesMerger(chunkMerger VerticalChunksMergeFunc) VerticalChunkSeriesMergerFunc {
return func(s ...ChunkSeries) ChunkSeries {
if len(s) == 0 {
return nil
}
return &verticalChunkSeriesMerger{
verticalChunksMerger: chunkMerger,
labels: s[0].Labels(),
series: s,
}
}
}
func (s *verticalChunkSeriesMerger) Labels() labels.Labels {
return s.labels
}
func (s *verticalChunkSeriesMerger) Iterator() chunks.Iterator {
iterators := make([]chunks.Iterator, 0, len(s.series))
for _, series := range s.series {
iterators = append(iterators, series.Iterator())
}
return &chainChunkIterator{
overlappedChunksMerger: s.verticalChunksMerger,
iterators: iterators,
h: nil,
}
}
// chainChunkIterator is responsible to chain chunks from different iterators of same time series.
// If they are time overlapping overlappedChunksMerger will be used.
type chainChunkIterator struct {
overlappedChunksMerger VerticalChunksMergeFunc
iterators []chunks.Iterator
h chunkIteratorHeap
}
func (c *chainChunkIterator) At() chunks.Meta {
if len(c.h) == 0 {
panic("chainChunkIterator.At() called after .Next() returned false.")
}
return c.h[0].At()
}
func (c *chainChunkIterator) Next() bool {
if c.h == nil {
for _, iter := range c.iterators {
if iter.Next() {
heap.Push(&c.h, iter)
}
}
return len(c.h) > 0
}
if len(c.h) == 0 {
return false
}
// Detect the shortest chain of time-overlapped chunks.
last := c.At()
var overlapped []chunks.Meta
for {
iter := heap.Pop(&c.h).(chunks.Iterator)
if iter.Next() {
heap.Push(&c.h, iter)
}
if len(c.h) == 0 {
break
}
next := c.At()
if next.MinTime > last.MaxTime {
// No overlap with last one.
break
}
overlapped = append(overlapped, last)
last = next
}
if len(overlapped) > 0 {
heap.Push(&c.h, c.overlappedChunksMerger(append(overlapped, c.At())...))
return true
}
return len(c.h) > 0
}
func (c *chainChunkIterator) Err() error {
for _, iter := range c.iterators {
if err := iter.Err(); err != nil {
return err
}
}
return nil
}
type chunkIteratorHeap []chunks.Iterator
func (h chunkIteratorHeap) Len() int { return len(h) }
func (h chunkIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h chunkIteratorHeap) Less(i, j int) bool {
at := h[i].At()
bt := h[j].At()
if at.MinTime == bt.MinTime {
return at.MaxTime < bt.MaxTime
}
return at.MinTime < bt.MinTime
}
func (h *chunkIteratorHeap) Push(x interface{}) {
*h = append(*h, x.(chunks.Iterator))
}
func (h *chunkIteratorHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]

View File

@ -16,10 +16,12 @@ package storage
import (
"fmt"
"math"
"sort"
"testing"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/testutil"
)
@ -52,169 +54,361 @@ func TestMergeTwoStringSlices(t *testing.T) {
}
}
func TestMergeSeriesSet(t *testing.T) {
func TestMergeQuerierWithChainMerger(t *testing.T) {
for _, tc := range []struct {
input []SeriesSet
name string
querierSeries [][]Series
extraQueriers []Querier
expected SeriesSet
}{
{
input: []SeriesSet{newMockSeriesSet()},
expected: newMockSeriesSet(),
name: "1 querier with no series",
querierSeries: [][]Series{{}},
expected: NewMockSeriesSet(),
},
{
input: []SeriesSet{newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}),
)},
expected: newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}),
),
name: "many queriers with no series",
querierSeries: [][]Series{{}, {}, {}, {}, {}, {}, {}},
expected: NewMockSeriesSet(),
},
{
input: []SeriesSet{newMockSeriesSet(
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}),
), newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}),
)},
expected: newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}),
),
},
{
input: []SeriesSet{newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}),
), newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{3, 3}, {4, 4}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{2, 2}, {3, 3}}),
)},
expected: newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}, {3, 3}, {4, 4}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}}),
name: "1 querier, two series",
querierSeries: [][]Series{{
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
}},
expected: NewMockSeriesSet(
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
),
},
{
input: []SeriesSet{newMockSeriesSet(
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, math.NaN()}}),
), newMockSeriesSet(
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, math.NaN()}}),
)},
expected: newMockSeriesSet(
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, math.NaN()}}),
name: "2 queriers, 1 different series each",
querierSeries: [][]Series{{
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
}, {
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
}},
expected: NewMockSeriesSet(
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
),
},
{
name: "2 time unsorted queriers, 2 series each",
querierSeries: [][]Series{{
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}, sample{6, 6}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}, sample{4, 4}}),
}},
expected: NewMockSeriesSet(
NewListSeries(
labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}, sample{6, 6}},
),
NewListSeries(
labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}},
),
),
},
{
name: "5 queriers, only 2 queriers have 2 time unsorted series each",
querierSeries: [][]Series{{}, {}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}, sample{6, 6}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}, sample{4, 4}}),
}, {}},
expected: NewMockSeriesSet(
NewListSeries(
labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}, sample{6, 6}},
),
NewListSeries(
labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}},
),
),
},
{
name: "2 queriers, only 2 queriers have 2 time unsorted series each, with 3 noop and one nil querier together",
querierSeries: [][]Series{{}, {}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}, sample{6, 6}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}, sample{4, 4}}),
}, {}},
extraQueriers: []Querier{NoopQuerier(), NoopQuerier(), nil, NoopQuerier()},
expected: NewMockSeriesSet(
NewListSeries(
labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}, sample{6, 6}},
),
NewListSeries(
labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}},
),
),
},
{
name: "2 queriers, with 2 series, one is overlapping",
querierSeries: [][]Series{{}, {}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 21}, sample{3, 31}, sample{5, 5}, sample{6, 6}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
}, {
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 22}, sample{3, 32}}),
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}, sample{4, 4}}),
}, {}},
expected: NewMockSeriesSet(
NewListSeries(
labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 21}, sample{3, 31}, sample{5, 5}, sample{6, 6}},
),
NewListSeries(
labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}},
),
),
},
{
name: "2 queries, one with NaN samples series",
querierSeries: [][]Series{{
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}}),
}, {
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{1, 1}}),
}},
expected: NewMockSeriesSet(
NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}, sample{1, 1}}),
),
},
} {
merged := NewMergeSeriesSet(tc.input, nil)
for merged.Next() {
testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true")
actualSeries := merged.At()
expectedSeries := tc.expected.At()
testutil.Equals(t, expectedSeries.Labels(), actualSeries.Labels())
testutil.Equals(t, drainSamples(expectedSeries.Iterator()), drainSamples(actualSeries.Iterator()))
}
testutil.Assert(t, !tc.expected.Next(), "Expected Next() to be false")
t.Run(tc.name, func(t *testing.T) {
var qs []Querier
for _, in := range tc.querierSeries {
qs = append(qs, &mockQuerier{toReturn: in})
}
qs = append(qs, tc.extraQueriers...)
merged, _, _ := NewMergeQuerier(qs[0], qs, ChainedSeriesMerge).Select(false, nil)
for merged.Next() {
testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true")
actualSeries := merged.At()
expectedSeries := tc.expected.At()
testutil.Equals(t, expectedSeries.Labels(), actualSeries.Labels())
expSmpl, expErr := ExpandSamples(expectedSeries.Iterator())
actSmpl, actErr := ExpandSamples(actualSeries.Iterator())
testutil.Equals(t, expErr, actErr)
testutil.Equals(t, expSmpl, actSmpl)
}
testutil.Ok(t, merged.Err())
testutil.Assert(t, !tc.expected.Next(), "Expected Next() to be false")
})
}
}
func TestMergeIterator(t *testing.T) {
func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
for _, tc := range []struct {
input []chunkenc.Iterator
expected []sample
name string
chkQuerierSeries [][]ChunkSeries
extraQueriers []ChunkQuerier
expected ChunkSeriesSet
}{
{
input: []chunkenc.Iterator{
newListSeriesIterator([]sample{{0, 0}, {1, 1}}),
},
expected: []sample{{0, 0}, {1, 1}},
name: "one querier with no series",
chkQuerierSeries: [][]ChunkSeries{{}},
expected: NewMockChunkSeriesSet(),
},
{
input: []chunkenc.Iterator{
newListSeriesIterator([]sample{{0, 0}, {1, 1}}),
newListSeriesIterator([]sample{{2, 2}, {3, 3}}),
},
expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}},
name: "many queriers with no series",
chkQuerierSeries: [][]ChunkSeries{{}, {}, {}, {}, {}, {}, {}},
expected: NewMockChunkSeriesSet(),
},
{
input: []chunkenc.Iterator{
newListSeriesIterator([]sample{{0, 0}, {3, 3}}),
newListSeriesIterator([]sample{{1, 1}, {4, 4}}),
newListSeriesIterator([]sample{{2, 2}, {5, 5}}),
},
expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}},
name: "one querier, two series",
chkQuerierSeries: [][]ChunkSeries{{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}},
expected: NewMockChunkSeriesSet(
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
),
},
{
input: []chunkenc.Iterator{
newListSeriesIterator([]sample{{0, 0}, {1, 1}}),
newListSeriesIterator([]sample{{0, 0}, {2, 2}}),
newListSeriesIterator([]sample{{2, 2}, {3, 3}}),
},
expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}},
name: "two queriers, one different series each",
chkQuerierSeries: [][]ChunkSeries{{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
}, {
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}},
expected: NewMockChunkSeriesSet(
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
),
},
{
name: "two queriers, two not in time order series each",
chkQuerierSeries: [][]ChunkSeries{{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}, {
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}}, []tsdbutil.Sample{sample{4, 4}}),
}},
expected: NewMockChunkSeriesSet(
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{5, 5}},
[]tsdbutil.Sample{sample{6, 6}},
),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}},
[]tsdbutil.Sample{sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{4, 4}},
),
),
},
{
name: "five queriers, only two have two not in time order series each",
chkQuerierSeries: [][]ChunkSeries{{}, {}, {
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}, {
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}}, []tsdbutil.Sample{sample{4, 4}}),
}, {}},
expected: NewMockChunkSeriesSet(
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{5, 5}},
[]tsdbutil.Sample{sample{6, 6}},
),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}},
[]tsdbutil.Sample{sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{4, 4}},
),
),
},
{
name: "two queriers, with two not in time order series each, with 3 noop queries and one nil together",
chkQuerierSeries: [][]ChunkSeries{{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}, {
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}}, []tsdbutil.Sample{sample{4, 4}}),
}},
extraQueriers: []ChunkQuerier{NoopChunkedQuerier(), NoopChunkedQuerier(), nil, NoopChunkedQuerier()},
expected: NewMockChunkSeriesSet(
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{5, 5}},
[]tsdbutil.Sample{sample{6, 6}},
),
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}},
[]tsdbutil.Sample{sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{4, 4}},
),
),
},
{
name: "two queries, one with NaN samples series",
chkQuerierSeries: [][]ChunkSeries{{
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}}),
}, {
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{1, 1}}),
}},
expected: NewMockChunkSeriesSet(
NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}}, []tsdbutil.Sample{sample{1, 1}}),
),
},
} {
merged := newMergeIterator(tc.input)
actual := drainSamples(merged)
testutil.Equals(t, tc.expected, actual)
t.Run(tc.name, func(t *testing.T) {
var qs []ChunkQuerier
for _, in := range tc.chkQuerierSeries {
qs = append(qs, &mockChunkQurier{toReturn: in})
}
qs = append(qs, tc.extraQueriers...)
merged, _, _ := NewMergeChunkQuerier(qs[0], qs, NewVerticalChunkSeriesMerger(nil)).Select(false, nil)
for merged.Next() {
testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true")
actualSeries := merged.At()
expectedSeries := tc.expected.At()
testutil.Equals(t, expectedSeries.Labels(), actualSeries.Labels())
expChks, expErr := ExpandChunks(expectedSeries.Iterator())
actChks, actErr := ExpandChunks(actualSeries.Iterator())
testutil.Equals(t, expErr, actErr)
testutil.Equals(t, expChks, actChks)
}
testutil.Ok(t, merged.Err())
testutil.Assert(t, !tc.expected.Next(), "Expected Next() to be false")
})
}
}
func TestMergeIteratorSeek(t *testing.T) {
for _, tc := range []struct {
input []chunkenc.Iterator
seek int64
expected []sample
}{
{
input: []chunkenc.Iterator{
newListSeriesIterator([]sample{{0, 0}, {1, 1}, {2, 2}}),
},
seek: 1,
expected: []sample{{1, 1}, {2, 2}},
},
{
input: []chunkenc.Iterator{
newListSeriesIterator([]sample{{0, 0}, {1, 1}}),
newListSeriesIterator([]sample{{2, 2}, {3, 3}}),
},
seek: 2,
expected: []sample{{2, 2}, {3, 3}},
},
{
input: []chunkenc.Iterator{
newListSeriesIterator([]sample{{0, 0}, {3, 3}}),
newListSeriesIterator([]sample{{1, 1}, {4, 4}}),
newListSeriesIterator([]sample{{2, 2}, {5, 5}}),
},
seek: 2,
expected: []sample{{2, 2}, {3, 3}, {4, 4}, {5, 5}},
},
} {
merged := newMergeIterator(tc.input)
actual := []sample{}
if merged.Seek(tc.seek) {
t, v := merged.At()
actual = append(actual, sample{t, v})
}
actual = append(actual, drainSamples(merged)...)
testutil.Equals(t, tc.expected, actual)
}
type mockQuerier struct {
baseQuerier
toReturn []Series
}
func drainSamples(iter chunkenc.Iterator) []sample {
result := []sample{}
for iter.Next() {
t, v := iter.At()
// NaNs can't be compared normally, so substitute for another value.
if math.IsNaN(v) {
v = -42
}
result = append(result, sample{t, v})
type seriesByLabel []Series
func (a seriesByLabel) Len() int { return len(a) }
func (a seriesByLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a seriesByLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 }
func (m *mockQuerier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) (SeriesSet, Warnings, error) {
cpy := make([]Series, len(m.toReturn))
copy(cpy, m.toReturn)
if sortSeries {
sort.Sort(seriesByLabel(cpy))
}
return result
return NewMockSeriesSet(cpy...), nil, nil
}
type mockChunkQurier struct {
baseQuerier
toReturn []ChunkSeries
}
type chunkSeriesByLabel []ChunkSeries
func (a chunkSeriesByLabel) Len() int { return len(a) }
func (a chunkSeriesByLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a chunkSeriesByLabel) Less(i, j int) bool {
return labels.Compare(a[i].Labels(), a[j].Labels()) < 0
}
func (m *mockChunkQurier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) (ChunkSeriesSet, Warnings, error) {
cpy := make([]ChunkSeries, len(m.toReturn))
copy(cpy, m.toReturn)
if sortSeries {
sort.Sort(chunkSeriesByLabel(cpy))
}
return NewMockChunkSeriesSet(cpy...), nil, nil
}
type mockSeriesSet struct {
@ -222,7 +416,7 @@ type mockSeriesSet struct {
series []Series
}
func newMockSeriesSet(series ...Series) SeriesSet {
func NewMockSeriesSet(series ...Series) SeriesSet {
return &mockSeriesSet{
idx: -1,
series: series,
@ -234,41 +428,151 @@ func (m *mockSeriesSet) Next() bool {
return m.idx < len(m.series)
}
func (m *mockSeriesSet) At() Series {
return m.series[m.idx]
func (m *mockSeriesSet) At() Series { return m.series[m.idx] }
func (m *mockSeriesSet) Err() error { return nil }
type mockChunkSeriesSet struct {
idx int
series []ChunkSeries
}
func (m *mockSeriesSet) Err() error {
return nil
func NewMockChunkSeriesSet(series ...ChunkSeries) ChunkSeriesSet {
return &mockChunkSeriesSet{
idx: -1,
series: series,
}
}
var result []sample
func (m *mockChunkSeriesSet) Next() bool {
m.idx++
return m.idx < len(m.series)
}
func (m *mockChunkSeriesSet) At() ChunkSeries { return m.series[m.idx] }
func (m *mockChunkSeriesSet) Err() error { return nil }
func TestChainSampleIterator(t *testing.T) {
for _, tc := range []struct {
input []chunkenc.Iterator
expected []tsdbutil.Sample
}{
{
input: []chunkenc.Iterator{
NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}}),
},
expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}},
},
{
input: []chunkenc.Iterator{
NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}}),
NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{3, 3}}),
},
expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}},
},
{
input: []chunkenc.Iterator{
NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{3, 3}}),
NewListSeriesIterator([]tsdbutil.Sample{sample{1, 1}, sample{4, 4}}),
NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{5, 5}}),
},
expected: []tsdbutil.Sample{
sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}},
},
// Overlap.
{
input: []chunkenc.Iterator{
NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}}),
NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{2, 2}}),
NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{3, 3}}),
NewListSeriesIterator([]tsdbutil.Sample{}),
NewListSeriesIterator([]tsdbutil.Sample{}),
NewListSeriesIterator([]tsdbutil.Sample{}),
},
expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}},
},
} {
merged := newChainSampleIterator(tc.input)
actual, err := ExpandSamples(merged)
testutil.Ok(t, err)
testutil.Equals(t, tc.expected, actual)
}
}
func TestChainSampleIteratorSeek(t *testing.T) {
for _, tc := range []struct {
input []chunkenc.Iterator
seek int64
expected []tsdbutil.Sample
}{
{
input: []chunkenc.Iterator{
NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
},
seek: 1,
expected: []tsdbutil.Sample{sample{1, 1}, sample{2, 2}},
},
{
input: []chunkenc.Iterator{
NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}}),
NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{3, 3}}),
},
seek: 2,
expected: []tsdbutil.Sample{sample{2, 2}, sample{3, 3}},
},
{
input: []chunkenc.Iterator{
NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{3, 3}}),
NewListSeriesIterator([]tsdbutil.Sample{sample{1, 1}, sample{4, 4}}),
NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{5, 5}}),
},
seek: 2,
expected: []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}},
},
} {
merged := newChainSampleIterator(tc.input)
actual := []tsdbutil.Sample{}
if merged.Seek(tc.seek) {
t, v := merged.At()
actual = append(actual, sample{t, v})
}
s, err := ExpandSamples(merged)
testutil.Ok(t, err)
actual = append(actual, s...)
testutil.Equals(t, tc.expected, actual)
}
}
var result []tsdbutil.Sample
func makeSeriesSet(numSeries, numSamples int) SeriesSet {
series := []Series{}
for j := 0; j < numSeries; j++ {
labels := labels.Labels{{Name: "foo", Value: fmt.Sprintf("bar%d", j)}}
samples := []sample{}
samples := []tsdbutil.Sample{}
for k := 0; k < numSamples; k++ {
samples = append(samples, sample{t: int64(k), v: float64(k)})
}
series = append(series, newMockSeries(labels, samples))
series = append(series, NewListSeries(labels, samples))
}
return newMockSeriesSet(series...)
return NewMockSeriesSet(series...)
}
func makeMergeSeriesSet(numSeriesSets, numSeries, numSamples int) SeriesSet {
seriesSets := []SeriesSet{}
seriesSets := []genericSeriesSet{}
for i := 0; i < numSeriesSets; i++ {
seriesSets = append(seriesSets, makeSeriesSet(numSeries, numSamples))
seriesSets = append(seriesSets, &genericSeriesSetAdapter{makeSeriesSet(numSeries, numSamples)})
}
return NewMergeSeriesSet(seriesSets, nil)
return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, nil, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)}
}
func benchmarkDrain(seriesSet SeriesSet, b *testing.B) {
var err error
for n := 0; n < b.N; n++ {
for seriesSet.Next() {
result = drainSamples(seriesSet.At().Iterator())
result, err = ExpandSamples(seriesSet.At().Iterator())
testutil.Ok(b, err)
}
}
}

133
storage/generic.go Normal file
View File

@ -0,0 +1,133 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file holds boilerplate adapters for generic MergeSeriesSet and MergeQuerier functions, so we can have one optimized
// solution that works for both ChunkSeriesSet as well as SeriesSet.
package storage
import "github.com/prometheus/prometheus/pkg/labels"
type genericQuerier interface {
baseQuerier
Select(bool, *SelectHints, ...*labels.Matcher) (genericSeriesSet, Warnings, error)
}
type genericSeriesSet interface {
Next() bool
At() Labels
Err() error
}
type genericSeriesMergeFunc func(...Labels) Labels
type genericSeriesSetAdapter struct {
SeriesSet
}
func (a *genericSeriesSetAdapter) At() Labels {
return a.SeriesSet.At()
}
type genericChunkSeriesSetAdapter struct {
ChunkSeriesSet
}
func (a *genericChunkSeriesSetAdapter) At() Labels {
return a.ChunkSeriesSet.At()
}
type genericQuerierAdapter struct {
baseQuerier
// One-of. If both are set, Querier will be used.
q Querier
cq ChunkQuerier
}
func (q *genericQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (genericSeriesSet, Warnings, error) {
if q.q != nil {
s, w, err := q.q.Select(sortSeries, hints, matchers...)
return &genericSeriesSetAdapter{s}, w, err
}
s, w, err := q.cq.Select(sortSeries, hints, matchers...)
return &genericChunkSeriesSetAdapter{s}, w, err
}
func newGenericQuerierFrom(q Querier) genericQuerier {
return &genericQuerierAdapter{baseQuerier: q, q: q}
}
func newGenericQuerierFromChunk(cq ChunkQuerier) genericQuerier {
return &genericQuerierAdapter{baseQuerier: cq, cq: cq}
}
type querierAdapter struct {
genericQuerier
}
type seriesSetAdapter struct {
genericSeriesSet
}
func (a *seriesSetAdapter) At() Series {
return a.genericSeriesSet.At().(Series)
}
func (q *querierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
s, w, err := q.genericQuerier.Select(sortSeries, hints, matchers...)
return &seriesSetAdapter{s}, w, err
}
type chunkQuerierAdapter struct {
genericQuerier
}
type chunkSeriesSetAdapter struct {
genericSeriesSet
}
func (a *chunkSeriesSetAdapter) At() ChunkSeries {
return a.genericSeriesSet.At().(ChunkSeries)
}
func (q *chunkQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (ChunkSeriesSet, Warnings, error) {
s, w, err := q.genericQuerier.Select(sortSeries, hints, matchers...)
return &chunkSeriesSetAdapter{s}, w, err
}
type seriesMergerAdapter struct {
VerticalSeriesMergeFunc
buf []Series
}
func (a *seriesMergerAdapter) Merge(s ...Labels) Labels {
a.buf = a.buf[:0]
for _, ser := range s {
a.buf = append(a.buf, ser.(Series))
}
return a.VerticalSeriesMergeFunc(a.buf...)
}
type chunkSeriesMergerAdapter struct {
VerticalChunkSeriesMergerFunc
buf []ChunkSeries
}
func (a *chunkSeriesMergerAdapter) Merge(s ...Labels) Labels {
a.buf = a.buf[:0]
for _, ser := range s {
a.buf = append(a.buf, ser.(ChunkSeries))
}
return a.VerticalChunkSeriesMergerFunc(a.buf...)
}

View File

@ -39,6 +39,7 @@ type Appendable interface {
// Storage ingests and manages samples, along with various indexes. All methods
// are goroutine-safe. Storage implements storage.SampleAppender.
// TODO(bwplotka): Add ChunkQueryable to Storage in next PR.
type Storage interface {
Queryable
Appendable
@ -51,19 +52,40 @@ type Storage interface {
}
// A Queryable handles queries against a storage.
// Use it when you need to have access to all samples without chunk encoding abstraction e.g promQL.
type Queryable interface {
// Querier returns a new Querier on the storage.
Querier(ctx context.Context, mint, maxt int64) (Querier, error)
}
// Querier provides querying access over time series data of a fixed
// time range.
// Querier provides querying access over time series data of a fixed time range.
type Querier interface {
baseQuerier
// Select returns a set of series that matches the given label matchers.
// Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
// It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all.
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error)
}
// A ChunkQueryable handles queries against a storage.
// Use it when you need to have access to samples in encoded format.
type ChunkQueryable interface {
// ChunkQuerier returns a new ChunkQuerier on the storage.
ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQuerier, Warnings, error)
}
// ChunkQuerier provides querying access over time series data of a fixed time range.
type ChunkQuerier interface {
baseQuerier
// Select returns a set of series that matches the given label matchers.
// Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
// It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all.
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (ChunkSeriesSet, Warnings, error)
}
type baseQuerier interface {
// LabelValues returns all potential values for a label name.
// It is not safe to use the strings beyond the lifefime of the querier.
LabelValues(name string) ([]string, Warnings, error)
@ -149,19 +171,43 @@ func (s errSeriesSet) Next() bool { return false }
func (s errSeriesSet) At() Series { return nil }
func (s errSeriesSet) Err() error { return s.err }
// Series represents a single time series.
// Series exposes a single time series and allows iterating over samples.
type Series interface {
// Labels returns the complete set of labels identifying the series.
Labels() labels.Labels
Labels
SampleIteratable
}
// ChunkSeriesSet contains a set of chunked series.
type ChunkSeriesSet interface {
Next() bool
At() ChunkSeries
Err() error
}
// ChunkSeries exposes a single time series and allows iterating over chunks.
type ChunkSeries interface {
Labels
ChunkIteratable
}
// Labels represents an item that has labels e.g. time series.
type Labels interface {
// Labels returns the complete set of labels. For series it means all labels identifying the series.
Labels() labels.Labels
}
type SampleIteratable interface {
// Iterator returns a new iterator of the data of the series.
Iterator() chunkenc.Iterator
}
// ChunkSeriesSet exposes the chunks and intervals of a series instead of the
// actual series itself.
// TODO(bwplotka): Move it to Series like Iterator that iterates over chunks and avoiding loading all of them at once.
type ChunkSeriesSet interface {
type ChunkIteratable interface {
// ChunkIterator returns a new iterator that iterates over non-overlapping chunks of the series.
Iterator() chunks.Iterator
}
// TODO(bwplotka): Remove in next Pr.
type DeprecatedChunkSeriesSet interface {
Next() bool
At() (labels.Labels, []chunks.Meta, tombstones.Intervals)
Err() error

View File

@ -28,7 +28,7 @@ func (noopQuerier) Select(bool, *SelectHints, ...*labels.Matcher) (SeriesSet, Wa
return NoopSeriesSet(), nil, nil
}
func (noopQuerier) LabelValues(name string) ([]string, Warnings, error) {
func (noopQuerier) LabelValues(string) ([]string, Warnings, error) {
return nil, nil, nil
}
@ -40,6 +40,29 @@ func (noopQuerier) Close() error {
return nil
}
type noopChunkQuerier struct{}
// NoopChunkedQuerier is a ChunkQuerier that does nothing.
func NoopChunkedQuerier() ChunkQuerier {
return noopChunkQuerier{}
}
func (noopChunkQuerier) Select(bool, *SelectHints, ...*labels.Matcher) (ChunkSeriesSet, Warnings, error) {
return NoopChunkedSeriesSet(), nil, nil
}
func (noopChunkQuerier) LabelValues(string) ([]string, Warnings, error) {
return nil, nil, nil
}
func (noopChunkQuerier) LabelNames() ([]string, Warnings, error) {
return nil, nil, nil
}
func (noopChunkQuerier) Close() error {
return nil
}
type noopSeriesSet struct{}
// NoopSeriesSet is a SeriesSet that does nothing.
@ -52,3 +75,16 @@ func (noopSeriesSet) Next() bool { return false }
func (noopSeriesSet) At() Series { return nil }
func (noopSeriesSet) Err() error { return nil }
type noopChunkedSeriesSet struct{}
// NoopChunkedSeriesSet is a ChunkSeriesSet that does nothing.
func NoopChunkedSeriesSet() ChunkSeriesSet {
return noopChunkedSeriesSet{}
}
func (noopChunkedSeriesSet) Next() bool { return false }
func (noopChunkedSeriesSet) At() ChunkSeries { return nil }
func (noopChunkedSeriesSet) Err() error { return nil }

View File

@ -145,7 +145,7 @@ func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querie
}
queriers = append(queriers, q)
}
return storage.NewMergeQuerier(nil, queriers), nil
return storage.NewMergeQuerier(nil, queriers, storage.ChainedSeriesMerge), nil
}
// Appender implements storage.Storage.

138
storage/series.go Normal file
View File

@ -0,0 +1,138 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"sort"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
)
type listSeriesIterator struct {
samples []tsdbutil.Sample
idx int
}
// NewListSeriesIterator returns listSeriesIterator that allows to iterate over provided samples. Does not handle overlaps.
func NewListSeriesIterator(samples []tsdbutil.Sample) chunkenc.Iterator {
return &listSeriesIterator{samples: samples, idx: -1}
}
func (it *listSeriesIterator) At() (int64, float64) {
s := it.samples[it.idx]
return s.T(), s.V()
}
func (it *listSeriesIterator) Next() bool {
it.idx++
return it.idx < len(it.samples)
}
func (it *listSeriesIterator) Seek(t int64) bool {
if it.idx == -1 {
it.idx = 0
}
// Do binary search between current position and end.
it.idx = sort.Search(len(it.samples)-it.idx, func(i int) bool {
s := it.samples[i+it.idx]
return s.T() >= t
})
return it.idx < len(it.samples)
}
func (it *listSeriesIterator) Err() error { return nil }
type listChunkSeriesIterator struct {
chks []chunks.Meta
idx int
}
// NewListChunkSeriesIterator returns listChunkSeriesIterator that allows to iterate over provided chunks. Does not handle overlaps.
func NewListChunkSeriesIterator(chks ...chunks.Meta) chunks.Iterator {
return &listChunkSeriesIterator{chks: chks, idx: -1}
}
func (it *listChunkSeriesIterator) At() chunks.Meta {
return it.chks[it.idx]
}
func (it *listChunkSeriesIterator) Next() bool {
it.idx++
return it.idx < len(it.chks)
}
func (it *listChunkSeriesIterator) Err() error { return nil }
type chunkSetToSeriesSet struct {
ChunkSeriesSet
chkIterErr error
sameSeriesChunks []Series
bufIterator chunkenc.Iterator
}
// NewSeriesSetFromChunkSeriesSet converts ChunkSeriesSet to SeriesSet by decoding chunks one by one.
func NewSeriesSetFromChunkSeriesSet(chk ChunkSeriesSet) SeriesSet {
return &chunkSetToSeriesSet{ChunkSeriesSet: chk}
}
func (c *chunkSetToSeriesSet) Next() bool {
if c.Err() != nil || !c.ChunkSeriesSet.Next() {
return false
}
iter := c.ChunkSeriesSet.At().Iterator()
c.sameSeriesChunks = c.sameSeriesChunks[:0]
for iter.Next() {
c.sameSeriesChunks = append(c.sameSeriesChunks, &chunkToSeries{
labels: c.ChunkSeriesSet.At().Labels(),
chk: iter.At(),
buf: c.bufIterator,
})
}
if iter.Err() != nil {
c.chkIterErr = iter.Err()
return false
}
return true
}
func (c *chunkSetToSeriesSet) At() Series {
// Series composed of same chunks for the same series.
return ChainedSeriesMerge(c.sameSeriesChunks...)
}
func (c *chunkSetToSeriesSet) Err() error {
if c.chkIterErr != nil {
return c.chkIterErr
}
return c.ChunkSeriesSet.Err()
}
type chunkToSeries struct {
labels labels.Labels
chk chunks.Meta
buf chunkenc.Iterator
}
func (s *chunkToSeries) Labels() labels.Labels { return s.labels }
func (s *chunkToSeries) Iterator() chunkenc.Iterator { return s.chk.Chunk.Iterator(s.buf) }

84
storage/series_test.go Normal file
View File

@ -0,0 +1,84 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"math"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
)
type MockSeries struct {
labels labels.Labels
SampleIteratorFn func() chunkenc.Iterator
}
func NewListSeries(lset labels.Labels, samples []tsdbutil.Sample) *MockSeries {
return &MockSeries{
labels: lset,
SampleIteratorFn: func() chunkenc.Iterator {
return NewListSeriesIterator(samples)
},
}
}
func (s *MockSeries) Labels() labels.Labels { return s.labels }
func (s *MockSeries) Iterator() chunkenc.Iterator { return s.SampleIteratorFn() }
type MockChunkSeries struct {
labels labels.Labels
ChunkIteratorFn func() chunks.Iterator
}
func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sample) *MockChunkSeries {
var chks []chunks.Meta
return &MockChunkSeries{
labels: lset,
ChunkIteratorFn: func() chunks.Iterator {
// Inefficient chunks encoding implementation, not caring about chunk size.
for _, s := range samples {
chks = append(chks, tsdbutil.ChunkFromSamples(s))
}
return NewListChunkSeriesIterator(chks...)
},
}
}
func (s *MockChunkSeries) Labels() labels.Labels { return s.labels }
func (s *MockChunkSeries) Iterator() chunks.Iterator { return s.ChunkIteratorFn() }
func ExpandSamples(iter chunkenc.Iterator) ([]tsdbutil.Sample, error) {
var result []tsdbutil.Sample
for iter.Next() {
t, v := iter.At()
// NaNs can't be compared normally, so substitute for another value.
if math.IsNaN(v) {
v = -42
}
result = append(result, sample{t, v})
}
return result, iter.Err()
}
func ExpandChunks(iter chunks.Iterator) ([]chunks.Meta, error) {
var result []chunks.Meta
for iter.Next() {
result = append(result, iter.At())
}
return result, iter.Err()
}

View File

@ -111,10 +111,10 @@ func testChunk(t *testing.T, c Chunk) {
func benchmarkIterator(b *testing.B, newChunk func() Chunk) {
var (
t = int64(1234123324)
v = 1243535.123
t = int64(1234123324)
v = 1243535.123
exp []pair
)
var exp []pair
for i := 0; i < b.N; i++ {
// t += int64(rand.Intn(10000) + 1)
t += int64(1000)
@ -146,7 +146,7 @@ func benchmarkIterator(b *testing.B, newChunk func() Chunk) {
b.ReportAllocs()
b.ResetTimer()
fmt.Println("num", b.N, "created chunks", len(chunks))
b.Log("num", b.N, "created chunks", len(chunks))
res := make([]float64, 0, 1024)

View File

@ -127,6 +127,7 @@ func (c *XORChunk) iterator(it Iterator) *xorIterator {
// We skip that for actual samples.
br: newBReader(c.b.bytes()[2:]),
numTotal: binary.BigEndian.Uint16(c.b.bytes()),
t: math.MinInt64,
}
}

View File

@ -67,6 +67,17 @@ type Meta struct {
MinTime, MaxTime int64
}
// Iterator iterates over the chunk of a time series.
type Iterator interface {
// At returns the current meta.
// It depends on implementation if the chunk is populated or not.
At() Meta
// Next advances the iterator by one.
Next() bool
// Err returns optional error if Next is false.
Err() error
}
// writeHash writes the chunk encoding and raw data into the provided hash.
func (cm *Meta) writeHash(h hash.Hash, buf []byte) error {
buf = append(buf[:0], byte(cm.Chunk.Encoding()))

View File

@ -648,7 +648,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
var (
set storage.ChunkSeriesSet
set storage.DeprecatedChunkSeriesSet
symbols index.StringIter
closers = []io.Closer{}
overlapping bool
@ -915,7 +915,7 @@ func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, tombstones.Int
}
type compactionMerger struct {
a, b storage.ChunkSeriesSet
a, b storage.DeprecatedChunkSeriesSet
aok, bok bool
l labels.Labels
@ -923,7 +923,8 @@ type compactionMerger struct {
intervals tombstones.Intervals
}
func newCompactionMerger(a, b storage.ChunkSeriesSet) (*compactionMerger, error) {
// TODO(bwplotka): Move to storage mergers.
func newCompactionMerger(a, b storage.DeprecatedChunkSeriesSet) (*compactionMerger, error) {
c := &compactionMerger{
a: a,
b: b,

View File

@ -190,7 +190,7 @@ type blockQuerier struct {
}
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
var base storage.ChunkSeriesSet
var base storage.DeprecatedChunkSeriesSet
var err error
if sortSeries {
@ -670,17 +670,17 @@ type baseChunkSeries struct {
// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
// over them. It drops chunks based on tombstones in the given reader.
func LookupChunkSeries(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.ChunkSeriesSet, error) {
func LookupChunkSeries(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.DeprecatedChunkSeriesSet, error) {
return lookupChunkSeries(false, ir, tr, ms...)
}
// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
// over them. It drops chunks based on tombstones in the given reader. Series will be in order.
func LookupChunkSeriesSorted(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.ChunkSeriesSet, error) {
func LookupChunkSeriesSorted(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.DeprecatedChunkSeriesSet, error) {
return lookupChunkSeries(true, ir, tr, ms...)
}
func lookupChunkSeries(sorted bool, ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.ChunkSeriesSet, error) {
func lookupChunkSeries(sorted bool, ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.DeprecatedChunkSeriesSet, error) {
if tr == nil {
tr = tombstones.NewMemTombstones()
}
@ -754,7 +754,7 @@ func (s *baseChunkSeries) Next() bool {
// with known chunk references. It filters out chunks that do not fit the
// given time range.
type populatedChunkSeries struct {
set storage.ChunkSeriesSet
set storage.DeprecatedChunkSeriesSet
chunks ChunkReader
mint, maxt int64
@ -822,7 +822,7 @@ func (s *populatedChunkSeries) Next() bool {
// blockSeriesSet is a set of series from an inverted index query.
type blockSeriesSet struct {
set storage.ChunkSeriesSet
set storage.DeprecatedChunkSeriesSet
err error
cur storage.Series

View File

@ -15,25 +15,13 @@ package tsdbutil
import (
"math"
)
// SeriesIterator iterates over the data of a time series.
type SeriesIterator interface {
// Seek advances the iterator forward to the given timestamp.
// If there's no value exactly at t, it advances to the first value
// after t.
Seek(t int64) bool
// At returns the current timestamp/value pair.
At() (t int64, v float64)
// Next advances the iterator by one.
Next() bool
// Err returns the current error.
Err() error
}
"github.com/prometheus/prometheus/tsdb/chunkenc"
)
// BufferedSeriesIterator wraps an iterator with a look-back buffer.
type BufferedSeriesIterator struct {
it SeriesIterator
it chunkenc.Iterator
buf *sampleRing
lastTime int64
@ -41,7 +29,7 @@ type BufferedSeriesIterator struct {
// NewBuffer returns a new iterator that buffers the values within the time range
// of the current element and the duration of delta before.
func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator {
func NewBuffer(it chunkenc.Iterator, delta int64) *BufferedSeriesIterator {
return &BufferedSeriesIterator{
it: it,
buf: newSampleRing(delta, 16),
@ -56,7 +44,7 @@ func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) {
}
// Buffer returns an iterator over the buffered data.
func (b *BufferedSeriesIterator) Buffer() SeriesIterator {
func (b *BufferedSeriesIterator) Buffer() chunkenc.Iterator {
return b.buf.iterator()
}
@ -145,7 +133,7 @@ func (r *sampleRing) reset() {
r.f = 0
}
func (r *sampleRing) iterator() SeriesIterator {
func (r *sampleRing) iterator() chunkenc.Iterator {
return &sampleRingIterator{r: r, i: -1}
}

View File

@ -539,7 +539,7 @@ func (api *API) series(r *http.Request) apiFuncResult {
sets = append(sets, s)
}
set := storage.NewMergeSeriesSet(sets, nil)
set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
metrics := []labels.Labels{}
for set.Next() {
metrics = append(metrics, set.At().Labels())

View File

@ -98,7 +98,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
sets = append(sets, s)
}
set := storage.NewMergeSeriesSet(sets, nil)
set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
it := storage.NewBuffer(int64(h.lookbackDelta / 1e6))
for set.Next() {
s := set.At()