Merge branch 'master' into update-makefile-common

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
pull/5805/head
Simon Pasquier 2019-01-22 10:47:31 +01:00
commit 2e69508536
18 changed files with 1311 additions and 517 deletions

View File

@ -1,7 +1,15 @@
## master / unreleased
## 0.4.0
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374)
- [FEATURE] Size based retention through `Options.MaxBytes`. As part of this change:
- added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total`
- new public interface `SizeReader: Size() int64`
- `OpenBlock` signature changed to take a logger.
- [REMOVED] `PrefixMatcher` is considered unused so was removed.
- [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere.
- [FEATURE] Add new `LiveReader` to WAL package. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read.
## 0.3.1
- [BUGFIX] Fixed most Windows tests and some actual bugs with unclosed file readers.

View File

@ -21,6 +21,8 @@ import (
"path/filepath"
"sync"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunkenc"
@ -140,6 +142,12 @@ type Appendable interface {
Appender() Appender
}
// SizeReader is implemented by objects that can report their size in bytes.
type SizeReader interface {
// Size returns the size in bytes.
Size() int64
}
// BlockMeta provides meta information about a block.
type BlockMeta struct {
// Unique identifier for the block and its contents. Changes on compaction.
@ -166,6 +174,7 @@ type BlockStats struct {
NumSeries uint64 `json:"numSeries,omitempty"`
NumChunks uint64 `json:"numChunks,omitempty"`
NumTombstones uint64 `json:"numTombstones,omitempty"`
NumBytes int64 `json:"numBytes,omitempty"`
}
// BlockDesc describes a block by ULID and time range.
@ -182,6 +191,9 @@ type BlockMetaCompaction struct {
Level int `json:"level"`
// ULIDs of all source head blocks that went into the block.
Sources []ulid.ULID `json:"sources,omitempty"`
// Indicates that during compaction it resulted in a block without any samples
// so it should be deleted on the next reload.
Deletable bool `json:"deletable,omitempty"`
// Short descriptions of the direct blocks that were used to create
// this block.
Parents []BlockDesc `json:"parents,omitempty"`
@ -257,7 +269,10 @@ type Block struct {
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs.
func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) {
if logger == nil {
logger = log.NewNopLogger()
}
meta, err := readMetaFile(dir)
if err != nil {
return nil, err
@ -272,11 +287,20 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
return nil, err
}
tr, err := readTombstones(dir)
tr, tsr, err := readTombstones(dir)
if err != nil {
return nil, err
}
// TODO refactor to set this at block creation time as
// that would be the logical place for a block size to be calculated.
bs := blockSize(cr, ir, tsr)
meta.Stats.NumBytes = bs
err = writeMetaFile(dir, meta)
if err != nil {
level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
}
pb := &Block{
dir: dir,
meta: *meta,
@ -288,6 +312,16 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
return pb, nil
}
// blockSize adds up the sizes reported by all given readers.
// Entries that are nil interface values are ignored, and calling it
// without any readers yields 0.
func blockSize(rr ...SizeReader) int64 {
	sum := int64(0)
	for i := range rr {
		if rr[i] == nil {
			continue
		}
		sum += rr[i].Size()
	}
	return sum
}
// Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (pb *Block) Close() error {
pb.mtx.Lock()
@ -315,6 +349,9 @@ func (pb *Block) Dir() string { return pb.dir }
// Meta returns the block's meta information by value.
func (pb *Block) Meta() BlockMeta {
	return pb.meta
}
// Size reports how many bytes the block takes up, as recorded in the
// NumBytes field of its meta stats.
func (pb *Block) Size() int64 {
	return pb.meta.Stats.NumBytes
}
// ErrClosing is returned when a block is in the process of being closed.
var ErrClosing = errors.New("block is closing")

View File

@ -45,15 +45,15 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, err)
defer os.RemoveAll(tmpdir)
blockDir := createBlock(t, tmpdir, 0, 0, 0)
b, err := OpenBlock(blockDir, nil)
blockDir := createBlock(t, tmpdir, 1, 0, 0)
b, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
testutil.Equals(t, false, b.meta.Compaction.Failed)
testutil.Ok(t, b.setCompactionFailed())
testutil.Equals(t, true, b.meta.Compaction.Failed)
testutil.Ok(t, b.Close())
b, err = OpenBlock(blockDir, nil)
b, err = OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
testutil.Equals(t, true, b.meta.Compaction.Failed)
testutil.Ok(t, b.Close())
@ -68,17 +68,20 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin
lbls, err := labels.ReadLabels(filepath.Join("testdata", "20kseries.json"), nSeries)
testutil.Ok(tb, err)
var ref uint64
refs := make([]uint64, nSeries)
for ts := mint; ts <= maxt; ts++ {
app := head.Appender()
for _, lbl := range lbls {
err := app.AddFast(ref, ts, rand.Float64())
if err == nil {
continue
for i, lbl := range lbls {
if refs[i] != 0 {
err := app.AddFast(refs[i], ts, rand.Float64())
if err == nil {
continue
}
}
ref, err = app.Add(lbl, int64(ts), rand.Float64())
ref, err := app.Add(lbl, int64(ts), rand.Float64())
testutil.Ok(tb, err)
refs[i] = ref
}
err := app.Commit()
testutil.Ok(tb, err)
@ -91,6 +94,5 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin
ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime(), nil)
testutil.Ok(tb, err)
return filepath.Join(dir, ulid.String())
}

View File

@ -128,7 +128,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
defer sgmReader.Close()
}
cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to))
cpdir := filepath.Join(w.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", to))
cpdirtmp := cpdir + ".tmp"
if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
@ -139,6 +139,12 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
return nil, errors.Wrap(err, "open checkpoint")
}
// Ensures that an early return caused by an error doesn't leave any tmp files.
defer func() {
cp.Close()
os.RemoveAll(cpdirtmp)
}()
r := wal.NewReader(sgmReader)
var (

View File

@ -15,11 +15,14 @@
package tsdb
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
@ -180,3 +183,30 @@ func TestCheckpoint(t *testing.T) {
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
}, series)
}
// TestCheckpointNoTmpFolderAfterError verifies that a Checkpoint call which
// fails does not leave a directory with a ".tmp" suffix behind in the WAL
// directory.
func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
	// Create a new WAL containing an invalid record.
	dir, err := ioutil.TempDir("", "test_checkpoint")
	testutil.Ok(t, err)
	defer os.RemoveAll(dir)

	w, err := wal.NewSize(nil, nil, dir, 64*1024)
	testutil.Ok(t, err)
	testutil.Ok(t, w.Log([]byte{99}))
	// Surface a close failure instead of silently continuing with a WAL in
	// an unknown state.
	testutil.Ok(t, w.Close())

	// Run the checkpoint; since the WAL contains an invalid record this
	// should return an error.
	_, err = Checkpoint(w, 0, 1, nil, 0)
	testutil.NotOk(t, err)

	// Walk the WAL dir to make sure no tmp folder is left behind after the error.
	err = filepath.Walk(w.Dir(), func(path string, info os.FileInfo, err error) error {
		if err != nil {
			// Wrapf already appends the cause, so it is not repeated in the message.
			return errors.Wrapf(err, "access err %q", path)
		}
		if info.IsDir() && strings.HasSuffix(info.Name(), ".tmp") {
			return fmt.Errorf("wal dir contains temporary folder:%s", info.Name())
		}
		return nil
	})
	testutil.Ok(t, err)
}

View File

@ -203,6 +203,7 @@ func (w *Writer) WriteChunks(chks ...Meta) error {
for _, c := range chks {
maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding.
maxLen += int64(len(c.Chunk.Bytes()))
maxLen += 4 // The 4 bytes of crc32
}
newsz := w.n + maxLen
@ -282,17 +283,15 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
// Reader implements a SeriesReader for a serialized byte stream
// of series data.
type Reader struct {
// The underlying bytes holding the encoded series data.
bs []ByteSlice
// Closers for resources behind the byte slices.
cs []io.Closer
bs []ByteSlice // The underlying bytes holding the encoded series data.
cs []io.Closer // Closers for resources behind the byte slices.
size int64 // The total size of bytes in the reader.
pool chunkenc.Pool
}
func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
cr := Reader{pool: pool, bs: bs, cs: cs}
var totalSize int64
for i, b := range cr.bs {
if b.Len() < 4 {
@ -302,7 +301,9 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err
if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks {
return nil, errors.Errorf("invalid magic number %x", m)
}
totalSize += int64(b.Len())
}
cr.size = totalSize
return &cr, nil
}
@ -325,9 +326,10 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
pool = chunkenc.NewPool()
}
var bs []ByteSlice
var cs []io.Closer
var (
bs []ByteSlice
cs []io.Closer
)
for _, fn := range files {
f, err := fileutil.OpenMmapFile(fn)
if err != nil {
@ -343,6 +345,11 @@ func (s *Reader) Close() error {
return closeAll(s.cs...)
}
// Size reports the total byte size recorded in the reader's size field.
func (s *Reader) Size() int64 {
	total := s.size
	return total
}
func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
var (
seq = int(ref >> 32)

View File

@ -138,7 +138,7 @@ func (b *writeBenchmark) run() {
}
b.storage = st
var metrics []labels.Labels
var labels []labels.Labels
measureTime("readData", func() {
f, err := os.Open(b.samplesFile)
@ -147,7 +147,7 @@ func (b *writeBenchmark) run() {
}
defer f.Close()
metrics, err = readPrometheusLabels(f, b.numMetrics)
labels, err = readPrometheusLabels(f, b.numMetrics)
if err != nil {
exitWithError(err)
}
@ -157,7 +157,7 @@ func (b *writeBenchmark) run() {
dur := measureTime("ingestScrapes", func() {
b.startProfiling()
total, err = b.ingestScrapes(metrics, 3000)
total, err = b.ingestScrapes(labels, 3000)
if err != nil {
exitWithError(err)
}
@ -213,7 +213,7 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u
return total, nil
}
func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount int, baset int64) (uint64, error) {
func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount int, baset int64) (uint64, error) {
ts := baset
type sample struct {
@ -222,9 +222,9 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount
ref *uint64
}
scrape := make([]*sample, 0, len(metrics))
scrape := make([]*sample, 0, len(lbls))
for _, m := range metrics {
for _, m := range lbls {
scrape = append(scrape, &sample{
labels: m,
value: 123456789,

View File

@ -55,12 +55,17 @@ type Compactor interface {
Plan(dir string) ([]string, error)
// Write persists a Block into a directory.
// No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}.
Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)
// Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan().
// Can optionally pass a list of already open blocks,
// to avoid having to reopen them.
// When resulting Block has 0 samples
// * No block is written.
// * The source dirs are marked Deletable.
// * Returns empty ulid.ULID{}.
Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error)
}
@ -185,13 +190,12 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
return res, nil
}
// Compact any blocks that have >5% tombstones.
// Compact any blocks with big enough time range that have >5% tombstones.
for i := len(dms) - 1; i >= 0; i-- {
meta := dms[i].meta
if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
break
}
if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
return []string{dms[i].dir}, nil
}
@ -346,7 +350,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
if b == nil {
var err error
b, err = OpenBlock(d, c.chunkPool)
b, err = OpenBlock(c.logger, d, c.chunkPool)
if err != nil {
return uid, err
}
@ -365,15 +369,34 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
meta := compactBlockMetas(uid, metas...)
err = c.write(dest, meta, blocks...)
if err == nil {
level.Info(c.logger).Log(
"msg", "compact blocks",
"count", len(blocks),
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
)
if meta.Stats.NumSamples == 0 {
for _, b := range bs {
b.meta.Compaction.Deletable = true
if err = writeMetaFile(b.dir, &b.meta); err != nil {
level.Error(c.logger).Log(
"msg", "Failed to write 'Deletable' to meta file after compaction",
"ulid", b.meta.ULID,
)
}
}
uid = ulid.ULID{}
level.Info(c.logger).Log(
"msg", "compact blocks resulted in empty block",
"count", len(blocks),
"sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
)
} else {
level.Info(c.logger).Log(
"msg", "compact blocks",
"count", len(blocks),
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
)
}
return uid, nil
}
@ -412,6 +435,10 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p
return uid, err
}
if meta.Stats.NumSamples == 0 {
return ulid.ULID{}, nil
}
level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID)
return uid, nil
}
@ -489,11 +516,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
return errors.Wrap(err, "write compaction")
}
if err = writeMetaFile(tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}
// We are explicitly closing them here to check for error even
// though these are covered under defer. This is because in Windows,
// you cannot delete these unless they are closed and the defer is to
@ -505,6 +527,18 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
return errors.Wrap(err, "close index writer")
}
// Populated block is empty, so cleanup and exit.
if meta.Stats.NumSamples == 0 {
if err := os.RemoveAll(tmp); err != nil {
return errors.Wrap(err, "remove tmp folder after empty block failed")
}
return nil
}
if err = writeMetaFile(tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}
// Create an empty tombstones file.
if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file")

303
db.go
View File

@ -58,6 +58,13 @@ type Options struct {
// Duration of persisted data to keep.
RetentionDuration uint64
// Maximum number of bytes in blocks to be retained.
// 0 or less means disabled.
// NOTE: For proper storage calculations need to consider
// the size of the WAL folder which is not added when calculating
// the current size of the database.
MaxBytes int64
// The sizes of the Blocks.
BlockRanges []int64
@ -127,11 +134,12 @@ type dbMetrics struct {
reloads prometheus.Counter
reloadsFailed prometheus.Counter
compactionsTriggered prometheus.Counter
timeRetentionCount prometheus.Counter
compactionsSkipped prometheus.Counter
cutoffs prometheus.Counter
cutoffsFailed prometheus.Counter
startTime prometheus.GaugeFunc
tombCleanTimer prometheus.Histogram
blocksBytes prometheus.Gauge
sizeRetentionCount prometheus.Counter
}
func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
@ -170,18 +178,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Name: "prometheus_tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.",
})
m.timeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_time_retentions_total",
Help: "The number of times that blocks were deleted because the maximum time limit was exceeded.",
})
m.compactionsSkipped = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_compactions_skipped_total",
Help: "Total number of skipped compactions due to disabled auto compaction.",
})
m.cutoffs = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_retention_cutoffs_total",
Help: "Number of times the database cut off block data from disk.",
})
m.cutoffsFailed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_retention_cutoffs_failures_total",
Help: "Number of times the database failed to cut off block data from disk.",
})
m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_lowest_timestamp",
Help: "Lowest timestamp value stored in the database. The unit is decided by the library consumer.",
@ -197,6 +201,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Name: "prometheus_tsdb_tombstone_cleanup_seconds",
Help: "The time taken to recompact blocks to remove tombstones.",
})
m.blocksBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_storage_blocks_bytes_total",
Help: "The number of bytes that are currently used for local storage by all blocks.",
})
m.sizeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_size_retentions_total",
Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.",
})
if r != nil {
r.MustRegister(
@ -204,11 +216,12 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
m.symbolTableSize,
m.reloads,
m.reloadsFailed,
m.cutoffs,
m.cutoffsFailed,
m.timeRetentionCount,
m.compactionsTriggered,
m.startTime,
m.tombCleanTimer,
m.blocksBytes,
m.sizeRetentionCount,
)
}
return m
@ -340,25 +353,6 @@ func (db *DB) run() {
}
}
func (db *DB) beyondRetention(meta *BlockMeta) bool {
if db.opts.RetentionDuration == 0 {
return false
}
db.mtx.RLock()
blocks := db.blocks[:]
db.mtx.RUnlock()
if len(blocks) == 0 {
return false
}
last := blocks[len(db.blocks)-1]
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
return meta.MaxTime < mint
}
// Appender opens a new appender against the database.
func (db *DB) Appender() Appender {
return dbAppender{db: db, Appender: db.head.Appender()}
@ -423,7 +417,8 @@ func (db *DB) compact() (err error) {
// from the block interval here.
maxt: maxt - 1,
}
if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil {
uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil)
if err != nil {
return errors.Wrap(err, "persist head block")
}
@ -432,6 +427,14 @@ func (db *DB) compact() (err error) {
if err := db.reload(); err != nil {
return errors.Wrap(err, "reload blocks")
}
if (uid == ulid.ULID{}) {
// Compaction resulted in an empty block.
// Head truncating during db.reload() depends on the persisted blocks and
// in this case no new block will be persisted so manually truncate the head.
if err = db.head.Truncate(maxt); err != nil {
return errors.Wrap(err, "head truncate failed (in compact)")
}
}
runtime.GC()
}
@ -474,8 +477,7 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
return nil, false
}
// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes
// a list of block directories which should be deleted during reload.
// reload blocks and trigger head truncation if new blocks appeared.
// Blocks that are obsolete due to replacement or retention will be deleted.
func (db *DB) reload() (err error) {
defer func() {
@ -485,112 +487,193 @@ func (db *DB) reload() (err error) {
db.metrics.reloads.Inc()
}()
dirs, err := blockDirs(db.dir)
loadable, corrupted, err := db.openBlocks()
if err != nil {
return errors.Wrap(err, "find blocks")
return err
}
// We delete old blocks that have been superseded by new ones by gathering all parents
// from existing blocks. Those parents all have newer replacements and can be safely deleted
// after we loaded the other blocks.
// This makes us resilient against the process crashing towards the end of a compaction.
// Creation of a new block and deletion of its parents cannot happen atomically. By creating
// blocks with their parents, we can pick up the deletion where it left off during a crash.
var (
blocks []*Block
corrupted = map[ulid.ULID]error{}
opened = map[ulid.ULID]struct{}{}
deleteable = map[ulid.ULID]struct{}{}
)
for _, dir := range dirs {
meta, err := readMetaFile(dir)
if err != nil {
// The block was potentially in the middle of being deleted during a crash.
// Skip it since we may delete it properly further down again.
level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir)
ulid, err2 := ulid.Parse(filepath.Base(dir))
if err2 != nil {
level.Error(db.logger).Log("msg", "not a block dir", "dir", dir)
continue
}
corrupted[ulid] = err
continue
}
if db.beyondRetention(meta) {
deleteable[meta.ULID] = struct{}{}
continue
}
for _, b := range meta.Compaction.Parents {
deleteable[b.ULID] = struct{}{}
deletable := db.deletableBlocks(loadable)
// Corrupted blocks that have been replaced by parents can be safely ignored and deleted.
// This makes it resilient against the process crashing towards the end of a compaction.
// Creation of a new block and deletion of its parents cannot happen atomically.
// By creating blocks with their parents, we can pick up the deletion where it left off during a crash.
for _, block := range loadable {
for _, b := range block.Meta().Compaction.Parents {
delete(corrupted, b.ULID)
deletable[b.ULID] = nil
}
}
// Blocks we failed to open should all be those we are want to delete anyway.
for c, err := range corrupted {
if _, ok := deleteable[c]; !ok {
return errors.Wrapf(err, "unexpected corrupted block %s", c)
}
if len(corrupted) > 0 {
return errors.Wrap(err, "unexpected corrupted block")
}
// Load new blocks into memory.
for _, dir := range dirs {
meta, err := readMetaFile(dir)
if err != nil {
return errors.Wrapf(err, "read meta information %s", dir)
}
// Don't load blocks that are scheduled for deletion.
if _, ok := deleteable[meta.ULID]; ok {
// All deletable blocks should not be loaded.
var (
bb []*Block
blocksSize int64
)
for _, block := range loadable {
if _, ok := deletable[block.Meta().ULID]; ok {
deletable[block.Meta().ULID] = block
continue
}
// See if we already have the block in memory or open it otherwise.
b, ok := db.getBlock(meta.ULID)
if !ok {
b, err = OpenBlock(dir, db.chunkPool)
if err != nil {
return errors.Wrapf(err, "open block %s", dir)
}
}
blocks = append(blocks, b)
opened[meta.ULID] = struct{}{}
bb = append(bb, block)
blocksSize += block.Size()
}
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
loadable = bb
db.metrics.blocksBytes.Set(float64(blocksSize))
sort.Slice(loadable, func(i, j int) bool {
return loadable[i].Meta().MaxTime < loadable[j].Meta().MaxTime
})
if err := validateBlockSequence(blocks); err != nil {
if err := validateBlockSequence(loadable); err != nil {
return errors.Wrap(err, "invalid block sequence")
}
// Swap in new blocks first for subsequently created readers to be seen.
// Then close previous blocks, which may block for pending readers to complete.
// Swap new blocks first for subsequently created readers to be seen.
db.mtx.Lock()
oldBlocks := db.blocks
db.blocks = blocks
db.blocks = loadable
db.mtx.Unlock()
// Drop old blocks from memory.
for _, b := range oldBlocks {
if _, ok := opened[b.Meta().ULID]; ok {
continue
}
if err := b.Close(); err != nil {
level.Warn(db.logger).Log("msg", "closing block failed", "err", err)
if _, ok := deletable[b.Meta().ULID]; ok {
deletable[b.Meta().ULID] = b
}
}
// Delete all obsolete blocks. None of them are opened any longer.
for ulid := range deleteable {
if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil {
return errors.Wrapf(err, "delete obsolete block %s", ulid)
}
if err := db.deleteBlocks(deletable); err != nil {
return err
}
// Garbage collect data in the head if the most recent persisted block
// covers data of its current time range.
if len(blocks) == 0 {
if len(loadable) == 0 {
return nil
}
maxt := blocks[len(blocks)-1].Meta().MaxTime
maxt := loadable[len(loadable)-1].Meta().MaxTime
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}
// openBlocks returns the blocks found in the database directory, reusing
// blocks that are already held in memory and opening the rest from disk.
//
// Directories whose meta file cannot be read are logged and skipped.
// Blocks whose meta file is readable but which fail to open are left out of
// blocks and reported in corrupted, keyed by ULID, with the open error.
// The returned error is non-nil only when listing the block directories fails.
func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
	dirs, err := blockDirs(db.dir)
	if err != nil {
		return nil, nil, errors.Wrap(err, "find blocks")
	}

	corrupted = make(map[ulid.ULID]error)
	for _, dir := range dirs {
		meta, err := readMetaFile(dir)
		if err != nil {
			// Log the cause as well, so a damaged meta file can be told
			// apart from a directory that simply is not a block.
			level.Error(db.logger).Log("msg", "not a block dir", "dir", dir, "err", err)
			continue
		}

		// See if we already have the block in memory or open it otherwise.
		block, ok := db.getBlock(meta.ULID)
		if !ok {
			block, err = OpenBlock(db.logger, dir, db.chunkPool)
			if err != nil {
				corrupted[meta.ULID] = err
				continue
			}
		}
		blocks = append(blocks, block)
	}
	return blocks, corrupted, nil
}
// deletableBlocks collects every block that should be removed: blocks whose
// compaction meta is flagged Deletable, plus the blocks selected by the time
// and size retention checks.
// The passed slice is re-sorted in place, newest block first.
func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block {
	// Order newest to oldest (largest to smallest MaxTime) so that the
	// retention checks remove the oldest blocks.
	byMaxTimeDesc := func(a, b int) bool {
		return blocks[a].Meta().MaxTime > blocks[b].Meta().MaxTime
	}
	sort.Slice(blocks, byMaxTimeDesc)

	result := make(map[ulid.ULID]*Block)
	for i := range blocks {
		if m := blocks[i].Meta(); m.Compaction.Deletable {
			result[m.ULID] = blocks[i]
		}
	}
	for id, b := range db.beyondTimeRetention(blocks) {
		result[id] = b
	}
	for id, b := range db.beyondSizeRetention(blocks) {
		result[id] = b
	}
	return result
}
// beyondTimeRetention returns the blocks that fall outside the configured
// time retention window.
//
// blocks is expected to be sorted newest first (descending MaxTime). The
// window is measured back from the MaxTime of blocks[0]: the first block whose
// MaxTime lags behind by more than Options.RetentionDuration, and every block
// after it in the slice, is returned. The time retention counter is
// incremented once when any block is selected. A nil map is returned when
// time retention is disabled or blocks is empty.
func (db *DB) beyondTimeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) {
	// Inspect the candidate slice rather than db.blocks: db.blocks is only
	// swapped in at the end of reload (so it may still be empty here) and is
	// written under db.mtx, which is not held at this point.
	if len(blocks) == 0 || db.opts.RetentionDuration == 0 {
		return nil
	}

	deleteable = make(map[ulid.ULID]*Block)
	for i, block := range blocks {
		// The difference between the first block and this block is larger than
		// the retention period so any blocks after that are added as deleteable.
		if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > int64(db.opts.RetentionDuration) {
			for _, b := range blocks[i:] {
				deleteable[b.meta.ULID] = b
			}
			db.metrics.timeRetentionCount.Inc()
			break
		}
	}
	return deleteable
}
// beyondSizeRetention returns the blocks that have to go to keep the total
// block size within Options.MaxBytes.
//
// blocks is expected to be sorted newest first (descending MaxTime). Sizes are
// accumulated in slice order; the block that pushes the running total above
// MaxBytes, and every block after it in the slice, is returned. The size
// retention counter is incremented once when any block is selected. A nil map
// is returned when size retention is disabled (MaxBytes <= 0) or blocks is
// empty.
func (db *DB) beyondSizeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) {
	// Inspect the candidate slice rather than db.blocks: db.blocks is only
	// swapped in at the end of reload (so it may still be empty here) and is
	// written under db.mtx, which is not held at this point.
	if len(blocks) == 0 || db.opts.MaxBytes <= 0 {
		return nil
	}

	deleteable = make(map[ulid.ULID]*Block)
	blocksSize := int64(0)
	for i, block := range blocks {
		blocksSize += block.Size()
		if blocksSize > db.opts.MaxBytes {
			// Add this and all following blocks for deletion.
			for _, b := range blocks[i:] {
				deleteable[b.meta.ULID] = b
			}
			db.metrics.sizeRetentionCount.Inc()
			break
		}
	}
	return deleteable
}
// deleteBlocks removes the directory of every block in the map from disk.
// A non-nil map value is closed before its directory is removed; a failure to
// close is only logged as a warning. The first directory removal that fails
// aborts the loop and is returned as an error.
func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error {
	for id, b := range blocks {
		if b != nil {
			closeErr := b.Close()
			if closeErr != nil {
				level.Warn(db.logger).Log("msg", "closing block failed", "err", closeErr)
			}
		}
		blockDir := filepath.Join(db.dir, id.String())
		if rmErr := os.RemoveAll(blockDir); rmErr != nil {
			return errors.Wrapf(rmErr, "delete obsolete block %s", id)
		}
	}
	return nil
}
// validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence.
func validateBlockSequence(bs []*Block) error {
if len(bs) <= 1 {

View File

@ -92,6 +92,9 @@ func TestDB_reloadOrder(t *testing.T) {
testutil.Ok(t, db.reload())
blocks := db.Blocks()
for _, b := range blocks {
b.meta.Stats.NumBytes = 0
}
testutil.Equals(t, 3, len(blocks))
testutil.Equals(t, metas[1].MinTime, blocks[0].Meta().MinTime)
testutil.Equals(t, metas[1].MaxTime, blocks[0].Meta().MaxTime)
@ -829,8 +832,8 @@ func TestTombstoneCleanFail(t *testing.T) {
// totalBlocks should be >=2 so we have enough blocks to trigger compaction failure.
totalBlocks := 2
for i := 0; i < totalBlocks; i++ {
blockDir := createBlock(t, db.Dir(), 0, 0, 0)
block, err := OpenBlock(blockDir, nil)
blockDir := createBlock(t, db.Dir(), 1, 0, 0)
block, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
// Add some fake tombstones to trigger the compaction.
tomb := newMemTombstones()
@ -873,7 +876,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6
return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
}
block, err := OpenBlock(createBlock(c.t, dest, 0, 0, 0), nil)
block, err := OpenBlock(nil, createBlock(c.t, dest, 1, 0, 0), nil)
testutil.Ok(c.t, err)
testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere.
c.blocks = append(c.blocks, block)
@ -897,59 +900,98 @@ func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block)
}
func TestDB_Retention(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}}
app := db.Appender()
_, err := app.Add(lbls, 0, 1)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
// create snapshot to make it create a block.
// TODO(gouthamve): Add a method to compact headblock.
snap, err := ioutil.TempDir("", "snap")
testutil.Ok(t, err)
defer os.RemoveAll(snap)
testutil.Ok(t, db.Snapshot(snap, true))
testutil.Ok(t, db.Close())
// reopen DB from snapshot
db, err = Open(snap, nil, nil, nil)
testutil.Ok(t, err)
testutil.Equals(t, 1, len(db.blocks))
app = db.Appender()
_, err = app.Add(lbls, 100, 1)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
// Snapshot again to create another block.
snap, err = ioutil.TempDir("", "snap")
testutil.Ok(t, err)
defer os.RemoveAll(snap)
testutil.Ok(t, db.Snapshot(snap, true))
testutil.Ok(t, db.Close())
// reopen DB from snapshot
db, err = Open(snap, nil, nil, &Options{
RetentionDuration: 10,
BlockRanges: []int64{50},
func TestTimeRetention(t *testing.T) {
db, close := openTestDB(t, &Options{
BlockRanges: []int64{1000},
})
testutil.Ok(t, err)
defer close()
defer db.Close()
testutil.Equals(t, 2, len(db.blocks))
blocks := []*BlockMeta{
{MinTime: 500, MaxTime: 900}, // Oldest block
{MinTime: 1000, MaxTime: 1500},
{MinTime: 1500, MaxTime: 2000}, // Newest Block
}
// Reload blocks, which should drop blocks beyond the retention boundary.
for _, m := range blocks {
createBlock(t, db.Dir(), 10, m.MinTime, m.MaxTime)
}
testutil.Ok(t, db.reload()) // Reload the db to register the new blocks.
testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.
db.opts.RetentionDuration = uint64(blocks[2].MaxTime - blocks[1].MinTime)
testutil.Ok(t, db.reload())
testutil.Equals(t, 1, len(db.blocks))
testutil.Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block.
expBlocks := blocks[1:]
actBlocks := db.Blocks()
testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.metrics.timeRetentionCount)), "metric retention count mismatch")
testutil.Equals(t, len(expBlocks), len(actBlocks))
testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime)
testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime)
}
// TestSizeRetention verifies size-based retention: after db.opts.MaxBytes is
// set to one block less than the current on-disk size, a reload must drop
// exactly the oldest block and keep the size and retention metrics in sync
// with what is actually on disk.
func TestSizeRetention(t *testing.T) {
	db, close := openTestDB(t, &Options{
		BlockRanges: []int64{100},
	})
	defer close()
	defer db.Close()

	blocks := []*BlockMeta{
		{MinTime: 100, MaxTime: 200}, // Oldest block
		{MinTime: 200, MaxTime: 300},
		{MinTime: 300, MaxTime: 400},
		{MinTime: 400, MaxTime: 500},
		{MinTime: 500, MaxTime: 600}, // Newest Block
	}

	for _, m := range blocks {
		createBlock(t, db.Dir(), 100, m.MinTime, m.MaxTime)
	}

	// Test that registered size matches the actual disk size.
	testutil.Ok(t, db.reload())                                       // Reload the db to register the new db size.
	testutil.Equals(t, len(blocks), len(db.Blocks()))                 // Ensure all blocks are registered.
	expSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
	actSize := dbDiskSize(db.Dir())
	testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")

	// Decrease the max bytes limit so that a delete is triggered.
	// Check total size, total count and check that the oldest block was deleted.
	firstBlockSize := db.Blocks()[0].Size()
	sizeLimit := actSize - firstBlockSize
	db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller than the actual size.
	testutil.Ok(t, db.reload())  // Reload the db to register the new db size.

	// Everything except the oldest block is expected to survive.
	expBlocks := blocks[1:]
	actBlocks := db.Blocks()
	expSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes))
	actRetentCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount))
	actSize = dbDiskSize(db.Dir())

	testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch")
	testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size")
	testutil.Assert(t, expSize <= sizeLimit, "actual size (%v) is expected to be less than or equal to limit (%v)", expSize, sizeLimit)
	testutil.Equals(t, len(blocks)-1, len(actBlocks), "new block count should be decreased from:%v to:%v", len(blocks), len(blocks)-1)
	testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime, "maxT mismatch of the first block")
	testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime, "maxT mismatch of the last block")
}
// dbDiskSize walks dir and returns the combined size in bytes of every index
// file, tombstone file and file located directly inside a chunks directory.
// Entries that cannot be inspected are skipped rather than aborting the walk.
func dbDiskSize(dir string) int64 {
	var statSize int64
	// The callback below never returns an error, so Walk has nothing to report.
	_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		// filepath.Walk passes a nil info when Lstat on path failed; skip
		// such entries instead of dereferencing info.
		if err != nil || info == nil {
			return nil
		}
		// Include only index,tombstone and chunks.
		if filepath.Dir(path) == chunkDir(filepath.Dir(filepath.Dir(path))) ||
			info.Name() == indexFilename ||
			info.Name() == tombstoneFilename {
			statSize += info.Size()
		}
		return nil
	})
	return statSize
}
func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
@ -1318,6 +1360,109 @@ func TestInitializeHeadTimestamp(t *testing.T) {
})
}
// TestNoEmptyBlocks verifies that compaction never leaves an empty block on
// disk, both when the head holds no samples and when all samples of the head
// or of persisted blocks have been deleted.
//
// The subtests share one DB and run in order: the expected values of the
// compactor's "ran" metric (0, 1, 2, 3, 5) are cumulative across subtests.
func TestNoEmptyBlocks(t *testing.T) {
	db, close := openTestDB(t, &Options{
		BlockRanges: []int64{100},
	})
	defer close()
	defer db.Close()
	// NOTE(review): presumably this stops background compaction so that only
	// the explicit db.compact() calls below run — confirm.
	db.DisableCompactions()

	// NOTE(review): looks like the sample time span that makes db.compact()
	// persist the head — confirm against the compaction trigger.
	rangeToTriggercompaction := db.opts.BlockRanges[0]/2*3 - 1
	defaultLabel := labels.FromStrings("foo", "bar")
	defaultMatcher := labels.NewMustRegexpMatcher("", ".*")

	t.Run("Test no blocks after compact with empty head.", func(t *testing.T) {
		testutil.Ok(t, db.compact())
		actBlocks, err := blockDirs(db.Dir())
		testutil.Ok(t, err)
		testutil.Equals(t, len(db.Blocks()), len(actBlocks))
		testutil.Equals(t, 0, len(actBlocks))
		testutil.Equals(t, 0, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "no compaction should be triggered here")
	})

	t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) {
		app := db.Appender()
		_, err := app.Add(defaultLabel, 1, 0)
		testutil.Ok(t, err)
		_, err = app.Add(defaultLabel, 2, 0)
		testutil.Ok(t, err)
		_, err = app.Add(defaultLabel, 3+rangeToTriggercompaction, 0)
		testutil.Ok(t, err)
		testutil.Ok(t, app.Commit())
		testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
		testutil.Ok(t, db.compact())
		testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")

		// Compaction ran, but every sample was deleted, so no block may exist.
		actBlocks, err := blockDirs(db.Dir())
		testutil.Ok(t, err)
		testutil.Equals(t, len(db.Blocks()), len(actBlocks))
		testutil.Equals(t, 0, len(actBlocks))

		app = db.Appender()
		_, err = app.Add(defaultLabel, 1, 0)
		testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed")

		// Adding new blocks.
		currentTime := db.Head().MaxTime()
		_, err = app.Add(defaultLabel, currentTime, 0)
		testutil.Ok(t, err)
		_, err = app.Add(defaultLabel, currentTime+1, 0)
		testutil.Ok(t, err)
		_, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0)
		testutil.Ok(t, err)
		testutil.Ok(t, app.Commit())

		testutil.Ok(t, db.compact())
		testutil.Equals(t, 2, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
		actBlocks, err = blockDirs(db.Dir())
		testutil.Ok(t, err)
		testutil.Equals(t, len(db.Blocks()), len(actBlocks))
		testutil.Assert(t, len(actBlocks) == 1, "No blocks created when compacting with >0 samples")
	})

	t.Run(`When no new block is created from head, and there are some blocks on disk
compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) {
		oldBlocks := db.Blocks()
		app := db.Appender()
		currentTime := db.Head().MaxTime()
		_, err := app.Add(defaultLabel, currentTime, 0)
		testutil.Ok(t, err)
		_, err = app.Add(defaultLabel, currentTime+1, 0)
		testutil.Ok(t, err)
		_, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0)
		testutil.Ok(t, err)
		testutil.Ok(t, app.Commit())
		// Delete from the head only, so the block already on disk keeps its samples.
		testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
		testutil.Ok(t, db.compact())
		testutil.Equals(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
		testutil.Equals(t, oldBlocks, db.Blocks())
	})

	t.Run("Test no blocks remaining after deleting all samples from disk.", func(t *testing.T) {
		currentTime := db.Head().MaxTime()
		blocks := []*BlockMeta{
			{MinTime: currentTime, MaxTime: currentTime + db.opts.BlockRanges[0]},
			{MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]},
		}
		for _, m := range blocks {
			createBlock(t, db.Dir(), 2, m.MinTime, m.MaxTime)
		}

		oldBlocks := db.Blocks()
		testutil.Ok(t, db.reload())                                      // Reload the db to register the new blocks.
		testutil.Equals(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered.
		testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
		testutil.Ok(t, db.compact())
		testutil.Equals(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here once for each block that have tombstones")

		actBlocks, err := blockDirs(db.Dir())
		testutil.Ok(t, err)
		testutil.Equals(t, len(db.Blocks()), len(actBlocks))
		testutil.Equals(t, 1, len(actBlocks), "All samples are deleted. Only the most recent block should remain after compaction.")
	})
}
func TestDB_LabelNames(t *testing.T) {
tests := []struct {
// Add 'sampleLabels1' -> Test Head -> Compact -> Test Disk ->
@ -1422,12 +1567,13 @@ func TestCorrectNumTombstones(t *testing.T) {
defer db.Close()
blockRange := DefaultOptions.BlockRanges[0]
label := labels.FromStrings("foo", "bar")
defaultLabel := labels.FromStrings("foo", "bar")
defaultMatcher := labels.NewEqualMatcher(defaultLabel[0].Name, defaultLabel[0].Value)
app := db.Appender()
for i := int64(0); i < 3; i++ {
for j := int64(0); j < 15; j++ {
_, err := app.Add(label, i*blockRange+j, 0)
_, err := app.Add(defaultLabel, i*blockRange+j, 0)
testutil.Ok(t, err)
}
}
@ -1437,17 +1583,17 @@ func TestCorrectNumTombstones(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, 1, len(db.blocks))
testutil.Ok(t, db.Delete(0, 1, labels.NewEqualMatcher("foo", "bar")))
testutil.Ok(t, db.Delete(0, 1, defaultMatcher))
testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones)
// {0, 1} and {2, 3} are merged to form 1 tombstone.
testutil.Ok(t, db.Delete(2, 3, labels.NewEqualMatcher("foo", "bar")))
testutil.Ok(t, db.Delete(2, 3, defaultMatcher))
testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones)
testutil.Ok(t, db.Delete(5, 6, labels.NewEqualMatcher("foo", "bar")))
testutil.Ok(t, db.Delete(5, 6, defaultMatcher))
testutil.Equals(t, uint64(2), db.blocks[0].meta.Stats.NumTombstones)
testutil.Ok(t, db.Delete(9, 11, labels.NewEqualMatcher("foo", "bar")))
testutil.Ok(t, db.Delete(9, 11, defaultMatcher))
testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones)
}

View File

@ -18,6 +18,8 @@ import (
"hash"
"hash/crc32"
"unsafe"
"github.com/pkg/errors"
)
// enbuf is a helper type to populate a byte slice with various types.
@ -83,6 +85,60 @@ type decbuf struct {
e error
}
// newDecbufAt builds a decoding buffer over the frame that starts at off in bs.
// The frame layout is: a 4-byte big endian content length, the content itself
// and a trailing 4-byte big endian CRC32 of the content. The returned buffer
// covers the content only; if the frame does not fit into bs or the checksum
// does not match, a buffer carrying the corresponding error is returned.
func newDecbufAt(bs ByteSlice, off int) decbuf {
	const (
		lenFieldSize = 4
		crcSize      = 4
	)

	payloadStart := off + lenFieldSize
	if payloadStart > bs.Len() {
		return decbuf{e: errInvalidSize}
	}
	contentLen := int(binary.BigEndian.Uint32(bs.Range(off, payloadStart)))

	frameEnd := payloadStart + contentLen + crcSize
	if frameEnd > bs.Len() {
		return decbuf{e: errInvalidSize}
	}

	// The frame holds the content followed by its checksum.
	frame := bs.Range(payloadStart, frameEnd)
	crcAt := len(frame) - crcSize
	dec := decbuf{b: frame[:crcAt]}

	if want := binary.BigEndian.Uint32(frame[crcAt:]); dec.crc32() != want {
		return decbuf{e: errInvalidChecksum}
	}
	return dec
}
// newDecbufUvarintAt builds a decoding buffer over the frame that starts at off
// in bs. The frame layout is: the uvarint-encoded content length, the content
// itself and a trailing 4-byte big endian CRC32 of the content. The returned
// buffer covers the content only; on a malformed length, a frame that does not
// fit into bs or a checksum mismatch, a buffer carrying the error is returned.
func newDecbufUvarintAt(bs ByteSlice, off int) decbuf {
	const crcSize = 4

	// This is never called for a position at the far end of the byte slice,
	// so requiring MaxVarintLen32 readable bytes for the prefix is sufficient.
	prefixEnd := off + binary.MaxVarintLen32
	if prefixEnd > bs.Len() {
		return decbuf{e: errInvalidSize}
	}

	contentLen, n := binary.Uvarint(bs.Range(off, prefixEnd))
	if n <= 0 || n > binary.MaxVarintLen32 {
		return decbuf{e: errors.Errorf("invalid uvarint %d", n)}
	}

	payloadStart := off + n
	frameEnd := payloadStart + int(contentLen) + crcSize
	if frameEnd > bs.Len() {
		return decbuf{e: errInvalidSize}
	}

	// The frame holds the content followed by its checksum.
	frame := bs.Range(payloadStart, frameEnd)
	crcAt := len(frame) - crcSize
	dec := decbuf{b: frame[:crcAt]}

	if got, want := dec.crc32(), binary.BigEndian.Uint32(frame[crcAt:]); got != want {
		return decbuf{e: errInvalidChecksum}
	}
	return dec
}
func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
func (d *decbuf) be32int() int { return int(d.be32()) }

View File

@ -20,6 +20,7 @@ import (
"hash"
"hash/crc32"
"io"
"io/ioutil"
"math"
"os"
"path/filepath"
@ -35,9 +36,13 @@ import (
const (
// MagicIndex 4 bytes at the head of an index file.
MagicIndex = 0xBAAAD700
// HeaderLen represents number of bytes reserved of index for header.
HeaderLen = 5
indexFormatV1 = 1
indexFormatV2 = 2
// FormatV1 represents 1 version of index.
FormatV1 = 1
// FormatV2 represents 2 version of index.
FormatV2 = 2
labelNameSeperator = "\xff"
)
@ -108,7 +113,7 @@ type Writer struct {
fbuf *bufio.Writer
pos uint64
toc indexTOC
toc TOC
stage indexWriterStage
// Reusable memory.
@ -129,13 +134,42 @@ type Writer struct {
Version int
}
type indexTOC struct {
symbols uint64
series uint64
labelIndices uint64
labelIndicesTable uint64
postings uint64
postingsTable uint64
// TOC represents index Table Of Content that states where each section of index starts.
//
// Every field is a position within the index. The writer stores the fields in
// declaration order as six big endian uint64 values followed by a checksum
// (see indexTOCLen).
type TOC struct {
	// Symbols is where the symbol table starts.
	Symbols uint64
	// Series is where the series section starts.
	Series uint64
	// LabelIndices is where the label index section starts.
	LabelIndices uint64
	// LabelIndicesTable is where the offset table for the label indices starts.
	LabelIndicesTable uint64
	// Postings is where the postings section starts.
	Postings uint64
	// PostingsTable is where the offset table for the postings starts.
	PostingsTable uint64
}
// NewTOCFromByteSlice returns the TOC parsed from the given index byte slice.
//
// The TOC is read from the last indexTOCLen bytes of bs: six big endian
// uint64 positions followed by a 4-byte big endian CRC32 over them.
// It returns errInvalidSize if bs is shorter than indexTOCLen, a wrapped
// errInvalidChecksum if the checksum does not match, and any error the
// decoding buffer recorded while reading the positions.
func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
	if bs.Len() < indexTOCLen {
		return nil, errInvalidSize
	}
	b := bs.Range(bs.Len()-indexTOCLen, bs.Len())

	expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
	d := decbuf{b: b[:len(b)-4]}

	if d.crc32() != expCRC {
		return nil, errors.Wrap(errInvalidChecksum, "read TOC")
	}

	// Operands are evaluated left to right, so the fields are decoded in the
	// order they were written.
	toc := &TOC{
		Symbols:           d.be64(),
		Series:            d.be64(),
		LabelIndices:      d.be64(),
		LabelIndicesTable: d.be64(),
		Postings:          d.be64(),
		PostingsTable:     d.be64(),
	}

	// The decoding error can only be set by the reads above, so it has to be
	// inspected after them; checking it earlier could never fail.
	if err := d.err(); err != nil {
		return nil, err
	}
	return toc, nil
}
// NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
@ -223,22 +257,22 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
// Mark start of sections in table of contents.
switch s {
case idxStageSymbols:
w.toc.symbols = w.pos
w.toc.Symbols = w.pos
case idxStageSeries:
w.toc.series = w.pos
w.toc.Series = w.pos
case idxStageLabelIndex:
w.toc.labelIndices = w.pos
w.toc.LabelIndices = w.pos
case idxStagePostings:
w.toc.postings = w.pos
w.toc.Postings = w.pos
case idxStageDone:
w.toc.labelIndicesTable = w.pos
w.toc.LabelIndicesTable = w.pos
if err := w.writeOffsetTable(w.labelIndexes); err != nil {
return err
}
w.toc.postingsTable = w.pos
w.toc.PostingsTable = w.pos
if err := w.writeOffsetTable(w.postings); err != nil {
return err
}
@ -254,7 +288,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
func (w *Writer) writeMeta() error {
w.buf1.reset()
w.buf1.putBE32(MagicIndex)
w.buf1.putByte(indexFormatV2)
w.buf1.putByte(FormatV2)
return w.write(w.buf1.get())
}
@ -436,12 +470,12 @@ const indexTOCLen = 6*8 + 4
func (w *Writer) writeTOC() error {
w.buf1.reset()
w.buf1.putBE64(w.toc.symbols)
w.buf1.putBE64(w.toc.series)
w.buf1.putBE64(w.toc.labelIndices)
w.buf1.putBE64(w.toc.labelIndicesTable)
w.buf1.putBE64(w.toc.postings)
w.buf1.putBE64(w.toc.postingsTable)
w.buf1.putBE64(w.toc.Symbols)
w.buf1.putBE64(w.toc.Series)
w.buf1.putBE64(w.toc.LabelIndices)
w.buf1.putBE64(w.toc.LabelIndicesTable)
w.buf1.putBE64(w.toc.Postings)
w.buf1.putBE64(w.toc.PostingsTable)
w.buf1.putHash(w.crc32)
@ -533,15 +567,14 @@ type StringTuples interface {
}
type Reader struct {
// The underlying byte slice holding the encoded series data.
b ByteSlice
toc indexTOC
b ByteSlice
// Close that releases the underlying resources of the byte slice.
c io.Closer
// Cached hashmaps of section offsets.
labels map[string]uint64
labels map[string]uint64
// LabelName to LabelValue to offset map.
postings map[string]map[string]uint64
// Cache of read symbols. Strings that are returned when reading from the
// block are always backed by true strings held in here rather than
@ -549,13 +582,12 @@ type Reader struct {
// prevents memory faults when applications work with read symbols after
// the block has been unmapped. The older format has sparse indexes so a map
// must be used, but the new format is not so we can use a slice.
symbols map[uint32]string
symbolSlice []string
symbolsV1 map[uint32]string
symbolsV2 []string
symbolsTableSize uint64
dec *Decoder
crc32 hash.Hash32
version int
}
@ -584,10 +616,10 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
return b[start:end]
}
// NewReader returns a new IndexReader on the given byte slice. It automatically
// NewReader returns a new index reader on the given byte slice. It automatically
// handles different format versions.
func NewReader(b ByteSlice) (*Reader, error) {
return newReader(b, nil)
return newReader(b, ioutil.NopCloser(nil))
}
// NewFileReader returns a new index reader against the given index file.
@ -603,14 +635,12 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
r := &Reader{
b: b,
c: c,
symbols: map[uint32]string{},
labels: map[string]uint64{},
postings: map[string]map[string]uint64{},
crc32: newCRC32(),
}
// Verify header.
if b.Len() < 5 {
if r.b.Len() < HeaderLen {
return nil, errors.Wrap(errInvalidSize, "index header")
}
if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
@ -618,54 +648,59 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
}
r.version = int(r.b.Range(4, 5)[0])
if r.version != indexFormatV1 && r.version != indexFormatV2 {
if r.version != FormatV1 && r.version != FormatV2 {
return nil, errors.Errorf("unknown index file version %d", r.version)
}
if err := r.readTOC(); err != nil {
toc, err := NewTOCFromByteSlice(b)
if err != nil {
return nil, errors.Wrap(err, "read TOC")
}
if err := r.readSymbols(int(r.toc.symbols)); err != nil {
r.symbolsV2, r.symbolsV1, err = ReadSymbols(r.b, r.version, int(toc.Symbols))
if err != nil {
return nil, errors.Wrap(err, "read symbols")
}
var err error
// Use the strings already allocated by symbols, rather than
// re-allocating them again below.
symbols := make(map[string]string, len(r.symbols)+len(r.symbolSlice))
for _, s := range r.symbols {
symbols[s] = s
// Additionally, calculate symbolsTableSize.
allocatedSymbols := make(map[string]string, len(r.symbolsV1)+len(r.symbolsV2))
for _, s := range r.symbolsV1 {
r.symbolsTableSize += uint64(len(s) + 8)
allocatedSymbols[s] = s
}
for _, s := range r.symbolSlice {
symbols[s] = s
for _, s := range r.symbolsV2 {
r.symbolsTableSize += uint64(len(s) + 8)
allocatedSymbols[s] = s
}
err = r.readOffsetTable(r.toc.labelIndicesTable, func(key []string, off uint64) error {
if err := ReadOffsetTable(r.b, toc.LabelIndicesTable, func(key []string, off uint64) error {
if len(key) != 1 {
return errors.Errorf("unexpected key length %d", len(key))
return errors.Errorf("unexpected key length for label indices table %d", len(key))
}
r.labels[symbols[key[0]]] = off
r.labels[allocatedSymbols[key[0]]] = off
return nil
})
if err != nil {
}); err != nil {
return nil, errors.Wrap(err, "read label index table")
}
r.postings[""] = map[string]uint64{}
err = r.readOffsetTable(r.toc.postingsTable, func(key []string, off uint64) error {
if err := ReadOffsetTable(r.b, toc.PostingsTable, func(key []string, off uint64) error {
if len(key) != 2 {
return errors.Errorf("unexpected key length %d", len(key))
return errors.Errorf("unexpected key length for posting table %d", len(key))
}
if _, ok := r.postings[key[0]]; !ok {
r.postings[symbols[key[0]]] = map[string]uint64{}
r.postings[allocatedSymbols[key[0]]] = map[string]uint64{}
}
r.postings[key[0]][symbols[key[1]]] = off
r.postings[key[0]][allocatedSymbols[key[1]]] = off
return nil
})
if err != nil {
}); err != nil {
return nil, errors.Wrap(err, "read postings table")
}
r.dec = &Decoder{lookupSymbol: r.lookupSymbol}
r.dec = &Decoder{LookupSymbol: r.lookupSymbol}
return r, nil
}
@ -687,7 +722,7 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) {
for k, e := range r.postings {
for v, start := range e {
d := r.decbufAt(int(start))
d := newDecbufAt(r.b, int(start))
if d.err() != nil {
return nil, d.err()
}
@ -700,121 +735,45 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) {
return m, nil
}
func (r *Reader) readTOC() error {
if r.b.Len() < indexTOCLen {
return errInvalidSize
}
b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len())
expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
d := decbuf{b: b[:len(b)-4]}
if d.crc32() != expCRC {
return errors.Wrap(errInvalidChecksum, "read TOC")
}
r.toc.symbols = d.be64()
r.toc.series = d.be64()
r.toc.labelIndices = d.be64()
r.toc.labelIndicesTable = d.be64()
r.toc.postings = d.be64()
r.toc.postingsTable = d.be64()
return d.err()
}
// decbufAt returns a new decoding buffer. It expects the first 4 bytes
// after offset to hold the big endian encoded content length, followed by the contents and the expected
// checksum.
func (r *Reader) decbufAt(off int) decbuf {
if r.b.Len() < off+4 {
return decbuf{e: errInvalidSize}
}
b := r.b.Range(off, off+4)
l := int(binary.BigEndian.Uint32(b))
if r.b.Len() < off+4+l+4 {
return decbuf{e: errInvalidSize}
}
// Load bytes holding the contents plus a CRC32 checksum.
b = r.b.Range(off+4, off+4+l+4)
dec := decbuf{b: b[:len(b)-4]}
if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp {
return decbuf{e: errInvalidChecksum}
}
return dec
}
// decbufUvarintAt returns a new decoding buffer. It expects the first bytes
// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected
// checksum.
func (r *Reader) decbufUvarintAt(off int) decbuf {
// We never have to access this method at the far end of the byte slice. Thus just checking
// against the MaxVarintLen32 is sufficient.
if r.b.Len() < off+binary.MaxVarintLen32 {
return decbuf{e: errInvalidSize}
}
b := r.b.Range(off, off+binary.MaxVarintLen32)
l, n := binary.Uvarint(b)
if n <= 0 || n > binary.MaxVarintLen32 {
return decbuf{e: errors.Errorf("invalid uvarint %d", n)}
}
if r.b.Len() < off+n+int(l)+4 {
return decbuf{e: errInvalidSize}
}
// Load bytes holding the contents plus a CRC32 checksum.
b = r.b.Range(off+n, off+n+int(l)+4)
dec := decbuf{b: b[:len(b)-4]}
if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) {
return decbuf{e: errInvalidChecksum}
}
return dec
}
// readSymbols reads the symbol table fully into memory and allocates proper strings for them.
// ReadSymbols reads the symbol table fully into memory and allocates proper strings for them.
// Strings backed by the mmap'd memory would cause memory faults if applications keep using them
// after the reader is closed.
func (r *Reader) readSymbols(off int) error {
func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]string, error) {
if off == 0 {
return nil
return nil, nil, nil
}
d := r.decbufAt(off)
d := newDecbufAt(bs, off)
var (
origLen = d.len()
cnt = d.be32int()
basePos = uint32(off) + 4
nextPos = basePos + uint32(origLen-d.len())
origLen = d.len()
cnt = d.be32int()
basePos = uint32(off) + 4
nextPos = basePos + uint32(origLen-d.len())
symbolSlice []string
symbols = map[uint32]string{}
)
if r.version == indexFormatV2 {
r.symbolSlice = make([]string, 0, cnt)
if version == FormatV2 {
symbolSlice = make([]string, 0, cnt)
}
for d.err() == nil && d.len() > 0 && cnt > 0 {
s := d.uvarintStr()
if r.version == indexFormatV2 {
r.symbolSlice = append(r.symbolSlice, s)
if version == FormatV2 {
symbolSlice = append(symbolSlice, s)
} else {
r.symbols[nextPos] = s
symbols[nextPos] = s
nextPos = basePos + uint32(origLen-d.len())
}
cnt--
}
return errors.Wrap(d.err(), "read symbols")
return symbolSlice, symbols, errors.Wrap(d.err(), "read symbols")
}
// readOffsetTable reads an offset table at the given position calls f for each
// found entry.f
// If f returns an error it stops decoding and returns the received error,
func (r *Reader) readOffsetTable(off uint64, f func([]string, uint64) error) error {
d := r.decbufAt(int(off))
// ReadOffsetTable reads an offset table and at the given position calls f for each
// found entry. If f returns an error it stops decoding and returns the received error.
func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) error {
d := newDecbufAt(bs, int(off))
cnt := d.be32()
for d.err() == nil && d.len() > 0 && cnt > 0 {
@ -842,10 +801,10 @@ func (r *Reader) Close() error {
}
func (r *Reader) lookupSymbol(o uint32) (string, error) {
if int(o) < len(r.symbolSlice) {
return r.symbolSlice[o], nil
if int(o) < len(r.symbolsV2) {
return r.symbolsV2[o], nil
}
s, ok := r.symbols[o]
s, ok := r.symbolsV1[o]
if !ok {
return "", errors.Errorf("unknown symbol offset %d", o)
}
@ -854,27 +813,20 @@ func (r *Reader) lookupSymbol(o uint32) (string, error) {
// Symbols returns a set of symbols that exist within the index.
func (r *Reader) Symbols() (map[string]struct{}, error) {
res := make(map[string]struct{}, len(r.symbols))
res := make(map[string]struct{}, len(r.symbolsV1)+len(r.symbolsV2))
for _, s := range r.symbols {
for _, s := range r.symbolsV1 {
res[s] = struct{}{}
}
for _, s := range r.symbolSlice {
for _, s := range r.symbolsV2 {
res[s] = struct{}{}
}
return res, nil
}
// SymbolTableSize returns the symbol table that is used to resolve symbol references.
// SymbolTableSize returns the symbol table size in bytes.
func (r *Reader) SymbolTableSize() uint64 {
var size int
for _, s := range r.symbols {
size += len(s) + 8
}
for _, s := range r.symbolSlice {
size += len(s) + 8
}
return uint64(size)
return r.symbolsTableSize
}
// LabelValues returns value tuples that exist for the given label name tuples.
@ -889,7 +841,7 @@ func (r *Reader) LabelValues(names ...string) (StringTuples, error) {
//return nil, fmt.Errorf("label index doesn't exist")
}
d := r.decbufAt(int(off))
d := newDecbufAt(r.b, int(off))
nc := d.be32int()
d.be32() // consume unused value entry count.
@ -913,7 +865,7 @@ func (emptyStringTuples) Len() int { return 0 }
// LabelIndices returns a slice of label names for which labels or label tuples value indices exist.
// NOTE: This is deprecated. Use `LabelNames()` instead.
func (r *Reader) LabelIndices() ([][]string, error) {
res := [][]string{}
var res [][]string
for s := range r.labels {
res = append(res, strings.Split(s, labelNameSeperator))
}
@ -925,10 +877,10 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err
offset := id
// In version 2 series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
if r.version == indexFormatV2 {
if r.version == FormatV2 {
offset = id * 16
}
d := r.decbufUvarintAt(int(offset))
d := newDecbufUvarintAt(r.b, int(offset))
if d.err() != nil {
return d.err()
}
@ -945,7 +897,7 @@ func (r *Reader) Postings(name, value string) (Postings, error) {
if !ok {
return EmptyPostings(), nil
}
d := r.decbufAt(int(off))
d := newDecbufAt(r.b, int(off))
if d.err() != nil {
return nil, errors.Wrap(d.err(), "get postings entry")
}
@ -962,6 +914,11 @@ func (r *Reader) SortedPostings(p Postings) Postings {
return p
}
// Size returns the size of an index file, i.e. the length of the byte slice
// the reader was created on.
func (r *Reader) Size() int64 {
	return int64(r.b.Len())
}
// LabelNames returns all the unique label names present in the index.
func (r *Reader) LabelNames() ([]string, error) {
labelNamesMap := make(map[string]struct{}, len(r.labels))
@ -1059,7 +1016,7 @@ func (t *serializedStringTuples) At(i int) ([]string, error) {
// It currently does not contain decoding methods for all entry types but can be extended
// by them if there's demand.
type Decoder struct {
lookupSymbol func(uint32) (string, error)
LookupSymbol func(uint32) (string, error)
}
// Postings returns a postings list for b and its number of elements.
@ -1087,11 +1044,11 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e
return errors.Wrap(d.err(), "read series label offsets")
}
ln, err := dec.lookupSymbol(lno)
ln, err := dec.LookupSymbol(lno)
if err != nil {
return errors.Wrap(err, "lookup label name")
}
lv, err := dec.lookupSymbol(lvo)
lv, err := dec.LookupSymbol(lvo)
if err != nil {
return errors.Wrap(err, "lookup label value")
}

View File

@ -380,13 +380,28 @@ func TestPersistence_index_e2e(t *testing.T) {
}
}
gotSymbols, err := ir.Symbols()
testutil.Ok(t, err)
testutil.Equals(t, len(mi.symbols), len(gotSymbols))
for s := range mi.symbols {
_, ok := gotSymbols[s]
testutil.Assert(t, ok, "")
}
testutil.Ok(t, ir.Close())
}
// TestDecbufUvariantWithInvalidBuffer checks that newDecbufUvarintAt reports an
// error for a buffer that does not start with a valid uvarint length.
func TestDecbufUvariantWithInvalidBuffer(t *testing.T) {
	// Every byte has the continuation bit (0x80) set, so the uvarint never terminates.
	b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})

	db := newDecbufUvarintAt(b, 0)
	testutil.NotOk(t, db.err())
}
func TestReaderWithInvalidBuffer(t *testing.T) {
b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})
r := &Reader{b: b}
db := r.decbufUvarintAt(0)
testutil.NotOk(t, db.err())
_, err := NewReader(b)
testutil.NotOk(t, err)
}

View File

@ -1206,12 +1206,12 @@ func BenchmarkMergedSeriesSet(b *testing.B) {
func BenchmarkPersistedQueries(b *testing.B) {
for _, nSeries := range []int{10, 100} {
for _, nSamples := range []int{1000, 10000, 100000} {
for _, nSamples := range []int64{1000, 10000, 100000} {
b.Run(fmt.Sprintf("series=%d,samplesPerSeries=%d", nSeries, nSamples), func(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_persisted")
testutil.Ok(b, err)
defer os.RemoveAll(dir)
block, err := OpenBlock(createBlock(b, dir, nSeries, 1, int64(nSamples)), nil)
block, err := OpenBlock(nil, createBlock(b, dir, nSeries, 1, int64(nSamples)), nil)
testutil.Ok(b, err)
defer block.Close()

View File

@ -113,37 +113,41 @@ type Stone struct {
intervals Intervals
}
func readTombstones(dir string) (TombstoneReader, error) {
func readTombstones(dir string) (TombstoneReader, SizeReader, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
if os.IsNotExist(err) {
return newMemTombstones(), nil
return newMemTombstones(), nil, nil
} else if err != nil {
return nil, err
return nil, nil, err
}
sr := &TombstoneFile{
size: int64(len(b)),
}
if len(b) < 5 {
return nil, errors.Wrap(errInvalidSize, "tombstones header")
return nil, sr, errors.Wrap(errInvalidSize, "tombstones header")
}
d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum.
if mg := d.be32(); mg != MagicTombstone {
return nil, fmt.Errorf("invalid magic number %x", mg)
return nil, sr, fmt.Errorf("invalid magic number %x", mg)
}
if flag := d.byte(); flag != tombstoneFormatV1 {
return nil, fmt.Errorf("invalid tombstone format %x", flag)
return nil, sr, fmt.Errorf("invalid tombstone format %x", flag)
}
if d.err() != nil {
return nil, d.err()
return nil, sr, d.err()
}
// Verify checksum.
hash := newCRC32()
if _, err := hash.Write(d.get()); err != nil {
return nil, errors.Wrap(err, "write to hash")
return nil, sr, errors.Wrap(err, "write to hash")
}
if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() {
return nil, errors.New("checksum did not match")
return nil, sr, errors.New("checksum did not match")
}
stonesMap := newMemTombstones()
@ -153,13 +157,13 @@ func readTombstones(dir string) (TombstoneReader, error) {
mint := d.varint64()
maxt := d.varint64()
if d.err() != nil {
return nil, d.err()
return nil, sr, d.err()
}
stonesMap.addInterval(k, Interval{mint, maxt})
}
return stonesMap, nil
return stonesMap, sr, nil
}
type memTombstones struct {
@ -210,6 +214,16 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) {
}
}
// TombstoneFile holds information about the tombstone file.
type TombstoneFile struct {
	// size is the value reported by Size.
	size int64
}
// Size returns the tombstone file size as stored in the size field.
func (t *TombstoneFile) Size() int64 {
	return t.size
}
func (*memTombstones) Close() error {
return nil
}

View File

@ -46,7 +46,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
testutil.Ok(t, writeTombstoneFile(tmpdir, stones))
restr, err := readTombstones(tmpdir)
restr, _, err := readTombstones(tmpdir)
testutil.Ok(t, err)
// Compare the two readers.

View File

@ -164,6 +164,7 @@ type WAL struct {
page *page // active page
stopc chan chan struct{}
actorc chan func()
closed bool // To allow calling Close() more than once without blocking.
fsyncDuration prometheus.Summary
pageFlushes prometheus.Counter
@ -584,6 +585,10 @@ func (w *WAL) Close() (err error) {
w.mtx.Lock()
defer w.mtx.Unlock()
if w.closed {
return nil
}
// Flush the last page and zero out all its remaining size.
// We must not flush an empty page as it would falsely signal
// the segment is done if we start writing to it again after opening.
@ -603,7 +608,7 @@ func (w *WAL) Close() (err error) {
if err := w.segment.Close(); err != nil {
level.Error(w.logger).Log("msg", "close previous segment", "err", err)
}
w.closed = true
return nil
}
@ -827,28 +832,13 @@ func (r *Reader) next() (err error) {
}
r.rec = append(r.rec, buf[:length]...)
switch r.curRecTyp {
case recFull:
if i != 0 {
return errors.New("unexpected full record")
}
return nil
case recFirst:
if i != 0 {
return errors.New("unexpected first record")
}
case recMiddle:
if i == 0 {
return errors.New("unexpected middle record")
}
case recLast:
if i == 0 {
return errors.New("unexpected last record")
}
return nil
default:
return errors.Errorf("unexpected record type %d", r.curRecTyp)
if err := validateRecord(r.curRecTyp, i); err != nil {
return err
}
if r.curRecTyp == recLast || r.curRecTyp == recFull {
return nil
}
// Only increment i for non-zero records since we use it
// to determine valid content record sequences.
i++
@ -899,6 +889,226 @@ func (r *Reader) Offset() int64 {
return r.total
}
// NewLiveReader returns a new live reader that reads from r.
// All other fields of the reader start at their zero value.
func NewLiveReader(r io.Reader) *LiveReader {
	return &LiveReader{rdr: r}
}
// LiveReader reads WAL records from an io.Reader. It buffers partial record data for
// the next read.
type LiveReader struct {
	rdr        io.Reader              // Source the data is read from.
	err        error                  // Error returned by Err().
	rec        []byte                 // Record returned by Record().
	hdr        [recordHeaderSize]byte // Scratch space for a record header; its first byte is the record type.
	buf        [pageSize]byte         // Data read from rdr that has not been consumed yet.
	readIndex  int                    // Index in buf to start at for next read.
	writeIndex int                    // Index in buf to start at for next write.
	total      int64                  // Total bytes processed during reading in calls to Next().
	index      int                    // Used to track partial records, should be 0 at the start of every new record.
}
// Err returns the error most recently recorded by the reader, or nil if none
// has been recorded. The value may be io.EOF when a record was only partially
// available in the buffer.
func (r *LiveReader) Err() error {
	return r.err
}
// TotalRead returns the total number of bytes processed so far during calls to Next.
func (r *LiveReader) TotalRead() int64 {
	return r.total
}
// fillBuffer performs a single Read from the underlying reader into the unused
// tail of the internal buffer and advances the write index by the number of
// bytes received. The error from Read is returned unchanged.
func (r *LiveReader) fillBuffer() error {
	free := r.buf[r.writeIndex:]
	got, err := r.rdr.Read(free)
	r.writeIndex += got
	return err
}
// shiftBuffer moves the unread bytes (those between the read and write
// indexes) to the front of the buffer and resets both indexes accordingly.
func (r *LiveReader) shiftBuffer() {
	unread := r.buf[r.readIndex:r.writeIndex]
	r.writeIndex = copy(r.buf[:], unread)
	r.readIndex = 0
}
// Next returns true if r.rec will contain a full record.
// False does not indicate that there will never be more data to
// read for the current io.Reader; callers should consult Err() to tell
// a temporary lack of data apart from a real failure.
func (r *LiveReader) Next() bool {
	for {
		// Remember where this pass started so that a parse which cannot
		// advance can be detected below.
		start := r.readIndex

		if r.buildRecord() {
			return true
		}
		if r.err != nil && r.err != io.EOF {
			return false
		}
		// The whole page has been consumed, make room for the next one.
		if r.readIndex == pageSize {
			r.shiftBuffer()
		}
		if r.writeIndex != pageSize {
			if err := r.fillBuffer(); err != nil {
				// We expect to get EOF, since we're reading the segment file as it's being written.
				if err != io.EOF {
					r.err = err
				}
				return false
			}
			continue
		}
		// A complete page is buffered, so no further bytes can be added until
		// it has been consumed. If the last pass did not consume anything the
		// remaining bytes can never form a record; report it instead of
		// spinning forever on the same data.
		if r.readIndex == start {
			r.err = errors.New("record does not fit in the remainder of the page")
			return false
		}
	}
}
// Record returns the current record.
// The returned byte slice is only valid until the next call to Next:
// it is the reader's internal record buffer, not a copy.
func (r *LiveReader) Record() []byte {
	return r.rec
}
// buildRecord assembles a full record out of the sub-records currently held in
// the internal buffer. It returns true once a complete record is available in
// LiveReader.rec. It returns false when the buffer holds no unread bytes, when
// nothing usable could be read, or when an error occurred, in which case the
// error is stored in LiveReader.err.
func (r *LiveReader) buildRecord() bool {
	// Keep going for as long as there is unread data in the internal buffer.
	for r.readIndex < r.writeIndex {
		// Attempt to read a record, partial or otherwise.
		data, consumed, err := readRecord(r.buf[r.readIndex:r.writeIndex], r.hdr[:], r.total)
		r.readIndex += consumed
		r.total += int64(consumed)
		if err != nil {
			r.err = err
			return false
		}
		if data == nil {
			return false
		}

		typ := recType(r.hdr[0])
		// A first or full sub-record starts a fresh record, drop older content.
		if startsRecord := typ == recFirst || typ == recFull; startsRecord {
			r.rec = r.rec[:0]
		}
		r.rec = append(r.rec, data...)

		if verr := validateRecord(typ, r.index); verr != nil {
			r.err = verr
			r.index = 0
			return false
		}

		switch typ {
		case recLast, recFull:
			// The record is complete, the next one starts from scratch.
			r.index = 0
			return true
		}
		// Only increment the index for sub-records that leave the record
		// unfinished since we use it to determine valid record sequences.
		r.index++
	}
	return false
}
// validateRecord checks that a sub-record of type typ may legally appear at
// position i of the record currently being assembled (i == 0 means no
// sub-record has been collected yet). Full and first sub-records are only
// valid at position 0, middle and last sub-records only after it. Any other
// type is rejected.
func validateRecord(typ recType, i int) error {
	atStart := i == 0
	switch {
	case typ == recFull && !atStart:
		return errors.New("unexpected full record")
	case typ == recFirst && !atStart:
		return errors.New("unexpected first record, dropping buffer")
	case typ == recMiddle && atStart:
		return errors.New("unexpected middle record, dropping buffer")
	case typ == recLast && atStart:
		return errors.New("unexpected last record, dropping buffer")
	case typ == recFull, typ == recFirst, typ == recMiddle, typ == recLast:
		// Known type in a valid position.
		return nil
	default:
		return errors.Errorf("unexpected record type %d", typ)
	}
}
// readRecord reads a sub-record (see recType) from buf. It could potentially
// be a full record (recFull) if the record fits within the bounds of a single page.
//
// header receives the sub-record header and total is the number of bytes
// processed before buf, which is used to locate the page boundary.
//
// It returns the record data read, the number of bytes consumed from buf, and
// an error. The error is io.EOF (with zero bytes consumed) when buf does not
// yet hold the complete sub-record, and a non-EOF error if there's a non-zero
// byte in a page term record, the announced size cannot fit into a page, or
// the record checksum fails.
// TODO(callum) the EOF errors we're returning from this function should theoretically
// never happen, add a metric for them.
func readRecord(buf []byte, header []byte, total int64) ([]byte, int, error) {
	// Nothing to look at yet; same as any other short read.
	if len(buf) == 0 {
		return nil, 0, io.EOF
	}

	readIndex := 0
	header[0] = buf[0]
	readIndex++
	total++

	// The rest of this function is mostly from Reader.Next().
	typ := recType(header[0])
	// Gobble up zero bytes.
	if typ == recPageTerm {
		// We are pedantic and check whether the zeros are actually up to a page boundary.
		// It's not strictly necessary but may catch sketchy state early.
		k := pageSize - (total % pageSize)
		if k == pageSize {
			return nil, 1, nil // Initial 0 byte was last page byte.
		}

		if k <= int64(len(buf)-readIndex) {
			for _, v := range buf[readIndex : int64(readIndex)+k] {
				readIndex++
				if v != 0 {
					return nil, readIndex, errors.New("unexpected non-zero byte in page term bytes")
				}
			}
			return nil, readIndex, nil
		}
		// Not enough bytes to read the rest of the page term rec.
		// This theoretically should never happen, since we're only shifting the
		// internal buffer of the live reader when we read to the end of page.
		// Treat this the same as an EOF, it's an error we would expect to see.
		return nil, 0, io.EOF
	}

	if readIndex+recordHeaderSize-1 > len(buf) {
		// Treat this the same as an EOF, it's an error we would expect to see.
		return nil, 0, io.EOF
	}

	copy(header[1:], buf[readIndex:readIndex+len(header[1:])])
	readIndex += recordHeaderSize - 1
	total += int64(recordHeaderSize - 1)
	var (
		length = binary.BigEndian.Uint16(header[1:])
		crc    = binary.BigEndian.Uint32(header[3:])
	)

	// Header and data together have to fit into a single page. A larger
	// announced size can never be satisfied by buffering more bytes, so it
	// is reported right away rather than being treated as a short read.
	if int(length) > pageSize-recordHeaderSize {
		return nil, 0, errors.Errorf("invalid record, record size would be larger than max page size: %d", int(length))
	}

	readTo := int(length) + readIndex
	if readTo > len(buf) {
		// Not enough data to read all of the record data.
		// Treat this the same as an EOF, it's an error we would expect to see.
		return nil, 0, io.EOF
	}
	recData := buf[readIndex:readTo]
	readIndex += int(length)
	total += int64(length)

	// TODO(callum) what should we do here, throw out the record? We should add a metric at least.
	if c := crc32.Checksum(recData, castagnoliTable); c != crc {
		return recData, readIndex, errors.Errorf("unexpected checksum %x, expected %x", c, crc)
	}
	return recData, readIndex, nil
}
func min(i, j int) int {
if i < j {
return i

View File

@ -17,15 +17,107 @@ package wal
import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"math/rand"
"os"
"path"
"sync"
"testing"
"time"
"github.com/prometheus/tsdb/testutil"
)
// record is a single sub-record fed to the readers under test.
type record struct {
	t recType // Type of the sub-record.
	b []byte  // Payload of the sub-record.
}
// data is the backing byte slice the test cases below cut their payloads from.
// As declared here it is zero-valued.
var data = make([]byte, 100000)

// testReaderCases lists input sub-record sequences (t), the full records a
// reader is expected to produce from them (exp), and whether reading is
// expected to end with an error (fail).
var testReaderCases = []struct {
	t    []record
	exp  [][]byte
	fail bool
}{
	// Sequence of valid records.
	{
		t: []record{
			{recFull, data[0:200]},
			{recFirst, data[200:300]},
			{recLast, data[300:400]},
			{recFirst, data[400:800]},
			{recMiddle, data[800:900]},
			{recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary.
			{recLast, data[900:900]},
			{recFirst, data[900:1000]},
			{recMiddle, data[1000:1200]},
			{recMiddle, data[1200:30000]},
			{recMiddle, data[30000:30001]},
			{recMiddle, data[30001:30001]},
			{recLast, data[30001:32000]},
		},
		exp: [][]byte{
			data[0:200],
			data[200:400],
			data[400:900],
			data[900:32000],
		},
	},
	// Exactly at the limit of one page minus the header size
	{
		t: []record{
			{recFull, data[0 : pageSize-recordHeaderSize]},
		},
		exp: [][]byte{
			data[:pageSize-recordHeaderSize],
		},
	},
	// More than a full page, this exceeds our buffer and can never happen
	// when written by the WAL.
	{
		t: []record{
			{recFull, data[0 : pageSize+1]},
		},
		fail: true,
	},
	// Invalid orders of record types.
	{
		t:    []record{{recMiddle, data[:200]}},
		fail: true,
	},
	{
		t:    []record{{recLast, data[:200]}},
		fail: true,
	},
	{
		t: []record{
			{recFirst, data[:200]},
			{recFull, data[200:400]},
		},
		fail: true,
	},
	{
		t: []record{
			{recFirst, data[:100]},
			{recMiddle, data[100:200]},
			{recFull, data[200:400]},
		},
		fail: true,
	},
	// Non-zero data after page termination.
	{
		t: []record{
			{recFull, data[:100]},
			{recPageTerm, append(make([]byte, 1000), 1)},
		},
		exp:  [][]byte{data[:100]},
		fail: true,
	},
}
func encodedRecord(t recType, b []byte) []byte {
if t == recPageTerm {
return append([]byte{0}, b...)
@ -39,95 +131,7 @@ func encodedRecord(t recType, b []byte) []byte {
// TestReader feeds the reader a stream of encoded records with different types.
func TestReader(t *testing.T) {
data := make([]byte, 100000)
_, err := rand.Read(data)
testutil.Ok(t, err)
type record struct {
t recType
b []byte
}
cases := []struct {
t []record
exp [][]byte
fail bool
}{
// Sequence of valid records.
{
t: []record{
{recFull, data[0:200]},
{recFirst, data[200:300]},
{recLast, data[300:400]},
{recFirst, data[400:800]},
{recMiddle, data[800:900]},
{recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary.
{recLast, data[900:900]},
{recFirst, data[900:1000]},
{recMiddle, data[1000:1200]},
{recMiddle, data[1200:30000]},
{recMiddle, data[30000:30001]},
{recMiddle, data[30001:30001]},
{recLast, data[30001:32000]},
},
exp: [][]byte{
data[0:200],
data[200:400],
data[400:900],
data[900:32000],
},
},
// Exactly at the limit of one page minus the header size
{
t: []record{
{recFull, data[0 : pageSize-recordHeaderSize]},
},
exp: [][]byte{
data[:pageSize-recordHeaderSize],
},
},
// More than a full page, this exceeds our buffer and can never happen
// when written by the WAL.
{
t: []record{
{recFull, data[0 : pageSize+1]},
},
fail: true,
},
// Invalid orders of record types.
{
t: []record{{recMiddle, data[:200]}},
fail: true,
},
{
t: []record{{recLast, data[:200]}},
fail: true,
},
{
t: []record{
{recFirst, data[:200]},
{recFull, data[200:400]},
},
fail: true,
},
{
t: []record{
{recFirst, data[:100]},
{recMiddle, data[100:200]},
{recFull, data[200:400]},
},
fail: true,
},
// Non-zero data after page termination.
{
t: []record{
{recFull, data[:100]},
{recPageTerm, append(make([]byte, 1000), 1)},
},
exp: [][]byte{data[:100]},
fail: true,
},
}
for i, c := range cases {
for i, c := range testReaderCases {
t.Logf("test %d", i)
var buf []byte
@ -154,6 +158,191 @@ func TestReader(t *testing.T) {
}
}
// TestReader_Live runs the shared reader cases against a LiveReader that reads
// a segment file while a separate goroutine is still appending records to it.
func TestReader_Live(t *testing.T) {
	for i, c := range testReaderCases {
		i, c := i, c // Per-iteration copies for the closures below.

		// Each case runs in its own function so that its deferred cleanup
		// happens when the case ends instead of piling up until the whole
		// test returns.
		t.Run(fmt.Sprintf("test_%d", i), func(t *testing.T) {
			dir, err := ioutil.TempDir("", fmt.Sprintf("live_reader_%d", i))
			testutil.Ok(t, err)
			t.Logf("created dir %s", dir)
			defer os.RemoveAll(dir)

			// We're never going to have more than a single segment file per test case right now.
			f, err := os.Create(path.Join(dir, "00000000"))
			testutil.Ok(t, err)
			defer f.Close()

			// Live reader doesn't work on readers created from bytes buffers,
			// since we need to be able to write more data to the thing we're
			// reading from after the reader has been created.
			var (
				firstWrite = make(chan struct{}) // Closed once the first record is written or the writer gives up.
				writerDone = make(chan struct{}) // Closed when the writer goroutine returns.
				signalOnce sync.Once
			)
			signalFirstWrite := func() { signalOnce.Do(func() { close(firstWrite) }) }

			go func() {
				defer close(writerDone)
				// Never leave the reading side blocked, even if nothing could be written.
				defer signalFirstWrite()

				for _, rec := range c.t {
					n, err := f.Write(encodedRecord(rec.t, rec.b))
					// FailNow-style helpers must only be used on the test
					// goroutine, so failures are reported with Error here.
					if err != nil {
						t.Errorf("unexpected error writing record: %s", err)
						return
					}
					if n == 0 {
						t.Error("no bytes were written to wal")
						return
					}
					signalFirstWrite()
				}
			}()
			// Registered after f.Close so that it runs before it: the file
			// stays open until the writer is finished with it.
			defer func() { <-writerDone }()

			sr, err := OpenReadSegment(SegmentName(dir, 0))
			testutil.Ok(t, err)
			defer sr.Close()

			lr := NewLiveReader(sr)
			j := 0
			// Make sure the reader doesn't start until at least one record is written.
			<-firstWrite

		caseLoop:
			for {
				for ; lr.Next(); j++ {
					rec := lr.Record()
					t.Log("j: ", j)
					testutil.Assert(t, j < len(c.exp), "read more records than expected")
					testutil.Equals(t, c.exp[j], rec, "Bytes within record did not match expected Bytes")
					if j == len(c.exp)-1 {
						break caseLoop
					}
				}

				// Because reads and writes are happening concurrently, unless we get an error we should
				// attempt to read records again.
				if j == 0 && lr.Err() == nil {
					continue
				}

				if !c.fail && lr.Err() != nil {
					t.Fatalf("unexpected error: %s", lr.Err())
				}
				if c.fail && lr.Err() == nil {
					t.Fatalf("expected error but got none:\n\tinput: %+v", c.t)
				}
				if lr.Err() != nil {
					t.Log("err: ", lr.Err())
					break
				}
			}
		})
	}
}
// TestWAL_FuzzWriteRead_Live writes randomly sized records to a WAL from one
// goroutine while the test goroutine tails the segments with a LiveReader and
// checks that every record comes back unchanged and in order.
func TestWAL_FuzzWriteRead_Live(t *testing.T) {
	const count = 5000

	var (
		lock  sync.RWMutex // Guards input, which is shared with the writer goroutine.
		input [][]byte     // Every record handed to the WAL, in write order.
		index int          // Number of records verified so far; test goroutine only.
	)

	// Get size of segment.
	getSegmentSize := func(dir string, n int) (int64, error) {
		fi, err := os.Stat(SegmentName(dir, n))
		if err != nil {
			return -1, err
		}
		return fi.Size(), nil
	}

	// readSegment drains every record currently available from r and compares
	// it with what was written.
	readSegment := func(r *LiveReader) {
		for r.Next() {
			rec := r.Record()

			// Take what is needed under the lock and release it before any
			// fatal check, so the lock is never held on the way out.
			lock.RLock()
			known := index < len(input)
			var exp []byte
			if known {
				exp = input[index]
			}
			lock.RUnlock()

			if !known {
				t.Fatalf("read too many records")
			}
			if !bytes.Equal(exp, rec) {
				t.Fatalf("record %d (len %d) does not match (expected len %d)",
					index, len(rec), len(exp))
			}
			index++
		}
		if r.Err() != io.EOF {
			testutil.Ok(t, r.Err())
		}
	}

	dir, err := ioutil.TempDir("", "wal_fuzz_live")
	testutil.Ok(t, err)
	t.Log("created dir: ", dir)
	defer func() {
		os.RemoveAll(dir)
	}()

	w, err := NewSize(nil, nil, dir, 128*pageSize)
	testutil.Ok(t, err)
	defer w.Close()

	// The writer reports its first failure here; FailNow-style helpers must
	// only be used on the test goroutine.
	writeErr := make(chan error, 1)
	go func() {
		var recs [][]byte
		for i := 0; i < count; i++ {
			var sz int64
			switch i % 5 {
			case 0, 1:
				sz = 50
			case 2, 3:
				sz = pageSize
			default:
				sz = pageSize * 8
			}

			rec := make([]byte, rand.Int63n(sz))
			if _, err := rand.Read(rec); err != nil {
				writeErr <- err
				return
			}
			lock.Lock()
			input = append(input, rec)
			lock.Unlock()
			recs = append(recs, rec)

			// Randomly batch up records.
			if rand.Intn(4) < 3 {
				if err := w.Log(recs...); err != nil {
					writeErr <- err
					return
				}
				recs = recs[:0]
			}
		}
		if err := w.Log(recs...); err != nil {
			writeErr <- err
		}
	}()

	m, _, err := w.Segments()
	testutil.Ok(t, err)

	seg, err := OpenReadSegment(SegmentName(dir, m))
	testutil.Ok(t, err)
	// seg is replaced on rotation; close whichever segment is current at the end.
	defer func() {
		seg.Close()
	}()

	r := NewLiveReader(seg)

	segmentTicker := time.NewTicker(100 * time.Millisecond)
	defer segmentTicker.Stop()
	readTicker := time.NewTicker(10 * time.Millisecond)
	defer readTicker.Stop()

	for {
		select {
		case err := <-writeErr:
			testutil.Ok(t, err)
		case <-segmentTicker.C:
			// check if new segments exist
			_, last, err := w.Segments()
			testutil.Ok(t, err)
			if last > seg.i {
				for {
					readSegment(r)
					if r.Err() != io.EOF {
						testutil.Ok(t, r.Err())
					}
					size, err := getSegmentSize(dir, seg.i)
					testutil.Ok(t, err)
					// make sure we've read all of the current segment before rotating
					if r.TotalRead() == size {
						break
					}
				}
				next, err := OpenReadSegment(SegmentName(dir, seg.i+1))
				testutil.Ok(t, err)
				testutil.Ok(t, seg.Close())
				seg = next
				r = NewLiveReader(seg)
			}
		case <-readTicker.C:
			readSegment(r)
		}
		if index == count {
			break
		}
	}

	// io.EOF only means the reader caught up with the writer; it is tolerated
	// here exactly like in the checks above.
	if err := r.Err(); err != io.EOF {
		testutil.Ok(t, err)
	}
}
func TestWAL_FuzzWriteRead(t *testing.T) {
const count = 25000