2015-01-21 19:07:45 +00:00
// Copyright 2014 The Prometheus Authors
2014-09-19 16:18:44 +00:00
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2014-09-16 13:47:24 +00:00
// Package local contains the local time series storage used by Prometheus.
package local
2014-06-06 09:55:53 +00:00
import (
2014-11-13 19:50:25 +00:00
"container/list"
2015-06-15 10:49:28 +00:00
"fmt"
2014-10-23 13:18:32 +00:00
"sync/atomic"
2014-06-06 09:55:53 +00:00
"time"
2014-10-22 17:21:23 +00:00
"github.com/prometheus/client_golang/prometheus"
2015-08-20 15:18:46 +00:00
"github.com/prometheus/common/model"
2015-08-22 07:42:45 +00:00
"github.com/prometheus/log"
2014-10-22 17:21:23 +00:00
2014-06-06 09:55:53 +00:00
"github.com/prometheus/prometheus/storage/metric"
)
2014-11-05 19:02:45 +00:00
const (
2014-11-13 19:50:25 +00:00
evictRequestsCap = 1024
chunkLen = 1024
2014-11-13 15:55:15 +00:00
// See waitForNextFP.
fpMaxSweepTime = 6 * time . Hour
2015-04-01 15:52:03 +00:00
fpMaxWaitDuration = 10 * time . Second
2014-11-13 19:50:25 +00:00
2015-04-01 15:52:03 +00:00
// See waitForNextFP.
2014-11-13 19:50:25 +00:00
maxEvictInterval = time . Minute
2015-03-19 14:41:50 +00:00
// If numChunskToPersist is this percentage of maxChunksToPersist, we
// consider the storage in "graceful degradation mode", i.e. we do not
// checkpoint anymore based on the dirty series count, and we do not
// sync series files anymore if using the adaptive sync strategy.
percentChunksToPersistForDegradation = 80
2014-11-05 19:02:45 +00:00
)
2014-06-06 09:55:53 +00:00
2015-03-09 01:33:10 +00:00
var (
2015-03-18 18:36:41 +00:00
numChunksToPersistDesc = prometheus . NewDesc (
prometheus . BuildFQName ( namespace , subsystem , "chunks_to_persist" ) ,
"The current number of chunks waiting for persistence." ,
nil , nil ,
)
maxChunksToPersistDesc = prometheus . NewDesc (
prometheus . BuildFQName ( namespace , subsystem , "max_chunks_to_persist" ) ,
"The maximum number of chunks that can be waiting for persistence before sample ingestion will stop." ,
2015-03-09 01:33:10 +00:00
nil , nil ,
)
2014-06-06 09:55:53 +00:00
)
2014-11-13 19:50:25 +00:00
type evictRequest struct {
cd * chunkDesc
evict bool
}
2015-03-19 14:41:50 +00:00
// SyncStrategy is an enum to select a sync strategy for series files.
type SyncStrategy int
2015-06-15 10:49:28 +00:00
// String implements flag.Value.
func ( ss SyncStrategy ) String ( ) string {
switch ss {
case Adaptive :
return "adaptive"
case Always :
return "always"
case Never :
return "never"
}
return "<unknown>"
}
// Set implements flag.Value.
func ( ss * SyncStrategy ) Set ( s string ) error {
switch s {
case "adaptive" :
* ss = Adaptive
case "always" :
* ss = Always
case "never" :
* ss = Never
default :
return fmt . Errorf ( "invalid sync strategy: %s" , s )
}
return nil
}
2015-03-19 14:41:50 +00:00
// Possible values for SyncStrategy.
const (
_ SyncStrategy = iota
Never
Always
Adaptive
)
2015-03-19 16:54:59 +00:00
// A syncStrategy is a function that returns whether series files should be
// synced or not. It does not need to be goroutine safe.
2015-03-19 14:41:50 +00:00
type syncStrategy func ( ) bool
2014-10-24 18:27:27 +00:00
type memorySeriesStorage struct {
2015-05-20 23:37:04 +00:00
// numChunksToPersist has to be aligned for atomic operations.
numChunksToPersist int64 // The number of chunks waiting for persistence.
maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will stall.
degraded bool
2014-10-27 19:40:48 +00:00
fpLocker * fingerprintLocker
fpToSeries * seriesMap
2014-06-06 09:55:53 +00:00
2015-05-18 17:26:28 +00:00
options * MemorySeriesStorageOptions
2015-01-08 19:15:58 +00:00
loopStopping , loopStopped chan struct { }
maxMemoryChunks int
2015-02-26 14:19:44 +00:00
dropAfter time . Duration
2015-01-08 19:15:58 +00:00
checkpointInterval time . Duration
checkpointDirtySeriesLimit int
2014-10-24 18:27:27 +00:00
Improve persisting chunks to disk.
This is done by bucketing chunks by fingerprint. If the persisting to
disk falls behind, more and more chunks are in the queue. As soon as
there are "double hits", we will now persist both chunks in one go,
doubling the disk throughput (assuming it is limited by disk
seeks). Should even more pile up so that we end wit "triple hits", we
will persist those first, and so on.
Even if we have millions of time series, this will still help,
assuming not all of them are growing with the same speed. Series that
get many samples and/or are not very compressable will accumulate
chunks faster, and they will soon get double- or triple-writes.
To improve the chance of double writes,
-storage.local.persistence-queue-capacity could be set to a higher
value. However, that will slow down shutdown a lot (as the queue has
to be worked through). So we leave it to the user to set it to a
really high value. A more fundamental solution would be to checkpoint
not only head chunks, but also chunks still in the persist queue. That
would be quite complicated for a rather limited use-case (running many
time series with high ingestion rate on slow spinning disks).
2015-02-13 19:08:52 +00:00
persistence * persistence
2015-05-06 14:53:12 +00:00
mapper * fpMapper
2014-10-24 18:27:27 +00:00
2014-11-13 19:50:25 +00:00
evictList * list . List
evictRequests chan evictRequest
evictStopping , evictStopped chan struct { }
2015-02-01 11:47:51 +00:00
persistErrors prometheus . Counter
2014-11-13 19:50:25 +00:00
numSeries prometheus . Gauge
seriesOps * prometheus . CounterVec
ingestedSamplesCount prometheus . Counter
2015-07-13 19:12:27 +00:00
outOfOrderSamplesCount prometheus . Counter
2014-11-13 19:50:25 +00:00
invalidPreloadRequestsCount prometheus . Counter
2015-03-19 16:06:16 +00:00
maintainSeriesDuration * prometheus . SummaryVec
2014-06-06 09:55:53 +00:00
}
2014-09-16 13:47:24 +00:00
// MemorySeriesStorageOptions contains options needed by
// NewMemorySeriesStorage. It is not safe to leave any of those at their zero
// values.
2014-06-06 09:55:53 +00:00
type MemorySeriesStorageOptions struct {
2014-11-13 19:50:25 +00:00
MemoryChunks int // How many chunks to keep in memory.
2015-03-18 18:36:41 +00:00
MaxChunksToPersist int // Max number of chunks waiting to be persisted.
2014-10-07 17:11:24 +00:00
PersistenceStoragePath string // Location of persistence files.
2015-02-26 14:19:44 +00:00
PersistenceRetentionPeriod time . Duration // Chunks at least that old are dropped.
2014-10-24 18:27:27 +00:00
CheckpointInterval time . Duration // How often to checkpoint the series map and head chunks.
2015-01-08 19:15:58 +00:00
CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint.
2014-11-05 19:02:45 +00:00
Dirty bool // Force the storage to consider itself dirty on startup.
2015-03-19 11:03:09 +00:00
PedanticChecks bool // If dirty, perform crash-recovery checks on each series file.
2015-03-19 14:41:50 +00:00
SyncStrategy SyncStrategy // Which sync strategy to apply to series files.
2014-06-06 09:55:53 +00:00
}
2014-09-16 13:47:24 +00:00
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
// has to be called to start the storage.
2015-05-18 17:26:28 +00:00
func NewMemorySeriesStorage ( o * MemorySeriesStorageOptions ) Storage {
2015-02-13 13:26:54 +00:00
s := & memorySeriesStorage {
2015-03-19 14:41:50 +00:00
fpLocker : newFingerprintLocker ( 1024 ) ,
2014-06-06 09:55:53 +00:00
2015-05-18 17:26:28 +00:00
options : o ,
2015-01-08 19:15:58 +00:00
loopStopping : make ( chan struct { } ) ,
loopStopped : make ( chan struct { } ) ,
maxMemoryChunks : o . MemoryChunks ,
2015-02-26 14:19:44 +00:00
dropAfter : o . PersistenceRetentionPeriod ,
2015-01-08 19:15:58 +00:00
checkpointInterval : o . CheckpointInterval ,
checkpointDirtySeriesLimit : o . CheckpointDirtySeriesLimit ,
2014-06-06 09:55:53 +00:00
2015-03-18 18:36:41 +00:00
maxChunksToPersist : o . MaxChunksToPersist ,
2014-10-23 13:18:32 +00:00
2014-11-13 19:50:25 +00:00
evictList : list . New ( ) ,
evictRequests : make ( chan evictRequest , evictRequestsCap ) ,
evictStopping : make ( chan struct { } ) ,
evictStopped : make ( chan struct { } ) ,
2015-02-01 11:47:51 +00:00
persistErrors : prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "persist_errors_total" ,
Help : "The total number of errors while persisting chunks." ,
} ) ,
2015-03-19 14:41:50 +00:00
numSeries : prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "memory_series" ,
Help : "The current number of series in memory." ,
} ) ,
2014-10-23 13:18:32 +00:00
seriesOps : prometheus . NewCounterVec (
prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "series_ops_total" ,
Help : "The total number of series operations by their type." ,
} ,
[ ] string { opTypeLabel } ,
) ,
ingestedSamplesCount : prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "ingested_samples_total" ,
Help : "The total number of samples ingested." ,
} ) ,
2015-07-13 19:12:27 +00:00
outOfOrderSamplesCount : prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "out_of_order_samples_total" ,
Help : "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series." ,
} ) ,
2014-11-05 19:02:45 +00:00
invalidPreloadRequestsCount : prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "invalid_preload_requests_total" ,
Help : "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes." ,
} ) ,
2015-03-19 16:06:16 +00:00
maintainSeriesDuration : prometheus . NewSummaryVec (
prometheus . SummaryOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "maintain_series_duration_milliseconds" ,
Help : "The duration (in milliseconds) it took to perform maintenance on a series." ,
} ,
[ ] string { seriesLocationLabel } ,
) ,
2015-02-13 13:26:54 +00:00
}
2015-05-18 17:26:28 +00:00
return s
}
2015-02-13 13:26:54 +00:00
2015-05-18 17:26:28 +00:00
// Start implements Storage.
2015-05-23 10:03:14 +00:00
func ( s * memorySeriesStorage ) Start ( ) ( err error ) {
2015-03-19 14:41:50 +00:00
var syncStrategy syncStrategy
2015-05-18 17:26:28 +00:00
switch s . options . SyncStrategy {
2015-03-19 14:41:50 +00:00
case Never :
syncStrategy = func ( ) bool { return false }
case Always :
syncStrategy = func ( ) bool { return true }
case Adaptive :
syncStrategy = func ( ) bool { return ! s . isDegraded ( ) }
default :
panic ( "unknown sync strategy" )
}
2015-05-23 10:03:14 +00:00
var p * persistence
p , err = newPersistence ( s . options . PersistenceStoragePath , s . options . Dirty , s . options . PedanticChecks , syncStrategy )
2015-03-19 14:41:50 +00:00
if err != nil {
2015-05-18 17:26:28 +00:00
return err
2015-03-19 14:41:50 +00:00
}
s . persistence = p
2015-05-23 10:03:14 +00:00
// Persistence must start running before loadSeriesMapAndHeads() is called.
go s . persistence . run ( )
defer func ( ) {
if err != nil {
if e := p . close ( ) ; e != nil {
log . Errorln ( "Error closing persistence:" , e )
}
}
} ( )
2015-03-19 14:41:50 +00:00
2015-05-20 16:10:29 +00:00
log . Info ( "Loading series map and head chunks..." )
2015-03-19 14:41:50 +00:00
s . fpToSeries , s . numChunksToPersist , err = p . loadSeriesMapAndHeads ( )
if err != nil {
2015-05-18 17:26:28 +00:00
return err
2015-03-19 14:41:50 +00:00
}
2015-05-20 16:10:29 +00:00
log . Infof ( "%d series loaded." , s . fpToSeries . length ( ) )
2015-03-19 14:41:50 +00:00
s . numSeries . Set ( float64 ( s . fpToSeries . length ( ) ) )
2015-05-23 10:03:14 +00:00
s . mapper , err = newFPMapper ( s . fpToSeries , p )
2015-05-06 14:53:12 +00:00
if err != nil {
2015-05-18 17:26:28 +00:00
return err
2015-05-06 14:53:12 +00:00
}
2014-06-06 09:55:53 +00:00
2014-11-13 19:50:25 +00:00
go s . handleEvictList ( )
2014-10-24 18:27:27 +00:00
go s . loop ( )
2015-05-18 17:26:28 +00:00
return nil
2014-10-24 18:27:27 +00:00
}
// Stop implements Storage.
func ( s * memorySeriesStorage ) Stop ( ) error {
2015-05-20 16:10:29 +00:00
log . Info ( "Stopping local storage..." )
2014-11-13 19:50:25 +00:00
2015-05-20 16:10:29 +00:00
log . Info ( "Stopping maintenance loop..." )
2014-10-24 18:27:27 +00:00
close ( s . loopStopping )
<- s . loopStopped
2015-05-20 16:10:29 +00:00
log . Info ( "Stopping chunk eviction..." )
2014-11-13 19:50:25 +00:00
close ( s . evictStopping )
<- s . evictStopped
2014-10-24 18:27:27 +00:00
// One final checkpoint of the series map and the head chunks.
2014-10-27 19:40:48 +00:00
if err := s . persistence . checkpointSeriesMapAndHeads ( s . fpToSeries , s . fpLocker ) ; err != nil {
2014-10-24 18:27:27 +00:00
return err
}
if err := s . persistence . close ( ) ; err != nil {
return err
}
2015-05-20 16:10:29 +00:00
log . Info ( "Local storage stopped." )
2014-10-24 18:27:27 +00:00
return nil
}
// WaitForIndexing implements Storage.
func ( s * memorySeriesStorage ) WaitForIndexing ( ) {
s . persistence . waitForIndexing ( )
}
2015-06-22 20:50:47 +00:00
// NewIterator implements Storage.
2015-08-20 15:18:46 +00:00
func ( s * memorySeriesStorage ) NewIterator ( fp model . Fingerprint ) SeriesIterator {
2014-10-24 18:27:27 +00:00
s . fpLocker . Lock ( fp )
defer s . fpLocker . Unlock ( fp )
2014-10-27 19:40:48 +00:00
series , ok := s . fpToSeries . get ( fp )
2014-10-24 18:27:27 +00:00
if ! ok {
// Oops, no series for fp found. That happens if, after
// preloading is done, the whole series is identified as old
// enough for purging and hence purged for good. As there is no
// data left to iterate over, return an iterator that will never
// return any values.
return nopSeriesIterator { }
}
2015-05-27 09:24:56 +00:00
return & boundedIterator {
it : series . newIterator ( ) ,
2015-08-20 15:18:46 +00:00
start : model . Now ( ) . Add ( - s . dropAfter ) ,
2015-05-27 09:24:56 +00:00
}
}
2015-06-22 20:50:47 +00:00
// LastSampleForFingerprint implements Storage.
2015-08-22 12:52:35 +00:00
func ( s * memorySeriesStorage ) LastSamplePairForFingerprint ( fp model . Fingerprint ) * model . SamplePair {
2015-06-22 20:50:47 +00:00
s . fpLocker . Lock ( fp )
defer s . fpLocker . Unlock ( fp )
series , ok := s . fpToSeries . get ( fp )
if ! ok {
return nil
}
return series . head ( ) . lastSamplePair ( )
}
2015-05-27 09:24:56 +00:00
// boundedIterator wraps a SeriesIterator and does not allow fetching
// data from earlier than the configured start time.
type boundedIterator struct {
it SeriesIterator
2015-08-20 15:18:46 +00:00
start model . Time
2015-05-27 09:24:56 +00:00
}
// ValueAtTime implements the SeriesIterator interface.
2015-08-22 12:52:35 +00:00
func ( bit * boundedIterator ) ValueAtTime ( ts model . Time ) [ ] model . SamplePair {
2015-05-27 09:24:56 +00:00
if ts < bit . start {
2015-08-22 12:52:35 +00:00
return [ ] model . SamplePair { }
2015-05-27 09:24:56 +00:00
}
return bit . it . ValueAtTime ( ts )
}
// BoundaryValues implements the SeriesIterator interface.
2015-08-22 12:52:35 +00:00
func ( bit * boundedIterator ) BoundaryValues ( interval metric . Interval ) [ ] model . SamplePair {
2015-05-27 09:24:56 +00:00
if interval . NewestInclusive < bit . start {
2015-08-22 12:52:35 +00:00
return [ ] model . SamplePair { }
2015-05-27 09:24:56 +00:00
}
if interval . OldestInclusive < bit . start {
interval . OldestInclusive = bit . start
}
return bit . it . BoundaryValues ( interval )
}
// RangeValues implements the SeriesIterator interface.
2015-08-22 12:52:35 +00:00
func ( bit * boundedIterator ) RangeValues ( interval metric . Interval ) [ ] model . SamplePair {
2015-05-27 09:24:56 +00:00
if interval . NewestInclusive < bit . start {
2015-08-22 12:52:35 +00:00
return [ ] model . SamplePair { }
2015-05-27 09:24:56 +00:00
}
if interval . OldestInclusive < bit . start {
interval . OldestInclusive = bit . start
}
return bit . it . RangeValues ( interval )
2014-10-24 18:27:27 +00:00
}
// NewPreloader implements Storage.
func ( s * memorySeriesStorage ) NewPreloader ( ) Preloader {
return & memorySeriesPreloader {
storage : s ,
}
}
2015-06-15 16:25:31 +00:00
// fingerprintsForLabelPairs returns the set of fingerprints that have the given labels.
// This does not work with empty label values.
2015-08-22 11:32:13 +00:00
func ( s * memorySeriesStorage ) fingerprintsForLabelPairs ( pairs ... model . LabelPair ) map [ model . Fingerprint ] struct { } {
2015-08-20 15:18:46 +00:00
var result map [ model . Fingerprint ] struct { }
2015-06-15 16:25:31 +00:00
for _ , pair := range pairs {
2015-08-20 15:18:46 +00:00
intersection := map [ model . Fingerprint ] struct { } { }
2015-06-15 16:25:31 +00:00
fps , err := s . persistence . fingerprintsForLabelPair ( pair )
if err != nil {
log . Error ( "Error getting fingerprints for label pair: " , err )
}
if len ( fps ) == 0 {
return nil
}
for _ , fp := range fps {
if _ , ok := result [ fp ] ; ok || result == nil {
intersection [ fp ] = struct { } { }
2014-10-24 18:27:27 +00:00
}
}
if len ( intersection ) == 0 {
return nil
}
result = intersection
}
2015-06-15 16:25:31 +00:00
return result
}
2014-10-24 18:27:27 +00:00
2015-06-15 16:25:31 +00:00
// MetricsForLabelMatchers implements Storage.
2015-08-20 15:18:46 +00:00
func ( s * memorySeriesStorage ) MetricsForLabelMatchers ( matchers ... * metric . LabelMatcher ) map [ model . Fingerprint ] model . COWMetric {
2015-06-15 16:25:31 +00:00
var (
2015-08-22 11:32:13 +00:00
equals [ ] model . LabelPair
2015-06-15 16:25:31 +00:00
filters [ ] * metric . LabelMatcher
)
for _ , lm := range matchers {
if lm . Type == metric . Equal && lm . Value != "" {
2015-08-22 11:32:13 +00:00
equals = append ( equals , model . LabelPair {
2015-06-15 16:25:31 +00:00
Name : lm . Name ,
Value : lm . Value ,
} )
} else {
filters = append ( filters , lm )
}
}
2015-08-20 15:18:46 +00:00
var resFPs map [ model . Fingerprint ] struct { }
2015-06-15 16:25:31 +00:00
if len ( equals ) > 0 {
resFPs = s . fingerprintsForLabelPairs ( equals ... )
2015-07-23 20:46:13 +00:00
} else {
// If we cannot make a preselection based on equality matchers, expanding the other matchers to labels
// and intersecting their fingerprints is still likely to be the best choice.
var remaining metric . LabelMatchers
for _ , matcher := range filters {
// Equal matches are all empty values.
if matcher . Match ( "" ) {
remaining = append ( remaining , matcher )
continue
}
2015-08-20 15:18:46 +00:00
intersection := map [ model . Fingerprint ] struct { } { }
2015-06-15 16:25:31 +00:00
2015-07-23 20:46:13 +00:00
matches := matcher . Filter ( s . LabelValuesForLabelName ( matcher . Name ) )
if len ( matches ) == 0 {
return nil
}
for _ , v := range matches {
2015-08-22 11:32:13 +00:00
fps := s . fingerprintsForLabelPairs ( model . LabelPair {
2015-07-23 20:46:13 +00:00
Name : matcher . Name ,
Value : v ,
} )
for fp := range fps {
if _ , ok := resFPs [ fp ] ; ok || resFPs == nil {
intersection [ fp ] = struct { } { }
}
2015-06-15 16:25:31 +00:00
}
}
2015-07-23 20:46:13 +00:00
resFPs = intersection
2015-06-15 16:25:31 +00:00
}
2015-07-23 20:46:13 +00:00
// The intersected matchers no longer need to be compared against the actual metrics.
filters = remaining
2015-06-15 16:25:31 +00:00
}
2015-08-20 15:18:46 +00:00
result := make ( map [ model . Fingerprint ] model . COWMetric , len ( resFPs ) )
2015-06-15 16:25:31 +00:00
for fp := range resFPs {
result [ fp ] = s . MetricForFingerprint ( fp )
}
for _ , matcher := range filters {
for fp , met := range result {
if ! matcher . Match ( met . Metric [ matcher . Name ] ) {
delete ( result , fp )
}
}
2014-10-24 18:27:27 +00:00
}
2015-06-15 16:25:31 +00:00
return result
2014-10-24 18:27:27 +00:00
}
2015-05-20 17:13:06 +00:00
// LabelValuesForLabelName implements Storage.
2015-08-20 15:18:46 +00:00
func ( s * memorySeriesStorage ) LabelValuesForLabelName ( labelName model . LabelName ) model . LabelValues {
2015-05-20 17:13:06 +00:00
lvs , err := s . persistence . labelValuesForLabelName ( labelName )
2014-10-24 18:27:27 +00:00
if err != nil {
2015-05-20 16:10:29 +00:00
log . Errorf ( "Error getting label values for label name %q: %v" , labelName , err )
2014-10-24 18:27:27 +00:00
}
return lvs
}
2015-05-20 17:13:06 +00:00
// MetricForFingerprint implements Storage.
2015-08-20 15:18:46 +00:00
func ( s * memorySeriesStorage ) MetricForFingerprint ( fp model . Fingerprint ) model . COWMetric {
2014-10-24 18:27:27 +00:00
s . fpLocker . Lock ( fp )
defer s . fpLocker . Unlock ( fp )
2014-10-27 19:40:48 +00:00
series , ok := s . fpToSeries . get ( fp )
2014-10-24 18:27:27 +00:00
if ok {
2014-12-08 15:55:49 +00:00
// Wrap the returned metric in a copy-on-write (COW) metric here because
// the caller might mutate it.
2015-08-20 15:18:46 +00:00
return model . COWMetric {
2014-12-08 15:55:49 +00:00
Metric : series . metric ,
2014-10-24 18:27:27 +00:00
}
}
2015-05-20 17:13:06 +00:00
metric , err := s . persistence . archivedMetric ( fp )
2014-10-24 18:27:27 +00:00
if err != nil {
2015-05-20 16:10:29 +00:00
log . Errorf ( "Error retrieving archived metric for fingerprint %v: %v" , fp , err )
2014-10-24 18:27:27 +00:00
}
2015-08-20 15:18:46 +00:00
return model . COWMetric {
2014-12-08 15:55:49 +00:00
Metric : metric ,
}
2014-06-06 09:55:53 +00:00
}
2015-05-27 15:41:57 +00:00
// DropMetric implements Storage.
2015-08-20 15:18:46 +00:00
func ( s * memorySeriesStorage ) DropMetricsForFingerprints ( fps ... model . Fingerprint ) {
2015-05-27 15:41:57 +00:00
for _ , fp := range fps {
s . fpLocker . Lock ( fp )
if series , ok := s . fpToSeries . get ( fp ) ; ok {
s . fpToSeries . del ( fp )
s . numSeries . Dec ( )
s . persistence . unindexMetric ( fp , series . metric )
if _ , err := s . persistence . deleteSeriesFile ( fp ) ; err != nil {
log . Errorf ( "Error deleting series file for %v: %v" , fp , err )
}
} else if err := s . persistence . purgeArchivedMetric ( fp ) ; err != nil {
log . Errorf ( "Error purging metric with fingerprint %v: %v" , fp , err )
}
s . fpLocker . Unlock ( fp )
}
}
2015-03-15 02:36:15 +00:00
// Append implements Storage.
2015-08-20 15:18:46 +00:00
func ( s * memorySeriesStorage ) Append ( sample * model . Sample ) {
2015-06-22 20:35:19 +00:00
for ln , lv := range sample . Metric {
if len ( lv ) == 0 {
delete ( sample . Metric , ln )
}
}
2015-03-18 18:36:41 +00:00
if s . getNumChunksToPersist ( ) >= s . maxChunksToPersist {
2015-05-20 16:10:29 +00:00
log . Warnf (
2015-03-15 02:36:15 +00:00
"%d chunks waiting for persistence, sample ingestion suspended." ,
2015-03-18 18:36:41 +00:00
s . getNumChunksToPersist ( ) ,
2015-03-15 02:36:15 +00:00
)
2015-03-18 18:36:41 +00:00
for s . getNumChunksToPersist ( ) >= s . maxChunksToPersist {
2015-03-15 02:36:15 +00:00
time . Sleep ( time . Second )
2015-02-12 16:23:42 +00:00
}
2015-05-20 16:10:29 +00:00
log . Warn ( "Sample ingestion resumed." )
2014-06-06 09:55:53 +00:00
}
2015-05-06 14:53:12 +00:00
rawFP := sample . Metric . FastFingerprint ( )
s . fpLocker . Lock ( rawFP )
fp , err := s . mapper . mapFP ( rawFP , sample . Metric )
if err != nil {
2015-05-20 16:10:29 +00:00
log . Errorf ( "Error while mapping fingerprint %v: %v" , rawFP , err )
2015-05-06 14:53:12 +00:00
s . persistence . setDirty ( true )
}
if fp != rawFP {
// Switch locks.
s . fpLocker . Unlock ( rawFP )
s . fpLocker . Lock ( fp )
}
2014-09-24 14:55:45 +00:00
series := s . getOrCreateSeries ( fp , sample . Metric )
2015-07-13 19:12:27 +00:00
if sample . Timestamp <= series . lastTime {
2015-08-09 20:43:31 +00:00
// Don't log and track equal timestamps, as they are a common occurrence
// when using client-side timestamps (e.g. Pushgateway or federation).
// It would be even better to also compare the sample values here, but
// we don't have efficient access to a series's last value.
if sample . Timestamp != series . lastTime {
log . Warnf ( "Ignoring sample with out-of-order timestamp for fingerprint %v (%v): %v is not after %v" , fp , series . metric , sample . Timestamp , series . lastTime )
s . outOfOrderSamplesCount . Inc ( )
}
2015-07-13 19:12:27 +00:00
s . fpLocker . Unlock ( fp )
return
}
2015-08-22 12:52:35 +00:00
completedChunksCount := series . add ( & model . SamplePair {
2014-06-06 09:55:53 +00:00
Value : sample . Value ,
Timestamp : sample . Timestamp ,
2014-10-27 19:40:48 +00:00
} )
s . fpLocker . Unlock ( fp )
2015-02-12 16:23:42 +00:00
s . ingestedSamplesCount . Inc ( )
2015-03-18 18:36:41 +00:00
s . incNumChunksToPersist ( completedChunksCount )
2014-06-06 09:55:53 +00:00
}
2015-08-20 15:18:46 +00:00
func ( s * memorySeriesStorage ) getOrCreateSeries ( fp model . Fingerprint , m model . Metric ) * memorySeries {
2014-10-27 19:40:48 +00:00
series , ok := s . fpToSeries . get ( fp )
2014-06-06 09:55:53 +00:00
if ! ok {
2015-07-13 19:12:27 +00:00
var cds [ ] * chunkDesc
var modTime time . Time
unarchived , err := s . persistence . unarchiveMetric ( fp )
2014-09-10 16:41:52 +00:00
if err != nil {
2015-07-13 19:12:27 +00:00
log . Errorf ( "Error unarchiving fingerprint %v (metric %v): %v" , fp , m , err )
2014-09-10 16:41:52 +00:00
}
2014-10-22 17:21:23 +00:00
if unarchived {
2014-10-23 13:18:32 +00:00
s . seriesOps . WithLabelValues ( unarchive ) . Inc ( )
2015-07-13 19:12:27 +00:00
// We have to load chunkDescs anyway to do anything with
// the series, so let's do it right now so that we don't
// end up with a series without any chunkDescs for a
// while (which is confusing as it makes the series
// appear as archived or purged).
2015-07-06 23:10:14 +00:00
cds , err = s . loadChunkDescs ( fp , 0 )
2015-07-13 19:12:27 +00:00
if err != nil {
log . Errorf ( "Error loading chunk descs for fingerprint %v (metric %v): %v" , fp , m , err )
}
modTime = s . persistence . seriesFileModTime ( fp )
2014-10-22 17:21:23 +00:00
} else {
2014-09-10 16:41:52 +00:00
// This was a genuinely new series, so index the metric.
2014-10-28 18:01:41 +00:00
s . persistence . indexMetric ( fp , m )
2014-10-23 13:18:32 +00:00
s . seriesOps . WithLabelValues ( create ) . Inc ( )
2014-06-06 09:55:53 +00:00
}
2015-07-13 19:12:27 +00:00
series = newMemorySeries ( m , cds , modTime )
2014-10-27 19:40:48 +00:00
s . fpToSeries . put ( fp , series )
2014-10-23 13:18:32 +00:00
s . numSeries . Inc ( )
2014-06-06 09:55:53 +00:00
}
return series
}
2014-10-15 13:53:05 +00:00
func ( s * memorySeriesStorage ) preloadChunksForRange (
2015-08-20 15:18:46 +00:00
fp model . Fingerprint ,
from model . Time , through model . Time ,
2014-10-15 13:53:05 +00:00
stalenessDelta time . Duration ,
) ( [ ] * chunkDesc , error ) {
2014-10-07 17:11:24 +00:00
s . fpLocker . Lock ( fp )
defer s . fpLocker . Unlock ( fp )
2014-09-16 13:47:24 +00:00
2014-10-27 19:40:48 +00:00
series , ok := s . fpToSeries . get ( fp )
2014-06-06 09:55:53 +00:00
if ! ok {
2014-10-07 17:11:24 +00:00
has , first , last , err := s . persistence . hasArchivedMetric ( fp )
2014-09-16 13:47:24 +00:00
if err != nil {
return nil , err
}
if ! has {
2014-11-05 19:02:45 +00:00
s . invalidPreloadRequestsCount . Inc ( )
return nil , nil
2014-09-16 13:47:24 +00:00
}
if from . Add ( - stalenessDelta ) . Before ( last ) && through . Add ( stalenessDelta ) . After ( first ) {
2015-05-20 17:13:06 +00:00
metric , err := s . persistence . archivedMetric ( fp )
2014-09-16 13:47:24 +00:00
if err != nil {
return nil , err
}
2014-09-24 14:55:45 +00:00
series = s . getOrCreateSeries ( fp , metric )
2014-10-13 16:55:46 +00:00
} else {
return nil , nil
2014-09-16 13:47:24 +00:00
}
2014-06-06 09:55:53 +00:00
}
2014-11-13 19:50:25 +00:00
return series . preloadChunksForRange ( from , through , fp , s )
}
func ( s * memorySeriesStorage ) handleEvictList ( ) {
ticker := time . NewTicker ( maxEvictInterval )
count := 0
2015-01-07 18:02:38 +00:00
2014-11-13 19:50:25 +00:00
for {
// To batch up evictions a bit, this tries evictions at least
// once per evict interval, but earlier if the number of evict
2014-11-20 20:03:51 +00:00
// requests with evict==true that have happened since the last
2014-11-13 19:50:25 +00:00
// evict run is more than maxMemoryChunks/1000.
select {
case req := <- s . evictRequests :
if req . evict {
req . cd . evictListElement = s . evictList . PushBack ( req . cd )
count ++
if count > s . maxMemoryChunks / 1000 {
s . maybeEvict ( )
count = 0
}
} else {
if req . cd . evictListElement != nil {
s . evictList . Remove ( req . cd . evictListElement )
req . cd . evictListElement = nil
}
}
case <- ticker . C :
if s . evictList . Len ( ) > 0 {
s . maybeEvict ( )
}
case <- s . evictStopping :
2015-01-22 13:42:15 +00:00
// Drain evictRequests forever in a goroutine to not let
// requesters hang.
go func ( ) {
for {
<- s . evictRequests
2015-01-07 18:02:38 +00:00
}
2015-01-22 13:42:15 +00:00
} ( )
ticker . Stop ( )
2015-05-20 16:10:29 +00:00
log . Info ( "Chunk eviction stopped." )
2015-01-22 13:42:15 +00:00
close ( s . evictStopped )
return
2014-11-13 19:50:25 +00:00
}
}
}
// maybeEvict is a local helper method. Must only be called by handleEvictList.
func ( s * memorySeriesStorage ) maybeEvict ( ) {
numChunksToEvict := int ( atomic . LoadInt64 ( & numMemChunks ) ) - s . maxMemoryChunks
if numChunksToEvict <= 0 {
return
}
chunkDescsToEvict := make ( [ ] * chunkDesc , numChunksToEvict )
for i := range chunkDescsToEvict {
e := s . evictList . Front ( )
if e == nil {
break
}
cd := e . Value . ( * chunkDesc )
cd . evictListElement = nil
chunkDescsToEvict [ i ] = cd
s . evictList . Remove ( e )
}
// Do the actual eviction in a goroutine as we might otherwise deadlock,
// in the following way: A chunk was unpinned completely and therefore
// scheduled for eviction. At the time we actually try to evict it,
// another goroutine is pinning the chunk. The pinning goroutine has
// currently locked the chunk and tries to send the evict request (to
// remove the chunk from the evict list) to the evictRequests
// channel. The send blocks because evictRequests is full. However, the
2014-11-20 20:03:51 +00:00
// goroutine that is supposed to empty the channel is waiting for the
2014-11-13 19:50:25 +00:00
// chunkDesc lock to try to evict the chunk.
go func ( ) {
for _ , cd := range chunkDescsToEvict {
if cd == nil {
break
}
cd . maybeEvict ( )
// We don't care if the eviction succeeds. If the chunk
// was pinned in the meantime, it will be added to the
// evict list once it gets unpinned again.
}
} ( )
2014-06-06 09:55:53 +00:00
}
2014-11-10 21:26:07 +00:00
// waitForNextFP waits an estimated duration, after which we want to process
// another fingerprint so that we will process all fingerprints in a tenth of
2015-02-26 14:19:44 +00:00
// s.dropAfter assuming that the system is doing nothing else, e.g. if we want
// to drop chunks after 40h, we want to cycle through all fingerprints within
2015-04-01 15:52:03 +00:00
// 4h. The estimation is based on the total number of fingerprints as passed
// in. However, the maximum sweep time is capped at fpMaxSweepTime. Also, the
// method will never wait for longer than fpMaxWaitDuration.
//
// The maxWaitDurationFactor can be used to reduce the waiting time if a faster
// processing is required (for example because unpersisted chunks pile up too
// much).
//
// Normally, the method returns true once the wait duration has passed. However,
// if s.loopStopped is closed, it will return false immediately.
func ( s * memorySeriesStorage ) waitForNextFP ( numberOfFPs int , maxWaitDurationFactor float64 ) bool {
2014-11-13 15:55:15 +00:00
d := fpMaxWaitDuration
2014-11-10 21:26:07 +00:00
if numberOfFPs != 0 {
2015-02-26 14:19:44 +00:00
sweepTime := s . dropAfter / 10
2014-11-13 15:55:15 +00:00
if sweepTime > fpMaxSweepTime {
sweepTime = fpMaxSweepTime
}
2015-04-01 15:52:03 +00:00
calculatedWait := time . Duration ( float64 ( sweepTime ) / float64 ( numberOfFPs ) * maxWaitDurationFactor )
if calculatedWait < d {
d = calculatedWait
2014-11-10 21:26:07 +00:00
}
}
2015-04-01 15:52:03 +00:00
if d == 0 {
return true
}
2014-11-10 21:26:07 +00:00
t := time . NewTimer ( d )
select {
case <- t . C :
return true
case <- s . loopStopping :
return false
}
}
2014-11-20 20:03:51 +00:00
// cycleThroughMemoryFingerprints returns a channel that emits fingerprints for
// series in memory in a throttled fashion. It continues to cycle through all
// fingerprints in memory until s.loopStopping is closed.
2015-08-20 15:18:46 +00:00
func ( s * memorySeriesStorage ) cycleThroughMemoryFingerprints ( ) chan model . Fingerprint {
memoryFingerprints := make ( chan model . Fingerprint )
2014-11-10 21:26:07 +00:00
go func ( ) {
2015-08-20 15:18:46 +00:00
var fpIter <- chan model . Fingerprint
2014-11-10 21:26:07 +00:00
defer func ( ) {
if fpIter != nil {
2014-12-26 12:37:30 +00:00
for range fpIter {
2014-11-10 21:26:07 +00:00
// Consume the iterator.
}
}
close ( memoryFingerprints )
} ( )
for {
// Initial wait, also important if there are no FPs yet.
2015-04-01 15:52:03 +00:00
if ! s . waitForNextFP ( s . fpToSeries . length ( ) , 1 ) {
2014-11-10 21:26:07 +00:00
return
}
2014-11-20 20:03:51 +00:00
begin := time . Now ( )
2014-11-10 21:26:07 +00:00
fpIter = s . fpToSeries . fpIter ( )
2015-02-26 14:19:44 +00:00
count := 0
2014-11-10 21:26:07 +00:00
for fp := range fpIter {
select {
case memoryFingerprints <- fp :
case <- s . loopStopping :
return
}
2015-04-01 15:52:03 +00:00
// Reduce the wait time by the backlog score.
s . waitForNextFP ( s . fpToSeries . length ( ) , s . persistenceBacklogScore ( ) )
2015-02-26 14:19:44 +00:00
count ++
}
if count > 0 {
2015-05-20 16:10:29 +00:00
log . Infof (
2015-02-26 14:19:44 +00:00
"Completed maintenance sweep through %d in-memory fingerprints in %v." ,
count , time . Since ( begin ) ,
)
2014-11-10 21:26:07 +00:00
}
}
} ( )
2014-11-20 20:03:51 +00:00
return memoryFingerprints
}
// cycleThroughArchivedFingerprints returns a channel that emits fingerprints
// for archived series in a throttled fashion. It continues to cycle through all
// archived fingerprints until s.loopStopping is closed.
2015-08-20 15:18:46 +00:00
func ( s * memorySeriesStorage ) cycleThroughArchivedFingerprints ( ) chan model . Fingerprint {
archivedFingerprints := make ( chan model . Fingerprint )
2014-11-10 21:26:07 +00:00
go func ( ) {
defer close ( archivedFingerprints )
for {
2015-05-20 17:13:06 +00:00
archivedFPs , err := s . persistence . fingerprintsModifiedBefore (
2015-08-20 15:18:46 +00:00
model . Now ( ) . Add ( - s . dropAfter ) ,
2014-11-10 21:26:07 +00:00
)
if err != nil {
2015-05-20 16:10:29 +00:00
log . Error ( "Failed to lookup archived fingerprint ranges: " , err )
2015-04-01 15:52:03 +00:00
s . waitForNextFP ( 0 , 1 )
2014-11-10 21:26:07 +00:00
continue
}
// Initial wait, also important if there are no FPs yet.
2015-04-01 15:52:03 +00:00
if ! s . waitForNextFP ( len ( archivedFPs ) , 1 ) {
2014-11-10 21:26:07 +00:00
return
}
2014-11-20 20:03:51 +00:00
begin := time . Now ( )
2014-11-10 21:26:07 +00:00
for _ , fp := range archivedFPs {
select {
case archivedFingerprints <- fp :
case <- s . loopStopping :
return
}
2015-04-01 15:52:03 +00:00
// Never speed up maintenance of archived FPs.
s . waitForNextFP ( len ( archivedFPs ) , 1 )
2014-11-10 21:26:07 +00:00
}
2015-02-26 14:19:44 +00:00
if len ( archivedFPs ) > 0 {
2015-05-20 16:10:29 +00:00
log . Infof (
2015-02-26 14:19:44 +00:00
"Completed maintenance sweep through %d archived fingerprints in %v." ,
len ( archivedFPs ) , time . Since ( begin ) ,
)
}
2014-11-10 21:26:07 +00:00
}
} ( )
2014-11-20 20:03:51 +00:00
return archivedFingerprints
}
func ( s * memorySeriesStorage ) loop ( ) {
2015-01-08 19:15:58 +00:00
checkpointTimer := time . NewTimer ( s . checkpointInterval )
2015-03-09 01:33:10 +00:00
dirtySeriesCount := 0
2014-11-20 20:03:51 +00:00
defer func ( ) {
2015-01-08 19:15:58 +00:00
checkpointTimer . Stop ( )
2015-05-20 16:10:29 +00:00
log . Info ( "Maintenance loop stopped." )
2014-11-20 20:03:51 +00:00
close ( s . loopStopped )
} ( )
memoryFingerprints := s . cycleThroughMemoryFingerprints ( )
archivedFingerprints := s . cycleThroughArchivedFingerprints ( )
2014-11-10 21:26:07 +00:00
loop :
2014-06-06 09:55:53 +00:00
for {
select {
2014-10-24 18:27:27 +00:00
case <- s . loopStopping :
2014-11-10 21:26:07 +00:00
break loop
2015-01-08 19:15:58 +00:00
case <- checkpointTimer . C :
2014-10-27 19:40:48 +00:00
s . persistence . checkpointSeriesMapAndHeads ( s . fpToSeries , s . fpLocker )
2015-03-09 01:33:10 +00:00
dirtySeriesCount = 0
2015-01-08 19:15:58 +00:00
checkpointTimer . Reset ( s . checkpointInterval )
2014-11-10 21:26:07 +00:00
case fp := <- memoryFingerprints :
2015-08-20 15:18:46 +00:00
if s . maintainMemorySeries ( fp , model . Now ( ) . Add ( - s . dropAfter ) ) {
2015-03-09 01:33:10 +00:00
dirtySeriesCount ++
2015-03-19 14:41:50 +00:00
// Check if we have enough "dirty" series so that we need an early checkpoint.
// However, if we are already behind persisting chunks, creating a checkpoint
// would be counterproductive, as it would slow down chunk persisting even more,
// while in a situation like that, where we are clearly lacking speed of disk
// maintenance, the best we can do for crash recovery is to persist chunks as
// quickly as possible. So only checkpoint if the storage is not in "graceful
// degratadion mode".
if dirtySeriesCount >= s . checkpointDirtySeriesLimit && ! s . isDegraded ( ) {
2015-03-09 01:33:10 +00:00
checkpointTimer . Reset ( 0 )
}
}
2014-11-10 21:26:07 +00:00
case fp := <- archivedFingerprints :
2015-08-20 15:18:46 +00:00
s . maintainArchivedSeries ( fp , model . Now ( ) . Add ( - s . dropAfter ) )
2014-06-06 09:55:53 +00:00
}
}
2014-11-10 21:26:07 +00:00
// Wait until both channels are closed.
2014-12-26 12:37:30 +00:00
for range memoryFingerprints {
2014-11-10 21:26:07 +00:00
}
2014-12-26 12:37:30 +00:00
for range archivedFingerprints {
2014-11-10 21:26:07 +00:00
}
2014-06-06 09:55:53 +00:00
}
2015-03-09 01:33:10 +00:00
// maintainMemorySeries maintains a series that is in memory (i.e. not
// archived). It returns true if the method has changed from clean to dirty
// (i.e. it is inconsistent with the latest checkpoint now so that in case of a
// crash a recovery operation that requires a disk seek needed to be applied).
//
// The method first closes the head chunk if it was not touched for the duration
// of headChunkTimeout.
//
// Then it determines the chunks that need to be purged and the chunks that need
// to be persisted. Depending on the result, it does the following:
//
// - If all chunks of a series need to be purged, the whole series is deleted
// for good and the method returns false. (Detecting non-existence of a series
// file does not require a disk seek.)
//
// - If any chunks need to be purged (but not all of them), it purges those
// chunks from memory and rewrites the series file on disk, leaving out the
// purged chunks and appending all chunks not yet persisted (with the exception
// of a still open head chunk).
//
// - If no chunks on disk need to be purged, but chunks need to be persisted,
// those chunks are simply appended to the existing series file (or the file is
// created if it does not exist yet).
//
// - If no chunks need to be purged and no chunks need to be persisted, nothing
// happens in this step.
//
// Next, the method checks if all chunks in the series are evicted. In that
// case, it archives the series and returns true.
//
// Finally, it evicts chunkDescs if there are too many.
func ( s * memorySeriesStorage ) maintainMemorySeries (
2015-08-20 15:18:46 +00:00
fp model . Fingerprint , beforeTime model . Time ,
2015-03-09 01:33:10 +00:00
) ( becameDirty bool ) {
2015-03-19 16:06:16 +00:00
defer func ( begin time . Time ) {
s . maintainSeriesDuration . WithLabelValues ( maintainInMemory ) . Observe (
float64 ( time . Since ( begin ) ) / float64 ( time . Millisecond ) ,
)
} ( time . Now ( ) )
2014-11-13 19:50:25 +00:00
s . fpLocker . Lock ( fp )
2015-03-09 01:33:10 +00:00
defer s . fpLocker . Unlock ( fp )
2014-11-13 19:50:25 +00:00
series , ok := s . fpToSeries . get ( fp )
if ! ok {
2015-02-26 14:19:44 +00:00
// Series is actually not in memory, perhaps archived or dropped in the meantime.
2015-03-09 01:33:10 +00:00
return false
2015-02-26 14:19:44 +00:00
}
defer s . seriesOps . WithLabelValues ( memoryMaintenance ) . Inc ( )
2015-03-09 01:33:10 +00:00
if series . maybeCloseHeadChunk ( ) {
2015-03-18 18:36:41 +00:00
s . incNumChunksToPersist ( 1 )
2015-03-09 01:33:10 +00:00
}
seriesWasDirty := series . dirty
if s . writeMemorySeries ( fp , series , beforeTime ) {
2015-02-26 14:19:44 +00:00
// Series is gone now, we are done.
2015-03-09 01:33:10 +00:00
return false
2014-11-13 19:50:25 +00:00
}
2015-02-26 14:19:44 +00:00
2014-11-13 19:50:25 +00:00
iOldestNotEvicted := - 1
for i , cd := range series . chunkDescs {
if ! cd . isEvicted ( ) {
iOldestNotEvicted = i
break
}
}
// Archive if all chunks are evicted.
if iOldestNotEvicted == - 1 {
s . fpToSeries . del ( fp )
s . numSeries . Dec ( )
if err := s . persistence . archiveMetric (
2015-07-13 19:12:27 +00:00
fp , series . metric , series . firstTime ( ) , series . lastTime ,
2014-11-13 19:50:25 +00:00
) ; err != nil {
2015-05-20 16:10:29 +00:00
log . Errorf ( "Error archiving metric %v: %v" , series . metric , err )
Fix a bug handling freshly unarchived series.
Usually, if you unarchive a series, it is to add something to it,
which will create a new head chunk. However, if a series in
unarchived, and before anything is added to it, it is handled by the
maintenance loop, it will be archived again. In that case, we have to
load the chunkDescs to know the lastTime of the series to be
archived. Usually, this case will happen only rarely (as a race, has
never happened so far, possibly because the locking around unarchiving
and the subsequent sample append is smart enough). However, during
crash recovery, we sometimes treat series as "freshly unarchived"
without directly appending a sample. We might add more cases of that
type later, so better deal with archiving properly and load chunkDescs
if required.
2015-01-08 15:10:31 +00:00
return
2014-11-13 19:50:25 +00:00
}
Fix a bug handling freshly unarchived series.
Usually, if you unarchive a series, it is to add something to it,
which will create a new head chunk. However, if a series in
unarchived, and before anything is added to it, it is handled by the
maintenance loop, it will be archived again. In that case, we have to
load the chunkDescs to know the lastTime of the series to be
archived. Usually, this case will happen only rarely (as a race, has
never happened so far, possibly because the locking around unarchiving
and the subsequent sample append is smart enough). However, during
crash recovery, we sometimes treat series as "freshly unarchived"
without directly appending a sample. We might add more cases of that
type later, so better deal with archiving properly and load chunkDescs
if required.
2015-01-08 15:10:31 +00:00
s . seriesOps . WithLabelValues ( archive ) . Inc ( )
2014-11-13 19:50:25 +00:00
return
}
// If we are here, the series is not archived, so check for chunkDesc
2015-07-15 17:53:15 +00:00
// eviction next.
2014-11-13 19:50:25 +00:00
series . evictChunkDescs ( iOldestNotEvicted )
2015-03-09 01:33:10 +00:00
return series . dirty && ! seriesWasDirty
2014-11-13 19:50:25 +00:00
}
2015-03-09 01:33:10 +00:00
// writeMemorySeries (re-)writes a memory series file. While doing so, it drops
2015-03-18 18:09:07 +00:00
// chunks older than beforeTime from both the series file (if it exists) as well
// as from memory. The provided chunksToPersist are appended to the newly
2015-03-09 01:33:10 +00:00
// written series file. If no chunks need to be purged, but chunksToPersist is
// not empty, those chunks are simply appended to the series file. If the series
// contains no chunks after dropping old chunks, it is purged entirely. In that
// case, the method returns true.
//
// The caller must have locked the fp.
func ( s * memorySeriesStorage ) writeMemorySeries (
2015-08-20 15:18:46 +00:00
fp model . Fingerprint , series * memorySeries , beforeTime model . Time ,
2015-03-09 01:33:10 +00:00
) bool {
2015-05-20 17:13:06 +00:00
cds := series . chunksToPersist ( )
2015-03-09 01:33:10 +00:00
defer func ( ) {
for _ , cd := range cds {
cd . unpin ( s . evictRequests )
}
2015-03-18 18:36:41 +00:00
s . incNumChunksToPersist ( - len ( cds ) )
2015-03-09 01:33:10 +00:00
chunkOps . WithLabelValues ( persistAndUnpin ) . Add ( float64 ( len ( cds ) ) )
2015-05-20 17:13:06 +00:00
series . modTime = s . persistence . seriesFileModTime ( fp )
2015-03-09 01:33:10 +00:00
} ( )
// Get the actual chunks from underneath the chunkDescs.
2015-05-20 17:13:06 +00:00
// No lock required as chunks still to persist cannot be evicted.
2015-03-09 01:33:10 +00:00
chunks := make ( [ ] chunk , len ( cds ) )
for i , cd := range cds {
2015-05-20 17:13:06 +00:00
chunks [ i ] = cd . c
2015-03-09 01:33:10 +00:00
}
2015-02-26 14:19:44 +00:00
if ! series . firstTime ( ) . Before ( beforeTime ) {
2015-03-09 01:33:10 +00:00
// Oldest sample not old enough, just append chunks, if any.
if len ( cds ) == 0 {
return false
}
offset , err := s . persistence . persistChunks ( fp , chunks )
if err != nil {
s . persistErrors . Inc ( )
return false
}
if series . chunkDescsOffset == - 1 {
// This is the first chunk persisted for a newly created
// series that had prior chunks on disk. Finally, we can
// set the chunkDescsOffset.
series . chunkDescsOffset = offset
}
2015-02-26 14:19:44 +00:00
return false
}
2015-03-09 01:33:10 +00:00
newFirstTime , offset , numDroppedFromPersistence , allDroppedFromPersistence , err :=
s . persistence . dropAndPersistChunks ( fp , beforeTime , chunks )
2015-02-26 14:19:44 +00:00
if err != nil {
2015-03-09 01:33:10 +00:00
s . persistErrors . Inc ( )
return false
2015-02-26 14:19:44 +00:00
}
2015-03-09 01:33:10 +00:00
series . dropChunks ( beforeTime )
2015-04-09 13:57:11 +00:00
if len ( series . chunkDescs ) == 0 && allDroppedFromPersistence {
// All chunks dropped from both memory and persistence. Delete the series for good.
2015-02-26 14:19:44 +00:00
s . fpToSeries . del ( fp )
s . numSeries . Dec ( )
s . seriesOps . WithLabelValues ( memoryPurge ) . Inc ( )
s . persistence . unindexMetric ( fp , series . metric )
return true
}
2015-03-09 01:33:10 +00:00
series . savedFirstTime = newFirstTime
if series . chunkDescsOffset == - 1 {
series . chunkDescsOffset = offset
} else {
series . chunkDescsOffset -= numDroppedFromPersistence
2015-02-26 14:19:44 +00:00
if series . chunkDescsOffset < 0 {
2015-05-20 16:10:29 +00:00
log . Errorf ( "Dropped more chunks from persistence than from memory for fingerprint %v, series %v." , fp , series )
2015-03-18 18:58:47 +00:00
s . persistence . setDirty ( true )
series . chunkDescsOffset = - 1 // Makes sure it will be looked at during crash recovery.
2015-02-26 14:19:44 +00:00
}
}
return false
}
// maintainArchivedSeries drops chunks older than beforeTime from an archived
// series. If the series contains no chunks after that, it is purged entirely.
2015-08-20 15:18:46 +00:00
func ( s * memorySeriesStorage ) maintainArchivedSeries ( fp model . Fingerprint , beforeTime model . Time ) {
2015-03-19 16:06:16 +00:00
defer func ( begin time . Time ) {
s . maintainSeriesDuration . WithLabelValues ( maintainArchived ) . Observe (
float64 ( time . Since ( begin ) ) / float64 ( time . Millisecond ) ,
)
} ( time . Now ( ) )
2014-10-07 17:11:24 +00:00
s . fpLocker . Lock ( fp )
defer s . fpLocker . Unlock ( fp )
2014-09-10 16:41:52 +00:00
2014-11-10 17:22:08 +00:00
has , firstTime , lastTime , err := s . persistence . hasArchivedMetric ( fp )
if err != nil {
2015-05-20 16:10:29 +00:00
log . Error ( "Error looking up archived time range: " , err )
2014-11-10 17:22:08 +00:00
return
}
if ! has || ! firstTime . Before ( beforeTime ) {
// Oldest sample not old enough, or metric purged or unarchived in the meantime.
return
}
2015-02-26 14:19:44 +00:00
defer s . seriesOps . WithLabelValues ( archiveMaintenance ) . Inc ( )
2015-03-09 01:33:10 +00:00
newFirstTime , _ , _ , allDropped , err := s . persistence . dropAndPersistChunks ( fp , beforeTime , nil )
2014-11-10 17:22:08 +00:00
if err != nil {
2015-05-20 16:10:29 +00:00
log . Error ( "Error dropping persisted chunks: " , err )
2014-11-10 17:22:08 +00:00
}
2014-10-15 13:53:05 +00:00
if allDropped {
2015-02-26 14:19:44 +00:00
if err := s . persistence . purgeArchivedMetric ( fp ) ; err != nil {
2015-05-20 16:10:29 +00:00
log . Errorf ( "Error purging archived metric for fingerprint %v: %v" , fp , err )
2014-11-10 17:22:08 +00:00
return
2014-10-15 13:53:05 +00:00
}
2014-11-10 17:22:08 +00:00
s . seriesOps . WithLabelValues ( archivePurge ) . Inc ( )
return
2014-06-06 09:55:53 +00:00
}
2014-11-10 17:22:08 +00:00
s . persistence . updateArchivedTimeRange ( fp , newFirstTime , lastTime )
2014-06-06 09:55:53 +00:00
}
2014-11-27 19:46:45 +00:00
// See persistence.loadChunks for detailed explanation.
2015-08-20 15:18:46 +00:00
func ( s * memorySeriesStorage ) loadChunks ( fp model . Fingerprint , indexes [ ] int , indexOffset int ) ( [ ] chunk , error ) {
2014-11-27 19:46:45 +00:00
return s . persistence . loadChunks ( fp , indexes , indexOffset )
}
// See persistence.loadChunkDescs for detailed explanation.
2015-08-20 15:18:46 +00:00
func ( s * memorySeriesStorage ) loadChunkDescs ( fp model . Fingerprint , offsetFromEnd int ) ( [ ] * chunkDesc , error ) {
2015-07-06 23:10:14 +00:00
return s . persistence . loadChunkDescs ( fp , offsetFromEnd )
2014-11-27 19:46:45 +00:00
}
2015-03-18 18:36:41 +00:00
// getNumChunksToPersist returns numChunksToPersist in a goroutine-safe way.
func ( s * memorySeriesStorage ) getNumChunksToPersist ( ) int {
return int ( atomic . LoadInt64 ( & s . numChunksToPersist ) )
2015-03-09 01:33:10 +00:00
}
2015-03-18 18:36:41 +00:00
// incNumChunksToPersist increments numChunksToPersist in a goroutine-safe way. Use a
2015-03-09 01:33:10 +00:00
// negative 'by' to decrement.
2015-03-18 18:36:41 +00:00
func ( s * memorySeriesStorage ) incNumChunksToPersist ( by int ) {
atomic . AddInt64 ( & s . numChunksToPersist , int64 ( by ) )
2015-03-09 01:33:10 +00:00
}
2015-03-19 14:41:50 +00:00
// isDegraded returns whether the storage is in "graceful degradation mode",
// which is the case if the number of chunks waiting for persistence has reached
2015-03-19 16:54:59 +00:00
// a percentage of maxChunksToPersist that exceeds
2015-03-19 14:41:50 +00:00
// percentChunksToPersistForDegradation. The method is not goroutine safe (but
// only ever called from the goroutine dealing with series maintenance).
// Changes of degradation mode are logged.
func ( s * memorySeriesStorage ) isDegraded ( ) bool {
nowDegraded := s . getNumChunksToPersist ( ) > s . maxChunksToPersist * percentChunksToPersistForDegradation / 100
if s . degraded && ! nowDegraded {
2015-05-20 16:10:29 +00:00
log . Warn ( "Storage has left graceful degradation mode. Things are back to normal." )
2015-03-19 14:41:50 +00:00
} else if ! s . degraded && nowDegraded {
2015-05-20 16:10:29 +00:00
log . Warnf (
2015-04-01 15:52:03 +00:00
"%d chunks waiting for persistence (%d%% of the allowed maximum %d). Storage is now in graceful degradation mode. Series files are not synced anymore if following the adaptive strategy. Checkpoints are not performed more often than every %v. Series maintenance happens as frequently as possible." ,
2015-03-19 16:54:59 +00:00
s . getNumChunksToPersist ( ) ,
s . getNumChunksToPersist ( ) * 100 / s . maxChunksToPersist ,
s . maxChunksToPersist ,
s . checkpointInterval )
2015-03-19 14:41:50 +00:00
}
s . degraded = nowDegraded
return s . degraded
}
2015-04-01 15:52:03 +00:00
// persistenceBacklogScore works similar to isDegraded, but returns a score
// about how close we are to degradation. This score is 1.0 if no chunks are
// waiting for persistence and 0.0 if we are at or above the degradation
// threshold.
func ( s * memorySeriesStorage ) persistenceBacklogScore ( ) float64 {
score := 1 - float64 ( s . getNumChunksToPersist ( ) ) / float64 ( s . maxChunksToPersist * percentChunksToPersistForDegradation / 100 )
if score < 0 {
return 0
}
return score
}
2014-10-07 17:11:24 +00:00
// Describe implements prometheus.Collector.
func ( s * memorySeriesStorage ) Describe ( ch chan <- * prometheus . Desc ) {
s . persistence . Describe ( ch )
2015-05-21 15:50:06 +00:00
s . mapper . Describe ( ch )
2014-10-23 13:18:32 +00:00
2015-02-01 11:47:51 +00:00
ch <- s . persistErrors . Desc ( )
2015-03-18 18:36:41 +00:00
ch <- maxChunksToPersistDesc
ch <- numChunksToPersistDesc
2014-10-23 13:18:32 +00:00
ch <- s . numSeries . Desc ( )
s . seriesOps . Describe ( ch )
ch <- s . ingestedSamplesCount . Desc ( )
2015-07-13 19:12:27 +00:00
ch <- s . outOfOrderSamplesCount . Desc ( )
2014-11-05 19:02:45 +00:00
ch <- s . invalidPreloadRequestsCount . Desc ( )
2014-10-23 13:18:32 +00:00
ch <- numMemChunksDesc
2015-03-19 16:06:16 +00:00
s . maintainSeriesDuration . Describe ( ch )
2014-10-07 17:11:24 +00:00
}
// Collect implements prometheus.Collector.
func ( s * memorySeriesStorage ) Collect ( ch chan <- prometheus . Metric ) {
s . persistence . Collect ( ch )
2015-05-21 15:50:06 +00:00
s . mapper . Collect ( ch )
2014-10-23 13:18:32 +00:00
2015-02-01 11:47:51 +00:00
ch <- s . persistErrors
2015-03-09 01:33:10 +00:00
ch <- prometheus . MustNewConstMetric (
2015-03-18 18:36:41 +00:00
maxChunksToPersistDesc ,
prometheus . GaugeValue ,
float64 ( s . maxChunksToPersist ) ,
)
ch <- prometheus . MustNewConstMetric (
numChunksToPersistDesc ,
2015-03-09 01:33:10 +00:00
prometheus . GaugeValue ,
2015-03-18 18:36:41 +00:00
float64 ( s . getNumChunksToPersist ( ) ) ,
2015-03-09 01:33:10 +00:00
)
2014-10-23 13:18:32 +00:00
ch <- s . numSeries
s . seriesOps . Collect ( ch )
ch <- s . ingestedSamplesCount
2015-07-13 19:12:27 +00:00
ch <- s . outOfOrderSamplesCount
2014-11-05 19:02:45 +00:00
ch <- s . invalidPreloadRequestsCount
2015-02-26 23:06:16 +00:00
ch <- prometheus . MustNewConstMetric (
numMemChunksDesc ,
prometheus . GaugeValue ,
2015-03-09 01:33:10 +00:00
float64 ( atomic . LoadInt64 ( & numMemChunks ) ) ,
)
2015-03-19 16:06:16 +00:00
s . maintainSeriesDuration . Collect ( ch )
Improve persisting chunks to disk.
This is done by bucketing chunks by fingerprint. If the persisting to
disk falls behind, more and more chunks are in the queue. As soon as
there are "double hits", we will now persist both chunks in one go,
doubling the disk throughput (assuming it is limited by disk
seeks). Should even more pile up so that we end wit "triple hits", we
will persist those first, and so on.
Even if we have millions of time series, this will still help,
assuming not all of them are growing with the same speed. Series that
get many samples and/or are not very compressable will accumulate
chunks faster, and they will soon get double- or triple-writes.
To improve the chance of double writes,
-storage.local.persistence-queue-capacity could be set to a higher
value. However, that will slow down shutdown a lot (as the queue has
to be worked through). So we leave it to the user to set it to a
really high value. A more fundamental solution would be to checkpoint
not only head chunks, but also chunks still in the persist queue. That
would be quite complicated for a rather limited use-case (running many
time series with high ingestion rate on slow spinning disks).
2015-02-13 19:08:52 +00:00
}