diff --git a/db.go b/db.go index 0baa1bc5f..7916d700b 100644 --- a/db.go +++ b/db.go @@ -28,7 +28,7 @@ import ( // millisecond precision timestampdb. var DefaultOptions = &Options{ WALFlushInterval: 5 * time.Second, - MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds + MinBlockDuration: 2 * 60 * 60 * 1000, // 2 hours in milliseconds MaxBlockDuration: 48 * 60 * 60 * 1000, // 1 day in milliseconds AppendableBlocks: 2, } @@ -93,7 +93,6 @@ type DB struct { compactor *compactor compactc chan struct{} - cutc chan struct{} donec chan struct{} stopc chan struct{} } @@ -137,6 +136,9 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { if opts == nil { opts = DefaultOptions } + if opts.AppendableBlocks < 1 { + return nil, errors.Errorf("AppendableBlocks must be greater than 0") + } db = &DB{ dir: dir, @@ -144,7 +146,6 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { metrics: newDBMetrics(r), opts: opts, compactc: make(chan struct{}, 1), - cutc: make(chan struct{}, 1), donec: make(chan struct{}), stopc: make(chan struct{}), } @@ -164,32 +165,6 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { func (db *DB) run() { defer close(db.donec) - // go func() { - // for { - // select { - // case <-db.cutc: - // db.mtx.Lock() - // _, err := db.cut() - // db.mtx.Unlock() - - // if err != nil { - // db.logger.Log("msg", "cut failed", "err", err) - // } else { - // select { - // case db.compactc <- struct{}{}: - // default: - // } - // } - // // Drain cut channel so we don't trigger immediately again. - // select { - // case <-db.cutc: - // default: - // } - // case <-db.stopc: - // } - // } - // }() - for { select { case <-db.compactc: @@ -342,9 +317,6 @@ func (db *DB) initBlocks() error { db.persisted = persisted db.heads = heads - // if len(heads) == 0 { - // _, err = db.cut() - // } return nil } @@ -433,17 +405,30 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { // If the head block doesn't exist yet, it gets created. func (a *dbAppender) appenderFor(t int64) (*headAppender, error) { // If there's no fitting head block for t, ensure it gets created. - if len(a.heads) == 0 || t > a.heads[len(a.heads)-1].meta.MaxTime { + if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime { a.db.mtx.RUnlock() + var mints []int64 + for _, h := range a.heads { + mints = append(mints, h.meta.MinTime) + } + fmt.Println("ensure head", t, mints) if err := a.db.ensureHead(t); err != nil { a.db.mtx.RLock() return nil, err } a.db.mtx.RLock() - a.heads = nil - for _, b := range a.db.appendable() { - a.heads = append(a.heads, b.Appender().(*headAppender)) + if len(a.heads) == 0 { + for _, b := range a.db.appendable() { + a.heads = append(a.heads, b.Appender().(*headAppender)) + } + } else { + maxSeq := a.heads[len(a.heads)-1].meta.Sequence + for _, b := range a.db.appendable() { + if b.meta.Sequence > maxSeq { + a.heads = append(a.heads, b.Appender().(*headAppender)) + } + } } } for i := len(a.heads) - 1; i >= 0; i-- { @@ -463,6 +448,7 @@ func (db *DB) ensureHead(t int64) error { // AppendableBlocks-1 front padding heads. if len(db.heads) == 0 { for i := int64(db.opts.AppendableBlocks - 1); i >= 0; i-- { + fmt.Println("cut init for", t-i*int64(db.opts.MinBlockDuration)) if _, err := db.cut(t - i*int64(db.opts.MinBlockDuration)); err != nil { return err } @@ -472,10 +458,11 @@ func (db *DB) ensureHead(t int64) error { for { h := db.heads[len(db.heads)-1] // If t doesn't exceed the range of heads blocks, there's nothing to do. - if t <= h.meta.MaxTime { + if t < h.meta.MaxTime { return nil } - if _, err := db.cut(h.meta.MaxTime + 1); err != nil { + fmt.Println("cut for", h.meta.MaxTime) + if _, err := db.cut(h.meta.MaxTime); err != nil { return err } } @@ -567,7 +554,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period. func (db *DB) cut(mint int64) (*headBlock, error) { - maxt := mint + int64(db.opts.MinBlockDuration) - 1 + maxt := mint + int64(db.opts.MinBlockDuration) dir, seq, err := nextBlockDir(db.dir) if err != nil { @@ -664,9 +651,6 @@ func OpenPartitioned(dir string, n int, l log.Logger, opts *Options) (*Partition l = log.NewLogfmtLogger(os.Stdout) l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) } - if opts.AppendableBlocks < 1 { - return nil, errors.Errorf("AppendableBlocks must be greater than 0") - } if err := os.MkdirAll(dir, 0777); err != nil { return nil, err diff --git a/head.go b/head.go index 38626814e..6af2c9bb7 100644 --- a/head.go +++ b/head.go @@ -128,6 +128,9 @@ func (h *headBlock) inBounds(t int64) bool { // Close syncs all data and closes underlying resources of the head block. func (h *headBlock) Close() error { + if err := writeMetaFile(h.dir, &h.meta); err != nil { + return err + } return h.wal.Close() } @@ -190,11 +193,9 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro func (a *headAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v float64) (uint64, error) { if ms := a.get(hash, lset); ms != nil { - // fmt.Println("add ref get", ms.ref) return uint64(ms.ref), nil } if ref, ok := a.newHashes[hash]; ok { - // fmt.Println("add ref newHashes", ref) return uint64(ref), nil } @@ -215,8 +216,6 @@ func (a *headAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v flo a.newSeries[ref] = hashedLabels{hash: hash, labels: lset} a.newHashes[hash] = ref - // fmt.Println("add ref", ref) - return ref, a.AddFast(ref, t, v) } diff --git a/writer.go b/writer.go index 886e518bd..a73f2bd11 100644 --- a/writer.go +++ b/writer.go @@ -132,6 +132,12 @@ func (w *seriesWriter) Size() int64 { } func (w *seriesWriter) Close() error { + // Initialize block in case no data was written to it. + if w.n == 0 { + if err := w.writeMeta(); err != nil { + return err + } + } return w.w.Flush() } @@ -334,18 +340,26 @@ func (w *indexWriter) writeSeries() error { }) } +func (w *indexWriter) init() error { + if err := w.writeMeta(); err != nil { + return err + } + if err := w.writeSymbols(); err != nil { + return err + } + if err := w.writeSeries(); err != nil { + return err + } + w.started = true + + return nil +} + func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { if !w.started { - if err := w.writeMeta(); err != nil { - return err - } - if err := w.writeSymbols(); err != nil { + if err := w.init(); err != nil { return err } - if err := w.writeSeries(); err != nil { - return err - } - w.started = true } valt, err := newStringTuples(values, len(names)) @@ -382,6 +396,12 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { } func (w *indexWriter) WritePostings(name, value string, it Postings) error { + if !w.started { + if err := w.init(); err != nil { + return err + } + } + key := name + string(sep) + value w.postings = append(w.postings, hashEntry{ @@ -473,6 +493,12 @@ func (w *indexWriter) finalize() error { } func (w *indexWriter) Close() error { + // Handle blocks without any data. + if !w.started { + if err := w.init(); err != nil { + return err + } + } if err := w.finalize(); err != nil { return err }