Browse Source

Merge pull request #29 from grafana/add-jitter-to-chunk-end

Add jitter to head chunks flushing
owilliams/utf8-02-mimir
Marco Pracucci 3 years ago committed by GitHub
parent
commit
6525385b30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      tsdb/db.go
  2. 26
      tsdb/head.go
  3. 36
      tsdb/head_append.go
  4. 66
      tsdb/head_append_test.go
  5. 8
      tsdb/head_test.go

9
tsdb/db.go

@ -84,6 +84,7 @@ func DefaultOptions() *Options {
WALCompression: false,
StripeSize: DefaultStripeSize,
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
HeadChunksEndTimeVariance: 0,
}
}
@ -139,6 +140,10 @@ type Options struct {
// HeadChunksWriteBufferSize configures the write buffer size used by the head chunks mapper.
HeadChunksWriteBufferSize int
// HeadChunksEndTimeVariance is how much variance (between 0 and 1) should be applied to the chunk end time,
// to spread chunks writing across time. Doesn't apply to the last chunk of the chunk range. 0 to disable variance.
HeadChunksEndTimeVariance float64
// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series.
// It is always a no-op in Prometheus and mainly meant for external users who import TSDB.
SeriesLifecycleCallback SeriesLifecycleCallback
@ -594,6 +599,9 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
if opts.HeadChunksWriteBufferSize <= 0 {
opts.HeadChunksWriteBufferSize = chunks.DefaultWriteBufferSize
}
if opts.HeadChunksEndTimeVariance <= 0 {
opts.HeadChunksEndTimeVariance = 0
}
if opts.MaxBlockChunkSegmentSize <= 0 {
opts.MaxBlockChunkSegmentSize = chunks.DefaultChunkSegmentSize
}
@ -725,6 +733,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
headOpts.ChunkDirRoot = dir
headOpts.ChunkPool = db.chunkPool
headOpts.ChunkWriteBufferSize = opts.HeadChunksWriteBufferSize
headOpts.ChunkEndTimeVariance = opts.HeadChunksEndTimeVariance
headOpts.StripeSize = opts.StripeSize
headOpts.SeriesCallback = opts.SeriesLifecycleCallback
headOpts.EnableExemplarStorage = opts.EnableExemplarStorage

26
tsdb/head.go

@ -125,6 +125,8 @@ type HeadOptions struct {
ChunkDirRoot string
ChunkPool chunkenc.Pool
ChunkWriteBufferSize int
ChunkEndTimeVariance float64
// StripeSize sets the number of entries in the hash map, it must be a power of 2.
// A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
// A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series.
@ -140,6 +142,7 @@ func DefaultHeadOptions() *HeadOptions {
ChunkDirRoot: "",
ChunkPool: chunkenc.NewPool(),
ChunkWriteBufferSize: chunks.DefaultWriteBufferSize,
ChunkEndTimeVariance: 0,
StripeSize: DefaultStripeSize,
SeriesCallback: &noopSeriesLifecycleCallback{},
}
@ -1226,7 +1229,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
return newMemSeries(lset, id, hash, h.chunkRange.Load(), &h.memChunkPool)
return newMemSeries(lset, id, hash, h.chunkRange.Load(), h.opts.ChunkEndTimeVariance, &h.memChunkPool)
})
if err != nil {
return nil, false, err
@ -1473,6 +1476,10 @@ type memSeries struct {
chunkRange int64
firstChunkID int
// chunkEndTimeVariance is how much variance (between 0 and 1) should be applied to the chunk end time,
// to spread chunks writing across time. Doesn't apply to the last chunk of the chunk range. 0 to disable variance.
chunkEndTimeVariance float64
nextAt int64 // Timestamp at which to cut the next chunk.
sampleBuf [4]sample
pendingCommit bool // Whether there are samples waiting to be committed to this series.
@ -1484,15 +1491,16 @@ type memSeries struct {
txs *txRing
}
func newMemSeries(lset labels.Labels, id, hash uint64, chunkRange int64, memChunkPool *sync.Pool) *memSeries {
func newMemSeries(lset labels.Labels, id, hash uint64, chunkRange int64, chunkEndTimeVariance float64, memChunkPool *sync.Pool) *memSeries {
s := &memSeries{
lset: lset,
hash: hash,
ref: id,
chunkRange: chunkRange,
nextAt: math.MinInt64,
txs: newTxRing(4),
memChunkPool: memChunkPool,
lset: lset,
hash: hash,
ref: id,
chunkRange: chunkRange,
chunkEndTimeVariance: chunkEndTimeVariance,
nextAt: math.MinInt64,
txs: newTxRing(4),
memChunkPool: memChunkPool,
}
return s
}

36
tsdb/head_append.go

@ -151,6 +151,13 @@ func (h *Head) AppendableMinValidTime() (int64, bool) {
return h.appendableMinValidTime(), true
}
func min(a, b int64) int64 {
if a < b {
return a
}
return b
}
func max(a, b int64) int64 {
if a > b {
return a
@ -507,7 +514,10 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
// the remaining chunks in the current chunk range.
// At latest it must happen at the timestamp set when the chunk was cut.
if numSamples == samplesPerChunk/4 {
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
maxNextAt := s.nextAt
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxNextAt)
s.nextAt = addJitterToChunkEndTime(s.hash, c.minTime, s.nextAt, maxNextAt, s.chunkEndTimeVariance)
}
if t >= s.nextAt {
c = s.cutNewHeadChunk(t, chunkDiskMapper)
@ -542,6 +552,30 @@ func computeChunkEndTime(start, cur, max int64) int64 {
return start + (max-start)/n
}
// addJitterToChunkEndTime return chunk's nextAt applying a jitter based on the provided expected variance.
// The variance is applied to the estimated chunk duration (nextAt - chunkMinTime); the returned updated chunk
// end time is guaranteed to be between "chunkDuration - (chunkDuration*(variance/2))" to
// "chunkDuration + chunkDuration*(variance/2)", and never greater than maxNextAt.
func addJitterToChunkEndTime(seriesHash uint64, chunkMinTime, nextAt, maxNextAt int64, variance float64) int64 {
if variance <= 0 {
return nextAt
}
// Do not apply the jitter if the chunk is expected to be the last one of the chunk range.
if nextAt >= maxNextAt {
return nextAt
}
// Compute the variance to apply to the chunk end time. The variance is based on the series hash so that
// different TSDBs ingesting the same exact samples (e.g. in a distributed system like Cortex) will have
// the same chunks for a given period.
chunkDuration := nextAt - chunkMinTime
chunkDurationMaxVariance := int64(float64(chunkDuration) * variance)
chunkDurationVariance := int64(seriesHash % uint64(chunkDurationMaxVariance))
return min(maxNextAt, nextAt+chunkDurationVariance-(chunkDurationMaxVariance/2))
}
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
s.mmapCurrentHeadChunk(chunkDiskMapper)

66
tsdb/head_append_test.go

@ -0,0 +1,66 @@
package tsdb
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestAddJitterToChunkEndTime_ShouldHonorMaxVarianceAndMaxNextAt(t *testing.T) {
chunkMinTime := int64(10)
nextAt := int64(95)
maxNextAt := int64(100)
variance := 0.2
// Compute the expected max variance.
expectedMaxVariance := int64(float64(nextAt-chunkMinTime) * variance)
for seriesHash := uint64(0); seriesHash < 1000; seriesHash++ {
actual := addJitterToChunkEndTime(seriesHash, chunkMinTime, nextAt, maxNextAt, variance)
require.GreaterOrEqual(t, actual, nextAt-(expectedMaxVariance/2))
require.LessOrEqual(t, actual, maxNextAt)
}
}
func TestAddJitterToChunkEndTime_Distribution(t *testing.T) {
chunkMinTime := int64(0)
nextAt := int64(50)
maxNextAt := int64(100)
variance := 0.2
numSeries := uint64(1000)
// Compute the expected max variance.
expectedMaxVariance := int64(float64(nextAt-chunkMinTime) * variance)
// Keep track of the distribution of the applied variance.
varianceDistribution := map[int64]int64{}
for seriesHash := uint64(0); seriesHash < numSeries; seriesHash++ {
actual := addJitterToChunkEndTime(seriesHash, chunkMinTime, nextAt, maxNextAt, variance)
require.GreaterOrEqual(t, actual, nextAt-(expectedMaxVariance/2))
require.LessOrEqual(t, actual, nextAt+(expectedMaxVariance/2))
require.LessOrEqual(t, actual, maxNextAt)
variance := nextAt - actual
varianceDistribution[variance]++
}
// Ensure a uniform distribution.
for variance, count := range varianceDistribution {
require.Equalf(t, int64(numSeries)/expectedMaxVariance, count, "variance = %d", variance)
}
}
func TestAddJitterToChunkEndTime_ShouldNotApplyJitterToTheLastChunkOfTheRange(t *testing.T) {
// Since the jitter could also be 0, we try it for multiple series.
for seriesHash := uint64(0); seriesHash < 10; seriesHash++ {
require.Equal(t, int64(200), addJitterToChunkEndTime(seriesHash, 150, 200, 200, 0.2))
}
}
func TestAddJitterToChunkEndTime_ShouldNotApplyJitterIfDisabled(t *testing.T) {
// Since the jitter could also be 0, we try it for multiple series.
for seriesHash := uint64(0); seriesHash < 10; seriesHash++ {
require.Equal(t, int64(130), addJitterToChunkEndTime(seriesHash, 100, 130, 200, 0))
}
}

8
tsdb/head_test.go

@ -228,7 +228,7 @@ func BenchmarkLoadWAL(b *testing.B) {
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
// Create one mmapped chunk per series, with one sample at the given time.
lbls := labels.Labels{}
s := newMemSeries(lbls, uint64(k)*101, lbls.Hash(), c.mmappedChunkT, nil)
s := newMemSeries(lbls, uint64(k)*101, lbls.Hash(), c.mmappedChunkT, 0, nil)
s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper)
s.mmapCurrentHeadChunk(chunkDiskMapper)
}
@ -553,7 +553,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
}
lbls := labels.FromStrings("a", "b")
s := newMemSeries(lbls, 1, lbls.Hash(), 2000, &memChunkPool)
s := newMemSeries(lbls, 1, lbls.Hash(), 2000, 0, &memChunkPool)
for i := 0; i < 4000; i += 5 {
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
@ -1092,7 +1092,7 @@ func TestMemSeries_append(t *testing.T) {
}()
lbls := labels.Labels{}
s := newMemSeries(lbls, 1, lbls.Hash(), 500, nil)
s := newMemSeries(lbls, 1, lbls.Hash(), 500, 0, nil)
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
@ -2323,7 +2323,7 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) {
}()
lbls := labels.Labels{}
s := newMemSeries(lbls, 1, lbls.Hash(), 500, nil)
s := newMemSeries(lbls, 1, lbls.Hash(), 500, 0, nil)
for i := 0; i < 7; i++ {
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)

Loading…
Cancel
Save