diff --git a/db.go b/db.go index 34d1e884b..a376f465e 100644 --- a/db.go +++ b/db.go @@ -491,22 +491,13 @@ type dbAppender struct { samples int } -func (a *dbAppender) getAppender(ulid string) (*metaAppender, bool) { - for _, h := range a.heads { - if string(h.meta.ULID[:]) == ulid { - return h, true - } - } - return nil, false -} - type metaAppender struct { meta BlockMeta app Appender } func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { - h, err := a.appenderFor(t) + h, err := a.appenderAt(t) if err != nil { return "", err } @@ -524,9 +515,13 @@ func (a *dbAppender) AddFast(ref string, t int64, v float64) error { return errors.Wrap(ErrNotFound, "invalid ref length") } // The first 16 bytes a ref hold the ULID of the head block. - h, ok := a.getAppender(ref[:16]) - if !ok { - return errors.Wrapf(ErrNotFound, "no block for ULID %s", ref[:16]) + h, err := a.appenderAt(t) + if err != nil { + return err + } + // Validate the ref points to the same block we got for t. + if string(h.meta.ULID[:]) != ref[:16] { + return ErrNotFound } if err := h.app.AddFast(ref[16:], t, v); err != nil { // The block the ref points to might fit the given timestamp. @@ -543,7 +538,7 @@ func (a *dbAppender) AddFast(ref string, t int64, v float64) error { // appenderFor gets the appender for the head containing timestamp t. // If the head block doesn't exist yet, it gets created. -func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) { +func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) { for _, h := range a.heads { if intervalContains(h.meta.MinTime, h.meta.MaxTime-1, t) { return h, nil @@ -590,13 +585,20 @@ func rangeForTimestamp(t int64, width int64) (mint, maxt int64) { // ensureHead makes sure that there is a head block for the timestamp t if // it is within or after the currently appendable window. func (db *DB) ensureHead(t int64) error { - mint, maxt := rangeForTimestamp(t, int64(db.opts.MinBlockDuration)) + var ( + mint, maxt = rangeForTimestamp(t, int64(db.opts.MinBlockDuration)) + addBuffer = len(db.blocks) == 0 + last BlockMeta + ) - last := db.blocks[len(db.blocks)-1].Meta() + if !addBuffer { + last = db.blocks[len(db.blocks)-1].Meta() + addBuffer = last.MaxTime <= mint-int64(db.opts.MinBlockDuration) + } // Create another block of buffer in front if the DB is initialized or retrieving // new data after a long gap. // This ensures we always have a full block width if append window. - if len(db.blocks) == 0 || last.MaxTime <= mint-int64(db.opts.MinBlockDuration) { + if addBuffer { if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil { return err } @@ -621,14 +623,22 @@ func (a *dbAppender) Commit() error { // Commits to partial appenders must be concurrent as concurrent appenders // may have conflicting locks on head appenders. - // XXX(fabxc): is this a leaky abstraction? Should make an effort to catch a multi-error? - var g errgroup.Group + // For high-throughput use cases the errgroup causes significant blocking. Typically, + // we just deal with a single appender and special case it. + var err error - for _, h := range a.heads { - g.Go(h.app.Commit) + switch len(a.heads) { + case 1: + err = a.heads[0].app.Commit() + default: + var g errgroup.Group + for _, h := range a.heads { + g.Go(h.app.Commit) + } + err = g.Wait() } - if err := g.Wait(); err != nil { + if err != nil { return err } // XXX(fabxc): Push the metric down into head block to account properly