From cfba92a1333dce243d56f494c3d186da697e4ec3 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Tue, 11 Feb 2020 16:34:09 +0000 Subject: [PATCH] Addressed comments. Signed-off-by: Bartlomiej Plotka --- cmd/prometheus/main.go | 5 +-- storage/buffer.go | 2 +- storage/interface.go | 3 +- storage/noop.go | 35 ++------------------ tsdb/chunkenc/chunk.go | 4 +-- tsdb/chunkenc/chunk_test.go | 2 +- tsdb/cmd/tsdb/main.go | 5 ++- tsdb/db.go | 64 ++++++++++++++++++++----------------- tsdb/db_test.go | 11 +++---- util/teststorage/storage.go | 5 ++- web/api/v1/api.go | 11 ++----- web/web.go | 33 ++++++++++++++++++- 12 files changed, 90 insertions(+), 90 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 521201d87..058246fce 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -106,7 +106,7 @@ func main() { outageTolerance model.Duration resendDelay model.Duration web web.Options - tsdb tsdb.Options + tsdb web.TSDBOptions lookbackDelta model.Duration webTimeout model.Duration queryTimeout model.Duration @@ -656,6 +656,7 @@ func main() { } { // TSDB. + opts := cfg.tsdb.ToTSDBOptions() cancel := make(chan struct{}) g.Add( func() error { @@ -669,7 +670,7 @@ func main() { cfg.localStoragePath, log.With(logger, "component", "tsdb"), prometheus.DefaultRegisterer, - &cfg.tsdb, + &opts, ) if err != nil { return errors.Wrapf(err, "opening storage failed") diff --git a/storage/buffer.go b/storage/buffer.go index 874fdba95..318d16b24 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -33,7 +33,7 @@ type BufferedSeriesIterator struct { // of the current element and the duration of delta before, initialized with an // empty iterator. Use Reset() to set an actual iterator to be buffered. func NewBuffer(delta int64) *BufferedSeriesIterator { - return NewBufferIterator(&NoopSeriesIt, delta) + return NewBufferIterator(chunkenc.NewNopIterator(), delta) } // NewBufferIterator returns a new iterator that buffers the values within the diff --git a/storage/interface.go b/storage/interface.go index 32022c27d..4c8777129 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -33,7 +33,7 @@ var ( // Appendable allows creating appenders. type Appendable interface { - // Appender returns a new appender against the storage. + // Appender returns a new appender for the storage. Appender() Appender } @@ -120,6 +120,7 @@ type Appender interface { Commit() error // Rollback rolls back all modifications made in the appender so far. + // Appender has to be discarded after rollback. Rollback() error } diff --git a/storage/noop.go b/storage/noop.go index 797006dfb..a8be634fd 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -14,8 +14,6 @@ package storage import ( - "math" - "github.com/prometheus/prometheus/pkg/labels" ) @@ -53,35 +51,8 @@ func NoopSeriesSet() SeriesSet { return noopSeriesSet{} } -func (noopSeriesSet) Next() bool { - return false -} +func (noopSeriesSet) Next() bool { return false } -func (noopSeriesSet) At() Series { - return nil -} +func (noopSeriesSet) At() Series { return nil } -func (noopSeriesSet) Err() error { - return nil -} - -type noopSeriesIterator struct{} - -// NoopSeriesIt is a SeriesIterator that does nothing. -var NoopSeriesIt = noopSeriesIterator{} - -func (noopSeriesIterator) At() (int64, float64) { - return math.MinInt64, 0 -} - -func (noopSeriesIterator) Seek(t int64) bool { - return false -} - -func (noopSeriesIterator) Next() bool { - return false -} - -func (noopSeriesIterator) Err() error { - return nil -} +func (noopSeriesSet) Err() error { return nil } diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index d5a09b9de..8f5f8ea77 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -87,8 +87,8 @@ type Iterator interface { // At returns (math.MinInt64, 0.0) before the iterator has advanced. // TODO(bwplotka): Verify above statement on all implementations with unit test. At() (int64, float64) - // Err returns the current error. - // Err can return undefined value before calling `Next` or `Seek`. + // Err returns the current error. It should be used only after iterator is + // exhausted, so `Next` or `Seek` returns false. Err() error } diff --git a/tsdb/chunkenc/chunk_test.go b/tsdb/chunkenc/chunk_test.go index beda72e27..4d54d9e26 100644 --- a/tsdb/chunkenc/chunk_test.go +++ b/tsdb/chunkenc/chunk_test.go @@ -88,7 +88,7 @@ func testChunk(t *testing.T, c Chunk) { testutil.Ok(t, it2.Err()) testutil.Equals(t, exp, res2) - // 3. Test Iterator Seek. + // 3. Test iterator Seek. mid := len(exp) / 2 it3 := c.Iterator(nil) diff --git a/tsdb/cmd/tsdb/main.go b/tsdb/cmd/tsdb/main.go index 236ec437d..c55eb2985 100644 --- a/tsdb/cmd/tsdb/main.go +++ b/tsdb/cmd/tsdb/main.go @@ -33,7 +33,6 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunks" @@ -178,8 +177,8 @@ func (b *writeBenchmark) run() error { l := log.With(b.logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) st, err := tsdb.Open(dir, l, nil, &tsdb.Options{ - RetentionDuration: 15 * 24 * model.Duration(time.Hour), - MinBlockDuration: 2 * model.Duration(time.Hour), + RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond), + MinBlockDuration: int64(2 * time.Hour / time.Millisecond), }) if err != nil { return err diff --git a/tsdb/db.go b/tsdb/db.go index 03825f93b..a077585c3 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -29,13 +29,11 @@ import ( "sync" "time" - "github.com/alecthomas/units" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -46,9 +44,9 @@ import ( "golang.org/x/sync/errgroup" ) -// Default duration of a block in milliseconds - 2h. const ( - DefaultBlockDuration = int64(2 * 60 * 60 * 1000) // 2h in miliseconds. + // Default duration of a block in milliseconds. + DefaultBlockDuration = int64(2 * time.Hour / time.Millisecond) ) var ( @@ -61,13 +59,14 @@ var ( func DefaultOptions() *Options { return &Options{ WALSegmentSize: wal.DefaultSegmentSize, - RetentionDuration: 15 * 24 * model.Duration(time.Hour), - MinBlockDuration: model.Duration(time.Duration(DefaultBlockDuration) * time.Millisecond), - MaxBlockDuration: model.Duration(time.Duration(DefaultBlockDuration) * time.Millisecond), + RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond), + MinBlockDuration: DefaultBlockDuration, + MaxBlockDuration: DefaultBlockDuration, NoLockfile: false, AllowOverlappingBlocks: false, WALCompression: false, StripeSize: DefaultStripeSize, + ConvertTimeToSecondsFn: func(i int64) float64 { return float64(i / 1000) }, } } @@ -77,17 +76,19 @@ type Options struct { // WALSegmentSize = 0, segment size is default size. // WALSegmentSize > 0, segment size is WALSegmentSize. // WALSegmentSize < 0, wal is disabled. - WALSegmentSize units.Base2Bytes + WALSegmentSize int // Duration of persisted data to keep. - RetentionDuration model.Duration + // Unit agnostic as long as unit is consistent with MinBlockDuration and MaxBlockDuration. + // Typically it is in milliseconds. + RetentionDuration int64 // Maximum number of bytes in blocks to be retained. // 0 or less means disabled. // NOTE: For proper storage calculations need to consider // the size of the WAL folder which is not added when calculating // the current size of the database. - MaxBytes units.Base2Bytes + MaxBytes int64 // NoLockfile disables creation and consideration of a lock file. NoLockfile bool @@ -104,10 +105,17 @@ type Options struct { // The timestamp range of head blocks after which they get persisted. // It's the minimum duration of any persisted block. - MinBlockDuration model.Duration + // Unit agnostic as long as unit is consistent with RetentionDuration and MaxBlockDuration. + // Typically it is in milliseconds. + MinBlockDuration int64 // The maximum timestamp range of compacted blocks. - MaxBlockDuration model.Duration + // Unit agnostic as long as unit is consistent with MinBlockDuration and RetentionDuration. + // Typically it is in milliseconds. + MaxBlockDuration int64 + + // ConvertTimeToSecondsFn function is used for time based values to convert to seconds for metric purposes. + ConvertTimeToSecondsFn func(int64) float64 } // DB handles reads and writes of time series falling into @@ -163,7 +171,7 @@ type dbMetrics struct { headMinTime prometheus.GaugeFunc } -func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { +func newDBMetrics(db *DB, r prometheus.Registerer, convToSecondsFn func(int64) float64) *dbMetrics { m := &dbMetrics{} m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ @@ -238,28 +246,26 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_size_retentions_total", Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.", }) + + // Unit agnostic metrics. m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "prometheus_tsdb_lowest_timestamp_seconds", Help: "Lowest timestamp value stored in the database.", }, func() float64 { bb := db.Blocks() if len(bb) == 0 { - return float64(db.Head().MinTime()) / 1000 + return convToSecondsFn(db.Head().MinTime()) } - return float64(db.Blocks()[0].Meta().MinTime) / 1000 + return convToSecondsFn(db.Blocks()[0].Meta().MinTime) }) m.headMinTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "prometheus_tsdb_head_min_time_seconds", Help: "Minimum time bound of the head block.", - }, func() float64 { - return float64(db.Head().MinTime()) / 1000 - }) + }, func() float64 { return convToSecondsFn(db.Head().MinTime()) }) m.headMaxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "prometheus_tsdb_head_max_time_seconds", Help: "Maximum timestamp of the head block.", - }, func() float64 { - return float64(db.Head().MaxTime()) / 1000 - }) + }, func() float64 { return convToSecondsFn(db.Head().MaxTime()) }) if r != nil { r.MustRegister( m.loadedBlocks, @@ -350,7 +356,7 @@ func (db *DBReadOnly) FlushWAL(dir string) error { context.Background(), nil, db.logger, - ExponentialBlockRanges(int64(time.Duration(DefaultOptions().MinBlockDuration))/1e6, 3, 5), + ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5), chunkenc.NewPool(), ) if err != nil { @@ -520,7 +526,7 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) { } if opts.MinBlockDuration <= 0 { - opts.MinBlockDuration = model.Duration(time.Duration(DefaultBlockDuration) * time.Millisecond) + opts.MinBlockDuration = DefaultBlockDuration } if opts.MinBlockDuration > opts.MaxBlockDuration { opts.MaxBlockDuration = opts.MinBlockDuration @@ -529,7 +535,7 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) { if len(rngs) == 0 { // Start with smallest block duration and create exponential buckets until the exceed the // configured maximum block duration. - rngs = ExponentialBlockRanges(int64(time.Duration(opts.MinBlockDuration).Seconds()*1000), 10, 3) + rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3) } return opts, rngs } @@ -543,7 +549,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs } for i, v := range rngs { - if v > int64(time.Duration(opts.MaxBlockDuration).Seconds()*1000) { + if v > opts.MaxBlockDuration { rngs = rngs[:i] break } @@ -568,7 +574,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs autoCompact: true, chunkPool: chunkenc.NewPool(), } - db.metrics = newDBMetrics(db, r) + db.metrics = newDBMetrics(db, r, opts.ConvertTimeToSecondsFn) maxBytes := opts.MaxBytes if maxBytes < 0 { @@ -602,7 +608,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs if opts.WALSegmentSize >= 0 { // Wal is set to a custom size. if opts.WALSegmentSize > 0 { - segmentSize = int(opts.WALSegmentSize) + segmentSize = opts.WALSegmentSize } wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression) if err != nil { @@ -1013,7 +1019,7 @@ func (db *DB) beyondTimeRetention(blocks []*Block) (deletable map[ulid.ULID]*Blo for i, block := range blocks { // The difference between the first block and this block is larger than // the retention period so any blocks after that are added as deletable. - if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > (int64(db.opts.RetentionDuration)/int64(time.Millisecond)) { + if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > db.opts.RetentionDuration { for _, b := range blocks[i:] { deletable[b.meta.ULID] = b } @@ -1525,7 +1531,7 @@ func (s *ReadyStorage) StartTime() (int64, error) { return startTime + s.startTimeMargin, nil } - return int64(model.Latest), ErrNotReady + return math.MaxInt64, ErrNotReady } // Querier implements the Storage interface. diff --git a/tsdb/db_test.go b/tsdb/db_test.go index b3a399a1f..4de1c0c7c 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -30,7 +30,6 @@ import ( "testing" "time" - "github.com/alecthomas/units" "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -864,7 +863,7 @@ func TestWALSegmentSizeOptions(t *testing.T) { for segmentSize, testFunc := range tests { t.Run(fmt.Sprintf("WALSegmentSize %d test", segmentSize), func(t *testing.T) { opts := DefaultOptions() - opts.WALSegmentSize = units.Base2Bytes(segmentSize) + opts.WALSegmentSize = segmentSize db, closeFn := openTestDB(t, opts, nil) defer closeFn() @@ -1083,7 +1082,7 @@ func TestTimeRetention(t *testing.T) { testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. - db.opts.RetentionDuration = model.Duration(time.Duration(blocks[2].MaxTime-blocks[1].MinTime) * time.Millisecond) + db.opts.RetentionDuration = blocks[2].MaxTime - blocks[1].MinTime testutil.Ok(t, db.reload()) expBlocks := blocks[1:] @@ -1163,8 +1162,8 @@ func TestSizeRetention(t *testing.T) { // Check total size, total count and check that the oldest block was deleted. firstBlockSize := db.Blocks()[0].Size() sizeLimit := actSize - firstBlockSize - db.opts.MaxBytes = units.Base2Bytes(sizeLimit) // Set the new db size limit one block smaller that the actual size. - testutil.Ok(t, db.reload()) // Reload the db to register the new db size. + db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller that the actual size. + testutil.Ok(t, db.reload()) // Reload the db to register the new db size. expBlocks := blocks[1:] actBlocks := db.Blocks() @@ -1197,7 +1196,7 @@ func TestSizeRetentionMetric(t *testing.T) { for _, c := range cases { db, closeFn := openTestDB(t, &Options{ - MaxBytes: units.Base2Bytes(c.maxBytes), + MaxBytes: c.maxBytes, }, []int64{100}) defer func() { testutil.Ok(t, db.Close()) diff --git a/util/teststorage/storage.go b/util/teststorage/storage.go index fbdf74d2b..c6ac4b89a 100644 --- a/util/teststorage/storage.go +++ b/util/teststorage/storage.go @@ -18,7 +18,6 @@ import ( "os" "time" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/util/testutil" @@ -35,8 +34,8 @@ func New(t testutil.T) storage.Storage { // Tests just load data for a series sequentially. Thus we // need a long appendable window. opts := tsdb.DefaultOptions() - opts.MinBlockDuration = model.Duration(24 * time.Hour) - opts.MaxBlockDuration = model.Duration(24 * time.Hour) + opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond) + opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond) db, err := tsdb.Open(dir, nil, nil, opts) if err != nil { t.Fatalf("Opening test storage failed: %s", err) diff --git a/web/api/v1/api.go b/web/api/v1/api.go index e75cb5342..32cc8fa75 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -1189,19 +1189,12 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { for i, query := range req.Queries { err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error { // The streaming API provides sorted series. - set, ws, err := querier.SelectSorted(selectParams, filteredMatchers...) + // TODO(bwplotka): Handle warnings via query log. + set, _, err := querier.SelectSorted(selectParams, filteredMatchers...) if err != nil { return err } - if len(ws) > 0 { - msg := "" - for _, w := range ws { - msg += w.Error() + ";" - } - level.Warn(api.logger).Log("remote read warnings", "warnings", msg) - } - return remote.StreamChunkedReadResponses( remote.NewChunkedWriter(w, f), int64(i), diff --git a/web/web.go b/web/web.go index 9274176ae..77e5c2a19 100644 --- a/web/web.go +++ b/web/web.go @@ -38,6 +38,7 @@ import ( template_text "text/template" "time" + "github.com/alecthomas/units" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" conntrack "github.com/mwitkow/go-conntrack" @@ -208,11 +209,41 @@ func (h *Handler) ApplyConfig(conf *config.Config) error { return nil } +// TSDBOptions is tsdb.Option version with defined units. +// This is required as tsdb.Option fields are unit agnostic (time). +type TSDBOptions struct { + WALSegmentSize units.Base2Bytes + RetentionDuration model.Duration + MaxBytes units.Base2Bytes + NoLockfile bool + AllowOverlappingBlocks bool + WALCompression bool + StripeSize int + MinBlockDuration model.Duration + MaxBlockDuration model.Duration +} + +func (opts TSDBOptions) ToTSDBOptions() tsdb.Options { + return tsdb.Options{ + WALSegmentSize: int(opts.WALSegmentSize), + RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond), + MaxBytes: int64(opts.MaxBytes), + NoLockfile: opts.NoLockfile, + AllowOverlappingBlocks: opts.AllowOverlappingBlocks, + WALCompression: opts.WALCompression, + StripeSize: opts.StripeSize, + MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond), + MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond), + + ConvertTimeToSecondsFn: func(i int64) float64 { return float64(i / 1000) }, + } +} + // Options for the web Handler. type Options struct { Context context.Context TSDB func() *tsdb.DB - TSDBCfg tsdb.Options + TSDBCfg TSDBOptions Storage storage.Storage QueryEngine *promql.Engine LookbackDelta time.Duration