Browse Source

storage: Make MemorySeriesStorage a public type

See https://twitter.com/fabxc/status/748032597876482048
pull/1771/head
Julius Volz 9 years ago
parent
commit
91401794fa
  1. 2
      storage/local/chunk.go
  2. 2
      storage/local/instrumentation.go
  3. 4
      storage/local/preload.go
  4. 6
      storage/local/series.go
  5. 78
      storage/local/storage.go
  6. 4
      storage/local/storage_test.go
  7. 2
      storage/local/test_helpers.go

2
storage/local/chunk.go

@ -97,7 +97,7 @@ type chunkDesc struct {
// evictListElement is nil if the chunk is not in the evict list.
// evictListElement is _not_ protected by the chunkDesc mutex.
// It must only be touched by the evict list handler in memorySeriesStorage.
// It must only be touched by the evict list handler in MemorySeriesStorage.
evictListElement *list.Element
}

2
storage/local/instrumentation.go

@ -98,7 +98,7 @@ func init() {
var (
// Global counter, also used internally, so not implemented as
// metrics. Collected in memorySeriesStorage.Collect.
// metrics. Collected in MemorySeriesStorage.Collect.
// TODO(beorn7): As it is used internally, it is actually very bad style
// to have it as a global variable.
numMemChunks int64

4
storage/local/preload.go

@ -19,9 +19,9 @@ import (
"github.com/prometheus/common/model"
)
// memorySeriesPreloader is a Preloader for the memorySeriesStorage.
// memorySeriesPreloader is a Preloader for the MemorySeriesStorage.
type memorySeriesPreloader struct {
storage *memorySeriesStorage
storage *MemorySeriesStorage
pinnedChunkDescs []*chunkDesc
}

6
storage/local/series.go

@ -339,7 +339,7 @@ func (s *memorySeries) dropChunks(t model.Time) error {
// preloadChunks is an internal helper method.
func (s *memorySeries) preloadChunks(
indexes []int, fp model.Fingerprint, mss *memorySeriesStorage,
indexes []int, fp model.Fingerprint, mss *MemorySeriesStorage,
) ([]*chunkDesc, SeriesIterator, error) {
loadIndexes := []int{}
pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes))
@ -412,7 +412,7 @@ func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc, quarantine fun
func (s *memorySeries) preloadChunksForInstant(
fp model.Fingerprint,
from model.Time, through model.Time,
mss *memorySeriesStorage,
mss *MemorySeriesStorage,
) ([]*chunkDesc, SeriesIterator, error) {
// If we have a lastSamplePair in the series, and thas last samplePair
// is in the interval, just take it in a singleSampleSeriesIterator. No
@ -437,7 +437,7 @@ func (s *memorySeries) preloadChunksForInstant(
func (s *memorySeries) preloadChunksForRange(
fp model.Fingerprint,
from model.Time, through model.Time,
mss *memorySeriesStorage,
mss *MemorySeriesStorage,
) ([]*chunkDesc, SeriesIterator, error) {
firstChunkDescTime := model.Latest
if len(s.chunkDescs) > 0 {

78
storage/local/storage.go

@ -128,7 +128,7 @@ const (
// synced or not. It does not need to be goroutine safe.
type syncStrategy func() bool
type memorySeriesStorage struct {
type MemorySeriesStorage struct {
// archiveHighWatermark and numChunksToPersist have to be aligned for atomic operations.
archiveHighWatermark model.Time // No archived series has samples after this time.
numChunksToPersist int64 // The number of chunks waiting for persistence.
@ -189,8 +189,8 @@ type MemorySeriesStorageOptions struct {
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
// has to be called to start the storage.
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *memorySeriesStorage {
s := &memorySeriesStorage{
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage {
s := &MemorySeriesStorage{
fpLocker: newFingerprintLocker(o.NumMutexes),
options: o,
@ -303,7 +303,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *memorySeriesStorage
}
// Start implements Storage.
func (s *memorySeriesStorage) Start() (err error) {
func (s *MemorySeriesStorage) Start() (err error) {
var syncStrategy syncStrategy
switch s.options.SyncStrategy {
case Never:
@ -360,7 +360,7 @@ func (s *memorySeriesStorage) Start() (err error) {
}
// Stop implements Storage.
func (s *memorySeriesStorage) Stop() error {
func (s *MemorySeriesStorage) Stop() error {
log.Info("Stopping local storage...")
log.Info("Stopping maintenance loop...")
@ -391,12 +391,12 @@ func (s *memorySeriesStorage) Stop() error {
}
// WaitForIndexing implements Storage.
func (s *memorySeriesStorage) WaitForIndexing() {
func (s *MemorySeriesStorage) WaitForIndexing() {
s.persistence.waitForIndexing()
}
// LastSampleForFingerprint implements Storage.
func (s *memorySeriesStorage) LastSampleForFingerprint(fp model.Fingerprint) model.Sample {
func (s *MemorySeriesStorage) LastSampleForFingerprint(fp model.Fingerprint) model.Sample {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
@ -439,7 +439,7 @@ func (bit *boundedIterator) RangeValues(interval metric.Interval) []model.Sample
}
// NewPreloader implements Storage.
func (s *memorySeriesStorage) NewPreloader() Preloader {
func (s *MemorySeriesStorage) NewPreloader() Preloader {
return &memorySeriesPreloader{
storage: s,
}
@ -447,7 +447,7 @@ func (s *memorySeriesStorage) NewPreloader() Preloader {
// fingerprintsForLabelPairs returns the set of fingerprints that have the given labels.
// This does not work with empty label values.
func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair) map[model.Fingerprint]struct{} {
func (s *MemorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair) map[model.Fingerprint]struct{} {
var result map[model.Fingerprint]struct{}
for _, pair := range pairs {
intersection := map[model.Fingerprint]struct{}{}
@ -469,7 +469,7 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair
}
// MetricsForLabelMatchers implements Storage.
func (s *memorySeriesStorage) MetricsForLabelMatchers(
func (s *MemorySeriesStorage) MetricsForLabelMatchers(
from, through model.Time,
matchers ...*metric.LabelMatcher,
) map[model.Fingerprint]metric.Metric {
@ -550,7 +550,7 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers(
// 'through', it returns (metric, nil, true).
//
// The caller must have locked the fp.
func (s *memorySeriesStorage) metricForRange(
func (s *MemorySeriesStorage) metricForRange(
fp model.Fingerprint,
from, through model.Time,
) (model.Metric, *memorySeries, bool) {
@ -589,12 +589,12 @@ func (s *memorySeriesStorage) metricForRange(
}
// LabelValuesForLabelName implements Storage.
func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues {
func (s *MemorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues {
return s.persistence.labelValuesForLabelName(labelName)
}
// DropMetric implements Storage.
func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprint) {
func (s *MemorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprint) {
for _, fp := range fps {
s.purgeSeries(fp, nil, nil)
}
@ -612,7 +612,7 @@ var (
)
// Append implements Storage.
func (s *memorySeriesStorage) Append(sample *model.Sample) error {
func (s *MemorySeriesStorage) Append(sample *model.Sample) error {
for ln, lv := range sample.Metric {
if len(lv) == 0 {
delete(sample.Metric, ln)
@ -666,7 +666,7 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error {
}
// NeedsThrottling implements Storage.
func (s *memorySeriesStorage) NeedsThrottling() bool {
func (s *MemorySeriesStorage) NeedsThrottling() bool {
if s.getNumChunksToPersist() > s.maxChunksToPersist ||
float64(atomic.LoadInt64(&numMemChunks)) > float64(s.maxMemoryChunks)*toleranceFactorMemChunks {
select {
@ -688,7 +688,7 @@ func (s *memorySeriesStorage) NeedsThrottling() bool {
// no signal has arrived for a minute, an Info is logged that the storage is not
// throttled anymore. This resets things to the initial state, i.e. once a
// signal arrives again, the Error will be logged again.
func (s *memorySeriesStorage) logThrottling() {
func (s *MemorySeriesStorage) logThrottling() {
timer := time.NewTimer(time.Minute)
timer.Stop()
@ -719,7 +719,7 @@ func (s *memorySeriesStorage) logThrottling() {
}
}
func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) {
func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) {
series, ok := s.fpToSeries.get(fp)
if !ok {
var cds []*chunkDesc
@ -761,7 +761,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
// seriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant.
//
// The caller must have locked the fp.
func (s *memorySeriesStorage) seriesForRange(
func (s *MemorySeriesStorage) seriesForRange(
fp model.Fingerprint,
from model.Time, through model.Time,
) *memorySeries {
@ -776,7 +776,7 @@ func (s *memorySeriesStorage) seriesForRange(
return series
}
func (s *memorySeriesStorage) preloadChunksForRange(
func (s *MemorySeriesStorage) preloadChunksForRange(
fp model.Fingerprint,
from model.Time, through model.Time,
) ([]*chunkDesc, SeriesIterator) {
@ -795,7 +795,7 @@ func (s *memorySeriesStorage) preloadChunksForRange(
return cds, iter
}
func (s *memorySeriesStorage) preloadChunksForInstant(
func (s *MemorySeriesStorage) preloadChunksForInstant(
fp model.Fingerprint,
from model.Time, through model.Time,
) ([]*chunkDesc, SeriesIterator) {
@ -814,7 +814,7 @@ func (s *memorySeriesStorage) preloadChunksForInstant(
return cds, iter
}
func (s *memorySeriesStorage) handleEvictList() {
func (s *MemorySeriesStorage) handleEvictList() {
ticker := time.NewTicker(maxEvictInterval)
count := 0
@ -859,7 +859,7 @@ func (s *memorySeriesStorage) handleEvictList() {
}
// maybeEvict is a local helper method. Must only be called by handleEvictList.
func (s *memorySeriesStorage) maybeEvict() {
func (s *MemorySeriesStorage) maybeEvict() {
numChunksToEvict := int(atomic.LoadInt64(&numMemChunks)) - s.maxMemoryChunks
if numChunksToEvict <= 0 {
return
@ -911,7 +911,7 @@ func (s *memorySeriesStorage) maybeEvict() {
//
// Normally, the method returns true once the wait duration has passed. However,
// if s.loopStopped is closed, it will return false immediately.
func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int, maxWaitDurationFactor float64) bool {
func (s *MemorySeriesStorage) waitForNextFP(numberOfFPs int, maxWaitDurationFactor float64) bool {
d := fpMaxWaitDuration
if numberOfFPs != 0 {
sweepTime := s.dropAfter / 10
@ -938,7 +938,7 @@ func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int, maxWaitDurationFact
// cycleThroughMemoryFingerprints returns a channel that emits fingerprints for
// series in memory in a throttled fashion. It continues to cycle through all
// fingerprints in memory until s.loopStopping is closed.
func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Fingerprint {
func (s *MemorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Fingerprint {
memoryFingerprints := make(chan model.Fingerprint)
go func() {
var fpIter <-chan model.Fingerprint
@ -985,7 +985,7 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Finger
// cycleThroughArchivedFingerprints returns a channel that emits fingerprints
// for archived series in a throttled fashion. It continues to cycle through all
// archived fingerprints until s.loopStopping is closed.
func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan model.Fingerprint {
func (s *MemorySeriesStorage) cycleThroughArchivedFingerprints() chan model.Fingerprint {
archivedFingerprints := make(chan model.Fingerprint)
go func() {
defer close(archivedFingerprints)
@ -1024,7 +1024,7 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan model.Fing
return archivedFingerprints
}
func (s *memorySeriesStorage) loop() {
func (s *MemorySeriesStorage) loop() {
checkpointTimer := time.NewTimer(s.checkpointInterval)
dirtySeriesCount := 0
@ -1114,7 +1114,7 @@ loop:
// case, it archives the series and returns true.
//
// Finally, it evicts chunkDescs if there are too many.
func (s *memorySeriesStorage) maintainMemorySeries(
func (s *MemorySeriesStorage) maintainMemorySeries(
fp model.Fingerprint, beforeTime model.Time,
) (becameDirty bool) {
defer func(begin time.Time) {
@ -1187,7 +1187,7 @@ func (s *memorySeriesStorage) maintainMemorySeries(
// case, the method returns true.
//
// The caller must have locked the fp.
func (s *memorySeriesStorage) writeMemorySeries(
func (s *MemorySeriesStorage) writeMemorySeries(
fp model.Fingerprint, series *memorySeries, beforeTime model.Time,
) bool {
var (
@ -1269,7 +1269,7 @@ func (s *memorySeriesStorage) writeMemorySeries(
// maintainArchivedSeries drops chunks older than beforeTime from an archived
// series. If the series contains no chunks after that, it is purged entirely.
func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, beforeTime model.Time) {
func (s *MemorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, beforeTime model.Time) {
defer func(begin time.Time) {
s.maintainSeriesDuration.WithLabelValues(maintainArchived).Observe(
float64(time.Since(begin)) / float64(time.Second),
@ -1302,23 +1302,23 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor
}
// See persistence.loadChunks for detailed explanation.
func (s *memorySeriesStorage) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) {
func (s *MemorySeriesStorage) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) {
return s.persistence.loadChunks(fp, indexes, indexOffset)
}
// See persistence.loadChunkDescs for detailed explanation.
func (s *memorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunkDesc, error) {
func (s *MemorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunkDesc, error) {
return s.persistence.loadChunkDescs(fp, offsetFromEnd)
}
// getNumChunksToPersist returns numChunksToPersist in a goroutine-safe way.
func (s *memorySeriesStorage) getNumChunksToPersist() int {
func (s *MemorySeriesStorage) getNumChunksToPersist() int {
return int(atomic.LoadInt64(&s.numChunksToPersist))
}
// incNumChunksToPersist increments numChunksToPersist in a goroutine-safe way. Use a
// negative 'by' to decrement.
func (s *memorySeriesStorage) incNumChunksToPersist(by int) {
func (s *MemorySeriesStorage) incNumChunksToPersist(by int) {
atomic.AddInt64(&s.numChunksToPersist, int64(by))
}
@ -1350,7 +1350,7 @@ func (s *memorySeriesStorage) incNumChunksToPersist(by int) {
// checkpointing based on dirty-series count should be disabled, and series
// files should not by synced anymore provided the user has specified the
// adaptive sync strategy.
func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
func (s *MemorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
s.rushedMtx.Lock()
defer s.rushedMtx.Unlock()
@ -1416,7 +1416,7 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 {
// and all its traces are removed from indices. Call this method if an
// unrecoverable error is detected while dealing with a series, and pass in the
// encountered error. It will be saved as a hint in the orphaned directory.
func (s *memorySeriesStorage) quarantineSeries(fp model.Fingerprint, metric model.Metric, err error) {
func (s *MemorySeriesStorage) quarantineSeries(fp model.Fingerprint, metric model.Metric, err error) {
req := quarantineRequest{fp: fp, metric: metric, reason: err}
select {
case s.quarantineRequests <- req:
@ -1431,7 +1431,7 @@ func (s *memorySeriesStorage) quarantineSeries(fp model.Fingerprint, metric mode
}
}
func (s *memorySeriesStorage) handleQuarantine() {
func (s *MemorySeriesStorage) handleQuarantine() {
for {
select {
case req := <-s.quarantineRequests:
@ -1454,7 +1454,7 @@ func (s *memorySeriesStorage) handleQuarantine() {
// provided, the series file will not be deleted completely, but moved to the
// orphaned directory with the reason and the metric in a hint file. The
// provided metric might be nil if unknown.
func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, quarantineReason error) {
func (s *MemorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, quarantineReason error) {
s.fpLocker.Lock(fp)
var (
@ -1518,7 +1518,7 @@ func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric,
}
// Describe implements prometheus.Collector.
func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
func (s *MemorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
s.persistence.Describe(ch)
s.mapper.Describe(ch)
@ -1537,7 +1537,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
}
// Collect implements prometheus.Collector.
func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
s.persistence.Collect(ch)
s.mapper.Collect(ch)

4
storage/local/storage_test.go

@ -1678,7 +1678,7 @@ func createRandomSamples(metricName string, minLen int) model.Samples {
return result
}
func verifyStorageRandom(t testing.TB, s *memorySeriesStorage, samples model.Samples) bool {
func verifyStorageRandom(t testing.TB, s *MemorySeriesStorage, samples model.Samples) bool {
s.WaitForIndexing()
result := true
for _, i := range rand.Perm(len(samples)) {
@ -1709,7 +1709,7 @@ func verifyStorageRandom(t testing.TB, s *memorySeriesStorage, samples model.Sam
return result
}
func verifyStorageSequential(t testing.TB, s *memorySeriesStorage, samples model.Samples) bool {
func verifyStorageSequential(t testing.TB, s *MemorySeriesStorage, samples model.Samples) bool {
s.WaitForIndexing()
var (
result = true

2
storage/local/test_helpers.go

@ -40,7 +40,7 @@ func (t *testStorageCloser) Close() {
// NewTestStorage creates a storage instance backed by files in a temporary
// directory. The returned storage is already in serving state. Upon closing the
// returned test.Closer, the temporary directory is cleaned up.
func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage, testutil.Closer) {
func NewTestStorage(t testutil.T, encoding chunkEncoding) (*MemorySeriesStorage, testutil.Closer) {
DefaultChunkEncoding = encoding
directory := testutil.NewTemporaryDirectory("test_storage", t)
o := &MemorySeriesStorageOptions{

Loading…
Cancel
Save