@ -129,12 +129,13 @@ const (
type syncStrategy func ( ) bool
type syncStrategy func ( ) bool
type memorySeriesStorage struct {
type memorySeriesStorage struct {
// numChunksToPersist has to be aligned for atomic operations.
// archiveHighWatermark and numChunksToPersist have to be aligned for atomic operations.
numChunksToPersist int64 // The number of chunks waiting for persistence.
archiveHighWatermark model . Time // No archived series has samples after this time.
maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled.
numChunksToPersist int64 // The number of chunks waiting for persistence.
rushed bool // Whether the storage is in rushed mode.
maxChunksToPersist int // If numChunksToPersist reaches this threshold, ingestion will be throttled.
rushedMtx sync . Mutex // Protects entering and exiting rushed mode.
rushed bool // Whether the storage is in rushed mode.
throttled chan struct { } // This chan is sent to whenever NeedsThrottling() returns true (for logging).
rushedMtx sync . Mutex // Protects entering and exiting rushed mode.
throttled chan struct { } // This chan is sent to whenever NeedsThrottling() returns true (for logging).
fpLocker * fingerprintLocker
fpLocker * fingerprintLocker
fpToSeries * seriesMap
fpToSeries * seriesMap
@ -201,6 +202,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
dropAfter : o . PersistenceRetentionPeriod ,
dropAfter : o . PersistenceRetentionPeriod ,
checkpointInterval : o . CheckpointInterval ,
checkpointInterval : o . CheckpointInterval ,
checkpointDirtySeriesLimit : o . CheckpointDirtySeriesLimit ,
checkpointDirtySeriesLimit : o . CheckpointDirtySeriesLimit ,
archiveHighWatermark : model . Now ( ) . Add ( - headChunkTimeout ) ,
maxChunksToPersist : o . MaxChunksToPersist ,
maxChunksToPersist : o . MaxChunksToPersist ,
@ -368,15 +370,20 @@ func (s *memorySeriesStorage) WaitForIndexing() {
}
}
// LastSampleForFingerprint implements Storage.
// LastSampleForFingerprint implements Storage.
func ( s * memorySeriesStorage ) LastSample Pair ForFingerprint( fp model . Fingerprint ) model . Sample Pair {
func ( s * memorySeriesStorage ) LastSample ForFingerprint( fp model . Fingerprint ) model . Sample {
s . fpLocker . Lock ( fp )
s . fpLocker . Lock ( fp )
defer s . fpLocker . Unlock ( fp )
defer s . fpLocker . Unlock ( fp )
series , ok := s . fpToSeries . get ( fp )
series , ok := s . fpToSeries . get ( fp )
if ! ok {
if ! ok {
return ZeroSamplePair
return ZeroSample
}
sp := series . lastSamplePair ( )
return model . Sample {
Metric : series . metric ,
Value : sp . Value ,
Timestamp : sp . Timestamp ,
}
}
return series . lastSamplePair ( )
}
}
// boundedIterator wraps a SeriesIterator and does not allow fetching
// boundedIterator wraps a SeriesIterator and does not allow fetching
@ -439,7 +446,10 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair
}
}
// MetricsForLabelMatchers implements Storage.
// MetricsForLabelMatchers implements Storage.
func ( s * memorySeriesStorage ) MetricsForLabelMatchers ( matchers ... * metric . LabelMatcher ) map [ model . Fingerprint ] metric . Metric {
func ( s * memorySeriesStorage ) MetricsForLabelMatchers (
from , through model . Time ,
matchers ... * metric . LabelMatcher ,
) map [ model . Fingerprint ] metric . Metric {
var (
var (
equals [ ] model . LabelPair
equals [ ] model . LabelPair
filters [ ] * metric . LabelMatcher
filters [ ] * metric . LabelMatcher
@ -491,9 +501,11 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelM
filters = remaining
filters = remaining
}
}
result := make ( map [ model . Fingerprint ] metric . Metric , len ( resFPs ) )
result := map [ model . Fingerprint ] metric . Metric {}
for fp := range resFPs {
for fp := range resFPs {
result [ fp ] = s . MetricForFingerprint ( fp )
if metric , ok := s . metricForFingerprint ( fp , from , through ) ; ok {
result [ fp ] = metric
}
}
}
for _ , matcher := range filters {
for _ , matcher := range filters {
for fp , met := range result {
for fp , met := range result {
@ -505,37 +517,65 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers(matchers ...*metric.LabelM
return result
return result
}
}
// LabelValuesForLabelName implements Storage.
// metricForFingerprint returns the metric for the given fingerprint if the
func ( s * memorySeriesStorage ) LabelValuesForLabelName ( labelName model . LabelName ) model . LabelValues {
// corresponding time series has samples between 'from' and 'through'.
lvs , err := s . persistence . labelValuesForLabelName ( labelName )
func ( s * memorySeriesStorage ) metricForFingerprint (
if err != nil {
fp model . Fingerprint ,
log . Errorf ( "Error getting label values for label name %q: %v" , labelName , err )
from , through model . Time ,
}
) ( metric . Metric , bool ) {
return lvs
// Lock FP so that no (un-)archiving will happen during lookup.
}
// MetricForFingerprint implements Storage.
func ( s * memorySeriesStorage ) MetricForFingerprint ( fp model . Fingerprint ) metric . Metric {
s . fpLocker . Lock ( fp )
s . fpLocker . Lock ( fp )
defer s . fpLocker . Unlock ( fp )
defer s . fpLocker . Unlock ( fp )
watermark := model . Time ( atomic . LoadInt64 ( ( * int64 ) ( & s . archiveHighWatermark ) ) )
series , ok := s . fpToSeries . get ( fp )
series , ok := s . fpToSeries . get ( fp )
if ok {
if ok {
if series . lastTime . Before ( from ) || series . savedFirstTime . After ( through ) {
return metric . Metric { } , false
}
// Wrap the returned metric in a copy-on-write (COW) metric here because
// Wrap the returned metric in a copy-on-write (COW) metric here because
// the caller might mutate it.
// the caller might mutate it.
return metric . Metric {
return metric . Metric {
Metric : series . metric ,
Metric : series . metric ,
} , true
}
// From here on, we are only concerned with archived metrics.
// If the high watermark of archived series is before 'from', we are done.
if watermark < from {
return metric . Metric { } , false
}
if from . After ( model . Earliest ) || through . Before ( model . Latest ) {
// The range lookup is relatively cheap, so let's do it first.
ok , first , last , err := s . persistence . hasArchivedMetric ( fp )
if err != nil {
log . Errorf ( "Error retrieving archived time range for fingerprint %v: %v" , fp , err )
return metric . Metric { } , false
}
if ! ok || first . After ( through ) || last . Before ( from ) {
return metric . Metric { } , false
}
}
}
}
met , err := s . persistence . archivedMetric ( fp )
met , err := s . persistence . archivedMetric ( fp )
if err != nil {
if err != nil {
log . Errorf ( "Error retrieving archived metric for fingerprint %v: %v" , fp , err )
log . Errorf ( "Error retrieving archived metric for fingerprint %v: %v" , fp , err )
return metric . Metric { } , false
}
}
return metric . Metric {
return metric . Metric {
Metric : met ,
Metric : met ,
Copied : false ,
Copied : false ,
} , true
}
// LabelValuesForLabelName implements Storage.
func ( s * memorySeriesStorage ) LabelValuesForLabelName ( labelName model . LabelName ) model . LabelValues {
lvs , err := s . persistence . labelValuesForLabelName ( labelName )
if err != nil {
log . Errorf ( "Error getting label values for label name %q: %v" , labelName , err )
}
}
return lvs
}
}
// DropMetric implements Storage.
// DropMetric implements Storage.
@ -1077,8 +1117,9 @@ func (s *memorySeriesStorage) maintainMemorySeries(
}
}
}
}
// Archive if all chunks are evicted.
// Archive if all chunks are evicted. Also make sure the last sample has
if iOldestNotEvicted == - 1 {
// an age of at least headChunkTimeout (which is very likely anyway).
if iOldestNotEvicted == - 1 && model . Now ( ) . Sub ( series . lastTime ) > headChunkTimeout {
s . fpToSeries . del ( fp )
s . fpToSeries . del ( fp )
s . numSeries . Dec ( )
s . numSeries . Dec ( )
if err := s . persistence . archiveMetric (
if err := s . persistence . archiveMetric (
@ -1088,6 +1129,15 @@ func (s *memorySeriesStorage) maintainMemorySeries(
return
return
}
}
s . seriesOps . WithLabelValues ( archive ) . Inc ( )
s . seriesOps . WithLabelValues ( archive ) . Inc ( )
oldWatermark := atomic . LoadInt64 ( ( * int64 ) ( & s . archiveHighWatermark ) )
if oldWatermark < int64 ( series . lastTime ) {
if ! atomic . CompareAndSwapInt64 (
( * int64 ) ( & s . archiveHighWatermark ) ,
oldWatermark , int64 ( series . lastTime ) ,
) {
panic ( "s.archiveHighWatermark modified outside of maintainMemorySeries" )
}
}
return
return
}
}
// If we are here, the series is not archived, so check for chunkDesc
// If we are here, the series is not archived, so check for chunkDesc