mirror of https://github.com/prometheus/prometheus
Merge "Update low-level i'faces to reflect wireformats."
commit
1875620e08
56
main.go
56
main.go
|
@ -80,7 +80,7 @@ type prometheus struct {
|
|||
tailCompactionTimer *time.Ticker
|
||||
deletionTimer *time.Ticker
|
||||
|
||||
curationMutex sync.Mutex
|
||||
curationSema chan bool
|
||||
stopBackgroundOperations chan bool
|
||||
|
||||
unwrittenSamples chan *extraction.Result
|
||||
|
@ -104,41 +104,62 @@ func (p *prometheus) interruptHandler() {
|
|||
}
|
||||
|
||||
func (p *prometheus) compact(olderThan time.Duration, groupSize int) error {
|
||||
p.curationMutex.Lock()
|
||||
defer p.curationMutex.Unlock()
|
||||
|
||||
processor := &metric.CompactionProcessor{
|
||||
MaximumMutationPoolBatch: groupSize * 3,
|
||||
MinimumGroupSize: groupSize,
|
||||
select {
|
||||
case p.curationSema <- true:
|
||||
default:
|
||||
glog.Warningf("Deferred compaction for %s and %s due to existing operation.", operation, groupSize)
|
||||
return
|
||||
}
|
||||
|
||||
curator := metric.Curator{
|
||||
defer func() {
|
||||
<-p.curationSema
|
||||
}()
|
||||
|
||||
processor := metric.NewCompactionProcessor(&metric.CompactionProcessorOptions{
|
||||
MaximumMutationPoolBatch: groupSize * 3,
|
||||
MinimumGroupSize: groupSize,
|
||||
})
|
||||
defer processor.Close()
|
||||
|
||||
curator := metric.NewCurator(&metric.CuratorOptions{
|
||||
Stop: p.stopBackgroundOperations,
|
||||
|
||||
ViewQueue: p.storage.ViewQueue,
|
||||
}
|
||||
})
|
||||
defer curator.Close()
|
||||
|
||||
return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
|
||||
}
|
||||
|
||||
func (p *prometheus) delete(olderThan time.Duration, batchSize int) error {
|
||||
p.curationMutex.Lock()
|
||||
defer p.curationMutex.Unlock()
|
||||
|
||||
processor := &metric.DeletionProcessor{
|
||||
MaximumMutationPoolBatch: batchSize,
|
||||
select {
|
||||
case p.curationSema <- true:
|
||||
default:
|
||||
glog.Warningf("Deferred compaction for %s and %s due to existing operation.", operation, groupSize)
|
||||
return
|
||||
}
|
||||
|
||||
curator := metric.Curator{
|
||||
processor := metric.NewDeletionProcessor(&metric.DeletionProcessorOptions{
|
||||
MaximumMutationPoolBatch: batchSize,
|
||||
})
|
||||
defer processor.Close()
|
||||
|
||||
curator := metric.NewCurator(&metric.CuratorOptions{
|
||||
Stop: p.stopBackgroundOperations,
|
||||
|
||||
ViewQueue: p.storage.ViewQueue,
|
||||
}
|
||||
})
|
||||
defer curator.Close()
|
||||
|
||||
return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
|
||||
}
|
||||
|
||||
func (p *prometheus) close() {
|
||||
select {
|
||||
case p.curationSema <- true:
|
||||
default:
|
||||
}
|
||||
|
||||
if p.headCompactionTimer != nil {
|
||||
p.headCompactionTimer.Stop()
|
||||
}
|
||||
|
@ -156,8 +177,6 @@ func (p *prometheus) close() {
|
|||
p.stopBackgroundOperations <- true
|
||||
}
|
||||
|
||||
p.curationMutex.Lock()
|
||||
|
||||
p.ruleManager.Stop()
|
||||
p.storage.Close()
|
||||
|
||||
|
@ -267,6 +286,7 @@ func main() {
|
|||
deletionTimer: deletionTimer,
|
||||
|
||||
curationState: prometheusStatus,
|
||||
curationSema: make(chan bool, 1),
|
||||
|
||||
unwrittenSamples: unwrittenSamples,
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ package metric
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -24,7 +25,6 @@ import (
|
|||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
"github.com/prometheus/prometheus/coding"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/storage/raw"
|
||||
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||
|
@ -34,6 +34,8 @@ import (
|
|||
|
||||
const curationYieldPeriod = 250 * time.Millisecond
|
||||
|
||||
var errIllegalIterator = errors.New("Iterator invalid.")
|
||||
|
||||
// CurationStateUpdater receives updates about the curation state.
|
||||
type CurationStateUpdater interface {
|
||||
UpdateCurationState(*CurationState)
|
||||
|
@ -48,10 +50,7 @@ type CurationState struct {
|
|||
Fingerprint *clientmodel.Fingerprint
|
||||
}
|
||||
|
||||
// curator is responsible for effectuating a given curation policy across the
|
||||
// stored samples on-disk. This is useful to compact sparse sample values into
|
||||
// single sample entities to reduce keyspace load on the datastore.
|
||||
type Curator struct {
|
||||
type CuratorOptions struct {
|
||||
// Stop functions as a channel that when empty allows the curator to operate.
|
||||
// The moment a value is ingested inside of it, the curator goes into drain
|
||||
// mode.
|
||||
|
@ -60,6 +59,29 @@ type Curator struct {
|
|||
ViewQueue chan viewJob
|
||||
}
|
||||
|
||||
// curator is responsible for effectuating a given curation policy across the
|
||||
// stored samples on-disk. This is useful to compact sparse sample values into
|
||||
// single sample entities to reduce keyspace load on the datastore.
|
||||
type Curator struct {
|
||||
stop chan bool
|
||||
|
||||
viewQueue chan viewJob
|
||||
|
||||
dtoSampleKeys *dtoSampleKeyList
|
||||
sampleKeys *sampleKeyList
|
||||
}
|
||||
|
||||
func NewCurator(o *CuratorOptions) *Curator {
|
||||
return &Curator{
|
||||
stop: o.Stop,
|
||||
|
||||
viewQueue: o.ViewQueue,
|
||||
|
||||
dtoSampleKeys: newDtoSampleKeyList(10),
|
||||
sampleKeys: newSampleKeyList(10),
|
||||
}
|
||||
}
|
||||
|
||||
// watermarkScanner converts (dto.Fingerprint, dto.MetricHighWatermark) doubles
|
||||
// into (model.Fingerprint, model.Watermark) doubles.
|
||||
//
|
||||
|
@ -95,6 +117,9 @@ type watermarkScanner struct {
|
|||
firstBlock, lastBlock *SampleKey
|
||||
|
||||
ViewQueue chan viewJob
|
||||
|
||||
dtoSampleKeys *dtoSampleKeyList
|
||||
sampleKeys *sampleKeyList
|
||||
}
|
||||
|
||||
// run facilitates the curation lifecycle.
|
||||
|
@ -122,7 +147,10 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
|
|||
|
||||
defer status.UpdateCurationState(&CurationState{Active: false})
|
||||
|
||||
iterator := samples.NewIterator(true)
|
||||
iterator, err := samples.NewIterator(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer iterator.Close()
|
||||
|
||||
if !iterator.SeekToLast() {
|
||||
|
@ -130,21 +158,40 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
|
|||
|
||||
return
|
||||
}
|
||||
lastBlock, _ := extractSampleKey(iterator)
|
||||
|
||||
keyDto, _ := c.dtoSampleKeys.Get()
|
||||
defer c.dtoSampleKeys.Give(keyDto)
|
||||
|
||||
lastBlock, _ := c.sampleKeys.Get()
|
||||
defer c.sampleKeys.Give(lastBlock)
|
||||
|
||||
if err := iterator.Key(keyDto); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
lastBlock.Load(keyDto)
|
||||
|
||||
if !iterator.SeekToFirst() {
|
||||
glog.Info("Empty database; skipping curation.")
|
||||
|
||||
return
|
||||
}
|
||||
firstBlock, _ := extractSampleKey(iterator)
|
||||
|
||||
firstBlock, _ := c.sampleKeys.Get()
|
||||
defer c.sampleKeys.Give(firstBlock)
|
||||
|
||||
if err := iterator.Key(keyDto); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
firstBlock.Load(keyDto)
|
||||
|
||||
scanner := &watermarkScanner{
|
||||
curationState: curationState,
|
||||
ignoreYoungerThan: ignoreYoungerThan,
|
||||
processor: processor,
|
||||
status: status,
|
||||
stop: c.Stop,
|
||||
stop: c.stop,
|
||||
stopAt: instant.Add(-1 * ignoreYoungerThan),
|
||||
|
||||
sampleIterator: iterator,
|
||||
|
@ -153,7 +200,10 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
|
|||
firstBlock: firstBlock,
|
||||
lastBlock: lastBlock,
|
||||
|
||||
ViewQueue: c.ViewQueue,
|
||||
ViewQueue: c.viewQueue,
|
||||
|
||||
dtoSampleKeys: c.dtoSampleKeys,
|
||||
sampleKeys: c.sampleKeys,
|
||||
}
|
||||
|
||||
// Right now, the ability to stop a curation is limited to the beginning of
|
||||
|
@ -167,11 +217,16 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
|
|||
// drain instructs the curator to stop at the next convenient moment as to not
|
||||
// introduce data inconsistencies.
|
||||
func (c *Curator) Drain() {
|
||||
if len(c.Stop) == 0 {
|
||||
c.Stop <- true
|
||||
if len(c.stop) == 0 {
|
||||
c.stop <- true
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Curator) Close() {
|
||||
c.dtoSampleKeys.Close()
|
||||
c.sampleKeys.Close()
|
||||
}
|
||||
|
||||
func (w *watermarkScanner) DecodeKey(in interface{}) (interface{}, error) {
|
||||
key := new(dto.Fingerprint)
|
||||
bytes := in.([]byte)
|
||||
|
@ -284,26 +339,32 @@ func (w *watermarkScanner) curationConsistent(f *clientmodel.Fingerprint, waterm
|
|||
}
|
||||
|
||||
func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
|
||||
fingerprint := key.(*clientmodel.Fingerprint)
|
||||
|
||||
glog.Infof("Curating %s...", fingerprint)
|
||||
|
||||
if len(w.ViewQueue) > 0 {
|
||||
glog.Warning("Deferred due to view queue.")
|
||||
time.Sleep(curationYieldPeriod)
|
||||
}
|
||||
|
||||
fingerprint := key.(*clientmodel.Fingerprint)
|
||||
|
||||
if fingerprint.Less(w.firstBlock.Fingerprint) {
|
||||
glog.Warning("Skipped since before keyspace.")
|
||||
return nil
|
||||
}
|
||||
if w.lastBlock.Fingerprint.Less(fingerprint) {
|
||||
glog.Warning("Skipped since after keyspace.")
|
||||
return nil
|
||||
}
|
||||
|
||||
curationState, present, err := w.curationState.Get(&curationKey{
|
||||
curationState, _, err := w.curationState.Get(&curationKey{
|
||||
Fingerprint: fingerprint,
|
||||
ProcessorMessageRaw: w.processor.Signature(),
|
||||
ProcessorMessageTypeName: w.processor.Name(),
|
||||
IgnoreYoungerThan: w.ignoreYoungerThan,
|
||||
})
|
||||
if err != nil {
|
||||
glog.Warning("Unable to get curation state: %s", err)
|
||||
// An anomaly with the curation remark is likely not fatal in the sense that
|
||||
// there was a decoding error with the entity and shouldn't be cause to stop
|
||||
// work. The process will simply start from a pessimistic work time and
|
||||
|
@ -311,47 +372,45 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
|
|||
return &storage.OperatorError{error: err, Continuable: true}
|
||||
}
|
||||
|
||||
keySet := &SampleKey{
|
||||
Fingerprint: fingerprint,
|
||||
keySet, _ := w.sampleKeys.Get()
|
||||
defer w.sampleKeys.Give(keySet)
|
||||
|
||||
keySet.Fingerprint = fingerprint
|
||||
keySet.FirstTimestamp = curationState
|
||||
|
||||
// Invariant: The fingerprint tests above ensure that we have the same
|
||||
// fingerprint.
|
||||
keySet.Constrain(w.firstBlock, w.lastBlock)
|
||||
|
||||
seeker := &iteratorSeekerState{
|
||||
i: w.sampleIterator,
|
||||
|
||||
obj: keySet,
|
||||
|
||||
first: w.firstBlock,
|
||||
last: w.lastBlock,
|
||||
|
||||
dtoSampleKeys: w.dtoSampleKeys,
|
||||
sampleKeys: w.sampleKeys,
|
||||
}
|
||||
|
||||
if !present && fingerprint.Equal(w.firstBlock.Fingerprint) {
|
||||
// If the fingerprint is the same, then we simply need to use the earliest
|
||||
// block found in the database.
|
||||
*keySet = *w.firstBlock
|
||||
} else if present {
|
||||
keySet.FirstTimestamp = curationState
|
||||
for state := seeker.initialize; state != nil; state = state() {
|
||||
}
|
||||
|
||||
dto := new(dto.SampleKey)
|
||||
keySet.Dump(dto)
|
||||
prospectiveKey := coding.NewPBEncoder(dto).MustEncode()
|
||||
if !w.sampleIterator.Seek(prospectiveKey) {
|
||||
// LevelDB is picky about the seek ranges. If an iterator was invalidated,
|
||||
// no work may occur, and the iterator cannot be recovered.
|
||||
return &storage.OperatorError{error: fmt.Errorf("Illegal Condition: Iterator invalidated due to seek range."), Continuable: false}
|
||||
if seeker.err != nil {
|
||||
glog.Warningf("Got error in state machine: %s", seeker.err)
|
||||
|
||||
return &storage.OperatorError{error: seeker.err, Continuable: !seeker.iteratorInvalid}
|
||||
}
|
||||
|
||||
for {
|
||||
sampleKey, err := extractSampleKey(w.sampleIterator)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if !sampleKey.Fingerprint.Equal(fingerprint) {
|
||||
return
|
||||
}
|
||||
if seeker.iteratorInvalid {
|
||||
glog.Warningf("Got illegal iterator in state machine: %s", err)
|
||||
|
||||
if !present {
|
||||
break
|
||||
}
|
||||
return &storage.OperatorError{error: errIllegalIterator, Continuable: false}
|
||||
}
|
||||
|
||||
if !(sampleKey.FirstTimestamp.Before(curationState) && sampleKey.LastTimestamp.Before(curationState)) {
|
||||
break
|
||||
}
|
||||
|
||||
if !w.sampleIterator.Next() {
|
||||
return
|
||||
}
|
||||
if !seeker.seriesOperable {
|
||||
return
|
||||
}
|
||||
|
||||
lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, w.stopAt, fingerprint)
|
||||
|
|
|
@ -0,0 +1,182 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metric
|
||||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/utility"
|
||||
|
||||
dto "github.com/prometheus/prometheus/model/generated"
|
||||
)
|
||||
|
||||
type dtoSampleKeyList struct {
|
||||
l utility.FreeList
|
||||
}
|
||||
|
||||
func newDtoSampleKeyList(cap int) *dtoSampleKeyList {
|
||||
return &dtoSampleKeyList{
|
||||
l: utility.NewFreeList(cap),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *dtoSampleKeyList) Get() (*dto.SampleKey, bool) {
|
||||
if v, ok := l.l.Get(); ok {
|
||||
return v.(*dto.SampleKey), ok
|
||||
}
|
||||
|
||||
return new(dto.SampleKey), false
|
||||
}
|
||||
|
||||
func (l *dtoSampleKeyList) Give(v *dto.SampleKey) bool {
|
||||
v.Reset()
|
||||
|
||||
return l.l.Give(v)
|
||||
}
|
||||
|
||||
func (l *dtoSampleKeyList) Close() {
|
||||
l.l.Close()
|
||||
}
|
||||
|
||||
type sampleKeyList struct {
|
||||
l utility.FreeList
|
||||
}
|
||||
|
||||
var defaultSampleKey = new(SampleKey)
|
||||
|
||||
func newSampleKeyList(cap int) *sampleKeyList {
|
||||
return &sampleKeyList{
|
||||
l: utility.NewFreeList(cap),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *sampleKeyList) Get() (*SampleKey, bool) {
|
||||
if v, ok := l.l.Get(); ok {
|
||||
return v.(*SampleKey), ok
|
||||
}
|
||||
|
||||
return new(SampleKey), false
|
||||
}
|
||||
|
||||
func (l *sampleKeyList) Give(v *SampleKey) bool {
|
||||
*v = *defaultSampleKey
|
||||
|
||||
return l.l.Give(v)
|
||||
}
|
||||
|
||||
func (l *sampleKeyList) Close() {
|
||||
l.l.Close()
|
||||
}
|
||||
|
||||
type valueAtTimeList struct {
|
||||
l utility.FreeList
|
||||
}
|
||||
|
||||
func (l *valueAtTimeList) Get() (*getValuesAtTimeOp, bool) {
|
||||
if v, ok := l.l.Get(); ok {
|
||||
return v.(*getValuesAtTimeOp), ok
|
||||
}
|
||||
|
||||
return new(getValuesAtTimeOp), false
|
||||
}
|
||||
|
||||
var pGetValuesAtTimeOp = new(getValuesAtTimeOp)
|
||||
|
||||
func (l *valueAtTimeList) Give(v *getValuesAtTimeOp) bool {
|
||||
*v = *pGetValuesAtTimeOp
|
||||
|
||||
return l.l.Give(v)
|
||||
}
|
||||
|
||||
func newValueAtTimeList(cap int) *valueAtTimeList {
|
||||
return &valueAtTimeList{
|
||||
l: utility.NewFreeList(cap),
|
||||
}
|
||||
}
|
||||
|
||||
type valueAtIntervalList struct {
|
||||
l utility.FreeList
|
||||
}
|
||||
|
||||
func (l *valueAtIntervalList) Get() (*getValuesAtIntervalOp, bool) {
|
||||
if v, ok := l.l.Get(); ok {
|
||||
return v.(*getValuesAtIntervalOp), ok
|
||||
}
|
||||
|
||||
return new(getValuesAtIntervalOp), false
|
||||
}
|
||||
|
||||
var pGetValuesAtIntervalOp = new(getValuesAtIntervalOp)
|
||||
|
||||
func (l *valueAtIntervalList) Give(v *getValuesAtIntervalOp) bool {
|
||||
*v = *pGetValuesAtIntervalOp
|
||||
|
||||
return l.l.Give(v)
|
||||
}
|
||||
|
||||
func newValueAtIntervalList(cap int) *valueAtIntervalList {
|
||||
return &valueAtIntervalList{
|
||||
l: utility.NewFreeList(cap),
|
||||
}
|
||||
}
|
||||
|
||||
type valueAlongRangeList struct {
|
||||
l utility.FreeList
|
||||
}
|
||||
|
||||
func (l *valueAlongRangeList) Get() (*getValuesAlongRangeOp, bool) {
|
||||
if v, ok := l.l.Get(); ok {
|
||||
return v.(*getValuesAlongRangeOp), ok
|
||||
}
|
||||
|
||||
return new(getValuesAlongRangeOp), false
|
||||
}
|
||||
|
||||
var pGetValuesAlongRangeOp = new(getValuesAlongRangeOp)
|
||||
|
||||
func (l *valueAlongRangeList) Give(v *getValuesAlongRangeOp) bool {
|
||||
*v = *pGetValuesAlongRangeOp
|
||||
|
||||
return l.l.Give(v)
|
||||
}
|
||||
|
||||
func newValueAlongRangeList(cap int) *valueAlongRangeList {
|
||||
return &valueAlongRangeList{
|
||||
l: utility.NewFreeList(cap),
|
||||
}
|
||||
}
|
||||
|
||||
type valueAtIntervalAlongRangeList struct {
|
||||
l utility.FreeList
|
||||
}
|
||||
|
||||
func (l *valueAtIntervalAlongRangeList) Get() (*getValueRangeAtIntervalOp, bool) {
|
||||
if v, ok := l.l.Get(); ok {
|
||||
return v.(*getValueRangeAtIntervalOp), ok
|
||||
}
|
||||
|
||||
return new(getValueRangeAtIntervalOp), false
|
||||
}
|
||||
|
||||
var pGetValueRangeAtIntervalOp = new(getValueRangeAtIntervalOp)
|
||||
|
||||
func (l *valueAtIntervalAlongRangeList) Give(v *getValueRangeAtIntervalOp) bool {
|
||||
*v = *pGetValueRangeAtIntervalOp
|
||||
|
||||
return l.l.Give(v)
|
||||
}
|
||||
|
||||
func newValueAtIntervalAlongRangeList(cap int) *valueAtIntervalAlongRangeList {
|
||||
return &valueAtIntervalAlongRangeList{
|
||||
l: utility.NewFreeList(cap),
|
||||
}
|
||||
}
|
|
@ -402,8 +402,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (e
|
|||
|
||||
func extractSampleKey(i leveldb.Iterator) (*SampleKey, error) {
|
||||
k := &dto.SampleKey{}
|
||||
err := proto.Unmarshal(i.Key(), k)
|
||||
if err != nil {
|
||||
if err := i.Key(k); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -415,8 +414,7 @@ func extractSampleKey(i leveldb.Iterator) (*SampleKey, error) {
|
|||
|
||||
func extractSampleValues(i leveldb.Iterator) (Values, error) {
|
||||
v := &dto.SampleValueSeries{}
|
||||
err := proto.Unmarshal(i.Value(), v)
|
||||
if err != nil {
|
||||
if err := i.Value(v); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,212 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metric
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||
|
||||
dto "github.com/prometheus/prometheus/model/generated"
|
||||
)
|
||||
|
||||
type iteratorSeekerState struct {
|
||||
// Immutable State
|
||||
i leveldb.Iterator
|
||||
|
||||
obj *SampleKey
|
||||
|
||||
first, last *SampleKey
|
||||
|
||||
dtoSampleKeys *dtoSampleKeyList
|
||||
sampleKeys *sampleKeyList
|
||||
|
||||
// Mutable State
|
||||
iteratorInvalid bool
|
||||
seriesOperable bool
|
||||
err error
|
||||
|
||||
key *SampleKey
|
||||
keyDto *dto.SampleKey
|
||||
}
|
||||
|
||||
// iteratorSeeker is a function that models a state machine state and
|
||||
// is responsible for choosing the subsequent state given the present
|
||||
// disposition.
|
||||
//
|
||||
// It returns the next state or nil if no remaining transition is possible.
|
||||
// It returns an error if one occurred and finally a truth value indicating
|
||||
// whether the current iterator state is usable and whether it can proceed with
|
||||
// the current fingerprint.
|
||||
type iteratorSeeker func() iteratorSeeker
|
||||
|
||||
func (s *iteratorSeekerState) initialize() iteratorSeeker {
|
||||
s.key, _ = s.sampleKeys.Get()
|
||||
s.keyDto, _ = s.dtoSampleKeys.Get()
|
||||
|
||||
return s.start
|
||||
}
|
||||
|
||||
func (s *iteratorSeekerState) destroy() iteratorSeeker {
|
||||
s.sampleKeys.Give(s.key)
|
||||
s.dtoSampleKeys.Give(s.keyDto)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *iteratorSeekerState) start() iteratorSeeker {
|
||||
switch {
|
||||
case s.obj.Fingerprint.Less(s.first.Fingerprint):
|
||||
// The fingerprint does not exist in the database.
|
||||
return s.destroy
|
||||
|
||||
case s.last.Fingerprint.Less(s.obj.Fingerprint):
|
||||
// The fingerprint does not exist in the database.
|
||||
return s.destroy
|
||||
|
||||
case s.obj.Fingerprint.Equal(s.first.Fingerprint) && s.obj.FirstTimestamp.Before(s.first.FirstTimestamp):
|
||||
// The fingerprint is the first fingerprint, but we've requested a value
|
||||
// before what exists in the database.
|
||||
return s.seekBeginning
|
||||
|
||||
case s.last.Before(s.obj.Fingerprint, s.obj.FirstTimestamp):
|
||||
// The requested time for work is after the last sample in the database; we
|
||||
// can't do anything!
|
||||
return s.destroy
|
||||
|
||||
default:
|
||||
return s.initialSeek
|
||||
}
|
||||
}
|
||||
|
||||
func (s *iteratorSeekerState) seekBeginning() iteratorSeeker {
|
||||
s.i.SeekToFirst()
|
||||
if !s.i.Valid() {
|
||||
s.err = s.i.Error()
|
||||
// If we can't seek to the beginning, there isn't any hope for us.
|
||||
glog.Warning("iterator went bad: %s", s.err)
|
||||
s.iteratorInvalid = true
|
||||
return s.destroy
|
||||
}
|
||||
|
||||
return s.initialMatchFingerprint
|
||||
}
|
||||
|
||||
func (s *iteratorSeekerState) initialSeek() iteratorSeeker {
|
||||
s.obj.Dump(s.keyDto)
|
||||
|
||||
s.i.Seek(s.keyDto)
|
||||
if !s.i.Valid() {
|
||||
s.err = s.i.Error()
|
||||
glog.Warningf("iterator went bad %s", s.err)
|
||||
s.iteratorInvalid = true
|
||||
return s.destroy
|
||||
}
|
||||
|
||||
return s.initialMatchFingerprint
|
||||
}
|
||||
|
||||
func (s *iteratorSeekerState) initialMatchFingerprint() iteratorSeeker {
|
||||
if err := s.i.Key(s.keyDto); err != nil {
|
||||
s.err = err
|
||||
return s.destroy
|
||||
}
|
||||
|
||||
s.key.Load(s.keyDto)
|
||||
|
||||
switch {
|
||||
case s.obj.Fingerprint.Less(s.key.Fingerprint):
|
||||
return s.initialFingerprintOvershot
|
||||
|
||||
case s.key.Fingerprint.Less(s.obj.Fingerprint):
|
||||
panic("violated invariant")
|
||||
|
||||
default:
|
||||
return s.initialMatchTime
|
||||
}
|
||||
}
|
||||
|
||||
func (s *iteratorSeekerState) initialFingerprintOvershot() iteratorSeeker {
|
||||
s.i.Previous()
|
||||
if !s.i.Valid() {
|
||||
glog.Warningf("Could not backtrack for %s", s)
|
||||
panic("violated invariant")
|
||||
}
|
||||
|
||||
if err := s.i.Key(s.keyDto); err != nil {
|
||||
s.err = err
|
||||
return s.destroy
|
||||
}
|
||||
|
||||
s.key.Load(s.keyDto)
|
||||
|
||||
if !s.key.Fingerprint.Equal(s.obj.Fingerprint) {
|
||||
return s.destroy
|
||||
}
|
||||
|
||||
return s.initialMatchTime
|
||||
}
|
||||
|
||||
func (s *iteratorSeekerState) initialMatchTime() iteratorSeeker {
|
||||
switch {
|
||||
case s.key.MayContain(s.obj.FirstTimestamp):
|
||||
s.seriesOperable = true
|
||||
return s.destroy
|
||||
|
||||
case s.key.Equal(s.first), s.obj.FirstTimestamp.Equal(s.key.FirstTimestamp):
|
||||
s.seriesOperable = true
|
||||
return s.destroy
|
||||
|
||||
case s.obj.FirstTimestamp.Before(s.key.FirstTimestamp):
|
||||
return s.reCue
|
||||
|
||||
default:
|
||||
panic("violated invariant " + fmt.Sprintln(s.obj, s.key))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *iteratorSeekerState) reCue() iteratorSeeker {
|
||||
s.i.Previous()
|
||||
if !s.i.Valid() {
|
||||
glog.Warningf("Could not backtrack for %s", s)
|
||||
panic("violated invariant")
|
||||
}
|
||||
|
||||
if err := s.i.Key(s.keyDto); err != nil {
|
||||
s.err = err
|
||||
return s.destroy
|
||||
}
|
||||
|
||||
s.key.Load(s.keyDto)
|
||||
|
||||
if !s.key.Fingerprint.Equal(s.obj.Fingerprint) {
|
||||
return s.fastForward
|
||||
}
|
||||
|
||||
s.seriesOperable = true
|
||||
return s.destroy
|
||||
}
|
||||
|
||||
func (s *iteratorSeekerState) fastForward() iteratorSeeker {
|
||||
s.i.Next()
|
||||
if !s.i.Valid() {
|
||||
glog.Warningf("Could not fast-forward for %s", s)
|
||||
panic("violated invariant")
|
||||
}
|
||||
|
||||
s.seriesOperable = true
|
||||
return s.destroy
|
||||
}
|
|
@ -50,18 +50,14 @@ type Processor interface {
|
|||
// CompactionProcessor combines sparse values in the database together such
|
||||
// that at least MinimumGroupSize-sized chunks are grouped together.
|
||||
type CompactionProcessor struct {
|
||||
// MaximumMutationPoolBatch represents approximately the largest pending
|
||||
// batch of mutation operations for the database before pausing to
|
||||
// commit before resumption.
|
||||
//
|
||||
// A reasonable value would be (MinimumGroupSize * 2) + 1.
|
||||
MaximumMutationPoolBatch int
|
||||
// MinimumGroupSize represents the smallest allowed sample chunk size in the
|
||||
// database.
|
||||
MinimumGroupSize int
|
||||
maximumMutationPoolBatch int
|
||||
minimumGroupSize int
|
||||
// signature is the byte representation of the CompactionProcessor's settings,
|
||||
// used for purely memoization purposes across an instance.
|
||||
signature []byte
|
||||
|
||||
dtoSampleKeys *dtoSampleKeyList
|
||||
sampleKeys *sampleKeyList
|
||||
}
|
||||
|
||||
func (p *CompactionProcessor) Name() string {
|
||||
|
@ -71,7 +67,7 @@ func (p *CompactionProcessor) Name() string {
|
|||
func (p *CompactionProcessor) Signature() []byte {
|
||||
if len(p.signature) == 0 {
|
||||
out, err := proto.Marshal(&dto.CompactionProcessorDefinition{
|
||||
MinimumGroupSize: proto.Uint32(uint32(p.MinimumGroupSize)),
|
||||
MinimumGroupSize: proto.Uint32(uint32(p.minimumGroupSize)),
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -84,7 +80,7 @@ func (p *CompactionProcessor) Signature() []byte {
|
|||
}
|
||||
|
||||
func (p *CompactionProcessor) String() string {
|
||||
return fmt.Sprintf("compactionProcessor for minimum group size %d", p.MinimumGroupSize)
|
||||
return fmt.Sprintf("compactionProcessor for minimum group size %d", p.minimumGroupSize)
|
||||
}
|
||||
|
||||
func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint *clientmodel.Fingerprint) (lastCurated time.Time, err error) {
|
||||
|
@ -98,16 +94,22 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
|
|||
|
||||
var pendingMutations = 0
|
||||
var pendingSamples Values
|
||||
var sampleKey *SampleKey
|
||||
var unactedSamples Values
|
||||
var lastTouchedTime time.Time
|
||||
var keyDropped bool
|
||||
|
||||
sampleKey, err = extractSampleKey(sampleIterator)
|
||||
if err != nil {
|
||||
sampleKey, _ := p.sampleKeys.Get()
|
||||
defer p.sampleKeys.Give(sampleKey)
|
||||
|
||||
sampleKeyDto, _ := p.dtoSampleKeys.Get()
|
||||
defer p.dtoSampleKeys.Give(sampleKeyDto)
|
||||
|
||||
if err = sampleIterator.Key(sampleKeyDto); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
sampleKey.Load(sampleKeyDto)
|
||||
|
||||
unactedSamples, err = extractSampleValues(sampleIterator)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -129,10 +131,11 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
|
|||
|
||||
keyDropped = false
|
||||
|
||||
sampleKey, err = extractSampleKey(sampleIterator)
|
||||
if err != nil {
|
||||
if err = sampleIterator.Key(sampleKeyDto); err != nil {
|
||||
return
|
||||
}
|
||||
sampleKey.Load(sampleKeyDto)
|
||||
|
||||
unactedSamples, err = extractSampleValues(sampleIterator)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -141,7 +144,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
|
|||
// If the number of pending mutations exceeds the allowed batch amount,
|
||||
// commit to disk and delete the batch. A new one will be recreated if
|
||||
// necessary.
|
||||
case pendingMutations >= p.MaximumMutationPoolBatch:
|
||||
case pendingMutations >= p.maximumMutationPoolBatch:
|
||||
err = samplesPersistence.Commit(pendingBatch)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -152,11 +155,11 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
|
|||
pendingBatch.Close()
|
||||
pendingBatch = nil
|
||||
|
||||
case len(pendingSamples) == 0 && len(unactedSamples) >= p.MinimumGroupSize:
|
||||
case len(pendingSamples) == 0 && len(unactedSamples) >= p.minimumGroupSize:
|
||||
lastTouchedTime = unactedSamples[len(unactedSamples)-1].Timestamp
|
||||
unactedSamples = Values{}
|
||||
|
||||
case len(pendingSamples)+len(unactedSamples) < p.MinimumGroupSize:
|
||||
case len(pendingSamples)+len(unactedSamples) < p.minimumGroupSize:
|
||||
if !keyDropped {
|
||||
k := new(dto.SampleKey)
|
||||
sampleKey.Dump(k)
|
||||
|
@ -170,7 +173,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
|
|||
pendingMutations++
|
||||
|
||||
// If the number of pending writes equals the target group size
|
||||
case len(pendingSamples) == p.MinimumGroupSize:
|
||||
case len(pendingSamples) == p.minimumGroupSize:
|
||||
k := new(dto.SampleKey)
|
||||
newSampleKey := pendingSamples.ToSampleKey(fingerprint)
|
||||
newSampleKey.Dump(k)
|
||||
|
@ -187,9 +190,9 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
|
|||
keyDropped = true
|
||||
}
|
||||
|
||||
if len(unactedSamples) > p.MinimumGroupSize {
|
||||
pendingSamples = unactedSamples[:p.MinimumGroupSize]
|
||||
unactedSamples = unactedSamples[p.MinimumGroupSize:]
|
||||
if len(unactedSamples) > p.minimumGroupSize {
|
||||
pendingSamples = unactedSamples[:p.minimumGroupSize]
|
||||
unactedSamples = unactedSamples[p.minimumGroupSize:]
|
||||
lastTouchedTime = unactedSamples[len(unactedSamples)-1].Timestamp
|
||||
} else {
|
||||
pendingSamples = unactedSamples
|
||||
|
@ -198,14 +201,14 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
|
|||
}
|
||||
}
|
||||
|
||||
case len(pendingSamples)+len(unactedSamples) >= p.MinimumGroupSize:
|
||||
case len(pendingSamples)+len(unactedSamples) >= p.minimumGroupSize:
|
||||
if !keyDropped {
|
||||
k := new(dto.SampleKey)
|
||||
sampleKey.Dump(k)
|
||||
pendingBatch.Drop(k)
|
||||
keyDropped = true
|
||||
}
|
||||
remainder := p.MinimumGroupSize - len(pendingSamples)
|
||||
remainder := p.minimumGroupSize - len(pendingSamples)
|
||||
pendingSamples = append(pendingSamples, unactedSamples[:remainder]...)
|
||||
unactedSamples = unactedSamples[remainder:]
|
||||
if len(unactedSamples) == 0 {
|
||||
|
@ -244,15 +247,42 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
|
|||
return
|
||||
}
|
||||
|
||||
// DeletionProcessor deletes sample blocks older than a defined value.
|
||||
type DeletionProcessor struct {
|
||||
func (p *CompactionProcessor) Close() {
|
||||
p.dtoSampleKeys.Close()
|
||||
p.sampleKeys.Close()
|
||||
}
|
||||
|
||||
type CompactionProcessorOptions struct {
|
||||
// MaximumMutationPoolBatch represents approximately the largest pending
|
||||
// batch of mutation operations for the database before pausing to
|
||||
// commit before resumption.
|
||||
//
|
||||
// A reasonable value would be (MinimumGroupSize * 2) + 1.
|
||||
MaximumMutationPoolBatch int
|
||||
// MinimumGroupSize represents the smallest allowed sample chunk size in the
|
||||
// database.
|
||||
MinimumGroupSize int
|
||||
}
|
||||
|
||||
func NewCompactionProcessor(o *CompactionProcessorOptions) *CompactionProcessor {
|
||||
return &CompactionProcessor{
|
||||
maximumMutationPoolBatch: o.MaximumMutationPoolBatch,
|
||||
minimumGroupSize: o.MinimumGroupSize,
|
||||
|
||||
dtoSampleKeys: newDtoSampleKeyList(10),
|
||||
sampleKeys: newSampleKeyList(10),
|
||||
}
|
||||
}
|
||||
|
||||
// DeletionProcessor deletes sample blocks older than a defined value.
|
||||
type DeletionProcessor struct {
|
||||
maximumMutationPoolBatch int
|
||||
// signature is the byte representation of the DeletionProcessor's settings,
|
||||
// used for purely memoization purposes across an instance.
|
||||
signature []byte
|
||||
|
||||
dtoSampleKeys *dtoSampleKeyList
|
||||
sampleKeys *sampleKeyList
|
||||
}
|
||||
|
||||
func (p *DeletionProcessor) Name() string {
|
||||
|
@ -286,10 +316,16 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
|
|||
}
|
||||
}()
|
||||
|
||||
sampleKey, err := extractSampleKey(sampleIterator)
|
||||
if err != nil {
|
||||
sampleKeyDto, _ := p.dtoSampleKeys.Get()
|
||||
defer p.dtoSampleKeys.Give(sampleKeyDto)
|
||||
|
||||
sampleKey, _ := p.sampleKeys.Get()
|
||||
defer p.sampleKeys.Give(sampleKey)
|
||||
|
||||
if err = sampleIterator.Key(sampleKeyDto); err != nil {
|
||||
return
|
||||
}
|
||||
sampleKey.Load(sampleKeyDto)
|
||||
|
||||
sampleValues, err := extractSampleValues(sampleIterator)
|
||||
if err != nil {
|
||||
|
@ -312,10 +348,11 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
|
|||
return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation")
|
||||
}
|
||||
|
||||
sampleKey, err = extractSampleKey(sampleIterator)
|
||||
if err != nil {
|
||||
if err = sampleIterator.Key(sampleKeyDto); err != nil {
|
||||
return
|
||||
}
|
||||
sampleKey.Load(sampleKeyDto)
|
||||
|
||||
sampleValues, err = extractSampleValues(sampleIterator)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -324,7 +361,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
|
|||
// If the number of pending mutations exceeds the allowed batch amount,
|
||||
// commit to disk and delete the batch. A new one will be recreated if
|
||||
// necessary.
|
||||
case pendingMutations >= p.MaximumMutationPoolBatch:
|
||||
case pendingMutations >= p.maximumMutationPoolBatch:
|
||||
err = samplesPersistence.Commit(pendingBatch)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -379,3 +416,24 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
|
|||
|
||||
return
|
||||
}
|
||||
|
||||
func (p *DeletionProcessor) Close() {
|
||||
p.dtoSampleKeys.Close()
|
||||
p.sampleKeys.Close()
|
||||
}
|
||||
|
||||
type DeletionProcessorOptions struct {
|
||||
// MaximumMutationPoolBatch represents approximately the largest pending
|
||||
// batch of mutation operations for the database before pausing to
|
||||
// commit before resumption.
|
||||
MaximumMutationPoolBatch int
|
||||
}
|
||||
|
||||
func NewDeletionProcessor(o *DeletionProcessorOptions) *DeletionProcessor {
|
||||
return &DeletionProcessor{
|
||||
maximumMutationPoolBatch: o.MaximumMutationPoolBatch,
|
||||
|
||||
dtoSampleKeys: newDtoSampleKeyList(10),
|
||||
sampleKeys: newSampleKeyList(10),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -123,10 +123,10 @@ func TestCuratorCompactionProcessor(t *testing.T) {
|
|||
}{
|
||||
{
|
||||
in: in{
|
||||
processor: &CompactionProcessor{
|
||||
processor: NewCompactionProcessor(&CompactionProcessorOptions{
|
||||
MinimumGroupSize: 5,
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
}),
|
||||
ignoreYoungerThan: 1 * time.Hour,
|
||||
groupSize: 5,
|
||||
curationStates: fixture.Pairs{
|
||||
|
@ -134,27 +134,27 @@ func TestCuratorCompactionProcessor(t *testing.T) {
|
|||
fingerprint: "0001-A-1-Z",
|
||||
ignoreYoungerThan: 1 * time.Hour,
|
||||
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
|
||||
processor: &CompactionProcessor{
|
||||
processor: NewCompactionProcessor(&CompactionProcessorOptions{
|
||||
MinimumGroupSize: 5,
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
}),
|
||||
},
|
||||
curationState{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
ignoreYoungerThan: 1 * time.Hour,
|
||||
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
|
||||
processor: &CompactionProcessor{
|
||||
processor: NewCompactionProcessor(&CompactionProcessorOptions{
|
||||
MinimumGroupSize: 5,
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
}),
|
||||
},
|
||||
// This rule should effectively be ignored.
|
||||
curationState{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
processor: &CompactionProcessor{
|
||||
processor: NewCompactionProcessor(&CompactionProcessorOptions{
|
||||
MinimumGroupSize: 2,
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
}),
|
||||
ignoreYoungerThan: 30 * time.Minute,
|
||||
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
|
||||
},
|
||||
|
@ -553,28 +553,28 @@ func TestCuratorCompactionProcessor(t *testing.T) {
|
|||
fingerprint: "0001-A-1-Z",
|
||||
ignoreYoungerThan: time.Hour,
|
||||
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
|
||||
processor: &CompactionProcessor{
|
||||
processor: NewCompactionProcessor(&CompactionProcessorOptions{
|
||||
MinimumGroupSize: 5,
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
}),
|
||||
},
|
||||
{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
ignoreYoungerThan: 30 * time.Minute,
|
||||
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
|
||||
processor: &CompactionProcessor{
|
||||
processor: NewCompactionProcessor(&CompactionProcessorOptions{
|
||||
MinimumGroupSize: 2,
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
}),
|
||||
},
|
||||
{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
ignoreYoungerThan: time.Hour,
|
||||
lastCurated: testInstant.Add(-1 * 60 * time.Minute),
|
||||
processor: &CompactionProcessor{
|
||||
processor: NewCompactionProcessor(&CompactionProcessorOptions{
|
||||
MinimumGroupSize: 5,
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
}),
|
||||
},
|
||||
},
|
||||
sampleGroups: []sampleGroup{
|
||||
|
@ -881,16 +881,20 @@ func TestCuratorCompactionProcessor(t *testing.T) {
|
|||
stop := make(chan bool)
|
||||
defer close(stop)
|
||||
|
||||
c := Curator{
|
||||
c := NewCurator(&CuratorOptions{
|
||||
Stop: stop,
|
||||
}
|
||||
})
|
||||
defer c.Close()
|
||||
|
||||
err = c.Run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
iterator := curatorStates.p.NewIterator(true)
|
||||
iterator, err := curatorStates.p.NewIterator(true)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer iterator.Close()
|
||||
|
||||
for j, expected := range scenario.out.curationStates {
|
||||
|
@ -905,9 +909,8 @@ func TestCuratorCompactionProcessor(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
curationKeyDto := &dto.CurationKey{}
|
||||
|
||||
err = proto.Unmarshal(iterator.Key(), curationKeyDto)
|
||||
curationKeyDto := new(dto.CurationKey)
|
||||
err = iterator.Key(curationKeyDto)
|
||||
if err != nil {
|
||||
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
|
||||
}
|
||||
|
@ -938,7 +941,10 @@ func TestCuratorCompactionProcessor(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
iterator = samples.NewIterator(true)
|
||||
iterator, err = samples.NewIterator(true)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer iterator.Close()
|
||||
|
||||
for j, expected := range scenario.out.sampleGroups {
|
||||
|
@ -1004,9 +1010,9 @@ func TestCuratorDeletionProcessor(t *testing.T) {
|
|||
}{
|
||||
{
|
||||
in: in{
|
||||
processor: &DeletionProcessor{
|
||||
processor: NewDeletionProcessor(&DeletionProcessorOptions{
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
}),
|
||||
ignoreYoungerThan: 1 * time.Hour,
|
||||
groupSize: 5,
|
||||
curationStates: fixture.Pairs{
|
||||
|
@ -1014,17 +1020,17 @@ func TestCuratorDeletionProcessor(t *testing.T) {
|
|||
fingerprint: "0001-A-1-Z",
|
||||
ignoreYoungerThan: 1 * time.Hour,
|
||||
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
|
||||
processor: &DeletionProcessor{
|
||||
processor: NewDeletionProcessor(&DeletionProcessorOptions{
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
}),
|
||||
},
|
||||
curationState{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
ignoreYoungerThan: 1 * time.Hour,
|
||||
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
|
||||
processor: &DeletionProcessor{
|
||||
processor: NewDeletionProcessor(&DeletionProcessorOptions{
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
}),
|
||||
},
|
||||
},
|
||||
watermarkStates: fixture.Pairs{
|
||||
|
@ -1317,17 +1323,17 @@ func TestCuratorDeletionProcessor(t *testing.T) {
|
|||
fingerprint: "0001-A-1-Z",
|
||||
ignoreYoungerThan: 1 * time.Hour,
|
||||
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
|
||||
processor: &DeletionProcessor{
|
||||
processor: NewDeletionProcessor(&DeletionProcessorOptions{
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
}),
|
||||
},
|
||||
{
|
||||
fingerprint: "0002-A-2-Z",
|
||||
ignoreYoungerThan: 1 * time.Hour,
|
||||
lastCurated: testInstant.Add(-1 * 60 * time.Minute),
|
||||
processor: &DeletionProcessor{
|
||||
processor: NewDeletionProcessor(&DeletionProcessorOptions{
|
||||
MaximumMutationPoolBatch: 15,
|
||||
},
|
||||
}),
|
||||
},
|
||||
},
|
||||
sampleGroups: []sampleGroup{
|
||||
|
@ -1404,16 +1410,20 @@ func TestCuratorDeletionProcessor(t *testing.T) {
|
|||
stop := make(chan bool)
|
||||
defer close(stop)
|
||||
|
||||
c := Curator{
|
||||
c := NewCurator(&CuratorOptions{
|
||||
Stop: stop,
|
||||
}
|
||||
})
|
||||
defer c.Close()
|
||||
|
||||
err = c.Run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
iterator := curatorStates.p.NewIterator(true)
|
||||
iterator, err := curatorStates.p.NewIterator(true)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer iterator.Close()
|
||||
|
||||
for j, expected := range scenario.out.curationStates {
|
||||
|
@ -1429,9 +1439,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
|
|||
}
|
||||
|
||||
curationKeyDto := new(dto.CurationKey)
|
||||
|
||||
err = proto.Unmarshal(iterator.Key(), curationKeyDto)
|
||||
if err != nil {
|
||||
if err := iterator.Key(curationKeyDto); err != nil {
|
||||
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
|
||||
}
|
||||
|
||||
|
@ -1463,7 +1471,10 @@ func TestCuratorDeletionProcessor(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
iterator = samples.NewIterator(true)
|
||||
iterator, err = samples.NewIterator(true)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer iterator.Close()
|
||||
|
||||
for j, expected := range scenario.out.sampleGroups {
|
||||
|
|
|
@ -35,6 +35,21 @@ type SampleKey struct {
|
|||
SampleCount uint32
|
||||
}
|
||||
|
||||
// Constrain merges the underlying SampleKey to fit within the keyspace of
|
||||
// the provided first and last keys and returns whether the key was modified.
|
||||
func (s *SampleKey) Constrain(first, last *SampleKey) bool {
|
||||
switch {
|
||||
case s.Before(first.Fingerprint, first.FirstTimestamp):
|
||||
*s = *first
|
||||
return true
|
||||
case last.Before(s.Fingerprint, s.FirstTimestamp):
|
||||
*s = *last
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SampleKey) Equal(o *SampleKey) bool {
|
||||
if s == o {
|
||||
return true
|
||||
|
|
|
@ -24,7 +24,6 @@ import (
|
|||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
"github.com/prometheus/prometheus/coding"
|
||||
"github.com/prometheus/prometheus/coding/indexable"
|
||||
"github.com/prometheus/prometheus/utility/test"
|
||||
|
||||
|
@ -198,12 +197,13 @@ func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp *clientmodel.Fingerpr
|
|||
Timestamp: indexable.EncodeTime(i.OldestInclusive),
|
||||
}
|
||||
|
||||
e := coding.NewPBEncoder(k).MustEncode()
|
||||
|
||||
iterator := l.MetricSamples.NewIterator(true)
|
||||
iterator, err := l.MetricSamples.NewIterator(true)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer iterator.Close()
|
||||
|
||||
for valid := iterator.Seek(e); valid; valid = iterator.Next() {
|
||||
for valid := iterator.Seek(k); valid; valid = iterator.Next() {
|
||||
retrievedKey, err := extractSampleKey(iterator)
|
||||
if err != nil {
|
||||
return samples, err
|
||||
|
|
|
@ -23,12 +23,9 @@ import (
|
|||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
|
||||
"github.com/prometheus/prometheus/coding"
|
||||
"github.com/prometheus/prometheus/stats"
|
||||
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
||||
"github.com/prometheus/prometheus/utility"
|
||||
|
||||
dto "github.com/prometheus/prometheus/model/generated"
|
||||
)
|
||||
|
||||
type chunk Values
|
||||
|
@ -95,6 +92,9 @@ type TieredStorage struct {
|
|||
Indexer MetricIndexer
|
||||
|
||||
flushSema chan bool
|
||||
|
||||
dtoSampleKeys *dtoSampleKeyList
|
||||
sampleKeys *sampleKeyList
|
||||
}
|
||||
|
||||
// viewJob encapsulates a request to extract sample values from the datastore.
|
||||
|
@ -140,6 +140,9 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
|
|||
wmCache: wmCache,
|
||||
|
||||
flushSema: make(chan bool, 1),
|
||||
|
||||
dtoSampleKeys: newDtoSampleKeyList(10),
|
||||
sampleKeys: newSampleKeyList(10),
|
||||
}
|
||||
|
||||
for i := 0; i < tieredMemorySemaphores; i++ {
|
||||
|
@ -323,6 +326,9 @@ func (t *TieredStorage) close() {
|
|||
close(t.ViewQueue)
|
||||
t.wmCache.Clear()
|
||||
|
||||
t.dtoSampleKeys.Close()
|
||||
t.sampleKeys.Close()
|
||||
|
||||
t.state = tieredStorageStopping
|
||||
}
|
||||
|
||||
|
@ -379,7 +385,15 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
|
|||
|
||||
var iterator leveldb.Iterator
|
||||
diskPresent := true
|
||||
var firstBlock, lastBlock *SampleKey
|
||||
|
||||
firstBlock, _ := t.sampleKeys.Get()
|
||||
defer t.sampleKeys.Give(firstBlock)
|
||||
|
||||
lastBlock, _ := t.sampleKeys.Get()
|
||||
defer t.sampleKeys.Give(lastBlock)
|
||||
|
||||
sampleKeyDto, _ := t.dtoSampleKeys.Get()
|
||||
defer t.dtoSampleKeys.Give(sampleKeyDto)
|
||||
|
||||
extractionTimer := viewJob.stats.GetTimer(stats.ViewDataExtractionTime).Start()
|
||||
for _, scanJob := range scans {
|
||||
|
@ -410,14 +424,23 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
|
|||
if iterator == nil {
|
||||
// Get a single iterator that will be used for all data extraction
|
||||
// below.
|
||||
iterator = t.DiskStorage.MetricSamples.NewIterator(true)
|
||||
iterator, _ = t.DiskStorage.MetricSamples.NewIterator(true)
|
||||
defer iterator.Close()
|
||||
if diskPresent = iterator.SeekToLast(); diskPresent {
|
||||
lastBlock, _ = extractSampleKey(iterator)
|
||||
if err := iterator.Key(sampleKeyDto); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
lastBlock.Load(sampleKeyDto)
|
||||
|
||||
if !iterator.SeekToFirst() {
|
||||
diskPresent = false
|
||||
} else {
|
||||
firstBlock, _ = extractSampleKey(iterator)
|
||||
if err := iterator.Key(sampleKeyDto); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
firstBlock.Load(sampleKeyDto)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -482,7 +505,10 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
|
|||
for _, op := range standingOps {
|
||||
if !op.Consumed() {
|
||||
filteredOps = append(filteredOps, op)
|
||||
continue
|
||||
}
|
||||
|
||||
giveBackOp(op)
|
||||
}
|
||||
standingOps = filteredOps
|
||||
|
||||
|
@ -515,9 +541,10 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerpri
|
|||
return nil, true
|
||||
}
|
||||
|
||||
seekingKey := &SampleKey{
|
||||
Fingerprint: fingerprint,
|
||||
}
|
||||
seekingKey, _ := t.sampleKeys.Get()
|
||||
defer t.sampleKeys.Give(seekingKey)
|
||||
|
||||
seekingKey.Fingerprint = fingerprint
|
||||
|
||||
if fingerprint.Equal(firstBlock.Fingerprint) && ts.Before(firstBlock.FirstTimestamp) {
|
||||
seekingKey.FirstTimestamp = firstBlock.FirstTimestamp
|
||||
|
@ -527,21 +554,22 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerpri
|
|||
seekingKey.FirstTimestamp = ts
|
||||
}
|
||||
|
||||
fd := new(dto.SampleKey)
|
||||
seekingKey.Dump(fd)
|
||||
dto, _ := t.dtoSampleKeys.Get()
|
||||
defer t.dtoSampleKeys.Give(dto)
|
||||
|
||||
// Try seeking to target key.
|
||||
rawKey := coding.NewPBEncoder(fd).MustEncode()
|
||||
if !iterator.Seek(rawKey) {
|
||||
seekingKey.Dump(dto)
|
||||
if !iterator.Seek(dto) {
|
||||
return chunk, true
|
||||
}
|
||||
|
||||
var foundKey *SampleKey
|
||||
var foundValues Values
|
||||
|
||||
foundKey, _ = extractSampleKey(iterator)
|
||||
if err := iterator.Key(dto); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
seekingKey.Load(dto)
|
||||
|
||||
if foundKey.Fingerprint.Equal(fingerprint) {
|
||||
if seekingKey.Fingerprint.Equal(fingerprint) {
|
||||
// Figure out if we need to rewind by one block.
|
||||
// Imagine the following supertime blocks with time ranges:
|
||||
//
|
||||
|
@ -553,16 +581,19 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerpri
|
|||
// iterator seek behavior.
|
||||
//
|
||||
// Only do the rewind if there is another chunk before this one.
|
||||
if !foundKey.MayContain(ts) {
|
||||
if !seekingKey.MayContain(ts) {
|
||||
postValues, _ := extractSampleValues(iterator)
|
||||
if !foundKey.Equal(firstBlock) {
|
||||
if !seekingKey.Equal(firstBlock) {
|
||||
if !iterator.Previous() {
|
||||
panic("This should never return false.")
|
||||
}
|
||||
|
||||
foundKey, _ = extractSampleKey(iterator)
|
||||
if err := iterator.Key(dto); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
seekingKey.Load(dto)
|
||||
|
||||
if !foundKey.Fingerprint.Equal(fingerprint) {
|
||||
if !seekingKey.Fingerprint.Equal(fingerprint) {
|
||||
return postValues, false
|
||||
}
|
||||
|
||||
|
@ -576,15 +607,18 @@ func (t *TieredStorage) loadChunkAroundTime(iterator leveldb.Iterator, fingerpri
|
|||
return foundValues, false
|
||||
}
|
||||
|
||||
if fingerprint.Less(foundKey.Fingerprint) {
|
||||
if !foundKey.Equal(firstBlock) {
|
||||
if fingerprint.Less(seekingKey.Fingerprint) {
|
||||
if !seekingKey.Equal(firstBlock) {
|
||||
if !iterator.Previous() {
|
||||
panic("This should never return false.")
|
||||
}
|
||||
|
||||
foundKey, _ = extractSampleKey(iterator)
|
||||
if err := iterator.Key(dto); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
seekingKey.Load(dto)
|
||||
|
||||
if !foundKey.Fingerprint.Equal(fingerprint) {
|
||||
if !seekingKey.Fingerprint.Equal(fingerprint) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
|
|
|
@ -48,40 +48,48 @@ func NewViewRequestBuilder() *viewRequestBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
var getValuesAtTimes = newValueAtTimeList(10 * 1024)
|
||||
|
||||
// Gets for the given Fingerprint either the value at that time if there is an
|
||||
// match or the one or two values adjacent thereto.
|
||||
func (v *viewRequestBuilder) GetMetricAtTime(fingerprint *clientmodel.Fingerprint, time time.Time) {
|
||||
ops := v.operations[*fingerprint]
|
||||
ops = append(ops, &getValuesAtTimeOp{
|
||||
time: time,
|
||||
})
|
||||
op, _ := getValuesAtTimes.Get()
|
||||
op.time = time
|
||||
ops = append(ops, op)
|
||||
v.operations[*fingerprint] = ops
|
||||
}
|
||||
|
||||
var getValuesAtIntervals = newValueAtIntervalList(10 * 1024)
|
||||
|
||||
// Gets for the given Fingerprint either the value at that interval from From
|
||||
// through Through if there is an match or the one or two values adjacent
|
||||
// for each point.
|
||||
func (v *viewRequestBuilder) GetMetricAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval time.Duration) {
|
||||
ops := v.operations[*fingerprint]
|
||||
ops = append(ops, &getValuesAtIntervalOp{
|
||||
from: from,
|
||||
through: through,
|
||||
interval: interval,
|
||||
})
|
||||
op, _ := getValuesAtIntervals.Get()
|
||||
op.from = from
|
||||
op.through = through
|
||||
op.interval = interval
|
||||
ops = append(ops, op)
|
||||
v.operations[*fingerprint] = ops
|
||||
}
|
||||
|
||||
var getValuesAlongRanges = newValueAlongRangeList(10 * 1024)
|
||||
|
||||
// Gets for the given Fingerprint the values that occur inclusively from From
|
||||
// through Through.
|
||||
func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint, from, through time.Time) {
|
||||
ops := v.operations[*fingerprint]
|
||||
ops = append(ops, &getValuesAlongRangeOp{
|
||||
from: from,
|
||||
through: through,
|
||||
})
|
||||
op, _ := getValuesAlongRanges.Get()
|
||||
op.from = from
|
||||
op.through = through
|
||||
ops = append(ops, op)
|
||||
v.operations[*fingerprint] = ops
|
||||
}
|
||||
|
||||
var getValuesAtIntervalAlongRanges = newValueAtIntervalAlongRangeList(10 * 1024)
|
||||
|
||||
// Gets value ranges at intervals for the given Fingerprint:
|
||||
//
|
||||
// |----| |----| |----| |----|
|
||||
|
@ -90,13 +98,13 @@ func (v *viewRequestBuilder) GetMetricRange(fingerprint *clientmodel.Fingerprint
|
|||
// from interval rangeDuration through
|
||||
func (v *viewRequestBuilder) GetMetricRangeAtInterval(fingerprint *clientmodel.Fingerprint, from, through time.Time, interval, rangeDuration time.Duration) {
|
||||
ops := v.operations[*fingerprint]
|
||||
ops = append(ops, &getValueRangeAtIntervalOp{
|
||||
rangeFrom: from,
|
||||
rangeThrough: from.Add(rangeDuration),
|
||||
rangeDuration: rangeDuration,
|
||||
interval: interval,
|
||||
through: through,
|
||||
})
|
||||
op, _ := getValuesAtIntervalAlongRanges.Get()
|
||||
op.rangeFrom = from
|
||||
op.rangeThrough = from.Add(rangeDuration)
|
||||
op.rangeDuration = rangeDuration
|
||||
op.interval = interval
|
||||
op.through = through
|
||||
ops = append(ops, op)
|
||||
v.operations[*fingerprint] = ops
|
||||
}
|
||||
|
||||
|
@ -134,3 +142,18 @@ func (v view) appendSamples(fingerprint *clientmodel.Fingerprint, samples Values
|
|||
func newView() view {
|
||||
return view{NewMemorySeriesStorage(MemorySeriesOptions{})}
|
||||
}
|
||||
|
||||
func giveBackOp(op interface{}) bool {
|
||||
switch v := op.(type) {
|
||||
case *getValuesAtTimeOp:
|
||||
return getValuesAtTimes.Give(v)
|
||||
case *getValuesAtIntervalOp:
|
||||
return getValuesAtIntervals.Give(v)
|
||||
case *getValuesAlongRangeOp:
|
||||
return getValuesAlongRanges.Give(v)
|
||||
case *getValueRangeAtIntervalOp:
|
||||
return getValuesAtIntervalAlongRanges.Give(v)
|
||||
default:
|
||||
panic("unrecognized operation")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,7 +67,7 @@ func (w *LevelDBHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, o
|
|||
return t, ok, err
|
||||
}
|
||||
if !ok {
|
||||
return t, ok, err
|
||||
return t, ok, nil
|
||||
}
|
||||
t = time.Unix(v.GetTimestamp(), 0)
|
||||
return t, true, nil
|
||||
|
|
|
@ -38,7 +38,7 @@ type Persistence interface {
|
|||
|
||||
// Close reaps all of the underlying system resources associated with this
|
||||
// persistence.
|
||||
Close()
|
||||
Close() error
|
||||
// Has informs the user whether a given key exists in the database.
|
||||
Has(key proto.Message) (bool, error)
|
||||
// Get retrieves the key from the database if it exists or returns nil if
|
||||
|
|
|
@ -18,8 +18,6 @@ import (
|
|||
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
"github.com/jmhodges/levigo"
|
||||
|
||||
"github.com/prometheus/prometheus/coding"
|
||||
)
|
||||
|
||||
type batch struct {
|
||||
|
@ -35,13 +33,34 @@ func NewBatch() *batch {
|
|||
}
|
||||
|
||||
func (b *batch) Drop(key proto.Message) {
|
||||
b.batch.Delete(coding.NewPBEncoder(key).MustEncode())
|
||||
buf, _ := buffers.Get()
|
||||
defer buffers.Give(buf)
|
||||
|
||||
if err := buf.Marshal(key); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
b.batch.Delete(buf.Bytes())
|
||||
|
||||
b.drops++
|
||||
}
|
||||
|
||||
func (b *batch) Put(key, value proto.Message) {
|
||||
b.batch.Put(coding.NewPBEncoder(key).MustEncode(), coding.NewPBEncoder(value).MustEncode())
|
||||
keyBuf, _ := buffers.Get()
|
||||
defer buffers.Give(keyBuf)
|
||||
|
||||
if err := keyBuf.Marshal(key); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
valBuf, _ := buffers.Get()
|
||||
defer buffers.Give(valBuf)
|
||||
|
||||
if err := valBuf.Marshal(value); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
b.batch.Put(keyBuf.Bytes(), valBuf.Bytes())
|
||||
|
||||
b.puts++
|
||||
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package leveldb
|
||||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/utility"
|
||||
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
)
|
||||
|
||||
var buffers = newBufferList(50)
|
||||
|
||||
type bufferList struct {
|
||||
l utility.FreeList
|
||||
}
|
||||
|
||||
func (l *bufferList) Get() (*proto.Buffer, bool) {
|
||||
if v, ok := l.l.Get(); ok {
|
||||
return v.(*proto.Buffer), ok
|
||||
}
|
||||
|
||||
return proto.NewBuffer(make([]byte, 0, 4096)), false
|
||||
}
|
||||
|
||||
func (l *bufferList) Give(v *proto.Buffer) bool {
|
||||
v.Reset()
|
||||
|
||||
return l.l.Give(v)
|
||||
}
|
||||
|
||||
func newBufferList(cap int) *bufferList {
|
||||
return &bufferList{
|
||||
l: utility.NewFreeList(cap),
|
||||
}
|
||||
}
|
|
@ -13,30 +13,29 @@
|
|||
|
||||
package leveldb
|
||||
|
||||
import (
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
)
|
||||
|
||||
// TODO: Evaluate whether to use coding.Encoder for the key and values instead
|
||||
// raw bytes for consistency reasons.
|
||||
|
||||
// Iterator wraps Levigo and LevelDB's iterator behaviors in a manner that is
|
||||
// conducive to IO-free testing.
|
||||
//
|
||||
// It borrows some of the operational assumptions from goskiplist, which
|
||||
// functions very similarly, in that it uses no separate Valid method to
|
||||
// determine health. All methods that have a return signature of (ok bool)
|
||||
// assume in the real LevelDB case that if ok == false that the iterator
|
||||
// must be disposed of at this given instance and recreated if future
|
||||
// work is desired. This is a quirk of LevelDB itself!
|
||||
type Iterator interface {
|
||||
// GetError reports low-level errors, if available. This should not indicate
|
||||
// that the iterator is necessarily unhealthy but maybe that the underlying
|
||||
// table is corrupted itself. See the notes above for (ok bool) return
|
||||
// signatures to determine iterator health.
|
||||
GetError() error
|
||||
Key() []byte
|
||||
Next() (ok bool)
|
||||
Previous() (ok bool)
|
||||
Seek(key []byte) (ok bool)
|
||||
SeekToFirst() (ok bool)
|
||||
SeekToLast() (ok bool)
|
||||
Value() []byte
|
||||
Close()
|
||||
Error() error
|
||||
Valid() bool
|
||||
|
||||
SeekToFirst() bool
|
||||
SeekToLast() bool
|
||||
Seek(proto.Message) bool
|
||||
|
||||
Next() bool
|
||||
Previous() bool
|
||||
|
||||
Key(proto.Message) error
|
||||
Value(proto.Message) error
|
||||
|
||||
Close() error
|
||||
|
||||
rawKey() []byte
|
||||
rawValue() []byte
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ import (
|
|||
"code.google.com/p/goprotobuf/proto"
|
||||
"github.com/jmhodges/levigo"
|
||||
|
||||
"github.com/prometheus/prometheus/coding"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/storage/raw"
|
||||
)
|
||||
|
@ -84,9 +83,9 @@ func (i levigoIterator) String() string {
|
|||
return fmt.Sprintf("levigoIterator created at %s that is %s and %s and %s", i.creationTime, open, valid, snapshotted)
|
||||
}
|
||||
|
||||
func (i *levigoIterator) Close() {
|
||||
func (i *levigoIterator) Close() error {
|
||||
if i.closed {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
if i.iterator != nil {
|
||||
|
@ -108,11 +107,18 @@ func (i *levigoIterator) Close() {
|
|||
i.closed = true
|
||||
i.valid = false
|
||||
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *levigoIterator) Seek(key []byte) bool {
|
||||
i.iterator.Seek(key)
|
||||
func (i *levigoIterator) Seek(m proto.Message) bool {
|
||||
buf, _ := buffers.Get()
|
||||
defer buffers.Give(buf)
|
||||
|
||||
if err := buf.Marshal(m); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
i.iterator.Seek(buf.Bytes())
|
||||
|
||||
i.valid = i.iterator.Valid()
|
||||
|
||||
|
@ -151,18 +157,40 @@ func (i *levigoIterator) Previous() bool {
|
|||
return i.valid
|
||||
}
|
||||
|
||||
func (i levigoIterator) Key() (key []byte) {
|
||||
func (i *levigoIterator) rawKey() (key []byte) {
|
||||
return i.iterator.Key()
|
||||
}
|
||||
|
||||
func (i levigoIterator) Value() (value []byte) {
|
||||
func (i *levigoIterator) rawValue() (value []byte) {
|
||||
return i.iterator.Value()
|
||||
}
|
||||
|
||||
func (i levigoIterator) GetError() (err error) {
|
||||
func (i *levigoIterator) Error() (err error) {
|
||||
return i.iterator.GetError()
|
||||
}
|
||||
|
||||
func (i *levigoIterator) Key(m proto.Message) error {
|
||||
buf, _ := buffers.Get()
|
||||
defer buffers.Give(buf)
|
||||
|
||||
buf.SetBuf(i.iterator.Key())
|
||||
|
||||
return buf.Unmarshal(m)
|
||||
}
|
||||
|
||||
func (i *levigoIterator) Value(m proto.Message) error {
|
||||
buf, _ := buffers.Get()
|
||||
defer buffers.Give(buf)
|
||||
|
||||
buf.SetBuf(i.iterator.Value())
|
||||
|
||||
return buf.Unmarshal(m)
|
||||
}
|
||||
|
||||
func (i *levigoIterator) Valid() bool {
|
||||
return i.valid
|
||||
}
|
||||
|
||||
type Compression uint
|
||||
|
||||
const (
|
||||
|
@ -229,7 +257,7 @@ func NewLevelDBPersistence(o LevelDBOptions) (*LevelDBPersistence, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (l *LevelDBPersistence) Close() {
|
||||
func (l *LevelDBPersistence) Close() error {
|
||||
// These are deferred to take advantage of forced closing in case of stack
|
||||
// unwinding due to anomalies.
|
||||
defer func() {
|
||||
|
@ -268,11 +296,18 @@ func (l *LevelDBPersistence) Close() {
|
|||
}
|
||||
}()
|
||||
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) {
|
||||
raw, err := l.storage.Get(l.readOptions, coding.NewPBEncoder(k).MustEncode())
|
||||
buf, _ := buffers.Get()
|
||||
defer buffers.Give(buf)
|
||||
|
||||
if err := buf.Marshal(k); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
raw, err := l.storage.Get(l.readOptions, buf.Bytes())
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -284,8 +319,9 @@ func (l *LevelDBPersistence) Get(k, v proto.Message) (bool, error) {
|
|||
return true, nil
|
||||
}
|
||||
|
||||
err = proto.Unmarshal(raw, v)
|
||||
if err != nil {
|
||||
buf.SetBuf(raw)
|
||||
|
||||
if err := buf.Unmarshal(v); err != nil {
|
||||
return true, err
|
||||
}
|
||||
|
||||
|
@ -297,11 +333,32 @@ func (l *LevelDBPersistence) Has(k proto.Message) (has bool, err error) {
|
|||
}
|
||||
|
||||
func (l *LevelDBPersistence) Drop(k proto.Message) error {
|
||||
return l.storage.Delete(l.writeOptions, coding.NewPBEncoder(k).MustEncode())
|
||||
buf, _ := buffers.Get()
|
||||
defer buffers.Give(buf)
|
||||
|
||||
if err := buf.Marshal(k); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return l.storage.Delete(l.writeOptions, buf.Bytes())
|
||||
}
|
||||
|
||||
func (l *LevelDBPersistence) Put(key, value proto.Message) error {
|
||||
return l.storage.Put(l.writeOptions, coding.NewPBEncoder(key).MustEncode(), coding.NewPBEncoder(value).MustEncode())
|
||||
keyBuf, _ := buffers.Get()
|
||||
defer buffers.Give(keyBuf)
|
||||
|
||||
if err := keyBuf.Marshal(key); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
valBuf, _ := buffers.Get()
|
||||
defer buffers.Give(valBuf)
|
||||
|
||||
if err := valBuf.Marshal(value); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return l.storage.Put(l.writeOptions, keyBuf.Bytes(), valBuf.Bytes())
|
||||
}
|
||||
|
||||
func (l *LevelDBPersistence) Commit(b raw.Batch) (err error) {
|
||||
|
@ -333,7 +390,10 @@ func (l *LevelDBPersistence) Prune() {
|
|||
}
|
||||
|
||||
func (l *LevelDBPersistence) Size() (uint64, error) {
|
||||
iterator := l.NewIterator(false)
|
||||
iterator, err := l.NewIterator(false)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer iterator.Close()
|
||||
|
||||
if !iterator.SeekToFirst() {
|
||||
|
@ -342,13 +402,13 @@ func (l *LevelDBPersistence) Size() (uint64, error) {
|
|||
|
||||
keyspace := levigo.Range{}
|
||||
|
||||
keyspace.Start = iterator.Key()
|
||||
keyspace.Start = iterator.rawKey()
|
||||
|
||||
if !iterator.SeekToLast() {
|
||||
return 0, fmt.Errorf("could not seek to last key")
|
||||
}
|
||||
|
||||
keyspace.Limit = iterator.Key()
|
||||
keyspace.Limit = iterator.rawKey()
|
||||
|
||||
sizes := l.storage.GetApproximateSizes([]levigo.Range{keyspace})
|
||||
total := uint64(0)
|
||||
|
@ -374,7 +434,7 @@ func (l *LevelDBPersistence) Size() (uint64, error) {
|
|||
// will be leaked.
|
||||
//
|
||||
// The iterator is optionally snapshotable.
|
||||
func (l *LevelDBPersistence) NewIterator(snapshotted bool) Iterator {
|
||||
func (l *LevelDBPersistence) NewIterator(snapshotted bool) (Iterator, error) {
|
||||
var (
|
||||
snapshot *levigo.Snapshot
|
||||
readOptions *levigo.ReadOptions
|
||||
|
@ -396,27 +456,27 @@ func (l *LevelDBPersistence) NewIterator(snapshotted bool) Iterator {
|
|||
readOptions: readOptions,
|
||||
snapshot: snapshot,
|
||||
storage: l.storage,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) {
|
||||
var (
|
||||
iterator = l.NewIterator(true)
|
||||
valid bool
|
||||
)
|
||||
iterator, err := l.NewIterator(true)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
defer iterator.Close()
|
||||
|
||||
for valid = iterator.SeekToFirst(); valid; valid = iterator.Next() {
|
||||
err = iterator.GetError()
|
||||
if err != nil {
|
||||
return
|
||||
for valid := iterator.SeekToFirst(); valid; valid = iterator.Next() {
|
||||
if err = iterator.Error(); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
decodedKey, decodeErr := decoder.DecodeKey(iterator.Key())
|
||||
decodedKey, decodeErr := decoder.DecodeKey(iterator.rawKey())
|
||||
if decodeErr != nil {
|
||||
continue
|
||||
}
|
||||
decodedValue, decodeErr := decoder.DecodeValue(iterator.Value())
|
||||
decodedValue, decodeErr := decoder.DecodeValue(iterator.rawValue())
|
||||
if decodeErr != nil {
|
||||
continue
|
||||
}
|
||||
|
@ -436,6 +496,5 @@ func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter stora
|
|||
}
|
||||
}
|
||||
}
|
||||
scannedEntireCorpus = true
|
||||
return
|
||||
return true, nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
// Copyright 2013 Prometheus Team
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package utility
|
||||
|
||||
type FreeList chan interface{}
|
||||
|
||||
func NewFreeList(cap int) FreeList {
|
||||
return make(FreeList, cap)
|
||||
}
|
||||
|
||||
func (l FreeList) Get() (interface{}, bool) {
|
||||
select {
|
||||
case v := <-l:
|
||||
return v, true
|
||||
default:
|
||||
return nil, false
|
||||
}
|
||||
}
|
||||
|
||||
func (l FreeList) Give(v interface{}) bool {
|
||||
select {
|
||||
case l <- v:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (l FreeList) Close() {
|
||||
close(l)
|
||||
|
||||
for _ = range l {
|
||||
}
|
||||
}
|
16
web/web.go
16
web/web.go
|
@ -21,6 +21,9 @@ import (
|
|||
"net/http"
|
||||
"net/http/pprof"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
pprof_runtime "runtime/pprof"
|
||||
|
||||
"code.google.com/p/gorest"
|
||||
"github.com/golang/glog"
|
||||
|
@ -63,6 +66,7 @@ func (w WebService) ServeForever() error {
|
|||
exp.Handle("/databases", w.DatabasesHandler)
|
||||
exp.Handle("/alerts", w.AlertsHandler)
|
||||
exp.HandleFunc("/graph", graphHandler)
|
||||
exp.HandleFunc("/heap", dumpHeap)
|
||||
|
||||
exp.Handle("/api/", compressionHandler{handler: gorest.Handle()})
|
||||
exp.Handle("/metrics", prometheus.DefaultHandler)
|
||||
|
@ -139,6 +143,18 @@ func executeTemplate(w http.ResponseWriter, name string, data interface{}) {
|
|||
}
|
||||
}
|
||||
|
||||
func dumpHeap(w http.ResponseWriter, r *http.Request) {
|
||||
target := fmt.Sprintf("/tmp/%d.heap", time.Now().Unix())
|
||||
f, err := os.Create(target)
|
||||
if err != nil {
|
||||
glog.Error("Could not dump heap: ", err)
|
||||
}
|
||||
fmt.Fprintf(w, "Writing to %s...", target)
|
||||
defer f.Close()
|
||||
pprof_runtime.WriteHeapProfile(f)
|
||||
fmt.Fprintf(w, "Done")
|
||||
}
|
||||
|
||||
func MustBuildServerUrl() string {
|
||||
_, port, err := net.SplitHostPort(*listenAddress)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue