diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 5df30c2c5..b89d9c23a 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -20,7 +20,6 @@ import ( _ "net/http/pprof" // Comment this line to disable pprof endpoint. "os" "os/signal" - "runtime/trace" "syscall" "time" @@ -61,18 +60,6 @@ func init() { // Main manages the stup and shutdown lifecycle of the entire Prometheus server. func Main() int { - go func() { - f, err := os.Create("trace") - if err != nil { - panic(err) - } - if err := trace.Start(f); err != nil { - panic(err) - } - time.Sleep(30 * time.Second) - trace.Stop() - f.Close() - }() if err := parse(os.Args[1:]); err != nil { log.Error(err) return 2 @@ -91,7 +78,11 @@ func Main() int { reloadables []Reloadable ) - localStorage, err := tsdb.Open(cfg.localStoragePath) + localStorage, err := tsdb.Open(cfg.localStoragePath, &tsdb.Options{ + MinBlockDuration: 2 * 60 * 60 * 1000, + MaxBlockDuration: 24 * 60 * 60 * 1000, + AppendableBlocks: 2, + }) if err != nil { log.Errorf("Opening storage failed: %s", err) return 1 diff --git a/promql/test.go b/promql/test.go index 72f0f272d..12b0e8516 100644 --- a/promql/test.go +++ b/promql/test.go @@ -285,11 +285,7 @@ func (cmd *loadCmd) append(a storage.Appender) error { m := cmd.metrics[h] for _, s := range smpls { - ref, err := a.SetSeries(m) - if err != nil { - return err - } - if err := a.Add(ref, s.T, s.V); err != nil { + if _, err := a.Add(m, s.T, s.V); err != nil { return err } } diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 81421d585..e9492acbb 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -514,6 +514,7 @@ func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err erro defTime = timestamp.FromTime(ts) ) +loop: for p.Next() { total++ @@ -526,32 +527,32 @@ func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err erro mets := string(met) ref, ok := sl.cache[mets] if ok { - if err = app.Add(ref, t, v); err == nil { - added++ + switch err = app.AddFast(ref, t, v); err { + case nil: + case storage.ErrNotFound: + ok = false + case errSeriesDropped: continue - } else if err != storage.ErrNotFound { - break + default: + break loop } - ok = false } if !ok { var lset labels.Labels p.Metric(&lset) - ref, err = app.SetSeries(lset) + ref, err = app.Add(lset, t, v) // TODO(fabxc): also add a dropped-cache? - if err == errSeriesDropped { + switch err { + case nil: + case errSeriesDropped: continue + default: + break loop } - if err != nil { - break - } - if err = app.Add(ref, t, v); err != nil { - break - } - added++ + sl.cache[mets] = ref } - sl.cache[mets] = ref + added++ } if err == nil { err = p.Err() @@ -601,20 +602,19 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v ref, ok := sl.cache[s] if ok { - if err := app.Add(ref, t, v); err != storage.ErrNotFound { + if err := app.AddFast(ref, t, v); err == nil { + return nil + } else if err != storage.ErrNotFound { return err } } met := labels.Labels{ labels.Label{Name: labels.MetricName, Value: s}, } - ref, err := app.SetSeries(met) + ref, err := app.Add(met, t, v) if err != nil { return err } - if err = app.Add(ref, t, v); err != nil { - return err - } sl.cache[s] = ref return nil diff --git a/retrieval/target.go b/retrieval/target.go index c32a12aab..4c206ad76 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -235,12 +235,24 @@ type limitAppender struct { i int } -func (app *limitAppender) Add(ref uint64, t int64, v float64) error { +func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { + if app.i+1 > app.limit { + return 0, errors.New("sample limit exceeded") + } + ref, err := app.Appender.Add(lset, t, v) + if err != nil { + return 0, fmt.Errorf("sample limit of %d exceeded", app.limit) + } + app.i++ + return ref, nil +} + +func (app *limitAppender) AddFast(ref uint64, t int64, v float64) error { if app.i+1 > app.limit { return errors.New("sample limit exceeded") } - if err := app.Appender.Add(ref, t, v); err != nil { + if err := app.Appender.AddFast(ref, t, v); err != nil { return fmt.Errorf("sample limit of %d exceeded", app.limit) } app.i++ @@ -254,7 +266,7 @@ type ruleLabelsAppender struct { labels labels.Labels } -func (app ruleLabelsAppender) SetSeries(lset labels.Labels) (uint64, error) { +func (app ruleLabelsAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { lb := labels.NewBuilder(lset) for _, l := range app.labels { @@ -265,7 +277,7 @@ func (app ruleLabelsAppender) SetSeries(lset labels.Labels) (uint64, error) { lb.Set(l.Name, l.Value) } - return app.Appender.SetSeries(lb.Labels()) + return app.Appender.Add(lb.Labels(), t, v) } type honorLabelsAppender struct { @@ -276,7 +288,7 @@ type honorLabelsAppender struct { // Merges the sample's metric with the given labels if the label is not // already present in the metric. // This also considers labels explicitly set to the empty string. -func (app honorLabelsAppender) SetSeries(lset labels.Labels) (uint64, error) { +func (app honorLabelsAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { lb := labels.NewBuilder(lset) for _, l := range app.labels { @@ -284,7 +296,7 @@ func (app honorLabelsAppender) SetSeries(lset labels.Labels) (uint64, error) { lb.Set(l.Name, l.Value) } } - return app.Appender.SetSeries(lb.Labels()) + return app.Appender.Add(lb.Labels(), t, v) } // Applies a set of relabel configurations to the sample's metric @@ -296,12 +308,12 @@ type relabelAppender struct { var errSeriesDropped = errors.New("series dropped") -func (app relabelAppender) SetSeries(lset labels.Labels) (uint64, error) { +func (app relabelAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { lset = relabel.Process(lset, app.relabelings...) if lset == nil { return 0, errSeriesDropped } - return app.Appender.SetSeries(lset) + return app.Appender.Add(lset, t, v) } // populateLabels builds a label set from the given label set and scrape configuration. diff --git a/rules/manager.go b/rules/manager.go index 5c3ebbfb2..b158764b8 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -282,21 +282,16 @@ func (g *Group) Eval() { } for _, s := range vector { - ref, err := app.SetSeries(s.Metric) - if err != nil { - log.With("sample", s).With("error", err).Warn("Setting metric failed") - continue - } - if err := app.Add(ref, s.T, s.V); err != nil { + if _, err := app.Add(s.Metric, s.T, s.V); err != nil { switch err { case storage.ErrOutOfOrderSample: numOutOfOrder++ - log.With("sample", s).With("error", err).Debug("Rule evaluation result discarded") + log.With("sample", s).With("err", err).Debug("Rule evaluation result discarded") case storage.ErrDuplicateSampleForTimestamp: numDuplicates++ - log.With("sample", s).With("error", err).Debug("Rule evaluation result discarded") + log.With("sample", s).With("err", err).Debug("Rule evaluation result discarded") default: - log.With("sample", s).With("error", err).Warn("Rule evaluation result discarded") + log.With("sample", s).With("err", err).Warn("Rule evaluation result discarded") } } } diff --git a/storage/interface.go b/storage/interface.go index 7c54231cc..b96e70dc3 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -52,10 +52,9 @@ type Querier interface { // Appender provides batched appends against a storage. type Appender interface { - SetSeries(labels.Labels) (uint64, error) + Add(l labels.Labels, t int64, v float64) (uint64, error) - // Add adds a sample pair for the referenced series. - Add(ref uint64, t int64, v float64) error + AddFast(ref uint64, t int64, v float64) error // Commit submits the collected samples and purges the batch. Commit() error diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index 01098c67c..b6552a7c6 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -1,6 +1,7 @@ package tsdb import ( + "time" "unsafe" "github.com/fabxc/tsdb" @@ -14,9 +15,34 @@ type adapter struct { db *tsdb.PartitionedDB } +// Options of the DB storage. +type Options struct { + // The interval at which the write ahead log is flushed to disc. + WALFlushInterval time.Duration + + // The timestamp range of head blocks after which they get persisted. + // It's the minimum duration of any persisted block. + MinBlockDuration uint64 + + // The maximum timestamp range of compacted blocks. + MaxBlockDuration uint64 + + // Number of head blocks that can be appended to. + // Should be two or higher to prevent write errors in general scenarios. + // + // After a new block is started for timestamp t0 or higher, appends with + // timestamps as early as t0 - (n-1) * MinBlockDuration are valid. + AppendableBlocks int +} + // Open returns a new storage backed by a tsdb database. -func Open(path string) (storage.Storage, error) { - db, err := tsdb.OpenPartitioned(path, 1, nil, nil) +func Open(path string, opts *Options) (storage.Storage, error) { + db, err := tsdb.OpenPartitioned(path, 1, nil, &tsdb.Options{ + WALFlushInterval: 10 * time.Second, + MinBlockDuration: opts.MinBlockDuration, + MaxBlockDuration: opts.MaxBlockDuration, + AppendableBlocks: opts.AppendableBlocks, + }) if err != nil { return nil, err } @@ -73,12 +99,22 @@ type appender struct { a tsdb.Appender } -func (a appender) SetSeries(lset labels.Labels) (uint64, error) { - return a.a.SetSeries(toTSDBLabels(lset)) +func (a appender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { + ref, err := a.a.Add(toTSDBLabels(lset), t, v) + + switch err { + case tsdb.ErrNotFound: + return 0, storage.ErrNotFound + case tsdb.ErrOutOfOrderSample: + return 0, storage.ErrOutOfOrderSample + case tsdb.ErrAmendSample: + return 0, storage.ErrDuplicateSampleForTimestamp + } + return ref, err } -func (a appender) Add(ref uint64, t int64, v float64) error { - err := a.a.Add(ref, t, v) +func (a appender) AddFast(ref uint64, t int64, v float64) error { + err := a.a.AddFast(ref, t, v) switch err { case tsdb.ErrNotFound: diff --git a/util/testutil/storage.go b/util/testutil/storage.go index 34181ca28..d37ac361e 100644 --- a/util/testutil/storage.go +++ b/util/testutil/storage.go @@ -19,7 +19,11 @@ func NewStorage(t T) storage.Storage { log.With("dir", dir).Debugln("opening test storage") - db, err := tsdb.Open(dir) + db, err := tsdb.Open(dir, &tsdb.Options{ + MinBlockDuration: 2 * 60 * 60 * 1000, + MaxBlockDuration: 24 * 60 * 60 * 1000, + AppendableBlocks: 10, + }) if err != nil { t.Fatalf("Opening test storage failed: %s", err) }