diff --git a/block.go b/block.go index db63e14f2..14275247d 100644 --- a/block.go +++ b/block.go @@ -22,7 +22,7 @@ type Block interface { Index() IndexReader // Series returns a SeriesReader over the block's data. - Series() SeriesReader + Chunks() ChunkReader // Persisted returns whether the block is already persisted, // and no longer being appended to. @@ -64,9 +64,9 @@ type persistedBlock struct { dir string meta BlockMeta - chunksf, indexf *mmapFile + indexf *mmapFile - chunkr *seriesReader + chunkr *chunkReader indexr *indexReader } @@ -120,37 +120,36 @@ func newPersistedBlock(dir string) (*persistedBlock, error) { return nil, err } - chunksf, err := openMmapFile(chunksFileName(dir)) + cr, err := newChunkReader(filepath.Join(dir, "chunks")) if err != nil { - return nil, errors.Wrap(err, "open chunk file") + return nil, err } + // ir, err := newIndexReader(dir) + // if err != nil { + // return nil, err + // } + indexf, err := openMmapFile(indexFileName(dir)) if err != nil { return nil, errors.Wrap(err, "open index file") } - - sr, err := newSeriesReader([][]byte{chunksf.b}) - if err != nil { - return nil, errors.Wrap(err, "create series reader") - } ir, err := newIndexReader(indexf.b) if err != nil { return nil, errors.Wrap(err, "create index reader") } pb := &persistedBlock{ - dir: dir, - meta: *meta, - chunksf: chunksf, - indexf: indexf, - chunkr: sr, - indexr: ir, + dir: dir, + meta: *meta, + indexf: indexf, + chunkr: cr, + indexr: ir, } return pb, nil } func (pb *persistedBlock) Close() error { - err0 := pb.chunksf.Close() + err0 := pb.chunkr.Close() err1 := pb.indexf.Close() if err0 != nil { @@ -159,11 +158,11 @@ func (pb *persistedBlock) Close() error { return err1 } -func (pb *persistedBlock) Dir() string { return pb.dir } -func (pb *persistedBlock) Persisted() bool { return true } -func (pb *persistedBlock) Index() IndexReader { return pb.indexr } -func (pb *persistedBlock) Series() SeriesReader { return pb.chunkr } -func (pb *persistedBlock) Meta() BlockMeta { return pb.meta } +func (pb *persistedBlock) Dir() string { return pb.dir } +func (pb *persistedBlock) Persisted() bool { return true } +func (pb *persistedBlock) Index() IndexReader { return pb.indexr } +func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr } +func (pb *persistedBlock) Meta() BlockMeta { return pb.meta } func chunksFileName(path string) string { return filepath.Join(path, "chunks-000") diff --git a/compact.go b/compact.go index 6a0d67499..67d536b74 100644 --- a/compact.go +++ b/compact.go @@ -64,7 +64,7 @@ type compactionInfo struct { mint, maxt int64 } -const compactionBlocksLen = 4 +const compactionBlocksLen = 3 // pick returns a range [i, j) in the blocks that are suitable to be compacted // into a single block at position i. @@ -114,9 +114,6 @@ func (c *compactor) pick(bs []compactionInfo) (i, j int, ok bool) { func (c *compactor) match(bs []compactionInfo) bool { g := bs[0].generation - if g >= 5 { - return false - } for _, b := range bs { if b.generation == 0 { @@ -166,17 +163,16 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { return err } - chunkf, err := os.OpenFile(chunksFileName(dir), os.O_WRONLY|os.O_CREATE, 0666) - if err != nil { - return errors.Wrap(err, "create chunk file") - } indexf, err := os.OpenFile(indexFileName(dir), os.O_WRONLY|os.O_CREATE, 0666) if err != nil { return errors.Wrap(err, "create index file") } indexw := newIndexWriter(indexf) - chunkw := newChunkWriter(chunkf) + chunkw, err := newChunkWriter(filepath.Join(dir, "chunks")) + if err != nil { + return errors.Wrap(err, "open chunk writer") + } if err = c.write(dir, blocks, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") @@ -188,15 +184,9 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { if err = indexw.Close(); err != nil { return errors.Wrap(err, "close index writer") } - if err = fileutil.Fsync(chunkf); err != nil { - return errors.Wrap(err, "fsync chunk file") - } if err = fileutil.Fsync(indexf); err != nil { return errors.Wrap(err, "fsync index file") } - if err = chunkf.Close(); err != nil { - return errors.Wrap(err, "close chunk file") - } if err = indexf.Close(); err != nil { return errors.Wrap(err, "close index file") } @@ -215,7 +205,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw if hb, ok := b.(*headBlock); ok { all = hb.remapPostings(all) } - s := newCompactionSeriesSet(b.Index(), b.Series(), all) + s := newCompactionSeriesSet(b.Index(), b.Chunks(), all) if i == 0 { set = s @@ -300,17 +290,17 @@ type compactionSet interface { type compactionSeriesSet struct { p Postings index IndexReader - series SeriesReader + chunks ChunkReader l labels.Labels c []ChunkMeta err error } -func newCompactionSeriesSet(i IndexReader, s SeriesReader, p Postings) *compactionSeriesSet { +func newCompactionSeriesSet(i IndexReader, c ChunkReader, p Postings) *compactionSeriesSet { return &compactionSeriesSet{ index: i, - series: s, + chunks: c, p: p, } } @@ -327,7 +317,7 @@ func (c *compactionSeriesSet) Next() bool { for i := range c.c { chk := &c.c[i] - chk.Chunk, c.err = c.series.Chunk(chk.Ref) + chk.Chunk, c.err = c.chunks.Chunk(chk.Ref) if c.err != nil { return false } diff --git a/db.go b/db.go index 76794a98c..9fc8dde3c 100644 --- a/db.go +++ b/db.go @@ -153,8 +153,8 @@ func Open(dir string, l log.Logger, opts *Options) (db *DB, err error) { l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) } - var r prometheus.Registerer - // r := prometheus.DefaultRegisterer + // var r prometheus.Registerer + r := prometheus.DefaultRegisterer if opts == nil { opts = DefaultOptions @@ -307,7 +307,11 @@ func (db *DB) compact(i, j int) error { return errors.Wrap(err, "removing old block") } } - return db.retentionCutoff() + if err := db.retentionCutoff(); err != nil { + return err + } + + return nil } func (db *DB) retentionCutoff() error { diff --git a/head.go b/head.go index d9ab2944c..cd0167ffb 100644 --- a/head.go +++ b/head.go @@ -146,10 +146,10 @@ func (h *headBlock) Meta() BlockMeta { return h.meta } -func (h *headBlock) Dir() string { return h.dir } -func (h *headBlock) Persisted() bool { return false } -func (h *headBlock) Index() IndexReader { return &headIndexReader{h} } -func (h *headBlock) Series() SeriesReader { return &headSeriesReader{h} } +func (h *headBlock) Dir() string { return h.dir } +func (h *headBlock) Persisted() bool { return false } +func (h *headBlock) Index() IndexReader { return &headIndexReader{h} } +func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} } func (h *headBlock) Appender() Appender { atomic.AddUint64(&h.activeWriters, 1) @@ -359,12 +359,12 @@ func (a *headAppender) Rollback() error { return nil } -type headSeriesReader struct { +type headChunkReader struct { *headBlock } // Chunk returns the chunk for the reference number. -func (h *headSeriesReader) Chunk(ref uint64) (chunks.Chunk, error) { +func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { h.mtx.RLock() defer h.mtx.RUnlock() diff --git a/querier.go b/querier.go index c09bab885..5f02c77f7 100644 --- a/querier.go +++ b/querier.go @@ -59,7 +59,7 @@ func (s *DB) Querier(mint, maxt int64) Querier { mint: mint, maxt: maxt, index: b.Index(), - series: b.Series(), + chunks: b.Chunks(), } // TODO(fabxc): find nicer solution. @@ -123,19 +123,19 @@ func (q *querier) Close() error { // blockQuerier provides querying access to a single block database. type blockQuerier struct { index IndexReader - series SeriesReader + chunks ChunkReader postingsMapper func(Postings) Postings mint, maxt int64 } -func newBlockQuerier(ix IndexReader, s SeriesReader, mint, maxt int64) *blockQuerier { +func newBlockQuerier(ix IndexReader, c ChunkReader, mint, maxt int64) *blockQuerier { return &blockQuerier{ mint: mint, maxt: maxt, index: ix, - series: s, + chunks: c, } } @@ -162,7 +162,7 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { return &blockSeriesSet{ index: q.index, - chunks: q.series, + chunks: q.chunks, it: p, absent: absent, mint: q.mint, @@ -425,7 +425,7 @@ func (s *partitionSeriesSet) Next() bool { // blockSeriesSet is a set of series from an inverted index query. type blockSeriesSet struct { index IndexReader - chunks SeriesReader + chunks ChunkReader it Postings // postings list referencing series absent []string // labels that must not be set for result series mint, maxt int64 // considered time range diff --git a/reader.go b/reader.go index 630c11a99..ee3738b89 100644 --- a/reader.go +++ b/reader.go @@ -3,6 +3,7 @@ package tsdb import ( "encoding/binary" "fmt" + "io" "strings" "github.com/fabxc/tsdb/chunks" @@ -10,23 +11,42 @@ import ( "github.com/pkg/errors" ) -// SeriesReader provides reading access of serialized time series data. -type SeriesReader interface { +// ChunkReader provides reading access of serialized time series data. +type ChunkReader interface { // Chunk returns the series data chunk with the given reference. Chunk(ref uint64) (chunks.Chunk, error) + + // Close releases all underlying resources of the reader. + Close() error } -// seriesReader implements a SeriesReader for a serialized byte stream +// chunkReader implements a SeriesReader for a serialized byte stream // of series data. -type seriesReader struct { +type chunkReader struct { // The underlying bytes holding the encoded series data. bs [][]byte + + cs []io.Closer } -func newSeriesReader(bs [][]byte) (*seriesReader, error) { - s := &seriesReader{bs: bs} +// newChunkReader returns a new chunkReader based on mmaped files found in dir. +func newChunkReader(dir string) (*chunkReader, error) { + files, err := sequenceFiles(dir, "") + if err != nil { + return nil, err + } + var cr chunkReader - for i, b := range bs { + for _, fn := range files { + f, err := openMmapFile(fn) + if err != nil { + return nil, errors.Wrapf(err, "mmap files") + } + cr.cs = append(cr.cs, f) + cr.bs = append(cr.bs, f.b) + } + + for i, b := range cr.bs { if len(b) < 4 { return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i) } @@ -35,10 +55,14 @@ func newSeriesReader(bs [][]byte) (*seriesReader, error) { return nil, fmt.Errorf("invalid magic number %x", m) } } - return s, nil + return &cr, nil } -func (s *seriesReader) Chunk(ref uint64) (chunks.Chunk, error) { +func (s *chunkReader) Close() error { + return closeAll(s.cs...) +} + +func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) { var ( seq = int(ref >> 32) off = int((ref << 32) >> 32) diff --git a/writer.go b/writer.go index 6b4e31deb..6b98b2b95 100644 --- a/writer.go +++ b/writer.go @@ -6,10 +6,12 @@ import ( "hash" "hash/crc32" "io" + "os" "sort" "strings" "github.com/bradfitz/slice" + "github.com/coreos/etcd/pkg/fileutil" "github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/labels" "github.com/pkg/errors" @@ -44,20 +46,109 @@ type ChunkWriter interface { // chunkWriter implements the ChunkWriter interface for the standard // serialization format. type chunkWriter struct { - ow io.Writer - w *bufio.Writer - n int64 - c int - crc32 hash.Hash + dirFile *os.File + files []*os.File + wbuf *bufio.Writer + n int64 + crc32 hash.Hash + + segmentSize int64 } -func newChunkWriter(w io.Writer) *chunkWriter { - return &chunkWriter{ - ow: w, - w: bufio.NewWriterSize(w, 1*1024*1024), - n: 0, - crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), +const ( + defaultChunkSegmentSize = 512 * 1024 * 1024 + + chunksFormatV1 = 1 + indexFormatV1 = 1 +) + +func newChunkWriter(dir string) (*chunkWriter, error) { + if err := os.MkdirAll(dir, 0777); err != nil { + return nil, err } + dirFile, err := fileutil.OpenDir(dir) + if err != nil { + return nil, err + } + cw := &chunkWriter{ + dirFile: dirFile, + n: 0, + crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), + segmentSize: defaultChunkSegmentSize, + } + return cw, nil +} + +func (w *chunkWriter) tail() *os.File { + if len(w.files) == 0 { + return nil + } + return w.files[len(w.files)-1] +} + +// finalizeTail writes all pending data to the current tail file, +// truncates its size, and closes it. +func (w *chunkWriter) finalizeTail() error { + tf := w.tail() + if tf == nil { + return nil + } + + if err := w.wbuf.Flush(); err != nil { + return err + } + if err := fileutil.Fsync(tf); err != nil { + return err + } + // As the file was pre-allocated, we truncate any superfluous zero bytes. + off, err := tf.Seek(0, os.SEEK_CUR) + if err != nil { + return err + } + if err := tf.Truncate(off); err != nil { + return err + } + return tf.Close() +} + +func (w *chunkWriter) cut() error { + // Sync current tail to disk and close. + w.finalizeTail() + + p, _, err := nextSequenceFile(w.dirFile.Name(), "") + if err != nil { + return err + } + f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + return err + } + if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil { + return err + } + if err = w.dirFile.Sync(); err != nil { + return err + } + + // Write header metadata for new file. + + metab := make([]byte, 8) + binary.BigEndian.PutUint32(metab[:4], MagicSeries) + metab[4] = chunksFormatV1 + + if _, err := f.Write(metab); err != nil { + return err + } + + w.files = append(w.files, f) + if w.wbuf != nil { + w.wbuf.Reset(f) + } else { + w.wbuf = bufio.NewWriterSize(f, 8*1024*1024) + } + w.n = 8 + + return nil } func (w *chunkWriter) write(wr io.Writer, b []byte) error { @@ -66,44 +157,40 @@ func (w *chunkWriter) write(wr io.Writer, b []byte) error { return err } -func (w *chunkWriter) writeMeta() error { - b := [8]byte{} - - binary.BigEndian.PutUint32(b[:4], MagicSeries) - b[4] = flagStd - - return w.write(w.w, b[:]) -} - func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error { - // Initialize with meta data. - if w.n == 0 { - if err := w.writeMeta(); err != nil { + // Calculate maximum space we need and cut a new segment in case + // we don't fit into the current one. + maxLen := int64(binary.MaxVarintLen32) + for _, c := range chks { + maxLen += binary.MaxVarintLen32 + 1 + maxLen += int64(len(c.Chunk.Bytes())) + } + newsz := w.n + maxLen + + if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize { + if err := w.cut(); err != nil { return err } } + // Write chunks sequentially and set the reference field in the ChunkMeta. w.crc32.Reset() - wr := io.MultiWriter(w.crc32, w.w) + wr := io.MultiWriter(w.crc32, w.wbuf) - // For normal reads we don't need the number of the chunk section but - // it allows us to verify checksums without reading the index file. - // The offsets are also technically enough to calculate chunk size. but - // holding the length of each chunk could later allow for adding padding - // between chunks. - b := [binary.MaxVarintLen32]byte{} - n := binary.PutUvarint(b[:], uint64(len(chks))) + b := make([]byte, binary.MaxVarintLen32) + n := binary.PutUvarint(b, uint64(len(chks))) if err := w.write(wr, b[:n]); err != nil { return err } + seq := uint64(w.seq()) << 32 for i := range chks { chk := &chks[i] - chk.Ref = uint64(w.n) + chk.Ref = seq | uint64(w.n) - n = binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes()))) + n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes()))) if err := w.write(wr, b[:n]); err != nil { return err @@ -117,24 +204,22 @@ func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error { chk.Chunk = nil } - if err := w.write(w.w, w.crc32.Sum(nil)); err != nil { + if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil { return err } return nil } +func (w *chunkWriter) seq() int { + return len(w.files) - 1 +} + func (w *chunkWriter) Size() int64 { return w.n } func (w *chunkWriter) Close() error { - // Initialize block in case no data was written to it. - if w.n == 0 { - if err := w.writeMeta(); err != nil { - return err - } - } - return w.w.Flush() + return w.finalizeTail() } // ChunkMeta holds information about a chunk of data.