diff --git a/storage/buffer.go b/storage/buffer.go index 318d16b24..feca1d91e 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -136,6 +136,14 @@ type sample struct { v float64 } +func (s sample) T() int64 { + return s.t +} + +func (s sample) V() float64 { + return s.v +} + type sampleRing struct { delta int64 diff --git a/storage/buffer_test.go b/storage/buffer_test.go index 4ec4591a4..dc69568f1 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -15,11 +15,9 @@ package storage import ( "math/rand" - "sort" "testing" - "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/util/testutil" ) @@ -107,15 +105,15 @@ func TestBufferedSeriesIterator(t *testing.T) { testutil.Equals(t, ev, v, "value mismatch") } - it = NewBufferIterator(newListSeriesIterator([]sample{ - {t: 1, v: 2}, - {t: 2, v: 3}, - {t: 3, v: 4}, - {t: 4, v: 5}, - {t: 5, v: 6}, - {t: 99, v: 8}, - {t: 100, v: 9}, - {t: 101, v: 10}, + it = NewBufferIterator(NewListSeriesIterator([]tsdbutil.Sample{ + sample{t: 1, v: 2}, + sample{t: 2, v: 3}, + sample{t: 3, v: 4}, + sample{t: 4, v: 5}, + sample{t: 5, v: 6}, + sample{t: 99, v: 8}, + sample{t: 100, v: 9}, + sample{t: 101, v: 10}, }), 2) testutil.Assert(t, it.Seek(-123), "seek failed") @@ -189,61 +187,6 @@ func (m *mockSeriesIterator) At() (int64, float64) { return m.at() } func (m *mockSeriesIterator) Next() bool { return m.next() } func (m *mockSeriesIterator) Err() error { return m.err() } -type mockSeries struct { - labels func() labels.Labels - iterator func() chunkenc.Iterator -} - -func newMockSeries(lset labels.Labels, samples []sample) Series { - return &mockSeries{ - labels: func() labels.Labels { - return lset - }, - iterator: func() chunkenc.Iterator { - return newListSeriesIterator(samples) - }, - } -} - -func (m *mockSeries) Labels() labels.Labels { return m.labels() } -func (m *mockSeries) 
Iterator() chunkenc.Iterator { return m.iterator() } - -type listSeriesIterator struct { - list []sample - idx int -} - -func newListSeriesIterator(list []sample) *listSeriesIterator { - return &listSeriesIterator{list: list, idx: -1} -} - -func (it *listSeriesIterator) At() (int64, float64) { - s := it.list[it.idx] - return s.t, s.v -} - -func (it *listSeriesIterator) Next() bool { - it.idx++ - return it.idx < len(it.list) -} - -func (it *listSeriesIterator) Seek(t int64) bool { - if it.idx == -1 { - it.idx = 0 - } - // Do binary search between current position and end. - it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool { - s := it.list[i+it.idx] - return s.t >= t - }) - - return it.idx < len(it.list) -} - -func (it *listSeriesIterator) Err() error { - return nil -} - type fakeSeriesIterator struct { nsamples int64 step int64 @@ -268,6 +211,4 @@ func (it *fakeSeriesIterator) Seek(t int64) bool { return it.idx < it.nsamples } -func (it *fakeSeriesIterator) Err() error { - return nil -} +func (it *fakeSeriesIterator) Err() error { return nil } diff --git a/storage/fanout.go b/storage/fanout.go index 2366fb272..94a4acb3d 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -25,6 +25,8 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" ) type fanout struct { @@ -68,24 +70,27 @@ func (f *fanout) StartTime() (int64, error) { func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) { queriers := make([]Querier, 0, 1+len(f.secondaries)) - // Add primary querier + // Add primary querier. primaryQuerier, err := f.primary.Querier(ctx, mint, maxt) if err != nil { return nil, err } queriers = append(queriers, primaryQuerier) - // Add secondary queriers + // Add secondary queriers. 
for _, storage := range f.secondaries { querier, err := storage.Querier(ctx, mint, maxt) if err != nil { - NewMergeQuerier(primaryQuerier, queriers).Close() + for _, q := range queriers { + // TODO(bwplotka): Log error. + _ = q.Close() + } return nil, err } queriers = append(queriers, querier) } - return NewMergeQuerier(primaryQuerier, queriers), nil + return NewMergeQuerier(primaryQuerier, queriers, ChainedSeriesMerge), nil } func (f *fanout) Appender() Appender { @@ -181,66 +186,96 @@ func (f *fanoutAppender) Rollback() (err error) { return nil } -// mergeQuerier implements Querier. -type mergeQuerier struct { - primaryQuerier Querier - queriers []Querier +type mergeGenericQuerier struct { + mergeFunc genericSeriesMergeFunc - failedQueriers map[Querier]struct{} - setQuerierMap map[SeriesSet]Querier + primaryQuerier genericQuerier + queriers []genericQuerier + failedQueriers map[genericQuerier]struct{} + setQuerierMap map[genericSeriesSet]genericQuerier } -// NewMergeQuerier returns a new Querier that merges results of input queriers. -// NB NewMergeQuerier will return NoopQuerier if no queriers are passed to it, +// NewMergeQuerier returns a new Querier that merges results of input queriers. +// NewMergeQuerier will return primaryQuerier unchanged if no usable queriers are passed to it, // and will filter NoopQueriers from its arguments, in order to reduce overhead // when only one querier is passed. -func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier { - filtered := make([]Querier, 0, len(queriers)) +// The difference between primary and secondary is as follows: if the primaryQuerier returns an error, the query fails. +// For secondaries, errors are only returned as warnings. 
+func NewMergeQuerier(primaryQuerier Querier, queriers []Querier, mergeFunc VerticalSeriesMergeFunc) Querier { + filtered := make([]genericQuerier, 0, len(queriers)) for _, querier := range queriers { - if querier != NoopQuerier() { - filtered = append(filtered, querier) + if _, ok := querier.(noopQuerier); !ok && querier != nil { + filtered = append(filtered, newGenericQuerierFrom(querier)) } } - setQuerierMap := make(map[SeriesSet]Querier) - failedQueriers := make(map[Querier]struct{}) + if len(filtered) == 0 { + return primaryQuerier + } - switch len(filtered) { - case 0: - return NoopQuerier() - case 1: - return filtered[0] - default: - return &mergeQuerier{ - primaryQuerier: primaryQuerier, - queriers: filtered, - failedQueriers: failedQueriers, - setQuerierMap: setQuerierMap, + if primaryQuerier == nil && len(filtered) == 1 { + return &querierAdapter{filtered[0]} + } + + return &querierAdapter{&mergeGenericQuerier{ + mergeFunc: (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge, + primaryQuerier: newGenericQuerierFrom(primaryQuerier), + queriers: filtered, + failedQueriers: make(map[genericQuerier]struct{}), + setQuerierMap: make(map[genericSeriesSet]genericQuerier), + }} +} + +// NewMergeChunkQuerier returns a new ChunkQuerier that merges results of input chunk queriers. +// NewMergeChunkQuerier will return primaryQuerier unchanged if no usable chunk queriers are passed to it, +// and will filter NoopChunkQueriers from its arguments, in order to reduce overhead +// when only one chunk querier is passed. 
+func NewMergeChunkQuerier(primaryQuerier ChunkQuerier, queriers []ChunkQuerier, merger VerticalChunkSeriesMergerFunc) ChunkQuerier { + filtered := make([]genericQuerier, 0, len(queriers)) + for _, querier := range queriers { + if _, ok := querier.(noopChunkQuerier); !ok && querier != nil { + filtered = append(filtered, newGenericQuerierFromChunk(querier)) } } + + if len(filtered) == 0 { + return primaryQuerier + } + + if primaryQuerier == nil && len(filtered) == 1 { + return &chunkQuerierAdapter{filtered[0]} + } + + return &chunkQuerierAdapter{&mergeGenericQuerier{ + mergeFunc: (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergerFunc: merger}).Merge, + primaryQuerier: newGenericQuerierFromChunk(primaryQuerier), + queriers: filtered, + failedQueriers: make(map[genericQuerier]struct{}), + setQuerierMap: make(map[genericSeriesSet]genericQuerier), + }} } // Select returns a set of series that matches the given label matchers. -func (q *mergeQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { +func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (genericSeriesSet, Warnings, error) { if len(q.queriers) == 1 { return q.queriers[0].Select(sortSeries, hints, matchers...) } var ( - seriesSets = make([]SeriesSet, 0, len(q.queriers)) + seriesSets = make([]genericSeriesSet, 0, len(q.queriers)) warnings Warnings priErr error ) type queryResult struct { - qr Querier - set SeriesSet + qr genericQuerier + set genericSeriesSet wrn Warnings selectError error } queryResultChan := make(chan *queryResult) for _, querier := range q.queriers { - go func(qr Querier) { + go func(qr genericQuerier) { // We need to sort for NewMergeSeriesSet to work. set, wrn, err := qr.Select(true, hints, matchers...) 
queryResultChan <- &queryResult{qr: qr, set: set, wrn: wrn, selectError: err} @@ -267,16 +302,15 @@ func (q *mergeQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...* if priErr != nil { return nil, nil, priErr } - return NewMergeSeriesSet(seriesSets, q), warnings, nil + return newGenericMergeSeriesSet(seriesSets, q, q.mergeFunc), warnings, nil } // LabelValues returns all potential values for a label name. -func (q *mergeQuerier) LabelValues(name string) ([]string, Warnings, error) { +func (q *mergeGenericQuerier) LabelValues(name string) ([]string, Warnings, error) { var results [][]string var warnings Warnings for _, querier := range q.queriers { values, wrn, err := querier.LabelValues(name) - if wrn != nil { warnings = append(warnings, wrn...) } @@ -295,7 +329,7 @@ func (q *mergeQuerier) LabelValues(name string) ([]string, Warnings, error) { return mergeStringSlices(results), warnings, nil } -func (q *mergeQuerier) IsFailedSet(set SeriesSet) bool { +func (q *mergeGenericQuerier) IsFailedSet(set genericSeriesSet) bool { _, isFailedQuerier := q.failedQueriers[q.setQuerierMap[set]] return isFailedQuerier } @@ -340,18 +374,19 @@ func mergeTwoStringSlices(a, b []string) []string { } // LabelNames returns all the unique label names present in the block in sorted order. -func (q *mergeQuerier) LabelNames() ([]string, Warnings, error) { +func (q *mergeGenericQuerier) LabelNames() ([]string, Warnings, error) { labelNamesMap := make(map[string]struct{}) var warnings Warnings - for _, b := range q.queriers { - names, wrn, err := b.LabelNames() + for _, querier := range q.queriers { + names, wrn, err := querier.LabelNames() if wrn != nil { warnings = append(warnings, wrn...) } if err != nil { - // If the error source isn't the primary querier, return the error as a warning and continue. - if b != q.primaryQuerier { + q.failedQueriers[querier] = struct{}{} + // If the error source isn't the primary querier, return the error as a warning and continue. 
+ if querier != q.primaryQuerier { warnings = append(warnings, err) continue } else { @@ -374,39 +409,70 @@ func (q *mergeQuerier) LabelNames() ([]string, Warnings, error) { } // Close releases the resources of the Querier. -func (q *mergeQuerier) Close() error { - // TODO return multiple errors? - var lastErr error +func (q *mergeGenericQuerier) Close() error { + var errs tsdb_errors.MultiError for _, querier := range q.queriers { if err := querier.Close(); err != nil { - lastErr = err + errs.Add(err) } } - return lastErr + return errs.Err() } -// mergeSeriesSet implements SeriesSet -type mergeSeriesSet struct { +// genericMergeSeriesSet implements genericSeriesSet. +type genericMergeSeriesSet struct { currentLabels labels.Labels - currentSets []SeriesSet - heap seriesSetHeap - sets []SeriesSet + mergeFunc genericSeriesMergeFunc - querier *mergeQuerier + heap genericSeriesSetHeap + sets []genericSeriesSet + + currentSets []genericSeriesSet + querier *mergeGenericQuerier } -// NewMergeSeriesSet returns a new series set that merges (deduplicates) -// series returned by the input series sets when iterating. -// Each input series set must return its series in labels order, otherwise +// VerticalSeriesMergeFunc returns merged series implementation that merges series with same labels together. +// It has to handle time-overlapped series as well. +type VerticalSeriesMergeFunc func(...Series) Series + +// VerticalChunkSeriesMergerFunc returns merged chunk series implementation that merges series with same labels together. +// It has to handle time-overlapped chunk series as well. +type VerticalChunkSeriesMergerFunc func(...ChunkSeries) ChunkSeries + +// NewMergeSeriesSet returns a new SeriesSet that merges results of input SeriesSets. 
+func NewMergeSeriesSet(sets []SeriesSet, merger VerticalSeriesMergeFunc) SeriesSet { + genericSets := make([]genericSeriesSet, 0, len(sets)) + for _, s := range sets { + genericSets = append(genericSets, &genericSeriesSetAdapter{s}) + + } + return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, nil, (&seriesMergerAdapter{VerticalSeriesMergeFunc: merger}).Merge)} +} + +// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges results of input ChunkSeriesSets. +func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, merger VerticalChunkSeriesMergerFunc) ChunkSeriesSet { + genericSets := make([]genericSeriesSet, 0, len(sets)) + for _, s := range sets { + genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s}) + + } + return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, nil, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergerFunc: merger}).Merge)} +} + +// newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates) +// series returned by the input series sets when iterating. +// Each input series set must return its series in labels order, otherwise // merged series set will be incorrect. -func NewMergeSeriesSet(sets []SeriesSet, querier *mergeQuerier) SeriesSet { +// Argument 'querier' is optional and can be nil. Pass a querier to have series sets of its failed queriers skipped during the merge. +// Series with equal labels are merged using the provided mergeFunc. +func newGenericMergeSeriesSet(sets []genericSeriesSet, querier *mergeGenericQuerier, mergeFunc genericSeriesMergeFunc) genericSeriesSet { if len(sets) == 1 { return sets[0] } // Sets need to be pre-advanced, so we can introspect the label of the // series under the cursor. 
- var h seriesSetHeap + var h genericSeriesSetHeap for _, set := range sets { if set == nil { continue @@ -415,14 +481,15 @@ func NewMergeSeriesSet(sets []SeriesSet, querier *mergeQuerier) SeriesSet { heap.Push(&h, set) } } - return &mergeSeriesSet{ - heap: h, - sets: sets, - querier: querier, + return &genericMergeSeriesSet{ + mergeFunc: mergeFunc, + heap: h, + sets: sets, + querier: querier, } } -func (c *mergeSeriesSet) Next() bool { +func (c *genericMergeSeriesSet) Next() bool { // Run in a loop because the "next" series sets may not be valid anymore. // If a remote querier fails, we discard all series sets from that querier. // If, for the current label set, all the next series sets come from @@ -443,7 +510,7 @@ func (c *mergeSeriesSet) Next() bool { c.currentSets = nil c.currentLabels = c.heap[0].At().Labels() for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) { - set := heap.Pop(&c.heap).(SeriesSet) + set := heap.Pop(&c.heap).(genericSeriesSet) if c.querier != nil && c.querier.IsFailedSet(set) { continue } @@ -459,21 +526,18 @@ func (c *mergeSeriesSet) Next() bool { return true } -func (c *mergeSeriesSet) At() Series { +func (c *genericMergeSeriesSet) At() Labels { if len(c.currentSets) == 1 { return c.currentSets[0].At() } - series := []Series{} + series := make([]Labels, 0, len(c.currentSets)) for _, seriesSet := range c.currentSets { series = append(series, seriesSet.At()) } - return &mergeSeries{ - labels: c.currentLabels, - series: series, - } + return c.mergeFunc(series...) 
} -func (c *mergeSeriesSet) Err() error { +func (c *genericMergeSeriesSet) Err() error { for _, set := range c.sets { if err := set.Err(); err != nil { return err @@ -482,21 +546,21 @@ func (c *mergeSeriesSet) Err() error { return nil } -type seriesSetHeap []SeriesSet +type genericSeriesSetHeap []genericSeriesSet -func (h seriesSetHeap) Len() int { return len(h) } -func (h seriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h genericSeriesSetHeap) Len() int { return len(h) } +func (h genericSeriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } -func (h seriesSetHeap) Less(i, j int) bool { +func (h genericSeriesSetHeap) Less(i, j int) bool { a, b := h[i].At().Labels(), h[j].At().Labels() return labels.Compare(a, b) < 0 } -func (h *seriesSetHeap) Push(x interface{}) { - *h = append(*h, x.(SeriesSet)) +func (h *genericSeriesSetHeap) Push(x interface{}) { + *h = append(*h, x.(genericSeriesSet)) } -func (h *seriesSetHeap) Pop() interface{} { +func (h *genericSeriesSetHeap) Pop() interface{} { old := *h n := len(old) x := old[n-1] @@ -504,37 +568,53 @@ func (h *seriesSetHeap) Pop() interface{} { return x } -type mergeSeries struct { +// ChainedSeriesMerge returns single series from many same series by chaining samples together. +// In case of the timestamp overlap, the first overlapped sample is kept and the rest samples with the same timestamps +// are dropped. We expect the same labels for each given series. +// TODO(bwplotka): This has the same logic as tsdb.verticalChainedSeries. Remove this in favor of ChainedSeriesMerge in next PRs. 
+func ChainedSeriesMerge(s ...Series) Series { + if len(s) == 0 { + return nil + } + return &chainSeries{ + labels: s[0].Labels(), + series: s, + } +} + +type chainSeries struct { labels labels.Labels series []Series } -func (m *mergeSeries) Labels() labels.Labels { +func (m *chainSeries) Labels() labels.Labels { return m.labels } -func (m *mergeSeries) Iterator() chunkenc.Iterator { +func (m *chainSeries) Iterator() chunkenc.Iterator { iterators := make([]chunkenc.Iterator, 0, len(m.series)) for _, s := range m.series { iterators = append(iterators, s.Iterator()) } - return newMergeIterator(iterators) + return newChainSampleIterator(iterators) } -type mergeIterator struct { +// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series. +// If one or more samples overlap, the first one is kept and all others with the same timestamp are dropped. +type chainSampleIterator struct { iterators []chunkenc.Iterator - h seriesIteratorHeap + h samplesIteratorHeap } -func newMergeIterator(iterators []chunkenc.Iterator) chunkenc.Iterator { - return &mergeIterator{ +func newChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator { + return &chainSampleIterator{ iterators: iterators, h: nil, } } -func (c *mergeIterator) Seek(t int64) bool { - c.h = seriesIteratorHeap{} +func (c *chainSampleIterator) Seek(t int64) bool { + c.h = samplesIteratorHeap{} for _, iter := range c.iterators { if iter.Seek(t) { heap.Push(&c.h, iter) @@ -543,15 +623,15 @@ func (c *mergeIterator) Seek(t int64) bool { return len(c.h) > 0 } -func (c *mergeIterator) At() (t int64, v float64) { +func (c *chainSampleIterator) At() (t int64, v float64) { if len(c.h) == 0 { - panic("mergeIterator.At() called after .Next() returned false.") + panic("chainSampleIterator.At() called after .Next() returned false.") } return c.h[0].At() } -func (c *mergeIterator) Next() bool { +func (c *chainSampleIterator) Next() bool { if c.h == nil { for _, iter := 
range c.iterators { if iter.Next() { @@ -569,6 +649,7 @@ func (c *mergeIterator) Next() bool { currt, _ := c.At() for len(c.h) > 0 { nextt, _ := c.h[0].At() + // All but one of the overlapping samples will be dropped. if nextt != currt { break } @@ -582,7 +663,7 @@ func (c *mergeIterator) Next() bool { return len(c.h) > 0 } -func (c *mergeIterator) Err() error { +func (c *chainSampleIterator) Err() error { for _, iter := range c.iterators { if err := iter.Err(); err != nil { return err @@ -591,22 +672,163 @@ func (c *mergeIterator) Err() error { return nil } -type seriesIteratorHeap []chunkenc.Iterator +type samplesIteratorHeap []chunkenc.Iterator -func (h seriesIteratorHeap) Len() int { return len(h) } -func (h seriesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h samplesIteratorHeap) Len() int { return len(h) } +func (h samplesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } -func (h seriesIteratorHeap) Less(i, j int) bool { +func (h samplesIteratorHeap) Less(i, j int) bool { at, _ := h[i].At() bt, _ := h[j].At() return at < bt } -func (h *seriesIteratorHeap) Push(x interface{}) { +func (h *samplesIteratorHeap) Push(x interface{}) { *h = append(*h, x.(chunkenc.Iterator)) } -func (h *seriesIteratorHeap) Pop() interface{} { +func (h *samplesIteratorHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +// VerticalChunksMergeFunc represents a function that merges multiple time overlapping chunks. +// Passed chunks: +// * have to be sorted by MinTime. +// * have to be part of exactly the same timeseries. +// * have to be populated. +type VerticalChunksMergeFunc func(chks ...chunks.Meta) chunks.Iterator + +type verticalChunkSeriesMerger struct { + verticalChunksMerger VerticalChunksMergeFunc + + labels labels.Labels + series []ChunkSeries +} + +// NewVerticalChunkSeriesMerger returns a VerticalChunkSeriesMergerFunc that merges the same chunk series into one or more chunks. 
+// In case of chunk overlap, the given VerticalChunksMergeFunc will be used. +// It expects the same labels for each given series. +func NewVerticalChunkSeriesMerger(chunkMerger VerticalChunksMergeFunc) VerticalChunkSeriesMergerFunc { + return func(s ...ChunkSeries) ChunkSeries { + if len(s) == 0 { + return nil + } + return &verticalChunkSeriesMerger{ + verticalChunksMerger: chunkMerger, + labels: s[0].Labels(), + series: s, + } + } +} + +func (s *verticalChunkSeriesMerger) Labels() labels.Labels { + return s.labels +} + +func (s *verticalChunkSeriesMerger) Iterator() chunks.Iterator { + iterators := make([]chunks.Iterator, 0, len(s.series)) + for _, series := range s.series { + iterators = append(iterators, series.Iterator()) + } + return &chainChunkIterator{ + overlappedChunksMerger: s.verticalChunksMerger, + iterators: iterators, + h: nil, + } +} + +// chainChunkIterator chains chunks from different iterators of the same time series. +// If they overlap in time, overlappedChunksMerger will be used. +type chainChunkIterator struct { + overlappedChunksMerger VerticalChunksMergeFunc + + iterators []chunks.Iterator + h chunkIteratorHeap +} + +func (c *chainChunkIterator) At() chunks.Meta { + if len(c.h) == 0 { + panic("chainChunkIterator.At() called after .Next() returned false.") + } + + return c.h[0].At() +} + +func (c *chainChunkIterator) Next() bool { + if c.h == nil { + for _, iter := range c.iterators { + if iter.Next() { + heap.Push(&c.h, iter) + } + } + + return len(c.h) > 0 + } + + if len(c.h) == 0 { + return false + } + + // Detect the shortest chain of time-overlapped chunks. + last := c.At() + var overlapped []chunks.Meta + for { + iter := heap.Pop(&c.h).(chunks.Iterator) + if iter.Next() { + heap.Push(&c.h, iter) + } + + if len(c.h) == 0 { + break + } + + next := c.At() + if next.MinTime > last.MaxTime { + // No overlap with last one. 
+ break + } + overlapped = append(overlapped, last) + last = next + } + if len(overlapped) > 0 { + heap.Push(&c.h, c.overlappedChunksMerger(append(overlapped, c.At())...)) + return true + } + return len(c.h) > 0 +} + +func (c *chainChunkIterator) Err() error { + for _, iter := range c.iterators { + if err := iter.Err(); err != nil { + return err + } + } + return nil +} + +type chunkIteratorHeap []chunks.Iterator + +func (h chunkIteratorHeap) Len() int { return len(h) } +func (h chunkIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h chunkIteratorHeap) Less(i, j int) bool { + at := h[i].At() + bt := h[j].At() + if at.MinTime == bt.MinTime { + return at.MaxTime < bt.MaxTime + } + return at.MinTime < bt.MinTime +} + +func (h *chunkIteratorHeap) Push(x interface{}) { + *h = append(*h, x.(chunks.Iterator)) +} + +func (h *chunkIteratorHeap) Pop() interface{} { old := *h n := len(old) x := old[n-1] diff --git a/storage/fanout_test.go b/storage/fanout_test.go index 97ffcddb2..cc839b6ed 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -16,10 +16,12 @@ package storage import ( "fmt" "math" + "sort" "testing" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/util/testutil" ) @@ -52,169 +54,361 @@ func TestMergeTwoStringSlices(t *testing.T) { } } -func TestMergeSeriesSet(t *testing.T) { +func TestMergeQuerierWithChainMerger(t *testing.T) { for _, tc := range []struct { - input []SeriesSet + name string + querierSeries [][]Series + extraQueriers []Querier + expected SeriesSet }{ { - input: []SeriesSet{newMockSeriesSet()}, - expected: newMockSeriesSet(), + name: "1 querier with no series", + querierSeries: [][]Series{{}}, + expected: NewMockSeriesSet(), }, - { - input: []SeriesSet{newMockSeriesSet( - newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}), - newMockSeries(labels.FromStrings("foo", 
"bar"), []sample{{0, 0}, {1, 1}}), - )}, - expected: newMockSeriesSet( - newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}), - newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}), - ), + name: "many queriers with no series", + querierSeries: [][]Series{{}, {}, {}, {}, {}, {}, {}}, + expected: NewMockSeriesSet(), }, - { - input: []SeriesSet{newMockSeriesSet( - newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}), - ), newMockSeriesSet( - newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}), - )}, - expected: newMockSeriesSet( - newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}), - newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}), - ), - }, - - { - input: []SeriesSet{newMockSeriesSet( - newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}), - newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}), - ), newMockSeriesSet( - newMockSeries(labels.FromStrings("bar", "baz"), []sample{{3, 3}, {4, 4}}), - newMockSeries(labels.FromStrings("foo", "bar"), []sample{{2, 2}, {3, 3}}), - )}, - expected: newMockSeriesSet( - newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}, {3, 3}, {4, 4}}), - newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}}), + name: "1 querier, two series", + querierSeries: [][]Series{{ + NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}), + NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}), + }}, + expected: NewMockSeriesSet( + NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}), + NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}), ), }, { - input: []SeriesSet{newMockSeriesSet( - 
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, math.NaN()}}), - ), newMockSeriesSet( - newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, math.NaN()}}), - )}, - expected: newMockSeriesSet( - newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, math.NaN()}}), + name: "2 queriers, 1 different series each", + querierSeries: [][]Series{{ + NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}), + }, { + NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}), + }}, + expected: NewMockSeriesSet( + NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}), + NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}), + ), + }, + { + name: "2 time unsorted queriers, 2 series each", + querierSeries: [][]Series{{ + NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}, sample{6, 6}}), + NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}), + }, { + NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}), + NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}, sample{4, 4}}), + }}, + expected: NewMockSeriesSet( + NewListSeries( + labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}, sample{6, 6}}, + ), + NewListSeries( + labels.FromStrings("foo", "bar"), + []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}}, + ), + ), + }, + { + name: "5 queriers, only 2 queriers have 2 time unsorted series each", + querierSeries: [][]Series{{}, {}, { + NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}, sample{6, 6}}), + NewListSeries(labels.FromStrings("foo", "bar"), 
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}), + }, { + NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}), + NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}, sample{4, 4}}), + }, {}}, + expected: NewMockSeriesSet( + NewListSeries( + labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}, sample{6, 6}}, + ), + NewListSeries( + labels.FromStrings("foo", "bar"), + []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}}, + ), + ), + }, + { + name: "2 queriers, only 2 queriers have 2 time unsorted series each, with 3 noop and one nil querier together", + querierSeries: [][]Series{{}, {}, { + NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}, sample{6, 6}}), + NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}), + }, { + NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}}), + NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}, sample{4, 4}}), + }, {}}, + extraQueriers: []Querier{NoopQuerier(), NoopQuerier(), nil, NoopQuerier()}, + expected: NewMockSeriesSet( + NewListSeries( + labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}, sample{6, 6}}, + ), + NewListSeries( + labels.FromStrings("foo", "bar"), + []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}}, + ), + ), + }, + { + name: "2 queriers, with 2 series, one is overlapping", + querierSeries: [][]Series{{}, {}, { + NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 21}, sample{3, 31}, sample{5, 5}, sample{6, 6}}), + NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}), + }, { + 
NewListSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 22}, sample{3, 32}}), + NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}, sample{4, 4}}), + }, {}}, + expected: NewMockSeriesSet( + NewListSeries( + labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{sample{1, 1}, sample{2, 21}, sample{3, 31}, sample{5, 5}, sample{6, 6}}, + ), + NewListSeries( + labels.FromStrings("foo", "bar"), + []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}}, + ), + ), + }, + { + name: "2 queries, one with NaN samples series", + querierSeries: [][]Series{{ + NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}}), + }, { + NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{1, 1}}), + }}, + expected: NewMockSeriesSet( + NewListSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}, sample{1, 1}}), ), }, } { - merged := NewMergeSeriesSet(tc.input, nil) - for merged.Next() { - testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true") - actualSeries := merged.At() - expectedSeries := tc.expected.At() - testutil.Equals(t, expectedSeries.Labels(), actualSeries.Labels()) - testutil.Equals(t, drainSamples(expectedSeries.Iterator()), drainSamples(actualSeries.Iterator())) - } - testutil.Assert(t, !tc.expected.Next(), "Expected Next() to be false") + t.Run(tc.name, func(t *testing.T) { + var qs []Querier + for _, in := range tc.querierSeries { + qs = append(qs, &mockQuerier{toReturn: in}) + } + qs = append(qs, tc.extraQueriers...) 
+ + merged, _, _ := NewMergeQuerier(qs[0], qs, ChainedSeriesMerge).Select(false, nil) + for merged.Next() { + testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true") + actualSeries := merged.At() + expectedSeries := tc.expected.At() + testutil.Equals(t, expectedSeries.Labels(), actualSeries.Labels()) + + expSmpl, expErr := ExpandSamples(expectedSeries.Iterator()) + actSmpl, actErr := ExpandSamples(actualSeries.Iterator()) + testutil.Equals(t, expErr, actErr) + testutil.Equals(t, expSmpl, actSmpl) + } + testutil.Ok(t, merged.Err()) + testutil.Assert(t, !tc.expected.Next(), "Expected Next() to be false") + }) } } -func TestMergeIterator(t *testing.T) { +func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { for _, tc := range []struct { - input []chunkenc.Iterator - expected []sample + name string + chkQuerierSeries [][]ChunkSeries + extraQueriers []ChunkQuerier + + expected ChunkSeriesSet }{ { - input: []chunkenc.Iterator{ - newListSeriesIterator([]sample{{0, 0}, {1, 1}}), - }, - expected: []sample{{0, 0}, {1, 1}}, + name: "one querier with no series", + chkQuerierSeries: [][]ChunkSeries{{}}, + expected: NewMockChunkSeriesSet(), }, { - input: []chunkenc.Iterator{ - newListSeriesIterator([]sample{{0, 0}, {1, 1}}), - newListSeriesIterator([]sample{{2, 2}, {3, 3}}), - }, - expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}}, + name: "many queriers with no series", + chkQuerierSeries: [][]ChunkSeries{{}, {}, {}, {}, {}, {}, {}}, + expected: NewMockChunkSeriesSet(), }, { - input: []chunkenc.Iterator{ - newListSeriesIterator([]sample{{0, 0}, {3, 3}}), - newListSeriesIterator([]sample{{1, 1}, {4, 4}}), - newListSeriesIterator([]sample{{2, 2}, {5, 5}}), - }, - expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}}, + name: "one querier, two series", + chkQuerierSeries: [][]ChunkSeries{{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), 
+ NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}), + }}, + expected: NewMockChunkSeriesSet( + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}), + ), }, { - input: []chunkenc.Iterator{ - newListSeriesIterator([]sample{{0, 0}, {1, 1}}), - newListSeriesIterator([]sample{{0, 0}, {2, 2}}), - newListSeriesIterator([]sample{{2, 2}, {3, 3}}), - }, - expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}}, + name: "two queriers, one different series each", + chkQuerierSeries: [][]ChunkSeries{{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + }, { + NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}), + }}, + expected: NewMockChunkSeriesSet( + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}), + ), + }, + { + name: "two queriers, two not in time order series each", + chkQuerierSeries: [][]ChunkSeries{{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}), + NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}), + }, { + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 
3}}), + NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}}, []tsdbutil.Sample{sample{4, 4}}), + }}, + expected: NewMockChunkSeriesSet( + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, + []tsdbutil.Sample{sample{3, 3}}, + []tsdbutil.Sample{sample{5, 5}}, + []tsdbutil.Sample{sample{6, 6}}, + ), + NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), + []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, + []tsdbutil.Sample{sample{2, 2}}, + []tsdbutil.Sample{sample{3, 3}}, + []tsdbutil.Sample{sample{4, 4}}, + ), + ), + }, + { + name: "five queriers, only two have two not in time order series each", + chkQuerierSeries: [][]ChunkSeries{{}, {}, { + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}), + NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}), + }, { + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}}, []tsdbutil.Sample{sample{4, 4}}), + }, {}}, + expected: NewMockChunkSeriesSet( + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, + []tsdbutil.Sample{sample{3, 3}}, + []tsdbutil.Sample{sample{5, 5}}, + []tsdbutil.Sample{sample{6, 6}}, + ), + NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), + []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, + []tsdbutil.Sample{sample{2, 2}}, + []tsdbutil.Sample{sample{3, 3}}, + []tsdbutil.Sample{sample{4, 4}}, + ), + ), + }, + { + name: "two queriers, with two not in time order series each, with 3 noop queries and one nil together", + chkQuerierSeries: [][]ChunkSeries{{ + 
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}), + NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}), + }, { + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}}, []tsdbutil.Sample{sample{4, 4}}), + }}, + extraQueriers: []ChunkQuerier{NoopChunkedQuerier(), NoopChunkedQuerier(), nil, NoopChunkedQuerier()}, + expected: NewMockChunkSeriesSet( + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, + []tsdbutil.Sample{sample{3, 3}}, + []tsdbutil.Sample{sample{5, 5}}, + []tsdbutil.Sample{sample{6, 6}}, + ), + NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), + []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, + []tsdbutil.Sample{sample{2, 2}}, + []tsdbutil.Sample{sample{3, 3}}, + []tsdbutil.Sample{sample{4, 4}}, + ), + ), + }, + { + name: "two queries, one with NaN samples series", + chkQuerierSeries: [][]ChunkSeries{{ + NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}}), + }, { + NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{1, 1}}), + }}, + expected: NewMockChunkSeriesSet( + NewListChunkSeriesFromSamples(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}}, []tsdbutil.Sample{sample{1, 1}}), + ), }, } { - merged := newMergeIterator(tc.input) - actual := drainSamples(merged) - testutil.Equals(t, tc.expected, actual) + t.Run(tc.name, func(t *testing.T) { + var qs []ChunkQuerier + for _, in := range tc.chkQuerierSeries { + qs = append(qs, &mockChunkQurier{toReturn: in}) + } + qs = append(qs, tc.extraQueriers...) 
+ + merged, _, _ := NewMergeChunkQuerier(qs[0], qs, NewVerticalChunkSeriesMerger(nil)).Select(false, nil) + for merged.Next() { + testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true") + actualSeries := merged.At() + expectedSeries := tc.expected.At() + testutil.Equals(t, expectedSeries.Labels(), actualSeries.Labels()) + + expChks, expErr := ExpandChunks(expectedSeries.Iterator()) + actChks, actErr := ExpandChunks(actualSeries.Iterator()) + testutil.Equals(t, expErr, actErr) + testutil.Equals(t, expChks, actChks) + + } + testutil.Ok(t, merged.Err()) + testutil.Assert(t, !tc.expected.Next(), "Expected Next() to be false") + }) } } -func TestMergeIteratorSeek(t *testing.T) { - for _, tc := range []struct { - input []chunkenc.Iterator - seek int64 - expected []sample - }{ - { - input: []chunkenc.Iterator{ - newListSeriesIterator([]sample{{0, 0}, {1, 1}, {2, 2}}), - }, - seek: 1, - expected: []sample{{1, 1}, {2, 2}}, - }, - { - input: []chunkenc.Iterator{ - newListSeriesIterator([]sample{{0, 0}, {1, 1}}), - newListSeriesIterator([]sample{{2, 2}, {3, 3}}), - }, - seek: 2, - expected: []sample{{2, 2}, {3, 3}}, - }, - { - input: []chunkenc.Iterator{ - newListSeriesIterator([]sample{{0, 0}, {3, 3}}), - newListSeriesIterator([]sample{{1, 1}, {4, 4}}), - newListSeriesIterator([]sample{{2, 2}, {5, 5}}), - }, - seek: 2, - expected: []sample{{2, 2}, {3, 3}, {4, 4}, {5, 5}}, - }, - } { - merged := newMergeIterator(tc.input) - actual := []sample{} - if merged.Seek(tc.seek) { - t, v := merged.At() - actual = append(actual, sample{t, v}) - } - actual = append(actual, drainSamples(merged)...) - testutil.Equals(t, tc.expected, actual) - } +type mockQuerier struct { + baseQuerier + + toReturn []Series } -func drainSamples(iter chunkenc.Iterator) []sample { - result := []sample{} - for iter.Next() { - t, v := iter.At() - // NaNs can't be compared normally, so substitute for another value. 
- if math.IsNaN(v) { - v = -42 - } - result = append(result, sample{t, v}) +type seriesByLabel []Series + +func (a seriesByLabel) Len() int { return len(a) } +func (a seriesByLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a seriesByLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 } + +func (m *mockQuerier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) (SeriesSet, Warnings, error) { + cpy := make([]Series, len(m.toReturn)) + copy(cpy, m.toReturn) + if sortSeries { + sort.Sort(seriesByLabel(cpy)) } - return result + + return NewMockSeriesSet(cpy...), nil, nil +} + +type mockChunkQurier struct { + baseQuerier + + toReturn []ChunkSeries +} + +type chunkSeriesByLabel []ChunkSeries + +func (a chunkSeriesByLabel) Len() int { return len(a) } +func (a chunkSeriesByLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a chunkSeriesByLabel) Less(i, j int) bool { + return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 +} + +func (m *mockChunkQurier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) (ChunkSeriesSet, Warnings, error) { + cpy := make([]ChunkSeries, len(m.toReturn)) + copy(cpy, m.toReturn) + if sortSeries { + sort.Sort(chunkSeriesByLabel(cpy)) + } + + return NewMockChunkSeriesSet(cpy...), nil, nil } type mockSeriesSet struct { @@ -222,7 +416,7 @@ type mockSeriesSet struct { series []Series } -func newMockSeriesSet(series ...Series) SeriesSet { +func NewMockSeriesSet(series ...Series) SeriesSet { return &mockSeriesSet{ idx: -1, series: series, @@ -234,41 +428,151 @@ func (m *mockSeriesSet) Next() bool { return m.idx < len(m.series) } -func (m *mockSeriesSet) At() Series { - return m.series[m.idx] +func (m *mockSeriesSet) At() Series { return m.series[m.idx] } + +func (m *mockSeriesSet) Err() error { return nil } + +type mockChunkSeriesSet struct { + idx int + series []ChunkSeries } -func (m *mockSeriesSet) Err() error { - return nil +func NewMockChunkSeriesSet(series ...ChunkSeries) 
ChunkSeriesSet { + return &mockChunkSeriesSet{ + idx: -1, + series: series, + } } -var result []sample +func (m *mockChunkSeriesSet) Next() bool { + m.idx++ + return m.idx < len(m.series) +} + +func (m *mockChunkSeriesSet) At() ChunkSeries { return m.series[m.idx] } + +func (m *mockChunkSeriesSet) Err() error { return nil } + +func TestChainSampleIterator(t *testing.T) { + for _, tc := range []struct { + input []chunkenc.Iterator + expected []tsdbutil.Sample + }{ + { + input: []chunkenc.Iterator{ + NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}}), + }, + expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, + }, + { + input: []chunkenc.Iterator{ + NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}}), + NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{3, 3}}), + }, + expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}}, + }, + { + input: []chunkenc.Iterator{ + NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{3, 3}}), + NewListSeriesIterator([]tsdbutil.Sample{sample{1, 1}, sample{4, 4}}), + NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{5, 5}}), + }, + expected: []tsdbutil.Sample{ + sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}}, + }, + // Overlap. 
+ { + input: []chunkenc.Iterator{ + NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}}), + NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{2, 2}}), + NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{3, 3}}), + NewListSeriesIterator([]tsdbutil.Sample{}), + NewListSeriesIterator([]tsdbutil.Sample{}), + NewListSeriesIterator([]tsdbutil.Sample{}), + }, + expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}}, + }, + } { + merged := newChainSampleIterator(tc.input) + actual, err := ExpandSamples(merged) + testutil.Ok(t, err) + testutil.Equals(t, tc.expected, actual) + } +} + +func TestChainSampleIteratorSeek(t *testing.T) { + for _, tc := range []struct { + input []chunkenc.Iterator + seek int64 + expected []tsdbutil.Sample + }{ + { + input: []chunkenc.Iterator{ + NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}}), + }, + seek: 1, + expected: []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, + }, + { + input: []chunkenc.Iterator{ + NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{1, 1}}), + NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{3, 3}}), + }, + seek: 2, + expected: []tsdbutil.Sample{sample{2, 2}, sample{3, 3}}, + }, + { + input: []chunkenc.Iterator{ + NewListSeriesIterator([]tsdbutil.Sample{sample{0, 0}, sample{3, 3}}), + NewListSeriesIterator([]tsdbutil.Sample{sample{1, 1}, sample{4, 4}}), + NewListSeriesIterator([]tsdbutil.Sample{sample{2, 2}, sample{5, 5}}), + }, + seek: 2, + expected: []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}}, + }, + } { + merged := newChainSampleIterator(tc.input) + actual := []tsdbutil.Sample{} + if merged.Seek(tc.seek) { + t, v := merged.At() + actual = append(actual, sample{t, v}) + } + s, err := ExpandSamples(merged) + testutil.Ok(t, err) + actual = append(actual, s...) 
+ testutil.Equals(t, tc.expected, actual) + } +} + +var result []tsdbutil.Sample func makeSeriesSet(numSeries, numSamples int) SeriesSet { series := []Series{} for j := 0; j < numSeries; j++ { labels := labels.Labels{{Name: "foo", Value: fmt.Sprintf("bar%d", j)}} - samples := []sample{} + samples := []tsdbutil.Sample{} for k := 0; k < numSamples; k++ { samples = append(samples, sample{t: int64(k), v: float64(k)}) } - series = append(series, newMockSeries(labels, samples)) + series = append(series, NewListSeries(labels, samples)) } - return newMockSeriesSet(series...) + return NewMockSeriesSet(series...) } func makeMergeSeriesSet(numSeriesSets, numSeries, numSamples int) SeriesSet { - seriesSets := []SeriesSet{} + seriesSets := []genericSeriesSet{} for i := 0; i < numSeriesSets; i++ { - seriesSets = append(seriesSets, makeSeriesSet(numSeries, numSamples)) + seriesSets = append(seriesSets, &genericSeriesSetAdapter{makeSeriesSet(numSeries, numSamples)}) } - return NewMergeSeriesSet(seriesSets, nil) + return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, nil, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)} } func benchmarkDrain(seriesSet SeriesSet, b *testing.B) { + var err error for n := 0; n < b.N; n++ { for seriesSet.Next() { - result = drainSamples(seriesSet.At().Iterator()) + result, err = ExpandSamples(seriesSet.At().Iterator()) + testutil.Ok(b, err) } } } diff --git a/storage/generic.go b/storage/generic.go new file mode 100644 index 000000000..a5a694cb2 --- /dev/null +++ b/storage/generic.go @@ -0,0 +1,133 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file holds boilerplate adapters for generic MergeSeriesSet and MergeQuerier functions, so we can have one optimized +// solution that works for both ChunkSeriesSet as well as SeriesSet. + +package storage + +import "github.com/prometheus/prometheus/pkg/labels" + +type genericQuerier interface { + baseQuerier + Select(bool, *SelectHints, ...*labels.Matcher) (genericSeriesSet, Warnings, error) +} + +type genericSeriesSet interface { + Next() bool + At() Labels + Err() error +} + +type genericSeriesMergeFunc func(...Labels) Labels + +type genericSeriesSetAdapter struct { + SeriesSet +} + +func (a *genericSeriesSetAdapter) At() Labels { + return a.SeriesSet.At() +} + +type genericChunkSeriesSetAdapter struct { + ChunkSeriesSet +} + +func (a *genericChunkSeriesSetAdapter) At() Labels { + return a.ChunkSeriesSet.At() +} + +type genericQuerierAdapter struct { + baseQuerier + + // One-of. If both are set, Querier will be used. + q Querier + cq ChunkQuerier +} + +func (q *genericQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (genericSeriesSet, Warnings, error) { + if q.q != nil { + s, w, err := q.q.Select(sortSeries, hints, matchers...) + return &genericSeriesSetAdapter{s}, w, err + } + s, w, err := q.cq.Select(sortSeries, hints, matchers...) 
+ return &genericChunkSeriesSetAdapter{s}, w, err +} + +func newGenericQuerierFrom(q Querier) genericQuerier { + return &genericQuerierAdapter{baseQuerier: q, q: q} +} + +func newGenericQuerierFromChunk(cq ChunkQuerier) genericQuerier { + return &genericQuerierAdapter{baseQuerier: cq, cq: cq} +} + +type querierAdapter struct { + genericQuerier +} + +type seriesSetAdapter struct { + genericSeriesSet +} + +func (a *seriesSetAdapter) At() Series { + return a.genericSeriesSet.At().(Series) +} + +func (q *querierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { + s, w, err := q.genericQuerier.Select(sortSeries, hints, matchers...) + return &seriesSetAdapter{s}, w, err +} + +type chunkQuerierAdapter struct { + genericQuerier +} + +type chunkSeriesSetAdapter struct { + genericSeriesSet +} + +func (a *chunkSeriesSetAdapter) At() ChunkSeries { + return a.genericSeriesSet.At().(ChunkSeries) +} + +func (q *chunkQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (ChunkSeriesSet, Warnings, error) { + s, w, err := q.genericQuerier.Select(sortSeries, hints, matchers...) + return &chunkSeriesSetAdapter{s}, w, err +} + +type seriesMergerAdapter struct { + VerticalSeriesMergeFunc + buf []Series +} + +func (a *seriesMergerAdapter) Merge(s ...Labels) Labels { + a.buf = a.buf[:0] + for _, ser := range s { + a.buf = append(a.buf, ser.(Series)) + } + return a.VerticalSeriesMergeFunc(a.buf...) +} + +type chunkSeriesMergerAdapter struct { + VerticalChunkSeriesMergerFunc + buf []ChunkSeries +} + +func (a *chunkSeriesMergerAdapter) Merge(s ...Labels) Labels { + a.buf = a.buf[:0] + for _, ser := range s { + a.buf = append(a.buf, ser.(ChunkSeries)) + } + return a.VerticalChunkSeriesMergerFunc(a.buf...) 
+} diff --git a/storage/interface.go b/storage/interface.go index 4ac799ebb..0dc3b9efb 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -39,6 +39,7 @@ type Appendable interface { // Storage ingests and manages samples, along with various indexes. All methods // are goroutine-safe. Storage implements storage.SampleAppender. +// TODO(bwplotka): Add ChunkQueryable to Storage in next PR. type Storage interface { Queryable Appendable @@ -51,19 +52,40 @@ type Storage interface { } // A Queryable handles queries against a storage. +// Use it when you need to have access to all samples without chunk encoding abstraction e.g. PromQL. type Queryable interface { // Querier returns a new Querier on the storage. Querier(ctx context.Context, mint, maxt int64) (Querier, error) } -// Querier provides querying access over time series data of a fixed -// time range. +// Querier provides querying access over time series data of a fixed time range. type Querier interface { + baseQuerier + // Select returns a set of series that matches the given label matchers. // Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance. // It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all. Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) +} +// A ChunkQueryable handles queries against a storage. +// Use it when you need to have access to samples in encoded format. +type ChunkQueryable interface { + // ChunkQuerier returns a new ChunkQuerier on the storage. + ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQuerier, Warnings, error) +} + +// ChunkQuerier provides querying access over time series data of a fixed time range. +type ChunkQuerier interface { + baseQuerier + + // Select returns a set of series that matches the given label matchers.
+ // Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance. + // It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all. + Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (ChunkSeriesSet, Warnings, error) +} + +type baseQuerier interface { // LabelValues returns all potential values for a label name. // It is not safe to use the strings beyond the lifefime of the querier. LabelValues(name string) ([]string, Warnings, error) @@ -149,19 +171,43 @@ func (s errSeriesSet) Next() bool { return false } func (s errSeriesSet) At() Series { return nil } func (s errSeriesSet) Err() error { return s.err } -// Series represents a single time series. +// Series exposes a single time series and allows iterating over samples. type Series interface { - // Labels returns the complete set of labels identifying the series. - Labels() labels.Labels + Labels + SampleIteratable +} +// ChunkSeriesSet contains a set of chunked series. +type ChunkSeriesSet interface { + Next() bool + At() ChunkSeries + Err() error +} + +// ChunkSeries exposes a single time series and allows iterating over chunks. +type ChunkSeries interface { + Labels + ChunkIteratable +} + +// Labels represents an item that has labels e.g. time series. +type Labels interface { + // Labels returns the complete set of labels. For series it means all labels identifying the series. + Labels() labels.Labels +} + +type SampleIteratable interface { // Iterator returns a new iterator of the data of the series. Iterator() chunkenc.Iterator } -// ChunkSeriesSet exposes the chunks and intervals of a series instead of the -// actual series itself. -// TODO(bwplotka): Move it to Series like Iterator that iterates over chunks and avoiding loading all of them at once. 
-type ChunkSeriesSet interface { +type ChunkIteratable interface { + // Iterator returns a new iterator that iterates over non-overlapping chunks of the series. + Iterator() chunks.Iterator +} + +// TODO(bwplotka): Remove in next PR. +type DeprecatedChunkSeriesSet interface { Next() bool At() (labels.Labels, []chunks.Meta, tombstones.Intervals) Err() error diff --git a/storage/noop.go b/storage/noop.go index 4c0383233..c4ab1ba34 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -28,7 +28,7 @@ func (noopQuerier) Select(bool, *SelectHints, ...*labels.Matcher) (SeriesSet, Wa return NoopSeriesSet(), nil, nil } -func (noopQuerier) LabelValues(name string) ([]string, Warnings, error) { +func (noopQuerier) LabelValues(string) ([]string, Warnings, error) { return nil, nil, nil } @@ -40,6 +40,29 @@ func (noopQuerier) Close() error { return nil } +type noopChunkQuerier struct{} + +// NoopChunkedQuerier is a ChunkQuerier that does nothing. +func NoopChunkedQuerier() ChunkQuerier { + return noopChunkQuerier{} +} + +func (noopChunkQuerier) Select(bool, *SelectHints, ...*labels.Matcher) (ChunkSeriesSet, Warnings, error) { + return NoopChunkedSeriesSet(), nil, nil +} + +func (noopChunkQuerier) LabelValues(string) ([]string, Warnings, error) { + return nil, nil, nil +} + +func (noopChunkQuerier) LabelNames() ([]string, Warnings, error) { + return nil, nil, nil +} + +func (noopChunkQuerier) Close() error { + return nil +} + type noopSeriesSet struct{} // NoopSeriesSet is a SeriesSet that does nothing. @@ -52,3 +75,16 @@ func (noopSeriesSet) Next() bool { return false } func (noopSeriesSet) At() Series { return nil } func (noopSeriesSet) Err() error { return nil } + +type noopChunkedSeriesSet struct{} + +// NoopChunkedSeriesSet is a ChunkSeriesSet that does nothing.
+func NoopChunkedSeriesSet() ChunkSeriesSet { + return noopChunkedSeriesSet{} +} + +func (noopChunkedSeriesSet) Next() bool { return false } + +func (noopChunkedSeriesSet) At() ChunkSeries { return nil } + +func (noopChunkedSeriesSet) Err() error { return nil } diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 81883e4fe..5451fc245 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -145,7 +145,7 @@ func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querie } queriers = append(queriers, q) } - return storage.NewMergeQuerier(nil, queriers), nil + return storage.NewMergeQuerier(nil, queriers, storage.ChainedSeriesMerge), nil } // Appender implements storage.Storage. diff --git a/storage/series.go b/storage/series.go new file mode 100644 index 000000000..58b5d98b3 --- /dev/null +++ b/storage/series.go @@ -0,0 +1,138 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "sort" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/tsdbutil" +) + +type listSeriesIterator struct { + samples []tsdbutil.Sample + idx int +} + +// NewListSeriesIterator returns listSeriesIterator that allows to iterate over provided samples. Does not handle overlaps. 
+func NewListSeriesIterator(samples []tsdbutil.Sample) chunkenc.Iterator { + return &listSeriesIterator{samples: samples, idx: -1} +} + +func (it *listSeriesIterator) At() (int64, float64) { + s := it.samples[it.idx] + return s.T(), s.V() +} + +func (it *listSeriesIterator) Next() bool { + it.idx++ + return it.idx < len(it.samples) +} + +func (it *listSeriesIterator) Seek(t int64) bool { + if it.idx == -1 { + it.idx = 0 + } + // Do binary search between current position and end. + it.idx = sort.Search(len(it.samples)-it.idx, func(i int) bool { + s := it.samples[i+it.idx] + return s.T() >= t + }) + + return it.idx < len(it.samples) +} + +func (it *listSeriesIterator) Err() error { return nil } + +type listChunkSeriesIterator struct { + chks []chunks.Meta + + idx int +} + +// NewListChunkSeriesIterator returns listChunkSeriesIterator that allows to iterate over provided chunks. Does not handle overlaps. +func NewListChunkSeriesIterator(chks ...chunks.Meta) chunks.Iterator { + return &listChunkSeriesIterator{chks: chks, idx: -1} +} + +func (it *listChunkSeriesIterator) At() chunks.Meta { + return it.chks[it.idx] +} + +func (it *listChunkSeriesIterator) Next() bool { + it.idx++ + return it.idx < len(it.chks) +} + +func (it *listChunkSeriesIterator) Err() error { return nil } + +type chunkSetToSeriesSet struct { + ChunkSeriesSet + + chkIterErr error + sameSeriesChunks []Series + bufIterator chunkenc.Iterator +} + +// NewSeriesSetFromChunkSeriesSet converts ChunkSeriesSet to SeriesSet by decoding chunks one by one. 
+func NewSeriesSetFromChunkSeriesSet(chk ChunkSeriesSet) SeriesSet { + return &chunkSetToSeriesSet{ChunkSeriesSet: chk} +} + +func (c *chunkSetToSeriesSet) Next() bool { + if c.Err() != nil || !c.ChunkSeriesSet.Next() { + return false + } + + iter := c.ChunkSeriesSet.At().Iterator() + c.sameSeriesChunks = c.sameSeriesChunks[:0] + + for iter.Next() { + c.sameSeriesChunks = append(c.sameSeriesChunks, &chunkToSeries{ + labels: c.ChunkSeriesSet.At().Labels(), + chk: iter.At(), + buf: c.bufIterator, + }) + } + + if iter.Err() != nil { + c.chkIterErr = iter.Err() + return false + } + + return true +} + +func (c *chunkSetToSeriesSet) At() Series { + // Series composed of same chunks for the same series. + return ChainedSeriesMerge(c.sameSeriesChunks...) +} + +func (c *chunkSetToSeriesSet) Err() error { + if c.chkIterErr != nil { + return c.chkIterErr + } + return c.ChunkSeriesSet.Err() +} + +type chunkToSeries struct { + labels labels.Labels + chk chunks.Meta + buf chunkenc.Iterator +} + +func (s *chunkToSeries) Labels() labels.Labels { return s.labels } +func (s *chunkToSeries) Iterator() chunkenc.Iterator { return s.chk.Chunk.Iterator(s.buf) } diff --git a/storage/series_test.go b/storage/series_test.go new file mode 100644 index 000000000..197e0f58a --- /dev/null +++ b/storage/series_test.go @@ -0,0 +1,84 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package storage + +import ( + "math" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/tsdbutil" +) + +type MockSeries struct { + labels labels.Labels + SampleIteratorFn func() chunkenc.Iterator +} + +func NewListSeries(lset labels.Labels, samples []tsdbutil.Sample) *MockSeries { + return &MockSeries{ + labels: lset, + SampleIteratorFn: func() chunkenc.Iterator { + return NewListSeriesIterator(samples) + }, + } +} + +func (s *MockSeries) Labels() labels.Labels { return s.labels } +func (s *MockSeries) Iterator() chunkenc.Iterator { return s.SampleIteratorFn() } + +type MockChunkSeries struct { + labels labels.Labels + ChunkIteratorFn func() chunks.Iterator +} + +func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sample) *MockChunkSeries { + var chks []chunks.Meta + + return &MockChunkSeries{ + labels: lset, + ChunkIteratorFn: func() chunks.Iterator { + // Inefficient chunks encoding implementation, not caring about chunk size. + for _, s := range samples { + chks = append(chks, tsdbutil.ChunkFromSamples(s)) + } + return NewListChunkSeriesIterator(chks...) + }, + } +} + +func (s *MockChunkSeries) Labels() labels.Labels { return s.labels } +func (s *MockChunkSeries) Iterator() chunks.Iterator { return s.ChunkIteratorFn() } + +func ExpandSamples(iter chunkenc.Iterator) ([]tsdbutil.Sample, error) { + var result []tsdbutil.Sample + for iter.Next() { + t, v := iter.At() + // NaNs can't be compared normally, so substitute for another value. 
+ if math.IsNaN(v) { + v = -42 + } + result = append(result, sample{t, v}) + } + return result, iter.Err() +} + +func ExpandChunks(iter chunks.Iterator) ([]chunks.Meta, error) { + var result []chunks.Meta + for iter.Next() { + result = append(result, iter.At()) + } + return result, iter.Err() +} diff --git a/tsdb/chunkenc/chunk_test.go b/tsdb/chunkenc/chunk_test.go index 4d54d9e26..2534a1cdc 100644 --- a/tsdb/chunkenc/chunk_test.go +++ b/tsdb/chunkenc/chunk_test.go @@ -111,10 +111,10 @@ func testChunk(t *testing.T, c Chunk) { func benchmarkIterator(b *testing.B, newChunk func() Chunk) { var ( - t = int64(1234123324) - v = 1243535.123 + t = int64(1234123324) + v = 1243535.123 + exp []pair ) - var exp []pair for i := 0; i < b.N; i++ { // t += int64(rand.Intn(10000) + 1) t += int64(1000) @@ -146,7 +146,7 @@ func benchmarkIterator(b *testing.B, newChunk func() Chunk) { b.ReportAllocs() b.ResetTimer() - fmt.Println("num", b.N, "created chunks", len(chunks)) + b.Log("num", b.N, "created chunks", len(chunks)) res := make([]float64, 0, 1024) diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index 2f76c08ca..33c7f1040 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -127,6 +127,7 @@ func (c *XORChunk) iterator(it Iterator) *xorIterator { // We skip that for actual samples. br: newBReader(c.b.bytes()[2:]), numTotal: binary.BigEndian.Uint16(c.b.bytes()), + t: math.MinInt64, } } diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index 20b76ce8d..1e78351db 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -67,6 +67,17 @@ type Meta struct { MinTime, MaxTime int64 } +// Iterator iterates over the chunk of a time series. +type Iterator interface { + // At returns the current meta. + // It depends on implementation if the chunk is populated or not. + At() Meta + // Next advances the iterator by one. + Next() bool + // Err returns optional error if Next is false. 
+ Err() error +} + // writeHash writes the chunk encoding and raw data into the provided hash. func (cm *Meta) writeHash(h hash.Hash, buf []byte) error { buf = append(buf[:0], byte(cm.Chunk.Encoding())) diff --git a/tsdb/compact.go b/tsdb/compact.go index 70aa9eee0..28bad5119 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -648,7 +648,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } var ( - set storage.ChunkSeriesSet + set storage.DeprecatedChunkSeriesSet symbols index.StringIter closers = []io.Closer{} overlapping bool @@ -915,7 +915,7 @@ func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, tombstones.Int } type compactionMerger struct { - a, b storage.ChunkSeriesSet + a, b storage.DeprecatedChunkSeriesSet aok, bok bool l labels.Labels @@ -923,7 +923,8 @@ type compactionMerger struct { intervals tombstones.Intervals } -func newCompactionMerger(a, b storage.ChunkSeriesSet) (*compactionMerger, error) { +// TODO(bwplotka): Move to storage mergers. +func newCompactionMerger(a, b storage.DeprecatedChunkSeriesSet) (*compactionMerger, error) { c := &compactionMerger{ a: a, b: b, diff --git a/tsdb/querier.go b/tsdb/querier.go index ea1100a52..e024fb96a 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -190,7 +190,7 @@ type blockQuerier struct { } func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - var base storage.ChunkSeriesSet + var base storage.DeprecatedChunkSeriesSet var err error if sortSeries { @@ -670,17 +670,17 @@ type baseChunkSeries struct { // LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet // over them. It drops chunks based on tombstones in the given reader. 
-func LookupChunkSeries(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.ChunkSeriesSet, error) { +func LookupChunkSeries(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.DeprecatedChunkSeriesSet, error) { return lookupChunkSeries(false, ir, tr, ms...) } // LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet // over them. It drops chunks based on tombstones in the given reader. Series will be in order. -func LookupChunkSeriesSorted(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.ChunkSeriesSet, error) { +func LookupChunkSeriesSorted(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.DeprecatedChunkSeriesSet, error) { return lookupChunkSeries(true, ir, tr, ms...) } -func lookupChunkSeries(sorted bool, ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.ChunkSeriesSet, error) { +func lookupChunkSeries(sorted bool, ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.DeprecatedChunkSeriesSet, error) { if tr == nil { tr = tombstones.NewMemTombstones() } @@ -754,7 +754,7 @@ func (s *baseChunkSeries) Next() bool { // with known chunk references. It filters out chunks that do not fit the // given time range. type populatedChunkSeries struct { - set storage.ChunkSeriesSet + set storage.DeprecatedChunkSeriesSet chunks ChunkReader mint, maxt int64 @@ -822,7 +822,7 @@ func (s *populatedChunkSeries) Next() bool { // blockSeriesSet is a set of series from an inverted index query. type blockSeriesSet struct { - set storage.ChunkSeriesSet + set storage.DeprecatedChunkSeriesSet err error cur storage.Series diff --git a/tsdb/tsdbutil/buffer.go b/tsdb/tsdbutil/buffer.go index dc2d960d2..a24d50472 100644 --- a/tsdb/tsdbutil/buffer.go +++ b/tsdb/tsdbutil/buffer.go @@ -15,25 +15,13 @@ package tsdbutil import ( "math" -) -// SeriesIterator iterates over the data of a time series. 
-type SeriesIterator interface { - // Seek advances the iterator forward to the given timestamp. - // If there's no value exactly at t, it advances to the first value - // after t. - Seek(t int64) bool - // At returns the current timestamp/value pair. - At() (t int64, v float64) - // Next advances the iterator by one. - Next() bool - // Err returns the current error. - Err() error -} + "github.com/prometheus/prometheus/tsdb/chunkenc" +) // BufferedSeriesIterator wraps an iterator with a look-back buffer. type BufferedSeriesIterator struct { - it SeriesIterator + it chunkenc.Iterator buf *sampleRing lastTime int64 @@ -41,7 +29,7 @@ type BufferedSeriesIterator struct { // NewBuffer returns a new iterator that buffers the values within the time range // of the current element and the duration of delta before. -func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator { +func NewBuffer(it chunkenc.Iterator, delta int64) *BufferedSeriesIterator { return &BufferedSeriesIterator{ it: it, buf: newSampleRing(delta, 16), @@ -56,7 +44,7 @@ func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) { } // Buffer returns an iterator over the buffered data. 
-func (b *BufferedSeriesIterator) Buffer() SeriesIterator { +func (b *BufferedSeriesIterator) Buffer() chunkenc.Iterator { return b.buf.iterator() } @@ -145,7 +133,7 @@ func (r *sampleRing) reset() { r.f = 0 } -func (r *sampleRing) iterator() SeriesIterator { +func (r *sampleRing) iterator() chunkenc.Iterator { return &sampleRingIterator{r: r, i: -1} } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 204b2ff38..80ca8250e 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -539,7 +539,7 @@ func (api *API) series(r *http.Request) apiFuncResult { sets = append(sets, s) } - set := storage.NewMergeSeriesSet(sets, nil) + set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge) metrics := []labels.Labels{} for set.Next() { metrics = append(metrics, set.At().Labels()) diff --git a/web/federate.go b/web/federate.go index 93c9aece3..1c96d6ce9 100644 --- a/web/federate.go +++ b/web/federate.go @@ -98,7 +98,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { sets = append(sets, s) } - set := storage.NewMergeSeriesSet(sets, nil) + set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge) it := storage.NewBuffer(int64(h.lookbackDelta / 1e6)) for set.Next() { s := set.At()