Add Rollback() and docs to Appender interface

pull/5805/head
Fabian Reinartz 2017-01-12 20:17:49 +01:00
parent 22db9c3413
commit d970f0256a
2 changed files with 27 additions and 7 deletions

33
db.go
View File

@ -38,19 +38,24 @@ type Options struct {
WALFlushInterval time.Duration WALFlushInterval time.Duration
} }
// Appender allows committing batches of samples to a database. // Appender allows appending a batch of data. It must be completed with a
// The data held by the appender is reset after Commit returndb. // call to Commit or Rollback and must not be reused afterwards.
type Appender interface { type Appender interface {
// SetSeries registers a new known series label set with the appender // SetSeries ensures that a series with the given label set exists and
// and returns a reference number used to add samples to it over the // returns a unique reference number identifying it. Returned reference
// life time of the Appender. // numbers are ephemeral and may be rejected in calls to Add() at any point.
// A new reference number can then be requested with another call to
// SetSeries.
SetSeries(labels.Labels) (uint64, error) SetSeries(labels.Labels) (uint64, error)
// Add adds a sample pair for the referenced seriedb. // Add adds a sample pair for the referenced serie.
Add(ref uint64, t int64, v float64) error Add(ref uint64, t int64, v float64) error
// Commit submits the collected samples and purges the batch. // Commit submits the collected samples and purges the batch.
Commit() error Commit() error
// Rollback rolls back all modifications made in the appender so far.
Rollback() error
} }
type hashedSample struct { type hashedSample struct {
@ -382,6 +387,12 @@ func (a *dbAppender) Commit() error {
return err return err
} }
func (a *dbAppender) Rollback() error {
err := a.head.Rollback()
a.db.mtx.RUnlock()
return err
}
func (db *DB) headForDir(dir string) (int, bool) { func (db *DB) headForDir(dir string) (int, bool) {
for i, b := range db.heads { for i, b := range db.heads {
if b.Dir() == dir { if b.Dir() == dir {
@ -646,13 +657,21 @@ func (a *partitionedAppender) Add(ref uint64, t int64, v float64) error {
func (a *partitionedAppender) Commit() error { func (a *partitionedAppender) Commit() error {
var merr MultiError var merr MultiError
// Spill buckets into partitiondb.
for _, p := range a.partitions { for _, p := range a.partitions {
merr.Add(p.Commit()) merr.Add(p.Commit())
} }
return merr.Err() return merr.Err()
} }
func (a *partitionedAppender) Rollback() error {
var merr MultiError
for _, p := range a.partitions {
merr.Add(p.Rollback())
}
return merr.Err()
}
// The MultiError type implements the error interface, and contains the // The MultiError type implements the error interface, and contains the
// Errors used to construct it. // Errors used to construct it.
type MultiError []error type MultiError []error

View File

@ -266,6 +266,7 @@ func (a *headAppender) Commit() error {
} }
func (a *headAppender) Rollback() error { func (a *headAppender) Rollback() error {
putHeadAppendBuffer(a.samples)
a.mtx.RUnlock() a.mtx.RUnlock()
return nil return nil
} }