mirror of https://github.com/prometheus/prometheus
Ensure order of postings when adding new series
parent
d970f0256a
commit
c7f5590a71
3
db.go
3
db.go
|
@ -123,7 +123,8 @@ func Open(dir string, logger log.Logger) (db *DB, err error) {
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
r := prometheus.DefaultRegisterer
|
||||
var r prometheus.Registerer
|
||||
// r := prometheus.DefaultRegisterer
|
||||
|
||||
db = &DB{
|
||||
dir: dir,
|
||||
|
|
23
head.go
23
head.go
|
@ -128,8 +128,11 @@ type headAppender struct {
|
|||
*headBlock
|
||||
|
||||
newSeries map[uint32]hashedLabels
|
||||
newHashes map[uint64]uint32
|
||||
newLabels []labels.Labels
|
||||
samples []hashedSample
|
||||
newRefs []uint32
|
||||
|
||||
samples []hashedSample
|
||||
}
|
||||
|
||||
type hashedLabels struct {
|
||||
|
@ -145,12 +148,18 @@ func (a *headAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error
|
|||
if ms := a.get(hash, lset); ms != nil {
|
||||
return uint64(ms.ref), nil
|
||||
}
|
||||
if ref, ok := a.newHashes[hash]; ok {
|
||||
return uint64(ref), nil
|
||||
}
|
||||
|
||||
id := atomic.AddUint64(&a.nextSeriesID, 1) - 1
|
||||
if a.newSeries == nil {
|
||||
a.newSeries = map[uint32]hashedLabels{}
|
||||
a.newHashes = map[uint64]uint32{}
|
||||
}
|
||||
a.newSeries[uint32(id)] = hashedLabels{hash: hash, labels: lset}
|
||||
a.newHashes[hash] = uint32(id)
|
||||
a.newRefs = append(a.newRefs, uint32(id))
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
@ -210,16 +219,17 @@ func (a *headAppender) createSeries() {
|
|||
a.mtx.RUnlock()
|
||||
a.mtx.Lock()
|
||||
|
||||
for id, l := range a.newSeries {
|
||||
for _, ref := range a.newRefs {
|
||||
l := a.newSeries[ref]
|
||||
// We switched locks and have to re-validate that the series were not
|
||||
// created by another goroutine in the meantime.
|
||||
if int(id) < len(a.series) && a.series[id] != nil {
|
||||
if int(ref) < len(a.series) && a.series[ref] != nil {
|
||||
continue
|
||||
}
|
||||
// Series is still new.
|
||||
a.newLabels = append(a.newLabels, l.labels)
|
||||
|
||||
a.create(id, l.hash, l.labels)
|
||||
a.create(ref, l.hash, l.labels)
|
||||
}
|
||||
|
||||
a.mtx.Unlock()
|
||||
|
@ -228,15 +238,15 @@ func (a *headAppender) createSeries() {
|
|||
|
||||
func (a *headAppender) Commit() error {
|
||||
defer putHeadAppendBuffer(a.samples)
|
||||
defer a.mtx.RUnlock()
|
||||
|
||||
// Write all new series and samples to the WAL and add it to the
|
||||
// in-mem database on success.
|
||||
if err := a.wal.Log(a.newLabels, a.samples); err != nil {
|
||||
a.mtx.RUnlock()
|
||||
return err
|
||||
}
|
||||
|
||||
a.createSeries()
|
||||
|
||||
var (
|
||||
total = uint64(len(a.samples))
|
||||
mint = int64(math.MaxInt64)
|
||||
|
@ -248,6 +258,7 @@ func (a *headAppender) Commit() error {
|
|||
total--
|
||||
}
|
||||
}
|
||||
a.mtx.RUnlock()
|
||||
|
||||
a.stats.mtx.Lock()
|
||||
defer a.stats.mtx.Unlock()
|
||||
|
|
18
postings.go
18
postings.go
|
@ -26,7 +26,23 @@ func (p *memPostings) get(t term) Postings {
|
|||
// term argument appears twice.
|
||||
func (p *memPostings) add(id uint32, terms ...term) {
|
||||
for _, t := range terms {
|
||||
p.m[t] = append(p.m[t], id)
|
||||
// We expect IDs to roughly be appended in order but some concurrency
|
||||
// related out of order at the end. We do insertion sort from the end
|
||||
// to account for it.
|
||||
l := p.m[t]
|
||||
i := len(l) - 1
|
||||
|
||||
for ; i >= 0; i-- {
|
||||
if id > l[i] {
|
||||
break
|
||||
}
|
||||
}
|
||||
l = append(l, 0)
|
||||
|
||||
copy(l[i+2:], l[i+1:])
|
||||
l[i+1] = id
|
||||
|
||||
p.m[t] = l
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue