From 8d991bdc1e494074ca2d559e5a75f49f13a789ac Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 7 Jan 2019 11:43:33 +0300 Subject: [PATCH 1/9] Delete temp checkpoint folder on error. (#415) --- checkpoint.go | 8 +++++++- checkpoint_test.go | 30 ++++++++++++++++++++++++++++++ wal/wal.go | 7 ++++++- 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/checkpoint.go b/checkpoint.go index d1cad4c10..1c6239232 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -128,7 +128,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) defer sgmReader.Close() } - cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to)) + cpdir := filepath.Join(w.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", to)) cpdirtmp := cpdir + ".tmp" if err := os.MkdirAll(cpdirtmp, 0777); err != nil { @@ -139,6 +139,12 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) return nil, errors.Wrap(err, "open checkpoint") } + // Ensures that an early return caused by an error doesn't leave any tmp files. + defer func() { + cp.Close() + os.RemoveAll(cpdirtmp) + }() + r := wal.NewReader(sgmReader) var ( diff --git a/checkpoint_test.go b/checkpoint_test.go index 8538cf0c6..8b13c152a 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -15,11 +15,14 @@ package tsdb import ( + "fmt" "io/ioutil" "os" "path/filepath" + "strings" "testing" + "github.com/pkg/errors" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" @@ -180,3 +183,30 @@ func TestCheckpoint(t *testing.T) { {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, }, series) } + +func TestCheckpointNoTmpFolderAfterError(t *testing.T) { + // Create a new wal with an invalid records. + dir, err := ioutil.TempDir("", "test_checkpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + w, err := wal.NewSize(nil, nil, dir, 64*1024) + testutil.Ok(t, err) + testutil.Ok(t, w.Log([]byte{99})) + w.Close() + + // Run the checkpoint and since the wal contains an invalid records this should return an error. + _, err = Checkpoint(w, 0, 1, nil, 0) + testutil.NotOk(t, err) + + // Walk the wal dir to make sure there are no tmp folder left behind after the error. + err = filepath.Walk(w.Dir(), func(path string, info os.FileInfo, err error) error { + if err != nil { + return errors.Wrapf(err, "access err %q: %v\n", path, err) + } + if info.IsDir() && strings.HasSuffix(info.Name(), ".tmp") { + return fmt.Errorf("wal dir contains temporary folder:%s", info.Name()) + } + return nil + }) + testutil.Ok(t, err) +} diff --git a/wal/wal.go b/wal/wal.go index 92374e312..5433fd042 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -164,6 +164,7 @@ type WAL struct { page *page // active page stopc chan chan struct{} actorc chan func() + closed bool // To allow calling Close() more than once without blocking. fsyncDuration prometheus.Summary pageFlushes prometheus.Counter @@ -584,6 +585,10 @@ func (w *WAL) Close() (err error) { w.mtx.Lock() defer w.mtx.Unlock() + if w.closed { + return nil + } + // Flush the last page and zero out all its remaining size. // We must not flush an empty page as it would falsely signal // the segment is done if we start writing to it again after opening. @@ -603,7 +608,7 @@ func (w *WAL) Close() (err error) { if err := w.segment.Close(); err != nil { level.Error(w.logger).Log("msg", "close previous segment", "err", err) } - + w.closed = true return nil } From c065fa6957bd3196f237c69c9471168be02b2e99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20P=C5=82otka?= Date: Fri, 11 Jan 2019 17:31:26 +0000 Subject: [PATCH 2/9] Exposed helper methods for reading index bytes. (#492) Changes: * Make `NewReader` method useful. It was impossible to use it, because closer was always nil. * ReadSymbols, TOC and ReadOffsetTable are not public functions (used by Thanos). * decbufXXX are now functions. * More verbose errors. * Removed unused crc32 field. * Some var name changes to make it more verbose: * symbols -> allocatedSymbols * symbolsSlice -> symbolsV1 * symbols -> symbolsV2 * * Pre-calculate symbolsTableSize. * Initialized symbols for Symbols() method with valid length. * Added test for Symbol method. * Made Decoder LookupSymbol method public. Kept Decode public as it is useful as helper from index package. Signed-off-by: Bartek Plotka --- index/encoding_helpers.go | 56 +++++++ index/index.go | 309 ++++++++++++++++---------------------- index/index_test.go | 21 ++- 3 files changed, 203 insertions(+), 183 deletions(-) diff --git a/index/encoding_helpers.go b/index/encoding_helpers.go index 602498f11..9104f1cb5 100644 --- a/index/encoding_helpers.go +++ b/index/encoding_helpers.go @@ -18,6 +18,8 @@ import ( "hash" "hash/crc32" "unsafe" + + "github.com/pkg/errors" ) // enbuf is a helper type to populate a byte slice with various types. @@ -86,6 +88,60 @@ type decbuf struct { e error } +// newDecbufAt returns a new decoding buffer. It expects the first 4 bytes +// after offset to hold the big endian encoded content length, followed by the contents and the expected +// checksum. +func newDecbufAt(bs ByteSlice, off int) decbuf { + if bs.Len() < off+4 { + return decbuf{e: errInvalidSize} + } + b := bs.Range(off, off+4) + l := int(binary.BigEndian.Uint32(b)) + + if bs.Len() < off+4+l+4 { + return decbuf{e: errInvalidSize} + } + + // Load bytes holding the contents plus a CRC32 checksum. + b = bs.Range(off+4, off+4+l+4) + dec := decbuf{b: b[:len(b)-4]} + + if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp { + return decbuf{e: errInvalidChecksum} + } + return dec +} + +// decbufUvarintAt returns a new decoding buffer. It expects the first bytes +// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected +// checksum. +func newDecbufUvarintAt(bs ByteSlice, off int) decbuf { + // We never have to access this method at the far end of the byte slice. Thus just checking + // against the MaxVarintLen32 is sufficient. + if bs.Len() < off+binary.MaxVarintLen32 { + return decbuf{e: errInvalidSize} + } + b := bs.Range(off, off+binary.MaxVarintLen32) + + l, n := binary.Uvarint(b) + if n <= 0 || n > binary.MaxVarintLen32 { + return decbuf{e: errors.Errorf("invalid uvarint %d", n)} + } + + if bs.Len() < off+n+int(l)+4 { + return decbuf{e: errInvalidSize} + } + + // Load bytes holding the contents plus a CRC32 checksum. + b = bs.Range(off+n, off+n+int(l)+4) + dec := decbuf{b: b[:len(b)-4]} + + if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) { + return decbuf{e: errInvalidChecksum} + } + return dec +} + func (d *decbuf) uvarint() int { return int(d.uvarint64()) } func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) } func (d *decbuf) be32int() int { return int(d.be32()) } diff --git a/index/index.go b/index/index.go index 6413a9fca..cce29d9b1 100644 --- a/index/index.go +++ b/index/index.go @@ -20,6 +20,7 @@ import ( "hash" "hash/crc32" "io" + "io/ioutil" "math" "os" "path/filepath" @@ -35,9 +36,13 @@ import ( const ( // MagicIndex 4 bytes at the head of an index file. MagicIndex = 0xBAAAD700 + // HeaderLen represents number of bytes reserved of index for header. + HeaderLen = 5 - indexFormatV1 = 1 - indexFormatV2 = 2 + // FormatV1 represents 1 version of index. + FormatV1 = 1 + // FormatV2 represents 2 version of index. + FormatV2 = 2 labelNameSeperator = "\xff" ) @@ -108,7 +113,7 @@ type Writer struct { fbuf *bufio.Writer pos uint64 - toc indexTOC + toc TOC stage indexWriterStage // Reusable memory. @@ -129,13 +134,42 @@ type Writer struct { Version int } -type indexTOC struct { - symbols uint64 - series uint64 - labelIndices uint64 - labelIndicesTable uint64 - postings uint64 - postingsTable uint64 +// TOC represents index Table Of Content that states where each section of index starts. +type TOC struct { + Symbols uint64 + Series uint64 + LabelIndices uint64 + LabelIndicesTable uint64 + Postings uint64 + PostingsTable uint64 +} + +// NewTOCFromByteSlice return parsed TOC from given index byte slice. +func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) { + if bs.Len() < indexTOCLen { + return nil, errInvalidSize + } + b := bs.Range(bs.Len()-indexTOCLen, bs.Len()) + + expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) + d := decbuf{b: b[:len(b)-4]} + + if d.crc32() != expCRC { + return nil, errors.Wrap(errInvalidChecksum, "read TOC") + } + + if err := d.err(); err != nil { + return nil, err + } + + return &TOC{ + Symbols: d.be64(), + Series: d.be64(), + LabelIndices: d.be64(), + LabelIndicesTable: d.be64(), + Postings: d.be64(), + PostingsTable: d.be64(), + }, nil } // NewWriter returns a new Writer to the given filename. It serializes data in format version 2. @@ -223,22 +257,22 @@ func (w *Writer) ensureStage(s indexWriterStage) error { // Mark start of sections in table of contents. switch s { case idxStageSymbols: - w.toc.symbols = w.pos + w.toc.Symbols = w.pos case idxStageSeries: - w.toc.series = w.pos + w.toc.Series = w.pos case idxStageLabelIndex: - w.toc.labelIndices = w.pos + w.toc.LabelIndices = w.pos case idxStagePostings: - w.toc.postings = w.pos + w.toc.Postings = w.pos case idxStageDone: - w.toc.labelIndicesTable = w.pos + w.toc.LabelIndicesTable = w.pos if err := w.writeOffsetTable(w.labelIndexes); err != nil { return err } - w.toc.postingsTable = w.pos + w.toc.PostingsTable = w.pos if err := w.writeOffsetTable(w.postings); err != nil { return err } @@ -254,7 +288,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error { func (w *Writer) writeMeta() error { w.buf1.reset() w.buf1.putBE32(MagicIndex) - w.buf1.putByte(indexFormatV2) + w.buf1.putByte(FormatV2) return w.write(w.buf1.get()) } @@ -346,8 +380,6 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error { } sort.Strings(symbols) - const headerSize = 4 - w.buf1.reset() w.buf2.reset() @@ -438,12 +470,12 @@ const indexTOCLen = 6*8 + 4 func (w *Writer) writeTOC() error { w.buf1.reset() - w.buf1.putBE64(w.toc.symbols) - w.buf1.putBE64(w.toc.series) - w.buf1.putBE64(w.toc.labelIndices) - w.buf1.putBE64(w.toc.labelIndicesTable) - w.buf1.putBE64(w.toc.postings) - w.buf1.putBE64(w.toc.postingsTable) + w.buf1.putBE64(w.toc.Symbols) + w.buf1.putBE64(w.toc.Series) + w.buf1.putBE64(w.toc.LabelIndices) + w.buf1.putBE64(w.toc.LabelIndicesTable) + w.buf1.putBE64(w.toc.Postings) + w.buf1.putBE64(w.toc.PostingsTable) w.buf1.putHash(w.crc32) @@ -535,15 +567,14 @@ type StringTuples interface { } type Reader struct { - // The underlying byte slice holding the encoded series data. - b ByteSlice - toc indexTOC + b ByteSlice // Close that releases the underlying resources of the byte slice. c io.Closer // Cached hashmaps of section offsets. - labels map[string]uint64 + labels map[string]uint64 + // LabelName to LabelValue to offset map. postings map[string]map[string]uint64 // Cache of read symbols. Strings that are returned when reading from the // block are always backed by true strings held in here rather than @@ -551,19 +582,17 @@ type Reader struct { // prevents memory faults when applications work with read symbols after // the block has been unmapped. The older format has sparse indexes so a map // must be used, but the new format is not so we can use a slice. - symbols map[uint32]string - symbolSlice []string + symbolsV1 map[uint32]string + symbolsV2 []string + symbolsTableSize uint64 dec *Decoder - crc32 hash.Hash32 - version int } var ( errInvalidSize = fmt.Errorf("invalid size") - errInvalidFlag = fmt.Errorf("invalid flag") errInvalidChecksum = fmt.Errorf("invalid checksum") ) @@ -587,10 +616,10 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { return b[start:end] } -// NewReader returns a new IndexReader on the given byte slice. It automatically +// NewReader returns a new index reader on the given byte slice. It automatically // handles different format versions. func NewReader(b ByteSlice) (*Reader, error) { - return newReader(b, nil) + return newReader(b, ioutil.NopCloser(nil)) } // NewFileReader returns a new index reader against the given index file. @@ -606,14 +635,12 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { r := &Reader{ b: b, c: c, - symbols: map[uint32]string{}, labels: map[string]uint64{}, postings: map[string]map[string]uint64{}, - crc32: newCRC32(), } // Verify header. - if b.Len() < 5 { + if r.b.Len() < HeaderLen { return nil, errors.Wrap(errInvalidSize, "index header") } if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex { @@ -621,54 +648,59 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { } r.version = int(r.b.Range(4, 5)[0]) - if r.version != 1 && r.version != 2 { + if r.version != FormatV1 && r.version != FormatV2 { return nil, errors.Errorf("unknown index file version %d", r.version) } - if err := r.readTOC(); err != nil { + toc, err := NewTOCFromByteSlice(b) + if err != nil { return nil, errors.Wrap(err, "read TOC") } - if err := r.readSymbols(int(r.toc.symbols)); err != nil { + + r.symbolsV2, r.symbolsV1, err = ReadSymbols(r.b, r.version, int(toc.Symbols)) + if err != nil { return nil, errors.Wrap(err, "read symbols") } - var err error // Use the strings already allocated by symbols, rather than // re-allocating them again below. - symbols := make(map[string]string, len(r.symbols)+len(r.symbolSlice)) - for _, s := range r.symbols { - symbols[s] = s + // Additionally, calculate symbolsTableSize. + allocatedSymbols := make(map[string]string, len(r.symbolsV1)+len(r.symbolsV2)) + for _, s := range r.symbolsV1 { + r.symbolsTableSize += uint64(len(s) + 8) + allocatedSymbols[s] = s } - for _, s := range r.symbolSlice { - symbols[s] = s + for _, s := range r.symbolsV2 { + r.symbolsTableSize += uint64(len(s) + 8) + allocatedSymbols[s] = s } - err = r.readOffsetTable(r.toc.labelIndicesTable, func(key []string, off uint64) error { + if err := ReadOffsetTable(r.b, toc.LabelIndicesTable, func(key []string, off uint64) error { if len(key) != 1 { - return errors.Errorf("unexpected key length %d", len(key)) + return errors.Errorf("unexpected key length for label indices table %d", len(key)) } - r.labels[symbols[key[0]]] = off + + r.labels[allocatedSymbols[key[0]]] = off return nil - }) - if err != nil { + }); err != nil { return nil, errors.Wrap(err, "read label index table") } + r.postings[""] = map[string]uint64{} - err = r.readOffsetTable(r.toc.postingsTable, func(key []string, off uint64) error { + if err := ReadOffsetTable(r.b, toc.PostingsTable, func(key []string, off uint64) error { if len(key) != 2 { - return errors.Errorf("unexpected key length %d", len(key)) + return errors.Errorf("unexpected key length for posting table %d", len(key)) } if _, ok := r.postings[key[0]]; !ok { - r.postings[symbols[key[0]]] = map[string]uint64{} + r.postings[allocatedSymbols[key[0]]] = map[string]uint64{} } - r.postings[key[0]][symbols[key[1]]] = off + r.postings[key[0]][allocatedSymbols[key[1]]] = off return nil - }) - if err != nil { + }); err != nil { return nil, errors.Wrap(err, "read postings table") } - r.dec = &Decoder{lookupSymbol: r.lookupSymbol} + r.dec = &Decoder{LookupSymbol: r.lookupSymbol} return r, nil } @@ -690,7 +722,7 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) { for k, e := range r.postings { for v, start := range e { - d := r.decbufAt(int(start)) + d := newDecbufAt(r.b, int(start)) if d.err() != nil { return nil, d.err() } @@ -703,121 +735,45 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) { return m, nil } -func (r *Reader) readTOC() error { - if r.b.Len() < indexTOCLen { - return errInvalidSize - } - b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len()) - - expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) - d := decbuf{b: b[:len(b)-4]} - - if d.crc32() != expCRC { - return errors.Wrap(errInvalidChecksum, "read TOC") - } - - r.toc.symbols = d.be64() - r.toc.series = d.be64() - r.toc.labelIndices = d.be64() - r.toc.labelIndicesTable = d.be64() - r.toc.postings = d.be64() - r.toc.postingsTable = d.be64() - - return d.err() -} - -// decbufAt returns a new decoding buffer. It expects the first 4 bytes -// after offset to hold the big endian encoded content length, followed by the contents and the expected -// checksum. -func (r *Reader) decbufAt(off int) decbuf { - if r.b.Len() < off+4 { - return decbuf{e: errInvalidSize} - } - b := r.b.Range(off, off+4) - l := int(binary.BigEndian.Uint32(b)) - - if r.b.Len() < off+4+l+4 { - return decbuf{e: errInvalidSize} - } - - // Load bytes holding the contents plus a CRC32 checksum. - b = r.b.Range(off+4, off+4+l+4) - dec := decbuf{b: b[:len(b)-4]} - - if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp { - return decbuf{e: errInvalidChecksum} - } - return dec -} - -// decbufUvarintAt returns a new decoding buffer. It expects the first bytes -// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected -// checksum. -func (r *Reader) decbufUvarintAt(off int) decbuf { - // We never have to access this method at the far end of the byte slice. Thus just checking - // against the MaxVarintLen32 is sufficient. - if r.b.Len() < off+binary.MaxVarintLen32 { - return decbuf{e: errInvalidSize} - } - b := r.b.Range(off, off+binary.MaxVarintLen32) - - l, n := binary.Uvarint(b) - if n <= 0 || n > binary.MaxVarintLen32 { - return decbuf{e: errors.Errorf("invalid uvarint %d", n)} - } - - if r.b.Len() < off+n+int(l)+4 { - return decbuf{e: errInvalidSize} - } - - // Load bytes holding the contents plus a CRC32 checksum. - b = r.b.Range(off+n, off+n+int(l)+4) - dec := decbuf{b: b[:len(b)-4]} - - if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) { - return decbuf{e: errInvalidChecksum} - } - return dec -} - -// readSymbols reads the symbol table fully into memory and allocates proper strings for them. +// ReadSymbols reads the symbol table fully into memory and allocates proper strings for them. // Strings backed by the mmap'd memory would cause memory faults if applications keep using them // after the reader is closed. -func (r *Reader) readSymbols(off int) error { +func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]string, error) { if off == 0 { - return nil + return nil, nil, nil } - d := r.decbufAt(off) + d := newDecbufAt(bs, off) var ( - origLen = d.len() - cnt = d.be32int() - basePos = uint32(off) + 4 - nextPos = basePos + uint32(origLen-d.len()) + origLen = d.len() + cnt = d.be32int() + basePos = uint32(off) + 4 + nextPos = basePos + uint32(origLen-d.len()) + symbolSlice []string + symbols = map[uint32]string{} ) - if r.version == 2 { - r.symbolSlice = make([]string, 0, cnt) + if version == 2 { + symbolSlice = make([]string, 0, cnt) } for d.err() == nil && d.len() > 0 && cnt > 0 { s := d.uvarintStr() - if r.version == 2 { - r.symbolSlice = append(r.symbolSlice, s) + if version == FormatV2 { + symbolSlice = append(symbolSlice, s) } else { - r.symbols[nextPos] = s + symbols[nextPos] = s nextPos = basePos + uint32(origLen-d.len()) } cnt-- } - return errors.Wrap(d.err(), "read symbols") + return symbolSlice, symbols, errors.Wrap(d.err(), "read symbols") } -// readOffsetTable reads an offset table at the given position calls f for each -// found entry.f -// If f returns an error it stops decoding and returns the received error, -func (r *Reader) readOffsetTable(off uint64, f func([]string, uint64) error) error { - d := r.decbufAt(int(off)) +// ReadOffsetTable reads an offset table and at the given position calls f for each +// found entry. If f returns an error it stops decoding and returns the received error. +func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) error { + d := newDecbufAt(bs, int(off)) cnt := d.be32() for d.err() == nil && d.len() > 0 && cnt > 0 { @@ -845,10 +801,10 @@ func (r *Reader) Close() error { } func (r *Reader) lookupSymbol(o uint32) (string, error) { - if int(o) < len(r.symbolSlice) { - return r.symbolSlice[o], nil + if int(o) < len(r.symbolsV2) { + return r.symbolsV2[o], nil } - s, ok := r.symbols[o] + s, ok := r.symbolsV1[o] if !ok { return "", errors.Errorf("unknown symbol offset %d", o) } @@ -857,27 +813,20 @@ func (r *Reader) lookupSymbol(o uint32) (string, error) { // Symbols returns a set of symbols that exist within the index. func (r *Reader) Symbols() (map[string]struct{}, error) { - res := make(map[string]struct{}, len(r.symbols)) + res := make(map[string]struct{}, len(r.symbolsV1)+len(r.symbolsV2)) - for _, s := range r.symbols { + for _, s := range r.symbolsV1 { res[s] = struct{}{} } - for _, s := range r.symbolSlice { + for _, s := range r.symbolsV2 { res[s] = struct{}{} } return res, nil } -// SymbolTableSize returns the symbol table that is used to resolve symbol references. +// SymbolTableSize returns the symbol table size in bytes. func (r *Reader) SymbolTableSize() uint64 { - var size int - for _, s := range r.symbols { - size += len(s) + 8 - } - for _, s := range r.symbolSlice { - size += len(s) + 8 - } - return uint64(size) + return r.symbolsTableSize } // LabelValues returns value tuples that exist for the given label name tuples. @@ -892,7 +841,7 @@ func (r *Reader) LabelValues(names ...string) (StringTuples, error) { //return nil, fmt.Errorf("label index doesn't exist") } - d := r.decbufAt(int(off)) + d := newDecbufAt(r.b, int(off)) nc := d.be32int() d.be32() // consume unused value entry count. @@ -916,7 +865,7 @@ func (emptyStringTuples) Len() int { return 0 } // LabelIndices returns a slice of label names for which labels or label tuples value indices exist. // NOTE: This is deprecated. Use `LabelNames()` instead. func (r *Reader) LabelIndices() ([][]string, error) { - res := [][]string{} + var res [][]string for s := range r.labels { res = append(res, strings.Split(s, labelNameSeperator)) } @@ -928,10 +877,10 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err offset := id // In version 2 series IDs are no longer exact references but series are 16-byte padded // and the ID is the multiple of 16 of the actual position. - if r.version == 2 { + if r.version == FormatV2 { offset = id * 16 } - d := r.decbufUvarintAt(int(offset)) + d := newDecbufUvarintAt(r.b, int(offset)) if d.err() != nil { return d.err() } @@ -948,7 +897,7 @@ func (r *Reader) Postings(name, value string) (Postings, error) { if !ok { return EmptyPostings(), nil } - d := r.decbufAt(int(off)) + d := newDecbufAt(r.b, int(off)) if d.err() != nil { return nil, errors.Wrap(d.err(), "get postings entry") } @@ -1062,7 +1011,7 @@ func (t *serializedStringTuples) At(i int) ([]string, error) { // It currently does not contain decoding methods for all entry types but can be extended // by them if there's demand. type Decoder struct { - lookupSymbol func(uint32) (string, error) + LookupSymbol func(uint32) (string, error) } // Postings returns a postings list for b and its number of elements. @@ -1090,11 +1039,11 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e return errors.Wrap(d.err(), "read series label offsets") } - ln, err := dec.lookupSymbol(lno) + ln, err := dec.LookupSymbol(lno) if err != nil { return errors.Wrap(err, "lookup label name") } - lv, err := dec.lookupSymbol(lvo) + lv, err := dec.LookupSymbol(lvo) if err != nil { return errors.Wrap(err, "lookup label value") } diff --git a/index/index_test.go b/index/index_test.go index f915bca28..2edd3956a 100644 --- a/index/index_test.go +++ b/index/index_test.go @@ -380,13 +380,28 @@ func TestPersistence_index_e2e(t *testing.T) { } } + gotSymbols, err := ir.Symbols() + testutil.Ok(t, err) + + testutil.Equals(t, len(mi.symbols), len(gotSymbols)) + for s := range mi.symbols { + _, ok := gotSymbols[s] + testutil.Assert(t, ok, "") + } + testutil.Ok(t, ir.Close()) } +func TestDecbufUvariantWithInvalidBuffer(t *testing.T) { + b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81}) + + db := newDecbufUvarintAt(b, 0) + testutil.NotOk(t, db.err()) +} + func TestReaderWithInvalidBuffer(t *testing.T) { b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81}) - r := &Reader{b: b} - db := r.decbufUvarintAt(0) - testutil.NotOk(t, db.err()) + _, err := NewReader(b) + testutil.NotOk(t, err) } From a360aa3e86dc3ff67e6bd242abdb2429ec3ef5fb Mon Sep 17 00:00:00 2001 From: Ye Ben Date: Mon, 14 Jan 2019 16:44:32 +0800 Subject: [PATCH 3/9] change variable name metrics to labels (#496) Signed-off-by: yeya24 --- cmd/tsdb/main.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 01d85d590..e319b490d 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -138,7 +138,7 @@ func (b *writeBenchmark) run() { } b.storage = st - var metrics []labels.Labels + var labels []labels.Labels measureTime("readData", func() { f, err := os.Open(b.samplesFile) @@ -147,7 +147,7 @@ func (b *writeBenchmark) run() { } defer f.Close() - metrics, err = readPrometheusLabels(f, b.numMetrics) + labels, err = readPrometheusLabels(f, b.numMetrics) if err != nil { exitWithError(err) } @@ -157,7 +157,7 @@ func (b *writeBenchmark) run() { dur := measureTime("ingestScrapes", func() { b.startProfiling() - total, err = b.ingestScrapes(metrics, 3000) + total, err = b.ingestScrapes(labels, 3000) if err != nil { exitWithError(err) } @@ -213,7 +213,7 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u return total, nil } -func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount int, baset int64) (uint64, error) { +func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount int, baset int64) (uint64, error) { ts := baset type sample struct { @@ -222,9 +222,9 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount ref *uint64 } - scrape := make([]*sample, 0, len(metrics)) + scrape := make([]*sample, 0, len(lbls)) - for _, m := range metrics { + for _, m := range lbls { scrape = append(scrape, &sample{ labels: m, value: 123456789, From bff5aa4d21113b52b7894d5a72e658a07aa2a77c Mon Sep 17 00:00:00 2001 From: naivewong <867245430@qq.com> Date: Mon, 14 Jan 2019 23:28:03 +0800 Subject: [PATCH 4/9] Missing the len of crc32 when calculating maxLen in WriteChunks (#494) Signed-off-by: naivewong <867245430@qq.com> --- chunks/chunks.go | 1 + 1 file changed, 1 insertion(+) diff --git a/chunks/chunks.go b/chunks/chunks.go index 5eab23982..569aeddc2 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -205,6 +205,7 @@ func (w *Writer) WriteChunks(chks ...Meta) error { for _, c := range chks { maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding. maxLen += int64(len(c.Chunk.Bytes())) + maxLen += 4 // The 4 bytes of crc32 } newsz := w.n + maxLen From ebf5d74325a5309a7bd53b654e575c1acf6b7fb1 Mon Sep 17 00:00:00 2001 From: mknapphrt <39998367+mknapphrt@users.noreply.github.com> Date: Wed, 16 Jan 2019 05:03:52 -0500 Subject: [PATCH 5/9] Added storage size based retention method and new metrics (#343) Added methods needed to retain data based on a byte limitation rather than time. Limitation is only applied if the flag is set (defaults to 0). Both blocks that are older than the retention period and the blocks that make the size of the storage too large are removed. 2 new metrics for keeping track of the size of the local storage folder and the amount of times data has been deleted because the size restriction was exceeded. Signed-off-by: Mark Knapp --- CHANGELOG.md | 4 + block.go | 38 +++++- block_test.go | 4 +- chunks/chunks.go | 24 ++-- compact.go | 2 +- db.go | 286 ++++++++++++++++++++++++++++----------------- db_test.go | 142 ++++++++++++++-------- index/index.go | 5 + querier_test.go | 4 +- tombstones.go | 36 ++++-- tombstones_test.go | 2 +- 11 files changed, 360 insertions(+), 187 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index af44615e7..a44471681 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## master / unreleased - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. + - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change: + - added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` + - new public interface `SizeReader: Size() int64` + - `OpenBlock` signature changed to take a logger. - [REMOVED] `PrefixMatcher` is considered unused so was removed. - [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere. diff --git a/block.go b/block.go index e5a66bd9e..837f4a763 100644 --- a/block.go +++ b/block.go @@ -21,6 +21,8 @@ import ( "path/filepath" "sync" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/tsdb/chunkenc" @@ -140,6 +142,12 @@ type Appendable interface { Appender() Appender } +// SizeReader returns the size of the object in bytes. +type SizeReader interface { + // Size returns the size in bytes. + Size() int64 +} + // BlockMeta provides meta information about a block. type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. @@ -166,6 +174,7 @@ type BlockStats struct { NumSeries uint64 `json:"numSeries,omitempty"` NumChunks uint64 `json:"numChunks,omitempty"` NumTombstones uint64 `json:"numTombstones,omitempty"` + NumBytes int64 `json:"numBytes,omitempty"` } // BlockDesc describes a block by ULID and time range. @@ -257,7 +266,10 @@ type Block struct { // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // to instantiate chunk structs. -func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { +func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) { + if logger == nil { + logger = log.NewNopLogger() + } meta, err := readMetaFile(dir) if err != nil { return nil, err @@ -272,11 +284,20 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { return nil, err } - tr, err := readTombstones(dir) + tr, tsr, err := readTombstones(dir) if err != nil { return nil, err } + // TODO refactor to set this at block creation time as + // that would be the logical place for a block size to be calculated. + bs := blockSize(cr, ir, tsr) + meta.Stats.NumBytes = bs + err = writeMetaFile(dir, meta) + if err != nil { + level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err) + } + pb := &Block{ dir: dir, meta: *meta, @@ -288,6 +309,16 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { return pb, nil } +func blockSize(rr ...SizeReader) int64 { + var total int64 + for _, r := range rr { + if r != nil { + total += r.Size() + } + } + return total +} + // Close closes the on-disk block. It blocks as long as there are readers reading from the block. func (pb *Block) Close() error { pb.mtx.Lock() @@ -315,6 +346,9 @@ func (pb *Block) Dir() string { return pb.dir } // Meta returns meta information about the block. func (pb *Block) Meta() BlockMeta { return pb.meta } +// Size returns the number of bytes that the block takes up. +func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes } + // ErrClosing is returned when a block is in the process of being closed. var ErrClosing = errors.New("block is closing") diff --git a/block_test.go b/block_test.go index cf8716062..789aebaa7 100644 --- a/block_test.go +++ b/block_test.go @@ -46,14 +46,14 @@ func TestSetCompactionFailed(t *testing.T) { defer os.RemoveAll(tmpdir) blockDir := createBlock(t, tmpdir, 0, 0, 0) - b, err := OpenBlock(blockDir, nil) + b, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) testutil.Equals(t, false, b.meta.Compaction.Failed) testutil.Ok(t, b.setCompactionFailed()) testutil.Equals(t, true, b.meta.Compaction.Failed) testutil.Ok(t, b.Close()) - b, err = OpenBlock(blockDir, nil) + b, err = OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) testutil.Equals(t, true, b.meta.Compaction.Failed) testutil.Ok(t, b.Close()) diff --git a/chunks/chunks.go b/chunks/chunks.go index 569aeddc2..8fb288384 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -285,17 +285,15 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { // Reader implements a SeriesReader for a serialized byte stream // of series data. type Reader struct { - // The underlying bytes holding the encoded series data. - bs []ByteSlice - - // Closers for resources behind the byte slices. - cs []io.Closer - + bs []ByteSlice // The underlying bytes holding the encoded series data. + cs []io.Closer // Closers for resources behind the byte slices. + size int64 // The total size of bytes in the reader. pool chunkenc.Pool } func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) { cr := Reader{pool: pool, bs: bs, cs: cs} + var totalSize int64 for i, b := range cr.bs { if b.Len() < 4 { @@ -305,7 +303,9 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks { return nil, errors.Errorf("invalid magic number %x", m) } + totalSize += int64(b.Len()) } + cr.size = totalSize return &cr, nil } @@ -328,9 +328,10 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) { pool = chunkenc.NewPool() } - var bs []ByteSlice - var cs []io.Closer - + var ( + bs []ByteSlice + cs []io.Closer + ) for _, fn := range files { f, err := fileutil.OpenMmapFile(fn) if err != nil { @@ -346,6 +347,11 @@ func (s *Reader) Close() error { return closeAll(s.cs...) } +// Size returns the size of the chunks. +func (s *Reader) Size() int64 { + return s.size +} + func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { var ( seq = int(ref >> 32) diff --git a/compact.go b/compact.go index f8e6ff545..49d4e5868 100644 --- a/compact.go +++ b/compact.go @@ -347,7 +347,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u if b == nil { var err error - b, err = OpenBlock(d, c.chunkPool) + b, err = OpenBlock(c.logger, d, c.chunkPool) if err != nil { return uid, err } diff --git a/db.go b/db.go index 43cfadd46..630700439 100644 --- a/db.go +++ b/db.go @@ -58,6 +58,13 @@ type Options struct { // Duration of persisted data to keep. RetentionDuration uint64 + // Maximum number of bytes in blocks to be retained. + // 0 or less means disabled. + // NOTE: For proper storage calculations need to consider + // the size of the WAL folder which is not added when calculating + // the current size of the database. + MaxBytes int64 + // The sizes of the Blocks. BlockRanges []int64 @@ -127,11 +134,12 @@ type dbMetrics struct { reloads prometheus.Counter reloadsFailed prometheus.Counter compactionsTriggered prometheus.Counter + timeRetentionCount prometheus.Counter compactionsSkipped prometheus.Counter - cutoffs prometheus.Counter - cutoffsFailed prometheus.Counter startTime prometheus.GaugeFunc tombCleanTimer prometheus.Histogram + blocksBytes prometheus.Gauge + sizeRetentionCount prometheus.Counter } func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { @@ -170,18 +178,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_compactions_triggered_total", Help: "Total number of triggered compactions for the partition.", }) + m.timeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_time_retentions_total", + Help: "The number of times that blocks were deleted because the maximum time limit was exceeded.", + }) m.compactionsSkipped = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_compactions_skipped_total", Help: "Total number of skipped compactions due to disabled auto compaction.", }) - m.cutoffs = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_retention_cutoffs_total", - Help: "Number of times the database cut off block data from disk.", - }) - m.cutoffsFailed = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_retention_cutoffs_failures_total", - Help: "Number of times the database failed to cut off block data from disk.", - }) m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "prometheus_tsdb_lowest_timestamp", Help: "Lowest timestamp value stored in the database. The unit is decided by the library consumer.", @@ -197,6 +201,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_tombstone_cleanup_seconds", Help: "The time taken to recompact blocks to remove tombstones.", }) + m.blocksBytes = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_storage_blocks_bytes_total", + Help: "The number of bytes that are currently used for local storage by all blocks.", + }) + m.sizeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_size_retentions_total", + Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.", + }) if r != nil { r.MustRegister( @@ -204,11 +216,12 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { m.symbolTableSize, m.reloads, m.reloadsFailed, - m.cutoffs, - m.cutoffsFailed, + m.timeRetentionCount, m.compactionsTriggered, m.startTime, m.tombCleanTimer, + m.blocksBytes, + m.sizeRetentionCount, ) } return m @@ -340,25 +353,6 @@ func (db *DB) run() { } } -func (db *DB) beyondRetention(meta *BlockMeta) bool { - if db.opts.RetentionDuration == 0 { - return false - } - - db.mtx.RLock() - blocks := db.blocks[:] - db.mtx.RUnlock() - - if len(blocks) == 0 { - return false - } - - last := blocks[len(db.blocks)-1] - mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) - - return meta.MaxTime < mint -} - // Appender opens a new appender against the database. func (db *DB) Appender() Appender { return dbAppender{db: db, Appender: db.head.Appender()} @@ -474,8 +468,7 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { return nil, false } -// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes -// a list of block directories which should be deleted during reload. +// reload blocks and trigger head truncation if new blocks appeared. // Blocks that are obsolete due to replacement or retention will be deleted. func (db *DB) reload() (err error) { defer func() { @@ -485,112 +478,187 @@ func (db *DB) reload() (err error) { db.metrics.reloads.Inc() }() - dirs, err := blockDirs(db.dir) + loadable, corrupted, err := db.openBlocks() if err != nil { - return errors.Wrap(err, "find blocks") + return err } - // We delete old blocks that have been superseded by new ones by gathering all parents - // from existing blocks. Those parents all have newer replacements and can be safely deleted - // after we loaded the other blocks. - // This makes us resilient against the process crashing towards the end of a compaction. - // Creation of a new block and deletion of its parents cannot happen atomically. By creating - // blocks with their parents, we can pick up the deletion where it left off during a crash. - var ( - blocks []*Block - corrupted = map[ulid.ULID]error{} - opened = map[ulid.ULID]struct{}{} - deleteable = map[ulid.ULID]struct{}{} - ) - for _, dir := range dirs { - meta, err := readMetaFile(dir) - if err != nil { - // The block was potentially in the middle of being deleted during a crash. - // Skip it since we may delete it properly further down again. - level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir) - ulid, err2 := ulid.Parse(filepath.Base(dir)) - if err2 != nil { - level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) - continue - } - corrupted[ulid] = err - continue - } - if db.beyondRetention(meta) { - deleteable[meta.ULID] = struct{}{} - continue - } - for _, b := range meta.Compaction.Parents { - deleteable[b.ULID] = struct{}{} + deletable := db.deletableBlocks(loadable) + + // Corrupted blocks that have been replaced by parents can be safely ignored and deleted. + // This makes it resilient against the process crashing towards the end of a compaction. + // Creation of a new block and deletion of its parents cannot happen atomically. + // By creating blocks with their parents, we can pick up the deletion where it left off during a crash. + for _, block := range loadable { + for _, b := range block.Meta().Compaction.Parents { + delete(corrupted, b.ULID) + deletable[b.ULID] = nil } } - // Blocks we failed to open should all be those we are want to delete anyway. - for c, err := range corrupted { - if _, ok := deleteable[c]; !ok { - return errors.Wrapf(err, "unexpected corrupted block %s", c) - } + if len(corrupted) > 0 { + return errors.Wrap(err, "unexpected corrupted block") } - // Load new blocks into memory. - for _, dir := range dirs { - meta, err := readMetaFile(dir) - if err != nil { - return errors.Wrapf(err, "read meta information %s", dir) - } - // Don't load blocks that are scheduled for deletion. - if _, ok := deleteable[meta.ULID]; ok { + + // All deletable blocks should not be loaded. + var ( + bb []*Block + blocksSize int64 + ) + for _, block := range loadable { + if _, ok := deletable[block.Meta().ULID]; ok { + deletable[block.Meta().ULID] = block continue } - // See if we already have the block in memory or open it otherwise. - b, ok := db.getBlock(meta.ULID) - if !ok { - b, err = OpenBlock(dir, db.chunkPool) - if err != nil { - return errors.Wrapf(err, "open block %s", dir) - } - } - blocks = append(blocks, b) - opened[meta.ULID] = struct{}{} + bb = append(bb, block) + blocksSize += block.Size() + } - sort.Slice(blocks, func(i, j int) bool { - return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime + loadable = bb + db.metrics.blocksBytes.Set(float64(blocksSize)) + + sort.Slice(loadable, func(i, j int) bool { + return loadable[i].Meta().MaxTime < loadable[j].Meta().MaxTime }) - if err := validateBlockSequence(blocks); err != nil { + if err := validateBlockSequence(loadable); err != nil { return errors.Wrap(err, "invalid block sequence") } - // Swap in new blocks first for subsequently created readers to be seen. - // Then close previous blocks, which may block for pending readers to complete. + // Swap new blocks first for subsequently created readers to be seen. db.mtx.Lock() oldBlocks := db.blocks - db.blocks = blocks + db.blocks = loadable db.mtx.Unlock() - // Drop old blocks from memory. for _, b := range oldBlocks { - if _, ok := opened[b.Meta().ULID]; ok { - continue - } - if err := b.Close(); err != nil { - level.Warn(db.logger).Log("msg", "closing block failed", "err", err) + if _, ok := deletable[b.Meta().ULID]; ok { + deletable[b.Meta().ULID] = b } } - // Delete all obsolete blocks. None of them are opened any longer. - for ulid := range deleteable { - if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil { - return errors.Wrapf(err, "delete obsolete block %s", ulid) - } + + if err := db.deleteBlocks(deletable); err != nil { + return err } // Garbage collect data in the head if the most recent persisted block // covers data of its current time range. - if len(blocks) == 0 { + if len(loadable) == 0 { return nil } - maxt := blocks[len(blocks)-1].Meta().MaxTime + + maxt := loadable[len(loadable)-1].Meta().MaxTime return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") } +func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) { + dirs, err := blockDirs(db.dir) + if err != nil { + return nil, nil, errors.Wrap(err, "find blocks") + } + + corrupted = make(map[ulid.ULID]error) + for _, dir := range dirs { + meta, err := readMetaFile(dir) + if err != nil { + level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) + continue + } + + // See if we already have the block in memory or open it otherwise. + block, ok := db.getBlock(meta.ULID) + if !ok { + block, err = OpenBlock(db.logger, dir, db.chunkPool) + if err != nil { + corrupted[meta.ULID] = err + continue + } + } + blocks = append(blocks, block) + } + return blocks, corrupted, nil +} + +// deletableBlocks returns all blocks past retention policy. +func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block { + deletable := make(map[ulid.ULID]*Block) + + // Sort the blocks by time - newest to oldest (largest to smallest timestamp). + // This ensures that the retentions will remove the oldest blocks. + sort.Slice(blocks, func(i, j int) bool { + return blocks[i].Meta().MaxTime > blocks[j].Meta().MaxTime + }) + + for ulid, block := range db.beyondTimeRetention(blocks) { + deletable[ulid] = block + } + + for ulid, block := range db.beyondSizeRetention(blocks) { + deletable[ulid] = block + } + + return deletable +} + +func (db *DB) beyondTimeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) { + // Time retention is disabled or no blocks to work with. + if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 { + return + } + + deleteable = make(map[ulid.ULID]*Block) + for i, block := range blocks { + // The difference between the first block and this block is larger than + // the retention period so any blocks after that are added as deleteable. + if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > int64(db.opts.RetentionDuration) { + for _, b := range blocks[i:] { + deleteable[b.meta.ULID] = b + } + db.metrics.timeRetentionCount.Inc() + break + } + } + return deleteable +} + +func (db *DB) beyondSizeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) { + // Size retention is disabled or no blocks to work with. + if len(db.blocks) == 0 || db.opts.MaxBytes <= 0 { + return + } + + deleteable = make(map[ulid.ULID]*Block) + blocksSize := int64(0) + for i, block := range blocks { + blocksSize += block.Size() + if blocksSize > db.opts.MaxBytes { + // Add this and all following blocks for deletion. + for _, b := range blocks[i:] { + deleteable[b.meta.ULID] = b + } + db.metrics.sizeRetentionCount.Inc() + break + } + } + return deleteable +} + +// deleteBlocks closes and deletes blocks from the disk. +// When the map contains a non nil block object it means it is loaded in memory +// so needs to be closed first as it might need to wait for pending readers to complete. +func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error { + for ulid, block := range blocks { + if block != nil { + if err := block.Close(); err != nil { + level.Warn(db.logger).Log("msg", "closing block failed", "err", err) + } + } + if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil { + return errors.Wrapf(err, "delete obsolete block %s", ulid) + } + } + return nil +} + // validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence. func validateBlockSequence(bs []*Block) error { if len(bs) <= 1 { diff --git a/db_test.go b/db_test.go index 92d487eec..2beef0c52 100644 --- a/db_test.go +++ b/db_test.go @@ -92,6 +92,9 @@ func TestDB_reloadOrder(t *testing.T) { testutil.Ok(t, db.reload()) blocks := db.Blocks() + for _, b := range blocks { + b.meta.Stats.NumBytes = 0 + } testutil.Equals(t, 3, len(blocks)) testutil.Equals(t, metas[1].MinTime, blocks[0].Meta().MinTime) testutil.Equals(t, metas[1].MaxTime, blocks[0].Meta().MaxTime) @@ -834,7 +837,7 @@ func TestTombstoneCleanFail(t *testing.T) { totalBlocks := 2 for i := 0; i < totalBlocks; i++ { blockDir := createBlock(t, db.Dir(), 0, 0, 0) - block, err := OpenBlock(blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) // Add some some fake tombstones to trigger the compaction. tomb := newMemTombstones() @@ -877,7 +880,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } - block, err := OpenBlock(createBlock(c.t, dest, 0, 0, 0), nil) + block, err := OpenBlock(nil, createBlock(c.t, dest, 0, 0, 0), nil) testutil.Ok(c.t, err) testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block) @@ -901,59 +904,98 @@ func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block) } -func TestDB_Retention(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - - lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}} - - app := db.Appender() - _, err := app.Add(lbls, 0, 1) - testutil.Ok(t, err) - testutil.Ok(t, app.Commit()) - - // create snapshot to make it create a block. - // TODO(gouthamve): Add a method to compact headblock. - snap, err := ioutil.TempDir("", "snap") - testutil.Ok(t, err) - - defer os.RemoveAll(snap) - testutil.Ok(t, db.Snapshot(snap, true)) - testutil.Ok(t, db.Close()) - - // reopen DB from snapshot - db, err = Open(snap, nil, nil, nil) - testutil.Ok(t, err) - - testutil.Equals(t, 1, len(db.blocks)) - - app = db.Appender() - _, err = app.Add(lbls, 100, 1) - testutil.Ok(t, err) - testutil.Ok(t, app.Commit()) - - // Snapshot again to create another block. - snap, err = ioutil.TempDir("", "snap") - testutil.Ok(t, err) - defer os.RemoveAll(snap) - - testutil.Ok(t, db.Snapshot(snap, true)) - testutil.Ok(t, db.Close()) - - // reopen DB from snapshot - db, err = Open(snap, nil, nil, &Options{ - RetentionDuration: 10, - BlockRanges: []int64{50}, +func TestTimeRetention(t *testing.T) { + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{1000}, }) - testutil.Ok(t, err) + defer close() defer db.Close() - testutil.Equals(t, 2, len(db.blocks)) + blocks := []*BlockMeta{ + {MinTime: 500, MaxTime: 900}, // Oldest block + {MinTime: 1000, MaxTime: 1500}, + {MinTime: 1500, MaxTime: 2000}, // Newest Block + } - // Reload blocks, which should drop blocks beyond the retention boundary. + for _, m := range blocks { + createBlock(t, db.Dir(), 10, m.MinTime, m.MaxTime) + } + + testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. + testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. + + db.opts.RetentionDuration = uint64(blocks[2].MaxTime - blocks[1].MinTime) testutil.Ok(t, db.reload()) - testutil.Equals(t, 1, len(db.blocks)) - testutil.Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block. + + expBlocks := blocks[1:] + actBlocks := db.Blocks() + + testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.metrics.timeRetentionCount)), "metric retention count mismatch") + testutil.Equals(t, len(expBlocks), len(actBlocks)) + testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime) + testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime) +} + +func TestSizeRetention(t *testing.T) { + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{100}, + }) + defer close() + defer db.Close() + + blocks := []*BlockMeta{ + {MinTime: 100, MaxTime: 200}, // Oldest block + {MinTime: 200, MaxTime: 300}, + {MinTime: 300, MaxTime: 400}, + {MinTime: 400, MaxTime: 500}, + {MinTime: 500, MaxTime: 600}, // Newest Block + } + + for _, m := range blocks { + createBlock(t, db.Dir(), 100, m.MinTime, m.MaxTime) + } + + // Test that registered size matches the actual disk size. + testutil.Ok(t, db.reload()) // Reload the db to register the new db size. + testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. + expSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the the actual internal metrics. + actSize := dbDiskSize(db.Dir()) + testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size") + + // Decrease the max bytes limit so that a delete is triggered. + // Check total size, total count and check that the oldest block was deleted. + firstBlockSize := db.Blocks()[0].Size() + sizeLimit := actSize - firstBlockSize + db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller that the actual size. + testutil.Ok(t, db.reload()) // Reload the db to register the new db size. + + expBlocks := blocks[1:] + actBlocks := db.Blocks() + expSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) + actRetentCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount)) + actSize = dbDiskSize(db.Dir()) + + testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch") + testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size") + testutil.Assert(t, expSize <= sizeLimit, "actual size (%v) is expected to be less than or equal to limit (%v)", expSize, sizeLimit) + testutil.Equals(t, len(blocks)-1, len(actBlocks), "new block count should be decreased from:%v to:%v", len(blocks), len(blocks)-1) + testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime, "maxT mismatch of the first block") + testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime, "maxT mismatch of the last block") + +} + +func dbDiskSize(dir string) int64 { + var statSize int64 + filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + // Include only index,tombstone and chunks. + if filepath.Dir(path) == chunkDir(filepath.Dir(filepath.Dir(path))) || + info.Name() == indexFilename || + info.Name() == tombstoneFilename { + statSize += info.Size() + } + return nil + }) + return statSize } func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { diff --git a/index/index.go b/index/index.go index cce29d9b1..74e08d465 100644 --- a/index/index.go +++ b/index/index.go @@ -914,6 +914,11 @@ func (r *Reader) SortedPostings(p Postings) Postings { return p } +// Size returns the size of an index file. +func (r *Reader) Size() int64 { + return int64(r.b.Len()) +} + // LabelNames returns all the unique label names present in the index. func (r *Reader) LabelNames() ([]string, error) { labelNamesMap := make(map[string]struct{}, len(r.labels)) diff --git a/querier_test.go b/querier_test.go index 79dfbff7a..69ca84602 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1227,12 +1227,12 @@ func BenchmarkMergedSeriesSet(b *testing.B) { func BenchmarkPersistedQueries(b *testing.B) { for _, nSeries := range []int{10, 100} { - for _, nSamples := range []int{1000, 10000, 100000} { + for _, nSamples := range []int64{1000, 10000, 100000} { b.Run(fmt.Sprintf("series=%d,samplesPerSeries=%d", nSeries, nSamples), func(b *testing.B) { dir, err := ioutil.TempDir("", "bench_persisted") testutil.Ok(b, err) defer os.RemoveAll(dir) - block, err := OpenBlock(createBlock(b, dir, nSeries, 1, int64(nSamples)), nil) + block, err := OpenBlock(nil, createBlock(b, dir, nSeries, 1, int64(nSamples)), nil) testutil.Ok(b, err) defer block.Close() diff --git a/tombstones.go b/tombstones.go index a1f30b59c..078140406 100644 --- a/tombstones.go +++ b/tombstones.go @@ -113,37 +113,41 @@ type Stone struct { intervals Intervals } -func readTombstones(dir string) (TombstoneReader, error) { +func readTombstones(dir string) (TombstoneReader, SizeReader, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if os.IsNotExist(err) { - return newMemTombstones(), nil + return newMemTombstones(), nil, nil } else if err != nil { - return nil, err + return nil, nil, err + } + + sr := &TombstoneFile{ + size: int64(len(b)), } if len(b) < 5 { - return nil, errors.Wrap(errInvalidSize, "tombstones header") + return nil, sr, errors.Wrap(errInvalidSize, "tombstones header") } d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum. if mg := d.be32(); mg != MagicTombstone { - return nil, fmt.Errorf("invalid magic number %x", mg) + return nil, sr, fmt.Errorf("invalid magic number %x", mg) } if flag := d.byte(); flag != tombstoneFormatV1 { - return nil, fmt.Errorf("invalid tombstone format %x", flag) + return nil, sr, fmt.Errorf("invalid tombstone format %x", flag) } if d.err() != nil { - return nil, d.err() + return nil, sr, d.err() } // Verify checksum. hash := newCRC32() if _, err := hash.Write(d.get()); err != nil { - return nil, errors.Wrap(err, "write to hash") + return nil, sr, errors.Wrap(err, "write to hash") } if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() { - return nil, errors.New("checksum did not match") + return nil, sr, errors.New("checksum did not match") } stonesMap := newMemTombstones() @@ -153,13 +157,13 @@ func readTombstones(dir string) (TombstoneReader, error) { mint := d.varint64() maxt := d.varint64() if d.err() != nil { - return nil, d.err() + return nil, sr, d.err() } stonesMap.addInterval(k, Interval{mint, maxt}) } - return stonesMap, nil + return stonesMap, sr, nil } type memTombstones struct { @@ -210,6 +214,16 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { } } +// TombstoneFile holds information about the tombstone file. +type TombstoneFile struct { + size int64 +} + +// Size returns the tombstone file size. +func (t *TombstoneFile) Size() int64 { + return t.size +} + func (*memTombstones) Close() error { return nil } diff --git a/tombstones_test.go b/tombstones_test.go index e12574f11..2a106d705 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -46,7 +46,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { testutil.Ok(t, writeTombstoneFile(tmpdir, stones)) - restr, err := readTombstones(tmpdir) + restr, _, err := readTombstones(tmpdir) testutil.Ok(t, err) // Compare the two readers. From 3929359302170bd79567f9808bafcb7fdf116acf Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Wed, 16 Jan 2019 10:09:08 -0800 Subject: [PATCH 6/9] add live reader for WAL (#481) * add live reader for WAL Signed-off-by: Callum Styan --- CHANGELOG.md | 1 + wal/wal.go | 247 +++++++++++++++++++++++++++++--- wal/wal_test.go | 368 ++++++++++++++++++++++++++++++++++++------------ 3 files changed, 506 insertions(+), 110 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a44471681..5975d8b2b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - `OpenBlock` signature changed to take a logger. - [REMOVED] `PrefixMatcher` is considered unused so was removed. - [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere. + - [FEATURE] Add new `LiveReader` to WAL pacakge. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read. ## 0.3.1 - [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers. diff --git a/wal/wal.go b/wal/wal.go index 5433fd042..fd90eb90e 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -832,28 +832,13 @@ func (r *Reader) next() (err error) { } r.rec = append(r.rec, buf[:length]...) - switch r.curRecTyp { - case recFull: - if i != 0 { - return errors.New("unexpected full record") - } - return nil - case recFirst: - if i != 0 { - return errors.New("unexpected first record") - } - case recMiddle: - if i == 0 { - return errors.New("unexpected middle record") - } - case recLast: - if i == 0 { - return errors.New("unexpected last record") - } - return nil - default: - return errors.Errorf("unexpected record type %d", r.curRecTyp) + if err := validateRecord(r.curRecTyp, i); err != nil { + return err } + if r.curRecTyp == recLast || r.curRecTyp == recFull { + return nil + } + // Only increment i for non-zero records since we use it // to determine valid content record sequences. i++ @@ -904,6 +889,226 @@ func (r *Reader) Offset() int64 { return r.total } +// NewLiveReader returns a new live reader. +func NewLiveReader(r io.Reader) *LiveReader { + return &LiveReader{rdr: r} +} + +// Reader reads WAL records from an io.Reader. It buffers partial record data for +// the next read. +type LiveReader struct { + rdr io.Reader + err error + rec []byte + hdr [recordHeaderSize]byte + buf [pageSize]byte + readIndex int // Index in buf to start at for next read. + writeIndex int // Index in buf to start at for next write. + total int64 // Total bytes processed during reading in calls to Next(). + index int // Used to track partial records, should be 0 at the start of every new record. +} + +func (r *LiveReader) Err() error { + return r.err +} + +func (r *LiveReader) TotalRead() int64 { + return r.total +} + +func (r *LiveReader) fillBuffer() error { + n, err := r.rdr.Read(r.buf[r.writeIndex:len(r.buf)]) + r.writeIndex += n + return err +} + +// Shift the buffer up to the read index. +func (r *LiveReader) shiftBuffer() { + copied := copy(r.buf[0:], r.buf[r.readIndex:r.writeIndex]) + r.readIndex = 0 + r.writeIndex = copied +} + +// Next returns true if r.rec will contain a full record. +// False does not indicate that there will never be more data to +// read for the current io.Reader. +func (r *LiveReader) Next() bool { + for { + if r.buildRecord() { + return true + } + if r.err != nil && r.err != io.EOF { + return false + } + if r.readIndex == pageSize { + r.shiftBuffer() + } + if r.writeIndex != pageSize { + if err := r.fillBuffer(); err != nil { + // We expect to get EOF, since we're reading the segment file as it's being written. + if err != io.EOF { + r.err = err + } + return false + } + } + } +} + +// Record returns the current record. +// The returned byte slice is only valid until the next call to Next. +func (r *LiveReader) Record() []byte { + return r.rec +} + +// Rebuild a full record from potentially partial records. Returns false +// if there was an error or if we weren't able to read a record for any reason. +// Returns true if we read a full record. Any record data is appeneded to +// LiveReader.rec +func (r *LiveReader) buildRecord() bool { + for { + // Check that we have data in the internal buffer to read. + if r.writeIndex <= r.readIndex { + return false + } + + // Attempt to read a record, partial or otherwise. + temp, n, err := readRecord(r.buf[r.readIndex:r.writeIndex], r.hdr[:], r.total) + r.readIndex += n + r.total += int64(n) + if err != nil { + r.err = err + return false + } + + if temp == nil { + return false + } + + rt := recType(r.hdr[0]) + + if rt == recFirst || rt == recFull { + r.rec = r.rec[:0] + } + r.rec = append(r.rec, temp...) + + if err := validateRecord(rt, r.index); err != nil { + r.err = err + r.index = 0 + return false + } + if rt == recLast || rt == recFull { + r.index = 0 + return true + } + // Only increment i for non-zero records since we use it + // to determine valid content record sequences. + r.index++ + } +} + +// Returns an error if the recType and i indicate an invalid record sequence. +// As an example, if i is > 0 because we've read some amount of a partial record +// (recFirst, recMiddle, etc. but not recLast) and then we get another recFirst or recFull +// instead of a recLast or recMiddle we would have an invalid record. +func validateRecord(typ recType, i int) error { + switch typ { + case recFull: + if i != 0 { + return errors.New("unexpected full record") + } + return nil + case recFirst: + if i != 0 { + return errors.New("unexpected first record, dropping buffer") + } + return nil + case recMiddle: + if i == 0 { + return errors.New("unexpected middle record, dropping buffer") + } + return nil + case recLast: + if i == 0 { + return errors.New("unexpected last record, dropping buffer") + } + return nil + default: + return errors.Errorf("unexpected record type %d", typ) + } +} + +// Read a sub-record (see recType) from the buffer. It could potentially +// be a full record (recFull) if the record fits within the bounds of a single page. +// Returns a byte slice of the record data read, the number of bytes read, and an error +// if there's a non-zero byte in a page term record or the record checksum fails. +// TODO(callum) the EOF errors we're returning from this function should theoretically +// never happen, add a metric for them. +func readRecord(buf []byte, header []byte, total int64) ([]byte, int, error) { + readIndex := 0 + header[0] = buf[0] + readIndex++ + total++ + + // The rest of this function is mostly from Reader.Next(). + typ := recType(header[0]) + // Gobble up zero bytes. + if typ == recPageTerm { + // We are pedantic and check whether the zeros are actually up to a page boundary. + // It's not strictly necessary but may catch sketchy state early. + k := pageSize - (total % pageSize) + if k == pageSize { + return nil, 1, nil // Initial 0 byte was last page byte. + } + + if k <= int64(len(buf)-readIndex) { + for _, v := range buf[readIndex : int64(readIndex)+k] { + readIndex++ + if v != 0 { + return nil, readIndex, errors.New("unexpected non-zero byte in page term bytes") + } + } + return nil, readIndex, nil + } + // Not enough bytes to read the rest of the page term rec. + // This theoretically should never happen, since we're only shifting the + // internal buffer of the live reader when we read to the end of page. + // Treat this the same as an EOF, it's an error we would expect to see. + return nil, 0, io.EOF + } + + if readIndex+recordHeaderSize-1 > len(buf) { + // Treat this the same as an EOF, it's an error we would expect to see. + return nil, 0, io.EOF + } + + copy(header[1:], buf[readIndex:readIndex+len(header[1:])]) + readIndex += recordHeaderSize - 1 + total += int64(recordHeaderSize - 1) + var ( + length = binary.BigEndian.Uint16(header[1:]) + crc = binary.BigEndian.Uint32(header[3:]) + ) + readTo := int(length) + readIndex + if readTo > len(buf) { + if (readTo - readIndex) > pageSize { + return nil, 0, errors.Errorf("invalid record, record size would be larger than max page size: %d", int(length)) + } + // Not enough data to read all of the record data. + // Treat this the same as an EOF, it's an error we would expect to see. + return nil, 0, io.EOF + } + recData := buf[readIndex:readTo] + readIndex += int(length) + total += int64(length) + + // TODO(callum) what should we do here, throw out the record? We should add a metric at least. + if c := crc32.Checksum(recData, castagnoliTable); c != crc { + return recData, readIndex, errors.Errorf("unexpected checksum %x, expected %x", c, crc) + } + return recData, readIndex, nil +} + func min(i, j int) int { if i < j { return i diff --git a/wal/wal_test.go b/wal/wal_test.go index 8ac14ebc8..f95b21239 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -17,15 +17,107 @@ package wal import ( "bytes" "encoding/binary" + "fmt" "hash/crc32" + "io" "io/ioutil" "math/rand" "os" + "path" + "sync" "testing" + "time" "github.com/prometheus/tsdb/testutil" ) +type record struct { + t recType + b []byte +} + +var data = make([]byte, 100000) +var testReaderCases = []struct { + t []record + exp [][]byte + fail bool +}{ + // Sequence of valid records. + { + t: []record{ + {recFull, data[0:200]}, + {recFirst, data[200:300]}, + {recLast, data[300:400]}, + {recFirst, data[400:800]}, + {recMiddle, data[800:900]}, + {recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary. + {recLast, data[900:900]}, + {recFirst, data[900:1000]}, + {recMiddle, data[1000:1200]}, + {recMiddle, data[1200:30000]}, + {recMiddle, data[30000:30001]}, + {recMiddle, data[30001:30001]}, + {recLast, data[30001:32000]}, + }, + exp: [][]byte{ + data[0:200], + data[200:400], + data[400:900], + data[900:32000], + }, + }, + // Exactly at the limit of one page minus the header size + { + t: []record{ + {recFull, data[0 : pageSize-recordHeaderSize]}, + }, + exp: [][]byte{ + data[:pageSize-recordHeaderSize], + }, + }, + // More than a full page, this exceeds our buffer and can never happen + // when written by the WAL. + { + t: []record{ + {recFull, data[0 : pageSize+1]}, + }, + fail: true, + }, + // Invalid orders of record types. + { + t: []record{{recMiddle, data[:200]}}, + fail: true, + }, + { + t: []record{{recLast, data[:200]}}, + fail: true, + }, + { + t: []record{ + {recFirst, data[:200]}, + {recFull, data[200:400]}, + }, + fail: true, + }, + { + t: []record{ + {recFirst, data[:100]}, + {recMiddle, data[100:200]}, + {recFull, data[200:400]}, + }, + fail: true, + }, + // Non-zero data after page termination. + { + t: []record{ + {recFull, data[:100]}, + {recPageTerm, append(make([]byte, 1000), 1)}, + }, + exp: [][]byte{data[:100]}, + fail: true, + }, +} + func encodedRecord(t recType, b []byte) []byte { if t == recPageTerm { return append([]byte{0}, b...) @@ -39,95 +131,7 @@ func encodedRecord(t recType, b []byte) []byte { // TestReader feeds the reader a stream of encoded records with different types. func TestReader(t *testing.T) { - data := make([]byte, 100000) - _, err := rand.Read(data) - testutil.Ok(t, err) - - type record struct { - t recType - b []byte - } - cases := []struct { - t []record - exp [][]byte - fail bool - }{ - // Sequence of valid records. - { - t: []record{ - {recFull, data[0:200]}, - {recFirst, data[200:300]}, - {recLast, data[300:400]}, - {recFirst, data[400:800]}, - {recMiddle, data[800:900]}, - {recPageTerm, make([]byte, pageSize-900-recordHeaderSize*5-1)}, // exactly lines up with page boundary. - {recLast, data[900:900]}, - {recFirst, data[900:1000]}, - {recMiddle, data[1000:1200]}, - {recMiddle, data[1200:30000]}, - {recMiddle, data[30000:30001]}, - {recMiddle, data[30001:30001]}, - {recLast, data[30001:32000]}, - }, - exp: [][]byte{ - data[0:200], - data[200:400], - data[400:900], - data[900:32000], - }, - }, - // Exactly at the limit of one page minus the header size - { - t: []record{ - {recFull, data[0 : pageSize-recordHeaderSize]}, - }, - exp: [][]byte{ - data[:pageSize-recordHeaderSize], - }, - }, - // More than a full page, this exceeds our buffer and can never happen - // when written by the WAL. - { - t: []record{ - {recFull, data[0 : pageSize+1]}, - }, - fail: true, - }, - // Invalid orders of record types. - { - t: []record{{recMiddle, data[:200]}}, - fail: true, - }, - { - t: []record{{recLast, data[:200]}}, - fail: true, - }, - { - t: []record{ - {recFirst, data[:200]}, - {recFull, data[200:400]}, - }, - fail: true, - }, - { - t: []record{ - {recFirst, data[:100]}, - {recMiddle, data[100:200]}, - {recFull, data[200:400]}, - }, - fail: true, - }, - // Non-zero data after page termination. - { - t: []record{ - {recFull, data[:100]}, - {recPageTerm, append(make([]byte, 1000), 1)}, - }, - exp: [][]byte{data[:100]}, - fail: true, - }, - } - for i, c := range cases { + for i, c := range testReaderCases { t.Logf("test %d", i) var buf []byte @@ -154,6 +158,192 @@ func TestReader(t *testing.T) { } } +func TestReader_Live(t *testing.T) { + for i, c := range testReaderCases { + t.Logf("test %d", i) + dir, err := ioutil.TempDir("", fmt.Sprintf("live_reader_%d", i)) + t.Logf("created dir %s", dir) + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + // we're never going to have more than a single segment file per test case right now + f, err := os.Create(path.Join(dir, "00000000")) + testutil.Ok(t, err) + + // live reader doesn't work on readers created from bytes buffers, + // since we need to be able to write more data to the thing we're + // reading from after the reader has been created + wg := sync.WaitGroup{} + // make sure the reader doesn't start until at least one record is written + wg.Add(1) + go func() { + for i, rec := range c.t { + rec := encodedRecord(rec.t, rec.b) + n, err := f.Write(rec) + testutil.Ok(t, err) + testutil.Assert(t, n > 0, "no bytes were written to wal") + if i == 0 { + wg.Done() + } + } + }() + sr, err := OpenReadSegment(SegmentName(dir, 0)) + testutil.Ok(t, err) + lr := NewLiveReader(sr) + j := 0 + wg.Wait() + caseLoop: + for { + for ; lr.Next(); j++ { + rec := lr.Record() + t.Log("j: ", j) + testutil.Equals(t, c.exp[j], rec, "Bytes within record did not match expected Bytes") + if j == len(c.exp)-1 { + break caseLoop + } + + } + + // Because reads and writes are happening concurrently, unless we get an error we should + // attempt to read records again. + if j == 0 && lr.Err() == nil { + continue + } + + if !c.fail && lr.Err() != nil { + t.Fatalf("unexpected error: %s", lr.Err()) + } + if c.fail && lr.Err() == nil { + t.Fatalf("expected error but got none:\n\tinput: %+v", c.t) + } + if lr.Err() != nil { + t.Log("err: ", lr.Err()) + break + } + } + } +} + +func TestWAL_FuzzWriteRead_Live(t *testing.T) { + const count = 5000 + const segmentSize = int64(128 * 1024 * 1204) + var input [][]byte + lock := sync.RWMutex{} + var recs [][]byte + var index int + + // Get size of segment. + getSegmentSize := func(dir string, index int) (int64, error) { + i := int64(-1) + fi, err := os.Stat(SegmentName(dir, index)) + if err == nil { + i = fi.Size() + } + return i, err + } + + readSegment := func(r *LiveReader) { + for r.Next() { + rec := r.Record() + lock.RLock() + l := len(input) + lock.RUnlock() + if index >= l { + t.Fatalf("read too many records") + } + lock.RLock() + if !bytes.Equal(input[index], rec) { + t.Fatalf("record %d (len %d) does not match (expected len %d)", + index, len(rec), len(input[index])) + } + lock.RUnlock() + index++ + } + if r.Err() != io.EOF { + testutil.Ok(t, r.Err()) + } + } + + dir, err := ioutil.TempDir("", "wal_fuzz_live") + t.Log("created dir: ", dir) + testutil.Ok(t, err) + defer func() { + os.RemoveAll(dir) + }() + + w, err := NewSize(nil, nil, dir, 128*pageSize) + testutil.Ok(t, err) + + go func() { + for i := 0; i < count; i++ { + var sz int64 + switch i % 5 { + case 0, 1: + sz = 50 + case 2, 3: + sz = pageSize + default: + sz = pageSize * 8 + } + + rec := make([]byte, rand.Int63n(sz)) + _, err := rand.Read(rec) + testutil.Ok(t, err) + lock.Lock() + input = append(input, rec) + lock.Unlock() + recs = append(recs, rec) + + // Randomly batch up records. + if rand.Intn(4) < 3 { + testutil.Ok(t, w.Log(recs...)) + recs = recs[:0] + } + } + testutil.Ok(t, w.Log(recs...)) + }() + + m, _, err := w.Segments() + testutil.Ok(t, err) + + seg, err := OpenReadSegment(SegmentName(dir, m)) + testutil.Ok(t, err) + + r := NewLiveReader(seg) + segmentTicker := time.NewTicker(100 * time.Millisecond) + readTicker := time.NewTicker(10 * time.Millisecond) + for { + select { + case <-segmentTicker.C: + // check if new segments exist + _, last, err := w.Segments() + testutil.Ok(t, err) + if last > seg.i { + for { + readSegment(r) + if r.Err() != io.EOF { + testutil.Ok(t, r.Err()) + } + size, err := getSegmentSize(dir, seg.i) + testutil.Ok(t, err) + // make sure we've read all of the current segment before rotating + if r.TotalRead() == size { + break + } + } + seg, err = OpenReadSegment(SegmentName(dir, seg.i+1)) + testutil.Ok(t, err) + r = NewLiveReader(seg) + } + case <-readTicker.C: + readSegment(r) + } + if index == count { + break + } + } + testutil.Ok(t, r.Err()) +} func TestWAL_FuzzWriteRead(t *testing.T) { const count = 25000 From 1a9d08adc548b3a3643038589af1e00489ac6883 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Fri, 18 Jan 2019 14:05:16 +0530 Subject: [PATCH 7/9] Don't write empty blocks (#374) * Dont write empty blocks when a compaction results in a block with no samples. Signed-off-by: Ganesh Vernekar --- CHANGELOG.md | 1 + block.go | 3 ++ block_test.go | 3 +- compact.go | 66 ++++++++++++++++++++------- db.go | 17 ++++++- db_test.go | 120 ++++++++++++++++++++++++++++++++++++++++++++++---- 6 files changed, 183 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5975d8b2b..1238644bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## master / unreleased - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. + - [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374) - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change: - added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` - new public interface `SizeReader: Size() int64` diff --git a/block.go b/block.go index 837f4a763..42e11d951 100644 --- a/block.go +++ b/block.go @@ -191,6 +191,9 @@ type BlockMetaCompaction struct { Level int `json:"level"` // ULIDs of all source head blocks that went into the block. Sources []ulid.ULID `json:"sources,omitempty"` + // Indicates that during compaction it resulted in a block without any samples + // so it should be deleted on the next reload. + Deletable bool `json:"deletable,omitempty"` // Short descriptions of the direct blocks that were used to create // this block. Parents []BlockDesc `json:"parents,omitempty"` diff --git a/block_test.go b/block_test.go index 789aebaa7..724ab3781 100644 --- a/block_test.go +++ b/block_test.go @@ -45,7 +45,7 @@ func TestSetCompactionFailed(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(tmpdir) - blockDir := createBlock(t, tmpdir, 0, 0, 0) + blockDir := createBlock(t, tmpdir, 1, 0, 0) b, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) testutil.Equals(t, false, b.meta.Compaction.Failed) @@ -91,6 +91,5 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime(), nil) testutil.Ok(tb, err) - return filepath.Join(dir, ulid.String()) } diff --git a/compact.go b/compact.go index 49d4e5868..5d8155f51 100644 --- a/compact.go +++ b/compact.go @@ -55,12 +55,17 @@ type Compactor interface { Plan(dir string) ([]string, error) // Write persists a Block into a directory. + // No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}. Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). // Can optionally pass a list of already open blocks, // to avoid having to reopen them. + // When resulting Block has 0 samples + // * No block is written. + // * The source dirs are marked Deletable. + // * Returns empty ulid.ULID{}. Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) } @@ -186,13 +191,12 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) { return res, nil } - // Compact any blocks that have >5% tombstones. + // Compact any blocks with big enough time range that have >5% tombstones. for i := len(dms) - 1; i >= 0; i-- { meta := dms[i].meta if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] { break } - if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 { return []string{dms[i].dir}, nil } @@ -366,15 +370,34 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u meta := compactBlockMetas(uid, metas...) err = c.write(dest, meta, blocks...) if err == nil { - level.Info(c.logger).Log( - "msg", "compact blocks", - "count", len(blocks), - "mint", meta.MinTime, - "maxt", meta.MaxTime, - "ulid", meta.ULID, - "sources", fmt.Sprintf("%v", uids), - "duration", time.Since(start), - ) + if meta.Stats.NumSamples == 0 { + for _, b := range bs { + b.meta.Compaction.Deletable = true + if err = writeMetaFile(b.dir, &b.meta); err != nil { + level.Error(c.logger).Log( + "msg", "Failed to write 'Deletable' to meta file after compaction", + "ulid", b.meta.ULID, + ) + } + } + uid = ulid.ULID{} + level.Info(c.logger).Log( + "msg", "compact blocks resulted in empty block", + "count", len(blocks), + "sources", fmt.Sprintf("%v", uids), + "duration", time.Since(start), + ) + } else { + level.Info(c.logger).Log( + "msg", "compact blocks", + "count", len(blocks), + "mint", meta.MinTime, + "maxt", meta.MaxTime, + "ulid", meta.ULID, + "sources", fmt.Sprintf("%v", uids), + "duration", time.Since(start), + ) + } return uid, nil } @@ -413,6 +436,10 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p return uid, err } + if meta.Stats.NumSamples == 0 { + return ulid.ULID{}, nil + } + level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID) return uid, nil } @@ -490,11 +517,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") } - - if err = writeMetaFile(tmp, meta); err != nil { - return errors.Wrap(err, "write merged meta") - } - // We are explicitly closing them here to check for error even // though these are covered under defer. This is because in Windows, // you cannot delete these unless they are closed and the defer is to @@ -506,6 +528,18 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return errors.Wrap(err, "close index writer") } + // Populated block is empty, so cleanup and exit. + if meta.Stats.NumSamples == 0 { + if err := os.RemoveAll(tmp); err != nil { + return errors.Wrap(err, "remove tmp folder after empty block failed") + } + return nil + } + + if err = writeMetaFile(tmp, meta); err != nil { + return errors.Wrap(err, "write merged meta") + } + // Create an empty tombstones file. if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") diff --git a/db.go b/db.go index 630700439..1349dfbd5 100644 --- a/db.go +++ b/db.go @@ -417,7 +417,8 @@ func (db *DB) compact() (err error) { // from the block interval here. maxt: maxt - 1, } - if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil { + uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil) + if err != nil { return errors.Wrap(err, "persist head block") } @@ -426,6 +427,14 @@ func (db *DB) compact() (err error) { if err := db.reload(); err != nil { return errors.Wrap(err, "reload blocks") } + if (uid == ulid.ULID{}) { + // Compaction resulted in an empty block. + // Head truncating during db.reload() depends on the persisted blocks and + // in this case no new block will be persisted so manually truncate the head. + if err = db.head.Truncate(maxt); err != nil { + return errors.Wrap(err, "head truncate failed (in compact)") + } + } runtime.GC() } @@ -588,6 +597,12 @@ func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block { return blocks[i].Meta().MaxTime > blocks[j].Meta().MaxTime }) + for _, block := range blocks { + if block.Meta().Compaction.Deletable { + deletable[block.Meta().ULID] = block + } + } + for ulid, block := range db.beyondTimeRetention(blocks) { deletable[ulid] = block } diff --git a/db_test.go b/db_test.go index 2beef0c52..21de405e2 100644 --- a/db_test.go +++ b/db_test.go @@ -836,7 +836,7 @@ func TestTombstoneCleanFail(t *testing.T) { // totalBlocks should be >=2 so we have enough blocks to trigger compaction failure. totalBlocks := 2 for i := 0; i < totalBlocks; i++ { - blockDir := createBlock(t, db.Dir(), 0, 0, 0) + blockDir := createBlock(t, db.Dir(), 1, 0, 0) block, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) // Add some some fake tombstones to trigger the compaction. @@ -880,7 +880,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } - block, err := OpenBlock(nil, createBlock(c.t, dest, 0, 0, 0), nil) + block, err := OpenBlock(nil, createBlock(c.t, dest, 1, 0, 0), nil) testutil.Ok(c.t, err) testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block) @@ -1364,6 +1364,109 @@ func TestInitializeHeadTimestamp(t *testing.T) { }) } +func TestNoEmptyBlocks(t *testing.T) { + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{100}, + }) + defer close() + defer db.Close() + db.DisableCompactions() + + rangeToTriggercompaction := db.opts.BlockRanges[0]/2*3 - 1 + defaultLabel := labels.FromStrings("foo", "bar") + defaultMatcher := labels.NewMustRegexpMatcher("", ".*") + + t.Run("Test no blocks after compact with empty head.", func(t *testing.T) { + testutil.Ok(t, db.compact()) + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Equals(t, 0, len(actBlocks)) + testutil.Equals(t, 0, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "no compaction should be triggered here") + }) + + t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) { + app := db.Appender() + _, err := app.Add(defaultLabel, 1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, 2, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, 3+rangeToTriggercompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + testutil.Ok(t, db.compact()) + testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here") + + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Equals(t, 0, len(actBlocks)) + + app = db.Appender() + _, err = app.Add(defaultLabel, 1, 0) + testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed") + + // Adding new blocks. + currentTime := db.Head().MaxTime() + _, err = app.Add(defaultLabel, currentTime, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + testutil.Ok(t, db.compact()) + testutil.Equals(t, 2, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here") + actBlocks, err = blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Assert(t, len(actBlocks) == 1, "No blocks created when compacting with >0 samples") + }) + + t.Run(`When no new block is created from head, and there are some blocks on disk + compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) { + oldBlocks := db.Blocks() + app := db.Appender() + currentTime := db.Head().MaxTime() + _, err := app.Add(defaultLabel, currentTime, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + testutil.Ok(t, db.compact()) + testutil.Equals(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here") + testutil.Equals(t, oldBlocks, db.Blocks()) + }) + + t.Run("Test no blocks remaining after deleting all samples from disk.", func(t *testing.T) { + currentTime := db.Head().MaxTime() + blocks := []*BlockMeta{ + {MinTime: currentTime, MaxTime: currentTime + db.opts.BlockRanges[0]}, + {MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]}, + } + for _, m := range blocks { + createBlock(t, db.Dir(), 2, m.MinTime, m.MaxTime) + } + + oldBlocks := db.Blocks() + testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. + testutil.Equals(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered. + testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + testutil.Ok(t, db.compact()) + testutil.Equals(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here once for each block that have tombstones") + + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Equals(t, 1, len(actBlocks), "All samples are deleted. Only the most recent block should remain after compaction.") + }) +} + func TestDB_LabelNames(t *testing.T) { tests := []struct { // Add 'sampleLabels1' -> Test Head -> Compact -> Test Disk -> @@ -1468,12 +1571,13 @@ func TestCorrectNumTombstones(t *testing.T) { defer db.Close() blockRange := DefaultOptions.BlockRanges[0] - label := labels.FromStrings("foo", "bar") + defaultLabel := labels.FromStrings("foo", "bar") + defaultMatcher := labels.NewEqualMatcher(defaultLabel[0].Name, defaultLabel[0].Value) app := db.Appender() for i := int64(0); i < 3; i++ { for j := int64(0); j < 15; j++ { - _, err := app.Add(label, i*blockRange+j, 0) + _, err := app.Add(defaultLabel, i*blockRange+j, 0) testutil.Ok(t, err) } } @@ -1483,17 +1587,17 @@ func TestCorrectNumTombstones(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, 1, len(db.blocks)) - testutil.Ok(t, db.Delete(0, 1, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(0, 1, defaultMatcher)) testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones) // {0, 1} and {2, 3} are merged to form 1 tombstone. - testutil.Ok(t, db.Delete(2, 3, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(2, 3, defaultMatcher)) testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones) - testutil.Ok(t, db.Delete(5, 6, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(5, 6, defaultMatcher)) testutil.Equals(t, uint64(2), db.blocks[0].meta.Stats.NumTombstones) - testutil.Ok(t, db.Delete(9, 11, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(9, 11, defaultMatcher)) testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones) } From 10ba228e6baa4811818e04b1ab9b48110bb43d7b Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 18 Jan 2019 11:42:59 +0300 Subject: [PATCH 8/9] release 0.4.0 (#501) Signed-off-by: Krasi Georgiev --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1238644bd..184734537 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,6 @@ ## master / unreleased + +## 0.4.0 - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. - [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374) - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change: From cc277e398b479bdb601e17e439bad11901824aa3 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 21 Jan 2019 14:56:26 +0300 Subject: [PATCH 9/9] fix the refs logic for the addFast path for createBlock (#504) Signed-off-by: Krasi Georgiev --- block_test.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/block_test.go b/block_test.go index 724ab3781..1a0409194 100644 --- a/block_test.go +++ b/block_test.go @@ -68,17 +68,20 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin lbls, err := labels.ReadLabels(filepath.Join("testdata", "20kseries.json"), nSeries) testutil.Ok(tb, err) - var ref uint64 + refs := make([]uint64, nSeries) for ts := mint; ts <= maxt; ts++ { app := head.Appender() - for _, lbl := range lbls { - err := app.AddFast(ref, ts, rand.Float64()) - if err == nil { - continue + for i, lbl := range lbls { + if refs[i] != 0 { + err := app.AddFast(refs[i], ts, rand.Float64()) + if err == nil { + continue + } } - ref, err = app.Add(lbl, int64(ts), rand.Float64()) + ref, err := app.Add(lbl, int64(ts), rand.Float64()) testutil.Ok(tb, err) + refs[i] = ref } err := app.Commit() testutil.Ok(tb, err)