mirror of https://github.com/prometheus/prometheus
This implementation is based on this design doc: https://docs.google.com/document/d/1Kppm7qL9C-BJB1j6yb6-9ObG3AbdZnFUBYPNNWwDBYM/edit?usp=sharing This commit adds support for accepting out-of-order ("OOO") samples into the TSDB up to a configurable time allowance. If OOO is enabled, overlapping queries are automatically enabled. Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Jesus Vazquez <jesus.vazquez@grafana.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Dieter Plaetinck <dieter@grafana.com>
Ganesh Vernekar committed 2 years ago via GitHub
39 changed files with 5656 additions and 293 deletions
@@ -0,0 +1,80 @@
package chunkenc

import (
    "sort"
)

type sample struct {
    t int64
    v float64
}

// OOOChunk maintains samples in time-ascending order.
// Inserts for timestamps already seen are dropped.
// Samples are stored uncompressed to allow easy sorting.
// Perhaps we can be more efficient later.
type OOOChunk struct {
    samples []sample
}

func NewOOOChunk(capacity int) *OOOChunk {
    return &OOOChunk{samples: make([]sample, 0, capacity)}
}

// Insert inserts the sample such that order is maintained.
// It returns false if the insert was not possible because a sample with the
// same timestamp already exists.
func (o *OOOChunk) Insert(t int64, v float64) bool {
    // Find the index of the sample we should replace.
    i := sort.Search(len(o.samples), func(i int) bool { return o.samples[i].t >= t })

    if i >= len(o.samples) {
        // None found. Append it at the end.
        o.samples = append(o.samples, sample{t, v})
        return true
    }

    if o.samples[i].t == t {
        return false
    }

    // Expand length by 1 to make room. Use a zero sample; we will overwrite it anyway.
    o.samples = append(o.samples, sample{})
    copy(o.samples[i+1:], o.samples[i:])
    o.samples[i] = sample{t, v}

    return true
}

func (o *OOOChunk) NumSamples() int {
    return len(o.samples)
}

func (o *OOOChunk) ToXor() (*XORChunk, error) {
    x := NewXORChunk()
    app, err := x.Appender()
    if err != nil {
        return nil, err
    }
    for _, s := range o.samples {
        app.Append(s.t, s.v)
    }
    return x, nil
}

func (o *OOOChunk) ToXorBetweenTimestamps(mint, maxt int64) (*XORChunk, error) {
    x := NewXORChunk()
    app, err := x.Appender()
    if err != nil {
        return nil, err
    }
    for _, s := range o.samples {
        if s.t < mint {
            continue
        }
        if s.t > maxt {
            break
        }
        app.Append(s.t, s.v)
    }
    return x, nil
}
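
A minimal usage sketch of the chunk above (illustrative only, not part of the diff; it assumes the chunkenc API exactly as introduced here, including the bool-returning iterator of this era):

package main

import (
    "fmt"

    "github.com/prometheus/prometheus/tsdb/chunkenc"
)

func main() {
    c := chunkenc.NewOOOChunk(4)
    fmt.Println(c.Insert(300, 3.0)) // true: first sample.
    fmt.Println(c.Insert(100, 1.0)) // true: kept sorted, lands before t=300.
    fmt.Println(c.Insert(100, 9.9)) // false: t=100 already exists, so it is dropped.

    // Convert to a compressed XOR chunk and read it back in timestamp order.
    xc, err := c.ToXor()
    if err != nil {
        panic(err)
    }
    it := xc.Iterator(nil)
    for it.Next() {
        t, v := it.At()
        fmt.Println(t, v) // Prints 100 1, then 300 3.
    }
}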
@@ -0,0 +1,84 @@
package chunkenc

import (
    "testing"

    "github.com/stretchr/testify/require"
)

const testMaxSize int = 32

// Formulas chosen to make testing easy.
func valPre(pos int) int { return pos*2 + 2 } // s[0]=2, s[1]=4, s[2]=6, ..., s[31]=64 // predictable pre-existing values
func valNew(pos int) int { return pos*2 + 1 } // s[0]=1, s[1]=3, s[2]=5, ..., s[31]=63 // new values will interject at the chosen position because they sort before the pre-existing values

func samplify(v int) sample { return sample{int64(v), float64(v)} }

func makePre(n int) []sample {
    s := make([]sample, n)
    for i := 0; i < n; i++ {
        s[i] = samplify(valPre(i))
    }
    return s
}

// TestOOOInsert tests the following cases:
// - number of pre-existing samples anywhere from 0 to testMaxSize;
// - inserting a new sample before the first pre-existing sample, after the last, and anywhere in between;
// - with a chunk initial capacity of testMaxSize/8 and testMaxSize, which lets us test non-full chunks, full chunks, and chunks that need to expand themselves.
// Note: in all samples used, t always equals v in numeric value. When we talk about 'value' we just refer to a value that will be used for both sample.t and sample.v.
func TestOOOInsert(t *testing.T) {
    for numPre := 0; numPre <= testMaxSize; numPre++ {
        // For example, if we have numPre 2, then:
        // chunk.samples indexes filled        0   1
        // chunk.samples with these values     2   4     // valPre
        // we want to test inserting at index  0   1   2 // insertPos=0..numPre
        // we can do this by using values      1,  3   5 // valNew(insertPos)

        for insertPos := 0; insertPos <= numPre; insertPos++ {
            for _, capacity := range []int{testMaxSize / 8, testMaxSize} {
                chunk := NewOOOChunk(capacity)
                chunk.samples = makePre(numPre)
                newSample := samplify(valNew(insertPos))
                chunk.Insert(newSample.t, newSample.v)

                var expSamples []sample
                // Our expected new samples slice is, first, the original samples...
                for i := 0; i < insertPos; i++ {
                    expSamples = append(expSamples, samplify(valPre(i)))
                }
                // ... then the new sample ...
                expSamples = append(expSamples, newSample)
                // ... followed by any original samples that were pushed back by the new one.
                for i := insertPos; i < numPre; i++ {
                    expSamples = append(expSamples, samplify(valPre(i)))
                }

                require.Equal(t, expSamples, chunk.samples, "numPre %d, insertPos %d", numPre, insertPos)
            }
        }
    }
}

// TestOOOInsertDuplicate tests the correct behavior when inserting a sample that is a duplicate of any
// pre-existing sample, with between 1 and testMaxSize pre-existing samples and
// with a chunk initial capacity of testMaxSize/8 and testMaxSize, which lets us test non-full chunks, full chunks, and chunks that need to expand themselves.
func TestOOOInsertDuplicate(t *testing.T) {
    for numPre := 1; numPre <= testMaxSize; numPre++ {
        for dupPos := 0; dupPos < numPre; dupPos++ {
            for _, capacity := range []int{testMaxSize / 8, testMaxSize} {
                chunk := NewOOOChunk(capacity)
                chunk.samples = makePre(numPre)

                dupSample := chunk.samples[dupPos]
                dupSample.v = 0.123 // Unmistakably different from any of the pre-existing values, so we can properly detect the correct value below.

                ok := chunk.Insert(dupSample.t, dupSample.v)

                expSamples := makePre(numPre) // We expect no change.
                require.False(t, ok)
                require.Equal(t, expSamples, chunk.samples, "numPre %d, dupPos %d", numPre, dupPos)
            }
        }
    }
}
@@ -0,0 +1,165 @@
package tsdb

import (
    "fmt"
    "testing"

    "github.com/stretchr/testify/require"

    "github.com/prometheus/prometheus/tsdb/chunkenc"
)

func TestBoundedChunk(t *testing.T) {
    tests := []struct {
        name           string
        inputChunk     chunkenc.Chunk
        inputMinT      int64
        inputMaxT      int64
        initialSeek    int64
        seekIsASuccess bool
        expSamples     []sample
    }{
        {
            name:       "if there are no samples it returns nothing",
            inputChunk: newTestChunk(0),
            expSamples: nil,
        },
        {
            name:       "bounds represent a single sample",
            inputChunk: newTestChunk(10),
            expSamples: []sample{
                {0, 0},
            },
        },
        {
            name:       "if there are bounds set only samples within them are returned",
            inputChunk: newTestChunk(10),
            inputMinT:  1,
            inputMaxT:  8,
            expSamples: []sample{
                {1, 1},
                {2, 2},
                {3, 3},
                {4, 4},
                {5, 5},
                {6, 6},
                {7, 7},
                {8, 8},
            },
        },
        {
            name:       "if bounds set and only maxt is less than actual maxt",
            inputChunk: newTestChunk(10),
            inputMinT:  0,
            inputMaxT:  5,
            expSamples: []sample{
                {0, 0},
                {1, 1},
                {2, 2},
                {3, 3},
                {4, 4},
                {5, 5},
            },
        },
        {
            name:       "if bounds set and only mint is more than actual mint",
            inputChunk: newTestChunk(10),
            inputMinT:  5,
            inputMaxT:  9,
            expSamples: []sample{
                {5, 5},
                {6, 6},
                {7, 7},
                {8, 8},
                {9, 9},
            },
        },
        {
            name:           "if there are bounds set with seek before mint",
            inputChunk:     newTestChunk(10),
            inputMinT:      3,
            inputMaxT:      7,
            initialSeek:    1,
            seekIsASuccess: true,
            expSamples: []sample{
                {3, 3},
                {4, 4},
                {5, 5},
                {6, 6},
                {7, 7},
            },
        },
        {
            name:           "if there are bounds set with seek between mint and maxt",
            inputChunk:     newTestChunk(10),
            inputMinT:      3,
            inputMaxT:      7,
            initialSeek:    5,
            seekIsASuccess: true,
            expSamples: []sample{
                {5, 5},
                {6, 6},
                {7, 7},
            },
        },
        {
            name:           "if there are bounds set with seek after maxt",
            inputChunk:     newTestChunk(10),
            inputMinT:      3,
            inputMaxT:      7,
            initialSeek:    8,
            seekIsASuccess: false,
        },
    }
    for _, tc := range tests {
        t.Run(fmt.Sprintf("name=%s", tc.name), func(t *testing.T) {
            chunk := boundedChunk{tc.inputChunk, tc.inputMinT, tc.inputMaxT}

            // Testing Bytes()
            expChunk := chunkenc.NewXORChunk()
            if tc.inputChunk.NumSamples() > 0 {
                app, err := expChunk.Appender()
                require.NoError(t, err)
                for ts := tc.inputMinT; ts <= tc.inputMaxT; ts++ {
                    app.Append(ts, float64(ts))
                }
            }
            require.Equal(t, expChunk.Bytes(), chunk.Bytes())

            var samples []sample
            it := chunk.Iterator(nil)

            if tc.initialSeek != 0 {
                // Testing Seek()
                ok := it.Seek(tc.initialSeek)
                require.Equal(t, tc.seekIsASuccess, ok)
                if ok {
                    t, v := it.At()
                    samples = append(samples, sample{t, v})
                }
            }

            // Testing Next()
            for it.Next() {
                t, v := it.At()
                samples = append(samples, sample{t, v})
            }

            // it.Next() should keep returning false.
            for i := 0; i < 10; i++ {
                require.False(t, it.Next())
            }

            require.Equal(t, tc.expSamples, samples)
        })
    }
}

func newTestChunk(numSamples int) chunkenc.Chunk {
    xor := chunkenc.NewXORChunk()
    a, _ := xor.Appender()
    for i := 0; i < numSamples; i++ {
        a.Append(int64(i), float64(i))
    }
    return xor
}
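
boundedChunk itself is defined elsewhere in this change; conceptually it wraps a chunkenc.Chunk so that iteration and Seek stay within [minT, maxT]. A rough sketch of the idea follows (the names with a Sketch suffix are illustrative, not the actual implementation, and Seek bounding is omitted for brevity):

package tsdb

import "github.com/prometheus/prometheus/tsdb/chunkenc"

// boundedChunkSketch wraps a chunk so that iteration stays within [minT, maxT].
type boundedChunkSketch struct {
    chunkenc.Chunk
    minT, maxT int64
}

func (b boundedChunkSketch) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
    return &boundedIteratorSketch{b.Chunk.Iterator(nil), b.minT, b.maxT}
}

// boundedIteratorSketch skips samples below minT and stops past maxT.
// (The real implementation also bounds Seek; omitted here for brevity.)
type boundedIteratorSketch struct {
    chunkenc.Iterator
    minT, maxT int64
}

func (b *boundedIteratorSketch) Next() bool {
    for b.Iterator.Next() {
        if t, _ := b.Iterator.At(); t >= b.minT && t <= b.maxT {
            return true
        } else if t > b.maxT {
            return false // Past the upper bound; stop for good.
        }
        // t < minT: not inside the bounds yet, keep advancing.
    }
    return false
}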
@@ -0,0 +1,74 @@
package tsdb

import (
    "fmt"

    "github.com/prometheus/prometheus/tsdb/tombstones"
)

var _ BlockReader = &OOORangeHead{}

// OOORangeHead allows querying the Head's out-of-order samples via its
// BlockReader interface implementation.
type OOORangeHead struct {
    head *Head
    // mint and maxt are tracked because when a query is handled we only want
    // the time range of the query, and having pre-existing pointers to the
    // first and last timestamps helps with that.
    mint, maxt int64
}

func NewOOORangeHead(head *Head, mint, maxt int64) *OOORangeHead {
    return &OOORangeHead{
        head: head,
        mint: mint,
        maxt: maxt,
    }
}

func (oh *OOORangeHead) Index() (IndexReader, error) {
    return NewOOOHeadIndexReader(oh.head, oh.mint, oh.maxt), nil
}

func (oh *OOORangeHead) Chunks() (ChunkReader, error) {
    return NewOOOHeadChunkReader(oh.head, oh.mint, oh.maxt), nil
}

func (oh *OOORangeHead) Tombstones() (tombstones.Reader, error) {
    // As stated in the design doc https://docs.google.com/document/d/1Kppm7qL9C-BJB1j6yb6-9ObG3AbdZnFUBYPNNWwDBYM/edit?usp=sharing,
    // tombstones are not supported for out-of-order metrics.
    return tombstones.NewMemTombstones(), nil
}

func (oh *OOORangeHead) Meta() BlockMeta {
    var id [16]byte
    copy(id[:], "____ooo_head____")
    return BlockMeta{
        MinTime: oh.mint,
        MaxTime: oh.maxt,
        ULID:    id,
        Stats: BlockStats{
            NumSeries: oh.head.NumSeries(),
        },
    }
}

// Size returns the size taken by the Head block.
func (oh *OOORangeHead) Size() int64 {
    return oh.head.Size()
}

// String returns a human-readable representation of the out-of-order range
// head. It's important to keep this method in order to avoid the struct dump
// when the head is stringified in errors or logs.
func (oh *OOORangeHead) String() string {
    return fmt.Sprintf("ooo range head (mint: %d, maxt: %d)", oh.MinTime(), oh.MaxTime())
}

func (oh *OOORangeHead) MinTime() int64 {
    return oh.mint
}

func (oh *OOORangeHead) MaxTime() int64 {
    return oh.maxt
}
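
An illustrative sketch of how a BlockReader like OOORangeHead plugs into the existing query path (hypothetical consumer code, not part of the diff; NewBlockQuerier is the existing tsdb constructor for BlockReader-backed queriers):

package example

import (
    "github.com/prometheus/prometheus/storage"
    "github.com/prometheus/prometheus/tsdb"
)

// oooQuerier builds a querier over only the out-of-order data in the head
// for the given time window.
func oooQuerier(head *tsdb.Head, mint, maxt int64) (storage.Querier, error) {
    oh := tsdb.NewOOORangeHead(head, mint, maxt)
    return tsdb.NewBlockQuerier(oh, mint, maxt)
}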
@@ -0,0 +1,409 @@
package tsdb

import (
    "errors"
    "math"
    "sort"

    "github.com/prometheus/prometheus/model/labels"
    "github.com/prometheus/prometheus/storage"
    "github.com/prometheus/prometheus/tsdb/chunkenc"
    "github.com/prometheus/prometheus/tsdb/chunks"
    "github.com/prometheus/prometheus/tsdb/index"
    "github.com/prometheus/prometheus/tsdb/tombstones"
)

var _ IndexReader = &OOOHeadIndexReader{}

// OOOHeadIndexReader implements IndexReader so ooo samples in the head can be
// accessed.
// It also has a reference to headIndexReader so we can leverage its
// IndexReader implementation for all the methods that remain the same. We
// decided to do this to avoid code duplication.
// The only methods that change are the ones about getting Series and Postings.
type OOOHeadIndexReader struct {
    *headIndexReader // A reference to the headIndexReader so we can reuse as much of the interface implementation as possible.
}

func NewOOOHeadIndexReader(head *Head, mint, maxt int64) *OOOHeadIndexReader {
    hr := &headIndexReader{
        head: head,
        mint: mint,
        maxt: maxt,
    }
    return &OOOHeadIndexReader{hr}
}

func (oh *OOOHeadIndexReader) Series(ref storage.SeriesRef, lbls *labels.Labels, chks *[]chunks.Meta) error {
    return oh.series(ref, lbls, chks, 0)
}

// The passed lastMmapRef tells up to which m-mapped chunk we can consider.
// If it is 0, it means all chunks need to be considered.
// If it is non-0, then the oooHeadChunk must not be considered.
func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, lbls *labels.Labels, chks *[]chunks.Meta, lastMmapRef chunks.ChunkDiskMapperRef) error {
    s := oh.head.series.getByID(chunks.HeadSeriesRef(ref))

    if s == nil {
        oh.head.metrics.seriesNotFound.Inc()
        return storage.ErrNotFound
    }
    *lbls = append((*lbls)[:0], s.lset...)

    if chks == nil {
        return nil
    }

    s.Lock()
    defer s.Unlock()
    *chks = (*chks)[:0]

    tmpChks := make([]chunks.Meta, 0, len(s.oooMmappedChunks))

    // We define these markers to track the last chunk reference while we
    // fill the chunk meta.
    // These markers are useful to give consistent responses to repeated queries
    // even if new chunks, overlapping or not, are added afterwards.
    // Also, lastMinT and lastMaxT are initialized to the max int64 as a sentinel
    // value to signal that they are unset.
    var lastChunkRef chunks.ChunkRef
    lastMinT, lastMaxT := int64(math.MaxInt64), int64(math.MaxInt64)

    addChunk := func(minT, maxT int64, ref chunks.ChunkRef) {
        // The first time we get called is for the last included chunk.
        // Set the markers accordingly.
        if lastMinT == int64(math.MaxInt64) {
            lastChunkRef = ref
            lastMinT = minT
            lastMaxT = maxT
        }

        tmpChks = append(tmpChks, chunks.Meta{
            MinTime:        minT,
            MaxTime:        maxT,
            Ref:            ref,
            OOOLastRef:     lastChunkRef,
            OOOLastMinTime: lastMinT,
            OOOLastMaxTime: lastMaxT,
        })
    }

    // Collect all chunks that overlap the query range, in order from most
    // recent to oldest, so we can set the correct markers.
    if s.oooHeadChunk != nil {
        c := s.oooHeadChunk
        if c.OverlapsClosedInterval(oh.mint, oh.maxt) && lastMmapRef == 0 {
            ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.oooMmappedChunks))))
            addChunk(c.minTime, c.maxTime, ref)
        }
    }
    for i := len(s.oooMmappedChunks) - 1; i >= 0; i-- {
        c := s.oooMmappedChunks[i]
        if c.OverlapsClosedInterval(oh.mint, oh.maxt) && (lastMmapRef == 0 || lastMmapRef.GreaterThanOrEqualTo(c.ref)) {
            ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i)))
            addChunk(c.minTime, c.maxTime, ref)
        }
    }

    // There is nothing to do if we did not collect any chunks.
    if len(tmpChks) == 0 {
        return nil
    }

    // Next we want to sort all the collected chunks by min time so we can find
    // those that overlap.
    sort.Sort(metaByMinTimeAndMinRef(tmpChks))

    // Next we want to iterate the sorted collected chunks and only return the
    // chunk Meta for the first chunk of each group of overlapping chunks.
    // Example chunks of a series: 5:(100, 200) 6:(500, 600) 7:(150, 250) 8:(550, 650)
    // In the example, 5 overlaps with 7 and 6 overlaps with 8, so we only want
    // to return chunk Metas for chunks 5 and 6.
    *chks = append(*chks, tmpChks[0])
    maxTime := tmpChks[0].MaxTime // Tracks the maxTime of the previous "to be merged" chunk.
    for _, c := range tmpChks[1:] {
        if c.MinTime > maxTime {
            *chks = append(*chks, c)
            maxTime = c.MaxTime
        } else if c.MaxTime > maxTime {
            maxTime = c.MaxTime
            (*chks)[len(*chks)-1].MaxTime = c.MaxTime
        }
    }

    return nil
}
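
The interval-merge step at the end of series() can be illustrated in isolation. This is a standalone sketch, using bare (minT, maxT) pairs in place of chunks.Meta:

package main

import "fmt"

// span is a bare (minT, maxT) pair standing in for chunks.Meta.
type span struct{ minT, maxT int64 }

// mergeOverlapping mirrors the loop at the end of series(): given spans
// sorted by minT, it keeps only the first span of each overlapping group,
// extending that span's maxT over the whole group.
func mergeOverlapping(sorted []span) []span {
    if len(sorted) == 0 {
        return nil
    }
    out := []span{sorted[0]}
    maxTime := sorted[0].maxT // maxTime of the previous "to be merged" span.
    for _, s := range sorted[1:] {
        if s.minT > maxTime {
            // No overlap: this span starts a new group.
            out = append(out, s)
            maxTime = s.maxT
        } else if s.maxT > maxTime {
            // Overlap: extend the current group's end.
            maxTime = s.maxT
            out[len(out)-1].maxT = s.maxT
        }
    }
    return out
}

func main() {
    // The example from the comment above: chunks 5, 7, 6, 8 after sorting.
    in := []span{{100, 200}, {150, 250}, {500, 600}, {550, 650}}
    fmt.Println(mergeOverlapping(in)) // [{100 250} {500 650}]
}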

type chunkMetaAndChunkDiskMapperRef struct {
    meta     chunks.Meta
    ref      chunks.ChunkDiskMapperRef
    origMinT int64
    origMaxT int64
}

type byMinTimeAndMinRef []chunkMetaAndChunkDiskMapperRef

func (b byMinTimeAndMinRef) Len() int { return len(b) }
func (b byMinTimeAndMinRef) Less(i, j int) bool {
    if b[i].meta.MinTime == b[j].meta.MinTime {
        return b[i].meta.Ref < b[j].meta.Ref
    }
    return b[i].meta.MinTime < b[j].meta.MinTime
}

func (b byMinTimeAndMinRef) Swap(i, j int) { b[i], b[j] = b[j], b[i] }

type metaByMinTimeAndMinRef []chunks.Meta

func (b metaByMinTimeAndMinRef) Len() int { return len(b) }
func (b metaByMinTimeAndMinRef) Less(i, j int) bool {
    if b[i].MinTime == b[j].MinTime {
        return b[i].Ref < b[j].Ref
    }
    return b[i].MinTime < b[j].MinTime
}

func (b metaByMinTimeAndMinRef) Swap(i, j int) { b[i], b[j] = b[j], b[i] }

func (oh *OOOHeadIndexReader) Postings(name string, values ...string) (index.Postings, error) {
    switch len(values) {
    case 0:
        return index.EmptyPostings(), nil
    case 1:
        return oh.head.postings.Get(name, values[0]), nil // TODO(ganesh) Also call GetOOOPostings.
    default:
        // TODO(ganesh) We want to only return postings for out-of-order series.
        res := make([]index.Postings, 0, len(values))
        for _, value := range values {
            res = append(res, oh.head.postings.Get(name, value)) // TODO(ganesh) Also call GetOOOPostings.
        }
        return index.Merge(res...), nil
    }
}

type OOOHeadChunkReader struct {
    head       *Head
    mint, maxt int64
}

func NewOOOHeadChunkReader(head *Head, mint, maxt int64) *OOOHeadChunkReader {
    return &OOOHeadChunkReader{
        head: head,
        mint: mint,
        maxt: maxt,
    }
}

func (cr OOOHeadChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) {
    sid, _ := chunks.HeadChunkRef(meta.Ref).Unpack()

    s := cr.head.series.getByID(sid)
    // This means that the series has been garbage collected.
    if s == nil {
        return nil, storage.ErrNotFound
    }

    s.Lock()
    c, err := s.oooMergedChunk(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt)
    s.Unlock()
    if err != nil {
        return nil, err
    }

    // This means that the query range did not overlap with the requested chunk.
    if len(c.chunks) == 0 {
        return nil, storage.ErrNotFound
    }

    return c, nil
}
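
For context, an illustrative sketch of how a ChunkReader is consumed downstream (hypothetical helper, not part of the diff; the metas would come from IndexReader.Series, and the iterator API assumed is the bool-returning one at this commit):

package example

import (
    "fmt"

    "github.com/prometheus/prometheus/tsdb"
    "github.com/prometheus/prometheus/tsdb/chunks"
)

// dumpChunk resolves one chunk meta via a ChunkReader and prints its samples.
func dumpChunk(cr tsdb.ChunkReader, meta chunks.Meta) error {
    c, err := cr.Chunk(meta)
    if err != nil {
        return err
    }
    it := c.Iterator(nil)
    for it.Next() {
        t, v := it.At()
        fmt.Println(t, v)
    }
    return it.Err()
}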

func (cr OOOHeadChunkReader) Close() error {
    return nil
}

type OOOCompactionHead struct {
    oooIR       *OOOHeadIndexReader
    lastMmapRef chunks.ChunkDiskMapperRef
    lastWBLFile int
    postings    []storage.SeriesRef
    chunkRange  int64
    mint, maxt  int64 // Among all the compactable chunks.
}

// NewOOOCompactionHead does the following:
// 1. M-maps all the in-memory ooo chunks.
// 2. Computes the expected block ranges while iterating through all ooo series and stores them.
// 3. Stores the list of postings having ooo series.
// 4. Cuts a new WBL file for the OOO WBL.
// All of the above together have a bit of CPU and memory overhead, and can have a bit of impact
// on the sample append latency. So call NewOOOCompactionHead only right before compaction.
func NewOOOCompactionHead(head *Head) (*OOOCompactionHead, error) {
    newWBLFile, err := head.wbl.NextSegment()
    if err != nil {
        return nil, err
    }

    ch := &OOOCompactionHead{
        chunkRange:  head.chunkRange.Load(),
        mint:        math.MaxInt64,
        maxt:        math.MinInt64,
        lastWBLFile: newWBLFile,
    }

    ch.oooIR = NewOOOHeadIndexReader(head, math.MinInt64, math.MaxInt64)
    n, v := index.AllPostingsKey()

    // TODO: verify this gets only ooo samples.
    p, err := ch.oooIR.Postings(n, v)
    if err != nil {
        return nil, err
    }
    p = ch.oooIR.SortedPostings(p)

    var lastSeq, lastOff int
    for p.Next() {
        seriesRef := p.At()
        ms := head.series.getByID(chunks.HeadSeriesRef(seriesRef))
        if ms == nil {
            continue
        }

        // M-map the in-memory chunk and keep track of the last one.
        // Also build the block ranges -> series map.
        // TODO: consider having a lock specifically for ooo data.
        ms.Lock()

        mmapRef := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper)
        if mmapRef == 0 && len(ms.oooMmappedChunks) > 0 {
            // Nothing was m-mapped, so take the mmapRef from the existing slice if it exists.
            mmapRef = ms.oooMmappedChunks[len(ms.oooMmappedChunks)-1].ref
        }
        seq, off := mmapRef.Unpack()
        if seq > lastSeq || (seq == lastSeq && off > lastOff) {
            ch.lastMmapRef, lastSeq, lastOff = mmapRef, seq, off
        }
        if len(ms.oooMmappedChunks) > 0 {
            ch.postings = append(ch.postings, seriesRef)
            for _, c := range ms.oooMmappedChunks {
                if c.minTime < ch.mint {
                    ch.mint = c.minTime
                }
                if c.maxTime > ch.maxt {
                    ch.maxt = c.maxTime
                }
            }
        }
        ms.Unlock()
    }

    return ch, nil
}

func (ch *OOOCompactionHead) Index() (IndexReader, error) {
    return NewOOOCompactionHeadIndexReader(ch), nil
}

func (ch *OOOCompactionHead) Chunks() (ChunkReader, error) {
    return NewOOOHeadChunkReader(ch.oooIR.head, ch.oooIR.mint, ch.oooIR.maxt), nil
}

func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error) {
    return tombstones.NewMemTombstones(), nil
}

func (ch *OOOCompactionHead) Meta() BlockMeta {
    var id [16]byte
    copy(id[:], "ooo_compact_head")
    return BlockMeta{
        MinTime: ch.mint,
        MaxTime: ch.maxt,
        ULID:    id,
        Stats: BlockStats{
            NumSeries: uint64(len(ch.postings)),
        },
    }
}

// CloneForTimeRange clones the OOOCompactionHead such that the IndexReader and ChunkReader
// obtained from it only look at the m-mapped chunks within the given time range, while not
// looking beyond ch.lastMmapRef.
// Only the methods of the BlockReader interface are valid for the cloned OOOCompactionHead.
func (ch *OOOCompactionHead) CloneForTimeRange(mint, maxt int64) *OOOCompactionHead {
    return &OOOCompactionHead{
        oooIR:       NewOOOHeadIndexReader(ch.oooIR.head, mint, maxt),
        lastMmapRef: ch.lastMmapRef,
        postings:    ch.postings,
        chunkRange:  ch.chunkRange,
        mint:        ch.mint,
        maxt:        ch.maxt,
    }
}
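
A hypothetical sketch of the intended consumption pattern: the compaction head is built once, then cloned per chunk-range-aligned block range, and each clone is handed to the block-writing step as a BlockReader. Here writeBlock is a caller-supplied stand-in for that step, not a real API, and the boundary alignment is simplified:

package example

import "github.com/prometheus/prometheus/tsdb"

// compactOOOHead clones the OOOCompactionHead once per chunk-range-aligned
// block and hands each clone to a caller-supplied block writer.
func compactOOOHead(head *tsdb.Head, writeBlock func(tsdb.BlockReader, int64, int64) error) error {
    ch, err := tsdb.NewOOOCompactionHead(head)
    if err != nil {
        return err
    }
    step := ch.ChunkRange()
    // Align block boundaries down to multiples of the chunk range.
    for t := ch.MinTime() - ch.MinTime()%step; t <= ch.MaxTime(); t += step {
        sub := ch.CloneForTimeRange(t, t+step-1)
        if err := writeBlock(sub, t, t+step); err != nil {
            return err
        }
    }
    return nil
}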

func (ch *OOOCompactionHead) Size() int64                            { return 0 }
func (ch *OOOCompactionHead) MinTime() int64                         { return ch.mint }
func (ch *OOOCompactionHead) MaxTime() int64                         { return ch.maxt }
func (ch *OOOCompactionHead) ChunkRange() int64                      { return ch.chunkRange }
func (ch *OOOCompactionHead) LastMmapRef() chunks.ChunkDiskMapperRef { return ch.lastMmapRef }
func (ch *OOOCompactionHead) LastWBLFile() int                       { return ch.lastWBLFile }

type OOOCompactionHeadIndexReader struct {
    ch *OOOCompactionHead
}

func NewOOOCompactionHeadIndexReader(ch *OOOCompactionHead) IndexReader {
    return &OOOCompactionHeadIndexReader{ch: ch}
}

func (ir *OOOCompactionHeadIndexReader) Symbols() index.StringIter {
    return ir.ch.oooIR.Symbols()
}

func (ir *OOOCompactionHeadIndexReader) Postings(name string, values ...string) (index.Postings, error) {
    n, v := index.AllPostingsKey()
    if name != n || len(values) != 1 || values[0] != v {
        return nil, errors.New("only AllPostingsKey is supported")
    }
    return index.NewListPostings(ir.ch.postings), nil
}

func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings {
    // This will already be sorted from the Postings() call above.
    return p
}

func (ir *OOOCompactionHeadIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
    return ir.ch.oooIR.ShardedPostings(p, shardIndex, shardCount)
}

func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, lset *labels.Labels, chks *[]chunks.Meta) error {
    return ir.ch.oooIR.series(ref, lset, chks, ir.ch.lastMmapRef)
}

func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
    return nil, errors.New("not implemented")
}

func (ir *OOOCompactionHeadIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
    return nil, errors.New("not implemented")
}

func (ir *OOOCompactionHeadIndexReader) PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
    return nil, errors.New("not implemented")
}

func (ir *OOOCompactionHeadIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, error) {
    return nil, errors.New("not implemented")
}

func (ir *OOOCompactionHeadIndexReader) LabelValueFor(id storage.SeriesRef, label string) (string, error) {
    return "", errors.New("not implemented")
}

func (ir *OOOCompactionHeadIndexReader) LabelNamesFor(ids ...storage.SeriesRef) ([]string, error) {
    return nil, errors.New("not implemented")
}

func (ir *OOOCompactionHeadIndexReader) Close() error {
    return ir.ch.oooIR.Close()
}