Extract HighWatermarking.

Clean up the rest.
pull/346/head
Matt T. Proud 11 years ago
parent f4669a812c
commit d8792cfd86

@ -17,7 +17,7 @@ include Makefile.INCLUDE
all: binary test
$(GOCC): $(BUILD_PATH)/cache/$(GOPKG) source_path
$(GOCC): $(BUILD_PATH)/cache/$(GOPKG) $(FULL_GOPATH)
tar -C $(BUILD_PATH)/root -xzf $<
touch $@
@ -58,7 +58,7 @@ format:
model: dependencies preparation
$(MAKE) -C model
preparation: $(GOCC) source_path
preparation: $(GOCC) $(FULL_GOPATH)
$(MAKE) -C $(BUILD_PATH)
race_condition_binary: build
@ -76,9 +76,9 @@ search_index:
server: config dependencies model preparation
$(MAKE) -C server
# source_path is responsible for ensuring that the builder has not done anything
# $(FULL_GOPATH) is responsible for ensuring that the builder has not done anything
# stupid like working on Prometheus outside of ${GOPATH}.
source_path:
$(FULL_GOPATH):
-[ -d "$(FULL_GOPATH)" ] || { mkdir -vp $(FULL_GOPATH_BASE) ; ln -s "$(PWD)" "$(FULL_GOPATH)" ; }
[ -d "$(FULL_GOPATH)" ]
@ -94,4 +94,4 @@ update:
web: config dependencies model preparation
$(MAKE) -C web
.PHONY: advice binary build clean config dependencies documentation format model package preparation race_condition_binary race_condition_run run search_index source_path test tools update
.PHONY: advice binary build clean config dependencies documentation format model package preparation race_condition_binary race_condition_run run search_index test tools update

@ -28,7 +28,6 @@ import (
"github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/raw/leveldb"
"github.com/prometheus/prometheus/web"
"github.com/prometheus/prometheus/web/api"
)
@ -76,7 +75,7 @@ type prometheus struct {
reportDatabasesTimer *time.Ticker
curationMutex sync.Mutex
curationState chan metric.CurationState
databaseStates chan []leveldb.DatabaseState
databaseStates chan []string
stopBackgroundOperations chan bool
unwrittenSamples chan *extraction.Result
@ -207,7 +206,7 @@ func main() {
unwrittenSamples := make(chan *extraction.Result, *samplesQueueCapacity)
curationState := make(chan metric.CurationState, 1)
databaseStates := make(chan []leveldb.DatabaseState, 1)
databaseStates := make(chan []string, 1)
// Coprime numbers, fool!
headCompactionTimer := time.NewTicker(*headCompactInterval)
bodyCompactionTimer := time.NewTicker(*bodyCompactInterval)

@ -92,7 +92,7 @@ type watermarkScanner struct {
// curated.
// curationState is the on-disk store where the curation remarks are made for
// how much progress has been made.
func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples, watermarks *leveldb.LevelDBPersistence, status chan CurationState) (err error) {
func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples *leveldb.LevelDBPersistence, watermarks HighWatermarker, status chan CurationState) (err error) {
defer func(t time.Time) {
duration := float64(time.Since(t) / time.Millisecond)

@ -33,6 +33,8 @@ type FingerprintMetricIndex interface {
IndexBatch(FingerprintMetricMapping) error
Lookup(*clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error)
Close() error
State() string
Size() (s uint64, present bool, err error)
}
type leveldbFingerprintMetricIndex struct {
@ -49,6 +51,15 @@ func (i *leveldbFingerprintMetricIndex) Close() error {
return nil
}
func (i *leveldbFingerprintMetricIndex) State() string {
return i.p.State()
}
func (i *leveldbFingerprintMetricIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize()
return s, true, err
}
func (i *leveldbFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error {
b := leveldb.NewBatch()
defer b.Close()
@ -102,6 +113,8 @@ type LabelNameFingerprintIndex interface {
Lookup(clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error)
Has(clientmodel.LabelName) (ok bool, err error)
Close() error
State() string
Size() (s uint64, present bool, err error)
}
type leveldbLabelNameFingerprintIndex struct {
@ -164,6 +177,15 @@ func (i *leveldbLabelNameFingerprintIndex) Close() error {
return nil
}
func (i *leveldbLabelNameFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize()
return s, true, err
}
func (i *leveldbLabelNameFingerprintIndex) State() string {
return i.p.State()
}
type LevelDBLabelNameFingerprintIndexOptions struct {
leveldb.LevelDBOptions
}
@ -188,6 +210,8 @@ type LabelSetFingerprintIndex interface {
Lookup(*LabelPair) (m clientmodel.Fingerprints, ok bool, err error)
Has(*LabelPair) (ok bool, err error)
Close() error
State() string
Size() (s uint64, present bool, err error)
}
type leveldbLabelSetFingerprintIndex struct {
@ -265,6 +289,15 @@ func (i *leveldbLabelSetFingerprintIndex) Close() error {
return nil
}
func (i *leveldbLabelSetFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize()
return s, true, err
}
func (i *leveldbLabelSetFingerprintIndex) State() string {
return i.p.State()
}
func NewLevelDBLabelSetFingerprintIndex(o *LevelDBLabelSetFingerprintIndexOptions) (LabelSetFingerprintIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
@ -280,6 +313,8 @@ type MetricMembershipIndex interface {
IndexBatch([]clientmodel.Metric) error
Has(clientmodel.Metric) (ok bool, err error)
Close() error
State() string
Size() (s uint64, present bool, err error)
}
type leveldbMetricMembershipIndex struct {
@ -314,6 +349,15 @@ func (i *leveldbMetricMembershipIndex) Close() error {
return nil
}
func (i *leveldbMetricMembershipIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize()
return s, true, err
}
func (i *leveldbMetricMembershipIndex) State() string {
return i.p.State()
}
type LevelDBMetricMembershipIndexOptions struct {
leveldb.LevelDBOptions
}

@ -39,7 +39,7 @@ type LevelDBMetricPersistence struct {
fingerprintToMetrics FingerprintMetricIndex
labelNameToFingerprints LabelNameFingerprintIndex
labelSetToFingerprints LabelSetFingerprintIndex
MetricHighWatermarks *leveldb.LevelDBPersistence
MetricHighWatermarks HighWatermarker
metricMembershipIndex MetricMembershipIndex
MetricSamples *leveldb.LevelDBPersistence
}
@ -114,6 +114,8 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
var err error
emission.fingerprintToMetrics, err = NewLevelDBFingerprintMetricIndex(&LevelDBFingerprintMetricIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Name: "Metrics by Fingerprint",
Purpose: "Index",
Path: baseDirectory + "/label_name_and_value_pairs_by_fingerprint",
CacheSizeBytes: *fingerprintsToLabelPairCacheSize,
},
@ -126,6 +128,8 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
func() {
var err error
o := &leveldb.LevelDBOptions{
Name: "Samples",
Purpose: "Timeseries",
Path: baseDirectory + "/samples_by_fingerprint",
CacheSizeBytes: *fingerprintsToLabelPairCacheSize,
}
@ -137,11 +141,13 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"High Watermarks by Fingerprint",
func() {
var err error
o := &leveldb.LevelDBOptions{
Path: baseDirectory + "/high_watermarks_by_fingerprint",
CacheSizeBytes: *highWatermarkCacheSize,
}
emission.MetricHighWatermarks, err = leveldb.NewLevelDBPersistence(o)
emission.MetricHighWatermarks, err = NewLevelDBHighWatermarker(&LevelDBHighWatermarkerOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Name: "High Watermarks",
Purpose: "The youngest sample in the database per metric.",
Path: baseDirectory + "/high_watermarks_by_fingerprint",
CacheSizeBytes: *highWatermarkCacheSize,
}})
workers.MayFail(err)
},
},
@ -151,6 +157,8 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
var err error
emission.labelNameToFingerprints, err = NewLevelLabelNameFingerprintIndex(&LevelDBLabelNameFingerprintIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Name: "Fingerprints by Label Name",
Purpose: "Index",
Path: baseDirectory + "/fingerprints_by_label_name",
CacheSizeBytes: *labelNameToFingerprintsCacheSize,
},
@ -164,6 +172,8 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
var err error
emission.labelSetToFingerprints, err = NewLevelDBLabelSetFingerprintIndex(&LevelDBLabelSetFingerprintIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Name: "Fingerprints by Label Pair",
Purpose: "Index",
Path: baseDirectory + "/fingerprints_by_label_name_and_value_pair",
CacheSizeBytes: *labelPairToFingerprintsCacheSize,
},
@ -178,6 +188,8 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
emission.metricMembershipIndex, err = NewLevelDBMetricMembershipIndex(
&LevelDBMetricMembershipIndexOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Name: "Metric Membership",
Purpose: "Index",
Path: baseDirectory + "/metric_membership_index",
CacheSizeBytes: *metricMembershipIndexCacheSize,
},
@ -190,6 +202,8 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
func() {
var err error
o := &leveldb.LevelDBOptions{
Name: "Sample Curation Remarks",
Purpose: "Ledger of Progress for Various Curators",
Path: baseDirectory + "/curation_remarks",
CacheSizeBytes: *curationRemarksCacheSize,
}
@ -459,41 +473,16 @@ func (l *LevelDBMetricPersistence) refreshHighWatermarks(groups map[clientmodel.
recordOutcome(duration, err, map[string]string{operation: refreshHighWatermarks, result: success}, map[string]string{operation: refreshHighWatermarks, result: failure})
}(time.Now())
batch := leveldb.NewBatch()
defer batch.Close()
value := &dto.MetricHighWatermark{}
for fingerprint, samples := range groups {
value.Reset()
f := new(dto.Fingerprint)
dumpFingerprint(f, &fingerprint)
present, err := l.MetricHighWatermarks.Get(f, value)
if err != nil {
return err
}
newestSampleTimestamp := samples[len(samples)-1].Timestamp
if !present {
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
batch.Put(f, value)
b := FingerprintHighWatermarkMapping{}
for fp, ss := range groups {
if len(ss) == 0 {
continue
}
// BUG(matt): Repace this with watermark management.
if newestSampleTimestamp.After(time.Unix(value.GetTimestamp(), 0)) {
value.Timestamp = proto.Int64(newestSampleTimestamp.Unix())
batch.Put(f, value)
}
}
err = l.MetricHighWatermarks.Commit(batch)
if err != nil {
return err
b[fp] = ss[len(ss)-1].Timestamp
}
return nil
return l.MetricHighWatermarks.UpdateBatch(b)
}
func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (err error) {
@ -783,7 +772,7 @@ func (l *LevelDBMetricPersistence) CompactKeyspaces() {
// l.fingerprintToMetrics.CompactKeyspace()
// l.labelNameToFingerprints.CompactKeyspace()
// l.labelSetToFingerprints.CompactKeyspace()
l.MetricHighWatermarks.CompactKeyspace()
// l.MetricHighWatermarks.CompactKeyspace()
// l.metricMembershipIndex.CompactKeyspace()
l.MetricSamples.CompactKeyspace()
}
@ -796,30 +785,30 @@ func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error)
}
total += size
// if size, err = l.fingerprintToMetrics.ApproximateSize(); err != nil {
// return 0, err
// }
// total += size
if size, _, err = l.fingerprintToMetrics.Size(); err != nil {
return 0, err
}
total += size
// if size, err = l.labelNameToFingerprints.ApproximateSize(); err != nil {
// return 0, err
// }
// total += size
if size, _, err = l.labelNameToFingerprints.Size(); err != nil {
return 0, err
}
total += size
// if size, err = l.labelSetToFingerprints.ApproximateSize(); err != nil {
// return 0, err
// }
// total += size
if size, _, err = l.labelSetToFingerprints.Size(); err != nil {
return 0, err
}
total += size
if size, err = l.MetricHighWatermarks.ApproximateSize(); err != nil {
if size, _, err = l.MetricHighWatermarks.Size(); err != nil {
return 0, err
}
total += size
// if size, err = l.metricMembershipIndex.ApproximateSize(); err != nil {
// return 0, err
// }
// total += size
if size, _, err = l.metricMembershipIndex.Size(); err != nil {
return 0, err
}
total += size
if size, err = l.MetricSamples.ApproximateSize(); err != nil {
return 0, err
@ -829,43 +818,14 @@ func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error)
return total, nil
}
func (l *LevelDBMetricPersistence) States() []leveldb.DatabaseState {
states := []leveldb.DatabaseState{}
state := l.CurationRemarks.State()
state.Name = "Curation Remarks"
state.Type = "Watermark"
states = append(states, state)
// state = l.fingerprintToMetrics.State()
// state.Name = "Fingerprints to Metrics"
// state.Type = "Index"
// states = append(states, state)
// state = l.labelNameToFingerprints.State()
// state.Name = "Label Name to Fingerprints"
// state.Type = "Inverted Index"
// states = append(states, state)
// state = l.labelSetToFingerprints.State()
// state.Name = "Label Pair to Fingerprints"
// state.Type = "Inverted Index"
// states = append(states, state)
state = l.MetricHighWatermarks.State()
state.Name = "Metric Last Write"
state.Type = "Watermark"
states = append(states, state)
// state = l.metricMembershipIndex.State()
// state.Name = "Metric Membership"
// state.Type = "Index"
// states = append(states, state)
state = l.MetricSamples.State()
state.Name = "Samples"
state.Type = "Time Series"
states = append(states, state)
return states
func (l *LevelDBMetricPersistence) States() []string {
return []string{
l.CurationRemarks.State(),
l.fingerprintToMetrics.State(),
l.labelNameToFingerprints.State(),
l.labelSetToFingerprints.State(),
l.MetricHighWatermarks.State(),
l.metricMembershipIndex.State(),
l.MetricSamples.State(),
}
}

@ -854,10 +854,11 @@ func TestCuratorCompactionProcessor(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer curatorStates.Close()
watermarkStates, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{
Path: watermarkDirectory.Path(),
watermarkStates, err := NewLevelDBHighWatermarker(&LevelDBHighWatermarkerOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Path: watermarkDirectory.Path(),
},
})
if err != nil {
t.Fatal(err)
@ -1380,8 +1381,11 @@ func TestCuratorDeletionProcessor(t *testing.T) {
}
defer curatorStates.Close()
watermarkStates, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{
Path: watermarkDirectory.Path()})
watermarkStates, err := NewLevelDBHighWatermarker(&LevelDBHighWatermarkerOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Path: watermarkDirectory.Path(),
},
})
if err != nil {
t.Fatal(err)
}

@ -331,7 +331,7 @@ func (t *TieredStorage) seriesTooOld(f *clientmodel.Fingerprint, i time.Time) (b
value := &dto.MetricHighWatermark{}
k := &dto.Fingerprint{}
dumpFingerprint(k, f)
diskHit, err := t.DiskStorage.MetricHighWatermarks.Get(k, value)
_, diskHit, err := t.DiskStorage.MetricHighWatermarks.Get(f)
if err != nil {
return false, err
}

@ -23,6 +23,10 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"github.com/prometheus/prometheus/storage/raw/leveldb"
)
// unsafe.Sizeof(watermarks{})
@ -162,3 +166,97 @@ func (lru *WatermarkCache) checkCapacity() {
lru.size -= elementSize
}
}
type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]time.Time
type HighWatermarker interface {
raw.ForEacher
UpdateBatch(FingerprintHighWatermarkMapping) error
Get(*clientmodel.Fingerprint) (t time.Time, ok bool, err error)
Close() error
State() string
Size() (uint64, bool, error)
}
type leveldbHighWatermarker struct {
p *leveldb.LevelDBPersistence
}
func (w *leveldbHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, ok bool, err error) {
k := new(dto.Fingerprint)
dumpFingerprint(k, f)
v := new(dto.MetricHighWatermark)
ok, err = w.p.Get(k, v)
if err != nil {
return t, ok, err
}
if !ok {
return t, ok, err
}
t = time.Unix(v.GetTimestamp(), 0)
return t, true, nil
}
func (w *leveldbHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) error {
batch := leveldb.NewBatch()
defer batch.Close()
for fp, t := range m {
existing, present, err := w.Get(&fp)
if err != nil {
return err
}
k := new(dto.Fingerprint)
dumpFingerprint(k, &fp)
v := new(dto.MetricHighWatermark)
if !present {
v.Timestamp = proto.Int64(t.Unix())
batch.Put(k, v)
continue
}
// BUG(matt): Repace this with watermark management.
if t.After(existing) {
v.Timestamp = proto.Int64(t.Unix())
batch.Put(k, v)
}
}
return w.p.Commit(batch)
}
func (i *leveldbHighWatermarker) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
return i.p.ForEach(d, f, o)
}
func (i *leveldbHighWatermarker) Close() error {
i.p.Close()
return nil
}
func (i *leveldbHighWatermarker) State() string {
return i.p.State()
}
func (i *leveldbHighWatermarker) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize()
return s, true, err
}
type LevelDBHighWatermarkerOptions struct {
leveldb.LevelDBOptions
}
func NewLevelDBHighWatermarker(o *LevelDBHighWatermarkerOptions) (HighWatermarker, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &leveldbHighWatermarker{
p: s,
}, nil
}

@ -75,6 +75,6 @@ func (l *LevelDBMembershipIndex) ApproximateSize() (uint64, error) {
return l.persistence.ApproximateSize()
}
func (l *LevelDBMembershipIndex) State() leveldb.DatabaseState {
func (l *LevelDBMembershipIndex) State() string {
return l.persistence.State()
}

@ -27,7 +27,9 @@ import (
// LevelDBPersistence is a disk-backed sorted key-value store.
type LevelDBPersistence struct {
path string
path string
name string
purpose string
cache *levigo.Cache
filterPolicy *levigo.FilterPolicy
@ -162,7 +164,9 @@ func (i levigoIterator) GetError() (err error) {
}
type LevelDBOptions struct {
Path string
Path string
Name string
Purpose string
CacheSizeBytes int
OpenFileAllowance int
@ -202,7 +206,9 @@ func NewLevelDBPersistence(o *LevelDBOptions) (*LevelDBPersistence, error) {
writeOptions.SetSync(o.FlushOnMutate)
return &LevelDBPersistence{
path: o.Path,
path: o.Path,
name: o.Name,
purpose: o.Purpose,
cache: cache,
filterPolicy: filterPolicy,

@ -14,8 +14,10 @@
package leveldb
import (
"bytes"
"fmt"
"github.com/prometheus/prometheus/utility"
"time"
)
const (
@ -26,9 +28,8 @@ const (
// DatabaseState models a bundle of metadata about a LevelDB database used in
// template format string interpolation.
type DatabaseState struct {
LastRefreshed time.Time
Type string
Name string
Purpose string
Path string
LowLevelStatus string
SSTablesStatus string
@ -36,10 +37,25 @@ type DatabaseState struct {
Error error
}
func (l *LevelDBPersistence) State() DatabaseState {
func (s DatabaseState) String() string {
b := new(bytes.Buffer)
fmt.Fprintln(b, "Name:", s.Name)
fmt.Fprintln(b, "Path:", s.Path)
fmt.Fprintln(b, "Purpose:", s.Purpose)
fmt.Fprintln(b, "Low Level Diagnostics:", s.LowLevelStatus)
fmt.Fprintln(b, "SSTable Statistics:", s.SSTablesStatus)
fmt.Fprintln(b, "Approximate Size:", s.ApproximateSize)
fmt.Fprintln(b, "Error:", s.Error)
return b.String()
}
func (l *LevelDBPersistence) LowLevelState() DatabaseState {
databaseState := DatabaseState{
LastRefreshed: time.Now(),
Path: l.path,
Name: l.name,
Purpose: l.purpose,
LowLevelStatus: l.storage.PropertyValue(statsKey),
SSTablesStatus: l.storage.PropertyValue(sstablesKey),
}
@ -52,3 +68,7 @@ func (l *LevelDBPersistence) State() DatabaseState {
return databaseState
}
func (l *LevelDBPersistence) State() string {
return l.LowLevelState().String()
}

@ -14,15 +14,14 @@
package web
import (
"github.com/prometheus/prometheus/storage/raw/leveldb"
"net/http"
"sync"
)
type DatabasesHandler struct {
States []leveldb.DatabaseState
States []string
Incoming chan []leveldb.DatabaseState
Incoming chan []string
mutex sync.RWMutex
}

Loading…
Cancel
Save