Changes after pair code review.

Change-Id: Ib72d40f8e9027818cfbbd32a7a7201eebda07455
pull/413/head
Bjoern Rabenstein 2014-11-20 21:03:51 +01:00
parent a2feed343a
commit 14bda4180c
18 changed files with 163 additions and 201 deletions

21
main.go
View File

@ -42,28 +42,27 @@ const deletionBatchSize = 100
// Commandline flags.
var (
configFile = flag.String("configFile", "prometheus.conf", "Prometheus configuration file name.")
configFile = flag.String("config.file", "prometheus.conf", "Prometheus configuration file name.")
alertmanagerURL = flag.String("alertmanager.url", "", "The URL of the alert manager to send notifications to.")
alertmanagerURL = flag.String("alertmanager.url", "", "The URL of the alert manager to send notifications to.")
notificationQueueCapacity = flag.Int("alertmanager.notification-queue-capacity", 100, "The capacity of the queue for pending alert manager notifications.")
metricsStoragePath = flag.String("storage.path", "/tmp/metrics", "Base path for metrics storage.")
metricsStoragePath = flag.String("storage.local.path", "/tmp/metrics", "Base path for metrics storage.")
remoteTSDBUrl = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.")
remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.")
samplesQueueCapacity = flag.Int("storage.queue.samplesCapacity", 4096, "The size of the unwritten samples queue.")
samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 4096, "The capacity of the queue of samples to be stored.")
numMemoryChunks = flag.Int("storage.memoryChunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")
numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")
storageRetentionPeriod = flag.Duration("storage.retentionPeriod", 15*24*time.Hour, "The period of time to retain in storage.")
storageRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
checkpointInterval = flag.Duration("storage.checkpointInterval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.")
checkpointInterval = flag.Duration("storage.local.checkpoint-interval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.")
storageDirty = flag.Bool("storage.dirty", false, "If set, the storage layer will perform crash recovery even if the last shutdown appears to be clean.")
storageDirty = flag.Bool("storage.local.dirty", false, "If set, the local storage layer will perform crash recovery even if the last shutdown appears to be clean.")
notificationQueueCapacity = flag.Int("alertmanager.notificationQueueCapacity", 100, "The size of the queue for pending alert manager notifications.")
printVersion = flag.Bool("version", false, "print version information")
printVersion = flag.Bool("version", false, "Print version information.")
)
// Instrumentation.

View File

@ -47,7 +47,7 @@ const (
)
var (
deadline = flag.Duration("alertmanager.httpDeadline", 10*time.Second, "Alert manager HTTP API timeout.")
deadline = flag.Duration("alertmanager.http-deadline", 10*time.Second, "Alert manager HTTP API timeout.")
)
// A request for sending a notification to the alert manager for a single alert

View File

@ -151,8 +151,7 @@ type target struct {
lastError error
// The last time a scrape was attempted.
lastScrape time.Time
// Channel to signal RunScraper should stop, holds a channel
// to notify once stopped.
// Closing stopScraper signals that scraping should stop.
stopScraper chan struct{}
// Channel to queue base labels to be replaced.
newBaseLabels chan clientmodel.LabelSet
@ -265,7 +264,7 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration
// StopScraper implements Target.
func (t *target) StopScraper() {
t.stopScraper <- struct{}{}
close(t.stopScraper)
}
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,application/json;schema="prometheus/telemetry";version=0.0.2;q=0.2,*/*;q=0.1`

View File

@ -149,7 +149,7 @@ func TestTargetRunScraperScrapes(t *testing.T) {
state: UNKNOWN,
address: "bad schema",
httpClient: utility.NewDeadlineClient(0),
stopScraper: make(chan bool, 1),
stopScraper: make(chan struct{}),
}
go testTarget.RunScraper(nopIngester{}, time.Duration(time.Millisecond))

View File

@ -116,28 +116,28 @@ func TestTargetPoolReplaceTargets(t *testing.T) {
oldTarget1 := &target{
address: "example1",
state: UNREACHABLE,
stopScraper: make(chan bool, 1),
stopScraper: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
}
oldTarget2 := &target{
address: "example2",
state: UNREACHABLE,
stopScraper: make(chan bool, 1),
stopScraper: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
}
newTarget1 := &target{
address: "example1",
state: ALIVE,
stopScraper: make(chan bool, 1),
stopScraper: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
}
newTarget2 := &target{
address: "example3",
state: ALIVE,
stopScraper: make(chan bool, 1),
stopScraper: make(chan struct{}),
newBaseLabels: make(chan clientmodel.LabelSet, 1),
httpClient: &http.Client{},
}

View File

@ -29,7 +29,7 @@ import (
"github.com/prometheus/prometheus/storage/metric"
)
var stalenessDelta = flag.Duration("stalenessDelta", 300*time.Second, "Staleness delta allowance during expression evaluations.")
var stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.")
// ----------------------------------------------------------------------------
// Raw data value types.

View File

@ -24,9 +24,11 @@ import (
"github.com/prometheus/prometheus/storage/metric"
)
// chunkDesc contains meta-data for a chunk. Many of its methods are
// goroutine-safe proxies for chunk methods.
type chunkDesc struct {
sync.Mutex
chunk chunk
chunk chunk // nil if chunk is evicted.
refCount int
chunkFirstTime clientmodel.Timestamp // Used if chunk is evicted.
chunkLastTime clientmodel.Timestamp // Used if chunk is evicted.
@ -37,11 +39,14 @@ type chunkDesc struct {
evictListElement *list.Element
}
// newChunkDesc creates a new chunkDesc pointing to the given chunk. The
// refCount of the new chunkDesc is 1.
// newChunkDesc creates a new chunkDesc pointing to the provided chunk. The
// provided chunk is assumed to be not persisted yet. Therefore, the refCount of
// the new chunkDesc is 1 (preventing eviction prior to persisting).
func newChunkDesc(c chunk) *chunkDesc {
chunkOps.WithLabelValues(createAndPin).Inc()
atomic.AddInt64(&numMemChunks, 1)
// TODO: numMemChunkDescs is actually never read except during metrics
// collection. Turn it into a real metric.
atomic.AddInt64(&numMemChunkDescs, 1)
return &chunkDesc{chunk: c, refCount: 1}
}
@ -53,6 +58,9 @@ func (cd *chunkDesc) add(s *metric.SamplePair) []chunk {
return cd.chunk.add(s)
}
// pin increments the refCount by one. Upon increment from 0 to 1, this
// chunkDesc is removed from the evict list. To enable the latter, the
// evictRequests channel has to be provided.
func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) {
cd.Lock()
defer cd.Unlock()
@ -64,6 +72,9 @@ func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) {
cd.refCount++
}
// unpin decrements the refCount by one. Upon decrement from 1 to 0, this
// chunkDesc is added to the evict list. To enable the latter, the evictRequests
// channel has to be provided.
func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) {
cd.Lock()
defer cd.Unlock()
@ -116,8 +127,6 @@ func (cd *chunkDesc) contains(t clientmodel.Timestamp) bool {
return !t.Before(cd.firstTime()) && !t.After(cd.lastTime())
}
// setChunk points this chunkDesc to the given chunk. It panics if
// this chunkDesc already has a chunk set.
func (cd *chunkDesc) setChunk(c chunk) {
cd.Lock()
defer cd.Unlock()
@ -128,7 +137,7 @@ func (cd *chunkDesc) setChunk(c chunk) {
cd.chunk = c
}
// maybeEvict evicts the chunk if the refCount is 0. It returns wether the chunk
// maybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk
// is now evicted, which includes the case that the chunk was evicted even
// before this method was called.
func (cd *chunkDesc) maybeEvict() bool {

View File

@ -81,8 +81,7 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool) *deltaEncodedChunk {
}
func (c *deltaEncodedChunk) newFollowupChunk() chunk {
return newDeltaEncodedChunk(d1, d1, true)
//return newDeltaEncodedChunk(c.timeBytes(), c.valueBytes(), c.isInt())
return newDeltaEncodedChunk(d1, d0, true)
}
// clone implements chunk.
@ -170,7 +169,6 @@ func (c *deltaEncodedChunk) add(s *metric.SamplePair) []chunk {
// Do we generally have space for another sample in this chunk? If not,
// overflow into a new one.
if remainingBytes < sampleSize {
//fmt.Println("overflow")
overflowChunks := c.newFollowupChunk().add(s)
return []chunk{c, overflowChunks[0]}
}
@ -186,18 +184,15 @@ func (c *deltaEncodedChunk) add(s *metric.SamplePair) []chunk {
// int->float.
// Note: Using math.Modf is slower than the conversion approach below.
if c.isInt() && clientmodel.SampleValue(int64(dv)) != dv {
//fmt.Println("int->float", len(c.buf), cap(c.buf), dv)
return transcodeAndAdd(newDeltaEncodedChunk(tb, d4, false), c, s)
}
// float32->float64.
if !c.isInt() && vb == d4 && clientmodel.SampleValue(float32(dv)) != dv {
//fmt.Println("float32->float64", float32(dv), dv, len(c.buf), cap(c.buf))
return transcodeAndAdd(newDeltaEncodedChunk(tb, d8, false), c, s)
}
if tb < d8 || vb < d8 {
// Maybe more bytes per sample.
if ntb, nvb := neededDeltaBytes(dt, dv, c.isInt()); ntb > tb || nvb > vb {
//fmt.Printf("transcoding T: %v->%v, V: %v->%v, I: %v; len %v, cap %v\n", tb, ntb, vb, nvb, c.isInt(), len(c.buf), cap(c.buf))
ntb = max(ntb, tb)
nvb = max(nvb, vb)
return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, c.isInt()), c, s)

View File

@ -35,10 +35,11 @@ const (
)
var (
fingerprintToMetricCacheSize = flag.Int("storage.fingerprintToMetricCacheSizeBytes", 25*1024*1024, "The size in bytes for the fingerprint to metric index cache.")
labelNameToLabelValuesCacheSize = flag.Int("storage.labelNameToLabelValuesCacheSizeBytes", 25*1024*1024, "The size in bytes for the label name to label values index cache.")
labelPairToFingerprintsCacheSize = flag.Int("storage.labelPairToFingerprintsCacheSizeBytes", 25*1024*1024, "The size in bytes for the label pair to fingerprints index cache.")
fingerprintTimeRangeCacheSize = flag.Int("storage.fingerprintTimeRangeCacheSizeBytes", 5*1024*1024, "The size in bytes for the metric time range index cache.")
// TODO: Tweak default values.
fingerprintToMetricCacheSize = flag.Int("storage.local.index-cache-size.fingerprint-to-metric", 10*1024*1024, "The size in bytes for the fingerprint to metric index cache.")
fingerprintTimeRangeCacheSize = flag.Int("storage.local.index-cache-size.fingerprint-to-timerange", 5*1024*1024, "The size in bytes for the metric time range index cache.")
labelNameToLabelValuesCacheSize = flag.Int("storage.local.index-cache-size.label-name-to-label-values", 10*1024*1024, "The size in bytes for the label name to label values index cache.")
labelPairToFingerprintsCacheSize = flag.Int("storage.local.index-cache-size.label-pair-to-fingerprints", 20*1024*1024, "The size in bytes for the label pair to fingerprints index cache.")
)
// FingerprintMetricMapping is an in-memory map of fingerprints to metrics.
@ -53,7 +54,7 @@ type FingerprintMetricIndex struct {
//
// This method is goroutine-safe, but note that no specific order of execution
// can be guaranteed (especially critical if IndexBatch and UnindexBatch are
// called concurrently).
// called concurrently for the same fingerprint).
func (i *FingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error {
b := i.NewBatch()
@ -68,7 +69,7 @@ func (i *FingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) er
//
// This method is goroutine-safe, but note that no specific order of execution
// can be guaranteed (especially critical if IndexBatch and UnindexBatch are
// called concurrently).
// called concurrently for the same fingerprint).
func (i *FingerprintMetricIndex) UnindexBatch(mapping FingerprintMetricMapping) error {
b := i.NewBatch()
@ -137,9 +138,9 @@ func (i *LabelNameLabelValuesIndex) IndexBatch(b LabelNameLabelValuesMapping) er
return i.Commit(batch)
}
// Lookup looks up all label values for a given label name. Looking up a
// non-existing label name is not an error. In that case, (nil, false, nil) is
// returned.
// Lookup looks up all label values for a given label name and returns them as
// clientmodel.LabelValues (which is a slice). Looking up a non-existing label
// name is not an error. In that case, (nil, false, nil) is returned.
//
// This method is goroutine-safe.
func (i *LabelNameLabelValuesIndex) Lookup(l clientmodel.LabelName) (values clientmodel.LabelValues, ok bool, err error) {
@ -147,9 +148,9 @@ func (i *LabelNameLabelValuesIndex) Lookup(l clientmodel.LabelName) (values clie
return
}
// LookupSet looks up all label values for a given label name. Looking up a
// non-existing label name is not an error. In that case, (nil, false, nil) is
// returned.
// LookupSet looks up all label values for a given label name and returns them
// as a set. Looking up a non-existing label name is not an error. In that case,
// (nil, false, nil) is returned.
//
// This method is goroutine-safe.
func (i *LabelNameLabelValuesIndex) LookupSet(l clientmodel.LabelName) (values map[clientmodel.LabelValue]struct{}, ok bool, err error) {

View File

@ -21,7 +21,7 @@ import "encoding"
// UnmarshalBinary methods of the keys and values). For example, if you call the
// Put method of a KeyValueStore implementation, but the key or the value are
// modified concurrently while being marshaled into its binary representation,
// you obviously have a problem. Methods of KeyValueStore only return after
// you obviously have a problem. Methods of KeyValueStore return only after
// (un)marshaling is complete.
type KeyValueStore interface {
Put(key, value encoding.BinaryMarshaler) error
@ -34,11 +34,21 @@ type KeyValueStore interface {
NewBatch() Batch
Commit(b Batch) error
// ForEach iterates through the complete KeyValueStore and calls the
// supplied function for each mapping.
ForEach(func(kv KeyValueAccessor) error) error
Close() error
}
// KeyValueAccessor allows access to the key and value of an entry in a
// KeyValueStore.
type KeyValueAccessor interface {
Key(encoding.BinaryUnmarshaler) error
Value(encoding.BinaryUnmarshaler) error
}
// Batch allows KeyValueStore mutations to be pooled and committed together. An
// implementation does not have to be goroutine-safe. Never modify a Batch
// concurrently or commit the same batch multiple times concurrently. Marshaling

View File

@ -133,19 +133,22 @@ func (l *LevelDB) Commit(b Batch) error {
}
func (l *LevelDB) ForEach(cb func(kv KeyValueAccessor) error) error {
it, err := l.NewIterator(true)
snap, err := l.storage.GetSnapshot()
if err != nil {
return err
}
defer snap.Release()
defer it.Close()
iter := snap.NewIterator(keyspace, iteratorOpts)
for valid := it.SeekToFirst(); valid; valid = it.Next() {
if err = it.Error(); err != nil {
kv := &levelDBKeyValueAccessor{it: iter}
for valid := iter.First(); valid; valid = iter.Next() {
if err = iter.Error(); err != nil {
return err
}
if err := cb(it); err != nil {
if err := cb(kv); err != nil {
return err
}
}
@ -186,88 +189,15 @@ func (b *LevelDBBatch) Reset() {
b.batch.Reset()
}
// levelDBIterator implements Iterator.
type levelDBIterator struct {
// levelDBKeyValueAccessor implements KeyValueAccessor.
type levelDBKeyValueAccessor struct {
it leveldb_iterator.Iterator
}
func (i *levelDBIterator) Error() error {
return i.it.Error()
}
func (i *levelDBIterator) Valid() bool {
return i.it.Valid()
}
func (i *levelDBIterator) SeekToFirst() bool {
return i.it.First()
}
func (i *levelDBIterator) SeekToLast() bool {
return i.it.Last()
}
func (i *levelDBIterator) Seek(k encoding.BinaryMarshaler) bool {
key, err := k.MarshalBinary()
if err != nil {
panic(err)
}
return i.it.Seek(key)
}
func (i *levelDBIterator) Next() bool {
return i.it.Next()
}
func (i *levelDBIterator) Previous() bool {
return i.it.Prev()
}
func (i *levelDBIterator) Key(key encoding.BinaryUnmarshaler) error {
func (i *levelDBKeyValueAccessor) Key(key encoding.BinaryUnmarshaler) error {
return key.UnmarshalBinary(i.it.Key())
}
func (i *levelDBIterator) Value(value encoding.BinaryUnmarshaler) error {
func (i *levelDBKeyValueAccessor) Value(value encoding.BinaryUnmarshaler) error {
return value.UnmarshalBinary(i.it.Value())
}
func (*levelDBIterator) Close() error {
return nil
}
type snapshottedIterator struct {
levelDBIterator
snap *leveldb.Snapshot
}
func (i *snapshottedIterator) Close() error {
i.snap.Release()
return nil
}
// newIterator creates a new LevelDB iterator which is optionally based on a
// snapshot of the current DB state.
//
// For each of the iterator methods that have a return signature of (ok bool),
// if ok == false, the iterator may not be used any further and must be closed.
// Further work with the database requires the creation of a new iterator.
func (l *LevelDB) NewIterator(snapshotted bool) (Iterator, error) {
if !snapshotted {
return &levelDBIterator{
it: l.storage.NewIterator(keyspace, iteratorOpts),
}, nil
}
snap, err := l.storage.GetSnapshot()
if err != nil {
return nil, err
}
return &snapshottedIterator{
levelDBIterator: levelDBIterator{
it: snap.NewIterator(keyspace, iteratorOpts),
},
snap: snap,
}, nil
}

View File

@ -16,7 +16,7 @@ package local
import "github.com/prometheus/client_golang/prometheus"
// Usually, a separate file for instrumentation is frowned upon. Metrics should
// be close to where they are used. However,the metrics below are set all over
// be close to where they are used. However, the metrics below are set all over
// the place, so we go for a separate instrumentation file in this case.
var (
chunkOps = prometheus.NewCounterVec(

View File

@ -43,7 +43,7 @@ type Storage interface {
NewIterator(clientmodel.Fingerprint) SeriesIterator
// Run the various maintenance loops in goroutines. Returns when the
// storage is ready to use. Keeps everything running in the background
// until Close is called.
// until Stop is called.
Start()
// Stop shuts down the Storage gracefully, flushes all pending
// operations, stops all maintenance loops,and frees all resources.
@ -82,16 +82,6 @@ type Preloader interface {
from clientmodel.Timestamp, through clientmodel.Timestamp,
stalenessDelta time.Duration,
) error
/*
// GetMetricAtTime loads and pins samples around a given time.
GetMetricAtTime(clientmodel.Fingerprint, clientmodel.Timestamp) error
// GetMetricAtInterval loads and pins samples at intervals.
GetMetricAtInterval(fp clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) error
// GetMetricRange loads and pins a given range of samples.
GetMetricRange(fp clientmodel.Fingerprint, from, through clientmodel.Timestamp) error
// GetMetricRangeAtInterval loads and pins sample ranges at intervals.
GetMetricRangeAtInterval(fp clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration) error
*/
// Close unpins any previously requested series data from memory.
Close()
}

View File

@ -39,12 +39,15 @@ import (
const (
seriesFileSuffix = ".db"
seriesTempFileSuffix = ".db.tmp"
seriesDirNameLen = 2 // How many bytes of the fingerprint in dir name.
headsFileName = "heads.db"
headsTempFileName = "heads.db.tmp"
headsFormatVersion = 1
headsMagicString = "PrometheusHeads"
dirtyFileName = "DIRTY"
fileBufSize = 1 << 16 // 64kiB.
chunkHeaderLen = 17
@ -57,6 +60,8 @@ const (
indexingQueueCapacity = 1024 * 16
)
var fpLen = len(clientmodel.Fingerprint(0).String()) // Length of a fingerprint as string.
const (
flagHeadChunkPersisted byte = 1 << iota
// Add more flags here like:
@ -101,9 +106,9 @@ type persistence struct {
indexingBatchLatency prometheus.Summary
checkpointDuration prometheus.Gauge
dirtyMtx sync.Mutex // Protects dirty and becameDirty.
dirty, becameDirty bool
dirtyMtx sync.Mutex // Protects dirty and becameDirty.
dirty bool // true if persistence was started in dirty state.
becameDirty bool // true if an inconsistency came up during runtime.
}
// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
@ -225,7 +230,7 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) {
// dirtyFileName returns the name of the (empty) file used to mark the
// persistency layer as dirty.
func (p *persistence) dirtyFileName() string {
return path.Join(p.basePath, "DIRTY")
return path.Join(p.basePath, dirtyFileName)
}
// isDirty returns the dirty flag in a goroutine-safe way.
@ -252,19 +257,21 @@ func (p *persistence) setDirty(dirty bool) {
}
}
// crashRecovery is called by loadSeriesMapAndHeads if the persistence appears
// to be dirty after the loading (either because the loading resulted in an
// error or because the persistence was dirty from the start). Not goroutine
// safe. Only call before anything else is running.
func (p *persistence) crashRecovery(fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) error {
// recoverFromCrash is called by loadSeriesMapAndHeads if the persistence
// appears to be dirty after the loading (either because the loading resulted in
// an error or because the persistence was dirty from the start). Not goroutine
// safe. Only call before anything else is running (except index processing
// queue as started by newPersistence).
func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) error {
glog.Warning("Starting crash recovery. Prometheus is inoperational until complete.")
fpsSeen := map[clientmodel.Fingerprint]struct{}{}
count := 0
seriesDirNameFmt := fmt.Sprintf("0%dx", seriesDirNameLen)
glog.Info("Scanning files.")
for i := 0; i < 256; i++ {
dirname := path.Join(p.basePath, fmt.Sprintf("%02x", i))
for i := 0; i < 1<<(seriesDirNameLen*4); i++ {
dirname := path.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i))
dir, err := os.Open(dirname)
if os.IsNotExist(err) {
continue
@ -273,8 +280,7 @@ func (p *persistence) crashRecovery(fingerprintToSeries map[clientmodel.Fingerpr
return err
}
defer dir.Close()
var fis []os.FileInfo
for ; err != io.EOF; fis, err = dir.Readdir(1024) {
for fis := []os.FileInfo{}; err != io.EOF; fis, err = dir.Readdir(1024) {
if err != nil {
return err
}
@ -290,7 +296,7 @@ func (p *persistence) crashRecovery(fingerprintToSeries map[clientmodel.Fingerpr
}
}
}
glog.Infof("File scan complete. %d fingerprints found.", len(fpsSeen))
glog.Infof("File scan complete. %d series found.", len(fpsSeen))
glog.Info("Checking for series without series file.")
for fp, s := range fingerprintToSeries {
@ -311,11 +317,18 @@ func (p *persistence) crashRecovery(fingerprintToSeries map[clientmodel.Fingerpr
// If we are here, the only chunk we have is the head chunk.
// Adjust things accordingly.
if len(s.chunkDescs) > 1 || s.chunkDescsOffset != 0 {
glog.Warningf(
"Lost at least %d chunks for fingerprint %v, metric %v.",
len(s.chunkDescs)+s.chunkDescsOffset-1, fp, s.metric,
// If chunkDescsOffset is -1, this will underreport. Oh well...
)
minLostChunks := len(s.chunkDescs) + s.chunkDescsOffset - 1
if minLostChunks <= 0 {
glog.Warningf(
"Possible loss of chunks for fingerprint %v, metric %v.",
fp, s.metric,
)
} else {
glog.Warningf(
"Lost at least %d chunks for fingerprint %v, metric %v.",
minLostChunks, fp, s.metric,
)
}
s.chunkDescs = s.chunkDescs[len(s.chunkDescs)-1:]
s.chunkDescsOffset = 0
}
@ -345,12 +358,13 @@ func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprint
}
var fp clientmodel.Fingerprint
if len(fi.Name()) != 17 || !strings.HasSuffix(fi.Name(), ".db") {
if len(fi.Name()) != fpLen-seriesDirNameLen+len(seriesFileSuffix) ||
!strings.HasSuffix(fi.Name(), seriesFileSuffix) {
glog.Warningf("Unexpected series file name %s.", filename)
purge()
return fp, false
}
fp.LoadFromString(path.Base(dirname) + fi.Name()[:14]) // TODO: Panics if that doesn't parse as hex.
fp.LoadFromString(path.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]) // TODO: Panics if that doesn't parse as hex.
bytesToTrim := fi.Size() % int64(p.chunkLen+chunkHeaderLen)
chunksInFile := int(fi.Size()) / (p.chunkLen + chunkHeaderLen)
@ -378,8 +392,7 @@ func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprint
}
s, ok := fingerprintToSeries[fp]
if ok {
// This series is supposed to not be archived.
if ok { // This series is supposed to not be archived.
if s == nil {
panic("fingerprint mapped to nil pointer")
}
@ -396,7 +409,7 @@ func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprint
// unarchived one. No chunks or chunkDescs in memory, no
// current head chunk.
glog.Warningf(
"Treating recovered metric %v, fingerprint %v, as freshly unarchvied, with %d chunks in series file.",
"Treating recovered metric %v, fingerprint %v, as freshly unarchived, with %d chunks in series file.",
s.metric, fp, chunksInFile,
)
s.chunkDescs = nil
@ -593,8 +606,8 @@ func (p *persistence) rebuildLabelIndexes(
// getFingerprintsForLabelPair returns the fingerprints for the given label
// pair. This method is goroutine-safe but take into account that metrics queued
// for indexing with IndexMetric might not yet made it into the index. (Same
// applies correspondingly to UnindexMetric.)
// for indexing with IndexMetric might not have made it into the index
// yet. (Same applies correspondingly to UnindexMetric.)
func (p *persistence) getFingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) {
fps, _, err := p.labelPairToFingerprints.Lookup(lp)
if err != nil {
@ -605,8 +618,8 @@ func (p *persistence) getFingerprintsForLabelPair(lp metric.LabelPair) (clientmo
// getLabelValuesForLabelName returns the label values for the given label
// name. This method is goroutine-safe but take into account that metrics queued
// for indexing with IndexMetric might not yet made it into the index. (Same
// applies correspondingly to UnindexMetric.)
// for indexing with IndexMetric might not have made it into the index
// yet. (Same applies correspondingly to UnindexMetric.)
func (p *persistence) getLabelValuesForLabelName(ln clientmodel.LabelName) (clientmodel.LabelValues, error) {
lvs, _, err := p.labelNameToLabelValues.Lookup(ln)
if err != nil {
@ -616,11 +629,11 @@ func (p *persistence) getLabelValuesForLabelName(ln clientmodel.LabelName) (clie
}
// persistChunk persists a single chunk of a series. It is the caller's
// responsibility to not modify chunk concurrently and to not persist or drop
// anything for the same fingerprint concurrently. It returns the (zero-based)
// index of the persisted chunk within the series file. In case of an error, the
// returned index is -1 (to avoid the misconception that the chunk was written
// at position 0).
// responsibility to not modify the chunk concurrently and to not persist or
// drop anything for the same fingerprint concurrently. It returns the
// (zero-based) index of the persisted chunk within the series file. In case of
// an error, the returned index is -1 (to avoid the misconception that the chunk
// was written at position 0).
func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) (int, error) {
// 1. Open chunk file.
f, err := p.openChunkFileForWriting(fp)
@ -896,7 +909,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) {
defer func() {
if sm != nil && p.dirty {
glog.Warning("Persistence layer appears dirty.")
err = p.crashRecovery(fingerprintToSeries)
err = p.recoverFromCrash(fingerprintToSeries)
if err != nil {
sm = nil
}
@ -1298,17 +1311,17 @@ func (p *persistence) close() error {
func (p *persistence) dirNameForFingerprint(fp clientmodel.Fingerprint) string {
fpStr := fp.String()
return path.Join(p.basePath, fpStr[0:2])
return path.Join(p.basePath, fpStr[0:seriesDirNameLen])
}
func (p *persistence) fileNameForFingerprint(fp clientmodel.Fingerprint) string {
fpStr := fp.String()
return path.Join(p.basePath, fpStr[0:2], fpStr[2:]+seriesFileSuffix)
return path.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix)
}
func (p *persistence) tempFileNameForFingerprint(fp clientmodel.Fingerprint) string {
fpStr := fp.String()
return path.Join(p.basePath, fpStr[0:2], fpStr[2:]+seriesTempFileSuffix)
return path.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix)
}
func (p *persistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*os.File, error) {

View File

@ -428,7 +428,7 @@ loop:
for {
// To batch up evictions a bit, this tries evictions at least
// once per evict interval, but earlier if the number of evict
// requests with evict==true that has happened since the last
// requests with evict==true that have happened since the last
// evict run is more than maxMemoryChunks/1000.
select {
case req := <-s.evictRequests:
@ -482,7 +482,7 @@ func (s *memorySeriesStorage) maybeEvict() {
// currently locked the chunk and tries to send the evict request (to
// remove the chunk from the evict list) to the evictRequests
// channel. The send blocks because evictRequests is full. However, the
// goroutine that is supposed to empty the channel is wating for the
// goroutine that is supposed to empty the channel is waiting for the
// chunkDesc lock to try to evict the chunk.
go func() {
for _, cd := range chunkDescsToEvict {
@ -557,15 +557,10 @@ func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool {
}
}
func (s *memorySeriesStorage) loop() {
checkpointTicker := time.NewTicker(s.checkpointInterval)
defer func() {
checkpointTicker.Stop()
glog.Info("Maintenance loop stopped.")
close(s.loopStopped)
}()
// cycleThroughMemoryFingerprints returns a channel that emits fingerprints for
// series in memory in a throttled fashion. It continues to cycle through all
// fingerprints in memory until s.loopStopping is closed.
func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan clientmodel.Fingerprint {
memoryFingerprints := make(chan clientmodel.Fingerprint)
go func() {
var fpIter <-chan clientmodel.Fingerprint
@ -584,7 +579,7 @@ func (s *memorySeriesStorage) loop() {
if !s.waitForNextFP(s.fpToSeries.length()) {
return
}
begun := time.Now()
begin := time.Now()
fpIter = s.fpToSeries.fpIter()
for fp := range fpIter {
select {
@ -594,10 +589,17 @@ func (s *memorySeriesStorage) loop() {
}
s.waitForNextFP(s.fpToSeries.length())
}
glog.Infof("Completed maintenance sweep through in-memory fingerprints in %v.", time.Since(begun))
glog.Infof("Completed maintenance sweep through in-memory fingerprints in %v.", time.Since(begin))
}
}()
return memoryFingerprints
}
// cycleThroughArchivedFingerprints returns a channel that emits fingerprints
// for archived series in a throttled fashion. It continues to cycle through all
// archived fingerprints until s.loopStopping is closed.
func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmodel.Fingerprint {
archivedFingerprints := make(chan clientmodel.Fingerprint)
go func() {
defer close(archivedFingerprints)
@ -615,7 +617,7 @@ func (s *memorySeriesStorage) loop() {
if !s.waitForNextFP(len(archivedFPs)) {
return
}
begun := time.Now()
begin := time.Now()
for _, fp := range archivedFPs {
select {
case archivedFingerprints <- fp:
@ -624,9 +626,23 @@ func (s *memorySeriesStorage) loop() {
}
s.waitForNextFP(len(archivedFPs))
}
glog.Infof("Completed maintenance sweep through archived fingerprints in %v.", time.Since(begun))
glog.Infof("Completed maintenance sweep through archived fingerprints in %v.", time.Since(begin))
}
}()
return archivedFingerprints
}
func (s *memorySeriesStorage) loop() {
checkpointTicker := time.NewTicker(s.checkpointInterval)
defer func() {
checkpointTicker.Stop()
glog.Info("Maintenance loop stopped.")
close(s.loopStopped)
}()
memoryFingerprints := s.cycleThroughMemoryFingerprints()
archivedFingerprints := s.cycleThroughArchivedFingerprints()
loop:
for {
@ -645,9 +661,9 @@ loop:
}
}
// Wait until both channels are closed.
for channelStillOpen := true; channelStillOpen; _, channelStillOpen = <-memoryFingerprints {
for _ = range memoryFingerprints {
}
for channelStillOpen := true; channelStillOpen; _, channelStillOpen = <-archivedFingerprints {
for _ = range archivedFingerprints {
}
}

View File

@ -25,7 +25,7 @@ import (
"github.com/prometheus/prometheus/rules"
)
var ruleFile = flag.String("ruleFile", "", "The path to the rule file to check.")
var ruleFile = flag.String("rule-file", "", "The path to the rule file to check.")
func main() {
flag.Parse()

View File

@ -27,8 +27,8 @@ import (
)
var (
consoleTemplatesPath = flag.String("consoleTemplates", "consoles", "Path to console template directory, available at /console")
consoleLibrariesPath = flag.String("consoleLibraries", "console_libraries", "Path to console library directory")
consoleTemplatesPath = flag.String("web.console.templates", "consoles", "Path to the console template directory, available at /console.")
consoleLibrariesPath = flag.String("web.console.libraries", "console_libraries", "Path to the console library directory.")
)
type ConsolesHandler struct {

View File

@ -35,10 +35,10 @@ import (
// Commandline flags.
var (
listenAddress = flag.String("listenAddress", ":9090", "Address to listen on for web interface.")
useLocalAssets = flag.Bool("useLocalAssets", false, "Read assets/templates from file instead of binary.")
userAssetsPath = flag.String("userAssets", "", "Path to static asset directory, available at /user")
enableQuit = flag.Bool("web.enableRemoteShutdown", false, "Enable remote service shutdown")
listenAddress = flag.String("web.listen-address", ":9090", "Address to listen on for web interface.")
useLocalAssets = flag.Bool("web.use-local-assets", false, "Read assets/templates from file instead of binary.")
userAssetsPath = flag.String("web.user-assets", "", "Path to static asset directory, available at /user.")
enableQuit = flag.Bool("web.enable-remote-shutdown", false, "Enable remote service shutdown.")
)
type WebService struct {