Improve multi-head handling

This improves handling of multiple head blocks. Configuration is
simplified to specify the number of concurrently appendable blocks.
pull/5805/head
Fabian Reinartz 8 years ago
parent 30efe4a58c
commit 42fa342229

126
db.go

@ -30,7 +30,7 @@ var DefaultOptions = &Options{
WALFlushInterval: 5 * time.Second,
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
MaxBlockDuration: 48 * 60 * 60 * 1000, // 1 day in milliseconds
GracePeriod: 2 * 60 * 60 * 1000, // 2 hours in milliseconds
AppendableBlocks: 2,
}
// Options of the DB storage.
@ -45,9 +45,12 @@ type Options struct {
// The maximum timestamp range of compacted blocks.
MaxBlockDuration uint64
// Time window between the highest timestamp and the minimum timestamp
// that can still be appended.
GracePeriod uint64
// Number of head blocks that can be appended to.
// Should be two or higher to prevent write errors in general scenarios.
//
// After a new block is started for timestamp t0 or higher, appends with
// timestamps as early as t0 - (n-1) * MinBlockDuration are valid.
AppendableBlocks int
}
// Appender allows appending a batch of data. It must be completed with a
@ -388,7 +391,6 @@ type dbAppender struct {
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
h, err := a.appenderFor(t)
if err != nil {
fmt.Println("no appender")
return 0, err
}
ref, err := h.Add(lset, t, v)
@ -401,7 +403,6 @@ func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error)
func (a *dbAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
h, err := a.appenderFor(t)
if err != nil {
fmt.Println("no appender")
return 0, err
}
ref, err := h.hashedAdd(hash, lset, t, v)
@ -420,33 +421,33 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
if err != nil {
return err
}
// fmt.Println("check gen", h.generation, gen)
// If the reference pointed into a previous block, we cannot
// use it to append the sample.
if h.generation != gen {
return ErrNotFound
}
return h.AddFast(ref, t, v)
}
// appenderFor gets the appender for the head containing timestamp t.
// If the head block doesn't exist yet, it gets created.
func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
if len(a.heads) == 0 {
if err := a.addNextHead(t); err != nil {
// If there's no fitting head block for t, ensure it gets created.
if len(a.heads) == 0 || t > a.heads[len(a.heads)-1].meta.MaxTime {
a.db.mtx.RUnlock()
if err := a.db.ensureHead(t); err != nil {
a.db.mtx.RLock()
return nil, err
}
return a.appenderFor(t)
}
for i := len(a.heads) - 1; i >= 0; i-- {
h := a.heads[i]
a.db.mtx.RLock()
if t > h.meta.MaxTime {
if err := a.addNextHead(t); err != nil {
return nil, err
}
return a.appenderFor(t)
a.heads = nil
for _, b := range a.db.appendable() {
a.heads = append(a.heads, b.Appender().(*headAppender))
}
if t >= h.meta.MinTime {
}
for i := len(a.heads) - 1; i >= 0; i-- {
if h := a.heads[i]; t >= h.meta.MinTime {
return h, nil
}
}
@ -454,33 +455,30 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
return nil, ErrNotFound
}
func (a *dbAppender) addNextHead(t int64) error {
a.db.mtx.RUnlock()
a.db.mtx.Lock()
func (db *DB) ensureHead(t int64) error {
db.mtx.Lock()
defer db.mtx.Unlock()
// We switched locks, validate that adding a head for the timestamp
// is still required.
if len(a.db.heads) > 1 {
h := a.db.heads[len(a.db.heads)-1]
if t <= h.meta.MaxTime {
a.heads = append(a.heads, h.Appender().(*headAppender))
a.maxGen++
a.db.mtx.Unlock()
a.db.mtx.RLock()
return nil
// Initial case for a new database: we must create the first
// AppendableBlocks-1 front padding heads.
if len(db.heads) == 0 {
for i := int64(db.opts.AppendableBlocks - 1); i >= 0; i-- {
if _, err := db.cut(t - i*int64(db.opts.MinBlockDuration)); err != nil {
return err
}
}
}
h, err := a.db.cut(t)
if err == nil {
a.heads = append(a.heads, h.Appender().(*headAppender))
a.maxGen++
for {
h := db.heads[len(db.heads)-1]
// If t doesn't exceed the range of heads blocks, there's nothing to do.
if t <= h.meta.MaxTime {
return nil
}
if _, err := db.cut(h.meta.MaxTime + 1); err != nil {
return err
}
}
a.db.mtx.Unlock()
a.db.mtx.RLock()
return err
}
func (a *dbAppender) Commit() error {
@ -506,23 +504,10 @@ func (a *dbAppender) Rollback() error {
}
func (db *DB) appendable() []*headBlock {
if len(db.heads) == 0 {
return nil
}
var blocks []*headBlock
maxHead := db.heads[len(db.heads)-1]
k := len(db.heads) - 2
for i := k; i >= 0; i-- {
if db.heads[i].meta.MaxTime < maxHead.meta.MinTime-int64(db.opts.GracePeriod) {
break
}
k--
if len(db.heads) <= db.opts.AppendableBlocks {
return db.heads
}
for i := k + 1; i < len(db.heads); i++ {
blocks = append(blocks, db.heads[i])
}
return blocks
return db.heads[len(db.heads)-db.opts.AppendableBlocks:]
}
func (db *DB) compactable() []Block {
@ -534,22 +519,13 @@ func (db *DB) compactable() []Block {
blocks = append(blocks, pb)
}
maxHead := db.heads[len(db.heads)-1]
k := len(db.heads) - 2
for i := k; i >= 0; i-- {
if db.heads[i].meta.MaxTime < maxHead.meta.MinTime-int64(db.opts.GracePeriod) {
break
}
k--
}
for i, hb := range db.heads[:len(db.heads)-1] {
if i > k {
break
}
blocks = append(blocks, hb)
if len(db.heads) <= db.opts.AppendableBlocks {
return blocks
}
for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] {
blocks = append(blocks, h)
}
return blocks
}
@ -592,7 +568,6 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
// will still be appendable for the configured grace period.
func (db *DB) cut(mint int64) (*headBlock, error) {
maxt := mint + int64(db.opts.MinBlockDuration) - 1
fmt.Println("cut", mint, maxt)
dir, seq, err := nextBlockDir(db.dir)
if err != nil {
@ -608,8 +583,6 @@ func (db *DB) cut(mint int64) (*headBlock, error) {
newHead.generation = db.headGen
fmt.Println("headlen", len(db.heads))
select {
case db.compactc <- struct{}{}:
default:
@ -691,6 +664,9 @@ func OpenPartitioned(dir string, n int, l log.Logger, opts *Options) (*Partition
l = log.NewLogfmtLogger(os.Stdout)
l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
}
if opts.AppendableBlocks < 1 {
return nil, errors.Errorf("AppendableBlocks must be greater than 0")
}
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err

Loading…
Cancel
Save