2017-04-10 18:59:45 +00:00
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2016-11-15 09:34:25 +00:00
// Package tsdb implements a time series storage for float64 sample data.
package tsdb
import (
2018-12-05 16:34:42 +00:00
"context"
2016-12-04 12:16:11 +00:00
"fmt"
2017-02-27 09:46:15 +00:00
"io"
2017-01-06 08:26:39 +00:00
"io/ioutil"
2018-04-05 12:51:33 +00:00
"math"
2016-12-04 12:16:11 +00:00
"os"
2016-12-08 16:43:10 +00:00
"path/filepath"
2017-05-18 14:09:30 +00:00
"sort"
2016-12-15 07:31:26 +00:00
"strconv"
2018-04-05 12:51:33 +00:00
"strings"
2016-12-08 16:43:10 +00:00
"sync"
2017-01-06 14:18:06 +00:00
"time"
2016-12-15 07:31:26 +00:00
2021-06-11 16:17:59 +00:00
"github.com/go-kit/log"
"github.com/go-kit/log/level"
2017-05-18 14:09:30 +00:00
"github.com/oklog/ulid"
2017-01-03 14:43:26 +00:00
"github.com/pkg/errors"
2016-12-31 08:48:49 +00:00
"github.com/prometheus/client_golang/prometheus"
2020-10-22 09:00:08 +00:00
"golang.org/x/sync/errgroup"
2021-07-20 04:52:57 +00:00
"github.com/prometheus/prometheus/config"
2021-11-08 14:23:17 +00:00
"github.com/prometheus/prometheus/model/labels"
2020-02-06 15:58:38 +00:00
"github.com/prometheus/prometheus/storage"
2019-08-13 08:34:14 +00:00
"github.com/prometheus/prometheus/tsdb/chunkenc"
2020-11-19 13:00:47 +00:00
"github.com/prometheus/prometheus/tsdb/chunks"
2019-08-13 08:34:14 +00:00
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
2020-10-22 09:00:08 +00:00
_ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minium Go version is met.
2021-11-11 16:45:25 +00:00
"github.com/prometheus/prometheus/tsdb/tsdbutil"
2020-06-24 13:41:52 +00:00
"github.com/prometheus/prometheus/tsdb/wal"
2016-11-15 09:34:25 +00:00
)
2019-11-21 12:10:25 +00:00
const (
2020-02-11 16:34:09 +00:00
// Default duration of a block in milliseconds.
DefaultBlockDuration = int64 ( 2 * time . Hour / time . Millisecond )
2020-08-11 05:56:08 +00:00
// Block dir suffixes to make deletion and creation operations atomic.
// We decided to do suffixes instead of creating meta.json as last (or delete as first) one,
// because in error case you still can recover meta.json from the block content within local TSDB dir.
// TODO(bwplotka): TSDB can end up with various .tmp files (e.g meta.json.tmp, WAL or segment tmp file. Think
// about removing those too on start to save space. Currently only blocks tmp dirs are removed.
tmpForDeletionBlockDirSuffix = ".tmp-for-deletion"
tmpForCreationBlockDirSuffix = ".tmp-for-creation"
2021-01-09 09:02:26 +00:00
// Pre-2.21 tmp dir suffix, used in clean-up functions.
tmpLegacy = ".tmp"
2020-02-06 15:58:38 +00:00
)
2021-10-22 08:06:44 +00:00
// ErrNotReady is returned if the underlying storage is not ready yet.
var ErrNotReady = errors . New ( "TSDB not ready" )
2019-11-21 12:10:25 +00:00
2016-12-09 09:00:14 +00:00
// DefaultOptions used for the DB. They are sane for setups using
2018-03-02 11:12:32 +00:00
// millisecond precision timestamps.
2020-02-06 15:58:38 +00:00
func DefaultOptions ( ) * Options {
return & Options {
2020-11-19 13:00:47 +00:00
WALSegmentSize : wal . DefaultSegmentSize ,
2021-04-15 08:55:01 +00:00
MaxBlockChunkSegmentSize : chunks . DefaultChunkSegmentSize ,
2020-11-19 13:00:47 +00:00
RetentionDuration : int64 ( 15 * 24 * time . Hour / time . Millisecond ) ,
MinBlockDuration : DefaultBlockDuration ,
MaxBlockDuration : DefaultBlockDuration ,
NoLockfile : false ,
AllowOverlappingBlocks : false ,
WALCompression : false ,
StripeSize : DefaultStripeSize ,
HeadChunksWriteBufferSize : chunks . DefaultWriteBufferSize ,
2021-11-19 10:11:32 +00:00
IsolationDisabled : defaultIsolationDisabled ,
2022-01-10 13:36:45 +00:00
HeadChunksWriteQueueSize : chunks . DefaultWriteQueueSize ,
2020-02-06 15:58:38 +00:00
}
2016-11-15 09:34:25 +00:00
}
// Options of the DB storage.
type Options struct {
2019-03-25 23:38:12 +00:00
// Segments (wal files) max size.
// WALSegmentSize = 0, segment size is default size.
// WALSegmentSize > 0, segment size is WALSegmentSize.
// WALSegmentSize < 0, wal is disabled.
2020-02-11 16:34:09 +00:00
WALSegmentSize int
2017-01-29 07:11:47 +00:00
2021-04-15 08:55:01 +00:00
// MaxBlockChunkSegmentSize is the max size of block chunk segment files.
// MaxBlockChunkSegmentSize = 0, chunk segment size is default size.
// MaxBlockChunkSegmentSize > 0, chunk segment size is MaxBlockChunkSegmentSize.
MaxBlockChunkSegmentSize int64
2017-02-10 01:54:26 +00:00
// Duration of persisted data to keep.
2020-02-11 16:34:09 +00:00
// Unit agnostic as long as unit is consistent with MinBlockDuration and MaxBlockDuration.
// Typically it is in milliseconds.
RetentionDuration int64
2017-02-10 01:54:26 +00:00
2019-01-16 10:03:52 +00:00
// Maximum number of bytes in blocks to be retained.
// 0 or less means disabled.
// NOTE: For proper storage calculations need to consider
// the size of the WAL folder which is not added when calculating
// the current size of the database.
2020-02-11 16:34:09 +00:00
MaxBytes int64
2017-01-29 07:11:47 +00:00
2017-05-09 10:52:47 +00:00
// NoLockfile disables creation and consideration of a lock file.
NoLockfile bool
2019-02-26 19:50:37 +00:00
2019-03-02 13:54:49 +00:00
// Overlapping blocks are allowed if AllowOverlappingBlocks is true.
2019-02-26 19:50:37 +00:00
// This in-turn enables vertical compaction and vertical query merge.
2019-03-02 13:54:49 +00:00
AllowOverlappingBlocks bool
2019-06-19 13:46:24 +00:00
// WALCompression will turn on Snappy compression for records on the WAL.
WALCompression bool
2020-01-30 07:12:43 +00:00
2020-02-17 11:45:11 +00:00
// StripeSize is the size in entries of the series hash map. Reducing the size will save memory but impact performance.
2020-01-30 07:12:43 +00:00
StripeSize int
2016-11-15 09:34:25 +00:00
2020-02-06 15:58:38 +00:00
// The timestamp range of head blocks after which they get persisted.
// It's the minimum duration of any persisted block.
2020-02-11 16:34:09 +00:00
// Unit agnostic as long as unit is consistent with RetentionDuration and MaxBlockDuration.
// Typically it is in milliseconds.
MinBlockDuration int64
2020-02-06 15:58:38 +00:00
// The maximum timestamp range of compacted blocks.
2020-02-11 16:34:09 +00:00
// Unit agnostic as long as unit is consistent with MinBlockDuration and RetentionDuration.
// Typically it is in milliseconds.
MaxBlockDuration int64
2020-05-20 13:22:08 +00:00
2020-11-19 13:00:47 +00:00
// HeadChunksWriteBufferSize configures the write buffer size used by the head chunks mapper.
HeadChunksWriteBufferSize int
2022-01-10 13:36:45 +00:00
// HeadChunksWriteQueueSize configures the size of the chunk write queue used in the head chunks mapper.
HeadChunksWriteQueueSize int
2020-05-20 13:22:08 +00:00
// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series.
// It is always a no-op in Prometheus and mainly meant for external users who import TSDB.
SeriesLifecycleCallback SeriesLifecycleCallback
2020-07-22 15:19:33 +00:00
// BlocksToDelete is a function which returns the blocks which can be deleted.
// It is always the default time and size based retention in Prometheus and
// mainly meant for external users who import TSDB.
BlocksToDelete BlocksToDeleteFunc
2021-03-16 09:47:45 +00:00
2021-11-19 10:11:32 +00:00
// Enables the in memory exemplar storage.
2021-07-20 04:52:57 +00:00
EnableExemplarStorage bool
2021-08-06 16:51:01 +00:00
// Enables the snapshot of in-memory chunks on shutdown. This makes restarts faster.
EnableMemorySnapshotOnShutdown bool
2021-03-16 09:47:45 +00:00
// MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory.
// See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and it's constructor NewCircularExemplarStorage.
2021-07-20 04:52:57 +00:00
MaxExemplars int64
2021-11-19 10:11:32 +00:00
// Disables isolation between reads and in-flight appends.
IsolationDisabled bool
2016-12-10 17:08:50 +00:00
}
2020-07-22 15:19:33 +00:00
type BlocksToDeleteFunc func ( blocks [ ] * Block ) map [ ulid . ULID ] struct { }
2017-01-06 10:40:09 +00:00
// DB handles reads and writes of time series falling into
// a hashed partition of a seriedb.
type DB struct {
2021-11-11 16:45:25 +00:00
dir string
locker * tsdbutil . DirLocker
2017-03-04 15:50:48 +00:00
2020-07-22 15:19:33 +00:00
logger log . Logger
metrics * dbMetrics
opts * Options
chunkPool chunkenc . Pool
compactor Compactor
blocksToDelete BlocksToDeleteFunc
2016-12-09 09:00:14 +00:00
2017-07-14 07:00:22 +00:00
// Mutex for that must be held when modifying the general block layout.
2017-03-20 07:41:56 +00:00
mtx sync . RWMutex
2017-10-09 13:21:46 +00:00
blocks [ ] * Block
2017-03-04 15:50:48 +00:00
2017-08-28 22:39:17 +00:00
head * Head
2017-01-06 11:37:28 +00:00
compactc chan struct { }
donec chan struct { }
stopc chan struct { }
2017-05-20 07:51:10 +00:00
2018-11-20 10:34:26 +00:00
// cmtx ensures that compactions and deletions don't run simultaneously.
cmtx sync . Mutex
// autoCompactMtx ensures that no compaction gets triggered while
// changing the autoCompact var.
autoCompactMtx sync . Mutex
autoCompact bool
2018-12-05 16:34:42 +00:00
// Cancel a running compaction when a shutdown is initiated.
2019-02-06 12:07:35 +00:00
compactCancel context . CancelFunc
2016-12-09 09:00:14 +00:00
}
2017-01-06 10:40:09 +00:00
type dbMetrics struct {
2021-11-11 16:45:25 +00:00
loadedBlocks prometheus . GaugeFunc
symbolTableSize prometheus . GaugeFunc
reloads prometheus . Counter
reloadsFailed prometheus . Counter
compactionsFailed prometheus . Counter
compactionsTriggered prometheus . Counter
compactionsSkipped prometheus . Counter
sizeRetentionCount prometheus . Counter
timeRetentionCount prometheus . Counter
startTime prometheus . GaugeFunc
tombCleanTimer prometheus . Histogram
blocksBytes prometheus . Gauge
maxBytes prometheus . Gauge
2016-12-31 08:48:49 +00:00
}
2020-02-14 18:48:55 +00:00
func newDBMetrics ( db * DB , r prometheus . Registerer ) * dbMetrics {
2017-01-06 10:40:09 +00:00
m := & dbMetrics { }
2017-01-03 14:43:26 +00:00
2017-05-26 13:13:03 +00:00
m . loadedBlocks = prometheus . NewGaugeFunc ( prometheus . GaugeOpts {
2018-09-18 17:17:41 +00:00
Name : "prometheus_tsdb_blocks_loaded" ,
2017-05-26 13:13:03 +00:00
Help : "Number of currently loaded data blocks" ,
} , func ( ) float64 {
db . mtx . RLock ( )
defer db . mtx . RUnlock ( )
return float64 ( len ( db . blocks ) )
} )
2018-09-08 18:28:36 +00:00
m . symbolTableSize = prometheus . NewGaugeFunc ( prometheus . GaugeOpts {
2018-09-18 17:17:41 +00:00
Name : "prometheus_tsdb_symbol_table_size_bytes" ,
2020-10-21 13:35:40 +00:00
Help : "Size of symbol table in memory for loaded blocks" ,
2018-09-08 18:28:36 +00:00
} , func ( ) float64 {
db . mtx . RLock ( )
blocks := db . blocks [ : ]
db . mtx . RUnlock ( )
2018-09-12 09:09:02 +00:00
symTblSize := uint64 ( 0 )
2018-09-08 18:28:36 +00:00
for _ , b := range blocks {
2018-09-12 09:09:02 +00:00
symTblSize += b . GetSymbolTableSize ( )
2018-09-08 18:28:36 +00:00
}
2018-09-12 09:09:02 +00:00
return float64 ( symTblSize )
2018-09-08 18:28:36 +00:00
} )
2017-05-26 13:13:03 +00:00
m . reloads = prometheus . NewCounter ( prometheus . CounterOpts {
2018-09-18 17:17:41 +00:00
Name : "prometheus_tsdb_reloads_total" ,
2017-05-26 13:13:03 +00:00
Help : "Number of times the database reloaded block data from disk." ,
} )
m . reloadsFailed = prometheus . NewCounter ( prometheus . CounterOpts {
2018-09-18 17:17:41 +00:00
Name : "prometheus_tsdb_reloads_failures_total" ,
2020-10-19 15:27:08 +00:00
Help : "Number of times the database failed to reloadBlocks block data from disk." ,
2017-05-26 13:13:03 +00:00
} )
2017-01-06 11:37:28 +00:00
m . compactionsTriggered = prometheus . NewCounter ( prometheus . CounterOpts {
2018-09-18 17:17:41 +00:00
Name : "prometheus_tsdb_compactions_triggered_total" ,
2017-01-06 11:37:28 +00:00
Help : "Total number of triggered compactions for the partition." ,
} )
2019-05-30 11:57:28 +00:00
m . compactionsFailed = prometheus . NewCounter ( prometheus . CounterOpts {
Name : "prometheus_tsdb_compactions_failed_total" ,
Help : "Total number of compactions that failed for the partition." ,
} )
2019-01-16 10:03:52 +00:00
m . timeRetentionCount = prometheus . NewCounter ( prometheus . CounterOpts {
Name : "prometheus_tsdb_time_retentions_total" ,
Help : "The number of times that blocks were deleted because the maximum time limit was exceeded." ,
} )
2018-11-20 10:34:26 +00:00
m . compactionsSkipped = prometheus . NewCounter ( prometheus . CounterOpts {
Name : "prometheus_tsdb_compactions_skipped_total" ,
Help : "Total number of skipped compactions due to disabled auto compaction." ,
} )
2018-09-14 12:07:45 +00:00
m . startTime = prometheus . NewGaugeFunc ( prometheus . GaugeOpts {
2018-09-18 17:17:41 +00:00
Name : "prometheus_tsdb_lowest_timestamp" ,
2018-11-30 18:18:12 +00:00
Help : "Lowest timestamp value stored in the database. The unit is decided by the library consumer." ,
2018-09-14 12:07:45 +00:00
} , func ( ) float64 {
db . mtx . RLock ( )
defer db . mtx . RUnlock ( )
if len ( db . blocks ) == 0 {
2019-10-09 15:41:46 +00:00
return float64 ( db . head . MinTime ( ) )
2018-09-14 12:07:45 +00:00
}
return float64 ( db . blocks [ 0 ] . meta . MinTime )
} )
2017-11-22 12:34:50 +00:00
m . tombCleanTimer = prometheus . NewHistogram ( prometheus . HistogramOpts {
2018-09-18 17:17:41 +00:00
Name : "prometheus_tsdb_tombstone_cleanup_seconds" ,
2017-11-22 12:34:50 +00:00
Help : "The time taken to recompact blocks to remove tombstones." ,
} )
2019-01-16 10:03:52 +00:00
m . blocksBytes = prometheus . NewGauge ( prometheus . GaugeOpts {
2019-01-23 13:46:58 +00:00
Name : "prometheus_tsdb_storage_blocks_bytes" ,
2019-01-16 10:03:52 +00:00
Help : "The number of bytes that are currently used for local storage by all blocks." ,
} )
2019-07-27 08:52:25 +00:00
m . maxBytes = prometheus . NewGauge ( prometheus . GaugeOpts {
Name : "prometheus_tsdb_retention_limit_bytes" ,
Help : "Max number of bytes to be retained in the tsdb blocks, configured 0 means disabled" ,
} )
2019-01-16 10:03:52 +00:00
m . sizeRetentionCount = prometheus . NewCounter ( prometheus . CounterOpts {
Name : "prometheus_tsdb_size_retentions_total" ,
Help : "The number of times that blocks were deleted because the maximum number of bytes was exceeded." ,
} )
2020-02-11 16:34:09 +00:00
2016-12-31 08:48:49 +00:00
if r != nil {
r . MustRegister (
2017-05-26 13:13:03 +00:00
m . loadedBlocks ,
2018-09-08 18:28:36 +00:00
m . symbolTableSize ,
2017-05-26 13:13:03 +00:00
m . reloads ,
m . reloadsFailed ,
2019-05-30 11:57:28 +00:00
m . compactionsFailed ,
2020-01-13 22:15:45 +00:00
m . compactionsTriggered ,
m . compactionsSkipped ,
m . sizeRetentionCount ,
m . timeRetentionCount ,
2018-09-14 12:07:45 +00:00
m . startTime ,
2017-11-22 12:34:50 +00:00
m . tombCleanTimer ,
2019-01-16 10:03:52 +00:00
m . blocksBytes ,
2019-07-27 08:52:25 +00:00
m . maxBytes ,
2016-12-31 08:48:49 +00:00
)
}
return m
}
2021-06-05 14:29:32 +00:00
// DBStats contains statistics about the DB seperated by component (eg. head).
// They are available before the DB has finished initializing.
type DBStats struct {
Head * HeadStats
}
// NewDBStats returns a new DBStats object initialized using the
// the new function from each component.
func NewDBStats ( ) * DBStats {
return & DBStats {
Head : NewHeadStats ( ) ,
}
}
2019-07-23 08:04:48 +00:00
// ErrClosed is returned when the db is closed.
var ErrClosed = errors . New ( "db already closed" )
// DBReadOnly provides APIs for read only operations on a database.
2019-09-30 15:54:55 +00:00
// Current implementation doesn't support concurrency so
2019-07-23 08:04:48 +00:00
// all API calls should happen in the same go routine.
type DBReadOnly struct {
logger log . Logger
dir string
closers [ ] io . Closer
closed chan struct { }
}
// OpenDBReadOnly opens DB in the given directory for read only operations.
func OpenDBReadOnly ( dir string , l log . Logger ) ( * DBReadOnly , error ) {
if _ , err := os . Stat ( dir ) ; err != nil {
2019-09-30 15:54:55 +00:00
return nil , errors . Wrap ( err , "opening the db dir" )
2019-07-23 08:04:48 +00:00
}
if l == nil {
l = log . NewNopLogger ( )
}
return & DBReadOnly {
logger : l ,
dir : dir ,
closed : make ( chan struct { } ) ,
} , nil
}
2019-09-13 10:25:21 +00:00
// FlushWAL creates a new block containing all data that's currently in the memory buffer/WAL.
// Samples that are in existing blocks will not be written to the new block.
// Note that if the read only database is running concurrently with a
// writable database then writing the WAL to the database directory can race.
2020-03-23 09:19:44 +00:00
func ( db * DBReadOnly ) FlushWAL ( dir string ) ( returnErr error ) {
2019-09-13 10:25:21 +00:00
blockReaders , err := db . Blocks ( )
if err != nil {
return errors . Wrap ( err , "read blocks" )
}
maxBlockTime := int64 ( math . MinInt64 )
if len ( blockReaders ) > 0 {
maxBlockTime = blockReaders [ len ( blockReaders ) - 1 ] . Meta ( ) . MaxTime
}
2020-03-10 05:31:47 +00:00
w , err := wal . Open ( db . logger , filepath . Join ( db . dir , "wal" ) )
2019-09-13 10:25:21 +00:00
if err != nil {
return err
}
2021-02-09 14:12:48 +00:00
opts := DefaultHeadOptions ( )
opts . ChunkDirRoot = db . dir
2021-06-05 14:29:32 +00:00
head , err := NewHead ( nil , db . logger , w , opts , NewHeadStats ( ) )
2019-09-13 10:25:21 +00:00
if err != nil {
return err
}
2020-03-23 09:19:44 +00:00
defer func ( ) {
2020-10-28 15:24:58 +00:00
returnErr = tsdb_errors . NewMulti (
returnErr ,
errors . Wrap ( head . Close ( ) , "closing Head" ) ,
) . Err ( )
2020-03-23 09:19:44 +00:00
} ( )
2019-09-13 10:25:21 +00:00
// Set the min valid time for the ingested wal samples
// to be no lower than the maxt of the last block.
if err := head . Init ( maxBlockTime ) ; err != nil {
return errors . Wrap ( err , "read WAL" )
}
mint := head . MinTime ( )
maxt := head . MaxTime ( )
2020-08-13 09:55:35 +00:00
rh := NewRangeHead ( head , mint , maxt )
2020-02-06 15:58:38 +00:00
compactor , err := NewLeveledCompactor (
context . Background ( ) ,
nil ,
db . logger ,
2020-02-11 16:34:09 +00:00
ExponentialBlockRanges ( DefaultOptions ( ) . MinBlockDuration , 3 , 5 ) ,
2020-02-07 16:24:17 +00:00
chunkenc . NewPool ( ) ,
2021-05-18 16:38:37 +00:00
nil ,
2020-02-06 15:58:38 +00:00
)
2019-09-13 10:25:21 +00:00
if err != nil {
return errors . Wrap ( err , "create leveled compactor" )
}
// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
// Because of this block intervals are always +1 than the total samples it includes.
_ , err = compactor . Write ( dir , rh , mint , maxt + 1 , nil )
return errors . Wrap ( err , "writing WAL" )
}
2020-07-31 15:03:02 +00:00
func ( db * DBReadOnly ) loadDataAsQueryable ( maxt int64 ) ( storage . SampleAndChunkQueryable , error ) {
2019-07-23 08:04:48 +00:00
select {
case <- db . closed :
return nil , ErrClosed
default :
}
2019-09-13 10:25:21 +00:00
blockReaders , err := db . Blocks ( )
2019-07-23 08:04:48 +00:00
if err != nil {
return nil , err
}
2019-09-13 10:25:21 +00:00
blocks := make ( [ ] * Block , len ( blockReaders ) )
for i , b := range blockReaders {
2019-07-23 08:04:48 +00:00
b , ok := b . ( * Block )
if ! ok {
return nil , errors . New ( "unable to convert a read only block to a normal block" )
}
blocks [ i ] = b
}
2021-02-09 14:12:48 +00:00
opts := DefaultHeadOptions ( )
opts . ChunkDirRoot = db . dir
2021-06-05 14:29:32 +00:00
head , err := NewHead ( nil , db . logger , nil , opts , NewHeadStats ( ) )
2019-07-23 08:04:48 +00:00
if err != nil {
return nil , err
}
maxBlockTime := int64 ( math . MinInt64 )
if len ( blocks ) > 0 {
maxBlockTime = blocks [ len ( blocks ) - 1 ] . Meta ( ) . MaxTime
}
2019-09-30 15:54:55 +00:00
// Also add the WAL if the current blocks don't cover the requests time range.
2019-07-23 08:04:48 +00:00
if maxBlockTime <= maxt {
2020-05-06 15:30:00 +00:00
if err := head . Close ( ) ; err != nil {
return nil , err
}
2020-03-10 05:31:47 +00:00
w , err := wal . Open ( db . logger , filepath . Join ( db . dir , "wal" ) )
2019-07-23 08:04:48 +00:00
if err != nil {
return nil , err
}
2021-02-09 14:12:48 +00:00
opts := DefaultHeadOptions ( )
opts . ChunkDirRoot = db . dir
2021-06-05 14:29:32 +00:00
head , err = NewHead ( nil , db . logger , w , opts , NewHeadStats ( ) )
2019-07-23 08:04:48 +00:00
if err != nil {
return nil , err
}
// Set the min valid time for the ingested wal samples
// to be no lower than the maxt of the last block.
if err := head . Init ( maxBlockTime ) ; err != nil {
return nil , errors . Wrap ( err , "read WAL" )
}
// Set the wal to nil to disable all wal operations.
// This is mainly to avoid blocking when closing the head.
head . wal = nil
}
2020-05-06 15:30:00 +00:00
db . closers = append ( db . closers , head )
2020-07-31 15:03:02 +00:00
return & DB {
2019-07-23 08:04:48 +00:00
dir : db . dir ,
logger : db . logger ,
blocks : blocks ,
head : head ,
2020-07-31 15:03:02 +00:00
} , nil
}
2019-07-23 08:04:48 +00:00
2020-07-31 15:03:02 +00:00
// Querier loads the blocks and wal and returns a new querier over the data partition for the given time range.
// Current implementation doesn't support multiple Queriers.
func ( db * DBReadOnly ) Querier ( ctx context . Context , mint , maxt int64 ) ( storage . Querier , error ) {
q , err := db . loadDataAsQueryable ( maxt )
if err != nil {
return nil , err
}
return q . Querier ( ctx , mint , maxt )
2019-07-23 08:04:48 +00:00
}
2020-07-31 15:03:02 +00:00
// ChunkQuerier loads blocks and the wal and returns a new chunk querier over the data partition for the given time range.
// Current implementation doesn't support multiple ChunkQueriers.
func ( db * DBReadOnly ) ChunkQuerier ( ctx context . Context , mint , maxt int64 ) ( storage . ChunkQuerier , error ) {
q , err := db . loadDataAsQueryable ( maxt )
if err != nil {
return nil , err
}
return q . ChunkQuerier ( ctx , mint , maxt )
2020-06-24 13:41:52 +00:00
}
2019-07-23 08:04:48 +00:00
// Blocks returns a slice of block readers for persisted blocks.
func ( db * DBReadOnly ) Blocks ( ) ( [ ] BlockReader , error ) {
select {
case <- db . closed :
return nil , ErrClosed
default :
}
loadable , corrupted , err := openBlocks ( db . logger , db . dir , nil , nil )
if err != nil {
return nil , err
}
// Corrupted blocks that have been superseded by a loadable block can be safely ignored.
for _ , block := range loadable {
for _ , b := range block . Meta ( ) . Compaction . Parents {
delete ( corrupted , b . ULID )
}
}
if len ( corrupted ) > 0 {
for _ , b := range loadable {
if err := b . Close ( ) ; err != nil {
2020-06-17 14:40:00 +00:00
level . Warn ( db . logger ) . Log ( "msg" , "Closing block failed" , "err" , err , "block" , b )
2019-07-23 08:04:48 +00:00
}
}
2020-10-28 15:24:58 +00:00
errs := tsdb_errors . NewMulti ( )
2020-06-17 14:40:00 +00:00
for ulid , err := range corrupted {
2020-10-28 15:24:58 +00:00
errs . Add ( errors . Wrapf ( err , "corrupted block %s" , ulid . String ( ) ) )
2020-06-17 14:40:00 +00:00
}
2020-10-28 15:24:58 +00:00
return nil , errs . Err ( )
2019-07-23 08:04:48 +00:00
}
if len ( loadable ) == 0 {
2019-09-13 10:25:21 +00:00
return nil , nil
2019-07-23 08:04:48 +00:00
}
sort . Slice ( loadable , func ( i , j int ) bool {
return loadable [ i ] . Meta ( ) . MinTime < loadable [ j ] . Meta ( ) . MinTime
} )
blockMetas := make ( [ ] BlockMeta , 0 , len ( loadable ) )
for _ , b := range loadable {
blockMetas = append ( blockMetas , b . Meta ( ) )
}
if overlaps := OverlappingBlocks ( blockMetas ) ; len ( overlaps ) > 0 {
2020-04-11 08:22:18 +00:00
level . Warn ( db . logger ) . Log ( "msg" , "Overlapping blocks found during opening" , "detail" , overlaps . String ( ) )
2019-07-23 08:04:48 +00:00
}
// Close all previously open readers and add the new ones to the cache.
for _ , closer := range db . closers {
closer . Close ( )
}
blockClosers := make ( [ ] io . Closer , len ( loadable ) )
blockReaders := make ( [ ] BlockReader , len ( loadable ) )
for i , b := range loadable {
blockClosers [ i ] = b
blockReaders [ i ] = b
}
db . closers = blockClosers
return blockReaders , nil
}
// Close all block readers.
func ( db * DBReadOnly ) Close ( ) error {
select {
case <- db . closed :
return ErrClosed
default :
}
close ( db . closed )
2020-10-28 15:24:58 +00:00
return tsdb_errors . CloseAll ( db . closers )
2019-07-23 08:04:48 +00:00
}
2020-02-07 16:24:17 +00:00
// Open returns a new DB in the given directory. If options are empty, DefaultOptions will be used.
2021-06-05 14:29:32 +00:00
func Open ( dir string , l log . Logger , r prometheus . Registerer , opts * Options , stats * DBStats ) ( db * DB , err error ) {
2020-02-06 15:58:38 +00:00
var rngs [ ] int64
opts , rngs = validateOpts ( opts , nil )
2021-06-05 14:29:32 +00:00
return open ( dir , l , r , opts , rngs , stats )
2020-02-06 15:58:38 +00:00
}
func validateOpts ( opts * Options , rngs [ ] int64 ) ( * Options , [ ] int64 ) {
if opts == nil {
opts = DefaultOptions ( )
}
if opts . StripeSize <= 0 {
opts . StripeSize = DefaultStripeSize
}
2020-11-19 13:00:47 +00:00
if opts . HeadChunksWriteBufferSize <= 0 {
opts . HeadChunksWriteBufferSize = chunks . DefaultWriteBufferSize
}
2022-01-10 13:36:45 +00:00
if opts . HeadChunksWriteQueueSize < 0 {
opts . HeadChunksWriteQueueSize = chunks . DefaultWriteQueueSize
}
2021-04-15 08:55:01 +00:00
if opts . MaxBlockChunkSegmentSize <= 0 {
opts . MaxBlockChunkSegmentSize = chunks . DefaultChunkSegmentSize
}
2020-02-06 15:58:38 +00:00
if opts . MinBlockDuration <= 0 {
2020-02-11 16:34:09 +00:00
opts . MinBlockDuration = DefaultBlockDuration
2020-02-06 15:58:38 +00:00
}
if opts . MinBlockDuration > opts . MaxBlockDuration {
opts . MaxBlockDuration = opts . MinBlockDuration
}
if len ( rngs ) == 0 {
// Start with smallest block duration and create exponential buckets until the exceed the
// configured maximum block duration.
2020-02-11 16:34:09 +00:00
rngs = ExponentialBlockRanges ( opts . MinBlockDuration , 10 , 3 )
2020-02-06 15:58:38 +00:00
}
return opts , rngs
}
2021-11-17 10:21:27 +00:00
// open returns a new DB in the given directory.
// It initializes the lockfile, WAL, compactor, and Head (by replaying the WAL), and runs the database.
// It is not safe to open more than one DB in the same directory.
2021-06-05 14:29:32 +00:00
func open ( dir string , l log . Logger , r prometheus . Registerer , opts * Options , rngs [ ] int64 , stats * DBStats ) ( _ * DB , returnedErr error ) {
2021-10-22 08:06:44 +00:00
if err := os . MkdirAll ( dir , 0 o777 ) ; err != nil {
2017-02-19 12:01:19 +00:00
return nil , err
}
2017-02-19 15:04:37 +00:00
if l == nil {
2017-09-13 08:17:20 +00:00
l = log . NewNopLogger ( )
2017-02-19 15:04:37 +00:00
}
2021-06-05 14:29:32 +00:00
if stats == nil {
stats = NewDBStats ( )
}
2020-02-06 15:58:38 +00:00
for i , v := range rngs {
2020-02-11 16:34:09 +00:00
if v > opts . MaxBlockDuration {
2020-02-06 15:58:38 +00:00
rngs = rngs [ : i ]
break
}
2020-01-30 07:12:43 +00:00
}
2020-02-06 15:58:38 +00:00
2018-02-12 10:40:12 +00:00
// Fixup bad format written by Prometheus 2.1.
2018-02-09 12:37:10 +00:00
if err := repairBadIndexVersion ( l , dir ) ; err != nil {
2020-08-11 05:56:08 +00:00
return nil , errors . Wrap ( err , "repair bad index version" )
2018-02-09 12:11:03 +00:00
}
2020-08-14 09:45:08 +00:00
walDir := filepath . Join ( dir , "wal" )
2018-09-17 16:30:56 +00:00
// Migrate old WAL if one exists.
2020-08-14 09:45:08 +00:00
if err := MigrateWAL ( l , walDir ) ; err != nil {
2018-05-27 17:05:11 +00:00
return nil , errors . Wrap ( err , "migrate WAL" )
}
2020-08-11 05:56:08 +00:00
// Remove garbage, tmp blocks.
if err := removeBestEffortTmpDirs ( l , dir ) ; err != nil {
return nil , errors . Wrap ( err , "remove tmp dirs" )
}
2017-01-18 05:18:32 +00:00
2020-10-28 10:09:03 +00:00
db := & DB {
2020-07-22 15:19:33 +00:00
dir : dir ,
logger : l ,
opts : opts ,
compactc : make ( chan struct { } , 1 ) ,
donec : make ( chan struct { } ) ,
stopc : make ( chan struct { } ) ,
autoCompact : true ,
chunkPool : chunkenc . NewPool ( ) ,
blocksToDelete : opts . BlocksToDelete ,
}
2020-10-21 15:08:28 +00:00
defer func ( ) {
// Close files if startup fails somewhere.
if returnedErr == nil {
return
}
2020-10-28 10:09:03 +00:00
close ( db . donec ) // DB is never run if it was an error, so close this channel here.
2020-10-28 15:24:58 +00:00
returnedErr = tsdb_errors . NewMulti (
returnedErr ,
errors . Wrap ( db . Close ( ) , "close DB after failed startup" ) ,
) . Err ( )
2020-10-21 15:08:28 +00:00
} ( )
2020-07-22 15:19:33 +00:00
if db . blocksToDelete == nil {
db . blocksToDelete = DefaultBlocksToDelete ( db )
2017-01-06 08:26:39 +00:00
}
2019-07-27 08:52:25 +00:00
2021-11-11 16:45:25 +00:00
var err error
db . locker , err = tsdbutil . NewDirLocker ( dir , "tsdb" , db . logger , r )
if err != nil {
return nil , err
}
2017-05-09 10:52:47 +00:00
if ! opts . NoLockfile {
2021-11-11 16:45:25 +00:00
if err := db . locker . Lock ( ) ; err != nil {
2017-05-18 14:09:30 +00:00
return nil , err
}
2017-05-09 10:52:47 +00:00
}
2019-02-06 12:07:35 +00:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
2021-05-18 16:38:37 +00:00
db . compactor , err = NewLeveledCompactorWithChunkSize ( ctx , r , l , rngs , db . chunkPool , opts . MaxBlockChunkSegmentSize , nil )
2020-10-28 10:09:03 +00:00
if err != nil {
2019-02-06 12:07:35 +00:00
cancel ( )
2020-10-28 10:09:03 +00:00
return nil , errors . Wrap ( err , "create leveled compactor" )
2017-07-07 11:46:41 +00:00
}
2019-02-06 12:07:35 +00:00
db . compactCancel = cancel
2017-07-07 11:46:41 +00:00
2019-03-25 23:38:12 +00:00
var wlog * wal . WAL
2018-12-18 18:56:51 +00:00
segmentSize := wal . DefaultSegmentSize
2019-03-25 23:38:12 +00:00
// Wal is enabled.
if opts . WALSegmentSize >= 0 {
// Wal is set to a custom size.
if opts . WALSegmentSize > 0 {
2020-02-11 16:34:09 +00:00
segmentSize = opts . WALSegmentSize
2019-03-25 23:38:12 +00:00
}
2020-10-28 10:09:03 +00:00
wlog , err = wal . NewSize ( l , r , walDir , segmentSize , opts . WALCompression )
if err != nil {
return nil , err
2019-03-25 23:38:12 +00:00
}
2017-08-28 22:39:17 +00:00
}
2019-03-25 23:38:12 +00:00
2021-02-09 14:12:48 +00:00
headOpts := DefaultHeadOptions ( )
headOpts . ChunkRange = rngs [ 0 ]
headOpts . ChunkDirRoot = dir
headOpts . ChunkPool = db . chunkPool
headOpts . ChunkWriteBufferSize = opts . HeadChunksWriteBufferSize
2022-01-10 13:36:45 +00:00
headOpts . ChunkWriteQueueSize = opts . HeadChunksWriteQueueSize
2021-02-09 14:12:48 +00:00
headOpts . StripeSize = opts . StripeSize
headOpts . SeriesCallback = opts . SeriesLifecycleCallback
2021-07-20 04:52:57 +00:00
headOpts . EnableExemplarStorage = opts . EnableExemplarStorage
headOpts . MaxExemplars . Store ( opts . MaxExemplars )
2021-08-06 16:51:01 +00:00
headOpts . EnableMemorySnapshotOnShutdown = opts . EnableMemorySnapshotOnShutdown
2021-11-19 10:11:32 +00:00
if opts . IsolationDisabled {
// We only override this flag if isolation is disabled at DB level. We use the default otherwise.
headOpts . IsolationDisabled = opts . IsolationDisabled
}
2021-06-05 14:29:32 +00:00
db . head , err = NewHead ( r , l , wlog , headOpts , stats . Head )
2020-10-28 10:09:03 +00:00
if err != nil {
return nil , err
2017-08-30 15:38:25 +00:00
}
2018-12-04 10:30:49 +00:00
2020-07-05 04:41:42 +00:00
// Register metrics after assigning the head block.
db . metrics = newDBMetrics ( db , r )
maxBytes := opts . MaxBytes
if maxBytes < 0 {
maxBytes = 0
}
db . metrics . maxBytes . Set ( float64 ( maxBytes ) )
2018-11-28 09:23:50 +00:00
if err := db . reload ( ) ; err != nil {
return nil , err
}
2018-12-04 10:30:49 +00:00
// Set the min valid time for the ingested samples
// to be no lower than the maxt of the last block.
blocks := db . Blocks ( )
minValidTime := int64 ( math . MinInt64 )
if len ( blocks ) > 0 {
minValidTime = blocks [ len ( blocks ) - 1 ] . Meta ( ) . MaxTime
}
2019-06-14 15:39:22 +00:00
if initErr := db . head . Init ( minValidTime ) ; initErr != nil {
db . head . metrics . walCorruptionsTotal . Inc ( )
2020-04-11 08:22:18 +00:00
level . Warn ( db . logger ) . Log ( "msg" , "Encountered WAL read error, attempting repair" , "err" , initErr )
2019-06-14 15:39:22 +00:00
if err := wlog . Repair ( initErr ) ; err != nil {
return nil , errors . Wrap ( err , "repair corrupted WAL" )
}
2018-12-04 10:30:49 +00:00
}
2017-08-28 22:39:17 +00:00
2017-01-06 11:37:28 +00:00
go db . run ( )
return db , nil
}
2020-08-11 05:56:08 +00:00
func removeBestEffortTmpDirs ( l log . Logger , dir string ) error {
files , err := ioutil . ReadDir ( dir )
if err != nil {
return err
}
for _ , fi := range files {
if isTmpBlockDir ( fi ) {
if err := os . RemoveAll ( filepath . Join ( dir , fi . Name ( ) ) ) ; err != nil {
level . Error ( l ) . Log ( "msg" , "failed to delete tmp block dir" , "dir" , filepath . Join ( dir , fi . Name ( ) ) , "err" , err )
continue
}
level . Info ( l ) . Log ( "msg" , "Found and deleted tmp block dir" , "dir" , filepath . Join ( dir , fi . Name ( ) ) )
}
}
return nil
}
2020-02-06 15:58:38 +00:00
// StartTime implements the Storage interface.
func ( db * DB ) StartTime ( ) ( int64 , error ) {
db . mtx . RLock ( )
defer db . mtx . RUnlock ( )
if len ( db . blocks ) > 0 {
return db . blocks [ 0 ] . Meta ( ) . MinTime , nil
}
return db . head . MinTime ( ) , nil
}
2017-06-08 10:14:13 +00:00
// Dir returns the directory of the database.
func ( db * DB ) Dir ( ) string {
return db . dir
}
2017-01-06 11:37:28 +00:00
func ( db * DB ) run ( ) {
defer close ( db . donec )
2017-08-28 22:39:17 +00:00
backoff := time . Duration ( 0 )
2017-02-28 14:08:52 +00:00
2017-01-20 06:58:19 +00:00
for {
select {
2017-08-28 22:39:17 +00:00
case <- db . stopc :
2017-08-30 16:34:54 +00:00
return
2017-08-28 22:39:17 +00:00
case <- time . After ( backoff ) :
}
select {
case <- time . After ( 1 * time . Minute ) :
2021-01-07 07:30:08 +00:00
db . cmtx . Lock ( )
if err := db . reloadBlocks ( ) ; err != nil {
level . Error ( db . logger ) . Log ( "msg" , "reloadBlocks" , "err" , err )
}
db . cmtx . Unlock ( )
2017-02-28 14:08:52 +00:00
select {
case db . compactc <- struct { } { } :
default :
}
2017-01-06 11:37:28 +00:00
case <- db . compactc :
db . metrics . compactionsTriggered . Inc ( )
2018-11-20 10:34:26 +00:00
db . autoCompactMtx . Lock ( )
if db . autoCompact {
2020-01-20 07:29:49 +00:00
if err := db . Compact ( ) ; err != nil {
2018-11-20 10:34:26 +00:00
level . Error ( db . logger ) . Log ( "msg" , "compaction failed" , "err" , err )
backoff = exponential ( backoff , 1 * time . Second , 1 * time . Minute )
} else {
backoff = 0
}
2017-08-30 16:34:54 +00:00
} else {
2018-11-20 10:34:26 +00:00
db . metrics . compactionsSkipped . Inc ( )
2017-01-06 11:37:28 +00:00
}
2018-11-20 10:34:26 +00:00
db . autoCompactMtx . Unlock ( )
2017-01-06 11:37:28 +00:00
case <- db . stopc :
return
}
}
}
2017-08-30 16:34:54 +00:00
// Appender opens a new appender against the database.
2020-07-24 14:10:51 +00:00
func ( db * DB ) Appender ( ctx context . Context ) storage . Appender {
2020-07-30 11:11:13 +00:00
return dbAppender { db : db , Appender : db . head . Appender ( ctx ) }
2017-08-30 16:34:54 +00:00
}
2021-07-20 04:52:57 +00:00
func ( db * DB ) ApplyConfig ( conf * config . Config ) error {
return db . head . ApplyConfig ( conf )
}
2017-08-30 16:34:54 +00:00
// dbAppender wraps the DB's head appender and triggers compactions on commit
// if necessary.
type dbAppender struct {
2020-02-06 15:58:38 +00:00
storage . Appender
2017-08-30 16:34:54 +00:00
db * DB
}
2021-03-19 19:28:55 +00:00
var _ storage . GetRef = dbAppender { }
2021-11-06 10:10:04 +00:00
func ( a dbAppender ) GetRef ( lset labels . Labels ) ( storage . SeriesRef , labels . Labels ) {
2021-03-19 19:28:55 +00:00
if g , ok := a . Appender . ( storage . GetRef ) ; ok {
return g . GetRef ( lset )
}
2021-03-24 15:24:58 +00:00
return 0 , nil
2021-03-19 19:28:55 +00:00
}
2017-08-30 16:34:54 +00:00
func ( a dbAppender ) Commit ( ) error {
err := a . Appender . Commit ( )
2017-09-04 13:07:30 +00:00
// We could just run this check every few minutes practically. But for benchmarks
// and high frequency use cases this is the safer way.
2019-04-01 08:19:06 +00:00
if a . db . head . compactable ( ) {
2017-08-30 16:34:54 +00:00
select {
case a . db . compactc <- struct { } { } :
default :
}
}
return err
}
2018-06-27 16:05:21 +00:00
// Compact data if possible. After successful compaction blocks are reloaded
2020-10-19 15:27:08 +00:00
// which will also delete the blocks that fall out of the retention window.
// Old blocks are only deleted on reloadBlocks based on the new block's parent information.
// See DB.reloadBlocks documentation for further information.
func ( db * DB ) Compact ( ) ( returnErr error ) {
2017-07-13 14:15:13 +00:00
db . cmtx . Lock ( )
defer db . cmtx . Unlock ( )
2019-05-30 11:57:28 +00:00
defer func ( ) {
2020-10-19 15:27:08 +00:00
if returnErr != nil {
2019-05-30 11:57:28 +00:00
db . metrics . compactionsFailed . Inc ( )
}
} ( )
2020-10-19 15:27:08 +00:00
lastBlockMaxt := int64 ( math . MinInt64 )
defer func ( ) {
2020-10-28 15:24:58 +00:00
returnErr = tsdb_errors . NewMulti (
returnErr ,
errors . Wrap ( db . head . truncateWAL ( lastBlockMaxt ) , "WAL truncation in Compact defer" ) ,
) . Err ( )
2020-10-19 15:27:08 +00:00
} ( )
2020-12-07 21:29:43 +00:00
start := time . Now ( )
2017-07-13 14:15:13 +00:00
// Check whether we have pending head blocks that are ready to be persisted.
// They have the highest priority.
2017-08-28 22:39:17 +00:00
for {
2017-03-04 15:50:48 +00:00
select {
case <- db . stopc :
2018-09-21 06:24:01 +00:00
return nil
2017-03-04 15:50:48 +00:00
default :
2017-02-02 08:32:06 +00:00
}
2019-04-01 08:19:06 +00:00
if ! db . head . compactable ( ) {
2017-08-28 22:39:17 +00:00
break
}
2018-12-04 10:30:49 +00:00
mint := db . head . MinTime ( )
2020-07-28 04:42:42 +00:00
maxt := rangeForTimestamp ( mint , db . head . chunkRange . Load ( ) )
2017-02-02 08:32:06 +00:00
2017-08-28 22:39:17 +00:00
// Wrap head into a range that bounds all reads to it.
2020-02-14 09:50:24 +00:00
// We remove 1 millisecond from maxt because block
// intervals are half-open: [b.MinTime, b.MaxTime). But
// chunk intervals are closed: [c.MinTime, c.MaxTime];
// so in order to make sure that overlaps are evaluated
// consistently, we explicitly remove the last value
// from the block interval here.
2020-10-19 15:27:08 +00:00
if err := db . compactHead ( NewRangeHead ( db . head , mint , maxt - 1 ) ) ; err != nil {
return errors . Wrap ( err , "compact head" )
2017-03-04 15:50:48 +00:00
}
2020-10-19 15:27:08 +00:00
// Consider only successful compactions for WAL truncation.
lastBlockMaxt = maxt
}
// Clear some disk space before compacting blocks, especially important
// when Head compaction happened over a long time range.
if err := db . head . truncateWAL ( lastBlockMaxt ) ; err != nil {
return errors . Wrap ( err , "WAL truncation in Compact" )
2020-02-14 09:50:24 +00:00
}
2017-08-09 09:10:29 +00:00
2020-12-07 21:29:43 +00:00
compactionDuration := time . Since ( start )
2020-12-25 13:45:23 +00:00
if compactionDuration . Milliseconds ( ) > db . head . chunkRange . Load ( ) {
2020-12-07 21:29:43 +00:00
level . Warn ( db . logger ) . Log (
"msg" , "Head compaction took longer than the block time range, compactions are falling behind and won't be able to catch up" ,
"duration" , compactionDuration . String ( ) ,
"block_range" , db . head . chunkRange . Load ( ) ,
)
}
2020-02-14 09:50:24 +00:00
return db . compactBlocks ( )
}
2017-09-08 13:09:24 +00:00
2020-10-19 15:27:08 +00:00
// CompactHead compacts the given RangeHead.
func ( db * DB ) CompactHead ( head * RangeHead ) error {
2020-02-14 09:50:24 +00:00
db . cmtx . Lock ( )
defer db . cmtx . Unlock ( )
2020-10-19 15:27:08 +00:00
if err := db . compactHead ( head ) ; err != nil {
return errors . Wrap ( err , "compact head" )
}
if err := db . head . truncateWAL ( head . BlockMaxTime ( ) ) ; err != nil {
return errors . Wrap ( err , "WAL truncation" )
}
return nil
2020-02-14 09:50:24 +00:00
}
2020-10-19 15:27:08 +00:00
// compactHead compacts the given RangeHead.
2020-02-14 09:50:24 +00:00
// The compaction mutex should be held before calling this method.
2020-10-19 15:27:08 +00:00
func ( db * DB ) compactHead ( head * RangeHead ) error {
uid , err := db . compactor . Write ( db . dir , head , head . MinTime ( ) , head . BlockMaxTime ( ) , nil )
2020-02-14 09:50:24 +00:00
if err != nil {
return errors . Wrap ( err , "persist head block" )
}
2020-10-19 15:27:08 +00:00
if err := db . reloadBlocks ( ) ; err != nil {
2020-10-14 09:35:24 +00:00
if errRemoveAll := os . RemoveAll ( filepath . Join ( db . dir , uid . String ( ) ) ) ; errRemoveAll != nil {
2020-10-28 15:24:58 +00:00
return tsdb_errors . NewMulti (
errors . Wrap ( err , "reloadBlocks blocks" ) ,
errors . Wrapf ( errRemoveAll , "delete persisted head block after failed db reloadBlocks:%s" , uid ) ,
) . Err ( )
2017-08-09 09:10:29 +00:00
}
2020-10-19 15:27:08 +00:00
return errors . Wrap ( err , "reloadBlocks blocks" )
2020-02-14 09:50:24 +00:00
}
2020-10-19 15:27:08 +00:00
if err = db . head . truncateMemory ( head . BlockMaxTime ( ) ) ; err != nil {
return errors . Wrap ( err , "head memory truncate" )
2017-03-04 15:50:48 +00:00
}
2020-02-14 09:50:24 +00:00
return nil
}
2017-01-06 11:37:28 +00:00
2020-02-14 09:50:24 +00:00
// compactBlocks compacts all the eligible on-disk blocks.
// The compaction mutex should be held before calling this method.
func ( db * DB ) compactBlocks ( ) ( err error ) {
2017-03-02 08:13:29 +00:00
// Check for compactions of multiple blocks.
for {
2017-08-09 09:10:29 +00:00
plan , err := db . compactor . Plan ( db . dir )
2017-03-02 08:13:29 +00:00
if err != nil {
2018-09-21 06:24:01 +00:00
return errors . Wrap ( err , "plan compaction" )
2017-03-02 08:13:29 +00:00
}
2017-08-09 09:10:29 +00:00
if len ( plan ) == 0 {
2017-03-21 11:21:02 +00:00
break
}
2017-01-06 11:37:28 +00:00
2017-03-02 08:13:29 +00:00
select {
case <- db . stopc :
2018-09-21 06:24:01 +00:00
return nil
2017-03-02 08:13:29 +00:00
default :
}
2017-03-20 09:41:43 +00:00
2019-01-29 08:26:01 +00:00
uid , err := db . compactor . Compact ( db . dir , plan , db . blocks )
if err != nil {
2018-09-21 06:24:01 +00:00
return errors . Wrapf ( err , "compact %s" , plan )
2017-08-09 09:10:29 +00:00
}
2017-08-28 22:39:17 +00:00
2020-10-19 15:27:08 +00:00
if err := db . reloadBlocks ( ) ; err != nil {
2019-01-29 08:26:01 +00:00
if err := os . RemoveAll ( filepath . Join ( db . dir , uid . String ( ) ) ) ; err != nil {
2020-10-19 15:27:08 +00:00
return errors . Wrapf ( err , "delete compacted block after failed db reloadBlocks:%s" , uid )
2019-01-29 08:26:01 +00:00
}
2020-10-19 15:27:08 +00:00
return errors . Wrap ( err , "reloadBlocks blocks" )
2017-08-28 22:39:17 +00:00
}
2017-02-23 09:50:22 +00:00
}
2018-09-21 06:24:01 +00:00
return nil
2017-02-10 01:54:26 +00:00
}
2019-07-23 08:04:48 +00:00
// getBlock iterates a given block range to find a block by a given id.
// If found it returns the block itself and a boolean to indicate that it was found.
func getBlock ( allBlocks [ ] * Block , id ulid . ULID ) ( * Block , bool ) {
for _ , b := range allBlocks {
2017-05-18 14:09:30 +00:00
if b . Meta ( ) . ULID == id {
2017-03-20 07:41:56 +00:00
return b , true
}
}
return nil , false
}
2020-10-19 15:27:08 +00:00
// reload reloads blocks and truncates the head and its WAL.
func ( db * DB ) reload ( ) error {
if err := db . reloadBlocks ( ) ; err != nil {
return errors . Wrap ( err , "reloadBlocks" )
}
if len ( db . blocks ) == 0 {
return nil
}
if err := db . head . Truncate ( db . blocks [ len ( db . blocks ) - 1 ] . MaxTime ( ) ) ; err != nil {
return errors . Wrap ( err , "head truncate" )
}
return nil
}
// reloadBlocks reloads blocks without touching head.
2018-06-27 13:47:11 +00:00
// Blocks that are obsolete due to replacement or retention will be deleted.
2020-10-19 15:27:08 +00:00
func ( db * DB ) reloadBlocks ( ) ( err error ) {
2017-08-30 15:38:25 +00:00
defer func ( ) {
2017-05-26 13:13:03 +00:00
if err != nil {
db . metrics . reloadsFailed . Inc ( )
}
db . metrics . reloads . Inc ( )
2017-08-30 15:38:25 +00:00
} ( )
2017-05-26 13:13:03 +00:00
2021-02-17 05:32:43 +00:00
// Now that we reload TSDB every minute, there is high chance for race condition with a reload
// triggered by CleanTombstones(). We need to lock the reload to avoid the situation where
// a normal reload and CleanTombstones try to delete the same block.
db . mtx . Lock ( )
defer db . mtx . Unlock ( )
2019-07-23 08:04:48 +00:00
loadable , corrupted , err := openBlocks ( db . logger , db . dir , db . blocks , db . chunkPool )
2017-03-02 08:13:29 +00:00
if err != nil {
2019-01-16 10:03:52 +00:00
return err
}
2018-06-27 16:05:21 +00:00
2020-07-22 15:19:33 +00:00
deletableULIDs := db . blocksToDelete ( loadable )
deletable := make ( map [ ulid . ULID ] * Block , len ( deletableULIDs ) )
2019-01-16 10:03:52 +00:00
2020-08-11 14:53:23 +00:00
// Mark all parents of loaded blocks as deletable (no matter if they exists). This makes it resilient against the process
// crashing towards the end of a compaction but before deletions. By doing that, we can pick up the deletion where it left off during a crash.
2019-01-16 10:03:52 +00:00
for _ , block := range loadable {
2020-07-22 15:19:33 +00:00
if _ , ok := deletableULIDs [ block . meta . ULID ] ; ok {
deletable [ block . meta . ULID ] = block
}
2019-01-16 10:03:52 +00:00
for _ , b := range block . Meta ( ) . Compaction . Parents {
2020-08-11 14:53:23 +00:00
if _ , ok := corrupted [ b . ULID ] ; ok {
delete ( corrupted , b . ULID )
level . Warn ( db . logger ) . Log ( "msg" , "Found corrupted block, but replaced by compacted one so it's safe to delete. This should not happen with atomic deletes." , "block" , b . ULID )
}
2019-01-16 10:03:52 +00:00
deletable [ b . ULID ] = nil
2017-02-10 01:54:26 +00:00
}
2019-01-16 10:03:52 +00:00
}
2020-08-11 14:53:23 +00:00
2019-01-16 10:03:52 +00:00
if len ( corrupted ) > 0 {
2020-08-11 14:53:23 +00:00
// Corrupted but no child loaded for it.
2019-01-30 09:40:40 +00:00
// Close all new blocks to release the lock for windows.
for _ , block := range loadable {
2019-07-23 08:04:48 +00:00
if _ , open := getBlock ( db . blocks , block . Meta ( ) . ULID ) ; ! open {
2019-01-30 09:40:40 +00:00
block . Close ( )
}
}
2020-10-28 15:24:58 +00:00
errs := tsdb_errors . NewMulti ( )
2020-06-17 14:40:00 +00:00
for ulid , err := range corrupted {
2020-10-28 15:24:58 +00:00
errs . Add ( errors . Wrapf ( err , "corrupted block %s" , ulid . String ( ) ) )
2020-06-17 14:40:00 +00:00
}
2020-10-28 15:24:58 +00:00
return errs . Err ( )
2019-01-16 10:03:52 +00:00
}
var (
2020-08-11 14:53:23 +00:00
toLoad [ ] * Block
2019-01-16 10:03:52 +00:00
blocksSize int64
)
2020-08-11 14:53:23 +00:00
// All deletable blocks should be unloaded.
// NOTE: We need to loop through loadable one more time as there might be loadable ready to be removed (replaced by compacted block).
2019-01-16 10:03:52 +00:00
for _ , block := range loadable {
if _ , ok := deletable [ block . Meta ( ) . ULID ] ; ok {
deletable [ block . Meta ( ) . ULID ] = block
2017-11-03 19:34:21 +00:00
continue
}
2019-01-16 10:03:52 +00:00
2020-08-11 14:53:23 +00:00
toLoad = append ( toLoad , block )
blocksSize += block . Size ( )
2019-01-16 10:03:52 +00:00
}
db . metrics . blocksBytes . Set ( float64 ( blocksSize ) )
2020-08-11 14:53:23 +00:00
sort . Slice ( toLoad , func ( i , j int ) bool {
return toLoad [ i ] . Meta ( ) . MinTime < toLoad [ j ] . Meta ( ) . MinTime
2019-01-16 10:03:52 +00:00
} )
2019-03-02 13:54:49 +00:00
if ! db . opts . AllowOverlappingBlocks {
2020-08-11 14:53:23 +00:00
if err := validateBlockSequence ( toLoad ) ; err != nil {
2019-02-26 19:50:37 +00:00
return errors . Wrap ( err , "invalid block sequence" )
}
}
2019-01-16 10:03:52 +00:00
// Swap new blocks first for subsequently created readers to be seen.
oldBlocks := db . blocks
2020-08-11 14:53:23 +00:00
db . blocks = toLoad
2019-01-16 10:03:52 +00:00
2020-08-11 14:53:23 +00:00
blockMetas := make ( [ ] BlockMeta , 0 , len ( toLoad ) )
for _ , b := range toLoad {
2019-02-14 13:29:41 +00:00
blockMetas = append ( blockMetas , b . Meta ( ) )
}
if overlaps := OverlappingBlocks ( blockMetas ) ; len ( overlaps ) > 0 {
2020-10-19 15:27:08 +00:00
level . Warn ( db . logger ) . Log ( "msg" , "Overlapping blocks found during reloadBlocks" , "detail" , overlaps . String ( ) )
2019-02-14 13:29:41 +00:00
}
2020-08-11 14:53:23 +00:00
// Append blocks to old, deletable blocks, so we can close them.
2019-01-16 10:03:52 +00:00
for _ , b := range oldBlocks {
if _ , ok := deletable [ b . Meta ( ) . ULID ] ; ok {
deletable [ b . Meta ( ) . ULID ] = b
2018-06-27 16:05:21 +00:00
}
}
2019-01-16 10:03:52 +00:00
if err := db . deleteBlocks ( deletable ) ; err != nil {
2020-10-19 15:27:08 +00:00
return errors . Wrapf ( err , "delete %v blocks" , len ( deletable ) )
2019-01-16 10:03:52 +00:00
}
2020-10-19 15:27:08 +00:00
return nil
2019-01-16 10:03:52 +00:00
}
2019-07-23 08:04:48 +00:00
func openBlocks ( l log . Logger , dir string , loaded [ ] * Block , chunkPool chunkenc . Pool ) ( blocks [ ] * Block , corrupted map [ ulid . ULID ] error , err error ) {
bDirs , err := blockDirs ( dir )
2019-01-16 10:03:52 +00:00
if err != nil {
return nil , nil , errors . Wrap ( err , "find blocks" )
}
corrupted = make ( map [ ulid . ULID ] error )
2019-07-23 08:04:48 +00:00
for _ , bDir := range bDirs {
meta , _ , err := readMetaFile ( bDir )
2018-06-27 13:47:11 +00:00
if err != nil {
2020-10-19 15:27:08 +00:00
level . Error ( l ) . Log ( "msg" , "Failed to read meta.json for a block during reloadBlocks. Skipping" , "dir" , bDir , "err" , err )
2018-06-27 13:47:11 +00:00
continue
}
2019-01-16 10:03:52 +00:00
2018-06-27 13:47:11 +00:00
// See if we already have the block in memory or open it otherwise.
2019-07-23 08:04:48 +00:00
block , open := getBlock ( loaded , meta . ULID )
if ! open {
block , err = OpenBlock ( l , bDir , chunkPool )
2017-05-18 14:09:30 +00:00
if err != nil {
2019-01-16 10:03:52 +00:00
corrupted [ meta . ULID ] = err
continue
2017-03-02 08:13:29 +00:00
}
}
2019-01-16 10:03:52 +00:00
blocks = append ( blocks , block )
2016-12-09 09:00:14 +00:00
}
2019-01-16 10:03:52 +00:00
return blocks , corrupted , nil
}
2020-07-22 15:19:33 +00:00
// DefaultBlocksToDelete returns a filter which decides time based and size based
// retention from the options of the db.
func DefaultBlocksToDelete ( db * DB ) BlocksToDeleteFunc {
return func ( blocks [ ] * Block ) map [ ulid . ULID ] struct { } {
return deletableBlocks ( db , blocks )
}
}
2020-08-11 05:56:08 +00:00
// deletableBlocks returns all currently loaded blocks past retention policy or already compacted into a new block.
2020-07-22 15:19:33 +00:00
func deletableBlocks ( db * DB , blocks [ ] * Block ) map [ ulid . ULID ] struct { } {
deletable := make ( map [ ulid . ULID ] struct { } )
2019-01-16 10:03:52 +00:00
// Sort the blocks by time - newest to oldest (largest to smallest timestamp).
// This ensures that the retentions will remove the oldest blocks.
2018-05-28 20:00:36 +00:00
sort . Slice ( blocks , func ( i , j int ) bool {
2019-01-16 10:03:52 +00:00
return blocks [ i ] . Meta ( ) . MaxTime > blocks [ j ] . Meta ( ) . MaxTime
2018-05-28 20:00:36 +00:00
} )
2019-01-16 10:03:52 +00:00
2019-01-18 08:35:16 +00:00
for _ , block := range blocks {
if block . Meta ( ) . Compaction . Deletable {
2020-07-22 15:19:33 +00:00
deletable [ block . Meta ( ) . ULID ] = struct { } { }
2019-01-18 08:35:16 +00:00
}
2017-05-18 14:09:30 +00:00
}
2017-05-26 11:01:45 +00:00
2020-07-22 15:19:33 +00:00
for ulid := range BeyondTimeRetention ( db , blocks ) {
deletable [ ulid ] = struct { } { }
2017-05-18 14:09:30 +00:00
}
2020-07-22 15:19:33 +00:00
for ulid := range BeyondSizeRetention ( db , blocks ) {
deletable [ ulid ] = struct { } { }
2019-01-16 10:03:52 +00:00
}
2017-05-18 14:09:30 +00:00
2019-01-16 10:03:52 +00:00
return deletable
}
2020-07-22 15:19:33 +00:00
// BeyondTimeRetention returns those blocks which are beyond the time retention
// set in the db options.
func BeyondTimeRetention ( db * DB , blocks [ ] * Block ) ( deletable map [ ulid . ULID ] struct { } ) {
2019-01-16 10:03:52 +00:00
// Time retention is disabled or no blocks to work with.
2020-10-19 11:21:54 +00:00
if len ( blocks ) == 0 || db . opts . RetentionDuration == 0 {
2019-01-16 10:03:52 +00:00
return
}
2020-07-22 15:19:33 +00:00
deletable = make ( map [ ulid . ULID ] struct { } )
2019-01-16 10:03:52 +00:00
for i , block := range blocks {
// The difference between the first block and this block is larger than
2020-01-02 14:54:09 +00:00
// the retention period so any blocks after that are added as deletable.
2020-02-11 16:34:09 +00:00
if i > 0 && blocks [ 0 ] . Meta ( ) . MaxTime - block . Meta ( ) . MaxTime > db . opts . RetentionDuration {
2019-01-16 10:03:52 +00:00
for _ , b := range blocks [ i : ] {
2020-07-22 15:19:33 +00:00
deletable [ b . meta . ULID ] = struct { } { }
2019-01-16 10:03:52 +00:00
}
db . metrics . timeRetentionCount . Inc ( )
break
2017-11-03 19:34:21 +00:00
}
2019-01-16 10:03:52 +00:00
}
2020-01-02 14:54:09 +00:00
return deletable
2019-01-16 10:03:52 +00:00
}
2020-07-22 15:19:33 +00:00
// BeyondSizeRetention returns those blocks which are beyond the size retention
// set in the db options.
func BeyondSizeRetention ( db * DB , blocks [ ] * Block ) ( deletable map [ ulid . ULID ] struct { } ) {
2019-01-16 10:03:52 +00:00
// Size retention is disabled or no blocks to work with.
2020-10-19 11:21:54 +00:00
if len ( blocks ) == 0 || db . opts . MaxBytes <= 0 {
2019-01-16 10:03:52 +00:00
return
}
2020-07-22 15:19:33 +00:00
deletable = make ( map [ ulid . ULID ] struct { } )
2019-11-12 02:40:16 +00:00
2020-05-06 15:30:00 +00:00
// Initializing size counter with WAL size and Head chunks
// written to disk, as that is part of the retention strategy.
2020-10-12 21:15:40 +00:00
blocksSize := db . Head ( ) . Size ( )
2019-01-16 10:03:52 +00:00
for i , block := range blocks {
blocksSize += block . Size ( )
2020-02-06 15:58:38 +00:00
if blocksSize > int64 ( db . opts . MaxBytes ) {
2019-01-16 10:03:52 +00:00
// Add this and all following blocks for deletion.
for _ , b := range blocks [ i : ] {
2020-07-22 15:19:33 +00:00
deletable [ b . meta . ULID ] = struct { } { }
2019-01-16 10:03:52 +00:00
}
db . metrics . sizeRetentionCount . Inc ( )
break
2017-11-03 19:34:21 +00:00
}
2018-06-27 13:47:11 +00:00
}
2020-01-02 14:54:09 +00:00
return deletable
2019-01-16 10:03:52 +00:00
}
2020-08-11 14:53:23 +00:00
// deleteBlocks closes the block if loaded and deletes blocks from the disk if exists.
2019-01-16 10:03:52 +00:00
// When the map contains a non nil block object it means it is loaded in memory
// so needs to be closed first as it might need to wait for pending readers to complete.
func ( db * DB ) deleteBlocks ( blocks map [ ulid . ULID ] * Block ) error {
for ulid , block := range blocks {
if block != nil {
if err := block . Close ( ) ; err != nil {
2020-06-17 14:40:00 +00:00
level . Warn ( db . logger ) . Log ( "msg" , "Closing block failed" , "err" , err , "block" , ulid )
2019-01-16 10:03:52 +00:00
}
}
2020-08-11 05:56:08 +00:00
2020-08-11 14:53:23 +00:00
toDelete := filepath . Join ( db . dir , ulid . String ( ) )
if _ , err := os . Stat ( toDelete ) ; os . IsNotExist ( err ) {
// Noop.
continue
} else if err != nil {
return errors . Wrapf ( err , "stat dir %v" , toDelete )
}
// Replace atomically to avoid partial block when process would crash during deletion.
2020-08-11 05:56:08 +00:00
tmpToDelete := filepath . Join ( db . dir , fmt . Sprintf ( "%s%s" , ulid , tmpForDeletionBlockDirSuffix ) )
2020-08-11 14:53:23 +00:00
if err := fileutil . Replace ( toDelete , tmpToDelete ) ; err != nil {
2020-08-11 05:56:08 +00:00
return errors . Wrapf ( err , "replace of obsolete block for deletion %s" , ulid )
}
if err := os . RemoveAll ( tmpToDelete ) ; err != nil {
2018-06-27 13:47:11 +00:00
return errors . Wrapf ( err , "delete obsolete block %s" , ulid )
2017-10-23 18:30:03 +00:00
}
2020-08-11 14:53:23 +00:00
level . Info ( db . logger ) . Log ( "msg" , "Deleting obsolete block" , "block" , ulid )
2017-10-23 18:30:03 +00:00
}
2020-08-11 05:56:08 +00:00
2019-01-16 10:03:52 +00:00
return nil
2017-05-18 14:09:30 +00:00
}
2017-01-03 14:43:26 +00:00
2019-02-26 19:50:37 +00:00
// validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence.
func validateBlockSequence ( bs [ ] * Block ) error {
if len ( bs ) <= 1 {
return nil
}
var metas [ ] BlockMeta
for _ , b := range bs {
metas = append ( metas , b . meta )
}
overlaps := OverlappingBlocks ( metas )
if len ( overlaps ) > 0 {
return errors . Errorf ( "block time ranges overlap: %s" , overlaps )
}
return nil
}
2018-04-05 13:15:24 +00:00
// TimeRange specifies minTime and maxTime range.
2018-03-29 11:50:46 +00:00
type TimeRange struct {
2018-04-05 12:51:33 +00:00
Min , Max int64
2018-03-29 11:50:46 +00:00
}
2018-04-05 12:51:33 +00:00
2018-04-05 13:15:24 +00:00
// Overlaps contains overlapping blocks aggregated by overlapping range.
type Overlaps map [ TimeRange ] [ ] BlockMeta
// String returns human readable string form of overlapped blocks.
func ( o Overlaps ) String ( ) string {
var res [ ] string
for r , overlaps := range o {
var groups [ ] string
for _ , m := range overlaps {
groups = append ( groups , fmt . Sprintf (
2018-04-05 15:53:24 +00:00
"<ulid: %s, mint: %d, maxt: %d, range: %s>" ,
2018-04-05 13:15:24 +00:00
m . ULID . String ( ) ,
m . MinTime ,
m . MaxTime ,
( time . Duration ( ( m . MaxTime - m . MinTime ) / 1000 ) * time . Second ) . String ( ) ,
) )
}
2018-04-05 15:01:16 +00:00
res = append ( res , fmt . Sprintf (
2018-04-05 15:53:24 +00:00
"[mint: %d, maxt: %d, range: %s, blocks: %d]: %s" ,
2018-04-05 15:01:16 +00:00
r . Min , r . Max ,
( time . Duration ( ( r . Max - r . Min ) / 1000 ) * time . Second ) . String ( ) ,
len ( overlaps ) ,
2018-04-05 15:53:24 +00:00
strings . Join ( groups , ", " ) ) ,
2018-04-05 15:01:16 +00:00
)
2018-04-05 13:15:24 +00:00
}
2018-04-05 15:01:16 +00:00
return strings . Join ( res , "\n" )
2018-04-05 13:15:24 +00:00
}
// OverlappingBlocks returns all overlapping blocks from given meta files.
func OverlappingBlocks ( bm [ ] BlockMeta ) Overlaps {
2018-03-28 17:33:41 +00:00
if len ( bm ) <= 1 {
2018-03-28 14:50:52 +00:00
return nil
}
2018-03-28 22:50:42 +00:00
var (
2018-04-05 12:51:33 +00:00
overlaps [ ] [ ] BlockMeta
2018-03-28 22:50:42 +00:00
// pending contains not ended blocks in regards to "current" timestamp.
pending = [ ] BlockMeta { bm [ 0 ] }
2018-03-29 11:50:46 +00:00
// continuousPending helps to aggregate same overlaps to single group.
continuousPending = true
2018-03-28 22:50:42 +00:00
)
2018-04-05 13:15:24 +00:00
// We have here blocks sorted by minTime. We iterate over each block and treat its minTime as our "current" timestamp.
// We check if any of the pending block finished (blocks that we have seen before, but their maxTime was still ahead current
// timestamp). If not, it means they overlap with our current block. In the same time current block is assumed pending.
2018-03-28 22:18:24 +00:00
for _ , b := range bm [ 1 : ] {
2018-03-28 22:50:42 +00:00
var newPending [ ] BlockMeta
2018-03-28 17:33:41 +00:00
2018-03-28 22:18:24 +00:00
for _ , p := range pending {
2018-03-28 22:50:42 +00:00
// "b.MinTime" is our current time.
2018-03-28 22:18:24 +00:00
if b . MinTime >= p . MaxTime {
2018-03-29 11:50:46 +00:00
continuousPending = false
2018-03-28 22:18:24 +00:00
continue
2018-03-28 17:33:41 +00:00
}
2018-03-28 22:18:24 +00:00
// "p" overlaps with "b" and "p" is still pending.
newPending = append ( newPending , p )
2017-05-18 14:09:30 +00:00
}
2018-03-28 22:50:42 +00:00
2018-03-28 22:18:24 +00:00
// Our block "b" is now pending.
pending = append ( newPending , b )
if len ( newPending ) == 0 {
2018-03-28 22:50:42 +00:00
// No overlaps.
2018-03-28 22:18:24 +00:00
continue
2018-03-28 17:33:41 +00:00
}
2018-03-29 11:50:46 +00:00
if continuousPending && len ( overlaps ) > 0 {
2018-03-28 22:18:24 +00:00
overlaps [ len ( overlaps ) - 1 ] = append ( overlaps [ len ( overlaps ) - 1 ] , b )
2018-03-28 17:33:41 +00:00
continue
2017-05-18 14:09:30 +00:00
}
2018-03-28 22:18:24 +00:00
overlaps = append ( overlaps , append ( newPending , b ) )
2018-03-29 11:50:46 +00:00
// Start new pendings.
continuousPending = true
2017-05-18 14:09:30 +00:00
}
2018-04-05 12:51:33 +00:00
// Fetch the critical overlapped time range foreach overlap groups.
2018-04-05 13:15:24 +00:00
overlapGroups := Overlaps { }
2018-04-05 12:51:33 +00:00
for _ , overlap := range overlaps {
minRange := TimeRange { Min : 0 , Max : math . MaxInt64 }
for _ , b := range overlap {
if minRange . Max > b . MaxTime {
minRange . Max = b . MaxTime
}
if minRange . Min < b . MinTime {
minRange . Min = b . MinTime
}
}
overlapGroups [ minRange ] = overlap
}
return overlapGroups
2017-01-02 21:24:35 +00:00
}
2017-10-09 13:21:46 +00:00
func ( db * DB ) String ( ) string {
return "HEAD"
}
// Blocks returns the databases persisted blocks.
func ( db * DB ) Blocks ( ) [ ] * Block {
2017-08-29 13:39:27 +00:00
db . mtx . RLock ( )
defer db . mtx . RUnlock ( )
return db . blocks
}
2017-10-09 13:21:46 +00:00
// Head returns the databases's head.
2017-09-25 14:45:24 +00:00
func ( db * DB ) Head ( ) * Head {
return db . head
}
2017-01-06 07:08:02 +00:00
// Close the partition.
2017-01-06 10:40:09 +00:00
func ( db * DB ) Close ( ) error {
2017-01-06 11:37:28 +00:00
close ( db . stopc )
2020-10-28 10:09:03 +00:00
if db . compactCancel != nil {
db . compactCancel ( )
}
2017-01-06 11:37:28 +00:00
<- db . donec
2017-01-06 10:40:09 +00:00
db . mtx . Lock ( )
2017-03-17 11:12:50 +00:00
defer db . mtx . Unlock ( )
2017-03-04 15:50:48 +00:00
2017-03-06 11:13:15 +00:00
var g errgroup . Group
2017-01-02 09:34:55 +00:00
2017-03-20 07:41:56 +00:00
// blocks also contains all head blocks.
for _ , pb := range db . blocks {
2017-03-06 11:13:15 +00:00
g . Go ( pb . Close )
2016-12-15 07:31:26 +00:00
}
2021-11-11 16:45:25 +00:00
errs := tsdb_errors . NewMulti ( g . Wait ( ) , db . locker . Release ( ) )
2020-10-21 15:08:28 +00:00
if db . head != nil {
2020-10-28 15:24:58 +00:00
errs . Add ( db . head . Close ( ) )
2020-10-21 15:08:28 +00:00
}
2020-10-28 15:24:58 +00:00
return errs . Err ( )
2016-12-09 09:00:14 +00:00
}
2018-11-20 10:34:26 +00:00
// DisableCompactions disables auto compactions.
2017-06-06 18:15:23 +00:00
func ( db * DB ) DisableCompactions ( ) {
2018-11-20 10:34:26 +00:00
db . autoCompactMtx . Lock ( )
defer db . autoCompactMtx . Unlock ( )
2017-07-14 08:06:07 +00:00
2018-11-20 10:34:26 +00:00
db . autoCompact = false
2020-04-11 08:22:18 +00:00
level . Info ( db . logger ) . Log ( "msg" , "Compactions disabled" )
2017-06-06 14:53:20 +00:00
}
2018-11-20 10:34:26 +00:00
// EnableCompactions enables auto compactions.
2017-06-06 18:15:23 +00:00
func ( db * DB ) EnableCompactions ( ) {
2018-11-20 10:34:26 +00:00
db . autoCompactMtx . Lock ( )
defer db . autoCompactMtx . Unlock ( )
2017-07-14 08:06:07 +00:00
2018-11-20 10:34:26 +00:00
db . autoCompact = true
2020-04-11 08:22:18 +00:00
level . Info ( db . logger ) . Log ( "msg" , "Compactions enabled" )
2017-06-05 08:18:31 +00:00
}
2018-02-28 11:04:55 +00:00
// Snapshot writes the current data to the directory. If withHead is set to true it
// will create a new block containing all data that's currently in the memory buffer/WAL.
func ( db * DB ) Snapshot ( dir string , withHead bool ) error {
2017-08-30 16:34:54 +00:00
if dir == db . dir {
return errors . Errorf ( "cannot snapshot into base directory" )
}
2019-03-18 14:14:10 +00:00
if _ , err := ulid . ParseStrict ( dir ) ; err == nil {
2017-08-30 16:34:54 +00:00
return errors . Errorf ( "dir must not be a valid ULID" )
}
2017-06-05 08:18:31 +00:00
2017-08-30 16:34:54 +00:00
db . cmtx . Lock ( )
defer db . cmtx . Unlock ( )
2017-10-23 18:30:03 +00:00
db . mtx . RLock ( )
defer db . mtx . RUnlock ( )
for _ , b := range db . blocks {
2020-04-11 08:22:18 +00:00
level . Info ( db . logger ) . Log ( "msg" , "Snapshotting block" , "block" , b )
2017-08-30 16:34:54 +00:00
if err := b . Snapshot ( dir ) ; err != nil {
2017-11-22 12:28:06 +00:00
return errors . Wrapf ( err , "error snapshotting block: %s" , b . Dir ( ) )
2017-08-30 16:34:54 +00:00
}
}
2018-02-28 11:04:55 +00:00
if ! withHead {
return nil
}
2019-07-03 10:47:31 +00:00
mint := db . head . MinTime ( )
maxt := db . head . MaxTime ( )
2020-08-13 09:55:35 +00:00
head := NewRangeHead ( db . head , mint , maxt )
2019-07-03 10:47:31 +00:00
// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
// Because of this block intervals are always +1 than the total samples it includes.
if _ , err := db . compactor . Write ( dir , head , mint , maxt + 1 , nil ) ; err != nil {
return errors . Wrap ( err , "snapshot head block" )
}
return nil
2017-08-28 22:39:17 +00:00
}
2017-07-14 07:00:22 +00:00
2017-08-28 22:39:17 +00:00
// Querier returns a new querier over the data partition for the given time range.
2020-02-06 15:58:38 +00:00
func ( db * DB ) Querier ( _ context . Context , mint , maxt int64 ) ( storage . Querier , error ) {
2017-10-09 13:21:46 +00:00
var blocks [ ] BlockReader
2017-08-28 22:39:17 +00:00
2017-10-23 18:30:03 +00:00
db . mtx . RLock ( )
defer db . mtx . RUnlock ( )
for _ , b := range db . blocks {
2018-07-02 08:23:36 +00:00
if b . OverlapsClosedInterval ( mint , maxt ) {
2017-10-09 13:21:46 +00:00
blocks = append ( blocks , b )
}
}
2021-07-20 08:47:20 +00:00
var headQuerier storage . Querier
2017-10-09 13:21:46 +00:00
if maxt >= db . head . MinTime ( ) {
2021-07-20 08:47:20 +00:00
rh := NewRangeHead ( db . head , mint , maxt )
var err error
headQuerier , err = NewBlockQuerier ( rh , mint , maxt )
if err != nil {
return nil , errors . Wrapf ( err , "open querier for head %s" , rh )
}
// Getting the querier above registers itself in the queue that the truncation waits on.
// So if the querier is currently not colliding with any truncation, we can continue to use it and still
// won't run into a race later since any truncation that comes after will wait on this querier if it overlaps.
shouldClose , getNew , newMint := db . head . IsQuerierCollidingWithTruncation ( mint , maxt )
if shouldClose {
if err := headQuerier . Close ( ) ; err != nil {
return nil , errors . Wrapf ( err , "closing head querier %s" , rh )
}
headQuerier = nil
}
if getNew {
rh := NewRangeHead ( db . head , newMint , maxt )
headQuerier , err = NewBlockQuerier ( rh , newMint , maxt )
if err != nil {
return nil , errors . Wrapf ( err , "open querier for head while getting new querier %s" , rh )
}
}
2017-10-09 13:21:46 +00:00
}
2017-08-28 22:39:17 +00:00
2020-02-06 15:58:38 +00:00
blockQueriers := make ( [ ] storage . Querier , 0 , len ( blocks ) )
2017-06-06 12:45:54 +00:00
for _ , b := range blocks {
2017-10-09 13:21:46 +00:00
q , err := NewBlockQuerier ( b , mint , maxt )
2017-10-23 18:30:03 +00:00
if err == nil {
2019-02-14 13:29:41 +00:00
blockQueriers = append ( blockQueriers , q )
2017-10-23 18:30:03 +00:00
continue
}
// If we fail, all previously opened queriers must be closed.
2019-02-14 13:29:41 +00:00
for _ , q := range blockQueriers {
2020-07-31 15:03:02 +00:00
// TODO(bwplotka): Handle error.
_ = q . Close ( )
2017-10-09 13:21:46 +00:00
}
2017-10-23 18:30:03 +00:00
return nil , errors . Wrapf ( err , "open querier for block %s" , b )
2017-10-09 13:21:46 +00:00
}
2021-07-20 08:47:20 +00:00
if headQuerier != nil {
blockQueriers = append ( blockQueriers , headQuerier )
}
2020-07-31 15:03:02 +00:00
return storage . NewMergeQuerier ( blockQueriers , nil , storage . ChainedSeriesMerge ) , nil
}
// ChunkQuerier returns a new chunk querier over the data partition for the given time range.
func ( db * DB ) ChunkQuerier ( _ context . Context , mint , maxt int64 ) ( storage . ChunkQuerier , error ) {
var blocks [ ] BlockReader
2019-02-14 13:29:41 +00:00
2020-07-31 15:03:02 +00:00
db . mtx . RLock ( )
defer db . mtx . RUnlock ( )
for _ , b := range db . blocks {
if b . OverlapsClosedInterval ( mint , maxt ) {
blocks = append ( blocks , b )
}
}
2021-07-20 08:47:20 +00:00
var headQuerier storage . ChunkQuerier
2020-07-31 15:03:02 +00:00
if maxt >= db . head . MinTime ( ) {
2021-07-20 08:47:20 +00:00
rh := NewRangeHead ( db . head , mint , maxt )
var err error
headQuerier , err = NewBlockChunkQuerier ( rh , mint , maxt )
if err != nil {
return nil , errors . Wrapf ( err , "open querier for head %s" , rh )
}
// Getting the querier above registers itself in the queue that the truncation waits on.
// So if the querier is currently not colliding with any truncation, we can continue to use it and still
// won't run into a race later since any truncation that comes after will wait on this querier if it overlaps.
shouldClose , getNew , newMint := db . head . IsQuerierCollidingWithTruncation ( mint , maxt )
if shouldClose {
if err := headQuerier . Close ( ) ; err != nil {
return nil , errors . Wrapf ( err , "closing head querier %s" , rh )
}
headQuerier = nil
}
if getNew {
rh := NewRangeHead ( db . head , newMint , maxt )
headQuerier , err = NewBlockChunkQuerier ( rh , newMint , maxt )
if err != nil {
return nil , errors . Wrapf ( err , "open querier for head while getting new querier %s" , rh )
}
}
2019-02-14 13:29:41 +00:00
}
2020-07-31 15:03:02 +00:00
blockQueriers := make ( [ ] storage . ChunkQuerier , 0 , len ( blocks ) )
for _ , b := range blocks {
q , err := NewBlockChunkQuerier ( b , mint , maxt )
if err == nil {
blockQueriers = append ( blockQueriers , q )
continue
}
// If we fail, all previously opened queriers must be closed.
for _ , q := range blockQueriers {
// TODO(bwplotka): Handle error.
_ = q . Close ( )
}
return nil , errors . Wrapf ( err , "open querier for block %s" , b )
}
2021-07-20 08:47:20 +00:00
if headQuerier != nil {
blockQueriers = append ( blockQueriers , headQuerier )
}
2017-08-28 22:39:17 +00:00
2020-07-31 15:03:02 +00:00
return storage . NewMergeChunkQuerier ( blockQueriers , nil , storage . NewCompactingChunkSeriesMerger ( storage . ChainedSeriesMerge ) ) , nil
2020-06-24 13:41:52 +00:00
}
2021-03-16 09:47:45 +00:00
func ( db * DB ) ExemplarQuerier ( ctx context . Context ) ( storage . ExemplarQuerier , error ) {
return db . head . exemplars . ExemplarQuerier ( ctx )
}
2021-10-22 08:06:44 +00:00
func rangeForTimestamp ( t , width int64 ) ( maxt int64 ) {
2018-12-04 10:30:49 +00:00
return ( t / width ) * width + width
2017-02-01 14:29:48 +00:00
}
2017-08-28 22:39:17 +00:00
// Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis.
2019-11-18 19:53:33 +00:00
func ( db * DB ) Delete ( mint , maxt int64 , ms ... * labels . Matcher ) error {
2017-05-20 07:51:10 +00:00
db . cmtx . Lock ( )
defer db . cmtx . Unlock ( )
2017-07-14 07:00:22 +00:00
2017-05-19 19:05:50 +00:00
var g errgroup . Group
2017-10-23 18:30:03 +00:00
db . mtx . RLock ( )
defer db . mtx . RUnlock ( )
for _ , b := range db . blocks {
2018-07-02 08:23:36 +00:00
if b . OverlapsClosedInterval ( mint , maxt ) {
2017-10-09 13:21:46 +00:00
g . Go ( func ( b * Block ) func ( ) error {
2017-08-28 22:39:17 +00:00
return func ( ) error { return b . Delete ( mint , maxt , ms ... ) }
} ( b ) )
}
}
2021-09-16 06:50:03 +00:00
if db . head . OverlapsClosedInterval ( mint , maxt ) {
g . Go ( func ( ) error {
return db . head . Delete ( mint , maxt , ms ... )
} )
}
2018-03-21 21:23:47 +00:00
return g . Wait ( )
2017-05-19 19:05:50 +00:00
}
2017-11-22 12:34:50 +00:00
// CleanTombstones re-writes any blocks with tombstones.
2018-05-31 02:09:30 +00:00
func ( db * DB ) CleanTombstones ( ) ( err error ) {
2017-11-22 12:34:50 +00:00
db . cmtx . Lock ( )
defer db . cmtx . Unlock ( )
start := time . Now ( )
2018-03-21 20:23:47 +00:00
defer db . metrics . tombCleanTimer . Observe ( time . Since ( start ) . Seconds ( ) )
2017-11-22 12:34:50 +00:00
2021-02-17 05:32:43 +00:00
cleanUpCompleted := false
// Repeat cleanup until there is no tombstones left.
for ! cleanUpCompleted {
cleanUpCompleted = true
for _ , pb := range db . Blocks ( ) {
uid , safeToDelete , cleanErr := pb . CleanTombstones ( db . Dir ( ) , db . compactor )
if cleanErr != nil {
return errors . Wrapf ( cleanErr , "clean tombstones: %s" , pb . Dir ( ) )
}
if ! safeToDelete {
// There was nothing to clean.
continue
}
// In case tombstones of the old block covers the whole block,
// then there would be no resultant block to tell the parent.
// The lock protects against race conditions when deleting blocks
// during an already running reload.
db . mtx . Lock ( )
pb . meta . Compaction . Deletable = safeToDelete
db . mtx . Unlock ( )
cleanUpCompleted = false
if err = db . reloadBlocks ( ) ; err == nil { // Will try to delete old block.
// Successful reload will change the existing blocks.
// We need to loop over the new set of blocks.
break
}
// Delete new block if it was created.
if uid != nil && * uid != ( ulid . ULID { } ) {
2018-05-31 02:09:30 +00:00
dir := filepath . Join ( db . Dir ( ) , uid . String ( ) )
if err := os . RemoveAll ( dir ) ; err != nil {
level . Error ( db . logger ) . Log ( "msg" , "failed to delete block after failed `CleanTombstones`" , "dir" , dir , "err" , err )
}
}
2021-02-17 05:32:43 +00:00
return errors . Wrap ( err , "reload blocks" )
2018-05-31 02:09:30 +00:00
}
2020-10-19 15:27:08 +00:00
}
return nil
2017-11-22 12:34:50 +00:00
}
2017-01-19 10:22:47 +00:00
func isBlockDir ( fi os . FileInfo ) bool {
if ! fi . IsDir ( ) {
return false
}
2019-03-18 14:14:10 +00:00
_ , err := ulid . ParseStrict ( fi . Name ( ) )
2017-05-18 14:09:30 +00:00
return err == nil
2017-01-19 10:22:47 +00:00
}
2020-08-11 05:56:08 +00:00
// isTmpBlockDir returns dir that consists of block dir ULID and tmp extension.
func isTmpBlockDir ( fi os . FileInfo ) bool {
if ! fi . IsDir ( ) {
return false
}
fn := fi . Name ( )
ext := filepath . Ext ( fn )
2021-01-09 09:02:26 +00:00
if ext == tmpForDeletionBlockDirSuffix || ext == tmpForCreationBlockDirSuffix || ext == tmpLegacy {
2020-08-11 05:56:08 +00:00
if _ , err := ulid . ParseStrict ( fn [ : len ( fn ) - len ( ext ) ] ) ; err == nil {
return true
}
}
return false
}
2017-01-19 10:22:47 +00:00
func blockDirs ( dir string ) ( [ ] string , error ) {
files , err := ioutil . ReadDir ( dir )
if err != nil {
return nil , err
}
var dirs [ ] string
for _ , fi := range files {
if isBlockDir ( fi ) {
dirs = append ( dirs , filepath . Join ( dir , fi . Name ( ) ) )
}
}
return dirs , nil
2017-01-03 14:43:26 +00:00
}
2016-12-09 12:41:38 +00:00
2017-08-30 16:34:54 +00:00
func sequenceFiles ( dir string ) ( [ ] string , error ) {
2017-02-14 07:53:19 +00:00
files , err := ioutil . ReadDir ( dir )
if err != nil {
return nil , err
}
var res [ ] string
for _ , fi := range files {
2017-08-30 16:34:54 +00:00
if _ , err := strconv . ParseUint ( fi . Name ( ) , 10 , 64 ) ; err != nil {
continue
2017-02-14 07:53:19 +00:00
}
2017-08-30 16:34:54 +00:00
res = append ( res , filepath . Join ( dir , fi . Name ( ) ) )
2017-02-14 07:53:19 +00:00
}
return res , nil
}
2017-08-30 16:34:54 +00:00
func nextSequenceFile ( dir string ) ( string , int , error ) {
2020-04-06 13:34:20 +00:00
files , err := ioutil . ReadDir ( dir )
2017-01-06 08:26:39 +00:00
if err != nil {
2017-01-29 07:11:47 +00:00
return "" , 0 , err
2017-01-06 08:26:39 +00:00
}
2017-01-06 12:13:22 +00:00
2017-01-06 08:26:39 +00:00
i := uint64 ( 0 )
2020-04-06 13:34:20 +00:00
for _ , f := range files {
j , err := strconv . ParseUint ( f . Name ( ) , 10 , 64 )
2017-01-06 12:13:22 +00:00
if err != nil {
continue
2017-01-06 08:26:39 +00:00
}
2017-01-06 12:13:22 +00:00
i = j
2017-01-06 08:26:39 +00:00
}
2017-08-30 16:34:54 +00:00
return filepath . Join ( dir , fmt . Sprintf ( "%0.6d" , i + 1 ) ) , int ( i + 1 ) , nil
2017-01-06 08:26:39 +00:00
}
2016-12-09 12:41:38 +00:00
2017-08-28 22:39:17 +00:00
func exponential ( d , min , max time . Duration ) time . Duration {
d *= 2
if d < min {
d = min
}
if d > max {
d = max
}
return d
}