Bucket samples before appending.

This pre-sorts samples into buckets before appending them to reduce
locking of shards.
pull/5805/head
Fabian Reinartz 8 years ago
parent c5945177fb
commit 52276c6966

52
db.go

@ -31,8 +31,9 @@ type DB struct {
// TODO(fabxc): make configurable // TODO(fabxc): make configurable
const ( const (
numSeriesShards = 32 seriesShardShift = 3
maxChunkSize = 1024 numSeriesShards = 1 << seriesShardShift
maxChunkSize = 1024
) )
// Open or create a new DB. // Open or create a new DB.
@ -63,6 +64,12 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) {
// Close the database. // Close the database.
func (db *DB) Close() error { func (db *DB) Close() error {
for i, shard := range db.shards {
fmt.Println("shard", i)
fmt.Println(" num chunks", len(shard.head.forward))
fmt.Println(" num samples", shard.head.samples)
}
return fmt.Errorf("not implemented") return fmt.Errorf("not implemented")
} }
@ -214,26 +221,49 @@ func LabelsFromMap(m map[string]string) Labels {
// Vector is a set of LabelSet associated with one value each. // Vector is a set of LabelSet associated with one value each.
// Label sets and values must have equal length. // Label sets and values must have equal length.
type Vector struct { type Vector struct {
LabelSets []Labels Buckets map[uint16][]Sample
Values []float64 }
type Sample struct {
Hash uint64
Labels Labels
Value float64
}
// Reset the vector but keep resources allocated.
func (v *Vector) Reset() {
v.Buckets = make(map[uint16][]Sample, len(v.Buckets))
}
// Add a sample to the vector.
func (v *Vector) Add(lset Labels, val float64) {
h := lset.Hash()
s := uint16(h >> (64 - seriesShardShift))
v.Buckets[s] = append(v.Buckets[s], Sample{
Hash: h,
Labels: lset,
Value: val,
})
} }
// AppendVector adds values for a list of label sets for the given timestamp // AppendVector adds values for a list of label sets for the given timestamp
// in milliseconds. // in milliseconds.
func (db *DB) AppendVector(ts int64, v *Vector) error { func (db *DB) AppendVector(ts int64, v *Vector) error {
// Sequentially add samples to shards. // Sequentially add samples to shards.
for i, ls := range v.LabelSets { for s, bkt := range v.Buckets {
h := ls.Hash() shard := db.shards[s]
shard := db.shards[h>>(64-uint(len(db.shards)))]
// TODO(fabxc): benchmark whether grouping into shards and submitting to // TODO(fabxc): benchmark whether grouping into shards and submitting to
// shards in batches is more efficient. // shards in batches is more efficient.
shard.head.mtx.Lock() shard.head.mtx.Lock()
if err := shard.head.append(h, ls, ts, v.Values[i]); err != nil { for _, smpl := range bkt {
shard.head.mtx.Unlock() if err := shard.head.append(smpl.Hash, smpl.Labels, ts, smpl.Value); err != nil {
// TODO(fabxc): handle gracefully and collect multi-error. shard.head.mtx.Unlock()
return err // TODO(fabxc): handle gracefully and collect multi-error.
return err
}
} }
shard.head.mtx.Unlock() shard.head.mtx.Unlock()
} }

@ -40,6 +40,8 @@ type HeadBlock struct {
forward map[uint32]*chunkDesc // chunk ID to chunk desc forward map[uint32]*chunkDesc // chunk ID to chunk desc
values map[string][]string // label names to possible values values map[string][]string // label names to possible values
index *memIndex // inverted index for label pairs index *memIndex // inverted index for label pairs
samples uint64
} }
// Block handles reads against a completed block of time series data within a time window. // Block handles reads against a completed block of time series data within a time window.
@ -97,7 +99,12 @@ func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error
// Store forward index for the returned ID. // Store forward index for the returned ID.
h.forward[id] = chkd h.forward[id] = chkd
} }
return chkd.append(ts, v) if err := chkd.append(ts, v); err != nil {
return err
}
h.samples++
return nil
} }
// chunkDesc wraps a plain data chunk and provides cached meta data about it. // chunkDesc wraps a plain data chunk and provides cached meta data about it.

Loading…
Cancel
Save