mirror of https://github.com/prometheus/prometheus
Limit compaction range, make cut and compact concurrent
parent
67d185ceb9
commit
30d8866c13
4
block.go
4
block.go
|
@ -68,9 +68,9 @@ type persistedBlock struct {
|
|||
}
|
||||
|
||||
type blockMeta struct {
|
||||
*BlockMeta
|
||||
|
||||
Version int `json:"version"`
|
||||
|
||||
*BlockMeta
|
||||
}
|
||||
|
||||
const metaFilename = "meta.json"
|
||||
|
|
53
compact.go
53
compact.go
|
@ -49,7 +49,6 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
|
|||
}
|
||||
|
||||
type compactorOptions struct {
|
||||
maxBlocks uint8
|
||||
maxBlockRange uint64
|
||||
maxSize uint64
|
||||
}
|
||||
|
@ -61,9 +60,14 @@ func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor {
|
|||
}
|
||||
}
|
||||
|
||||
type compactionInfo struct {
|
||||
generation int
|
||||
mint, maxt int64
|
||||
}
|
||||
|
||||
// pick returns a range [i, j] in the blocks that are suitable to be compacted
|
||||
// into a single block at position i.
|
||||
func (c *compactor) pick(bs []Block) (i, j int, ok bool) {
|
||||
func (c *compactor) pick(bs []compactionInfo) (i, j int, ok bool) {
|
||||
|
||||
last := len(bs) - 1
|
||||
if len(bs) == 0 {
|
||||
|
@ -71,8 +75,8 @@ func (c *compactor) pick(bs []Block) (i, j int, ok bool) {
|
|||
}
|
||||
|
||||
// Make sure we always compact the last block if unpersisted.
|
||||
if bs[last].Meta().Compaction.Generation == 0 {
|
||||
if len(bs) >= 3 && compactionMatch(bs[last-2:last+1]) {
|
||||
if bs[last].generation == 0 {
|
||||
if len(bs) >= 3 && c.match(bs[last-2:last+1]) {
|
||||
return last - 2, last, true
|
||||
}
|
||||
return last, last, true
|
||||
|
@ -80,55 +84,40 @@ func (c *compactor) pick(bs []Block) (i, j int, ok bool) {
|
|||
|
||||
for i := len(bs); i-3 >= 0; i -= 3 {
|
||||
tpl := bs[i-3 : i]
|
||||
if compactionMatch(tpl) {
|
||||
if c.match(tpl) {
|
||||
return i - 3, i - 1, true
|
||||
}
|
||||
}
|
||||
return 0, 0, false
|
||||
}
|
||||
|
||||
func compactionMatch(blocks []Block) bool {
|
||||
g := blocks[0].Meta().Compaction.Generation
|
||||
func (c *compactor) match(bs []compactionInfo) bool {
|
||||
g := bs[0].generation
|
||||
if g >= 5 {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, b := range blocks[1:] {
|
||||
if b.Meta().Compaction.Generation == 0 {
|
||||
for _, b := range bs {
|
||||
if b.generation == 0 {
|
||||
continue
|
||||
}
|
||||
if b.Meta().Compaction.Generation != g {
|
||||
if b.generation != g {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
|
||||
// TODO(fabxc): check whether combined size is below maxCompactionSize.
|
||||
// Apply maximum time range? or number of series? – might already be covered by size implicitly.
|
||||
|
||||
// Naively check whether both blocks have roughly the same number of samples
|
||||
// and whether the total sample count doesn't exceed 2GB chunk file size
|
||||
// by rough approximation.
|
||||
n := float64(blocks[0].Meta().Stats.NumSamples)
|
||||
t := n
|
||||
|
||||
for _, b := range blocks[1:] {
|
||||
m := float64(b.Meta().Stats.NumSamples)
|
||||
|
||||
if m < 0.7*n || m > 1.3*n {
|
||||
return false
|
||||
}
|
||||
t += m
|
||||
}
|
||||
|
||||
// Pessimistic 10 bytes/sample should do.
|
||||
return t < 10*200e6
|
||||
return uint64(bs[len(bs)-1].maxt-bs[0].mint) <= c.opts.maxBlockRange
|
||||
}
|
||||
|
||||
func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
|
||||
res.MinTime = blocks[0].Meta().MinTime
|
||||
res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime
|
||||
res.Compaction.Generation = blocks[0].Meta().Compaction.Generation + 1
|
||||
|
||||
g := blocks[0].Meta().Compaction.Generation
|
||||
if g == 0 && len(blocks) > 1 {
|
||||
g++
|
||||
}
|
||||
res.Compaction.Generation = g + 1
|
||||
|
||||
for _, b := range blocks {
|
||||
res.Stats.NumSamples += b.Meta().Stats.NumSamples
|
||||
|
|
58
db.go
58
db.go
|
@ -132,7 +132,6 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) {
|
|||
}
|
||||
db.compactor = newCompactor(r, &compactorOptions{
|
||||
maxBlockRange: opts.MaxBlockRange,
|
||||
maxBlocks: 3,
|
||||
maxSize: 1 << 29, // 512MB
|
||||
})
|
||||
|
||||
|
@ -148,31 +147,49 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) {
|
|||
func (db *DB) run() {
|
||||
defer close(db.donec)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-db.cutc:
|
||||
db.mtx.Lock()
|
||||
_, err := db.cut()
|
||||
db.mtx.Unlock()
|
||||
|
||||
if err != nil {
|
||||
db.logger.Log("msg", "cut failed", "err", err)
|
||||
} else {
|
||||
select {
|
||||
case db.compactc <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
// Drain cut channel so we don't trigger immediately again.
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-db.cutc:
|
||||
default:
|
||||
}
|
||||
db.mtx.Lock()
|
||||
_, err := db.cut()
|
||||
db.mtx.Unlock()
|
||||
|
||||
if err != nil {
|
||||
db.logger.Log("msg", "cut failed", "err", err)
|
||||
} else {
|
||||
select {
|
||||
case db.compactc <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
// Drain cut channel so we don't trigger immediately again.
|
||||
select {
|
||||
case <-db.cutc:
|
||||
default:
|
||||
}
|
||||
case <-db.stopc:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-db.compactc:
|
||||
db.metrics.compactionsTriggered.Inc()
|
||||
|
||||
i, j, ok := db.compactor.pick(db.compactable())
|
||||
var infos []compactionInfo
|
||||
for _, b := range db.compactable() {
|
||||
m := b.Meta()
|
||||
|
||||
infos = append(infos, compactionInfo{
|
||||
generation: m.Compaction.Generation,
|
||||
mint: *m.MinTime,
|
||||
maxt: *m.MaxTime,
|
||||
})
|
||||
}
|
||||
|
||||
i, j, ok := db.compactor.pick(infos)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
@ -180,6 +197,7 @@ func (db *DB) run() {
|
|||
db.logger.Log("msg", "compaction failed", "err", err)
|
||||
continue
|
||||
}
|
||||
db.logger.Log("msg", "compaction completed")
|
||||
// Trigger another compaction in case there's more work to do.
|
||||
select {
|
||||
case db.compactc <- struct{}{}:
|
||||
|
|
10
writer.go
10
writer.go
|
@ -1,6 +1,7 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/binary"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
|
@ -8,7 +9,6 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/bradfitz/slice"
|
||||
"github.com/coreos/etcd/pkg/ioutil"
|
||||
"github.com/fabxc/tsdb/chunks"
|
||||
"github.com/fabxc/tsdb/labels"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -43,7 +43,7 @@ type SeriesWriter interface {
|
|||
// serialization format.
|
||||
type seriesWriter struct {
|
||||
ow io.Writer
|
||||
w *ioutil.PageWriter
|
||||
w *bufio.Writer
|
||||
n int64
|
||||
c int
|
||||
|
||||
|
@ -53,7 +53,7 @@ type seriesWriter struct {
|
|||
func newSeriesWriter(w io.Writer, index IndexWriter) *seriesWriter {
|
||||
return &seriesWriter{
|
||||
ow: w,
|
||||
w: ioutil.NewPageWriter(w, compactionPageBytes, 0),
|
||||
w: bufio.NewWriterSize(w, 1*1024*1024),
|
||||
n: 0,
|
||||
index: index,
|
||||
}
|
||||
|
@ -180,7 +180,7 @@ type indexWriterSeries struct {
|
|||
// serialization format.
|
||||
type indexWriter struct {
|
||||
ow io.Writer
|
||||
w *ioutil.PageWriter
|
||||
w *bufio.Writer
|
||||
n int64
|
||||
started bool
|
||||
|
||||
|
@ -193,7 +193,7 @@ type indexWriter struct {
|
|||
|
||||
func newIndexWriter(w io.Writer) *indexWriter {
|
||||
return &indexWriter{
|
||||
w: ioutil.NewPageWriter(w, compactionPageBytes, 0),
|
||||
w: bufio.NewWriterSize(w, 1*1024*1024),
|
||||
ow: w,
|
||||
n: 0,
|
||||
symbols: make(map[string]uint32, 4096),
|
||||
|
|
Loading…
Reference in New Issue