mirror of https://github.com/prometheus/prometheus
Fix race condition for 2 appenders having same ts
Race: Suppose we have 100 existing series inside a HeadBlock. Now we open two appenders in two routines A1, A2 and append 30 new series and 60 new series respectively with some common series. Both try to commit at the same time and the following happens in the given order: A2 executes createSeries() A1 executes createSeries() (with its common series referencing the ids from A2) A1 persists its newlabels, samples A2 persists its newlabels, samples Now when reading it back, we read A1's samples which reference A2's id and thereby fail. Ref: prometheus/promtheus#2795 Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>pull/5805/head
parent
357e33bd1e
commit
b51a05044e
23
head.go
23
head.go
|
@ -521,15 +521,17 @@ func (a *headAppender) AddFast(ref string, t int64, v float64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (a *headAppender) createSeries() {
|
||||
func (a *headAppender) createSeries() error {
|
||||
if len(a.newSeries) == 0 {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
a.newLabels = make([]labels.Labels, 0, len(a.newSeries))
|
||||
base0 := len(a.series)
|
||||
|
||||
a.mtx.RUnlock()
|
||||
a.mtx.Lock()
|
||||
defer a.mtx.RLock()
|
||||
defer a.mtx.Unlock()
|
||||
|
||||
base1 := len(a.series)
|
||||
|
||||
|
@ -549,8 +551,12 @@ func (a *headAppender) createSeries() {
|
|||
a.create(l.hash, l.labels)
|
||||
}
|
||||
|
||||
a.mtx.Unlock()
|
||||
a.mtx.RLock()
|
||||
// Write all new series to the WAL.
|
||||
if err := a.wal.LogSeries(a.newLabels); err != nil {
|
||||
return errors.Wrap(err, "WAL log series")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *headAppender) Commit() error {
|
||||
|
@ -558,7 +564,9 @@ func (a *headAppender) Commit() error {
|
|||
defer putHeadAppendBuffer(a.samples)
|
||||
defer a.mtx.RUnlock()
|
||||
|
||||
a.createSeries()
|
||||
if err := a.createSeries(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We have to update the refs of samples for series we just created.
|
||||
for i := range a.samples {
|
||||
|
@ -568,11 +576,8 @@ func (a *headAppender) Commit() error {
|
|||
}
|
||||
}
|
||||
|
||||
// Write all new series and samples to the WAL and add it to the
|
||||
// Write all new samples to the WAL and add them to the
|
||||
// in-mem database on success.
|
||||
if err := a.wal.LogSeries(a.newLabels); err != nil {
|
||||
return errors.Wrap(err, "WAL log series")
|
||||
}
|
||||
if err := a.wal.LogSamples(a.samples); err != nil {
|
||||
return errors.Wrap(err, "WAL log samples")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue