You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
prometheus/storage/metric/tiered.go

599 lines
16 KiB

12 years ago
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"fmt"
12 years ago
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
12 years ago
"github.com/prometheus/prometheus/model"
12 years ago
dto "github.com/prometheus/prometheus/model/generated"
12 years ago
"github.com/prometheus/prometheus/storage"
12 years ago
"sort"
12 years ago
"sync"
"time"
)
// tieredStorage both persists samples and generates materialized views for
// queries.
type tieredStorage struct {
appendToDiskQueue chan model.Sample
appendToMemoryQueue chan model.Sample
12 years ago
diskFrontier *diskFrontier
12 years ago
diskStorage *LevelDBMetricPersistence
12 years ago
draining chan bool
12 years ago
flushMemoryInterval time.Duration
memoryArena memorySeriesStorage
memoryTTL time.Duration
mutex sync.Mutex
viewQueue chan viewJob
writeMemoryInterval time.Duration
}
// viewJob encapsulates a request to extract sample values from the datastore.
type viewJob struct {
builder ViewRequestBuilder
output chan View
err chan error
}
12 years ago
// Provides a unified means for batch appending values into the datastore along
// with querying for values in an efficient way.
12 years ago
type Storage interface {
12 years ago
// Enqueues a Sample for storage.
AppendSample(model.Sample) error
// Enqueus a ViewRequestBuilder for materialization, subject to a timeout.
MakeView(request ViewRequestBuilder, timeout time.Duration) (View, error)
// Starts serving requests.
12 years ago
Serve()
12 years ago
// Stops the storage subsystem, flushing all pending operations.
Drain()
12 years ago
Flush()
12 years ago
}
12 years ago
func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryInterval, writeMemoryInterval, memoryTTL time.Duration, root string) Storage {
diskStorage, err := NewLevelDBMetricPersistence(root)
12 years ago
if err != nil {
panic(err)
}
return &tieredStorage{
appendToDiskQueue: make(chan model.Sample, appendToDiskQueueDepth),
appendToMemoryQueue: make(chan model.Sample, appendToMemoryQueueDepth),
diskStorage: diskStorage,
12 years ago
draining: make(chan bool),
12 years ago
flushMemoryInterval: flushMemoryInterval,
memoryArena: NewMemorySeriesStorage(),
memoryTTL: memoryTTL,
viewQueue: make(chan viewJob, viewQueueDepth),
writeMemoryInterval: writeMemoryInterval,
}
}
12 years ago
func (t *tieredStorage) AppendSample(s model.Sample) (err error) {
if len(t.draining) > 0 {
return fmt.Errorf("Storage is in the process of draining.")
}
12 years ago
t.appendToMemoryQueue <- s
12 years ago
return
}
func (t *tieredStorage) Drain() {
12 years ago
if len(t.draining) == 0 {
t.draining <- true
}
12 years ago
}
func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (view View, err error) {
12 years ago
if len(t.draining) > 0 {
err = fmt.Errorf("Storage is in the process of draining.")
return
}
12 years ago
result := make(chan View)
errChan := make(chan error)
t.viewQueue <- viewJob{
builder: builder,
output: result,
err: errChan,
}
select {
case value := <-result:
view = value
case err = <-errChan:
return
case <-time.After(deadline):
err = fmt.Errorf("MakeView timed out after %s.", deadline)
}
return
}
12 years ago
func (t *tieredStorage) rebuildDiskFrontier() (err error) {
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
12 years ago
12 years ago
recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: rebuildDiskFrontier, result: failure})
}()
i, closer, err := t.diskStorage.metricSamples.GetIterator()
if closer != nil {
defer closer.Close()
}
if err != nil {
panic(err)
}
t.diskFrontier, err = newDiskFrontier(i)
if err != nil {
panic(err)
12 years ago
}
12 years ago
return
12 years ago
}
func (t *tieredStorage) Serve() {
var (
flushMemoryTicker = time.Tick(t.flushMemoryInterval)
writeMemoryTicker = time.Tick(t.writeMemoryInterval)
)
12 years ago
12 years ago
for {
12 years ago
t.reportQueues()
12 years ago
select {
case <-writeMemoryTicker:
t.writeMemory()
case <-flushMemoryTicker:
t.flushMemory()
case viewRequest := <-t.viewQueue:
t.renderView(viewRequest)
12 years ago
case <-t.draining:
t.flush()
12 years ago
break
12 years ago
}
}
}
12 years ago
func (t *tieredStorage) reportQueues() {
queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "occupancy"}, float64(len(t.appendToDiskQueue)))
queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "capacity"}, float64(cap(t.appendToDiskQueue)))
queueSizes.Set(map[string]string{"queue": "append_to_memory", "facet": "occupancy"}, float64(len(t.appendToMemoryQueue)))
queueSizes.Set(map[string]string{"queue": "append_to_memory", "facet": "capacity"}, float64(cap(t.appendToMemoryQueue)))
queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "occupancy"}, float64(len(t.viewQueue)))
queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.viewQueue)))
}
12 years ago
func (t *tieredStorage) writeMemory() {
12 years ago
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: writeMemory, result: failure})
}()
12 years ago
t.mutex.Lock()
defer t.mutex.Unlock()
pendingLength := len(t.appendToMemoryQueue)
for i := 0; i < pendingLength; i++ {
t.memoryArena.AppendSample(<-t.appendToMemoryQueue)
}
}
12 years ago
func (t *tieredStorage) Flush() {
t.flush()
}
12 years ago
// Write all pending appends.
func (t *tieredStorage) flush() (err error) {
t.writeMemory()
t.flushMemory()
return
}
type memoryToDiskFlusher struct {
toDiskQueue chan model.Sample
disk MetricPersistence
olderThan time.Time
valuesAccepted int
valuesRejected int
}
type memoryToDiskFlusherVisitor struct {
stream stream
flusher *memoryToDiskFlusher
}
func (f memoryToDiskFlusherVisitor) DecodeKey(in interface{}) (out interface{}, err error) {
out = time.Time(in.(skipListTime))
return
}
func (f memoryToDiskFlusherVisitor) DecodeValue(in interface{}) (out interface{}, err error) {
out = in.(value).get()
return
}
func (f memoryToDiskFlusherVisitor) Filter(key, value interface{}) (filterResult storage.FilterResult) {
var (
recordTime = key.(time.Time)
)
if recordTime.Before(f.flusher.olderThan) {
f.flusher.valuesAccepted++
return storage.ACCEPT
}
f.flusher.valuesRejected++
return storage.STOP
}
func (f memoryToDiskFlusherVisitor) Operate(key, value interface{}) (err *storage.OperatorError) {
var (
recordTime = key.(time.Time)
recordValue = value.(model.SampleValue)
)
if len(f.flusher.toDiskQueue) == cap(f.flusher.toDiskQueue) {
f.flusher.Flush()
}
f.flusher.toDiskQueue <- model.Sample{
Metric: f.stream.metric,
Timestamp: recordTime,
Value: recordValue,
}
f.stream.values.Delete(skipListTime(recordTime))
return
}
func (f *memoryToDiskFlusher) ForStream(stream stream) (decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) {
visitor := memoryToDiskFlusherVisitor{
stream: stream,
flusher: f,
}
12 years ago
// fmt.Printf("fingerprint -> %s\n", model.NewFingerprintFromMetric(stream.metric).ToRowKey())
12 years ago
return visitor, visitor, visitor
}
func (f *memoryToDiskFlusher) Flush() {
length := len(f.toDiskQueue)
samples := model.Samples{}
for i := 0; i < length; i++ {
samples = append(samples, <-f.toDiskQueue)
}
f.disk.AppendSamples(samples)
}
func (f memoryToDiskFlusher) Close() {
f.Flush()
}
// Persist a whole bunch of samples to the datastore.
func (t *tieredStorage) flushMemory() {
12 years ago
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
recordOutcome(duration, nil, map[string]string{operation: appendSample, result: success}, map[string]string{operation: flushMemory, result: failure})
}()
12 years ago
t.mutex.Lock()
defer t.mutex.Unlock()
flusher := &memoryToDiskFlusher{
disk: t.diskStorage,
olderThan: time.Now().Add(-1 * t.memoryTTL),
toDiskQueue: t.appendToDiskQueue,
}
defer flusher.Close()
t.memoryArena.ForEachSample(flusher)
return
}
func (t *tieredStorage) renderView(viewJob viewJob) (err error) {
12 years ago
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
12 years ago
12 years ago
recordOutcome(duration, err, map[string]string{operation: appendSample, result: success}, map[string]string{operation: renderView, result: failure})
}()
12 years ago
12 years ago
t.mutex.Lock()
defer t.mutex.Unlock()
12 years ago
var (
12 years ago
scans = viewJob.builder.ScanJobs()
// standingOperations = ops{}
// lastTime = time.Time{}
12 years ago
view = newView()
12 years ago
)
12 years ago
// Rebuilding of the frontier should happen on a conditional basis if a
// (fingerprint, timestamp) tuple is outside of the current frontier.
err = t.rebuildDiskFrontier()
if err != nil {
panic(err)
}
12 years ago
12 years ago
iterator, closer, err := t.diskStorage.metricSamples.GetIterator()
if closer != nil {
defer closer.Close()
}
if err != nil {
panic(err)
}
for _, scanJob := range scans {
// XXX: Memoize the last retrieval for forward scans.
var (
12 years ago
// standingOperations ops
12 years ago
)
12 years ago
// fmt.Printf("Starting scan of %s...\n", scanJob)
12 years ago
if !(t.diskFrontier == nil || scanJob.fingerprint.Less(t.diskFrontier.firstFingerprint) || t.diskFrontier.lastFingerprint.Less(scanJob.fingerprint)) {
12 years ago
// fmt.Printf("Using diskFrontier %s\n", t.diskFrontier)
12 years ago
seriesFrontier, err := newSeriesFrontier(scanJob.fingerprint, *t.diskFrontier, iterator)
12 years ago
// fmt.Printf("Using seriesFrontier %s\n", seriesFrontier)
12 years ago
if err != nil {
panic(err)
}
12 years ago
12 years ago
if seriesFrontier != nil {
12 years ago
var (
targetKey = &dto.SampleKey{}
foundKey = &dto.SampleKey{}
foundValue *dto.SampleValueSeries
)
12 years ago
12 years ago
for _, operation := range scanJob.operations {
if seriesFrontier.lastTime.Before(operation.StartsAt()) {
12 years ago
// fmt.Printf("operation %s occurs after %s; aborting...\n", operation, seriesFrontier.lastTime)
12 years ago
break
}
12 years ago
12 years ago
scanJob.operations = scanJob.operations[1:len(scanJob.operations)]
12 years ago
12 years ago
if operation.StartsAt().Before(seriesFrontier.firstSupertime) {
12 years ago
// fmt.Printf("operation %s occurs before %s; discarding...\n", operation, seriesFrontier.firstSupertime)
12 years ago
continue
}
12 years ago
targetKey.Fingerprint = scanJob.fingerprint.ToDTO()
targetKey.Timestamp = indexable.EncodeTime(operation.StartsAt())
rawKey, _ := coding.NewProtocolBufferEncoder(targetKey).Encode()
12 years ago
// XXX: Use frontiers to manage out of range queries.
12 years ago
iterator.Seek(rawKey)
foundKey, err = extractSampleKey(iterator)
if err != nil {
panic(err)
}
12 years ago
var (
fst = indexable.DecodeTime(foundKey.Timestamp)
lst = time.Unix(*foundKey.LastTimestamp, 0)
)
12 years ago
if !((operation.StartsAt().Before(fst)) || lst.Before(operation.StartsAt())) {
12 years ago
// fmt.Printf("operation %s occurs inside of %s...\n", operation, foundKey)
12 years ago
foundValue, err = extractSampleValue(iterator)
if err != nil {
panic(err)
}
} else if operation.StartsAt().Before(fst) {
fmt.Printf("operation %s may occur in next entity; fast forwarding...\n", operation)
panic("oops")
12 years ago
} else {
12 years ago
panic("illegal state")
}
var (
elementCount = len(foundValue.Value)
searcher = func(i int) bool {
return time.Unix(*foundValue.Value[i].Timestamp, 0).After(operation.StartsAt())
12 years ago
}
12 years ago
index = sort.Search(elementCount, searcher)
)
12 years ago
12 years ago
if index != elementCount {
if index > 0 {
index--
}
12 years ago
12 years ago
foundValue.Value = foundValue.Value[index:elementCount]
}
12 years ago
switch operation.(type) {
case getValuesAtTimeOp:
if len(foundValue.Value) > 0 {
view.appendSample(scanJob.fingerprint, time.Unix(*foundValue.Value[0].Timestamp, 0), model.SampleValue(*foundValue.Value[0].Value))
}
if len(foundValue.Value) > 1 {
12 years ago
view.appendSample(scanJob.fingerprint, time.Unix(*foundValue.Value[1].Timestamp, 0), model.SampleValue(*foundValue.Value[1].Value))
12 years ago
}
default:
panic("unhandled")
12 years ago
}
}
}
}
}
12 years ago
// for {
// if len(s.operations) == 0 {
// if len(standingOperations) > 0 {
// var (
// intervals = collectIntervals(standingOperations)
// ranges = collectRanges(standingOperations)
// )
// if len(intervals) > 0 {
// }
// if len(ranges) > 0 {
// if len(ranges) > 0 {
// }
// }
// break
// }
// }
// operation := s.operations[0]
// if operation.StartsAt().Equal(lastTime) {
// standingOperations = append(standingOperations, operation)
// } else {
// standingOperations = ops{operation}
// lastTime = operation.StartsAt()
// }
// s.operations = s.operations[1:len(s.operations)]
// }
12 years ago
viewJob.output <- view
12 years ago
return
12 years ago
}
func (s scanJobs) Represent(d *LevelDBMetricPersistence, m memorySeriesStorage) (storage *memorySeriesStorage, err error) {
if len(s) == 0 {
return
}
iterator, closer, err := d.metricSamples.GetIterator()
if err != nil {
panic(err)
return
}
defer closer.Close()
diskFrontier, err := newDiskFrontier(iterator)
if err != nil {
panic(err)
return
}
if diskFrontier == nil {
panic("diskfrontier == nil")
}
for _, job := range s {
if len(job.operations) == 0 {
panic("len(job.operations) == 0 should never occur")
}
// Determine if the metric is in the known keyspace. This is used as a
// high-level heuristic before comparing the timestamps.
var (
fingerprint = job.fingerprint
absentDiskKeyspace = fingerprint.Less(diskFrontier.firstFingerprint) || diskFrontier.lastFingerprint.Less(fingerprint)
absentMemoryKeyspace = false
)
if _, ok := m.fingerprintToSeries[fingerprint]; !ok {
absentMemoryKeyspace = true
}
var (
firstSupertime time.Time
lastSupertime time.Time
)
var (
_ = absentMemoryKeyspace
_ = firstSupertime
_ = lastSupertime
)
// If the key is present in the disk keyspace, we should find out the maximum
// seek points ahead of time. In the LevelDB case, this will save us from
// having to dispose of and recreate the iterator.
if !absentDiskKeyspace {
seriesFrontier, err := newSeriesFrontier(fingerprint, *diskFrontier, iterator)
if err != nil {
panic(err)
return nil, err
}
if seriesFrontier == nil {
panic("ouch")
}
}
}
return
}
// var (
// memoryLowWaterMark time.Time
// memoryHighWaterMark time.Time
// )
// if !absentMemoryKeyspace {
// }
// // if firstDiskFingerprint.Equal(job.fingerprint) {
// // for _, operation := range job.operations {
// // if o, ok := operation.(getMetricAtTimeOperation); ok {
// // if o.StartTime().Before(firstDiskSuperTime) {
// // }
// // }
// // if o, ok := operation.(GetMetricAtInterval); ok {
// // }
// // }
// // }
// }
// // // Compare the metrics on the basis of the keys.
// // firstSampleInRange = sort.IsSorted(model.Fingerprints{firstDiskFingerprint, s[0].fingerprint})
// // lastSampleInRange = sort.IsSorted(model.Fingerprints{s[s.Len()-1].fingerprint, lastDiskFingerprint})
// // if firstSampleInRange && firstDiskFingerprint.Equal(s[0].fingerprint) {
// // firstSampleInRange = !indexable.DecodeTime(firstKey.Timestamp).After(s.operations[0].StartTime())
// // }
// // if lastSampleInRange && lastDiskFingerprint.Equal(s[s.Len()-1].fingerprint) {
// // lastSampleInRange = !s.operations[s.Len()-1].StartTime().After(indexable.DecodeTime(lastKey.Timestamp))
// // }
// // for _, job := range s {
// // operations := job.operations
// // numberOfOperations := len(operations)
// // for j := 0; j < numberOfOperations; j++ {
// // operationTime := operations[j].StartTime()
// // group, skipAhead := collectOperationsForTime(operationTime, operations[j:numberOfOperations])
// // ranges := collectRanges(group)
// // intervals := collectIntervals(group)
// // fmt.Printf("ranges -> %s\n", ranges)
// // if len(ranges) > 0 {
// // fmt.Printf("d -> %s\n", peekForLongestRange(ranges, ranges[0].through))
// // }
// // j += skipAhead
// // }
// // }