Browse Source

Comment/code cleanup.

Change-Id: I38736e3d0fec79759a2bafa35aecf914480ff810
pull/413/head
Bjoern Rabenstein 10 years ago
parent
commit
e9ff29c547
  1. 68
      storage/local/interface.go
  2. 77
      storage/local/persistence.go
  3. 1
      storage/local/preload.go
  4. 6
      storage/local/series.go
  5. 5
      storage/local/storage.go

68
storage/local/interface.go

@ -46,7 +46,11 @@ type Storage interface {
Close() error
}
// SeriesIterator enables efficient access of sample values in a series.
// SeriesIterator enables efficient access of sample values in a series. All
// methods are goroutine-safe. A SeriesIterator iterates over a snapshot of a
// series, i.e. it is safe to continue using a SeriesIterator after modifying
// the corresponding series, but the iterator will represent the state of the
// series prior the modification.
type SeriesIterator interface {
// Gets the two values that are immediately adjacent to a given time. In
// case a value exist at precisely the given time, only that single
@ -62,33 +66,47 @@ type SeriesIterator interface {
}
// A Persistence is used by a Storage implementation to store samples
// persistently across restarts.
// persistently across restarts. The methods are generally not goroutine-safe
// unless marked otherwise. The chunk-related methods PersistChunk, DropChunks,
// LoadChunks, and LoadChunkDescs can be called concurrently with each other if
// each call refers to a different fingerprint.
//
// TODO: As a Persistence is really only used within this package, consider not
// exporting it.
type Persistence interface {
// PersistChunk persists a single chunk of a series.
// PersistChunk persists a single chunk of a series. It is the caller's
// responsibility to not modify chunk concurrently.
PersistChunk(clientmodel.Fingerprint, chunk) error
// PersistSeriesMapAndHeads persists the fingerprint to memory-series
// mapping and all open (non-full) head chunks.
PersistSeriesMapAndHeads(SeriesMap) error
// DropChunks deletes all chunks from a timeseries whose last sample time is
// before beforeTime.
// DropChunks deletes all chunks from a series whose last sample time is
// before beforeTime. It returns true if all chunks of the series have
// been deleted.
DropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (allDropped bool, err error)
// LoadChunks loads a group of chunks of a timeseries by their index. The
// chunk with the earliest time will have index 0, the following ones will
// have incrementally larger indexes.
LoadChunks(fp clientmodel.Fingerprint, indexes []int) (chunks, error)
// LoadChunkDescs loads chunkDescs for a series up until a given time.
LoadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (chunkDescs, error)
// PersistSeriesMapAndHeads persists the fingerprint to memory-series
// mapping and all open (non-full) head chunks. It is the caller's
// responsibility to not modify SeriesMap concurrently. Do not call
// concurrently with LoadSeriesMapAndHeads.
PersistSeriesMapAndHeads(SeriesMap) error
// LoadSeriesMapAndHeads loads the fingerprint to memory-series mapping
// and all open (non-full) head chunks.
// and all open (non-full) head chunks. Do not call
// concurrently with PersistSeriesMapAndHeads.
LoadSeriesMapAndHeads() (SeriesMap, error)
// GetFingerprintsForLabelPair returns the fingerprints for the given
// label pair.
// label pair. This method is goroutine-safe but take into account that
// metrics queued for indexing with IndexMetric might not yet made it
// into the index. (Same applies correspondingly to UnindexMetric.)
GetFingerprintsForLabelPair(metric.LabelPair) (clientmodel.Fingerprints, error)
// GetLabelValuesForLabelName returns the label values for the given
// label name.
// label name. This method is goroutine-safe but take into account that
// metrics queued for indexing with IndexMetric might not yet made it
// into the index. (Same applies correspondingly to UnindexMetric.)
GetLabelValuesForLabelName(clientmodel.LabelName) (clientmodel.LabelValues, error)
// IndexMetric queues the given metric for addition to the indexes
@ -107,43 +125,51 @@ type Persistence interface {
// WaitForIndexing waits until all items in the indexing queue are
// processed. If queue processing is currently on hold (to gather more
// ops for batching), this method will trigger an immediate start of
// processing.
// processing. This method is goroutine-safe.
WaitForIndexing()
// ArchiveMetric persists the mapping of the given fingerprint to the
// given metric, together with the first and last timestamp of the
// series belonging to the metric.
// series belonging to the metric. Do not call concurrently with
// UnarchiveMetric or DropArchivedMetric.
ArchiveMetric(
fingerprint clientmodel.Fingerprint, metric clientmodel.Metric,
firstTime, lastTime clientmodel.Timestamp,
) error
// HasArchivedMetric returns whether the archived metric for the given
// fingerprint exists and if yes, what the first and last timestamp in
// the corresponding series is.
// the corresponding series is. This method is goroutine-safe.
HasArchivedMetric(clientmodel.Fingerprint) (
hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error,
)
// GetFingerprintsModifiedBefore returns the fingerprints of archived
// timeseries that have live samples before the provided timestamp.
// timeseries that have live samples before the provided timestamp. This
// method is goroutine-safe (but behavior during concurrent modification
// via ArchiveMetric, UnarchiveMetric, or DropArchivedMetric is
// undefined).
GetFingerprintsModifiedBefore(clientmodel.Timestamp) ([]clientmodel.Fingerprint, error)
// GetArchivedMetric retrieves the archived metric with the given
// fingerprint.
// fingerprint. This method is goroutine-safe.
GetArchivedMetric(clientmodel.Fingerprint) (clientmodel.Metric, error)
// DropArchivedMetric deletes an archived fingerprint and its
// corresponding metric entirely. It also queues the metric for
// un-indexing (no need to call UnindexMetric for the deleted metric.)
// Do not call concurrently with UnarchiveMetric or ArchiveMetric.
DropArchivedMetric(clientmodel.Fingerprint) error
// UnarchiveMetric deletes an archived fingerprint and its metric, but
// (in contrast to DropArchivedMetric) does not un-index the metric.
// The method returns true if a metric was actually deleted.
// The method returns true if a metric was actually deleted. Do not call
// concurrently with DropArchivedMetric or ArchiveMetric.
UnarchiveMetric(clientmodel.Fingerprint) (bool, error)
// Close flushes buffered data and releases any held resources.
// Close flushes the indexing queue and other buffered data and releases
// any held resources.
Close() error
}
// A Preloader preloads series data necessary for a query into memory and pins
// them until released via Close().
// them until released via Close(). Its methods are generally not
// goroutine-safe.
type Preloader interface {
PreloadRange(fp clientmodel.Fingerprint, from clientmodel.Timestamp, through clientmodel.Timestamp) error
/*

77
storage/local/persistence.go

@ -124,6 +124,7 @@ func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) {
return p, nil
}
// GetFingerprintsForLabelPair implements persistence.
func (p *diskPersistence) GetFingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) {
fps, _, err := p.labelPairToFingerprints.Lookup(lp)
if err != nil {
@ -132,6 +133,7 @@ func (p *diskPersistence) GetFingerprintsForLabelPair(lp metric.LabelPair) (clie
return fps, nil
}
// GetLabelValuesForLabelName implements persistence.
func (p *diskPersistence) GetLabelValuesForLabelName(ln clientmodel.LabelName) (clientmodel.LabelValues, error) {
lvs, _, err := p.labelNameToLabelValues.Lookup(ln)
if err != nil {
@ -140,25 +142,7 @@ func (p *diskPersistence) GetLabelValuesForLabelName(ln clientmodel.LabelName) (
return lvs, nil
}
func (p *diskPersistence) GetFingerprintsModifiedBefore(beforeTime clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) {
var fp codable.Fingerprint
var tr codable.TimeRange
fps := []clientmodel.Fingerprint{}
p.archivedFingerprintToTimeRange.ForEach(func(kv index.KeyValueAccessor) error {
if err := kv.Value(&tr); err != nil {
return err
}
if tr.First.Before(beforeTime) {
if err := kv.Key(&fp); err != nil {
return err
}
fps = append(fps, clientmodel.Fingerprint(fp))
}
return nil
})
return fps, nil
}
// PersistChunk implements Persistence.
func (p *diskPersistence) PersistChunk(fp clientmodel.Fingerprint, c chunk) error {
// 1. Open chunk file.
f, err := p.openChunkFileForWriting(fp)
@ -180,6 +164,7 @@ func (p *diskPersistence) PersistChunk(fp clientmodel.Fingerprint, c chunk) erro
return c.marshal(b)
}
// LoadChunks implements Persistence.
func (p *diskPersistence) LoadChunks(fp clientmodel.Fingerprint, indexes []int) (chunks, error) {
// TODO: we need to verify at some point that file length is a multiple of
// the chunk size. When is the best time to do this, and where to remember
@ -278,6 +263,7 @@ func (p *diskPersistence) LoadChunkDescs(fp clientmodel.Fingerprint, beforeTime
return cds, nil
}
// PersistSeriesMapAndHeads implements Persistence.
func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap) error {
f, err := os.OpenFile(p.headsPath(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
if err != nil {
@ -340,6 +326,7 @@ func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap
return w.Flush()
}
// LoadSeriesMapAndHeads implements Persistence.
func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) {
f, err := os.Open(p.headsPath())
if os.IsNotExist(err) {
@ -432,6 +419,7 @@ func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) {
return fingerprintToSeries, nil
}
// DropChunks implements persistence.
func (p *diskPersistence) DropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (bool, error) {
f, err := p.openChunkFileForReading(fp)
if os.IsNotExist(err) {
@ -500,8 +488,19 @@ func (p *diskPersistence) UnindexMetric(m clientmodel.Metric, fp clientmodel.Fin
p.indexingQueue <- indexingOp{fp, m, remove}
}
// WaitForIndexing implements Persistence.
func (p *diskPersistence) WaitForIndexing() {
wait := make(chan int)
for {
p.indexingFlush <- wait
if <-wait == 0 {
break
}
}
}
// ArchiveMetric implements Persistence.
func (p *diskPersistence) ArchiveMetric(
// TODO: Two step process, make sure this happens atomically.
fp clientmodel.Fingerprint, m clientmodel.Metric, first, last clientmodel.Timestamp,
) error {
if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil {
@ -513,17 +512,7 @@ func (p *diskPersistence) ArchiveMetric(
return nil
}
// WaitForIndexing implements persistence.
func (p *diskPersistence) WaitForIndexing() {
wait := make(chan int)
for {
p.indexingFlush <- wait
if <-wait == 0 {
break
}
}
}
// HasArchivedMetric implements Persistence.
func (p *diskPersistence) HasArchivedMetric(fp clientmodel.Fingerprint) (
hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error,
) {
@ -531,13 +520,34 @@ func (p *diskPersistence) HasArchivedMetric(fp clientmodel.Fingerprint) (
return
}
// GetFingerprintsModifiedBefore implements Persistence.
func (p *diskPersistence) GetFingerprintsModifiedBefore(beforeTime clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) {
var fp codable.Fingerprint
var tr codable.TimeRange
fps := []clientmodel.Fingerprint{}
p.archivedFingerprintToTimeRange.ForEach(func(kv index.KeyValueAccessor) error {
if err := kv.Value(&tr); err != nil {
return err
}
if tr.First.Before(beforeTime) {
if err := kv.Key(&fp); err != nil {
return err
}
fps = append(fps, clientmodel.Fingerprint(fp))
}
return nil
})
return fps, nil
}
// GetArchivedMetric implements Persistence.
func (p *diskPersistence) GetArchivedMetric(fp clientmodel.Fingerprint) (clientmodel.Metric, error) {
metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp)
return metric, err
}
// DropArchivedMetric implements Persistence.
func (p *diskPersistence) DropArchivedMetric(fp clientmodel.Fingerprint) error {
// TODO: Multi-step process, make sure this happens atomically.
metric, err := p.GetArchivedMetric(fp)
if err != nil || metric == nil {
return err
@ -552,8 +562,8 @@ func (p *diskPersistence) DropArchivedMetric(fp clientmodel.Fingerprint) error {
return nil
}
// UnarchiveMetric implements Persistence.
func (p *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, error) {
// TODO: Multi-step process, make sure this happens atomically.
has, err := p.archivedFingerprintToTimeRange.Has(fp)
if err != nil || !has {
return false, err
@ -567,6 +577,7 @@ func (p *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, err
return true, nil
}
// Close implements Persistence.
func (p *diskPersistence) Close() error {
close(p.indexingQueue)
<-p.indexingStopped

1
storage/local/preload.go

@ -23,6 +23,7 @@ type memorySeriesPreloader struct {
pinnedChunkDescs chunkDescs
}
// PreloadRange implements Preloader.
func (p *memorySeriesPreloader) PreloadRange(fp clientmodel.Fingerprint, from clientmodel.Timestamp, through clientmodel.Timestamp) error {
cds, err := p.storage.preloadChunksForRange(fp, from, through)
if err != nil {

6
storage/local/series.go

@ -325,6 +325,7 @@ func (s *memorySeries) preloadChunksForRange(from clientmodel.Timestamp, through
return s.preloadChunks(pinIndexes, p)
}
// memorySeriesIterator implements SeriesIterator.
type memorySeriesIterator struct {
mtx *sync.Mutex
chunkIt chunkIterator
@ -332,6 +333,9 @@ type memorySeriesIterator struct {
}
func (s *memorySeries) newIterator() SeriesIterator {
// TODO: Possible concurrency issue if series is modified while this is
// running. Only caller at the moment is in NewIterator() in storage.go,
// where there is no locking.
chunks := make(chunks, 0, len(s.chunkDescs))
for i, cd := range s.chunkDescs {
if cd.chunk != nil {
@ -419,6 +423,7 @@ func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.V
return it.chunkIt.getValueAtTime(t)
}
// GetBoundaryValues implements SeriesIterator.
func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Values {
it.mtx.Lock()
defer it.mtx.Unlock()
@ -468,6 +473,7 @@ func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Val
return values
}
// GetRangeValues implements SeriesIterator.
func (it *memorySeriesIterator) GetRangeValues(in metric.Interval) metric.Values {
it.mtx.Lock()
defer it.mtx.Unlock()

5
storage/local/storage.go

@ -191,6 +191,7 @@ func (s *memorySeriesStorage) preloadChunksForRange(fp clientmodel.Fingerprint,
return series.preloadChunksForRange(from, through, s.persistence)
}
// NewIterator implements storage.
func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIterator {
s.mtx.RLock()
series, ok := s.fingerprintToSeries[fp]
@ -249,7 +250,8 @@ func (s *memorySeriesStorage) handlePersistQueue() {
s.persistDone <- true
}
// Close stops serving, flushes all pending operations, and frees all resources.
// Close stops serving, flushes all pending operations, and frees all
// resources. It implements Storage.
func (s *memorySeriesStorage) Close() error {
s.mtx.Lock()
defer s.mtx.Unlock()
@ -376,6 +378,7 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
}
}
// Serve implements Storage.
func (s *memorySeriesStorage) Serve(started chan<- bool) {
s.mtx.Lock()
if s.state != storageStarting {

Loading…
Cancel
Save