TSDB: Error when we commit/rollback twice (#7593)

* TSDB: Error when we commit/rollback twice

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
pull/7639/head
Julien Pivotto 4 years ago committed by GitHub
parent 89d2f5ec1d
commit ffc925dd21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -521,7 +521,6 @@ func TestDB_Snapshot(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
} }
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
testutil.Ok(t, app.Rollback())
// create snapshot // create snapshot
snap, err := ioutil.TempDir("", "snap") snap, err := ioutil.TempDir("", "snap")
@ -571,7 +570,6 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
} }
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
testutil.Ok(t, app.Rollback())
snap, err := ioutil.TempDir("", "snap") snap, err := ioutil.TempDir("", "snap")
testutil.Ok(t, err) testutil.Ok(t, err)
@ -939,10 +937,14 @@ func TestWALSegmentSizeOptions(t *testing.T) {
opts.WALSegmentSize = segmentSize opts.WALSegmentSize = segmentSize
db := openTestDB(t, opts, nil) db := openTestDB(t, opts, nil)
app := db.Appender()
for i := int64(0); i < 155; i++ { for i := int64(0); i < 155; i++ {
_, err := app.Add(labels.Labels{labels.Label{Name: "wal", Value: "size"}}, i, rand.Float64()) app := db.Appender()
ref, err := app.Add(labels.Labels{labels.Label{Name: "wal" + fmt.Sprintf("%d", i), Value: "size"}}, i, rand.Float64())
testutil.Ok(t, err) testutil.Ok(t, err)
for j := int64(1); j <= 78; j++ {
err := app.AddFast(ref, i+j, rand.Float64())
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
} }
@ -2325,6 +2327,7 @@ func TestBlockRanges(t *testing.T) {
// Test that wal records are skipped when an existing block covers the same time ranges // Test that wal records are skipped when an existing block covers the same time ranges
// and compaction doesn't create an overlapping block. // and compaction doesn't create an overlapping block.
app = db.Appender()
db.DisableCompactions() db.DisableCompactions()
_, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64()) _, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64())
testutil.Ok(t, err) testutil.Ok(t, err)

@ -44,6 +44,9 @@ var (
// ErrInvalidSample is returned if an appended sample is not valid and can't // ErrInvalidSample is returned if an appended sample is not valid and can't
// be ingested. // be ingested.
ErrInvalidSample = errors.New("invalid sample") ErrInvalidSample = errors.New("invalid sample")
// ErrAppenderClosed is returned if an appender has already be successfully
// rolled back or commited.
ErrAppenderClosed = errors.New("appender closed")
) )
// Head handles reads and writes of time series data within a time window. // Head handles reads and writes of time series data within a time window.
@ -1093,6 +1096,7 @@ type headAppender struct {
sampleSeries []*memSeries sampleSeries []*memSeries
appendID, cleanupAppendIDsBelow uint64 appendID, cleanupAppendIDsBelow uint64
closed bool
} }
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
@ -1193,7 +1197,11 @@ func (a *headAppender) log() error {
return nil return nil
} }
func (a *headAppender) Commit() error { func (a *headAppender) Commit() (err error) {
if a.closed {
return ErrAppenderClosed
}
defer func() { a.closed = true }()
if err := a.log(); err != nil { if err := a.log(); err != nil {
//nolint: errcheck //nolint: errcheck
a.Rollback() // Most likely the same error will happen again. a.Rollback() // Most likely the same error will happen again.
@ -1231,7 +1239,11 @@ func (a *headAppender) Commit() error {
return nil return nil
} }
func (a *headAppender) Rollback() error { func (a *headAppender) Rollback() (err error) {
if a.closed {
return ErrAppenderClosed
}
defer func() { a.closed = true }()
defer a.head.metrics.activeAppenders.Dec() defer a.head.metrics.activeAppenders.Dec()
defer a.head.iso.closeAppend(a.appendID) defer a.head.iso.closeAppend(a.appendID)
defer a.head.putSeriesBuffer(a.sampleSeries) defer a.head.putSeriesBuffer(a.sampleSeries)

@ -1852,3 +1852,38 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
}) })
} }
} }
func TestErrReuseAppender(t *testing.T) {
head, _ := newTestHead(t, 1000, false)
defer func() {
testutil.Ok(t, head.Close())
}()
app := head.Appender()
_, err := app.Add(labels.Labels{{Name: "test", Value: "test"}}, 0, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.NotOk(t, app.Commit())
testutil.NotOk(t, app.Rollback())
app = head.Appender()
_, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 1, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Rollback())
testutil.NotOk(t, app.Rollback())
testutil.NotOk(t, app.Commit())
app = head.Appender()
_, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 2, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.NotOk(t, app.Rollback())
testutil.NotOk(t, app.Commit())
app = head.Appender()
_, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 3, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Rollback())
testutil.NotOk(t, app.Commit())
testutil.NotOk(t, app.Rollback())
}

Loading…
Cancel
Save