tsdb: Do a full rollback upon commit error

I think the previous behavior is problematic as it will leave
`memSeries` around that still have `pendingCommit` set to `true`.

The only case where this can happen in this code path is a failure to
write to the WAL, in which case we are probably in trouble anyway. I
believe, however, we should still try to do the right thing and do the
full rollback. This will implicitly try to write to the WAL again, but
this time without samples, which may even succeed. (But we propagate
the previous error in any case.)

This also adds `a.head.putSeriesBuffer(a.sampleSeries)` to Rollback,
which was previously missing.

Signed-off-by: beorn7 <beorn@grafana.com>
pull/6951/head
beorn7 5 years ago
parent a17ddcf0f9
commit f6f4fd6556

@ -116,7 +116,10 @@ type Appender interface {
// faster than adding a sample by providing its full label set. // faster than adding a sample by providing its full label set.
AddFast(ref uint64, t int64, v float64) error AddFast(ref uint64, t int64, v float64) error
// Commit submits the collected samples and purges the batch. // Commit submits the collected samples and purges the batch. If Commit
// returns a non-nil error, it also rolls back all modifications made in
// the appender so far, as Rollback would do. In any case, an Appender
// must not be used anymore after Commit has been called.
Commit() error Commit() error
// Rollback rolls back all modifications made in the appender so far. // Rollback rolls back all modifications made in the appender so far.

@ -1042,15 +1042,17 @@ func (a *headAppender) log() error {
} }
func (a *headAppender) Commit() error { func (a *headAppender) Commit() error {
if err := a.log(); err != nil {
//nolint: errcheck
a.Rollback() // Most likely the same error will happen again.
return errors.Wrap(err, "write to WAL")
}
defer a.head.metrics.activeAppenders.Dec() defer a.head.metrics.activeAppenders.Dec()
defer a.head.putAppendBuffer(a.samples) defer a.head.putAppendBuffer(a.samples)
defer a.head.putSeriesBuffer(a.sampleSeries) defer a.head.putSeriesBuffer(a.sampleSeries)
defer a.head.iso.closeAppend(a.appendID) defer a.head.iso.closeAppend(a.appendID)
if err := a.log(); err != nil {
return errors.Wrap(err, "write to WAL")
}
total := len(a.samples) total := len(a.samples)
var series *memSeries var series *memSeries
for i, s := range a.samples { for i, s := range a.samples {
@ -1088,6 +1090,7 @@ func (a *headAppender) Rollback() error {
} }
a.head.putAppendBuffer(a.samples) a.head.putAppendBuffer(a.samples)
a.samples = nil a.samples = nil
a.head.putSeriesBuffer(a.sampleSeries)
a.head.iso.closeAppend(a.appendID) a.head.iso.closeAppend(a.appendID)
// Series are created in the head memory regardless of rollback. Thus we have // Series are created in the head memory regardless of rollback. Thus we have

Loading…
Cancel
Save