|
|
|
@ -514,13 +514,18 @@ type HeadAndOOOQuerier struct {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewHeadAndOOOQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier { |
|
|
|
|
isoState := head.iso.State(mint, maxt) |
|
|
|
|
cr := &headChunkReader{ |
|
|
|
|
head: head, |
|
|
|
|
mint: mint, |
|
|
|
|
maxt: maxt, |
|
|
|
|
isoState: head.iso.State(mint, maxt), |
|
|
|
|
} |
|
|
|
|
return &HeadAndOOOQuerier{ |
|
|
|
|
mint: mint, |
|
|
|
|
maxt: maxt, |
|
|
|
|
head: head, |
|
|
|
|
index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef), |
|
|
|
|
chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, isoState, oooIsoState, 0), |
|
|
|
|
chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0), |
|
|
|
|
querier: querier, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -552,13 +557,18 @@ type HeadAndOOOChunkQuerier struct {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier { |
|
|
|
|
isoState := head.iso.State(mint, maxt) |
|
|
|
|
cr := &headChunkReader{ |
|
|
|
|
head: head, |
|
|
|
|
mint: mint, |
|
|
|
|
maxt: maxt, |
|
|
|
|
isoState: head.iso.State(mint, maxt), |
|
|
|
|
} |
|
|
|
|
return &HeadAndOOOChunkQuerier{ |
|
|
|
|
mint: mint, |
|
|
|
|
maxt: maxt, |
|
|
|
|
head: head, |
|
|
|
|
index: NewHeadAndOOOIndexReader(head, mint, maxt, oooIsoState.minRef), |
|
|
|
|
chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, isoState, oooIsoState, 0), |
|
|
|
|
chunkr: NewHeadAndOOOChunkReader(head, mint, maxt, cr, oooIsoState, 0), |
|
|
|
|
querier: querier, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -618,19 +628,19 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type HeadAndOOOChunkReader struct { |
|
|
|
|
cr headChunkReader |
|
|
|
|
head *Head |
|
|
|
|
mint, maxt int64 |
|
|
|
|
cr *headChunkReader // If nil, only read OOO chunks.
|
|
|
|
|
maxMmapRef chunks.ChunkDiskMapperRef |
|
|
|
|
oooIsoState *oooIsolationState |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, isoState *isolationState, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader { |
|
|
|
|
func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader { |
|
|
|
|
return &HeadAndOOOChunkReader{ |
|
|
|
|
cr: headChunkReader{ |
|
|
|
|
head: head, |
|
|
|
|
mint: mint, |
|
|
|
|
maxt: maxt, |
|
|
|
|
isoState: isoState, |
|
|
|
|
}, |
|
|
|
|
cr: cr, |
|
|
|
|
maxMmapRef: maxMmapRef, |
|
|
|
|
oooIsoState: oooIsoState, |
|
|
|
|
} |
|
|
|
@ -642,14 +652,14 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chu
|
|
|
|
|
return cr.cr.ChunkOrIterable(meta) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
s := cr.cr.head.series.getByID(sid) |
|
|
|
|
s := cr.head.series.getByID(sid) |
|
|
|
|
// This means that the series has been garbage collected.
|
|
|
|
|
if s == nil { |
|
|
|
|
return nil, nil, storage.ErrNotFound |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
s.Lock() |
|
|
|
|
mc, err := s.oooMergedChunks(meta, cr.cr.head.chunkDiskMapper, &cr.cr, cr.cr.mint, cr.cr.maxt, cr.maxMmapRef) |
|
|
|
|
mc, err := s.oooMergedChunks(meta, cr.head.chunkDiskMapper, cr.cr, cr.mint, cr.maxt, cr.maxMmapRef) |
|
|
|
|
s.Unlock() |
|
|
|
|
|
|
|
|
|
return nil, mc, err |
|
|
|
@ -666,7 +676,7 @@ func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chun
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (cr *HeadAndOOOChunkReader) Close() error { |
|
|
|
|
if cr.cr.isoState != nil { |
|
|
|
|
if cr.cr != nil && cr.cr.isoState != nil { |
|
|
|
|
cr.cr.isoState.Close() |
|
|
|
|
} |
|
|
|
|
if cr.oooIsoState != nil { |
|
|
|
|