|
|
|
@ -122,13 +122,42 @@ type DB struct {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type dbMetrics struct {
|
|
|
|
|
activeAppenders prometheus.Gauge
|
|
|
|
|
loadedBlocks prometheus.GaugeFunc
|
|
|
|
|
reloads prometheus.Counter
|
|
|
|
|
reloadsFailed prometheus.Counter
|
|
|
|
|
reloadDuration prometheus.Summary
|
|
|
|
|
samplesAppended prometheus.Counter
|
|
|
|
|
compactionsTriggered prometheus.Counter
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func newDBMetrics(r prometheus.Registerer) *dbMetrics {
|
|
|
|
|
func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
|
|
|
|
|
m := &dbMetrics{}
|
|
|
|
|
|
|
|
|
|
m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
|
|
|
Name: "tsdb_active_appenders",
|
|
|
|
|
Help: "Number of currently active appender transactions",
|
|
|
|
|
})
|
|
|
|
|
m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
|
|
|
|
Name: "tsdb_blocks_loaded",
|
|
|
|
|
Help: "Number of currently loaded data blocks",
|
|
|
|
|
}, func() float64 {
|
|
|
|
|
db.mtx.RLock()
|
|
|
|
|
defer db.mtx.RUnlock()
|
|
|
|
|
return float64(len(db.blocks))
|
|
|
|
|
})
|
|
|
|
|
m.reloads = prometheus.NewCounter(prometheus.CounterOpts{
|
|
|
|
|
Name: "tsdb_reloads_total",
|
|
|
|
|
Help: "Number of times the database reloaded block data from disk.",
|
|
|
|
|
})
|
|
|
|
|
m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{
|
|
|
|
|
Name: "tsdb_reloads_failures_total",
|
|
|
|
|
Help: "Number of times the database failed to reload black data from disk.",
|
|
|
|
|
})
|
|
|
|
|
m.reloadDuration = prometheus.NewSummary(prometheus.SummaryOpts{
|
|
|
|
|
Name: "tsdb_reload_duration_seconds",
|
|
|
|
|
Help: "Duration of block reloads.",
|
|
|
|
|
})
|
|
|
|
|
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
|
|
|
|
|
Name: "tsdb_samples_appended_total",
|
|
|
|
|
Help: "Total number of appended sampledb.",
|
|
|
|
@ -140,6 +169,11 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics {
|
|
|
|
|
|
|
|
|
|
if r != nil {
|
|
|
|
|
r.MustRegister(
|
|
|
|
|
m.activeAppenders,
|
|
|
|
|
m.loadedBlocks,
|
|
|
|
|
m.reloads,
|
|
|
|
|
m.reloadsFailed,
|
|
|
|
|
m.reloadDuration,
|
|
|
|
|
m.samplesAppended,
|
|
|
|
|
m.compactionsTriggered,
|
|
|
|
|
)
|
|
|
|
@ -165,12 +199,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
|
|
|
|
db = &DB{
|
|
|
|
|
dir: dir,
|
|
|
|
|
logger: l,
|
|
|
|
|
metrics: newDBMetrics(r),
|
|
|
|
|
opts: opts,
|
|
|
|
|
compactc: make(chan struct{}, 1),
|
|
|
|
|
donec: make(chan struct{}),
|
|
|
|
|
stopc: make(chan struct{}),
|
|
|
|
|
}
|
|
|
|
|
db.metrics = newDBMetrics(db, r)
|
|
|
|
|
|
|
|
|
|
if !opts.NoLockfile {
|
|
|
|
|
absdir, err := filepath.Abs(dir)
|
|
|
|
|
if err != nil {
|
|
|
|
@ -374,7 +409,15 @@ func (db *DB) getBlock(id ulid.ULID) (Block, bool) {
|
|
|
|
|
return nil, false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *DB) reloadBlocks() error {
|
|
|
|
|
func (db *DB) reloadBlocks() (err error) {
|
|
|
|
|
defer func(t time.Time) {
|
|
|
|
|
if err != nil {
|
|
|
|
|
db.metrics.reloadsFailed.Inc()
|
|
|
|
|
}
|
|
|
|
|
db.metrics.reloads.Inc()
|
|
|
|
|
db.metrics.reloadDuration.Observe(time.Since(t).Seconds())
|
|
|
|
|
}(time.Now())
|
|
|
|
|
|
|
|
|
|
var cs []io.Closer
|
|
|
|
|
defer func() { closeAll(cs...) }()
|
|
|
|
|
|
|
|
|
@ -447,7 +490,7 @@ func validateBlockSequence(bs []Block) error {
|
|
|
|
|
prev := bs[0]
|
|
|
|
|
for _, b := range bs[1:] {
|
|
|
|
|
if b.Meta().MinTime < prev.Meta().MaxTime {
|
|
|
|
|
return errors.Errorf("block time ranges overlap", b.Meta().MinTime, prev.Meta().MaxTime)
|
|
|
|
|
return errors.Errorf("block time ranges overlap (%d, %d)", b.Meta().MinTime, prev.Meta().MaxTime)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
@ -480,6 +523,8 @@ func (db *DB) Close() error {
|
|
|
|
|
|
|
|
|
|
// Appender returns a new Appender on the database.
|
|
|
|
|
func (db *DB) Appender() Appender {
|
|
|
|
|
db.metrics.activeAppenders.Inc()
|
|
|
|
|
|
|
|
|
|
db.mtx.RLock()
|
|
|
|
|
return &dbAppender{db: db}
|
|
|
|
|
}
|
|
|
|
@ -619,6 +664,7 @@ func (db *DB) ensureHead(t int64) error {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *dbAppender) Commit() error {
|
|
|
|
|
defer a.db.metrics.activeAppenders.Dec()
|
|
|
|
|
defer a.db.mtx.RUnlock()
|
|
|
|
|
|
|
|
|
|
// Commits to partial appenders must be concurrent as concurrent appenders
|
|
|
|
@ -649,6 +695,7 @@ func (a *dbAppender) Commit() error {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (a *dbAppender) Rollback() error {
|
|
|
|
|
defer a.db.metrics.activeAppenders.Dec()
|
|
|
|
|
defer a.db.mtx.RUnlock()
|
|
|
|
|
|
|
|
|
|
var g errgroup.Group
|
|
|
|
|