Browse Source

Exposed DeletionIterator and CompactMetas functions. (#8161)

* Exposed DeletionIterator and CompactMetas functions.

Required for CLI for deletions in Thanos: https://github.com/thanos-io/thanos/pull/3421

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Removed Thanos usage mentions.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
pull/8166/head
Bartlomiej Plotka 4 years ago committed by GitHub
parent
commit
4513537034
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      tsdb/compact.go
  2. 53
      tsdb/querier.go
  3. 4
      tsdb/querier_test.go

11
tsdb/compact.go

@ -330,7 +330,9 @@ func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
return splitDirs
}
func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
// CompactBlockMetas merges many block metas into one, combining it's source blocks together
// and adjusting compaction level.
func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
res := &BlockMeta{
ULID: uid,
MinTime: blocks[0].MinTime,
@ -415,7 +417,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
uid = ulid.MustNew(ulid.Now(), rand.Reader)
meta := compactBlockMetas(uid, metas...)
meta := CompactBlockMetas(uid, metas...)
err = c.write(dest, meta, blocks...)
if err == nil {
if meta.Stats.NumSamples == 0 {
@ -527,7 +529,6 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
}
// write creates a new block that is the union of the provided blocks into dir.
// It cleans up all files of the old blocks after completing successfully.
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
dir := filepath.Join(dest, meta.ULID.String())
tmp := dir + tmpForCreationBlockDirSuffix
@ -633,7 +634,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
}
df = nil
// Block successfully written, make visible and remove old ones.
// Block successfully written, make it visible in destination dir by moving it from tmp one.
if err := fileutil.Replace(tmp, dir); err != nil {
return errors.Wrap(err, "rename block dir")
}
@ -715,7 +716,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
symbols = syms
continue
}
symbols = newMergedStringIter(symbols, syms)
symbols = NewMergedStringIter(symbols, syms)
}
for symbols.Next() {

53
tsdb/querier.go

@ -485,7 +485,7 @@ type populateWithDelGenericSeriesIterator struct {
i int
err error
bufIter *deletedIterator
bufIter *DeletedIterator
intervals tombstones.Intervals
currDelIter chunkenc.Iterator
@ -501,7 +501,7 @@ func newPopulateWithDelGenericSeriesIterator(
chunks: chunks,
chks: chks,
i: -1,
bufIter: &deletedIterator{},
bufIter: &DeletedIterator{},
intervals: intervals,
}
}
@ -520,10 +520,10 @@ func (p *populateWithDelGenericSeriesIterator) next() bool {
return false
}
p.bufIter.intervals = p.bufIter.intervals[:0]
p.bufIter.Intervals = p.bufIter.Intervals[:0]
for _, interval := range p.intervals {
if p.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) {
p.bufIter.intervals = p.bufIter.intervals.Add(interval)
p.bufIter.Intervals = p.bufIter.Intervals.Add(interval)
}
}
@ -534,14 +534,14 @@ func (p *populateWithDelGenericSeriesIterator) next() bool {
//
// TODO think how to avoid the typecasting to verify when it is head block.
_, isSafeChunk := p.currChkMeta.Chunk.(*safeChunk)
if len(p.bufIter.intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) {
if len(p.bufIter.Intervals) == 0 && !(isSafeChunk && p.currChkMeta.MaxTime == math.MaxInt64) {
// If there are no overlap with deletion intervals AND it's NOT an "open" head chunk, we can take chunk as it is.
p.currDelIter = nil
return true
}
// We don't want full chunk or it's potentially still opened, take just part of it.
p.bufIter.it = p.currChkMeta.Chunk.Iterator(nil)
p.bufIter.Iter = p.currChkMeta.Chunk.Iterator(nil)
p.currDelIter = p.bufIter
return true
}
@ -723,7 +723,8 @@ func (b *blockChunkSeriesSet) At() storage.ChunkSeries {
}
}
func newMergedStringIter(a index.StringIter, b index.StringIter) index.StringIter {
// NewMergedStringIter returns string iterator that allows to merge symbols on demand and stream result.
func NewMergedStringIter(a index.StringIter, b index.StringIter) index.StringIter {
return &mergedStringIter{a: a, b: b, aok: a.Next(), bok: b.Next()}
}
@ -767,35 +768,35 @@ func (m mergedStringIter) Err() error {
return m.b.Err()
}
// deletedIterator wraps an Iterator and makes sure any deleted metrics are not
// returned.
type deletedIterator struct {
it chunkenc.Iterator
intervals tombstones.Intervals
// DeletedIterator wraps chunk Iterator and makes sure any deleted metrics are not returned.
type DeletedIterator struct {
// Iter is an Iterator to be wrapped.
Iter chunkenc.Iterator
// Intervals are the deletion intervals.
Intervals tombstones.Intervals
}
func (it *deletedIterator) At() (int64, float64) {
return it.it.At()
func (it *DeletedIterator) At() (int64, float64) {
return it.Iter.At()
}
func (it *deletedIterator) Seek(t int64) bool {
if it.it.Err() != nil {
func (it *DeletedIterator) Seek(t int64) bool {
if it.Iter.Err() != nil {
return false
}
if ok := it.it.Seek(t); !ok {
if ok := it.Iter.Seek(t); !ok {
return false
}
// Now double check if the entry falls into a deleted interval.
ts, _ := it.At()
for _, itv := range it.intervals {
for _, itv := range it.Intervals {
if ts < itv.Mint {
return true
}
if ts > itv.Maxt {
it.intervals = it.intervals[1:]
it.Intervals = it.Intervals[1:]
continue
}
@ -807,12 +808,12 @@ func (it *deletedIterator) Seek(t int64) bool {
return true
}
func (it *deletedIterator) Next() bool {
func (it *DeletedIterator) Next() bool {
Outer:
for it.it.Next() {
ts, _ := it.it.At()
for it.Iter.Next() {
ts, _ := it.Iter.At()
for _, tr := range it.intervals {
for _, tr := range it.Intervals {
if tr.InBounds(ts) {
continue Outer
}
@ -821,14 +822,14 @@ Outer:
return true
}
it.intervals = it.intervals[1:]
it.Intervals = it.Intervals[1:]
}
return true
}
return false
}
func (it *deletedIterator) Err() error { return it.it.Err() }
func (it *DeletedIterator) Err() error { return it.Iter.Err() }
type nopChunkReader struct {
emptyChunk chunkenc.Chunk

4
tsdb/querier_test.go

@ -1007,7 +1007,7 @@ func TestDeletedIterator(t *testing.T) {
for _, c := range cases {
i := int64(-1)
it := &deletedIterator{it: chk.Iterator(nil), intervals: c.r[:]}
it := &DeletedIterator{Iter: chk.Iterator(nil), Intervals: c.r[:]}
ranges := c.r[:]
for it.Next() {
i++
@ -1069,7 +1069,7 @@ func TestDeletedIterator_WithSeek(t *testing.T) {
}
for _, c := range cases {
it := &deletedIterator{it: chk.Iterator(nil), intervals: c.r[:]}
it := &DeletedIterator{Iter: chk.Iterator(nil), Intervals: c.r[:]}
require.Equal(t, c.ok, it.Seek(c.seek))
if c.ok {

Loading…
Cancel
Save