2015-01-21 19:07:45 +00:00
// Copyright 2014 The Prometheus Authors
2014-09-19 16:18:44 +00:00
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2014-09-16 13:47:24 +00:00
package local
2014-06-06 09:55:53 +00:00
import (
"bufio"
"encoding/binary"
"fmt"
"io"
2015-03-03 17:59:39 +00:00
"io/ioutil"
2017-02-07 16:33:54 +00:00
"math"
2014-06-06 09:55:53 +00:00
"os"
2015-01-14 15:52:09 +00:00
"path/filepath"
2015-03-02 19:02:37 +00:00
"strconv"
"strings"
2014-10-07 17:11:24 +00:00
"sync"
2015-03-09 01:33:10 +00:00
"sync/atomic"
2014-09-23 17:21:10 +00:00
"time"
2014-06-06 09:55:53 +00:00
2014-09-24 14:51:18 +00:00
"github.com/prometheus/client_golang/prometheus"
2015-10-03 08:21:43 +00:00
"github.com/prometheus/common/log"
2015-08-20 15:18:46 +00:00
"github.com/prometheus/common/model"
2014-06-06 09:55:53 +00:00
2016-09-21 21:44:27 +00:00
"github.com/prometheus/prometheus/storage/local/chunk"
2014-09-23 17:21:10 +00:00
"github.com/prometheus/prometheus/storage/local/codable"
2014-08-21 20:06:11 +00:00
"github.com/prometheus/prometheus/storage/local/index"
2015-05-29 11:30:30 +00:00
"github.com/prometheus/prometheus/util/flock"
2014-06-06 09:55:53 +00:00
)
const (
2015-03-03 17:59:39 +00:00
// Version of the storage as it can be found in the version file.
// Increment to protect against incompatible changes.
2015-03-02 19:02:37 +00:00
Version = 1
versionFileName = "VERSION"
2014-10-15 15:07:12 +00:00
seriesFileSuffix = ".db"
seriesTempFileSuffix = ".db.tmp"
2014-11-20 20:03:51 +00:00
seriesDirNameLen = 2 // How many bytes of the fingerprint in dir name.
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
hintFileSuffix = ".hint"
2014-09-10 16:41:52 +00:00
2015-05-06 14:53:12 +00:00
mappingsFileName = "mappings.db"
mappingsTempFileName = "mappings.db.tmp"
mappingsFormatVersion = 1
mappingsMagicString = "PrometheusMappings"
2014-11-20 20:03:51 +00:00
dirtyFileName = "DIRTY"
2014-11-04 15:27:14 +00:00
fileBufSize = 1 << 16 // 64kiB.
2014-08-12 15:46:46 +00:00
2014-06-06 09:55:53 +00:00
chunkHeaderLen = 17
chunkHeaderTypeOffset = 0
chunkHeaderFirstTimeOffset = 1
chunkHeaderLastTimeOffset = 9
2016-09-21 21:44:27 +00:00
chunkLenWithHeader = chunk . ChunkLen + chunkHeaderLen
2016-01-25 17:57:51 +00:00
chunkMaxBatchSize = 62 // Max no. of chunks to load or write in
// one batch. Note that 62 is the largest number of chunks that fit
// into 64kiB on disk because chunkHeaderLen is added to each 1k chunk.
2014-09-23 17:21:10 +00:00
2014-10-06 13:58:12 +00:00
indexingMaxBatchSize = 1024 * 1024
indexingBatchTimeout = 500 * time . Millisecond // Commit batch when idle for that long.
2017-01-05 17:57:42 +00:00
indexingQueueCapacity = 1024 * 256
2014-06-06 09:55:53 +00:00
)
2015-08-20 15:18:46 +00:00
var fpLen = len ( model . Fingerprint ( 0 ) . String ( ) ) // Length of a fingerprint as string.
2014-11-20 20:03:51 +00:00
2014-09-16 13:47:24 +00:00
const (
2014-10-27 19:40:48 +00:00
flagHeadChunkPersisted byte = 1 << iota
// Add more flags here like:
// flagFoo
// flagBar
2014-09-16 13:47:24 +00:00
)
2014-09-23 17:21:10 +00:00
type indexingOpType byte
const (
add indexingOpType = iota
remove
)
type indexingOp struct {
2015-08-20 15:18:46 +00:00
fingerprint model . Fingerprint
metric model . Metric
2014-09-23 17:21:10 +00:00
opType indexingOpType
}
2014-10-07 17:11:24 +00:00
// A Persistence is used by a Storage implementation to store samples
// persistently across restarts. The methods are only goroutine-safe if
Improve persisting chunks to disk.
This is done by bucketing chunks by fingerprint. If the persisting to
disk falls behind, more and more chunks are in the queue. As soon as
there are "double hits", we will now persist both chunks in one go,
doubling the disk throughput (assuming it is limited by disk
seeks). Should even more pile up so that we end wit "triple hits", we
will persist those first, and so on.
Even if we have millions of time series, this will still help,
assuming not all of them are growing with the same speed. Series that
get many samples and/or are not very compressable will accumulate
chunks faster, and they will soon get double- or triple-writes.
To improve the chance of double writes,
-storage.local.persistence-queue-capacity could be set to a higher
value. However, that will slow down shutdown a lot (as the queue has
to be worked through). So we leave it to the user to set it to a
really high value. A more fundamental solution would be to checkpoint
not only head chunks, but also chunks still in the persist queue. That
would be quite complicated for a rather limited use-case (running many
time series with high ingestion rate on slow spinning disks).
2015-02-13 19:08:52 +00:00
// explicitly marked as such below. The chunk-related methods persistChunks,
Fix a bug handling freshly unarchived series.
Usually, if you unarchive a series, it is to add something to it,
which will create a new head chunk. However, if a series in
unarchived, and before anything is added to it, it is handled by the
maintenance loop, it will be archived again. In that case, we have to
load the chunkDescs to know the lastTime of the series to be
archived. Usually, this case will happen only rarely (as a race, has
never happened so far, possibly because the locking around unarchiving
and the subsequent sample append is smart enough). However, during
crash recovery, we sometimes treat series as "freshly unarchived"
without directly appending a sample. We might add more cases of that
type later, so better deal with archiving properly and load chunkDescs
if required.
2015-01-08 15:10:31 +00:00
// dropChunks, loadChunks, and loadChunkDescs can be called concurrently with
2014-10-07 17:11:24 +00:00
// each other if each call refers to a different fingerprint.
type persistence struct {
2015-03-13 14:49:07 +00:00
basePath string
2014-09-10 16:41:52 +00:00
archivedFingerprintToMetrics * index . FingerprintMetricIndex
archivedFingerprintToTimeRange * index . FingerprintTimeRangeIndex
labelPairToFingerprints * index . LabelPairFingerprintIndex
labelNameToLabelValues * index . LabelNameLabelValuesIndex
2014-09-23 17:21:10 +00:00
indexingQueue chan indexingOp
indexingStopped chan struct { }
2014-09-24 12:41:38 +00:00
indexingFlush chan chan int
2014-09-24 14:51:18 +00:00
2017-01-11 15:11:19 +00:00
indexingQueueLength prometheus . Gauge
indexingQueueCapacity prometheus . Metric
indexingBatchSizes prometheus . Summary
indexingBatchDuration prometheus . Summary
checkpointDuration prometheus . Summary
checkpointLastDuration prometheus . Gauge
checkpointLastSize prometheus . Gauge
checkpointChunksWritten prometheus . Summary
dirtyCounter prometheus . Counter
startedDirty prometheus . Gauge
checkpointing prometheus . Gauge
seriesChunksPersisted prometheus . Histogram
2014-11-05 19:02:45 +00:00
2015-03-19 11:03:09 +00:00
dirtyMtx sync . Mutex // Protects dirty and becameDirty.
dirty bool // true if persistence was started in dirty state.
becameDirty bool // true if an inconsistency came up during runtime.
pedanticChecks bool // true if crash recovery should check each series.
dirtyFileName string // The file used for locking and to mark dirty state.
fLock flock . Releaser // The file lock to protect against concurrent usage.
2015-03-19 14:41:50 +00:00
shouldSync syncStrategy
2015-04-14 08:43:09 +00:00
2016-01-11 15:42:10 +00:00
minShrinkRatio float64 // How much a series file has to shrink to justify dropping chunks.
2015-04-14 08:43:09 +00:00
bufPool sync . Pool
2014-06-06 09:55:53 +00:00
}
2014-10-07 17:11:24 +00:00
// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
2016-01-11 15:42:10 +00:00
func newPersistence (
basePath string ,
dirty , pedanticChecks bool ,
shouldSync syncStrategy ,
minShrinkRatio float64 ,
) ( * persistence , error ) {
2015-03-02 19:02:37 +00:00
dirtyPath := filepath . Join ( basePath , dirtyFileName )
versionPath := filepath . Join ( basePath , versionFileName )
2015-03-03 17:59:39 +00:00
if versionData , err := ioutil . ReadFile ( versionPath ) ; err == nil {
if persistedVersion , err := strconv . Atoi ( strings . TrimSpace ( string ( versionData ) ) ) ; err != nil {
return nil , fmt . Errorf ( "cannot parse content of %s: %s" , versionPath , versionData )
2015-03-02 19:02:37 +00:00
} else if persistedVersion != Version {
return nil , fmt . Errorf ( "found storage version %d on disk, need version %d - please wipe storage or run a version of Prometheus compatible with storage version %d" , persistedVersion , Version , persistedVersion )
}
} else if os . IsNotExist ( err ) {
// No version file found. Let's create the directory (in case
// it's not there yet) and then check if it is actually
// empty. If not, we have found an old storage directory without
// version file, so we have to bail out.
if err := os . MkdirAll ( basePath , 0700 ) ; err != nil {
2016-12-13 18:03:22 +00:00
if abspath , e := filepath . Abs ( basePath ) ; e == nil {
2016-12-09 22:36:27 +00:00
return nil , fmt . Errorf ( "cannot create persistent directory %s: %s" , abspath , err )
}
2016-12-13 18:03:22 +00:00
return nil , fmt . Errorf ( "cannot create persistent directory %s: %s" , basePath , err )
2015-03-02 19:02:37 +00:00
}
2015-03-03 17:59:39 +00:00
fis , err := ioutil . ReadDir ( basePath )
2015-03-02 19:02:37 +00:00
if err != nil {
return nil , err
}
2016-12-15 11:48:23 +00:00
filesPresent := len ( fis )
for i := range fis {
switch {
case fis [ i ] . Name ( ) == "lost+found" && fis [ i ] . IsDir ( ) :
filesPresent --
case strings . HasPrefix ( fis [ i ] . Name ( ) , "." ) :
filesPresent --
}
}
if filesPresent > 0 {
2016-11-29 13:57:06 +00:00
return nil , fmt . Errorf ( "found existing files in storage path that do not look like storage files compatible with this version of Prometheus; please delete the files in the storage path or choose a different storage path" )
2015-03-02 19:02:37 +00:00
}
// Finally we can write our own version into a new version file.
file , err := os . Create ( versionPath )
if err != nil {
return nil , err
}
defer file . Close ( )
if _ , err := fmt . Fprintf ( file , "%d\n" , Version ) ; err != nil {
return nil , err
}
} else {
2014-09-10 16:41:52 +00:00
return nil , err
}
2015-01-14 15:52:09 +00:00
fLock , dirtyfileExisted , err := flock . New ( dirtyPath )
if err != nil {
2015-05-20 16:10:29 +00:00
log . Errorf ( "Could not lock %s, Prometheus already running?" , dirtyPath )
2015-01-14 15:52:09 +00:00
return nil , err
}
if dirtyfileExisted {
dirty = true
}
2014-09-23 17:21:10 +00:00
archivedFingerprintToMetrics , err := index . NewFingerprintMetricIndex ( basePath )
2014-08-21 20:06:11 +00:00
if err != nil {
return nil , err
}
2014-09-23 17:21:10 +00:00
archivedFingerprintToTimeRange , err := index . NewFingerprintTimeRangeIndex ( basePath )
2014-09-10 16:41:52 +00:00
if err != nil {
return nil , err
2014-06-06 09:55:53 +00:00
}
2014-10-07 17:11:24 +00:00
p := & persistence {
2015-03-13 14:49:07 +00:00
basePath : basePath ,
2014-09-23 17:21:10 +00:00
archivedFingerprintToMetrics : archivedFingerprintToMetrics ,
archivedFingerprintToTimeRange : archivedFingerprintToTimeRange ,
indexingQueue : make ( chan indexingOp , indexingQueueCapacity ) ,
indexingStopped : make ( chan struct { } ) ,
2014-09-24 12:41:38 +00:00
indexingFlush : make ( chan chan int ) ,
2014-09-24 14:51:18 +00:00
indexingQueueLength : prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "indexing_queue_length" ,
Help : "The number of metrics waiting to be indexed." ,
} ) ,
indexingQueueCapacity : prometheus . MustNewConstMetric (
prometheus . NewDesc (
prometheus . BuildFQName ( namespace , subsystem , "indexing_queue_capacity" ) ,
"The capacity of the indexing queue." ,
nil , nil ,
) ,
prometheus . GaugeValue ,
float64 ( indexingQueueCapacity ) ,
) ,
indexingBatchSizes : prometheus . NewSummary (
prometheus . SummaryOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "indexing_batch_sizes" ,
Help : "Quantiles for indexing batch sizes (number of metrics per batch)." ,
} ,
) ,
2015-03-19 16:06:16 +00:00
indexingBatchDuration : prometheus . NewSummary (
2014-09-24 14:51:18 +00:00
prometheus . SummaryOpts {
Namespace : namespace ,
Subsystem : subsystem ,
2016-06-23 15:50:06 +00:00
Name : "indexing_batch_duration_seconds" ,
Help : "Quantiles for batch indexing duration in seconds." ,
2014-09-24 14:51:18 +00:00
} ,
) ,
2017-01-11 15:11:19 +00:00
checkpointLastDuration : prometheus . NewGauge ( prometheus . GaugeOpts {
2014-10-24 18:27:27 +00:00
Namespace : namespace ,
Subsystem : subsystem ,
2017-01-11 15:11:19 +00:00
Name : "checkpoint_last_duration_seconds" ,
Help : "The duration in seconds it took to last checkpoint open chunks and chunks yet to be persisted." ,
} ) ,
checkpointDuration : prometheus . NewSummary ( prometheus . SummaryOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Objectives : map [ float64 ] float64 { } ,
Name : "checkpoint_duration_seconds" ,
Help : "The duration in seconds taken for checkpointing open chunks and chunks yet to be persisted" ,
} ) ,
checkpointLastSize : prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "checkpoint_last_size_bytes" ,
Help : "The size of the last checkpoint of open chunks and chunks yet to be persisted" ,
} ) ,
checkpointChunksWritten : prometheus . NewSummary ( prometheus . SummaryOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Objectives : map [ float64 ] float64 { } ,
Name : "checkpoint_series_chunks_written" ,
Help : "The number of chunk written per series while checkpointing open chunks and chunks yet to be persisted." ,
2014-10-24 18:27:27 +00:00
} ) ,
2015-05-21 15:50:06 +00:00
dirtyCounter : prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "inconsistencies_total" ,
Help : "A counter incremented each time an inconsistency in the local storage is detected. If this is greater zero, restart the server as soon as possible." ,
} ) ,
2016-08-27 19:39:27 +00:00
startedDirty : prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "started_dirty" ,
Help : "Whether the local storage was found to be dirty (and crash recovery occurred) during Prometheus startup." ,
} ) ,
2017-01-11 15:11:19 +00:00
checkpointing : prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "checkpointing" ,
Help : "1 if the storage is checkpointing, 0 otherwise." ,
} ) ,
seriesChunksPersisted : prometheus . NewHistogram ( prometheus . HistogramOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "series_chunks_persisted" ,
Help : "The number of chunks persisted per series." ,
// Even with 4 bytes per sample, you're not going to get more than 85
// chunks in 6 hours for a time series with 1s resolution.
Buckets : [ ] float64 { 1 , 2 , 4 , 8 , 16 , 32 , 64 , 128 } ,
} ) ,
2015-03-19 11:03:09 +00:00
dirty : dirty ,
pedanticChecks : pedanticChecks ,
dirtyFileName : dirtyPath ,
fLock : fLock ,
2015-03-19 14:41:50 +00:00
shouldSync : shouldSync ,
2017-02-01 14:36:38 +00:00
minShrinkRatio : minShrinkRatio ,
2015-04-14 08:43:09 +00:00
// Create buffers of length 3*chunkLenWithHeader by default because that is still reasonably small
// and at the same time enough for many uses. The contract is to never return buffer smaller than
// that to the pool so that callers can rely on a minimum buffer size.
bufPool : sync . Pool { New : func ( ) interface { } { return make ( [ ] byte , 0 , 3 * chunkLenWithHeader ) } } ,
2014-11-05 19:02:45 +00:00
}
if p . dirty {
// Blow away the label indexes. We'll rebuild them later.
if err := index . DeleteLabelPairFingerprintIndex ( basePath ) ; err != nil {
return nil , err
}
if err := index . DeleteLabelNameLabelValuesIndex ( basePath ) ; err != nil {
return nil , err
}
}
labelPairToFingerprints , err := index . NewLabelPairFingerprintIndex ( basePath )
if err != nil {
return nil , err
}
labelNameToLabelValues , err := index . NewLabelNameLabelValuesIndex ( basePath )
if err != nil {
return nil , err
2014-09-23 17:21:10 +00:00
}
2014-11-05 19:02:45 +00:00
p . labelPairToFingerprints = labelPairToFingerprints
p . labelNameToLabelValues = labelNameToLabelValues
2014-09-23 17:21:10 +00:00
return p , nil
2014-06-06 09:55:53 +00:00
}
2015-05-18 17:26:28 +00:00
func ( p * persistence ) run ( ) {
p . processIndexingQueue ( )
}
2014-09-24 14:51:18 +00:00
// Describe implements prometheus.Collector.
2014-10-07 17:11:24 +00:00
func ( p * persistence ) Describe ( ch chan <- * prometheus . Desc ) {
2014-09-24 14:51:18 +00:00
ch <- p . indexingQueueLength . Desc ( )
ch <- p . indexingQueueCapacity . Desc ( )
p . indexingBatchSizes . Describe ( ch )
2015-03-19 16:06:16 +00:00
p . indexingBatchDuration . Describe ( ch )
2014-10-24 18:27:27 +00:00
ch <- p . checkpointDuration . Desc ( )
2017-01-11 15:11:19 +00:00
ch <- p . checkpointLastDuration . Desc ( )
ch <- p . checkpointLastSize . Desc ( )
ch <- p . checkpointChunksWritten . Desc ( )
ch <- p . checkpointing . Desc ( )
2015-05-21 15:50:06 +00:00
ch <- p . dirtyCounter . Desc ( )
2016-08-27 19:39:27 +00:00
ch <- p . startedDirty . Desc ( )
2017-01-11 15:11:19 +00:00
ch <- p . seriesChunksPersisted . Desc ( )
2014-09-24 14:51:18 +00:00
}
// Collect implements prometheus.Collector.
2014-10-07 17:11:24 +00:00
func ( p * persistence ) Collect ( ch chan <- prometheus . Metric ) {
2014-09-24 14:51:18 +00:00
p . indexingQueueLength . Set ( float64 ( len ( p . indexingQueue ) ) )
ch <- p . indexingQueueLength
ch <- p . indexingQueueCapacity
p . indexingBatchSizes . Collect ( ch )
2015-03-19 16:06:16 +00:00
p . indexingBatchDuration . Collect ( ch )
2014-10-24 18:27:27 +00:00
ch <- p . checkpointDuration
2017-01-11 15:11:19 +00:00
ch <- p . checkpointLastDuration
ch <- p . checkpointLastSize
ch <- p . checkpointChunksWritten
ch <- p . checkpointing
2015-05-21 15:50:06 +00:00
ch <- p . dirtyCounter
2016-08-27 19:39:27 +00:00
ch <- p . startedDirty
2017-01-11 15:11:19 +00:00
ch <- p . seriesChunksPersisted
2014-09-24 14:51:18 +00:00
}
2014-11-05 19:02:45 +00:00
// isDirty returns the dirty flag in a goroutine-safe way.
func ( p * persistence ) isDirty ( ) bool {
p . dirtyMtx . Lock ( )
defer p . dirtyMtx . Unlock ( )
return p . dirty
}
2016-03-09 17:56:30 +00:00
// setDirty flags the storage as dirty in a goroutine-safe way. The provided
// error will be logged as a reason the first time the storage is flagged as dirty.
func ( p * persistence ) setDirty ( err error ) {
p . dirtyCounter . Inc ( )
2014-11-05 19:02:45 +00:00
p . dirtyMtx . Lock ( )
defer p . dirtyMtx . Unlock ( )
if p . becameDirty {
return
}
2016-03-09 17:56:30 +00:00
p . dirty = true
p . becameDirty = true
log . With ( "error" , err ) . Error ( "The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery." )
2014-11-05 19:02:45 +00:00
}
2015-05-20 17:13:06 +00:00
// fingerprintsForLabelPair returns the fingerprints for the given label
2014-10-07 17:11:24 +00:00
// pair. This method is goroutine-safe but take into account that metrics queued
2014-11-20 20:03:51 +00:00
// for indexing with IndexMetric might not have made it into the index
// yet. (Same applies correspondingly to UnindexMetric.)
2016-03-09 17:56:30 +00:00
func ( p * persistence ) fingerprintsForLabelPair ( lp model . LabelPair ) model . Fingerprints {
2014-09-10 16:41:52 +00:00
fps , _ , err := p . labelPairToFingerprints . Lookup ( lp )
2014-06-06 09:55:53 +00:00
if err != nil {
2016-03-09 17:56:30 +00:00
p . setDirty ( fmt . Errorf ( "error in method fingerprintsForLabelPair(%v): %s" , lp , err ) )
return nil
2014-06-06 09:55:53 +00:00
}
2016-03-09 17:56:30 +00:00
return fps
2014-06-06 09:55:53 +00:00
}
2015-05-20 17:13:06 +00:00
// labelValuesForLabelName returns the label values for the given label
2014-10-07 17:11:24 +00:00
// name. This method is goroutine-safe but take into account that metrics queued
2014-11-20 20:03:51 +00:00
// for indexing with IndexMetric might not have made it into the index
// yet. (Same applies correspondingly to UnindexMetric.)
2016-07-11 18:27:25 +00:00
func ( p * persistence ) labelValuesForLabelName ( ln model . LabelName ) ( model . LabelValues , error ) {
2014-09-10 16:41:52 +00:00
lvs , _ , err := p . labelNameToLabelValues . Lookup ( ln )
if err != nil {
2016-03-09 17:56:30 +00:00
p . setDirty ( fmt . Errorf ( "error in method labelValuesForLabelName(%v): %s" , ln , err ) )
2016-07-11 18:27:25 +00:00
return nil , err
2014-09-10 16:41:52 +00:00
}
2016-07-11 18:27:25 +00:00
return lvs , nil
2014-06-06 09:55:53 +00:00
}
Improve persisting chunks to disk.
This is done by bucketing chunks by fingerprint. If the persisting to
disk falls behind, more and more chunks are in the queue. As soon as
there are "double hits", we will now persist both chunks in one go,
doubling the disk throughput (assuming it is limited by disk
seeks). Should even more pile up so that we end wit "triple hits", we
will persist those first, and so on.
Even if we have millions of time series, this will still help,
assuming not all of them are growing with the same speed. Series that
get many samples and/or are not very compressable will accumulate
chunks faster, and they will soon get double- or triple-writes.
To improve the chance of double writes,
-storage.local.persistence-queue-capacity could be set to a higher
value. However, that will slow down shutdown a lot (as the queue has
to be worked through). So we leave it to the user to set it to a
really high value. A more fundamental solution would be to checkpoint
not only head chunks, but also chunks still in the persist queue. That
would be quite complicated for a rather limited use-case (running many
time series with high ingestion rate on slow spinning disks).
2015-02-13 19:08:52 +00:00
// persistChunks persists a number of consecutive chunks of a series. It is the
// caller's responsibility to not modify the chunks concurrently and to not
// persist or drop anything for the same fingerprint concurrently. It returns
// the (zero-based) index of the first persisted chunk within the series
// file. In case of an error, the returned index is -1 (to avoid the
// misconception that the chunk was written at position 0).
2016-03-03 12:15:02 +00:00
//
// Returning an error signals problems with the series file. In this case, the
// caller should quarantine the series.
2016-09-21 21:44:27 +00:00
func ( p * persistence ) persistChunks ( fp model . Fingerprint , chunks [ ] chunk . Chunk ) ( index int , err error ) {
2014-06-06 09:55:53 +00:00
f , err := p . openChunkFileForWriting ( fp )
if err != nil {
2014-10-27 19:40:48 +00:00
return - 1 , err
2014-06-06 09:55:53 +00:00
}
2015-03-19 14:41:50 +00:00
defer p . closeChunkFile ( f )
2014-06-06 09:55:53 +00:00
2016-01-25 15:36:36 +00:00
if err := p . writeChunks ( f , chunks ) ; err != nil {
2015-03-09 01:33:10 +00:00
return - 1 , err
2014-10-27 19:40:48 +00:00
}
Improve persisting chunks to disk.
This is done by bucketing chunks by fingerprint. If the persisting to
disk falls behind, more and more chunks are in the queue. As soon as
there are "double hits", we will now persist both chunks in one go,
doubling the disk throughput (assuming it is limited by disk
seeks). Should even more pile up so that we end wit "triple hits", we
will persist those first, and so on.
Even if we have millions of time series, this will still help,
assuming not all of them are growing with the same speed. Series that
get many samples and/or are not very compressable will accumulate
chunks faster, and they will soon get double- or triple-writes.
To improve the chance of double writes,
-storage.local.persistence-queue-capacity could be set to a higher
value. However, that will slow down shutdown a lot (as the queue has
to be worked through). So we leave it to the user to set it to a
really high value. A more fundamental solution would be to checkpoint
not only head chunks, but also chunks still in the persist queue. That
would be quite complicated for a rather limited use-case (running many
time series with high ingestion rate on slow spinning disks).
2015-02-13 19:08:52 +00:00
// Determine index within the file.
2014-10-27 19:40:48 +00:00
offset , err := f . Seek ( 0 , os . SEEK_CUR )
if err != nil {
return - 1 , err
}
2015-03-09 01:33:10 +00:00
index , err = chunkIndexForOffset ( offset )
2014-10-27 19:40:48 +00:00
if err != nil {
return - 1 , err
}
Improve persisting chunks to disk.
This is done by bucketing chunks by fingerprint. If the persisting to
disk falls behind, more and more chunks are in the queue. As soon as
there are "double hits", we will now persist both chunks in one go,
doubling the disk throughput (assuming it is limited by disk
seeks). Should even more pile up so that we end wit "triple hits", we
will persist those first, and so on.
Even if we have millions of time series, this will still help,
assuming not all of them are growing with the same speed. Series that
get many samples and/or are not very compressable will accumulate
chunks faster, and they will soon get double- or triple-writes.
To improve the chance of double writes,
-storage.local.persistence-queue-capacity could be set to a higher
value. However, that will slow down shutdown a lot (as the queue has
to be worked through). So we leave it to the user to set it to a
really high value. A more fundamental solution would be to checkpoint
not only head chunks, but also chunks still in the persist queue. That
would be quite complicated for a rather limited use-case (running many
time series with high ingestion rate on slow spinning disks).
2015-02-13 19:08:52 +00:00
return index - len ( chunks ) , err
2014-06-06 09:55:53 +00:00
}
2014-10-07 17:11:24 +00:00
// loadChunks loads a group of chunks of a timeseries by their index. The chunk
// with the earliest time will have index 0, the following ones will have
2014-10-27 19:40:48 +00:00
// incrementally larger indexes. The indexOffset denotes the offset to be added to
// each index in indexes. It is the caller's responsibility to not persist or
// drop anything for the same fingerprint concurrently.
2016-09-21 21:44:27 +00:00
func ( p * persistence ) loadChunks ( fp model . Fingerprint , indexes [ ] int , indexOffset int ) ( [ ] chunk . Chunk , error ) {
2014-06-06 09:55:53 +00:00
f , err := p . openChunkFileForReading ( fp )
if err != nil {
return nil , err
}
defer f . Close ( )
2016-09-21 21:44:27 +00:00
chunks := make ( [ ] chunk . Chunk , 0 , len ( indexes ) )
2015-04-14 08:43:09 +00:00
buf := p . bufPool . Get ( ) . ( [ ] byte )
2015-04-13 18:20:26 +00:00
defer func ( ) {
2016-01-25 15:36:36 +00:00
// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
// would only put back the original buf.
2015-04-14 08:43:09 +00:00
p . bufPool . Put ( buf )
2015-04-13 18:20:26 +00:00
} ( )
for i := 0 ; i < len ( indexes ) ; i ++ {
// This loads chunks in batches. A batch is a streak of
// consecutive chunks, read from disk in one go.
batchSize := 1
if _ , err := f . Seek ( offsetForChunkIndex ( indexes [ i ] + indexOffset ) , os . SEEK_SET ) ; err != nil {
2014-06-06 09:55:53 +00:00
return nil , err
}
2015-04-13 18:20:26 +00:00
for ; batchSize < chunkMaxBatchSize &&
i + 1 < len ( indexes ) &&
indexes [ i ] + 1 == indexes [ i + 1 ] ; i , batchSize = i + 1 , batchSize + 1 {
2014-06-06 09:55:53 +00:00
}
2015-04-13 18:20:26 +00:00
readSize := batchSize * chunkLenWithHeader
if cap ( buf ) < readSize {
buf = make ( [ ] byte , readSize )
2014-06-06 09:55:53 +00:00
}
2015-04-13 18:20:26 +00:00
buf = buf [ : readSize ]
2014-06-06 09:55:53 +00:00
2015-04-13 18:20:26 +00:00
if _ , err := io . ReadFull ( f , buf ) ; err != nil {
2014-06-06 09:55:53 +00:00
return nil , err
}
2015-04-13 18:20:26 +00:00
for c := 0 ; c < batchSize ; c ++ {
2016-09-28 21:33:34 +00:00
chunk , err := chunk . NewForEncoding ( chunk . Encoding ( buf [ c * chunkLenWithHeader + chunkHeaderTypeOffset ] ) )
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
if err != nil {
return nil , err
}
2016-09-21 15:56:55 +00:00
if err := chunk . UnmarshalFromBuf ( buf [ c * chunkLenWithHeader + chunkHeaderLen : ] ) ; err != nil {
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
return nil , err
}
2015-04-13 18:20:26 +00:00
chunks = append ( chunks , chunk )
}
2014-06-06 09:55:53 +00:00
}
2016-09-28 21:33:34 +00:00
chunk . Ops . WithLabelValues ( chunk . Load ) . Add ( float64 ( len ( chunks ) ) )
2016-09-21 21:44:27 +00:00
atomic . AddInt64 ( & chunk . NumMemChunks , int64 ( len ( chunks ) ) )
2014-06-06 09:55:53 +00:00
return chunks , nil
}
2016-09-21 21:44:27 +00:00
// loadChunkDescs loads the chunk.Descs for a series from disk. offsetFromEnd is
// the number of chunk.Descs to skip from the end of the series file. It is the
2015-07-06 23:10:14 +00:00
// caller's responsibility to not persist or drop anything for the same
2014-10-07 17:11:24 +00:00
// fingerprint concurrently.
2016-09-21 21:44:27 +00:00
func ( p * persistence ) loadChunkDescs ( fp model . Fingerprint , offsetFromEnd int ) ( [ ] * chunk . Desc , error ) {
2014-06-06 09:55:53 +00:00
f , err := p . openChunkFileForReading ( fp )
if os . IsNotExist ( err ) {
return nil , nil
}
if err != nil {
return nil , err
}
defer f . Close ( )
fi , err := f . Stat ( )
if err != nil {
return nil , err
}
2015-04-13 18:20:26 +00:00
if fi . Size ( ) % int64 ( chunkLenWithHeader ) != 0 {
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
// The returned error will bubble up and lead to quarantining of the whole series.
2014-11-27 19:46:45 +00:00
return nil , fmt . Errorf (
"size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d" ,
2015-04-13 18:20:26 +00:00
fp , fi . Size ( ) , chunkLenWithHeader ,
2014-11-27 19:46:45 +00:00
)
2014-06-06 09:55:53 +00:00
}
2015-07-06 23:10:14 +00:00
numChunks := int ( fi . Size ( ) ) / chunkLenWithHeader - offsetFromEnd
2016-09-21 21:44:27 +00:00
cds := make ( [ ] * chunk . Desc , numChunks )
2015-04-13 18:20:26 +00:00
chunkTimesBuf := make ( [ ] byte , 16 )
2014-06-06 09:55:53 +00:00
for i := 0 ; i < numChunks ; i ++ {
2015-03-09 01:33:10 +00:00
_ , err := f . Seek ( offsetForChunkIndex ( i ) + chunkHeaderFirstTimeOffset , os . SEEK_SET )
2014-06-06 09:55:53 +00:00
if err != nil {
return nil , err
}
_ , err = io . ReadAtLeast ( f , chunkTimesBuf , 16 )
if err != nil {
return nil , err
}
2016-09-21 21:44:27 +00:00
cds [ i ] = & chunk . Desc {
ChunkFirstTime : model . Time ( binary . LittleEndian . Uint64 ( chunkTimesBuf ) ) ,
ChunkLastTime : model . Time ( binary . LittleEndian . Uint64 ( chunkTimesBuf [ 8 : ] ) ) ,
2014-06-06 09:55:53 +00:00
}
}
2016-09-28 21:33:34 +00:00
chunk . DescOps . WithLabelValues ( chunk . Load ) . Add ( float64 ( len ( cds ) ) )
chunk . NumMemDescs . Add ( float64 ( len ( cds ) ) )
2014-06-06 09:55:53 +00:00
return cds , nil
}
2014-10-24 18:27:27 +00:00
// checkpointSeriesMapAndHeads persists the fingerprint to memory-series mapping
2015-03-09 01:33:10 +00:00
// and all non persisted chunks. Do not call concurrently with
// loadSeriesMapAndHeads. This method will only write heads format v2, but
// loadSeriesMapAndHeads can also understand v1.
2014-11-27 19:46:45 +00:00
//
2015-03-09 01:33:10 +00:00
// Description of the file format (for both, v1 and v2):
2014-11-27 19:46:45 +00:00
//
// (1) Magic string (const headsMagicString).
//
// (2) Varint-encoded format version (const headsFormatVersion).
//
// (3) Number of series in checkpoint as big-endian uint64.
//
// (4) Repeated once per series:
//
2015-03-09 01:33:10 +00:00
// (4.1) A flag byte, see flag constants above. (Present but unused in v2.)
2014-11-27 19:46:45 +00:00
//
// (4.2) The fingerprint as big-endian uint64.
//
// (4.3) The metric as defined by codable.Metric.
//
2015-03-09 01:33:10 +00:00
// (4.4) The varint-encoded persistWatermark. (Missing in v1.)
2014-11-27 19:46:45 +00:00
//
2015-03-19 16:54:59 +00:00
// (4.5) The modification time of the series file as nanoseconds elapsed since
2015-03-19 11:59:26 +00:00
// January 1, 1970 UTC. -1 if the modification time is unknown or no series file
// exists yet. (Missing in v1.)
//
// (4.6) The varint-encoded chunkDescsOffset.
2014-11-27 19:46:45 +00:00
//
2015-03-09 01:33:10 +00:00
// (4.6) The varint-encoded savedFirstTime.
2014-11-27 19:46:45 +00:00
//
2015-03-09 01:33:10 +00:00
// (4.7) The varint-encoded number of chunk descriptors.
2014-11-27 19:46:45 +00:00
//
2015-03-09 01:33:10 +00:00
// (4.8) Repeated once per chunk descriptor, oldest to most recent, either
// variant 4.8.1 (if index < persistWatermark) or variant 4.8.2 (if index >=
// persistWatermark). In v1, everything is variant 4.8.1 except for a
// non-persisted head-chunk (determined by the flags).
2014-11-27 19:46:45 +00:00
//
2015-03-09 01:33:10 +00:00
// (4.8.1.1) The varint-encoded first time.
// (4.8.1.2) The varint-encoded last time.
2014-11-27 19:46:45 +00:00
//
2015-03-09 01:33:10 +00:00
// (4.8.2.1) A byte defining the chunk type.
2016-09-21 15:56:55 +00:00
// (4.8.2.2) The chunk itself, marshaled with the Marshal() method.
2014-11-27 19:46:45 +00:00
//
2017-04-04 17:14:52 +00:00
// NOTE: Above, varint encoding is used consistently although uvarint would have
// made more sense in many cases. This was simply a glitch while designing the
// format.
2014-10-24 18:27:27 +00:00
func ( p * persistence ) checkpointSeriesMapAndHeads ( fingerprintToSeries * seriesMap , fpLocker * fingerprintLocker ) ( err error ) {
2015-05-20 16:10:29 +00:00
log . Info ( "Checkpointing in-memory metrics and chunks..." )
2017-01-11 15:11:19 +00:00
p . checkpointing . Set ( 1 )
defer p . checkpointing . Set ( 0 )
2014-10-24 18:27:27 +00:00
begin := time . Now ( )
f , err := os . OpenFile ( p . headsTempFileName ( ) , os . O_WRONLY | os . O_TRUNC | os . O_CREATE , 0640 )
2014-06-06 09:55:53 +00:00
if err != nil {
2015-09-07 16:08:23 +00:00
return err
2014-06-06 09:55:53 +00:00
}
2014-10-24 18:27:27 +00:00
defer func ( ) {
2015-09-07 16:08:23 +00:00
syncErr := f . Sync ( )
2014-10-24 18:27:27 +00:00
closeErr := f . Close ( )
if err != nil {
return
}
2015-09-07 16:08:23 +00:00
err = syncErr
if err != nil {
return
}
2014-10-24 18:27:27 +00:00
err = closeErr
if err != nil {
return
}
err = os . Rename ( p . headsTempFileName ( ) , p . headsFileName ( ) )
duration := time . Since ( begin )
2017-01-11 15:11:19 +00:00
p . checkpointDuration . Observe ( duration . Seconds ( ) )
p . checkpointLastDuration . Set ( duration . Seconds ( ) )
2015-05-20 16:10:29 +00:00
log . Infof ( "Done checkpointing in-memory metrics and chunks in %v." , duration )
2014-10-24 18:27:27 +00:00
} ( )
2014-09-10 16:41:52 +00:00
w := bufio . NewWriterSize ( f , fileBufSize )
2014-06-06 09:55:53 +00:00
2014-10-24 18:27:27 +00:00
if _ , err = w . WriteString ( headsMagicString ) ; err != nil {
2015-09-07 16:08:23 +00:00
return err
2014-09-10 16:41:52 +00:00
}
2014-10-24 18:27:27 +00:00
var numberOfSeriesOffset int
if numberOfSeriesOffset , err = codable . EncodeVarint ( w , headsFormatVersion ) ; err != nil {
2015-09-07 16:08:23 +00:00
return err
2014-09-10 16:41:52 +00:00
}
2014-10-24 18:27:27 +00:00
numberOfSeriesOffset += len ( headsMagicString )
numberOfSeriesInHeader := uint64 ( fingerprintToSeries . length ( ) )
// We have to write the number of series as uint64 because we might need
// to overwrite it later, and a varint might change byte width then.
if err = codable . EncodeUint64 ( w , numberOfSeriesInHeader ) ; err != nil {
2015-09-07 16:08:23 +00:00
return err
2014-09-10 16:41:52 +00:00
}
2014-10-07 17:11:24 +00:00
iter := fingerprintToSeries . iter ( )
defer func ( ) {
// Consume the iterator in any case to not leak goroutines.
2014-12-26 12:37:30 +00:00
for range iter {
2014-10-07 17:11:24 +00:00
}
} ( )
2014-10-24 18:27:27 +00:00
var realNumberOfSeries uint64
2014-10-07 17:11:24 +00:00
for m := range iter {
2014-10-24 18:27:27 +00:00
func ( ) { // Wrapped in function to use defer for unlocking the fp.
fpLocker . Lock ( m . fp )
defer fpLocker . Unlock ( m . fp )
2017-01-17 00:59:38 +00:00
chunksToPersist := len ( m . series . chunkDescs ) - m . series . persistWatermark
2017-02-06 16:39:59 +00:00
if len ( m . series . chunkDescs ) == 0 {
// This series was completely purged or archived
// in the meantime. Ignore.
2014-10-24 18:27:27 +00:00
return
}
realNumberOfSeries ++
2017-02-06 16:39:59 +00:00
// Sanity checks.
if m . series . chunkDescsOffset < 0 && m . series . persistWatermark > 0 {
panic ( "encountered unknown chunk desc offset in combination with positive persist watermark" )
}
// These are the values to save in the normal case.
var (
// persistWatermark is zero as we only checkpoint non-persisted chunks.
persistWatermark int64
// chunkDescsOffset is shifted by the original persistWatermark for the same reason.
chunkDescsOffset = int64 ( m . series . chunkDescsOffset + m . series . persistWatermark )
numChunkDescs = int64 ( chunksToPersist )
)
// However, in the special case of a series being fully
// persisted but still in memory (i.e. not archived), we
// need to save a "placeholder", for which we use just
// the chunk desc of the last chunk. Values have to be
// adjusted accordingly. (The reason for doing it in
// this weird way is to keep the checkpoint format
// compatible with older versions.)
if chunksToPersist == 0 {
persistWatermark = 1
chunkDescsOffset -- // Save one chunk desc after all.
numChunkDescs = 1
}
2015-03-09 01:33:10 +00:00
// seriesFlags left empty in v2.
if err = w . WriteByte ( 0 ) ; err != nil {
2014-10-24 18:27:27 +00:00
return
}
if err = codable . EncodeUint64 ( w , uint64 ( m . fp ) ) ; err != nil {
return
}
var buf [ ] byte
buf , err = codable . Metric ( m . series . metric ) . MarshalBinary ( )
if err != nil {
return
}
2015-09-07 16:08:23 +00:00
if _ , err = w . Write ( buf ) ; err != nil {
return
}
2017-02-06 16:39:59 +00:00
if _ , err = codable . EncodeVarint ( w , persistWatermark ) ; err != nil {
2015-03-09 01:33:10 +00:00
return
}
2015-03-19 11:59:26 +00:00
if m . series . modTime . IsZero ( ) {
if _ , err = codable . EncodeVarint ( w , - 1 ) ; err != nil {
return
}
} else {
if _ , err = codable . EncodeVarint ( w , m . series . modTime . UnixNano ( ) ) ; err != nil {
return
}
}
2017-02-06 16:39:59 +00:00
if _ , err = codable . EncodeVarint ( w , chunkDescsOffset ) ; err != nil {
2014-10-27 19:40:48 +00:00
return
}
2014-11-08 00:01:34 +00:00
if _ , err = codable . EncodeVarint ( w , int64 ( m . series . savedFirstTime ) ) ; err != nil {
return
}
2017-02-06 16:39:59 +00:00
if _ , err = codable . EncodeVarint ( w , numChunkDescs ) ; err != nil {
2014-10-24 18:27:27 +00:00
return
}
2017-02-06 16:39:59 +00:00
if chunksToPersist == 0 {
// Save the one placeholder chunk desc for a fully persisted series.
chunkDesc := m . series . chunkDescs [ len ( m . series . chunkDescs ) - 1 ]
if _ , err = codable . EncodeVarint ( w , int64 ( chunkDesc . FirstTime ( ) ) ) ; err != nil {
2017-01-17 00:59:38 +00:00
return
}
2017-02-06 16:39:59 +00:00
lt , err := chunkDesc . LastTime ( )
if err != nil {
2017-01-17 00:59:38 +00:00
return
2014-10-24 18:27:27 +00:00
}
2017-02-06 16:39:59 +00:00
if _ , err = codable . EncodeVarint ( w , int64 ( lt ) ) ; err != nil {
return
}
} else {
// Save (only) the non-persisted chunks.
for _ , chunkDesc := range m . series . chunkDescs [ m . series . persistWatermark : ] {
if err = w . WriteByte ( byte ( chunkDesc . C . Encoding ( ) ) ) ; err != nil {
return
}
if err = chunkDesc . C . Marshal ( w ) ; err != nil {
return
}
p . checkpointChunksWritten . Observe ( float64 ( chunksToPersist ) )
}
2014-10-24 18:27:27 +00:00
}
2015-09-07 16:08:23 +00:00
// Series is checkpointed now, so declare it clean. In case the entire
// checkpoint fails later on, this is fine, as the storage's series
// maintenance will mark these series newly dirty again, continuously
// increasing the total number of dirty series as seen by the storage.
// This has the effect of triggering a new checkpoint attempt even
// earlier than if we hadn't incorrectly set "dirty" to "false" here
// already.
2015-03-09 01:33:10 +00:00
m . series . dirty = false
2014-10-24 18:27:27 +00:00
} ( )
2014-06-06 09:55:53 +00:00
if err != nil {
2015-09-07 16:08:23 +00:00
return err
2014-06-06 09:55:53 +00:00
}
2014-10-24 18:27:27 +00:00
}
if err = w . Flush ( ) ; err != nil {
2015-09-07 16:08:23 +00:00
return err
2014-10-24 18:27:27 +00:00
}
if realNumberOfSeries != numberOfSeriesInHeader {
// The number of series has changed in the meantime.
// Rewrite it in the header.
if _ , err = f . Seek ( int64 ( numberOfSeriesOffset ) , os . SEEK_SET ) ; err != nil {
2015-09-07 16:08:23 +00:00
return err
2014-09-10 16:41:52 +00:00
}
2014-10-24 18:27:27 +00:00
if err = codable . EncodeUint64 ( f , realNumberOfSeries ) ; err != nil {
2015-09-07 16:08:23 +00:00
return err
2014-09-10 16:41:52 +00:00
}
2014-06-06 09:55:53 +00:00
}
2017-01-11 15:11:19 +00:00
info , err := f . Stat ( )
if err != nil {
return err
}
p . checkpointLastSize . Set ( float64 ( info . Size ( ) ) )
2015-09-07 16:08:23 +00:00
return err
2014-06-06 09:55:53 +00:00
}
2014-10-07 17:11:24 +00:00
// loadSeriesMapAndHeads loads the fingerprint to memory-series mapping and all
2015-03-09 01:33:10 +00:00
// the chunks contained in the checkpoint (and thus not yet persisted to series
// files). The method is capable of loading the checkpoint format v1 and v2. If
// recoverable corruption is detected, or if the dirty flag was set from the
// beginning, crash recovery is run, which might take a while. If an
// unrecoverable error is encountered, it is returned. Call this method during
// start-up while nothing else is running in storage land. This method is
// utterly goroutine-unsafe.
2015-03-18 18:36:41 +00:00
func ( p * persistence ) loadSeriesMapAndHeads ( ) ( sm * seriesMap , chunksToPersist int64 , err error ) {
2015-08-20 15:18:46 +00:00
fingerprintToSeries := make ( map [ model . Fingerprint ] * memorySeries )
2014-11-05 19:02:45 +00:00
sm = & seriesMap { m : fingerprintToSeries }
defer func ( ) {
2016-03-03 17:29:35 +00:00
if p . dirty {
2015-05-20 16:10:29 +00:00
log . Warn ( "Persistence layer appears dirty." )
2016-08-27 19:39:27 +00:00
p . startedDirty . Set ( 1 )
2014-11-20 20:03:51 +00:00
err = p . recoverFromCrash ( fingerprintToSeries )
2014-11-05 19:02:45 +00:00
if err != nil {
sm = nil
}
2016-08-27 19:39:27 +00:00
} else {
p . startedDirty . Set ( 0 )
2014-11-05 19:02:45 +00:00
}
} ( )
2014-10-23 13:18:32 +00:00
2016-03-03 17:29:35 +00:00
hs := newHeadsScanner ( p . headsFileName ( ) )
defer hs . close ( )
for hs . scan ( ) {
fingerprintToSeries [ hs . fp ] = hs . series
2014-09-10 16:41:52 +00:00
}
2016-03-03 17:29:35 +00:00
if os . IsNotExist ( hs . err ) {
2015-03-09 01:33:10 +00:00
return sm , 0 , nil
2014-09-10 16:41:52 +00:00
}
2016-03-03 17:29:35 +00:00
if hs . err != nil {
2014-11-05 19:02:45 +00:00
p . dirty = true
2016-03-03 17:29:35 +00:00
log .
With ( "file" , p . headsFileName ( ) ) .
With ( "error" , hs . err ) .
Error ( "Error reading heads file." )
return sm , 0 , hs . err
2014-09-10 16:41:52 +00:00
}
2016-03-03 17:29:35 +00:00
return sm , hs . chunksToPersistTotal , nil
2014-09-10 16:41:52 +00:00
}
2015-03-09 01:33:10 +00:00
// dropAndPersistChunks deletes all chunks from a series file whose last sample
// time is before beforeTime, and then appends the provided chunks, leaving out
// those whose last sample time is before beforeTime. It returns the timestamp
2017-02-09 13:35:07 +00:00
// of the first sample in the oldest chunk _not_ dropped, the chunk offset
// within the series file of the first chunk persisted (out of the provided
// chunks, or - if no chunks were provided - the chunk offset where chunks would
// have been persisted, i.e. the end of the file), the number of deleted chunks,
// and true if all chunks of the series have been deleted (in which case the
// returned timestamp will be 0 and must be ignored). It is the caller's
// responsibility to make sure nothing is persisted or loaded for the same
// fingerprint concurrently.
2016-03-03 12:15:02 +00:00
//
// Returning an error signals problems with the series file. In this case, the
// caller should quarantine the series.
2015-03-09 01:33:10 +00:00
func ( p * persistence ) dropAndPersistChunks (
2016-09-21 21:44:27 +00:00
fp model . Fingerprint , beforeTime model . Time , chunks [ ] chunk . Chunk ,
2015-03-09 01:33:10 +00:00
) (
2015-08-20 15:18:46 +00:00
firstTimeNotDropped model . Time ,
2015-03-09 01:33:10 +00:00
offset int ,
2014-11-10 17:22:08 +00:00
numDropped int ,
allDropped bool ,
err error ,
) {
2015-03-09 01:33:10 +00:00
// Style note: With the many return values, it was decided to use naked
// returns in this method. They make the method more readable, but
// please handle with care!
if len ( chunks ) > 0 {
// We have chunks to persist. First check if those are already
// too old. If that's the case, the chunks in the series file
// are all too old, too.
i := 0
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
for ; i < len ( chunks ) ; i ++ {
var lt model . Time
2016-09-21 15:56:55 +00:00
lt , err = chunks [ i ] . NewIterator ( ) . LastTimestamp ( )
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
if err != nil {
return
}
if ! lt . Before ( beforeTime ) {
break
}
2015-03-09 01:33:10 +00:00
}
if i < len ( chunks ) {
2016-09-21 15:56:55 +00:00
firstTimeNotDropped = chunks [ i ] . FirstTime ( )
2015-03-09 01:33:10 +00:00
}
if i > 0 || firstTimeNotDropped . Before ( beforeTime ) {
// Series file has to go.
if numDropped , err = p . deleteSeriesFile ( fp ) ; err != nil {
return
}
numDropped += i
if i == len ( chunks ) {
allDropped = true
return
}
// Now simply persist what has to be persisted to a new file.
_ , err = p . persistChunks ( fp , chunks [ i : ] )
return
}
}
// If we are here, we have to check the series file itself.
2014-06-06 09:55:53 +00:00
f , err := p . openChunkFileForReading ( fp )
if os . IsNotExist ( err ) {
2015-03-09 01:33:10 +00:00
// No series file. Only need to create new file with chunks to
// persist, if there are any.
if len ( chunks ) == 0 {
allDropped = true
err = nil // Do not report not-exist err.
return
}
offset , err = p . persistChunks ( fp , chunks )
return
2014-06-06 09:55:53 +00:00
}
if err != nil {
2015-03-09 01:33:10 +00:00
return
2014-06-06 09:55:53 +00:00
}
defer f . Close ( )
2017-02-07 16:33:54 +00:00
fi , err := f . Stat ( )
if err != nil {
return
}
2017-02-09 13:44:10 +00:00
chunksInFile := int ( fi . Size ( ) ) / chunkLenWithHeader
totalChunks := chunksInFile + len ( chunks )
2017-02-07 16:33:54 +00:00
// Calculate chunk index from minShrinkRatio, to skip unnecessary chunk header reading.
chunkIndexToStartSeek := 0
if p . minShrinkRatio < 1 {
chunkIndexToStartSeek = int ( math . Floor ( float64 ( totalChunks ) * p . minShrinkRatio ) )
2017-02-10 10:42:59 +00:00
}
if chunkIndexToStartSeek >= chunksInFile {
chunkIndexToStartSeek = chunksInFile - 1
2017-02-07 16:33:54 +00:00
}
numDropped = chunkIndexToStartSeek
2016-01-11 15:42:10 +00:00
headerBuf := make ( [ ] byte , chunkHeaderLen )
2015-03-09 01:33:10 +00:00
// Find the first chunk in the file that should be kept.
for ; ; numDropped ++ {
_ , err = f . Seek ( offsetForChunkIndex ( numDropped ) , os . SEEK_SET )
2014-06-06 09:55:53 +00:00
if err != nil {
2015-03-09 01:33:10 +00:00
return
2014-06-06 09:55:53 +00:00
}
2015-04-13 18:20:26 +00:00
_ , err = io . ReadFull ( f , headerBuf )
2014-06-06 09:55:53 +00:00
if err == io . EOF {
2016-06-16 21:00:44 +00:00
// Close the file before trying to delete it. This is necessary on Windows
// (this will cause the defer f.Close to fail, but the error is silently ignored)
f . Close ( )
2014-06-06 09:55:53 +00:00
// We ran into the end of the file without finding any chunks that should
// be kept. Remove the whole file.
2015-03-09 01:33:10 +00:00
if numDropped , err = p . deleteSeriesFile ( fp ) ; err != nil {
return
2014-06-06 09:55:53 +00:00
}
2015-03-09 01:33:10 +00:00
if len ( chunks ) == 0 {
allDropped = true
return
}
offset , err = p . persistChunks ( fp , chunks )
return
2014-06-06 09:55:53 +00:00
}
if err != nil {
2015-03-09 01:33:10 +00:00
return
2014-06-06 09:55:53 +00:00
}
2015-08-20 15:18:46 +00:00
lastTime := model . Time (
2015-03-09 01:33:10 +00:00
binary . LittleEndian . Uint64 ( headerBuf [ chunkHeaderLastTimeOffset : ] ) ,
)
2014-06-06 09:55:53 +00:00
if ! lastTime . Before ( beforeTime ) {
break
}
}
2017-02-07 16:33:54 +00:00
// If numDropped isn't incremented, the minShrinkRatio condition isn't satisfied.
if numDropped == chunkIndexToStartSeek {
2016-01-11 15:42:10 +00:00
// Nothing to drop. Just adjust the return values and append the chunks (if any).
numDropped = 0
2017-02-07 16:33:54 +00:00
_ , err = f . Seek ( offsetForChunkIndex ( 0 ) , os . SEEK_SET )
if err != nil {
return
}
_ , err = io . ReadFull ( f , headerBuf )
if err != nil {
return
}
firstTimeNotDropped = model . Time (
binary . LittleEndian . Uint64 ( headerBuf [ chunkHeaderFirstTimeOffset : ] ) ,
)
2015-03-09 01:33:10 +00:00
if len ( chunks ) > 0 {
offset , err = p . persistChunks ( fp , chunks )
2017-02-09 13:35:07 +00:00
} else {
offset = chunksInFile
2015-03-09 01:33:10 +00:00
}
return
}
2016-01-11 15:42:10 +00:00
// If we are here, we have to drop some chunks for real. So we need to
// record firstTimeNotDropped from the last read header, seek backwards
// to the beginning of its header, and start copying everything from
// there into a new file. Then append the chunks to the new file.
firstTimeNotDropped = model . Time (
binary . LittleEndian . Uint64 ( headerBuf [ chunkHeaderFirstTimeOffset : ] ) ,
)
2016-09-28 21:33:34 +00:00
chunk . Ops . WithLabelValues ( chunk . Drop ) . Add ( float64 ( numDropped ) )
2015-03-18 18:09:07 +00:00
_ , err = f . Seek ( - chunkHeaderLen , os . SEEK_CUR )
2014-06-06 09:55:53 +00:00
if err != nil {
2015-03-09 01:33:10 +00:00
return
2014-06-06 09:55:53 +00:00
}
2014-10-15 15:07:12 +00:00
temp , err := os . OpenFile ( p . tempFileNameForFingerprint ( fp ) , os . O_WRONLY | os . O_CREATE , 0640 )
2014-06-06 09:55:53 +00:00
if err != nil {
2015-03-09 01:33:10 +00:00
return
2014-06-06 09:55:53 +00:00
}
2015-03-09 01:33:10 +00:00
defer func ( ) {
2016-06-16 21:00:44 +00:00
// Close the file before trying to rename to it. This is necessary on Windows
// (this will cause the defer f.Close to fail, but the error is silently ignored)
f . Close ( )
2015-03-19 14:41:50 +00:00
p . closeChunkFile ( temp )
2015-03-09 01:33:10 +00:00
if err == nil {
err = os . Rename ( p . tempFileNameForFingerprint ( fp ) , p . fileNameForFingerprint ( fp ) )
}
} ( )
2014-06-06 09:55:53 +00:00
2015-03-09 01:33:10 +00:00
written , err := io . Copy ( temp , f )
if err != nil {
return
2014-06-06 09:55:53 +00:00
}
2015-04-13 18:20:26 +00:00
offset = int ( written / chunkLenWithHeader )
2014-06-06 09:55:53 +00:00
2015-03-09 01:33:10 +00:00
if len ( chunks ) > 0 {
2016-01-25 15:36:36 +00:00
if err = p . writeChunks ( temp , chunks ) ; err != nil {
2015-03-09 01:33:10 +00:00
return
}
2014-10-27 19:40:48 +00:00
}
2015-03-09 01:33:10 +00:00
return
}
// deleteSeriesFile deletes a series file belonging to the provided
// fingerprint. It returns the number of chunks that were contained in the
// deleted file.
2015-08-20 15:18:46 +00:00
func ( p * persistence ) deleteSeriesFile ( fp model . Fingerprint ) ( int , error ) {
2015-03-09 01:33:10 +00:00
fname := p . fileNameForFingerprint ( fp )
fi , err := os . Stat ( fname )
if os . IsNotExist ( err ) {
// Great. The file is already gone.
return 0 , nil
}
if err != nil {
return - 1 , err
}
2015-04-13 18:20:26 +00:00
numChunks := int ( fi . Size ( ) / chunkLenWithHeader )
2015-03-09 01:33:10 +00:00
if err := os . Remove ( fname ) ; err != nil {
return - 1 , err
}
2016-09-28 21:33:34 +00:00
chunk . Ops . WithLabelValues ( chunk . Drop ) . Add ( float64 ( numChunks ) )
2015-03-09 01:33:10 +00:00
return numChunks , nil
2014-09-10 16:41:52 +00:00
}
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
// quarantineSeriesFile moves a series file to the orphaned directory. It also
// writes a hint file with the provided quarantine reason and, if series is
// non-nil, the string representation of the metric.
func ( p * persistence ) quarantineSeriesFile ( fp model . Fingerprint , quarantineReason error , metric model . Metric ) error {
var (
oldName = p . fileNameForFingerprint ( fp )
orphanedDir = filepath . Join ( p . basePath , "orphaned" , filepath . Base ( filepath . Dir ( oldName ) ) )
newName = filepath . Join ( orphanedDir , filepath . Base ( oldName ) )
hintName = newName [ : len ( newName ) - len ( seriesFileSuffix ) ] + hintFileSuffix
)
renameErr := os . MkdirAll ( orphanedDir , 0700 )
if renameErr != nil {
return renameErr
}
renameErr = os . Rename ( oldName , newName )
if os . IsNotExist ( renameErr ) {
// Source file dosn't exist. That's normal.
renameErr = nil
}
// Write hint file even if the rename ended in an error. At least try...
// And ignore errors writing the hint file. It's best effort.
if f , err := os . Create ( hintName ) ; err == nil {
if metric != nil {
f . WriteString ( metric . String ( ) + "\n" )
} else {
f . WriteString ( "[UNKNOWN METRIC]\n" )
}
if quarantineReason != nil {
f . WriteString ( quarantineReason . Error ( ) + "\n" )
} else {
f . WriteString ( "[UNKNOWN REASON]\n" )
}
f . Close ( )
}
return renameErr
}
2015-05-20 17:13:06 +00:00
// seriesFileModTime returns the modification time of the series file belonging
// to the provided fingerprint. In case of an error, the zero value of time.Time
// is returned.
2015-08-20 15:18:46 +00:00
func ( p * persistence ) seriesFileModTime ( fp model . Fingerprint ) time . Time {
2015-03-19 11:59:26 +00:00
var modTime time . Time
if fi , err := os . Stat ( p . fileNameForFingerprint ( fp ) ) ; err == nil {
return fi . ModTime ( )
}
return modTime
}
2014-10-07 17:11:24 +00:00
// indexMetric queues the given metric for addition to the indexes needed by
2015-05-20 17:13:06 +00:00
// fingerprintsForLabelPair, labelValuesForLabelName, and
// fingerprintsModifiedBefore. If the queue is full, this method blocks until
// the metric can be queued. This method is goroutine-safe.
2015-08-20 15:18:46 +00:00
func ( p * persistence ) indexMetric ( fp model . Fingerprint , m model . Metric ) {
2014-09-23 17:21:10 +00:00
p . indexingQueue <- indexingOp { fp , m , add }
2014-06-06 09:55:53 +00:00
}
2014-10-07 17:11:24 +00:00
// unindexMetric queues references to the given metric for removal from the
2015-05-20 17:13:06 +00:00
// indexes used for fingerprintsForLabelPair, labelValuesForLabelName, and
// fingerprintsModifiedBefore. The index of fingerprints to archived metrics is
// not affected by this removal. (In fact, never call this method for an
2015-09-11 13:47:23 +00:00
// archived metric. To purge an archived metric, call purgeArchivedMetric.)
2014-10-07 17:11:24 +00:00
// If the queue is full, this method blocks until the metric can be queued. This
// method is goroutine-safe.
2015-08-20 15:18:46 +00:00
func ( p * persistence ) unindexMetric ( fp model . Fingerprint , m model . Metric ) {
2014-09-23 17:21:10 +00:00
p . indexingQueue <- indexingOp { fp , m , remove }
2014-09-10 16:41:52 +00:00
}
2014-10-07 17:11:24 +00:00
// waitForIndexing waits until all items in the indexing queue are processed. If
// queue processing is currently on hold (to gather more ops for batching), this
// method will trigger an immediate start of processing. This method is
// goroutine-safe.
func ( p * persistence ) waitForIndexing ( ) {
2014-09-24 14:32:07 +00:00
wait := make ( chan int )
for {
p . indexingFlush <- wait
if <- wait == 0 {
break
}
}
}
2014-10-07 17:11:24 +00:00
// archiveMetric persists the mapping of the given fingerprint to the given
// metric, together with the first and last timestamp of the series belonging to
2014-11-10 17:33:31 +00:00
// the metric. The caller must have locked the fingerprint.
2014-10-07 17:11:24 +00:00
func ( p * persistence ) archiveMetric (
2015-08-20 15:18:46 +00:00
fp model . Fingerprint , m model . Metric , first , last model . Time ,
2016-03-09 17:56:30 +00:00
) {
2014-09-23 17:21:10 +00:00
if err := p . archivedFingerprintToMetrics . Put ( codable . Fingerprint ( fp ) , codable . Metric ( m ) ) ; err != nil {
2016-03-09 17:56:30 +00:00
p . setDirty ( fmt . Errorf ( "error in method archiveMetric inserting fingerprint %v into FingerprintToMetrics: %s" , fp , err ) )
return
2014-09-14 13:33:56 +00:00
}
2014-09-23 17:21:10 +00:00
if err := p . archivedFingerprintToTimeRange . Put ( codable . Fingerprint ( fp ) , codable . TimeRange { First : first , Last : last } ) ; err != nil {
2016-03-09 17:56:30 +00:00
p . setDirty ( fmt . Errorf ( "error in method archiveMetric inserting fingerprint %v into FingerprintToTimeRange: %s" , fp , err ) )
2014-09-14 13:33:56 +00:00
}
2014-09-10 16:41:52 +00:00
}
2014-10-07 17:11:24 +00:00
// hasArchivedMetric returns whether the archived metric for the given
// fingerprint exists and if yes, what the first and last timestamp in the
// corresponding series is. This method is goroutine-safe.
2015-08-20 15:18:46 +00:00
func ( p * persistence ) hasArchivedMetric ( fp model . Fingerprint ) (
2016-03-09 17:56:30 +00:00
hasMetric bool , firstTime , lastTime model . Time ,
2014-09-10 16:41:52 +00:00
) {
2016-03-09 17:56:30 +00:00
firstTime , lastTime , hasMetric , err := p . archivedFingerprintToTimeRange . Lookup ( fp )
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
if err != nil {
2016-03-09 17:56:30 +00:00
p . setDirty ( fmt . Errorf ( "error in method hasArchivedMetric(%v): %s" , fp , err ) )
hasMetric = false
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
}
2016-03-09 17:56:30 +00:00
return hasMetric , firstTime , lastTime
2014-09-10 16:41:52 +00:00
}
2014-11-10 17:22:08 +00:00
// updateArchivedTimeRange updates an archived time range. The caller must make
// sure that the fingerprint is currently archived (the time range will
// otherwise be added without the corresponding metric in the archive).
func ( p * persistence ) updateArchivedTimeRange (
2015-08-20 15:18:46 +00:00
fp model . Fingerprint , first , last model . Time ,
2014-11-10 17:22:08 +00:00
) error {
return p . archivedFingerprintToTimeRange . Put ( codable . Fingerprint ( fp ) , codable . TimeRange { First : first , Last : last } )
}
2015-05-20 17:13:06 +00:00
// fingerprintsModifiedBefore returns the fingerprints of archived timeseries
2014-10-07 17:11:24 +00:00
// that have live samples before the provided timestamp. This method is
// goroutine-safe.
2015-08-20 15:18:46 +00:00
func ( p * persistence ) fingerprintsModifiedBefore ( beforeTime model . Time ) ( [ ] model . Fingerprint , error ) {
2014-09-24 14:32:07 +00:00
var fp codable . Fingerprint
var tr codable . TimeRange
2015-08-20 15:18:46 +00:00
fps := [ ] model . Fingerprint { }
2015-09-12 23:06:40 +00:00
err := p . archivedFingerprintToTimeRange . ForEach ( func ( kv index . KeyValueAccessor ) error {
2014-09-24 14:32:07 +00:00
if err := kv . Value ( & tr ) ; err != nil {
return err
}
if tr . First . Before ( beforeTime ) {
if err := kv . Key ( & fp ) ; err != nil {
return err
}
2015-08-20 15:18:46 +00:00
fps = append ( fps , model . Fingerprint ( fp ) )
2014-09-24 14:32:07 +00:00
}
return nil
} )
2015-09-12 23:06:40 +00:00
return fps , err
2014-09-24 14:32:07 +00:00
}
2015-05-20 17:13:06 +00:00
// archivedMetric retrieves the archived metric with the given fingerprint. This
// method is goroutine-safe.
2015-08-20 15:18:46 +00:00
func ( p * persistence ) archivedMetric ( fp model . Fingerprint ) ( model . Metric , error ) {
2014-09-16 13:47:24 +00:00
metric , _ , err := p . archivedFingerprintToMetrics . Lookup ( fp )
2016-03-08 23:09:42 +00:00
if err != nil {
2016-03-09 17:56:30 +00:00
p . setDirty ( fmt . Errorf ( "error in method archivedMetric(%v): %s" , fp , err ) )
return nil , err
2016-03-08 23:09:42 +00:00
}
2016-03-09 17:56:30 +00:00
return metric , nil
2014-09-10 16:41:52 +00:00
}
2015-02-26 14:19:44 +00:00
// purgeArchivedMetric deletes an archived fingerprint and its corresponding
2014-10-07 17:11:24 +00:00
// metric entirely. It also queues the metric for un-indexing (no need to call
2015-02-26 14:19:44 +00:00
// unindexMetric for the deleted metric.) It does not touch the series file,
// though. The caller must have locked the fingerprint.
2015-08-20 15:18:46 +00:00
func ( p * persistence ) purgeArchivedMetric ( fp model . Fingerprint ) ( err error ) {
2014-11-10 17:22:08 +00:00
defer func ( ) {
if err != nil {
2016-03-09 17:56:30 +00:00
p . setDirty ( fmt . Errorf ( "error in method purgeArchivedMetric(%v): %s" , fp , err ) )
2014-11-10 17:22:08 +00:00
}
} ( )
2015-05-20 17:13:06 +00:00
metric , err := p . archivedMetric ( fp )
2014-09-14 13:33:56 +00:00
if err != nil || metric == nil {
return err
}
2015-01-29 11:57:50 +00:00
deleted , err := p . archivedFingerprintToMetrics . Delete ( codable . Fingerprint ( fp ) )
if err != nil {
2014-09-14 13:33:56 +00:00
return err
}
2015-01-29 11:57:50 +00:00
if ! deleted {
2015-05-20 16:10:29 +00:00
log . Errorf ( "Tried to delete non-archived fingerprint %s from archivedFingerprintToMetrics index. This should never happen." , fp )
2015-01-29 11:57:50 +00:00
}
deleted , err = p . archivedFingerprintToTimeRange . Delete ( codable . Fingerprint ( fp ) )
if err != nil {
2014-09-14 13:33:56 +00:00
return err
}
2015-01-29 11:57:50 +00:00
if ! deleted {
2015-05-20 16:10:29 +00:00
log . Errorf ( "Tried to delete non-archived fingerprint %s from archivedFingerprintToTimeRange index. This should never happen." , fp )
2015-01-29 11:57:50 +00:00
}
2014-10-28 18:01:41 +00:00
p . unindexMetric ( fp , metric )
2014-09-23 17:21:10 +00:00
return nil
2014-06-06 09:55:53 +00:00
}
2014-08-21 20:06:11 +00:00
2014-10-07 17:11:24 +00:00
// unarchiveMetric deletes an archived fingerprint and its metric, but (in
2015-02-26 14:19:44 +00:00
// contrast to purgeArchivedMetric) does not un-index the metric. If a metric
2015-07-13 19:12:27 +00:00
// was actually deleted, the method returns true and the first time and last
// time of the deleted metric. The caller must have locked the fingerprint.
2015-08-20 15:18:46 +00:00
func ( p * persistence ) unarchiveMetric ( fp model . Fingerprint ) ( deletedAnything bool , err error ) {
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
// An error returned here will bubble up and lead to quarantining of the
// series, so no setDirty required.
2015-01-29 11:57:50 +00:00
deleted , err := p . archivedFingerprintToMetrics . Delete ( codable . Fingerprint ( fp ) )
2015-07-13 19:12:27 +00:00
if err != nil || ! deleted {
return false , err
2015-01-29 11:57:50 +00:00
}
deleted , err = p . archivedFingerprintToTimeRange . Delete ( codable . Fingerprint ( fp ) )
if err != nil {
2015-07-13 19:12:27 +00:00
return false , err
2014-09-14 13:33:56 +00:00
}
2015-01-29 11:57:50 +00:00
if ! deleted {
2015-05-20 16:10:29 +00:00
log . Errorf ( "Tried to delete non-archived fingerprint %s from archivedFingerprintToTimeRange index. This should never happen." , fp )
2015-01-29 11:57:50 +00:00
}
2015-07-13 19:12:27 +00:00
return true , nil
2014-09-10 16:41:52 +00:00
}
2014-10-07 17:11:24 +00:00
// close flushes the indexing queue and other buffered data and releases any
2014-11-05 19:02:45 +00:00
// held resources. It also removes the dirty marker file if successful and if
// the persistence is currently not marked as dirty.
2014-10-07 17:11:24 +00:00
func ( p * persistence ) close ( ) error {
2014-09-23 17:21:10 +00:00
close ( p . indexingQueue )
<- p . indexingStopped
2015-01-14 15:52:09 +00:00
var lastError , dirtyFileRemoveError error
2014-09-16 13:47:24 +00:00
if err := p . archivedFingerprintToMetrics . Close ( ) ; err != nil {
2014-09-10 16:41:52 +00:00
lastError = err
2015-05-20 16:10:29 +00:00
log . Error ( "Error closing archivedFingerprintToMetric index DB: " , err )
2014-09-10 16:41:52 +00:00
}
2014-09-16 13:47:24 +00:00
if err := p . archivedFingerprintToTimeRange . Close ( ) ; err != nil {
2014-09-10 16:41:52 +00:00
lastError = err
2015-05-20 16:10:29 +00:00
log . Error ( "Error closing archivedFingerprintToTimeRange index DB: " , err )
2014-09-10 16:41:52 +00:00
}
2014-09-16 13:47:24 +00:00
if err := p . labelPairToFingerprints . Close ( ) ; err != nil {
2014-09-10 16:41:52 +00:00
lastError = err
2015-05-20 16:10:29 +00:00
log . Error ( "Error closing labelPairToFingerprints index DB: " , err )
2014-09-10 16:41:52 +00:00
}
2014-09-16 13:47:24 +00:00
if err := p . labelNameToLabelValues . Close ( ) ; err != nil {
2014-09-10 16:41:52 +00:00
lastError = err
2015-05-20 16:10:29 +00:00
log . Error ( "Error closing labelNameToLabelValues index DB: " , err )
2014-09-10 16:41:52 +00:00
}
2014-11-05 19:02:45 +00:00
if lastError == nil && ! p . isDirty ( ) {
2015-01-14 15:52:09 +00:00
dirtyFileRemoveError = os . Remove ( p . dirtyFileName )
}
if err := p . fLock . Release ( ) ; err != nil {
lastError = err
2015-05-20 16:10:29 +00:00
log . Error ( "Error releasing file lock: " , err )
2015-01-14 15:52:09 +00:00
}
if dirtyFileRemoveError != nil {
// On Windows, removing the dirty file before unlocking is not
// possible. So remove it here if it failed above.
lastError = os . Remove ( p . dirtyFileName )
2014-11-05 19:02:45 +00:00
}
2014-09-10 16:41:52 +00:00
return lastError
}
2015-08-20 15:18:46 +00:00
func ( p * persistence ) dirNameForFingerprint ( fp model . Fingerprint ) string {
2014-09-10 16:41:52 +00:00
fpStr := fp . String ( )
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
return filepath . Join ( p . basePath , fpStr [ 0 : seriesDirNameLen ] )
2014-10-15 15:07:12 +00:00
}
2015-08-20 15:18:46 +00:00
func ( p * persistence ) fileNameForFingerprint ( fp model . Fingerprint ) string {
2014-10-15 15:07:12 +00:00
fpStr := fp . String ( )
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
return filepath . Join ( p . basePath , fpStr [ 0 : seriesDirNameLen ] , fpStr [ seriesDirNameLen : ] + seriesFileSuffix )
2014-10-15 15:07:12 +00:00
}
2015-08-20 15:18:46 +00:00
func ( p * persistence ) tempFileNameForFingerprint ( fp model . Fingerprint ) string {
2014-10-15 15:07:12 +00:00
fpStr := fp . String ( )
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
return filepath . Join ( p . basePath , fpStr [ 0 : seriesDirNameLen ] , fpStr [ seriesDirNameLen : ] + seriesTempFileSuffix )
2014-09-10 16:41:52 +00:00
}
2015-08-20 15:18:46 +00:00
func ( p * persistence ) openChunkFileForWriting ( fp model . Fingerprint ) ( * os . File , error ) {
2014-10-15 15:07:12 +00:00
if err := os . MkdirAll ( p . dirNameForFingerprint ( fp ) , 0700 ) ; err != nil {
2014-09-10 16:41:52 +00:00
return nil , err
}
2015-01-26 12:48:24 +00:00
return os . OpenFile ( p . fileNameForFingerprint ( fp ) , os . O_WRONLY | os . O_APPEND | os . O_CREATE , 0640 )
// NOTE: Although the file was opened for append,
// f.Seek(0, os.SEEK_CUR)
// would now return '0, nil', so we cannot check for a consistent file length right now.
2015-03-09 01:33:10 +00:00
// However, the chunkIndexForOffset function is doing that check, so a wrong file length
2015-01-26 12:48:24 +00:00
// would still be detected.
2014-09-10 16:41:52 +00:00
}
2015-03-19 16:54:59 +00:00
// closeChunkFile first syncs the provided file if mandated so by the sync
2015-03-19 14:41:50 +00:00
// strategy. Then it closes the file. Errors are logged.
func ( p * persistence ) closeChunkFile ( f * os . File ) {
if p . shouldSync ( ) {
if err := f . Sync ( ) ; err != nil {
2015-05-20 16:10:29 +00:00
log . Error ( "Error syncing file:" , err )
2015-03-19 14:41:50 +00:00
}
}
if err := f . Close ( ) ; err != nil {
2015-05-20 16:10:29 +00:00
log . Error ( "Error closing chunk file:" , err )
2015-03-19 14:41:50 +00:00
}
}
2015-08-20 15:18:46 +00:00
func ( p * persistence ) openChunkFileForReading ( fp model . Fingerprint ) ( * os . File , error ) {
2014-10-15 15:07:12 +00:00
return os . Open ( p . fileNameForFingerprint ( fp ) )
2014-09-10 16:41:52 +00:00
}
2014-10-24 18:27:27 +00:00
func ( p * persistence ) headsFileName ( ) string {
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
return filepath . Join ( p . basePath , headsFileName )
2014-09-10 16:41:52 +00:00
}
2014-10-24 18:27:27 +00:00
func ( p * persistence ) headsTempFileName ( ) string {
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
return filepath . Join ( p . basePath , headsTempFileName )
2014-10-24 18:27:27 +00:00
}
2015-05-06 14:53:12 +00:00
func ( p * persistence ) mappingsFileName ( ) string {
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
return filepath . Join ( p . basePath , mappingsFileName )
2015-05-06 14:53:12 +00:00
}
func ( p * persistence ) mappingsTempFileName ( ) string {
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
return filepath . Join ( p . basePath , mappingsTempFileName )
2015-05-06 14:53:12 +00:00
}
2014-10-07 17:11:24 +00:00
func ( p * persistence ) processIndexingQueue ( ) {
2014-09-23 17:21:10 +00:00
batchSize := 0
nameToValues := index . LabelNameLabelValuesMapping { }
pairToFPs := index . LabelPairFingerprintsMapping { }
batchTimeout := time . NewTimer ( indexingBatchTimeout )
defer batchTimeout . Stop ( )
commitBatch := func ( ) {
2014-09-24 14:51:18 +00:00
p . indexingBatchSizes . Observe ( float64 ( batchSize ) )
2014-09-24 15:18:48 +00:00
defer func ( begin time . Time ) {
2016-07-07 13:24:35 +00:00
p . indexingBatchDuration . Observe ( time . Since ( begin ) . Seconds ( ) )
2014-09-24 15:18:48 +00:00
} ( time . Now ( ) )
2014-09-24 14:51:18 +00:00
2014-09-23 17:21:10 +00:00
if err := p . labelPairToFingerprints . IndexBatch ( pairToFPs ) ; err != nil {
2015-05-20 16:10:29 +00:00
log . Error ( "Error indexing label pair to fingerprints batch: " , err )
2017-04-06 13:29:33 +00:00
p . setDirty ( err )
2014-09-23 17:21:10 +00:00
}
if err := p . labelNameToLabelValues . IndexBatch ( nameToValues ) ; err != nil {
2015-05-20 16:10:29 +00:00
log . Error ( "Error indexing label name to label values batch: " , err )
2017-04-06 13:29:33 +00:00
p . setDirty ( err )
2014-09-23 17:21:10 +00:00
}
batchSize = 0
nameToValues = index . LabelNameLabelValuesMapping { }
pairToFPs = index . LabelPairFingerprintsMapping { }
2014-09-24 12:41:38 +00:00
batchTimeout . Reset ( indexingBatchTimeout )
2014-09-23 17:21:10 +00:00
}
2014-09-24 12:41:38 +00:00
var flush chan chan int
2014-09-23 17:21:10 +00:00
loop :
for {
2014-09-24 12:41:38 +00:00
// Only process flush requests if the queue is currently empty.
if len ( p . indexingQueue ) == 0 {
flush = p . indexingFlush
} else {
flush = nil
}
2014-09-23 17:21:10 +00:00
select {
case <- batchTimeout . C :
2014-10-17 11:55:54 +00:00
// Only commit if we have something to commit _and_
// nothing is waiting in the queue to be picked up. That
// prevents a death spiral if the LookupSet calls below
// are slow for some reason.
if batchSize > 0 && len ( p . indexingQueue ) == 0 {
2014-09-24 12:41:38 +00:00
commitBatch ( )
} else {
batchTimeout . Reset ( indexingBatchTimeout )
}
case r := <- flush :
2014-09-23 17:21:10 +00:00
if batchSize > 0 {
commitBatch ( )
}
2014-09-24 12:41:38 +00:00
r <- len ( p . indexingQueue )
2014-09-23 17:21:10 +00:00
case op , ok := <- p . indexingQueue :
if ! ok {
if batchSize > 0 {
commitBatch ( )
}
break loop
}
batchSize ++
for ln , lv := range op . metric {
2015-08-22 11:32:13 +00:00
lp := model . LabelPair { Name : ln , Value : lv }
2014-09-23 17:21:10 +00:00
baseFPs , ok := pairToFPs [ lp ]
if ! ok {
var err error
baseFPs , _ , err = p . labelPairToFingerprints . LookupSet ( lp )
if err != nil {
2015-05-20 16:10:29 +00:00
log . Errorf ( "Error looking up label pair %v: %s" , lp , err )
2014-09-23 17:21:10 +00:00
continue
}
pairToFPs [ lp ] = baseFPs
}
baseValues , ok := nameToValues [ ln ]
if ! ok {
var err error
baseValues , _ , err = p . labelNameToLabelValues . LookupSet ( ln )
if err != nil {
2015-05-20 16:10:29 +00:00
log . Errorf ( "Error looking up label name %v: %s" , ln , err )
2014-09-23 17:21:10 +00:00
continue
}
nameToValues [ ln ] = baseValues
}
switch op . opType {
case add :
baseFPs [ op . fingerprint ] = struct { } { }
baseValues [ lv ] = struct { } { }
case remove :
delete ( baseFPs , op . fingerprint )
if len ( baseFPs ) == 0 {
delete ( baseValues , lv )
}
default :
panic ( "unknown op type" )
}
}
if batchSize >= indexingMaxBatchSize {
commitBatch ( )
}
}
}
close ( p . indexingStopped )
}
2015-03-09 01:33:10 +00:00
Checkpoint fingerprint mappings only upon shutdown
Before, we checkpointed after every newly detected fingerprint
collision, which is not a problem as long as collisions are
rare. However, with a sufficient number of metrics or particular
nature of the data set, there might be a lot of collisions, all to be
detected upon the first set of scrapes, and then the checkpointing
after each detection will take a quite long time (it's O(n²),
essentially).
Since we are rebuilding the fingerprint mapping during crash recovery,
the previous, very conservative approach didn't even buy us
anything. We only ever read from the checkpoint file after a clean
shutdown, so the only time we need to write the checkpoint file is
during a clean shutdown.
2016-04-14 14:02:37 +00:00
// checkpointFPMappings persists the fingerprint mappings. The caller has to
// ensure that the provided mappings are not changed concurrently. This method
// is only called upon shutdown or during crash recovery, when no samples are
// ingested.
2015-05-06 14:53:12 +00:00
//
// Description of the file format, v1:
//
// (1) Magic string (const mappingsMagicString).
//
// (2) Uvarint-encoded format version (const mappingsFormatVersion).
//
// (3) Uvarint-encoded number of mappings in fpMappings.
//
// (4) Repeated once per mapping:
//
// (4.1) The raw fingerprint as big-endian uint64.
//
// (4.2) The uvarint-encoded number of sub-mappings for the raw fingerprint.
//
// (4.3) Repeated once per sub-mapping:
//
// (4.3.1) The uvarint-encoded length of the unique metric string.
// (4.3.2) The unique metric string.
// (4.3.3) The mapped fingerprint as big-endian uint64.
2015-05-07 16:58:14 +00:00
func ( p * persistence ) checkpointFPMappings ( fpm fpMappings ) ( err error ) {
2015-05-20 16:10:29 +00:00
log . Info ( "Checkpointing fingerprint mappings..." )
2015-05-06 14:53:12 +00:00
begin := time . Now ( )
f , err := os . OpenFile ( p . mappingsTempFileName ( ) , os . O_WRONLY | os . O_TRUNC | os . O_CREATE , 0640 )
if err != nil {
return
}
defer func ( ) {
2015-09-12 23:06:40 +00:00
syncErr := f . Sync ( )
2015-05-06 14:53:12 +00:00
closeErr := f . Close ( )
2015-09-12 23:06:40 +00:00
if err != nil {
return
}
err = syncErr
2015-05-06 14:53:12 +00:00
if err != nil {
return
}
err = closeErr
if err != nil {
return
}
err = os . Rename ( p . mappingsTempFileName ( ) , p . mappingsFileName ( ) )
duration := time . Since ( begin )
2015-05-20 16:10:29 +00:00
log . Infof ( "Done checkpointing fingerprint mappings in %v." , duration )
2015-05-06 14:53:12 +00:00
} ( )
w := bufio . NewWriterSize ( f , fileBufSize )
if _ , err = w . WriteString ( mappingsMagicString ) ; err != nil {
return
}
if _ , err = codable . EncodeUvarint ( w , mappingsFormatVersion ) ; err != nil {
return
}
2015-05-07 16:58:14 +00:00
if _ , err = codable . EncodeUvarint ( w , uint64 ( len ( fpm ) ) ) ; err != nil {
2015-05-06 14:53:12 +00:00
return
}
2015-05-07 16:58:14 +00:00
for fp , mappings := range fpm {
2015-05-06 14:53:12 +00:00
if err = codable . EncodeUint64 ( w , uint64 ( fp ) ) ; err != nil {
return
}
if _ , err = codable . EncodeUvarint ( w , uint64 ( len ( mappings ) ) ) ; err != nil {
return
}
for ms , mappedFP := range mappings {
if _ , err = codable . EncodeUvarint ( w , uint64 ( len ( ms ) ) ) ; err != nil {
return
}
if _ , err = w . WriteString ( ms ) ; err != nil {
return
}
if err = codable . EncodeUint64 ( w , uint64 ( mappedFP ) ) ; err != nil {
return
}
}
}
err = w . Flush ( )
return
}
// loadFPMappings loads the fingerprint mappings. It also returns the highest
// mapped fingerprint and any error encountered. If p.mappingsFileName is not
// found, the method returns (fpMappings{}, 0, nil). Do not call concurrently
// with checkpointFPMappings.
2015-08-20 15:18:46 +00:00
func ( p * persistence ) loadFPMappings ( ) ( fpMappings , model . Fingerprint , error ) {
2015-05-06 14:53:12 +00:00
fpm := fpMappings { }
2015-08-20 15:18:46 +00:00
var highestMappedFP model . Fingerprint
2015-05-06 14:53:12 +00:00
f , err := os . Open ( p . mappingsFileName ( ) )
if os . IsNotExist ( err ) {
return fpm , 0 , nil
}
if err != nil {
return nil , 0 , err
}
defer f . Close ( )
r := bufio . NewReaderSize ( f , fileBufSize )
buf := make ( [ ] byte , len ( mappingsMagicString ) )
if _ , err := io . ReadFull ( r , buf ) ; err != nil {
return nil , 0 , err
}
magic := string ( buf )
if magic != mappingsMagicString {
return nil , 0 , fmt . Errorf (
"unexpected magic string, want %q, got %q" ,
mappingsMagicString , magic ,
)
}
version , err := binary . ReadUvarint ( r )
if version != mappingsFormatVersion || err != nil {
return nil , 0 , fmt . Errorf ( "unknown fingerprint mappings format version, want %d" , mappingsFormatVersion )
}
numRawFPs , err := binary . ReadUvarint ( r )
if err != nil {
return nil , 0 , err
}
for ; numRawFPs > 0 ; numRawFPs -- {
rawFP , err := codable . DecodeUint64 ( r )
if err != nil {
return nil , 0 , err
}
numMappings , err := binary . ReadUvarint ( r )
if err != nil {
return nil , 0 , err
}
2015-08-20 15:18:46 +00:00
mappings := make ( map [ string ] model . Fingerprint , numMappings )
2015-05-06 14:53:12 +00:00
for ; numMappings > 0 ; numMappings -- {
lenMS , err := binary . ReadUvarint ( r )
if err != nil {
return nil , 0 , err
}
buf := make ( [ ] byte , lenMS )
if _ , err := io . ReadFull ( r , buf ) ; err != nil {
return nil , 0 , err
}
fp , err := codable . DecodeUint64 ( r )
if err != nil {
return nil , 0 , err
}
2015-08-20 15:18:46 +00:00
mappedFP := model . Fingerprint ( fp )
2015-05-06 14:53:12 +00:00
if mappedFP > highestMappedFP {
highestMappedFP = mappedFP
}
mappings [ string ( buf ) ] = mappedFP
}
2015-08-20 15:18:46 +00:00
fpm [ model . Fingerprint ( rawFP ) ] = mappings
2015-05-06 14:53:12 +00:00
}
return fpm , highestMappedFP , nil
}
2016-09-21 21:44:27 +00:00
func ( p * persistence ) writeChunks ( w io . Writer , chunks [ ] chunk . Chunk ) error {
2016-01-25 15:36:36 +00:00
b := p . bufPool . Get ( ) . ( [ ] byte )
defer func ( ) {
// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
// would only put back the original buf.
p . bufPool . Put ( b )
} ( )
2017-01-11 15:11:19 +00:00
numChunks := len ( chunks )
2016-01-25 15:36:36 +00:00
for batchSize := chunkMaxBatchSize ; len ( chunks ) > 0 ; chunks = chunks [ batchSize : ] {
if batchSize > len ( chunks ) {
batchSize = len ( chunks )
}
writeSize := batchSize * chunkLenWithHeader
if cap ( b ) < writeSize {
b = make ( [ ] byte , writeSize )
}
b = b [ : writeSize ]
for i , chunk := range chunks [ : batchSize ] {
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
if err := writeChunkHeader ( b [ i * chunkLenWithHeader : ] , chunk ) ; err != nil {
return err
}
2016-09-21 15:56:55 +00:00
if err := chunk . MarshalToBuf ( b [ i * chunkLenWithHeader + chunkHeaderLen : ] ) ; err != nil {
2016-01-25 15:36:36 +00:00
return err
}
}
if _ , err := w . Write ( b ) ; err != nil {
return err
}
}
2017-01-11 15:11:19 +00:00
p . seriesChunksPersisted . Observe ( float64 ( numChunks ) )
2016-01-25 15:36:36 +00:00
return nil
}
2015-03-09 01:33:10 +00:00
func offsetForChunkIndex ( i int ) int64 {
2015-04-13 18:20:26 +00:00
return int64 ( i * chunkLenWithHeader )
2015-03-09 01:33:10 +00:00
}
func chunkIndexForOffset ( offset int64 ) ( int , error ) {
2015-04-13 18:20:26 +00:00
if int ( offset ) % chunkLenWithHeader != 0 {
2015-03-09 01:33:10 +00:00
return - 1 , fmt . Errorf (
"offset %d is not a multiple of on-disk chunk length %d" ,
2015-04-13 18:20:26 +00:00
offset , chunkLenWithHeader ,
2015-03-09 01:33:10 +00:00
)
}
2015-04-13 18:20:26 +00:00
return int ( offset ) / chunkLenWithHeader , nil
2015-03-09 01:33:10 +00:00
}
2016-09-21 21:44:27 +00:00
func writeChunkHeader ( header [ ] byte , c chunk . Chunk ) error {
2016-09-21 15:56:55 +00:00
header [ chunkHeaderTypeOffset ] = byte ( c . Encoding ( ) )
2015-04-14 11:46:38 +00:00
binary . LittleEndian . PutUint64 (
header [ chunkHeaderFirstTimeOffset : ] ,
2016-09-21 15:56:55 +00:00
uint64 ( c . FirstTime ( ) ) ,
2015-04-14 11:46:38 +00:00
)
2016-09-21 15:56:55 +00:00
lt , err := c . NewIterator ( ) . LastTimestamp ( )
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
if err != nil {
return err
}
2015-04-14 11:46:38 +00:00
binary . LittleEndian . PutUint64 (
header [ chunkHeaderLastTimeOffset : ] ,
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
uint64 ( lt ) ,
2015-04-14 11:46:38 +00:00
)
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
return nil
2015-03-09 01:33:10 +00:00
}