more descriptive var names and some more logging. (#405)

* more descriptive checkpoint var names and some more logging.

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
pull/5805/head
Krasi Georgiev 6 years ago committed by GitHub
parent 0ce41118ed
commit d7492b9350
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -38,7 +38,7 @@ type CheckpointStats struct {
TotalTombstones int // Processed tombstones including dropped ones. TotalTombstones int // Processed tombstones including dropped ones.
} }
// LastCheckpoint returns the directory name of the most recent checkpoint. // LastCheckpoint returns the directory name and index of the most recent checkpoint.
// If dir does not contain any checkpoints, ErrNotFound is returned. // If dir does not contain any checkpoints, ErrNotFound is returned.
func LastCheckpoint(dir string) (string, int, error) { func LastCheckpoint(dir string) (string, int, error) {
files, err := ioutil.ReadDir(dir) files, err := ioutil.ReadDir(dir)
@ -55,18 +55,17 @@ func LastCheckpoint(dir string) (string, int, error) {
if !fi.IsDir() { if !fi.IsDir() {
return "", 0, errors.Errorf("checkpoint %s is not a directory", fi.Name()) return "", 0, errors.Errorf("checkpoint %s is not a directory", fi.Name())
} }
k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil { if err != nil {
continue continue
} }
return fi.Name(), k, nil return fi.Name(), idx, nil
} }
return "", 0, ErrNotFound return "", 0, ErrNotFound
} }
// DeleteCheckpoints deletes all checkpoints in dir that have an index // DeleteCheckpoints deletes all checkpoints in a directory below a given index.
// below n. func DeleteCheckpoints(dir string, maxIndex int) error {
func DeleteCheckpoints(dir string, n int) error {
var errs MultiError var errs MultiError
files, err := ioutil.ReadDir(dir) files, err := ioutil.ReadDir(dir)
@ -77,8 +76,8 @@ func DeleteCheckpoints(dir string, n int) error {
if !strings.HasPrefix(fi.Name(), checkpointPrefix) { if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
continue continue
} }
k, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) index, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
if err != nil || k >= n { if err != nil || index >= maxIndex {
continue continue
} }
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil { if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
@ -90,7 +89,7 @@ func DeleteCheckpoints(dir string, n int) error {
const checkpointPrefix = "checkpoint." const checkpointPrefix = "checkpoint."
// Checkpoint creates a compacted checkpoint of segments in range [m, n] in the given WAL. // Checkpoint creates a compacted checkpoint of segments in range [first, last] in the given WAL.
// It includes the most recent checkpoint if it exists. // It includes the most recent checkpoint if it exists.
// All series not satisfying keep and samples below mint are dropped. // All series not satisfying keep and samples below mint are dropped.
// //
@ -98,7 +97,7 @@ const checkpointPrefix = "checkpoint."
// segmented format as the original WAL itself. // segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate // This makes it easy to read it through the WAL package and concatenate
// it with the original WAL. // it with the original WAL.
func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) {
stats := &CheckpointStats{} stats := &CheckpointStats{}
var sr io.Reader var sr io.Reader
@ -107,27 +106,28 @@ func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*C
// files if there is an error somewhere. // files if there is an error somewhere.
var closers []io.Closer var closers []io.Closer
{ {
lastFn, k, err := LastCheckpoint(w.Dir()) dir, idx, err := LastCheckpoint(w.Dir())
if err != nil && err != ErrNotFound { if err != nil && err != ErrNotFound {
return nil, errors.Wrap(err, "find last checkpoint") return nil, errors.Wrap(err, "find last checkpoint")
} }
last := idx + 1
if err == nil { if err == nil {
if m > k+1 { if from > last {
return nil, errors.New("unexpected gap to last checkpoint") return nil, fmt.Errorf("unexpected gap to last checkpoint. expected:%v, requested:%v", last, from)
} }
// Ignore WAL files below the checkpoint. They shouldn't exist to begin with. // Ignore WAL files below the checkpoint. They shouldn't exist to begin with.
m = k + 1 from = last
last, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), lastFn)) r, err := wal.NewSegmentsReader(filepath.Join(w.Dir(), dir))
if err != nil { if err != nil {
return nil, errors.Wrap(err, "open last checkpoint") return nil, errors.Wrap(err, "open last checkpoint")
} }
defer last.Close() defer r.Close()
closers = append(closers, last) closers = append(closers, r)
sr = last sr = r
} }
segsr, err := wal.NewSegmentsRangeReader(w.Dir(), m, n) segsr, err := wal.NewSegmentsRangeReader(w.Dir(), from, to)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "create segment reader") return nil, errors.Wrap(err, "create segment reader")
} }
@ -141,7 +141,7 @@ func Checkpoint(w *wal.WAL, m, n int, keep func(id uint64) bool, mint int64) (*C
} }
} }
cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", n)) cpdir := filepath.Join(w.Dir(), fmt.Sprintf("checkpoint.%06d", to))
cpdirtmp := cpdir + ".tmp" cpdirtmp := cpdir + ".tmp"
if err := os.MkdirAll(cpdirtmp, 0777); err != nil { if err := os.MkdirAll(cpdirtmp, 0777); err != nil {

@ -418,12 +418,12 @@ func (h *Head) Init() error {
} }
// Backfill the checkpoint first if it exists. // Backfill the checkpoint first if it exists.
cp, n, err := LastCheckpoint(h.wal.Dir()) dir, startFrom, err := LastCheckpoint(h.wal.Dir())
if err != nil && err != ErrNotFound { if err != nil && err != ErrNotFound {
return errors.Wrap(err, "find last checkpoint") return errors.Wrap(err, "find last checkpoint")
} }
if err == nil { if err == nil {
sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), cp)) sr, err := wal.NewSegmentsReader(filepath.Join(h.wal.Dir(), dir))
if err != nil { if err != nil {
return errors.Wrap(err, "open checkpoint") return errors.Wrap(err, "open checkpoint")
} }
@ -434,11 +434,11 @@ func (h *Head) Init() error {
if err := h.loadWAL(wal.NewReader(sr)); err != nil { if err := h.loadWAL(wal.NewReader(sr)); err != nil {
return errors.Wrap(err, "backfill checkpoint") return errors.Wrap(err, "backfill checkpoint")
} }
n++ startFrom++
} }
// Backfill segments from the last checkpoint onwards // Backfill segments from the last checkpoint onwards
sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), n, -1) sr, err := wal.NewSegmentsRangeReader(h.wal.Dir(), startFrom, -1)
if err != nil { if err != nil {
return errors.Wrap(err, "open WAL segments") return errors.Wrap(err, "open WAL segments")
} }
@ -493,18 +493,18 @@ func (h *Head) Truncate(mint int64) (err error) {
} }
start = time.Now() start = time.Now()
m, n, err := h.wal.Segments() first, last, err := h.wal.Segments()
if err != nil { if err != nil {
return errors.Wrap(err, "get segment range") return errors.Wrap(err, "get segment range")
} }
n-- // Never consider last segment for checkpoint. last-- // Never consider last segment for checkpoint.
if n < 0 { if last < 0 {
return nil // no segments yet. return nil // no segments yet.
} }
// The lower third of segments should contain mostly obsolete samples. // The lower third of segments should contain mostly obsolete samples.
// If we have less than three segments, it's not worth checkpointing yet. // If we have less than three segments, it's not worth checkpointing yet.
n = m + (n-m)/3 last = first + (last-first)/3
if n <= m { if last <= first {
return nil return nil
} }
@ -512,18 +512,18 @@ func (h *Head) Truncate(mint int64) (err error) {
return h.series.getByID(id) != nil return h.series.getByID(id) != nil
} }
h.metrics.checkpointCreationTotal.Inc() h.metrics.checkpointCreationTotal.Inc()
if _, err = Checkpoint(h.wal, m, n, keep, mint); err != nil { if _, err = Checkpoint(h.wal, first, last, keep, mint); err != nil {
h.metrics.checkpointCreationFail.Inc() h.metrics.checkpointCreationFail.Inc()
return errors.Wrap(err, "create checkpoint") return errors.Wrap(err, "create checkpoint")
} }
if err := h.wal.Truncate(n + 1); err != nil { if err := h.wal.Truncate(last + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint. // If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint // Leftover segments will just be ignored in the future if there's a checkpoint
// that supersedes them. // that supersedes them.
level.Error(h.logger).Log("msg", "truncating segments failed", "err", err) level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
} }
h.metrics.checkpointDeleteTotal.Inc() h.metrics.checkpointDeleteTotal.Inc()
if err := DeleteCheckpoints(h.wal.Dir(), n); err != nil { if err := DeleteCheckpoints(h.wal.Dir(), last); err != nil {
// Leftover old checkpoints do not cause problems down the line beyond // Leftover old checkpoints do not cause problems down the line beyond
// occupying disk space. // occupying disk space.
// They will just be ignored since a higher checkpoint exists. // They will just be ignored since a higher checkpoint exists.
@ -533,7 +533,7 @@ func (h *Head) Truncate(mint int64) (err error) {
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())
level.Info(h.logger).Log("msg", "WAL checkpoint complete", level.Info(h.logger).Log("msg", "WAL checkpoint complete",
"low", m, "high", n, "duration", time.Since(start)) "first", first, "last", last, "duration", time.Since(start))
return nil return nil
} }

@ -289,13 +289,13 @@ func (w *WAL) Repair(origErr error) error {
if err != nil { if err != nil {
return errors.Wrap(err, "list segments") return errors.Wrap(err, "list segments")
} }
level.Warn(w.logger).Log("msg", "deleting all segments behind corruption") level.Warn(w.logger).Log("msg", "deleting all segments behind corruption", "segment", cerr.Segment)
for _, s := range segs { for _, s := range segs {
if s.n <= cerr.Segment { if s.index <= cerr.Segment {
continue continue
} }
if w.segment.i == s.n { if w.segment.i == s.index {
// The active segment needs to be removed, // The active segment needs to be removed,
// close it first (Windows!). Can be closed safely // close it first (Windows!). Can be closed safely
// as we set the current segment to repaired file // as we set the current segment to repaired file
@ -304,14 +304,14 @@ func (w *WAL) Repair(origErr error) error {
return errors.Wrap(err, "close active segment") return errors.Wrap(err, "close active segment")
} }
} }
if err := os.Remove(filepath.Join(w.dir, s.s)); err != nil { if err := os.Remove(filepath.Join(w.dir, s.name)); err != nil {
return errors.Wrap(err, "delete segment") return errors.Wrapf(err, "delete segment:%v", s.index)
} }
} }
// Regardless of the corruption offset, no record reaches into the previous segment. // Regardless of the corruption offset, no record reaches into the previous segment.
// So we can safely repair the WAL by removing the segment and re-inserting all // So we can safely repair the WAL by removing the segment and re-inserting all
// its records up to the corruption. // its records up to the corruption.
level.Warn(w.logger).Log("msg", "rewrite corrupted segment") level.Warn(w.logger).Log("msg", "rewrite corrupted segment", "segment", cerr.Segment)
fn := SegmentName(w.dir, cerr.Segment) fn := SegmentName(w.dir, cerr.Segment)
tmpfn := fn + ".repair" tmpfn := fn + ".repair"
@ -523,9 +523,9 @@ func (w *WAL) log(rec []byte, final bool) error {
return nil return nil
} }
// Segments returns the range [m, n] of currently existing segments. // Segments returns the range [first, n] of currently existing segments.
// If no segments are found, m and n are -1. // If no segments are found, first and n are -1.
func (w *WAL) Segments() (m, n int, err error) { func (w *WAL) Segments() (first, last int, err error) {
refs, err := listSegments(w.dir) refs, err := listSegments(w.dir)
if err != nil { if err != nil {
return 0, 0, err return 0, 0, err
@ -533,7 +533,7 @@ func (w *WAL) Segments() (m, n int, err error) {
if len(refs) == 0 { if len(refs) == 0 {
return -1, -1, nil return -1, -1, nil
} }
return refs[0].n, refs[len(refs)-1].n, nil return refs[0].index, refs[len(refs)-1].index, nil
} }
// Truncate drops all segments before i. // Truncate drops all segments before i.
@ -549,10 +549,10 @@ func (w *WAL) Truncate(i int) (err error) {
return err return err
} }
for _, r := range refs { for _, r := range refs {
if r.n >= i { if r.index >= i {
break break
} }
if err = os.Remove(filepath.Join(w.dir, r.s)); err != nil { if err = os.Remove(filepath.Join(w.dir, r.name)); err != nil {
return err return err
} }
} }
@ -595,8 +595,8 @@ func (w *WAL) Close() (err error) {
} }
type segmentRef struct { type segmentRef struct {
s string name string
n int index int
} }
func listSegments(dir string) (refs []segmentRef, err error) { func listSegments(dir string) (refs []segmentRef, err error) {
@ -613,11 +613,11 @@ func listSegments(dir string) (refs []segmentRef, err error) {
if len(refs) > 0 && k > last+1 { if len(refs) > 0 && k > last+1 {
return nil, errors.New("segments are not sequential") return nil, errors.New("segments are not sequential")
} }
refs = append(refs, segmentRef{s: fn, n: k}) refs = append(refs, segmentRef{name: fn, index: k})
last = k last = k
} }
sort.Slice(refs, func(i, j int) bool { sort.Slice(refs, func(i, j int) bool {
return refs[i].n < refs[j].n return refs[i].index < refs[j].index
}) })
return refs, nil return refs, nil
} }
@ -628,8 +628,8 @@ func NewSegmentsReader(dir string) (io.ReadCloser, error) {
} }
// NewSegmentsRangeReader returns a new reader over the given WAL segment range. // NewSegmentsRangeReader returns a new reader over the given WAL segment range.
// If m or n are -1, the range is open on the respective end. // If first or last are -1, the range is open on the respective end.
func NewSegmentsRangeReader(dir string, m, n int) (io.ReadCloser, error) { func NewSegmentsRangeReader(dir string, first, last int) (io.ReadCloser, error) {
refs, err := listSegments(dir) refs, err := listSegments(dir)
if err != nil { if err != nil {
return nil, err return nil, err
@ -637,13 +637,13 @@ func NewSegmentsRangeReader(dir string, m, n int) (io.ReadCloser, error) {
var segs []*Segment var segs []*Segment
for _, r := range refs { for _, r := range refs {
if m >= 0 && r.n < m { if first >= 0 && r.index < first {
continue continue
} }
if n >= 0 && r.n > n { if last >= 0 && r.index > last {
break break
} }
s, err := OpenReadSegment(filepath.Join(dir, r.s)) s, err := OpenReadSegment(filepath.Join(dir, r.name))
if err != nil { if err != nil {
return nil, err return nil, err
} }

Loading…
Cancel
Save