mirror of https://github.com/prometheus/prometheus
244 lines
6.4 KiB
Go
244 lines
6.4 KiB
Go
// Copyright 2020 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"time"
|
|
|
|
"github.com/oklog/ulid"
|
|
|
|
"github.com/prometheus/common/promslog"
|
|
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/textparse"
|
|
"github.com/prometheus/prometheus/tsdb"
|
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
|
)
|
|
|
|
func getMinAndMaxTimestamps(p textparse.Parser) (int64, int64, error) {
|
|
var maxt, mint int64 = math.MinInt64, math.MaxInt64
|
|
|
|
for {
|
|
entry, err := p.Next()
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("next: %w", err)
|
|
}
|
|
|
|
if entry != textparse.EntrySeries {
|
|
continue
|
|
}
|
|
|
|
_, ts, _ := p.Series()
|
|
if ts == nil {
|
|
return 0, 0, errors.New("expected timestamp for series got none")
|
|
}
|
|
|
|
if *ts > maxt {
|
|
maxt = *ts
|
|
}
|
|
if *ts < mint {
|
|
mint = *ts
|
|
}
|
|
}
|
|
|
|
if maxt == math.MinInt64 {
|
|
maxt = 0
|
|
}
|
|
if mint == math.MaxInt64 {
|
|
mint = 0
|
|
}
|
|
|
|
return maxt, mint, nil
|
|
}
|
|
|
|
func getCompatibleBlockDuration(maxBlockDuration int64) int64 {
|
|
blockDuration := tsdb.DefaultBlockDuration
|
|
if maxBlockDuration > tsdb.DefaultBlockDuration {
|
|
ranges := tsdb.ExponentialBlockRanges(tsdb.DefaultBlockDuration, 10, 3)
|
|
idx := len(ranges) - 1 // Use largest range if user asked for something enormous.
|
|
for i, v := range ranges {
|
|
if v > maxBlockDuration {
|
|
idx = i - 1
|
|
break
|
|
}
|
|
}
|
|
blockDuration = ranges[idx]
|
|
}
|
|
return blockDuration
|
|
}
|
|
|
|
func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesInAppender int, outputDir string, humanReadable, quiet bool, customLabels map[string]string) (returnErr error) {
|
|
blockDuration := getCompatibleBlockDuration(maxBlockDuration)
|
|
mint = blockDuration * (mint / blockDuration)
|
|
|
|
db, err := tsdb.OpenDBReadOnly(outputDir, "", nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
returnErr = tsdb_errors.NewMulti(returnErr, db.Close()).Err()
|
|
}()
|
|
|
|
var (
|
|
wroteHeader bool
|
|
nextSampleTs int64 = math.MaxInt64
|
|
)
|
|
|
|
lb := labels.NewBuilder(labels.EmptyLabels())
|
|
|
|
for t := mint; t <= maxt; t += blockDuration {
|
|
tsUpper := t + blockDuration
|
|
if nextSampleTs != math.MaxInt64 && nextSampleTs >= tsUpper {
|
|
// The next sample is not in this timerange, we can avoid parsing
|
|
// the file for this timerange.
|
|
continue
|
|
}
|
|
nextSampleTs = math.MaxInt64
|
|
|
|
err := func() error {
|
|
// To prevent races with compaction, a block writer only allows appending samples
|
|
// that are at most half a block size older than the most recent sample appended so far.
|
|
// However, in the way we use the block writer here, compaction doesn't happen, while we
|
|
// also need to append samples throughout the whole block range. To allow that, we
|
|
// pretend that the block is twice as large here, but only really add sample in the
|
|
// original interval later.
|
|
w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), outputDir, 2*blockDuration)
|
|
if err != nil {
|
|
return fmt.Errorf("block writer: %w", err)
|
|
}
|
|
defer func() {
|
|
err = tsdb_errors.NewMulti(err, w.Close()).Err()
|
|
}()
|
|
|
|
ctx := context.Background()
|
|
app := w.Appender(ctx)
|
|
symbolTable := labels.NewSymbolTable() // One table per block means it won't grow too large.
|
|
p := textparse.NewOpenMetricsParser(input, symbolTable)
|
|
samplesCount := 0
|
|
for {
|
|
e, err := p.Next()
|
|
if errors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("parse: %w", err)
|
|
}
|
|
if e != textparse.EntrySeries {
|
|
continue
|
|
}
|
|
|
|
_, ts, v := p.Series()
|
|
if ts == nil {
|
|
l := labels.Labels{}
|
|
p.Metric(&l)
|
|
return fmt.Errorf("expected timestamp for series %v, got none", l)
|
|
}
|
|
if *ts < t {
|
|
continue
|
|
}
|
|
if *ts >= tsUpper {
|
|
if *ts < nextSampleTs {
|
|
nextSampleTs = *ts
|
|
}
|
|
continue
|
|
}
|
|
|
|
l := labels.Labels{}
|
|
p.Metric(&l)
|
|
|
|
lb.Reset(l)
|
|
for name, value := range customLabels {
|
|
lb.Set(name, value)
|
|
}
|
|
lbls := lb.Labels()
|
|
|
|
if _, err := app.Append(0, lbls, *ts, v); err != nil {
|
|
return fmt.Errorf("add sample: %w", err)
|
|
}
|
|
|
|
samplesCount++
|
|
if samplesCount < maxSamplesInAppender {
|
|
continue
|
|
}
|
|
|
|
// If we arrive here, the samples count is greater than the maxSamplesInAppender.
|
|
// Therefore the old appender is committed and a new one is created.
|
|
// This prevents keeping too many samples lined up in an appender and thus in RAM.
|
|
if err := app.Commit(); err != nil {
|
|
return fmt.Errorf("commit: %w", err)
|
|
}
|
|
|
|
app = w.Appender(ctx)
|
|
samplesCount = 0
|
|
}
|
|
|
|
if err := app.Commit(); err != nil {
|
|
return fmt.Errorf("commit: %w", err)
|
|
}
|
|
|
|
block, err := w.Flush(ctx)
|
|
switch {
|
|
case err == nil:
|
|
if quiet {
|
|
break
|
|
}
|
|
// Empty block, don't print.
|
|
if block.Compare(ulid.ULID{}) == 0 {
|
|
break
|
|
}
|
|
blocks, err := db.Blocks()
|
|
if err != nil {
|
|
return fmt.Errorf("get blocks: %w", err)
|
|
}
|
|
for _, b := range blocks {
|
|
if b.Meta().ULID == block {
|
|
printBlocks([]tsdb.BlockReader{b}, !wroteHeader, humanReadable)
|
|
wroteHeader = true
|
|
break
|
|
}
|
|
}
|
|
case errors.Is(err, tsdb.ErrNoSeriesAppended):
|
|
default:
|
|
return fmt.Errorf("flush: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}()
|
|
if err != nil {
|
|
return fmt.Errorf("process blocks: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func backfill(maxSamplesInAppender int, input []byte, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration, customLabels map[string]string) (err error) {
|
|
p := textparse.NewOpenMetricsParser(input, nil) // Don't need a SymbolTable to get max and min timestamps.
|
|
maxt, mint, err := getMinAndMaxTimestamps(p)
|
|
if err != nil {
|
|
return fmt.Errorf("getting min and max timestamp: %w", err)
|
|
}
|
|
if err = createBlocks(input, mint, maxt, int64(maxBlockDuration/time.Millisecond), maxSamplesInAppender, outputDir, humanReadable, quiet, customLabels); err != nil {
|
|
return fmt.Errorf("block creation: %w", err)
|
|
}
|
|
return nil
|
|
}
|