// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"bytes"
"container/heap"
"context"
"fmt"
"math"
"slices"
"sync"
Style cleanup of all the changes in sparsehistogram so far
A lot of this code was hacked together, literally during a
hackathon. This commit intends not to change the code substantially,
but just make the code obey the usual style practices.
A (possibly incomplete) list of areas:
* Generally address linter warnings.
* The `pgk` directory is deprecated as per dev-summit. No new packages should
be added to it. I moved the new `pkg/histogram` package to `model`
anticipating what's proposed in #9478.
* Make the naming of the Sparse Histogram more consistent. Including
abbreviations, there were just too many names for it: SparseHistogram,
Histogram, Histo, hist, his, shs, h. The idea is to call it "Histogram" in
general. Only add "Sparse" if it is needed to avoid confusion with
conventional Histograms (which is rare because the TSDB really has no notion
of conventional Histograms). Use abbreviations only in local scope, and then
really abbreviate (not just removing three out of seven letters like in
"Histo"). This is in the spirit of
https://github.com/golang/go/wiki/CodeReviewComments#variable-names
* Several other minor name changes.
* A lot of formatting of doc comments. For one, following
https://github.com/golang/go/wiki/CodeReviewComments#comment-sentences
, but also layout question, anticipating how things will look like
when rendered by `godoc` (even where `godoc` doesn't render them
right now because they are for unexported types or not a doc comment
at all but just a normal code comment - consistency is queen!).
* Re-enabled `TestQueryLog` and `TestEndopints` (they pass now,
leaving them disabled was presumably an oversight).
* Bucket iterator for histogram.Histogram is now created with a
method.
* HistogramChunk.iterator now allows iterator recycling. (I think
@dieterbe only commented it out because he was confused by the
question in the comment.)
* HistogramAppender.Append panics now because we decided to treat
staleness marker differently.
Signed-off-by: beorn7 <beorn@grafana.com>
3 years ago
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/util/annotations"
)
type mergeGenericQuerier struct {
queriers [ ] genericQuerier
// mergeFn is used when we see series from different queriers Selects with the same labels.
mergeFn genericSeriesMergeFunc
// TODO(bwplotka): Remove once remote queries are asynchronous. False by default.
concurrentSelect bool
}
// NewMergeQuerier returns a new Querier that merges results of given primary and secondary queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
func NewMergeQuerier ( primaries , secondaries [ ] Querier , mergeFn VerticalSeriesMergeFunc ) Querier {
primaries = filterQueriers ( primaries )
secondaries = filterQueriers ( secondaries )
switch {
case len ( primaries ) == 0 && len ( secondaries ) == 0 :
return noopQuerier { }
case len ( primaries ) == 1 && len ( secondaries ) == 0 :
return primaries [ 0 ]
case len ( primaries ) == 0 && len ( secondaries ) == 1 :
return & querierAdapter { newSecondaryQuerierFrom ( secondaries [ 0 ] ) }
}
queriers := make ( [ ] genericQuerier , 0 , len ( primaries ) + len ( secondaries ) )
for _ , q := range primaries {
queriers = append ( queriers , newGenericQuerierFrom ( q ) )
}
for _ , q := range secondaries {
queriers = append ( queriers , newSecondaryQuerierFrom ( q ) )
}
concurrentSelect := false
if len ( secondaries ) > 0 {
concurrentSelect = true
}
return & querierAdapter { & mergeGenericQuerier {
mergeFn : ( & seriesMergerAdapter { VerticalSeriesMergeFunc : mergeFn } ) . Merge ,
queriers : queriers ,
concurrentSelect : concurrentSelect ,
} }
}
func filterQueriers ( qs [ ] Querier ) [ ] Querier {
ret := make ( [ ] Querier , 0 , len ( qs ) )
for _ , q := range qs {
if _ , ok := q . ( noopQuerier ) ; ! ok && q != nil {
ret = append ( ret , q )
}
}
return ret
}
// NewMergeChunkQuerier returns a new Chunk Querier that merges results of given primary and secondary chunk queriers.
// See NewFanout commentary to learn more about primary vs secondary differences.
//
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
func NewMergeChunkQuerier ( primaries , secondaries [ ] ChunkQuerier , mergeFn VerticalChunkSeriesMergeFunc ) ChunkQuerier {
primaries = filterChunkQueriers ( primaries )
secondaries = filterChunkQueriers ( secondaries )
switch {
case len ( primaries ) == 0 && len ( secondaries ) == 0 :
return noopChunkQuerier { }
case len ( primaries ) == 1 && len ( secondaries ) == 0 :
return primaries [ 0 ]
case len ( primaries ) == 0 && len ( secondaries ) == 1 :
return & chunkQuerierAdapter { newSecondaryQuerierFromChunk ( secondaries [ 0 ] ) }
}
queriers := make ( [ ] genericQuerier , 0 , len ( primaries ) + len ( secondaries ) )
for _ , q := range primaries {
queriers = append ( queriers , newGenericQuerierFromChunk ( q ) )
}
for _ , q := range secondaries {
queriers = append ( queriers , newSecondaryQuerierFromChunk ( q ) )
}
concurrentSelect := false
if len ( secondaries ) > 0 {
concurrentSelect = true
}
return & chunkQuerierAdapter { & mergeGenericQuerier {
mergeFn : ( & chunkSeriesMergerAdapter { VerticalChunkSeriesMergeFunc : mergeFn } ) . Merge ,
queriers : queriers ,
concurrentSelect : concurrentSelect ,
} }
}
func filterChunkQueriers ( qs [ ] ChunkQuerier ) [ ] ChunkQuerier {
ret := make ( [ ] ChunkQuerier , 0 , len ( qs ) )
for _ , q := range qs {
if _ , ok := q . ( noopChunkQuerier ) ; ! ok && q != nil {
ret = append ( ret , q )
}
}
return ret
}
// Select returns a set of series that matches the given label matchers.
func ( q * mergeGenericQuerier ) Select ( ctx context . Context , sortSeries bool , hints * SelectHints , matchers ... * labels . Matcher ) genericSeriesSet {
seriesSets := make ( [ ] genericSeriesSet , 0 , len ( q . queriers ) )
if ! q . concurrentSelect {
for _ , querier := range q . queriers {
// We need to sort for merge to work.
seriesSets = append ( seriesSets , querier . Select ( ctx , true , hints , matchers ... ) )
}
return & lazyGenericSeriesSet { init : func ( ) ( genericSeriesSet , bool ) {
s := newGenericMergeSeriesSet ( seriesSets , q . mergeFn )
return s , s . Next ( )
} }
}
var (
wg sync . WaitGroup
seriesSetChan = make ( chan genericSeriesSet )
)
// Schedule all Selects for all queriers we know about.
for _ , querier := range q . queriers {
wg . Add ( 1 )
go func ( qr genericQuerier ) {
defer wg . Done ( )
// We need to sort for NewMergeSeriesSet to work.
seriesSetChan <- qr . Select ( ctx , true , hints , matchers ... )
} ( querier )
}
go func ( ) {
wg . Wait ( )
close ( seriesSetChan )
} ( )
for r := range seriesSetChan {
seriesSets = append ( seriesSets , r )
}
return & lazyGenericSeriesSet { init : func ( ) ( genericSeriesSet , bool ) {
s := newGenericMergeSeriesSet ( seriesSets , q . mergeFn )
return s , s . Next ( )
} }
}
type labelGenericQueriers [ ] genericQuerier
func ( l labelGenericQueriers ) Len ( ) int { return len ( l ) }
func ( l labelGenericQueriers ) Get ( i int ) LabelQuerier { return l [ i ] }
func ( l labelGenericQueriers ) SplitByHalf ( ) ( labelGenericQueriers , labelGenericQueriers ) {
i := len ( l ) / 2
return l [ : i ] , l [ i : ]
}
// LabelValues returns all potential values for a label name.
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
func ( q * mergeGenericQuerier ) LabelValues ( ctx context . Context , name string , hints * LabelHints , matchers ... * labels . Matcher ) ( [ ] string , annotations . Annotations , error ) {
res , ws , err := q . lvals ( ctx , q . queriers , name , hints , matchers ... )
if err != nil {
return nil , nil , fmt . Errorf ( "LabelValues() from merge generic querier for label %s: %w" , name , err )
}
return res , ws , nil
}
// lvals performs merge sort for LabelValues from multiple queriers.
func ( q * mergeGenericQuerier ) lvals ( ctx context . Context , lq labelGenericQueriers , n string , hints * LabelHints , matchers ... * labels . Matcher ) ( [ ] string , annotations . Annotations , error ) {
if lq . Len ( ) == 0 {
return nil , nil , nil
}
if lq . Len ( ) == 1 {
return lq . Get ( 0 ) . LabelValues ( ctx , n , hints , matchers ... )
}
a , b := lq . SplitByHalf ( )
var ws annotations . Annotations
s1 , w , err := q . lvals ( ctx , a , n , hints , matchers ... )
ws . Merge ( w )
if err != nil {
return nil , ws , err
}
s2 , ws , err := q . lvals ( ctx , b , n , hints , matchers ... )
ws . Merge ( w )
if err != nil {
return nil , ws , err
}
return mergeStrings ( s1 , s2 ) , ws , nil
}
func mergeStrings ( a , b [ ] string ) [ ] string {
maxl := len ( a )
if len ( b ) > len ( a ) {
maxl = len ( b )
}
res := make ( [ ] string , 0 , maxl * 10 / 9 )
for len ( a ) > 0 && len ( b ) > 0 {
style: Replace `else if` cascades with `switch`
Wiser coders than myself have come to the conclusion that a `switch`
statement is almost always superior to a statement that includes any
`else if`.
The exceptions that I have found in our codebase are just these two:
* The `if else` is followed by an additional statement before the next
condition (separated by a `;`).
* The whole thing is within a `for` loop and `break` statements are
used. In this case, using `switch` would require tagging the `for`
loop, which probably tips the balance.
Why are `switch` statements more readable?
For one, fewer curly braces. But more importantly, the conditions all
have the same alignment, so the whole thing follows the natural flow
of going down a list of conditions. With `else if`, in contrast, all
conditions but the first are "hidden" behind `} else if `, harder to
spot and (for no good reason) presented differently from the first
condition.
I'm sure the aforemention wise coders can list even more reasons.
In any case, I like it so much that I have found myself recommending
it in code reviews. I would like to make it a habit in our code base,
without making it a hard requirement that we would test on the CI. But
for that, there has to be a role model, so this commit eliminates all
`if else` occurrences, unless it is autogenerated code or fits one of
the exceptions above.
Signed-off-by: beorn7 <beorn@grafana.com>
2 years ago
switch {
case a [ 0 ] == b [ 0 ] :
res = append ( res , a [ 0 ] )
a , b = a [ 1 : ] , b [ 1 : ]
style: Replace `else if` cascades with `switch`
Wiser coders than myself have come to the conclusion that a `switch`
statement is almost always superior to a statement that includes any
`else if`.
The exceptions that I have found in our codebase are just these two:
* The `if else` is followed by an additional statement before the next
condition (separated by a `;`).
* The whole thing is within a `for` loop and `break` statements are
used. In this case, using `switch` would require tagging the `for`
loop, which probably tips the balance.
Why are `switch` statements more readable?
For one, fewer curly braces. But more importantly, the conditions all
have the same alignment, so the whole thing follows the natural flow
of going down a list of conditions. With `else if`, in contrast, all
conditions but the first are "hidden" behind `} else if `, harder to
spot and (for no good reason) presented differently from the first
condition.
I'm sure the aforemention wise coders can list even more reasons.
In any case, I like it so much that I have found myself recommending
it in code reviews. I would like to make it a habit in our code base,
without making it a hard requirement that we would test on the CI. But
for that, there has to be a role model, so this commit eliminates all
`if else` occurrences, unless it is autogenerated code or fits one of
the exceptions above.
Signed-off-by: beorn7 <beorn@grafana.com>
2 years ago
case a [ 0 ] < b [ 0 ] :
res = append ( res , a [ 0 ] )
a = a [ 1 : ]
style: Replace `else if` cascades with `switch`
Wiser coders than myself have come to the conclusion that a `switch`
statement is almost always superior to a statement that includes any
`else if`.
The exceptions that I have found in our codebase are just these two:
* The `if else` is followed by an additional statement before the next
condition (separated by a `;`).
* The whole thing is within a `for` loop and `break` statements are
used. In this case, using `switch` would require tagging the `for`
loop, which probably tips the balance.
Why are `switch` statements more readable?
For one, fewer curly braces. But more importantly, the conditions all
have the same alignment, so the whole thing follows the natural flow
of going down a list of conditions. With `else if`, in contrast, all
conditions but the first are "hidden" behind `} else if `, harder to
spot and (for no good reason) presented differently from the first
condition.
I'm sure the aforemention wise coders can list even more reasons.
In any case, I like it so much that I have found myself recommending
it in code reviews. I would like to make it a habit in our code base,
without making it a hard requirement that we would test on the CI. But
for that, there has to be a role model, so this commit eliminates all
`if else` occurrences, unless it is autogenerated code or fits one of
the exceptions above.
Signed-off-by: beorn7 <beorn@grafana.com>
2 years ago
default :
res = append ( res , b [ 0 ] )
b = b [ 1 : ]
}
}
// Append all remaining elements.
res = append ( res , a ... )
res = append ( res , b ... )
return res
}
// LabelNames returns all the unique label names present in all queriers in sorted order.
func ( q * mergeGenericQuerier ) LabelNames ( ctx context . Context , hints * LabelHints , matchers ... * labels . Matcher ) ( [ ] string , annotations . Annotations , error ) {
var (
labelNamesMap = make ( map [ string ] struct { } )
warnings annotations . Annotations
)
for _ , querier := range q . queriers {
names , wrn , err := querier . LabelNames ( ctx , hints , matchers ... )
if wrn != nil {
// TODO(bwplotka): We could potentially wrap warnings.
warnings . Merge ( wrn )
}
if err != nil {
return nil , nil , fmt . Errorf ( "LabelNames() from merge generic querier: %w" , err )
}
for _ , name := range names {
labelNamesMap [ name ] = struct { } { }
}
}
if len ( labelNamesMap ) == 0 {
return nil , warnings , nil
}
labelNames := make ( [ ] string , 0 , len ( labelNamesMap ) )
for name := range labelNamesMap {
labelNames = append ( labelNames , name )
}
slices . Sort ( labelNames )
return labelNames , warnings , nil
}
// Close releases the resources of the generic querier.
func ( q * mergeGenericQuerier ) Close ( ) error {
errs := tsdb_errors . NewMulti ( )
for _ , querier := range q . queriers {
if err := querier . Close ( ) ; err != nil {
errs . Add ( err )
}
}
return errs . Err ( )
}
// VerticalSeriesMergeFunc returns merged series implementation that merges series with same labels together.
// It has to handle time-overlapped series as well.
type VerticalSeriesMergeFunc func ( ... Series ) Series
// NewMergeSeriesSet returns a new SeriesSet that merges many SeriesSets together.
func NewMergeSeriesSet ( sets [ ] SeriesSet , mergeFunc VerticalSeriesMergeFunc ) SeriesSet {
genericSets := make ( [ ] genericSeriesSet , 0 , len ( sets ) )
for _ , s := range sets {
genericSets = append ( genericSets , & genericSeriesSetAdapter { s } )
}
return & seriesSetAdapter { newGenericMergeSeriesSet ( genericSets , ( & seriesMergerAdapter { VerticalSeriesMergeFunc : mergeFunc } ) . Merge ) }
}
// VerticalChunkSeriesMergeFunc returns merged chunk series implementation that merges potentially time-overlapping
// chunk series with the same labels into single ChunkSeries.
//
// NOTE: It's up to implementation how series are vertically merged (if chunks are sorted, re-encoded etc).
type VerticalChunkSeriesMergeFunc func ( ... ChunkSeries ) ChunkSeries
// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges many SeriesSet together.
func NewMergeChunkSeriesSet ( sets [ ] ChunkSeriesSet , mergeFunc VerticalChunkSeriesMergeFunc ) ChunkSeriesSet {
genericSets := make ( [ ] genericSeriesSet , 0 , len ( sets ) )
for _ , s := range sets {
genericSets = append ( genericSets , & genericChunkSeriesSetAdapter { s } )
}
return & chunkSeriesSetAdapter { newGenericMergeSeriesSet ( genericSets , ( & chunkSeriesMergerAdapter { VerticalChunkSeriesMergeFunc : mergeFunc } ) . Merge ) }
}
// genericMergeSeriesSet implements genericSeriesSet.
type genericMergeSeriesSet struct {
currentLabels labels . Labels
mergeFunc genericSeriesMergeFunc
heap genericSeriesSetHeap
sets [ ] genericSeriesSet
currentSets [ ] genericSeriesSet
}
// newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates)
// series returned by the series sets when iterating.
// Each series set must return its series in labels order, otherwise
// merged series set will be incorrect.
// Overlapped situations are merged using provided mergeFunc.
func newGenericMergeSeriesSet ( sets [ ] genericSeriesSet , mergeFunc genericSeriesMergeFunc ) genericSeriesSet {
if len ( sets ) == 1 {
return sets [ 0 ]
}
// We are pre-advancing sets, so we can introspect the label of the
// series under the cursor.
var h genericSeriesSetHeap
for _ , set := range sets {
if set == nil {
continue
}
if set . Next ( ) {
heap . Push ( & h , set )
}
if err := set . Err ( ) ; err != nil {
return errorOnlySeriesSet { err }
}
}
return & genericMergeSeriesSet {
mergeFunc : mergeFunc ,
sets : sets ,
heap : h ,
}
}
func ( c * genericMergeSeriesSet ) Next ( ) bool {
// Run in a loop because the "next" series sets may not be valid anymore.
// If, for the current label set, all the next series sets come from
// failed remote storage sources, we want to keep trying with the next label set.
for {
// Firstly advance all the current series sets. If any of them have run out,
// we can drop them, otherwise they should be inserted back into the heap.
for _ , set := range c . currentSets {
if set . Next ( ) {
heap . Push ( & c . heap , set )
}
}
if len ( c . heap ) == 0 {
return false
}
// Now, pop items of the heap that have equal label sets.
c . currentSets = c . currentSets [ : 0 ]
c . currentLabels = c . heap [ 0 ] . At ( ) . Labels ( )
for len ( c . heap ) > 0 && labels . Equal ( c . currentLabels , c . heap [ 0 ] . At ( ) . Labels ( ) ) {
set := heap . Pop ( & c . heap ) . ( genericSeriesSet )
c . currentSets = append ( c . currentSets , set )
}
// As long as the current set contains at least 1 set,
// then it should return true.
if len ( c . currentSets ) != 0 {
break
}
}
return true
}
func ( c * genericMergeSeriesSet ) At ( ) Labels {
if len ( c . currentSets ) == 1 {
return c . currentSets [ 0 ] . At ( )
}
series := make ( [ ] Labels , 0 , len ( c . currentSets ) )
for _ , seriesSet := range c . currentSets {
series = append ( series , seriesSet . At ( ) )
}
return c . mergeFunc ( series ... )
}
func ( c * genericMergeSeriesSet ) Err ( ) error {
for _ , set := range c . sets {
if err := set . Err ( ) ; err != nil {
return err
}
}
return nil
}
func ( c * genericMergeSeriesSet ) Warnings ( ) annotations . Annotations {
var ws annotations . Annotations
for _ , set := range c . sets {
ws . Merge ( set . Warnings ( ) )
}
return ws
}
type genericSeriesSetHeap [ ] genericSeriesSet
func ( h genericSeriesSetHeap ) Len ( ) int { return len ( h ) }
func ( h genericSeriesSetHeap ) Swap ( i , j int ) { h [ i ] , h [ j ] = h [ j ] , h [ i ] }
func ( h genericSeriesSetHeap ) Less ( i , j int ) bool {
a , b := h [ i ] . At ( ) . Labels ( ) , h [ j ] . At ( ) . Labels ( )
return labels . Compare ( a , b ) < 0
}
func ( h * genericSeriesSetHeap ) Push ( x interface { } ) {
* h = append ( * h , x . ( genericSeriesSet ) )
}
func ( h * genericSeriesSetHeap ) Pop ( ) interface { } {
old := * h
n := len ( old )
x := old [ n - 1 ]
* h = old [ 0 : n - 1 ]
return x
}
// ChainedSeriesMerge returns single series from many same, potentially overlapping series by chaining samples together.
// If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
// timestamp are dropped.
//
// This works the best with replicated series, where data from two series are exactly the same. This does not work well
// with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective
// this never happens.
//
// It's optimized for non-overlap cases as well.
func ChainedSeriesMerge ( series ... Series ) Series {
if len ( series ) == 0 {
return nil
}
return & SeriesEntry {
Lset : series [ 0 ] . Labels ( ) ,
SampleIteratorFn : func ( it chunkenc . Iterator ) chunkenc . Iterator {
return ChainSampleIteratorFromSeries ( it , series )
} ,
}
}
// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps
// order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
// timestamp are dropped. It's optimized for non-overlap cases as well.
type chainSampleIterator struct {
iterators [ ] chunkenc . Iterator
h samplesIteratorHeap
curr chunkenc . Iterator
lastT int64
// Whether the previous and the current sample are direct neighbors
// within the same base iterator.
consecutive bool
}
// Return a chainSampleIterator initialized for length entries, re-using the memory from it if possible.
func getChainSampleIterator ( it chunkenc . Iterator , length int ) * chainSampleIterator {
csi , ok := it . ( * chainSampleIterator )
if ! ok {
csi = & chainSampleIterator { }
}
if cap ( csi . iterators ) < length {
csi . iterators = make ( [ ] chunkenc . Iterator , length )
} else {
csi . iterators = csi . iterators [ : length ]
}
csi . h = nil
csi . lastT = math . MinInt64
return csi
}
func ChainSampleIteratorFromSeries ( it chunkenc . Iterator , series [ ] Series ) chunkenc . Iterator {
csi := getChainSampleIterator ( it , len ( series ) )
for i , s := range series {
csi . iterators [ i ] = s . Iterator ( csi . iterators [ i ] )
}
return csi
}
func ChainSampleIteratorFromIterables ( it chunkenc . Iterator , iterables [ ] chunkenc . Iterable ) chunkenc . Iterator {
csi := getChainSampleIterator ( it , len ( iterables ) )
for i , c := range iterables {
csi . iterators [ i ] = c . Iterator ( csi . iterators [ i ] )
}
return csi
}
func ChainSampleIteratorFromIterators ( it chunkenc . Iterator , iterators [ ] chunkenc . Iterator ) chunkenc . Iterator {
csi := getChainSampleIterator ( it , 0 )
csi . iterators = iterators
return csi
}
func ( c * chainSampleIterator ) Seek ( t int64 ) chunkenc . ValueType {
// No-op check.
if c . curr != nil && c . lastT >= t {
return c . curr . Seek ( c . lastT )
}
// Don't bother to find out if the next sample is consecutive. Callers
// of Seek usually aren't interested anyway.
c . consecutive = false
c . h = samplesIteratorHeap { }
for _ , iter := range c . iterators {
if iter . Seek ( t ) == chunkenc . ValNone {
if iter . Err ( ) != nil {
// If any iterator is reporting an error, abort.
return chunkenc . ValNone
}
continue
}
heap . Push ( & c . h , iter )
}
if len ( c . h ) > 0 {
c . curr = heap . Pop ( & c . h ) . ( chunkenc . Iterator )
c . lastT = c . curr . AtT ( )
return c . curr . Seek ( c . lastT )
}
c . curr = nil
return chunkenc . ValNone
}
func ( c * chainSampleIterator ) At ( ) ( t int64 , v float64 ) {
if c . curr == nil {
panic ( "chainSampleIterator.At called before first .Next or after .Next returned false." )
}
return c . curr . At ( )
}
func ( c * chainSampleIterator ) AtHistogram ( h * histogram . Histogram ) ( int64 , * histogram . Histogram ) {
if c . curr == nil {
panic ( "chainSampleIterator.AtHistogram called before first .Next or after .Next returned false." )
}
t , h := c . curr . AtHistogram ( h )
// If the current sample is not consecutive with the previous one, we
// cannot be sure anymore about counter resets for counter histograms.
// TODO(beorn7): If a `NotCounterReset` sample is followed by a
// non-consecutive `CounterReset` sample, we could keep the hint as
// `CounterReset`. But then we needed to track the previous sample
// in more detail, which might not be worth it.
if ! c . consecutive && h . CounterResetHint != histogram . GaugeType {
h . CounterResetHint = histogram . UnknownCounterReset
}
return t , h
}
func ( c * chainSampleIterator ) AtFloatHistogram ( fh * histogram . FloatHistogram ) ( int64 , * histogram . FloatHistogram ) {
if c . curr == nil {
panic ( "chainSampleIterator.AtFloatHistogram called before first .Next or after .Next returned false." )
}
t , fh := c . curr . AtFloatHistogram ( fh )
// If the current sample is not consecutive with the previous one, we
// cannot be sure anymore about counter resets for counter histograms.
// TODO(beorn7): If a `NotCounterReset` sample is followed by a
// non-consecutive `CounterReset` sample, we could keep the hint as
// `CounterReset`. But then we needed to track the previous sample
// in more detail, which might not be worth it.
if ! c . consecutive && fh . CounterResetHint != histogram . GaugeType {
fh . CounterResetHint = histogram . UnknownCounterReset
}
return t , fh
}
func ( c * chainSampleIterator ) AtT ( ) int64 {
if c . curr == nil {
panic ( "chainSampleIterator.AtT called before first .Next or after .Next returned false." )
}
return c . curr . AtT ( )
}
func ( c * chainSampleIterator ) Next ( ) chunkenc . ValueType {
var (
currT int64
currValueType chunkenc . ValueType
iteratorChanged bool
)
if c . h == nil {
iteratorChanged = true
c . h = samplesIteratorHeap { }
// We call c.curr.Next() as the first thing below.
// So, we don't call Next() on it here.
c . curr = c . iterators [ 0 ]
for _ , iter := range c . iterators [ 1 : ] {
if iter . Next ( ) == chunkenc . ValNone {
if iter . Err ( ) != nil {
// If any iterator is reporting an error, abort.
// If c.iterators[0] is reporting an error, we'll handle that below.
return chunkenc . ValNone
}
} else {
heap . Push ( & c . h , iter )
}
}
}
if c . curr == nil {
return chunkenc . ValNone
}
for {
currValueType = c . curr . Next ( )
if currValueType == chunkenc . ValNone {
if c . curr . Err ( ) != nil {
// Abort if we've hit an error.
return chunkenc . ValNone
}
if len ( c . h ) == 0 {
// No iterator left to iterate.
c . curr = nil
return chunkenc . ValNone
}
} else {
currT = c . curr . AtT ( )
if currT == c . lastT {
// Ignoring sample for the same timestamp.
continue
}
if len ( c . h ) == 0 {
// curr is the only iterator remaining,
// no need to check with the heap.
break
}
// Check current iterator with the top of the heap.
nextT := c . h [ 0 ] . AtT ( )
if currT < nextT {
// Current iterator has smaller timestamp than the heap.
break
}
// Current iterator does not hold the smallest timestamp.
heap . Push ( & c . h , c . curr )
}
c . curr = heap . Pop ( & c . h ) . ( chunkenc . Iterator )
iteratorChanged = true
currT = c . curr . AtT ( )
currValueType = c . curr . Seek ( currT )
if currT != c . lastT {
break
}
}
c . consecutive = ! iteratorChanged
c . lastT = currT
return currValueType
}
func ( c * chainSampleIterator ) Err ( ) error {
errs := tsdb_errors . NewMulti ( )
for _ , iter := range c . iterators {
errs . Add ( iter . Err ( ) )
}
return errs . Err ( )
}
type samplesIteratorHeap [ ] chunkenc . Iterator
func ( h samplesIteratorHeap ) Len ( ) int { return len ( h ) }
func ( h samplesIteratorHeap ) Swap ( i , j int ) { h [ i ] , h [ j ] = h [ j ] , h [ i ] }
func ( h samplesIteratorHeap ) Less ( i , j int ) bool {
return h [ i ] . AtT ( ) < h [ j ] . AtT ( )
}
func ( h * samplesIteratorHeap ) Push ( x interface { } ) {
* h = append ( * h , x . ( chunkenc . Iterator ) )
}
func ( h * samplesIteratorHeap ) Pop ( ) interface { } {
old := * h
n := len ( old )
x := old [ n - 1 ]
* h = old [ 0 : n - 1 ]
return x
}
// NewCompactingChunkSeriesMerger returns VerticalChunkSeriesMergeFunc that merges the same chunk series into single chunk series.
// In case of the chunk overlaps, it compacts those into one or more time-ordered non-overlapping chunks with merged data.
// Samples from overlapped chunks are merged using series vertical merge func.
// It expects the same labels for each given series.
//
// NOTE: Use the returned merge function only when you see potentially overlapping series, as this introduces small a overhead
// to handle overlaps between series.
func NewCompactingChunkSeriesMerger ( mergeFunc VerticalSeriesMergeFunc ) VerticalChunkSeriesMergeFunc {
return func ( series ... ChunkSeries ) ChunkSeries {
if len ( series ) == 0 {
return nil
}
return & ChunkSeriesEntry {
Lset : series [ 0 ] . Labels ( ) ,
ChunkIteratorFn : func ( chunks . Iterator ) chunks . Iterator {
iterators := make ( [ ] chunks . Iterator , 0 , len ( series ) )
for _ , s := range series {
iterators = append ( iterators , s . Iterator ( nil ) )
}
return & compactChunkIterator {
mergeFunc : mergeFunc ,
iterators : iterators ,
}
} ,
}
}
}
// compactChunkIterator is responsible to compact chunks from different iterators of the same time series into single chainSeries.
// If time-overlapping chunks are found, they are encoded and passed to series merge and encoded again into one bigger chunk.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
type compactChunkIterator struct {
mergeFunc VerticalSeriesMergeFunc
iterators [ ] chunks . Iterator
h chunkIteratorHeap
err error
curr chunks . Meta
}
func ( c * compactChunkIterator ) At ( ) chunks . Meta {
return c . curr
}
func ( c * compactChunkIterator ) Next ( ) bool {
if c . h == nil {
for _ , iter := range c . iterators {
if iter . Next ( ) {
heap . Push ( & c . h , iter )
}
}
}
if len ( c . h ) == 0 {
return false
}
iter := heap . Pop ( & c . h ) . ( chunks . Iterator )
c . curr = iter . At ( )
if iter . Next ( ) {
heap . Push ( & c . h , iter )
}
var (
overlapping [ ] Series
oMaxTime = c . curr . MaxTime
prev = c . curr
)
// Detect overlaps to compact. Be smart about it and deduplicate on the fly if chunks are identical.
for len ( c . h ) > 0 {
// Get the next oldest chunk by min, then max time.
next := c . h [ 0 ] . At ( )
if next . MinTime > oMaxTime {
// No overlap with current one.
break
}
// Only do something if it is not a perfect duplicate.
if next . MinTime != prev . MinTime ||
next . MaxTime != prev . MaxTime ||
! bytes . Equal ( next . Chunk . Bytes ( ) , prev . Chunk . Bytes ( ) ) {
// We operate on same series, so labels do not matter here.
overlapping = append ( overlapping , newChunkToSeriesDecoder ( labels . EmptyLabels ( ) , next ) )
if next . MaxTime > oMaxTime {
oMaxTime = next . MaxTime
}
prev = next
}
iter := heap . Pop ( & c . h ) . ( chunks . Iterator )
if iter . Next ( ) {
heap . Push ( & c . h , iter )
}
}
if len ( overlapping ) == 0 {
return true
}
// Add last as it's not yet included in overlap. We operate on same series, so labels does not matter here.
iter = NewSeriesToChunkEncoder ( c . mergeFunc ( append ( overlapping , newChunkToSeriesDecoder ( labels . EmptyLabels ( ) , c . curr ) ) ... ) ) . Iterator ( nil )
if ! iter . Next ( ) {
if c . err = iter . Err ( ) ; c . err != nil {
return false
}
panic ( "unexpected seriesToChunkEncoder lack of iterations" )
}
c . curr = iter . At ( )
if iter . Next ( ) {
heap . Push ( & c . h , iter )
}
return true
}
func ( c * compactChunkIterator ) Err ( ) error {
errs := tsdb_errors . NewMulti ( )
for _ , iter := range c . iterators {
errs . Add ( iter . Err ( ) )
}
errs . Add ( c . err )
return errs . Err ( )
}
type chunkIteratorHeap [ ] chunks . Iterator
func ( h chunkIteratorHeap ) Len ( ) int { return len ( h ) }
func ( h chunkIteratorHeap ) Swap ( i , j int ) { h [ i ] , h [ j ] = h [ j ] , h [ i ] }
func ( h chunkIteratorHeap ) Less ( i , j int ) bool {
at := h [ i ] . At ( )
bt := h [ j ] . At ( )
if at . MinTime == bt . MinTime {
return at . MaxTime < bt . MaxTime
}
return at . MinTime < bt . MinTime
}
func ( h * chunkIteratorHeap ) Push ( x interface { } ) {
* h = append ( * h , x . ( chunks . Iterator ) )
}
func ( h * chunkIteratorHeap ) Pop ( ) interface { } {
old := * h
n := len ( old )
x := old [ n - 1 ]
* h = old [ 0 : n - 1 ]
return x
}
// NewConcatenatingChunkSeriesMerger returns a VerticalChunkSeriesMergeFunc that simply concatenates the
// chunks from the series. The resultant stream of chunks for a series might be overlapping and unsorted.
func NewConcatenatingChunkSeriesMerger ( ) VerticalChunkSeriesMergeFunc {
return func ( series ... ChunkSeries ) ChunkSeries {
if len ( series ) == 0 {
return nil
}
return & ChunkSeriesEntry {
Lset : series [ 0 ] . Labels ( ) ,
ChunkIteratorFn : func ( chunks . Iterator ) chunks . Iterator {
iterators := make ( [ ] chunks . Iterator , 0 , len ( series ) )
for _ , s := range series {
iterators = append ( iterators , s . Iterator ( nil ) )
}
return & concatenatingChunkIterator {
iterators : iterators ,
}
} ,
}
}
}
type concatenatingChunkIterator struct {
iterators [ ] chunks . Iterator
idx int
curr chunks . Meta
}
func ( c * concatenatingChunkIterator ) At ( ) chunks . Meta {
return c . curr
}
func ( c * concatenatingChunkIterator ) Next ( ) bool {
if c . idx >= len ( c . iterators ) {
return false
}
if c . iterators [ c . idx ] . Next ( ) {
c . curr = c . iterators [ c . idx ] . At ( )
return true
}
if c . iterators [ c . idx ] . Err ( ) != nil {
return false
}
c . idx ++
return c . Next ( )
}
func ( c * concatenatingChunkIterator ) Err ( ) error {
errs := tsdb_errors . NewMulti ( )
for _ , iter := range c . iterators {
errs . Add ( iter . Err ( ) )
}
return errs . Err ( )
}