Browse Source

Unexport HeadBlock, export Block interface

pull/5805/head
Fabian Reinartz 8 years ago
parent
commit
c32a94d409
  1. 47
      block.go
  2. 2
      cmd/tsdb/Makefile
  3. 34
      compact.go
  4. 62
      db.go
  5. 144
      head.go
  6. 7
      querier.go

47
block.go

@ -10,19 +10,18 @@ import (
"github.com/pkg/errors"
)
// Block handles reads against a block of time series data.
type block interface {
dir() string
stats() BlockStats
interval() (int64, int64)
index() IndexReader
series() SeriesReader
persisted() bool
// Block handles reads against a Block of time series data.
type Block interface {
Dir() string
Stats() BlockStats
Index() IndexReader
Series() SeriesReader
Persisted() bool
}
// BlockStats provides stats on a data block.
type BlockStats struct {
// Time range of samples in the block.
MinTime, MaxTime int64
MinTime, MaxTime int64 // time range of samples in the block
SampleCount uint64
SeriesCount uint64
@ -37,8 +36,8 @@ const (
)
type persistedBlock struct {
d string
bstats BlockStats
dir string
stats *BlockStats
chunksf, indexf *mmapFile
@ -46,15 +45,15 @@ type persistedBlock struct {
indexr *indexReader
}
func newPersistedBlock(p string) (*persistedBlock, error) {
func newPersistedBlock(dir string) (*persistedBlock, error) {
// TODO(fabxc): validate match of name and stats time, validate magic.
// mmap files belonging to the block.
chunksf, err := openMmapFile(chunksFileName(p))
chunksf, err := openMmapFile(chunksFileName(dir))
if err != nil {
return nil, errors.Wrap(err, "open chunk file")
}
indexf, err := openMmapFile(indexFileName(p))
indexf, err := openMmapFile(indexFileName(dir))
if err != nil {
return nil, errors.Wrap(err, "open index file")
}
@ -74,12 +73,12 @@ func newPersistedBlock(p string) (*persistedBlock, error) {
}
pb := &persistedBlock{
d: p,
dir: dir,
chunksf: chunksf,
indexf: indexf,
chunkr: sr,
indexr: ir,
bstats: stats,
stats: &stats,
}
return pb, nil
}
@ -94,15 +93,11 @@ func (pb *persistedBlock) Close() error {
return err1
}
func (pb *persistedBlock) dir() string { return pb.d }
func (pb *persistedBlock) persisted() bool { return true }
func (pb *persistedBlock) index() IndexReader { return pb.indexr }
func (pb *persistedBlock) series() SeriesReader { return pb.chunkr }
func (pb *persistedBlock) stats() BlockStats { return pb.bstats }
func (pb *persistedBlock) interval() (int64, int64) {
return pb.bstats.MinTime, pb.bstats.MaxTime
}
func (pb *persistedBlock) Dir() string { return pb.dir }
func (pb *persistedBlock) Persisted() bool { return true }
func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
func (pb *persistedBlock) Series() SeriesReader { return pb.chunkr }
func (pb *persistedBlock) Stats() BlockStats { return *pb.stats }
func chunksFileName(path string) string {
return filepath.Join(path, "chunks-000")

2
cmd/tsdb/Makefile

@ -3,7 +3,7 @@ build:
bench: build
@echo ">> running benchmark"
@./tsdb bench write --out=benchout/ --metrics=$(NUM_METRICS) testdata.100k
@./tsdb bench write --out=$(OUT) --metrics=$(NUM_METRICS) testdata.100k
@go tool pprof -svg ./tsdb benchout/cpu.prof > benchout/cpuprof.svg
@go tool pprof -svg ./tsdb benchout/mem.prof > benchout/memprof.svg
@go tool pprof -svg ./tsdb benchout/block.prof > benchout/blockprof.svg

34
compact.go

@ -49,11 +49,11 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
}
type blockStore interface {
blocks() []block
blocks() []Block
}
type compactableBlocks interface {
compactable() []block
compactable() []Block
}
func newCompactor(blocks compactableBlocks, r prometheus.Registerer) (*compactor, error) {
@ -70,15 +70,15 @@ const (
compactionBlocks = 2
)
func (c *compactor) pick() []block {
func (c *compactor) pick() []Block {
bs := c.blocks.compactable()
if len(bs) == 0 {
return nil
}
if len(bs) == 1 && !bs[0].persisted() {
if len(bs) == 1 && !bs[0].Persisted() {
return bs
}
if !bs[0].persisted() {
if !bs[0].Persisted() {
if len(bs) == 2 || !compactionMatch(bs[:3]) {
return bs[:1]
}
@ -93,18 +93,18 @@ func (c *compactor) pick() []block {
return nil
}
func compactionMatch(blocks []block) bool {
func compactionMatch(blocks []Block) bool {
// TODO(fabxc): check whether combined size is below maxCompactionSize.
// Apply maximum time range? or number of series? – might already be covered by size implicitly.
// Naively check whether both blocks have roughly the same number of samples
// and whether the total sample count doesn't exceed 2GB chunk file size
// by rough approximation.
n := float64(blocks[0].stats().SampleCount)
n := float64(blocks[0].Stats().SampleCount)
t := n
for _, b := range blocks[1:] {
m := float64(b.stats().SampleCount)
m := float64(b.Stats().SampleCount)
if m < 0.8*n || m > 1.2*n {
return false
@ -116,17 +116,17 @@ func compactionMatch(blocks []block) bool {
return t < 10*200e6
}
func mergeStats(blocks ...block) (res BlockStats) {
res.MinTime = blocks[0].stats().MinTime
res.MaxTime = blocks[len(blocks)-1].stats().MaxTime
func mergeStats(blocks ...Block) (res BlockStats) {
res.MinTime = blocks[0].Stats().MinTime
res.MaxTime = blocks[len(blocks)-1].Stats().MaxTime
for _, b := range blocks {
res.SampleCount += b.stats().SampleCount
res.SampleCount += b.Stats().SampleCount
}
return res
}
func (c *compactor) compact(dir string, blocks ...block) (err error) {
func (c *compactor) compact(dir string, blocks ...Block) (err error) {
start := time.Now()
defer func() {
if err != nil {
@ -182,18 +182,18 @@ func (c *compactor) compact(dir string, blocks ...block) (err error) {
return nil
}
func (c *compactor) write(blocks []block, indexw IndexWriter, chunkw SeriesWriter) error {
func (c *compactor) write(blocks []Block, indexw IndexWriter, chunkw SeriesWriter) error {
var set compactionSet
for i, b := range blocks {
all, err := b.index().Postings("", "")
all, err := b.Index().Postings("", "")
if err != nil {
return err
}
// TODO(fabxc): find more transparent way of handling this.
if hb, ok := b.(*HeadBlock); ok {
if hb, ok := b.(*headBlock); ok {
all = hb.remapPostings(all)
}
s := newCompactionSeriesSet(b.index(), b.series(), all)
s := newCompactionSeriesSet(b.Index(), b.Series(), all)
if i == 0 {
set = s

62
db.go

@ -73,7 +73,7 @@ type DB struct {
mtx sync.RWMutex
persisted []*persistedBlock
heads []*HeadBlock
heads []*headBlock
compactor *compactor
compactc chan struct{}
@ -174,9 +174,9 @@ func (db *DB) run() {
}
// TODO(fabxc): pick emits blocks in order. compact acts on
// inverted order. Put inversion into compactor?
var bs []block
var bs []Block
for _, b := range blocks {
bs = append([]block{b}, bs...)
bs = append([]Block{b}, bs...)
}
select {
@ -195,15 +195,15 @@ func (db *DB) run() {
}
}
func (db *DB) compact(blocks []block) error {
func (db *DB) compact(blocks []Block) error {
if len(blocks) == 0 {
return nil
}
tmpdir := blocks[0].dir() + ".tmp"
tmpdir := blocks[0].Dir() + ".tmp"
// TODO(fabxc): find a better place to do this transparently.
for _, b := range blocks {
if h, ok := b.(*HeadBlock); ok {
if h, ok := b.(*headBlock); ok {
h.updateMapping()
}
}
@ -215,11 +215,11 @@ func (db *DB) compact(blocks []block) error {
db.mtx.Lock()
defer db.mtx.Unlock()
if err := renameDir(tmpdir, blocks[0].dir()); err != nil {
if err := renameDir(tmpdir, blocks[0].Dir()); err != nil {
return errors.Wrap(err, "rename dir")
}
for _, b := range blocks[1:] {
if err := os.RemoveAll(b.dir()); err != nil {
if err := os.RemoveAll(b.Dir()); err != nil {
return errors.Wrap(err, "delete dir")
}
}
@ -227,7 +227,7 @@ func (db *DB) compact(blocks []block) error {
var merr MultiError
for _, b := range blocks {
merr.Add(errors.Wrapf(db.reinit(b.dir()), "reinit block at %q", b.dir()))
merr.Add(errors.Wrapf(db.reinit(b.Dir()), "reinit block at %q", b.Dir()))
}
return merr.Err()
}
@ -248,7 +248,7 @@ func isBlockDir(fi os.FileInfo) bool {
func (db *DB) initBlocks() error {
var (
pbs []*persistedBlock
heads []*HeadBlock
heads []*headBlock
)
files, err := ioutil.ReadDir(db.dir)
@ -263,7 +263,7 @@ func (db *DB) initBlocks() error {
dir := filepath.Join(db.dir, fi.Name())
if fileutil.Exist(filepath.Join(dir, walFileName)) {
h, err := OpenHeadBlock(dir, db.logger)
h, err := openHeadBlock(dir, db.logger)
if err != nil {
return err
}
@ -282,16 +282,16 @@ func (db *DB) initBlocks() error {
lastTime := int64(math.MinInt64)
for _, b := range pbs {
if b.stats().MinTime < lastTime {
return errors.Errorf("illegal order for block at %q", b.dir())
if b.Stats().MinTime < lastTime {
return errors.Errorf("illegal order for block at %q", b.Dir())
}
lastTime = b.stats().MaxTime
lastTime = b.Stats().MaxTime
}
for _, b := range heads {
if b.stats().MinTime < lastTime {
return errors.Errorf("illegal order for block at %q", b.dir())
if b.Stats().MinTime < lastTime {
return errors.Errorf("illegal order for block at %q", b.Dir())
}
lastTime = b.stats().MaxTime
lastTime = b.Stats().MaxTime
}
db.persisted = pbs
@ -381,7 +381,7 @@ func (db *DB) appendBatch(samples []hashedSample) error {
func (db *DB) headForDir(dir string) (int, bool) {
for i, b := range db.heads {
if b.dir() == dir {
if b.Dir() == dir {
return i, true
}
}
@ -390,7 +390,7 @@ func (db *DB) headForDir(dir string) (int, bool) {
func (db *DB) persistedForDir(dir string) (int, bool) {
for i, b := range db.persisted {
if b.dir() == dir {
if b.Dir() == dir {
return i, true
}
}
@ -441,13 +441,13 @@ func (db *DB) reinit(dir string) error {
return nil
}
func (db *DB) compactable() []block {
func (db *DB) compactable() []Block {
db.mtx.RLock()
defer db.mtx.RUnlock()
var blocks []block
var blocks []Block
for _, pb := range db.persisted {
blocks = append([]block{pb}, blocks...)
blocks = append([]Block{pb}, blocks...)
}
// threshold := db.heads[len(db.heads)-1].bstatdb.MaxTime - headGracePeriod
@ -458,7 +458,7 @@ func (db *DB) compactable() []block {
// }
// }
for _, hb := range db.heads[:len(db.heads)-1] {
blocks = append([]block{hb}, blocks...)
blocks = append([]Block{hb}, blocks...)
}
return blocks
@ -480,20 +480,18 @@ func intervalContains(min, max, t int64) bool {
// blocksForInterval returns all blocks within the partition that may contain
// data for the given time range.
func (db *DB) blocksForInterval(mint, maxt int64) []block {
var bs []block
func (db *DB) blocksForInterval(mint, maxt int64) []Block {
var bs []Block
for _, b := range db.persisted {
bmin, bmax := b.interval()
if intervalOverlap(mint, maxt, bmin, bmax) {
s := b.Stats()
if intervalOverlap(mint, maxt, s.MinTime, s.MaxTime) {
bs = append(bs, b)
}
}
for _, b := range db.heads {
bmin, bmax := b.interval()
if intervalOverlap(mint, maxt, bmin, bmax) {
s := b.Stats()
if intervalOverlap(mint, maxt, s.MinTime, s.MaxTime) {
bs = append(bs, b)
}
}
@ -511,7 +509,7 @@ func (db *DB) cut() error {
if err != nil {
return err
}
newHead, err := OpenHeadBlock(dir, db.logger)
newHead, err := openHeadBlock(dir, db.logger)
if err != nil {
return err
}

144
head.go

@ -13,10 +13,10 @@ import (
"github.com/go-kit/kit/log"
)
// HeadBlock handles reads and writes of time series data within a time window.
type HeadBlock struct {
// headBlock handles reads and writes of time series data within a time window.
type headBlock struct {
mtx sync.RWMutex
d string
dir string
// descs holds all chunk descs for the head block. Each chunk implicitly
// is assigned the index as its ID.
@ -33,18 +33,18 @@ type HeadBlock struct {
wal *WAL
bstats *BlockStats
stats *BlockStats
}
// OpenHeadBlock creates a new empty head block.
func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) {
// openHeadBlock creates a new empty head block.
func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
wal, err := OpenWAL(dir, log.NewContext(l).With("component", "wal"), 15*time.Second)
if err != nil {
return nil, err
}
b := &HeadBlock{
d: dir,
b := &headBlock{
dir: dir,
descs: []*chunkDesc{},
hashes: map[uint64][]*chunkDesc{},
values: map[string]stringset{},
@ -52,31 +52,25 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) {
wal: wal,
mapper: newPositionMapper(nil),
}
b.bstats = &BlockStats{
MinTime: math.MaxInt64,
MaxTime: math.MinInt64,
b.stats = &BlockStats{
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
}
err = wal.ReadAll(&walHandler{
series: func(lset labels.Labels) {
b.create(lset.Hash(), lset)
b.stats.SeriesCount++
b.stats.ChunkCount++ // head block has one chunk/series
},
sample: func(s hashedSample) {
cd := b.descs[s.ref]
// Duplicated from appendBatch – TODO(fabxc): deduplicate?
if cd.lastTimestamp == s.t && cd.lastValue != s.v {
return
}
cd.append(s.t, s.v)
if s.t > b.bstats.MaxTime {
b.bstats.MaxTime = s.t
if s.t > b.stats.MaxTime {
b.stats.MaxTime = s.t
}
if s.t < b.bstats.MinTime {
b.bstats.MinTime = s.t
}
b.bstats.SampleCount++
b.stats.SampleCount++
},
})
if err != nil {
@ -89,24 +83,29 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) {
}
// Close syncs all data and closes underlying resources of the head block.
func (h *HeadBlock) Close() error {
func (h *headBlock) Close() error {
return h.wal.Close()
}
func (h *HeadBlock) dir() string { return h.d }
func (h *HeadBlock) persisted() bool { return false }
func (h *HeadBlock) index() IndexReader { return h }
func (h *HeadBlock) series() SeriesReader { return h }
func (h *headBlock) Dir() string { return h.dir }
func (h *headBlock) Persisted() bool { return false }
func (h *headBlock) Index() IndexReader { return &headIndexReader{h} }
func (h *headBlock) Series() SeriesReader { return &headSeriesReader{h} }
// Stats returns statisitics about the indexed data.
func (h *headBlock) Stats() BlockStats {
h.stats.mtx.RLock()
defer h.stats.mtx.RUnlock()
func (h *HeadBlock) stats() BlockStats {
h.bstats.mtx.RLock()
defer h.bstats.mtx.RUnlock()
return *h.stats
}
return *h.bstats
type headSeriesReader struct {
*headBlock
}
// Chunk returns the chunk for the reference number.
func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {
func (h *headSeriesReader) Chunk(ref uint32) (chunks.Chunk, error) {
h.mtx.RLock()
defer h.mtx.RUnlock()
@ -116,22 +115,6 @@ func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {
return h.descs[int(ref)].chunk, nil
}
type headSeriesReader struct {
h *HeadBlock
}
func (h *headSeriesReader) Chunk(ref uint32) (chunks.Chunk, error) {
h.h.mtx.RLock()
defer h.h.mtx.RUnlock()
if int(ref) >= len(h.h.descs) {
return nil, errNotFound
}
return &safeChunk{
cd: h.h.descs[int(ref)],
}, nil
}
type safeChunk struct {
cd *chunkDesc
}
@ -147,21 +130,12 @@ func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") }
func (c *safeChunk) Bytes() []byte { panic("illegal") }
func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") }
func (h *HeadBlock) interval() (int64, int64) {
h.bstats.mtx.RLock()
defer h.bstats.mtx.RUnlock()
return h.bstats.MinTime, h.bstats.MaxTime
}
// Stats returns statisitics about the indexed data.
func (h *HeadBlock) Stats() (BlockStats, error) {
h.bstats.mtx.RLock()
defer h.bstats.mtx.RUnlock()
return *h.bstats, nil
type headIndexReader struct {
*headBlock
}
// LabelValues returns the possible label values
func (h *HeadBlock) LabelValues(names ...string) (StringTuples, error) {
func (h *headIndexReader) LabelValues(names ...string) (StringTuples, error) {
h.mtx.RLock()
defer h.mtx.RUnlock()
@ -179,7 +153,7 @@ func (h *HeadBlock) LabelValues(names ...string) (StringTuples, error) {
}
// Postings returns the postings list iterator for the label pair.
func (h *HeadBlock) Postings(name, value string) (Postings, error) {
func (h *headIndexReader) Postings(name, value string) (Postings, error) {
h.mtx.RLock()
defer h.mtx.RUnlock()
@ -187,7 +161,7 @@ func (h *HeadBlock) Postings(name, value string) (Postings, error) {
}
// Series returns the series for the given reference.
func (h *HeadBlock) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
func (h *headIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
h.mtx.RLock()
defer h.mtx.RUnlock()
@ -206,7 +180,7 @@ func (h *HeadBlock) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
return cd.lset, []ChunkMeta{meta}, nil
}
func (h *HeadBlock) LabelIndices() ([][]string, error) {
func (h *headIndexReader) LabelIndices() ([][]string, error) {
h.mtx.RLock()
defer h.mtx.RUnlock()
@ -218,9 +192,15 @@ func (h *HeadBlock) LabelIndices() ([][]string, error) {
return res, nil
}
func (h *headIndexReader) Stats() (BlockStats, error) {
h.stats.mtx.RLock()
defer h.stats.mtx.RUnlock()
return *h.stats, nil
}
// get retrieves the chunk with the hash and label set and creates
// a new one if it doesn't exist yet.
func (h *HeadBlock) get(hash uint64, lset labels.Labels) *chunkDesc {
func (h *headBlock) get(hash uint64, lset labels.Labels) *chunkDesc {
cds := h.hashes[hash]
for _, cd := range cds {
@ -231,7 +211,7 @@ func (h *HeadBlock) get(hash uint64, lset labels.Labels) *chunkDesc {
return nil
}
func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
func (h *headBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
cd := &chunkDesc{
lset: lset,
chunk: chunks.NewXORChunk(),
@ -274,9 +254,11 @@ var (
// ErrAmendSample is returned if an appended sample has the same timestamp
// as the most recent sample but a different value.
ErrAmendSample = errors.New("amending sample")
ErrOutOfBounds = errors.New("out of bounds")
)
func (h *HeadBlock) appendBatch(samples []hashedSample) (int, error) {
func (h *headBlock) appendBatch(samples []hashedSample) (int, error) {
// Find head chunks for all samples and allocate new IDs/refs for
// ones we haven't seen before.
var (
@ -370,31 +352,31 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) (int, error) {
}
}
h.bstats.mtx.Lock()
defer h.bstats.mtx.Unlock()
h.stats.mtx.Lock()
defer h.stats.mtx.Unlock()
h.bstats.SampleCount += total
h.bstats.SeriesCount += uint64(len(newSeries))
h.bstats.ChunkCount += uint64(len(newSeries)) // head block has one chunk/series
h.stats.SampleCount += total
h.stats.SeriesCount += uint64(len(newSeries))
h.stats.ChunkCount += uint64(len(newSeries)) // head block has one chunk/series
if mint < h.bstats.MinTime {
h.bstats.MinTime = mint
if mint < h.stats.MinTime {
h.stats.MinTime = mint
}
if maxt > h.bstats.MaxTime {
h.bstats.MaxTime = maxt
if maxt > h.stats.MaxTime {
h.stats.MaxTime = maxt
}
return int(total), nil
}
func (h *HeadBlock) fullness() float64 {
h.bstats.mtx.RLock()
defer h.bstats.mtx.RUnlock()
func (h *headBlock) fullness() float64 {
h.stats.mtx.RLock()
defer h.stats.mtx.RUnlock()
return float64(h.bstats.SampleCount) / float64(h.bstats.SeriesCount+1) / 250
return float64(h.stats.SampleCount) / float64(h.stats.SeriesCount+1) / 250
}
func (h *HeadBlock) updateMapping() {
func (h *headBlock) updateMapping() {
h.mtx.RLock()
if h.mapper.sortable != nil && h.mapper.Len() == len(h.descs) {
@ -418,7 +400,7 @@ func (h *HeadBlock) updateMapping() {
// of the series they reference.
// Returned postings have no longer monotonic IDs and MUST NOT be used for regular
// postings set operations, i.e. intersect and merge.
func (h *HeadBlock) remapPostings(p Postings) Postings {
func (h *headBlock) remapPostings(p Postings) Postings {
list, err := expandPostings(p)
if err != nil {
return errPostings{err: err}

7
querier.go

@ -58,14 +58,13 @@ func (s *DB) Querier(mint, maxt int64) Querier {
q := &blockQuerier{
mint: mint,
maxt: maxt,
index: b.index(),
series: b.series(),
index: b.Index(),
series: b.Series(),
}
// TODO(fabxc): find nicer solution.
if hb, ok := b.(*HeadBlock); ok {
if hb, ok := b.(*headBlock); ok {
q.postingsMapper = hb.remapPostings
q.series = &headSeriesReader{h: hb}
}
sq.blocks = append(sq.blocks, q)

Loading…
Cancel
Save