Browse Source

Rename OutOfOrderAllowance to OutOfOrderTimeWindow

After review Allowance is perhaps a bit misleading so we've decided to
replace it with a more common term like TimeWindow.
owilliams/utf8-02-mimir
Jesus Vazquez 2 years ago
parent
commit
e70e769889
  1. 6
      cmd/prometheus/main.go
  2. 14
      config/config.go
  3. 36
      tsdb/db.go
  4. 62
      tsdb/db_test.go
  5. 30
      tsdb/head.go
  6. 24
      tsdb/head_append.go
  7. 6
      tsdb/head_test.go
  8. 4
      tsdb/ooo_head_read_test.go

6
cmd/prometheus/main.go

@ -463,7 +463,7 @@ func main() {
cfg.tsdb.MaxExemplars = int64(cfgFile.StorageConfig.ExemplarsConfig.MaxExemplars)
}
if cfgFile.StorageConfig.TSDBConfig != nil {
cfg.tsdb.OutOfOrderAllowance = cfgFile.StorageConfig.TSDBConfig.OutOfOrderAllowance
cfg.tsdb.OutOfOrderTimeWindow = cfgFile.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
}
// Now that the validity of the config is established, set the config
@ -1535,7 +1535,7 @@ type tsdbOptions struct {
StripeSize int
MinBlockDuration model.Duration
MaxBlockDuration model.Duration
OutOfOrderAllowance int64
OutOfOrderTimeWindow int64
OutOfOrderCapMin int
OutOfOrderCapMax int
EnableExemplarStorage bool
@ -1560,7 +1560,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
EnableExemplarStorage: opts.EnableExemplarStorage,
MaxExemplars: opts.MaxExemplars,
EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown,
OutOfOrderAllowance: opts.OutOfOrderAllowance,
OutOfOrderTimeWindow: opts.OutOfOrderTimeWindow,
OutOfOrderCapMin: int64(opts.OutOfOrderCapMin),
OutOfOrderCapMax: int64(opts.OutOfOrderCapMax),
}

14
config/config.go

@ -507,15 +507,15 @@ type StorageConfig struct {
// TSDBConfig configures runtime reloadable configuration options.
type TSDBConfig struct {
// OutOfOrderAllowance sets how long back in time an out-of-order sample can be inserted
// OutOfOrderTimeWindow sets how long back in time an out-of-order sample can be inserted
// into the TSDB. This is the one finally used by the TSDB and should be in the same unit
// as other timestamps in the TSDB.
OutOfOrderAllowance int64
OutOfOrderTimeWindow int64
// OutOfOrderAllowanceFlag holds the parsed duration from the config file.
// During unmarshall, this is converted into milliseconds and stored in OutOfOrderAllowance.
// This should not be used directly and must be converted into OutOfOrderAllowance.
OutOfOrderAllowanceFlag model.Duration `yaml:"out_of_order_allowance,omitempty"`
// OutOfOrderTimeWindowFlag holds the parsed duration from the config file.
// During unmarshall, this is converted into milliseconds and stored in OutOfOrderTimeWindow.
// This should not be used directly and must be converted into OutOfOrderTimeWindow.
OutOfOrderTimeWindowFlag model.Duration `yaml:"out_of_order_time_window,omitempty"`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
@ -526,7 +526,7 @@ func (t *TSDBConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
return err
}
t.OutOfOrderAllowance = time.Duration(t.OutOfOrderAllowanceFlag).Milliseconds()
t.OutOfOrderTimeWindow = time.Duration(t.OutOfOrderTimeWindowFlag).Milliseconds()
return nil
}

36
tsdb/db.go

@ -182,10 +182,10 @@ type Options struct {
// If nil, the cache won't be used.
SeriesHashCache *hashcache.SeriesHashCache
// OutOfOrderAllowance specifies how much out of order is allowed, if any.
// OutOfOrderTimeWindow specifies how much out of order is allowed, if any.
// This can change during run-time, so this value from here should only be used
// while initialising.
OutOfOrderAllowance int64
OutOfOrderTimeWindow int64
// OutOfOrderCapMin minimum capacity for OOO chunks (in samples).
// If it is <=0, the default value is assumed.
@ -662,7 +662,7 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
if opts.MinBlockDuration > opts.MaxBlockDuration {
opts.MaxBlockDuration = opts.MinBlockDuration
}
if opts.OutOfOrderAllowance > 0 {
if opts.OutOfOrderTimeWindow > 0 {
opts.AllowOverlappingQueries = true
}
if opts.OutOfOrderCapMin <= 0 {
@ -671,8 +671,8 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
if opts.OutOfOrderCapMax <= 0 {
opts.OutOfOrderCapMax = DefaultOutOfOrderCapMax
}
if opts.OutOfOrderAllowance < 0 {
opts.OutOfOrderAllowance = 0
if opts.OutOfOrderTimeWindow < 0 {
opts.OutOfOrderTimeWindow = 0
}
if len(rngs) == 0 {
@ -792,14 +792,14 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
if err != nil {
return nil, err
}
if opts.OutOfOrderAllowance > 0 {
if opts.OutOfOrderTimeWindow > 0 {
wblog, err = wal.NewSize(l, r, wblDir, segmentSize, opts.WALCompression)
if err != nil {
return nil, err
}
}
}
db.oooWasEnabled.Store(opts.OutOfOrderAllowance > 0)
db.oooWasEnabled.Store(opts.OutOfOrderTimeWindow > 0)
headOpts := DefaultHeadOptions()
headOpts.ChunkRange = rngs[0]
headOpts.ChunkDirRoot = dir
@ -812,7 +812,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
headOpts.EnableExemplarStorage = opts.EnableExemplarStorage
headOpts.MaxExemplars.Store(opts.MaxExemplars)
headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown
headOpts.OutOfOrderAllowance.Store(opts.OutOfOrderAllowance)
headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow)
headOpts.OutOfOrderCapMin.Store(opts.OutOfOrderCapMin)
headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax)
headOpts.NewChunkDiskMapper = opts.NewChunkDiskMapper
@ -953,32 +953,32 @@ func (db *DB) Appender(ctx context.Context) storage.Appender {
}
// ApplyConfig applies a new config to the DB.
// Behaviour of 'OutOfOrderAllowance' is as follows:
// OOO enabled = oooAllowance > 0. OOO disabled = oooAllowance is 0.
// Behaviour of 'OutOfOrderTimeWindow' is as follows:
// OOO enabled = oooTimeWindow > 0. OOO disabled = oooTimeWindow is 0.
// 1) Before: OOO disabled, Now: OOO enabled =>
// * A new WBL is created for the head block.
// * OOO compaction is enabled.
// * Overlapping queries are enabled.
// 2) Before: OOO enabled, Now: OOO enabled =>
// * Only the allowance is updated.
// * Only the time window is updated.
// 3) Before: OOO enabled, Now: OOO disabled =>
// * Allowance set to 0. So no new OOO samples will be allowed.
// * Time Window set to 0. So no new OOO samples will be allowed.
// * OOO WBL will stay and follow the usual cleanup until a restart.
// * OOO Compaction and overlapping queries will remain enabled until a restart.
// 4) Before: OOO disabled, Now: OOO disabled => no-op.
func (db *DB) ApplyConfig(conf *config.Config) error {
oooAllowance := int64(0)
oooTimeWindow := int64(0)
if conf.StorageConfig.TSDBConfig != nil {
oooAllowance = conf.StorageConfig.TSDBConfig.OutOfOrderAllowance
oooTimeWindow = conf.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
}
if oooAllowance < 0 {
oooAllowance = 0
if oooTimeWindow < 0 {
oooTimeWindow = 0
}
// Create WBL if it was not present and if OOO is enabled with WAL enabled.
var wblog *wal.WAL
var err error
if !db.oooWasEnabled.Load() && oooAllowance > 0 && db.opts.WALSegmentSize >= 0 {
if !db.oooWasEnabled.Load() && oooTimeWindow > 0 && db.opts.WALSegmentSize >= 0 {
segmentSize := wal.DefaultSegmentSize
// Wal is set to a custom size.
if db.opts.WALSegmentSize > 0 {
@ -994,7 +994,7 @@ func (db *DB) ApplyConfig(conf *config.Config) error {
db.head.ApplyConfig(conf, wblog)
if !db.oooWasEnabled.Load() {
db.oooWasEnabled.Store(oooAllowance > 0)
db.oooWasEnabled.Store(oooTimeWindow > 0)
}
return nil
}

62
tsdb/db_test.go

@ -1410,7 +1410,7 @@ func TestTimeRetention(t *testing.T) {
func TestSizeRetention(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderAllowance = 100
opts.OutOfOrderTimeWindow = 100
db := openTestDB(t, opts, []int64{100})
defer func() {
require.NoError(t, db.Close())
@ -3469,7 +3469,7 @@ func TestOOOWALWrite(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderCapMin = 2
opts.OutOfOrderCapMax = 2
opts.OutOfOrderAllowance = 30 * time.Minute.Milliseconds()
opts.OutOfOrderTimeWindow = 30 * time.Minute.Milliseconds()
db, err := Open(dir, nil, nil, opts, nil)
require.NoError(t, err)
@ -3720,7 +3720,7 @@ func TestOOOCompaction(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderCapMin = 2
opts.OutOfOrderCapMax = 30
opts.OutOfOrderAllowance = 300 * time.Minute.Milliseconds()
opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds()
opts.AllowOverlappingQueries = true
opts.AllowOverlappingCompaction = true
@ -3903,7 +3903,7 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderCapMin = 2
opts.OutOfOrderCapMax = 30
opts.OutOfOrderAllowance = 300 * time.Minute.Milliseconds()
opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds()
opts.AllowOverlappingQueries = true
opts.AllowOverlappingCompaction = true
@ -4001,7 +4001,7 @@ func Test_Querier_OOOQuery(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderCapMin = 2
opts.OutOfOrderCapMax = 30
opts.OutOfOrderAllowance = 24 * time.Hour.Milliseconds()
opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds()
opts.AllowOverlappingQueries = true
opts.AllowOverlappingCompaction = false
@ -4088,7 +4088,7 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderCapMin = 2
opts.OutOfOrderCapMax = 30
opts.OutOfOrderAllowance = 24 * time.Hour.Milliseconds()
opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds()
opts.AllowOverlappingQueries = true
opts.AllowOverlappingCompaction = false
@ -4183,7 +4183,7 @@ func TestOOOAppendAndQuery(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderCapMin = 2
opts.OutOfOrderCapMax = 30
opts.OutOfOrderAllowance = 4 * time.Hour.Milliseconds()
opts.OutOfOrderTimeWindow = 4 * time.Hour.Milliseconds()
opts.AllowOverlappingQueries = true
db := openTestDB(t, opts, nil)
@ -4247,17 +4247,17 @@ func TestOOOAppendAndQuery(t *testing.T) {
addSample(s2, 255, 265, false)
testQuery()
// Out of allowance.
// Out of time window.
addSample(s1, 59, 59, true)
addSample(s2, 49, 49, true)
testQuery()
// At the edge of allowance, also it would be "out of bound" without the ooo support.
// At the edge of time window, also it would be "out of bound" without the ooo support.
addSample(s1, 60, 65, false)
addSample(s2, 50, 55, false)
testQuery()
// Out of allowance again.
// Out of time window again.
addSample(s1, 59, 59, true)
addSample(s2, 49, 49, true)
testQuery()
@ -4273,7 +4273,7 @@ func TestOOOAppendAndQuery(t *testing.T) {
func TestOOODisabled(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderAllowance = 0
opts.OutOfOrderTimeWindow = 0
db := openTestDB(t, opts, nil)
db.DisableCompactions()
t.Cleanup(func() {
@ -4310,9 +4310,9 @@ func TestOOODisabled(t *testing.T) {
addSample(s1, 300, 300, false) // In-order samples.
addSample(s1, 250, 260, true) // Some ooo samples.
addSample(s1, 59, 59, true) // Out of allowance.
addSample(s1, 60, 65, true) // At the edge of allowance, also it would be "out of bound" without the ooo support.
addSample(s1, 59, 59, true) // Out of allowance again.
addSample(s1, 59, 59, true) // Out of time window.
addSample(s1, 60, 65, true) // At the edge of time window, also it would be "out of bound" without the ooo support.
addSample(s1, 59, 59, true) // Out of time window again.
addSample(s1, 301, 310, false) // More in-order samples.
querier, err := db.Querier(context.TODO(), math.MinInt64, math.MaxInt64)
@ -4341,7 +4341,7 @@ func TestWBLAndMmapReplay(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderCapMin = 2
opts.OutOfOrderCapMax = 30
opts.OutOfOrderAllowance = 4 * time.Hour.Milliseconds()
opts.OutOfOrderTimeWindow = 4 * time.Hour.Milliseconds()
opts.AllowOverlappingQueries = true
db := openTestDB(t, opts, nil)
@ -4531,7 +4531,7 @@ func TestOOOCompactionFailure(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderCapMin = 2
opts.OutOfOrderCapMax = 30
opts.OutOfOrderAllowance = 300 * time.Minute.Milliseconds()
opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds()
opts.AllowOverlappingQueries = true
opts.AllowOverlappingCompaction = true
@ -4673,7 +4673,7 @@ func TestWBLCorruption(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderCapMin = 2
opts.OutOfOrderCapMax = 30
opts.OutOfOrderAllowance = 300 * time.Minute.Milliseconds()
opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds()
opts.AllowOverlappingQueries = true
opts.AllowOverlappingCompaction = true
@ -4820,7 +4820,7 @@ func TestOOOMmapCorruption(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderCapMin = 2
opts.OutOfOrderCapMax = 10
opts.OutOfOrderAllowance = 300 * time.Minute.Milliseconds()
opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds()
opts.AllowOverlappingQueries = true
opts.AllowOverlappingCompaction = true
@ -4941,11 +4941,11 @@ func TestOOOMmapCorruption(t *testing.T) {
}
func TestOutOfOrderRuntimeConfig(t *testing.T) {
getDB := func(oooAllowance int64) *DB {
getDB := func(oooTimeWindow int64) *DB {
dir := t.TempDir()
opts := DefaultOptions()
opts.OutOfOrderAllowance = oooAllowance
opts.OutOfOrderTimeWindow = oooTimeWindow
db, err := Open(dir, nil, nil, opts, nil)
require.NoError(t, err)
@ -4957,11 +4957,11 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
return db
}
makeConfig := func(oooAllowanceMins int) *config.Config {
makeConfig := func(oooTimeWindow int) *config.Config {
return &config.Config{
StorageConfig: config.StorageConfig{
TSDBConfig: &config.TSDBConfig{
OutOfOrderAllowance: int64(oooAllowanceMins) * time.Minute.Milliseconds(),
OutOfOrderTimeWindow: int64(oooTimeWindow) * time.Minute.Milliseconds(),
},
},
}
@ -5016,7 +5016,7 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
require.Equal(t, int64(0), size)
}
t.Run("increase allowance", func(t *testing.T) {
t.Run("increase time window", func(t *testing.T) {
var allSamples []tsdbutil.Sample
db := getDB(30 * time.Minute.Milliseconds())
@ -5033,7 +5033,7 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
oldWblPtr := fmt.Sprintf("%p", db.head.wbl)
// Increase allowance and try adding again.
// Increase time window and try adding again.
err := db.ApplyConfig(makeConfig(60))
require.NoError(t, err)
allSamples = addSamples(t, db, 251, 260, true, allSamples)
@ -5046,7 +5046,7 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
verifySamples(t, db, allSamples)
})
t.Run("decrease allowance and increase again", func(t *testing.T) {
t.Run("decrease time window and increase again", func(t *testing.T) {
var allSamples []tsdbutil.Sample
db := getDB(60 * time.Minute.Milliseconds())
@ -5057,7 +5057,7 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
allSamples = addSamples(t, db, 251, 260, true, allSamples)
oldWblPtr := fmt.Sprintf("%p", db.head.wbl)
// Decrease allowance.
// Decrease time window.
err := db.ApplyConfig(makeConfig(30))
require.NoError(t, err)
@ -5071,7 +5071,7 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
verifySamples(t, db, allSamples)
// Increase allowance again and check
// Increase time window again and check
err = db.ApplyConfig(makeConfig(60))
require.NoError(t, err)
allSamples = addSamples(t, db, 261, 270, true, allSamples)
@ -5099,7 +5099,7 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
require.Nil(t, db.head.wbl)
// Increase allowance and try adding again.
// Increase time window and try adding again.
err := db.ApplyConfig(makeConfig(60))
require.NoError(t, err)
allSamples = addSamples(t, db, 251, 260, true, allSamples)
@ -5125,11 +5125,11 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
allSamples = addSamples(t, db, 251, 260, true, allSamples)
oldWblPtr := fmt.Sprintf("%p", db.head.wbl)
// Allowance to 0, hence disabled.
// Time Window to 0, hence disabled.
err := db.ApplyConfig(makeConfig(0))
require.NoError(t, err)
// OOO within old allowance fails.
// OOO within old time window fails.
s := addSamples(t, db, 290, 309, false, nil)
require.Len(t, s, 0)
@ -5157,7 +5157,7 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
verifySamples(t, db, allSamples)
require.Nil(t, db.head.wbl)
// Allowance to 0.
// Time window to 0.
err := db.ApplyConfig(makeConfig(0))
require.NoError(t, err)

30
tsdb/head.go

@ -150,7 +150,7 @@ type HeadOptions struct {
ChunkWriteBufferSize int
ChunkEndTimeVariance float64
ChunkWriteQueueSize int
OutOfOrderAllowance atomic.Int64
OutOfOrderTimeWindow atomic.Int64
OutOfOrderCapMin atomic.Int64
OutOfOrderCapMax atomic.Int64
@ -214,11 +214,11 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wal.WAL, opts *Hea
l = log.NewNopLogger()
}
if opts.OutOfOrderAllowance.Load() < 0 {
opts.OutOfOrderAllowance.Store(0)
if opts.OutOfOrderTimeWindow.Load() < 0 {
opts.OutOfOrderTimeWindow.Store(0)
}
// Allowance can be set on runtime. So the capMin and capMax should be valid
// Time window can be set on runtime. So the capMin and capMax should be valid
// even if ooo is not enabled yet.
capMin, capMax := opts.OutOfOrderCapMin.Load(), opts.OutOfOrderCapMax.Load()
if capMin > 255 {
@ -429,7 +429,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
}),
tooOldSamples: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_too_old_samples_total",
Help: "Total number of out of order samples ingestion failed attempts with out of support enabled, but sample outside of allowance.",
Help: "Total number of out of order samples ingestion failed attempts with out of support enabled, but sample outside of time window.",
}),
headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_truncations_failed_total",
@ -465,7 +465,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
}),
oooHistogram: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "prometheus_tsdb_sample_ooo_delta",
Help: "Delta in seconds by which a sample is considered out of order (reported regardless of OOO allowance and whether sample is accepted or not).",
Help: "Delta in seconds by which a sample is considered out of order (reported regardless of OOO time window and whether sample is accepted or not).",
Buckets: []float64{
// Note that mimir distributor only gives us a range of wallclock-12h to wallclock+15min
60 * 10, // 10 min
@ -882,15 +882,15 @@ func (h *Head) removeCorruptedMmappedChunks(err error) (map[chunks.HeadSeriesRef
}
func (h *Head) ApplyConfig(cfg *config.Config, wbl *wal.WAL) {
oooAllowance := int64(0)
oooTimeWindow := int64(0)
if cfg.StorageConfig.TSDBConfig != nil {
oooAllowance = cfg.StorageConfig.TSDBConfig.OutOfOrderAllowance
oooTimeWindow = cfg.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
}
if oooAllowance < 0 {
oooAllowance = 0
if oooTimeWindow < 0 {
oooTimeWindow = 0
}
h.SetOutOfOrderAllowance(oooAllowance, wbl)
h.SetOutOfOrderTimeWindow(oooTimeWindow, wbl)
if !h.opts.EnableExemplarStorage {
return
@ -911,14 +911,14 @@ func (h *Head) ApplyConfig(cfg *config.Config, wbl *wal.WAL) {
level.Info(h.logger).Log("msg", "Exemplar storage resized", "from", prevSize, "to", newSize, "migrated", migrated)
}
// SetOutOfOrderAllowance updates the out of order related parameters.
// SetOutOfOrderTimeWindow updates the out of order related parameters.
// If the Head already has a WBL set, then the wbl will be ignored.
func (h *Head) SetOutOfOrderAllowance(oooAllowance int64, wbl *wal.WAL) {
if oooAllowance > 0 && h.wbl == nil {
func (h *Head) SetOutOfOrderTimeWindow(oooTimeWindow int64, wbl *wal.WAL) {
if oooTimeWindow > 0 && h.wbl == nil {
h.wbl = wbl
}
h.opts.OutOfOrderAllowance.Store(oooAllowance)
h.opts.OutOfOrderTimeWindow.Store(oooTimeWindow)
}
// PostingsCardinalityStats returns top 10 highest cardinality stats By label and value names.

24
tsdb/head_append.go

@ -253,8 +253,8 @@ type headAppender struct {
func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
// For OOO inserts, this restriction is irrelevant and will be checked later once we confirm the sample is an in-order append.
// If OOO inserts are disabled, we may as well as check this as early as we can and avoid more work.
oooAllowance := a.head.opts.OutOfOrderAllowance.Load()
if oooAllowance == 0 && t < a.minValidTime {
oooTimeWindow := a.head.opts.OutOfOrderTimeWindow.Load()
if oooTimeWindow == 0 && t < a.minValidTime {
a.head.metrics.outOfBoundSamples.Inc()
return 0, storage.ErrOutOfBounds
}
@ -288,7 +288,7 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
s.Lock()
// TODO: if we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, oooAllowance)
_, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, oooTimeWindow)
if err == nil {
s.pendingCommit = true
}
@ -325,7 +325,7 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
// appendable checks whether the given sample is valid for appending to the series. (if we return false and no error)
// The sample belongs to the out of order chunk if we return true and no error.
// An error signifies the sample cannot be handled.
func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooAllowance int64) (isOutOfOrder bool, delta int64, err error) {
func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTimeWindow int64) (isOutOfOrder bool, delta int64, err error) {
msMaxt := s.maxTime()
if msMaxt == math.MinInt64 {
// The series has no sample and was freshly created.
@ -334,7 +334,7 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooAl
return false, 0, nil
}
// We cannot append it in the in-order head. So we check the oooAllowance
// We cannot append it in the in-order head. So we check the oooTimeWindow
// w.r.t. the head's maxt.
// -1 because for the first sample in the Head, headMaxt will be equal to t.
msMaxt = headMaxt - 1
@ -344,8 +344,8 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooAl
return false, 0, nil
}
if t < msMaxt-oooAllowance {
if oooAllowance > 0 {
if t < msMaxt-oooTimeWindow {
if oooTimeWindow > 0 {
return true, msMaxt - t, storage.ErrTooOldSample
}
if t < minValidTime {
@ -355,7 +355,7 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooAl
}
if t != msMaxt || s.head() == nil {
// Sample is ooo and within allowance OR series has no active chunk to check for duplicate sample.
// Sample is ooo and within time window OR series has no active chunk to check for duplicate sample.
return true, msMaxt - t, nil
}
@ -503,9 +503,9 @@ func (a *headAppender) Commit() (err error) {
var (
samplesAppended = len(a.samples)
oooAccepted int // number of samples out of order but accepted: with ooo enabled and within allowance
oooAccepted int // number of samples out of order but accepted: with ooo enabled and within time window
oooRejected int // number of samples rejected due to: out of order but OOO support disabled.
tooOldRejected int // number of samples rejected due to: that are out of order but too old (OOO support enabled, but outside allowance)
tooOldRejected int // number of samples rejected due to: that are out of order but too old (OOO support enabled, but outside time window)
oobRejected int // number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled)
inOrderMint int64 = math.MaxInt64
inOrderMaxt int64 = math.MinInt64
@ -554,12 +554,12 @@ func (a *headAppender) Commit() (err error) {
wblSamples = nil
oooMmapMarkers = nil
}
oooAllowance := a.head.opts.OutOfOrderAllowance.Load()
oooTimeWindow := a.head.opts.OutOfOrderTimeWindow.Load()
for i, s := range a.samples {
series = a.sampleSeries[i]
series.Lock()
oooSample, delta, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, oooAllowance)
oooSample, delta, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, oooTimeWindow)
switch err {
case storage.ErrOutOfOrderSample:
samplesAppended--

6
tsdb/head_test.go

@ -3249,7 +3249,7 @@ func TestOOOWalReplay(t *testing.T) {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = dir
opts.OutOfOrderAllowance.Store(30 * time.Minute.Milliseconds())
opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds())
h, err := NewHead(nil, nil, wlog, oooWlog, opts, nil)
require.NoError(t, err)
@ -3334,7 +3334,7 @@ func TestOOOMmapReplay(t *testing.T) {
opts.ChunkRange = 1000
opts.ChunkDirRoot = dir
opts.OutOfOrderCapMax.Store(30)
opts.OutOfOrderAllowance.Store(1000 * time.Minute.Milliseconds())
opts.OutOfOrderTimeWindow.Store(1000 * time.Minute.Milliseconds())
h, err := NewHead(nil, nil, wlog, oooWlog, opts, nil)
require.NoError(t, err)
@ -3626,7 +3626,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
opts := DefaultHeadOptions()
opts.ChunkDirRoot = dir
opts.OutOfOrderCapMax.Store(30)
opts.OutOfOrderAllowance.Store(120 * time.Minute.Milliseconds())
opts.OutOfOrderTimeWindow.Store(120 * time.Minute.Milliseconds())
h, err := NewHead(nil, nil, wlog, oooWlog, opts, nil)
require.NoError(t, err)

4
tsdb/ooo_head_read_test.go

@ -364,7 +364,7 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderCapMin = 1
opts.OutOfOrderCapMax = 5
opts.OutOfOrderAllowance = 120 * time.Minute.Milliseconds()
opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
s1 := labels.FromStrings("l", "v1")
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }
@ -772,7 +772,7 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
opts := DefaultOptions()
opts.OutOfOrderCapMin = 1
opts.OutOfOrderCapMax = 5
opts.OutOfOrderAllowance = 120 * time.Minute.Milliseconds()
opts.OutOfOrderTimeWindow = 120 * time.Minute.Milliseconds()
s1 := labels.FromStrings("l", "v1")
minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() }

Loading…
Cancel
Save