diff --git a/db.go b/db.go index b0f68eb23..7c551c738 100644 --- a/db.go +++ b/db.go @@ -35,14 +35,14 @@ type DB struct { opts *Options path string - shards []*SeriesShard + shards []*Shard } // TODO(fabxc): make configurable const ( - seriesShardShift = 2 - numSeriesShards = 1 << seriesShardShift - maxChunkSize = 1024 + shardShift = 2 + numShards = 1 << shardShift + maxChunkSize = 1024 ) // Open or create a new DB. @@ -67,11 +67,11 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) { // Initialize vertical shards. // TODO(fabxc): validate shard number to be power of 2, which is required // for the bitshift-modulo when finding the right shard. - for i := 0; i < numSeriesShards; i++ { + for i := 0; i < numShards; i++ { l := log.NewContext(l).With("shard", i) d := shardDir(path, i) - s, err := NewSeriesShard(d, l) + s, err := OpenShard(d, l) if err != nil { return nil, fmt.Errorf("initializing shard %q failed: %s", d, err) } @@ -139,7 +139,7 @@ func (v *Vector) Reset() { // Add a sample to the vector. func (v *Vector) Add(lset Labels, val float64) { h := lset.Hash() - s := uint16(h >> (64 - seriesShardShift)) + s := uint16(h >> (64 - shardShift)) v.Buckets[s] = append(v.Buckets[s], Sample{ Hash: h, @@ -185,7 +185,7 @@ func (db *DB) AppendVector(ts int64, v *Vector) error { func (db *DB) AppendSingle(lset Labels, ts int64, v float64) error { sort.Sort(lset) h := lset.Hash() - s := uint16(h >> (64 - seriesShardShift)) + s := uint16(h >> (64 - shardShift)) return db.shards[s].appendBatch(ts, []Sample{ { @@ -198,9 +198,9 @@ func (db *DB) AppendSingle(lset Labels, ts int64, v float64) error { const sep = '\xff' -// SeriesShard handles reads and writes of time series falling into +// Shard handles reads and writes of time series falling into // a hashed shard of a series. -type SeriesShard struct { +type Shard struct { path string persistCh chan struct{} logger log.Logger @@ -210,8 +210,8 @@ type SeriesShard struct { head *HeadBlock } -// NewSeriesShard returns a new SeriesShard. -func NewSeriesShard(path string, logger log.Logger) (*SeriesShard, error) { +// OpenShard returns a new Shard. +func OpenShard(path string, logger log.Logger) (*Shard, error) { // Create directory if shard is new. if _, err := os.Stat(path); os.IsNotExist(err) { if err := os.MkdirAll(path, 0777); err != nil { @@ -225,7 +225,7 @@ func NewSeriesShard(path string, logger log.Logger) (*SeriesShard, error) { return nil, err } - s := &SeriesShard{ + s := &Shard{ path: path, persistCh: make(chan struct{}, 1), logger: logger, @@ -240,8 +240,8 @@ func NewSeriesShard(path string, logger log.Logger) (*SeriesShard, error) { return s, nil } -// Close the series shard. -func (s *SeriesShard) Close() error { +// Close the shard. +func (s *Shard) Close() error { var e MultiError for _, pb := range s.persisted { @@ -251,7 +251,7 @@ func (s *SeriesShard) Close() error { return e.Err() } -func (s *SeriesShard) appendBatch(ts int64, samples []Sample) error { +func (s *Shard) appendBatch(ts int64, samples []Sample) error { // TODO(fabxc): make configurable. const persistenceTimeThreshold = 1000 * 60 * 60 // 1 hour if timestamp in ms @@ -283,14 +283,14 @@ func (s *SeriesShard) appendBatch(ts int64, samples []Sample) error { // blocksForRange returns all blocks within the shard that may contain // data for the given time range. -func (s *SeriesShard) blocksForRange(mint, maxt int64) (bs []Block) { +func (s *Shard) blocksForRange(mint, maxt int64) (bs []Block) { return []Block{s.head} } // TODO(fabxc): make configurable. const shardGracePeriod = 60 * 1000 // 60 seconds for millisecond scale -func (s *SeriesShard) persist() error { +func (s *Shard) persist() error { s.mtx.Lock() // Set new head block. diff --git a/querier.go b/querier.go index 2e49bc9d6..2602b7232 100644 --- a/querier.go +++ b/querier.go @@ -110,7 +110,7 @@ type shardQuerier struct { // Querier returns a new querier over the data shard for the given // time range. -func (s *SeriesShard) Querier(mint, maxt int64) Querier { +func (s *Shard) Querier(mint, maxt int64) Querier { blocks := s.blocksForRange(mint, maxt) sq := &shardQuerier{