@ -30,19 +30,6 @@ import (
"github.com/prometheus/prometheus/util/annotations"
)
var _ IndexReader = & OOOHeadIndexReader { }
// OOOHeadIndexReader implements IndexReader so ooo samples in the head can be
// accessed.
// It also has a reference to headIndexReader so we can leverage on its
// IndexReader implementation for all the methods that remain the same. We
// decided to do this to avoid code duplication.
// The only methods that change are the ones about getting Series and Postings.
type OOOHeadIndexReader struct {
* headIndexReader // A reference to the headIndexReader so we can reuse as many interface implementation as possible.
lastGarbageCollectedMmapRef chunks . ChunkDiskMapperRef
}
var _ chunkenc . Iterable = & mergedOOOChunks { }
// mergedOOOChunks holds the list of iterables for overlapping chunks.
@ -54,48 +41,11 @@ func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator
return storage . ChainSampleIteratorFromIterables ( iterator , o . chunkIterables )
}
func NewOOOHeadIndexReader ( head * Head , mint , maxt int64 , lastGarbageCollectedMmapRef chunks . ChunkDiskMapperRef ) * OOOHeadIndexReader {
hr := & headIndexReader {
head : head ,
mint : mint ,
maxt : maxt ,
}
return & OOOHeadIndexReader { hr , lastGarbageCollectedMmapRef }
}
func ( oh * OOOHeadIndexReader ) Series ( ref storage . SeriesRef , builder * labels . ScratchBuilder , chks * [ ] chunks . Meta ) error {
return oh . series ( ref , builder , chks , oh . lastGarbageCollectedMmapRef , 0 )
}
// lastGarbageCollectedMmapRef gives the last mmap chunk that may be being garbage collected and so
// any chunk at or before this ref will not be considered. 0 disables this check.
//
// maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then
// the oooHeadChunk will not be considered.
func ( oh * OOOHeadIndexReader ) series ( ref storage . SeriesRef , builder * labels . ScratchBuilder , chks * [ ] chunks . Meta , lastGarbageCollectedMmapRef , maxMmapRef chunks . ChunkDiskMapperRef ) error {
s := oh . head . series . getByID ( chunks . HeadSeriesRef ( ref ) )
if s == nil {
oh . head . metrics . seriesNotFound . Inc ( )
return storage . ErrNotFound
}
builder . Assign ( s . labels ( ) )
if chks == nil {
return nil
}
s . Lock ( )
defer s . Unlock ( )
* chks = ( * chks ) [ : 0 ]
if s . ooo == nil {
return nil
}
return getOOOSeriesChunks ( s , oh . mint , oh . maxt , lastGarbageCollectedMmapRef , maxMmapRef , false , chks )
}
func getOOOSeriesChunks ( s * memSeries , mint , maxt int64 , lastGarbageCollectedMmapRef , maxMmapRef chunks . ChunkDiskMapperRef , includeInOrder bool , chks * [ ] chunks . Meta ) error {
tmpChks := make ( [ ] chunks . Meta , 0 , len ( s . ooo . oooMmappedChunks ) )
@ -176,21 +126,6 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
return nil
}
// LabelValues needs to be overridden from the headIndexReader implementation due
// to the check that happens at the beginning where we make sure that the query
// interval overlaps with the head minooot and maxooot.
func ( oh * OOOHeadIndexReader ) LabelValues ( ctx context . Context , name string , matchers ... * labels . Matcher ) ( [ ] string , error ) {
if oh . maxt < oh . head . MinOOOTime ( ) || oh . mint > oh . head . MaxOOOTime ( ) {
return [ ] string { } , nil
}
if len ( matchers ) == 0 {
return oh . head . postings . LabelValues ( ctx , name ) , nil
}
return labelValuesWithMatchers ( ctx , oh , name , matchers ... )
}
type chunkMetaAndChunkDiskMapperRef struct {
meta chunks . Meta
ref chunks . ChunkDiskMapperRef
@ -232,24 +167,8 @@ func lessByMinTimeAndMinRef(a, b chunks.Meta) int {
}
}
func ( oh * OOOHeadIndexReader ) Postings ( ctx context . Context , name string , values ... string ) ( index . Postings , error ) {
switch len ( values ) {
case 0 :
return index . EmptyPostings ( ) , nil
case 1 :
return oh . head . postings . Get ( name , values [ 0 ] ) , nil // TODO(ganesh) Also call GetOOOPostings
default :
// TODO(ganesh) We want to only return postings for out of order series.
res := make ( [ ] index . Postings , 0 , len ( values ) )
for _ , value := range values {
res = append ( res , oh . head . postings . Get ( name , value ) ) // TODO(ganesh) Also call GetOOOPostings
}
return index . Merge ( ctx , res ... ) , nil
}
}
type OOOCompactionHead struct {
oooIR * OOOHeadIndexReader
head * Head
lastMmapRef chunks . ChunkDiskMapperRef
lastWBLFile int
postings [ ] storage . SeriesRef
@ -266,6 +185,7 @@ type OOOCompactionHead struct {
// on the sample append latency. So call NewOOOCompactionHead only right before compaction.
func NewOOOCompactionHead ( ctx context . Context , head * Head ) ( * OOOCompactionHead , error ) {
ch := & OOOCompactionHead {
head : head ,
chunkRange : head . chunkRange . Load ( ) ,
mint : math . MaxInt64 ,
maxt : math . MinInt64 ,
@ -279,15 +199,14 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead,
ch . lastWBLFile = lastWBLFile
}
ch . oooIR = NewOOOHeadIndexReader ( head , math . MinInt64 , math . MaxInt64 , 0 )
hr := headIndexReader { head : head , mint : ch . mint , maxt : ch . maxt }
n , v := index . AllPostingsKey ( )
// TODO: verify this gets only ooo samples.
p , err := ch . oooIR . Postings ( ctx , n , v )
// TODO: filter to series with OOO samples, before sorting.
p , err := hr . Postings ( ctx , n , v )
if err != nil {
return nil , err
}
p = ch . oooIR . SortedPostings ( p )
p = hr . SortedPostings ( p )
var lastSeq , lastOff int
for p . Next ( ) {
@ -344,7 +263,7 @@ func (ch *OOOCompactionHead) Index() (IndexReader, error) {
}
func ( ch * OOOCompactionHead ) Chunks ( ) ( ChunkReader , error ) {
return NewHeadAndOOOChunkReader ( ch . oooIR . head , ch . oooIR . mint , ch . oooIR . maxt , nil , nil , ch . lastMmapRef ) , nil
return NewHeadAndOOOChunkReader ( ch . head , ch . mint , ch . maxt , nil , nil , ch . lastMmapRef ) , nil
}
func ( ch * OOOCompactionHead ) Tombstones ( ) ( tombstones . Reader , error ) {
@ -370,12 +289,12 @@ func (ch *OOOCompactionHead) Meta() BlockMeta {
// Only the method of BlockReader interface are valid for the cloned OOOCompactionHead.
func ( ch * OOOCompactionHead ) CloneForTimeRange ( mint , maxt int64 ) * OOOCompactionHead {
return & OOOCompactionHead {
oooIR : NewOOOHeadIndexReader ( ch . oooIR . head , mint , maxt , 0 ) ,
head : ch . head ,
lastMmapRef : ch . lastMmapRef ,
postings : ch . postings ,
chunkRange : ch . chunkRange ,
mint : ch . mint ,
maxt : ch . maxt ,
mint : mint ,
maxt : maxt ,
}
}
@ -395,7 +314,8 @@ func NewOOOCompactionHeadIndexReader(ch *OOOCompactionHead) IndexReader {
}
func ( ir * OOOCompactionHeadIndexReader ) Symbols ( ) index . StringIter {
return ir . ch . oooIR . Symbols ( )
hr := headIndexReader { head : ir . ch . head , mint : ir . ch . mint , maxt : ir . ch . maxt }
return hr . Symbols ( )
}
func ( ir * OOOCompactionHeadIndexReader ) Postings ( _ context . Context , name string , values ... string ) ( index . Postings , error ) {
@ -416,11 +336,28 @@ func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.P
}
func ( ir * OOOCompactionHeadIndexReader ) ShardedPostings ( p index . Postings , shardIndex , shardCount uint64 ) index . Postings {
return ir . ch . oooIR . ShardedPostings ( p , shardIndex , shardCount )
hr := headIndexReader { head : ir . ch . head , mint : ir . ch . mint , maxt : ir . ch . maxt }
return hr . ShardedPostings ( p , shardIndex , shardCount )
}
func ( ir * OOOCompactionHeadIndexReader ) Series ( ref storage . SeriesRef , builder * labels . ScratchBuilder , chks * [ ] chunks . Meta ) error {
return ir . ch . oooIR . series ( ref , builder , chks , 0 , ir . ch . lastMmapRef )
s := ir . ch . head . series . getByID ( chunks . HeadSeriesRef ( ref ) )
if s == nil {
ir . ch . head . metrics . seriesNotFound . Inc ( )
return storage . ErrNotFound
}
builder . Assign ( s . lset )
s . Lock ( )
defer s . Unlock ( )
* chks = ( * chks ) [ : 0 ]
if s . ooo == nil {
return nil
}
return getOOOSeriesChunks ( s , ir . ch . mint , ir . ch . maxt , 0 , ir . ch . lastMmapRef , false , chks )
}
func ( ir * OOOCompactionHeadIndexReader ) SortedLabelValues ( _ context . Context , name string , matchers ... * labels . Matcher ) ( [ ] string , error ) {
@ -448,7 +385,7 @@ func (ir *OOOCompactionHeadIndexReader) LabelNamesFor(ctx context.Context, posti
}
func ( ir * OOOCompactionHeadIndexReader ) Close ( ) error {
return ir . ch . oooIR . Close ( )
return nil
}
// HeadAndOOOQuerier queries both the head and the out-of-order head.