prometheus/storage/merge.go

920 lines
27 KiB
Go

// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"bytes"
"container/heap"
"context"
"fmt"
"math"
"sync"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/util/annotations"
)
// mergeGenericQuerier queries several genericQueriers and merges what they return.
type mergeGenericQuerier struct {
	// queriers are the underlying queriers that Select, LabelValues, LabelNames
	// and Close are forwarded to.
	queriers []genericQuerier

	// mergeFn is used when we see series from different queriers Selects with the same labels.
	mergeFn genericSeriesMergeFunc

	// concurrentSelect makes Select run every querier's Select in its own goroutine.
	// TODO(bwplotka): Remove once remote queries are asynchronous. False by default.
	concurrentSelect bool
}
// NewMergeQuerier returns a new Querier that merges results of given primary and secondary queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// Nil and no-op queriers are filtered out first. If nothing is left a noopQuerier is
// returned; if exactly one querier is left it is returned without the merging wrapper
// (a lone secondary is still wrapped as a secondary querier). Concurrent Select is
// enabled whenever at least one secondary querier remains.
//
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier {
	primaries, secondaries = filterQueriers(primaries), filterQueriers(secondaries)
	nPrimary, nSecondary := len(primaries), len(secondaries)

	// Shortcuts that avoid the merging machinery entirely.
	if nSecondary == 0 {
		switch nPrimary {
		case 0:
			return noopQuerier{}
		case 1:
			return primaries[0]
		}
	} else if nPrimary == 0 && nSecondary == 1 {
		return &querierAdapter{newSecondaryQuerierFrom(secondaries[0])}
	}

	// Primaries come first, secondaries after them.
	all := make([]genericQuerier, 0, nPrimary+nSecondary)
	for i := range primaries {
		all = append(all, newGenericQuerierFrom(primaries[i]))
	}
	for i := range secondaries {
		all = append(all, newSecondaryQuerierFrom(secondaries[i]))
	}

	merger := &seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFn}
	return &querierAdapter{&mergeGenericQuerier{
		mergeFn:          merger.Merge,
		queriers:         all,
		concurrentSelect: nSecondary > 0,
	}}
}
// filterQueriers returns a new slice holding the entries of qs that are neither
// nil nor a noopQuerier, in their original order.
func filterQueriers(qs []Querier) []Querier {
	kept := make([]Querier, 0, len(qs))
	for i := range qs {
		candidate := qs[i]
		if candidate == nil {
			continue
		}
		if _, isNoop := candidate.(noopQuerier); isNoop {
			continue
		}
		kept = append(kept, candidate)
	}
	return kept
}
// NewMergeChunkQuerier returns a new Chunk Querier that merges results of given primary and secondary chunk queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// Nil and no-op queriers are filtered out first. If nothing is left a noopChunkQuerier is
// returned; if exactly one querier is left it is returned without the merging wrapper
// (a lone secondary is still wrapped as a secondary querier). Concurrent Select is
// enabled whenever at least one secondary querier remains.
//
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier {
	primaries, secondaries = filterChunkQueriers(primaries), filterChunkQueriers(secondaries)
	nPrimary, nSecondary := len(primaries), len(secondaries)

	// Shortcuts that avoid the merging machinery entirely.
	if nSecondary == 0 {
		switch nPrimary {
		case 0:
			return noopChunkQuerier{}
		case 1:
			return primaries[0]
		}
	} else if nPrimary == 0 && nSecondary == 1 {
		return &chunkQuerierAdapter{newSecondaryQuerierFromChunk(secondaries[0])}
	}

	// Primaries come first, secondaries after them.
	all := make([]genericQuerier, 0, nPrimary+nSecondary)
	for i := range primaries {
		all = append(all, newGenericQuerierFromChunk(primaries[i]))
	}
	for i := range secondaries {
		all = append(all, newSecondaryQuerierFromChunk(secondaries[i]))
	}

	merger := &chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFn}
	return &chunkQuerierAdapter{&mergeGenericQuerier{
		mergeFn:          merger.Merge,
		queriers:         all,
		concurrentSelect: nSecondary > 0,
	}}
}
// filterChunkQueriers returns a new slice holding the entries of qs that are
// neither nil nor a noopChunkQuerier, in their original order.
func filterChunkQueriers(qs []ChunkQuerier) []ChunkQuerier {
	kept := make([]ChunkQuerier, 0, len(qs))
	for i := range qs {
		candidate := qs[i]
		if candidate == nil {
			continue
		}
		if _, isNoop := candidate.(noopChunkQuerier); isNoop {
			continue
		}
		kept = append(kept, candidate)
	}
	return kept
}
// Select returns a set of series that matches the given label matchers.
//
// Every underlying querier is asked for sorted output regardless of sortSeries,
// because the merge needs it. With concurrentSelect set, each querier is queried
// in its own goroutine with a private copy of the matchers slice; otherwise the
// queriers are queried one after another. The collected sets are wrapped in a
// lazyGenericSeriesSet whose init builds the merged set and advances it once.
func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
	limit := 0
	if hints != nil {
		limit = hints.Limit
	}
	seriesSets := make([]genericSeriesSet, 0, len(q.queriers))

	if q.concurrentSelect {
		var pending sync.WaitGroup
		results := make(chan genericSeriesSet)

		// Schedule all Selects for all queriers we know about.
		for _, querier := range q.queriers {
			// Copy the matchers as some queriers may alter the slice.
			// See https://github.com/prometheus/prometheus/issues/14723
			ownMatchers := make([]*labels.Matcher, len(matchers))
			copy(ownMatchers, matchers)

			pending.Add(1)
			go func(qr genericQuerier, ms []*labels.Matcher) {
				defer pending.Done()
				// We need to sort for NewMergeSeriesSet to work.
				results <- qr.Select(ctx, true, hints, ms...)
			}(querier, ownMatchers)
		}

		// Close the channel once every Select has handed over its result,
		// which ends the collecting loop below.
		go func() {
			pending.Wait()
			close(results)
		}()

		for set := range results {
			seriesSets = append(seriesSets, set)
		}
	} else {
		for _, querier := range q.queriers {
			// We need to sort for merge to work.
			seriesSets = append(seriesSets, querier.Select(ctx, true, hints, matchers...))
		}
	}

	return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
		merged := newGenericMergeSeriesSet(seriesSets, limit, q.mergeFn)
		return merged, merged.Next()
	}}
}
// labelGenericQueriers is a list of genericQueriers exposed as LabelQueriers,
// with the helpers mergeResults needs to split the list recursively.
type labelGenericQueriers []genericQuerier

// Len returns the number of queriers in the list.
func (l labelGenericQueriers) Len() int {
	return len(l)
}

// Get returns the i-th querier as a LabelQuerier.
func (l labelGenericQueriers) Get(i int) LabelQuerier {
	return l[i]
}

// SplitByHalf cuts the list in the middle. For an odd length the second half
// is the longer one. Both halves share the backing array with l.
func (l labelGenericQueriers) SplitByHalf() (labelGenericQueriers, labelGenericQueriers) {
	mid := len(l) / 2
	front, back := l[:mid], l[mid:]
	return front, back
}
// LabelValues returns all potential values for a label name.
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
//
// The values of all underlying queriers are combined via mergeResults. On error
// neither values nor warnings are returned and the error is wrapped with the
// label name.
func (q *mergeGenericQuerier) LabelValues(ctx context.Context, name string, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	fetch := func(lq LabelQuerier) ([]string, annotations.Annotations, error) {
		return lq.LabelValues(ctx, name, hints, matchers...)
	}

	values, warnings, err := q.mergeResults(q.queriers, hints, fetch)
	if err == nil {
		return values, warnings, nil
	}
	return nil, nil, fmt.Errorf("LabelValues() from merge generic querier for label %s: %w", name, err)
}
// mergeResults performs merge sort on the results of invoking the resultsFn against multiple queriers.
//
// The querier list is split in half recursively; a single querier is queried
// directly. Warnings from both halves are accumulated and returned even when an
// error aborts the merge. Each half and the merged result are truncated to the
// limit in hints (see truncateToLimit).
func (q *mergeGenericQuerier) mergeResults(lq labelGenericQueriers, hints *LabelHints, resultsFn func(q LabelQuerier) ([]string, annotations.Annotations, error)) ([]string, annotations.Annotations, error) {
	switch lq.Len() {
	case 0:
		return nil, nil, nil
	case 1:
		return resultsFn(lq.Get(0))
	}

	left, right := lq.SplitByHalf()
	var ws annotations.Annotations

	leftRes, leftWs, err := q.mergeResults(left, hints, resultsFn)
	ws.Merge(leftWs)
	if err != nil {
		return nil, ws, err
	}

	rightRes, rightWs, err := q.mergeResults(right, hints, resultsFn)
	ws.Merge(rightWs)
	if err != nil {
		return nil, ws, err
	}

	combined := mergeStrings(truncateToLimit(leftRes, hints), truncateToLimit(rightRes, hints))
	return truncateToLimit(combined, hints), ws, nil
}
// mergeStrings merges two sorted string slices into one sorted slice.
// A value present at the current position of both inputs is emitted once;
// duplicates inside a single input are kept as they are. The inputs are not
// modified and the result is always a freshly allocated, non-nil slice.
func mergeStrings(a, b []string) []string {
	// Size for the longer input plus some headroom for the other one.
	capHint := len(a)
	if len(b) > capHint {
		capHint = len(b)
	}
	out := make([]string, 0, capHint*10/9)

	i, j := 0, 0
	for i < len(a) && j < len(b) {
		x, y := a[i], b[j]
		if x > y {
			out = append(out, y)
			j++
			continue
		}
		// x <= y: take x, and skip y too when both sides agree.
		out = append(out, x)
		i++
		if x == y {
			j++
		}
	}

	// At most one of the tails is non-empty.
	out = append(out, a[i:]...)
	out = append(out, b[j:]...)
	return out
}
// LabelNames returns all the unique label names present in all queriers in sorted order.
//
// The names of all underlying queriers are combined via mergeResults. On error
// neither names nor warnings are returned and the error is wrapped.
func (q *mergeGenericQuerier) LabelNames(ctx context.Context, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	fetch := func(lq LabelQuerier) ([]string, annotations.Annotations, error) {
		return lq.LabelNames(ctx, hints, matchers...)
	}

	names, warnings, err := q.mergeResults(q.queriers, hints, fetch)
	if err == nil {
		return names, warnings, nil
	}
	return nil, nil, fmt.Errorf("LabelNames() from merge generic querier: %w", err)
}
// Close releases the resources of the generic querier.
// Every underlying querier is closed even if an earlier one fails; all
// failures are collected and returned as one combined error.
func (q *mergeGenericQuerier) Close() error {
	closeErrs := tsdb_errors.NewMulti()
	for i := range q.queriers {
		err := q.queriers[i].Close()
		if err == nil {
			continue
		}
		closeErrs.Add(err)
	}
	return closeErrs.Err()
}
// truncateToLimit cuts s down to hints.Limit entries. s is returned unchanged
// when hints is nil, the limit is not positive, or s is already short enough.
func truncateToLimit(s []string, hints *LabelHints) []string {
	if hints == nil || hints.Limit <= 0 || len(s) <= hints.Limit {
		return s
	}
	return s[:hints.Limit]
}
// VerticalSeriesMergeFunc returns a merged series implementation that merges series with the same labels together.
// It has to handle time-overlapped series as well.
type VerticalSeriesMergeFunc func(...Series) Series
// NewMergeSeriesSet returns a new SeriesSet that merges many SeriesSets together.
// Series with equal labels coming from different sets are combined with mergeFunc.
// If limit is set, the SeriesSet will be limited up-to the limit. 0 means disabled.
func NewMergeSeriesSet(sets []SeriesSet, limit int, mergeFunc VerticalSeriesMergeFunc) SeriesSet {
	adapted := make([]genericSeriesSet, len(sets))
	for i := range sets {
		adapted[i] = &genericSeriesSetAdapter{sets[i]}
	}
	merger := &seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}
	return &seriesSetAdapter{newGenericMergeSeriesSet(adapted, limit, merger.Merge)}
}
// VerticalChunkSeriesMergeFunc returns a merged chunk series implementation that merges potentially time-overlapping
// chunk series with the same labels into a single ChunkSeries.
//
// NOTE: It's up to the implementation how series are vertically merged (if chunks are sorted, re-encoded etc).
type VerticalChunkSeriesMergeFunc func(...ChunkSeries) ChunkSeries
// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges many ChunkSeriesSets together.
// Chunk series with equal labels coming from different sets are combined with mergeFunc.
// limit is passed on as the series limit of the merged set.
func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, limit int, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet {
	adapted := make([]genericSeriesSet, len(sets))
	for i := range sets {
		adapted[i] = &genericChunkSeriesSetAdapter{sets[i]}
	}
	merger := &chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}
	return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(adapted, limit, merger.Merge)}
}
// genericMergeSeriesSet implements genericSeriesSet.
// It merges several label-ordered series sets, combining series with equal labels.
type genericMergeSeriesSet struct {
	// currentLabels is the label set of the series Next last positioned on.
	currentLabels labels.Labels
	// mergeFunc combines the series of currentSets when there is more than one.
	mergeFunc genericSeriesMergeFunc

	// heap holds the sets that are positioned on a series but are not part of
	// currentSets, ordered by the labels of that series.
	heap genericSeriesSetHeap
	// sets are all input sets; Err and Warnings are gathered from them.
	sets []genericSeriesSet
	// currentSets are the sets whose current series has currentLabels.
	currentSets []genericSeriesSet
	// seriesLimit caps the number of series returned; values <= 0 disable the cap.
	seriesLimit int
	mergedSeries int // tracks the total number of series merged and returned.
}
// newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates)
// series returned by the series sets when iterating.
// Each series set must return its series in labels order, otherwise
// merged series set will be incorrect.
// Overlapped situations are merged using provided mergeFunc.
// If seriesLimit is set, only limited series are returned.
//
// A single non-nil set is returned as is, without any wrapping (so seriesLimit is
// not applied to it). Nil sets are ignored: they are neither advanced nor retained,
// so Err and Warnings of the merged set never touch them. If any set reports an
// error after its first advance, a set carrying only that error is returned.
func newGenericMergeSeriesSet(sets []genericSeriesSet, seriesLimit int, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
	if len(sets) == 1 && sets[0] != nil {
		return sets[0]
	}

	// We are pre-advancing sets, so we can introspect the label of the
	// series under the cursor.
	var h genericSeriesSetHeap
	// Keep only the usable sets: Err and Warnings call methods on every
	// retained set, which would panic on a nil interface value.
	nonNil := make([]genericSeriesSet, 0, len(sets))
	for _, set := range sets {
		if set == nil {
			continue
		}
		nonNil = append(nonNil, set)
		if set.Next() {
			heap.Push(&h, set)
		}
		if err := set.Err(); err != nil {
			return errorOnlySeriesSet{err}
		}
	}
	return &genericMergeSeriesSet{
		mergeFunc:   mergeFunc,
		sets:        nonNil,
		heap:        h,
		seriesLimit: seriesLimit,
	}
}
// Next advances to the next distinct label set across all underlying sets.
// It returns false once the series limit has been reached or no set has any
// series left. After a true result, currentSets holds every set positioned on
// a series with currentLabels.
func (c *genericMergeSeriesSet) Next() bool {
	// Exit early if seriesLimit is set and already reached.
	if c.seriesLimit > 0 && c.mergedSeries >= c.seriesLimit {
		return false
	}

	// Run in a loop because the "next" series sets may not be valid anymore.
	// If, for the current label set, all the next series sets come from
	// failed remote storage sources, we want to keep trying with the next label set.
	for {
		// Advance the sets consumed by the previous call. Exhausted sets are
		// dropped, the others go back into the heap.
		for _, consumed := range c.currentSets {
			if consumed.Next() {
				heap.Push(&c.heap, consumed)
			}
		}
		if len(c.heap) == 0 {
			return false
		}

		// Collect every set whose current series carries the smallest label set.
		c.currentSets = c.currentSets[:0]
		c.currentLabels = c.heap[0].At().Labels()
		for len(c.heap) > 0 {
			if !labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
				break
			}
			c.currentSets = append(c.currentSets, heap.Pop(&c.heap).(genericSeriesSet))
		}

		if len(c.currentSets) == 0 {
			continue
		}
		// At least one set contributes to the current label set.
		c.mergedSeries++
		return true
	}
}
// At returns the series for the current label set. With exactly one
// contributing set its series is returned untouched; otherwise the series of
// all contributing sets are handed to mergeFunc.
func (c *genericMergeSeriesSet) At() Labels {
	if len(c.currentSets) == 1 {
		return c.currentSets[0].At()
	}

	candidates := make([]Labels, len(c.currentSets))
	for i, set := range c.currentSets {
		candidates[i] = set.At()
	}
	return c.mergeFunc(candidates...)
}
// Err returns the first error reported by any of the underlying sets, in
// input order, or nil if none of them failed.
func (c *genericMergeSeriesSet) Err() error {
	for i := range c.sets {
		err := c.sets[i].Err()
		if err != nil {
			return err
		}
	}
	return nil
}
// Warnings returns the warnings of all underlying sets merged together.
func (c *genericMergeSeriesSet) Warnings() annotations.Annotations {
	var merged annotations.Annotations
	for i := range c.sets {
		merged.Merge(c.sets[i].Warnings())
	}
	return merged
}
// genericSeriesSetHeap is a container/heap of series sets ordered by the
// labels of the series each set is currently positioned on.
type genericSeriesSetHeap []genericSeriesSet

// Len returns the number of sets in the heap.
func (h genericSeriesSetHeap) Len() int {
	return len(h)
}

// Swap exchanges the sets at positions i and j.
func (h genericSeriesSetHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

// Less reports whether the current labels of set i sort before those of set j.
func (h genericSeriesSetHeap) Less(i, j int) bool {
	return labels.Compare(h[i].At().Labels(), h[j].At().Labels()) < 0
}

// Push appends x, which must be a genericSeriesSet. Use heap.Push instead of
// calling it directly.
func (h *genericSeriesSetHeap) Push(x interface{}) {
	*h = append(*h, x.(genericSeriesSet))
}

// Pop removes and returns the last element. Use heap.Pop instead of calling
// it directly.
func (h *genericSeriesSetHeap) Pop() interface{} {
	items := *h
	last := len(items) - 1
	top := items[last]
	*h = items[:last]
	return top
}
// ChainedSeriesMerge returns single series from many same, potentially overlapping series by chaining samples together.
// If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
// timestamp are dropped.
//
// This works the best with replicated series, where data from two series are exactly the same. This does not work well
// with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective
// this never happens.
//
// It's optimized for non-overlap cases as well.
//
// The labels of the result are taken from the first series. nil is returned when no series is given.
func ChainedSeriesMerge(series ...Series) Series {
	if len(series) == 0 {
		return nil
	}

	merged := &SeriesEntry{Lset: series[0].Labels()}
	merged.SampleIteratorFn = func(reuse chunkenc.Iterator) chunkenc.Iterator {
		return ChainSampleIteratorFromSeries(reuse, series)
	}
	return merged
}
// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps
// order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
// timestamp are dropped. It's optimized for non-overlap cases as well.
type chainSampleIterator struct {
	// iterators are the underlying iterators being chained.
	iterators []chunkenc.Iterator
	// h holds the iterators other than curr that are positioned on a sample,
	// ordered by that sample's timestamp. Next treats a nil h as "not started yet".
	h samplesIteratorHeap

	// curr is the iterator the current sample is read from; nil if there is none.
	curr chunkenc.Iterator
	// lastT is the timestamp of the last sample returned by Next or Seek
	// (getChainSampleIterator resets it to math.MinInt64).
	lastT int64

	// Whether the previous and the current sample are direct neighbors
	// within the same base iterator.
	consecutive bool
}
// getChainSampleIterator returns a chainSampleIterator initialized for length entries,
// re-using the memory from it if possible.
//
// If it is a *chainSampleIterator it is recycled, otherwise a new one is allocated.
// The iterators slice is resized to length; entries kept from a previous use are left
// in place so that callers can hand them to Iterator() for reuse. All iteration state
// (heap, current iterator, last timestamp, consecutive flag) is reset, so a recycled
// iterator never exposes a position left over from the series it served before.
func getChainSampleIterator(it chunkenc.Iterator, length int) *chainSampleIterator {
	csi, ok := it.(*chainSampleIterator)
	if !ok {
		csi = &chainSampleIterator{}
	}
	if cap(csi.iterators) < length {
		csi.iterators = make([]chunkenc.Iterator, length)
	} else {
		csi.iterators = csi.iterators[:length]
	}
	csi.h = nil
	// Drop the stale current iterator: otherwise Seek(math.MinInt64) would take
	// its no-op shortcut on it, and At* would return old data instead of panicking.
	csi.curr = nil
	csi.consecutive = false
	csi.lastT = math.MinInt64
	return csi
}
// ChainSampleIteratorFromSeries returns an iterator chaining the samples of
// all given series. it is recycled if it is a *chainSampleIterator, and its
// previous per-series iterators are offered to each series for reuse.
func ChainSampleIteratorFromSeries(it chunkenc.Iterator, series []Series) chunkenc.Iterator {
	chained := getChainSampleIterator(it, len(series))
	for idx := range series {
		chained.iterators[idx] = series[idx].Iterator(chained.iterators[idx])
	}
	return chained
}
// ChainSampleIteratorFromIterables returns an iterator chaining the samples of
// all given iterables. it is recycled if it is a *chainSampleIterator, and its
// previous per-source iterators are offered to each iterable for reuse.
func ChainSampleIteratorFromIterables(it chunkenc.Iterator, iterables []chunkenc.Iterable) chunkenc.Iterator {
	chained := getChainSampleIterator(it, len(iterables))
	for idx := range iterables {
		chained.iterators[idx] = iterables[idx].Iterator(chained.iterators[idx])
	}
	return chained
}
// ChainSampleIteratorFromIterators returns an iterator chaining the samples of
// the given iterators. it is recycled if it is a *chainSampleIterator; the
// iterators slice is adopted as is, not copied.
func ChainSampleIteratorFromIterators(it chunkenc.Iterator, iterators []chunkenc.Iterator) chunkenc.Iterator {
	chained := getChainSampleIterator(it, 0)
	chained.iterators = iterators
	return chained
}
// Seek advances to the first sample with a timestamp >= t and returns its
// value type. If the iterator is already positioned at or past t, the current
// position is kept. chunkenc.ValNone is returned when no such sample exists or
// when any underlying iterator reports an error.
func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType {
	// No-op check.
	if c.curr != nil && c.lastT >= t {
		return c.curr.Seek(c.lastT)
	}

	// Don't bother to find out if the next sample is consecutive. Callers
	// of Seek usually aren't interested anyway.
	c.consecutive = false

	// Rebuild the heap from every iterator that still has a sample at or after t.
	c.h = samplesIteratorHeap{}
	for _, iter := range c.iterators {
		if iter.Seek(t) != chunkenc.ValNone {
			heap.Push(&c.h, iter)
			continue
		}
		if iter.Err() != nil {
			// If any iterator is reporting an error, abort.
			return chunkenc.ValNone
		}
	}

	if len(c.h) == 0 {
		c.curr = nil
		return chunkenc.ValNone
	}
	c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
	c.lastT = c.curr.AtT()
	return c.curr.Seek(c.lastT)
}
// At returns the timestamp and float value of the current sample.
// It panics if there is no current sample.
func (c *chainSampleIterator) At() (t int64, v float64) {
	if c.curr != nil {
		return c.curr.At()
	}
	panic("chainSampleIterator.At called before first .Next or after .Next returned false.")
}
// AtHistogram returns the timestamp and histogram of the current sample,
// passing h on to the underlying iterator. It panics if there is no current
// sample. For non-gauge histograms the counter reset hint is downgraded to
// unknown whenever the sample does not directly follow the previous one within
// the same underlying iterator.
func (c *chainSampleIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
	if c.curr == nil {
		panic("chainSampleIterator.AtHistogram called before first .Next or after .Next returned false.")
	}
	ts, sample := c.curr.AtHistogram(h)
	if c.consecutive || sample.CounterResetHint == histogram.GaugeType {
		return ts, sample
	}
	// If the current sample is not consecutive with the previous one, we
	// cannot be sure anymore about counter resets for counter histograms.
	// TODO(beorn7): If a `NotCounterReset` sample is followed by a
	// non-consecutive `CounterReset` sample, we could keep the hint as
	// `CounterReset`. But then we needed to track the previous sample
	// in more detail, which might not be worth it.
	sample.CounterResetHint = histogram.UnknownCounterReset
	return ts, sample
}
// AtFloatHistogram returns the timestamp and float histogram of the current
// sample, passing fh on to the underlying iterator. It panics if there is no
// current sample. For non-gauge histograms the counter reset hint is downgraded
// to unknown whenever the sample does not directly follow the previous one
// within the same underlying iterator.
func (c *chainSampleIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
	if c.curr == nil {
		panic("chainSampleIterator.AtFloatHistogram called before first .Next or after .Next returned false.")
	}
	ts, sample := c.curr.AtFloatHistogram(fh)
	if c.consecutive || sample.CounterResetHint == histogram.GaugeType {
		return ts, sample
	}
	// If the current sample is not consecutive with the previous one, we
	// cannot be sure anymore about counter resets for counter histograms.
	// TODO(beorn7): If a `NotCounterReset` sample is followed by a
	// non-consecutive `CounterReset` sample, we could keep the hint as
	// `CounterReset`. But then we needed to track the previous sample
	// in more detail, which might not be worth it.
	sample.CounterResetHint = histogram.UnknownCounterReset
	return ts, sample
}
// AtT returns the timestamp of the current sample.
// It panics if there is no current sample.
func (c *chainSampleIterator) AtT() int64 {
	if c.curr != nil {
		return c.curr.AtT()
	}
	panic("chainSampleIterator.AtT called before first .Next or after .Next returned false.")
}
// Next advances to the sample with the smallest timestamp greater than the
// last returned one across all underlying iterators and returns its value
// type. Samples whose timestamp equals the last returned one are skipped.
// chunkenc.ValNone is returned when all iterators are exhausted, when there
// are no iterators at all, or when an underlying iterator reports an error.
func (c *chainSampleIterator) Next() chunkenc.ValueType {
	var (
		currT           int64
		currValueType   chunkenc.ValueType
		iteratorChanged bool
	)
	if c.h == nil {
		// First call: position every iterator but the first on its first sample.
		iteratorChanged = true
		c.h = samplesIteratorHeap{}
		if len(c.iterators) == 0 {
			// Nothing to iterate over. Seek already copes with an empty
			// iterator list; do the same here instead of indexing out of range.
			c.curr = nil
			return chunkenc.ValNone
		}
		// We call c.curr.Next() as the first thing below.
		// So, we don't call Next() on it here.
		c.curr = c.iterators[0]
		for _, iter := range c.iterators[1:] {
			if iter.Next() == chunkenc.ValNone {
				if iter.Err() != nil {
					// If any iterator is reporting an error, abort.
					// If c.iterators[0] is reporting an error, we'll handle that below.
					return chunkenc.ValNone
				}
			} else {
				heap.Push(&c.h, iter)
			}
		}
	}

	if c.curr == nil {
		return chunkenc.ValNone
	}

	for {
		currValueType = c.curr.Next()

		if currValueType == chunkenc.ValNone {
			if c.curr.Err() != nil {
				// Abort if we've hit an error.
				return chunkenc.ValNone
			}

			if len(c.h) == 0 {
				// No iterator left to iterate.
				c.curr = nil
				return chunkenc.ValNone
			}
		} else {
			currT = c.curr.AtT()
			if currT == c.lastT {
				// Ignoring sample for the same timestamp.
				continue
			}
			if len(c.h) == 0 {
				// curr is the only iterator remaining,
				// no need to check with the heap.
				break
			}

			// Check current iterator with the top of the heap.
			nextT := c.h[0].AtT()
			if currT < nextT {
				// Current iterator has smaller timestamp than the heap.
				break
			}
			// Current iterator does not hold the smallest timestamp.
			heap.Push(&c.h, c.curr)
		}

		// Switch to the iterator holding the smallest timestamp.
		c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
		iteratorChanged = true
		currT = c.curr.AtT()
		// Seeking to the iterator's own position yields its current value type.
		currValueType = c.curr.Seek(currT)
		if currT != c.lastT {
			break
		}
	}

	c.consecutive = !iteratorChanged
	c.lastT = currT
	return currValueType
}
// Err returns the combined errors of all underlying iterators.
func (c *chainSampleIterator) Err() error {
	combined := tsdb_errors.NewMulti()
	for i := range c.iterators {
		combined.Add(c.iterators[i].Err())
	}
	return combined.Err()
}
// samplesIteratorHeap is a container/heap of sample iterators ordered by the
// timestamp of the sample each iterator is currently positioned on.
type samplesIteratorHeap []chunkenc.Iterator

// Len returns the number of iterators in the heap.
func (h samplesIteratorHeap) Len() int {
	return len(h)
}

// Swap exchanges the iterators at positions i and j.
func (h samplesIteratorHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

// Less reports whether iterator i is positioned on an earlier timestamp than iterator j.
func (h samplesIteratorHeap) Less(i, j int) bool {
	ti, tj := h[i].AtT(), h[j].AtT()
	return ti < tj
}

// Push appends x, which must be a chunkenc.Iterator. Use heap.Push instead of
// calling it directly.
func (h *samplesIteratorHeap) Push(x interface{}) {
	*h = append(*h, x.(chunkenc.Iterator))
}

// Pop removes and returns the last element. Use heap.Pop instead of calling
// it directly.
func (h *samplesIteratorHeap) Pop() interface{} {
	items := *h
	last := len(items) - 1
	top := items[last]
	*h = items[:last]
	return top
}
// NewCompactingChunkSeriesMerger returns VerticalChunkSeriesMergeFunc that merges the same chunk series into single chunk series.
// In case of the chunk overlaps, it compacts those into one or more time-ordered non-overlapping chunks with merged data.
// Samples from overlapped chunks are merged using series vertical merge func.
// It expects the same labels for each given series.
//
// The returned function yields nil for zero series and takes the labels of the result from the first series.
//
// NOTE: Use the returned merge function only when you see potentially overlapping series, as this introduces a small overhead
// to handle overlaps between series.
func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalChunkSeriesMergeFunc {
	return func(series ...ChunkSeries) ChunkSeries {
		if len(series) == 0 {
			return nil
		}

		// A fresh iterator is requested from every series on each call; the
		// iterator passed in for reuse is ignored.
		newIterator := func(chunks.Iterator) chunks.Iterator {
			perSeries := make([]chunks.Iterator, len(series))
			for i := range series {
				perSeries[i] = series[i].Iterator(nil)
			}
			return &compactChunkIterator{mergeFunc: mergeFunc, iterators: perSeries}
		}

		return &ChunkSeriesEntry{
			Lset:            series[0].Labels(),
			ChunkIteratorFn: newIterator,
		}
	}
}
// compactChunkIterator is responsible to compact chunks from different iterators of the same time series into single chainSeries.
// If time-overlapping chunks are found, they are encoded and passed to series merge and encoded again into one bigger chunk.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
type compactChunkIterator struct {
	// mergeFunc merges the decoded series of overlapping chunks.
	mergeFunc VerticalSeriesMergeFunc
	// iterators are the input chunk iterators; Err collects their errors.
	iterators []chunks.Iterator

	// h holds the iterators positioned on a chunk, ordered by chunk min time,
	// then max time. It is filled on the first call to Next.
	h chunkIteratorHeap

	// err is the error hit while re-encoding merged chunks, if any.
	err error
	// curr is the chunk returned by At.
	curr chunks.Meta
}

// At returns the chunk the iterator is currently positioned on.
func (c *compactChunkIterator) At() chunks.Meta {
	return c.curr
}
// Next advances to the next chunk in (min time, max time) order. Chunks that
// overlap in time with the current one are merged with mergeFunc and re-encoded;
// chunks that are byte-identical duplicates of the previously considered chunk
// are dropped. It returns false when all iterators are exhausted or when
// re-encoding fails, in which case the error is available through Err.
func (c *compactChunkIterator) Next() bool {
	// First call (heap still nil): position every input iterator on its first chunk.
	if c.h == nil {
		for _, iter := range c.iterators {
			if iter.Next() {
				heap.Push(&c.h, iter)
			}
		}
	}
	if len(c.h) == 0 {
		return false
	}

	// Take the oldest chunk and put its iterator back if it has more chunks.
	iter := heap.Pop(&c.h).(chunks.Iterator)
	c.curr = iter.At()
	if iter.Next() {
		heap.Push(&c.h, iter)
	}

	var (
		// overlapping collects decoders for the chunks overlapping c.curr.
		overlapping []Series
		// oMaxTime is the largest max time seen in the overlapping group so far.
		oMaxTime = c.curr.MaxTime
		// prev is the last chunk that was not skipped as a duplicate.
		prev = c.curr
	)
	// Detect overlaps to compact. Be smart about it and deduplicate on the fly if chunks are identical.
	for len(c.h) > 0 {
		// Get the next oldest chunk by min, then max time.
		next := c.h[0].At()
		if next.MinTime > oMaxTime {
			// No overlap with current one.
			break
		}

		// Only do something if it is not a perfect duplicate.
		if next.MinTime != prev.MinTime ||
			next.MaxTime != prev.MaxTime ||
			!bytes.Equal(next.Chunk.Bytes(), prev.Chunk.Bytes()) {
			// We operate on same series, so labels do not matter here.
			overlapping = append(overlapping, newChunkToSeriesDecoder(labels.EmptyLabels(), next))
			if next.MaxTime > oMaxTime {
				oMaxTime = next.MaxTime
			}
			prev = next
		}

		// The chunk is consumed either way; advance its iterator.
		iter := heap.Pop(&c.h).(chunks.Iterator)
		if iter.Next() {
			heap.Push(&c.h, iter)
		}
	}
	if len(overlapping) == 0 {
		// Nothing overlaps: c.curr is returned as is.
		return true
	}

	// Add last as it's not yet included in overlap. We operate on same series, so labels does not matter here.
	iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(labels.EmptyLabels(), c.curr))...)).Iterator(nil)
	if !iter.Next() {
		if c.err = iter.Err(); c.err != nil {
			return false
		}
		panic("unexpected seriesToChunkEncoder lack of iterations")
	}
	c.curr = iter.At()
	// If the merge produced more than one chunk, queue the rest for later calls.
	if iter.Next() {
		heap.Push(&c.h, iter)
	}
	return true
}
// Err returns the combined errors of all input iterators together with the
// error recorded while re-encoding merged chunks, if any.
func (c *compactChunkIterator) Err() error {
	combined := tsdb_errors.NewMulti()
	for i := range c.iterators {
		combined.Add(c.iterators[i].Err())
	}
	combined.Add(c.err)
	return combined.Err()
}
// chunkIteratorHeap is a container/heap of chunk iterators ordered by the min
// time, then the max time, of the chunk each iterator is positioned on.
type chunkIteratorHeap []chunks.Iterator

// Len returns the number of iterators in the heap.
func (h chunkIteratorHeap) Len() int {
	return len(h)
}

// Swap exchanges the iterators at positions i and j.
func (h chunkIteratorHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

// Less orders by chunk min time and breaks ties by chunk max time.
func (h chunkIteratorHeap) Less(i, j int) bool {
	left, right := h[i].At(), h[j].At()
	if left.MinTime != right.MinTime {
		return left.MinTime < right.MinTime
	}
	return left.MaxTime < right.MaxTime
}

// Push appends x, which must be a chunks.Iterator. Use heap.Push instead of
// calling it directly.
func (h *chunkIteratorHeap) Push(x interface{}) {
	*h = append(*h, x.(chunks.Iterator))
}

// Pop removes and returns the last element. Use heap.Pop instead of calling
// it directly.
func (h *chunkIteratorHeap) Pop() interface{} {
	items := *h
	last := len(items) - 1
	top := items[last]
	*h = items[:last]
	return top
}
// NewConcatenatingChunkSeriesMerger returns a VerticalChunkSeriesMergeFunc that simply concatenates the
// chunks from the series. The resultant stream of chunks for a series might be overlapping and unsorted.
//
// The returned function yields nil for zero series and takes the labels of the result from the first series.
func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc {
	return func(series ...ChunkSeries) ChunkSeries {
		if len(series) == 0 {
			return nil
		}

		// A fresh iterator is requested from every series on each call; the
		// iterator passed in for reuse is ignored.
		newIterator := func(chunks.Iterator) chunks.Iterator {
			perSeries := make([]chunks.Iterator, len(series))
			for i := range series {
				perSeries[i] = series[i].Iterator(nil)
			}
			return &concatenatingChunkIterator{iterators: perSeries}
		}

		return &ChunkSeriesEntry{
			Lset:            series[0].Labels(),
			ChunkIteratorFn: newIterator,
		}
	}
}
// concatenatingChunkIterator yields the chunks of its input iterators one
// iterator after another, in the order the iterators are listed.
type concatenatingChunkIterator struct {
	// iterators are the input chunk iterators.
	iterators []chunks.Iterator
	// idx is the index of the iterator currently being drained.
	idx int

	// curr is the chunk returned by At.
	curr chunks.Meta
}

// At returns the chunk the iterator is currently positioned on.
func (c *concatenatingChunkIterator) At() chunks.Meta {
	return c.curr
}
// Next advances to the next chunk, moving on to the following input iterator
// whenever the current one is exhausted without error. It returns false when
// every iterator is drained or when the current iterator reports an error.
func (c *concatenatingChunkIterator) Next() bool {
	for c.idx < len(c.iterators) {
		active := c.iterators[c.idx]
		if active.Next() {
			c.curr = active.At()
			return true
		}
		if active.Err() != nil {
			// Stay on the failing iterator; the error is surfaced by Err.
			return false
		}
		c.idx++
	}
	return false
}
// Err returns the combined errors of all input iterators.
func (c *concatenatingChunkIterator) Err() error {
	combined := tsdb_errors.NewMulti()
	for i := range c.iterators {
		combined.Add(c.iterators[i].Err())
	}
	return combined.Err()
}