Browse Source

Remove unnecessary chunk fetch in Head queries

`safeChunk` is only obtained from the `headChunkReader.Chunk` call where
the chunk is already fetched and stored with the `safeChunk`. So, when
getting the iterator for the `safeChunk`, we don't need to get the chunk again.

Also removed a couple of unnecessary fields from `safeChunk` as a part of this.

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
pull/12003/head
Ganesh Vernekar 2 years ago
parent
commit
d504c950a2
No known key found for this signature in database
GPG Key ID: F056451B52F1DC34
  1. 45
      tsdb/head_read.go
  2. 21
      tsdb/head_test.go
  3. 0
      tsdb/test.txt

45
tsdb/head_read.go

@ -304,12 +304,10 @@ func (h *headChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) {
s.Unlock()
return &safeChunk{
Chunk: c.chunk,
s: s,
cid: cid,
isoState: h.isoState,
chunkDiskMapper: h.head.chunkDiskMapper,
memChunkPool: &h.head.memChunkPool,
Chunk: c.chunk,
s: s,
cid: cid,
isoState: h.isoState,
}, nil
}
@ -600,43 +598,24 @@ func (b boundedIterator) Seek(t int64) chunkenc.ValueType {
// safeChunk makes sure that the chunk can be accessed without a race condition
type safeChunk struct {
chunkenc.Chunk
s *memSeries
cid chunks.HeadChunkID
isoState *isolationState
chunkDiskMapper *chunks.ChunkDiskMapper
memChunkPool *sync.Pool
s *memSeries
cid chunks.HeadChunkID
isoState *isolationState
}
func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
c.s.Lock()
it := c.s.iterator(c.cid, c.isoState, c.chunkDiskMapper, c.memChunkPool, reuseIter)
it := c.s.iterator(c.cid, c.Chunk, c.isoState, reuseIter)
c.s.Unlock()
return it
}
// iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range.
// It is unsafe to call this concurrently with s.append(...) without holding the series lock.
func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool, it chunkenc.Iterator) chunkenc.Iterator {
c, garbageCollect, err := s.chunk(id, chunkDiskMapper, memChunkPool)
// TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a
// series's chunk, which got then garbage collected before it got
// accessed. We must ensure to not garbage collect as long as any
// readers still hold a reference.
if err != nil {
return chunkenc.NewNopIterator()
}
defer func() {
if garbageCollect {
// Set this to nil so that Go GC can collect it after it has been used.
// This should be done always at the end.
c.chunk = nil
memChunkPool.Put(c)
}
}()
func (s *memSeries) iterator(id chunks.HeadChunkID, c chunkenc.Chunk, isoState *isolationState, it chunkenc.Iterator) chunkenc.Iterator {
ix := int(id) - int(s.firstChunkID)
numSamples := c.chunk.NumSamples()
numSamples := c.NumSamples()
stopAfter := numSamples
if isoState != nil && !isoState.IsolationDisabled() {
@ -681,9 +660,9 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch
return chunkenc.NewNopIterator()
}
if stopAfter == numSamples {
return c.chunk.Iterator(it)
return c.Iterator(it)
}
return makeStopIterator(c.chunk, it, stopAfter)
return makeStopIterator(c, it, stopAfter)
}
// stopIterator wraps an Iterator, but only returns the first

21
tsdb/head_test.go

@ -537,11 +537,18 @@ func TestHead_ReadWAL(t *testing.T) {
require.NoError(t, c.Err())
return x
}
require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
c, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
require.NoError(t, err)
require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
c, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
require.NoError(t, err)
require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
// The samples before the new series record should be discarded since a duplicate record
// is only possible when old samples were compacted.
require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
c, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
require.NoError(t, err)
require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
q, err := head.ExemplarQuerier(context.Background())
require.NoError(t, err)
@ -2566,7 +2573,13 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
require.True(t, ok, "sample append failed")
}
it := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil, nil)
c, _, err := s.chunk(0, chunkDiskMapper, &sync.Pool{
New: func() interface{} {
return &memChunk{}
},
})
require.NoError(t, err)
it := c.chunk.Iterator(nil)
// First point.
require.Equal(t, chunkenc.ValFloat, it.Seek(0))

0
tsdb/test.txt

Loading…
Cancel
Save