mirror of https://github.com/prometheus/prometheus
Fix race condition between gc and committing (#378)
Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>pull/5805/head
parent
722f0ab920
commit
a8966cb53d
26
head.go
26
head.go
|
@ -620,21 +620,22 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||||
s := a.head.series.getByID(ref)
|
if t < a.minValidTime {
|
||||||
|
return ErrOutOfBounds
|
||||||
|
}
|
||||||
|
|
||||||
|
s := a.head.series.getByID(ref)
|
||||||
if s == nil {
|
if s == nil {
|
||||||
return errors.Wrap(ErrNotFound, "unknown series")
|
return errors.Wrap(ErrNotFound, "unknown series")
|
||||||
}
|
}
|
||||||
s.Lock()
|
s.Lock()
|
||||||
err := s.appendable(t, v)
|
if err := s.appendable(t, v); err != nil {
|
||||||
s.Unlock()
|
s.Unlock()
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if t < a.minValidTime {
|
s.pendingCommit = true
|
||||||
return ErrOutOfBounds
|
s.Unlock()
|
||||||
}
|
|
||||||
if t < a.mint {
|
if t < a.mint {
|
||||||
a.mint = t
|
a.mint = t
|
||||||
}
|
}
|
||||||
|
@ -694,6 +695,7 @@ func (a *headAppender) Commit() error {
|
||||||
for _, s := range a.samples {
|
for _, s := range a.samples {
|
||||||
s.series.Lock()
|
s.series.Lock()
|
||||||
ok, chunkCreated := s.series.append(s.T, s.V)
|
ok, chunkCreated := s.series.append(s.T, s.V)
|
||||||
|
s.series.pendingCommit = false
|
||||||
s.series.Unlock()
|
s.series.Unlock()
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -713,6 +715,11 @@ func (a *headAppender) Commit() error {
|
||||||
|
|
||||||
func (a *headAppender) Rollback() error {
|
func (a *headAppender) Rollback() error {
|
||||||
a.head.metrics.activeAppenders.Dec()
|
a.head.metrics.activeAppenders.Dec()
|
||||||
|
for _, s := range a.samples {
|
||||||
|
s.series.Lock()
|
||||||
|
s.series.pendingCommit = false
|
||||||
|
s.series.Unlock()
|
||||||
|
}
|
||||||
a.head.putAppendBuffer(a.samples)
|
a.head.putAppendBuffer(a.samples)
|
||||||
|
|
||||||
// Series are created in the head memory regardless of rollback. Thus we have
|
// Series are created in the head memory regardless of rollback. Thus we have
|
||||||
|
@ -1165,7 +1172,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
|
||||||
series.Lock()
|
series.Lock()
|
||||||
rmChunks += series.truncateChunksBefore(mint)
|
rmChunks += series.truncateChunksBefore(mint)
|
||||||
|
|
||||||
if len(series.chunks) > 0 {
|
if len(series.chunks) > 0 || series.pendingCommit {
|
||||||
series.Unlock()
|
series.Unlock()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -1256,9 +1263,10 @@ type memSeries struct {
|
||||||
chunkRange int64
|
chunkRange int64
|
||||||
firstChunkID int
|
firstChunkID int
|
||||||
|
|
||||||
nextAt int64 // timestamp at which to cut the next chunk.
|
nextAt int64 // Timestamp at which to cut the next chunk.
|
||||||
lastValue float64
|
lastValue float64
|
||||||
sampleBuf [4]sample
|
sampleBuf [4]sample
|
||||||
|
pendingCommit bool // Whether there are samples waiting to be committed to this series.
|
||||||
|
|
||||||
app chunkenc.Appender // Current appender for the chunk.
|
app chunkenc.Appender // Current appender for the chunk.
|
||||||
}
|
}
|
||||||
|
|
58
head_test.go
58
head_test.go
|
@ -781,6 +781,64 @@ func TestGCSeriesAccess(t *testing.T) {
|
||||||
testutil.Equals(t, ErrNotFound, err)
|
testutil.Equals(t, ErrNotFound, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
|
||||||
|
h, err := NewHead(nil, nil, nil, 1000)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer h.Close()
|
||||||
|
|
||||||
|
h.initTime(0)
|
||||||
|
|
||||||
|
app := h.appender()
|
||||||
|
lset := labels.FromStrings("a", "1")
|
||||||
|
_, err = app.Add(lset, 2100, 1)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
testutil.Ok(t, h.Truncate(2000))
|
||||||
|
testutil.Assert(t, nil != h.series.getByHash(lset.Hash(), lset), "series should not have been garbage collected")
|
||||||
|
|
||||||
|
testutil.Ok(t, app.Commit())
|
||||||
|
|
||||||
|
q, err := NewBlockQuerier(h, 1500, 2500)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer q.Close()
|
||||||
|
|
||||||
|
ss, err := q.Select(labels.NewEqualMatcher("a", "1"))
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
testutil.Equals(t, true, ss.Next())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
|
||||||
|
h, err := NewHead(nil, nil, nil, 1000)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer h.Close()
|
||||||
|
|
||||||
|
h.initTime(0)
|
||||||
|
|
||||||
|
app := h.appender()
|
||||||
|
lset := labels.FromStrings("a", "1")
|
||||||
|
_, err = app.Add(lset, 2100, 1)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
testutil.Ok(t, h.Truncate(2000))
|
||||||
|
testutil.Assert(t, nil != h.series.getByHash(lset.Hash(), lset), "series should not have been garbage collected")
|
||||||
|
|
||||||
|
testutil.Ok(t, app.Rollback())
|
||||||
|
|
||||||
|
q, err := NewBlockQuerier(h, 1500, 2500)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer q.Close()
|
||||||
|
|
||||||
|
ss, err := q.Select(labels.NewEqualMatcher("a", "1"))
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
testutil.Equals(t, false, ss.Next())
|
||||||
|
|
||||||
|
// Truncate again, this time the series should be deleted
|
||||||
|
testutil.Ok(t, h.Truncate(2050))
|
||||||
|
testutil.Equals(t, (*memSeries)(nil), h.series.getByHash(lset.Hash(), lset))
|
||||||
|
}
|
||||||
|
|
||||||
func TestHead_LogRollback(t *testing.T) {
|
func TestHead_LogRollback(t *testing.T) {
|
||||||
dir, err := ioutil.TempDir("", "wal_rollback")
|
dir, err := ioutil.TempDir("", "wal_rollback")
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
Loading…
Reference in New Issue