Merge pull request #4471 from prometheus/uptsdb

vendor: update TSDB
pull/4472/head
Fabian Reinartz 6 years ago committed by GitHub
commit cc878ea559
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -364,8 +364,8 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc,
}
}
r.SetHealth(HealthGood)
r.SetLastError(err)
r.health = HealthGood
r.lastError = err
return vec, nil
}

@ -0,0 +1,279 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/wal"
)
// CheckpointStats returns stats about a created checkpoint.
type CheckpointStats struct {
DroppedSeries int
DroppedSamples int
DroppedTombstones int
TotalSeries int // Processed series including dropped ones.
TotalSamples int // Processed samples inlcuding dropped ones.
TotalTombstones int // Processed tombstones including dropped ones.
}
// LastCheckpoint returns the directory name of the most recent checkpoint.
// If dir does not contain any checkpoints, ErrNotFound is returned.
func LastCheckpoint(dir string) (string, int, error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return "", 0, err
}
// Traverse list backwards since there may be multiple checkpoints left.
for i := len(files) - 1; i >= 0; i-- {
fi := files[i]
if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
continue
}
if !fi.IsDir() {
return "", 0, errors.Errorf("checkpoint %s is not a directory", fi.Name())
}
k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil {
continue
}
return fi.Name(), k, nil
}
return "", 0, ErrNotFound
}
// DeleteCheckpoints deletes all checkpoints in dir that have an index
// below n.
func DeleteCheckpoints(dir string, n int) error {
var errs MultiError
files, err := ioutil.ReadDir(dir)
if err != nil {
return err
}
for _, fi := range files {
if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
continue
}
k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil || k >= n {
continue
}
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
errs.Add(err)
}
}
return errs.Err()
}
const checkpointPrefix = "checkpoint."
// Checkpoint creates a compacted checkpoint of segments in range [m, n] in the given WAL.
// It includes the most recent checkpoint if it exists.
// All series not satisfying keep and samples below mint are dropped.
//
// The checkpoint is stored in a directory named checkpoint.N in the same
// segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate
// it with the original WAL.
//
// Non-critical errors are logged and not returned.
func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
if logger == nil {
logger = log.NewNopLogger()
}
stats := &CheckpointStats{}
var sr io.Reader
{
lastFn, k, err := LastCheckpoint(w.Dir())
if err != nil && err != ErrNotFound {
return nil, errors.Wrap(err, "find last checkpoint")
}
if err == nil {
if m > k+1 {
return nil, errors.New("unexpected gap to last checkpoint")
}
// Ignore WAL files below the checkpoint. They shouldn't exist to begin with.
m = k + 1
last, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), lastFn))
if err != nil {
return nil, errors.Wrap(err, "open last checkpoint")
}
defer last.Close()
sr = last
}
segsr, err := wal.NewSegmentsRangeReader(w.Dir(), m, n)
if err != nil {
return nil, errors.Wrap(err, "create segment reader")
}
defer segsr.Close()
if sr != nil {
sr = io.MultiReader(sr, segsr)
} else {
sr = segsr
}
}
cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", n))
cpdirtmp := cpdir + ".tmp"
if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
return nil, errors.Wrap(err, "create checkpoint dir")
}
cp, err := wal.New(nil, nil, cpdirtmp)
if err != nil {
return nil, errors.Wrap(err, "open checkpoint")
}
r := wal.NewReader(sr)
var (
series []RefSeries
samples []RefSample
tstones []Stone
dec RecordDecoder
enc RecordEncoder
buf []byte
recs [][]byte
)
for r.Next() {
series, samples, tstones = series[:0], samples[:0], tstones[:0]
// We don't reset the buffer since we batch up multiple records
// before writing them to the checkpoint.
// Remember where the record for this iteration starts.
start := len(buf)
rec := r.Record()
switch dec.Type(rec) {
case RecordSeries:
series, err = dec.Series(rec, series)
if err != nil {
return nil, errors.Wrap(err, "decode series")
}
// Drop irrelevant series in place.
repl := series[:0]
for _, s := range series {
if keep(s.Ref) {
repl = append(repl, s)
}
}
if len(repl) > 0 {
buf = enc.Series(repl, buf)
}
stats.TotalSeries += len(series)
stats.DroppedSeries += len(series) - len(repl)
case RecordSamples:
samples, err = dec.Samples(rec, samples)
if err != nil {
return nil, errors.Wrap(err, "decode samples")
}
// Drop irrelevant samples in place.
repl := samples[:0]
for _, s := range samples {
if s.T >= mint {
repl = append(repl, s)
}
}
if len(repl) > 0 {
buf = enc.Samples(repl, buf)
}
stats.TotalSamples += len(samples)
stats.DroppedSamples += len(samples) - len(repl)
case RecordTombstones:
tstones, err = dec.Tombstones(rec, tstones)
if err != nil {
return nil, errors.Wrap(err, "decode deletes")
}
// Drop irrelevant tombstones in place.
repl := tstones[:0]
for _, s := range tstones {
for _, iv := range s.intervals {
if iv.Maxt >= mint {
repl = append(repl, s)
break
}
}
}
if len(repl) > 0 {
buf = enc.Tombstones(repl, buf)
}
stats.TotalTombstones += len(tstones)
stats.DroppedTombstones += len(tstones) - len(repl)
default:
return nil, errors.New("invalid record type")
}
if len(buf[start:]) == 0 {
continue // All contents discarded.
}
recs = append(recs, buf[start:])
// Flush records in 1 MB increments.
if len(buf) > 1*1024*1024 {
if err := cp.Log(recs...); err != nil {
return nil, errors.Wrap(err, "flush records")
}
buf, recs = buf[:0], recs[:0]
}
}
// If we hit any corruption during checkpointing, repairing is not an option.
// The head won't know which series records are lost.
if r.Err() != nil {
return nil, errors.Wrap(r.Err(), "read segments")
}
// Flush remaining records.
if err := cp.Log(recs...); err != nil {
return nil, errors.Wrap(err, "flush records")
}
if err := cp.Close(); err != nil {
return nil, errors.Wrap(err, "close checkpoint")
}
if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
return nil, errors.Wrap(err, "rename checkpoint directory")
}
if err := w.Truncate(n + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint
// that supersedes them.
level.Error(logger).Log("msg", "truncating segments failed", "err", err)
}
if err := DeleteCheckpoints(w.Dir(), n); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space.
// They will just be ignored since a higher checkpoint exists.
level.Error(logger).Log("msg", "delete old checkpoints", "err", err)
}
return stats, nil
}

@ -37,6 +37,7 @@ import (
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/wal"
"golang.org/x/sync/errgroup"
)
@ -191,6 +192,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if err := repairBadIndexVersion(l, dir); err != nil {
return nil, err
}
// Migrate old WAL.
if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil {
return nil, errors.Wrap(err, "migrate WAL")
}
db = &DB{
dir: dir,
@ -221,18 +226,18 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
return nil, errors.Wrap(err, "create leveled compactor")
}
wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval, r)
wlog, err := wal.New(l, r, filepath.Join(dir, "wal"))
if err != nil {
return nil, err
}
db.head, err = NewHead(r, l, wal, opts.BlockRanges[0])
db.head, err = NewHead(r, l, wlog, opts.BlockRanges[0])
if err != nil {
return nil, err
}
if err := db.reload(); err != nil {
return nil, err
}
if err := db.head.ReadWAL(); err != nil {
if err := db.head.Init(); err != nil {
return nil, errors.Wrap(err, "read WAL")
}
@ -501,7 +506,6 @@ func (db *DB) reload() (err error) {
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
})
if err := validateBlockSequence(blocks); err != nil {
return errors.Wrap(err, "invalid block sequence")
}
@ -596,10 +600,6 @@ func OverlappingBlocks(bm []BlockMeta) Overlaps {
if len(bm) <= 1 {
return nil
}
sort.Slice(bm, func(i, j int) bool {
return bm[i].MinTime < bm[j].MinTime
})
var (
overlaps [][]BlockMeta

@ -6,6 +6,7 @@ package fileutil
import (
"os"
"path/filepath"
"sort"
)
@ -23,3 +24,45 @@ func ReadDir(dirpath string) ([]string, error) {
sort.Strings(names)
return names, nil
}
// Rename safely renames a file.
func Rename(from, to string) error {
if err := os.Rename(from, to); err != nil {
return err
}
// Directory was renamed; sync parent dir to persist rename.
pdir, err := OpenDir(filepath.Dir(to))
if err != nil {
return err
}
if err = Fsync(pdir); err != nil {
pdir.Close()
return err
}
return pdir.Close()
}
// Replace moves a file or directory to a new location and deletes any previous data.
// It is not atomic.
func Replace(from, to string) error {
if err := os.RemoveAll(to); err != nil {
return nil
}
if err := os.Rename(from, to); err != nil {
return err
}
// Directory was renamed; sync parent dir to persist rename.
pdir, err := OpenDir(filepath.Dir(to))
if err != nil {
return err
}
if err = Fsync(pdir); err != nil {
pdir.Close()
return err
}
return pdir.Close()
}

@ -15,6 +15,7 @@ package tsdb
import (
"math"
"path/filepath"
"runtime"
"sort"
"strings"
@ -30,6 +31,7 @@ import (
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/wal"
)
var (
@ -53,9 +55,10 @@ var (
type Head struct {
chunkRange int64
metrics *headMetrics
wal WAL
wal *wal.WAL
logger log.Logger
appendPool sync.Pool
bytesPool sync.Pool
minTime, maxTime int64
lastSeriesID uint64
@ -169,13 +172,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
}
// NewHead opens the head block in dir.
func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (*Head, error) {
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64) (*Head, error) {
if l == nil {
l = log.NewNopLogger()
}
if wal == nil {
wal = NopWAL()
}
if chunkRange < 1 {
return nil, errors.Errorf("invalid chunk range %d", chunkRange)
}
@ -183,7 +183,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
wal: wal,
logger: l,
chunkRange: chunkRange,
minTime: math.MinInt64,
minTime: math.MaxInt64,
maxTime: math.MinInt64,
series: newStripeSeries(),
values: map[string]stringset{},
@ -200,15 +200,17 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
// them on to other workers.
// Samples before the mint timestamp are discarded.
func (h *Head) processWALSamples(
mint int64,
minValidTime int64,
partition, total uint64,
input <-chan []RefSample, output chan<- []RefSample,
) (unknownRefs uint64) {
defer close(output)
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
for samples := range input {
for _, s := range samples {
if s.T < mint || s.Ref%total != partition {
if s.T < minValidTime || s.Ref%total != partition {
continue
}
ms := h.series.getByID(s.Ref)
@ -221,18 +223,48 @@ func (h *Head) processWALSamples(
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
if s.T > maxt {
maxt = s.T
}
if s.T < mint {
mint = s.T
}
}
output <- samples
}
h.updateMinMaxTime(mint, maxt)
return unknownRefs
}
// ReadWAL initializes the head by consuming the write ahead log.
func (h *Head) ReadWAL() error {
defer h.postings.EnsureOrder()
func (h *Head) updateMinMaxTime(mint, maxt int64) {
for {
lt := h.MinTime()
if mint >= lt {
break
}
if atomic.CompareAndSwapInt64(&h.minTime, lt, mint) {
break
}
}
for {
ht := h.MaxTime()
if maxt <= ht {
break
}
if atomic.CompareAndSwapInt64(&h.maxTime, ht, maxt) {
break
}
}
}
r := h.wal.Reader()
mint := h.MinTime()
func (h *Head) loadWAL(r *wal.Reader) error {
minValidTime := h.MinTime()
// If the min time is still uninitialized (no persisted blocks yet),
// we accept all sample timestamps from the WAL.
if minValidTime == math.MaxInt64 {
minValidTime = math.MinInt64
}
// Track number of samples that referenced a series we don't know about
// for error reporting.
@ -253,7 +285,7 @@ func (h *Head) ReadWAL() error {
output := make(chan []RefSample, 300)
go func(i int, input <-chan []RefSample, output chan<- []RefSample) {
unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output)
unknown := h.processWALSamples(minValidTime, uint64(i), uint64(n), input, output)
atomic.AddUint64(&unknownRefs, unknown)
wg.Done()
}(i, input, output)
@ -263,49 +295,71 @@ func (h *Head) ReadWAL() error {
input = output
}
// TODO(fabxc): series entries spread between samples can starve the sample workers.
// Even with bufferd channels, this can impact startup time with lots of series churn.
// We must not paralellize series creation itself but could make the indexing asynchronous.
seriesFunc := func(series []RefSeries) {
for _, s := range series {
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
var (
dec RecordDecoder
series []RefSeries
samples []RefSample
tstones []Stone
)
for r.Next() {
series, samples, tstones = series[:0], samples[:0], tstones[:0]
rec := r.Record()
switch dec.Type(rec) {
case RecordSeries:
series, err := dec.Series(rec, series)
if err != nil {
return errors.Wrap(err, "decode series")
}
for _, s := range series {
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
if h.lastSeriesID < s.Ref {
h.lastSeriesID = s.Ref
if h.lastSeriesID < s.Ref {
h.lastSeriesID = s.Ref
}
}
}
}
samplesFunc := func(samples []RefSample) {
// We split up the samples into chunks of 5000 samples or less.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len(samples) > 0 {
n := 5000
if len(samples) < n {
n = len(samples)
case RecordSamples:
samples, err := dec.Samples(rec, samples)
if err != nil {
return errors.Wrap(err, "decode samples")
}
var buf []RefSample
select {
case buf = <-input:
default:
// We split up the samples into chunks of 5000 samples or less.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len(samples) > 0 {
n := 5000
if len(samples) < n {
n = len(samples)
}
var buf []RefSample
select {
case buf = <-input:
default:
}
firstInput <- append(buf[:0], samples[:n]...)
samples = samples[n:]
}
firstInput <- append(buf[:0], samples[:n]...)
samples = samples[n:]
}
}
deletesFunc := func(stones []Stone) {
for _, s := range stones {
for _, itv := range s.intervals {
if itv.Maxt < mint {
continue
case RecordTombstones:
tstones, err := dec.Tombstones(rec, tstones)
if err != nil {
return errors.Wrap(err, "decode tombstones")
}
for _, s := range tstones {
for _, itv := range s.intervals {
if itv.Maxt < minValidTime {
continue
}
h.tombstones.addInterval(s.ref, itv)
}
h.tombstones.addInterval(s.ref, itv)
}
default:
return errors.Errorf("invalid record type %v", dec.Type(rec))
}
}
err := r.Read(seriesFunc, samplesFunc, deletesFunc)
if r.Err() != nil {
return errors.Wrap(r.Err(), "read records")
}
// Signal termination to first worker and wait for last one to close its output channel.
close(firstInput)
@ -313,20 +367,64 @@ func (h *Head) ReadWAL() error {
}
wg.Wait()
if unknownRefs > 0 {
level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs)
}
return nil
}
// Init loads data from the write ahead log and prepares the head for writes.
func (h *Head) Init() error {
defer h.postings.EnsureOrder()
if h.wal == nil {
return nil
}
// Backfill the checkpoint first if it exists.
cp, n, err := LastCheckpoint(h.wal.Dir())
if err != nil && err != ErrNotFound {
return errors.Wrap(err, "find last checkpoint")
}
if err == nil {
sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), cp))
if err != nil {
return errors.Wrap(err, "open checkpoint")
}
defer sr.Close()
// A corrupted checkpoint is a hard error for now and requires user
// intervention. There's likely little data that can be recovered anyway.
if err := h.loadWAL(wal.NewReader(sr)); err != nil {
return errors.Wrap(err, "backfill checkpoint")
}
n++
}
// Backfill segments from the last checkpoint onwards
sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), n, -1)
if err != nil {
return errors.Wrap(err, "consume WAL")
return errors.Wrap(err, "open WAL segments")
}
if unknownRefs > 0 {
level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs)
defer sr.Close()
err = h.loadWAL(wal.NewReader(sr))
if err == nil {
return nil
}
level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err)
if err := h.wal.Repair(err); err != nil {
return errors.Wrap(err, "repair corrupted WAL")
}
return nil
}
// Truncate removes all data before mint from the head block and truncates its WAL.
// Truncate removes old data before mint from the head.
func (h *Head) Truncate(mint int64) error {
initialize := h.MinTime() == math.MinInt64
initialize := h.MinTime() == math.MaxInt64
if h.MinTime() >= mint {
if h.MinTime() >= mint && !initialize {
return nil
}
atomic.StoreInt64(&h.minTime, mint)
@ -348,18 +446,37 @@ func (h *Head) Truncate(mint int64) error {
level.Info(h.logger).Log("msg", "head GC completed", "duration", time.Since(start))
h.metrics.gcDuration.Observe(time.Since(start).Seconds())
if h.wal == nil {
return nil
}
start = time.Now()
m, n, err := h.wal.Segments()
if err != nil {
return errors.Wrap(err, "get segment range")
}
n-- // Never consider last segment for checkpoint.
if n < 0 {
return nil // no segments yet.
}
// The lower third of segments should contain mostly obsolete samples.
// If we have less than three segments, it's not worth checkpointing yet.
n = m + (n-m)/3
if n <= m {
return nil
}
keep := func(id uint64) bool {
return h.series.getByID(id) != nil
}
if err := h.wal.Truncate(mint, keep); err == nil {
level.Info(h.logger).Log("msg", "WAL truncation completed", "duration", time.Since(start))
} else {
level.Error(h.logger).Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start))
if _, err = Checkpoint(h.logger, h.wal, m, n, keep, mint); err != nil {
return errors.Wrap(err, "create checkpoint")
}
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
level.Info(h.logger).Log("msg", "WAL checkpoint complete",
"low", m, "high", n, "duration", time.Since(start))
return nil
}
@ -367,10 +484,7 @@ func (h *Head) Truncate(mint int64) error {
// for a compltely fresh head with an empty WAL.
// Returns true if the initialization took an effect.
func (h *Head) initTime(t int64) (initialized bool) {
// In the init state, the head has a high timestamp of math.MinInt64.
mint, _ := rangeForTimestamp(t, h.chunkRange)
if !atomic.CompareAndSwapInt64(&h.minTime, math.MinInt64, mint) {
if !atomic.CompareAndSwapInt64(&h.minTime, math.MaxInt64, t) {
return false
}
// Ensure that max time is initialized to at least the min time we just set.
@ -441,7 +555,7 @@ func (h *Head) Appender() Appender {
// The head cache might not have a starting point yet. The init appender
// picks up the first appended timestamp as the base.
if h.MinTime() == math.MinInt64 {
if h.MinTime() == math.MaxInt64 {
return &initAppender{head: h}
}
return h.appender()
@ -449,10 +563,11 @@ func (h *Head) Appender() Appender {
func (h *Head) appender() *headAppender {
return &headAppender{
head: h,
mint: h.MaxTime() - h.chunkRange/2,
maxt: math.MinInt64,
samples: h.getAppendBuffer(),
head: h,
minValidTime: h.MaxTime() - h.chunkRange/2,
mint: math.MaxInt64,
maxt: math.MinInt64,
samples: h.getAppendBuffer(),
}
}
@ -468,16 +583,29 @@ func (h *Head) putAppendBuffer(b []RefSample) {
h.appendPool.Put(b[:0])
}
func (h *Head) getBytesBuffer() []byte {
b := h.bytesPool.Get()
if b == nil {
return make([]byte, 0, 1024)
}
return b.([]byte)
}
func (h *Head) putBytesBuffer(b []byte) {
h.bytesPool.Put(b[:0])
}
type headAppender struct {
head *Head
mint, maxt int64
head *Head
minValidTime int64 // No samples below this timestamp are allowed.
mint, maxt int64
series []RefSeries
samples []RefSample
}
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
if t < a.mint {
if t < a.minValidTime {
return 0, ErrOutOfBounds
}
@ -504,9 +632,12 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
if err != nil {
return err
}
if t < a.mint {
if t < a.minValidTime {
return ErrOutOfBounds
}
if t < a.mint {
a.mint = t
}
if t > a.maxt {
a.maxt = t
}
@ -520,15 +651,42 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
return nil
}
func (a *headAppender) log() error {
if a.head.wal == nil {
return nil
}
buf := a.head.getBytesBuffer()
defer func() { a.head.putBytesBuffer(buf) }()
var rec []byte
var enc RecordEncoder
if len(a.series) > 0 {
rec = enc.Series(a.series, buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return errors.Wrap(err, "log series")
}
}
if len(a.samples) > 0 {
rec = enc.Samples(a.samples, buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return errors.Wrap(err, "log samples")
}
}
return nil
}
func (a *headAppender) Commit() error {
defer a.head.metrics.activeAppenders.Dec()
defer a.head.putAppendBuffer(a.samples)
if err := a.head.wal.LogSeries(a.series); err != nil {
return err
}
if err := a.head.wal.LogSamples(a.samples); err != nil {
return errors.Wrap(err, "WAL log samples")
if err := a.log(); err != nil {
return errors.Wrap(err, "write to WAL")
}
total := len(a.samples)
@ -548,16 +706,7 @@ func (a *headAppender) Commit() error {
}
a.head.metrics.samplesAppended.Add(float64(total))
for {
ht := a.head.MaxTime()
if a.maxt <= ht {
break
}
if atomic.CompareAndSwapInt64(&a.head.maxTime, ht, a.maxt) {
break
}
}
a.head.updateMinMaxTime(a.mint, a.maxt)
return nil
}
@ -568,7 +717,8 @@ func (a *headAppender) Rollback() error {
// Series are created in the head memory regardless of rollback. Thus we have
// to log them to the WAL in any case.
return a.head.wal.LogSeries(a.series)
a.samples = nil
return a.log()
}
// Delete all samples in the range of [mint, maxt] for series that satisfy the given
@ -601,8 +751,12 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
if p.Err() != nil {
return p.Err()
}
if err := h.wal.LogDeletes(stones); err != nil {
return err
var enc RecordEncoder
if h.wal != nil {
if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
return err
}
}
for _, s := range stones {
h.tombstones.addInterval(s.ref, s.intervals[0])
@ -694,6 +848,9 @@ func (h *Head) MaxTime() int64 {
// Close flushes the WAL and closes the head.
func (h *Head) Close() error {
if h.wal == nil {
return nil
}
return h.wal.Close()
}

@ -0,0 +1,213 @@
// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"math"
"sort"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/labels"
)
// RecordType represents the data type of a record.
type RecordType uint8
const (
RecordInvalid RecordType = 255
RecordSeries RecordType = 1
RecordSamples RecordType = 2
RecordTombstones RecordType = 3
)
type RecordLogger interface {
Log(recs ...[]byte) error
}
type RecordReader interface {
Next() bool
Err() error
Record() []byte
}
// RecordDecoder decodes series, sample, and tombstone records.
// The zero value is ready to use.
type RecordDecoder struct {
}
// Type returns the type of the record.
// Return RecordInvalid if no valid record type is found.
func (d *RecordDecoder) Type(rec []byte) RecordType {
if len(rec) < 1 {
return RecordInvalid
}
switch t := RecordType(rec[0]); t {
case RecordSeries, RecordSamples, RecordTombstones:
return t
}
return RecordInvalid
}
// Series appends series in rec to the given slice.
func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) {
dec := decbuf{b: rec}
if RecordType(dec.byte()) != RecordSeries {
return nil, errors.New("invalid record type")
}
for len(dec.b) > 0 && dec.err() == nil {
ref := dec.be64()
lset := make(labels.Labels, dec.uvarint())
for i := range lset {
lset[i].Name = dec.uvarintStr()
lset[i].Value = dec.uvarintStr()
}
sort.Sort(lset)
series = append(series, RefSeries{
Ref: ref,
Labels: lset,
})
}
if dec.err() != nil {
return nil, dec.err()
}
if len(dec.b) > 0 {
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
}
return series, nil
}
// Samples appends samples in rec to the given slice.
func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) {
dec := decbuf{b: rec}
if RecordType(dec.byte()) != RecordSamples {
return nil, errors.New("invalid record type")
}
if dec.len() == 0 {
return samples, nil
}
var (
baseRef = dec.be64()
baseTime = dec.be64int64()
)
for len(dec.b) > 0 && dec.err() == nil {
dref := dec.varint64()
dtime := dec.varint64()
val := dec.be64()
samples = append(samples, RefSample{
Ref: uint64(int64(baseRef) + dref),
T: baseTime + dtime,
V: math.Float64frombits(val),
})
}
if dec.err() != nil {
return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(samples))
}
if len(dec.b) > 0 {
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
}
return samples, nil
}
// Tombstones appends tombstones in rec to the given slice.
func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) {
dec := decbuf{b: rec}
if RecordType(dec.byte()) != RecordTombstones {
return nil, errors.New("invalid record type")
}
for dec.len() > 0 && dec.err() == nil {
tstones = append(tstones, Stone{
ref: dec.be64(),
intervals: Intervals{
{Mint: dec.varint64(), Maxt: dec.varint64()},
},
})
}
if dec.err() != nil {
return nil, dec.err()
}
if len(dec.b) > 0 {
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
}
return tstones, nil
}
// RecordEncoder encodes series, sample, and tombstones records.
// The zero value is ready to use.
type RecordEncoder struct {
}
// Series appends the encoded series to b and returns the resulting slice.
func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte {
buf := encbuf{b: b}
buf.putByte(byte(RecordSeries))
for _, s := range series {
buf.putBE64(s.Ref)
buf.putUvarint(len(s.Labels))
for _, l := range s.Labels {
buf.putUvarintStr(l.Name)
buf.putUvarintStr(l.Value)
}
}
return buf.get()
}
// Samples appends the encoded samples to b and returns the resulting slice.
func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte {
buf := encbuf{b: b}
buf.putByte(byte(RecordSamples))
if len(samples) == 0 {
return buf.get()
}
// Store base timestamp and base reference number of first sample.
// All samples encode their timestamp and ref as delta to those.
first := samples[0]
buf.putBE64(first.Ref)
buf.putBE64int64(first.T)
for _, s := range samples {
buf.putVarint64(int64(s.Ref) - int64(first.Ref))
buf.putVarint64(s.T - first.T)
buf.putBE64(math.Float64bits(s.V))
}
return buf.get()
}
// Tombstones appends the encoded tombstones to b and returns the resulting slice.
func (e *RecordEncoder) Tombstones(tstones []Stone, b []byte) []byte {
buf := encbuf{b: b}
buf.putByte(byte(RecordTombstones))
for _, s := range tstones {
for _, iv := range s.intervals {
buf.putBE64(s.ref)
buf.putVarint64(iv.Mint)
buf.putVarint64(iv.Maxt)
}
}
return buf.get()
}

@ -33,6 +33,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/wal"
)
// WALEntryType indicates what data a WAL entry contains.
@ -82,6 +83,8 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
// WAL is a write ahead log that can log new series labels and samples.
// It must be completely read before new entries are logged.
//
// DEPRECATED: use wal pkg combined with the record codex instead.
type WAL interface {
Reader() WALReader
LogSeries([]RefSeries) error
@ -173,6 +176,8 @@ func newCRC32() hash.Hash32 {
}
// SegmentWAL is a write ahead log for series data.
//
// DEPRECATED: use wal pkg combined with the record coders instead.
type SegmentWAL struct {
mtx sync.Mutex
metrics *walMetrics
@ -1206,3 +1211,100 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error {
}
return nil
}
// MigrateWAL rewrites the deprecated write ahead log into the new format.
func MigrateWAL(logger log.Logger, dir string) (err error) {
if logger == nil {
logger = log.NewNopLogger()
}
// Detect whether we still have the old WAL.
fns, err := sequenceFiles(dir)
if err != nil && !os.IsNotExist(err) {
return errors.Wrap(err, "list sequence files")
}
if len(fns) == 0 {
return nil // No WAL at all yet.
}
// Check header of first segment to see whether we are still dealing with an
// old WAL.
f, err := os.Open(fns[0])
if err != nil {
return errors.Wrap(err, "check first existing segment")
}
defer f.Close()
var hdr [4]byte
if _, err := f.Read(hdr[:]); err != nil && err != io.EOF {
return errors.Wrap(err, "read header from first segment")
}
// If we cannot read the magic header for segments of the old WAL, abort.
// Either it's migrated already or there's a corruption issue with which
// we cannot deal here anyway. Subsequent attempts to open the WAL will error in that case.
if binary.BigEndian.Uint32(hdr[:]) != WALMagic {
return nil
}
level.Info(logger).Log("msg", "migrating WAL format")
tmpdir := dir + ".tmp"
if err := os.RemoveAll(tmpdir); err != nil {
return errors.Wrap(err, "cleanup replacement dir")
}
repl, err := wal.New(logger, nil, tmpdir)
if err != nil {
return errors.Wrap(err, "open new WAL")
}
// It should've already been closed as part of the previous finalization.
// Do it once again in case of prior errors.
defer func() {
if err != nil {
repl.Close()
}
}()
w, err := OpenSegmentWAL(dir, logger, time.Minute, nil)
if err != nil {
return errors.Wrap(err, "open old WAL")
}
defer w.Close()
rdr := w.Reader()
var (
enc RecordEncoder
b []byte
)
decErr := rdr.Read(
func(s []RefSeries) {
if err != nil {
return
}
err = repl.Log(enc.Series(s, b[:0]))
},
func(s []RefSample) {
if err != nil {
return
}
err = repl.Log(enc.Samples(s, b[:0]))
},
func(s []Stone) {
if err != nil {
return
}
err = repl.Log(enc.Tombstones(s, b[:0]))
},
)
if decErr != nil {
return errors.Wrap(err, "decode old entries")
}
if err != nil {
return errors.Wrap(err, "write new entries")
}
if err := repl.Close(); err != nil {
return errors.Wrap(err, "close new WAL")
}
if err := fileutil.Replace(tmpdir, dir); err != nil {
return errors.Wrap(err, "replace old WAL")
}
return nil
}

@ -0,0 +1,822 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wal
import (
"bufio"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"math"
"os"
"path/filepath"
"sort"
"strconv"
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/fileutil"
)
const (
defaultSegmentSize = 128 * 1024 * 1024 // 128 MB
pageSize = 32 * 1024 // 32KB
recordHeaderSize = 7
)
// The table gets initialized with sync.Once but may still cause a race
// with any other use of the crc32 package anywhere. Thus we initialize it
// before.
var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
type page struct {
alloc int
flushed int
buf [pageSize]byte
}
func (p *page) remaining() int {
return pageSize - p.alloc
}
func (p *page) full() bool {
return pageSize-p.alloc < recordHeaderSize
}
// Segment represents a segment file.
type Segment struct {
*os.File
dir string
i int
}
// Index returns the index of the segment.
func (s *Segment) Index() int {
return s.i
}
// Dir returns the directory of the segment.
func (s *Segment) Dir() string {
return s.dir
}
// CorruptionErr is an error that's returned when corruption is encountered.
type CorruptionErr struct {
Segment int
Offset int64
Err error
}
func (e *CorruptionErr) Error() string {
if e.Segment < 0 {
return fmt.Sprintf("corruption after %d bytes: %s", e.Offset, e.Err)
}
return fmt.Sprintf("corruption in segment %d at %d: %s", e.Segment, e.Offset, e.Err)
}
// OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends.
func OpenWriteSegment(dir string, k int) (*Segment, error) {
f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
stat, err := f.Stat()
if err != nil {
f.Close()
return nil, err
}
// If the last page is torn, fill it with zeros.
// In case it was torn after all records were written successfully, this
// will just pad the page and everything will be fine.
// If it was torn mid-record, a full read (which the caller should do anyway
// to ensure integrity) will detect it as a corruption by the end.
if d := stat.Size() % pageSize; d != 0 {
if _, err := f.Write(make([]byte, pageSize-d)); err != nil {
f.Close()
return nil, errors.Wrap(err, "zero-pad torn page")
}
}
return &Segment{File: f, i: k, dir: dir}, nil
}
// CreateSegment creates a new segment k in dir.
func CreateSegment(dir string, k int) (*Segment, error) {
f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
return &Segment{File: f, i: k, dir: dir}, nil
}
// OpenReadSegment opens the segment with the given filename.
func OpenReadSegment(fn string) (*Segment, error) {
k, err := strconv.Atoi(filepath.Base(fn))
if err != nil {
return nil, errors.New("not a valid filename")
}
f, err := os.Open(fn)
if err != nil {
return nil, err
}
return &Segment{File: f, i: k, dir: filepath.Dir(fn)}, nil
}
// WAL is a write ahead log that stores records in segment files.
// It must be read from start to end once before logging new data.
// If an error occurs during read, the repair procedure must be called
// before it's safe to do further writes.
//
// Segments are written to in pages of 32KB, with records possibly split
// across page boundaries.
// Records are never split across segments to allow full segments to be
// safely truncated. It also ensures that torn writes never corrupt records
// beyond the most recent segment.
type WAL struct {
dir string
logger log.Logger
segmentSize int
mtx sync.RWMutex
segment *Segment // active segment
donePages int // pages written to the segment
page *page // active page
stopc chan chan struct{}
actorc chan func()
fsyncDuration prometheus.Summary
pageFlushes prometheus.Counter
pageCompletions prometheus.Counter
}
// New returns a new WAL over the given directory.
func New(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) {
return NewSize(logger, reg, dir, defaultSegmentSize)
}
// NewSize returns a new WAL over the given directory.
// New segments are created with the specified size.
func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int) (*WAL, error) {
if segmentSize%pageSize != 0 {
return nil, errors.New("invalid segment size")
}
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, errors.Wrap(err, "create dir")
}
if logger == nil {
logger = log.NewNopLogger()
}
w := &WAL{
dir: dir,
logger: logger,
segmentSize: segmentSize,
page: &page{},
actorc: make(chan func(), 100),
stopc: make(chan chan struct{}),
}
w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "prometheus_tsdb_wal_fsync_duration_seconds",
Help: "Duration of WAL fsync.",
})
w.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_page_flushes_total",
Help: "Total number of page flushes.",
})
w.pageCompletions = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_completed_pages_total",
Help: "Total number of completed pages.",
})
if reg != nil {
reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions)
}
_, j, err := w.Segments()
if err != nil {
return nil, errors.Wrap(err, "get segment range")
}
// Fresh dir, no segments yet.
if j == -1 {
if w.segment, err = CreateSegment(w.dir, 0); err != nil {
return nil, err
}
} else {
if w.segment, err = OpenWriteSegment(w.dir, j); err != nil {
return nil, err
}
// Correctly initialize donePages.
stat, err := w.segment.Stat()
if err != nil {
return nil, err
}
w.donePages = int(stat.Size() / pageSize)
}
go w.run()
return w, nil
}
// Dir returns the directory of the WAL.
func (w *WAL) Dir() string {
return w.dir
}
func (w *WAL) run() {
Loop:
for {
select {
case f := <-w.actorc:
f()
case donec := <-w.stopc:
close(w.actorc)
defer close(donec)
break Loop
}
}
// Drain and process any remaining functions.
for f := range w.actorc {
f()
}
}
// Repair attempts to repair the WAL based on the error.
// It discards all data after the corruption.
func (w *WAL) Repair(err error) error {
// We could probably have a mode that only discards torn records right around
// the corruption to preserve as data much as possible.
// But that's not generally applicable if the records have any kind of causality.
// Maybe as an extra mode in the future if mid-WAL corruptions become
// a frequent concern.
cerr, ok := err.(*CorruptionErr)
if !ok {
return errors.New("cannot handle error")
}
if cerr.Segment < 0 {
return errors.New("corruption error does not specify position")
}
level.Warn(w.logger).Log("msg", "starting corruption repair",
"segment", cerr.Segment, "offset", cerr.Offset)
// All segments behind the corruption can no longer be used.
segs, err := listSegments(w.dir)
if err != nil {
return errors.Wrap(err, "list segments")
}
level.Warn(w.logger).Log("msg", "deleting all segments behind corruption")
for _, s := range segs {
if s.n <= cerr.Segment {
continue
}
if err := os.Remove(filepath.Join(w.dir, s.s)); err != nil {
return errors.Wrap(err, "delete segment")
}
}
// Regardless of the corruption offset, no record reaches into the previous segment.
// So we can safely repair the WAL by removing the segment and re-inserting all
// its records up to the corruption.
level.Warn(w.logger).Log("msg", "rewrite corrupted segment")
fn := SegmentName(w.dir, cerr.Segment)
tmpfn := fn + ".repair"
if err := fileutil.Rename(fn, tmpfn); err != nil {
return err
}
// Create a clean segment and make it the active one.
s, err := CreateSegment(w.dir, cerr.Segment)
if err != nil {
return err
}
w.segment = s
f, err := os.Open(tmpfn)
if err != nil {
return errors.Wrap(err, "open segment")
}
defer f.Close()
r := NewReader(bufio.NewReader(f))
for r.Next() {
if err := w.Log(r.Record()); err != nil {
return errors.Wrap(err, "insert record")
}
}
// We expect an error here, so nothing to handle.
if err := os.Remove(tmpfn); err != nil {
return errors.Wrap(err, "delete corrupted segment")
}
return nil
}
// SegmentName builds a segment name for the directory.
func SegmentName(dir string, i int) string {
return filepath.Join(dir, fmt.Sprintf("%08d", i))
}
// nextSegment creates the next segment and closes the previous one.
func (w *WAL) nextSegment() error {
// Only flush the current page if it actually holds data.
if w.page.alloc > 0 {
if err := w.flushPage(true); err != nil {
return err
}
}
next, err := CreateSegment(w.dir, w.segment.Index()+1)
if err != nil {
return errors.Wrap(err, "create new segment file")
}
prev := w.segment
w.segment = next
w.donePages = 0
// Don't block further writes by fsyncing the last segment.
w.actorc <- func() {
if err := w.fsync(prev); err != nil {
level.Error(w.logger).Log("msg", "sync previous segment", "err", err)
}
if err := prev.Close(); err != nil {
level.Error(w.logger).Log("msg", "close previous segment", "err", err)
}
}
return nil
}
// flushPage writes the new contents of the page to disk. If no more records will fit into
// the page, the remaining bytes will be set to zero and a new page will be started.
// If clear is true, this is enforced regardless of how many bytes are left in the page.
func (w *WAL) flushPage(clear bool) error {
w.pageFlushes.Inc()
p := w.page
clear = clear || p.full()
// No more data will fit into the page. Enqueue and clear it.
if clear {
p.alloc = pageSize // write till end of page
w.pageCompletions.Inc()
}
n, err := w.segment.Write(p.buf[p.flushed:p.alloc])
if err != nil {
return err
}
p.flushed += n
// We flushed an entire page, prepare a new one.
if clear {
for i := range p.buf {
p.buf[i] = 0
}
p.alloc = 0
p.flushed = 0
w.donePages++
}
return nil
}
type recType uint8
const (
recPageTerm recType = 0 // Rest of page is empty.
recFull recType = 1 // Full record.
recFirst recType = 2 // First fragment of a record.
recMiddle recType = 3 // Middle fragments of a record.
recLast recType = 4 // Final fragment of a record.
)
func (t recType) String() string {
switch t {
case recPageTerm:
return "zero"
case recFull:
return "full"
case recFirst:
return "first"
case recMiddle:
return "middle"
case recLast:
return "last"
default:
return "<invalid>"
}
}
func (w *WAL) pagesPerSegment() int {
return w.segmentSize / pageSize
}
// Log writes the records into the log.
// Multiple records can be passed at once to reduce writes and increase throughput.
func (w *WAL) Log(recs ...[]byte) error {
w.mtx.Lock()
defer w.mtx.Unlock()
// Callers could just implement their own list record format but adding
// a bit of extra logic here frees them from that overhead.
for i, r := range recs {
if err := w.log(r, i == len(recs)-1); err != nil {
return err
}
}
return nil
}
// log writes rec to the log and forces a flush of the current page if its
// the final record of a batch.
func (w *WAL) log(rec []byte, final bool) error {
// If the record is too big to fit within pages in the current
// segment, terminate the active segment and advance to the next one.
// This ensures that records do not cross segment boundaries.
left := w.page.remaining() - recordHeaderSize // Active pages.
left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages.
if len(rec) > left {
if err := w.nextSegment(); err != nil {
return err
}
}
// Populate as many pages as necessary to fit the record.
// Be careful to always do one pass to ensure we write zero-length records.
for i := 0; i == 0 || len(rec) > 0; i++ {
p := w.page
// Find how much of the record we can fit into the page.
var (
l = min(len(rec), (pageSize-p.alloc)-recordHeaderSize)
part = rec[:l]
buf = p.buf[p.alloc:]
typ recType
)
switch {
case i == 0 && len(part) == len(rec):
typ = recFull
case len(part) == len(rec):
typ = recLast
case i == 0:
typ = recFirst
default:
typ = recMiddle
}
buf[0] = byte(typ)
crc := crc32.Checksum(part, castagnoliTable)
binary.BigEndian.PutUint16(buf[1:], uint16(len(part)))
binary.BigEndian.PutUint32(buf[3:], crc)
copy(buf[recordHeaderSize:], part)
p.alloc += len(part) + recordHeaderSize
// If we wrote a full record, we can fit more records of the batch
// into the page before flushing it.
if final || typ != recFull || w.page.full() {
if err := w.flushPage(false); err != nil {
return err
}
}
rec = rec[l:]
}
return nil
}
// Segments returns the range [m, n] of currently existing segments.
// If no segments are found, m and n are -1.
func (w *WAL) Segments() (m, n int, err error) {
refs, err := listSegments(w.dir)
if err != nil {
return 0, 0, err
}
if len(refs) == 0 {
return -1, -1, nil
}
return refs[0].n, refs[len(refs)-1].n, nil
}
// Truncate drops all segments before i.
func (w *WAL) Truncate(i int) error {
refs, err := listSegments(w.dir)
if err != nil {
return err
}
for _, r := range refs {
if r.n >= i {
break
}
if err := os.Remove(filepath.Join(w.dir, r.s)); err != nil {
return err
}
}
return nil
}
func (w *WAL) fsync(f *Segment) error {
start := time.Now()
err := fileutil.Fsync(f.File)
w.fsyncDuration.Observe(time.Since(start).Seconds())
return err
}
// Close flushes all writes and closes active segment.
func (w *WAL) Close() (err error) {
w.mtx.Lock()
defer w.mtx.Unlock()
// Flush the last page and zero out all its remaining size.
// We must not flush an empty page as it would falsely signal
// the segment is done if we start writing to it again after opening.
if w.page.alloc > 0 {
if err := w.flushPage(true); err != nil {
return err
}
}
donec := make(chan struct{})
w.stopc <- donec
<-donec
if err = w.fsync(w.segment); err != nil {
level.Error(w.logger).Log("msg", "sync previous segment", "err", err)
}
if err := w.segment.Close(); err != nil {
level.Error(w.logger).Log("msg", "close previous segment", "err", err)
}
return nil
}
type segmentRef struct {
s string
n int
}
func listSegments(dir string) (refs []segmentRef, err error) {
files, err := fileutil.ReadDir(dir)
if err != nil {
return nil, err
}
var last int
for _, fn := range files {
k, err := strconv.Atoi(fn)
if err != nil {
continue
}
if len(refs) > 0 && k > last+1 {
return nil, errors.New("segments are not sequential")
}
refs = append(refs, segmentRef{s: fn, n: k})
last = k
}
sort.Slice(refs, func(i, j int) bool {
return refs[i].n < refs[j].n
})
return refs, nil
}
// NewSegmentsReader returns a new reader over all segments in the directory.
func NewSegmentsReader(dir string) (io.ReadCloser, error) {
return NewSegmentsRangeReader(dir, 0, math.MaxInt32)
}
// NewSegmentsRangeReader returns a new reader over the given WAL segment range.
// If m or n are -1, the range is open on the respective end.
func NewSegmentsRangeReader(dir string, m, n int) (io.ReadCloser, error) {
refs, err := listSegments(dir)
if err != nil {
return nil, err
}
var segs []*Segment
for _, r := range refs {
if m >= 0 && r.n < m {
continue
}
if n >= 0 && r.n > n {
break
}
s, err := OpenReadSegment(filepath.Join(dir, r.s))
if err != nil {
return nil, err
}
segs = append(segs, s)
}
return newSegmentBufReader(segs...), nil
}
// segmentBufReader is a buffered reader that reads in multiples of pages.
// The main purpose is that we are able to track segment and offset for
// corruption reporting.
type segmentBufReader struct {
buf *bufio.Reader
segs []*Segment
cur int
off int
more bool
}
func newSegmentBufReader(segs ...*Segment) *segmentBufReader {
return &segmentBufReader{
buf: bufio.NewReaderSize(nil, 16*pageSize),
segs: segs,
cur: -1,
}
}
func (r *segmentBufReader) Close() (err error) {
for _, s := range r.segs {
if e := s.Close(); e != nil {
err = e
}
}
return err
}
func (r *segmentBufReader) Read(b []byte) (n int, err error) {
if !r.more {
if r.cur+1 >= len(r.segs) {
return 0, io.EOF
}
r.cur++
r.off = 0
r.more = true
r.buf.Reset(r.segs[r.cur])
}
n, err = r.buf.Read(b)
r.off += n
if err != io.EOF {
return n, err
}
// Just return what we read so far, but don't signal EOF.
// Only unset more so we don't invalidate the current segment and
// offset before the next read.
r.more = false
return n, nil
}
// Reader reads WAL records from an io.Reader.
type Reader struct {
rdr io.Reader
err error
rec []byte
buf [pageSize]byte
total int64 // total bytes processed.
}
// NewReader returns a new reader.
func NewReader(r io.Reader) *Reader {
return &Reader{rdr: r}
}
// Next advances the reader to the next records and returns true if it exists.
// It must not be called again after it returned false.
func (r *Reader) Next() bool {
err := r.next()
if errors.Cause(err) == io.EOF {
return false
}
r.err = err
return r.err == nil
}
func (r *Reader) next() (err error) {
// We have to use r.buf since allocating byte arrays here fails escape
// analysis and ends up on the heap, even though it seemingly should not.
hdr := r.buf[:recordHeaderSize]
buf := r.buf[recordHeaderSize:]
r.rec = r.rec[:0]
i := 0
for {
if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil {
return errors.Wrap(err, "read first header byte")
}
r.total++
typ := recType(hdr[0])
// Gobble up zero bytes.
if typ == recPageTerm {
// We are pedantic and check whether the zeros are actually up
// to a page boundary.
// It's not strictly necessary but may catch sketchy state early.
k := pageSize - (r.total % pageSize)
if k == pageSize {
continue // Initial 0 byte was last page byte.
}
n, err := io.ReadFull(r.rdr, buf[:k])
if err != nil {
return errors.Wrap(err, "read remaining zeros")
}
r.total += int64(n)
for _, c := range buf[:k] {
if c != 0 {
return errors.New("unexpected non-zero byte in padded page")
}
}
continue
}
n, err := io.ReadFull(r.rdr, hdr[1:])
if err != nil {
return errors.Wrap(err, "read remaining header")
}
r.total += int64(n)
var (
length = binary.BigEndian.Uint16(hdr[1:])
crc = binary.BigEndian.Uint32(hdr[3:])
)
if length > pageSize-recordHeaderSize {
return errors.Errorf("invalid record size %d", length)
}
n, err = io.ReadFull(r.rdr, buf[:length])
if err != nil {
return err
}
r.total += int64(n)
if n != int(length) {
return errors.Errorf("invalid size: expected %d, got %d", length, n)
}
if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc {
return errors.Errorf("unexpected checksum %x, expected %x", c, crc)
}
r.rec = append(r.rec, buf[:length]...)
switch typ {
case recFull:
if i != 0 {
return errors.New("unexpected full record")
}
return nil
case recFirst:
if i != 0 {
return errors.New("unexpected first record")
}
case recMiddle:
if i == 0 {
return errors.New("unexpected middle record")
}
case recLast:
if i == 0 {
return errors.New("unexpected last record")
}
return nil
default:
return errors.Errorf("unexpected record type %d", typ)
}
// Only increment i for non-zero records since we use it
// to determine valid content record sequences.
i++
}
}
// Err returns the last encountered error wrapped in a corruption error.
// If the reader does not allow to infer a segment index and offset, a total
// offset in the reader stream will be provided.
func (r *Reader) Err() error {
if r.err == nil {
return nil
}
if b, ok := r.rdr.(*segmentBufReader); ok {
return &CorruptionErr{
Err: r.err,
Segment: b.segs[b.cur].Index(),
Offset: int64(b.off),
}
}
return &CorruptionErr{
Err: r.err,
Segment: -1,
Offset: r.total,
}
}
// Record returns the current record. The returned byte slice is only
// valid until the next call to Next.
func (r *Reader) Record() []byte {
return r.rec
}
func min(i, j int) int {
if i < j {
return i
}
return j
}

34
vendor/vendor.json vendored

@ -841,40 +841,46 @@
"revisionTime": "2016-04-11T19:08:41Z"
},
{
"checksumSHA1": "gzvR+g1v/ILXxAt/NuxzIPWk1x0=",
"checksumSHA1": "vRK6HrNOeJheYudfpCIUyh42T3o=",
"path": "github.com/prometheus/tsdb",
"revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e",
"revisionTime": "2018-07-11T11:21:26Z"
"revision": "06f01d45ad2ca2853c9dc1a0d5db6c75c8af6a5a",
"revisionTime": "2018-08-07T11:25:08Z"
},
{
"checksumSHA1": "QI0UME2olSr4kH6Z8UkpffM59Mc=",
"path": "github.com/prometheus/tsdb/chunkenc",
"revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e",
"revisionTime": "2018-07-11T11:21:26Z"
"revision": "06f01d45ad2ca2853c9dc1a0d5db6c75c8af6a5a",
"revisionTime": "2018-08-07T11:25:08Z"
},
{
"checksumSHA1": "+5bPifRe479zdFeTYhZ+CZRLMgw=",
"path": "github.com/prometheus/tsdb/chunks",
"revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e",
"revisionTime": "2018-07-11T11:21:26Z"
"revision": "06f01d45ad2ca2853c9dc1a0d5db6c75c8af6a5a",
"revisionTime": "2018-08-07T11:25:08Z"
},
{
"checksumSHA1": "dnyelqeik/xHDRCvCmKFv/Op9XQ=",
"checksumSHA1": "bL3t5K2Q8e1GuM6gy5PAJ05go14=",
"path": "github.com/prometheus/tsdb/fileutil",
"revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e",
"revisionTime": "2018-07-11T11:21:26Z"
"revision": "06f01d45ad2ca2853c9dc1a0d5db6c75c8af6a5a",
"revisionTime": "2018-08-07T11:25:08Z"
},
{
"checksumSHA1": "AZGFK4UtJe8/j8pHqGTNQ8wu27g=",
"path": "github.com/prometheus/tsdb/index",
"revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e",
"revisionTime": "2018-07-11T11:21:26Z"
"revision": "06f01d45ad2ca2853c9dc1a0d5db6c75c8af6a5a",
"revisionTime": "2018-08-07T11:25:08Z"
},
{
"checksumSHA1": "Va8HWvOFTwFeewZFadMAOzNGDps=",
"path": "github.com/prometheus/tsdb/labels",
"revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e",
"revisionTime": "2018-07-11T11:21:26Z"
"revision": "06f01d45ad2ca2853c9dc1a0d5db6c75c8af6a5a",
"revisionTime": "2018-08-07T11:25:08Z"
},
{
"checksumSHA1": "6GXK7RnUngyM9OT/M2uzv8T3DOY=",
"path": "github.com/prometheus/tsdb/wal",
"revision": "06f01d45ad2ca2853c9dc1a0d5db6c75c8af6a5a",
"revisionTime": "2018-08-07T11:25:08Z"
},
{
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",

Loading…
Cancel
Save