From 14bda4180ccdd763d2207c4eb7efb91d205f7f46 Mon Sep 17 00:00:00 2001 From: Bjoern Rabenstein Date: Thu, 20 Nov 2014 21:03:51 +0100 Subject: [PATCH] Changes after pair code review. Change-Id: Ib72d40f8e9027818cfbbd32a7a7201eebda07455 --- main.go | 21 ++++--- notification/notification.go | 2 +- retrieval/target.go | 5 +- retrieval/target_test.go | 2 +- retrieval/targetpool_test.go | 8 +-- rules/ast/ast.go | 2 +- storage/local/chunk.go | 21 +++++-- storage/local/delta.go | 7 +-- storage/local/index/index.go | 25 +++++---- storage/local/index/interface.go | 12 +++- storage/local/index/leveldb.go | 94 ++++---------------------------- storage/local/instrumentation.go | 2 +- storage/local/interface.go | 12 +--- storage/local/persistence.go | 87 ++++++++++++++++------------- storage/local/storage.go | 50 +++++++++++------ tools/rule_checker/main.go | 2 +- web/consoles.go | 4 +- web/web.go | 8 +-- 18 files changed, 163 insertions(+), 201 deletions(-) diff --git a/main.go b/main.go index 183e770bb..726aad853 100644 --- a/main.go +++ b/main.go @@ -42,28 +42,27 @@ const deletionBatchSize = 100 // Commandline flags. var ( - configFile = flag.String("configFile", "prometheus.conf", "Prometheus configuration file name.") + configFile = flag.String("config.file", "prometheus.conf", "Prometheus configuration file name.") - alertmanagerURL = flag.String("alertmanager.url", "", "The URL of the alert manager to send notifications to.") + alertmanagerURL = flag.String("alertmanager.url", "", "The URL of the alert manager to send notifications to.") + notificationQueueCapacity = flag.Int("alertmanager.notification-queue-capacity", 100, "The capacity of the queue for pending alert manager notifications.") - metricsStoragePath = flag.String("storage.path", "/tmp/metrics", "Base path for metrics storage.") + metricsStoragePath = flag.String("storage.local.path", "/tmp/metrics", "Base path for metrics storage.") remoteTSDBUrl = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.") remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.") - samplesQueueCapacity = flag.Int("storage.queue.samplesCapacity", 4096, "The size of the unwritten samples queue.") + samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 4096, "The capacity of the queue of samples to be stored.") - numMemoryChunks = flag.Int("storage.memoryChunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.") + numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. 
Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.") - storageRetentionPeriod = flag.Duration("storage.retentionPeriod", 15*24*time.Hour, "The period of time to retain in storage.") + storageRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.") - checkpointInterval = flag.Duration("storage.checkpointInterval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.") + checkpointInterval = flag.Duration("storage.local.checkpoint-interval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.") - storageDirty = flag.Bool("storage.dirty", false, "If set, the storage layer will perform crash recovery even if the last shutdown appears to be clean.") + storageDirty = flag.Bool("storage.local.dirty", false, "If set, the local storage layer will perform crash recovery even if the last shutdown appears to be clean.") - notificationQueueCapacity = flag.Int("alertmanager.notificationQueueCapacity", 100, "The size of the queue for pending alert manager notifications.") - - printVersion = flag.Bool("version", false, "print version information") + printVersion = flag.Bool("version", false, "Print version information.") ) // Instrumentation. diff --git a/notification/notification.go b/notification/notification.go index df215137f..3035b0629 100644 --- a/notification/notification.go +++ b/notification/notification.go @@ -47,7 +47,7 @@ const ( ) var ( - deadline = flag.Duration("alertmanager.httpDeadline", 10*time.Second, "Alert manager HTTP API timeout.") + deadline = flag.Duration("alertmanager.http-deadline", 10*time.Second, "Alert manager HTTP API timeout.") ) // A request for sending a notification to the alert manager for a single alert diff --git a/retrieval/target.go b/retrieval/target.go index d5579bbe8..11ec36d81 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -151,8 +151,7 @@ type target struct { lastError error // The last time a scrape was attempted. lastScrape time.Time - // Channel to signal RunScraper should stop, holds a channel - // to notify once stopped. + // Closing stopScraper signals that scraping should stop. stopScraper chan struct{} // Channel to queue base labels to be replaced. newBaseLabels chan clientmodel.LabelSet @@ -265,7 +264,7 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration // StopScraper implements Target. 
func (t *target) StopScraper() { - t.stopScraper <- struct{}{} + close(t.stopScraper) } const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1` diff --git a/retrieval/target_test.go b/retrieval/target_test.go index 93238489f..de43dc963 100644 --- a/retrieval/target_test.go +++ b/retrieval/target_test.go @@ -149,7 +149,7 @@ func TestTargetRunScraperScrapes(t *testing.T) { state: UNKNOWN, address: "bad schema", httpClient: utility.NewDeadlineClient(0), - stopScraper: make(chan bool, 1), + stopScraper: make(chan struct{}), } go testTarget.RunScraper(nopIngester{}, time.Duration(time.Millisecond)) diff --git a/retrieval/targetpool_test.go b/retrieval/targetpool_test.go index fbe3c4104..636600add 100644 --- a/retrieval/targetpool_test.go +++ b/retrieval/targetpool_test.go @@ -116,28 +116,28 @@ func TestTargetPoolReplaceTargets(t *testing.T) { oldTarget1 := &target{ address: "example1", state: UNREACHABLE, - stopScraper: make(chan bool, 1), + stopScraper: make(chan struct{}), newBaseLabels: make(chan clientmodel.LabelSet, 1), httpClient: &http.Client{}, } oldTarget2 := &target{ address: "example2", state: UNREACHABLE, - stopScraper: make(chan bool, 1), + stopScraper: make(chan struct{}), newBaseLabels: make(chan clientmodel.LabelSet, 1), httpClient: &http.Client{}, } newTarget1 := &target{ address: "example1", state: ALIVE, - stopScraper: make(chan bool, 1), + stopScraper: make(chan struct{}), newBaseLabels: make(chan clientmodel.LabelSet, 1), httpClient: &http.Client{}, } newTarget2 := &target{ address: "example3", state: ALIVE, - stopScraper: make(chan bool, 1), + stopScraper: make(chan struct{}), newBaseLabels: make(chan clientmodel.LabelSet, 1), httpClient: &http.Client{}, } diff --git a/rules/ast/ast.go b/rules/ast/ast.go index 215269c51..be379f931 100644 --- a/rules/ast/ast.go +++ b/rules/ast/ast.go @@ -29,7 +29,7 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) -var stalenessDelta = flag.Duration("stalenessDelta", 300*time.Second, "Staleness delta allowance during expression evaluations.") +var stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.") // ---------------------------------------------------------------------------- // Raw data value types. diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 0efbe0f91..c0f47d467 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -24,9 +24,11 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) +// chunkDesc contains meta-data for a chunk. Many of its methods are +// goroutine-safe proxies for chunk methods. type chunkDesc struct { sync.Mutex - chunk chunk + chunk chunk // nil if chunk is evicted. refCount int chunkFirstTime clientmodel.Timestamp // Used if chunk is evicted. chunkLastTime clientmodel.Timestamp // Used if chunk is evicted. @@ -37,11 +39,14 @@ type chunkDesc struct { evictListElement *list.Element } -// newChunkDesc creates a new chunkDesc pointing to the given chunk. The -// refCount of the new chunkDesc is 1. +// newChunkDesc creates a new chunkDesc pointing to the provided chunk. The +// provided chunk is assumed to be not persisted yet. Therefore, the refCount of +// the new chunkDesc is 1 (preventing eviction prior to persisting). 
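A minimal, runnable sketch of the reference-counting contract this comment establishes (illustrative names only; the real chunkDesc, whose constructor follows, additionally moves itself on and off the evict list via the evictRequests channel):

package main

import "sync"

// sketchDesc is a stripped-down stand-in for chunkDesc: a mutex and a
// reference count, without chunk data or evict-list bookkeeping.
type sketchDesc struct {
	sync.Mutex
	refCount int
}

// newSketchDesc mirrors newChunkDesc: starting at 1 keeps the chunk
// pinned until whoever persists it calls unpin.
func newSketchDesc() *sketchDesc { return &sketchDesc{refCount: 1} }

func (d *sketchDesc) pin() {
	d.Lock()
	defer d.Unlock()
	d.refCount++ // In the real code, the 0->1 transition leaves the evict list.
}

func (d *sketchDesc) unpin() {
	d.Lock()
	defer d.Unlock()
	if d.refCount == 0 {
		panic("cannot unpin already unpinned chunk")
	}
	d.refCount-- // In the real code, the 1->0 transition joins the evict list.
}

func main() {
	d := newSketchDesc() // Pinned once by construction.
	d.pin()              // A concurrent reader takes a reference.
	d.unpin()            // The reader is done.
	d.unpin()            // Persisted: refCount is 0, the chunk is evictable.
}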
func newChunkDesc(c chunk) *chunkDesc { chunkOps.WithLabelValues(createAndPin).Inc() atomic.AddInt64(&numMemChunks, 1) + // TODO: numMemChunkDescs is actually never read except during metrics + // collection. Turn it into a real metric. atomic.AddInt64(&numMemChunkDescs, 1) return &chunkDesc{chunk: c, refCount: 1} } @@ -53,6 +58,9 @@ func (cd *chunkDesc) add(s *metric.SamplePair) []chunk { return cd.chunk.add(s) } +// pin increments the refCount by one. Upon increment from 0 to 1, this +// chunkDesc is removed from the evict list. To enable the latter, the +// evictRequests channel has to be provided. func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) { cd.Lock() defer cd.Unlock() @@ -64,6 +72,9 @@ func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) { cd.refCount++ } +// unpin decrements the refCount by one. Upon decrement from 1 to 0, this +// chunkDesc is added to the evict list. To enable the latter, the evictRequests +// channel has to be provided. func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) { cd.Lock() defer cd.Unlock() @@ -116,8 +127,6 @@ func (cd *chunkDesc) contains(t clientmodel.Timestamp) bool { return !t.Before(cd.firstTime()) && !t.After(cd.lastTime()) } -// setChunk points this chunkDesc to the given chunk. It panics if -// this chunkDesc already has a chunk set. func (cd *chunkDesc) setChunk(c chunk) { cd.Lock() defer cd.Unlock() @@ -128,7 +137,7 @@ func (cd *chunkDesc) setChunk(c chunk) { cd.chunk = c } -// maybeEvict evicts the chunk if the refCount is 0. It returns wether the chunk +// maybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk // is now evicted, which includes the case that the chunk was evicted even // before this method was called. func (cd *chunkDesc) maybeEvict() bool { diff --git a/storage/local/delta.go b/storage/local/delta.go index 0e9c82083..15d8d6e3a 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -81,8 +81,7 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool) *deltaEncodedChunk { } func (c *deltaEncodedChunk) newFollowupChunk() chunk { - return newDeltaEncodedChunk(d1, d1, true) - //return newDeltaEncodedChunk(c.timeBytes(), c.valueBytes(), c.isInt()) + return newDeltaEncodedChunk(d1, d0, true) } // clone implements chunk. @@ -170,7 +169,6 @@ func (c *deltaEncodedChunk) add(s *metric.SamplePair) []chunk { // Do we generally have space for another sample in this chunk? If not, // overflow into a new one. if remainingBytes < sampleSize { - //fmt.Println("overflow") overflowChunks := c.newFollowupChunk().add(s) return []chunk{c, overflowChunks[0]} } @@ -186,18 +184,15 @@ func (c *deltaEncodedChunk) add(s *metric.SamplePair) []chunk { // int->float. // Note: Using math.Modf is slower than the conversion approach below. if c.isInt() && clientmodel.SampleValue(int64(dv)) != dv { - //fmt.Println("int->float", len(c.buf), cap(c.buf), dv) return transcodeAndAdd(newDeltaEncodedChunk(tb, d4, false), c, s) } // float32->float64. if !c.isInt() && vb == d4 && clientmodel.SampleValue(float32(dv)) != dv { - //fmt.Println("float32->float64", float32(dv), dv, len(c.buf), cap(c.buf)) return transcodeAndAdd(newDeltaEncodedChunk(tb, d8, false), c, s) } if tb < d8 || vb < d8 { // Maybe more bytes per sample. 
if ntb, nvb := neededDeltaBytes(dt, dv, c.isInt()); ntb > tb || nvb > vb { - //fmt.Printf("transcoding T: %v->%v, V: %v->%v, I: %v; len %v, cap %v\n", tb, ntb, vb, nvb, c.isInt(), len(c.buf), cap(c.buf)) ntb = max(ntb, tb) nvb = max(nvb, vb) return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, c.isInt()), c, s) diff --git a/storage/local/index/index.go b/storage/local/index/index.go index 2aa55e89d..1c4c123d6 100644 --- a/storage/local/index/index.go +++ b/storage/local/index/index.go @@ -35,10 +35,11 @@ const ( ) var ( - fingerprintToMetricCacheSize = flag.Int("storage.fingerprintToMetricCacheSizeBytes", 25*1024*1024, "The size in bytes for the fingerprint to metric index cache.") - labelNameToLabelValuesCacheSize = flag.Int("storage.labelNameToLabelValuesCacheSizeBytes", 25*1024*1024, "The size in bytes for the label name to label values index cache.") - labelPairToFingerprintsCacheSize = flag.Int("storage.labelPairToFingerprintsCacheSizeBytes", 25*1024*1024, "The size in bytes for the label pair to fingerprints index cache.") - fingerprintTimeRangeCacheSize = flag.Int("storage.fingerprintTimeRangeCacheSizeBytes", 5*1024*1024, "The size in bytes for the metric time range index cache.") + // TODO: Tweak default values. + fingerprintToMetricCacheSize = flag.Int("storage.local.index-cache-size.fingerprint-to-metric", 10*1024*1024, "The size in bytes for the fingerprint to metric index cache.") + fingerprintTimeRangeCacheSize = flag.Int("storage.local.index-cache-size.fingerprint-to-timerange", 5*1024*1024, "The size in bytes for the metric time range index cache.") + labelNameToLabelValuesCacheSize = flag.Int("storage.local.index-cache-size.label-name-to-label-values", 10*1024*1024, "The size in bytes for the label name to label values index cache.") + labelPairToFingerprintsCacheSize = flag.Int("storage.local.index-cache-size.label-pair-to-fingerprints", 20*1024*1024, "The size in bytes for the label pair to fingerprints index cache.") ) // FingerprintMetricMapping is an in-memory map of fingerprints to metrics. @@ -53,7 +54,7 @@ type FingerprintMetricIndex struct { // // This method is goroutine-safe, but note that no specific order of execution // can be guaranteed (especially critical if IndexBatch and UnindexBatch are -// called concurrently). +// called concurrently for the same fingerprint). func (i *FingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error { b := i.NewBatch() @@ -68,7 +69,7 @@ func (i *FingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) er // // This method is goroutine-safe, but note that no specific order of execution // can be guaranteed (especially critical if IndexBatch and UnindexBatch are -// called concurrently). +// called concurrently for the same fingerprint). func (i *FingerprintMetricIndex) UnindexBatch(mapping FingerprintMetricMapping) error { b := i.NewBatch() @@ -137,9 +138,9 @@ func (i *LabelNameLabelValuesIndex) IndexBatch(b LabelNameLabelValuesMapping) er return i.Commit(batch) } -// Lookup looks up all label values for a given label name. Looking up a -// non-existing label name is not an error. In that case, (nil, false, nil) is -// returned. +// Lookup looks up all label values for a given label name and returns them as +// clientmodel.LabelValues (which is a slice). Looking up a non-existing label +// name is not an error. In that case, (nil, false, nil) is returned. // // This method is goroutine-safe. 
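Before the implementation below, a caller-side sketch of the (values, ok, err) contract just described. The wrapper function and the "job" label name are invented for illustration; the import paths are the ones this repository uses elsewhere:

package example

import (
	"fmt"

	clientmodel "github.com/prometheus/client_golang/model"

	"github.com/prometheus/prometheus/storage/local/index"
)

// printJobValues distinguishes the three outcomes of Lookup: a storage
// error, a label name that simply is not indexed, and a hit.
func printJobValues(idx *index.LabelNameLabelValuesIndex) error {
	values, ok, err := idx.Lookup(clientmodel.LabelName("job"))
	if err != nil {
		return err // The index itself failed.
	}
	if !ok {
		return nil // Not an error: "job" is not in the index.
	}
	for _, v := range values { // values is a clientmodel.LabelValues slice.
		fmt.Println(v)
	}
	return nil
}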
func (i *LabelNameLabelValuesIndex) Lookup(l clientmodel.LabelName) (values clientmodel.LabelValues, ok bool, err error) { @@ -147,9 +148,9 @@ func (i *LabelNameLabelValuesIndex) Lookup(l clientmodel.LabelName) (values clie return } -// LookupSet looks up all label values for a given label name. Looking up a -// non-existing label name is not an error. In that case, (nil, false, nil) is -// returned. +// LookupSet looks up all label values for a given label name and returns them +// as a set. Looking up a non-existing label name is not an error. In that case, +// (nil, false, nil) is returned. // // This method is goroutine-safe. func (i *LabelNameLabelValuesIndex) LookupSet(l clientmodel.LabelName) (values map[clientmodel.LabelValue]struct{}, ok bool, err error) { diff --git a/storage/local/index/interface.go b/storage/local/index/interface.go index b7820925e..c475b4d51 100644 --- a/storage/local/index/interface.go +++ b/storage/local/index/interface.go @@ -21,7 +21,7 @@ import "encoding" // UnmarshalBinary methods of the keys and values). For example, if you call the // Put method of a KeyValueStore implementation, but the key or the value are // modified concurrently while being marshaled into its binary representation, -// you obviously have a problem. Methods of KeyValueStore only return after +// you obviously have a problem. Methods of KeyValueStore return only after // (un)marshaling is complete. type KeyValueStore interface { Put(key, value encoding.BinaryMarshaler) error @@ -34,11 +34,21 @@ type KeyValueStore interface { NewBatch() Batch Commit(b Batch) error + + // ForEach iterates through the complete KeyValueStore and calls the + // supplied function for each mapping. ForEach(func(kv KeyValueAccessor) error) error Close() error } +// KeyValueAccessor allows access to the key and value of an entry in a +// KeyValueStore. +type KeyValueAccessor interface { + Key(encoding.BinaryUnmarshaler) error + Value(encoding.BinaryUnmarshaler) error +} + // Batch allows KeyValueStore mutations to be pooled and committed together. An // implementation does not have to be goroutine-safe. Never modify a Batch // concurrently or commit the same batch multiple times concurrently. Marshaling diff --git a/storage/local/index/leveldb.go b/storage/local/index/leveldb.go index 8af42803b..c082099cd 100644 --- a/storage/local/index/leveldb.go +++ b/storage/local/index/leveldb.go @@ -133,19 +133,22 @@ func (l *LevelDB) Commit(b Batch) error { } func (l *LevelDB) ForEach(cb func(kv KeyValueAccessor) error) error { - it, err := l.NewIterator(true) + snap, err := l.storage.GetSnapshot() if err != nil { return err } + defer snap.Release() - defer it.Close() + iter := snap.NewIterator(keyspace, iteratorOpts) - for valid := it.SeekToFirst(); valid; valid = it.Next() { - if err = it.Error(); err != nil { + kv := &levelDBKeyValueAccessor{it: iter} + + for valid := iter.First(); valid; valid = iter.Next() { + if err = iter.Error(); err != nil { return err } - if err := cb(it); err != nil { + if err := cb(kv); err != nil { return err } } @@ -186,88 +189,15 @@ func (b *LevelDBBatch) Reset() { b.batch.Reset() } -// levelDBIterator implements Iterator. -type levelDBIterator struct { +// levelDBKeyValueAccessor implements KeyValueAccessor. 
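With the snapshotted iterators gone (see the deletions in the next hunks), ForEach plus KeyValueAccessor — implemented for LevelDB by the type declared next — is the only way left to walk an index. A hypothetical caller; the fingerprint key type stands in for any encoding.BinaryUnmarshaler:

package example

import (
	"github.com/prometheus/prometheus/storage/local/codable"
	"github.com/prometheus/prometheus/storage/local/index"
)

// countIndexEntries walks a KeyValueStore without managing iterators or
// snapshots itself; ForEach takes the snapshot and releases it.
func countIndexEntries(kvs index.KeyValueStore) (n int, err error) {
	err = kvs.ForEach(func(kv index.KeyValueAccessor) error {
		var fp codable.Fingerprint // Placeholder key type for this sketch.
		if err := kv.Key(&fp); err != nil {
			return err
		}
		n++
		return nil
	})
	return n, err
}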
+type levelDBKeyValueAccessor struct { it leveldb_iterator.Iterator } -func (i *levelDBIterator) Error() error { - return i.it.Error() -} - -func (i *levelDBIterator) Valid() bool { - return i.it.Valid() -} - -func (i *levelDBIterator) SeekToFirst() bool { - return i.it.First() -} - -func (i *levelDBIterator) SeekToLast() bool { - return i.it.Last() -} - -func (i *levelDBIterator) Seek(k encoding.BinaryMarshaler) bool { - key, err := k.MarshalBinary() - if err != nil { - panic(err) - } - return i.it.Seek(key) -} - -func (i *levelDBIterator) Next() bool { - return i.it.Next() -} - -func (i *levelDBIterator) Previous() bool { - return i.it.Prev() -} - -func (i *levelDBIterator) Key(key encoding.BinaryUnmarshaler) error { +func (i *levelDBKeyValueAccessor) Key(key encoding.BinaryUnmarshaler) error { return key.UnmarshalBinary(i.it.Key()) } -func (i *levelDBIterator) Value(value encoding.BinaryUnmarshaler) error { +func (i *levelDBKeyValueAccessor) Value(value encoding.BinaryUnmarshaler) error { return value.UnmarshalBinary(i.it.Value()) } - -func (*levelDBIterator) Close() error { - return nil -} - -type snapshottedIterator struct { - levelDBIterator - snap *leveldb.Snapshot -} - -func (i *snapshottedIterator) Close() error { - i.snap.Release() - - return nil -} - -// newIterator creates a new LevelDB iterator which is optionally based on a -// snapshot of the current DB state. -// -// For each of the iterator methods that have a return signature of (ok bool), -// if ok == false, the iterator may not be used any further and must be closed. -// Further work with the database requires the creation of a new iterator. -func (l *LevelDB) NewIterator(snapshotted bool) (Iterator, error) { - if !snapshotted { - return &levelDBIterator{ - it: l.storage.NewIterator(keyspace, iteratorOpts), - }, nil - } - - snap, err := l.storage.GetSnapshot() - if err != nil { - return nil, err - } - - return &snapshottedIterator{ - levelDBIterator: levelDBIterator{ - it: snap.NewIterator(keyspace, iteratorOpts), - }, - snap: snap, - }, nil -} diff --git a/storage/local/instrumentation.go b/storage/local/instrumentation.go index 0db846cd4..58f6125a2 100644 --- a/storage/local/instrumentation.go +++ b/storage/local/instrumentation.go @@ -16,7 +16,7 @@ package local import "github.com/prometheus/client_golang/prometheus" // Usually, a separate file for instrumentation is frowned upon. Metrics should -// be close to where they are used. However,the metrics below are set all over +// be close to where they are used. However, the metrics below are set all over // the place, so we go for a separate instrumentation file in this case. var ( chunkOps = prometheus.NewCounterVec( diff --git a/storage/local/interface.go b/storage/local/interface.go index fd28d4750..4b8b8e39e 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -43,7 +43,7 @@ type Storage interface { NewIterator(clientmodel.Fingerprint) SeriesIterator // Run the various maintenance loops in goroutines. Returns when the // storage is ready to use. Keeps everything running in the background - // until Close is called. + // until Stop is called. Start() // Stop shuts down the Storage gracefully, flushes all pending // operations, stops all maintenance loops,and frees all resources. @@ -82,16 +82,6 @@ type Preloader interface { from clientmodel.Timestamp, through clientmodel.Timestamp, stalenessDelta time.Duration, ) error - /* - // GetMetricAtTime loads and pins samples around a given time. 
- GetMetricAtTime(clientmodel.Fingerprint, clientmodel.Timestamp) error - // GetMetricAtInterval loads and pins samples at intervals. - GetMetricAtInterval(fp clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) error - // GetMetricRange loads and pins a given range of samples. - GetMetricRange(fp clientmodel.Fingerprint, from, through clientmodel.Timestamp) error - // GetMetricRangeAtInterval loads and pins sample ranges at intervals. - GetMetricRangeAtInterval(fp clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration) error - */ // Close unpins any previously requested series data from memory. Close() } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 328f4b6a8..513bf726b 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -39,12 +39,15 @@ import ( const ( seriesFileSuffix = ".db" seriesTempFileSuffix = ".db.tmp" + seriesDirNameLen = 2 // How many bytes of the fingerprint in dir name. headsFileName = "heads.db" headsTempFileName = "heads.db.tmp" headsFormatVersion = 1 headsMagicString = "PrometheusHeads" + dirtyFileName = "DIRTY" + fileBufSize = 1 << 16 // 64kiB. chunkHeaderLen = 17 @@ -57,6 +60,8 @@ const ( indexingQueueCapacity = 1024 * 16 ) +var fpLen = len(clientmodel.Fingerprint(0).String()) // Length of a fingerprint as string. + const ( flagHeadChunkPersisted byte = 1 << iota // Add more flags here like: @@ -101,9 +106,9 @@ type persistence struct { indexingBatchLatency prometheus.Summary checkpointDuration prometheus.Gauge - dirtyMtx sync.Mutex // Protects dirty and becameDirty. - - dirty, becameDirty bool + dirtyMtx sync.Mutex // Protects dirty and becameDirty. + dirty bool // true if persistence was started in dirty state. + becameDirty bool // true if an inconsistency came up during runtime. } // newPersistence returns a newly allocated persistence backed by local disk storage, ready to use. @@ -225,7 +230,7 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) { // dirtyFileName returns the name of the (empty) file used to mark the // persistency layer as dirty. func (p *persistence) dirtyFileName() string { - return path.Join(p.basePath, "DIRTY") + return path.Join(p.basePath, dirtyFileName) } // isDirty returns the dirty flag in a goroutine-safe way. @@ -252,19 +257,21 @@ func (p *persistence) setDirty(dirty bool) { } } -// crashRecovery is called by loadSeriesMapAndHeads if the persistence appears -// to be dirty after the loading (either because the loading resulted in an -// error or because the persistence was dirty from the start). Not goroutine -// safe. Only call before anything else is running. -func (p *persistence) crashRecovery(fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) error { +// recoverFromCrash is called by loadSeriesMapAndHeads if the persistence +// appears to be dirty after the loading (either because the loading resulted in +// an error or because the persistence was dirty from the start). Not goroutine +// safe. Only call before anything else is running (except index processing +// queue as started by newPersistence). +func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) error { glog.Warning("Starting crash recovery. 
Prometheus is inoperative until complete.")

 	fpsSeen := map[clientmodel.Fingerprint]struct{}{}
 	count := 0
+	seriesDirNameFmt := fmt.Sprintf("%%0%dx", seriesDirNameLen)

 	glog.Info("Scanning files.")
-	for i := 0; i < 256; i++ {
-		dirname := path.Join(p.basePath, fmt.Sprintf("%02x", i))
+	for i := 0; i < 1<<(seriesDirNameLen*4); i++ {
+		dirname := path.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i))
 		dir, err := os.Open(dirname)
 		if os.IsNotExist(err) {
 			continue
@@ -273,8 +280,7 @@
 			return err
 		}
 		defer dir.Close()
-		var fis []os.FileInfo
-		for ; err != io.EOF; fis, err = dir.Readdir(1024) {
+		for fis := []os.FileInfo{}; err != io.EOF; fis, err = dir.Readdir(1024) {
 			if err != nil {
 				return err
 			}
@@ -290,7 +296,7 @@
 			}
 		}
 	}
-	glog.Infof("File scan complete. %d fingerprints found.", len(fpsSeen))
+	glog.Infof("File scan complete. %d series found.", len(fpsSeen))

 	glog.Info("Checking for series without series file.")
 	for fp, s := range fingerprintToSeries {
@@ -311,11 +317,18 @@
 		// If we are here, the only chunk we have is the head chunk.
 		// Adjust things accordingly.
 		if len(s.chunkDescs) > 1 || s.chunkDescsOffset != 0 {
-			glog.Warningf(
-				"Lost at least %d chunks for fingerprint %v, metric %v.",
-				len(s.chunkDescs)+s.chunkDescsOffset-1, fp, s.metric,
-				// If chunkDescsOffset is -1, this will underreport. Oh well...
-			)
+			minLostChunks := len(s.chunkDescs) + s.chunkDescsOffset - 1
+			if minLostChunks <= 0 {
+				glog.Warningf(
+					"Possible loss of chunks for fingerprint %v, metric %v.",
+					fp, s.metric,
+				)
+			} else {
+				glog.Warningf(
+					"Lost at least %d chunks for fingerprint %v, metric %v.",
+					minLostChunks, fp, s.metric,
+				)
+			}
 			s.chunkDescs = s.chunkDescs[len(s.chunkDescs)-1:]
 			s.chunkDescsOffset = 0
 		}
@@ -345,12 +358,13 @@
 	}

 	var fp clientmodel.Fingerprint
-	if len(fi.Name()) != 17 || !strings.HasSuffix(fi.Name(), ".db") {
+	if len(fi.Name()) != fpLen-seriesDirNameLen+len(seriesFileSuffix) ||
+		!strings.HasSuffix(fi.Name(), seriesFileSuffix) {
 		glog.Warningf("Unexpected series file name %s.", filename)
 		purge()
 		return fp, false
 	}
-	fp.LoadFromString(path.Base(dirname) + fi.Name()[:14]) // TODO: Panics if that doesn't parse as hex.
+	fp.LoadFromString(path.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]) // TODO: Panics if that doesn't parse as hex.

 	bytesToTrim := fi.Size() % int64(p.chunkLen+chunkHeaderLen)
 	chunksInFile := int(fi.Size()) / (p.chunkLen + chunkHeaderLen)
@@ -378,8 +392,7 @@
 	}

 	s, ok := fingerprintToSeries[fp]
-	if ok {
-		// This series is supposed to not be archived.
+	if ok { // This series is supposed to not be archived.
 		if s == nil {
 			panic("fingerprint mapped to nil pointer")
 		}
@@ -396,7 +409,7 @@
 			// unarchived one. No chunks or chunkDescs in memory, no
 			// current head chunk.
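An aside on the scan loop in recoverFromCrash above: the directory-name verb is itself built with Sprintf, so the doubled %% matters. A standalone, runnable version of the fan-out arithmetic (not code from the patch):

package main

import "fmt"

func main() {
	const seriesDirNameLen = 2 // Hex digits of the fingerprint used in the dir name.
	// The first Sprintf renders the verb itself: "%%0%dx" with 2 becomes "%02x".
	seriesDirNameFmt := fmt.Sprintf("%%0%dx", seriesDirNameLen)
	// Two hex digits fan out into 1<<(2*4) == 256 directories, "00" through "ff".
	for i := 0; i < 1<<(seriesDirNameLen*4); i++ {
		fmt.Println(fmt.Sprintf(seriesDirNameFmt, i))
	}
}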
glog.Warningf( - "Treating recovered metric %v, fingerprint %v, as freshly unarchvied, with %d chunks in series file.", + "Treating recovered metric %v, fingerprint %v, as freshly unarchived, with %d chunks in series file.", s.metric, fp, chunksInFile, ) s.chunkDescs = nil @@ -593,8 +606,8 @@ func (p *persistence) rebuildLabelIndexes( // getFingerprintsForLabelPair returns the fingerprints for the given label // pair. This method is goroutine-safe but take into account that metrics queued -// for indexing with IndexMetric might not yet made it into the index. (Same -// applies correspondingly to UnindexMetric.) +// for indexing with IndexMetric might not have made it into the index +// yet. (Same applies correspondingly to UnindexMetric.) func (p *persistence) getFingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) { fps, _, err := p.labelPairToFingerprints.Lookup(lp) if err != nil { @@ -605,8 +618,8 @@ func (p *persistence) getFingerprintsForLabelPair(lp metric.LabelPair) (clientmo // getLabelValuesForLabelName returns the label values for the given label // name. This method is goroutine-safe but take into account that metrics queued -// for indexing with IndexMetric might not yet made it into the index. (Same -// applies correspondingly to UnindexMetric.) +// for indexing with IndexMetric might not have made it into the index +// yet. (Same applies correspondingly to UnindexMetric.) func (p *persistence) getLabelValuesForLabelName(ln clientmodel.LabelName) (clientmodel.LabelValues, error) { lvs, _, err := p.labelNameToLabelValues.Lookup(ln) if err != nil { @@ -616,11 +629,11 @@ func (p *persistence) getLabelValuesForLabelName(ln clientmodel.LabelName) (clie } // persistChunk persists a single chunk of a series. It is the caller's -// responsibility to not modify chunk concurrently and to not persist or drop -// anything for the same fingerprint concurrently. It returns the (zero-based) -// index of the persisted chunk within the series file. In case of an error, the -// returned index is -1 (to avoid the misconception that the chunk was written -// at position 0). +// responsibility to not modify the chunk concurrently and to not persist or +// drop anything for the same fingerprint concurrently. It returns the +// (zero-based) index of the persisted chunk within the series file. In case of +// an error, the returned index is -1 (to avoid the misconception that the chunk +// was written at position 0). func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) (int, error) { // 1. Open chunk file. 
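A short illustration of why the -1 convention above matters: each chunk occupies a fixed slot of chunkHeaderLen+chunkLen bytes (the same arithmetic sanitizeSeries uses), so a returned index feeds directly into offset math. The helper below is hypothetical caller-side code, not part of the patch:

package example

import "fmt"

const chunkHeaderLen = 17 // As defined in persistence.go above.

// chunkOffset converts the index returned by persistChunk into a byte
// offset within the series file. A -1 from a failed persistChunk must be
// rejected first, or the computed offset silently goes negative.
func chunkOffset(index, chunkLen int) (int64, error) {
	if index < 0 {
		return 0, fmt.Errorf("chunk was not persisted")
	}
	return int64(index) * int64(chunkHeaderLen+chunkLen), nil
}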
f, err := p.openChunkFileForWriting(fp) @@ -896,7 +909,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) { defer func() { if sm != nil && p.dirty { glog.Warning("Persistence layer appears dirty.") - err = p.crashRecovery(fingerprintToSeries) + err = p.recoverFromCrash(fingerprintToSeries) if err != nil { sm = nil } @@ -1298,17 +1311,17 @@ func (p *persistence) close() error { func (p *persistence) dirNameForFingerprint(fp clientmodel.Fingerprint) string { fpStr := fp.String() - return path.Join(p.basePath, fpStr[0:2]) + return path.Join(p.basePath, fpStr[0:seriesDirNameLen]) } func (p *persistence) fileNameForFingerprint(fp clientmodel.Fingerprint) string { fpStr := fp.String() - return path.Join(p.basePath, fpStr[0:2], fpStr[2:]+seriesFileSuffix) + return path.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix) } func (p *persistence) tempFileNameForFingerprint(fp clientmodel.Fingerprint) string { fpStr := fp.String() - return path.Join(p.basePath, fpStr[0:2], fpStr[2:]+seriesTempFileSuffix) + return path.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix) } func (p *persistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*os.File, error) { diff --git a/storage/local/storage.go b/storage/local/storage.go index 413f28ec2..20d807083 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -428,7 +428,7 @@ loop: for { // To batch up evictions a bit, this tries evictions at least // once per evict interval, but earlier if the number of evict - // requests with evict==true that has happened since the last + // requests with evict==true that have happened since the last // evict run is more than maxMemoryChunks/1000. select { case req := <-s.evictRequests: @@ -482,7 +482,7 @@ func (s *memorySeriesStorage) maybeEvict() { // currently locked the chunk and tries to send the evict request (to // remove the chunk from the evict list) to the evictRequests // channel. The send blocks because evictRequests is full. However, the - // goroutine that is supposed to empty the channel is wating for the + // goroutine that is supposed to empty the channel is waiting for the // chunkDesc lock to try to evict the chunk. go func() { for _, cd := range chunkDescsToEvict { @@ -557,15 +557,10 @@ func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool { } } -func (s *memorySeriesStorage) loop() { - checkpointTicker := time.NewTicker(s.checkpointInterval) - - defer func() { - checkpointTicker.Stop() - glog.Info("Maintenance loop stopped.") - close(s.loopStopped) - }() - +// cycleThroughMemoryFingerprints returns a channel that emits fingerprints for +// series in memory in a throttled fashion. It continues to cycle through all +// fingerprints in memory until s.loopStopping is closed. 
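The refactoring here extracts the two fingerprint producers out of loop into self-contained generator methods. Their common shape, reduced to a runnable sketch with generic names (the real methods additionally throttle each send via waitForNextFP):

package example

// generate returns a channel fed by its own goroutine: the producer
// quits when stop is closed, and consumers see the output channel close
// once the producer returns.
func generate(stop <-chan struct{}) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case out <- i:
			case <-stop:
				return
			}
		}
	}()
	return out
}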
+func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan clientmodel.Fingerprint { memoryFingerprints := make(chan clientmodel.Fingerprint) go func() { var fpIter <-chan clientmodel.Fingerprint @@ -584,7 +579,7 @@ func (s *memorySeriesStorage) loop() { if !s.waitForNextFP(s.fpToSeries.length()) { return } - begun := time.Now() + begin := time.Now() fpIter = s.fpToSeries.fpIter() for fp := range fpIter { select { @@ -594,10 +589,17 @@ func (s *memorySeriesStorage) loop() { } s.waitForNextFP(s.fpToSeries.length()) } - glog.Infof("Completed maintenance sweep through in-memory fingerprints in %v.", time.Since(begun)) + glog.Infof("Completed maintenance sweep through in-memory fingerprints in %v.", time.Since(begin)) } }() + return memoryFingerprints +} + +// cycleThroughArchivedFingerprints returns a channel that emits fingerprints +// for archived series in a throttled fashion. It continues to cycle through all +// archived fingerprints until s.loopStopping is closed. +func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmodel.Fingerprint { archivedFingerprints := make(chan clientmodel.Fingerprint) go func() { defer close(archivedFingerprints) @@ -615,7 +617,7 @@ func (s *memorySeriesStorage) loop() { if !s.waitForNextFP(len(archivedFPs)) { return } - begun := time.Now() + begin := time.Now() for _, fp := range archivedFPs { select { case archivedFingerprints <- fp: @@ -624,9 +626,23 @@ func (s *memorySeriesStorage) loop() { } s.waitForNextFP(len(archivedFPs)) } - glog.Infof("Completed maintenance sweep through archived fingerprints in %v.", time.Since(begun)) + glog.Infof("Completed maintenance sweep through archived fingerprints in %v.", time.Since(begin)) } }() + return archivedFingerprints +} + +func (s *memorySeriesStorage) loop() { + checkpointTicker := time.NewTicker(s.checkpointInterval) + + defer func() { + checkpointTicker.Stop() + glog.Info("Maintenance loop stopped.") + close(s.loopStopped) + }() + + memoryFingerprints := s.cycleThroughMemoryFingerprints() + archivedFingerprints := s.cycleThroughArchivedFingerprints() loop: for { @@ -645,9 +661,9 @@ loop: } } // Wait until both channels are closed. 
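The rewrite below relies on the same property as the StopScraper change at the top of this patch: a receive from a closed channel returns immediately, and ranging over a channel ends exactly when the channel is closed. A standalone sketch of broadcast-by-close plus the range-drain (invented names):

package main

import "fmt"

func main() {
	stop := make(chan struct{})
	out := make(chan int)
	go func() {
		defer close(out) // Lets anyone ranging over out fall through.
		for i := 0; ; i++ {
			select {
			case out <- i:
			case <-stop: // close(stop) unblocks every such receive at once.
				return
			}
		}
	}()
	close(stop) // Broadcast stop: no value is sent, yet all receivers wake.
	for v := range out { // Drain: terminates because out is closed.
		fmt.Println(v)
	}
}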
- for channelStillOpen := true; channelStillOpen; _, channelStillOpen = <-memoryFingerprints { + for _ = range memoryFingerprints { } - for channelStillOpen := true; channelStillOpen; _, channelStillOpen = <-archivedFingerprints { + for _ = range archivedFingerprints { } } diff --git a/tools/rule_checker/main.go b/tools/rule_checker/main.go index 35d25a227..c7d92644c 100644 --- a/tools/rule_checker/main.go +++ b/tools/rule_checker/main.go @@ -25,7 +25,7 @@ import ( "github.com/prometheus/prometheus/rules" ) -var ruleFile = flag.String("ruleFile", "", "The path to the rule file to check.") +var ruleFile = flag.String("rule-file", "", "The path to the rule file to check.") func main() { flag.Parse() diff --git a/web/consoles.go b/web/consoles.go index d3fe9da9a..fab4a2a63 100644 --- a/web/consoles.go +++ b/web/consoles.go @@ -27,8 +27,8 @@ import ( ) var ( - consoleTemplatesPath = flag.String("consoleTemplates", "consoles", "Path to console template directory, available at /console") - consoleLibrariesPath = flag.String("consoleLibraries", "console_libraries", "Path to console library directory") + consoleTemplatesPath = flag.String("web.console.templates", "consoles", "Path to the console template directory, available at /console.") + consoleLibrariesPath = flag.String("web.console.libraries", "console_libraries", "Path to the console library directory.") ) type ConsolesHandler struct { diff --git a/web/web.go b/web/web.go index 2a8321a9b..5b0bdd10c 100644 --- a/web/web.go +++ b/web/web.go @@ -35,10 +35,10 @@ import ( // Commandline flags. var ( - listenAddress = flag.String("listenAddress", ":9090", "Address to listen on for web interface.") - useLocalAssets = flag.Bool("useLocalAssets", false, "Read assets/templates from file instead of binary.") - userAssetsPath = flag.String("userAssets", "", "Path to static asset directory, available at /user") - enableQuit = flag.Bool("web.enableRemoteShutdown", false, "Enable remote service shutdown") + listenAddress = flag.String("web.listen-address", ":9090", "Address to listen on for web interface.") + useLocalAssets = flag.Bool("web.use-local-assets", false, "Read assets/templates from file instead of binary.") + userAssetsPath = flag.String("web.user-assets", "", "Path to static asset directory, available at /user.") + enableQuit = flag.Bool("web.enable-remote-shutdown", false, "Enable remote service shutdown.") ) type WebService struct {
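Taken together, the renames in this commit converge on one flag convention: dot-separated namespaces per subsystem (storage.local.*, alertmanager.*, query.*, web.*), dash-separated words within a name, and help strings that are complete sentences. A minimal sketch of a new flag following that convention — the flag itself is invented for illustration and does not appear in the patch:

package main

import (
	"flag"
	"fmt"
	"time"
)

// Namespace with dots by subsystem, separate words with dashes, and end
// the help text with a period.
var exampleTimeout = flag.Duration("web.example-timeout", 10*time.Second, "An invented flag, shown only to illustrate the naming convention.")

func main() {
	flag.Parse()
	fmt.Println(*exampleTimeout)
}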