Properly initialize head time

This fixes various issues when initializing the head time range
under different starting conditions.

Signed-off-by: Fabian Reinartz <freinartz@google.com>
pull/5805/head
Fabian Reinartz 2018-05-25 17:19:32 -04:00
parent 3e76f0163e
commit 45071c657c
2 changed files with 155 additions and 34 deletions

View File

@ -19,6 +19,7 @@ import (
"math" "math"
"math/rand" "math/rand"
"os" "os"
"path"
"path/filepath" "path/filepath"
"sort" "sort"
"testing" "testing"
@ -30,6 +31,7 @@ import (
"github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/wal"
) )
func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) { func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
@ -1193,3 +1195,109 @@ func TestQuerierWithBoundaryChunks(t *testing.T) {
count := len(q.(*querier).blocks) count := len(q.(*querier).blocks)
testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count) testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count)
} }
func TestInitializeHeadTimestamp(t *testing.T) {
t.Run("clean", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err)
// Should be set to init values if no WAL or blocks exist so far.
testutil.Equals(t, int64(math.MaxInt64), db.head.MinTime())
testutil.Equals(t, int64(math.MinInt64), db.head.MaxTime())
// First added sample initializes the writable range.
app := db.Appender()
_, err = app.Add(labels.FromStrings("a", "b"), 1000, 1)
testutil.Ok(t, err)
testutil.Equals(t, int64(1000), db.head.MinTime())
testutil.Equals(t, int64(1000), db.head.MaxTime())
})
t.Run("wal-only", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
testutil.Ok(t, err)
var enc RecordEncoder
err = w.Log(
enc.Series([]RefSeries{
{Ref: 123, Labels: labels.FromStrings("a", "1")},
{Ref: 124, Labels: labels.FromStrings("a", "2")},
}, nil),
enc.Samples([]RefSample{
{Ref: 123, T: 5000, V: 1},
{Ref: 124, T: 15000, V: 1},
}, nil),
)
testutil.Ok(t, err)
testutil.Ok(t, w.Close())
db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err)
testutil.Equals(t, int64(5000), db.head.MinTime())
testutil.Equals(t, int64(15000), db.head.MaxTime())
})
t.Run("existing-block", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
id := ulid.MustNew(2000, nil)
createEmptyBlock(t, path.Join(dir, id.String()), &BlockMeta{
ULID: id,
MinTime: 1000,
MaxTime: 2000,
})
db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err)
testutil.Equals(t, int64(2000), db.head.MinTime())
testutil.Equals(t, int64(2000), db.head.MaxTime())
})
t.Run("existing-block-and-wal", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
testutil.Ok(t, err)
defer os.RemoveAll(dir)
id := ulid.MustNew(2000, nil)
createEmptyBlock(t, path.Join(dir, id.String()), &BlockMeta{
ULID: id,
MinTime: 1000,
MaxTime: 6000,
})
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
testutil.Ok(t, err)
var enc RecordEncoder
err = w.Log(
enc.Series([]RefSeries{
{Ref: 123, Labels: labels.FromStrings("a", "1")},
{Ref: 124, Labels: labels.FromStrings("a", "2")},
}, nil),
enc.Samples([]RefSample{
{Ref: 123, T: 5000, V: 1},
{Ref: 124, T: 15000, V: 1},
}, nil),
)
testutil.Ok(t, err)
testutil.Ok(t, w.Close())
db, err := Open(dir, nil, nil, nil)
testutil.Ok(t, err)
testutil.Equals(t, int64(6000), db.head.MinTime())
testutil.Equals(t, int64(15000), db.head.MaxTime())
})
}

81
head.go
View File

@ -183,7 +183,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
wal: wal, wal: wal,
logger: l, logger: l,
chunkRange: chunkRange, chunkRange: chunkRange,
minTime: math.MinInt64, minTime: math.MaxInt64,
maxTime: math.MinInt64, maxTime: math.MinInt64,
series: newStripeSeries(), series: newStripeSeries(),
values: map[string]stringset{}, values: map[string]stringset{},
@ -200,17 +200,17 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
// them on to other workers. // them on to other workers.
// Samples before the mint timestamp are discarded. // Samples before the mint timestamp are discarded.
func (h *Head) processWALSamples( func (h *Head) processWALSamples(
mint int64, minValidTime int64,
partition, total uint64, partition, total uint64,
input <-chan []RefSample, output chan<- []RefSample, input <-chan []RefSample, output chan<- []RefSample,
) (unknownRefs uint64) { ) (unknownRefs uint64) {
defer close(output) defer close(output)
maxt := h.MaxTime() mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
for samples := range input { for samples := range input {
for _, s := range samples { for _, s := range samples {
if s.T < mint || s.Ref%total != partition { if s.T < minValidTime || s.Ref%total != partition {
continue continue
} }
ms := h.series.getByID(s.Ref) ms := h.series.getByID(s.Ref)
@ -226,10 +226,27 @@ func (h *Head) processWALSamples(
if s.T > maxt { if s.T > maxt {
maxt = s.T maxt = s.T
} }
if s.T < mint {
mint = s.T
}
} }
output <- samples output <- samples
} }
h.updateMinMaxTime(mint, maxt)
return unknownRefs
}
func (h *Head) updateMinMaxTime(mint, maxt int64) {
for {
lt := h.MinTime()
if mint >= lt {
break
}
if atomic.CompareAndSwapInt64(&h.minTime, lt, mint) {
break
}
}
for { for {
ht := h.MaxTime() ht := h.MaxTime()
if maxt <= ht { if maxt <= ht {
@ -239,12 +256,15 @@ func (h *Head) processWALSamples(
break break
} }
} }
return unknownRefs
} }
func (h *Head) loadWAL(r *wal.Reader) error { func (h *Head) loadWAL(r *wal.Reader) error {
mint := h.MinTime() minValidTime := h.MinTime()
// If the min time is still uninitialized (no persisted blocks yet),
// we accept all sample timestamps from the WAL.
if minValidTime == math.MaxInt64 {
minValidTime = math.MinInt64
}
// Track number of samples that referenced a series we don't know about // Track number of samples that referenced a series we don't know about
// for error reporting. // for error reporting.
@ -265,7 +285,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
output := make(chan []RefSample, 300) output := make(chan []RefSample, 300)
go func(i int, input <-chan []RefSample, output chan<- []RefSample) { go func(i int, input <-chan []RefSample, output chan<- []RefSample) {
unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output) unknown := h.processWALSamples(minValidTime, uint64(i), uint64(n), input, output)
atomic.AddUint64(&unknownRefs, unknown) atomic.AddUint64(&unknownRefs, unknown)
wg.Done() wg.Done()
}(i, input, output) }(i, input, output)
@ -327,7 +347,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
} }
for _, s := range tstones { for _, s := range tstones {
for _, itv := range s.intervals { for _, itv := range s.intervals {
if itv.Maxt < mint { if itv.Maxt < minValidTime {
continue continue
} }
h.tombstones.addInterval(s.ref, itv) h.tombstones.addInterval(s.ref, itv)
@ -400,9 +420,9 @@ func (h *Head) Init() error {
// Truncate removes old data before mint from the head. // Truncate removes old data before mint from the head.
func (h *Head) Truncate(mint int64) error { func (h *Head) Truncate(mint int64) error {
initialize := h.MinTime() == math.MinInt64 initialize := h.MinTime() == math.MaxInt64
if h.MinTime() >= mint { if h.MinTime() >= mint && !initialize {
return nil return nil
} }
atomic.StoreInt64(&h.minTime, mint) atomic.StoreInt64(&h.minTime, mint)
@ -462,10 +482,7 @@ func (h *Head) Truncate(mint int64) error {
// for a compltely fresh head with an empty WAL. // for a compltely fresh head with an empty WAL.
// Returns true if the initialization took an effect. // Returns true if the initialization took an effect.
func (h *Head) initTime(t int64) (initialized bool) { func (h *Head) initTime(t int64) (initialized bool) {
// In the init state, the head has a high timestamp of math.MinInt64. if !atomic.CompareAndSwapInt64(&h.minTime, math.MaxInt64, t) {
mint, _ := rangeForTimestamp(t, h.chunkRange)
if !atomic.CompareAndSwapInt64(&h.minTime, math.MinInt64, mint) {
return false return false
} }
// Ensure that max time is initialized to at least the min time we just set. // Ensure that max time is initialized to at least the min time we just set.
@ -536,7 +553,7 @@ func (h *Head) Appender() Appender {
// The head cache might not have a starting point yet. The init appender // The head cache might not have a starting point yet. The init appender
// picks up the first appended timestamp as the base. // picks up the first appended timestamp as the base.
if h.MinTime() == math.MinInt64 { if h.MinTime() == math.MaxInt64 {
return &initAppender{head: h} return &initAppender{head: h}
} }
return h.appender() return h.appender()
@ -544,10 +561,11 @@ func (h *Head) Appender() Appender {
func (h *Head) appender() *headAppender { func (h *Head) appender() *headAppender {
return &headAppender{ return &headAppender{
head: h, head: h,
mint: h.MaxTime() - h.chunkRange/2, minValidTime: h.MaxTime() - h.chunkRange/2,
maxt: math.MinInt64, mint: math.MaxInt64,
samples: h.getAppendBuffer(), maxt: math.MinInt64,
samples: h.getAppendBuffer(),
} }
} }
@ -576,15 +594,16 @@ func (h *Head) putBytesBuffer(b []byte) {
} }
type headAppender struct { type headAppender struct {
head *Head head *Head
mint, maxt int64 minValidTime int64 // No samples below this timestamp are allowed.
mint, maxt int64
series []RefSeries series []RefSeries
samples []RefSample samples []RefSample
} }
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
if t < a.mint { if t < a.minValidTime {
return 0, ErrOutOfBounds return 0, ErrOutOfBounds
} }
@ -611,9 +630,12 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
if err != nil { if err != nil {
return err return err
} }
if t < a.mint { if t < a.minValidTime {
return ErrOutOfBounds return ErrOutOfBounds
} }
if t < a.mint {
a.mint = t
}
if t > a.maxt { if t > a.maxt {
a.maxt = t a.maxt = t
} }
@ -682,16 +704,7 @@ func (a *headAppender) Commit() error {
} }
a.head.metrics.samplesAppended.Add(float64(total)) a.head.metrics.samplesAppended.Add(float64(total))
a.head.updateMinMaxTime(a.mint, a.maxt)
for {
ht := a.head.MaxTime()
if a.maxt <= ht {
break
}
if atomic.CompareAndSwapInt64(&a.head.maxTime, ht, a.maxt) {
break
}
}
return nil return nil
} }