From 052993414a60254d71bd0e19cf3ee648ca697728 Mon Sep 17 00:00:00 2001 From: Justin Lei Date: Wed, 12 Apr 2023 09:48:35 -0700 Subject: [PATCH] Add storage.tsdb.samples-per-chunk flag Signed-off-by: Justin Lei --- cmd/prometheus/main.go | 5 +++++ tsdb/db.go | 5 +++++ tsdb/head.go | 19 +++++++++++++------ tsdb/head_append.go | 7 ++----- tsdb/head_test.go | 12 ++++++------ 5 files changed, 31 insertions(+), 17 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index f4f6af20d..cafe2f819 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -336,6 +336,9 @@ func main() { serverOnlyFlag(a, "storage.tsdb.head-chunks-write-queue-size", "Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental."). Default("0").IntVar(&cfg.tsdb.HeadChunksWriteQueueSize) + serverOnlyFlag(a, "storage.tsdb.samples-per-chunk", "Target number of samples per chunk."). + Default("120").Hidden().IntVar(&cfg.tsdb.SamplesPerChunk) + agentOnlyFlag(a, "storage.agent.path", "Base path for metrics storage."). Default("data-agent/").StringVar(&cfg.agentStoragePath) @@ -1542,6 +1545,7 @@ type tsdbOptions struct { NoLockfile bool WALCompression bool HeadChunksWriteQueueSize int + SamplesPerChunk int StripeSize int MinBlockDuration model.Duration MaxBlockDuration model.Duration @@ -1562,6 +1566,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { AllowOverlappingCompaction: true, WALCompression: opts.WALCompression, HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize, + SamplesPerChunk: opts.SamplesPerChunk, StripeSize: opts.StripeSize, MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond), MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond), diff --git a/tsdb/db.go b/tsdb/db.go index 659251c3c..e0e9c69f0 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -78,6 +78,7 @@ func DefaultOptions() *Options { NoLockfile: false, AllowOverlappingCompaction: true, WALCompression: false, + SamplesPerChunk: DefaultSamplesPerChunk, StripeSize: DefaultStripeSize, HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize, IsolationDisabled: defaultIsolationDisabled, @@ -149,6 +150,9 @@ type Options struct { // HeadChunksWriteQueueSize configures the size of the chunk write queue used in the head chunks mapper. HeadChunksWriteQueueSize int + // SamplesPerChunk configures the target number of samples per chunk. + SamplesPerChunk int + // SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series. // It is always a no-op in Prometheus and mainly meant for external users who import TSDB. SeriesLifecycleCallback SeriesLifecycleCallback @@ -778,6 +782,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs headOpts.ChunkPool = db.chunkPool headOpts.ChunkWriteBufferSize = opts.HeadChunksWriteBufferSize headOpts.ChunkWriteQueueSize = opts.HeadChunksWriteQueueSize + headOpts.SamplesPerChunk = opts.SamplesPerChunk headOpts.StripeSize = opts.StripeSize headOpts.SeriesCallback = opts.SeriesLifecycleCallback headOpts.EnableExemplarStorage = opts.EnableExemplarStorage diff --git a/tsdb/head.go b/tsdb/head.go index af8175cd0..b4df1b2d0 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -150,6 +150,8 @@ type HeadOptions struct { ChunkWriteBufferSize int ChunkWriteQueueSize int + SamplesPerChunk int + // StripeSize sets the number of entries in the hash map, it must be a power of 2. // A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. // A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series. @@ -169,6 +171,8 @@ type HeadOptions struct { const ( // DefaultOutOfOrderCapMax is the default maximum size of an in-memory out-of-order chunk. DefaultOutOfOrderCapMax int64 = 32 + // DefaultSamplesPerChunk provides a default target number of samples per chunk. + DefaultSamplesPerChunk = 120 ) func DefaultHeadOptions() *HeadOptions { @@ -178,6 +182,7 @@ func DefaultHeadOptions() *HeadOptions { ChunkPool: chunkenc.NewPool(), ChunkWriteBufferSize: chunks.DefaultWriteBufferSize, ChunkWriteQueueSize: chunks.DefaultWriteQueueSize, + SamplesPerChunk: DefaultSamplesPerChunk, StripeSize: DefaultStripeSize, SeriesCallback: &noopSeriesLifecycleCallback{}, IsolationDisabled: defaultIsolationDisabled, @@ -1607,7 +1612,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) { s, created, err := h.series.getOrSet(hash, lset, func() *memSeries { - return newMemSeries(lset, id, h.opts.IsolationDisabled) + return newMemSeries(lset, id, h.opts.IsolationDisabled, h.opts.SamplesPerChunk) }) if err != nil { return nil, false, err @@ -1915,7 +1920,8 @@ type memSeries struct { mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay. - nextAt int64 // Timestamp at which to cut the next chunk. + samplesPerChunk int // Target number of samples per chunk. + nextAt int64 // Timestamp at which to cut the next chunk. // We keep the last value here (in addition to appending it to the chunk) so we can check for duplicates. lastValue float64 @@ -1943,11 +1949,12 @@ type memSeriesOOOFields struct { firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0]. } -func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool) *memSeries { +func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool, samplesPerChunk int) *memSeries { s := &memSeries{ - lset: lset, - ref: id, - nextAt: math.MinInt64, + lset: lset, + ref: id, + nextAt: math.MinInt64, + samplesPerChunk: samplesPerChunk, } if !isolationDisabled { s.txs = newTxRing(4) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 46180051e..eb5b219ea 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1324,9 +1324,6 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, func (s *memSeries) appendPreprocessor( t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, ) (c *memChunk, sampleInOrder, chunkCreated bool) { - // The basis for this number can be found here: https://github.com/prometheus/prometheus/pull/12055 - const samplesPerChunk = 220 - c = s.head() if c == nil { @@ -1363,7 +1360,7 @@ func (s *memSeries) appendPreprocessor( // for this chunk that will try to make samples equally distributed within // the remaining chunks in the current chunk range. // At latest it must happen at the timestamp set when the chunk was cut. - if numSamples == samplesPerChunk/4 { + if numSamples == s.samplesPerChunk/4 { s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) } // If numSamples > samplesPerChunk*2 then our previous prediction was invalid, @@ -1371,7 +1368,7 @@ func (s *memSeries) appendPreprocessor( // Since we assume that the rate is higher, we're being conservative and cutting at 2*samplesPerChunk // as we expect more chunks to come. // Note that next chunk will have its nextAt recalculated for the new rate. - if t >= s.nextAt || numSamples >= samplesPerChunk*2 { + if t >= s.nextAt || numSamples >= s.samplesPerChunk*2 { c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange) chunkCreated = true } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 9a7220957..df48e592d 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -284,7 +284,7 @@ func BenchmarkLoadWAL(b *testing.B) { require.NoError(b, err) for k := 0; k < c.batches*c.seriesPerBatch; k++ { // Create one mmapped chunk per series, with one sample at the given time. - s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled, DefaultSamplesPerChunk) s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper, c.mmappedChunkT) s.mmapCurrentHeadChunk(chunkDiskMapper) } @@ -806,7 +806,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { }, } - s := newMemSeries(labels.FromStrings("a", "b"), 1, defaultIsolationDisabled) + s := newMemSeries(labels.FromStrings("a", "b"), 1, defaultIsolationDisabled, DefaultSamplesPerChunk) for i := 0; i < 8000; i += 5 { ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange) @@ -1337,7 +1337,7 @@ func TestMemSeries_append(t *testing.T) { }() const chunkRange = 500 - s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled, DefaultSamplesPerChunk) // Add first two samples at the very end of a chunk range and the next two // on and after it. @@ -1391,7 +1391,7 @@ func TestMemSeries_appendHistogram(t *testing.T) { }() chunkRange := int64(1000) - s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled, DefaultSamplesPerChunk) histograms := tsdbutil.GenerateTestHistograms(4) histogramWithOneMoreBucket := histograms[3].Copy() @@ -1447,7 +1447,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { }) chunkRange := DefaultBlockDuration - s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled, DefaultSamplesPerChunk) // At this slow rate, we will fill the chunk in two block durations. slowRate := (DefaultBlockDuration * 2) / samplesPerChunk @@ -2609,7 +2609,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) { }() const chunkRange = 500 - s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled, DefaultSamplesPerChunk) for i := 0; i < 7; i++ { ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange)