diff --git a/db.go b/db.go index 175ad0c52..2add43a5d 100644 --- a/db.go +++ b/db.go @@ -323,6 +323,35 @@ func (db *DB) Close() error { return merr.Err() } +func (db *DB) Appender() Appender { + return &dbAppender{db: db} +} + +type dbAppender struct { + db *DB + buf []hashedSample +} + +func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) error { + return a.add(hashedSample{ + hash: lset.Hash(), + labels: lset, + t: t, + v: v, + }) +} + +func (a *dbAppender) add(s hashedSample) error { + a.buf = append(a.buf, s) + return nil +} + +func (a *dbAppender) Commit() error { + err := a.db.appendBatch(a.buf) + a.buf = a.buf[:0] + return err +} + func (db *DB) appendBatch(samples []hashedSample) error { if len(samples) == 0 { return nil @@ -335,9 +364,9 @@ func (db *DB) appendBatch(samples []hashedSample) error { // TODO(fabxc): distinguish samples between concurrent heads for // different time blocks. Those may occurr during transition to still // allow late samples to arrive for a previous block. - err := head.appendBatch(samples) + n, err := head.appendBatch(samples) if err == nil { - db.metrics.samplesAppended.Add(float64(len(samples))) + db.metrics.samplesAppended.Add(float64(n)) } if head.fullness() > 1.0 { @@ -583,45 +612,42 @@ func (db *PartitionedDB) Close() error { // Appender returns a new appender against the database. func (db *PartitionedDB) Appender() Appender { - return &partitionedAppender{ - db: db, - buckets: make([][]hashedSample, len(db.Partitions)), + app := &partitionedAppender{db: db} + + for _, p := range db.Partitions { + app.buckets = append(app.buckets, p.Appender().(*dbAppender)) } + return app } type partitionedAppender struct { db *PartitionedDB - buckets [][]hashedSample + buckets []*dbAppender } -func (ba *partitionedAppender) Add(lset labels.Labels, t int64, v float64) error { - h := lset.Hash() - s := h >> (64 - ba.db.partitionPow) +func (ba *partitionedAppender) SetSeries(lset labels.Labels) (uint32, error) { - ba.buckets[s] = append(ba.buckets[s], hashedSample{ + return 0, nil +} + +func (a *partitionedAppender) Add(lset labels.Labels, t int64, v float64) error { + h := lset.Hash() + s := h >> (64 - a.db.partitionPow) + + return a.buckets[s].add(hashedSample{ hash: h, labels: lset, t: t, v: v, }) - - return nil -} - -func (ba *partitionedAppender) reset() { - for i := range ba.buckets { - ba.buckets[i] = ba.buckets[i][:0] - } } func (ba *partitionedAppender) Commit() error { - defer ba.reset() - var merr MultiError // Spill buckets into partitiondb. - for s, b := range ba.buckets { - merr.Add(ba.db.Partitions[s].appendBatch(b)) + for _, b := range ba.buckets { + merr.Add(b.Commit()) } return merr.Err() } diff --git a/head.go b/head.go index 5eafb4f95..4c1de4000 100644 --- a/head.go +++ b/head.go @@ -284,7 +284,7 @@ var ( ErrAmendSample = errors.New("amending sample") ) -func (h *HeadBlock) appendBatch(samples []hashedSample) error { +func (h *HeadBlock) appendBatch(samples []hashedSample) (int, error) { // Find head chunks for all samples and allocate new IDs/refs for // ones we haven't seen before. var ( @@ -303,10 +303,10 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { if cd != nil { // Samples must only occur in order. if s.t < cd.lastTimestamp { - return ErrOutOfOrderSample + return 0, ErrOutOfOrderSample } if cd.lastTimestamp == s.t && cd.lastValue != s.v { - return ErrAmendSample + return 0, ErrAmendSample } // TODO(fabxc): sample refs are only scoped within a block for // now and we ignore any previously set value @@ -332,7 +332,7 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { // Write all new series and samples to the WAL and add it to the // in-mem database on success. if err := h.wal.Log(newSeries, samples); err != nil { - return err + return 0, err } // After the samples were successfully written to the WAL, there may @@ -392,7 +392,7 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { h.bstats.MaxTime = maxt } - return nil + return int(total), nil } func (h *HeadBlock) fullness() float64 {