update tsdb (#4692)

removed unused code
metrics for WAL operations
minor variable renamings to improve readability
now using the common.Makefile
fixed the total NumTombstones count Stats

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
pull/4713/head
Krasi Georgiev 6 years ago committed by GitHub
parent 17fea5478a
commit 73fba6d0e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,34 @@
# Copyright 2018 The Prometheus Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
TSDB_PROJECT_DIR = "."
TSDB_CLI_DIR="$(TSDB_PROJECT_DIR)/cmd/tsdb"
TSDB_BIN = "$(TSDB_CLI_DIR)/tsdb"
TSDB_BENCHMARK_NUM_METRICS ?= 1000
TSDB_BENCHMARK_DATASET ?= "$(TSDB_PROJECT_DIR)/testdata/20kseries.json"
TSDB_BENCHMARK_OUTPUT_DIR ?= "$(TSDB_CLI_DIR)/benchout"
STATICCHECK_IGNORE =
include Makefile.common
build:
@$(GO) build -o $(TSDB_BIN) $(TSDB_CLI_DIR)
bench: build
@echo ">> running benchmark, writing result to $(TSDB_BENCHMARK_OUTPUT_DIR)"
@$(TSDB_BIN) bench write --metrics=$(TSDB_BENCHMARK_NUM_METRICS) --out=$(TSDB_BENCHMARK_OUTPUT_DIR) $(TSDB_BENCHMARK_DATASET)
@$(GO) tool pprof -svg $(TSDB_BIN) $(TSDB_BENCHMARK_OUTPUT_DIR)/cpu.prof > $(TSDB_BENCHMARK_OUTPUT_DIR)/cpuprof.svg
@$(GO) tool pprof --inuse_space -svg $(TSDB_BIN) $(TSDB_BENCHMARK_OUTPUT_DIR)/mem.prof > $(TSDB_BENCHMARK_OUTPUT_DIR)/memprof.inuse.svg
@$(GO) tool pprof --alloc_space -svg $(TSDB_BIN) $(TSDB_BENCHMARK_OUTPUT_DIR)/mem.prof > $(TSDB_BENCHMARK_OUTPUT_DIR)/memprof.alloc.svg
@$(GO) tool pprof -svg $(TSDB_BIN) $(TSDB_BENCHMARK_OUTPUT_DIR)/block.prof > $(TSDB_BENCHMARK_OUTPUT_DIR)/blockprof.svg
@$(GO) tool pprof -svg $(TSDB_BIN) $(TSDB_BENCHMARK_OUTPUT_DIR)/mutex.prof > $(TSDB_BENCHMARK_OUTPUT_DIR)/mutexprof.svg

@ -0,0 +1,132 @@
# Copyright 2018 The Prometheus Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# A common Makefile that includes rules to be reused in different prometheus projects.
# !!! Open PRs only against the prometheus/prometheus/Makefile.common repository!
# Example usage :
# Create the main Makefile in the root project directory.
# include Makefile.common
# customTarget:
# @echo ">> Running customTarget"
#
# Ensure GOBIN is not set during build so that promu is installed to the correct path
unexport GOBIN
GO ?= go
GOFMT ?= $(GO)fmt
FIRST_GOPATH := $(firstword $(subst :, ,$(shell $(GO) env GOPATH)))
PROMU := $(FIRST_GOPATH)/bin/promu
STATICCHECK := $(FIRST_GOPATH)/bin/staticcheck
GOVENDOR := $(FIRST_GOPATH)/bin/govendor
pkgs = ./...
PREFIX ?= $(shell pwd)
BIN_DIR ?= $(shell pwd)
DOCKER_IMAGE_TAG ?= $(subst /,-,$(shell git rev-parse --abbrev-ref HEAD))
DOCKER_REPO ?= prom
.PHONY: all
all: style staticcheck unused build test
# This rule is used to forward a target like "build" to "common-build". This
# allows a new "build" target to be defined in a Makefile which includes this
# one and override "common-build" without override warnings.
%: common-% ;
.PHONY: common-style
common-style:
@echo ">> checking code style"
@fmtRes=$$($(GOFMT) -d $$(find . -path ./vendor -prune -o -name '*.go' -print)); \
if [ -n "$${fmtRes}" ]; then \
echo "gofmt checking failed!"; echo "$${fmtRes}"; echo; \
echo "Please ensure you are using $$($(GO) version) for formatting code."; \
exit 1; \
fi
.PHONY: common-check_license
common-check_license:
@echo ">> checking license header"
@licRes=$$(for file in $$(find . -type f -iname '*.go' ! -path './vendor/*') ; do \
awk 'NR<=3' $$file | grep -Eq "(Copyright|generated|GENERATED)" || echo $$file; \
done); \
if [ -n "$${licRes}" ]; then \
echo "license header checking failed:"; echo "$${licRes}"; \
exit 1; \
fi
.PHONY: common-test-short
common-test-short:
@echo ">> running short tests"
$(GO) test -short $(pkgs)
.PHONY: common-test
common-test:
@echo ">> running all tests"
$(GO) test -race $(pkgs)
.PHONY: common-format
common-format:
@echo ">> formatting code"
$(GO) fmt $(pkgs)
.PHONY: common-vet
common-vet:
@echo ">> vetting code"
$(GO) vet $(pkgs)
.PHONY: common-staticcheck
common-staticcheck: $(STATICCHECK)
@echo ">> running staticcheck"
$(STATICCHECK) -ignore "$(STATICCHECK_IGNORE)" $(pkgs)
.PHONY: common-unused
common-unused: $(GOVENDOR)
@echo ">> running check for unused packages"
@$(GOVENDOR) list +unused | grep . && exit 1 || echo 'No unused packages'
.PHONY: common-build
common-build: promu
@echo ">> building binaries"
$(PROMU) build --prefix $(PREFIX)
.PHONY: common-tarball
common-tarball: promu
@echo ">> building release tarball"
$(PROMU) tarball --prefix $(PREFIX) $(BIN_DIR)
.PHONY: common-docker
common-docker:
docker build -t "$(DOCKER_REPO)/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG)" .
.PHONY: common-docker-publish
common-docker-publish:
docker push "$(DOCKER_REPO)/$(DOCKER_IMAGE_NAME)"
.PHONY: common-docker-tag-latest
common-docker-tag-latest:
docker tag "$(DOCKER_REPO)/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG)" "$(DOCKER_REPO)/$(DOCKER_IMAGE_NAME):latest"
.PHONY: promu
promu:
GOOS= GOARCH= $(GO) get -u github.com/prometheus/promu
.PHONY: $(STATICCHECK)
$(STATICCHECK):
GOOS= GOARCH= $(GO) get -u honnef.co/go/tools/cmd/staticcheck
.PHONY: $(GOVENDOR)
$(GOVENDOR):
GOOS= GOARCH= $(GO) get -u github.com/kardianos/govendor

@ -185,16 +185,10 @@ type BlockMetaCompaction struct {
Failed bool `json:"failed,omitempty"`
}
const (
flagNone = 0
flagStd = 1
)
const indexFilename = "index"
const metaFilename = "meta.json"
func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
func walDir(dir string) string { return filepath.Join(dir, "wal") }
func readMetaFile(dir string) (*BlockMeta, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
@ -484,7 +478,6 @@ Outer:
err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
for _, iv := range ivs {
stones.addInterval(id, iv)
pb.meta.Stats.NumTombstones++
}
return nil
})
@ -492,6 +485,7 @@ Outer:
return err
}
pb.tombstones = stones
pb.meta.Stats.NumTombstones = pb.tombstones.Total()
if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
return err

@ -23,8 +23,6 @@ import (
"strconv"
"strings"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/wal"
@ -100,12 +98,7 @@ const checkpointPrefix = "checkpoint."
// segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate
// it with the original WAL.
//
// Non-critical errors are logged and not returned.
func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
if logger == nil {
logger = log.NewNopLogger()
}
func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
stats := &CheckpointStats{}
var sr io.Reader
@ -272,17 +265,5 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
if err := closeAll(closers...); err != nil {
return stats, errors.Wrap(err, "close opened files")
}
if err := w.Truncate(n + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint
// that supersedes them.
level.Error(logger).Log("msg", "truncating segments failed", "err", err)
}
if err := DeleteCheckpoints(w.Dir(), n); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space.
// They will just be ignored since a higher checkpoint exists.
level.Error(logger).Log("msg", "delete old checkpoints", "err", err)
}
return stats, nil
}

@ -300,7 +300,7 @@ func (db *DB) run() {
case <-db.compactc:
db.metrics.compactionsTriggered.Inc()
_, err := db.compact()
err := db.compact()
if err != nil {
level.Error(db.logger).Log("msg", "compaction failed", "err", err)
backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
@ -366,12 +366,12 @@ func (a dbAppender) Commit() error {
// this is sufficient to reliably delete old data.
// Old blocks are only deleted on reload based on the new block's parent information.
// See DB.reload documentation for further information.
func (db *DB) compact() (changes bool, err error) {
func (db *DB) compact() (err error) {
db.cmtx.Lock()
defer db.cmtx.Unlock()
if !db.compactionsEnabled {
return false, nil
return nil
}
// Check whether we have pending head blocks that are ready to be persisted.
@ -379,7 +379,7 @@ func (db *DB) compact() (changes bool, err error) {
for {
select {
case <-db.stopc:
return changes, nil
return nil
default:
}
// The head has a compactable range if 1.5 level 0 ranges are between the oldest
@ -402,14 +402,13 @@ func (db *DB) compact() (changes bool, err error) {
maxt: maxt - 1,
}
if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil {
return changes, errors.Wrap(err, "persist head block")
return errors.Wrap(err, "persist head block")
}
changes = true
runtime.GC()
if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks")
return errors.Wrap(err, "reload blocks")
}
runtime.GC()
}
@ -418,7 +417,7 @@ func (db *DB) compact() (changes bool, err error) {
for {
plan, err := db.compactor.Plan(db.dir)
if err != nil {
return changes, errors.Wrap(err, "plan compaction")
return errors.Wrap(err, "plan compaction")
}
if len(plan) == 0 {
break
@ -426,23 +425,22 @@ func (db *DB) compact() (changes bool, err error) {
select {
case <-db.stopc:
return changes, nil
return nil
default:
}
if _, err := db.compactor.Compact(db.dir, plan...); err != nil {
return changes, errors.Wrapf(err, "compact %s", plan)
return errors.Wrapf(err, "compact %s", plan)
}
changes = true
runtime.GC()
if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks")
return errors.Wrap(err, "reload blocks")
}
runtime.GC()
}
return changes, nil
return nil
}
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {

@ -5,11 +5,80 @@
package fileutil
import (
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
)
// CopyDirs copies all directories, subdirectories and files recursively including the empty folders.
// Source and destination must be full paths.
func CopyDirs(src, dest string) error {
if err := os.MkdirAll(dest, 0777); err != nil {
return err
}
files, err := readDirs(src)
if err != nil {
return err
}
for _, f := range files {
dp := filepath.Join(dest, f)
sp := filepath.Join(src, f)
stat, err := os.Stat(sp)
if err != nil {
return err
}
// Empty directories are also created.
if stat.IsDir() {
if err := os.MkdirAll(dp, 0777); err != nil {
return err
}
continue
}
if err := copyFile(sp, dp); err != nil {
return err
}
}
return nil
}
func copyFile(src, dest string) error {
data, err := ioutil.ReadFile(src)
if err != nil {
return err
}
err = ioutil.WriteFile(dest, data, 0644)
if err != nil {
return err
}
return nil
}
// readDirs reads the source directory recursively and
// returns relative paths to all files and empty directories.
func readDirs(src string) ([]string, error) {
var files []string
var err error
err = filepath.Walk(src, func(path string, f os.FileInfo, err error) error {
relativePath := strings.TrimPrefix(path, src)
if len(relativePath) > 0 {
files = append(files, relativePath)
}
return nil
})
if err != nil {
return nil, err
}
return files, nil
}
// ReadDir returns the filenames in the given directory in sorted order.
func ReadDir(dirpath string) ([]string, error) {
dir, err := os.Open(dirpath)

@ -76,19 +76,25 @@ type Head struct {
}
type headMetrics struct {
activeAppenders prometheus.Gauge
series prometheus.Gauge
seriesCreated prometheus.Counter
seriesRemoved prometheus.Counter
seriesNotFound prometheus.Counter
chunks prometheus.Gauge
chunksCreated prometheus.Counter
chunksRemoved prometheus.Counter
gcDuration prometheus.Summary
minTime prometheus.GaugeFunc
maxTime prometheus.GaugeFunc
samplesAppended prometheus.Counter
walTruncateDuration prometheus.Summary
activeAppenders prometheus.Gauge
series prometheus.Gauge
seriesCreated prometheus.Counter
seriesRemoved prometheus.Counter
seriesNotFound prometheus.Counter
chunks prometheus.Gauge
chunksCreated prometheus.Counter
chunksRemoved prometheus.Counter
gcDuration prometheus.Summary
minTime prometheus.GaugeFunc
maxTime prometheus.GaugeFunc
samplesAppended prometheus.Counter
walTruncateDuration prometheus.Summary
headTruncateFail prometheus.Counter
headTruncateTotal prometheus.Counter
checkpointDeleteFail prometheus.Counter
checkpointDeleteTotal prometheus.Counter
checkpointCreationFail prometheus.Counter
checkpointCreationTotal prometheus.Counter
}
func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
@ -150,6 +156,30 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "prometheus_tsdb_head_samples_appended_total",
Help: "Total number of appended samples.",
})
m.headTruncateFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_truncations_failed_total",
Help: "Total number of head truncations that failed.",
})
m.headTruncateTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_truncations_total",
Help: "Total number of head truncations attempted.",
})
m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_deletions_failed_total",
Help: "Total number of checkpoint deletions that failed.",
})
m.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_deletions_total",
Help: "Total number of checkpoint deletions attempted.",
})
m.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_creations_failed_total",
Help: "Total number of checkpoint creations that failed.",
})
m.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_creations_total",
Help: "Total number of checkpoint creations attempted.",
})
if r != nil {
r.MustRegister(
@ -166,6 +196,12 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m.gcDuration,
m.walTruncateDuration,
m.samplesAppended,
m.headTruncateFail,
m.headTruncateTotal,
m.checkpointDeleteFail,
m.checkpointDeleteTotal,
m.checkpointCreationFail,
m.checkpointCreationTotal,
)
}
return m
@ -421,7 +457,12 @@ func (h *Head) Init() error {
}
// Truncate removes old data before mint from the head.
func (h *Head) Truncate(mint int64) error {
func (h *Head) Truncate(mint int64) (err error) {
defer func() {
if err != nil {
h.metrics.headTruncateFail.Inc()
}
}()
initialize := h.MinTime() == math.MaxInt64
if h.MinTime() >= mint && !initialize {
@ -440,6 +481,7 @@ func (h *Head) Truncate(mint int64) error {
return nil
}
h.metrics.headTruncateTotal.Inc()
start := time.Now()
h.gc()
@ -469,9 +511,25 @@ func (h *Head) Truncate(mint int64) error {
keep := func(id uint64) bool {
return h.series.getByID(id) != nil
}
if _, err = Checkpoint(h.logger, h.wal, m, n, keep, mint); err != nil {
h.metrics.checkpointCreationTotal.Inc()
if _, err = Checkpoint(h.wal, m, n, keep, mint); err != nil {
h.metrics.checkpointCreationFail.Inc()
return errors.Wrap(err, "create checkpoint")
}
if err := h.wal.Truncate(n + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint
// that supersedes them.
level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
}
h.metrics.checkpointDeleteTotal.Inc()
if err := DeleteCheckpoints(h.wal.Dir(), n); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space.
// They will just be ignored since a higher checkpoint exists.
level.Error(h.logger).Log("msg", "delete old checkpoints", "err", err)
h.metrics.checkpointDeleteFail.Inc()
}
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
level.Info(h.logger).Log("msg", "WAL checkpoint complete",

@ -394,7 +394,7 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error {
w.buf2.putBE32int(valt.Len())
// here we have an index for the symbol file if v2, otherwise it's an offset
for _, v := range valt.s {
for _, v := range valt.entries {
index, ok := w.symbols[v]
if !ok {
return errors.Errorf("symbol entry for %q does not exist", v)
@ -870,9 +870,9 @@ func (r *Reader) LabelValues(names ...string) (StringTuples, error) {
return nil, errors.Wrap(d.err(), "read label value index")
}
st := &serializedStringTuples{
l: nc,
b: d.get(),
lookup: r.lookupSymbol,
idsCount: nc,
idsBytes: d.get(),
lookup: r.lookupSymbol,
}
return st, nil
}
@ -936,33 +936,33 @@ func (r *Reader) SortedPostings(p Postings) Postings {
}
type stringTuples struct {
l int // tuple length
s []string // flattened tuple entries
length int // tuple length
entries []string // flattened tuple entries
}
func NewStringTuples(s []string, l int) (*stringTuples, error) {
if len(s)%l != 0 {
func NewStringTuples(entries []string, length int) (*stringTuples, error) {
if len(entries)%length != 0 {
return nil, errors.Wrap(errInvalidSize, "string tuple list")
}
return &stringTuples{s: s, l: l}, nil
return &stringTuples{entries: entries, length: length}, nil
}
func (t *stringTuples) Len() int { return len(t.s) / t.l }
func (t *stringTuples) At(i int) ([]string, error) { return t.s[i : i+t.l], nil }
func (t *stringTuples) Len() int { return len(t.entries) / t.length }
func (t *stringTuples) At(i int) ([]string, error) { return t.entries[i : i+t.length], nil }
func (t *stringTuples) Swap(i, j int) {
c := make([]string, t.l)
copy(c, t.s[i:i+t.l])
c := make([]string, t.length)
copy(c, t.entries[i:i+t.length])
for k := 0; k < t.l; k++ {
t.s[i+k] = t.s[j+k]
t.s[j+k] = c[k]
for k := 0; k < t.length; k++ {
t.entries[i+k] = t.entries[j+k]
t.entries[j+k] = c[k]
}
}
func (t *stringTuples) Less(i, j int) bool {
for k := 0; k < t.l; k++ {
d := strings.Compare(t.s[i+k], t.s[j+k])
for k := 0; k < t.length; k++ {
d := strings.Compare(t.entries[i+k], t.entries[j+k])
if d < 0 {
return true
@ -975,23 +975,23 @@ func (t *stringTuples) Less(i, j int) bool {
}
type serializedStringTuples struct {
l int
b []byte
lookup func(uint32) (string, error)
idsCount int
idsBytes []byte // bytes containing the ids pointing to the string in the lookup table.
lookup func(uint32) (string, error)
}
func (t *serializedStringTuples) Len() int {
return len(t.b) / (4 * t.l)
return len(t.idsBytes) / (4 * t.idsCount)
}
func (t *serializedStringTuples) At(i int) ([]string, error) {
if len(t.b) < (i+t.l)*4 {
if len(t.idsBytes) < (i+t.idsCount)*4 {
return nil, errInvalidSize
}
res := make([]string, 0, t.l)
res := make([]string, 0, t.idsCount)
for k := 0; k < t.l; k++ {
offset := binary.BigEndian.Uint32(t.b[(i+k)*4:])
for k := 0; k < t.idsCount; k++ {
offset := binary.BigEndian.Uint32(t.idsBytes[(i+k)*4:])
s, err := t.lookup(offset)
if err != nil {

@ -892,30 +892,6 @@ func (it *deletedIterator) Err() error {
return it.it.Err()
}
type mockSeriesSet struct {
next func() bool
series func() Series
err func() error
}
func (m *mockSeriesSet) Next() bool { return m.next() }
func (m *mockSeriesSet) At() Series { return m.series() }
func (m *mockSeriesSet) Err() error { return m.err() }
func newListSeriesSet(list []Series) *mockSeriesSet {
i := -1
return &mockSeriesSet{
next: func() bool {
i++
return i < len(list)
},
series: func() Series {
return list[i]
},
err: func() error { return nil },
}
}
type errSeriesSet struct {
err error
}

@ -42,6 +42,9 @@ type TombstoneReader interface {
// Iter calls the given function for each encountered interval.
Iter(func(uint64, Intervals) error) error
// Total returns the total count of tombstones.
Total() uint64
// Close any underlying resources
Close() error
}
@ -185,6 +188,17 @@ func (t *memTombstones) Iter(f func(uint64, Intervals) error) error {
return nil
}
func (t *memTombstones) Total() uint64 {
t.mtx.RLock()
defer t.mtx.RUnlock()
total := uint64(0)
for _, ivs := range t.intvlGroups {
total += uint64(len(ivs))
}
return total
}
// addInterval to an existing memTombstones
func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) {
t.mtx.Lock()

@ -750,15 +750,6 @@ func (w *SegmentWAL) Close() error {
return errors.Wrapf(w.dirFile.Close(), "closing WAL dir %s", w.dirFile.Name())
}
const (
minSectorSize = 512
// walPageBytes is the alignment for flushing records to the backing Writer.
// It should be a multiple of the minimum sector size so that WAL can safely
// distinguish between torn writes and ordinary data corruption.
walPageBytes = 16 * minSectorSize
)
func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error {
// Cut to the next segment if the entry exceeds the file size unless it would also
// exceed the size of a new segment.

@ -162,6 +162,8 @@ type WAL struct {
fsyncDuration prometheus.Summary
pageFlushes prometheus.Counter
pageCompletions prometheus.Counter
truncateFail prometheus.Counter
truncateTotal prometheus.Counter
}
// New returns a new WAL over the given directory.
@ -201,8 +203,16 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
Name: "prometheus_tsdb_wal_completed_pages_total",
Help: "Total number of completed pages.",
})
w.truncateFail = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_truncations_failed_total",
Help: "Total number of WAL truncations that failed.",
})
w.truncateTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_truncations_total",
Help: "Total number of WAL truncations attempted.",
})
if reg != nil {
reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions)
reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail, w.truncateTotal)
}
_, j, err := w.Segments()
@ -527,7 +537,13 @@ func (w *WAL) Segments() (m, n int, err error) {
}
// Truncate drops all segments before i.
func (w *WAL) Truncate(i int) error {
func (w *WAL) Truncate(i int) (err error) {
w.truncateTotal.Inc()
defer func() {
if err != nil {
w.truncateFail.Inc()
}
}()
refs, err := listSegments(w.dir)
if err != nil {
return err
@ -536,7 +552,7 @@ func (w *WAL) Truncate(i int) error {
if r.n >= i {
break
}
if err := os.Remove(filepath.Join(w.dir, r.s)); err != nil {
if err = os.Remove(filepath.Join(w.dir, r.s)); err != nil {
return err
}
}

36
vendor/vendor.json vendored

@ -847,46 +847,46 @@
"revisionTime": "2016-04-11T19:08:41Z"
},
{
"checksumSHA1": "JLELG+wyXa02O8G/RTLpfPijOE8=",
"checksumSHA1": "Lxf9zE7+7AVy9XzRKGFGsJLNQQg=",
"path": "github.com/prometheus/tsdb",
"revision": "9c8ca47399a7f53f57b440464c103f5067e9b7b6",
"revisionTime": "2018-09-21T05:31:22Z"
"revision": "0ce41118ed2055ac7678085bb521de287ef355fd",
"revisionTime": "2018-10-03T08:08:31Z"
},
{
"checksumSHA1": "QI0UME2olSr4kH6Z8UkpffM59Mc=",
"path": "github.com/prometheus/tsdb/chunkenc",
"revision": "9c8ca47399a7f53f57b440464c103f5067e9b7b6",
"revisionTime": "2018-09-21T05:31:22Z"
"revision": "0ce41118ed2055ac7678085bb521de287ef355fd",
"revisionTime": "2018-10-03T08:08:31Z"
},
{
"checksumSHA1": "+5bPifRe479zdFeTYhZ+CZRLMgw=",
"path": "github.com/prometheus/tsdb/chunks",
"revision": "9c8ca47399a7f53f57b440464c103f5067e9b7b6",
"revisionTime": "2018-09-21T05:31:22Z"
"revision": "0ce41118ed2055ac7678085bb521de287ef355fd",
"revisionTime": "2018-10-03T08:08:31Z"
},
{
"checksumSHA1": "gOTpgLnuc2Ygwof6pCpLVedz00I=",
"checksumSHA1": "thpsi9kun7R16uGmHtZs5+H1/j0=",
"path": "github.com/prometheus/tsdb/fileutil",
"revision": "9c8ca47399a7f53f57b440464c103f5067e9b7b6",
"revisionTime": "2018-09-21T05:31:22Z"
"revision": "0ce41118ed2055ac7678085bb521de287ef355fd",
"revisionTime": "2018-10-03T08:08:31Z"
},
{
"checksumSHA1": "dd5JqbMKbRKI6XlGzrvEAuMUJY4=",
"checksumSHA1": "OOpNX8vclkCZ9fiK3w4p2s9PykM=",
"path": "github.com/prometheus/tsdb/index",
"revision": "9c8ca47399a7f53f57b440464c103f5067e9b7b6",
"revisionTime": "2018-09-21T05:31:22Z"
"revision": "0ce41118ed2055ac7678085bb521de287ef355fd",
"revisionTime": "2018-10-03T08:08:31Z"
},
{
"checksumSHA1": "Va8HWvOFTwFeewZFadMAOzNGDps=",
"path": "github.com/prometheus/tsdb/labels",
"revision": "9c8ca47399a7f53f57b440464c103f5067e9b7b6",
"revisionTime": "2018-09-21T05:31:22Z"
"revision": "0ce41118ed2055ac7678085bb521de287ef355fd",
"revisionTime": "2018-10-03T08:08:31Z"
},
{
"checksumSHA1": "/NURBB5JOqg7fhWrVq8R8Sl3dAI=",
"checksumSHA1": "9e76b1itftTi/+2hltBrRVZdR1M=",
"path": "github.com/prometheus/tsdb/wal",
"revision": "9c8ca47399a7f53f57b440464c103f5067e9b7b6",
"revisionTime": "2018-09-21T05:31:22Z"
"revision": "0ce41118ed2055ac7678085bb521de287ef355fd",
"revisionTime": "2018-10-03T08:08:31Z"
},
{
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",

Loading…
Cancel
Save