mirror of https://github.com/prometheus/prometheus
Fix missing appends after reference lookups
parent
f556036037
commit
5a1c8eaa0e
|
@ -122,7 +122,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
|
|||
|
||||
dur := measureTime("ingestScrapes", func() {
|
||||
b.startProfiling()
|
||||
total, err = b.ingestScrapes(metrics, 3000)
|
||||
total, err = b.ingestScrapes(metrics, 4000)
|
||||
if err != nil {
|
||||
exitWithError(err)
|
||||
}
|
||||
|
@ -140,11 +140,11 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
|
|||
}
|
||||
|
||||
func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (uint64, error) {
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
var total uint64
|
||||
|
||||
for i := 0; i < scrapeCount; i += 50 {
|
||||
for i := 0; i < scrapeCount; i += 100 {
|
||||
var wg sync.WaitGroup
|
||||
lbls := lbls
|
||||
for len(lbls) > 0 {
|
||||
l := 1000
|
||||
|
@ -156,7 +156,7 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u
|
|||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
n, err := b.ingestScrapesShard(batch, 50, int64(30000*i))
|
||||
n, err := b.ingestScrapesShard(batch, 100, int64(30000*i))
|
||||
if err != nil {
|
||||
// exitWithError(err)
|
||||
fmt.Println(" err", err)
|
||||
|
@ -204,10 +204,9 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// fmt.Println("Add:", s.labels, ref)
|
||||
s.ref = &ref
|
||||
} else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil {
|
||||
// fmt.Println("AddFast:", *s.ref)
|
||||
|
||||
if err.Error() != "not found" {
|
||||
panic(err)
|
||||
}
|
||||
|
|
25
db.go
25
db.go
|
@ -266,7 +266,7 @@ func (db *DB) compact(i, j int) error {
|
|||
|
||||
for _, b := range blocks {
|
||||
if err := b.Close(); err != nil {
|
||||
return errors.Wrap(err, "close old block")
|
||||
return errors.Wrapf(err, "close old block %s", b.Dir())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -278,11 +278,9 @@ func (db *DB) compact(i, j int) error {
|
|||
db.removeBlocks(i, j)
|
||||
db.persisted = append(db.persisted, pb)
|
||||
|
||||
for i, b := range blocks {
|
||||
if i > 0 {
|
||||
if err := os.RemoveAll(b.Dir()); err != nil {
|
||||
return errors.Wrap(err, "removing old block")
|
||||
}
|
||||
for _, b := range blocks[1:] {
|
||||
if err := os.RemoveAll(b.Dir()); err != nil {
|
||||
return errors.Wrap(err, "removing old block")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -355,11 +353,7 @@ func (db *DB) Appender() Appender {
|
|||
}
|
||||
|
||||
type dbAppender struct {
|
||||
db *DB
|
||||
// gen uint8
|
||||
// head *headAppender
|
||||
maxGen uint8
|
||||
|
||||
db *DB
|
||||
heads []*headAppender
|
||||
}
|
||||
|
||||
|
@ -410,15 +404,12 @@ func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
|
|||
// If there's no fitting head block for t, ensure it gets created.
|
||||
if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime {
|
||||
a.db.mtx.RUnlock()
|
||||
var mints []int64
|
||||
for _, h := range a.heads {
|
||||
mints = append(mints, h.meta.MinTime)
|
||||
}
|
||||
fmt.Println("ensure head", t, mints)
|
||||
|
||||
if err := a.db.ensureHead(t); err != nil {
|
||||
a.db.mtx.RLock()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
a.db.mtx.RLock()
|
||||
|
||||
if len(a.heads) == 0 {
|
||||
|
@ -451,7 +442,6 @@ func (db *DB) ensureHead(t int64) error {
|
|||
// AppendableBlocks-1 front padding heads.
|
||||
if len(db.heads) == 0 {
|
||||
for i := int64(db.opts.AppendableBlocks - 1); i >= 0; i-- {
|
||||
fmt.Println("cut init for", t-i*int64(db.opts.MinBlockDuration))
|
||||
if _, err := db.cut(t - i*int64(db.opts.MinBlockDuration)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -464,7 +454,6 @@ func (db *DB) ensureHead(t int64) error {
|
|||
if t < h.meta.MaxTime {
|
||||
return nil
|
||||
}
|
||||
fmt.Println("cut for", h.meta.MaxTime)
|
||||
if _, err := db.cut(h.meta.MaxTime); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
8
head.go
8
head.go
|
@ -193,10 +193,10 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
|
|||
|
||||
func (a *headAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
if ms := a.get(hash, lset); ms != nil {
|
||||
return uint64(ms.ref), nil
|
||||
return uint64(ms.ref), a.AddFast(uint64(ms.ref), t, v)
|
||||
}
|
||||
if ref, ok := a.newHashes[hash]; ok {
|
||||
return uint64(ref), nil
|
||||
return uint64(ref), a.AddFast(uint64(ref), t, v)
|
||||
}
|
||||
|
||||
// We only know the actual reference after committing. We generate an
|
||||
|
@ -220,7 +220,6 @@ func (a *headAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v flo
|
|||
}
|
||||
|
||||
func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||
// fmt.Println("add fast ref", ref)
|
||||
// We only own the last 5 bytes of the reference. Anything before is
|
||||
// used by higher-order appenders. We erase it to avoid issues.
|
||||
ref = (ref << 24) >> 24
|
||||
|
@ -325,6 +324,7 @@ func (a *headAppender) Commit() error {
|
|||
if !a.series[s.ref].append(s.t, s.v) {
|
||||
total--
|
||||
}
|
||||
|
||||
if s.t < mint {
|
||||
mint = s.t
|
||||
}
|
||||
|
@ -568,7 +568,7 @@ func (s *memSeries) cut() *memChunk {
|
|||
func (s *memSeries) append(t int64, v float64) bool {
|
||||
var c *memChunk
|
||||
|
||||
if s.app == nil || s.head().samples > 10050 {
|
||||
if s.app == nil || s.head().samples > 2000 {
|
||||
c = s.cut()
|
||||
c.minTime = t
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue