diff --git a/storage/local/interface.go b/storage/local/interface.go index 79dc47a5d..de053c2ac 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -46,7 +46,11 @@ type Storage interface { Close() error } -// SeriesIterator enables efficient access of sample values in a series. +// SeriesIterator enables efficient access of sample values in a series. All +// methods are goroutine-safe. A SeriesIterator iterates over a snapshot of a +// series, i.e. it is safe to continue using a SeriesIterator after modifying +// the corresponding series, but the iterator will represent the state of the +// series prior the modification. type SeriesIterator interface { // Gets the two values that are immediately adjacent to a given time. In // case a value exist at precisely the given time, only that single @@ -62,33 +66,47 @@ type SeriesIterator interface { } // A Persistence is used by a Storage implementation to store samples -// persistently across restarts. +// persistently across restarts. The methods are generally not goroutine-safe +// unless marked otherwise. The chunk-related methods PersistChunk, DropChunks, +// LoadChunks, and LoadChunkDescs can be called concurrently with each other if +// each call refers to a different fingerprint. +// +// TODO: As a Persistence is really only used within this package, consider not +// exporting it. type Persistence interface { - // PersistChunk persists a single chunk of a series. + // PersistChunk persists a single chunk of a series. It is the caller's + // responsibility to not modify chunk concurrently. PersistChunk(clientmodel.Fingerprint, chunk) error - // PersistSeriesMapAndHeads persists the fingerprint to memory-series - // mapping and all open (non-full) head chunks. - PersistSeriesMapAndHeads(SeriesMap) error - - // DropChunks deletes all chunks from a timeseries whose last sample time is - // before beforeTime. + // DropChunks deletes all chunks from a series whose last sample time is + // before beforeTime. It returns true if all chunks of the series have + // been deleted. DropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (allDropped bool, err error) - // LoadChunks loads a group of chunks of a timeseries by their index. The // chunk with the earliest time will have index 0, the following ones will // have incrementally larger indexes. LoadChunks(fp clientmodel.Fingerprint, indexes []int) (chunks, error) // LoadChunkDescs loads chunkDescs for a series up until a given time. LoadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (chunkDescs, error) + + // PersistSeriesMapAndHeads persists the fingerprint to memory-series + // mapping and all open (non-full) head chunks. It is the caller's + // responsibility to not modify SeriesMap concurrently. Do not call + // concurrently with LoadSeriesMapAndHeads. + PersistSeriesMapAndHeads(SeriesMap) error // LoadSeriesMapAndHeads loads the fingerprint to memory-series mapping - // and all open (non-full) head chunks. + // and all open (non-full) head chunks. Do not call + // concurrently with PersistSeriesMapAndHeads. LoadSeriesMapAndHeads() (SeriesMap, error) // GetFingerprintsForLabelPair returns the fingerprints for the given - // label pair. + // label pair. This method is goroutine-safe but take into account that + // metrics queued for indexing with IndexMetric might not yet made it + // into the index. (Same applies correspondingly to UnindexMetric.) GetFingerprintsForLabelPair(metric.LabelPair) (clientmodel.Fingerprints, error) // GetLabelValuesForLabelName returns the label values for the given - // label name. + // label name. This method is goroutine-safe but take into account that + // metrics queued for indexing with IndexMetric might not yet made it + // into the index. (Same applies correspondingly to UnindexMetric.) GetLabelValuesForLabelName(clientmodel.LabelName) (clientmodel.LabelValues, error) // IndexMetric queues the given metric for addition to the indexes @@ -107,43 +125,51 @@ type Persistence interface { // WaitForIndexing waits until all items in the indexing queue are // processed. If queue processing is currently on hold (to gather more // ops for batching), this method will trigger an immediate start of - // processing. + // processing. This method is goroutine-safe. WaitForIndexing() // ArchiveMetric persists the mapping of the given fingerprint to the // given metric, together with the first and last timestamp of the - // series belonging to the metric. + // series belonging to the metric. Do not call concurrently with + // UnarchiveMetric or DropArchivedMetric. ArchiveMetric( fingerprint clientmodel.Fingerprint, metric clientmodel.Metric, firstTime, lastTime clientmodel.Timestamp, ) error // HasArchivedMetric returns whether the archived metric for the given // fingerprint exists and if yes, what the first and last timestamp in - // the corresponding series is. + // the corresponding series is. This method is goroutine-safe. HasArchivedMetric(clientmodel.Fingerprint) ( hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error, ) // GetFingerprintsModifiedBefore returns the fingerprints of archived - // timeseries that have live samples before the provided timestamp. + // timeseries that have live samples before the provided timestamp. This + // method is goroutine-safe (but behavior during concurrent modification + // via ArchiveMetric, UnarchiveMetric, or DropArchivedMetric is + // undefined). GetFingerprintsModifiedBefore(clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) // GetArchivedMetric retrieves the archived metric with the given - // fingerprint. + // fingerprint. This method is goroutine-safe. GetArchivedMetric(clientmodel.Fingerprint) (clientmodel.Metric, error) // DropArchivedMetric deletes an archived fingerprint and its // corresponding metric entirely. It also queues the metric for // un-indexing (no need to call UnindexMetric for the deleted metric.) + // Do not call concurrently with UnarchiveMetric or ArchiveMetric. DropArchivedMetric(clientmodel.Fingerprint) error // UnarchiveMetric deletes an archived fingerprint and its metric, but // (in contrast to DropArchivedMetric) does not un-index the metric. - // The method returns true if a metric was actually deleted. + // The method returns true if a metric was actually deleted. Do not call + // concurrently with DropArchivedMetric or ArchiveMetric. UnarchiveMetric(clientmodel.Fingerprint) (bool, error) - // Close flushes buffered data and releases any held resources. + // Close flushes the indexing queue and other buffered data and releases + // any held resources. Close() error } // A Preloader preloads series data necessary for a query into memory and pins -// them until released via Close(). +// them until released via Close(). Its methods are generally not +// goroutine-safe. type Preloader interface { PreloadRange(fp clientmodel.Fingerprint, from clientmodel.Timestamp, through clientmodel.Timestamp) error /* diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 37ff9b781..1543b5668 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -124,6 +124,7 @@ func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) { return p, nil } +// GetFingerprintsForLabelPair implements persistence. func (p *diskPersistence) GetFingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) { fps, _, err := p.labelPairToFingerprints.Lookup(lp) if err != nil { @@ -132,6 +133,7 @@ func (p *diskPersistence) GetFingerprintsForLabelPair(lp metric.LabelPair) (clie return fps, nil } +// GetLabelValuesForLabelName implements persistence. func (p *diskPersistence) GetLabelValuesForLabelName(ln clientmodel.LabelName) (clientmodel.LabelValues, error) { lvs, _, err := p.labelNameToLabelValues.Lookup(ln) if err != nil { @@ -140,25 +142,7 @@ func (p *diskPersistence) GetLabelValuesForLabelName(ln clientmodel.LabelName) ( return lvs, nil } -func (p *diskPersistence) GetFingerprintsModifiedBefore(beforeTime clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) { - var fp codable.Fingerprint - var tr codable.TimeRange - fps := []clientmodel.Fingerprint{} - p.archivedFingerprintToTimeRange.ForEach(func(kv index.KeyValueAccessor) error { - if err := kv.Value(&tr); err != nil { - return err - } - if tr.First.Before(beforeTime) { - if err := kv.Key(&fp); err != nil { - return err - } - fps = append(fps, clientmodel.Fingerprint(fp)) - } - return nil - }) - return fps, nil -} - +// PersistChunk implements Persistence. func (p *diskPersistence) PersistChunk(fp clientmodel.Fingerprint, c chunk) error { // 1. Open chunk file. f, err := p.openChunkFileForWriting(fp) @@ -180,6 +164,7 @@ func (p *diskPersistence) PersistChunk(fp clientmodel.Fingerprint, c chunk) erro return c.marshal(b) } +// LoadChunks implements Persistence. func (p *diskPersistence) LoadChunks(fp clientmodel.Fingerprint, indexes []int) (chunks, error) { // TODO: we need to verify at some point that file length is a multiple of // the chunk size. When is the best time to do this, and where to remember @@ -278,6 +263,7 @@ func (p *diskPersistence) LoadChunkDescs(fp clientmodel.Fingerprint, beforeTime return cds, nil } +// PersistSeriesMapAndHeads implements Persistence. func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap) error { f, err := os.OpenFile(p.headsPath(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640) if err != nil { @@ -340,6 +326,7 @@ func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap return w.Flush() } +// LoadSeriesMapAndHeads implements Persistence. func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) { f, err := os.Open(p.headsPath()) if os.IsNotExist(err) { @@ -432,6 +419,7 @@ func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) { return fingerprintToSeries, nil } +// DropChunks implements persistence. func (p *diskPersistence) DropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (bool, error) { f, err := p.openChunkFileForReading(fp) if os.IsNotExist(err) { @@ -500,8 +488,19 @@ func (p *diskPersistence) UnindexMetric(m clientmodel.Metric, fp clientmodel.Fin p.indexingQueue <- indexingOp{fp, m, remove} } +// WaitForIndexing implements Persistence. +func (p *diskPersistence) WaitForIndexing() { + wait := make(chan int) + for { + p.indexingFlush <- wait + if <-wait == 0 { + break + } + } +} + +// ArchiveMetric implements Persistence. func (p *diskPersistence) ArchiveMetric( - // TODO: Two step process, make sure this happens atomically. fp clientmodel.Fingerprint, m clientmodel.Metric, first, last clientmodel.Timestamp, ) error { if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil { @@ -513,17 +512,7 @@ func (p *diskPersistence) ArchiveMetric( return nil } -// WaitForIndexing implements persistence. -func (p *diskPersistence) WaitForIndexing() { - wait := make(chan int) - for { - p.indexingFlush <- wait - if <-wait == 0 { - break - } - } -} - +// HasArchivedMetric implements Persistence. func (p *diskPersistence) HasArchivedMetric(fp clientmodel.Fingerprint) ( hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error, ) { @@ -531,13 +520,34 @@ func (p *diskPersistence) HasArchivedMetric(fp clientmodel.Fingerprint) ( return } +// GetFingerprintsModifiedBefore implements Persistence. +func (p *diskPersistence) GetFingerprintsModifiedBefore(beforeTime clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) { + var fp codable.Fingerprint + var tr codable.TimeRange + fps := []clientmodel.Fingerprint{} + p.archivedFingerprintToTimeRange.ForEach(func(kv index.KeyValueAccessor) error { + if err := kv.Value(&tr); err != nil { + return err + } + if tr.First.Before(beforeTime) { + if err := kv.Key(&fp); err != nil { + return err + } + fps = append(fps, clientmodel.Fingerprint(fp)) + } + return nil + }) + return fps, nil +} + +// GetArchivedMetric implements Persistence. func (p *diskPersistence) GetArchivedMetric(fp clientmodel.Fingerprint) (clientmodel.Metric, error) { metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp) return metric, err } +// DropArchivedMetric implements Persistence. func (p *diskPersistence) DropArchivedMetric(fp clientmodel.Fingerprint) error { - // TODO: Multi-step process, make sure this happens atomically. metric, err := p.GetArchivedMetric(fp) if err != nil || metric == nil { return err @@ -552,8 +562,8 @@ func (p *diskPersistence) DropArchivedMetric(fp clientmodel.Fingerprint) error { return nil } +// UnarchiveMetric implements Persistence. func (p *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, error) { - // TODO: Multi-step process, make sure this happens atomically. has, err := p.archivedFingerprintToTimeRange.Has(fp) if err != nil || !has { return false, err @@ -567,6 +577,7 @@ func (p *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, err return true, nil } +// Close implements Persistence. func (p *diskPersistence) Close() error { close(p.indexingQueue) <-p.indexingStopped diff --git a/storage/local/preload.go b/storage/local/preload.go index 65fc380b0..ab7c6d762 100644 --- a/storage/local/preload.go +++ b/storage/local/preload.go @@ -23,6 +23,7 @@ type memorySeriesPreloader struct { pinnedChunkDescs chunkDescs } +// PreloadRange implements Preloader. func (p *memorySeriesPreloader) PreloadRange(fp clientmodel.Fingerprint, from clientmodel.Timestamp, through clientmodel.Timestamp) error { cds, err := p.storage.preloadChunksForRange(fp, from, through) if err != nil { diff --git a/storage/local/series.go b/storage/local/series.go index 4957482a1..9e8401f6d 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -325,6 +325,7 @@ func (s *memorySeries) preloadChunksForRange(from clientmodel.Timestamp, through return s.preloadChunks(pinIndexes, p) } +// memorySeriesIterator implements SeriesIterator. type memorySeriesIterator struct { mtx *sync.Mutex chunkIt chunkIterator @@ -332,6 +333,9 @@ type memorySeriesIterator struct { } func (s *memorySeries) newIterator() SeriesIterator { + // TODO: Possible concurrency issue if series is modified while this is + // running. Only caller at the moment is in NewIterator() in storage.go, + // where there is no locking. chunks := make(chunks, 0, len(s.chunkDescs)) for i, cd := range s.chunkDescs { if cd.chunk != nil { @@ -419,6 +423,7 @@ func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.V return it.chunkIt.getValueAtTime(t) } +// GetBoundaryValues implements SeriesIterator. func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Values { it.mtx.Lock() defer it.mtx.Unlock() @@ -468,6 +473,7 @@ func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Val return values } +// GetRangeValues implements SeriesIterator. func (it *memorySeriesIterator) GetRangeValues(in metric.Interval) metric.Values { it.mtx.Lock() defer it.mtx.Unlock() diff --git a/storage/local/storage.go b/storage/local/storage.go index a803aa1bd..4ea81b700 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -191,6 +191,7 @@ func (s *memorySeriesStorage) preloadChunksForRange(fp clientmodel.Fingerprint, return series.preloadChunksForRange(from, through, s.persistence) } +// NewIterator implements storage. func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIterator { s.mtx.RLock() series, ok := s.fingerprintToSeries[fp] @@ -249,7 +250,8 @@ func (s *memorySeriesStorage) handlePersistQueue() { s.persistDone <- true } -// Close stops serving, flushes all pending operations, and frees all resources. +// Close stops serving, flushes all pending operations, and frees all +// resources. It implements Storage. func (s *memorySeriesStorage) Close() error { s.mtx.Lock() defer s.mtx.Unlock() @@ -376,6 +378,7 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime } } +// Serve implements Storage. func (s *memorySeriesStorage) Serve(started chan<- bool) { s.mtx.Lock() if s.state != storageStarting {