mirror of https://github.com/prometheus/prometheus
Use a linked list for memSeries.headChunk (#11818)
Currently memSeries holds a single head chunk in-memory and a slice of mmapped chunks. When append() is called on memSeries it might decide that a new headChunk is needed to use for given append() call. If that happens it will first mmap existing head chunk and only after that happens it will create a new empty headChunk and continue appending our sample to it. Since appending samples uses write lock on memSeries no other read or write can happen until any append is completed. When we have an append() that must create a new head chunk the whole memSeries is blocked until mmapping of existing head chunk finishes. Mmapping itself uses a lock as it needs to be serialised, which means that the more chunks to mmap we have the longer each chunk might wait for it to be mmapped. If there's enough chunks that require mmapping some memSeries will be locked for long enough that it will start affecting queries and scrapes. Queries might timeout, since by default they have a 2 minute timeout set. Scrapes will be blocked inside append() call, which means there will be a gap between samples. This will first affect range queries or calls using rate() and such, since the time range requested in the query might have too few samples to calculate anything. To avoid this we need to remove mmapping from append path, since mmapping is blocking. But this means that when we cut a new head chunk we need to keep the old one around, so we can mmap it later. This change makes memSeries.headChunk a linked list, memSeries.headChunk still points to the 'open' head chunk that receives new samples, while older, yet to be mmapped, chunks are linked to it. Mmapping is done on a schedule by iterating all memSeries one by one. Thanks to this we control when mmapping is done, since we trigger it manually, which reduces the risk that it will have to compete for mmap locks with other chunks. Signed-off-by: Łukasz Mierzwa <l.mierzwa@gmail.com>pull/12620/head
parent
76dd9b5470
commit
3c80963e81
|
@ -85,13 +85,21 @@ func (p HeadChunkRef) Unpack() (HeadSeriesRef, HeadChunkID) {
|
|||
// - less than the above, but >= memSeries.firstID, then it's
|
||||
// memSeries.mmappedChunks[i] where i = HeadChunkID - memSeries.firstID.
|
||||
//
|
||||
// If memSeries.headChunks is non-nil it points to a *memChunk that holds the current
|
||||
// "open" (accepting appends) instance. *memChunk is a linked list and memChunk.next pointer
|
||||
// might link to the older *memChunk instance.
|
||||
// If there are multiple *memChunk instances linked to each other from memSeries.headChunks
|
||||
// they will be m-mapped as soon as possible leaving only "open" *memChunk instance.
|
||||
//
|
||||
// Example:
|
||||
// assume a memSeries.firstChunkID=7 and memSeries.mmappedChunks=[p5,p6,p7,p8,p9].
|
||||
// | HeadChunkID value | refers to ... |
|
||||
// |-------------------|----------------------------------------------------------------------------------------|
|
||||
// | 0-6 | chunks that have been compacted to blocks, these won't return data for queries in Head |
|
||||
// | 7-11 | memSeries.mmappedChunks[i] where i is 0 to 4. |
|
||||
// | 12 | memSeries.headChunk |
|
||||
// | 12 | *memChunk{next: nil}
|
||||
// | 13 | *memChunk{next: ^}
|
||||
// | 14 | memSeries.headChunks -> *memChunk{next: ^}
|
||||
type HeadChunkID uint64
|
||||
|
||||
// BlockChunkRef refers to a chunk within a persisted block.
|
||||
|
|
|
@ -973,6 +973,8 @@ func (db *DB) run() {
|
|||
case db.compactc <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
// We attempt mmapping of head chunks regularly.
|
||||
db.head.mmapHeadChunks()
|
||||
case <-db.compactc:
|
||||
db.metrics.compactionsTriggered.Inc()
|
||||
|
||||
|
|
|
@ -1535,6 +1535,7 @@ func TestSizeRetention(t *testing.T) {
|
|||
}
|
||||
}
|
||||
require.NoError(t, headApp.Commit())
|
||||
db.Head().mmapHeadChunks()
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return db.Head().chunkDiskMapper.IsQueueEmpty()
|
||||
|
@ -6049,12 +6050,14 @@ func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
|
|||
|
||||
// Check that m-map files gets deleted properly after compactions.
|
||||
|
||||
db.head.mmapHeadChunks()
|
||||
checkMmapFileContents([]string{"000001", "000002"}, nil)
|
||||
require.NoError(t, db.Compact())
|
||||
checkMmapFileContents([]string{"000002"}, []string{"000001"})
|
||||
require.Nil(t, ms.ooo, "OOO mmap chunk was not compacted")
|
||||
|
||||
addSamples(501, 650)
|
||||
db.head.mmapHeadChunks()
|
||||
checkMmapFileContents([]string{"000002", "000003"}, []string{"000001"})
|
||||
require.NoError(t, db.Compact())
|
||||
checkMmapFileContents(nil, []string{"000001", "000002", "000003"})
|
||||
|
|
141
tsdb/head.go
141
tsdb/head.go
|
@ -344,6 +344,7 @@ type headMetrics struct {
|
|||
mmapChunkCorruptionTotal prometheus.Counter
|
||||
snapshotReplayErrorTotal prometheus.Counter // Will be either 0 or 1.
|
||||
oooHistogram prometheus.Histogram
|
||||
mmapChunksTotal prometheus.Counter
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -468,6 +469,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
|
|||
60 * 60 * 12, // 12h
|
||||
},
|
||||
}),
|
||||
mmapChunksTotal: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "prometheus_tsdb_mmap_chunks_total",
|
||||
Help: "Total number of chunks that were memory-mapped.",
|
||||
}),
|
||||
}
|
||||
|
||||
if r != nil {
|
||||
|
@ -495,6 +500,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
|
|||
m.checkpointDeleteTotal,
|
||||
m.checkpointCreationFail,
|
||||
m.checkpointCreationTotal,
|
||||
m.mmapChunksTotal,
|
||||
m.mmapChunkCorruptionTotal,
|
||||
m.snapshotReplayErrorTotal,
|
||||
// Metrics bound to functions and not needed in tests
|
||||
|
@ -880,11 +886,11 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries)
|
|||
numSamples: numSamples,
|
||||
})
|
||||
h.updateMinMaxTime(mint, maxt)
|
||||
if ms.headChunk != nil && maxt >= ms.headChunk.minTime {
|
||||
if ms.headChunks != nil && maxt >= ms.headChunks.minTime {
|
||||
// The head chunk was completed and was m-mapped after taking the snapshot.
|
||||
// Hence remove this chunk.
|
||||
ms.nextAt = 0
|
||||
ms.headChunk = nil
|
||||
ms.headChunks = nil
|
||||
ms.app = nil
|
||||
}
|
||||
return nil
|
||||
|
@ -1574,6 +1580,10 @@ func (h *Head) Close() error {
|
|||
defer h.closedMtx.Unlock()
|
||||
h.closed = true
|
||||
|
||||
// mmap all but last chunk in case we're performing snapshot since that only
|
||||
// takes samples from most recent head chunk.
|
||||
h.mmapHeadChunks()
|
||||
|
||||
errs := tsdb_errors.NewMulti(h.chunkDiskMapper.Close())
|
||||
if errs.Err() == nil && h.opts.EnableMemorySnapshotOnShutdown {
|
||||
errs.Add(h.performChunkSnapshot())
|
||||
|
@ -1630,6 +1640,37 @@ func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labe
|
|||
return s, true, nil
|
||||
}
|
||||
|
||||
// mmapHeadChunks will iterate all memSeries stored on Head and call mmapHeadChunks() on each of them.
|
||||
//
|
||||
// There are two types of chunks that store samples for each memSeries:
|
||||
// A) Head chunk - stored on Go heap, when new samples are appended they go there.
|
||||
// B) M-mapped chunks - memory mapped chunks, kernel manages the memory for us on-demand, these chunks
|
||||
//
|
||||
// are read-only.
|
||||
//
|
||||
// Calling mmapHeadChunks() will iterate all memSeries and m-mmap all chunks that should be m-mapped.
|
||||
// The m-mapping operation is needs to be serialised and so it goes via central lock.
|
||||
// If there are multiple concurrent memSeries that need to m-map some chunk then they can block each-other.
|
||||
//
|
||||
// To minimise the effect of locking on TSDB operations m-mapping is serialised and done away from
|
||||
// sample append path, since waiting on a lock inside an append would lock the entire memSeries for
|
||||
// (potentially) a long time, since that could eventually delay next scrape and/or cause query timeouts.
|
||||
func (h *Head) mmapHeadChunks() {
|
||||
var count int
|
||||
for i := 0; i < h.series.size; i++ {
|
||||
h.series.locks[i].RLock()
|
||||
for _, all := range h.series.hashes[i] {
|
||||
for _, series := range all {
|
||||
series.Lock()
|
||||
count += series.mmapChunks(h.chunkDiskMapper)
|
||||
series.Unlock()
|
||||
}
|
||||
}
|
||||
h.series.locks[i].RUnlock()
|
||||
}
|
||||
h.metrics.mmapChunksTotal.Add(float64(count))
|
||||
}
|
||||
|
||||
// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
|
||||
// on top of a regular hashmap and holds a slice of series to resolve hash collisions.
|
||||
// Its methods require the hash to be submitted with it to avoid re-computations throughout
|
||||
|
@ -1760,7 +1801,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
|
|||
minOOOTime = series.ooo.oooHeadChunk.minTime
|
||||
}
|
||||
}
|
||||
if len(series.mmappedChunks) > 0 || series.headChunk != nil || series.pendingCommit ||
|
||||
if len(series.mmappedChunks) > 0 || series.headChunks != nil || series.pendingCommit ||
|
||||
(series.ooo != nil && (len(series.ooo.oooMmappedChunks) > 0 || series.ooo.oooHeadChunk != nil)) {
|
||||
seriesMint := series.minTime()
|
||||
if seriesMint < actualMint {
|
||||
|
@ -1915,8 +1956,11 @@ type memSeries struct {
|
|||
//
|
||||
// pN is the pointer to the mmappedChunk referered to by HeadChunkID=N
|
||||
mmappedChunks []*mmappedChunk
|
||||
headChunk *memChunk // Most recent chunk in memory that's still being built.
|
||||
firstChunkID chunks.HeadChunkID // HeadChunkID for mmappedChunks[0]
|
||||
// Most recent chunks in memory that are still being built or waiting to be mmapped.
|
||||
// This is a linked list, headChunks points to the most recent chunk, headChunks.next points
|
||||
// to older chunk and so on.
|
||||
headChunks *memChunk
|
||||
firstChunkID chunks.HeadChunkID // HeadChunkID for mmappedChunks[0]
|
||||
|
||||
ooo *memSeriesOOOFields
|
||||
|
||||
|
@ -1932,7 +1976,7 @@ type memSeries struct {
|
|||
lastFloatHistogramValue *histogram.FloatHistogram
|
||||
|
||||
// Current appender for the head chunk. Set when a new head chunk is cut.
|
||||
// It is nil only if headChunk is nil. E.g. if there was an appender that created a new series, but rolled back the commit
|
||||
// It is nil only if headChunks is nil. E.g. if there was an appender that created a new series, but rolled back the commit
|
||||
// (the first sample would create a headChunk, hence appender, but rollback skipped it while the Append() call would create a series).
|
||||
app chunkenc.Appender
|
||||
|
||||
|
@ -1966,17 +2010,16 @@ func (s *memSeries) minTime() int64 {
|
|||
if len(s.mmappedChunks) > 0 {
|
||||
return s.mmappedChunks[0].minTime
|
||||
}
|
||||
if s.headChunk != nil {
|
||||
return s.headChunk.minTime
|
||||
if s.headChunks != nil {
|
||||
return s.headChunks.oldest().minTime
|
||||
}
|
||||
return math.MinInt64
|
||||
}
|
||||
|
||||
func (s *memSeries) maxTime() int64 {
|
||||
// The highest timestamps will always be in the regular (non-OOO) chunks, even if OOO is enabled.
|
||||
c := s.head()
|
||||
if c != nil {
|
||||
return c.maxTime
|
||||
if s.headChunks != nil {
|
||||
return s.headChunks.maxTime
|
||||
}
|
||||
if len(s.mmappedChunks) > 0 {
|
||||
return s.mmappedChunks[len(s.mmappedChunks)-1].maxTime
|
||||
|
@ -1989,12 +2032,29 @@ func (s *memSeries) maxTime() int64 {
|
|||
// Chunk IDs remain unchanged.
|
||||
func (s *memSeries) truncateChunksBefore(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) int {
|
||||
var removedInOrder int
|
||||
if s.headChunk != nil && s.headChunk.maxTime < mint {
|
||||
// If head chunk is truncated, we can truncate all mmapped chunks.
|
||||
removedInOrder = 1 + len(s.mmappedChunks)
|
||||
s.firstChunkID += chunks.HeadChunkID(removedInOrder)
|
||||
s.headChunk = nil
|
||||
s.mmappedChunks = nil
|
||||
if s.headChunks != nil {
|
||||
var i int
|
||||
var nextChk *memChunk
|
||||
chk := s.headChunks
|
||||
for chk != nil {
|
||||
if chk.maxTime < mint {
|
||||
// If any head chunk is truncated, we can truncate all mmapped chunks.
|
||||
removedInOrder = chk.len() + len(s.mmappedChunks)
|
||||
s.firstChunkID += chunks.HeadChunkID(removedInOrder)
|
||||
if i == 0 {
|
||||
// This is the first chunk on the list so we need to remove the entire list.
|
||||
s.headChunks = nil
|
||||
} else {
|
||||
// This is NOT the first chunk, unlink it from parent.
|
||||
nextChk.prev = nil
|
||||
}
|
||||
s.mmappedChunks = nil
|
||||
break
|
||||
}
|
||||
nextChk = chk
|
||||
chk = chk.prev
|
||||
i++
|
||||
}
|
||||
}
|
||||
if len(s.mmappedChunks) > 0 {
|
||||
for i, c := range s.mmappedChunks {
|
||||
|
@ -2034,13 +2094,52 @@ func (s *memSeries) cleanupAppendIDsBelow(bound uint64) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *memSeries) head() *memChunk {
|
||||
return s.headChunk
|
||||
}
|
||||
|
||||
type memChunk struct {
|
||||
chunk chunkenc.Chunk
|
||||
minTime, maxTime int64
|
||||
prev *memChunk // Link to the previous element on the list.
|
||||
}
|
||||
|
||||
// len returns the length of memChunk list, including the element it was called on.
|
||||
func (mc *memChunk) len() (count int) {
|
||||
elem := mc
|
||||
for elem != nil {
|
||||
count++
|
||||
elem = elem.prev
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
// oldest returns the oldest element on the list.
|
||||
// For single element list this will be the same memChunk oldest() was called on.
|
||||
func (mc *memChunk) oldest() (elem *memChunk) {
|
||||
elem = mc
|
||||
for elem.prev != nil {
|
||||
elem = elem.prev
|
||||
}
|
||||
return elem
|
||||
}
|
||||
|
||||
// atOffset returns a memChunk that's Nth element on the linked list.
|
||||
func (mc *memChunk) atOffset(offset int) (elem *memChunk) {
|
||||
if offset == 0 {
|
||||
return mc
|
||||
}
|
||||
if offset < 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var i int
|
||||
elem = mc
|
||||
for i < offset {
|
||||
i++
|
||||
elem = elem.prev
|
||||
if elem == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return elem
|
||||
}
|
||||
|
||||
type oooHeadChunk struct {
|
||||
|
|
|
@ -395,7 +395,7 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
|
|||
func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTimeWindow int64) (isOOO bool, oooDelta int64, err error) {
|
||||
// Check if we can append in the in-order chunk.
|
||||
if t >= minValidTime {
|
||||
if s.head() == nil {
|
||||
if s.headChunks == nil {
|
||||
// The series has no sample and was freshly created.
|
||||
return false, 0, nil
|
||||
}
|
||||
|
@ -433,15 +433,14 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi
|
|||
|
||||
// appendableHistogram checks whether the given histogram is valid for appending to the series.
|
||||
func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram) error {
|
||||
c := s.head()
|
||||
if c == nil {
|
||||
if s.headChunks == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if t > c.maxTime {
|
||||
if t > s.headChunks.maxTime {
|
||||
return nil
|
||||
}
|
||||
if t < c.maxTime {
|
||||
if t < s.headChunks.maxTime {
|
||||
return storage.ErrOutOfOrderSample
|
||||
}
|
||||
|
||||
|
@ -455,15 +454,14 @@ func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram) error {
|
|||
|
||||
// appendableFloatHistogram checks whether the given float histogram is valid for appending to the series.
|
||||
func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogram) error {
|
||||
c := s.head()
|
||||
if c == nil {
|
||||
if s.headChunks == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if t > c.maxTime {
|
||||
if t > s.headChunks.maxTime {
|
||||
return nil
|
||||
}
|
||||
if t < c.maxTime {
|
||||
if t < s.headChunks.maxTime {
|
||||
return storage.ErrOutOfOrderSample
|
||||
}
|
||||
|
||||
|
@ -1200,12 +1198,11 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
|
|||
return true, false
|
||||
}
|
||||
|
||||
// This is a brand new chunk, switch out the head chunk (based on cutNewHeadChunk).
|
||||
s.mmapCurrentHeadChunk(o.chunkDiskMapper)
|
||||
s.headChunk = &memChunk{
|
||||
s.headChunks = &memChunk{
|
||||
chunk: newChunk,
|
||||
minTime: t,
|
||||
maxTime: t,
|
||||
prev: s.headChunks,
|
||||
}
|
||||
s.nextAt = rangeForTimestamp(t, o.chunkRange)
|
||||
return true, true
|
||||
|
@ -1258,12 +1255,11 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
|
|||
return true, false
|
||||
}
|
||||
|
||||
// This is a brand new chunk, switch out the head chunk (based on cutNewHeadChunk).
|
||||
s.mmapCurrentHeadChunk(o.chunkDiskMapper)
|
||||
s.headChunk = &memChunk{
|
||||
s.headChunks = &memChunk{
|
||||
chunk: newChunk,
|
||||
minTime: t,
|
||||
maxTime: t,
|
||||
prev: s.headChunks,
|
||||
}
|
||||
s.nextAt = rangeForTimestamp(t, o.chunkRange)
|
||||
return true, true
|
||||
|
@ -1273,7 +1269,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
|
|||
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
|
||||
// This should be called only when appending data.
|
||||
func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) {
|
||||
c = s.head()
|
||||
c = s.headChunks
|
||||
|
||||
if c == nil {
|
||||
if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t {
|
||||
|
@ -1281,7 +1277,7 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts
|
|||
return c, false, false
|
||||
}
|
||||
// There is no head chunk in this series yet, create the first chunk for the sample.
|
||||
c = s.cutNewHeadChunk(t, e, o.chunkDiskMapper, o.chunkRange)
|
||||
c = s.cutNewHeadChunk(t, e, o.chunkRange)
|
||||
chunkCreated = true
|
||||
}
|
||||
|
||||
|
@ -1293,8 +1289,9 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts
|
|||
if c.chunk.Encoding() != e {
|
||||
// The chunk encoding expected by this append is different than the head chunk's
|
||||
// encoding. So we cut a new chunk with the expected encoding.
|
||||
c = s.cutNewHeadChunk(t, e, o.chunkDiskMapper, o.chunkRange)
|
||||
c = s.cutNewHeadChunk(t, e, o.chunkRange)
|
||||
chunkCreated = true
|
||||
|
||||
}
|
||||
|
||||
numSamples := c.chunk.NumSamples()
|
||||
|
@ -1318,7 +1315,7 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts
|
|||
// as we expect more chunks to come.
|
||||
// Note that next chunk will have its nextAt recalculated for the new rate.
|
||||
if t >= s.nextAt || numSamples >= o.samplesPerChunk*2 {
|
||||
c = s.cutNewHeadChunk(t, e, o.chunkDiskMapper, o.chunkRange)
|
||||
c = s.cutNewHeadChunk(t, e, o.chunkRange)
|
||||
chunkCreated = true
|
||||
}
|
||||
|
||||
|
@ -1338,36 +1335,37 @@ func computeChunkEndTime(start, cur, max int64) int64 {
|
|||
return start + (max-start)/n
|
||||
}
|
||||
|
||||
func (s *memSeries) cutNewHeadChunk(
|
||||
mint int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64,
|
||||
) *memChunk {
|
||||
s.mmapCurrentHeadChunk(chunkDiskMapper)
|
||||
|
||||
s.headChunk = &memChunk{
|
||||
func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange int64) *memChunk {
|
||||
// When cutting a new head chunk we create a new memChunk instance with .prev
|
||||
// pointing at the current .headChunks, so it forms a linked list.
|
||||
// All but first headChunks list elements will be m-mapped as soon as possible
|
||||
// so this is a single element list most of the time.
|
||||
s.headChunks = &memChunk{
|
||||
minTime: mint,
|
||||
maxTime: math.MinInt64,
|
||||
prev: s.headChunks,
|
||||
}
|
||||
|
||||
if chunkenc.IsValidEncoding(e) {
|
||||
var err error
|
||||
s.headChunk.chunk, err = chunkenc.NewEmptyChunk(e)
|
||||
s.headChunks.chunk, err = chunkenc.NewEmptyChunk(e)
|
||||
if err != nil {
|
||||
panic(err) // This should never happen.
|
||||
}
|
||||
} else {
|
||||
s.headChunk.chunk = chunkenc.NewXORChunk()
|
||||
s.headChunks.chunk = chunkenc.NewXORChunk()
|
||||
}
|
||||
|
||||
// Set upper bound on when the next chunk must be started. An earlier timestamp
|
||||
// may be chosen dynamically at a later point.
|
||||
s.nextAt = rangeForTimestamp(mint, chunkRange)
|
||||
|
||||
app, err := s.headChunk.chunk.Appender()
|
||||
app, err := s.headChunks.chunk.Appender()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
s.app = app
|
||||
return s.headChunk
|
||||
return s.headChunks
|
||||
}
|
||||
|
||||
// cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk.
|
||||
|
@ -1401,19 +1399,32 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMap
|
|||
return chunkRef
|
||||
}
|
||||
|
||||
func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) {
|
||||
if s.headChunk == nil || s.headChunk.chunk.NumSamples() == 0 {
|
||||
// There is no head chunk, so nothing to m-map here.
|
||||
// mmapChunks will m-map all but first chunk on s.headChunks list.
|
||||
func (s *memSeries) mmapChunks(chunkDiskMapper *chunks.ChunkDiskMapper) (count int) {
|
||||
if s.headChunks == nil || s.headChunks.prev == nil {
|
||||
// There is none or only one head chunk, so nothing to m-map here.
|
||||
return
|
||||
}
|
||||
|
||||
chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk, false, handleChunkWriteError)
|
||||
s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{
|
||||
ref: chunkRef,
|
||||
numSamples: uint16(s.headChunk.chunk.NumSamples()),
|
||||
minTime: s.headChunk.minTime,
|
||||
maxTime: s.headChunk.maxTime,
|
||||
})
|
||||
// Write chunks starting from the oldest one and stop before we get to current s.headChunk.
|
||||
// If we have this chain: s.headChunk{t4} -> t3 -> t2 -> t1 -> t0
|
||||
// then we need to write chunks t0 to t3, but skip s.headChunks.
|
||||
for i := s.headChunks.len() - 1; i > 0; i-- {
|
||||
chk := s.headChunks.atOffset(i)
|
||||
chunkRef := chunkDiskMapper.WriteChunk(s.ref, chk.minTime, chk.maxTime, chk.chunk, false, handleChunkWriteError)
|
||||
s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{
|
||||
ref: chunkRef,
|
||||
numSamples: uint16(chk.chunk.NumSamples()),
|
||||
minTime: chk.minTime,
|
||||
maxTime: chk.maxTime,
|
||||
})
|
||||
count++
|
||||
}
|
||||
|
||||
// Once we've written out all chunks except s.headChunks we need to unlink these from s.headChunk.
|
||||
s.headChunks.prev = nil
|
||||
|
||||
return count
|
||||
}
|
||||
|
||||
func handleChunkWriteError(err error) {
|
||||
|
|
|
@ -174,12 +174,27 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
|
|||
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(i))),
|
||||
})
|
||||
}
|
||||
if s.headChunk != nil && s.headChunk.OverlapsClosedInterval(h.mint, h.maxt) {
|
||||
*chks = append(*chks, chunks.Meta{
|
||||
MinTime: s.headChunk.minTime,
|
||||
MaxTime: math.MaxInt64, // Set the head chunks as open (being appended to).
|
||||
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)))),
|
||||
})
|
||||
|
||||
if s.headChunks != nil {
|
||||
var maxTime int64
|
||||
var i, j int
|
||||
for i = s.headChunks.len() - 1; i >= 0; i-- {
|
||||
chk := s.headChunks.atOffset(i)
|
||||
if i == 0 {
|
||||
// Set the head chunk as open (being appended to) for the first headChunk.
|
||||
maxTime = math.MaxInt64
|
||||
} else {
|
||||
maxTime = chk.maxTime
|
||||
}
|
||||
if chk.OverlapsClosedInterval(h.mint, h.maxt) {
|
||||
*chks = append(*chks, chunks.Meta{
|
||||
MinTime: chk.minTime,
|
||||
MaxTime: maxTime,
|
||||
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)+j))),
|
||||
})
|
||||
}
|
||||
j++
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -187,7 +202,7 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
|
|||
|
||||
// headChunkID returns the HeadChunkID referred to by the given position.
|
||||
// * 0 <= pos < len(s.mmappedChunks) refer to s.mmappedChunks[pos]
|
||||
// * pos == len(s.mmappedChunks) refers to s.headChunk
|
||||
// * pos >= len(s.mmappedChunks) refers to s.headChunks linked list
|
||||
func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID {
|
||||
return chunks.HeadChunkID(pos) + s.firstChunkID
|
||||
}
|
||||
|
@ -296,7 +311,7 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
|
|||
}
|
||||
|
||||
s.Lock()
|
||||
c, headChunk, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool)
|
||||
c, headChunk, isOpen, err := s.chunk(cid, h.head.chunkDiskMapper, &h.head.memChunkPool)
|
||||
if err != nil {
|
||||
s.Unlock()
|
||||
return nil, 0, err
|
||||
|
@ -305,6 +320,7 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
|
|||
if !headChunk {
|
||||
// Set this to nil so that Go GC can collect it after it has been used.
|
||||
c.chunk = nil
|
||||
c.prev = nil
|
||||
h.head.memChunkPool.Put(c)
|
||||
}
|
||||
}()
|
||||
|
@ -316,14 +332,14 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
|
|||
}
|
||||
|
||||
chk, maxTime := c.chunk, c.maxTime
|
||||
if headChunk && copyLastChunk {
|
||||
if headChunk && isOpen && copyLastChunk {
|
||||
// The caller may ask to copy the head chunk in order to take the
|
||||
// bytes of the chunk without causing the race between read and append.
|
||||
b := s.headChunk.chunk.Bytes()
|
||||
b := s.headChunks.chunk.Bytes()
|
||||
newB := make([]byte, len(b))
|
||||
copy(newB, b) // TODO(codesome): Use bytes.Clone() when we upgrade to Go 1.20.
|
||||
// TODO(codesome): Put back in the pool (non-trivial).
|
||||
chk, err = h.head.opts.ChunkPool.Get(s.headChunk.chunk.Encoding(), newB)
|
||||
chk, err = h.head.opts.ChunkPool.Get(s.headChunks.chunk.Encoding(), newB)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
@ -341,34 +357,60 @@ func (h *headChunkReader) chunk(meta chunks.Meta, copyLastChunk bool) (chunkenc.
|
|||
// chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk.
|
||||
// If headChunk is false, it means that the returned *memChunk
|
||||
// (and not the chunkenc.Chunk inside it) can be garbage collected after its usage.
|
||||
func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, headChunk bool, err error) {
|
||||
// if isOpen is true, it means that the returned *memChunk is used for appends.
|
||||
func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool) (chunk *memChunk, headChunk, isOpen bool, err error) {
|
||||
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are
|
||||
// incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index.
|
||||
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
|
||||
// is len(s.mmappedChunks), it represents the next chunk, which is the head chunk.
|
||||
// is >= len(s.mmappedChunks), it represents one of the chunks on s.headChunks linked list.
|
||||
// The order of elemens is different for slice and linked list.
|
||||
// For s.mmappedChunks slice newer chunks are appended to it.
|
||||
// For s.headChunks list newer chunks are prepended to it.
|
||||
//
|
||||
// memSeries {
|
||||
// mmappedChunks: [t0, t1, t2]
|
||||
// headChunk: {t5}->{t4}->{t3}
|
||||
// }
|
||||
ix := int(id) - int(s.firstChunkID)
|
||||
if ix < 0 || ix > len(s.mmappedChunks) {
|
||||
return nil, false, storage.ErrNotFound
|
||||
|
||||
var headChunksLen int
|
||||
if s.headChunks != nil {
|
||||
headChunksLen = s.headChunks.len()
|
||||
}
|
||||
|
||||
if ix == len(s.mmappedChunks) {
|
||||
if s.headChunk == nil {
|
||||
return nil, false, errors.New("invalid head chunk")
|
||||
}
|
||||
return s.headChunk, true, nil
|
||||
if ix < 0 || ix > len(s.mmappedChunks)+headChunksLen-1 {
|
||||
return nil, false, false, storage.ErrNotFound
|
||||
}
|
||||
chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref)
|
||||
if err != nil {
|
||||
if _, ok := err.(*chunks.CorruptionErr); ok {
|
||||
panic(err)
|
||||
|
||||
if ix < len(s.mmappedChunks) {
|
||||
chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref)
|
||||
if err != nil {
|
||||
if _, ok := err.(*chunks.CorruptionErr); ok {
|
||||
panic(err)
|
||||
}
|
||||
return nil, false, false, err
|
||||
}
|
||||
return nil, false, err
|
||||
mc := memChunkPool.Get().(*memChunk)
|
||||
mc.chunk = chk
|
||||
mc.minTime = s.mmappedChunks[ix].minTime
|
||||
mc.maxTime = s.mmappedChunks[ix].maxTime
|
||||
return mc, false, false, nil
|
||||
}
|
||||
mc := memChunkPool.Get().(*memChunk)
|
||||
mc.chunk = chk
|
||||
mc.minTime = s.mmappedChunks[ix].minTime
|
||||
mc.maxTime = s.mmappedChunks[ix].maxTime
|
||||
return mc, false, nil
|
||||
|
||||
ix -= len(s.mmappedChunks)
|
||||
|
||||
offset := headChunksLen - ix - 1
|
||||
// headChunks is a linked list where first element is the most recent one and the last one is the oldest.
|
||||
// This order is reversed when compared with mmappedChunks, since mmappedChunks[0] is the oldest chunk,
|
||||
// while headChunk.atOffset(0) would give us the most recent chunk.
|
||||
// So when calling headChunk.atOffset() we need to reverse the value of ix.
|
||||
elem := s.headChunks.atOffset(offset)
|
||||
if elem == nil {
|
||||
// This should never really happen and would mean that headChunksLen value is NOT equal
|
||||
// to the length of the headChunks list.
|
||||
return nil, false, false, storage.ErrNotFound
|
||||
}
|
||||
return elem, true, offset == 0, nil
|
||||
}
|
||||
|
||||
// oooMergedChunk returns the requested chunk based on the given chunks.Meta
|
||||
|
@ -660,8 +702,21 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, c chunkenc.Chunk, isoState *
|
|||
}
|
||||
}
|
||||
|
||||
if s.headChunk != nil {
|
||||
totalSamples += s.headChunk.chunk.NumSamples()
|
||||
ix -= len(s.mmappedChunks)
|
||||
if s.headChunks != nil {
|
||||
// Iterate all head chunks from the oldest to the newest.
|
||||
headChunksLen := s.headChunks.len()
|
||||
for j := headChunksLen - 1; j >= 0; j-- {
|
||||
chk := s.headChunks.atOffset(j)
|
||||
chkSamples := chk.chunk.NumSamples()
|
||||
totalSamples += chkSamples
|
||||
// Chunk ID is len(s.mmappedChunks) + $(headChunks list position).
|
||||
// Where $(headChunks list position) is zero for the oldest chunk and $(s.headChunks.len() - 1)
|
||||
// for the newest (open) chunk.
|
||||
if headChunksLen-1-j < ix {
|
||||
previousSamples += chkSamples
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Removing the extra transactionIDs that are relevant for samples that
|
||||
|
|
|
@ -15,11 +15,14 @@ package tsdb
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
)
|
||||
|
||||
func TestBoundedChunk(t *testing.T) {
|
||||
|
@ -176,3 +179,387 @@ func newTestChunk(numSamples int) chunkenc.Chunk {
|
|||
}
|
||||
return xor
|
||||
}
|
||||
|
||||
// TestMemSeries_chunk runs a series of tests on memSeries.chunk() calls.
|
||||
// It will simulate various conditions to ensure all code paths in that function are covered.
|
||||
func TestMemSeries_chunk(t *testing.T) {
|
||||
const chunkRange int64 = 100
|
||||
const chunkStep int64 = 5
|
||||
|
||||
appendSamples := func(t *testing.T, s *memSeries, start, end int64, cdm *chunks.ChunkDiskMapper) {
|
||||
for i := start; i < end; i += chunkStep {
|
||||
ok, _ := s.append(i, float64(i), 0, chunkOpts{
|
||||
chunkDiskMapper: cdm,
|
||||
chunkRange: chunkRange,
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
})
|
||||
require.True(t, ok, "sample append failed")
|
||||
}
|
||||
}
|
||||
|
||||
type setupFn func(*testing.T, *memSeries, *chunks.ChunkDiskMapper)
|
||||
|
||||
type callOutput uint8
|
||||
const (
|
||||
outOpenHeadChunk callOutput = iota // memSeries.chunk() call returned memSeries.headChunks with headChunk=true & isOpen=true
|
||||
outClosedHeadChunk // memSeries.chunk() call returned memSeries.headChunks with headChunk=true & isOpen=false
|
||||
outMmappedChunk // memSeries.chunk() call returned a chunk from memSeries.mmappedChunks with headChunk=false
|
||||
outErr // memSeries.chunk() call returned an error
|
||||
)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
setup setupFn // optional function called just before the test memSeries.chunk() call
|
||||
inputID chunks.HeadChunkID // requested chunk id for memSeries.chunk() call
|
||||
expected callOutput
|
||||
}{
|
||||
{
|
||||
name: "call ix=0 on empty memSeries",
|
||||
inputID: 0,
|
||||
expected: outErr,
|
||||
},
|
||||
{
|
||||
name: "call ix=1 on empty memSeries",
|
||||
inputID: 1,
|
||||
expected: outErr,
|
||||
},
|
||||
{
|
||||
name: "firstChunkID > ix",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange, cdm)
|
||||
require.Len(t, s.mmappedChunks, 0, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
require.Equal(t, int64(0), s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, chunkRange-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
s.firstChunkID = 5
|
||||
},
|
||||
inputID: 1,
|
||||
expected: outErr,
|
||||
},
|
||||
{
|
||||
name: "call ix=0 on memSeries with no mmapped chunks",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange, cdm)
|
||||
require.Len(t, s.mmappedChunks, 0, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
require.Equal(t, int64(0), s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, chunkRange-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
},
|
||||
inputID: 0,
|
||||
expected: outOpenHeadChunk,
|
||||
},
|
||||
{
|
||||
name: "call ix=1 on memSeries with no mmapped chunks",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange, cdm)
|
||||
require.Len(t, s.mmappedChunks, 0, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
require.Equal(t, int64(0), s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, chunkRange-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
},
|
||||
inputID: 1,
|
||||
expected: outErr,
|
||||
},
|
||||
{
|
||||
name: "call ix=10 on memSeries with no mmapped chunks",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange, cdm)
|
||||
require.Len(t, s.mmappedChunks, 0, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
require.Equal(t, int64(0), s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, chunkRange-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
},
|
||||
inputID: 10,
|
||||
expected: outErr,
|
||||
},
|
||||
{
|
||||
name: "call ix=0 on memSeries with 3 mmapped chunks",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*4, cdm)
|
||||
s.mmapChunks(cdm)
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*4)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
},
|
||||
inputID: 0,
|
||||
expected: outMmappedChunk,
|
||||
},
|
||||
{
|
||||
name: "call ix=1 on memSeries with 3 mmapped chunks",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*4, cdm)
|
||||
s.mmapChunks(cdm)
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*4)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
},
|
||||
inputID: 1,
|
||||
expected: outMmappedChunk,
|
||||
},
|
||||
{
|
||||
name: "call ix=3 on memSeries with 3 mmapped chunks",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*4, cdm)
|
||||
s.mmapChunks(cdm)
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*4)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
},
|
||||
inputID: 3,
|
||||
expected: outOpenHeadChunk,
|
||||
},
|
||||
{
|
||||
name: "call ix=0 on memSeries with 3 mmapped chunks and no headChunk",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*4, cdm)
|
||||
s.mmapChunks(cdm)
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*4)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
s.headChunks = nil
|
||||
},
|
||||
inputID: 0,
|
||||
expected: outMmappedChunk,
|
||||
},
|
||||
{
|
||||
name: "call ix=2 on memSeries with 3 mmapped chunks and no headChunk",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*4, cdm)
|
||||
s.mmapChunks(cdm)
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*4)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
s.headChunks = nil
|
||||
},
|
||||
inputID: 2,
|
||||
expected: outMmappedChunk,
|
||||
},
|
||||
{
|
||||
name: "call ix=3 on memSeries with 3 mmapped chunks and no headChunk",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*4, cdm)
|
||||
s.mmapChunks(cdm)
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*4)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
s.headChunks = nil
|
||||
},
|
||||
inputID: 3,
|
||||
expected: outErr,
|
||||
},
|
||||
{
|
||||
name: "call ix=1 on memSeries with 3 mmapped chunks and closed ChunkDiskMapper",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*4, cdm)
|
||||
s.mmapChunks(cdm)
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*4)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
cdm.Close()
|
||||
},
|
||||
inputID: 1,
|
||||
expected: outErr,
|
||||
},
|
||||
{
|
||||
name: "call ix=3 on memSeries with 3 mmapped chunks and closed ChunkDiskMapper",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*4, cdm)
|
||||
s.mmapChunks(cdm)
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*4)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
cdm.Close()
|
||||
},
|
||||
inputID: 3,
|
||||
expected: outOpenHeadChunk,
|
||||
},
|
||||
{
|
||||
name: "call ix=0 on memSeries with 3 head chunks and no mmapped chunks",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*3, cdm)
|
||||
require.Len(t, s.mmappedChunks, 0, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks")
|
||||
require.Equal(t, int64(0), s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*3)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
},
|
||||
inputID: 0,
|
||||
expected: outClosedHeadChunk,
|
||||
},
|
||||
{
|
||||
name: "call ix=1 on memSeries with 3 head chunks and no mmapped chunks",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*3, cdm)
|
||||
require.Len(t, s.mmappedChunks, 0, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks")
|
||||
require.Equal(t, int64(0), s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*3)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
},
|
||||
inputID: 1,
|
||||
expected: outClosedHeadChunk,
|
||||
},
|
||||
{
|
||||
name: "call ix=10 on memSeries with 3 head chunks and no mmapped chunks",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*3, cdm)
|
||||
require.Len(t, s.mmappedChunks, 0, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks")
|
||||
require.Equal(t, int64(0), s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*3)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
},
|
||||
inputID: 10,
|
||||
expected: outErr,
|
||||
},
|
||||
{
|
||||
name: "call ix=0 on memSeries with 3 head chunks and 3 mmapped chunks",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*4, cdm)
|
||||
s.mmapChunks(cdm)
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
|
||||
appendSamples(t, s, chunkRange*4, chunkRange*6, cdm)
|
||||
require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks")
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*6)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
},
|
||||
inputID: 0,
|
||||
expected: outMmappedChunk,
|
||||
},
|
||||
{
|
||||
name: "call ix=2 on memSeries with 3 head chunks and 3 mmapped chunks",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*4, cdm)
|
||||
s.mmapChunks(cdm)
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
|
||||
appendSamples(t, s, chunkRange*4, chunkRange*6, cdm)
|
||||
require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks")
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*6)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
},
|
||||
inputID: 2,
|
||||
expected: outMmappedChunk,
|
||||
},
|
||||
{
|
||||
name: "call ix=3 on memSeries with 3 head chunks and 3 mmapped chunks",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*4, cdm)
|
||||
s.mmapChunks(cdm)
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
|
||||
appendSamples(t, s, chunkRange*4, chunkRange*6, cdm)
|
||||
require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks")
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*6)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
},
|
||||
inputID: 3,
|
||||
expected: outClosedHeadChunk,
|
||||
},
|
||||
{
|
||||
name: "call ix=5 on memSeries with 3 head chunks and 3 mmapped chunks",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*4, cdm)
|
||||
s.mmapChunks(cdm)
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
|
||||
appendSamples(t, s, chunkRange*4, chunkRange*6, cdm)
|
||||
require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks")
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*6)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
},
|
||||
inputID: 5,
|
||||
expected: outOpenHeadChunk,
|
||||
},
|
||||
{
|
||||
name: "call ix=6 on memSeries with 3 head chunks and 3 mmapped chunks",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*4, cdm)
|
||||
s.mmapChunks(cdm)
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
|
||||
appendSamples(t, s, chunkRange*4, chunkRange*6, cdm)
|
||||
require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks")
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*6)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
},
|
||||
inputID: 6,
|
||||
expected: outErr,
|
||||
},
|
||||
|
||||
{
|
||||
name: "call ix=10 on memSeries with 3 head chunks and 3 mmapped chunks",
|
||||
setup: func(t *testing.T, s *memSeries, cdm *chunks.ChunkDiskMapper) {
|
||||
appendSamples(t, s, 0, chunkRange*4, cdm)
|
||||
s.mmapChunks(cdm)
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, s.headChunks.len(), 1, "wrong number of headChunks")
|
||||
|
||||
appendSamples(t, s, chunkRange*4, chunkRange*6, cdm)
|
||||
require.Equal(t, s.headChunks.len(), 3, "wrong number of headChunks")
|
||||
require.Len(t, s.mmappedChunks, 3, "wrong number of mmappedChunks")
|
||||
require.Equal(t, chunkRange*3, s.headChunks.oldest().minTime, "wrong minTime on last headChunks element")
|
||||
require.Equal(t, (chunkRange*6)-chunkStep, s.headChunks.maxTime, "wrong maxTime on first headChunks element")
|
||||
},
|
||||
inputID: 10,
|
||||
expected: outErr,
|
||||
},
|
||||
}
|
||||
|
||||
memChunkPool := &sync.Pool{
|
||||
New: func() interface{} {
|
||||
return &memChunk{}
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
require.NoError(t, chunkDiskMapper.Close())
|
||||
}()
|
||||
|
||||
series := newMemSeries(labels.EmptyLabels(), 1, true)
|
||||
|
||||
if tc.setup != nil {
|
||||
tc.setup(t, series, chunkDiskMapper)
|
||||
}
|
||||
|
||||
chk, headChunk, isOpen, err := series.chunk(tc.inputID, chunkDiskMapper, memChunkPool)
|
||||
switch tc.expected {
|
||||
case outOpenHeadChunk:
|
||||
require.NoError(t, err, "unexpected error")
|
||||
require.True(t, headChunk, "expected a chunk with headChunk=true but got headChunk=%v", headChunk)
|
||||
require.True(t, isOpen, "expected a chunk with isOpen=true but got isOpen=%v", isOpen)
|
||||
case outClosedHeadChunk:
|
||||
require.NoError(t, err, "unexpected error")
|
||||
require.True(t, headChunk, "expected a chunk with headChunk=true but got headChunk=%v", headChunk)
|
||||
require.False(t, isOpen, "expected a chunk with isOpen=false but got isOpen=%v", isOpen)
|
||||
case outMmappedChunk:
|
||||
require.NoError(t, err, "unexpected error")
|
||||
require.False(t, headChunk, "expected a chunk with headChunk=false but got gc=%v", headChunk)
|
||||
case outErr:
|
||||
require.Nil(t, chk, "got a non-nil chunk reference returned with an error")
|
||||
require.Error(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -292,7 +292,7 @@ func BenchmarkLoadWAL(b *testing.B) {
|
|||
// Create one mmapped chunk per series, with one sample at the given time.
|
||||
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled)
|
||||
s.append(c.mmappedChunkT, 42, 0, cOpts)
|
||||
s.mmapCurrentHeadChunk(chunkDiskMapper)
|
||||
s.mmapChunks(chunkDiskMapper)
|
||||
}
|
||||
require.NoError(b, chunkDiskMapper.Close())
|
||||
}
|
||||
|
@ -587,15 +587,15 @@ func TestHead_ReadWAL(t *testing.T) {
|
|||
return x
|
||||
}
|
||||
|
||||
c, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
|
||||
c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
|
||||
c, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
|
||||
c, _, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
|
||||
// The samples before the new series record should be discarded since a duplicate record
|
||||
// is only possible when old samples were compacted.
|
||||
c, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
|
||||
c, _, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
|
||||
|
||||
|
@ -822,30 +822,200 @@ func TestMemSeries_truncateChunks(t *testing.T) {
|
|||
ok, _ := s.append(int64(i), float64(i), 0, cOpts)
|
||||
require.True(t, ok, "sample append failed")
|
||||
}
|
||||
s.mmapChunks(chunkDiskMapper)
|
||||
|
||||
// Check that truncate removes half of the chunks and afterwards
|
||||
// that the ID of the last chunk still gives us the same chunk afterwards.
|
||||
countBefore := len(s.mmappedChunks) + 1 // +1 for the head chunk.
|
||||
lastID := s.headChunkID(countBefore - 1)
|
||||
lastChunk, _, err := s.chunk(lastID, chunkDiskMapper, &memChunkPool)
|
||||
lastChunk, _, _, err := s.chunk(lastID, chunkDiskMapper, &memChunkPool)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, lastChunk)
|
||||
|
||||
chk, _, err := s.chunk(0, chunkDiskMapper, &memChunkPool)
|
||||
chk, _, _, err := s.chunk(0, chunkDiskMapper, &memChunkPool)
|
||||
require.NotNil(t, chk)
|
||||
require.NoError(t, err)
|
||||
|
||||
s.truncateChunksBefore(2000, 0)
|
||||
|
||||
require.Equal(t, int64(2000), s.mmappedChunks[0].minTime)
|
||||
_, _, err = s.chunk(0, chunkDiskMapper, &memChunkPool)
|
||||
_, _, _, err = s.chunk(0, chunkDiskMapper, &memChunkPool)
|
||||
require.Equal(t, storage.ErrNotFound, err, "first chunks not gone")
|
||||
require.Equal(t, countBefore/2, len(s.mmappedChunks)+1) // +1 for the head chunk.
|
||||
chk, _, err = s.chunk(lastID, chunkDiskMapper, &memChunkPool)
|
||||
chk, _, _, err = s.chunk(lastID, chunkDiskMapper, &memChunkPool)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, lastChunk, chk)
|
||||
}
|
||||
|
||||
func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
|
||||
const chunkRange = 100
|
||||
const chunkStep = 5
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
headChunks int // the number of head chubks to create on memSeries by appending enough samples
|
||||
mmappedChunks int // the number of mmapped chunks to create on memSeries by appending enough samples
|
||||
truncateBefore int64 // the mint to pass to truncateChunksBefore()
|
||||
expectedTruncated int // the number of chunks that we're expecting be truncated and returned by truncateChunksBefore()
|
||||
expectedHead int // the expected number of head chunks after truncation
|
||||
expectedMmap int // the expected number of mmapped chunks after truncation
|
||||
expectedFirstChunkID chunks.HeadChunkID // the expected series.firstChunkID after truncation
|
||||
}{
|
||||
{
|
||||
name: "empty memSeries",
|
||||
truncateBefore: chunkRange * 10,
|
||||
},
|
||||
{
|
||||
name: "single head chunk, not truncated",
|
||||
headChunks: 1,
|
||||
expectedHead: 1,
|
||||
},
|
||||
{
|
||||
name: "single head chunk, truncated",
|
||||
headChunks: 1,
|
||||
truncateBefore: chunkRange,
|
||||
expectedTruncated: 1,
|
||||
expectedHead: 0,
|
||||
expectedFirstChunkID: 1,
|
||||
},
|
||||
{
|
||||
name: "2 head chunks, not truncated",
|
||||
headChunks: 2,
|
||||
expectedHead: 2,
|
||||
},
|
||||
{
|
||||
name: "2 head chunks, first truncated",
|
||||
headChunks: 2,
|
||||
truncateBefore: chunkRange,
|
||||
expectedTruncated: 1,
|
||||
expectedHead: 1,
|
||||
expectedFirstChunkID: 1,
|
||||
},
|
||||
{
|
||||
name: "2 head chunks, everything truncated",
|
||||
headChunks: 2,
|
||||
truncateBefore: chunkRange * 2,
|
||||
expectedTruncated: 2,
|
||||
expectedHead: 0,
|
||||
expectedFirstChunkID: 2,
|
||||
},
|
||||
{
|
||||
name: "no head chunks, 3 mmap chunks, second mmap truncated",
|
||||
headChunks: 0,
|
||||
mmappedChunks: 3,
|
||||
truncateBefore: chunkRange * 2,
|
||||
expectedTruncated: 2,
|
||||
expectedHead: 0,
|
||||
expectedMmap: 1,
|
||||
expectedFirstChunkID: 2,
|
||||
},
|
||||
{
|
||||
name: "single head chunk, single mmap chunk, not truncated",
|
||||
headChunks: 1,
|
||||
mmappedChunks: 1,
|
||||
expectedHead: 1,
|
||||
expectedMmap: 1,
|
||||
},
|
||||
{
|
||||
name: "single head chunk, single mmap chunk, mmap truncated",
|
||||
headChunks: 1,
|
||||
mmappedChunks: 1,
|
||||
truncateBefore: chunkRange,
|
||||
expectedTruncated: 1,
|
||||
expectedHead: 1,
|
||||
expectedMmap: 0,
|
||||
expectedFirstChunkID: 1,
|
||||
},
|
||||
{
|
||||
name: "5 head chunk, 5 mmap chunk, third head truncated",
|
||||
headChunks: 5,
|
||||
mmappedChunks: 5,
|
||||
truncateBefore: chunkRange * 7,
|
||||
expectedTruncated: 7,
|
||||
expectedHead: 3,
|
||||
expectedMmap: 0,
|
||||
expectedFirstChunkID: 7,
|
||||
},
|
||||
{
|
||||
name: "2 head chunks, 3 mmap chunks, second mmap truncated",
|
||||
headChunks: 2,
|
||||
mmappedChunks: 3,
|
||||
truncateBefore: chunkRange * 2,
|
||||
expectedTruncated: 2,
|
||||
expectedHead: 2,
|
||||
expectedMmap: 1,
|
||||
expectedFirstChunkID: 2,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
require.NoError(t, chunkDiskMapper.Close())
|
||||
}()
|
||||
|
||||
series := newMemSeries(labels.EmptyLabels(), 1, true)
|
||||
|
||||
cOpts := chunkOpts{
|
||||
chunkDiskMapper: chunkDiskMapper,
|
||||
chunkRange: chunkRange,
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
var headStart int
|
||||
if tc.mmappedChunks > 0 {
|
||||
headStart = (tc.mmappedChunks + 1) * chunkRange
|
||||
for i := 0; i < (tc.mmappedChunks+1)*chunkRange; i += chunkStep {
|
||||
ok, _ := series.append(int64(i), float64(i), 0, cOpts)
|
||||
require.True(t, ok, "sample append failed")
|
||||
}
|
||||
series.mmapChunks(chunkDiskMapper)
|
||||
}
|
||||
|
||||
if tc.headChunks == 0 {
|
||||
series.headChunks = nil
|
||||
} else {
|
||||
for i := headStart; i < chunkRange*(tc.mmappedChunks+tc.headChunks); i += chunkStep {
|
||||
ok, _ := series.append(int64(i), float64(i), 0, cOpts)
|
||||
require.True(t, ok, "sample append failed: %d", i)
|
||||
}
|
||||
}
|
||||
|
||||
if tc.headChunks > 0 {
|
||||
require.NotNil(t, series.headChunks, "head chunk is missing")
|
||||
require.Equal(t, tc.headChunks, series.headChunks.len(), "wrong number of head chunks")
|
||||
} else {
|
||||
require.Nil(t, series.headChunks, "head chunk is present")
|
||||
}
|
||||
require.Equal(t, tc.mmappedChunks, len(series.mmappedChunks), "wrong number of mmapped chunks")
|
||||
|
||||
truncated := series.truncateChunksBefore(tc.truncateBefore, 0)
|
||||
require.Equal(t, tc.expectedTruncated, truncated, "wrong number of truncated chunks returned")
|
||||
|
||||
require.Equal(t, tc.expectedMmap, len(series.mmappedChunks), "wrong number of mmappedChunks after truncation")
|
||||
|
||||
if tc.expectedHead > 0 {
|
||||
require.NotNil(t, series.headChunks, "headChunks should is nil after truncation")
|
||||
require.Equal(t, tc.expectedHead, series.headChunks.len(), "wrong number of head chunks after truncation")
|
||||
require.Nil(t, series.headChunks.oldest().prev, "last head chunk cannot have any next chunk set")
|
||||
} else {
|
||||
require.Nil(t, series.headChunks, "headChunks should is non-nil after truncation")
|
||||
}
|
||||
|
||||
if series.headChunks != nil || len(series.mmappedChunks) > 0 {
|
||||
require.GreaterOrEqual(t, series.maxTime(), tc.truncateBefore, "wrong value of series.maxTime() after truncation")
|
||||
} else {
|
||||
require.Equal(t, int64(math.MinInt64), series.maxTime(), "wrong value of series.maxTime() after truncation")
|
||||
}
|
||||
|
||||
require.Equal(t, tc.expectedFirstChunkID, series.firstChunkID, "wrong firstChunkID after truncation")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
|
||||
for _, compress := range []wlog.CompressionType{wlog.CompressionNone, wlog.CompressionSnappy, wlog.CompressionZstd} {
|
||||
t.Run(fmt.Sprintf("compress=%s", compress), func(t *testing.T) {
|
||||
|
@ -1363,6 +1533,7 @@ func TestMemSeries_append(t *testing.T) {
|
|||
ok, chunkCreated = s.append(999, 2, 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.False(t, chunkCreated, "second sample should use same chunk")
|
||||
s.mmapChunks(chunkDiskMapper)
|
||||
|
||||
ok, chunkCreated = s.append(1000, 3, 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
|
@ -1372,11 +1543,12 @@ func TestMemSeries_append(t *testing.T) {
|
|||
require.True(t, ok, "append failed")
|
||||
require.False(t, chunkCreated, "second sample should use same chunk")
|
||||
|
||||
s.mmapChunks(chunkDiskMapper)
|
||||
require.Equal(t, 1, len(s.mmappedChunks), "there should be only 1 mmapped chunk")
|
||||
require.Equal(t, int64(998), s.mmappedChunks[0].minTime, "wrong chunk range")
|
||||
require.Equal(t, int64(999), s.mmappedChunks[0].maxTime, "wrong chunk range")
|
||||
require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range")
|
||||
require.Equal(t, int64(1001), s.headChunk.maxTime, "wrong chunk range")
|
||||
require.Equal(t, int64(1000), s.headChunks.minTime, "wrong chunk range")
|
||||
require.Equal(t, int64(1001), s.headChunks.maxTime, "wrong chunk range")
|
||||
|
||||
// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
|
||||
// at approximately 120 samples per chunk.
|
||||
|
@ -1384,6 +1556,7 @@ func TestMemSeries_append(t *testing.T) {
|
|||
ok, _ := s.append(1001+int64(i), float64(i), 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
}
|
||||
s.mmapChunks(chunkDiskMapper)
|
||||
|
||||
require.Greater(t, len(s.mmappedChunks)+1, 7, "expected intermediate chunks")
|
||||
|
||||
|
@ -1437,21 +1610,23 @@ func TestMemSeries_appendHistogram(t *testing.T) {
|
|||
require.True(t, ok, "append failed")
|
||||
require.False(t, chunkCreated, "second sample should use same chunk")
|
||||
|
||||
s.mmapChunks(chunkDiskMapper)
|
||||
require.Equal(t, 1, len(s.mmappedChunks), "there should be only 1 mmapped chunk")
|
||||
require.Equal(t, int64(998), s.mmappedChunks[0].minTime, "wrong chunk range")
|
||||
require.Equal(t, int64(999), s.mmappedChunks[0].maxTime, "wrong chunk range")
|
||||
require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range")
|
||||
require.Equal(t, int64(1001), s.headChunk.maxTime, "wrong chunk range")
|
||||
require.Equal(t, int64(1000), s.headChunks.minTime, "wrong chunk range")
|
||||
require.Equal(t, int64(1001), s.headChunks.maxTime, "wrong chunk range")
|
||||
|
||||
ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk")
|
||||
|
||||
s.mmapChunks(chunkDiskMapper)
|
||||
require.Equal(t, 1, len(s.mmappedChunks), "there should be only 1 mmapped chunk")
|
||||
require.Equal(t, int64(998), s.mmappedChunks[0].minTime, "wrong chunk range")
|
||||
require.Equal(t, int64(999), s.mmappedChunks[0].maxTime, "wrong chunk range")
|
||||
require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range")
|
||||
require.Equal(t, int64(1002), s.headChunk.maxTime, "wrong chunk range")
|
||||
require.Equal(t, int64(1000), s.headChunks.minTime, "wrong chunk range")
|
||||
require.Equal(t, int64(1002), s.headChunks.maxTime, "wrong chunk range")
|
||||
}
|
||||
|
||||
func TestMemSeries_append_atVariableRate(t *testing.T) {
|
||||
|
@ -1495,6 +1670,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
|
|||
require.True(t, ok, "new chunk sample was not appended")
|
||||
require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk")
|
||||
|
||||
s.mmapChunks(chunkDiskMapper)
|
||||
var totalSamplesInChunks int
|
||||
for i, c := range s.mmappedChunks {
|
||||
totalSamplesInChunks += int(c.numSamples)
|
||||
|
@ -1841,6 +2017,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
|
|||
require.True(t, ok, "series append failed")
|
||||
require.False(t, chunkCreated, "chunk was created")
|
||||
h.chunkDiskMapper.CutNewFile()
|
||||
s.mmapChunks(h.chunkDiskMapper)
|
||||
}
|
||||
require.NoError(t, h.Close())
|
||||
|
||||
|
@ -1985,6 +2162,7 @@ func TestMemSeriesIsolation(t *testing.T) {
|
|||
_, err := app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
h.mmapHeadChunks()
|
||||
}
|
||||
return i
|
||||
}
|
||||
|
@ -2666,7 +2844,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
|
|||
require.True(t, ok, "sample append failed")
|
||||
}
|
||||
|
||||
c, _, err := s.chunk(0, chunkDiskMapper, &sync.Pool{
|
||||
c, _, _, err := s.chunk(0, chunkDiskMapper, &sync.Pool{
|
||||
New: func() interface{} {
|
||||
return &memChunk{}
|
||||
},
|
||||
|
@ -3092,6 +3270,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
|||
}
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
head.mmapHeadChunks()
|
||||
}
|
||||
|
||||
// There should be 11 mmap chunks in s1.
|
||||
|
@ -3103,7 +3282,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
|||
cpy := *mmap
|
||||
expMmapChunks = append(expMmapChunks, &cpy)
|
||||
}
|
||||
expHeadChunkSamples := ms.headChunk.chunk.NumSamples()
|
||||
expHeadChunkSamples := ms.headChunks.chunk.NumSamples()
|
||||
require.Greater(t, expHeadChunkSamples, 0)
|
||||
|
||||
// Series with mix of histograms and float.
|
||||
|
@ -3199,7 +3378,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
|
|||
// Checking contents of s1.
|
||||
ms = head.series.getByHash(s1.Hash(), s1)
|
||||
require.Equal(t, expMmapChunks, ms.mmappedChunks)
|
||||
require.Equal(t, expHeadChunkSamples, ms.headChunk.chunk.NumSamples())
|
||||
require.Equal(t, expHeadChunkSamples, ms.headChunks.chunk.NumSamples())
|
||||
|
||||
testQuery := func() {
|
||||
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
|
||||
|
@ -3738,6 +3917,8 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) {
|
|||
// Only 1 chunk in the memory, no m-mapped chunk.
|
||||
s := head.series.getByHash(l.Hash(), l)
|
||||
require.NotNil(t, s)
|
||||
require.NotNil(t, s.headChunks)
|
||||
require.Equal(t, s.headChunks.len(), 1)
|
||||
require.Equal(t, 0, len(s.mmappedChunks))
|
||||
testQuery(1)
|
||||
|
||||
|
@ -3766,10 +3947,13 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) {
|
|||
expHistograms = append(expHistograms, timedHistogram{t: 100*int64(len(expHistograms)) + 1, h: &histogram.Histogram{Sum: math.Float64frombits(value.StaleNaN)}})
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
head.mmapHeadChunks()
|
||||
|
||||
// Total 2 chunks, 1 m-mapped.
|
||||
s = head.series.getByHash(l.Hash(), l)
|
||||
require.NotNil(t, s)
|
||||
require.NotNil(t, s.headChunks)
|
||||
require.Equal(t, s.headChunks.len(), 1)
|
||||
require.Equal(t, 1, len(s.mmappedChunks))
|
||||
testQuery(2)
|
||||
}
|
||||
|
@ -3804,6 +3988,7 @@ func TestHistogramCounterResetHeader(t *testing.T) {
|
|||
|
||||
ms, _, err := head.getOrCreate(l.Hash(), l)
|
||||
require.NoError(t, err)
|
||||
ms.mmapChunks(head.chunkDiskMapper)
|
||||
require.Len(t, ms.mmappedChunks, len(expHeaders)-1) // One is the head chunk.
|
||||
|
||||
for i, mmapChunk := range ms.mmappedChunks {
|
||||
|
@ -3816,9 +4001,9 @@ func TestHistogramCounterResetHeader(t *testing.T) {
|
|||
}
|
||||
}
|
||||
if floatHisto {
|
||||
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader())
|
||||
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunks.chunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader())
|
||||
} else {
|
||||
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
|
||||
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunks.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3909,7 +4094,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.False(t, created)
|
||||
require.NotNil(t, ms)
|
||||
require.Len(t, ms.mmappedChunks, count-1) // One will be the head chunk.
|
||||
require.Equal(t, count, ms.headChunks.len())
|
||||
}
|
||||
|
||||
appends := []struct {
|
||||
|
@ -4350,6 +4535,7 @@ func TestHeadInit_DiscardChunksWithUnsupportedEncoding(t *testing.T) {
|
|||
require.False(t, created, "should already exist")
|
||||
require.NotNil(t, series, "should return the series we created above")
|
||||
|
||||
series.mmapChunks(h.chunkDiskMapper)
|
||||
expChunks := make([]*mmappedChunk, len(series.mmappedChunks))
|
||||
copy(expChunks, series.mmappedChunks)
|
||||
|
||||
|
@ -4507,6 +4693,7 @@ func TestReplayAfterMmapReplayError(t *testing.T) {
|
|||
require.NoError(t, f.Close())
|
||||
|
||||
openHead()
|
||||
h.mmapHeadChunks()
|
||||
|
||||
// There should be less m-map files due to corruption.
|
||||
files, err = os.ReadDir(filepath.Join(dir, "chunks_head"))
|
||||
|
@ -4697,7 +4884,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
|
|||
require.False(t, created)
|
||||
require.NotNil(t, ms)
|
||||
|
||||
require.Nil(t, ms.headChunk)
|
||||
require.Nil(t, ms.headChunks)
|
||||
require.NotNil(t, ms.ooo.oooHeadChunk)
|
||||
require.Equal(t, expSamples, ms.ooo.oooHeadChunk.chunk.NumSamples())
|
||||
}
|
||||
|
@ -4709,8 +4896,8 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
|
|||
require.NotNil(t, ms)
|
||||
|
||||
require.Nil(t, ms.ooo)
|
||||
require.NotNil(t, ms.headChunk)
|
||||
require.Equal(t, expSamples, ms.headChunk.chunk.NumSamples())
|
||||
require.NotNil(t, ms.headChunks)
|
||||
require.Equal(t, expSamples, ms.headChunks.chunk.NumSamples())
|
||||
}
|
||||
|
||||
newLabels := func(idx int) labels.Labels { return labels.FromStrings("foo", fmt.Sprintf("%d", idx)) }
|
||||
|
@ -4821,6 +5008,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
|
|||
appendHistogram(hists[4])
|
||||
|
||||
checkHeaders := func() {
|
||||
head.mmapHeadChunks()
|
||||
ms, _, err := head.getOrCreate(l.Hash(), l)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, ms.mmappedChunks, 3)
|
||||
|
@ -4835,7 +5023,7 @@ func TestGaugeHistogramWALAndChunkHeader(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Equal(t, expHeaders[i], chk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
|
||||
}
|
||||
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
|
||||
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunks.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
|
||||
}
|
||||
checkHeaders()
|
||||
|
||||
|
@ -4898,6 +5086,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
|
|||
checkHeaders := func() {
|
||||
ms, _, err := head.getOrCreate(l.Hash(), l)
|
||||
require.NoError(t, err)
|
||||
head.mmapHeadChunks()
|
||||
require.Len(t, ms.mmappedChunks, 3)
|
||||
expHeaders := []chunkenc.CounterResetHeader{
|
||||
chunkenc.UnknownCounterReset,
|
||||
|
@ -4910,7 +5099,7 @@ func TestGaugeFloatHistogramWALAndChunkHeader(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Equal(t, expHeaders[i], chk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader())
|
||||
}
|
||||
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader())
|
||||
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunks.chunk.(*chunkenc.FloatHistogramChunk).GetCounterResetHeader())
|
||||
}
|
||||
checkHeaders()
|
||||
|
||||
|
|
|
@ -503,7 +503,7 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m
|
|||
|
||||
// Any samples replayed till now would already be compacted. Resetting the head chunk.
|
||||
mSeries.nextAt = 0
|
||||
mSeries.headChunk = nil
|
||||
mSeries.headChunks = nil
|
||||
mSeries.app = nil
|
||||
return
|
||||
}
|
||||
|
@ -595,6 +595,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
|||
if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
|
||||
h.metrics.chunksCreated.Inc()
|
||||
h.metrics.chunks.Inc()
|
||||
_ = ms.mmapChunks(h.chunkDiskMapper)
|
||||
}
|
||||
if s.T > maxt {
|
||||
maxt = s.T
|
||||
|
@ -960,15 +961,15 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
|
|||
buf.PutBE64int64(0) // Backwards-compatibility; was chunkRange but now unused.
|
||||
|
||||
s.Lock()
|
||||
if s.headChunk == nil {
|
||||
if s.headChunks == nil {
|
||||
buf.PutUvarint(0)
|
||||
} else {
|
||||
enc := s.headChunk.chunk.Encoding()
|
||||
enc := s.headChunks.chunk.Encoding()
|
||||
buf.PutUvarint(1)
|
||||
buf.PutBE64int64(s.headChunk.minTime)
|
||||
buf.PutBE64int64(s.headChunk.maxTime)
|
||||
buf.PutBE64int64(s.headChunks.minTime)
|
||||
buf.PutBE64int64(s.headChunks.maxTime)
|
||||
buf.PutByte(byte(enc))
|
||||
buf.PutUvarintBytes(s.headChunk.chunk.Bytes())
|
||||
buf.PutUvarintBytes(s.headChunks.chunk.Bytes())
|
||||
|
||||
switch enc {
|
||||
case chunkenc.EncXOR:
|
||||
|
@ -1414,12 +1415,12 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
|
|||
continue
|
||||
}
|
||||
series.nextAt = csr.mc.maxTime // This will create a new chunk on append.
|
||||
series.headChunk = csr.mc
|
||||
series.headChunks = csr.mc
|
||||
series.lastValue = csr.lastValue
|
||||
series.lastHistogramValue = csr.lastHistogramValue
|
||||
series.lastFloatHistogramValue = csr.lastFloatHistogramValue
|
||||
|
||||
app, err := series.headChunk.chunk.Appender()
|
||||
app, err := series.headChunks.chunk.Appender()
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
|
|
Loading…
Reference in New Issue