diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 07b51e36c..a8ba514ab 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -181,12 +181,12 @@ func (b *writeBenchmark) ingestScrapes(metrics []model.Metric, scrapeCount int) } func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount int, baset int64) (uint64, error) { - app := b.storage.Appender() ts := baset type sample struct { labels labels.Labels value int64 + ref *uint64 } scrape := make([]*sample, 0, len(metrics)) @@ -200,11 +200,35 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount total := uint64(0) for i := 0; i < scrapeCount; i++ { + app := b.storage.Appender() ts += int64(30000) for _, s := range scrape { s.value += 1000 - app.Add(s.labels, ts, float64(s.value)) + + if s.ref == nil { + ref, err := app.SetSeries(s.labels) + if err != nil { + panic(err) + } + s.ref = &ref + } + + if err := app.Add(*s.ref, ts, float64(s.value)); err != nil { + if err.Error() != "not found" { + panic(err) + } + + ref, err := app.SetSeries(s.labels) + if err != nil { + panic(err) + } + s.ref = &ref + if err := app.Add(*s.ref, ts, float64(s.value)); err != nil { + panic(err) + } + } + total++ } if err := app.Commit(); err != nil { diff --git a/db.go b/db.go index 76e82e3ba..600577018 100644 --- a/db.go +++ b/db.go @@ -41,13 +41,13 @@ type Options struct { // Appender allows committing batches of samples to a database. // The data held by the appender is reset after Commit returndb. type Appender interface { - // AddSeries registers a new known series label set with the appender + // SetSeries registers a new known series label set with the appender // and returns a reference number used to add samples to it over the // life time of the Appender. - // AddSeries(Labels) uint64 + SetSeries(labels.Labels) (uint64, error) // Add adds a sample pair for the referenced seriedb. - Add(lset labels.Labels, t int64, v float64) error + Add(ref uint64, t int64, v float64) error // Commit submits the collected samples and purges the batch. Commit() error @@ -74,6 +74,8 @@ type DB struct { mtx sync.RWMutex persisted []*persistedBlock heads []*headBlock + headGen uint8 + compactor *compactor compactc chan struct{} @@ -324,61 +326,89 @@ func (db *DB) Close() error { } func (db *DB) Appender() Appender { - return &dbAppender{db: db} + db.mtx.RLock() + + return &dbAppender{ + db: db, + head: db.heads[len(db.heads)-1].Appender().(*headAppender), + gen: db.headGen, + } } type dbAppender struct { - db *DB - buf []hashedSample + db *DB + gen uint8 + head *headAppender } -func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) error { - return a.add(hashedSample{ - hash: lset.Hash(), - labels: lset, - t: t, - v: v, - }) +func (a *dbAppender) SetSeries(lset labels.Labels) (uint64, error) { + ref, err := a.head.SetSeries(lset) + if err != nil { + return 0, err + } + return ref | (uint64(a.gen) << 32), nil } -func (a *dbAppender) add(s hashedSample) error { - a.buf = append(a.buf, s) - return nil +func (a *dbAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error) { + ref, err := a.head.setSeries(hash, lset) + if err != nil { + return 0, err + } + return ref | (uint64(a.gen) << 32), nil +} + +func (a *dbAppender) Add(ref uint64, t int64, v float64) error { + // We store the head generation in the 4th byte and use it to reject + // stale references. 
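For reference, a standalone sketch of the caller-side pattern the benchmark hunk above adopts: cache the reference returned by SetSeries per series and, when Add reports the reference as unknown (e.g. because it belongs to an older head), re-register the series once and retry. The `toyAppender`, `addCached`, and the string-keyed series below are illustrative stand-ins only; the interface shape and the retry on a "not found" error are what mirror the diff.

```go
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found")

// toyAppender stands in for the tsdb Appender in this patch: SetSeries hands
// out references and Add rejects references it no longer knows about.
type toyAppender struct {
	next uint64
	refs map[uint64]string
}

func (a *toyAppender) SetSeries(series string) (uint64, error) {
	ref := a.next
	a.next++
	a.refs[ref] = series
	return ref, nil
}

func (a *toyAppender) Add(ref uint64, t int64, v float64) error {
	s, ok := a.refs[ref]
	if !ok {
		return errNotFound
	}
	fmt.Printf("add %s t=%d v=%g\n", s, t, v)
	return nil
}

// addCached mirrors the shape of ingestScrapesShard: cache the ref per series
// and, if Add reports "not found" (a stale reference), re-register the series
// once and retry.
func addCached(app *toyAppender, cache map[string]uint64, series string, t int64, v float64) error {
	ref, ok := cache[series]
	if !ok {
		r, err := app.SetSeries(series)
		if err != nil {
			return err
		}
		ref = r
		cache[series] = r
	}
	err := app.Add(ref, t, v)
	if err == nil || err.Error() != "not found" {
		return err
	}
	r, err := app.SetSeries(series)
	if err != nil {
		return err
	}
	cache[series] = r
	return app.Add(r, t, v)
}

func main() {
	app := &toyAppender{refs: map[uint64]string{}}
	cache := map[string]uint64{}

	if err := addCached(app, cache, `up{job="node"}`, 30000, 1); err != nil {
		panic(err)
	}
	// Simulate the appender forgetting old references, e.g. after a head cut
	// bumped the generation.
	app.refs = map[uint64]string{}

	if err := addCached(app, cache, `up{job="node"}`, 60000, 1); err != nil {
		panic(err)
	}
}
```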
+ gen := uint8((ref << 24) >> 56) + + if gen != a.gen { + return errNotFound + } + return a.head.Add(ref, t, v) } func (a *dbAppender) Commit() error { - err := a.db.appendBatch(a.buf) - a.buf = a.buf[:0] - return err -} + defer a.db.mtx.RUnlock() -func (db *DB) appendBatch(samples []hashedSample) error { - if len(samples) == 0 { - return nil - } - db.mtx.RLock() - defer db.mtx.RUnlock() + err := a.head.Commit() - head := db.heads[len(db.heads)-1] - - // TODO(fabxc): distinguish samples between concurrent heads for - // different time blocks. Those may occurr during transition to still - // allow late samples to arrive for a previous block. - n, err := head.appendBatch(samples) - if err == nil { - db.metrics.samplesAppended.Add(float64(n)) - } - - if head.fullness() > 1.0 { + if a.head.headBlock.fullness() > 1.0 { select { - case db.cutc <- struct{}{}: + case a.db.cutc <- struct{}{}: default: } } - return err } +// func (db *DB) appendBatch(samples []hashedSample) error { +// if len(samples) == 0 { +// return nil +// } +// db.mtx.RLock() +// defer db.mtx.RUnlock() + +// head := db.heads[len(db.heads)-1] + +// // TODO(fabxc): distinguish samples between concurrent heads for +// // different time blocks. Those may occurr during transition to still +// // allow late samples to arrive for a previous block. +// n, err := head.appendBatch(samples) +// if err == nil { +// db.metrics.samplesAppended.Add(float64(n)) +// } + +// if head.fullness() > 1.0 { +// select { +// case db.cutc <- struct{}{}: +// default: +// } +// } + +// return err +// } + func (db *DB) headForDir(dir string) (int, bool) { for i, b := range db.heads { if b.Dir() == dir { @@ -514,6 +544,7 @@ func (db *DB) cut() error { return err } db.heads = append(db.heads, newHead) + db.headGen++ return nil } @@ -613,39 +644,38 @@ func (db *PartitionedDB) Appender() Appender { app := &partitionedAppender{db: db} for _, p := range db.Partitions { - app.buckets = append(app.buckets, p.Appender().(*dbAppender)) + app.partitions = append(app.partitions, p.Appender().(*dbAppender)) } return app } type partitionedAppender struct { - db *PartitionedDB - buckets []*dbAppender + db *PartitionedDB + partitions []*dbAppender } -func (ba *partitionedAppender) SetSeries(lset labels.Labels) (uint32, error) { - - return 0, nil -} - -func (a *partitionedAppender) Add(lset labels.Labels, t int64, v float64) error { +func (a *partitionedAppender) SetSeries(lset labels.Labels) (uint64, error) { h := lset.Hash() - s := h >> (64 - a.db.partitionPow) + p := h >> (64 - a.db.partitionPow) - return a.buckets[s].add(hashedSample{ - hash: h, - labels: lset, - t: t, - v: v, - }) + ref, err := a.partitions[p].setSeries(h, lset) + if err != nil { + return 0, err + } + return ref | (p << 40), nil } -func (ba *partitionedAppender) Commit() error { +func (a *partitionedAppender) Add(ref uint64, t int64, v float64) error { + p := uint8((ref << 16) >> 56) + return a.partitions[p].Add(ref, t, v) +} + +func (a *partitionedAppender) Commit() error { var merr MultiError // Spill buckets into partitiondb. - for _, b := range ba.buckets { - merr.Add(b.Commit()) + for _, p := range a.partitions { + merr.Add(p.Commit()) } return merr.Err() } diff --git a/head.go b/head.go index 0fbf747a3..625480743 100644 --- a/head.go +++ b/head.go @@ -5,6 +5,7 @@ import ( "math" "sort" "sync" + "sync/atomic" "time" "github.com/bradfitz/slice" @@ -28,6 +29,8 @@ type headBlock struct { // to their chunk descs. 
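The layered appenders above pack three fields into a single 64-bit reference: the series ID inside the head block in the low 32 bits, the head generation in bits 32-39 (added by dbAppender), and the partition number in bits 40-47 (added by partitionedAppender). Below is a self-contained sketch of that layout using the same shift arithmetic as the patch; the `packRef`/`unpackRef` helper names are illustrative and not part of the change.

```go
package main

import "fmt"

// Reference layout used by the layered appenders in this patch:
//
//	bits  0-31  series ID inside the head block (headAppender)
//	bits 32-39  head generation                 (added by dbAppender)
//	bits 40-47  partition number                (added by partitionedAppender)

func packRef(series uint32, gen, partition uint8) uint64 {
	ref := uint64(series)
	ref |= uint64(gen) << 32       // dbAppender.SetSeries: ref | (uint64(a.gen) << 32)
	ref |= uint64(partition) << 40 // partitionedAppender.SetSeries: ref | (p << 40)
	return ref
}

func unpackRef(ref uint64) (series uint32, gen, partition uint8) {
	partition = uint8((ref << 16) >> 56) // partitionedAppender.Add
	gen = uint8((ref << 24) >> 56)       // dbAppender.Add
	series = uint32((ref << 32) >> 32)   // headAppender.Add
	return series, gen, partition
}

func main() {
	ref := packRef(123456, 7, 3)
	series, gen, partition := unpackRef(ref)
	fmt.Printf("ref=%#x series=%d gen=%d partition=%d\n", ref, series, gen, partition)
}
```

Each layer only inspects its own byte and rejects references from a different generation or partition, which is how stale references surface to callers as "not found".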
hashes map[uint64][]*memSeries + nextSeriesID uint64 + values map[string]stringset // label names to possible values postings *memPostings // postings lists for terms @@ -59,9 +62,9 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { err = wal.ReadAll(&walHandler{ series: func(lset labels.Labels) { - b.create(lset.Hash(), lset) + b.create(uint32(b.nextSeriesID), lset.Hash(), lset) + b.nextSeriesID++ b.stats.SeriesCount++ - b.stats.ChunkCount++ // head block has one chunk/series }, sample: func(s hashedSample) { si := s.ref @@ -102,6 +105,156 @@ func (h *headBlock) Stats() BlockStats { return *h.stats } +func (h *headBlock) Appender() Appender { + h.mtx.RLock() + return &headAppender{headBlock: h} +} + +type headAppender struct { + *headBlock + + newSeries map[uint32]hashedLabels + newLabels []labels.Labels + samples []hashedSample +} + +type hashedLabels struct { + hash uint64 + labels labels.Labels +} + +func (a *headAppender) SetSeries(lset labels.Labels) (uint64, error) { + return a.setSeries(lset.Hash(), lset) +} + +func (a *headAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error) { + if ms := a.get(hash, lset); ms != nil { + return uint64(ms.ref), nil + } + + id := atomic.AddUint64(&a.nextSeriesID, 1) - 1 + if a.newSeries == nil { + a.newSeries = map[uint32]hashedLabels{} + } + a.newSeries[uint32(id)] = hashedLabels{hash: hash, labels: lset} + + return id, nil +} + +func (a *headAppender) Add(ref uint64, t int64, v float64) error { + // We only act on the last 4 bytes. Anything before is used by higher-order + // appenders. We erase it to avoid issues. + ref = (ref << 32) >> 32 + + // Distinguish between existing series and series created in + // this transaction. + if int(ref) >= len(a.series) { + if _, ok := a.newSeries[uint32(ref)]; !ok { + return errNotFound + } + // TODO(fabxc): we also have to validate here that the + // sample sequence is valid. + // We also have to revalidate it as we switch locks an create + // the new series. + a.samples = append(a.samples, hashedSample{ + ref: uint32(ref), + t: t, + v: v, + }) + return nil + } + + ms := a.series[int(ref)] + if ms == nil { + return errNotFound + } + c := ms.head() + + // TODO(fabxc): memory series should be locked here already. + // Only problem is release of locks in case of a rollback. + if t < c.maxTime { + return ErrOutOfOrderSample + } + if c.maxTime == t && ms.lastValue != v { + return ErrAmendSample + } + + a.samples = append(a.samples, hashedSample{ + ref: uint32(ref), + t: t, + v: v, + }) + return nil +} + +func (a *headAppender) createSeries() { + if len(a.newSeries) == 0 { + return + } + a.newLabels = make([]labels.Labels, 0, len(a.newSeries)) + + a.mtx.RUnlock() + a.mtx.Lock() + + for id, l := range a.newSeries { + // We switched locks and have to re-validate that the series were not + // created by another goroutine in the meantime. + if int(id) < len(a.series) && a.series[id] != nil { + continue + } + // Series is still new. + a.newLabels = append(a.newLabels, l.labels) + + a.create(id, l.hash, l.labels) + } + + a.mtx.Unlock() + a.mtx.RLock() +} + +func (a *headAppender) Commit() error { + defer a.mtx.RUnlock() + + // Write all new series and samples to the WAL and add it to the + // in-mem database on success. 
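createSeries below has to hand-roll a lock upgrade: sync.RWMutex offers no atomic upgrade, so the appender drops its read lock, takes the write lock, and re-validates that no other goroutine created the series in between. A stripped-down sketch of that pattern with a plain string registry (the `registry` type and `getOrCreate` helper are illustrative, not part of the patch):

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a toy stand-in for the head block's series table.
type registry struct {
	mtx sync.RWMutex
	ids map[string]int
}

// getOrCreate mirrors the shape of headAppender.createSeries: lookups happen
// under the read lock; creating an entry requires switching to the write lock
// and re-checking, because another goroutine may have won the race in between.
func (r *registry) getOrCreate(name string) int {
	r.mtx.RLock()
	if id, ok := r.ids[name]; ok {
		r.mtx.RUnlock()
		return id
	}
	// Upgrade: release the read lock and acquire the write lock.
	r.mtx.RUnlock()
	r.mtx.Lock()

	// Re-validate after switching locks.
	id, ok := r.ids[name]
	if !ok {
		id = len(r.ids)
		r.ids[name] = id
	}
	r.mtx.Unlock()
	return id
}

func main() {
	r := &registry{ids: map[string]int{}}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(r.getOrCreate(`up{job="node"}`))
		}()
	}
	wg.Wait()
}
```

The appender in the patch additionally downgrades back to the read lock afterwards (Unlock followed by RLock), since Commit keeps reading the series table under the read lock it took in Appender().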
+ if err := a.wal.Log(a.newLabels, a.samples); err != nil { + return err + } + + a.createSeries() + var ( + total = uint64(len(a.samples)) + mint = int64(math.MaxInt64) + maxt = int64(math.MinInt64) + ) + + for _, s := range a.samples { + if !a.series[s.ref].append(s.t, s.v) { + total-- + } + } + + a.stats.mtx.Lock() + defer a.stats.mtx.Unlock() + + a.stats.SampleCount += total + a.stats.SeriesCount += uint64(len(a.newSeries)) + + if mint < a.stats.MinTime { + a.stats.MinTime = mint + } + if maxt > a.stats.MaxTime { + a.stats.MaxTime = maxt + } + + return nil +} + +func (a *headAppender) Rollback() error { + a.mtx.RUnlock() + return nil +} + type headSeriesReader struct { *headBlock } @@ -221,13 +374,18 @@ func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries { return nil } -func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries { - s := &memSeries{lset: lset} +func (h *headBlock) create(ref uint32, hash uint64, lset labels.Labels) *memSeries { + s := &memSeries{ + ref: ref, + lset: lset, + } - // Index the new chunk. - s.ref = uint32(len(h.series)) + // Allocate empty space until we can insert at the given index. + for int(ref) >= len(h.series) { + h.series = append(h.series, nil) + } + h.series[ref] = s - h.series = append(h.series, s) h.hashes[hash] = append(h.hashes[hash], s) for _, l := range lset { @@ -258,126 +416,126 @@ var ( ErrOutOfBounds = errors.New("out of bounds") ) -func (h *headBlock) appendBatch(samples []hashedSample) (int, error) { - // Find head chunks for all samples and allocate new IDs/refs for - // ones we haven't seen before. +// func (h *headBlock) appendBatch(samples []hashedSample) (int, error) { +// // Find head chunks for all samples and allocate new IDs/refs for +// // ones we haven't seen before. - var ( - newSeries = map[uint64][]*hashedSample{} - newLabels []labels.Labels - ) +// var ( +// newSeries = map[uint64][]*hashedSample{} +// newLabels []labels.Labels +// ) - h.mtx.RLock() - defer h.mtx.RUnlock() +// h.mtx.RLock() +// defer h.mtx.RUnlock() - for i := range samples { - s := &samples[i] +// for i := range samples { +// s := &samples[i] - ms := h.get(s.hash, s.labels) - if ms != nil { - c := ms.head() +// ms := h.get(s.hash, s.labels) +// if ms != nil { +// c := ms.head() - if s.t < c.maxTime { - return 0, ErrOutOfOrderSample - } - if c.maxTime == s.t && ms.lastValue != s.v { - return 0, ErrAmendSample - } - // TODO(fabxc): sample refs are only scoped within a block for - // now and we ignore any previously set value - s.ref = ms.ref - continue - } +// if s.t < c.maxTime { +// return 0, ErrOutOfOrderSample +// } +// if c.maxTime == s.t && ms.lastValue != s.v { +// return 0, ErrAmendSample +// } +// // TODO(fabxc): sample refs are only scoped within a block for +// // now and we ignore any previously set value +// s.ref = ms.ref +// continue +// } - // TODO(fabxc): technically there's still collision probability here. - // Extract the hashmap of the head block and use an instance of it here as well. - newSeries[s.hash] = append(newSeries[s.hash], s) - } +// // TODO(fabxc): technically there's still collision probability here. +// // Extract the hashmap of the head block and use an instance of it here as well. +// newSeries[s.hash] = append(newSeries[s.hash], s) +// } - // After the samples were successfully written to the WAL, there may - // be no further failures. 
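The head block keys series by label-set hash but keeps a slice of series per bucket (hashes map[uint64][]*memSeries), so a lookup has to compare the actual label set inside the bucket to disambiguate hash collisions, and create grows the ref-indexed series slice with nil padding to honour pre-allocated IDs. A minimal sketch of that bucketed table, with a simplified hash and string label sets standing in for labels.Labels (both illustrative):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

type series struct {
	ref  uint32
	lset string // stands in for labels.Labels
}

// table mirrors the head block's collision map: one bucket per hash value,
// holding every series whose label set hashes to it, plus a ref-indexed slice.
type table struct {
	buckets map[uint64][]*series
	byRef   []*series
}

func hashOf(lset string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(lset))
	return h.Sum64()
}

// get walks the bucket and compares the full label set, so two label sets
// that collide on the hash never alias each other.
func (t *table) get(hash uint64, lset string) *series {
	for _, s := range t.buckets[hash] {
		if s.lset == lset {
			return s
		}
	}
	return nil
}

// create inserts a series at a caller-chosen ref, growing the ref-indexed
// slice with nil padding the same way headBlock.create does in this patch.
func (t *table) create(ref uint32, hash uint64, lset string) *series {
	s := &series{ref: ref, lset: lset}
	for int(ref) >= len(t.byRef) {
		t.byRef = append(t.byRef, nil)
	}
	t.byRef[ref] = s
	t.buckets[hash] = append(t.buckets[hash], s)
	return s
}

func main() {
	t := &table{buckets: map[uint64][]*series{}}

	lset := `http_requests_total{code="200"}`
	h := hashOf(lset)

	if t.get(h, lset) == nil {
		t.create(3, h, lset) // refs may be pre-allocated out of order
	}
	fmt.Println(t.get(h, lset).ref, len(t.byRef))
}
```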
- if len(newSeries) > 0 { - newLabels = make([]labels.Labels, 0, len(newSeries)) - base0 := len(h.series) +// // After the samples were successfully written to the WAL, there may +// // be no further failures. +// if len(newSeries) > 0 { +// newLabels = make([]labels.Labels, 0, len(newSeries)) +// base0 := len(h.series) - h.mtx.RUnlock() - h.mtx.Lock() +// h.mtx.RUnlock() +// h.mtx.Lock() - base1 := len(h.series) - i := 0 +// base1 := len(h.series) +// i := 0 - for hash, ser := range newSeries { - lset := ser[0].labels - // We switched locks and have to re-validate that the series were not - // created by another goroutine in the meantime. - if base1 != base0 { - if ms := h.get(hash, lset); ms != nil { - for _, s := range ser { - s.ref = ms.ref - } - continue - } - } - // Series is still new. - newLabels = append(newLabels, lset) +// for hash, ser := range newSeries { +// lset := ser[0].labels +// // We switched locks and have to re-validate that the series were not +// // created by another goroutine in the meantime. +// if base1 != base0 { +// if ms := h.get(hash, lset); ms != nil { +// for _, s := range ser { +// s.ref = ms.ref +// } +// continue +// } +// } +// // Series is still new. +// newLabels = append(newLabels, lset) - h.create(hash, lset) - // Set sample references to the series we just created. - for _, s := range ser { - s.ref = uint32(base1 + i) - } - i++ - } +// h.create(hash, lset) +// // Set sample references to the series we just created. +// for _, s := range ser { +// s.ref = uint32(base1 + i) +// } +// i++ +// } - h.mtx.Unlock() - h.mtx.RLock() - } - // Write all new series and samples to the WAL and add it to the - // in-mem database on success. - if err := h.wal.Log(newLabels, samples); err != nil { - return 0, err - } +// h.mtx.Unlock() +// h.mtx.RLock() +// } +// // Write all new series and samples to the WAL and add it to the +// // in-mem database on success. +// if err := h.wal.Log(newLabels, samples); err != nil { +// return 0, err +// } - var ( - total = uint64(len(samples)) - mint = int64(math.MaxInt64) - maxt = int64(math.MinInt64) - ) - for _, s := range samples { - ser := h.series[s.ref] +// var ( +// total = uint64(len(samples)) +// mint = int64(math.MaxInt64) +// maxt = int64(math.MinInt64) +// ) +// for _, s := range samples { +// ser := h.series[s.ref] - ser.mtx.Lock() - ok := ser.append(s.t, s.v) - ser.mtx.Unlock() +// ser.mtx.Lock() +// ok := ser.append(s.t, s.v) +// ser.mtx.Unlock() - if !ok { - total-- - continue - } - if mint > s.t { - mint = s.t - } - if maxt < s.t { - maxt = s.t - } - } +// if !ok { +// total-- +// continue +// } +// if mint > s.t { +// mint = s.t +// } +// if maxt < s.t { +// maxt = s.t +// } +// } - h.stats.mtx.Lock() - defer h.stats.mtx.Unlock() +// h.stats.mtx.Lock() +// defer h.stats.mtx.Unlock() - h.stats.SampleCount += total - h.stats.SeriesCount += uint64(len(newSeries)) - h.stats.ChunkCount += uint64(len(newSeries)) // head block has one chunk/series +// h.stats.SampleCount += total +// h.stats.SeriesCount += uint64(len(newSeries)) +// h.stats.ChunkCount += uint64(len(newSeries)) // head block has one chunk/series - if mint < h.stats.MinTime { - h.stats.MinTime = mint - } - if maxt > h.stats.MaxTime { - h.stats.MaxTime = maxt - } +// if mint < h.stats.MinTime { +// h.stats.MinTime = mint +// } +// if maxt > h.stats.MaxTime { +// h.stats.MaxTime = maxt +// } - return int(total), nil -} +// return int(total), nil +// } func (h *headBlock) fullness() float64 { h.stats.mtx.RLock()