mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
391 lines
12 KiB
391 lines
12 KiB
// Copyright 2013 Prometheus Team
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package metric
|
|
|
|
import (
|
|
"code.google.com/p/goprotobuf/proto"
|
|
"fmt"
|
|
"github.com/prometheus/prometheus/coding"
|
|
"github.com/prometheus/prometheus/model"
|
|
dto "github.com/prometheus/prometheus/model/generated"
|
|
"github.com/prometheus/prometheus/storage"
|
|
"github.com/prometheus/prometheus/storage/raw"
|
|
"github.com/prometheus/prometheus/storage/raw/leveldb"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// CurationState contains high-level curation state information for the
|
|
// heads-up-display.
|
|
type CurationState struct {
|
|
Active bool
|
|
Name string
|
|
Limit time.Duration
|
|
Fingerprint model.Fingerprint
|
|
}
|
|
|
|
// watermarkFilter determines whether to include or exclude candidate
|
|
// values from the curation process by virtue of how old the high watermark is.
|
|
type watermarkFilter struct {
|
|
// curationState is the data store for curation remarks.
|
|
curationState raw.Persistence
|
|
// ignoreYoungerThan conveys this filter's policy of not working on elements
|
|
// younger than a given relative time duration. This is persisted to the
|
|
// curation remark database (curationState) to indicate how far a given
|
|
// policy of this type has progressed.
|
|
ignoreYoungerThan time.Duration
|
|
// processor is the post-processor that performs whatever action is desired on
|
|
// the data that is deemed valid to be worked on.
|
|
processor Processor
|
|
// stop functions as the global stop channel for all future operations.
|
|
stop chan bool
|
|
// stopAt is used to determine the elegibility of series for compaction.
|
|
stopAt time.Time
|
|
// status is the outbound channel for notifying the status page of its state.
|
|
status chan CurationState
|
|
}
|
|
|
|
// curator is responsible for effectuating a given curation policy across the
|
|
// stored samples on-disk. This is useful to compact sparse sample values into
|
|
// single sample entities to reduce keyspace load on the datastore.
|
|
type Curator struct {
|
|
// Stop functions as a channel that when empty allows the curator to operate.
|
|
// The moment a value is ingested inside of it, the curator goes into drain
|
|
// mode.
|
|
Stop chan bool
|
|
}
|
|
|
|
// watermarkDecoder converts (dto.Fingerprint, dto.MetricHighWatermark) doubles
|
|
// into (model.Fingerprint, model.Watermark) doubles.
|
|
type watermarkDecoder struct{}
|
|
|
|
// watermarkOperator scans over the curator.samples table for metrics whose
|
|
// high watermark has been determined to be allowable for curation. This type
|
|
// is individually responsible for compaction.
|
|
//
|
|
// The scanning starts from CurationRemark.LastCompletionTimestamp and goes
|
|
// forward until the stop point or end of the series is reached.
|
|
type watermarkOperator struct {
|
|
// curationState is the data store for curation remarks.
|
|
curationState raw.Persistence
|
|
// diskFrontier models the available seekable ranges for the provided
|
|
// sampleIterator.
|
|
diskFrontier diskFrontier
|
|
// ignoreYoungerThan is passed into the curation remark for the given series.
|
|
ignoreYoungerThan time.Duration
|
|
// processor is responsible for executing a given stategy on the
|
|
// to-be-operated-on series.
|
|
processor Processor
|
|
// sampleIterator is a snapshotted iterator for the time series.
|
|
sampleIterator leveldb.Iterator
|
|
// samples
|
|
samples raw.Persistence
|
|
// stopAt is a cue for when to stop mutating a given series.
|
|
stopAt time.Time
|
|
}
|
|
|
|
// run facilitates the curation lifecycle.
|
|
//
|
|
// recencyThreshold represents the most recent time up to which values will be
|
|
// curated.
|
|
// curationState is the on-disk store where the curation remarks are made for
|
|
// how much progress has been made.
|
|
func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples, watermarks *leveldb.LevelDBPersistence, status chan CurationState) (err error) {
|
|
defer func(t time.Time) {
|
|
duration := float64(time.Since(t) / time.Millisecond)
|
|
|
|
labels := map[string]string{
|
|
cutOff: fmt.Sprint(ignoreYoungerThan),
|
|
processorName: processor.Name(),
|
|
result: success,
|
|
}
|
|
if err != nil {
|
|
labels[result] = failure
|
|
}
|
|
|
|
curationDuration.IncrementBy(labels, duration)
|
|
curationDurations.Add(labels, duration)
|
|
}(time.Now())
|
|
defer func() {
|
|
status <- CurationState{
|
|
Active: false,
|
|
}
|
|
}()
|
|
|
|
iterator := samples.NewIterator(true)
|
|
defer iterator.Close()
|
|
|
|
diskFrontier, err := newDiskFrontier(iterator)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if diskFrontier == nil {
|
|
// No sample database exists; no work to do!
|
|
return
|
|
}
|
|
|
|
decoder := watermarkDecoder{}
|
|
|
|
filter := watermarkFilter{
|
|
curationState: curationState,
|
|
ignoreYoungerThan: ignoreYoungerThan,
|
|
processor: processor,
|
|
status: status,
|
|
stop: c.Stop,
|
|
stopAt: instant.Add(-1 * ignoreYoungerThan),
|
|
}
|
|
|
|
// Right now, the ability to stop a curation is limited to the beginning of
|
|
// each fingerprint cycle. It is impractical to cease the work once it has
|
|
// begun for a given series.
|
|
operator := watermarkOperator{
|
|
curationState: curationState,
|
|
diskFrontier: *diskFrontier,
|
|
processor: processor,
|
|
ignoreYoungerThan: ignoreYoungerThan,
|
|
sampleIterator: iterator,
|
|
samples: samples,
|
|
stopAt: instant.Add(-1 * ignoreYoungerThan),
|
|
}
|
|
|
|
_, err = watermarks.ForEach(decoder, filter, operator)
|
|
|
|
return
|
|
}
|
|
|
|
// drain instructs the curator to stop at the next convenient moment as to not
|
|
// introduce data inconsistencies.
|
|
func (c Curator) Drain() {
|
|
if len(c.Stop) == 0 {
|
|
c.Stop <- true
|
|
}
|
|
}
|
|
|
|
func (w watermarkDecoder) DecodeKey(in interface{}) (out interface{}, err error) {
|
|
key := &dto.Fingerprint{}
|
|
bytes := in.([]byte)
|
|
|
|
err = proto.Unmarshal(bytes, key)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
out = model.NewFingerprintFromDTO(key)
|
|
|
|
return
|
|
}
|
|
|
|
func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err error) {
|
|
dto := &dto.MetricHighWatermark{}
|
|
bytes := in.([]byte)
|
|
|
|
err = proto.Unmarshal(bytes, dto)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
out = model.NewWatermarkFromHighWatermarkDTO(dto)
|
|
|
|
return
|
|
}
|
|
|
|
func (w watermarkFilter) shouldStop() bool {
|
|
return len(w.stop) != 0
|
|
}
|
|
|
|
func getCurationRemark(states raw.Persistence, processor Processor, ignoreYoungerThan time.Duration, fingerprint model.Fingerprint) (remark *model.CurationRemark, err error) {
|
|
rawSignature, err := processor.Signature()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
curationKey := model.CurationKey{
|
|
Fingerprint: fingerprint,
|
|
ProcessorMessageRaw: rawSignature,
|
|
ProcessorMessageTypeName: processor.Name(),
|
|
IgnoreYoungerThan: ignoreYoungerThan,
|
|
}.ToDTO()
|
|
curationValue := &dto.CurationValue{}
|
|
|
|
rawKey := coding.NewProtocolBuffer(curationKey)
|
|
|
|
has, err := states.Has(rawKey)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if !has {
|
|
return
|
|
}
|
|
|
|
rawCurationValue, err := states.Get(rawKey)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
err = proto.Unmarshal(rawCurationValue, curationValue)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
baseRemark := model.NewCurationRemarkFromDTO(curationValue)
|
|
remark = &baseRemark
|
|
|
|
return
|
|
}
|
|
|
|
func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult) {
|
|
fingerprint := key.(model.Fingerprint)
|
|
|
|
defer func() {
|
|
labels := map[string]string{
|
|
cutOff: fmt.Sprint(w.ignoreYoungerThan),
|
|
result: strings.ToLower(r.String()),
|
|
processorName: w.processor.Name(),
|
|
}
|
|
|
|
curationFilterOperations.Increment(labels)
|
|
}()
|
|
|
|
defer func() {
|
|
w.status <- CurationState{
|
|
Active: true,
|
|
Name: w.processor.Name(),
|
|
Limit: w.ignoreYoungerThan,
|
|
Fingerprint: fingerprint,
|
|
}
|
|
}()
|
|
|
|
if w.shouldStop() {
|
|
return storage.STOP
|
|
}
|
|
|
|
curationRemark, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, fingerprint)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if curationRemark == nil {
|
|
r = storage.ACCEPT
|
|
return
|
|
}
|
|
if !curationRemark.OlderThan(w.stopAt) {
|
|
return storage.SKIP
|
|
}
|
|
watermark := value.(model.Watermark)
|
|
if !curationRemark.OlderThan(watermark.Time) {
|
|
return storage.SKIP
|
|
}
|
|
curationConsistent, err := w.curationConsistent(fingerprint, watermark)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if curationConsistent {
|
|
return storage.SKIP
|
|
}
|
|
|
|
return storage.ACCEPT
|
|
}
|
|
|
|
// curationConsistent determines whether the given metric is in a dirty state
|
|
// and needs curation.
|
|
func (w watermarkFilter) curationConsistent(f model.Fingerprint, watermark model.Watermark) (consistent bool, err error) {
|
|
curationRemark, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, f)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if !curationRemark.OlderThan(watermark.Time) {
|
|
consistent = true
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
|
|
fingerprint := key.(model.Fingerprint)
|
|
|
|
seriesFrontier, err := newSeriesFrontier(fingerprint, w.diskFrontier, w.sampleIterator)
|
|
if err != nil || seriesFrontier == nil {
|
|
// An anomaly with the series frontier is severe in the sense that some sort
|
|
// of an illegal state condition exists in the storage layer, which would
|
|
// probably signify an illegal disk frontier.
|
|
return &storage.OperatorError{error: err, Continuable: false}
|
|
}
|
|
|
|
curationState, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, fingerprint)
|
|
if err != nil {
|
|
// An anomaly with the curation remark is likely not fatal in the sense that
|
|
// there was a decoding error with the entity and shouldn't be cause to stop
|
|
// work. The process will simply start from a pessimistic work time and
|
|
// work forward. With an idempotent processor, this is safe.
|
|
return &storage.OperatorError{error: err, Continuable: true}
|
|
}
|
|
|
|
startKey := model.SampleKey{
|
|
Fingerprint: fingerprint,
|
|
FirstTimestamp: seriesFrontier.optimalStartTime(curationState),
|
|
}
|
|
|
|
prospectiveKey, err := coding.NewProtocolBuffer(startKey.ToDTO()).Encode()
|
|
if err != nil {
|
|
// An encoding failure of a key is no reason to stop.
|
|
return &storage.OperatorError{error: err, Continuable: true}
|
|
}
|
|
if !w.sampleIterator.Seek(prospectiveKey) {
|
|
// LevelDB is picky about the seek ranges. If an iterator was invalidated,
|
|
// no work may occur, and the iterator cannot be recovered.
|
|
return &storage.OperatorError{error: fmt.Errorf("Illegal Condition: Iterator invalidated due to seek range."), Continuable: false}
|
|
}
|
|
|
|
newestAllowedSample := w.stopAt
|
|
if !newestAllowedSample.Before(seriesFrontier.lastSupertime) {
|
|
newestAllowedSample = seriesFrontier.lastSupertime
|
|
}
|
|
|
|
lastTime, err := w.processor.Apply(w.sampleIterator, w.samples, newestAllowedSample, fingerprint)
|
|
if err != nil {
|
|
// We can't divine the severity of a processor error without refactoring the
|
|
// interface.
|
|
return &storage.OperatorError{error: err, Continuable: false}
|
|
}
|
|
|
|
err = w.refreshCurationRemark(fingerprint, lastTime)
|
|
if err != nil {
|
|
// Under the assumption that the processors are idempotent, they can be
|
|
// re-run; thusly, the commitment of the curation remark is no cause
|
|
// to cease further progress.
|
|
return &storage.OperatorError{error: err, Continuable: true}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (w watermarkOperator) refreshCurationRemark(f model.Fingerprint, finished time.Time) (err error) {
|
|
signature, err := w.processor.Signature()
|
|
if err != nil {
|
|
return
|
|
}
|
|
curationKey := model.CurationKey{
|
|
Fingerprint: f,
|
|
ProcessorMessageRaw: signature,
|
|
ProcessorMessageTypeName: w.processor.Name(),
|
|
IgnoreYoungerThan: w.ignoreYoungerThan,
|
|
}.ToDTO()
|
|
curationValue := model.CurationRemark{
|
|
LastCompletionTimestamp: finished,
|
|
}.ToDTO()
|
|
|
|
err = w.curationState.Put(coding.NewProtocolBuffer(curationKey), coding.NewProtocolBuffer(curationValue))
|
|
|
|
return
|
|
}
|