diff --git a/db.go b/db.go
index dbe5e992c..1c6c53c40 100644
--- a/db.go
+++ b/db.go
@@ -123,12 +123,23 @@ type DB struct {
 }
 
 type dbMetrics struct {
-	activeAppenders prometheus.Gauge
-	loadedBlocks    prometheus.GaugeFunc
-	reloads         prometheus.Counter
-	reloadsFailed   prometheus.Counter
-	reloadDuration  prometheus.Summary
-	samplesAppended prometheus.Counter
+	activeAppenders     prometheus.Gauge
+	loadedBlocks        prometheus.GaugeFunc
+	reloads             prometheus.Counter
+	reloadsFailed       prometheus.Counter
+	walTruncateDuration prometheus.Summary
+	samplesAppended     prometheus.Counter
+
+	headSeries           prometheus.Gauge
+	headSeriesCreated    prometheus.Counter
+	headSeriesRemoved    prometheus.Counter
+	headChunks           prometheus.Gauge
+	headChunksCreated    prometheus.Gauge
+	headChunksRemoved    prometheus.Gauge
+	headGCDuration       prometheus.Summary
+	headMinTime          prometheus.GaugeFunc
+	headMaxTime          prometheus.GaugeFunc
+	compactionsTriggered prometheus.Counter
 }
 
@@ -155,10 +166,53 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 		Name: "tsdb_reloads_failures_total",
 		Help: "Number of times the database failed to reload black data from disk.",
 	})
-	m.reloadDuration = prometheus.NewSummary(prometheus.SummaryOpts{
-		Name: "tsdb_reload_duration_seconds",
-		Help: "Duration of block reloads.",
+
+	m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
+		Name: "tsdb_wal_truncate_duration_seconds",
+		Help: "Duration of WAL truncation.",
+	})
+
+	m.headSeries = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "tsdb_head_series",
+		Help: "Total number of series in the head block.",
+	})
+	m.headSeriesCreated = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "tsdb_head_series_created_total",
+		Help: "Total number of series created in the head",
+	})
+	m.headSeriesRemoved = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "tsdb_head_series_removed_total",
+		Help: "Total number of series removed in the head",
+	})
+	m.headChunks = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "tsdb_head_chunks",
+		Help: "Total number of chunks in the head block.",
+	})
+	m.headChunksCreated = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "tsdb_head_chunks_created_total",
+		Help: "Total number of chunks created in the head",
+	})
+	m.headChunksRemoved = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "tsdb_head_chunks_removed_total",
+		Help: "Total number of chunks removed in the head",
+	})
+	m.headGCDuration = prometheus.NewSummary(prometheus.SummaryOpts{
+		Name: "tsdb_head_gc_duration_seconds",
+		Help: "Runtime of garbage collection in the head block.",
+	})
+	m.headMaxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Name: "tsdb_head_max_time",
+		Help: "Maximum timestamp of the head block.",
+	}, func() float64 {
+		return float64(db.head.MaxTime())
+	})
+	m.headMinTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Name: "tsdb_head_min_time",
+		Help: "Minimum time bound of the head block.",
+	}, func() float64 {
+		return float64(db.head.MinTime())
 	})
+
 	m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "tsdb_samples_appended_total",
 		Help: "Total number of appended sampledb.",
@@ -174,7 +228,18 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 			m.loadedBlocks,
 			m.reloads,
 			m.reloadsFailed,
-			m.reloadDuration,
+			m.walTruncateDuration,
+
+			m.headChunks,
+			m.headChunksCreated,
+			m.headChunksRemoved,
+			m.headSeries,
+			m.headSeriesCreated,
+			m.headSeriesRemoved,
+			m.headMinTime,
+			m.headMaxTime,
+			m.headGCDuration,
+
 			m.samplesAppended,
 			m.compactionsTriggered,
 		)
@@ -247,10 +312,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 
 	db.compactor = NewLeveledCompactor(r, l, copts)
 
-	db.head, err = NewHead(l, db.wal.Reader(), copts.blockRanges[0])
+	db.head, err = NewHead(l, copts.blockRanges[0])
 	if err != nil {
 		return nil, err
 	}
+	if err := db.readWAL(db.wal.Reader()); err != nil {
+		return nil, err
+	}
 	if err := db.reloadBlocks(); err != nil {
 		return nil, err
 	}
@@ -444,14 +512,56 @@ func (db *DB) getBlock(id ulid.ULID) (DiskBlock, bool) {
 	return nil, false
 }
 
+func (db *DB) readWAL(r WALReader) error {
+
+	seriesFunc := func(series []labels.Labels) error {
+		for _, lset := range series {
+			db.head.create(lset.Hash(), lset)
+			db.metrics.headSeries.Inc()
+			db.metrics.headSeriesCreated.Inc()
+		}
+		return nil
+	}
+	samplesFunc := func(samples []RefSample) error {
+		for _, s := range samples {
+			ms, ok := db.head.series[uint32(s.Ref)]
+			if !ok {
+				return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref)
+			}
+			_, chunkCreated := ms.append(s.T, s.V)
+			if chunkCreated {
+				db.metrics.headChunksCreated.Inc()
+				db.metrics.headChunks.Inc()
+			}
+		}
+
+		return nil
+	}
+	deletesFunc := func(stones []Stone) error {
+		for _, s := range stones {
+			for _, itv := range s.intervals {
+				db.head.tombstones.add(s.ref, itv)
+			}
+		}
+
+		return nil
+	}
+
+	if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
+		return errors.Wrap(err, "consume WAL")
+	}
+
+	return nil
+
+}
+
 func (db *DB) reloadBlocks() (err error) {
-	defer func(t time.Time) {
+	defer func() {
 		if err != nil {
 			db.metrics.reloadsFailed.Inc()
 		}
 		db.metrics.reloads.Inc()
-		db.metrics.reloadDuration.Observe(time.Since(t).Seconds())
-	}(time.Now())
+	}()
 
 	var cs []io.Closer
 	defer func() { closeAll(cs...) }()
@@ -511,12 +621,21 @@ func (db *DB) reloadBlocks() (err error) {
 	}
 	start := time.Now()
 	atomic.StoreInt64(&db.head.minTime, maxt)
-	db.head.gc()
+
+	series, chunks := db.head.gc()
+	db.metrics.headSeriesRemoved.Add(float64(series))
+	db.metrics.headSeries.Sub(float64(series))
+	db.metrics.headChunksRemoved.Add(float64(chunks))
+	db.metrics.headChunks.Sub(float64(chunks))
+
 	db.logger.Log("msg", "head GC completed", "duration", time.Since(start))
 
+	start = time.Now()
+
 	if err := db.wal.Truncate(maxt); err != nil {
 		return errors.Wrapf(err, "truncate WAL at %d", maxt)
 	}
+	db.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
 
 	return nil
 }
@@ -852,6 +971,9 @@ func (a *dbAppender) createSeries() error {
 		s := a.head.create(l.hash, l.labels)
 		l.ref = uint64(s.ref)
+
+		a.db.metrics.headSeriesCreated.Inc()
+		a.db.metrics.headSeries.Inc()
 	}
 
 	// Write all new series to the WAL.
@@ -893,11 +1015,18 @@ func (a *dbAppender) Commit() error {
 		if !ok {
 			return errors.Errorf("series with ID %d not found", s.Ref)
 		}
-		if !series.append(s.T, s.V) {
+		ok, chunkCreated := series.append(s.T, s.V)
+		if !ok {
 			total--
 		}
+		if chunkCreated {
+			a.db.metrics.headChunks.Inc()
+			a.db.metrics.headChunksCreated.Inc()
+		}
 	}
 
+	a.db.metrics.samplesAppended.Add(float64(total))
+
 	for {
 		ht := a.head.MaxTime()
 		if a.highTimestamp <= ht {
diff --git a/head.go b/head.go
index 7174f03d7..6d5355c85 100644
--- a/head.go
+++ b/head.go
@@ -66,7 +66,7 @@ type Head struct {
 }
 
 // NewHead opens the head block in dir.
-func NewHead(l log.Logger, wal WALReader, chunkRange int64) (*Head, error) {
+func NewHead(l log.Logger, chunkRange int64) (*Head, error) {
 	h := &Head{
 		chunkRange: chunkRange,
 		minTime:    math.MaxInt64,
@@ -78,54 +78,15 @@ func NewHead(l log.Logger, wal WALReader, chunkRange int64) (*Head, error) {
 		postings:   &memPostings{m: make(map[term][]uint32)},
 		tombstones: newEmptyTombstoneReader(),
 	}
-	if wal == nil {
-		wal = NopWAL{}
-	}
-	return h, h.init(wal)
+	return h, nil
 }
 
 func (h *Head) String() string {
 	return ""
 }
 
-func (h *Head) init(r WALReader) error {
-
-	seriesFunc := func(series []labels.Labels) error {
-		for _, lset := range series {
-			h.create(lset.Hash(), lset)
-		}
-		return nil
-	}
-	samplesFunc := func(samples []RefSample) error {
-		for _, s := range samples {
-			if int(s.Ref) >= len(h.series) {
-				return errors.Errorf("unknown series reference %d (max %d); abort WAL restore",
-					s.Ref, len(h.series))
-			}
-			h.series[uint32(s.Ref)].append(s.T, s.V)
-		}
-
-		return nil
-	}
-	deletesFunc := func(stones []Stone) error {
-		for _, s := range stones {
-			for _, itv := range s.intervals {
-				h.tombstones.add(s.ref, itv)
-			}
-		}
-
-		return nil
-	}
-
-	if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
-		return errors.Wrap(err, "consume WAL")
-	}
-
-	return nil
-}
-
 // gc removes data before the minimum timestmap from the head.
-func (h *Head) gc() {
+func (h *Head) gc() (seriesRemoved, chunksRemoved int) {
 	// Only data strictly lower than this timestamp must be deleted.
 	mint := h.MinTime()
 
@@ -136,7 +97,7 @@ func (h *Head) gc() {
 	for hash, ss := range h.hashes {
 		for _, s := range ss {
 			s.mtx.Lock()
-			s.truncateChunksBefore(mint)
+			chunksRemoved += s.truncateChunksBefore(mint)
 
 			if len(s.chunks) == 0 {
 				deletedHashes[hash] = append(deletedHashes[hash], s.ref)
@@ -186,6 +147,7 @@ func (h *Head) gc() {
 			h.hashes[hash] = rem
 		} else {
 			delete(h.hashes, hash)
+			seriesRemoved++
 		}
 	}
 
@@ -222,6 +184,8 @@ func (h *Head) gc() {
 
 	h.symbols = symbols
 	h.values = values
+
+	return seriesRemoved, chunksRemoved
 }
 
 func (h *Head) Tombstones() TombstoneReader {
@@ -574,7 +538,7 @@ func (s *memSeries) chunkID(pos int) int {
 
 // truncateChunksBefore removes all chunks from the series that have not timestamp
 // at or after mint. Chunk IDs remain unchanged.
-func (s *memSeries) truncateChunksBefore(mint int64) {
+func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
 	var k int
 	for i, c := range s.chunks {
 		if c.maxTime >= mint {
@@ -584,10 +548,12 @@
 	}
 	s.chunks = append(s.chunks[:0], s.chunks[k:]...)
 	s.firstChunkID += k
+
+	return k
 }
 
 // append adds the sample (t, v) to the series.
-func (s *memSeries) append(t int64, v float64) bool {
+func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
 	const samplesPerChunk = 120
 
 	s.mtx.Lock()
@@ -597,10 +563,11 @@
 
 	if len(s.chunks) == 0 {
 		c = s.cut(t)
+		chunkCreated = true
 	}
 	c = s.head()
 	if c.maxTime >= t {
-		return false
+		return false, chunkCreated
 	}
 	if c.samples > samplesPerChunk/4 && t >= s.nextAt {
 		c = s.cut(t)
@@ -622,7 +589,7 @@
 	s.sampleBuf[2] = s.sampleBuf[3]
 	s.sampleBuf[3] = sample{t: t, v: v}
 
-	return true
+	return true, chunkCreated
 }
 
 // computeChunkEndTime estimates the end timestamp based the beginning of a chunk,
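
Note: the snippet below is an illustrative sketch, not part of the change above. It shows how the new two-value return of memSeries.append could be exercised from a test inside the tsdb package, relying only on the signatures introduced in this diff (NewHead without a WALReader, Head.create, and append returning (success, chunkCreated)). The test name, the sample values, and passing a nil logger to NewHead are assumptions made purely for the example.

package tsdb

import (
	"testing"

	"github.com/prometheus/tsdb/labels"
)

func TestMemSeriesAppendReportsChunkCreation(t *testing.T) {
	// NewHead no longer consumes the WAL itself; a nil logger is assumed to be
	// acceptable for this isolated example (assumption, not shown by the diff).
	h, err := NewHead(nil, 1000)
	if err != nil {
		t.Fatal(err)
	}

	lset := labels.FromStrings("a", "b")
	s := h.create(lset.Hash(), lset)

	// The very first sample cuts the initial chunk, so chunkCreated is reported.
	ok, chunkCreated := s.append(10, 1.0)
	if !ok || !chunkCreated {
		t.Fatalf("first append: got ok=%v chunkCreated=%v, want both true", ok, chunkCreated)
	}

	// A later sample lands in the same chunk; no new chunk is cut.
	ok, chunkCreated = s.append(20, 2.0)
	if !ok || chunkCreated {
		t.Fatalf("second append: got ok=%v chunkCreated=%v, want true and false", ok, chunkCreated)
	}

	// A sample at or before the chunk's max time is rejected (success == false).
	if ok, _ := s.append(20, 3.0); ok {
		t.Fatal("out-of-order append unexpectedly succeeded")
	}
}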