mirror of https://github.com/prometheus/prometheus
Simplify AddFast and avoid errgroup Commits in general
parent
fb5c5535fc
commit
debfe9b1e5
50
db.go
50
db.go
|
@ -491,22 +491,13 @@ type dbAppender struct {
|
||||||
samples int
|
samples int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *dbAppender) getAppender(ulid string) (*metaAppender, bool) {
|
|
||||||
for _, h := range a.heads {
|
|
||||||
if string(h.meta.ULID[:]) == ulid {
|
|
||||||
return h, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil, false
|
|
||||||
}
|
|
||||||
|
|
||||||
type metaAppender struct {
|
type metaAppender struct {
|
||||||
meta BlockMeta
|
meta BlockMeta
|
||||||
app Appender
|
app Appender
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
|
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
|
||||||
h, err := a.appenderFor(t)
|
h, err := a.appenderAt(t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
@ -524,9 +515,13 @@ func (a *dbAppender) AddFast(ref string, t int64, v float64) error {
|
||||||
return errors.Wrap(ErrNotFound, "invalid ref length")
|
return errors.Wrap(ErrNotFound, "invalid ref length")
|
||||||
}
|
}
|
||||||
// The first 16 bytes a ref hold the ULID of the head block.
|
// The first 16 bytes a ref hold the ULID of the head block.
|
||||||
h, ok := a.getAppender(ref[:16])
|
h, err := a.appenderAt(t)
|
||||||
if !ok {
|
if err != nil {
|
||||||
return errors.Wrapf(ErrNotFound, "no block for ULID %s", ref[:16])
|
return err
|
||||||
|
}
|
||||||
|
// Validate the ref points to the same block we got for t.
|
||||||
|
if string(h.meta.ULID[:]) != ref[:16] {
|
||||||
|
return ErrNotFound
|
||||||
}
|
}
|
||||||
if err := h.app.AddFast(ref[16:], t, v); err != nil {
|
if err := h.app.AddFast(ref[16:], t, v); err != nil {
|
||||||
// The block the ref points to might fit the given timestamp.
|
// The block the ref points to might fit the given timestamp.
|
||||||
|
@ -543,7 +538,7 @@ func (a *dbAppender) AddFast(ref string, t int64, v float64) error {
|
||||||
|
|
||||||
// appenderFor gets the appender for the head containing timestamp t.
|
// appenderFor gets the appender for the head containing timestamp t.
|
||||||
// If the head block doesn't exist yet, it gets created.
|
// If the head block doesn't exist yet, it gets created.
|
||||||
func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) {
|
func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) {
|
||||||
for _, h := range a.heads {
|
for _, h := range a.heads {
|
||||||
if intervalContains(h.meta.MinTime, h.meta.MaxTime-1, t) {
|
if intervalContains(h.meta.MinTime, h.meta.MaxTime-1, t) {
|
||||||
return h, nil
|
return h, nil
|
||||||
|
@ -590,13 +585,20 @@ func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
|
||||||
// ensureHead makes sure that there is a head block for the timestamp t if
|
// ensureHead makes sure that there is a head block for the timestamp t if
|
||||||
// it is within or after the currently appendable window.
|
// it is within or after the currently appendable window.
|
||||||
func (db *DB) ensureHead(t int64) error {
|
func (db *DB) ensureHead(t int64) error {
|
||||||
mint, maxt := rangeForTimestamp(t, int64(db.opts.MinBlockDuration))
|
var (
|
||||||
|
mint, maxt = rangeForTimestamp(t, int64(db.opts.MinBlockDuration))
|
||||||
|
addBuffer = len(db.blocks) == 0
|
||||||
|
last BlockMeta
|
||||||
|
)
|
||||||
|
|
||||||
last := db.blocks[len(db.blocks)-1].Meta()
|
if !addBuffer {
|
||||||
|
last = db.blocks[len(db.blocks)-1].Meta()
|
||||||
|
addBuffer = last.MaxTime <= mint-int64(db.opts.MinBlockDuration)
|
||||||
|
}
|
||||||
// Create another block of buffer in front if the DB is initialized or retrieving
|
// Create another block of buffer in front if the DB is initialized or retrieving
|
||||||
// new data after a long gap.
|
// new data after a long gap.
|
||||||
// This ensures we always have a full block width if append window.
|
// This ensures we always have a full block width if append window.
|
||||||
if len(db.blocks) == 0 || last.MaxTime <= mint-int64(db.opts.MinBlockDuration) {
|
if addBuffer {
|
||||||
if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil {
|
if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -621,14 +623,22 @@ func (a *dbAppender) Commit() error {
|
||||||
|
|
||||||
// Commits to partial appenders must be concurrent as concurrent appenders
|
// Commits to partial appenders must be concurrent as concurrent appenders
|
||||||
// may have conflicting locks on head appenders.
|
// may have conflicting locks on head appenders.
|
||||||
// XXX(fabxc): is this a leaky abstraction? Should make an effort to catch a multi-error?
|
// For high-throughput use cases the errgroup causes significant blocking. Typically,
|
||||||
var g errgroup.Group
|
// we just deal with a single appender and special case it.
|
||||||
|
var err error
|
||||||
|
|
||||||
|
switch len(a.heads) {
|
||||||
|
case 1:
|
||||||
|
err = a.heads[0].app.Commit()
|
||||||
|
default:
|
||||||
|
var g errgroup.Group
|
||||||
for _, h := range a.heads {
|
for _, h := range a.heads {
|
||||||
g.Go(h.app.Commit)
|
g.Go(h.app.Commit)
|
||||||
}
|
}
|
||||||
|
err = g.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
if err := g.Wait(); err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// XXX(fabxc): Push the metric down into head block to account properly
|
// XXX(fabxc): Push the metric down into head block to account properly
|
||||||
|
|
Loading…
Reference in New Issue