From 817e379a0e05189a6cc85a2930f1d889fa2c4fca Mon Sep 17 00:00:00 2001
From: Fabian Reinartz 
Date: Tue, 7 Aug 2018 05:22:02 -0400
Subject: [PATCH 1/2] vendor: update TSDB

Signed-off-by: Fabian Reinartz 
---
 .../github.com/prometheus/tsdb/checkpoint.go | 279 ++++++
 vendor/github.com/prometheus/tsdb/db.go | 16 +-
 .../prometheus/tsdb/fileutil/fileutil.go | 43 +
 vendor/github.com/prometheus/tsdb/head.go | 337 +++++--
 vendor/github.com/prometheus/tsdb/record.go | 213 +++++
 vendor/github.com/prometheus/tsdb/wal.go | 102 +++
 vendor/github.com/prometheus/tsdb/wal/wal.go | 822 ++++++++++++++++++
 vendor/vendor.json | 34 +-
 8 files changed, 1734 insertions(+), 112 deletions(-)
 create mode 100644 vendor/github.com/prometheus/tsdb/checkpoint.go
 create mode 100644 vendor/github.com/prometheus/tsdb/record.go
 create mode 100644 vendor/github.com/prometheus/tsdb/wal/wal.go

diff --git a/vendor/github.com/prometheus/tsdb/checkpoint.go b/vendor/github.com/prometheus/tsdb/checkpoint.go
new file mode 100644
index 000000000..d988d3561
--- /dev/null
+++ b/vendor/github.com/prometheus/tsdb/checkpoint.go
@@ -0,0 +1,279 @@
+// Copyright 2018 The Prometheus Authors
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tsdb
+
+import (
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strconv"
+ "strings"
+
+ "github.com/go-kit/kit/log"
+ "github.com/go-kit/kit/log/level"
+ "github.com/pkg/errors"
+ "github.com/prometheus/tsdb/fileutil"
+ "github.com/prometheus/tsdb/wal"
+)
+
+// CheckpointStats returns stats about a created checkpoint.
+type CheckpointStats struct {
+ DroppedSeries int
+ DroppedSamples int
+ DroppedTombstones int
+ TotalSeries int // Processed series including dropped ones.
+ TotalSamples int // Processed samples including dropped ones.
+ TotalTombstones int // Processed tombstones including dropped ones.
+}
+
+// LastCheckpoint returns the directory name of the most recent checkpoint.
+// If dir does not contain any checkpoints, ErrNotFound is returned.
+func LastCheckpoint(dir string) (string, int, error) {
+ files, err := ioutil.ReadDir(dir)
+ if err != nil {
+ return "", 0, err
+ }
+ // Traverse list backwards since there may be multiple checkpoints left.
+ for i := len(files) - 1; i >= 0; i-- {
+ fi := files[i]
+
+ if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
+ continue
+ }
+ if !fi.IsDir() {
+ return "", 0, errors.Errorf("checkpoint %s is not a directory", fi.Name())
+ }
+ k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
+ if err != nil {
+ continue
+ }
+ return fi.Name(), k, nil
+ }
+ return "", 0, ErrNotFound
+}
+
+// DeleteCheckpoints deletes all checkpoints in dir that have an index
+// below n.
+func DeleteCheckpoints(dir string, n int) error { + var errs MultiError + + files, err := ioutil.ReadDir(dir) + if err != nil { + return err + } + for _, fi := range files { + if !strings.HasPrefix(fi.Name(), checkpointPrefix) { + continue + } + k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) + if err != nil || k >= n { + continue + } + if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil { + errs.Add(err) + } + } + return errs.Err() +} + +const checkpointPrefix = "checkpoint." + +// Checkpoint creates a compacted checkpoint of segments in range [m, n] in the given WAL. +// It includes the most recent checkpoint if it exists. +// All series not satisfying keep and samples below mint are dropped. +// +// The checkpoint is stored in a directory named checkpoint.N in the same +// segmented format as the original WAL itself. +// This makes it easy to read it through the WAL package and concatenate +// it with the original WAL. +// +// Non-critical errors are logged and not returned. +func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { + if logger == nil { + logger = log.NewNopLogger() + } + stats := &CheckpointStats{} + + var sr io.Reader + { + lastFn, k, err := LastCheckpoint(w.Dir()) + if err != nil && err != ErrNotFound { + return nil, errors.Wrap(err, "find last checkpoint") + } + if err == nil { + if m > k+1 { + return nil, errors.New("unexpected gap to last checkpoint") + } + // Ignore WAL files below the checkpoint. They shouldn't exist to begin with. + m = k + 1 + + last, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), lastFn)) + if err != nil { + return nil, errors.Wrap(err, "open last checkpoint") + } + defer last.Close() + sr = last + } + + segsr, err := wal.NewSegmentsRangeReader(w.Dir(), m, n) + if err != nil { + return nil, errors.Wrap(err, "create segment reader") + } + defer segsr.Close() + + if sr != nil { + sr = io.MultiReader(sr, segsr) + } else { + sr = segsr + } + } + + cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", n)) + cpdirtmp := cpdir + ".tmp" + + if err := os.MkdirAll(cpdirtmp, 0777); err != nil { + return nil, errors.Wrap(err, "create checkpoint dir") + } + cp, err := wal.New(nil, nil, cpdirtmp) + if err != nil { + return nil, errors.Wrap(err, "open checkpoint") + } + + r := wal.NewReader(sr) + + var ( + series []RefSeries + samples []RefSample + tstones []Stone + dec RecordDecoder + enc RecordEncoder + buf []byte + recs [][]byte + ) + for r.Next() { + series, samples, tstones = series[:0], samples[:0], tstones[:0] + + // We don't reset the buffer since we batch up multiple records + // before writing them to the checkpoint. + // Remember where the record for this iteration starts. + start := len(buf) + rec := r.Record() + + switch dec.Type(rec) { + case RecordSeries: + series, err = dec.Series(rec, series) + if err != nil { + return nil, errors.Wrap(err, "decode series") + } + // Drop irrelevant series in place. + repl := series[:0] + for _, s := range series { + if keep(s.Ref) { + repl = append(repl, s) + } + } + if len(repl) > 0 { + buf = enc.Series(repl, buf) + } + stats.TotalSeries += len(series) + stats.DroppedSeries += len(series) - len(repl) + + case RecordSamples: + samples, err = dec.Samples(rec, samples) + if err != nil { + return nil, errors.Wrap(err, "decode samples") + } + // Drop irrelevant samples in place. 
+ repl := samples[:0] + for _, s := range samples { + if s.T >= mint { + repl = append(repl, s) + } + } + if len(repl) > 0 { + buf = enc.Samples(repl, buf) + } + stats.TotalSamples += len(samples) + stats.DroppedSamples += len(samples) - len(repl) + + case RecordTombstones: + tstones, err = dec.Tombstones(rec, tstones) + if err != nil { + return nil, errors.Wrap(err, "decode deletes") + } + // Drop irrelevant tombstones in place. + repl := tstones[:0] + for _, s := range tstones { + for _, iv := range s.intervals { + if iv.Maxt >= mint { + repl = append(repl, s) + break + } + } + } + if len(repl) > 0 { + buf = enc.Tombstones(repl, buf) + } + stats.TotalTombstones += len(tstones) + stats.DroppedTombstones += len(tstones) - len(repl) + + default: + return nil, errors.New("invalid record type") + } + if len(buf[start:]) == 0 { + continue // All contents discarded. + } + recs = append(recs, buf[start:]) + + // Flush records in 1 MB increments. + if len(buf) > 1*1024*1024 { + if err := cp.Log(recs...); err != nil { + return nil, errors.Wrap(err, "flush records") + } + buf, recs = buf[:0], recs[:0] + } + } + // If we hit any corruption during checkpointing, repairing is not an option. + // The head won't know which series records are lost. + if r.Err() != nil { + return nil, errors.Wrap(r.Err(), "read segments") + } + + // Flush remaining records. + if err := cp.Log(recs...); err != nil { + return nil, errors.Wrap(err, "flush records") + } + if err := cp.Close(); err != nil { + return nil, errors.Wrap(err, "close checkpoint") + } + if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { + return nil, errors.Wrap(err, "rename checkpoint directory") + } + if err := w.Truncate(n + 1); err != nil { + // If truncating fails, we'll just try again at the next checkpoint. + // Leftover segments will just be ignored in the future if there's a checkpoint + // that supersedes them. + level.Error(logger).Log("msg", "truncating segments failed", "err", err) + } + if err := DeleteCheckpoints(w.Dir(), n); err != nil { + // Leftover old checkpoints do not cause problems down the line beyond + // occupying disk space. + // They will just be ignored since a higher checkpoint exists. + level.Error(logger).Log("msg", "delete old checkpoints", "err", err) + } + return stats, nil +} diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go index fcfbeeeb2..e6a0a74b4 100644 --- a/vendor/github.com/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/tsdb/db.go @@ -37,6 +37,7 @@ import ( "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/wal" "golang.org/x/sync/errgroup" ) @@ -191,6 +192,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if err := repairBadIndexVersion(l, dir); err != nil { return nil, err } + // Migrate old WAL. 
+ if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil {
+ return nil, errors.Wrap(err, "migrate WAL")
+ }
 
 db = &DB{
 dir: dir,
@@ -221,18 +226,18 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 return nil, errors.Wrap(err, "create leveled compactor")
 }
 
- wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval, r)
+ wlog, err := wal.New(l, r, filepath.Join(dir, "wal"))
 if err != nil {
 return nil, err
 }
 
- db.head, err = NewHead(r, l, wal, opts.BlockRanges[0])
+ db.head, err = NewHead(r, l, wlog, opts.BlockRanges[0])
 if err != nil {
 return nil, err
 }
 if err := db.reload(); err != nil {
 return nil, err
 }
- if err := db.head.ReadWAL(); err != nil {
+ if err := db.head.Init(); err != nil {
 return nil, errors.Wrap(err, "read WAL")
 }
 
@@ -501,7 +506,6 @@ func (db *DB) reload() (err error) {
 sort.Slice(blocks, func(i, j int) bool {
 return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
 })
-
 if err := validateBlockSequence(blocks); err != nil {
 return errors.Wrap(err, "invalid block sequence")
 }
@@ -596,10 +600,6 @@ func OverlappingBlocks(bm []BlockMeta) Overlaps {
 if len(bm) <= 1 {
 return nil
 }
- sort.Slice(bm, func(i, j int) bool {
- return bm[i].MinTime < bm[j].MinTime
- })
-
 var (
 overlaps [][]BlockMeta
 
diff --git a/vendor/github.com/prometheus/tsdb/fileutil/fileutil.go b/vendor/github.com/prometheus/tsdb/fileutil/fileutil.go
index c2c25842a..2158bfd26 100644
--- a/vendor/github.com/prometheus/tsdb/fileutil/fileutil.go
+++ b/vendor/github.com/prometheus/tsdb/fileutil/fileutil.go
@@ -6,6 +6,7 @@ package fileutil
 
 import (
 "os"
+ "path/filepath"
 "sort"
 )
 
@@ -23,3 +24,45 @@ func ReadDir(dirpath string) ([]string, error) {
 sort.Strings(names)
 return names, nil
 }
+
+// Rename safely renames a file.
+func Rename(from, to string) error {
+ if err := os.Rename(from, to); err != nil {
+ return err
+ }
+
+ // Directory was renamed; sync parent dir to persist rename.
+ pdir, err := OpenDir(filepath.Dir(to))
+ if err != nil {
+ return err
+ }
+
+ if err = Fsync(pdir); err != nil {
+ pdir.Close()
+ return err
+ }
+ return pdir.Close()
+}
+
+// Replace moves a file or directory to a new location and deletes any previous data.
+// It is not atomic.
+func Replace(from, to string) error {
+ if err := os.RemoveAll(to); err != nil {
+ return err
+ }
+ if err := os.Rename(from, to); err != nil {
+ return err
+ }
+
+ // Directory was renamed; sync parent dir to persist rename.
+ pdir, err := OpenDir(filepath.Dir(to))
+ if err != nil {
+ return err
+ }
+
+ if err = Fsync(pdir); err != nil {
+ pdir.Close()
+ return err
+ }
+ return pdir.Close()
+}
diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go
index 372842865..4f0c2c958 100644
--- a/vendor/github.com/prometheus/tsdb/head.go
+++ b/vendor/github.com/prometheus/tsdb/head.go
@@ -15,6 +15,7 @@ package tsdb
 
 import (
 "math"
+ "path/filepath"
 "runtime"
 "sort"
 "strings"
@@ -30,6 +31,7 @@ import (
 "github.com/prometheus/tsdb/chunks"
 "github.com/prometheus/tsdb/index"
 "github.com/prometheus/tsdb/labels"
+ "github.com/prometheus/tsdb/wal"
 )
 
 var (
@@ -53,9 +55,10 @@ var (
 type Head struct {
 chunkRange int64
 metrics *headMetrics
- wal WAL
+ wal *wal.WAL
 logger log.Logger
 appendPool sync.Pool
+ bytesPool sync.Pool
 
 minTime, maxTime int64
 lastSeriesID uint64
@@ -169,13 +172,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
 }
 
 // NewHead opens the head block in dir.
-func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (*Head, error) { +func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64) (*Head, error) { if l == nil { l = log.NewNopLogger() } - if wal == nil { - wal = NopWAL() - } if chunkRange < 1 { return nil, errors.Errorf("invalid chunk range %d", chunkRange) } @@ -183,7 +183,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( wal: wal, logger: l, chunkRange: chunkRange, - minTime: math.MinInt64, + minTime: math.MaxInt64, maxTime: math.MinInt64, series: newStripeSeries(), values: map[string]stringset{}, @@ -200,15 +200,17 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( // them on to other workers. // Samples before the mint timestamp are discarded. func (h *Head) processWALSamples( - mint int64, + minValidTime int64, partition, total uint64, input <-chan []RefSample, output chan<- []RefSample, ) (unknownRefs uint64) { defer close(output) + mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) + for samples := range input { for _, s := range samples { - if s.T < mint || s.Ref%total != partition { + if s.T < minValidTime || s.Ref%total != partition { continue } ms := h.series.getByID(s.Ref) @@ -221,18 +223,48 @@ func (h *Head) processWALSamples( h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() } + if s.T > maxt { + maxt = s.T + } + if s.T < mint { + mint = s.T + } } output <- samples } + h.updateMinMaxTime(mint, maxt) + return unknownRefs } -// ReadWAL initializes the head by consuming the write ahead log. -func (h *Head) ReadWAL() error { - defer h.postings.EnsureOrder() +func (h *Head) updateMinMaxTime(mint, maxt int64) { + for { + lt := h.MinTime() + if mint >= lt { + break + } + if atomic.CompareAndSwapInt64(&h.minTime, lt, mint) { + break + } + } + for { + ht := h.MaxTime() + if maxt <= ht { + break + } + if atomic.CompareAndSwapInt64(&h.maxTime, ht, maxt) { + break + } + } +} - r := h.wal.Reader() - mint := h.MinTime() +func (h *Head) loadWAL(r *wal.Reader) error { + minValidTime := h.MinTime() + // If the min time is still uninitialized (no persisted blocks yet), + // we accept all sample timestamps from the WAL. + if minValidTime == math.MaxInt64 { + minValidTime = math.MinInt64 + } // Track number of samples that referenced a series we don't know about // for error reporting. @@ -253,7 +285,7 @@ func (h *Head) ReadWAL() error { output := make(chan []RefSample, 300) go func(i int, input <-chan []RefSample, output chan<- []RefSample) { - unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output) + unknown := h.processWALSamples(minValidTime, uint64(i), uint64(n), input, output) atomic.AddUint64(&unknownRefs, unknown) wg.Done() }(i, input, output) @@ -263,49 +295,71 @@ func (h *Head) ReadWAL() error { input = output } - // TODO(fabxc): series entries spread between samples can starve the sample workers. - // Even with bufferd channels, this can impact startup time with lots of series churn. - // We must not paralellize series creation itself but could make the indexing asynchronous. 
- seriesFunc := func(series []RefSeries) { - for _, s := range series { - h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) + var ( + dec RecordDecoder + series []RefSeries + samples []RefSample + tstones []Stone + ) + for r.Next() { + series, samples, tstones = series[:0], samples[:0], tstones[:0] + rec := r.Record() + + switch dec.Type(rec) { + case RecordSeries: + series, err := dec.Series(rec, series) + if err != nil { + return errors.Wrap(err, "decode series") + } + for _, s := range series { + h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) - if h.lastSeriesID < s.Ref { - h.lastSeriesID = s.Ref + if h.lastSeriesID < s.Ref { + h.lastSeriesID = s.Ref + } } - } - } - samplesFunc := func(samples []RefSample) { - // We split up the samples into chunks of 5000 samples or less. - // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise - // cause thousands of very large in flight buffers occupying large amounts - // of unused memory. - for len(samples) > 0 { - n := 5000 - if len(samples) < n { - n = len(samples) + case RecordSamples: + samples, err := dec.Samples(rec, samples) + if err != nil { + return errors.Wrap(err, "decode samples") } - var buf []RefSample - select { - case buf = <-input: - default: + // We split up the samples into chunks of 5000 samples or less. + // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise + // cause thousands of very large in flight buffers occupying large amounts + // of unused memory. + for len(samples) > 0 { + n := 5000 + if len(samples) < n { + n = len(samples) + } + var buf []RefSample + select { + case buf = <-input: + default: + } + firstInput <- append(buf[:0], samples[:n]...) + samples = samples[n:] } - firstInput <- append(buf[:0], samples[:n]...) - samples = samples[n:] - } - } - deletesFunc := func(stones []Stone) { - for _, s := range stones { - for _, itv := range s.intervals { - if itv.Maxt < mint { - continue + case RecordTombstones: + tstones, err := dec.Tombstones(rec, tstones) + if err != nil { + return errors.Wrap(err, "decode tombstones") + } + for _, s := range tstones { + for _, itv := range s.intervals { + if itv.Maxt < minValidTime { + continue + } + h.tombstones.addInterval(s.ref, itv) } - h.tombstones.addInterval(s.ref, itv) } + default: + return errors.Errorf("invalid record type %v", dec.Type(rec)) } } - - err := r.Read(seriesFunc, samplesFunc, deletesFunc) + if r.Err() != nil { + return errors.Wrap(r.Err(), "read records") + } // Signal termination to first worker and wait for last one to close its output channel. close(firstInput) @@ -313,20 +367,64 @@ func (h *Head) ReadWAL() error { } wg.Wait() + if unknownRefs > 0 { + level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs) + } + return nil +} + +// Init loads data from the write ahead log and prepares the head for writes. +func (h *Head) Init() error { + defer h.postings.EnsureOrder() + + if h.wal == nil { + return nil + } + + // Backfill the checkpoint first if it exists. + cp, n, err := LastCheckpoint(h.wal.Dir()) + if err != nil && err != ErrNotFound { + return errors.Wrap(err, "find last checkpoint") + } + if err == nil { + sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), cp)) + if err != nil { + return errors.Wrap(err, "open checkpoint") + } + defer sr.Close() + + // A corrupted checkpoint is a hard error for now and requires user + // intervention. There's likely little data that can be recovered anyway. 
+ if err := h.loadWAL(wal.NewReader(sr)); err != nil { + return errors.Wrap(err, "backfill checkpoint") + } + n++ + } + + // Backfill segments from the last checkpoint onwards + sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), n, -1) if err != nil { - return errors.Wrap(err, "consume WAL") + return errors.Wrap(err, "open WAL segments") } - if unknownRefs > 0 { - level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs) + defer sr.Close() + + err = h.loadWAL(wal.NewReader(sr)) + if err == nil { + return nil + } + level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err) + + if err := h.wal.Repair(err); err != nil { + return errors.Wrap(err, "repair corrupted WAL") } return nil } -// Truncate removes all data before mint from the head block and truncates its WAL. +// Truncate removes old data before mint from the head. func (h *Head) Truncate(mint int64) error { - initialize := h.MinTime() == math.MinInt64 + initialize := h.MinTime() == math.MaxInt64 - if h.MinTime() >= mint { + if h.MinTime() >= mint && !initialize { return nil } atomic.StoreInt64(&h.minTime, mint) @@ -348,18 +446,37 @@ func (h *Head) Truncate(mint int64) error { level.Info(h.logger).Log("msg", "head GC completed", "duration", time.Since(start)) h.metrics.gcDuration.Observe(time.Since(start).Seconds()) + if h.wal == nil { + return nil + } start = time.Now() + m, n, err := h.wal.Segments() + if err != nil { + return errors.Wrap(err, "get segment range") + } + n-- // Never consider last segment for checkpoint. + if n < 0 { + return nil // no segments yet. + } + // The lower third of segments should contain mostly obsolete samples. + // If we have less than three segments, it's not worth checkpointing yet. + n = m + (n-m)/3 + if n <= m { + return nil + } + keep := func(id uint64) bool { return h.series.getByID(id) != nil } - if err := h.wal.Truncate(mint, keep); err == nil { - level.Info(h.logger).Log("msg", "WAL truncation completed", "duration", time.Since(start)) - } else { - level.Error(h.logger).Log("msg", "WAL truncation failed", "err", err, "duration", time.Since(start)) + if _, err = Checkpoint(h.logger, h.wal, m, n, keep, mint); err != nil { + return errors.Wrap(err, "create checkpoint") } h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) + level.Info(h.logger).Log("msg", "WAL checkpoint complete", + "low", m, "high", n, "duration", time.Since(start)) + return nil } @@ -367,10 +484,7 @@ func (h *Head) Truncate(mint int64) error { // for a compltely fresh head with an empty WAL. // Returns true if the initialization took an effect. func (h *Head) initTime(t int64) (initialized bool) { - // In the init state, the head has a high timestamp of math.MinInt64. - mint, _ := rangeForTimestamp(t, h.chunkRange) - - if !atomic.CompareAndSwapInt64(&h.minTime, math.MinInt64, mint) { + if !atomic.CompareAndSwapInt64(&h.minTime, math.MaxInt64, t) { return false } // Ensure that max time is initialized to at least the min time we just set. @@ -441,7 +555,7 @@ func (h *Head) Appender() Appender { // The head cache might not have a starting point yet. The init appender // picks up the first appended timestamp as the base. 
- if h.MinTime() == math.MinInt64 { + if h.MinTime() == math.MaxInt64 { return &initAppender{head: h} } return h.appender() @@ -449,10 +563,11 @@ func (h *Head) Appender() Appender { func (h *Head) appender() *headAppender { return &headAppender{ - head: h, - mint: h.MaxTime() - h.chunkRange/2, - maxt: math.MinInt64, - samples: h.getAppendBuffer(), + head: h, + minValidTime: h.MaxTime() - h.chunkRange/2, + mint: math.MaxInt64, + maxt: math.MinInt64, + samples: h.getAppendBuffer(), } } @@ -468,16 +583,29 @@ func (h *Head) putAppendBuffer(b []RefSample) { h.appendPool.Put(b[:0]) } +func (h *Head) getBytesBuffer() []byte { + b := h.bytesPool.Get() + if b == nil { + return make([]byte, 0, 1024) + } + return b.([]byte) +} + +func (h *Head) putBytesBuffer(b []byte) { + h.bytesPool.Put(b[:0]) +} + type headAppender struct { - head *Head - mint, maxt int64 + head *Head + minValidTime int64 // No samples below this timestamp are allowed. + mint, maxt int64 series []RefSeries samples []RefSample } func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { - if t < a.mint { + if t < a.minValidTime { return 0, ErrOutOfBounds } @@ -504,9 +632,12 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { if err != nil { return err } - if t < a.mint { + if t < a.minValidTime { return ErrOutOfBounds } + if t < a.mint { + a.mint = t + } if t > a.maxt { a.maxt = t } @@ -520,15 +651,42 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { return nil } +func (a *headAppender) log() error { + if a.head.wal == nil { + return nil + } + + buf := a.head.getBytesBuffer() + defer func() { a.head.putBytesBuffer(buf) }() + + var rec []byte + var enc RecordEncoder + + if len(a.series) > 0 { + rec = enc.Series(a.series, buf) + buf = rec[:0] + + if err := a.head.wal.Log(rec); err != nil { + return errors.Wrap(err, "log series") + } + } + if len(a.samples) > 0 { + rec = enc.Samples(a.samples, buf) + buf = rec[:0] + + if err := a.head.wal.Log(rec); err != nil { + return errors.Wrap(err, "log samples") + } + } + return nil +} + func (a *headAppender) Commit() error { defer a.head.metrics.activeAppenders.Dec() defer a.head.putAppendBuffer(a.samples) - if err := a.head.wal.LogSeries(a.series); err != nil { - return err - } - if err := a.head.wal.LogSamples(a.samples); err != nil { - return errors.Wrap(err, "WAL log samples") + if err := a.log(); err != nil { + return errors.Wrap(err, "write to WAL") } total := len(a.samples) @@ -548,16 +706,7 @@ func (a *headAppender) Commit() error { } a.head.metrics.samplesAppended.Add(float64(total)) - - for { - ht := a.head.MaxTime() - if a.maxt <= ht { - break - } - if atomic.CompareAndSwapInt64(&a.head.maxTime, ht, a.maxt) { - break - } - } + a.head.updateMinMaxTime(a.mint, a.maxt) return nil } @@ -568,7 +717,8 @@ func (a *headAppender) Rollback() error { // Series are created in the head memory regardless of rollback. Thus we have // to log them to the WAL in any case. 
- return a.head.wal.LogSeries(a.series) + a.samples = nil + return a.log() } // Delete all samples in the range of [mint, maxt] for series that satisfy the given @@ -601,8 +751,12 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { if p.Err() != nil { return p.Err() } - if err := h.wal.LogDeletes(stones); err != nil { - return err + var enc RecordEncoder + + if h.wal != nil { + if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil { + return err + } } for _, s := range stones { h.tombstones.addInterval(s.ref, s.intervals[0]) @@ -694,6 +848,9 @@ func (h *Head) MaxTime() int64 { // Close flushes the WAL and closes the head. func (h *Head) Close() error { + if h.wal == nil { + return nil + } return h.wal.Close() } diff --git a/vendor/github.com/prometheus/tsdb/record.go b/vendor/github.com/prometheus/tsdb/record.go new file mode 100644 index 000000000..c8cc7a504 --- /dev/null +++ b/vendor/github.com/prometheus/tsdb/record.go @@ -0,0 +1,213 @@ +// Copyright 2018 The Prometheus Authors + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "math" + "sort" + + "github.com/pkg/errors" + "github.com/prometheus/tsdb/labels" +) + +// RecordType represents the data type of a record. +type RecordType uint8 + +const ( + RecordInvalid RecordType = 255 + RecordSeries RecordType = 1 + RecordSamples RecordType = 2 + RecordTombstones RecordType = 3 +) + +type RecordLogger interface { + Log(recs ...[]byte) error +} + +type RecordReader interface { + Next() bool + Err() error + Record() []byte +} + +// RecordDecoder decodes series, sample, and tombstone records. +// The zero value is ready to use. +type RecordDecoder struct { +} + +// Type returns the type of the record. +// Return RecordInvalid if no valid record type is found. +func (d *RecordDecoder) Type(rec []byte) RecordType { + if len(rec) < 1 { + return RecordInvalid + } + switch t := RecordType(rec[0]); t { + case RecordSeries, RecordSamples, RecordTombstones: + return t + } + return RecordInvalid +} + +// Series appends series in rec to the given slice. +func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) { + dec := decbuf{b: rec} + + if RecordType(dec.byte()) != RecordSeries { + return nil, errors.New("invalid record type") + } + for len(dec.b) > 0 && dec.err() == nil { + ref := dec.be64() + + lset := make(labels.Labels, dec.uvarint()) + + for i := range lset { + lset[i].Name = dec.uvarintStr() + lset[i].Value = dec.uvarintStr() + } + sort.Sort(lset) + + series = append(series, RefSeries{ + Ref: ref, + Labels: lset, + }) + } + if dec.err() != nil { + return nil, dec.err() + } + if len(dec.b) > 0 { + return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + } + return series, nil +} + +// Samples appends samples in rec to the given slice. 
+func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) { + dec := decbuf{b: rec} + + if RecordType(dec.byte()) != RecordSamples { + return nil, errors.New("invalid record type") + } + if dec.len() == 0 { + return samples, nil + } + var ( + baseRef = dec.be64() + baseTime = dec.be64int64() + ) + for len(dec.b) > 0 && dec.err() == nil { + dref := dec.varint64() + dtime := dec.varint64() + val := dec.be64() + + samples = append(samples, RefSample{ + Ref: uint64(int64(baseRef) + dref), + T: baseTime + dtime, + V: math.Float64frombits(val), + }) + } + + if dec.err() != nil { + return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(samples)) + } + if len(dec.b) > 0 { + return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + } + return samples, nil +} + +// Tombstones appends tombstones in rec to the given slice. +func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) { + dec := decbuf{b: rec} + + if RecordType(dec.byte()) != RecordTombstones { + return nil, errors.New("invalid record type") + } + for dec.len() > 0 && dec.err() == nil { + tstones = append(tstones, Stone{ + ref: dec.be64(), + intervals: Intervals{ + {Mint: dec.varint64(), Maxt: dec.varint64()}, + }, + }) + } + if dec.err() != nil { + return nil, dec.err() + } + if len(dec.b) > 0 { + return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) + } + return tstones, nil +} + +// RecordEncoder encodes series, sample, and tombstones records. +// The zero value is ready to use. +type RecordEncoder struct { +} + +// Series appends the encoded series to b and returns the resulting slice. +func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte { + buf := encbuf{b: b} + buf.putByte(byte(RecordSeries)) + + for _, s := range series { + buf.putBE64(s.Ref) + buf.putUvarint(len(s.Labels)) + + for _, l := range s.Labels { + buf.putUvarintStr(l.Name) + buf.putUvarintStr(l.Value) + } + } + return buf.get() +} + +// Samples appends the encoded samples to b and returns the resulting slice. +func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte { + buf := encbuf{b: b} + buf.putByte(byte(RecordSamples)) + + if len(samples) == 0 { + return buf.get() + } + + // Store base timestamp and base reference number of first sample. + // All samples encode their timestamp and ref as delta to those. + first := samples[0] + + buf.putBE64(first.Ref) + buf.putBE64int64(first.T) + + for _, s := range samples { + buf.putVarint64(int64(s.Ref) - int64(first.Ref)) + buf.putVarint64(s.T - first.T) + buf.putBE64(math.Float64bits(s.V)) + } + return buf.get() +} + +// Tombstones appends the encoded tombstones to b and returns the resulting slice. +func (e *RecordEncoder) Tombstones(tstones []Stone, b []byte) []byte { + buf := encbuf{b: b} + buf.putByte(byte(RecordTombstones)) + + for _, s := range tstones { + for _, iv := range s.intervals { + buf.putBE64(s.ref) + buf.putVarint64(iv.Mint) + buf.putVarint64(iv.Maxt) + } + } + return buf.get() +} diff --git a/vendor/github.com/prometheus/tsdb/wal.go b/vendor/github.com/prometheus/tsdb/wal.go index c1b9f6b06..972fdea38 100644 --- a/vendor/github.com/prometheus/tsdb/wal.go +++ b/vendor/github.com/prometheus/tsdb/wal.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/wal" ) // WALEntryType indicates what data a WAL entry contains. 
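NOTE: the record codec above is the on-disk vocabulary of the new WAL: every entry is an opaque byte record whose first byte is a type tag, written by RecordEncoder and read back by RecordDecoder. A minimal round-trip sketch using only types introduced in this patch (the zero values of the coders are ready to use; the import path is the vendored github.com/prometheus/tsdb):

	package main

	import (
		"fmt"

		"github.com/prometheus/tsdb"
	)

	func main() {
		var (
			enc tsdb.RecordEncoder
			dec tsdb.RecordDecoder
		)
		// Encode a batch of samples; the first byte of rec is the RecordSamples tag.
		rec := enc.Samples([]tsdb.RefSample{{Ref: 42, T: 1000, V: 1.5}}, nil)

		if dec.Type(rec) != tsdb.RecordSamples {
			panic("unexpected record type")
		}
		// Decoding appends to the passed slice; nil starts a fresh one.
		samples, err := dec.Samples(rec, nil)
		if err != nil {
			panic(err)
		}
		fmt.Println(samples) // the original ref/timestamp/value round-trip back
	}

Such records are what head.go hands to wal.WAL.Log and what checkpoint.go filters when compacting old segments.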
@@ -82,6 +83,8 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
 
 // WAL is a write ahead log that can log new series labels and samples.
 // It must be completely read before new entries are logged.
+//
+// DEPRECATED: use wal pkg combined with the record coders instead.
 type WAL interface {
 Reader() WALReader
 LogSeries([]RefSeries) error
@@ -173,6 +176,8 @@ func newCRC32() hash.Hash32 {
 }
 
 // SegmentWAL is a write ahead log for series data.
+//
+// DEPRECATED: use wal pkg combined with the record coders instead.
 type SegmentWAL struct {
 mtx sync.Mutex
 metrics *walMetrics
@@ -1206,3 +1211,100 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error {
 }
 return nil
 }
+
+// MigrateWAL rewrites the deprecated write ahead log into the new format.
+func MigrateWAL(logger log.Logger, dir string) (err error) {
+ if logger == nil {
+ logger = log.NewNopLogger()
+ }
+ // Detect whether we still have the old WAL.
+ fns, err := sequenceFiles(dir)
+ if err != nil && !os.IsNotExist(err) {
+ return errors.Wrap(err, "list sequence files")
+ }
+ if len(fns) == 0 {
+ return nil // No WAL at all yet.
+ }
+ // Check header of first segment to see whether we are still dealing with an
+ // old WAL.
+ f, err := os.Open(fns[0])
+ if err != nil {
+ return errors.Wrap(err, "check first existing segment")
+ }
+ defer f.Close()
+
+ var hdr [4]byte
+ if _, err := f.Read(hdr[:]); err != nil && err != io.EOF {
+ return errors.Wrap(err, "read header from first segment")
+ }
+ // If we cannot read the magic header for segments of the old WAL, abort.
+ // Either it's migrated already or there's a corruption issue with which
+ // we cannot deal here anyway. Subsequent attempts to open the WAL will error in that case.
+ if binary.BigEndian.Uint32(hdr[:]) != WALMagic {
+ return nil
+ }
+
+ level.Info(logger).Log("msg", "migrating WAL format")
+
+ tmpdir := dir + ".tmp"
+ if err := os.RemoveAll(tmpdir); err != nil {
+ return errors.Wrap(err, "cleanup replacement dir")
+ }
+ repl, err := wal.New(logger, nil, tmpdir)
+ if err != nil {
+ return errors.Wrap(err, "open new WAL")
+ }
+ // It should've already been closed as part of the previous finalization.
+ // Do it once again in case of prior errors.
+ defer func() {
+ if err != nil {
+ repl.Close()
+ }
+ }()
+
+ w, err := OpenSegmentWAL(dir, logger, time.Minute, nil)
+ if err != nil {
+ return errors.Wrap(err, "open old WAL")
+ }
+ defer w.Close()
+
+ rdr := w.Reader()
+
+ var (
+ enc RecordEncoder
+ b []byte
+ )
+ decErr := rdr.Read(
+ func(s []RefSeries) {
+ if err != nil {
+ return
+ }
+ err = repl.Log(enc.Series(s, b[:0]))
+ },
+ func(s []RefSample) {
+ if err != nil {
+ return
+ }
+ err = repl.Log(enc.Samples(s, b[:0]))
+ },
+ func(s []Stone) {
+ if err != nil {
+ return
+ }
+ err = repl.Log(enc.Tombstones(s, b[:0]))
+ },
+ )
+ if decErr != nil {
+ return errors.Wrap(decErr, "decode old entries")
+ }
+ if err != nil {
+ return errors.Wrap(err, "write new entries")
+ }
+ if err := repl.Close(); err != nil {
+ return errors.Wrap(err, "close new WAL")
+ }
+ if err := fileutil.Replace(tmpdir, dir); err != nil {
+ return errors.Wrap(err, "replace old WAL")
+ }
+ return nil
+}
diff --git a/vendor/github.com/prometheus/tsdb/wal/wal.go b/vendor/github.com/prometheus/tsdb/wal/wal.go
new file mode 100644
index 000000000..0e95ba2a7
--- /dev/null
+++ b/vendor/github.com/prometheus/tsdb/wal/wal.go
@@ -0,0 +1,822 @@
+// Copyright 2017 The Prometheus Authors
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wal
+
+import (
+ "bufio"
+ "encoding/binary"
+ "fmt"
+ "hash/crc32"
+ "io"
+ "math"
+ "os"
+ "path/filepath"
+ "sort"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/go-kit/kit/log"
+ "github.com/go-kit/kit/log/level"
+ "github.com/pkg/errors"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/tsdb/fileutil"
+)
+
+const (
+ defaultSegmentSize = 128 * 1024 * 1024 // 128 MB
+ pageSize = 32 * 1024 // 32KB
+ recordHeaderSize = 7
+)
+
+// The table gets initialized with sync.Once but may still cause a race
+// with any other use of the crc32 package anywhere. Thus we initialize it
+// before.
+var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
+
+type page struct {
+ alloc int
+ flushed int
+ buf [pageSize]byte
+}
+
+func (p *page) remaining() int {
+ return pageSize - p.alloc
+}
+
+func (p *page) full() bool {
+ return pageSize-p.alloc < recordHeaderSize
+}
+
+// Segment represents a segment file.
+type Segment struct {
+ *os.File
+ dir string
+ i int
+}
+
+// Index returns the index of the segment.
+func (s *Segment) Index() int {
+ return s.i
+}
+
+// Dir returns the directory of the segment.
+func (s *Segment) Dir() string {
+ return s.dir
+}
+
+// CorruptionErr is an error that's returned when corruption is encountered.
+type CorruptionErr struct {
+ Segment int
+ Offset int64
+ Err error
+}
+
+func (e *CorruptionErr) Error() string {
+ if e.Segment < 0 {
+ return fmt.Sprintf("corruption after %d bytes: %s", e.Offset, e.Err)
+ }
+ return fmt.Sprintf("corruption in segment %d at %d: %s", e.Segment, e.Offset, e.Err)
+}
+
+// OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends.
+func OpenWriteSegment(dir string, k int) (*Segment, error) { + f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + return nil, err + } + stat, err := f.Stat() + if err != nil { + f.Close() + return nil, err + } + // If the last page is torn, fill it with zeros. + // In case it was torn after all records were written successfully, this + // will just pad the page and everything will be fine. + // If it was torn mid-record, a full read (which the caller should do anyway + // to ensure integrity) will detect it as a corruption by the end. + if d := stat.Size() % pageSize; d != 0 { + if _, err := f.Write(make([]byte, pageSize-d)); err != nil { + f.Close() + return nil, errors.Wrap(err, "zero-pad torn page") + } + } + return &Segment{File: f, i: k, dir: dir}, nil +} + +// CreateSegment creates a new segment k in dir. +func CreateSegment(dir string, k int) (*Segment, error) { + f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + return nil, err + } + return &Segment{File: f, i: k, dir: dir}, nil +} + +// OpenReadSegment opens the segment with the given filename. +func OpenReadSegment(fn string) (*Segment, error) { + k, err := strconv.Atoi(filepath.Base(fn)) + if err != nil { + return nil, errors.New("not a valid filename") + } + f, err := os.Open(fn) + if err != nil { + return nil, err + } + return &Segment{File: f, i: k, dir: filepath.Dir(fn)}, nil +} + +// WAL is a write ahead log that stores records in segment files. +// It must be read from start to end once before logging new data. +// If an error occurs during read, the repair procedure must be called +// before it's safe to do further writes. +// +// Segments are written to in pages of 32KB, with records possibly split +// across page boundaries. +// Records are never split across segments to allow full segments to be +// safely truncated. It also ensures that torn writes never corrupt records +// beyond the most recent segment. +type WAL struct { + dir string + logger log.Logger + segmentSize int + mtx sync.RWMutex + segment *Segment // active segment + donePages int // pages written to the segment + page *page // active page + stopc chan chan struct{} + actorc chan func() + + fsyncDuration prometheus.Summary + pageFlushes prometheus.Counter + pageCompletions prometheus.Counter +} + +// New returns a new WAL over the given directory. +func New(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) { + return NewSize(logger, reg, dir, defaultSegmentSize) +} + +// NewSize returns a new WAL over the given directory. +// New segments are created with the specified size. 
+func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int) (*WAL, error) {
+ if segmentSize%pageSize != 0 {
+ return nil, errors.New("invalid segment size")
+ }
+ if err := os.MkdirAll(dir, 0777); err != nil {
+ return nil, errors.Wrap(err, "create dir")
+ }
+ if logger == nil {
+ logger = log.NewNopLogger()
+ }
+ w := &WAL{
+ dir: dir,
+ logger: logger,
+ segmentSize: segmentSize,
+ page: &page{},
+ actorc: make(chan func(), 100),
+ stopc: make(chan chan struct{}),
+ }
+ w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
+ Name: "prometheus_tsdb_wal_fsync_duration_seconds",
+ Help: "Duration of WAL fsync.",
+ })
+ w.pageFlushes = prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "prometheus_tsdb_wal_page_flushes_total",
+ Help: "Total number of page flushes.",
+ })
+ w.pageCompletions = prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "prometheus_tsdb_wal_completed_pages_total",
+ Help: "Total number of completed pages.",
+ })
+ if reg != nil {
+ reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions)
+ }
+
+ _, j, err := w.Segments()
+ if err != nil {
+ return nil, errors.Wrap(err, "get segment range")
+ }
+ // Fresh dir, no segments yet.
+ if j == -1 {
+ if w.segment, err = CreateSegment(w.dir, 0); err != nil {
+ return nil, err
+ }
+ } else {
+ if w.segment, err = OpenWriteSegment(w.dir, j); err != nil {
+ return nil, err
+ }
+ // Correctly initialize donePages.
+ stat, err := w.segment.Stat()
+ if err != nil {
+ return nil, err
+ }
+ w.donePages = int(stat.Size() / pageSize)
+ }
+ go w.run()
+
+ return w, nil
+}
+
+// Dir returns the directory of the WAL.
+func (w *WAL) Dir() string {
+ return w.dir
+}
+
+func (w *WAL) run() {
+Loop:
+ for {
+ select {
+ case f := <-w.actorc:
+ f()
+ case donec := <-w.stopc:
+ close(w.actorc)
+ defer close(donec)
+ break Loop
+ }
+ }
+ // Drain and process any remaining functions.
+ for f := range w.actorc {
+ f()
+ }
+}
+
+// Repair attempts to repair the WAL based on the error.
+// It discards all data after the corruption.
+func (w *WAL) Repair(err error) error {
+ // We could probably have a mode that only discards torn records right around
+ // the corruption to preserve as much data as possible.
+ // But that's not generally applicable if the records have any kind of causality.
+ // Maybe as an extra mode in the future if mid-WAL corruptions become
+ // a frequent concern.
+ cerr, ok := err.(*CorruptionErr)
+ if !ok {
+ return errors.New("cannot handle error")
+ }
+ if cerr.Segment < 0 {
+ return errors.New("corruption error does not specify position")
+ }
+
+ level.Warn(w.logger).Log("msg", "starting corruption repair",
+ "segment", cerr.Segment, "offset", cerr.Offset)
+
+ // All segments behind the corruption can no longer be used.
+ segs, err := listSegments(w.dir)
+ if err != nil {
+ return errors.Wrap(err, "list segments")
+ }
+ level.Warn(w.logger).Log("msg", "deleting all segments behind corruption")
+
+ for _, s := range segs {
+ if s.n <= cerr.Segment {
+ continue
+ }
+ if err := os.Remove(filepath.Join(w.dir, s.s)); err != nil {
+ return errors.Wrap(err, "delete segment")
+ }
+ }
+ // Regardless of the corruption offset, no record reaches into the previous segment.
+ // So we can safely repair the WAL by removing the segment and re-inserting all
+ // its records up to the corruption.
+ level.Warn(w.logger).Log("msg", "rewrite corrupted segment") + + fn := SegmentName(w.dir, cerr.Segment) + tmpfn := fn + ".repair" + + if err := fileutil.Rename(fn, tmpfn); err != nil { + return err + } + // Create a clean segment and make it the active one. + s, err := CreateSegment(w.dir, cerr.Segment) + if err != nil { + return err + } + w.segment = s + + f, err := os.Open(tmpfn) + if err != nil { + return errors.Wrap(err, "open segment") + } + defer f.Close() + r := NewReader(bufio.NewReader(f)) + + for r.Next() { + if err := w.Log(r.Record()); err != nil { + return errors.Wrap(err, "insert record") + } + } + // We expect an error here, so nothing to handle. + + if err := os.Remove(tmpfn); err != nil { + return errors.Wrap(err, "delete corrupted segment") + } + return nil +} + +// SegmentName builds a segment name for the directory. +func SegmentName(dir string, i int) string { + return filepath.Join(dir, fmt.Sprintf("%08d", i)) +} + +// nextSegment creates the next segment and closes the previous one. +func (w *WAL) nextSegment() error { + // Only flush the current page if it actually holds data. + if w.page.alloc > 0 { + if err := w.flushPage(true); err != nil { + return err + } + } + next, err := CreateSegment(w.dir, w.segment.Index()+1) + if err != nil { + return errors.Wrap(err, "create new segment file") + } + prev := w.segment + w.segment = next + w.donePages = 0 + + // Don't block further writes by fsyncing the last segment. + w.actorc <- func() { + if err := w.fsync(prev); err != nil { + level.Error(w.logger).Log("msg", "sync previous segment", "err", err) + } + if err := prev.Close(); err != nil { + level.Error(w.logger).Log("msg", "close previous segment", "err", err) + } + } + return nil +} + +// flushPage writes the new contents of the page to disk. If no more records will fit into +// the page, the remaining bytes will be set to zero and a new page will be started. +// If clear is true, this is enforced regardless of how many bytes are left in the page. +func (w *WAL) flushPage(clear bool) error { + w.pageFlushes.Inc() + + p := w.page + clear = clear || p.full() + + // No more data will fit into the page. Enqueue and clear it. + if clear { + p.alloc = pageSize // write till end of page + w.pageCompletions.Inc() + } + n, err := w.segment.Write(p.buf[p.flushed:p.alloc]) + if err != nil { + return err + } + p.flushed += n + + // We flushed an entire page, prepare a new one. + if clear { + for i := range p.buf { + p.buf[i] = 0 + } + p.alloc = 0 + p.flushed = 0 + w.donePages++ + } + return nil +} + +type recType uint8 + +const ( + recPageTerm recType = 0 // Rest of page is empty. + recFull recType = 1 // Full record. + recFirst recType = 2 // First fragment of a record. + recMiddle recType = 3 // Middle fragments of a record. + recLast recType = 4 // Final fragment of a record. +) + +func (t recType) String() string { + switch t { + case recPageTerm: + return "zero" + case recFull: + return "full" + case recFirst: + return "first" + case recMiddle: + return "middle" + case recLast: + return "last" + default: + return "" + } +} + +func (w *WAL) pagesPerSegment() int { + return w.segmentSize / pageSize +} + +// Log writes the records into the log. +// Multiple records can be passed at once to reduce writes and increase throughput. +func (w *WAL) Log(recs ...[]byte) error { + w.mtx.Lock() + defer w.mtx.Unlock() + // Callers could just implement their own list record format but adding + // a bit of extra logic here frees them from that overhead. 
+ for i, r := range recs {
+ if err := w.log(r, i == len(recs)-1); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// log writes rec to the log and forces a flush of the current page if it's
+// the final record of a batch.
+func (w *WAL) log(rec []byte, final bool) error {
+ // If the record is too big to fit within pages in the current
+ // segment, terminate the active segment and advance to the next one.
+ // This ensures that records do not cross segment boundaries.
+ left := w.page.remaining() - recordHeaderSize // Active pages.
+ left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages.
+
+ if len(rec) > left {
+ if err := w.nextSegment(); err != nil {
+ return err
+ }
+ }
+
+ // Populate as many pages as necessary to fit the record.
+ // Be careful to always do one pass to ensure we write zero-length records.
+ for i := 0; i == 0 || len(rec) > 0; i++ {
+ p := w.page
+
+ // Find how much of the record we can fit into the page.
+ var (
+ l = min(len(rec), (pageSize-p.alloc)-recordHeaderSize)
+ part = rec[:l]
+ buf = p.buf[p.alloc:]
+ typ recType
+ )
+
+ switch {
+ case i == 0 && len(part) == len(rec):
+ typ = recFull
+ case len(part) == len(rec):
+ typ = recLast
+ case i == 0:
+ typ = recFirst
+ default:
+ typ = recMiddle
+ }
+
+ buf[0] = byte(typ)
+ crc := crc32.Checksum(part, castagnoliTable)
+ binary.BigEndian.PutUint16(buf[1:], uint16(len(part)))
+ binary.BigEndian.PutUint32(buf[3:], crc)
+
+ copy(buf[recordHeaderSize:], part)
+ p.alloc += len(part) + recordHeaderSize
+
+ // If we wrote a full record, we can fit more records of the batch
+ // into the page before flushing it.
+ if final || typ != recFull || w.page.full() {
+ if err := w.flushPage(false); err != nil {
+ return err
+ }
+ }
+ rec = rec[l:]
+ }
+ return nil
+}
+
+// Segments returns the range [m, n] of currently existing segments.
+// If no segments are found, m and n are -1.
+func (w *WAL) Segments() (m, n int, err error) {
+ refs, err := listSegments(w.dir)
+ if err != nil {
+ return 0, 0, err
+ }
+ if len(refs) == 0 {
+ return -1, -1, nil
+ }
+ return refs[0].n, refs[len(refs)-1].n, nil
+}
+
+// Truncate drops all segments before i.
+func (w *WAL) Truncate(i int) error {
+ refs, err := listSegments(w.dir)
+ if err != nil {
+ return err
+ }
+ for _, r := range refs {
+ if r.n >= i {
+ break
+ }
+ if err := os.Remove(filepath.Join(w.dir, r.s)); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (w *WAL) fsync(f *Segment) error {
+ start := time.Now()
+ err := fileutil.Fsync(f.File)
+ w.fsyncDuration.Observe(time.Since(start).Seconds())
+ return err
+}
+
+// Close flushes all writes and closes the active segment.
+func (w *WAL) Close() (err error) {
+ w.mtx.Lock()
+ defer w.mtx.Unlock()
+
+ // Flush the last page and zero out all its remaining size.
+ // We must not flush an empty page as it would falsely signal
+ // the segment is done if we start writing to it again after opening.
+ if w.page.alloc > 0 {
+ if err := w.flushPage(true); err != nil {
+ return err
+ }
+ }
+
+ donec := make(chan struct{})
+ w.stopc <- donec
+ <-donec
+
+ if err = w.fsync(w.segment); err != nil {
+ level.Error(w.logger).Log("msg", "sync previous segment", "err", err)
+ }
+ if err := w.segment.Close(); err != nil {
+ level.Error(w.logger).Log("msg", "close previous segment", "err", err)
+ }
+
+ return nil
+}
+
+type segmentRef struct {
+ s string
+ n int
+}
+
+func listSegments(dir string) (refs []segmentRef, err error) {
+ files, err := fileutil.ReadDir(dir)
+ if err != nil {
+ return nil, err
+ }
+ var last int
+ for _, fn := range files {
+ k, err := strconv.Atoi(fn)
+ if err != nil {
+ continue
+ }
+ if len(refs) > 0 && k > last+1 {
+ return nil, errors.New("segments are not sequential")
+ }
+ refs = append(refs, segmentRef{s: fn, n: k})
+ last = k
+ }
+ sort.Slice(refs, func(i, j int) bool {
+ return refs[i].n < refs[j].n
+ })
+ return refs, nil
+}
+
+// NewSegmentsReader returns a new reader over all segments in the directory.
+func NewSegmentsReader(dir string) (io.ReadCloser, error) {
+ return NewSegmentsRangeReader(dir, 0, math.MaxInt32)
+}
+
+// NewSegmentsRangeReader returns a new reader over the given WAL segment range.
+// If m or n are -1, the range is open on the respective end.
+func NewSegmentsRangeReader(dir string, m, n int) (io.ReadCloser, error) {
+ refs, err := listSegments(dir)
+ if err != nil {
+ return nil, err
+ }
+ var segs []*Segment
+
+ for _, r := range refs {
+ if m >= 0 && r.n < m {
+ continue
+ }
+ if n >= 0 && r.n > n {
+ break
+ }
+ s, err := OpenReadSegment(filepath.Join(dir, r.s))
+ if err != nil {
+ return nil, err
+ }
+ segs = append(segs, s)
+ }
+ return newSegmentBufReader(segs...), nil
+}
+
+// segmentBufReader is a buffered reader that reads in multiples of pages.
+// The main purpose is that we are able to track segment and offset for
+// corruption reporting.
+type segmentBufReader struct {
+ buf *bufio.Reader
+ segs []*Segment
+ cur int
+ off int
+ more bool
+}
+
+func newSegmentBufReader(segs ...*Segment) *segmentBufReader {
+ return &segmentBufReader{
+ buf: bufio.NewReaderSize(nil, 16*pageSize),
+ segs: segs,
+ cur: -1,
+ }
+}
+
+func (r *segmentBufReader) Close() (err error) {
+ for _, s := range r.segs {
+ if e := s.Close(); e != nil {
+ err = e
+ }
+ }
+ return err
+}
+
+func (r *segmentBufReader) Read(b []byte) (n int, err error) {
+ if !r.more {
+ if r.cur+1 >= len(r.segs) {
+ return 0, io.EOF
+ }
+ r.cur++
+ r.off = 0
+ r.more = true
+ r.buf.Reset(r.segs[r.cur])
+ }
+ n, err = r.buf.Read(b)
+ r.off += n
+ if err != io.EOF {
+ return n, err
+ }
+ // Just return what we read so far, but don't signal EOF.
+ // Only unset more so we don't invalidate the current segment and
+ // offset before the next read.
+ r.more = false
+ return n, nil
+}
+
+// Reader reads WAL records from an io.Reader.
+type Reader struct {
+ rdr io.Reader
+ err error
+ rec []byte
+ buf [pageSize]byte
+ total int64 // total bytes processed.
+}
+
+// NewReader returns a new reader.
+func NewReader(r io.Reader) *Reader {
+ return &Reader{rdr: r}
+}
+
+// Next advances the reader to the next record and returns true if it exists.
+// It must not be called again after it returned false.
+func (r *Reader) Next() bool {
+ err := r.next()
+ if errors.Cause(err) == io.EOF {
+ return false
+ }
+ r.err = err
+ return r.err == nil
+}
+
+func (r *Reader) next() (err error) {
+ // We have to use r.buf since allocating byte arrays here fails escape
+ // analysis and ends up on the heap, even though it seemingly should not.
+ hdr := r.buf[:recordHeaderSize]
+ buf := r.buf[recordHeaderSize:]
+
+ r.rec = r.rec[:0]
+
+ i := 0
+ for {
+ if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil {
+ return errors.Wrap(err, "read first header byte")
+ }
+ r.total++
+ typ := recType(hdr[0])
+
+ // Gobble up zero bytes.
+ if typ == recPageTerm {
+ // We are pedantic and check whether the zeros are actually up
+ // to a page boundary.
+ // It's not strictly necessary but may catch sketchy state early.
+ k := pageSize - (r.total % pageSize)
+ if k == pageSize {
+ continue // Initial 0 byte was last page byte.
+ }
+ n, err := io.ReadFull(r.rdr, buf[:k])
+ if err != nil {
+ return errors.Wrap(err, "read remaining zeros")
+ }
+ r.total += int64(n)
+
+ for _, c := range buf[:k] {
+ if c != 0 {
+ return errors.New("unexpected non-zero byte in padded page")
+ }
+ }
+ continue
+ }
+ n, err := io.ReadFull(r.rdr, hdr[1:])
+ if err != nil {
+ return errors.Wrap(err, "read remaining header")
+ }
+ r.total += int64(n)
+
+ var (
+ length = binary.BigEndian.Uint16(hdr[1:])
+ crc = binary.BigEndian.Uint32(hdr[3:])
+ )
+
+ if length > pageSize-recordHeaderSize {
+ return errors.Errorf("invalid record size %d", length)
+ }
+ n, err = io.ReadFull(r.rdr, buf[:length])
+ if err != nil {
+ return err
+ }
+ r.total += int64(n)
+
+ if n != int(length) {
+ return errors.Errorf("invalid size: expected %d, got %d", length, n)
+ }
+ if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc {
+ return errors.Errorf("unexpected checksum %x, expected %x", c, crc)
+ }
+ r.rec = append(r.rec, buf[:length]...)
+
+ switch typ {
+ case recFull:
+ if i != 0 {
+ return errors.New("unexpected full record")
+ }
+ return nil
+ case recFirst:
+ if i != 0 {
+ return errors.New("unexpected first record")
+ }
+ case recMiddle:
+ if i == 0 {
+ return errors.New("unexpected middle record")
+ }
+ case recLast:
+ if i == 0 {
+ return errors.New("unexpected last record")
+ }
+ return nil
+ default:
+ return errors.Errorf("unexpected record type %d", typ)
+ }
+ // Only increment i for non-zero records since we use it
+ // to determine valid content record sequences.
+ i++
+ }
+}
+
+// Err returns the last encountered error wrapped in a corruption error.
+// If the reader cannot infer a segment index and offset, a total
+// offset in the reader stream will be provided.
+func (r *Reader) Err() error {
+ if r.err == nil {
+ return nil
+ }
+ if b, ok := r.rdr.(*segmentBufReader); ok {
+ return &CorruptionErr{
+ Err: r.err,
+ Segment: b.segs[b.cur].Index(),
+ Offset: int64(b.off),
+ }
+ }
+ return &CorruptionErr{
+ Err: r.err,
+ Segment: -1,
+ Offset: r.total,
+ }
+}
+
+// Record returns the current record. The returned byte slice is only
+// valid until the next call to Next.
+func (r *Reader) Record() []byte { + return r.rec +} + +func min(i, j int) int { + if i < j { + return i + } + return j +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 16d2d5978..73134a28c 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -841,40 +841,46 @@ "revisionTime": "2016-04-11T19:08:41Z" }, { - "checksumSHA1": "gzvR+g1v/ILXxAt/NuxzIPWk1x0=", + "checksumSHA1": "vRK6HrNOeJheYudfpCIUyh42T3o=", "path": "github.com/prometheus/tsdb", - "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", - "revisionTime": "2018-07-11T11:21:26Z" + "revision": "06f01d45ad2ca2853c9dc1a0d5db6c75c8af6a5a", + "revisionTime": "2018-08-07T11:25:08Z" }, { "checksumSHA1": "QI0UME2olSr4kH6Z8UkpffM59Mc=", "path": "github.com/prometheus/tsdb/chunkenc", - "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", - "revisionTime": "2018-07-11T11:21:26Z" + "revision": "06f01d45ad2ca2853c9dc1a0d5db6c75c8af6a5a", + "revisionTime": "2018-08-07T11:25:08Z" }, { "checksumSHA1": "+5bPifRe479zdFeTYhZ+CZRLMgw=", "path": "github.com/prometheus/tsdb/chunks", - "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", - "revisionTime": "2018-07-11T11:21:26Z" + "revision": "06f01d45ad2ca2853c9dc1a0d5db6c75c8af6a5a", + "revisionTime": "2018-08-07T11:25:08Z" }, { - "checksumSHA1": "dnyelqeik/xHDRCvCmKFv/Op9XQ=", + "checksumSHA1": "bL3t5K2Q8e1GuM6gy5PAJ05go14=", "path": "github.com/prometheus/tsdb/fileutil", - "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", - "revisionTime": "2018-07-11T11:21:26Z" + "revision": "06f01d45ad2ca2853c9dc1a0d5db6c75c8af6a5a", + "revisionTime": "2018-08-07T11:25:08Z" }, { "checksumSHA1": "AZGFK4UtJe8/j8pHqGTNQ8wu27g=", "path": "github.com/prometheus/tsdb/index", - "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", - "revisionTime": "2018-07-11T11:21:26Z" + "revision": "06f01d45ad2ca2853c9dc1a0d5db6c75c8af6a5a", + "revisionTime": "2018-08-07T11:25:08Z" }, { "checksumSHA1": "Va8HWvOFTwFeewZFadMAOzNGDps=", "path": "github.com/prometheus/tsdb/labels", - "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", - "revisionTime": "2018-07-11T11:21:26Z" + "revision": "06f01d45ad2ca2853c9dc1a0d5db6c75c8af6a5a", + "revisionTime": "2018-08-07T11:25:08Z" + }, + { + "checksumSHA1": "6GXK7RnUngyM9OT/M2uzv8T3DOY=", + "path": "github.com/prometheus/tsdb/wal", + "revision": "06f01d45ad2ca2853c9dc1a0d5db6c75c8af6a5a", + "revisionTime": "2018-08-07T11:25:08Z" }, { "checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=", From b7e2f407ded76644edc97957c6e8e611a64ea29d Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 7 Aug 2018 07:30:44 -0400 Subject: [PATCH 2/2] rules: Fix double-locking of mutex Signed-off-by: Fabian Reinartz --- rules/alerting.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rules/alerting.go b/rules/alerting.go index 36377c099..5f8b347b0 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -364,8 +364,8 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, } } - r.SetHealth(HealthGood) - r.SetLastError(err) + r.health = HealthGood + r.lastError = err return vec, nil }
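Note on PATCH 2/2: per the patch title, Eval already holds r.mtx when the rule state is updated at the end of the function, while the SetHealth and SetLastError helpers acquire r.mtx themselves. Go's sync.Mutex is not reentrant, so calling them from inside the locked section deadlocks; assigning the fields directly under the already-held lock removes the second acquisition. A minimal sketch of the pattern (hypothetical names, not the actual rules package):

	package main

	import "sync"

	type rule struct {
		mtx    sync.Mutex
		health string
	}

	// SetHealth locks internally; it is only safe when mtx is not already held.
	func (r *rule) SetHealth(h string) {
		r.mtx.Lock()
		defer r.mtx.Unlock()
		r.health = h
	}

	func (r *rule) eval() {
		r.mtx.Lock()
		defer r.mtx.Unlock()

		// r.SetHealth("good") here would block forever: sync.Mutex does not
		// support recursive locking by the same goroutine.
		r.health = "good" // assign the field directly while holding mtx
	}

	func main() {
		r := &rule{}
		r.eval()
		r.SetHealth("good") // fine: mtx is free again at this point
	}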