Browse Source

tsdb: Only initialise out-of-order fields when required

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
pull/11779/head
Ganesh Vernekar 2 years ago
parent
commit
38fa151a7c
No known key found for this signature in database
GPG Key ID: F056451B52F1DC34
  1. 24
      tsdb/db_test.go
  2. 46
      tsdb/head.go
  3. 25
      tsdb/head_append.go
  4. 26
      tsdb/head_read.go
  5. 20
      tsdb/head_test.go
  6. 14
      tsdb/head_wal.go
  7. 34
      tsdb/ooo_head_read.go
  8. 5
      tsdb/ooo_head_read_test.go

24
tsdb/db_test.go

@ -4095,8 +4095,7 @@ func TestOOOCompaction(t *testing.T) {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
require.NoError(t, err)
require.False(t, created)
require.Nil(t, ms.oooHeadChunk)
require.Equal(t, 0, len(ms.oooMmappedChunks))
require.Nil(t, ms.ooo)
}
checkEmptyOOOChunk(series1)
checkEmptyOOOChunk(series2)
@ -4138,8 +4137,8 @@ func TestOOOCompaction(t *testing.T) {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
require.NoError(t, err)
require.False(t, created)
require.Greater(t, ms.oooHeadChunk.chunk.NumSamples(), 0)
require.Equal(t, 14, len(ms.oooMmappedChunks)) // 7 original, 7 duplicate.
require.Greater(t, ms.ooo.oooHeadChunk.chunk.NumSamples(), 0)
require.Equal(t, 14, len(ms.ooo.oooMmappedChunks)) // 7 original, 7 duplicate.
}
checkNonEmptyOOOChunk(series1)
checkNonEmptyOOOChunk(series2)
@ -4278,7 +4277,7 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
require.NoError(t, err)
require.False(t, created)
require.Greater(t, ms.oooHeadChunk.chunk.NumSamples(), 0)
require.Greater(t, ms.ooo.oooHeadChunk.chunk.NumSamples(), 0)
}
// If the normal Head is not compacted, the OOO head compaction does not take place.
@ -4306,8 +4305,7 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls)
require.NoError(t, err)
require.False(t, created)
require.Nil(t, ms.oooHeadChunk)
require.Equal(t, 0, len(ms.oooMmappedChunks))
require.Nil(t, ms.ooo)
}
verifySamples := func(block *Block, fromMins, toMins int64) {
@ -4700,8 +4698,7 @@ func TestOOODisabled(t *testing.T) {
require.NoError(t, err)
require.False(t, created)
require.NotNil(t, ms)
require.Nil(t, ms.oooHeadChunk)
require.Len(t, ms.oooMmappedChunks, 0)
require.Nil(t, ms.ooo)
}
func TestWBLAndMmapReplay(t *testing.T) {
@ -4765,7 +4762,7 @@ func TestWBLAndMmapReplay(t *testing.T) {
require.False(t, created)
require.NoError(t, err)
var s1MmapSamples []tsdbutil.Sample
for _, mc := range ms.oooMmappedChunks {
for _, mc := range ms.ooo.oooMmappedChunks {
chk, err := db.head.chunkDiskMapper.Chunk(mc.ref)
require.NoError(t, err)
it := chk.Iterator(nil)
@ -4972,8 +4969,7 @@ func TestOOOCompactionFailure(t *testing.T) {
ms, created, err := db.head.getOrCreate(series1.Hash(), series1)
require.NoError(t, err)
require.False(t, created)
require.Nil(t, ms.oooHeadChunk)
require.Len(t, ms.oooMmappedChunks, 0)
require.Nil(t, ms.ooo)
// The failed compaction should not have left the ooo Head corrupted.
// Hence, expect no new blocks with another OOO compaction call.
@ -5778,7 +5774,7 @@ func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
db.DisableCompactions()
ms := db.head.series.getByHash(series1.Hash(), series1)
require.Greater(t, len(ms.oooMmappedChunks), 0, "OOO mmap chunk was not replayed")
require.Greater(t, len(ms.ooo.oooMmappedChunks), 0, "OOO mmap chunk was not replayed")
checkMmapFileContents := func(contains, notContains []string) {
mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot)
@ -5806,7 +5802,7 @@ func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
checkMmapFileContents([]string{"000001", "000002"}, nil)
require.NoError(t, db.Compact())
checkMmapFileContents([]string{"000002"}, []string{"000001"})
require.Equal(t, 0, len(ms.oooMmappedChunks), "OOO mmap chunk was not compacted")
require.Nil(t, ms.ooo, "OOO mmap chunk was not compacted")
addSamples(501, 650)
checkMmapFileContents([]string{"000002", "000003"}, []string{"000001"})

46
tsdb/head.go

@ -762,7 +762,11 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries)
h.metrics.chunks.Inc()
h.metrics.chunksCreated.Inc()
ms.oooMmappedChunks = append(ms.oooMmappedChunks, &mmappedChunk{
if ms.ooo == nil {
ms.ooo = &memSeriesOOOFields{}
}
ms.ooo.oooMmappedChunks = append(ms.ooo.oooMmappedChunks, &mmappedChunk{
ref: chunkRef,
minTime: mint,
maxTime: maxt,
@ -1665,24 +1669,24 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
minMmapFile = seq
}
}
if len(series.oooMmappedChunks) > 0 {
seq, _ := series.oooMmappedChunks[0].ref.Unpack()
if series.ooo != nil && len(series.ooo.oooMmappedChunks) > 0 {
seq, _ := series.ooo.oooMmappedChunks[0].ref.Unpack()
if seq < minMmapFile {
minMmapFile = seq
}
for _, ch := range series.oooMmappedChunks {
for _, ch := range series.ooo.oooMmappedChunks {
if ch.minTime < minOOOTime {
minOOOTime = ch.minTime
}
}
}
if series.oooHeadChunk != nil {
if series.oooHeadChunk.minTime < minOOOTime {
minOOOTime = series.oooHeadChunk.minTime
if series.ooo != nil && series.ooo.oooHeadChunk != nil {
if series.ooo.oooHeadChunk.minTime < minOOOTime {
minOOOTime = series.ooo.oooHeadChunk.minTime
}
}
if len(series.mmappedChunks) > 0 || len(series.oooMmappedChunks) > 0 ||
series.headChunk != nil || series.oooHeadChunk != nil || series.pendingCommit {
if len(series.mmappedChunks) > 0 || series.headChunk != nil || series.pendingCommit ||
(series.ooo != nil && (len(series.ooo.oooMmappedChunks) > 0 || series.ooo.oooHeadChunk != nil)) {
seriesMint := series.minTime()
if seriesMint < actualMint {
actualMint = seriesMint
@ -1839,9 +1843,7 @@ type memSeries struct {
headChunk *memChunk // Most recent chunk in memory that's still being built.
firstChunkID chunks.HeadChunkID // HeadChunkID for mmappedChunks[0]
oooMmappedChunks []*mmappedChunk // Immutable chunks on disk containing OOO samples.
oooHeadChunk *oooHeadChunk // Most recent chunk for ooo samples in memory that's still being built.
firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0]
ooo *memSeriesOOOFields
mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
@ -1869,6 +1871,14 @@ type memSeries struct {
pendingCommit bool // Whether there are samples waiting to be committed to this series.
}
// memSeriesOOOFields contains the fields required by memSeries
// to handle out-of-order data.
type memSeriesOOOFields struct {
oooMmappedChunks []*mmappedChunk // Immutable chunks on disk containing OOO samples.
oooHeadChunk *oooHeadChunk // Most recent chunk for ooo samples in memory that's still being built.
firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0].
}
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool) *memSeries {
s := &memSeries{
lset: lset,
@ -1927,15 +1937,19 @@ func (s *memSeries) truncateChunksBefore(mint int64, minOOOMmapRef chunks.ChunkD
}
var removedOOO int
if len(s.oooMmappedChunks) > 0 {
for i, c := range s.oooMmappedChunks {
if s.ooo != nil && len(s.ooo.oooMmappedChunks) > 0 {
for i, c := range s.ooo.oooMmappedChunks {
if c.ref.GreaterThan(minOOOMmapRef) {
break
}
removedOOO = i + 1
}
s.oooMmappedChunks = append(s.oooMmappedChunks[:0], s.oooMmappedChunks[removedOOO:]...)
s.firstOOOChunkID += chunks.HeadChunkID(removedOOO)
s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks[:0], s.ooo.oooMmappedChunks[removedOOO:]...)
s.ooo.firstOOOChunkID += chunks.HeadChunkID(removedOOO)
if len(s.ooo.oooMmappedChunks) == 0 && s.ooo.oooHeadChunk == nil {
s.ooo = nil
}
}
return removedInOrder + removedOOO

25
tsdb/head_append.go

@ -1086,7 +1086,10 @@ func (a *headAppender) Commit() (err error) {
// insert is like append, except it inserts. Used for OOO samples.
func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRef chunks.ChunkDiskMapperRef) {
c := s.oooHeadChunk
if s.ooo == nil {
s.ooo = &memSeriesOOOFields{}
}
c := s.ooo.oooHeadChunk
if c == nil || c.chunk.NumSamples() == int(oooCapMax) {
// Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks.
c, mmapRef = s.cutNewOOOHeadChunk(t, chunkDiskMapper)
@ -1373,33 +1376,35 @@ func (s *memSeries) cutNewHeadChunk(
return s.headChunk
}
// cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk.
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, chunks.ChunkDiskMapperRef) {
ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper)
s.oooHeadChunk = &oooHeadChunk{
s.ooo.oooHeadChunk = &oooHeadChunk{
chunk: NewOOOChunk(),
minTime: mint,
maxTime: math.MinInt64,
}
return s.oooHeadChunk, ref
return s.ooo.oooHeadChunk, ref
}
func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) chunks.ChunkDiskMapperRef {
if s.oooHeadChunk == nil {
if s.ooo == nil || s.ooo.oooHeadChunk == nil {
// There is no head chunk, so nothing to m-map here.
return 0
}
xor, _ := s.oooHeadChunk.chunk.ToXOR() // Encode to XorChunk which is more compact and implements all of the needed functionality.
xor, _ := s.ooo.oooHeadChunk.chunk.ToXOR() // Encode to XorChunk which is more compact and implements all of the needed functionality.
oooXor := &chunkenc.OOOXORChunk{XORChunk: xor}
chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.oooHeadChunk.minTime, s.oooHeadChunk.maxTime, oooXor, handleChunkWriteError)
s.oooMmappedChunks = append(s.oooMmappedChunks, &mmappedChunk{
chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, oooXor, handleChunkWriteError)
s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{
ref: chunkRef,
numSamples: uint16(xor.NumSamples()),
minTime: s.oooHeadChunk.minTime,
maxTime: s.oooHeadChunk.maxTime,
minTime: s.ooo.oooHeadChunk.minTime,
maxTime: s.ooo.oooHeadChunk.maxTime,
})
s.oooHeadChunk = nil
s.ooo.oooHeadChunk = nil
return chunkRef
}

26
tsdb/head_read.go

@ -194,8 +194,9 @@ func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID {
// oooHeadChunkID returns the HeadChunkID referred to by the given position.
// * 0 <= pos < len(s.oooMmappedChunks) refer to s.oooMmappedChunks[pos]
// * pos == len(s.oooMmappedChunks) refers to s.oooHeadChunk
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID {
return chunks.HeadChunkID(pos) + s.firstOOOChunkID
return chunks.HeadChunkID(pos) + s.ooo.firstOOOChunkID
}
// LabelValueFor returns label value for the given label name in the series referred to by ID.
@ -347,6 +348,7 @@ func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDi
// might be a merge of all the overlapping chunks, if any, amongst all the
// chunks in the OOOHead.
// This function is not thread safe unless the caller holds a lock.
// The caller must ensure that s.ooo is not nil.
func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper, mint, maxt int64) (chunk *mergedOOOChunks, err error) {
_, cid := chunks.HeadChunkRef(meta.Ref).Unpack()
@ -354,23 +356,23 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper
// incremented by 1 when new chunk is created, hence (meta - firstChunkID) gives the slice index.
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
// is len(s.mmappedChunks), it represents the next chunk, which is the head chunk.
ix := int(cid) - int(s.firstOOOChunkID)
if ix < 0 || ix > len(s.oooMmappedChunks) {
ix := int(cid) - int(s.ooo.firstOOOChunkID)
if ix < 0 || ix > len(s.ooo.oooMmappedChunks) {
return nil, storage.ErrNotFound
}
if ix == len(s.oooMmappedChunks) {
if s.oooHeadChunk == nil {
if ix == len(s.ooo.oooMmappedChunks) {
if s.ooo.oooHeadChunk == nil {
return nil, errors.New("invalid ooo head chunk")
}
}
// We create a temporary slice of chunk metas to hold the information of all
// possible chunks that may overlap with the requested chunk.
tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.oooMmappedChunks))
tmpChks := make([]chunkMetaAndChunkDiskMapperRef, 0, len(s.ooo.oooMmappedChunks))
oooHeadRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.oooMmappedChunks))))
if s.oooHeadChunk != nil && s.oooHeadChunk.OverlapsClosedInterval(mint, maxt) {
oooHeadRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
if s.ooo.oooHeadChunk != nil && s.ooo.oooHeadChunk.OverlapsClosedInterval(mint, maxt) {
// We only want to append the head chunk if this chunk existed when
// Series() was called. This brings consistency in case new data
// is added in between Series() and Chunk() calls.
@ -386,7 +388,7 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper
}
}
for i, c := range s.oooMmappedChunks {
for i, c := range s.ooo.oooMmappedChunks {
chunkRef := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i)))
// We can skip chunks that came in later than the last known OOOLastRef.
if chunkRef > meta.OOOLastRef {
@ -431,11 +433,11 @@ func (s *memSeries) oooMergedChunk(meta chunks.Meta, cdm *chunks.ChunkDiskMapper
// If head chunk min and max time match the meta OOO markers
// that means that the chunk has not expanded so we can append
// it as it is.
if s.oooHeadChunk.minTime == meta.OOOLastMinTime && s.oooHeadChunk.maxTime == meta.OOOLastMaxTime {
xor, err = s.oooHeadChunk.chunk.ToXOR() // TODO(jesus.vazquez) (This is an optimization idea that has no priority and might not be that useful) See if we could use a copy of the underlying slice. That would leave the more expensive ToXOR() function only for the usecase where Bytes() is called.
if s.ooo.oooHeadChunk.minTime == meta.OOOLastMinTime && s.ooo.oooHeadChunk.maxTime == meta.OOOLastMaxTime {
xor, err = s.ooo.oooHeadChunk.chunk.ToXOR() // TODO(jesus.vazquez) (This is an optimization idea that has no priority and might not be that useful) See if we could use a copy of the underlying slice. That would leave the more expensive ToXOR() function only for the usecase where Bytes() is called.
} else {
// We need to remove samples that are outside of the markers
xor, err = s.oooHeadChunk.chunk.ToXORBetweenTimestamps(meta.OOOLastMinTime, meta.OOOLastMaxTime)
xor, err = s.ooo.oooHeadChunk.chunk.ToXORBetweenTimestamps(meta.OOOLastMinTime, meta.OOOLastMaxTime)
}
if err != nil {
return nil, errors.Wrap(err, "failed to convert ooo head chunk to xor chunk")

20
tsdb/head_test.go

@ -3884,7 +3884,7 @@ func TestOOOWalReplay(t *testing.T) {
require.False(t, ok)
require.NotNil(t, ms)
xor, err := ms.oooHeadChunk.chunk.ToXOR()
xor, err := ms.ooo.oooHeadChunk.chunk.ToXOR()
require.NoError(t, err)
it := xor.Iterator(nil)
@ -3944,16 +3944,16 @@ func TestOOOMmapReplay(t *testing.T) {
require.False(t, ok)
require.NotNil(t, ms)
require.Len(t, ms.oooMmappedChunks, 3)
require.Len(t, ms.ooo.oooMmappedChunks, 3)
// Verify that we can access the chunks without error.
for _, m := range ms.oooMmappedChunks {
for _, m := range ms.ooo.oooMmappedChunks {
chk, err := h.chunkDiskMapper.Chunk(m.ref)
require.NoError(t, err)
require.Equal(t, int(m.numSamples), chk.NumSamples())
}
expMmapChunks := make([]*mmappedChunk, 3)
copy(expMmapChunks, ms.oooMmappedChunks)
copy(expMmapChunks, ms.ooo.oooMmappedChunks)
// Restart head.
require.NoError(t, h.Close())
@ -3972,16 +3972,16 @@ func TestOOOMmapReplay(t *testing.T) {
require.False(t, ok)
require.NotNil(t, ms)
require.Len(t, ms.oooMmappedChunks, len(expMmapChunks))
require.Len(t, ms.ooo.oooMmappedChunks, len(expMmapChunks))
// Verify that we can access the chunks without error.
for _, m := range ms.oooMmappedChunks {
for _, m := range ms.ooo.oooMmappedChunks {
chk, err := h.chunkDiskMapper.Chunk(m.ref)
require.NoError(t, err)
require.Equal(t, int(m.numSamples), chk.NumSamples())
}
actMmapChunks := make([]*mmappedChunk, len(expMmapChunks))
copy(actMmapChunks, ms.oooMmappedChunks)
copy(actMmapChunks, ms.ooo.oooMmappedChunks)
require.Equal(t, expMmapChunks, actMmapChunks)
@ -4376,8 +4376,8 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
require.NotNil(t, ms)
require.Nil(t, ms.headChunk)
require.NotNil(t, ms.oooHeadChunk)
require.Equal(t, expSamples, ms.oooHeadChunk.chunk.NumSamples())
require.NotNil(t, ms.ooo.oooHeadChunk)
require.Equal(t, expSamples, ms.ooo.oooHeadChunk.chunk.NumSamples())
}
verifyInOrderSamples := func(lbls labels.Labels, expSamples int) {
@ -4386,7 +4386,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
require.False(t, created)
require.NotNil(t, ms)
require.Nil(t, ms.oooHeadChunk)
require.Nil(t, ms.ooo)
require.NotNil(t, ms.headChunk)
require.Equal(t, expSamples, ms.headChunk.chunk.NumSamples())
}

14
tsdb/head_wal.go

@ -499,7 +499,15 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m
h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks)))
h.metrics.chunks.Add(float64(len(mmc) + len(oooMmc) - len(mSeries.mmappedChunks)))
mSeries.mmappedChunks = mmc
mSeries.oooMmappedChunks = oooMmc
mSeries.ooo = nil
if len(oooMmc) == 0 {
mSeries.ooo = nil
} else {
if mSeries.ooo == nil {
mSeries.ooo = &memSeriesOOOFields{}
}
*mSeries.ooo = memSeriesOOOFields{oooMmappedChunks: oooMmc}
}
// Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject.
if len(mmc) == 0 {
mSeries.mmMaxTime = math.MinInt64
@ -818,7 +826,9 @@ func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.
// chunk size parameters, we are not taking care of that here.
// TODO(codesome): see if there is a way to avoid duplicate m-map chunks if
// the size of ooo chunk was reduced between restart.
ms.oooHeadChunk = nil
if ms.ooo != nil {
ms.ooo.oooHeadChunk = nil
}
processors[idx].mx.Unlock()
}

34
tsdb/ooo_head_read.go

@ -71,7 +71,11 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
defer s.Unlock()
*chks = (*chks)[:0]
tmpChks := make([]chunks.Meta, 0, len(s.oooMmappedChunks))
if s.ooo == nil {
return nil
}
tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks))
// We define these markers to track the last chunk reference while we
// fill the chunk meta.
@ -103,15 +107,15 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
// Collect all chunks that overlap the query range, in order from most recent to most old,
// so we can set the correct markers.
if s.oooHeadChunk != nil {
c := s.oooHeadChunk
if s.ooo.oooHeadChunk != nil {
c := s.ooo.oooHeadChunk
if c.OverlapsClosedInterval(oh.mint, oh.maxt) && lastMmapRef == 0 {
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.oooMmappedChunks))))
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
addChunk(c.minTime, c.maxTime, ref)
}
}
for i := len(s.oooMmappedChunks) - 1; i >= 0; i-- {
c := s.oooMmappedChunks[i]
for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- {
c := s.ooo.oooMmappedChunks[i]
if c.OverlapsClosedInterval(oh.mint, oh.maxt) && (lastMmapRef == 0 || lastMmapRef.GreaterThanOrEqualTo(c.ref)) {
ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(i)))
addChunk(c.minTime, c.maxTime, ref)
@ -232,6 +236,11 @@ func (cr OOOHeadChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) {
}
s.Lock()
if s.ooo == nil {
// There is no OOO data for this series.
s.Unlock()
return nil, storage.ErrNotFound
}
c, err := s.oooMergedChunk(meta, cr.head.chunkDiskMapper, cr.mint, cr.maxt)
s.Unlock()
if err != nil {
@ -302,18 +311,23 @@ func NewOOOCompactionHead(head *Head) (*OOOCompactionHead, error) {
// TODO: consider having a lock specifically for ooo data.
ms.Lock()
if ms.ooo == nil {
ms.Unlock()
continue
}
mmapRef := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper)
if mmapRef == 0 && len(ms.oooMmappedChunks) > 0 {
if mmapRef == 0 && len(ms.ooo.oooMmappedChunks) > 0 {
// Nothing was m-mapped. So take the mmapRef from the existing slice if it exists.
mmapRef = ms.oooMmappedChunks[len(ms.oooMmappedChunks)-1].ref
mmapRef = ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref
}
seq, off := mmapRef.Unpack()
if seq > lastSeq || (seq == lastSeq && off > lastOff) {
ch.lastMmapRef, lastSeq, lastOff = mmapRef, seq, off
}
if len(ms.oooMmappedChunks) > 0 {
if len(ms.ooo.oooMmappedChunks) > 0 {
ch.postings = append(ch.postings, seriesRef)
for _, c := range ms.oooMmappedChunks {
for _, c := range ms.ooo.oooMmappedChunks {
if c.minTime < ch.mint {
ch.mint = c.minTime
}

5
tsdb/ooo_head_read_test.go

@ -301,6 +301,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
require.NoError(t, h.Init(0))
s1, _, _ := h.getOrCreate(s1ID, s1Lset)
s1.ooo = &memSeriesOOOFields{}
var lastChunk chunkInterval
var lastChunkPos int
@ -340,7 +341,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
if headChunk && len(intervals) > 0 {
// Put the last interval in the head chunk
s1.oooHeadChunk = &oooHeadChunk{
s1.ooo.oooHeadChunk = &oooHeadChunk{
minTime: intervals[len(intervals)-1].mint,
maxTime: intervals[len(intervals)-1].maxt,
}
@ -348,7 +349,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
}
for _, ic := range intervals {
s1.oooMmappedChunks = append(s1.oooMmappedChunks, &mmappedChunk{
s1.ooo.oooMmappedChunks = append(s1.ooo.oooMmappedChunks, &mmappedChunk{
minTime: ic.mint,
maxTime: ic.maxt,
})

Loading…
Cancel
Save