mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
243 lines
6.4 KiB
243 lines
6.4 KiB
// Copyright 2020 The Prometheus Authors |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
package main |
|
|
|
import ( |
|
"context" |
|
"errors" |
|
"fmt" |
|
"io" |
|
"math" |
|
"time" |
|
|
|
"github.com/oklog/ulid" |
|
|
|
"github.com/prometheus/common/promslog" |
|
|
|
"github.com/prometheus/prometheus/model/labels" |
|
"github.com/prometheus/prometheus/model/textparse" |
|
"github.com/prometheus/prometheus/tsdb" |
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" |
|
) |
|
|
|
func getMinAndMaxTimestamps(p textparse.Parser) (int64, int64, error) { |
|
var maxt, mint int64 = math.MinInt64, math.MaxInt64 |
|
|
|
for { |
|
entry, err := p.Next() |
|
if errors.Is(err, io.EOF) { |
|
break |
|
} |
|
if err != nil { |
|
return 0, 0, fmt.Errorf("next: %w", err) |
|
} |
|
|
|
if entry != textparse.EntrySeries { |
|
continue |
|
} |
|
|
|
_, ts, _ := p.Series() |
|
if ts == nil { |
|
return 0, 0, errors.New("expected timestamp for series got none") |
|
} |
|
|
|
if *ts > maxt { |
|
maxt = *ts |
|
} |
|
if *ts < mint { |
|
mint = *ts |
|
} |
|
} |
|
|
|
if maxt == math.MinInt64 { |
|
maxt = 0 |
|
} |
|
if mint == math.MaxInt64 { |
|
mint = 0 |
|
} |
|
|
|
return maxt, mint, nil |
|
} |
|
|
|
func getCompatibleBlockDuration(maxBlockDuration int64) int64 { |
|
blockDuration := tsdb.DefaultBlockDuration |
|
if maxBlockDuration > tsdb.DefaultBlockDuration { |
|
ranges := tsdb.ExponentialBlockRanges(tsdb.DefaultBlockDuration, 10, 3) |
|
idx := len(ranges) - 1 // Use largest range if user asked for something enormous. |
|
for i, v := range ranges { |
|
if v > maxBlockDuration { |
|
idx = i - 1 |
|
break |
|
} |
|
} |
|
blockDuration = ranges[idx] |
|
} |
|
return blockDuration |
|
} |
|
|
|
func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesInAppender int, outputDir string, humanReadable, quiet bool, customLabels map[string]string) (returnErr error) { |
|
blockDuration := getCompatibleBlockDuration(maxBlockDuration) |
|
mint = blockDuration * (mint / blockDuration) |
|
|
|
db, err := tsdb.OpenDBReadOnly(outputDir, "", nil) |
|
if err != nil { |
|
return err |
|
} |
|
defer func() { |
|
returnErr = tsdb_errors.NewMulti(returnErr, db.Close()).Err() |
|
}() |
|
|
|
var ( |
|
wroteHeader bool |
|
nextSampleTs int64 = math.MaxInt64 |
|
) |
|
|
|
lb := labels.NewBuilder(labels.EmptyLabels()) |
|
|
|
for t := mint; t <= maxt; t += blockDuration { |
|
tsUpper := t + blockDuration |
|
if nextSampleTs != math.MaxInt64 && nextSampleTs >= tsUpper { |
|
// The next sample is not in this timerange, we can avoid parsing |
|
// the file for this timerange. |
|
continue |
|
} |
|
nextSampleTs = math.MaxInt64 |
|
|
|
err := func() error { |
|
// To prevent races with compaction, a block writer only allows appending samples |
|
// that are at most half a block size older than the most recent sample appended so far. |
|
// However, in the way we use the block writer here, compaction doesn't happen, while we |
|
// also need to append samples throughout the whole block range. To allow that, we |
|
// pretend that the block is twice as large here, but only really add sample in the |
|
// original interval later. |
|
w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), outputDir, 2*blockDuration) |
|
if err != nil { |
|
return fmt.Errorf("block writer: %w", err) |
|
} |
|
defer func() { |
|
err = tsdb_errors.NewMulti(err, w.Close()).Err() |
|
}() |
|
|
|
ctx := context.Background() |
|
app := w.Appender(ctx) |
|
symbolTable := labels.NewSymbolTable() // One table per block means it won't grow too large. |
|
p := textparse.NewOpenMetricsParser(input, symbolTable) |
|
samplesCount := 0 |
|
for { |
|
e, err := p.Next() |
|
if errors.Is(err, io.EOF) { |
|
break |
|
} |
|
if err != nil { |
|
return fmt.Errorf("parse: %w", err) |
|
} |
|
if e != textparse.EntrySeries { |
|
continue |
|
} |
|
|
|
_, ts, v := p.Series() |
|
if ts == nil { |
|
l := labels.Labels{} |
|
p.Metric(&l) |
|
return fmt.Errorf("expected timestamp for series %v, got none", l) |
|
} |
|
if *ts < t { |
|
continue |
|
} |
|
if *ts >= tsUpper { |
|
if *ts < nextSampleTs { |
|
nextSampleTs = *ts |
|
} |
|
continue |
|
} |
|
|
|
l := labels.Labels{} |
|
p.Metric(&l) |
|
|
|
lb.Reset(l) |
|
for name, value := range customLabels { |
|
lb.Set(name, value) |
|
} |
|
lbls := lb.Labels() |
|
|
|
if _, err := app.Append(0, lbls, *ts, v); err != nil { |
|
return fmt.Errorf("add sample: %w", err) |
|
} |
|
|
|
samplesCount++ |
|
if samplesCount < maxSamplesInAppender { |
|
continue |
|
} |
|
|
|
// If we arrive here, the samples count is greater than the maxSamplesInAppender. |
|
// Therefore the old appender is committed and a new one is created. |
|
// This prevents keeping too many samples lined up in an appender and thus in RAM. |
|
if err := app.Commit(); err != nil { |
|
return fmt.Errorf("commit: %w", err) |
|
} |
|
|
|
app = w.Appender(ctx) |
|
samplesCount = 0 |
|
} |
|
|
|
if err := app.Commit(); err != nil { |
|
return fmt.Errorf("commit: %w", err) |
|
} |
|
|
|
block, err := w.Flush(ctx) |
|
switch { |
|
case err == nil: |
|
if quiet { |
|
break |
|
} |
|
// Empty block, don't print. |
|
if block.Compare(ulid.ULID{}) == 0 { |
|
break |
|
} |
|
blocks, err := db.Blocks() |
|
if err != nil { |
|
return fmt.Errorf("get blocks: %w", err) |
|
} |
|
for _, b := range blocks { |
|
if b.Meta().ULID == block { |
|
printBlocks([]tsdb.BlockReader{b}, !wroteHeader, humanReadable) |
|
wroteHeader = true |
|
break |
|
} |
|
} |
|
case errors.Is(err, tsdb.ErrNoSeriesAppended): |
|
default: |
|
return fmt.Errorf("flush: %w", err) |
|
} |
|
|
|
return nil |
|
}() |
|
if err != nil { |
|
return fmt.Errorf("process blocks: %w", err) |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func backfill(maxSamplesInAppender int, input []byte, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration, customLabels map[string]string) (err error) { |
|
p := textparse.NewOpenMetricsParser(input, nil) // Don't need a SymbolTable to get max and min timestamps. |
|
maxt, mint, err := getMinAndMaxTimestamps(p) |
|
if err != nil { |
|
return fmt.Errorf("getting min and max timestamp: %w", err) |
|
} |
|
if err = createBlocks(input, mint, maxt, int64(maxBlockDuration/time.Millisecond), maxSamplesInAppender, outputDir, humanReadable, quiet, customLabels); err != nil { |
|
return fmt.Errorf("block creation: %w", err) |
|
} |
|
return nil |
|
}
|
|
|