Merge "Snapshot of no more frontier."

changes/14/14/1
Matt T. Proud 2013-08-23 17:18:27 +02:00 committed by Gerrit Code Review
commit 6d1605667d
6 changed files with 195 additions and 358 deletions

View File

@ -89,7 +89,7 @@ $(FULL_GOPATH):
-[ -d "$(FULL_GOPATH)" ] || { mkdir -vp $(FULL_GOPATH_BASE) ; ln -s "$(PWD)" "$(FULL_GOPATH)" ; }
[ -d "$(FULL_GOPATH)" ]
test: build
test: config dependencies model preparation tools web
$(GO) test $(GO_TEST_FLAGS) ./...
tools: dependencies preparation

View File

@ -20,6 +20,7 @@ import (
"time"
"code.google.com/p/goprotobuf/proto"
"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model"
@ -70,9 +71,6 @@ type Curator struct {
type watermarkScanner struct {
// curationState is the data store for curation remarks.
curationState CurationRemarker
// diskFrontier models the available seekable ranges for the provided
// sampleIterator.
diskFrontier *diskFrontier
// ignoreYoungerThan is passed into the curation remark for the given series.
ignoreYoungerThan time.Duration
// processor is responsible for executing a given strategy on the
@ -89,6 +87,8 @@ type watermarkScanner struct {
stop chan bool
// status is the outbound channel for notifying the status page of its state.
status CurationStateUpdater
firstBlock, lastBlock *SampleKey
}
// Run facilitates the curation lifecycle.
@ -119,14 +119,19 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
iterator := samples.NewIterator(true)
defer iterator.Close()
diskFrontier, present, err := newDiskFrontier(iterator)
if err != nil {
if !iterator.SeekToLast() {
glog.Info("Empty database; skipping curation.")
return
}
if !present {
// No sample database exists; no work to do!
lastBlock, _ := extractSampleKey(iterator)
if !iterator.SeekToFirst() {
glog.Info("Empty database; skipping curation.")
return
}
firstBlock, _ := extractSampleKey(iterator)
scanner := &watermarkScanner{
curationState: curationState,
@ -136,9 +141,11 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
stop: c.Stop,
stopAt: instant.Add(-1 * ignoreYoungerThan),
diskFrontier: diskFrontier,
sampleIterator: iterator,
samples: samples,
firstBlock: firstBlock,
lastBlock: lastBlock,
}
// Right now, the ability to stop a curation is limited to the beginning of
@ -271,12 +278,11 @@ func (w *watermarkScanner) curationConsistent(f *clientmodel.Fingerprint, waterm
func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
fingerprint := key.(*clientmodel.Fingerprint)
seriesFrontier, present, err := newSeriesFrontier(fingerprint, w.diskFrontier, w.sampleIterator)
if err != nil || !present {
// An anomaly with the series frontier is severe in the sense that some sort
// of an illegal state condition exists in the storage layer, which would
// probably signify an illegal disk frontier.
return &storage.OperatorError{error: err, Continuable: false}
if fingerprint.Less(w.firstBlock.Fingerprint) {
return nil
}
if w.lastBlock.Fingerprint.Less(fingerprint) {
return nil
}
curationState, present, err := w.curationState.Get(&curationKey{
@ -285,7 +291,6 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
ProcessorMessageTypeName: w.processor.Name(),
IgnoreYoungerThan: w.ignoreYoungerThan,
})
if err != nil {
// An anomaly with the curation remark is likely not fatal in the sense that
// there was a decoding error with the entity and shouldn't be cause to stop
@ -293,23 +298,21 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
// work forward. With an idempotent processor, this is safe.
return &storage.OperatorError{error: err, Continuable: true}
}
var firstSeek time.Time
switch {
case !present, seriesFrontier.After(curationState):
firstSeek = seriesFrontier.firstSupertime
case !seriesFrontier.InSafeSeekRange(curationState):
firstSeek = seriesFrontier.lastSupertime
default:
firstSeek = curationState
}
startKey := &SampleKey{
keySet := &SampleKey{
Fingerprint: fingerprint,
FirstTimestamp: firstSeek,
}
dto := new(dto.SampleKey)
startKey.Dump(dto)
if !present && fingerprint.Equal(w.firstBlock.Fingerprint) {
// If the fingerprint is the same, then we simply need to use the earliest
// block found in the database.
*keySet = *w.firstBlock
} else if present {
keySet.FirstTimestamp = curationState
}
dto := new(dto.SampleKey)
keySet.Dump(dto)
prospectiveKey := coding.NewPBEncoder(dto).MustEncode()
if !w.sampleIterator.Seek(prospectiveKey) {
// LevelDB is picky about the seek ranges. If an iterator was invalidated,
@ -317,25 +320,41 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
return &storage.OperatorError{error: fmt.Errorf("Illegal Condition: Iterator invalidated due to seek range."), Continuable: false}
}
newestAllowedSample := w.stopAt
if !newestAllowedSample.Before(seriesFrontier.lastSupertime) {
newestAllowedSample = seriesFrontier.lastSupertime
for {
sampleKey, err := extractSampleKey(w.sampleIterator)
if err != nil {
return
}
if !sampleKey.Fingerprint.Equal(fingerprint) {
return
}
lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, newestAllowedSample, fingerprint)
if !present {
break
}
if !(sampleKey.FirstTimestamp.Before(curationState) && sampleKey.LastTimestamp.Before(curationState)) {
break
}
if !w.sampleIterator.Next() {
return
}
}
lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, w.stopAt, fingerprint)
if err != nil {
// We can't divine the severity of a processor error without refactoring the
// interface.
return &storage.OperatorError{error: err, Continuable: false}
}
err = w.curationState.Update(&curationKey{
if err = w.curationState.Update(&curationKey{
Fingerprint: fingerprint,
ProcessorMessageRaw: w.processor.Signature(),
ProcessorMessageTypeName: w.processor.Name(),
IgnoreYoungerThan: w.ignoreYoungerThan,
}, lastTime)
if err != nil {
}, lastTime); err != nil {
// Under the assumption that the processors are idempotent, they can be
// re-run; thusly, the commitment of the curation remark is no cause
// to cease further progress.

View File

@ -1,196 +0,0 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"fmt"
"time"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/storage/raw/leveldb"
dto "github.com/prometheus/prometheus/model/generated"
)
// diskFrontier describes an on-disk store of series to provide a
// representation of the known keyspace and time series values available.
//
// This is used to reduce the burden associated with LevelDB iterator
// management.
type diskFrontier struct {
// firstFingerprint is the fingerprint of the first sample key in the store.
firstFingerprint *clientmodel.Fingerprint
// firstSupertime is the FirstTimestamp of the first sample key in the store.
firstSupertime time.Time
// lastFingerprint is the fingerprint of the last sample key in the store.
lastFingerprint *clientmodel.Fingerprint
// lastSupertime is the FirstTimestamp (not the LastTimestamp) of the last
// sample key in the store.
lastSupertime time.Time
}
// String renders the frontier's first and last fingerprint/supertime pairs in
// a human-readable form.
func (f diskFrontier) String() string {
	const layout = "diskFrontier from %s at %s to %s at %s"

	return fmt.Sprintf(layout, f.firstFingerprint, f.firstSupertime, f.lastFingerprint, f.lastSupertime)
}
// ContainsFingerprint reports whether the fingerprint lies within the closed
// range bounded by the frontier's first and last fingerprints.
func (f *diskFrontier) ContainsFingerprint(fingerprint *clientmodel.Fingerprint) bool {
	// Sorts before the first known fingerprint: outside the frontier.
	if fingerprint.Less(f.firstFingerprint) {
		return false
	}

	// Inside unless it sorts after the last known fingerprint.
	return !f.lastFingerprint.Less(fingerprint)
}
// newDiskFrontier builds a diskFrontier from the first and last keys reachable
// through the iterator. The iterator is repositioned by this call.
//
// present is false (with a nil error) when either end of the store cannot be
// reached or yields a nil key. A failure to extract a sample key is reported
// through err.
func newDiskFrontier(i leveldb.Iterator) (d *diskFrontier, present bool, err error) {
	if positioned := i.SeekToLast(); !positioned || i.Key() == nil {
		return nil, false, nil
	}
	newest, err := extractSampleKey(i)
	if err != nil {
		return nil, false, err
	}

	if positioned := i.SeekToFirst(); !positioned || i.Key() == nil {
		return nil, false, nil
	}
	oldest, err := extractSampleKey(i)
	if err != nil {
		return nil, false, err
	}

	d = &diskFrontier{
		firstFingerprint: oldest.Fingerprint,
		firstSupertime:   oldest.FirstTimestamp,
		lastFingerprint:  newest.Fingerprint,
		lastSupertime:    newest.FirstTimestamp,
	}

	return d, true, nil
}
// seriesFrontier represents the valid seek frontier for a given series.
type seriesFrontier struct {
// firstSupertime is the FirstTimestamp of the key found when seeking to the
// lower bound of the series.
firstSupertime time.Time
// lastSupertime is the FirstTimestamp of the last key found for the series.
lastSupertime time.Time
// lastTime is the LastTimestamp of the last key found for the series.
lastTime time.Time
}
// String renders the frontier's first supertime, last supertime and last time
// in a human-readable form.
func (f *seriesFrontier) String() string {
	const layout = "seriesFrontier from %s to %s at %s"

	return fmt.Sprintf(layout, f.firstSupertime, f.lastSupertime, f.lastTime)
}
// newSeriesFrontier furnishes a populated seriesFrontier for a given
// fingerprint. If the series is absent, present will be false.
//
// The iterator i is repositioned by this call. A non-nil err is returned when
// the upper-bound seek lands on an empty key or when a sample key cannot be
// extracted from the iterator; in that case s is nil and present is false.
func newSeriesFrontier(f *clientmodel.Fingerprint, d *diskFrontier, i leveldb.Iterator) (s *seriesFrontier, present bool, err error) {
	// NOTE(review): firstSupertime and lastSupertime are package-level values
	// defined outside this block; presumably the encoded minimum and maximum
	// seekable times. Confirm against their declaration.
	lowerSeek := firstSupertime
	upperSeek := lastSupertime

	// If the diskFrontier for this iterator says that the candidate fingerprint
	// is outside of its seeking domain, there is no way that a seriesFrontier
	// could be materialized. Simply bail.
	if !d.ContainsFingerprint(f) {
		return nil, false, nil
	}

	// If we are either the first or the last key in the database, we need to use
	// pessimistic boundary frontiers.
	if f.Equal(d.firstFingerprint) {
		lowerSeek = indexable.EncodeTime(d.firstSupertime)
	}
	if f.Equal(d.lastFingerprint) {
		upperSeek = indexable.EncodeTime(d.lastSupertime)
	}

	// TODO: Convert this to SampleKey.ToPartialDTO.
	fp := new(dto.Fingerprint)
	dumpFingerprint(fp, f)
	key := &dto.SampleKey{
		Fingerprint: fp,
		Timestamp:   upperSeek,
	}

	raw := coding.NewPBEncoder(key).MustEncode()
	i.Seek(raw)

	if i.Key() == nil {
		return nil, false, fmt.Errorf("illegal condition: empty key")
	}

	// Extraction failures are handed back to the caller instead of panicking,
	// matching how newDiskFrontier reports the same failure.
	upperKey, err := extractSampleKey(i)
	if err != nil {
		return nil, false, err
	}

	// The returned fingerprint may not match if the original seek key lives
	// outside of a metric's frontier. This is probable, for we are seeking to
	// the maximum allowed time, which could advance us to the next
	// fingerprint.
	if !upperKey.Fingerprint.Equal(f) {
		i.Previous()

		upperKey, err = extractSampleKey(i)
		if err != nil {
			return nil, false, err
		}

		// If the previous key does not match, we know that the requested
		// fingerprint does not live in the database.
		if !upperKey.Fingerprint.Equal(f) {
			return nil, false, nil
		}
	}

	// Seek again with the lower bound to find the earliest block of the series.
	key.Timestamp = lowerSeek
	raw = coding.NewPBEncoder(key).MustEncode()
	i.Seek(raw)

	lowerKey, err := extractSampleKey(i)
	if err != nil {
		return nil, false, err
	}

	return &seriesFrontier{
		firstSupertime: lowerKey.FirstTimestamp,
		lastSupertime:  upperKey.FirstTimestamp,
		lastTime:       upperKey.LastTimestamp,
	}, true, nil
}
// Contains reports whether t falls within the closed interval that starts at
// the frontier's firstSupertime and ends at its lastTime.
func (s *seriesFrontier) Contains(t time.Time) bool {
	// Earlier than the first recorded supertime: outside the interval.
	if t.Before(s.firstSupertime) {
		return false
	}

	return !t.After(s.lastTime)
}
// InSafeSeekRange reports whether t lies inside the recorded interval (see
// Contains) and is not later than lastSupertime, so that seeking to it does
// not position an iterator after the last block of the series or outside of
// the store.
func (s *seriesFrontier) InSafeSeekRange(t time.Time) (safe bool) {
	safe = s.Contains(t) && !s.lastSupertime.Before(t)

	return safe
}
// After reports whether the frontier's firstSupertime is strictly later than
// t, i.e. whether t precedes the first block of the series.
func (s *seriesFrontier) After(t time.Time) bool {
	return t.Before(s.firstSupertime)
}

View File

@ -107,12 +107,13 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
if err != nil {
return
}
unactedSamples, err = extractSampleValues(sampleIterator)
if err != nil {
return
}
for lastCurated.Before(stopAt) && lastTouchedTime.Before(stopAt) {
for lastCurated.Before(stopAt) && lastTouchedTime.Before(stopAt) && sampleKey.Fingerprint.Equal(fingerprint) {
switch {
// Furnish a new pending batch operation if none is available.
case pendingBatch == nil:
@ -289,6 +290,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
if err != nil {
return
}
sampleValues, err := extractSampleValues(sampleIterator)
if err != nil {
return
@ -296,7 +298,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
pendingMutations := 0
for lastCurated.Before(stopAt) {
for lastCurated.Before(stopAt) && sampleKey.Fingerprint.Equal(fingerprint) {
switch {
// Furnish a new pending batch operation if none is available.
case pendingBatch == nil:

View File

@ -35,6 +35,24 @@ type SampleKey struct {
SampleCount uint32
}
// Equal reports whether s and o describe the same block: identical
// fingerprint, first timestamp, last timestamp and sample count.
//
// Two nil keys (or the same pointer) are equal; a nil key never equals a
// non-nil one.
func (s *SampleKey) Equal(o *SampleKey) bool {
	if s == o {
		return true
	}
	// Exactly one side may still be nil here. Callers pass keys whose
	// extraction error was discarded, so guard against dereferencing nil.
	if s == nil || o == nil {
		return false
	}

	return s.Fingerprint.Equal(o.Fingerprint) &&
		s.FirstTimestamp.Equal(o.FirstTimestamp) &&
		s.LastTimestamp.Equal(o.LastTimestamp) &&
		s.SampleCount == o.SampleCount
}
// MayContain indicates whether the given SampleKey could potentially contain a
// value at the provided time. Even if true is emitted, that does not mean a
// satisfactory value, in fact, exists.
@ -49,6 +67,21 @@ func (s *SampleKey) MayContain(t time.Time) bool {
}
}
// Before reports whether this key sorts ahead of the (fp, t) pair. A key whose
// fingerprint is less than fp is always before; a key with a different,
// non-lesser fingerprint never is. For the same fingerprint, the key is before
// when its first or its last timestamp precedes t.
func (s *SampleKey) Before(fp *clientmodel.Fingerprint, t time.Time) bool {
	switch {
	case s.Fingerprint.Less(fp):
		return true
	case !s.Fingerprint.Equal(fp):
		return false
	}

	return s.FirstTimestamp.Before(t) || s.LastTimestamp.Before(t)
}
// Dump writes this SampleKey into the provided DTO for use in serialization purposes.
func (s *SampleKey) Dump(d *dto.SampleKey) {
d.Reset()
@ -61,19 +94,6 @@ func (s *SampleKey) Dump(d *dto.SampleKey) {
d.SampleCount = proto.Uint32(s.SampleCount)
}
// FOOdumpPartial resets d and fills in only the fingerprint and the encoded
// first sample time of this SampleKey. The result is only suitable for
// database exploration purposes for a given (Fingerprint, First Sample Time)
// tuple.
func (s *SampleKey) FOOdumpPartial(d *dto.SampleKey) {
	d.Reset()

	fingerprint := new(dto.Fingerprint)
	dumpFingerprint(fingerprint, s.Fingerprint)
	d.Fingerprint = fingerprint

	d.Timestamp = indexable.EncodeTime(s.FirstTimestamp)
}
// String renders the key's fingerprint, time range and sample count in a
// human-readable form.
func (s *SampleKey) String() string {
	const layout = "SampleKey for %s at %s to %s with %d values."

	return fmt.Sprintf(layout, s.Fingerprint, s.FirstTimestamp, s.LastTimestamp, s.SampleCount)
}

View File

@ -24,7 +24,6 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/stats"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/utility"
@ -90,7 +89,6 @@ type TieredStorage struct {
state tieredStorageState
memorySemaphore chan bool
diskSemaphore chan bool
wmCache *watermarkCache
@ -107,7 +105,6 @@ type viewJob struct {
}
const (
tieredDiskSemaphores = 1
tieredMemorySemaphores = 5
)
@ -136,15 +133,11 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
memoryTTL: memoryTTL,
viewQueue: make(chan viewJob, viewQueueDepth),
diskSemaphore: make(chan bool, tieredDiskSemaphores),
memorySemaphore: make(chan bool, tieredMemorySemaphores),
wmCache: wmCache,
}
for i := 0; i < tieredDiskSemaphores; i++ {
s.diskSemaphore <- true
}
for i := 0; i < tieredMemorySemaphores; i++ {
s.memorySemaphore <- true
}
@ -370,9 +363,9 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
scanJobsTimer.Stop()
view := newView()
var iterator leveldb.Iterator = nil
var diskFrontier *diskFrontier = nil
var diskPresent = true
var iterator leveldb.Iterator
diskPresent := true
var firstBlock, lastBlock *SampleKey
extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start()
for _, scanJob := range scans {
@ -385,9 +378,6 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
continue
}
var seriesFrontier *seriesFrontier = nil
var seriesPresent = true
standingOps := scanJob.operations
memValues := t.memoryArena.CloneSamples(scanJob.fingerprint)
@ -402,51 +392,41 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
currentChunk := chunk{}
// If we aimed before the oldest value in memory, load more data from disk.
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent && seriesPresent {
diskPrepareTimer := viewJob.stats.GetTimer(stats.ViewDiskPreparationTime).Start()
// Conditionalize disk access.
if diskFrontier == nil && diskPresent {
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && diskPresent {
if iterator == nil {
<-t.diskSemaphore
defer func() {
t.diskSemaphore <- true
}()
// Get a single iterator that will be used for all data extraction
// below.
iterator = t.DiskStorage.MetricSamples.NewIterator(true)
defer iterator.Close()
if diskPresent = iterator.SeekToLast(); diskPresent {
lastBlock, _ = extractSampleKey(iterator)
if !iterator.SeekToFirst() {
diskPresent = false
} else {
firstBlock, _ = extractSampleKey(iterator)
}
diskFrontier, diskPresent, err = newDiskFrontier(iterator)
if err != nil {
panic(err)
}
if !diskPresent {
seriesPresent = false
}
}
if seriesFrontier == nil && diskPresent {
seriesFrontier, seriesPresent, err = newSeriesFrontier(scanJob.fingerprint, diskFrontier, iterator)
if err != nil {
panic(err)
}
}
diskPrepareTimer.Stop()
if diskPresent && seriesPresent {
if diskPresent {
diskTimer := viewJob.stats.GetTimer(stats.ViewDiskExtractionTime).Start()
diskValues := t.loadChunkAroundTime(iterator, seriesFrontier, scanJob.fingerprint, targetTime)
diskValues, expired := t.loadChunkAroundTime(iterator, scanJob.fingerprint, targetTime, firstBlock, lastBlock)
if expired {
diskPresent = false
}
diskTimer.Stop()
// If we aimed past the newest value on disk, combine it with the next value from memory.
if len(diskValues) == 0 {
currentChunk = chunk(memValues)
} else {
if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
latestDiskValue := diskValues[len(diskValues)-1:]
currentChunk = append(chunk(latestDiskValue), chunk(memValues)...)
} else {
currentChunk = chunk(diskValues)
}
}
} else {
currentChunk = chunk(memValues)
}
@ -513,32 +493,41 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
return
}
func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier *seriesFrontier, fingerprint *clientmodel.Fingerprint, ts time.Time) (chunk Values) {
fd := &dto.Fingerprint{}
dumpFingerprint(fd, fingerprint)
targetKey := &dto.SampleKey{
Fingerprint: fd,
func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerprint *clientmodel.Fingerprint, ts time.Time, firstBlock, lastBlock *SampleKey) (chunk Values, expired bool) {
if fingerprint.Less(firstBlock.Fingerprint) {
return nil, false
}
if lastBlock.Fingerprint.Less(fingerprint) {
return nil, true
}
seekingKey := &SampleKey{
Fingerprint: fingerprint,
}
if fingerprint.Equal(firstBlock.Fingerprint) && ts.Before(firstBlock.FirstTimestamp) {
seekingKey.FirstTimestamp = firstBlock.FirstTimestamp
} else if fingerprint.Equal(lastBlock.Fingerprint) && ts.After(lastBlock.FirstTimestamp) {
seekingKey.FirstTimestamp = lastBlock.FirstTimestamp
} else {
seekingKey.FirstTimestamp = ts
}
fd := new(dto.SampleKey)
seekingKey.Dump(fd)
// Try seeking to target key.
rawKey := coding.NewPBEncoder(fd).MustEncode()
if !iterator.Seek(rawKey) {
return chunk, true
}
var foundKey *SampleKey
var foundValues Values
// Limit the target key to be within the series' keyspace.
if ts.After(frontier.lastSupertime) {
targetKey.Timestamp = indexable.EncodeTime(frontier.lastSupertime)
} else {
targetKey.Timestamp = indexable.EncodeTime(ts)
}
// Try seeking to target key.
rawKey := coding.NewPBEncoder(targetKey).MustEncode()
iterator.Seek(rawKey)
foundKey, err := extractSampleKey(iterator)
if err != nil {
panic(err)
}
foundKey, _ = extractSampleKey(iterator)
if foundKey.Fingerprint.Equal(fingerprint) {
// Figure out if we need to rewind by one block.
// Imagine the following supertime blocks with time ranges:
//
@ -550,44 +539,47 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, frontier
// iterator seek behavior.
//
// Only do the rewind if there is another chunk before this one.
rewound := false
firstTime := foundKey.FirstTimestamp
if ts.Before(firstTime) && !frontier.firstSupertime.After(ts) {
iterator.Previous()
rewound = true
if !foundKey.MayContain(ts) {
postValues, _ := extractSampleValues(iterator)
if !foundKey.Equal(firstBlock) {
if !iterator.Previous() {
panic("This should never return false.")
}
foundValues, err = extractSampleValues(iterator)
if err != nil {
return
foundKey, _ = extractSampleKey(iterator)
if !foundKey.Fingerprint.Equal(fingerprint) {
return postValues, false
}
// If we rewound, but the target time is still past the current block, return
// the last value of the current (rewound) block and the entire next block.
if rewound {
foundKey, err = extractSampleKey(iterator)
if err != nil {
return
}
currentChunkLastTime := foundKey.LastTimestamp
if ts.After(currentChunkLastTime) {
sampleCount := len(foundValues)
chunk = append(chunk, foundValues[sampleCount-1])
// We know there's a next block since we have rewound from it.
iterator.Next()
foundValues, err = extractSampleValues(iterator)
if err != nil {
return
}
foundValues, _ = extractSampleValues(iterator)
foundValues = append(foundValues, postValues...)
return foundValues, false
}
}
// Now append all the samples of the currently seeked block to the output.
chunk = append(chunk, foundValues...)
foundValues, _ = extractSampleValues(iterator)
return foundValues, false
}
return
if fingerprint.Less(foundKey.Fingerprint) {
if !foundKey.Equal(firstBlock) {
if !iterator.Previous() {
panic("This should never return false.")
}
foundKey, _ = extractSampleKey(iterator)
if !foundKey.Fingerprint.Equal(fingerprint) {
return nil, false
}
foundValues, _ = extractSampleValues(iterator)
return foundValues, false
}
}
panic("illegal state: violated sort invariant")
}
// Get all label values that are associated with the provided label name.