mirror of https://github.com/prometheus/prometheus
Write out of order hint when initially creating meta file (#13894)
Signed-off-by: Jonathan Halterman <jonathan@grafana.com> Signed-off-by: Jonathan Halterman <jhalterman@gmail.com> Co-authored-by: Jesus Vazquez <jesusvazquez@users.noreply.github.com>pull/13906/head
parent
776eea6e96
commit
633224886a
|
@ -60,7 +60,7 @@ type Compactor interface {
|
|||
|
||||
// Write persists a Block into a directory.
|
||||
// No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}.
|
||||
Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)
|
||||
Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) (ulid.ULID, error)
|
||||
|
||||
// Compact runs compaction against the provided directories. Must
|
||||
// only be called concurrently with results of Plan().
|
||||
|
@ -536,7 +536,7 @@ func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string,
|
|||
return uid, errs.Err()
|
||||
}
|
||||
|
||||
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
|
||||
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) (ulid.ULID, error) {
|
||||
start := time.Now()
|
||||
|
||||
uid := ulid.MustNew(ulid.Now(), rand.Reader)
|
||||
|
@ -549,9 +549,12 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p
|
|||
meta.Compaction.Level = 1
|
||||
meta.Compaction.Sources = []ulid.ULID{uid}
|
||||
|
||||
if parent != nil {
|
||||
if base != nil {
|
||||
meta.Compaction.Parents = []BlockDesc{
|
||||
{ULID: parent.ULID, MinTime: parent.MinTime, MaxTime: parent.MaxTime},
|
||||
{ULID: base.ULID, MinTime: base.MinTime, MaxTime: base.MaxTime},
|
||||
}
|
||||
if base.Compaction.FromOutOfOrder() {
|
||||
meta.Compaction.SetOutOfOrder()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
14
tsdb/db.go
14
tsdb/db.go
|
@ -1299,25 +1299,17 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID
|
|||
}
|
||||
}()
|
||||
|
||||
meta := &BlockMeta{}
|
||||
meta.Compaction.SetOutOfOrder()
|
||||
for t := blockSize * (oooHeadMint / blockSize); t <= oooHeadMaxt; t += blockSize {
|
||||
mint, maxt := t, t+blockSize
|
||||
// Block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes.
|
||||
uid, err := db.compactor.Write(dest, oooHead.CloneForTimeRange(mint, maxt-1), mint, maxt, nil)
|
||||
uid, err := db.compactor.Write(dest, oooHead.CloneForTimeRange(mint, maxt-1), mint, maxt, meta)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if uid.Compare(ulid.ULID{}) != 0 {
|
||||
ulids = append(ulids, uid)
|
||||
blockDir := filepath.Join(dest, uid.String())
|
||||
meta, _, err := readMetaFile(blockDir)
|
||||
if err != nil {
|
||||
return ulids, fmt.Errorf("read meta: %w", err)
|
||||
}
|
||||
meta.Compaction.SetOutOfOrder()
|
||||
_, err = writeMetaFile(db.logger, blockDir, meta)
|
||||
if err != nil {
|
||||
return ulids, fmt.Errorf("write meta: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1441,7 +1441,7 @@ func (c *mockCompactorFailing) Write(dest string, _ BlockReader, _, _ int64, _ *
|
|||
c.blocks = append(c.blocks, block)
|
||||
|
||||
// Now check that all expected blocks are actually persisted on disk.
|
||||
// This way we make sure that the we have some blocks that are supposed to be removed.
|
||||
// This way we make sure that we have some blocks that are supposed to be removed.
|
||||
var expectedBlocks []string
|
||||
for _, b := range c.blocks {
|
||||
expectedBlocks = append(expectedBlocks, filepath.Join(dest, b.Meta().ULID.String()))
|
||||
|
|
Loading…
Reference in New Issue