mirror of https://github.com/prometheus/prometheus
Misc cleanup
parent
d2673d8659
commit
fb5c5535fc
30
db.go
30
db.go
|
@ -482,26 +482,6 @@ func (db *DB) Close() error {
|
||||||
func (db *DB) Appender() Appender {
|
func (db *DB) Appender() Appender {
|
||||||
db.mtx.RLock()
|
db.mtx.RLock()
|
||||||
return &dbAppender{db: db}
|
return &dbAppender{db: db}
|
||||||
|
|
||||||
// // XXX(fabxc): turn off creating initial appender as it will happen on-demand
|
|
||||||
// // anyway. For now this, with combination of only having a single timestamp per batch,
|
|
||||||
// // prevents opening more than one appender and hitting an unresolved deadlock (#11).
|
|
||||||
// //
|
|
||||||
|
|
||||||
// // Only instantiate appender after returning the headmtx to avoid
|
|
||||||
// // questionable locking order.
|
|
||||||
// db.headmtx.RLock()
|
|
||||||
// app := db.appendable()
|
|
||||||
// db.headmtx.RUnlock()
|
|
||||||
|
|
||||||
// for _, b := range app {
|
|
||||||
// a.heads = append(a.heads, &metaAppender{
|
|
||||||
// meta: b.Meta(),
|
|
||||||
// app: b.Appender(),
|
|
||||||
// })
|
|
||||||
// }
|
|
||||||
|
|
||||||
// return a
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type dbAppender struct {
|
type dbAppender struct {
|
||||||
|
@ -612,14 +592,16 @@ func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
|
||||||
func (db *DB) ensureHead(t int64) error {
|
func (db *DB) ensureHead(t int64) error {
|
||||||
mint, maxt := rangeForTimestamp(t, int64(db.opts.MinBlockDuration))
|
mint, maxt := rangeForTimestamp(t, int64(db.opts.MinBlockDuration))
|
||||||
|
|
||||||
// Initial case with an empty database. t is the first timestamp we ever received.
|
last := db.blocks[len(db.blocks)-1].Meta()
|
||||||
// Create an additional buffering block in front.
|
// Create another block of buffer in front if the DB is initialized or retrieving
|
||||||
if len(db.blocks) == 0 {
|
// new data after a long gap.
|
||||||
|
// This ensures we always have a full block width if append window.
|
||||||
|
if len(db.blocks) == 0 || last.MaxTime <= mint-int64(db.opts.MinBlockDuration) {
|
||||||
if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil {
|
if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// If the previous block reaches into our new window, make it smaller.
|
// If the previous block reaches into our new window, make it smaller.
|
||||||
} else if mt := db.blocks[len(db.blocks)-1].Meta().MaxTime; mt > mint {
|
} else if mt := last.MaxTime; mt > mint {
|
||||||
mint = mt
|
mint = mt
|
||||||
}
|
}
|
||||||
if mint >= maxt {
|
if mint >= maxt {
|
||||||
|
|
|
@ -320,17 +320,13 @@ func TestWALRestoreCorrupted(t *testing.T) {
|
||||||
require.Equal(t, 0, len(l))
|
require.Equal(t, 0, len(l))
|
||||||
require.Equal(t, []RefSample{{T: 1, V: 2}}, s)
|
require.Equal(t, []RefSample{{T: 1, V: 2}}, s)
|
||||||
|
|
||||||
// Truncation should happen transparently and now cause an error.
|
// Truncation should happen transparently and not cause an error.
|
||||||
require.False(t, r.Next())
|
require.False(t, r.Next())
|
||||||
require.Nil(t, r.Err())
|
require.Nil(t, r.Err())
|
||||||
|
|
||||||
require.NoError(t, w2.Log(nil, []RefSample{{T: 99, V: 100}}))
|
require.NoError(t, w2.Log(nil, []RefSample{{T: 99, V: 100}}))
|
||||||
require.NoError(t, w2.Close())
|
require.NoError(t, w2.Close())
|
||||||
|
|
||||||
files, err := fileutil.ReadDir(dir)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, 1, len(files))
|
|
||||||
|
|
||||||
// We should see the first valid entry and the new one, everything after
|
// We should see the first valid entry and the new one, everything after
|
||||||
// is truncated.
|
// is truncated.
|
||||||
w3, err := OpenSegmentWAL(dir, logger, 0)
|
w3, err := OpenSegmentWAL(dir, logger, 0)
|
||||||
|
|
Loading…
Reference in New Issue