From 7369561305f4b895dff67a85859213e6ee4d57ef Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 18 Feb 2021 12:07:00 +0000 Subject: [PATCH] Combine Appender.Add and AddFast into a single Append method. (#8489) This moves the label lookup into TSDB, whilst still keeping the cached-ref optimisation for repeated Appends. This makes the API easier to consume and implement. In particular this change is motivated by the scrape-time-aggregation work, which I don't think is possible to implement without it as it needs access to label values. Signed-off-by: Tom Wilkie --- cmd/prometheus/main.go | 4 +- cmd/prometheus/main_test.go | 6 +- cmd/promtool/backfill.go | 2 +- cmd/promtool/tsdb.go | 25 ++--- promql/bench_test.go | 6 +- promql/engine_test.go | 7 +- promql/functions_test.go | 4 +- promql/test.go | 4 +- rules/manager.go | 6 +- rules/manager_test.go | 14 +-- scrape/helpers_test.go | 39 ++------ scrape/scrape.go | 82 +++++++-------- scrape/scrape_test.go | 8 +- scrape/target.go | 27 +---- storage/fanout.go | 19 +--- storage/fanout_test.go | 18 ++-- storage/interface.go | 15 ++- storage/remote/write.go | 10 +- storage/remote/write_handler.go | 2 +- storage/remote/write_handler_test.go | 6 +- tsdb/block_test.go | 9 +- tsdb/blockwriter_test.go | 4 +- tsdb/compact_test.go | 12 +-- tsdb/db_test.go | 143 ++++++++++++++------------- tsdb/head.go | 76 ++++++-------- tsdb/head_test.go | 104 +++++++++---------- tsdb/querier_bench_test.go | 4 +- tsdb/querier_test.go | 12 +-- tsdb/tsdbblockutil.go | 8 +- 29 files changed, 283 insertions(+), 393 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 4c59aa7b5..964b0165c 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -1105,12 +1105,10 @@ func (s *readyStorage) Appender(ctx context.Context) storage.Appender { type notReadyAppender struct{} -func (n notReadyAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { +func (n notReadyAppender) Append(ref uint64, l labels.Labels, t int64, v float64) (uint64, error) { return 0, tsdb.ErrNotReady } -func (n notReadyAppender) AddFast(ref uint64, t int64, v float64) error { return tsdb.ErrNotReady } - func (n notReadyAppender) Commit() error { return tsdb.ErrNotReady } func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady } diff --git a/cmd/prometheus/main_test.go b/cmd/prometheus/main_test.go index eb3049e11..7571234c4 100644 --- a/cmd/prometheus/main_test.go +++ b/cmd/prometheus/main_test.go @@ -265,11 +265,11 @@ func TestTimeMetrics(t *testing.T) { )) app := db.Appender(context.Background()) - _, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 1) + _, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 1000, 1) require.NoError(t, err) - _, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, 1) + _, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 2000, 1) require.NoError(t, err) - _, err = app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 3000, 1) + _, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 3000, 1) require.NoError(t, err) require.NoError(t, app.Commit()) diff --git a/cmd/promtool/backfill.go b/cmd/promtool/backfill.go index b04fb8b55..c0fc627ee 100644 --- a/cmd/promtool/backfill.go +++ b/cmd/promtool/backfill.go @@ -142,7 +142,7 @@ func createBlocks(input []byte, mint, maxt int64, maxSamplesInAppender int, outp l := labels.Labels{} p.Metric(&l) - if _, err := app.Add(l, *ts, v); err != nil { + if _, err := app.Append(0, l, *ts, v); err != nil { return errors.Wrap(err, "add sample") } diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 8d04eadab..c22e12d4b 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -35,7 +35,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunks" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" @@ -206,25 +205,19 @@ func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount in for _, s := range scrape { s.value += 1000 - if s.ref == nil { - ref, err := app.Add(s.labels, ts, float64(s.value)) - if err != nil { - panic(err) - } - s.ref = &ref - } else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil { + var ref uint64 + if s.ref != nil { + ref = *s.ref + } - if errors.Cause(err) != storage.ErrNotFound { - panic(err) - } + ref, err := app.Append(ref, s.labels, ts, float64(s.value)) + if err != nil { + panic(err) + } - ref, err := app.Add(s.labels, ts, float64(s.value)) - if err != nil { - panic(err) - } + if s.ref == nil { s.ref = &ref } - total++ } if err := app.Commit(); err != nil { diff --git a/promql/bench_test.go b/promql/bench_test.go index f1ea31ceb..03111767e 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -71,10 +71,8 @@ func BenchmarkRangeQuery(b *testing.B) { a := storage.Appender(context.Background()) ts := int64(s * 10000) // 10s interval. for i, metric := range metrics { - err := a.AddFast(refs[i], ts, float64(s)) - if err != nil { - refs[i], _ = a.Add(metric, ts, float64(s)) - } + ref, _ := a.Append(refs[i], metric, ts, float64(s)) + refs[i] = ref } if err := a.Commit(); err != nil { b.Fatal(err) diff --git a/promql/engine_test.go b/promql/engine_test.go index 6074a48a6..c3a96a9c2 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -904,14 +904,15 @@ load 1ms // Add some samples with negative timestamp. db := test.TSDB() app := db.Appender(context.Background()) - ref, err := app.Add(lblsneg, -1000000, 1000) + ref, err := app.Append(0, lblsneg, -1000000, 1000) require.NoError(t, err) for ts := int64(-1000000 + 1000); ts <= 0; ts += 1000 { - require.NoError(t, app.AddFast(ref, ts, -float64(ts/1000)+1)) + _, err := app.Append(ref, nil, ts, -float64(ts/1000)+1) + require.NoError(t, err) } // To test the fix for https://github.com/prometheus/prometheus/issues/8433. - _, err = app.Add(labels.FromStrings("__name__", "metric_timestamp"), 3600*1000, 1000) + _, err = app.Append(0, labels.FromStrings("__name__", "metric_timestamp"), 3600*1000, 1000) require.NoError(t, err) require.NoError(t, app.Commit()) diff --git a/promql/functions_test.go b/promql/functions_test.go index ffdbd89d3..5707cbed3 100644 --- a/promql/functions_test.go +++ b/promql/functions_test.go @@ -43,8 +43,8 @@ func TestDeriv(t *testing.T) { a := storage.Appender(context.Background()) metric := labels.FromStrings("__name__", "foo") - a.Add(metric, 1493712816939, 1.0) - a.Add(metric, 1493712846939, 1.0) + a.Append(0, metric, 1493712816939, 1.0) + a.Append(0, metric, 1493712846939, 1.0) require.NoError(t, a.Commit()) diff --git a/promql/test.go b/promql/test.go index 77e133eed..8ee3fc150 100644 --- a/promql/test.go +++ b/promql/test.go @@ -307,7 +307,7 @@ func (cmd *loadCmd) append(a storage.Appender) error { m := cmd.metrics[h] for _, s := range smpls { - if _, err := a.Add(m, s.T, s.V); err != nil { + if _, err := a.Append(0, m, s.T, s.V); err != nil { return err } } @@ -732,7 +732,7 @@ func (ll *LazyLoader) appendTill(ts int64) error { ll.loadCmd.defs[h] = smpls[i:] break } - if _, err := app.Add(m, s.T, s.V); err != nil { + if _, err := app.Append(0, m, s.T, s.V); err != nil { return err } if i == len(smpls)-1 { diff --git a/rules/manager.go b/rules/manager.go index 129d47638..348aa0ce8 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -622,7 +622,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { g.seriesInPreviousEval[i] = seriesReturned }() for _, s := range vector { - if _, err := app.Add(s.Metric, s.T, s.V); err != nil { + if _, err := app.Append(0, s.Metric, s.T, s.V); err != nil { switch errors.Cause(err) { case storage.ErrOutOfOrderSample: numOutOfOrder++ @@ -647,7 +647,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { for metric, lset := range g.seriesInPreviousEval[i] { if _, ok := seriesReturned[metric]; !ok { // Series no longer exposed, mark it stale. - _, err = app.Add(lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) + _, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) switch errors.Cause(err) { case nil: case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: @@ -673,7 +673,7 @@ func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) { app := g.opts.Appendable.Appender(ctx) for _, s := range g.staleSeries { // Rule that produced series no longer configured, mark it stale. - _, err := app.Add(s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) + _, err := app.Append(0, s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) switch errors.Cause(err) { case nil: case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: diff --git a/rules/manager_test.go b/rules/manager_test.go index 587510d51..3f7810fa8 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -547,9 +547,9 @@ func TestStaleness(t *testing.T) { // A time series that has two samples and then goes stale. app := st.Appender(context.Background()) - app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 0, 1) - app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2) - app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN)) + app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 0, 1) + app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2) + app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN)) err = app.Commit() require.NoError(t, err) @@ -872,10 +872,10 @@ func TestNotify(t *testing.T) { }) app := storage.Appender(context.Background()) - app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2) - app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, 3) - app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 5000, 3) - app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 6000, 0) + app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2) + app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 2000, 3) + app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 5000, 3) + app.Append(0, labels.FromStrings(model.MetricNameLabel, "a"), 6000, 0) err = app.Commit() require.NoError(t, err) diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index b42d144d2..e82b41a61 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -28,10 +28,9 @@ func (a nopAppendable) Appender(_ context.Context) storage.Appender { type nopAppender struct{} -func (a nopAppender) Add(labels.Labels, int64, float64) (uint64, error) { return 0, nil } -func (a nopAppender) AddFast(uint64, int64, float64) error { return nil } -func (a nopAppender) Commit() error { return nil } -func (a nopAppender) Rollback() error { return nil } +func (a nopAppender) Append(uint64, labels.Labels, int64, float64) (uint64, error) { return 0, nil } +func (a nopAppender) Commit() error { return nil } +func (a nopAppender) Rollback() error { return nil } type sample struct { metric labels.Labels @@ -46,47 +45,25 @@ type collectResultAppender struct { result []sample pendingResult []sample rolledbackResult []sample - - mapper map[uint64]labels.Labels } -func (a *collectResultAppender) AddFast(ref uint64, t int64, v float64) error { - if a.next == nil { - return storage.ErrNotFound - } - - err := a.next.AddFast(ref, t, v) - if err != nil { - return err - } +func (a *collectResultAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) { a.pendingResult = append(a.pendingResult, sample{ - metric: a.mapper[ref], + metric: lset, t: t, v: v, }) - return err -} -func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64, error) { - a.pendingResult = append(a.pendingResult, sample{ - metric: m, - t: t, - v: v, - }) if a.next == nil { return 0, nil } - if a.mapper == nil { - a.mapper = map[uint64]labels.Labels{} - } - - ref, err := a.next.Add(m, t, v) + ref, err := a.next.Append(ref, lset, t, v) if err != nil { return 0, err } - a.mapper[ref] = m - return ref, nil + + return ref, err } func (a *collectResultAppender) Commit() error { diff --git a/scrape/scrape.go b/scrape/scrape.go index bd89b69f6..bddcdf0d7 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -1306,20 +1306,19 @@ loop: continue } ce, ok := sl.cache.get(yoloString(met)) + var ( + ref uint64 + lset labels.Labels + mets string + hash uint64 + ) if ok { - err = app.AddFast(ce.ref, t, v) - _, err = sl.checkAddError(ce, met, tp, err, &sampleLimitErr, &appErrs) - // In theory this should never happen. - if err == storage.ErrNotFound { - ok = false - } - } - if !ok { - var lset labels.Labels - - mets := p.Metric(&lset) - hash := lset.Hash() + ref = ce.ref + lset = ce.lset + } else { + mets = p.Metric(&lset) + hash = lset.Hash() // Hash label set as it is seen local to the target. Then add target labels // and relabeling and store the final label set. @@ -1335,17 +1334,18 @@ loop: err = errNameLabelMandatory break loop } + } - var ref uint64 - ref, err = app.Add(lset, t, v) - sampleAdded, err = sl.checkAddError(nil, met, tp, err, &sampleLimitErr, &appErrs) - if err != nil { - if err != storage.ErrNotFound { - level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err) - } - break loop + ref, err = app.Append(ref, lset, t, v) + sampleAdded, err = sl.checkAddError(ce, met, tp, err, &sampleLimitErr, &appErrs) + if err != nil { + if err != storage.ErrNotFound { + level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err) } + break loop + } + if !ok { if tp == nil { // Bypass staleness logic if there is an explicit timestamp. sl.cache.trackStaleness(hash, lset) @@ -1380,7 +1380,7 @@ loop: if err == nil { sl.cache.forEachStale(func(lset labels.Labels) bool { // Series no longer exposed, mark it stale. - _, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN)) + _, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN)) switch errors.Cause(err) { case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: // Do not count these in logging, as this is expected if a target @@ -1497,37 +1497,31 @@ func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err er func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error { ce, ok := sl.cache.get(s) + var ref uint64 + var lset labels.Labels if ok { - err := app.AddFast(ce.ref, t, v) - switch errors.Cause(err) { - case nil: - return nil - case storage.ErrNotFound: - // Try an Add. - case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: - // Do not log here, as this is expected if a target goes away and comes back - // again with a new scrape loop. - return nil - default: - return err + ref = ce.ref + lset = ce.lset + } else { + lset = labels.Labels{ + // The constants are suffixed with the invalid \xff unicode rune to avoid collisions + // with scraped metrics in the cache. + // We have to drop it when building the actual metric. + labels.Label{Name: labels.MetricName, Value: s[:len(s)-1]}, } + lset = sl.reportSampleMutator(lset) } - lset := labels.Labels{ - // The constants are suffixed with the invalid \xff unicode rune to avoid collisions - // with scraped metrics in the cache. - // We have to drop it when building the actual metric. - labels.Label{Name: labels.MetricName, Value: s[:len(s)-1]}, - } - - hash := lset.Hash() - lset = sl.reportSampleMutator(lset) - ref, err := app.Add(lset, t, v) + ref, err := app.Append(ref, lset, t, v) switch errors.Cause(err) { case nil: - sl.cache.addRef(s, ref, lset, hash) + if !ok { + sl.cache.addRef(s, ref, lset, lset.Hash()) + } return nil case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: + // Do not log here, as this is expected if a target goes away and comes back + // again with a new scrape loop. return nil default: return err diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index ac972a6e9..928d820da 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1560,7 +1560,7 @@ type errorAppender struct { collectResultAppender } -func (app *errorAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { +func (app *errorAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) { switch lset.Get(model.MetricNameLabel) { case "out_of_order": return 0, storage.ErrOutOfOrderSample @@ -1569,14 +1569,10 @@ func (app *errorAppender) Add(lset labels.Labels, t int64, v float64) (uint64, e case "out_of_bounds": return 0, storage.ErrOutOfBounds default: - return app.collectResultAppender.Add(lset, t, v) + return app.collectResultAppender.Append(ref, lset, t, v) } } -func (app *errorAppender) AddFast(ref uint64, t int64, v float64) error { - return app.collectResultAppender.AddFast(ref, t, v) -} - func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) { app := &errorAppender{} diff --git a/scrape/target.go b/scrape/target.go index 2b4c4301c..f3dd2d0c0 100644 --- a/scrape/target.go +++ b/scrape/target.go @@ -290,57 +290,38 @@ type limitAppender struct { i int } -func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { +func (app *limitAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) { if !value.IsStaleNaN(v) { app.i++ if app.i > app.limit { return 0, errSampleLimit } } - ref, err := app.Appender.Add(lset, t, v) + ref, err := app.Appender.Append(ref, lset, t, v) if err != nil { return 0, err } return ref, nil } -func (app *limitAppender) AddFast(ref uint64, t int64, v float64) error { - if !value.IsStaleNaN(v) { - app.i++ - if app.i > app.limit { - return errSampleLimit - } - } - err := app.Appender.AddFast(ref, t, v) - return err -} - type timeLimitAppender struct { storage.Appender maxTime int64 } -func (app *timeLimitAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { +func (app *timeLimitAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) { if t > app.maxTime { return 0, storage.ErrOutOfBounds } - ref, err := app.Appender.Add(lset, t, v) + ref, err := app.Appender.Append(ref, lset, t, v) if err != nil { return 0, err } return ref, nil } -func (app *timeLimitAppender) AddFast(ref uint64, t int64, v float64) error { - if t > app.maxTime { - return storage.ErrOutOfBounds - } - err := app.Appender.AddFast(ref, t, v) - return err -} - // populateLabels builds a label set from the given label set and scrape configuration. // It returns a label set before relabeling was applied as the second return value. // Returns the original discovered label set found before relabelling was applied if the target is dropped during relabeling. diff --git a/storage/fanout.go b/storage/fanout.go index 4bc3db12d..91872b67a 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -143,33 +143,20 @@ type fanoutAppender struct { secondaries []Appender } -func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { - ref, err := f.primary.Add(l, t, v) +func (f *fanoutAppender) Append(ref uint64, l labels.Labels, t int64, v float64) (uint64, error) { + ref, err := f.primary.Append(ref, l, t, v) if err != nil { return ref, err } for _, appender := range f.secondaries { - if _, err := appender.Add(l, t, v); err != nil { + if _, err := appender.Append(ref, l, t, v); err != nil { return 0, err } } return ref, nil } -func (f *fanoutAppender) AddFast(ref uint64, t int64, v float64) error { - if err := f.primary.AddFast(ref, t, v); err != nil { - return err - } - - for _, appender := range f.secondaries { - if err := appender.AddFast(ref, t, v); err != nil { - return err - } - } - return nil -} - func (f *fanoutAppender) Commit() (err error) { err = f.primary.Commit() diff --git a/storage/fanout_test.go b/storage/fanout_test.go index cd8877019..486f60f2f 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -36,11 +36,11 @@ func TestFanout_SelectSorted(t *testing.T) { priStorage := teststorage.New(t) defer priStorage.Close() app1 := priStorage.Appender(ctx) - app1.Add(inputLabel, 0, 0) + app1.Append(0, inputLabel, 0, 0) inputTotalSize++ - app1.Add(inputLabel, 1000, 1) + app1.Append(0, inputLabel, 1000, 1) inputTotalSize++ - app1.Add(inputLabel, 2000, 2) + app1.Append(0, inputLabel, 2000, 2) inputTotalSize++ err := app1.Commit() require.NoError(t, err) @@ -48,11 +48,11 @@ func TestFanout_SelectSorted(t *testing.T) { remoteStorage1 := teststorage.New(t) defer remoteStorage1.Close() app2 := remoteStorage1.Appender(ctx) - app2.Add(inputLabel, 3000, 3) + app2.Append(0, inputLabel, 3000, 3) inputTotalSize++ - app2.Add(inputLabel, 4000, 4) + app2.Append(0, inputLabel, 4000, 4) inputTotalSize++ - app2.Add(inputLabel, 5000, 5) + app2.Append(0, inputLabel, 5000, 5) inputTotalSize++ err = app2.Commit() require.NoError(t, err) @@ -61,11 +61,11 @@ func TestFanout_SelectSorted(t *testing.T) { defer remoteStorage2.Close() app3 := remoteStorage2.Appender(ctx) - app3.Add(inputLabel, 6000, 6) + app3.Append(0, inputLabel, 6000, 6) inputTotalSize++ - app3.Add(inputLabel, 7000, 7) + app3.Append(0, inputLabel, 7000, 7) inputTotalSize++ - app3.Add(inputLabel, 8000, 8) + app3.Append(0, inputLabel, 8000, 8) inputTotalSize++ err = app3.Commit() diff --git a/storage/interface.go b/storage/interface.go index 711c253a7..eba63b4ce 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -136,18 +136,15 @@ func (f QueryableFunc) Querier(ctx context.Context, mint, maxt int64) (Querier, // // Operations on the Appender interface are not goroutine-safe. type Appender interface { - // Add adds a sample pair for the given series. A reference number is - // returned which can be used to add further samples in the same or later - // transactions. + // Append adds a sample pair for the given series. + // An optional reference number can be provided to accelerate calls. + // A reference number is returned which can be used to add further + // samples in the same or later transactions. // Returned reference numbers are ephemeral and may be rejected in calls - // to AddFast() at any point. Adding the sample via Add() returns a new + // to Append() at any point. Adding the sample via Append() returns a new // reference number. // If the reference is 0 it must not be used for caching. - Add(l labels.Labels, t int64, v float64) (uint64, error) - - // AddFast adds a sample pair for the referenced series. It is generally - // faster than adding a sample by providing its full label set. - AddFast(ref uint64, t int64, v float64) error + Append(ref uint64, l labels.Labels, t int64, v float64) (uint64, error) // Commit submits the collected samples and purges the batch. If Commit // returns a non-nil error, it also rolls back all modifications made in diff --git a/storage/remote/write.go b/storage/remote/write.go index 76c2a650c..c62b143a9 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -212,8 +212,8 @@ type timestampTracker struct { highestRecvTimestamp *maxTimestamp } -// Add implements storage.Appender. -func (t *timestampTracker) Add(_ labels.Labels, ts int64, _ float64) (uint64, error) { +// Append implements storage.Appender. +func (t *timestampTracker) Append(_ uint64, _ labels.Labels, ts int64, _ float64) (uint64, error) { t.samples++ if ts > t.highestTimestamp { t.highestTimestamp = ts @@ -221,12 +221,6 @@ func (t *timestampTracker) Add(_ labels.Labels, ts int64, _ float64) (uint64, er return 0, nil } -// AddFast implements storage.Appender. -func (t *timestampTracker) AddFast(_ uint64, ts int64, v float64) error { - _, err := t.Add(nil, ts, v) - return err -} - // Commit implements storage.Appender. func (t *timestampTracker) Commit() error { t.writeStorage.samplesIn.incr(t.samples) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index c0538e2b2..435c99607 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -75,7 +75,7 @@ func (h *handler) write(ctx context.Context, req *prompb.WriteRequest) (err erro for _, ts := range req.Timeseries { labels := labelProtosToLabels(ts.Labels) for _, s := range ts.Samples { - _, err = app.Add(labels, s.Timestamp, s.Value) + _, err = app.Append(0, labels, s.Timestamp, s.Value) if err != nil { return err } diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index d6d66d0d3..a38ccd63b 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -115,7 +115,7 @@ func (m *mockAppendable) Appender(_ context.Context) storage.Appender { return m } -func (m *mockAppendable) Add(l labels.Labels, t int64, v float64) (uint64, error) { +func (m *mockAppendable) Append(_ uint64, l labels.Labels, t int64, v float64) (uint64, error) { if t < m.latest { return 0, storage.ErrOutOfOrderSample } @@ -129,10 +129,6 @@ func (m *mockAppendable) Commit() error { return m.commitErr } -func (*mockAppendable) AddFast(uint64, int64, float64) error { - return fmt.Errorf("not implemented") -} - func (*mockAppendable) Rollback() error { return fmt.Errorf("not implemented") } diff --git a/tsdb/block_test.go b/tsdb/block_test.go index b3fae03d2..3c2990cf3 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -448,15 +448,10 @@ func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir str for _, s := range series { ref := uint64(0) it := s.Iterator() + lset := s.Labels() for it.Next() { t, v := it.At() - if ref != 0 { - err := app.AddFast(ref, t, v) - if err == nil { - continue - } - } - ref, err = app.Add(s.Labels(), t, v) + ref, err = app.Append(ref, lset, t, v) require.NoError(tb, err) } require.NoError(tb, it.Err()) diff --git a/tsdb/blockwriter_test.go b/tsdb/blockwriter_test.go index 8f1dab73d..2be2d193c 100644 --- a/tsdb/blockwriter_test.go +++ b/tsdb/blockwriter_test.go @@ -43,10 +43,10 @@ func TestBlockWriter(t *testing.T) { // Add some series. app := w.Appender(ctx) ts1, v1 := int64(44), float64(7) - _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, ts1, v1) + _, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, ts1, v1) require.NoError(t, err) ts2, v2 := int64(55), float64(12) - _, err = app.Add(labels.Labels{{Name: "c", Value: "d"}}, ts2, v2) + _, err = app.Append(0, labels.Labels{{Name: "c", Value: "d"}}, ts2, v2) require.NoError(t, err) require.NoError(t, app.Commit()) id, err := w.Flush(ctx) diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 35785f505..a08464216 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1101,7 +1101,7 @@ func BenchmarkCompactionFromHead(b *testing.B) { for ln := 0; ln < labelNames; ln++ { app := h.Appender(context.Background()) for lv := 0; lv < labelValues; lv++ { - app.Add(labels.FromStrings(fmt.Sprintf("%d", ln), fmt.Sprintf("%d%s%d", lv, postingsBenchSuffix, ln)), 0, 0) + app.Append(0, labels.FromStrings(fmt.Sprintf("%d", ln), fmt.Sprintf("%d%s%d", lv, postingsBenchSuffix, ln)), 0, 0) } require.NoError(b, app.Commit()) } @@ -1134,9 +1134,9 @@ func TestDisableAutoCompactions(t *testing.T) { db.DisableCompactions() app := db.Appender(context.Background()) for i := int64(0); i < 3; i++ { - _, err := app.Add(label, i*blockRange, 0) + _, err := app.Append(0, label, i*blockRange, 0) require.NoError(t, err) - _, err = app.Add(label, i*blockRange+1000, 0) + _, err = app.Append(0, label, i*blockRange+1000, 0) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -1249,11 +1249,11 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { // Add some data to the head that is enough to trigger a compaction. app := db.Appender(context.Background()) - _, err := app.Add(defaultLabel, 1, 0) + _, err := app.Append(0, defaultLabel, 1, 0) require.NoError(t, err) - _, err = app.Add(defaultLabel, 2, 0) + _, err = app.Append(0, defaultLabel, 2, 0) require.NoError(t, err) - _, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0) + _, err = app.Append(0, defaultLabel, 3+rangeToTriggerCompaction, 0) require.NoError(t, err) require.NoError(t, app.Commit()) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 6f8fb89bb..4849153de 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -173,7 +173,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { ctx := context.Background() app := db.Appender(ctx) - _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0) + _, err := app.Append(0, labels.FromStrings("foo", "bar"), 0, 0) require.NoError(t, err) querier, err := db.Querier(context.TODO(), 0, 1) @@ -206,7 +206,7 @@ func TestNoPanicAfterWALCorruption(t *testing.T) { { for { app := db.Appender(ctx) - _, err := app.Add(labels.FromStrings("foo", "bar"), maxt, 0) + _, err := app.Append(0, labels.FromStrings("foo", "bar"), maxt, 0) expSamples = append(expSamples, sample{t: maxt, v: 0}) require.NoError(t, err) require.NoError(t, app.Commit()) @@ -261,7 +261,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) { }() app := db.Appender(context.Background()) - _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0) + _, err := app.Append(0, labels.FromStrings("foo", "bar"), 0, 0) require.NoError(t, err) err = app.Rollback() @@ -285,12 +285,13 @@ func TestDBAppenderAddRef(t *testing.T) { ctx := context.Background() app1 := db.Appender(ctx) - ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0) + ref1, err := app1.Append(0, labels.FromStrings("a", "b"), 123, 0) require.NoError(t, err) // Reference should already work before commit. - err = app1.AddFast(ref1, 124, 1) + ref2, err := app1.Append(ref1, nil, 124, 1) require.NoError(t, err) + require.Equal(t, ref1, ref2) err = app1.Commit() require.NoError(t, err) @@ -298,20 +299,22 @@ func TestDBAppenderAddRef(t *testing.T) { app2 := db.Appender(ctx) // first ref should already work in next transaction. - err = app2.AddFast(ref1, 125, 0) + ref3, err := app2.Append(ref1, nil, 125, 0) require.NoError(t, err) + require.Equal(t, ref1, ref3) - ref2, err := app2.Add(labels.FromStrings("a", "b"), 133, 1) + ref4, err := app2.Append(ref1, labels.FromStrings("a", "b"), 133, 1) require.NoError(t, err) - - require.Equal(t, ref1, ref2) + require.Equal(t, ref1, ref4) // Reference must be valid to add another sample. - err = app2.AddFast(ref2, 143, 2) + ref5, err := app2.Append(ref2, nil, 143, 2) require.NoError(t, err) + require.Equal(t, ref1, ref5) - err = app2.AddFast(9999999, 1, 1) - require.Equal(t, storage.ErrNotFound, errors.Cause(err)) + // Missing labels & invalid refs should fail. + _, err = app2.Append(9999999, nil, 1, 1) + require.Equal(t, ErrInvalidSample, errors.Cause(err)) require.NoError(t, app2.Commit()) @@ -340,11 +343,11 @@ func TestAppendEmptyLabelsIgnored(t *testing.T) { ctx := context.Background() app1 := db.Appender(ctx) - ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0) + ref1, err := app1.Append(0, labels.FromStrings("a", "b"), 123, 0) require.NoError(t, err) // Construct labels manually so there is an empty label. - ref2, err := app1.Add(labels.Labels{labels.Label{Name: "a", Value: "b"}, labels.Label{Name: "c", Value: ""}}, 124, 0) + ref2, err := app1.Append(0, labels.Labels{labels.Label{Name: "a", Value: "b"}, labels.Label{Name: "c", Value: ""}}, 124, 0) require.NoError(t, err) // Should be the same series. @@ -396,7 +399,7 @@ Outer: smpls := make([]float64, numSamples) for i := int64(0); i < numSamples; i++ { smpls[i] = rand.Float64() - app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i]) + app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i]) } require.NoError(t, app.Commit()) @@ -452,12 +455,12 @@ func TestAmendDatapointCausesError(t *testing.T) { ctx := context.Background() app := db.Appender(ctx) - _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 0) + _, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, 0) require.NoError(t, err) require.NoError(t, app.Commit()) app = db.Appender(ctx) - _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1) + _, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, 1) require.Equal(t, storage.ErrDuplicateSampleForTimestamp, err) require.NoError(t, app.Rollback()) } @@ -470,12 +473,12 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { ctx := context.Background() app := db.Appender(ctx) - _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN()) + _, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN()) require.NoError(t, err) require.NoError(t, app.Commit()) app = db.Appender(ctx) - _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN()) + _, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, math.NaN()) require.NoError(t, err) } @@ -487,12 +490,12 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { ctx := context.Background() app := db.Appender(ctx) - _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000001)) + _, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000001)) require.NoError(t, err) require.NoError(t, app.Commit()) app = db.Appender(ctx) - _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000002)) + _, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, math.Float64frombits(0x7ff0000000000002)) require.Equal(t, storage.ErrDuplicateSampleForTimestamp, err) } @@ -504,7 +507,7 @@ func TestEmptyLabelsetCausesError(t *testing.T) { ctx := context.Background() app := db.Appender(ctx) - _, err := app.Add(labels.Labels{}, 0, 0) + _, err := app.Append(0, labels.Labels{}, 0, 0) require.Error(t, err) require.Equal(t, "empty labelset: invalid sample", err.Error()) } @@ -518,9 +521,9 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { // Append AmendedValue. ctx := context.Background() app := db.Appender(ctx) - _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1) + _, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, 1) require.NoError(t, err) - _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 2) + _, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 0, 2) require.NoError(t, err) require.NoError(t, app.Commit()) @@ -536,9 +539,9 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { // Append Out of Order Value. app = db.Appender(ctx) - _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 10, 3) + _, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 10, 3) require.NoError(t, err) - _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 7, 5) + _, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 7, 5) require.NoError(t, err) require.NoError(t, app.Commit()) @@ -560,7 +563,7 @@ func TestDB_Snapshot(t *testing.T) { app := db.Appender(ctx) mint := int64(1414141414000) for i := 0; i < 1000; i++ { - _, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0) + _, err := app.Append(0, labels.FromStrings("foo", "bar"), mint+int64(i), 1.0) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -610,7 +613,7 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) { app := db.Appender(ctx) mint := int64(1414141414000) for i := 0; i < 1000; i++ { - _, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0) + _, err := app.Append(0, labels.FromStrings("foo", "bar"), mint+int64(i), 1.0) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -665,7 +668,7 @@ func TestDB_SnapshotWithDelete(t *testing.T) { smpls := make([]float64, numSamples) for i := int64(0); i < numSamples; i++ { smpls[i] = rand.Float64() - app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i]) + app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i]) } require.NoError(t, app.Commit()) @@ -818,7 +821,7 @@ func TestDB_e2e(t *testing.T) { series = append(series, sample{ts, v}) - _, err := app.Add(lset, ts, v) + _, err := app.Append(0, lset, ts, v) require.NoError(t, err) ts += rand.Int63n(timeInterval) + 1 @@ -912,7 +915,7 @@ func TestWALFlushedOnDBClose(t *testing.T) { ctx := context.Background() app := db.Appender(ctx) - _, err := app.Add(lbls, 0, 1) + _, err := app.Append(0, lbls, 0, 1) require.NoError(t, err) require.NoError(t, app.Commit()) @@ -986,10 +989,10 @@ func TestWALSegmentSizeOptions(t *testing.T) { for i := int64(0); i < 155; i++ { app := db.Appender(context.Background()) - ref, err := app.Add(labels.Labels{labels.Label{Name: "wal" + fmt.Sprintf("%d", i), Value: "size"}}, i, rand.Float64()) + ref, err := app.Append(0, labels.Labels{labels.Label{Name: "wal" + fmt.Sprintf("%d", i), Value: "size"}}, i, rand.Float64()) require.NoError(t, err) for j := int64(1); j <= 78; j++ { - err := app.AddFast(ref, i+j, rand.Float64()) + _, err := app.Append(ref, nil, i+j, rand.Float64()) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -1013,7 +1016,7 @@ func TestTombstoneClean(t *testing.T) { smpls := make([]float64, numSamples) for i := int64(0); i < numSamples; i++ { smpls[i] = rand.Float64() - app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i]) + app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i]) } require.NoError(t, app.Commit()) @@ -1356,7 +1359,7 @@ func TestSizeRetention(t *testing.T) { it := s.Iterator() for it.Next() { tim, v := it.At() - _, err := headApp.Add(s.Labels(), tim, v) + _, err := headApp.Append(0, s.Labels(), tim, v) require.NoError(t, err) } require.NoError(t, it.Err()) @@ -1472,7 +1475,7 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { ctx := context.Background() app := db.Appender(ctx) for _, lbls := range labelpairs { - _, err := app.Add(lbls, 0, 1) + _, err := app.Append(0, lbls, 0, 1) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -1657,9 +1660,9 @@ func TestChunkAtBlockBoundary(t *testing.T) { label := labels.FromStrings("foo", "bar") for i := int64(0); i < 3; i++ { - _, err := app.Add(label, i*blockRange, 0) + _, err := app.Append(0, label, i*blockRange, 0) require.NoError(t, err) - _, err = app.Add(label, i*blockRange+1000, 0) + _, err = app.Append(0, label, i*blockRange+1000, 0) require.NoError(t, err) } @@ -1714,9 +1717,9 @@ func TestQuerierWithBoundaryChunks(t *testing.T) { label := labels.FromStrings("foo", "bar") for i := int64(0); i < 5; i++ { - _, err := app.Add(label, i*blockRange, 0) + _, err := app.Append(0, label, i*blockRange, 0) require.NoError(t, err) - _, err = app.Add(labels.FromStrings("blockID", strconv.FormatInt(i, 10)), i*blockRange, 0) + _, err = app.Append(0, labels.FromStrings("blockID", strconv.FormatInt(i, 10)), i*blockRange, 0) require.NoError(t, err) } @@ -1763,7 +1766,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { // First added sample initializes the writable range. ctx := context.Background() app := db.Appender(ctx) - _, err = app.Add(labels.FromStrings("a", "b"), 1000, 1) + _, err = app.Append(0, labels.FromStrings("a", "b"), 1000, 1) require.NoError(t, err) require.Equal(t, int64(1000), db.head.MinTime()) @@ -1880,11 +1883,11 @@ func TestNoEmptyBlocks(t *testing.T) { t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) { app := db.Appender(ctx) - _, err := app.Add(defaultLabel, 1, 0) + _, err := app.Append(0, defaultLabel, 1, 0) require.NoError(t, err) - _, err = app.Add(defaultLabel, 2, 0) + _, err = app.Append(0, defaultLabel, 2, 0) require.NoError(t, err) - _, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0) + _, err = app.Append(0, defaultLabel, 3+rangeToTriggerCompaction, 0) require.NoError(t, err) require.NoError(t, app.Commit()) require.NoError(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) @@ -1897,16 +1900,16 @@ func TestNoEmptyBlocks(t *testing.T) { require.Equal(t, 0, len(actBlocks)) app = db.Appender(ctx) - _, err = app.Add(defaultLabel, 1, 0) + _, err = app.Append(0, defaultLabel, 1, 0) require.Equal(t, storage.ErrOutOfBounds, err, "the head should be truncated so no samples in the past should be allowed") // Adding new blocks. currentTime := db.Head().MaxTime() - _, err = app.Add(defaultLabel, currentTime, 0) + _, err = app.Append(0, defaultLabel, currentTime, 0) require.NoError(t, err) - _, err = app.Add(defaultLabel, currentTime+1, 0) + _, err = app.Append(0, defaultLabel, currentTime+1, 0) require.NoError(t, err) - _, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0) + _, err = app.Append(0, defaultLabel, currentTime+rangeToTriggerCompaction, 0) require.NoError(t, err) require.NoError(t, app.Commit()) @@ -1923,11 +1926,11 @@ func TestNoEmptyBlocks(t *testing.T) { oldBlocks := db.Blocks() app := db.Appender(ctx) currentTime := db.Head().MaxTime() - _, err := app.Add(defaultLabel, currentTime, 0) + _, err := app.Append(0, defaultLabel, currentTime, 0) require.NoError(t, err) - _, err = app.Add(defaultLabel, currentTime+1, 0) + _, err = app.Append(0, defaultLabel, currentTime+1, 0) require.NoError(t, err) - _, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0) + _, err = app.Append(0, defaultLabel, currentTime+rangeToTriggerCompaction, 0) require.NoError(t, err) require.NoError(t, app.Commit()) require.NoError(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) @@ -2009,7 +2012,7 @@ func TestDB_LabelNames(t *testing.T) { for i := mint; i <= maxt; i++ { for _, tuple := range sampleLabels { label := labels.FromStrings(tuple[0], tuple[1]) - _, err := app.Add(label, i*blockRange, 0) + _, err := app.Append(0, label, i*blockRange, 0) require.NoError(t, err) } } @@ -2076,7 +2079,7 @@ func TestCorrectNumTombstones(t *testing.T) { app := db.Appender(ctx) for i := int64(0); i < 3; i++ { for j := int64(0); j < 15; j++ { - _, err := app.Add(defaultLabel, i*blockRange+j, 0) + _, err := app.Append(0, defaultLabel, i*blockRange+j, 0) require.NoError(t, err) } } @@ -2129,16 +2132,16 @@ func TestBlockRanges(t *testing.T) { }() app := db.Appender(ctx) lbl := labels.Labels{{Name: "a", Value: "b"}} - _, err = app.Add(lbl, firstBlockMaxT-1, rand.Float64()) + _, err = app.Append(0, lbl, firstBlockMaxT-1, rand.Float64()) if err == nil { t.Fatalf("appending a sample with a timestamp covered by a previous block shouldn't be possible") } - _, err = app.Add(lbl, firstBlockMaxT+1, rand.Float64()) + _, err = app.Append(0, lbl, firstBlockMaxT+1, rand.Float64()) require.NoError(t, err) - _, err = app.Add(lbl, firstBlockMaxT+2, rand.Float64()) + _, err = app.Append(0, lbl, firstBlockMaxT+2, rand.Float64()) require.NoError(t, err) secondBlockMaxt := firstBlockMaxT + rangeToTriggerCompaction - _, err = app.Add(lbl, secondBlockMaxt, rand.Float64()) // Add samples to trigger a new compaction + _, err = app.Append(0, lbl, secondBlockMaxt, rand.Float64()) // Add samples to trigger a new compaction require.NoError(t, err) require.NoError(t, app.Commit()) @@ -2158,13 +2161,13 @@ func TestBlockRanges(t *testing.T) { // and compaction doesn't create an overlapping block. app = db.Appender(ctx) db.DisableCompactions() - _, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64()) + _, err = app.Append(0, lbl, secondBlockMaxt+1, rand.Float64()) require.NoError(t, err) - _, err = app.Add(lbl, secondBlockMaxt+2, rand.Float64()) + _, err = app.Append(0, lbl, secondBlockMaxt+2, rand.Float64()) require.NoError(t, err) - _, err = app.Add(lbl, secondBlockMaxt+3, rand.Float64()) + _, err = app.Append(0, lbl, secondBlockMaxt+3, rand.Float64()) require.NoError(t, err) - _, err = app.Add(lbl, secondBlockMaxt+4, rand.Float64()) + _, err = app.Append(0, lbl, secondBlockMaxt+4, rand.Float64()) require.NoError(t, err) require.NoError(t, app.Commit()) require.NoError(t, db.Close()) @@ -2180,7 +2183,7 @@ func TestBlockRanges(t *testing.T) { require.Equal(t, db.Blocks()[2].Meta().MaxTime, thirdBlockMaxt, "unexpected maxt of the last block") app = db.Appender(ctx) - _, err = app.Add(lbl, thirdBlockMaxt+rangeToTriggerCompaction, rand.Float64()) // Trigger a compaction + _, err = app.Append(0, lbl, thirdBlockMaxt+rangeToTriggerCompaction, rand.Float64()) // Trigger a compaction require.NoError(t, err) require.NoError(t, app.Commit()) for x := 0; x < 100; x++ { @@ -2247,7 +2250,7 @@ func TestDBReadOnly(t *testing.T) { dbSizeBeforeAppend, err := fileutil.DirSize(dbWritable.Dir()) require.NoError(t, err) app := dbWritable.Appender(context.Background()) - _, err = app.Add(labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0) + _, err = app.Append(0, labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0) require.NoError(t, err) require.NoError(t, app.Commit()) @@ -2348,7 +2351,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { app := db.Appender(ctx) maxt = 1000 for i := 0; i < maxt; i++ { - _, err := app.Add(labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0) + _, err := app.Append(0, labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -2418,7 +2421,7 @@ func TestDBCannotSeePartialCommits(t *testing.T) { app := db.Appender(ctx) for j := 0; j < 100; j++ { - _, err := app.Add(labels.FromStrings("foo", "bar", "a", strconv.Itoa(j)), int64(iter), float64(iter)) + _, err := app.Append(0, labels.FromStrings("foo", "bar", "a", strconv.Itoa(j)), int64(iter), float64(iter)) require.NoError(t, err) } err = app.Commit() @@ -2483,7 +2486,7 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) { ctx := context.Background() app := db.Appender(ctx) - _, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0) + _, err = app.Append(0, labels.FromStrings("foo", "bar"), 0, 0) require.NoError(t, err) querierAfterAddButBeforeCommit, err := db.Querier(context.Background(), 0, 1000000) @@ -2808,7 +2811,7 @@ func TestCompactHead(t *testing.T) { maxt := 100 for i := 0; i < maxt; i++ { val := rand.Float64() - _, err := app.Add(labels.FromStrings("a", "b"), int64(i), val) + _, err := app.Append(0, labels.FromStrings("a", "b"), int64(i), val) require.NoError(t, err) expSamples = append(expSamples, sample{int64(i), val}) } @@ -3008,9 +3011,9 @@ func TestOneCheckpointPerCompactCall(t *testing.T) { // Append samples spanning 59 block ranges. app := db.Appender(context.Background()) for i := int64(0); i < 60; i++ { - _, err := app.Add(lbls, blockRange*i, rand.Float64()) + _, err := app.Append(0, lbls, blockRange*i, rand.Float64()) require.NoError(t, err) - _, err = app.Add(lbls, (blockRange*i)+blockRange/2, rand.Float64()) + _, err = app.Append(0, lbls, (blockRange*i)+blockRange/2, rand.Float64()) require.NoError(t, err) // Rotate the WAL file so that there is >3 files for checkpoint to happen. require.NoError(t, db.head.wal.NextSegment()) @@ -3067,7 +3070,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) { // Adding sample way into the future. app = db.Appender(context.Background()) - _, err = app.Add(lbls, blockRange*120, rand.Float64()) + _, err = app.Append(0, lbls, blockRange*120, rand.Float64()) require.NoError(t, err) require.NoError(t, app.Commit()) diff --git a/tsdb/head.go b/tsdb/head.go index db40eb89e..31e644da2 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1052,21 +1052,14 @@ type initAppender struct { head *Head } -func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { +func (a *initAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) { if a.app != nil { - return a.app.Add(lset, t, v) + return a.app.Append(ref, lset, t, v) } + a.head.initTime(t) a.app = a.head.appender() - - return a.app.Add(lset, t, v) -} - -func (a *initAppender) AddFast(ref uint64, t int64, v float64) error { - if a.app == nil { - return storage.ErrNotFound - } - return a.app.AddFast(ref, t, v) + return a.app.Append(ref, lset, t, v) } func (a *initAppender) Commit() error { @@ -1178,54 +1171,45 @@ type headAppender struct { closed bool } -func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { +func (a *headAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) { if t < a.minValidTime { a.head.metrics.outOfBoundSamples.Inc() return 0, storage.ErrOutOfBounds } - // Ensure no empty labels have gotten through. - lset = lset.WithoutEmpty() - - if len(lset) == 0 { - return 0, errors.Wrap(ErrInvalidSample, "empty labelset") - } - - if l, dup := lset.HasDuplicateLabelNames(); dup { - return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l)) - } - - s, created, err := a.head.getOrCreate(lset.Hash(), lset) - if err != nil { - return 0, err - } + s := a.head.series.getByID(ref) + if s == nil { + // Ensure no empty labels have gotten through. + lset = lset.WithoutEmpty() + if len(lset) == 0 { + return 0, errors.Wrap(ErrInvalidSample, "empty labelset") + } - if created { - a.series = append(a.series, record.RefSeries{ - Ref: s.ref, - Labels: lset, - }) - } - return s.ref, a.AddFast(s.ref, t, v) -} + if l, dup := lset.HasDuplicateLabelNames(); dup { + return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l)) + } -func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { - if t < a.minValidTime { - a.head.metrics.outOfBoundSamples.Inc() - return storage.ErrOutOfBounds + var created bool + var err error + s, created, err = a.head.getOrCreate(lset.Hash(), lset) + if err != nil { + return 0, err + } + if created { + a.series = append(a.series, record.RefSeries{ + Ref: s.ref, + Labels: lset, + }) + } } - s := a.head.series.getByID(ref) - if s == nil { - return errors.Wrap(storage.ErrNotFound, "unknown series") - } s.Lock() if err := s.appendable(t, v); err != nil { s.Unlock() if err == storage.ErrOutOfOrderSample { a.head.metrics.outOfOrderSamples.Inc() } - return err + return 0, err } s.pendingCommit = true s.Unlock() @@ -1238,12 +1222,12 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { } a.samples = append(a.samples, record.RefSample{ - Ref: ref, + Ref: s.ref, T: t, V: v, }) a.sampleSeries = append(a.sampleSeries, s) - return nil + return s.ref, nil } func (a *headAppender) log() error { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index f534adc59..d47a48790 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -275,14 +275,14 @@ func TestHead_WALMultiRef(t *testing.T) { require.NoError(t, head.Init(0)) app := head.Appender(context.Background()) - ref1, err := app.Add(labels.FromStrings("foo", "bar"), 100, 1) + ref1, err := app.Append(0, labels.FromStrings("foo", "bar"), 100, 1) require.NoError(t, err) require.NoError(t, app.Commit()) require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) // Add another sample outside chunk range to mmap a chunk. app = head.Appender(context.Background()) - _, err = app.Add(labels.FromStrings("foo", "bar"), 1500, 2) + _, err = app.Append(0, labels.FromStrings("foo", "bar"), 1500, 2) require.NoError(t, err) require.NoError(t, app.Commit()) require.Equal(t, 2.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) @@ -290,14 +290,14 @@ func TestHead_WALMultiRef(t *testing.T) { require.NoError(t, head.Truncate(1600)) app = head.Appender(context.Background()) - ref2, err := app.Add(labels.FromStrings("foo", "bar"), 1700, 3) + ref2, err := app.Append(0, labels.FromStrings("foo", "bar"), 1700, 3) require.NoError(t, err) require.NoError(t, app.Commit()) require.Equal(t, 3.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) // Add another sample outside chunk range to mmap a chunk. app = head.Appender(context.Background()) - _, err = app.Add(labels.FromStrings("foo", "bar"), 2000, 4) + _, err = app.Append(0, labels.FromStrings("foo", "bar"), 2000, 4) require.NoError(t, err) require.NoError(t, app.Commit()) require.Equal(t, 4.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) @@ -569,7 +569,7 @@ func TestHeadDeleteSimple(t *testing.T) { app := head.Appender(context.Background()) for _, smpl := range smplsAll { - _, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) + _, err := app.Append(0, labels.Labels{lblDefault}, smpl.t, smpl.v) require.NoError(t, err) } @@ -583,7 +583,7 @@ func TestHeadDeleteSimple(t *testing.T) { // Add more samples. app = head.Appender(context.Background()) for _, smpl := range c.addSamples { - _, err := app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) + _, err := app.Append(0, labels.Labels{lblDefault}, smpl.t, smpl.v) require.NoError(t, err) } @@ -655,7 +655,7 @@ func TestDeleteUntilCurMax(t *testing.T) { smpls := make([]float64, numSamples) for i := int64(0); i < numSamples; i++ { smpls[i] = rand.Float64() - _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i]) + _, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i]) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -676,7 +676,7 @@ func TestDeleteUntilCurMax(t *testing.T) { // Add again and test for presence. app = hb.Appender(context.Background()) - _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 11, 1) + _, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 11, 1) require.NoError(t, err) require.NoError(t, app.Commit()) q, err = NewBlockQuerier(hb, 0, 100000) @@ -702,7 +702,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { for i := 0; i < numSamples; i++ { app := hb.Appender(context.Background()) - _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, int64(i), 0) + _, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, int64(i), 0) require.NoError(t, err) require.NoError(t, app.Commit()) } @@ -800,7 +800,7 @@ func TestDelete_e2e(t *testing.T) { ts := rand.Int63n(300) for i := 0; i < numDatapoints; i++ { v := rand.Float64() - _, err := app.Add(ls, ts, v) + _, err := app.Append(0, ls, ts, v) require.NoError(t, err) series = append(series, sample{ts, v}) ts += rand.Int63n(timeInterval) + 1 @@ -1145,7 +1145,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { app := h.appender() lset := labels.FromStrings("a", "1") - _, err := app.Add(lset, 2100, 1) + _, err := app.Append(0, lset, 2100, 1) require.NoError(t, err) require.NoError(t, h.Truncate(2000)) @@ -1175,7 +1175,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { app := h.appender() lset := labels.FromStrings("a", "1") - _, err := app.Add(lset, 2100, 1) + _, err := app.Append(0, lset, 2100, 1) require.NoError(t, err) require.NoError(t, h.Truncate(2000)) @@ -1205,7 +1205,7 @@ func TestHead_LogRollback(t *testing.T) { }() app := h.Appender(context.Background()) - _, err := app.Add(labels.FromStrings("a", "b"), 1, 2) + _, err := app.Append(0, labels.FromStrings("a", "b"), 1, 2) require.NoError(t, err) require.NoError(t, app.Rollback()) @@ -1397,7 +1397,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) { }() add := func(ts int64) { app := h.Appender(context.Background()) - _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, ts, 0) + _, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, ts, 0) require.NoError(t, err) require.NoError(t, app.Commit()) } @@ -1428,7 +1428,7 @@ func TestAddDuplicateLabelName(t *testing.T) { add := func(labels labels.Labels, labelName string) { app := h.Appender(context.Background()) - _, err := app.Add(labels, 0, 0) + _, err := app.Append(0, labels, 0, 0) require.Error(t, err) require.Equal(t, fmt.Sprintf(`label name "%s" is not unique: invalid sample`, labelName), err.Error()) } @@ -1489,7 +1489,7 @@ func TestMemSeriesIsolation(t *testing.T) { app = a } - _, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + _, err := app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i)) require.NoError(t, err) require.NoError(t, app.Commit()) } @@ -1517,7 +1517,7 @@ func TestMemSeriesIsolation(t *testing.T) { // Cleanup appendIDs below 500. app := hb.appender() app.cleanupAppendIDsBelow = 500 - _, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + _, err := app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i)) require.NoError(t, err) require.NoError(t, app.Commit()) i++ @@ -1536,7 +1536,7 @@ func TestMemSeriesIsolation(t *testing.T) { // the only thing with appendIDs. app = hb.appender() app.cleanupAppendIDsBelow = 1000 - _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + _, err = app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i)) require.NoError(t, err) require.NoError(t, app.Commit()) require.Equal(t, 999, lastValue(hb, 998)) @@ -1550,7 +1550,7 @@ func TestMemSeriesIsolation(t *testing.T) { // Cleanup appendIDs below 1001, but with a rollback. app = hb.appender() app.cleanupAppendIDsBelow = 1001 - _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + _, err = app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i)) require.NoError(t, err) require.NoError(t, app.Rollback()) require.Equal(t, 1000, lastValue(hb, 999)) @@ -1586,7 +1586,7 @@ func TestMemSeriesIsolation(t *testing.T) { // Cleanup appendIDs below 1000, which means the sample buffer is // the only thing with appendIDs. app = hb.appender() - _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + _, err = app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i)) i++ require.NoError(t, err) require.NoError(t, app.Commit()) @@ -1599,7 +1599,7 @@ func TestMemSeriesIsolation(t *testing.T) { // Cleanup appendIDs below 1002, but with a rollback. app = hb.appender() - _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + _, err = app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i)) require.NoError(t, err) require.NoError(t, app.Rollback()) require.Equal(t, 1001, lastValue(hb, 999)) @@ -1617,21 +1617,21 @@ func TestIsolationRollback(t *testing.T) { }() app := hb.Appender(context.Background()) - _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0) + _, err := app.Append(0, labels.FromStrings("foo", "bar"), 0, 0) require.NoError(t, err) require.NoError(t, app.Commit()) require.Equal(t, uint64(1), hb.iso.lowWatermark()) app = hb.Appender(context.Background()) - _, err = app.Add(labels.FromStrings("foo", "bar"), 1, 1) + _, err = app.Append(0, labels.FromStrings("foo", "bar"), 1, 1) require.NoError(t, err) - _, err = app.Add(labels.FromStrings("foo", "bar", "foo", "baz"), 2, 2) + _, err = app.Append(0, labels.FromStrings("foo", "bar", "foo", "baz"), 2, 2) require.Error(t, err) require.NoError(t, app.Rollback()) require.Equal(t, uint64(2), hb.iso.lowWatermark()) app = hb.Appender(context.Background()) - _, err = app.Add(labels.FromStrings("foo", "bar"), 3, 3) + _, err = app.Append(0, labels.FromStrings("foo", "bar"), 3, 3) require.NoError(t, err) require.NoError(t, app.Commit()) require.Equal(t, uint64(3), hb.iso.lowWatermark(), "Low watermark should proceed to 3 even if append #2 was rolled back.") @@ -1644,18 +1644,18 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) { }() app1 := hb.Appender(context.Background()) - _, err := app1.Add(labels.FromStrings("foo", "bar"), 0, 0) + _, err := app1.Append(0, labels.FromStrings("foo", "bar"), 0, 0) require.NoError(t, err) require.NoError(t, app1.Commit()) require.Equal(t, uint64(1), hb.iso.lowWatermark(), "Low watermark should by 1 after 1st append.") app1 = hb.Appender(context.Background()) - _, err = app1.Add(labels.FromStrings("foo", "bar"), 1, 1) + _, err = app1.Append(0, labels.FromStrings("foo", "bar"), 1, 1) require.NoError(t, err) require.Equal(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should be two, even if append is not committed yet.") app2 := hb.Appender(context.Background()) - _, err = app2.Add(labels.FromStrings("foo", "baz"), 1, 1) + _, err = app2.Append(0, labels.FromStrings("foo", "baz"), 1, 1) require.NoError(t, err) require.NoError(t, app2.Commit()) require.Equal(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should stay two because app1 is not committed yet.") @@ -1701,7 +1701,7 @@ func TestIsolationWithoutAdd(t *testing.T) { require.NoError(t, app.Commit()) app = hb.Appender(context.Background()) - _, err := app.Add(labels.FromStrings("foo", "baz"), 1, 1) + _, err := app.Append(0, labels.FromStrings("foo", "baz"), 1, 1) require.NoError(t, err) require.NoError(t, app.Commit()) @@ -1725,7 +1725,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { ctx := context.Background() app := db.Appender(ctx) for i := 1; i <= 5; i++ { - _, err = app.Add(labels.FromStrings("a", "b"), int64(i), 99) + _, err = app.Append(0, labels.FromStrings("a", "b"), int64(i), 99) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -1733,22 +1733,22 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { // Test out of order metric. require.Equal(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) app = db.Appender(ctx) - _, err = app.Add(labels.FromStrings("a", "b"), 2, 99) + _, err = app.Append(0, labels.FromStrings("a", "b"), 2, 99) require.Equal(t, storage.ErrOutOfOrderSample, err) require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) - _, err = app.Add(labels.FromStrings("a", "b"), 3, 99) + _, err = app.Append(0, labels.FromStrings("a", "b"), 3, 99) require.Equal(t, storage.ErrOutOfOrderSample, err) require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) - _, err = app.Add(labels.FromStrings("a", "b"), 4, 99) + _, err = app.Append(0, labels.FromStrings("a", "b"), 4, 99) require.Equal(t, storage.ErrOutOfOrderSample, err) require.Equal(t, 3.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) require.NoError(t, app.Commit()) // Compact Head to test out of bound metric. app = db.Appender(ctx) - _, err = app.Add(labels.FromStrings("a", "b"), DefaultBlockDuration*2, 99) + _, err = app.Append(0, labels.FromStrings("a", "b"), DefaultBlockDuration*2, 99) require.NoError(t, err) require.NoError(t, app.Commit()) @@ -1757,11 +1757,11 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { require.Greater(t, db.head.minValidTime.Load(), int64(0)) app = db.Appender(ctx) - _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()-2, 99) + _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()-2, 99) require.Equal(t, storage.ErrOutOfBounds, err) require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples)) - _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()-1, 99) + _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()-1, 99) require.Equal(t, storage.ErrOutOfBounds, err) require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples)) require.NoError(t, app.Commit()) @@ -1769,22 +1769,22 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { // Some more valid samples for out of order. app = db.Appender(ctx) for i := 1; i <= 5; i++ { - _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+int64(i), 99) + _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+int64(i), 99) require.NoError(t, err) } require.NoError(t, app.Commit()) // Test out of order metric. app = db.Appender(ctx) - _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+2, 99) + _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+2, 99) require.Equal(t, storage.ErrOutOfOrderSample, err) require.Equal(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) - _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+3, 99) + _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+3, 99) require.Equal(t, storage.ErrOutOfOrderSample, err) require.Equal(t, 5.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) - _, err = app.Add(labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+4, 99) + _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+4, 99) require.Equal(t, storage.ErrOutOfOrderSample, err) require.Equal(t, 6.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) require.NoError(t, app.Commit()) @@ -1798,10 +1798,10 @@ func testHeadSeriesChunkRace(t *testing.T) { require.NoError(t, h.Init(0)) app := h.Appender(context.Background()) - s2, err := app.Add(labels.FromStrings("foo2", "bar"), 5, 0) + s2, err := app.Append(0, labels.FromStrings("foo2", "bar"), 5, 0) require.NoError(t, err) for ts := int64(6); ts < 11; ts++ { - err = app.AddFast(s2, ts, 0) + _, err = app.Append(s2, nil, ts, 0) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -1847,7 +1847,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { app := head.Appender(context.Background()) for i, name := range expectedLabelNames { - _, err := app.Add(labels.Labels{{Name: name, Value: expectedLabelValues[i]}}, seriesTimestamps[i], 0) + _, err := app.Append(0, labels.Labels{{Name: name, Value: expectedLabelValues[i]}}, seriesTimestamps[i], 0) require.NoError(t, err) } require.NoError(t, app.Commit()) @@ -1892,7 +1892,7 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) { app := head.Appender(context.Background()) for i := 0; i < 100; i++ { - _, err := app.Add(labels.Labels{ + _, err := app.Append(0, labels.Labels{ {Name: "unique", Value: fmt.Sprintf("value%d", i)}, {Name: "tens", Value: fmt.Sprintf("value%d", i/10)}, }, 100, 0) @@ -1952,28 +1952,28 @@ func TestErrReuseAppender(t *testing.T) { }() app := head.Appender(context.Background()) - _, err := app.Add(labels.Labels{{Name: "test", Value: "test"}}, 0, 0) + _, err := app.Append(0, labels.Labels{{Name: "test", Value: "test"}}, 0, 0) require.NoError(t, err) require.NoError(t, app.Commit()) require.Error(t, app.Commit()) require.Error(t, app.Rollback()) app = head.Appender(context.Background()) - _, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 1, 0) + _, err = app.Append(0, labels.Labels{{Name: "test", Value: "test"}}, 1, 0) require.NoError(t, err) require.NoError(t, app.Rollback()) require.Error(t, app.Rollback()) require.Error(t, app.Commit()) app = head.Appender(context.Background()) - _, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 2, 0) + _, err = app.Append(0, labels.Labels{{Name: "test", Value: "test"}}, 2, 0) require.NoError(t, err) require.NoError(t, app.Commit()) require.Error(t, app.Rollback()) require.Error(t, app.Commit()) app = head.Appender(context.Background()) - _, err = app.Add(labels.Labels{{Name: "test", Value: "test"}}, 3, 0) + _, err = app.Append(0, labels.Labels{{Name: "test", Value: "test"}}, 3, 0) require.NoError(t, err) require.NoError(t, app.Rollback()) require.Error(t, app.Commit()) @@ -1985,11 +1985,11 @@ func TestHeadMintAfterTruncation(t *testing.T) { head, _ := newTestHead(t, chunkRange, false) app := head.Appender(context.Background()) - _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 100, 100) + _, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 100, 100) require.NoError(t, err) - _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 4000, 200) + _, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 4000, 200) require.NoError(t, err) - _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 8000, 300) + _, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 8000, 300) require.NoError(t, err) require.NoError(t, app.Commit()) @@ -2023,7 +2023,7 @@ func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) { metricCount := 1000000 for i := 0; i < metricCount; i++ { - _, err := app.Add(labels.Labels{ + _, err := app.Append(0, labels.Labels{ {Name: "unique", Value: fmt.Sprintf("value%d", i)}, {Name: "tens", Value: fmt.Sprintf("value%d", i/(metricCount/10))}, {Name: "ninety", Value: fmt.Sprintf("value%d", i/(metricCount/10)/9)}, // "0" for the first 90%, then "1" diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index 6002c6c7a..a04b1d3e4 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -48,7 +48,7 @@ func BenchmarkPostingsForMatchers(b *testing.B) { app := h.Appender(context.Background()) addSeries := func(l labels.Labels) { - app.Add(l, 0, 0) + app.Append(0, l, 0, 0) } for n := 0; n < 10; n++ { @@ -158,7 +158,7 @@ func BenchmarkQuerierSelect(b *testing.B) { app := h.Appender(context.Background()) numSeries := 1000000 for i := 0; i < numSeries; i++ { - app.Add(labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)), int64(i), 0) + app.Append(0, labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%d%s", i, postingsBenchSuffix)), int64(i), 0) } require.NoError(b, app.Commit()) diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index f5697201b..67346694d 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -417,7 +417,7 @@ func TestBlockQuerier_AgainstHeadWithOpenChunks(t *testing.T) { for _, s := range testData { for _, chk := range s.chunks { for _, sample := range chk { - _, err = app.Add(labels.FromMap(s.lset), sample.t, sample.v) + _, err = app.Append(0, labels.FromMap(s.lset), sample.t, sample.v) require.NoError(t, err) } } @@ -1579,11 +1579,11 @@ func TestPostingsForMatchers(t *testing.T) { }() app := h.Appender(context.Background()) - app.Add(labels.FromStrings("n", "1"), 0, 0) - app.Add(labels.FromStrings("n", "1", "i", "a"), 0, 0) - app.Add(labels.FromStrings("n", "1", "i", "b"), 0, 0) - app.Add(labels.FromStrings("n", "2"), 0, 0) - app.Add(labels.FromStrings("n", "2.5"), 0, 0) + app.Append(0, labels.FromStrings("n", "1"), 0, 0) + app.Append(0, labels.FromStrings("n", "1", "i", "a"), 0, 0) + app.Append(0, labels.FromStrings("n", "1", "i", "b"), 0, 0) + app.Append(0, labels.FromStrings("n", "2"), 0, 0) + app.Append(0, labels.FromStrings("n", "2.5"), 0, 0) require.NoError(t, app.Commit()) cases := []struct { diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go index be2c63f9f..8cc0dd195 100644 --- a/tsdb/tsdbblockutil.go +++ b/tsdb/tsdbblockutil.go @@ -50,14 +50,10 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger l for _, s := range series { ref := uint64(0) it := s.Iterator() + lset := s.Labels() for it.Next() { t, v := it.At() - if ref != 0 { - if err := app.AddFast(ref, t, v); err == nil { - continue - } - } - ref, err = app.Add(s.Labels(), t, v) + ref, err = app.Append(ref, lset, t, v) if err != nil { return "", err }