From 8fba3302bcb87c3a6290f7e3777b07d5977ea9e9 Mon Sep 17 00:00:00 2001 From: Bjoern Rabenstein Date: Tue, 7 Oct 2014 19:11:24 +0200 Subject: [PATCH] Bold changes to concurrency. (WIP. Probably doesn't work yet.) Change-Id: Id1537dfcca53831a1d428078a5863ece7bdf4875 --- main.go | 10 +- storage/local/interface.go | 108 +------------ storage/local/persistence.go | 234 +++++++++++++++++++--------- storage/local/persistence_test.go | 34 ++--- storage/local/series.go | 170 ++++++++++++++++----- storage/local/storage.go | 246 +++++++++++------------------- storage/local/storage_test.go | 4 +- storage/local/test_helpers.go | 6 +- 8 files changed, 405 insertions(+), 407 deletions(-) diff --git a/main.go b/main.go index c5d4f1118..d9e69e869 100644 --- a/main.go +++ b/main.go @@ -137,16 +137,10 @@ func main() { glog.Fatalf("Error loading configuration from %s: %v", *configFile, err) } - persistence, err := local.NewDiskPersistence(*metricsStoragePath, 1024) - if err != nil { - glog.Fatal("Error opening disk persistence: ", err) - } - registry.MustRegister(persistence) - o := &local.MemorySeriesStorageOptions{ - Persistence: persistence, MemoryEvictionInterval: *memoryEvictionInterval, MemoryRetentionPeriod: *memoryRetentionPeriod, + PersistenceStoragePath: *metricsStoragePath, PersistencePurgeInterval: *storagePurgeInterval, PersistenceRetentionPeriod: *storageRetentionPeriod, } @@ -155,7 +149,7 @@ func main() { glog.Fatal("Error opening memory series storage: ", err) } defer memStorage.Close() - //registry.MustRegister(memStorage) + registry.MustRegister(memStorage) var remoteTSDBQueue *remote.TSDBQueueManager if *remoteTSDBUrl == "" { diff --git a/storage/local/interface.go b/storage/local/interface.go index de053c2ac..81b17de63 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -15,15 +15,15 @@ package local import ( clientmodel "github.com/prometheus/client_golang/model" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/storage/metric" ) -// SeriesMap maps fingerprints to memory series. -type SeriesMap map[clientmodel.Fingerprint]*memorySeries - // Storage ingests and manages samples, along with various indexes. All methods // are goroutine-safe. type Storage interface { + prometheus.Collector // AppendSamples stores a group of new samples. Multiple samples for the same // fingerprint need to be submitted in chronological order, from oldest to // newest (both in the same call to AppendSamples and across multiple calls). @@ -65,108 +65,6 @@ type SeriesIterator interface { GetRangeValues(metric.Interval) metric.Values } -// A Persistence is used by a Storage implementation to store samples -// persistently across restarts. The methods are generally not goroutine-safe -// unless marked otherwise. The chunk-related methods PersistChunk, DropChunks, -// LoadChunks, and LoadChunkDescs can be called concurrently with each other if -// each call refers to a different fingerprint. -// -// TODO: As a Persistence is really only used within this package, consider not -// exporting it. -type Persistence interface { - // PersistChunk persists a single chunk of a series. It is the caller's - // responsibility to not modify chunk concurrently. - PersistChunk(clientmodel.Fingerprint, chunk) error - // DropChunks deletes all chunks from a series whose last sample time is - // before beforeTime. It returns true if all chunks of the series have - // been deleted. 
- DropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (allDropped bool, err error) - // LoadChunks loads a group of chunks of a timeseries by their index. The - // chunk with the earliest time will have index 0, the following ones will - // have incrementally larger indexes. - LoadChunks(fp clientmodel.Fingerprint, indexes []int) (chunks, error) - // LoadChunkDescs loads chunkDescs for a series up until a given time. - LoadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (chunkDescs, error) - - // PersistSeriesMapAndHeads persists the fingerprint to memory-series - // mapping and all open (non-full) head chunks. It is the caller's - // responsibility to not modify SeriesMap concurrently. Do not call - // concurrently with LoadSeriesMapAndHeads. - PersistSeriesMapAndHeads(SeriesMap) error - // LoadSeriesMapAndHeads loads the fingerprint to memory-series mapping - // and all open (non-full) head chunks. Do not call - // concurrently with PersistSeriesMapAndHeads. - LoadSeriesMapAndHeads() (SeriesMap, error) - - // GetFingerprintsForLabelPair returns the fingerprints for the given - // label pair. This method is goroutine-safe but take into account that - // metrics queued for indexing with IndexMetric might not yet made it - // into the index. (Same applies correspondingly to UnindexMetric.) - GetFingerprintsForLabelPair(metric.LabelPair) (clientmodel.Fingerprints, error) - // GetLabelValuesForLabelName returns the label values for the given - // label name. This method is goroutine-safe but take into account that - // metrics queued for indexing with IndexMetric might not yet made it - // into the index. (Same applies correspondingly to UnindexMetric.) - GetLabelValuesForLabelName(clientmodel.LabelName) (clientmodel.LabelValues, error) - - // IndexMetric queues the given metric for addition to the indexes - // needed by GetFingerprintsForLabelPair and GetLabelValuesForLabelName. - // If the queue is full, this method blocks until the metric can be queued. - // This method is goroutine-safe. - IndexMetric(clientmodel.Metric, clientmodel.Fingerprint) - // UnindexMetric queues references to the given metric for removal from - // the indexes used for GetFingerprintsForLabelPair and - // GetLabelValuesForLabelName. The index of fingerprints to archived - // metrics is not affected by this removal. (In fact, never call this - // method for an archived metric. To drop an archived metric, call - // DropArchivedFingerprint.) If the queue is full, this method blocks - // until the metric can be queued. This method is goroutine-safe. - UnindexMetric(clientmodel.Metric, clientmodel.Fingerprint) - // WaitForIndexing waits until all items in the indexing queue are - // processed. If queue processing is currently on hold (to gather more - // ops for batching), this method will trigger an immediate start of - // processing. This method is goroutine-safe. - WaitForIndexing() - - // ArchiveMetric persists the mapping of the given fingerprint to the - // given metric, together with the first and last timestamp of the - // series belonging to the metric. Do not call concurrently with - // UnarchiveMetric or DropArchivedMetric. - ArchiveMetric( - fingerprint clientmodel.Fingerprint, metric clientmodel.Metric, - firstTime, lastTime clientmodel.Timestamp, - ) error - // HasArchivedMetric returns whether the archived metric for the given - // fingerprint exists and if yes, what the first and last timestamp in - // the corresponding series is. 
This method is goroutine-safe. - HasArchivedMetric(clientmodel.Fingerprint) ( - hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error, - ) - // GetFingerprintsModifiedBefore returns the fingerprints of archived - // timeseries that have live samples before the provided timestamp. This - // method is goroutine-safe (but behavior during concurrent modification - // via ArchiveMetric, UnarchiveMetric, or DropArchivedMetric is - // undefined). - GetFingerprintsModifiedBefore(clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) - // GetArchivedMetric retrieves the archived metric with the given - // fingerprint. This method is goroutine-safe. - GetArchivedMetric(clientmodel.Fingerprint) (clientmodel.Metric, error) - // DropArchivedMetric deletes an archived fingerprint and its - // corresponding metric entirely. It also queues the metric for - // un-indexing (no need to call UnindexMetric for the deleted metric.) - // Do not call concurrently with UnarchiveMetric or ArchiveMetric. - DropArchivedMetric(clientmodel.Fingerprint) error - // UnarchiveMetric deletes an archived fingerprint and its metric, but - // (in contrast to DropArchivedMetric) does not un-index the metric. - // The method returns true if a metric was actually deleted. Do not call - // concurrently with DropArchivedMetric or ArchiveMetric. - UnarchiveMetric(clientmodel.Fingerprint) (bool, error) - - // Close flushes the indexing queue and other buffered data and releases - // any held resources. - Close() error -} - // A Preloader preloads series data necessary for a query into memory and pins // them until released via Close(). Its methods are generally not // goroutine-safe. diff --git a/storage/local/persistence.go b/storage/local/persistence.go index a3e50b14f..830dd9434 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -20,6 +20,7 @@ import ( "io" "os" "path" + "sync" "time" "github.com/golang/glog" @@ -75,10 +76,20 @@ type indexingOp struct { opType indexingOpType } -type diskPersistence struct { +// A Persistence is used by a Storage implementation to store samples +// persistently across restarts. The methods are only goroutine-safe if +// explicitly marked as such below. The chunk-related methods PersistChunk, +// DropChunks, LoadChunks, and LoadChunkDescs can be called concurrently with +// each other if each call refers to a different fingerprint. +type persistence struct { basePath string chunkLen int + // archiveMtx protects the archiving-related methods ArchiveMetric, + // UnarchiveMetric, DropArchiveMetric, and GetFingerprintsModifiedBefore + // from concurrent calls. + archiveMtx sync.Mutex + archivedFingerprintToMetrics *index.FingerprintMetricIndex archivedFingerprintToTimeRange *index.FingerprintTimeRangeIndex labelPairToFingerprints *index.LabelPairFingerprintIndex @@ -94,8 +105,8 @@ type diskPersistence struct { indexingBatchLatency prometheus.Summary } -// NewDiskPersistence returns a newly allocated Persistence backed by local disk storage, ready to use. -func NewDiskPersistence(basePath string, chunkLen int) (*diskPersistence, error) { +// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use. 
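+// The returned *persistence also implements prometheus.Collector (see
+// Describe and Collect below); this is how the memory series storage
+// exposes the indexing-queue metrics.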
+func newPersistence(basePath string, chunkLen int) (*persistence, error) { if err := os.MkdirAll(basePath, 0700); err != nil { return nil, err } @@ -117,7 +128,7 @@ func NewDiskPersistence(basePath string, chunkLen int) (*diskPersistence, error) return nil, err } - p := &diskPersistence{ + p := &persistence{ basePath: basePath, chunkLen: chunkLen, @@ -167,7 +178,7 @@ func NewDiskPersistence(basePath string, chunkLen int) (*diskPersistence, error) } // Describe implements prometheus.Collector. -func (p *diskPersistence) Describe(ch chan<- *prometheus.Desc) { +func (p *persistence) Describe(ch chan<- *prometheus.Desc) { ch <- p.indexingQueueLength.Desc() ch <- p.indexingQueueCapacity.Desc() p.indexingBatchSizes.Describe(ch) @@ -175,7 +186,7 @@ func (p *diskPersistence) Describe(ch chan<- *prometheus.Desc) { } // Collect implements prometheus.Collector. -func (p *diskPersistence) Collect(ch chan<- prometheus.Metric) { +func (p *persistence) Collect(ch chan<- prometheus.Metric) { p.indexingQueueLength.Set(float64(len(p.indexingQueue))) ch <- p.indexingQueueLength @@ -184,8 +195,11 @@ func (p *diskPersistence) Collect(ch chan<- prometheus.Metric) { p.indexingBatchLatency.Collect(ch) } -// GetFingerprintsForLabelPair implements persistence. -func (p *diskPersistence) GetFingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) { +// getFingerprintsForLabelPair returns the fingerprints for the given label +// pair. This method is goroutine-safe but take into account that metrics queued +// for indexing with IndexMetric might not yet made it into the index. (Same +// applies correspondingly to UnindexMetric.) +func (p *persistence) getFingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) { fps, _, err := p.labelPairToFingerprints.Lookup(lp) if err != nil { return nil, err @@ -193,8 +207,11 @@ func (p *diskPersistence) GetFingerprintsForLabelPair(lp metric.LabelPair) (clie return fps, nil } -// GetLabelValuesForLabelName implements persistence. -func (p *diskPersistence) GetLabelValuesForLabelName(ln clientmodel.LabelName) (clientmodel.LabelValues, error) { +// getLabelValuesForLabelName returns the label values for the given label +// name. This method is goroutine-safe but take into account that metrics queued +// for indexing with IndexMetric might not yet made it into the index. (Same +// applies correspondingly to UnindexMetric.) +func (p *persistence) getLabelValuesForLabelName(ln clientmodel.LabelName) (clientmodel.LabelValues, error) { lvs, _, err := p.labelNameToLabelValues.Lookup(ln) if err != nil { return nil, err @@ -202,8 +219,10 @@ func (p *diskPersistence) GetLabelValuesForLabelName(ln clientmodel.LabelName) ( return lvs, nil } -// PersistChunk implements Persistence. -func (p *diskPersistence) PersistChunk(fp clientmodel.Fingerprint, c chunk) error { +// persistChunk persists a single chunk of a series. It is the caller's +// responsibility to not modify chunk concurrently and to not persist or drop anything +// for the same fingerprint concurrently. +func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) error { // 1. Open chunk file. f, err := p.openChunkFileForWriting(fp) if err != nil { @@ -224,8 +243,11 @@ func (p *diskPersistence) PersistChunk(fp clientmodel.Fingerprint, c chunk) erro return c.marshal(b) } -// LoadChunks implements Persistence. -func (p *diskPersistence) LoadChunks(fp clientmodel.Fingerprint, indexes []int) (chunks, error) { +// loadChunks loads a group of chunks of a timeseries by their index. 
The chunk +// with the earliest time will have index 0, the following ones will have +// incrementally larger indexes. It is the caller's responsibility to not +// persist or drop anything for the same fingerprint concurrently. +func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int) (chunks, error) { // TODO: we need to verify at some point that file length is a multiple of // the chunk size. When is the best time to do this, and where to remember // it? Right now, we only do it when loading chunkDescs. @@ -270,7 +292,10 @@ func (p *diskPersistence) LoadChunks(fp clientmodel.Fingerprint, indexes []int) return chunks, nil } -func (p *diskPersistence) LoadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (chunkDescs, error) { +// loadChunkDescs loads chunkDescs for a series up until a given time. It is +// the caller's responsibility to not persist or drop anything for the same +// fingerprint concurrently. +func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (chunkDescs, error) { f, err := p.openChunkFileForReading(fp) if os.IsNotExist(err) { return nil, nil @@ -323,8 +348,17 @@ func (p *diskPersistence) LoadChunkDescs(fp clientmodel.Fingerprint, beforeTime return cds, nil } -// PersistSeriesMapAndHeads implements Persistence. -func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap) error { +// persistSeriesMapAndHeads persists the fingerprint to memory-series mapping +// and all open (non-full) head chunks. Do not call concurrently with +// LoadSeriesMapAndHeads. +// +// TODO: Currently, this method assumes to be called while nothing else is going +// on in the storage concurrently. To make this method callable during normal +// operations, certain things have to be done: +// - Make sure the length of the seriesMap doesn't change during the runtime. +// - Lock the fingerprints while persisting unpersisted head chunks. +// - Write to temporary file and only rename after successfully finishing. +func (p *persistence) persistSeriesMapAndHeads(fingerprintToSeries seriesMap) error { f, err := os.OpenFile(p.headsPath(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640) if err != nil { return err @@ -338,34 +372,41 @@ func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap if err := codable.EncodeVarint(w, headsFormatVersion); err != nil { return err } - if err := codable.EncodeVarint(w, int64(len(fingerprintToSeries))); err != nil { + if err := codable.EncodeVarint(w, int64(fingerprintToSeries.length())); err != nil { return err } - for fp, series := range fingerprintToSeries { + iter := fingerprintToSeries.iter() + defer func() { + // Consume the iterator in any case to not leak goroutines. 
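+		// (The sender goroutine inside iter blocks on an unbuffered
+		// channel, so returning early on one of the error paths below
+		// would otherwise leave it blocked forever.)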
+ for _ = range iter { + } + }() + + for m := range iter { var seriesFlags byte - if series.chunkDescsLoaded { + if m.series.chunkDescsLoaded { seriesFlags |= flagChunkDescsLoaded } - if series.headChunkPersisted { + if m.series.headChunkPersisted { seriesFlags |= flagHeadChunkPersisted } if err := w.WriteByte(seriesFlags); err != nil { return err } - if err := codable.EncodeUint64(w, uint64(fp)); err != nil { + if err := codable.EncodeUint64(w, uint64(m.fp)); err != nil { return err } - buf, err := codable.Metric(series.metric).MarshalBinary() + buf, err := codable.Metric(m.series.metric).MarshalBinary() if err != nil { return err } w.Write(buf) - if err := codable.EncodeVarint(w, int64(len(series.chunkDescs))); err != nil { + if err := codable.EncodeVarint(w, int64(len(m.series.chunkDescs))); err != nil { return err } - for i, chunkDesc := range series.chunkDescs { - if series.headChunkPersisted || i < len(series.chunkDescs)-1 { + for i, chunkDesc := range m.series.chunkDescs { + if m.series.headChunkPersisted || i < len(m.series.chunkDescs)-1 { if err := codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil { return err } @@ -386,55 +427,58 @@ func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap return w.Flush() } -// LoadSeriesMapAndHeads implements Persistence. -func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) { +// loadSeriesMapAndHeads loads the fingerprint to memory-series mapping and all +// open (non-full) head chunks. Only call this method during start-up while +// nothing else is running in storage land. This method is utterly +// goroutine-unsafe. +func (p *persistence) loadSeriesMapAndHeads() (seriesMap, error) { f, err := os.Open(p.headsPath()) if os.IsNotExist(err) { - return SeriesMap{}, nil + return newSeriesMap(), nil } if err != nil { - return nil, err + return seriesMap{}, err } defer f.Close() r := bufio.NewReaderSize(f, fileBufSize) buf := make([]byte, len(headsMagicString)) if _, err := io.ReadFull(r, buf); err != nil { - return nil, err + return seriesMap{}, err } magic := string(buf) if magic != headsMagicString { - return nil, fmt.Errorf( + return seriesMap{}, fmt.Errorf( "unexpected magic string, want %q, got %q", headsMagicString, magic, ) } if version, err := binary.ReadVarint(r); version != headsFormatVersion || err != nil { - return nil, fmt.Errorf("unknown heads format version, want %d", headsFormatVersion) + return seriesMap{}, fmt.Errorf("unknown heads format version, want %d", headsFormatVersion) } numSeries, err := binary.ReadVarint(r) if err != nil { - return nil, err + return seriesMap{}, err } - fingerprintToSeries := make(SeriesMap, numSeries) + fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries, numSeries) for ; numSeries > 0; numSeries-- { seriesFlags, err := r.ReadByte() if err != nil { - return nil, err + return seriesMap{}, err } headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0 fp, err := codable.DecodeUint64(r) if err != nil { - return nil, err + return seriesMap{}, err } var metric codable.Metric if err := metric.UnmarshalFromReader(r); err != nil { - return nil, err + return seriesMap{}, err } numChunkDescs, err := binary.ReadVarint(r) if err != nil { - return nil, err + return seriesMap{}, err } chunkDescs := make(chunkDescs, numChunkDescs) @@ -442,11 +486,11 @@ func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) { if headChunkPersisted || i < numChunkDescs-1 { firstTime, err := binary.ReadVarint(r) if err != nil { - return nil, err 
+ return seriesMap{}, err } lastTime, err := binary.ReadVarint(r) if err != nil { - return nil, err + return seriesMap{}, err } chunkDescs[i] = &chunkDesc{ firstTimeField: clientmodel.Timestamp(firstTime), @@ -456,11 +500,11 @@ func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) { // Non-persisted head chunk. chunkType, err := r.ReadByte() if err != nil { - return nil, err + return seriesMap{}, err } chunk := chunkForType(chunkType) if err := chunk.unmarshal(r); err != nil { - return nil, err + return seriesMap{}, err } chunkDescs[i] = &chunkDesc{ chunk: chunk, @@ -476,11 +520,14 @@ func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) { headChunkPersisted: headChunkPersisted, } } - return fingerprintToSeries, nil + return seriesMap{m: fingerprintToSeries}, nil } -// DropChunks implements persistence. -func (p *diskPersistence) DropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (bool, error) { +// dropChunks deletes all chunks from a series whose last sample time is before +// beforeTime. It returns true if all chunks of the series have been deleted. +// It is the caller's responsibility to make sure nothing is persisted or loaded +// for the same fingerprint concurrently. +func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (bool, error) { f, err := p.openChunkFileForReading(fp) if os.IsNotExist(err) { return true, nil @@ -538,18 +585,30 @@ func (p *diskPersistence) DropChunks(fp clientmodel.Fingerprint, beforeTime clie return false, nil } -// IndexMetric implements Persistence. -func (p *diskPersistence) IndexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) { +// indexMetric queues the given metric for addition to the indexes needed by +// getFingerprintsForLabelPair, getLabelValuesForLabelName, and +// getFingerprintsModifiedBefore. If the queue is full, this method blocks +// until the metric can be queued. This method is goroutine-safe. +func (p *persistence) indexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) { p.indexingQueue <- indexingOp{fp, m, add} } -// UnindexMetric implements Persistence. -func (p *diskPersistence) UnindexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) { +// unindexMetric queues references to the given metric for removal from the +// indexes used for getFingerprintsForLabelPair, getLabelValuesForLabelName, and +// getFingerprintsModifiedBefore. The index of fingerprints to archived metrics +// is not affected by this removal. (In fact, never call this method for an +// archived metric. To drop an archived metric, call dropArchivedFingerprint.) +// If the queue is full, this method blocks until the metric can be queued. This +// method is goroutine-safe. +func (p *persistence) unindexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) { p.indexingQueue <- indexingOp{fp, m, remove} } -// WaitForIndexing implements Persistence. -func (p *diskPersistence) WaitForIndexing() { +// waitForIndexing waits until all items in the indexing queue are processed. If +// queue processing is currently on hold (to gather more ops for batching), this +// method will trigger an immediate start of processing. This method is +// goroutine-safe. +func (p *persistence) waitForIndexing() { wait := make(chan int) for { p.indexingFlush <- wait @@ -559,10 +618,15 @@ func (p *diskPersistence) WaitForIndexing() { } } -// ArchiveMetric implements Persistence. 
-func (p *diskPersistence) ArchiveMetric( +// archiveMetric persists the mapping of the given fingerprint to the given +// metric, together with the first and last timestamp of the series belonging to +// the metric. This method is goroutine-safe. +func (p *persistence) archiveMetric( fp clientmodel.Fingerprint, m clientmodel.Metric, first, last clientmodel.Timestamp, ) error { + p.archiveMtx.Lock() + defer p.archiveMtx.Unlock() + if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil { return err } @@ -572,16 +636,26 @@ func (p *diskPersistence) ArchiveMetric( return nil } -// HasArchivedMetric implements Persistence. -func (p *diskPersistence) HasArchivedMetric(fp clientmodel.Fingerprint) ( +// hasArchivedMetric returns whether the archived metric for the given +// fingerprint exists and if yes, what the first and last timestamp in the +// corresponding series is. This method is goroutine-safe. +func (p *persistence) hasArchivedMetric(fp clientmodel.Fingerprint) ( hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error, ) { firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp) return } -// GetFingerprintsModifiedBefore implements Persistence. -func (p *diskPersistence) GetFingerprintsModifiedBefore(beforeTime clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) { +// getFingerprintsModifiedBefore returns the fingerprints of archived timeseries +// that have live samples before the provided timestamp. This method is +// goroutine-safe. +func (p *persistence) getFingerprintsModifiedBefore(beforeTime clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) { + // The locking makes sure archivedFingerprintToTimeRange won't be + // mutated while being iterated over (which will probably not result in + // races, but might still yield weird results). + p.archiveMtx.Lock() + defer p.archiveMtx.Unlock() + var fp codable.Fingerprint var tr codable.TimeRange fps := []clientmodel.Fingerprint{} @@ -600,15 +674,21 @@ func (p *diskPersistence) GetFingerprintsModifiedBefore(beforeTime clientmodel.T return fps, nil } -// GetArchivedMetric implements Persistence. -func (p *diskPersistence) GetArchivedMetric(fp clientmodel.Fingerprint) (clientmodel.Metric, error) { +// getArchivedMetric retrieves the archived metric with the given +// fingerprint. This method is goroutine-safe. +func (p *persistence) getArchivedMetric(fp clientmodel.Fingerprint) (clientmodel.Metric, error) { metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp) return metric, err } -// DropArchivedMetric implements Persistence. -func (p *diskPersistence) DropArchivedMetric(fp clientmodel.Fingerprint) error { - metric, err := p.GetArchivedMetric(fp) +// dropArchivedMetric deletes an archived fingerprint and its corresponding +// metric entirely. It also queues the metric for un-indexing (no need to call +// unindexMetric for the deleted metric.) This method is goroutine-safe. +func (p *persistence) dropArchivedMetric(fp clientmodel.Fingerprint) error { + p.archiveMtx.Lock() + defer p.archiveMtx.Unlock() + + metric, err := p.getArchivedMetric(fp) if err != nil || metric == nil { return err } @@ -618,12 +698,17 @@ func (p *diskPersistence) DropArchivedMetric(fp clientmodel.Fingerprint) error { if err := p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp)); err != nil { return err } - p.UnindexMetric(metric, fp) + p.unindexMetric(metric, fp) return nil } -// UnarchiveMetric implements Persistence. 
-func (p *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, error) { +// unarchiveMetric deletes an archived fingerprint and its metric, but (in +// contrast to dropArchivedMetric) does not un-index the metric. The method +// returns true if a metric was actually deleted. This method is goroutine-safe. +func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) (bool, error) { + p.archiveMtx.Lock() + defer p.archiveMtx.Unlock() + has, err := p.archivedFingerprintToTimeRange.Has(fp) if err != nil || !has { return false, err @@ -637,8 +722,9 @@ func (p *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, err return true, nil } -// Close implements Persistence. -func (p *diskPersistence) Close() error { +// close flushes the indexing queue and other buffered data and releases any +// held resources. +func (p *persistence) close() error { close(p.indexingQueue) <-p.indexingStopped @@ -662,12 +748,12 @@ func (p *diskPersistence) Close() error { return lastError } -func (p *diskPersistence) dirForFingerprint(fp clientmodel.Fingerprint) string { +func (p *persistence) dirForFingerprint(fp clientmodel.Fingerprint) string { fpStr := fp.String() return fmt.Sprintf("%s/%c%c/%s", p.basePath, fpStr[0], fpStr[1], fpStr[2:]) } -func (p *diskPersistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*os.File, error) { +func (p *persistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*os.File, error) { dirname := p.dirForFingerprint(fp) ex, err := exists(dirname) if err != nil { @@ -681,7 +767,7 @@ func (p *diskPersistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (* return os.OpenFile(path.Join(dirname, seriesFileName), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640) } -func (p *diskPersistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.File, error) { +func (p *persistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.File, error) { dirname := p.dirForFingerprint(fp) return os.Open(path.Join(dirname, seriesFileName)) } @@ -695,15 +781,15 @@ func writeChunkHeader(w io.Writer, c chunk) error { return err } -func (p *diskPersistence) offsetForChunkIndex(i int) int64 { +func (p *persistence) offsetForChunkIndex(i int) int64 { return int64(i * (chunkHeaderLen + p.chunkLen)) } -func (p *diskPersistence) headsPath() string { +func (p *persistence) headsPath() string { return path.Join(p.basePath, headsFileName) } -func (p *diskPersistence) processIndexingQueue() { +func (p *persistence) processIndexingQueue() { batchSize := 0 nameToValues := index.LabelNameLabelValuesMapping{} pairToFPs := index.LabelPairFingerprintsMapping{} diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 32da344fa..a949db8c9 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -25,15 +25,15 @@ import ( "github.com/prometheus/prometheus/utility/test" ) -func newTestPersistence(t *testing.T) (Persistence, test.Closer) { +func newTestPersistence(t *testing.T) (*persistence, test.Closer) { dir := test.NewTemporaryDirectory("test_persistence", t) - p, err := NewDiskPersistence(dir.Path(), 1024) + p, err := newPersistence(dir.Path(), 1024) if err != nil { dir.Close() t.Fatal(err) } return p, test.NewCallbackCloser(func() { - p.Close() + p.close() dir.Close() }) } @@ -83,7 +83,7 @@ func TestPersistChunk(t *testing.T) { for fp, chunks := range fpToChunks { for _, c := range chunks { - if err := p.PersistChunk(fp, c); err != nil { + if err := p.persistChunk(fp, c); err != nil { 
t.Fatal(err) } } @@ -94,7 +94,7 @@ func TestPersistChunk(t *testing.T) { for i := range expectedChunks { indexes = append(indexes, i) } - actualChunks, err := p.LoadChunks(fp, indexes) + actualChunks, err := p.loadChunks(fp, indexes) if err != nil { t.Fatal(err) } @@ -254,21 +254,21 @@ func TestIndexing(t *testing.T) { indexedFpsToMetrics := index.FingerprintMetricMapping{} for i, b := range batches { for fp, m := range b.fpToMetric { - p.IndexMetric(m, fp) - if err := p.ArchiveMetric(fp, m, 1, 2); err != nil { + p.indexMetric(m, fp) + if err := p.archiveMetric(fp, m, 1, 2); err != nil { t.Fatal(err) } indexedFpsToMetrics[fp] = m } - verifyIndexedState(i, t, b, indexedFpsToMetrics, p.(*diskPersistence)) + verifyIndexedState(i, t, b, indexedFpsToMetrics, p) } for i := len(batches) - 1; i >= 0; i-- { b := batches[i] - verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p.(*diskPersistence)) + verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p) for fp, m := range b.fpToMetric { - p.UnindexMetric(m, fp) - unarchived, err := p.UnarchiveMetric(fp) + p.unindexMetric(m, fp) + unarchived, err := p.unarchiveMetric(fp) if err != nil { t.Fatal(err) } @@ -280,11 +280,11 @@ func TestIndexing(t *testing.T) { } } -func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics index.FingerprintMetricMapping, p *diskPersistence) { - p.WaitForIndexing() +func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics index.FingerprintMetricMapping, p *persistence) { + p.waitForIndexing() for fp, m := range indexedFpsToMetrics { // Compare archived metrics with input metrics. - mOut, err := p.GetArchivedMetric(fp) + mOut, err := p.getArchivedMetric(fp) if err != nil { t.Fatal(err) } @@ -293,7 +293,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet } // Check that archived metrics are in membership index. - has, first, last, err := p.HasArchivedMetric(fp) + has, first, last, err := p.hasArchivedMetric(fp) if err != nil { t.Fatal(err) } @@ -310,7 +310,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet // Compare label name -> label values mappings. for ln, lvs := range b.expectedLnToLvs { - outLvs, err := p.GetLabelValuesForLabelName(ln) + outLvs, err := p.getLabelValuesForLabelName(ln) if err != nil { t.Fatal(err) } @@ -327,7 +327,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet // Compare label pair -> fingerprints mappings. for lp, fps := range b.expectedLpToFps { - outFps, err := p.GetFingerprintsForLabelPair(lp) + outFps, err := p.getFingerprintsForLabelPair(lp) if err != nil { t.Fatal(err) } diff --git a/storage/local/series.go b/storage/local/series.go index 4cff46461..2eb4e14da 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -22,6 +22,98 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) +// fingerprintSeriesPair pairs a fingerprint with a memorySeries pointer. +type fingerprintSeriesPair struct { + fp clientmodel.Fingerprint + series *memorySeries +} + +// seriesMap maps fingerprints to memory series. All its methods are +// goroutine-safe. A SeriesMap is effectively is a goroutine-safe version of +// map[clientmodel.Fingerprint]*memorySeries. +type seriesMap struct { + mtx sync.RWMutex + m map[clientmodel.Fingerprint]*memorySeries +} + +// newSeriesMap returns a newly allocated empty seriesMap. To create a seriesMap +// based on a prefilled map, use an explicit initializer. 
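+// (loadSeriesMapAndHeads in persistence.go does exactly that with
+// seriesMap{m: fingerprintToSeries}.)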
+func newSeriesMap() seriesMap { + return seriesMap{m: make(map[clientmodel.Fingerprint]*memorySeries)} +} + +// length returns the number of mappings in the seriesMap. +func (sm seriesMap) length() int { + sm.mtx.RLock() + defer sm.mtx.RUnlock() + + return len(sm.m) +} + +// get returns a memorySeries for a fingerprint. Return values have the same +// semantics as the native Go map. +func (sm seriesMap) get(fp clientmodel.Fingerprint) (s *memorySeries, ok bool) { + sm.mtx.RLock() + defer sm.mtx.RUnlock() + + s, ok = sm.m[fp] + return +} + +// put adds a mapping to the seriesMap. +func (sm seriesMap) put(fp clientmodel.Fingerprint, s *memorySeries) { + sm.mtx.Lock() + defer sm.mtx.Unlock() + + sm.m[fp] = s +} + +// del removes a mapping from the series Map. +func (sm seriesMap) del(fp clientmodel.Fingerprint) { + sm.mtx.Lock() + defer sm.mtx.Unlock() + + delete(sm.m, fp) +} + +// iter returns a channel that produces all mappings in the seriesMap. The +// channel will be closed once all fingerprints have been received. Not +// consuming all fingerprints from the channel will leak a goroutine. The +// semantics of concurrent modification of seriesMap is the same as for +// iterating over a map with a 'range' clause. +func (sm seriesMap) iter() <-chan fingerprintSeriesPair { + ch := make(chan fingerprintSeriesPair) + go func() { + sm.mtx.RLock() + for fp, s := range sm.m { + sm.mtx.RUnlock() + ch <- fingerprintSeriesPair{fp, s} + sm.mtx.RLock() + } + sm.mtx.RUnlock() + }() + return ch +} + +// fpIter returns a channel that produces all fingerprints in the seriesMap. The +// channel will be closed once all fingerprints have been received. Not +// consuming all fingerprints from the channel will leak a goroutine. The +// semantics of concurrent modification of seriesMap is the same as for +// iterating over a map with a 'range' clause. +func (sm seriesMap) fpIter() <-chan clientmodel.Fingerprint { + ch := make(chan clientmodel.Fingerprint) + go func() { + sm.mtx.RLock() + for fp := range sm.m { + sm.mtx.RUnlock() + ch <- fp + sm.mtx.RLock() + } + sm.mtx.RUnlock() + }() + return ch +} + type chunkDescs []*chunkDesc type chunkDesc struct { @@ -110,8 +202,6 @@ func (cd *chunkDesc) evictNow() { } type memorySeries struct { - mtx sync.Mutex - metric clientmodel.Metric // Sorted by start time, overlapping chunk ranges are forbidden. chunkDescs chunkDescs @@ -125,17 +215,21 @@ type memorySeries struct { headChunkPersisted bool } -func newMemorySeries(m clientmodel.Metric) *memorySeries { +// newMemorySeries returns a pointer to a newly allocated memorySeries for the +// given metric. reallyNew defines if the memorySeries is a genuinely new series +// or (if false) a series for a metric being unarchived, i.e. a series that +// existed before but has been evicted from memory. +func newMemorySeries(m clientmodel.Metric, reallyNew bool) *memorySeries { return &memorySeries{ - metric: m, - chunkDescsLoaded: true, + metric: m, + chunkDescsLoaded: reallyNew, + headChunkPersisted: !reallyNew, } } +// add adds a sample pair to the series. +// The caller must have locked the fingerprint of the series. 
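+// (Locking happens in the storage layer via its fingerprintLocker; see
+// appendSample in storage.go.)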
func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, persistQueue chan *persistRequest) { - s.mtx.Lock() - defer s.mtx.Unlock() - if len(s.chunkDescs) == 0 || s.headChunkPersisted { newHead := &chunkDesc{ chunk: newDeltaEncodedChunk(d1, d0, true), @@ -172,9 +266,9 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, per } } +// persistHeadChunk queues the head chunk for persisting if not already done. +// The caller must have locked the fingerprint of the series. func (s *memorySeries) persistHeadChunk(fp clientmodel.Fingerprint, persistQueue chan *persistRequest) { - s.mtx.Lock() - defer s.mtx.Unlock() if s.headChunkPersisted { return } @@ -185,10 +279,9 @@ func (s *memorySeries) persistHeadChunk(fp clientmodel.Fingerprint, persistQueue } } +// evictOlderThan evicts chunks whose latest sample is older than the given timestamp. +// The caller must have locked the fingerprint of the series. func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool) { - s.mtx.Lock() - defer s.mtx.Unlock() - // For now, always drop the entire range from oldest to t. for _, cd := range s.chunkDescs { if !cd.lastTime().Before(t) { @@ -203,10 +296,8 @@ func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool) } // purgeOlderThan returns true if all chunks have been purged. +// The caller must have locked the fingerprint of the series. func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) bool { - s.mtx.Lock() - defer s.mtx.Unlock() - keepIdx := len(s.chunkDescs) for i, cd := range s.chunkDescs { if !cd.lastTime().Before(t) { @@ -224,10 +315,11 @@ func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) bool { return len(s.chunkDescs) == 0 } +// preloadChunks is an internal helper method. // TODO: in this method (and other places), we just fudge around with chunkDesc // internals without grabbing the chunkDesc lock. Study how this needs to be -// protected against other accesses that don't hold the series lock. -func (s *memorySeries) preloadChunks(indexes []int, p Persistence) (chunkDescs, error) { +// protected against other accesses that don't hold the fp lock. +func (s *memorySeries) preloadChunks(indexes []int, p *persistence) (chunkDescs, error) { loadIndexes := []int{} pinnedChunkDescs := make(chunkDescs, 0, len(indexes)) for _, idx := range indexes { @@ -241,7 +333,7 @@ func (s *memorySeries) preloadChunks(indexes []int, p Persistence) (chunkDescs, if len(loadIndexes) > 0 { fp := s.metric.Fingerprint() - chunks, err := p.LoadChunks(fp, loadIndexes) + chunks, err := p.loadChunks(fp, loadIndexes) if err != nil { // Unpin any pinned chunks that were already loaded. for _, cd := range pinnedChunkDescs { @@ -261,7 +353,7 @@ func (s *memorySeries) preloadChunks(indexes []int, p Persistence) (chunkDescs, } /* -func (s *memorySeries) preloadChunksAtTime(t clientmodel.Timestamp, p Persistence) (chunkDescs, error) { +func (s *memorySeries) preloadChunksAtTime(t clientmodel.Timestamp, p *persistence) (chunkDescs, error) { s.mtx.Lock() defer s.mtx.Unlock() @@ -291,8 +383,9 @@ func (s *memorySeries) preloadChunksAtTime(t clientmodel.Timestamp, p Persistenc } */ -func (s *memorySeries) loadChunkDescs(p Persistence) error { - cds, err := p.LoadChunkDescs(s.metric.Fingerprint(), s.chunkDescs[0].firstTime()) +// loadChunkDescs is an internal helper method. 
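+// It fetches from the persistence layer the descriptors of all chunks that
+// were persisted before the series's first in-memory chunk.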
+func (s *memorySeries) loadChunkDescs(p *persistence) error { + cds, err := p.loadChunkDescs(s.metric.Fingerprint(), s.chunkDescs[0].firstTime()) if err != nil { return err } @@ -301,10 +394,9 @@ func (s *memorySeries) loadChunkDescs(p Persistence) error { return nil } -func (s *memorySeries) preloadChunksForRange(from clientmodel.Timestamp, through clientmodel.Timestamp, p Persistence) (chunkDescs, error) { - s.mtx.Lock() - defer s.mtx.Unlock() - +// preloadChunksForRange loads chunks for the given range from the persistence. +// The caller must have locked the fingerprint of the series. +func (s *memorySeries) preloadChunksForRange(from clientmodel.Timestamp, through clientmodel.Timestamp, p *persistence) (chunkDescs, error) { if !s.chunkDescsLoaded && from.Before(s.chunkDescs[0].firstTime()) { if err := s.loadChunkDescs(p); err != nil { return nil, err @@ -339,15 +431,12 @@ func (s *memorySeries) preloadChunksForRange(from clientmodel.Timestamp, through // memorySeriesIterator implements SeriesIterator. type memorySeriesIterator struct { - mtx *sync.Mutex - chunkIt chunkIterator - chunks chunks + lock, unlock func() + chunkIt chunkIterator + chunks chunks } -func (s *memorySeries) newIterator() SeriesIterator { - // TODO: Possible concurrency issue if series is modified while this is - // running. Only caller at the moment is in NewIterator() in storage.go, - // where there is no locking. +func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator { chunks := make(chunks, 0, len(s.chunkDescs)) for i, cd := range s.chunkDescs { if cd.chunk != nil { @@ -360,7 +449,8 @@ func (s *memorySeries) newIterator() SeriesIterator { } return &memorySeriesIterator{ - mtx: &s.mtx, + lock: lockFunc, + unlock: unlockFunc, chunks: chunks, } } @@ -389,8 +479,8 @@ func (s *memorySeries) lastTime() clientmodel.Timestamp { // GetValueAtTime implements SeriesIterator. func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.Values { - it.mtx.Lock() - defer it.mtx.Unlock() + it.lock() + defer it.unlock() // The most common case. We are iterating through a chunk. if it.chunkIt != nil && it.chunkIt.contains(t) { @@ -437,8 +527,8 @@ func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.V // GetBoundaryValues implements SeriesIterator. func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Values { - it.mtx.Lock() - defer it.mtx.Unlock() + it.lock() + defer it.unlock() // Find the first relevant chunk. i := sort.Search(len(it.chunks), func(i int) bool { @@ -487,8 +577,8 @@ func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Val // GetRangeValues implements SeriesIterator. func (it *memorySeriesIterator) GetRangeValues(in metric.Interval) metric.Values { - it.mtx.Lock() - defer it.mtx.Unlock() + it.lock() + defer it.unlock() // Find the first relevant chunk. 
i := sort.Search(len(it.chunks), func(i int) bool { diff --git a/storage/local/storage.go b/storage/local/storage.go index 53c4a3e09..6b5d031da 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -16,12 +16,12 @@ package local import ( "fmt" - "sync" "time" "github.com/golang/glog" clientmodel "github.com/prometheus/client_golang/model" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/storage/metric" ) @@ -36,13 +36,12 @@ const ( ) type memorySeriesStorage struct { - mtx sync.RWMutex + fpLocker *fingerprintLocker - state storageState persistDone chan bool stopServing chan chan<- bool - fingerprintToSeries SeriesMap + fingerprintToSeries seriesMap memoryEvictionInterval time.Duration memoryRetentionPeriod time.Duration @@ -51,16 +50,16 @@ type memorySeriesStorage struct { persistenceRetentionPeriod time.Duration persistQueue chan *persistRequest - persistence Persistence + persistence *persistence } // MemorySeriesStorageOptions contains options needed by // NewMemorySeriesStorage. It is not safe to leave any of those at their zero // values. type MemorySeriesStorageOptions struct { - Persistence Persistence // Used to persist storage content across restarts. MemoryEvictionInterval time.Duration // How often to check for memory eviction. MemoryRetentionPeriod time.Duration // Chunks at least that old are evicted from memory. + PersistenceStoragePath string // Location of persistence files. PersistencePurgeInterval time.Duration // How often to check for purging. PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged. } @@ -68,14 +67,20 @@ type MemorySeriesStorageOptions struct { // NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still // has to be called to start the storage. func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { + p, err := newPersistence(o.PersistenceStoragePath, 1024) + if err != nil { + return nil, err + } glog.Info("Loading series map and head chunks...") - fingerprintToSeries, err := o.Persistence.LoadSeriesMapAndHeads() + fingerprintToSeries, err := p.loadSeriesMapAndHeads() if err != nil { return nil, err } - numSeries.Set(float64(len(fingerprintToSeries))) + numSeries.Set(float64(fingerprintToSeries.length())) return &memorySeriesStorage{ + fpLocker: newFingerprintLocker(100), // TODO: Tweak value. 
+ fingerprintToSeries: fingerprintToSeries, persistDone: make(chan bool), stopServing: make(chan chan<- bool), @@ -87,7 +92,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { persistenceRetentionPeriod: o.PersistenceRetentionPeriod, persistQueue: make(chan *persistRequest, persistQueueCap), - persistence: o.Persistence, + persistence: p, }, nil } @@ -106,14 +111,10 @@ func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) { } func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) { - s.mtx.Lock() - defer s.mtx.Unlock() - - if s.state != storageServing { - panic("storage is not serving") - } - fp := sample.Metric.Fingerprint() + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) + series := s.getOrCreateSeries(fp, sample.Metric) series.add(fp, &metric.SamplePair{ Value: sample.Value, @@ -122,37 +123,27 @@ func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) { } func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries { - series, ok := s.fingerprintToSeries[fp] - + series, ok := s.fingerprintToSeries.get(fp) if !ok { - series = newMemorySeries(m) - s.fingerprintToSeries[fp] = series - numSeries.Set(float64(len(s.fingerprintToSeries))) - - unarchived, err := s.persistence.UnarchiveMetric(fp) + unarchived, err := s.persistence.unarchiveMetric(fp) if err != nil { glog.Errorf("Error unarchiving fingerprint %v: %v", fp, err) } - - if unarchived { - // The series existed before, had been archived at some - // point, and has now been unarchived, i.e. it has - // chunks on disk. Set chunkDescsLoaded accordingly so - // that they will be looked at later. Also, an - // unarchived series comes with a persisted head chunk. - series.chunkDescsLoaded = false - series.headChunkPersisted = true - } else { + if !unarchived { // This was a genuinely new series, so index the metric. - s.persistence.IndexMetric(m, fp) + s.persistence.indexMetric(m, fp) } + series = newMemorySeries(m, !unarchived) + s.fingerprintToSeries.put(fp, series) + numSeries.Set(float64(s.fingerprintToSeries.length())) + } return series } /* func (s *memorySeriesStorage) preloadChunksAtTime(fp clientmodel.Fingerprint, ts clientmodel.Timestamp) (chunkDescs, error) { - series, ok := s.fingerprintToSeries[fp] + series, ok := s.fingerprintToSeries.get(fp) if !ok { panic("requested preload for non-existent series") } @@ -162,13 +153,12 @@ func (s *memorySeriesStorage) preloadChunksAtTime(fp clientmodel.Fingerprint, ts func (s *memorySeriesStorage) preloadChunksForRange(fp clientmodel.Fingerprint, from clientmodel.Timestamp, through clientmodel.Timestamp) (chunkDescs, error) { stalenessDelta := 300 * time.Second // TODO: Turn into parameter. 
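+	// Lock the fingerprint for the entire preload so that the series can
+	// neither be archived nor purged while its chunks are being pinned.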
+ s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) - s.mtx.RLock() - series, ok := s.fingerprintToSeries[fp] - s.mtx.RUnlock() - + series, ok := s.fingerprintToSeries.get(fp) if !ok { - has, first, last, err := s.persistence.HasArchivedMetric(fp) + has, first, last, err := s.persistence.hasArchivedMetric(fp) if err != nil { return nil, err } @@ -176,13 +166,11 @@ func (s *memorySeriesStorage) preloadChunksForRange(fp clientmodel.Fingerprint, return nil, fmt.Errorf("requested preload for non-existent series %v", fp) } if from.Add(-stalenessDelta).Before(last) && through.Add(stalenessDelta).After(first) { - metric, err := s.persistence.GetArchivedMetric(fp) + metric, err := s.persistence.getArchivedMetric(fp) if err != nil { return nil, err } - s.mtx.Lock() series = s.getOrCreateSeries(fp, metric) - defer s.mtx.Unlock() // Ugh. } } return series.preloadChunksForRange(from, through, s.persistence) @@ -190,58 +178,37 @@ func (s *memorySeriesStorage) preloadChunksForRange(fp clientmodel.Fingerprint, // NewIterator implements storage. func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIterator { - s.mtx.RLock() - series, ok := s.fingerprintToSeries[fp] - s.mtx.RUnlock() + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) + series, ok := s.fingerprintToSeries.get(fp) if !ok { + // TODO: Could this legitimately happen? Series just got purged? panic("requested iterator for non-existent series") } - return series.newIterator() + return series.newIterator( + func() { s.fpLocker.Lock(fp) }, + func() { s.fpLocker.Unlock(fp) }, + ) } func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) { - fpsToArchive := []clientmodel.Fingerprint{} - defer func(begin time.Time) { evictionDuration.Set(float64(time.Since(begin) / time.Millisecond)) }(time.Now()) - s.mtx.RLock() - for fp, series := range s.fingerprintToSeries { - if series.evictOlderThan(clientmodel.TimestampFromTime(time.Now()).Add(-1 * ttl)) { - fpsToArchive = append(fpsToArchive, fp) - series.persistHeadChunk(fp, s.persistQueue) - } - } - s.mtx.RUnlock() - - if len(fpsToArchive) == 0 { - return - } - - // If we are here, we have metrics to archive. For that, we need the write lock. - s.mtx.Lock() - defer s.mtx.Unlock() - - for _, fp := range fpsToArchive { - series, ok := s.fingerprintToSeries[fp] - if !ok { - // Oops, perhaps another evict run happening in parallel? - continue - } - // TODO: Need series lock (or later FP lock)? - if !series.headChunkPersisted { - // Oops. The series has received new samples all of a - // sudden, giving it a new head chunk. Leave it alone. - continue - } - if err := s.persistence.ArchiveMetric( - fp, series.metric, series.firstTime(), series.lastTime(), - ); err != nil { - glog.Errorf("Error archiving metric %v: %v", series.metric, err) + for m := range s.fingerprintToSeries.iter() { + s.fpLocker.Lock(m.fp) + if m.series.evictOlderThan(clientmodel.TimestampFromTime(time.Now()).Add(-1 * ttl)) { + m.series.persistHeadChunk(m.fp, s.persistQueue) + s.fingerprintToSeries.del(m.fp) + if err := s.persistence.archiveMetric( + m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(), + ); err != nil { + glog.Errorf("Error archiving metric %v: %v", m.series.metric, err) + } } - delete(s.fingerprintToSeries, fp) + s.fpLocker.Unlock(m.fp) } } @@ -255,12 +222,11 @@ func recordPersist(start time.Time, err error) { func (s *memorySeriesStorage) handlePersistQueue() { for req := range s.persistQueue { - // TODO: Make this thread-safe? 
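+		// Calling len on a channel is safe without further
+		// synchronization, so the gauge below can be updated from this
+		// goroutine as-is.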
persistQueueLength.Set(float64(len(s.persistQueue))) //glog.Info("Persist request: ", *req.fingerprint) start := time.Now() - err := s.persistence.PersistChunk(req.fingerprint, req.chunkDesc.chunk) + err := s.persistence.persistChunk(req.fingerprint, req.chunkDesc.chunk) recordPersist(start, err) if err != nil { glog.Error("Error persisting chunk, requeuing: ", err) @@ -275,16 +241,9 @@ func (s *memorySeriesStorage) handlePersistQueue() { // Close stops serving, flushes all pending operations, and frees all // resources. It implements Storage. func (s *memorySeriesStorage) Close() error { - s.mtx.Lock() - defer s.mtx.Unlock() - - if s.state == storageStopping { - panic("Illegal State: Attempted to restop memorySeriesStorage.") - } - stopped := make(chan bool) glog.Info("Waiting for storage to stop serving...") - s.stopServing <- (stopped) + s.stopServing <- stopped glog.Info("Serving stopped.") <-stopped @@ -294,17 +253,14 @@ func (s *memorySeriesStorage) Close() error { glog.Info("Persist loop stopped.") glog.Info("Persisting head chunks...") - if err := s.persistence.PersistSeriesMapAndHeads(s.fingerprintToSeries); err != nil { + if err := s.persistence.persistSeriesMapAndHeads(s.fingerprintToSeries); err != nil { return err } glog.Info("Done persisting head chunks.") - s.fingerprintToSeries = nil - if err := s.persistence.Close(); err != nil { + if err := s.persistence.close(); err != nil { return err } - - s.state = storageStopping return nil } @@ -318,29 +274,25 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) { return case <-purgeTicker.C: glog.Info("Purging old series data...") - s.mtx.RLock() - + ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.persistenceRetentionPeriod) begin := time.Now() - fps := make([]clientmodel.Fingerprint, 0, len(s.fingerprintToSeries)) - for fp := range s.fingerprintToSeries { - fps = append(fps, fp) + for fp := range s.fingerprintToSeries.fpIter() { + select { + case <-stop: + glog.Info("Interrupted running series purge.") + return + default: + s.purgeSeries(fp, ts) + } } - s.mtx.RUnlock() - ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.persistenceRetentionPeriod) - - // TODO: If we decide not to remove entries from the timerange disk index - // upon unarchival, we could remove the memory copy above and only use - // the fingerprints from the disk index. - persistedFPs, err := s.persistence.GetFingerprintsModifiedBefore(ts) + persistedFPs, err := s.persistence.getFingerprintsModifiedBefore(ts) if err != nil { glog.Error("Failed to lookup persisted fingerprint ranges: ", err) break } - fps = append(fps, persistedFPs...) - - for _, fp := range fps { + for _, fp := range persistedFPs { select { case <-stop: glog.Info("Interrupted running series purge.") @@ -362,33 +314,20 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) { // series. If the series contains no chunks after the purge, it is dropped // entirely. func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) { - s.mtx.Lock() - // TODO: This is a lock FAR to coarse! However, we cannot lock using the - // memorySeries since we might have none (for series that are on disk - // only). And we really don't want to un-archive a series from disk - // while we are at the same time purging it. A locking per fingerprint - // would be nice. Or something... Have to think about it... Careful, - // more race conditions lurk below. Also unsolved: If there are chunks - // in the persist queue. 
persistence.DropChunks and - // persistence.PersistChunck needs to be locked on fp level, or - // something. And even then, what happens if everything is dropped, but - // there are still chunks hung in the persist queue? They would later - // re-create a file for a series that doesn't exist anymore... But - // there is the ref count, which is one higher if you have not yet - // persisted the chunk. - defer s.mtx.Unlock() + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) // First purge persisted chunks. We need to do that anyway. - allDropped, err := s.persistence.DropChunks(fp, beforeTime) + allDropped, err := s.persistence.dropChunks(fp, beforeTime) if err != nil { glog.Error("Error purging persisted chunks: ", err) } // Purge chunks from memory accordingly. - if series, ok := s.fingerprintToSeries[fp]; ok { + if series, ok := s.fingerprintToSeries.get(fp); ok { if series.purgeOlderThan(beforeTime) && allDropped { - delete(s.fingerprintToSeries, fp) - s.persistence.UnindexMetric(series.metric, fp) + s.fingerprintToSeries.del(fp) + s.persistence.unindexMetric(series.metric, fp) } return } @@ -399,20 +338,13 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime if !allDropped { return } - if err := s.persistence.DropArchivedMetric(fp); err != nil { + if err := s.persistence.dropArchivedMetric(fp); err != nil { glog.Errorf("Error dropping archived metric for fingerprint %v: %v", fp, err) } } // Serve implements Storage. func (s *memorySeriesStorage) Serve(started chan<- bool) { - s.mtx.Lock() - if s.state != storageStarting { - panic("Illegal State: Attempted to restart memorySeriesStorage.") - } - s.state = storageServing - s.mtx.Unlock() - evictMemoryTicker := time.NewTicker(s.memoryEvictionInterval) defer evictMemoryTicker.Stop() @@ -443,16 +375,12 @@ func (s *memorySeriesStorage) NewPreloader() Preloader { // GetFingerprintsForLabelMatchers implements Storage. func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) clientmodel.Fingerprints { - // TODO: Is this lock needed? - s.mtx.RLock() - defer s.mtx.RUnlock() - var result map[clientmodel.Fingerprint]struct{} for _, matcher := range labelMatchers { intersection := map[clientmodel.Fingerprint]struct{}{} switch matcher.Type { case metric.Equal: - fps, err := s.persistence.GetFingerprintsForLabelPair( + fps, err := s.persistence.getFingerprintsForLabelPair( metric.LabelPair{ Name: matcher.Name, Value: matcher.Value, @@ -470,7 +398,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metr } } default: - values, err := s.persistence.GetLabelValuesForLabelName(matcher.Name) + values, err := s.persistence.getLabelValuesForLabelName(matcher.Name) if err != nil { glog.Errorf("Error getting label values for label name %q: %v", matcher.Name, err) } @@ -479,7 +407,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metr return nil } for _, v := range matches { - fps, err := s.persistence.GetFingerprintsForLabelPair( + fps, err := s.persistence.getFingerprintsForLabelPair( metric.LabelPair{ Name: matcher.Name, Value: v, @@ -510,11 +438,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metr // GetLabelValuesForLabelName implements Storage. func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.LabelName) clientmodel.LabelValues { - // TODO: Is this lock needed? 
- s.mtx.RLock() - defer s.mtx.RUnlock() - - lvs, err := s.persistence.GetLabelValuesForLabelName(labelName) + lvs, err := s.persistence.getLabelValuesForLabelName(labelName) if err != nil { glog.Errorf("Error getting label values for label name %q: %v", labelName, err) } @@ -523,10 +447,10 @@ func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.L // GetMetricForFingerprint implements Storage. func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.Metric { - s.mtx.RLock() - defer s.mtx.RUnlock() + s.fpLocker.Lock(fp) + defer s.fpLocker.Unlock(fp) - series, ok := s.fingerprintToSeries[fp] + series, ok := s.fingerprintToSeries.get(fp) if ok { // Copy required here because caller might mutate the returned // metric. @@ -536,9 +460,19 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint } return m } - metric, err := s.persistence.GetArchivedMetric(fp) + metric, err := s.persistence.getArchivedMetric(fp) if err != nil { glog.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err) } return metric } + +// Describe implements prometheus.Collector. +func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { + s.persistence.Describe(ch) +} + +// Collect implements prometheus.Collector. +func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) { + s.persistence.Collect(ch) +} diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 45ee2d368..b63f2e30b 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -37,8 +37,8 @@ func TestChunk(t *testing.T) { s.AppendSamples(samples) - for _, s := range s.(*memorySeriesStorage).fingerprintToSeries { - for i, v := range s.values() { + for m := range s.(*memorySeriesStorage).fingerprintToSeries.iter() { + for i, v := range m.series.values() { if samples[i].Timestamp != v.Timestamp { t.Fatalf("%d. Got %v; want %v", i, v.Timestamp, samples[i].Timestamp) } diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index 238464d0e..5ec8cf9ae 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -35,16 +35,12 @@ func (t *testStorageCloser) Close() { // returned test.Closer, the temporary directory is cleaned up. func NewTestStorage(t testing.TB) (Storage, test.Closer) { directory := test.NewTemporaryDirectory("test_storage", t) - persistence, err := NewDiskPersistence(directory.Path(), 1024) - if err != nil { - t.Fatal("Error opening disk persistence: ", err) - } o := &MemorySeriesStorageOptions{ - Persistence: persistence, MemoryEvictionInterval: time.Minute, MemoryRetentionPeriod: time.Hour, PersistencePurgeInterval: time.Hour, PersistenceRetentionPeriod: 24 * 7 * time.Hour, + PersistenceStoragePath: directory.Path(), } storage, err := NewMemorySeriesStorage(o) if err != nil {
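
Note: NewMemorySeriesStorage above calls newFingerprintLocker(100), but the
locker itself is not part of this patch (the commit is marked WIP). For
review purposes, here is a minimal sketch of what such a locker could look
like, assuming the constructor argument is the number of preallocated
mutexes that fingerprints are striped over — only the newFingerprintLocker
name and the Lock/Unlock(fp) calls are taken from the patch; everything else
is an assumption:

package local

import (
	"sync"

	clientmodel "github.com/prometheus/client_golang/model"
)

// fingerprintLocker provides a mutex per fingerprint by hashing fingerprints
// onto a fixed pool of mutexes, keeping memory usage bounded no matter how
// many series exist. Distinct fingerprints may share a mutex, so callers
// must never hold the locks of two fingerprints at the same time, or two
// goroutines locking the same pair in opposite order may deadlock.
type fingerprintLocker struct {
	fpMtxs    []sync.Mutex
	numFpMtxs uint64
}

// newFingerprintLocker returns a fingerprintLocker ready for use.
func newFingerprintLocker(preallocatedMutexes int) *fingerprintLocker {
	return &fingerprintLocker{
		fpMtxs:    make([]sync.Mutex, preallocatedMutexes),
		numFpMtxs: uint64(preallocatedMutexes),
	}
}

// Lock locks the given fingerprint.
func (l *fingerprintLocker) Lock(fp clientmodel.Fingerprint) {
	l.fpMtxs[uint64(fp)%l.numFpMtxs].Lock()
}

// Unlock unlocks the given fingerprint.
func (l *fingerprintLocker) Unlock(fp clientmodel.Fingerprint) {
	l.fpMtxs[uint64(fp)%l.numFpMtxs].Unlock()
}

With striping, Lock(fp) stays O(1) and the total number of mutexes is fixed
at construction time, at the price of occasional false sharing between
series whose fingerprints hash to the same stripe.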