From a317f252b96737a16b80243dcebc9a2407dd872b Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 12 Jan 2017 19:18:51 +0100 Subject: [PATCH] Expose series references to clients This exposes a reference number of a series represented by a label set to clients. Subsequent samples can be directly added via the reference rather than repeatedly passing in the full labels. This drasitcally speeds up the append process. The appender chain uses different sections of the reference number for assignment to child appenders and invalidating reference numbers as necessary. Clients can either pass out reference numbers themselves or have their own optimized lookup, i.e. by directly associating unparsed metric descriptors strings with reference numbers. --- cmd/tsdb/main.go | 28 +++- db.go | 146 ++++++++++-------- head.go | 374 +++++++++++++++++++++++++++++++++-------------- 3 files changed, 380 insertions(+), 168 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 07b51e36c..a8ba514ab 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -181,12 +181,12 @@ func (b *writeBenchmark) ingestScrapes(metrics []model.Metric, scrapeCount int) } func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount int, baset int64) (uint64, error) { - app := b.storage.Appender() ts := baset type sample struct { labels labels.Labels value int64 + ref *uint64 } scrape := make([]*sample, 0, len(metrics)) @@ -200,11 +200,35 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount total := uint64(0) for i := 0; i < scrapeCount; i++ { + app := b.storage.Appender() ts += int64(30000) for _, s := range scrape { s.value += 1000 - app.Add(s.labels, ts, float64(s.value)) + + if s.ref == nil { + ref, err := app.SetSeries(s.labels) + if err != nil { + panic(err) + } + s.ref = &ref + } + + if err := app.Add(*s.ref, ts, float64(s.value)); err != nil { + if err.Error() != "not found" { + panic(err) + } + + ref, err := app.SetSeries(s.labels) + if err != nil { + panic(err) + } + s.ref = &ref + if err := app.Add(*s.ref, ts, float64(s.value)); err != nil { + panic(err) + } + } + total++ } if err := app.Commit(); err != nil { diff --git a/db.go b/db.go index 76e82e3ba..600577018 100644 --- a/db.go +++ b/db.go @@ -41,13 +41,13 @@ type Options struct { // Appender allows committing batches of samples to a database. // The data held by the appender is reset after Commit returndb. type Appender interface { - // AddSeries registers a new known series label set with the appender + // SetSeries registers a new known series label set with the appender // and returns a reference number used to add samples to it over the // life time of the Appender. - // AddSeries(Labels) uint64 + SetSeries(labels.Labels) (uint64, error) // Add adds a sample pair for the referenced seriedb. - Add(lset labels.Labels, t int64, v float64) error + Add(ref uint64, t int64, v float64) error // Commit submits the collected samples and purges the batch. Commit() error @@ -74,6 +74,8 @@ type DB struct { mtx sync.RWMutex persisted []*persistedBlock heads []*headBlock + headGen uint8 + compactor *compactor compactc chan struct{} @@ -324,61 +326,89 @@ func (db *DB) Close() error { } func (db *DB) Appender() Appender { - return &dbAppender{db: db} + db.mtx.RLock() + + return &dbAppender{ + db: db, + head: db.heads[len(db.heads)-1].Appender().(*headAppender), + gen: db.headGen, + } } type dbAppender struct { - db *DB - buf []hashedSample + db *DB + gen uint8 + head *headAppender } -func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) error { - return a.add(hashedSample{ - hash: lset.Hash(), - labels: lset, - t: t, - v: v, - }) +func (a *dbAppender) SetSeries(lset labels.Labels) (uint64, error) { + ref, err := a.head.SetSeries(lset) + if err != nil { + return 0, err + } + return ref | (uint64(a.gen) << 32), nil } -func (a *dbAppender) add(s hashedSample) error { - a.buf = append(a.buf, s) - return nil +func (a *dbAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error) { + ref, err := a.head.setSeries(hash, lset) + if err != nil { + return 0, err + } + return ref | (uint64(a.gen) << 32), nil +} + +func (a *dbAppender) Add(ref uint64, t int64, v float64) error { + // We store the head generation in the 4th byte and use it to reject + // stale references. + gen := uint8((ref << 24) >> 56) + + if gen != a.gen { + return errNotFound + } + return a.head.Add(ref, t, v) } func (a *dbAppender) Commit() error { - err := a.db.appendBatch(a.buf) - a.buf = a.buf[:0] - return err -} + defer a.db.mtx.RUnlock() -func (db *DB) appendBatch(samples []hashedSample) error { - if len(samples) == 0 { - return nil - } - db.mtx.RLock() - defer db.mtx.RUnlock() + err := a.head.Commit() - head := db.heads[len(db.heads)-1] - - // TODO(fabxc): distinguish samples between concurrent heads for - // different time blocks. Those may occurr during transition to still - // allow late samples to arrive for a previous block. - n, err := head.appendBatch(samples) - if err == nil { - db.metrics.samplesAppended.Add(float64(n)) - } - - if head.fullness() > 1.0 { + if a.head.headBlock.fullness() > 1.0 { select { - case db.cutc <- struct{}{}: + case a.db.cutc <- struct{}{}: default: } } - return err } +// func (db *DB) appendBatch(samples []hashedSample) error { +// if len(samples) == 0 { +// return nil +// } +// db.mtx.RLock() +// defer db.mtx.RUnlock() + +// head := db.heads[len(db.heads)-1] + +// // TODO(fabxc): distinguish samples between concurrent heads for +// // different time blocks. Those may occurr during transition to still +// // allow late samples to arrive for a previous block. +// n, err := head.appendBatch(samples) +// if err == nil { +// db.metrics.samplesAppended.Add(float64(n)) +// } + +// if head.fullness() > 1.0 { +// select { +// case db.cutc <- struct{}{}: +// default: +// } +// } + +// return err +// } + func (db *DB) headForDir(dir string) (int, bool) { for i, b := range db.heads { if b.Dir() == dir { @@ -514,6 +544,7 @@ func (db *DB) cut() error { return err } db.heads = append(db.heads, newHead) + db.headGen++ return nil } @@ -613,39 +644,38 @@ func (db *PartitionedDB) Appender() Appender { app := &partitionedAppender{db: db} for _, p := range db.Partitions { - app.buckets = append(app.buckets, p.Appender().(*dbAppender)) + app.partitions = append(app.partitions, p.Appender().(*dbAppender)) } return app } type partitionedAppender struct { - db *PartitionedDB - buckets []*dbAppender + db *PartitionedDB + partitions []*dbAppender } -func (ba *partitionedAppender) SetSeries(lset labels.Labels) (uint32, error) { - - return 0, nil -} - -func (a *partitionedAppender) Add(lset labels.Labels, t int64, v float64) error { +func (a *partitionedAppender) SetSeries(lset labels.Labels) (uint64, error) { h := lset.Hash() - s := h >> (64 - a.db.partitionPow) + p := h >> (64 - a.db.partitionPow) - return a.buckets[s].add(hashedSample{ - hash: h, - labels: lset, - t: t, - v: v, - }) + ref, err := a.partitions[p].setSeries(h, lset) + if err != nil { + return 0, err + } + return ref | (p << 40), nil } -func (ba *partitionedAppender) Commit() error { +func (a *partitionedAppender) Add(ref uint64, t int64, v float64) error { + p := uint8((ref << 16) >> 56) + return a.partitions[p].Add(ref, t, v) +} + +func (a *partitionedAppender) Commit() error { var merr MultiError // Spill buckets into partitiondb. - for _, b := range ba.buckets { - merr.Add(b.Commit()) + for _, p := range a.partitions { + merr.Add(p.Commit()) } return merr.Err() } diff --git a/head.go b/head.go index 0fbf747a3..625480743 100644 --- a/head.go +++ b/head.go @@ -5,6 +5,7 @@ import ( "math" "sort" "sync" + "sync/atomic" "time" "github.com/bradfitz/slice" @@ -28,6 +29,8 @@ type headBlock struct { // to their chunk descs. hashes map[uint64][]*memSeries + nextSeriesID uint64 + values map[string]stringset // label names to possible values postings *memPostings // postings lists for terms @@ -59,9 +62,9 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { err = wal.ReadAll(&walHandler{ series: func(lset labels.Labels) { - b.create(lset.Hash(), lset) + b.create(uint32(b.nextSeriesID), lset.Hash(), lset) + b.nextSeriesID++ b.stats.SeriesCount++ - b.stats.ChunkCount++ // head block has one chunk/series }, sample: func(s hashedSample) { si := s.ref @@ -102,6 +105,156 @@ func (h *headBlock) Stats() BlockStats { return *h.stats } +func (h *headBlock) Appender() Appender { + h.mtx.RLock() + return &headAppender{headBlock: h} +} + +type headAppender struct { + *headBlock + + newSeries map[uint32]hashedLabels + newLabels []labels.Labels + samples []hashedSample +} + +type hashedLabels struct { + hash uint64 + labels labels.Labels +} + +func (a *headAppender) SetSeries(lset labels.Labels) (uint64, error) { + return a.setSeries(lset.Hash(), lset) +} + +func (a *headAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error) { + if ms := a.get(hash, lset); ms != nil { + return uint64(ms.ref), nil + } + + id := atomic.AddUint64(&a.nextSeriesID, 1) - 1 + if a.newSeries == nil { + a.newSeries = map[uint32]hashedLabels{} + } + a.newSeries[uint32(id)] = hashedLabels{hash: hash, labels: lset} + + return id, nil +} + +func (a *headAppender) Add(ref uint64, t int64, v float64) error { + // We only act on the last 4 bytes. Anything before is used by higher-order + // appenders. We erase it to avoid issues. + ref = (ref << 32) >> 32 + + // Distinguish between existing series and series created in + // this transaction. + if int(ref) >= len(a.series) { + if _, ok := a.newSeries[uint32(ref)]; !ok { + return errNotFound + } + // TODO(fabxc): we also have to validate here that the + // sample sequence is valid. + // We also have to revalidate it as we switch locks an create + // the new series. + a.samples = append(a.samples, hashedSample{ + ref: uint32(ref), + t: t, + v: v, + }) + return nil + } + + ms := a.series[int(ref)] + if ms == nil { + return errNotFound + } + c := ms.head() + + // TODO(fabxc): memory series should be locked here already. + // Only problem is release of locks in case of a rollback. + if t < c.maxTime { + return ErrOutOfOrderSample + } + if c.maxTime == t && ms.lastValue != v { + return ErrAmendSample + } + + a.samples = append(a.samples, hashedSample{ + ref: uint32(ref), + t: t, + v: v, + }) + return nil +} + +func (a *headAppender) createSeries() { + if len(a.newSeries) == 0 { + return + } + a.newLabels = make([]labels.Labels, 0, len(a.newSeries)) + + a.mtx.RUnlock() + a.mtx.Lock() + + for id, l := range a.newSeries { + // We switched locks and have to re-validate that the series were not + // created by another goroutine in the meantime. + if int(id) < len(a.series) && a.series[id] != nil { + continue + } + // Series is still new. + a.newLabels = append(a.newLabels, l.labels) + + a.create(id, l.hash, l.labels) + } + + a.mtx.Unlock() + a.mtx.RLock() +} + +func (a *headAppender) Commit() error { + defer a.mtx.RUnlock() + + // Write all new series and samples to the WAL and add it to the + // in-mem database on success. + if err := a.wal.Log(a.newLabels, a.samples); err != nil { + return err + } + + a.createSeries() + var ( + total = uint64(len(a.samples)) + mint = int64(math.MaxInt64) + maxt = int64(math.MinInt64) + ) + + for _, s := range a.samples { + if !a.series[s.ref].append(s.t, s.v) { + total-- + } + } + + a.stats.mtx.Lock() + defer a.stats.mtx.Unlock() + + a.stats.SampleCount += total + a.stats.SeriesCount += uint64(len(a.newSeries)) + + if mint < a.stats.MinTime { + a.stats.MinTime = mint + } + if maxt > a.stats.MaxTime { + a.stats.MaxTime = maxt + } + + return nil +} + +func (a *headAppender) Rollback() error { + a.mtx.RUnlock() + return nil +} + type headSeriesReader struct { *headBlock } @@ -221,13 +374,18 @@ func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries { return nil } -func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries { - s := &memSeries{lset: lset} +func (h *headBlock) create(ref uint32, hash uint64, lset labels.Labels) *memSeries { + s := &memSeries{ + ref: ref, + lset: lset, + } - // Index the new chunk. - s.ref = uint32(len(h.series)) + // Allocate empty space until we can insert at the given index. + for int(ref) >= len(h.series) { + h.series = append(h.series, nil) + } + h.series[ref] = s - h.series = append(h.series, s) h.hashes[hash] = append(h.hashes[hash], s) for _, l := range lset { @@ -258,126 +416,126 @@ var ( ErrOutOfBounds = errors.New("out of bounds") ) -func (h *headBlock) appendBatch(samples []hashedSample) (int, error) { - // Find head chunks for all samples and allocate new IDs/refs for - // ones we haven't seen before. +// func (h *headBlock) appendBatch(samples []hashedSample) (int, error) { +// // Find head chunks for all samples and allocate new IDs/refs for +// // ones we haven't seen before. - var ( - newSeries = map[uint64][]*hashedSample{} - newLabels []labels.Labels - ) +// var ( +// newSeries = map[uint64][]*hashedSample{} +// newLabels []labels.Labels +// ) - h.mtx.RLock() - defer h.mtx.RUnlock() +// h.mtx.RLock() +// defer h.mtx.RUnlock() - for i := range samples { - s := &samples[i] +// for i := range samples { +// s := &samples[i] - ms := h.get(s.hash, s.labels) - if ms != nil { - c := ms.head() +// ms := h.get(s.hash, s.labels) +// if ms != nil { +// c := ms.head() - if s.t < c.maxTime { - return 0, ErrOutOfOrderSample - } - if c.maxTime == s.t && ms.lastValue != s.v { - return 0, ErrAmendSample - } - // TODO(fabxc): sample refs are only scoped within a block for - // now and we ignore any previously set value - s.ref = ms.ref - continue - } +// if s.t < c.maxTime { +// return 0, ErrOutOfOrderSample +// } +// if c.maxTime == s.t && ms.lastValue != s.v { +// return 0, ErrAmendSample +// } +// // TODO(fabxc): sample refs are only scoped within a block for +// // now and we ignore any previously set value +// s.ref = ms.ref +// continue +// } - // TODO(fabxc): technically there's still collision probability here. - // Extract the hashmap of the head block and use an instance of it here as well. - newSeries[s.hash] = append(newSeries[s.hash], s) - } +// // TODO(fabxc): technically there's still collision probability here. +// // Extract the hashmap of the head block and use an instance of it here as well. +// newSeries[s.hash] = append(newSeries[s.hash], s) +// } - // After the samples were successfully written to the WAL, there may - // be no further failures. - if len(newSeries) > 0 { - newLabels = make([]labels.Labels, 0, len(newSeries)) - base0 := len(h.series) +// // After the samples were successfully written to the WAL, there may +// // be no further failures. +// if len(newSeries) > 0 { +// newLabels = make([]labels.Labels, 0, len(newSeries)) +// base0 := len(h.series) - h.mtx.RUnlock() - h.mtx.Lock() +// h.mtx.RUnlock() +// h.mtx.Lock() - base1 := len(h.series) - i := 0 +// base1 := len(h.series) +// i := 0 - for hash, ser := range newSeries { - lset := ser[0].labels - // We switched locks and have to re-validate that the series were not - // created by another goroutine in the meantime. - if base1 != base0 { - if ms := h.get(hash, lset); ms != nil { - for _, s := range ser { - s.ref = ms.ref - } - continue - } - } - // Series is still new. - newLabels = append(newLabels, lset) +// for hash, ser := range newSeries { +// lset := ser[0].labels +// // We switched locks and have to re-validate that the series were not +// // created by another goroutine in the meantime. +// if base1 != base0 { +// if ms := h.get(hash, lset); ms != nil { +// for _, s := range ser { +// s.ref = ms.ref +// } +// continue +// } +// } +// // Series is still new. +// newLabels = append(newLabels, lset) - h.create(hash, lset) - // Set sample references to the series we just created. - for _, s := range ser { - s.ref = uint32(base1 + i) - } - i++ - } +// h.create(hash, lset) +// // Set sample references to the series we just created. +// for _, s := range ser { +// s.ref = uint32(base1 + i) +// } +// i++ +// } - h.mtx.Unlock() - h.mtx.RLock() - } - // Write all new series and samples to the WAL and add it to the - // in-mem database on success. - if err := h.wal.Log(newLabels, samples); err != nil { - return 0, err - } +// h.mtx.Unlock() +// h.mtx.RLock() +// } +// // Write all new series and samples to the WAL and add it to the +// // in-mem database on success. +// if err := h.wal.Log(newLabels, samples); err != nil { +// return 0, err +// } - var ( - total = uint64(len(samples)) - mint = int64(math.MaxInt64) - maxt = int64(math.MinInt64) - ) - for _, s := range samples { - ser := h.series[s.ref] +// var ( +// total = uint64(len(samples)) +// mint = int64(math.MaxInt64) +// maxt = int64(math.MinInt64) +// ) +// for _, s := range samples { +// ser := h.series[s.ref] - ser.mtx.Lock() - ok := ser.append(s.t, s.v) - ser.mtx.Unlock() +// ser.mtx.Lock() +// ok := ser.append(s.t, s.v) +// ser.mtx.Unlock() - if !ok { - total-- - continue - } - if mint > s.t { - mint = s.t - } - if maxt < s.t { - maxt = s.t - } - } +// if !ok { +// total-- +// continue +// } +// if mint > s.t { +// mint = s.t +// } +// if maxt < s.t { +// maxt = s.t +// } +// } - h.stats.mtx.Lock() - defer h.stats.mtx.Unlock() +// h.stats.mtx.Lock() +// defer h.stats.mtx.Unlock() - h.stats.SampleCount += total - h.stats.SeriesCount += uint64(len(newSeries)) - h.stats.ChunkCount += uint64(len(newSeries)) // head block has one chunk/series +// h.stats.SampleCount += total +// h.stats.SeriesCount += uint64(len(newSeries)) +// h.stats.ChunkCount += uint64(len(newSeries)) // head block has one chunk/series - if mint < h.stats.MinTime { - h.stats.MinTime = mint - } - if maxt > h.stats.MaxTime { - h.stats.MaxTime = maxt - } +// if mint < h.stats.MinTime { +// h.stats.MinTime = mint +// } +// if maxt > h.stats.MaxTime { +// h.stats.MaxTime = maxt +// } - return int(total), nil -} +// return int(total), nil +// } func (h *headBlock) fullness() float64 { h.stats.mtx.RLock()