Browse Source

Add storage.tsdb.samples-per-chunk flag

Signed-off-by: Justin Lei <justin.lei@grafana.com>
pull/12055/head
Justin Lei 2 years ago
parent
commit
052993414a
  1. 5
      cmd/prometheus/main.go
  2. 5
      tsdb/db.go
  3. 19
      tsdb/head.go
  4. 7
      tsdb/head_append.go
  5. 12
      tsdb/head_test.go

5
cmd/prometheus/main.go

@ -336,6 +336,9 @@ func main() {
serverOnlyFlag(a, "storage.tsdb.head-chunks-write-queue-size", "Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental.").
Default("0").IntVar(&cfg.tsdb.HeadChunksWriteQueueSize)
serverOnlyFlag(a, "storage.tsdb.samples-per-chunk", "Target number of samples per chunk.").
Default("120").Hidden().IntVar(&cfg.tsdb.SamplesPerChunk)
agentOnlyFlag(a, "storage.agent.path", "Base path for metrics storage.").
Default("data-agent/").StringVar(&cfg.agentStoragePath)
@ -1542,6 +1545,7 @@ type tsdbOptions struct {
NoLockfile bool
WALCompression bool
HeadChunksWriteQueueSize int
SamplesPerChunk int
StripeSize int
MinBlockDuration model.Duration
MaxBlockDuration model.Duration
@ -1562,6 +1566,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
AllowOverlappingCompaction: true,
WALCompression: opts.WALCompression,
HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize,
SamplesPerChunk: opts.SamplesPerChunk,
StripeSize: opts.StripeSize,
MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond),
MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond),

5
tsdb/db.go

@ -78,6 +78,7 @@ func DefaultOptions() *Options {
NoLockfile: false,
AllowOverlappingCompaction: true,
WALCompression: false,
SamplesPerChunk: DefaultSamplesPerChunk,
StripeSize: DefaultStripeSize,
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
IsolationDisabled: defaultIsolationDisabled,
@ -149,6 +150,9 @@ type Options struct {
// HeadChunksWriteQueueSize configures the size of the chunk write queue used in the head chunks mapper.
HeadChunksWriteQueueSize int
// SamplesPerChunk configures the target number of samples per chunk.
SamplesPerChunk int
// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series.
// It is always a no-op in Prometheus and mainly meant for external users who import TSDB.
SeriesLifecycleCallback SeriesLifecycleCallback
@ -778,6 +782,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
headOpts.ChunkPool = db.chunkPool
headOpts.ChunkWriteBufferSize = opts.HeadChunksWriteBufferSize
headOpts.ChunkWriteQueueSize = opts.HeadChunksWriteQueueSize
headOpts.SamplesPerChunk = opts.SamplesPerChunk
headOpts.StripeSize = opts.StripeSize
headOpts.SeriesCallback = opts.SeriesLifecycleCallback
headOpts.EnableExemplarStorage = opts.EnableExemplarStorage

19
tsdb/head.go

@ -150,6 +150,8 @@ type HeadOptions struct {
ChunkWriteBufferSize int
ChunkWriteQueueSize int
SamplesPerChunk int
// StripeSize sets the number of entries in the hash map, it must be a power of 2.
// A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
// A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series.
@ -169,6 +171,8 @@ type HeadOptions struct {
const (
// DefaultOutOfOrderCapMax is the default maximum size of an in-memory out-of-order chunk.
DefaultOutOfOrderCapMax int64 = 32
// DefaultSamplesPerChunk provides a default target number of samples per chunk.
DefaultSamplesPerChunk = 120
)
func DefaultHeadOptions() *HeadOptions {
@ -178,6 +182,7 @@ func DefaultHeadOptions() *HeadOptions {
ChunkPool: chunkenc.NewPool(),
ChunkWriteBufferSize: chunks.DefaultWriteBufferSize,
ChunkWriteQueueSize: chunks.DefaultWriteQueueSize,
SamplesPerChunk: DefaultSamplesPerChunk,
StripeSize: DefaultStripeSize,
SeriesCallback: &noopSeriesLifecycleCallback{},
IsolationDisabled: defaultIsolationDisabled,
@ -1607,7 +1612,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
return newMemSeries(lset, id, h.opts.IsolationDisabled)
return newMemSeries(lset, id, h.opts.IsolationDisabled, h.opts.SamplesPerChunk)
})
if err != nil {
return nil, false, err
@ -1915,7 +1920,8 @@ type memSeries struct {
mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
nextAt int64 // Timestamp at which to cut the next chunk.
samplesPerChunk int // Target number of samples per chunk.
nextAt int64 // Timestamp at which to cut the next chunk.
// We keep the last value here (in addition to appending it to the chunk) so we can check for duplicates.
lastValue float64
@ -1943,11 +1949,12 @@ type memSeriesOOOFields struct {
firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0].
}
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool) *memSeries {
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool, samplesPerChunk int) *memSeries {
s := &memSeries{
lset: lset,
ref: id,
nextAt: math.MinInt64,
lset: lset,
ref: id,
nextAt: math.MinInt64,
samplesPerChunk: samplesPerChunk,
}
if !isolationDisabled {
s.txs = newTxRing(4)

7
tsdb/head_append.go

@ -1324,9 +1324,6 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
func (s *memSeries) appendPreprocessor(
t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64,
) (c *memChunk, sampleInOrder, chunkCreated bool) {
// The basis for this number can be found here: https://github.com/prometheus/prometheus/pull/12055
const samplesPerChunk = 220
c = s.head()
if c == nil {
@ -1363,7 +1360,7 @@ func (s *memSeries) appendPreprocessor(
// for this chunk that will try to make samples equally distributed within
// the remaining chunks in the current chunk range.
// At latest it must happen at the timestamp set when the chunk was cut.
if numSamples == samplesPerChunk/4 {
if numSamples == s.samplesPerChunk/4 {
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
}
// If numSamples > samplesPerChunk*2 then our previous prediction was invalid,
@ -1371,7 +1368,7 @@ func (s *memSeries) appendPreprocessor(
// Since we assume that the rate is higher, we're being conservative and cutting at 2*samplesPerChunk
// as we expect more chunks to come.
// Note that next chunk will have its nextAt recalculated for the new rate.
if t >= s.nextAt || numSamples >= samplesPerChunk*2 {
if t >= s.nextAt || numSamples >= s.samplesPerChunk*2 {
c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange)
chunkCreated = true
}

12
tsdb/head_test.go

@ -284,7 +284,7 @@ func BenchmarkLoadWAL(b *testing.B) {
require.NoError(b, err)
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
// Create one mmapped chunk per series, with one sample at the given time.
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled)
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled, DefaultSamplesPerChunk)
s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper, c.mmappedChunkT)
s.mmapCurrentHeadChunk(chunkDiskMapper)
}
@ -806,7 +806,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
},
}
s := newMemSeries(labels.FromStrings("a", "b"), 1, defaultIsolationDisabled)
s := newMemSeries(labels.FromStrings("a", "b"), 1, defaultIsolationDisabled, DefaultSamplesPerChunk)
for i := 0; i < 8000; i += 5 {
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange)
@ -1337,7 +1337,7 @@ func TestMemSeries_append(t *testing.T) {
}()
const chunkRange = 500
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled, DefaultSamplesPerChunk)
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
@ -1391,7 +1391,7 @@ func TestMemSeries_appendHistogram(t *testing.T) {
}()
chunkRange := int64(1000)
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled, DefaultSamplesPerChunk)
histograms := tsdbutil.GenerateTestHistograms(4)
histogramWithOneMoreBucket := histograms[3].Copy()
@ -1447,7 +1447,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
})
chunkRange := DefaultBlockDuration
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled, DefaultSamplesPerChunk)
// At this slow rate, we will fill the chunk in two block durations.
slowRate := (DefaultBlockDuration * 2) / samplesPerChunk
@ -2609,7 +2609,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
}()
const chunkRange = 500
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled, DefaultSamplesPerChunk)
for i := 0; i < 7; i++ {
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange)

Loading…
Cancel
Save