mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1008 lines
28 KiB
1008 lines
28 KiB
// Copyright 2021 The Prometheus Authors |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
package tsdb |
|
|
|
import ( |
|
"fmt" |
|
"github.com/prometheus/prometheus/pkg/labels" |
|
"github.com/prometheus/prometheus/tsdb/chunkenc" |
|
"github.com/prometheus/prometheus/tsdb/encoding" |
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" |
|
"github.com/prometheus/prometheus/tsdb/fileutil" |
|
"io/ioutil" |
|
"math" |
|
"os" |
|
"path/filepath" |
|
"runtime" |
|
"strconv" |
|
"strings" |
|
"sync" |
|
"time" |
|
|
|
"github.com/go-kit/log/level" |
|
"github.com/pkg/errors" |
|
"go.uber.org/atomic" |
|
|
|
"github.com/prometheus/prometheus/pkg/exemplar" |
|
"github.com/prometheus/prometheus/storage" |
|
"github.com/prometheus/prometheus/tsdb/record" |
|
"github.com/prometheus/prometheus/tsdb/tombstones" |
|
"github.com/prometheus/prometheus/tsdb/wal" |
|
) |
|
|
|
func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks map[uint64][]*mmappedChunk) (err error) { |
|
// Track number of samples that referenced a series we don't know about |
|
// for error reporting. |
|
var unknownRefs atomic.Uint64 |
|
var unknownExemplarRefs atomic.Uint64 |
|
|
|
// Start workers that each process samples for a partition of the series ID space. |
|
// They are connected through a ring of channels which ensures that all sample batches |
|
// read from the WAL are processed in order. |
|
var ( |
|
wg sync.WaitGroup |
|
n = runtime.GOMAXPROCS(0) |
|
inputs = make([]chan []record.RefSample, n) |
|
outputs = make([]chan []record.RefSample, n) |
|
exemplarsInput chan record.RefExemplar |
|
|
|
dec record.Decoder |
|
shards = make([][]record.RefSample, n) |
|
|
|
decoded = make(chan interface{}, 10) |
|
decodeErr, seriesCreationErr error |
|
seriesPool = sync.Pool{ |
|
New: func() interface{} { |
|
return []record.RefSeries{} |
|
}, |
|
} |
|
samplesPool = sync.Pool{ |
|
New: func() interface{} { |
|
return []record.RefSample{} |
|
}, |
|
} |
|
tstonesPool = sync.Pool{ |
|
New: func() interface{} { |
|
return []tombstones.Stone{} |
|
}, |
|
} |
|
exemplarsPool = sync.Pool{ |
|
New: func() interface{} { |
|
return []record.RefExemplar{} |
|
}, |
|
} |
|
) |
|
|
|
defer func() { |
|
// For CorruptionErr ensure to terminate all workers before exiting. |
|
_, ok := err.(*wal.CorruptionErr) |
|
if ok || seriesCreationErr != nil { |
|
for i := 0; i < n; i++ { |
|
close(inputs[i]) |
|
for range outputs[i] { |
|
} |
|
} |
|
close(exemplarsInput) |
|
wg.Wait() |
|
} |
|
}() |
|
|
|
wg.Add(n) |
|
for i := 0; i < n; i++ { |
|
outputs[i] = make(chan []record.RefSample, 300) |
|
inputs[i] = make(chan []record.RefSample, 300) |
|
|
|
go func(input <-chan []record.RefSample, output chan<- []record.RefSample) { |
|
unknown := h.processWALSamples(h.minValidTime.Load(), input, output) |
|
unknownRefs.Add(unknown) |
|
wg.Done() |
|
}(inputs[i], outputs[i]) |
|
} |
|
|
|
wg.Add(1) |
|
exemplarsInput = make(chan record.RefExemplar, 300) |
|
go func(input <-chan record.RefExemplar) { |
|
var err error |
|
defer wg.Done() |
|
for e := range input { |
|
ms := h.series.getByID(e.Ref) |
|
if ms == nil { |
|
unknownExemplarRefs.Inc() |
|
continue |
|
} |
|
|
|
if e.T < h.minValidTime.Load() { |
|
continue |
|
} |
|
// At the moment the only possible error here is out of order exemplars, which we shouldn't see when |
|
// replaying the WAL, so lets just log the error if it's not that type. |
|
err = h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels}) |
|
if err != nil && err == storage.ErrOutOfOrderExemplar { |
|
level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err) |
|
} |
|
} |
|
}(exemplarsInput) |
|
|
|
go func() { |
|
defer close(decoded) |
|
for r.Next() { |
|
rec := r.Record() |
|
switch dec.Type(rec) { |
|
case record.Series: |
|
series := seriesPool.Get().([]record.RefSeries)[:0] |
|
series, err = dec.Series(rec, series) |
|
if err != nil { |
|
decodeErr = &wal.CorruptionErr{ |
|
Err: errors.Wrap(err, "decode series"), |
|
Segment: r.Segment(), |
|
Offset: r.Offset(), |
|
} |
|
return |
|
} |
|
decoded <- series |
|
case record.Samples: |
|
samples := samplesPool.Get().([]record.RefSample)[:0] |
|
samples, err = dec.Samples(rec, samples) |
|
if err != nil { |
|
decodeErr = &wal.CorruptionErr{ |
|
Err: errors.Wrap(err, "decode samples"), |
|
Segment: r.Segment(), |
|
Offset: r.Offset(), |
|
} |
|
return |
|
} |
|
decoded <- samples |
|
case record.Tombstones: |
|
tstones := tstonesPool.Get().([]tombstones.Stone)[:0] |
|
tstones, err = dec.Tombstones(rec, tstones) |
|
if err != nil { |
|
decodeErr = &wal.CorruptionErr{ |
|
Err: errors.Wrap(err, "decode tombstones"), |
|
Segment: r.Segment(), |
|
Offset: r.Offset(), |
|
} |
|
return |
|
} |
|
decoded <- tstones |
|
case record.Exemplars: |
|
exemplars := exemplarsPool.Get().([]record.RefExemplar)[:0] |
|
exemplars, err = dec.Exemplars(rec, exemplars) |
|
if err != nil { |
|
decodeErr = &wal.CorruptionErr{ |
|
Err: errors.Wrap(err, "decode exemplars"), |
|
Segment: r.Segment(), |
|
Offset: r.Offset(), |
|
} |
|
return |
|
} |
|
decoded <- exemplars |
|
default: |
|
// Noop. |
|
} |
|
} |
|
}() |
|
|
|
// The records are always replayed from the oldest to the newest. |
|
Outer: |
|
for d := range decoded { |
|
switch v := d.(type) { |
|
case []record.RefSeries: |
|
for _, walSeries := range v { |
|
mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels) |
|
if err != nil { |
|
seriesCreationErr = err |
|
break Outer |
|
} |
|
|
|
if h.lastSeriesID.Load() < walSeries.Ref { |
|
h.lastSeriesID.Store(walSeries.Ref) |
|
} |
|
|
|
mmc := mmappedChunks[walSeries.Ref] |
|
|
|
if created { |
|
// This is the first WAL series record for this series. |
|
h.setMMappedChunks(mSeries, mmc) |
|
continue |
|
} |
|
|
|
// There's already a different ref for this series. |
|
// A duplicate series record is only possible when the old samples were already compacted into a block. |
|
// Hence we can discard all the samples and m-mapped chunks replayed till now for this series. |
|
|
|
multiRef[walSeries.Ref] = mSeries.ref |
|
|
|
idx := mSeries.ref % uint64(n) |
|
// It is possible that some old sample is being processed in processWALSamples that |
|
// could cause race below. So we wait for the goroutine to empty input the buffer and finish |
|
// processing all old samples after emptying the buffer. |
|
select { |
|
case <-outputs[idx]: // allow output side to drain to avoid deadlock |
|
default: |
|
} |
|
inputs[idx] <- []record.RefSample{} |
|
for len(inputs[idx]) != 0 { |
|
time.Sleep(1 * time.Millisecond) |
|
select { |
|
case <-outputs[idx]: // allow output side to drain to avoid deadlock |
|
default: |
|
} |
|
} |
|
|
|
// Checking if the new m-mapped chunks overlap with the already existing ones. |
|
// This should never happen, but we have a check anyway to detect any |
|
// edge cases that we might have missed. |
|
if len(mSeries.mmappedChunks) > 0 && len(mmc) > 0 { |
|
if overlapsClosedInterval( |
|
mSeries.mmappedChunks[0].minTime, |
|
mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime, |
|
mmc[0].minTime, |
|
mmc[len(mmc)-1].maxTime, |
|
) { |
|
// The m-map chunks for the new series ref overlaps with old m-map chunks. |
|
seriesCreationErr = errors.Errorf("overlapping m-mapped chunks for series %s", mSeries.lset.String()) |
|
break Outer |
|
} |
|
} |
|
|
|
// Replacing m-mapped chunks with the new ones (could be empty). |
|
h.setMMappedChunks(mSeries, mmc) |
|
|
|
// Any samples replayed till now would already be compacted. Resetting the head chunk. |
|
mSeries.nextAt = 0 |
|
mSeries.headChunk = nil |
|
mSeries.app = nil |
|
} |
|
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification. |
|
seriesPool.Put(v) |
|
case []record.RefSample: |
|
samples := v |
|
// We split up the samples into chunks of 5000 samples or less. |
|
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise |
|
// cause thousands of very large in flight buffers occupying large amounts |
|
// of unused memory. |
|
for len(samples) > 0 { |
|
m := 5000 |
|
if len(samples) < m { |
|
m = len(samples) |
|
} |
|
for i := 0; i < n; i++ { |
|
var buf []record.RefSample |
|
select { |
|
case buf = <-outputs[i]: |
|
default: |
|
} |
|
shards[i] = buf[:0] |
|
} |
|
for _, sam := range samples[:m] { |
|
if r, ok := multiRef[sam.Ref]; ok { |
|
sam.Ref = r |
|
} |
|
mod := sam.Ref % uint64(n) |
|
shards[mod] = append(shards[mod], sam) |
|
} |
|
for i := 0; i < n; i++ { |
|
inputs[i] <- shards[i] |
|
} |
|
samples = samples[m:] |
|
} |
|
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification. |
|
samplesPool.Put(v) |
|
case []tombstones.Stone: |
|
for _, s := range v { |
|
for _, itv := range s.Intervals { |
|
if itv.Maxt < h.minValidTime.Load() { |
|
continue |
|
} |
|
if m := h.series.getByID(s.Ref); m == nil { |
|
unknownRefs.Inc() |
|
continue |
|
} |
|
h.tombstones.AddInterval(s.Ref, itv) |
|
} |
|
} |
|
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification. |
|
tstonesPool.Put(v) |
|
case []record.RefExemplar: |
|
for _, e := range v { |
|
exemplarsInput <- e |
|
} |
|
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification. |
|
exemplarsPool.Put(v) |
|
default: |
|
panic(fmt.Errorf("unexpected decoded type: %T", d)) |
|
} |
|
} |
|
|
|
if decodeErr != nil { |
|
return decodeErr |
|
} |
|
if seriesCreationErr != nil { |
|
// Drain the channel to unblock the goroutine. |
|
for range decoded { |
|
} |
|
return seriesCreationErr |
|
} |
|
|
|
// Signal termination to each worker and wait for it to close its output channel. |
|
for i := 0; i < n; i++ { |
|
close(inputs[i]) |
|
for range outputs[i] { |
|
} |
|
} |
|
close(exemplarsInput) |
|
wg.Wait() |
|
|
|
if r.Err() != nil { |
|
return errors.Wrap(r.Err(), "read records") |
|
} |
|
|
|
if unknownRefs.Load() > 0 || unknownExemplarRefs.Load() > 0 { |
|
level.Warn(h.logger).Log("msg", "Unknown series references", "samples", unknownRefs.Load(), "exemplars", unknownExemplarRefs.Load()) |
|
} |
|
return nil |
|
} |
|
|
|
func (h *Head) setMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) { |
|
h.metrics.chunksCreated.Add(float64(len(mmc))) |
|
h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks))) |
|
h.metrics.chunks.Add(float64(len(mmc) - len(mSeries.mmappedChunks))) |
|
mSeries.mmappedChunks = mmc |
|
// Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject. |
|
if len(mmc) == 0 { |
|
mSeries.mmMaxTime = math.MinInt64 |
|
} else { |
|
mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime |
|
h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime) |
|
} |
|
} |
|
|
|
// processWALSamples adds the samples it receives to the head and passes |
|
// the buffer received to an output channel for reuse. |
|
// Samples before the minValidTime timestamp are discarded. |
|
func (h *Head) processWALSamples( |
|
minValidTime int64, |
|
input <-chan []record.RefSample, output chan<- []record.RefSample, |
|
) (unknownRefs uint64) { |
|
defer close(output) |
|
|
|
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) |
|
|
|
for samples := range input { |
|
for _, s := range samples { |
|
if s.T < minValidTime { |
|
continue |
|
} |
|
ms := h.series.getByID(s.Ref) |
|
if ms == nil { |
|
unknownRefs++ |
|
continue |
|
} |
|
if s.T <= ms.mmMaxTime { |
|
continue |
|
} |
|
if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated { |
|
h.metrics.chunksCreated.Inc() |
|
h.metrics.chunks.Inc() |
|
} |
|
if s.T > maxt { |
|
maxt = s.T |
|
} |
|
if s.T < mint { |
|
mint = s.T |
|
} |
|
} |
|
output <- samples |
|
} |
|
h.updateMinMaxTime(mint, maxt) |
|
|
|
return unknownRefs |
|
} |
|
|
|
// Record type tags written as the first byte of every chunk snapshot record.
// The values end up on disk, so new types must only ever be appended.
const (
	chunkSnapshotRecordTypeSeries uint8 = iota + 1
	chunkSnapshotRecordTypeTombstones
	chunkSnapshotRecordTypeExemplars
)
|
|
|
type chunkSnapshotRecord struct { |
|
ref uint64 |
|
lset labels.Labels |
|
chunkRange int64 |
|
mc *memChunk |
|
sampleBuf [4]sample |
|
} |
|
|
|
func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte { |
|
buf := encoding.Encbuf{B: b} |
|
|
|
buf.PutByte(chunkSnapshotRecordTypeSeries) |
|
buf.PutBE64(s.ref) |
|
buf.PutUvarint(len(s.lset)) |
|
for _, l := range s.lset { |
|
buf.PutUvarintStr(l.Name) |
|
buf.PutUvarintStr(l.Value) |
|
} |
|
buf.PutBE64int64(s.chunkRange) |
|
|
|
s.Lock() |
|
if s.headChunk == nil { |
|
buf.PutUvarint(0) |
|
} else { |
|
buf.PutUvarint(1) |
|
buf.PutBE64int64(s.headChunk.minTime) |
|
buf.PutBE64int64(s.headChunk.maxTime) |
|
buf.PutByte(byte(s.headChunk.chunk.Encoding())) |
|
buf.PutUvarintBytes(s.headChunk.chunk.Bytes()) |
|
// Put the sample buf. |
|
for _, smpl := range s.sampleBuf { |
|
buf.PutBE64int64(smpl.t) |
|
buf.PutBEFloat64(smpl.v) |
|
} |
|
} |
|
s.Unlock() |
|
|
|
return buf.Get() |
|
} |
|
|
|
func decodeSeriesFromChunkSnapshot(b []byte) (csr chunkSnapshotRecord, err error) { |
|
dec := encoding.Decbuf{B: b} |
|
|
|
if flag := dec.Byte(); flag != chunkSnapshotRecordTypeSeries { |
|
return csr, errors.Errorf("invalid record type %x", flag) |
|
} |
|
|
|
csr.ref = dec.Be64() |
|
|
|
// The label set written to the disk is already sorted. |
|
csr.lset = make(labels.Labels, dec.Uvarint()) |
|
for i := range csr.lset { |
|
csr.lset[i].Name = dec.UvarintStr() |
|
csr.lset[i].Value = dec.UvarintStr() |
|
} |
|
|
|
csr.chunkRange = dec.Be64int64() |
|
if dec.Uvarint() == 0 { |
|
return |
|
} |
|
|
|
csr.mc = &memChunk{} |
|
csr.mc.minTime = dec.Be64int64() |
|
csr.mc.maxTime = dec.Be64int64() |
|
enc := chunkenc.Encoding(dec.Byte()) |
|
|
|
// The underlying bytes gets re-used later, so make a copy. |
|
chunkBytes := dec.UvarintBytes() |
|
chunkBytesCopy := make([]byte, len(chunkBytes)) |
|
copy(chunkBytesCopy, chunkBytes) |
|
|
|
chk, err := chunkenc.FromData(enc, chunkBytesCopy) |
|
if err != nil { |
|
return csr, errors.Wrap(err, "chunk from data") |
|
} |
|
csr.mc.chunk = chk |
|
|
|
for i := range csr.sampleBuf { |
|
csr.sampleBuf[i].t = dec.Be64int64() |
|
csr.sampleBuf[i].v = dec.Be64Float64() |
|
} |
|
|
|
err = dec.Err() |
|
if err != nil && len(dec.B) > 0 { |
|
err = errors.Errorf("unexpected %d bytes left in entry", len(dec.B)) |
|
} |
|
|
|
return |
|
} |
|
|
|
func encodeTombstonesToSnapshotRecord(tr tombstones.Reader) ([]byte, error) { |
|
buf := encoding.Encbuf{} |
|
|
|
buf.PutByte(chunkSnapshotRecordTypeTombstones) |
|
b, err := tombstones.Encode(tr) |
|
if err != nil { |
|
return nil, errors.Wrap(err, "encode tombstones") |
|
} |
|
buf.PutUvarintBytes(b) |
|
|
|
return buf.Get(), nil |
|
} |
|
|
|
func decodeTombstonesSnapshotRecord(b []byte) (tombstones.Reader, error) { |
|
dec := encoding.Decbuf{B: b} |
|
|
|
if flag := dec.Byte(); flag != chunkSnapshotRecordTypeTombstones { |
|
return nil, errors.Errorf("invalid record type %x", flag) |
|
} |
|
|
|
tr, err := tombstones.Decode(dec.UvarintBytes()) |
|
return tr, errors.Wrap(err, "decode tombstones") |
|
} |
|
|
|
// chunkSnapshotPrefix is the name prefix shared by all chunk snapshot
// directories; the segment and offset parts follow it.
const chunkSnapshotPrefix = "chunk_snapshot."
|
|
|
// ChunkSnapshot creates a snapshot of all the series and tombstones in the head. |
|
// It deletes the old chunk snapshots if the chunk snapshot creation is successful. |
|
// |
|
// The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written |
|
// using the WAL package. N is the last WAL segment present during snapshotting and |
|
// M is the offset in segment N upto which data was written. |
|
// |
|
// The snapshot first contains all series (each in individual records and not sorted), followed by |
|
// tombstones (a single record), and finally exemplars (>= 1 record). Exemplars are in the order they |
|
// were written to the circular buffer. |
|
func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) { |
|
if h.wal == nil { |
|
// If we are not storing any WAL, does not make sense to take a snapshot too. |
|
level.Warn(h.logger).Log("msg", "skipping chunk snapshotting as WAL is disabled") |
|
return &ChunkSnapshotStats{}, nil |
|
} |
|
h.chunkSnapshotMtx.Lock() |
|
defer h.chunkSnapshotMtx.Unlock() |
|
|
|
stats := &ChunkSnapshotStats{} |
|
|
|
wlast, woffset, err := h.wal.LastSegmentAndOffset() |
|
if err != nil && err != record.ErrNotFound { |
|
return stats, errors.Wrap(err, "get last wal segment and offset") |
|
} |
|
|
|
_, cslast, csoffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot) |
|
if err != nil && err != record.ErrNotFound { |
|
return stats, errors.Wrap(err, "find last chunk snapshot") |
|
} |
|
|
|
if wlast == cslast && woffset == csoffset { |
|
// Nothing has been written to the WAL/Head since the last snapshot. |
|
return stats, nil |
|
} |
|
|
|
snapshotName := chunkSnapshotDir(wlast, woffset) |
|
|
|
cpdir := filepath.Join(h.opts.ChunkDirRoot, snapshotName) |
|
cpdirtmp := cpdir + ".tmp" |
|
stats.Dir = cpdir |
|
|
|
if err := os.MkdirAll(cpdirtmp, 0777); err != nil { |
|
return stats, errors.Wrap(err, "create chunk snapshot dir") |
|
} |
|
cp, err := wal.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled()) |
|
if err != nil { |
|
return stats, errors.Wrap(err, "open chunk snapshot") |
|
} |
|
|
|
// Ensures that an early return caused by an error doesn't leave any tmp files. |
|
defer func() { |
|
cp.Close() |
|
os.RemoveAll(cpdirtmp) |
|
}() |
|
|
|
var ( |
|
buf []byte |
|
recs [][]byte |
|
) |
|
// Add all series to the snapshot. |
|
stripeSize := h.series.size |
|
for i := 0; i < stripeSize; i++ { |
|
h.series.locks[i].RLock() |
|
|
|
for _, s := range h.series.series[i] { |
|
start := len(buf) |
|
buf = s.encodeToSnapshotRecord(buf) |
|
if len(buf[start:]) == 0 { |
|
continue // All contents discarded. |
|
} |
|
recs = append(recs, buf[start:]) |
|
// Flush records in 10 MB increments. |
|
if len(buf) > 10*1024*1024 { |
|
if err := cp.Log(recs...); err != nil { |
|
h.series.locks[i].RUnlock() |
|
return stats, errors.Wrap(err, "flush records") |
|
} |
|
buf, recs = buf[:0], recs[:0] |
|
} |
|
} |
|
stats.TotalSeries += len(h.series.series[i]) |
|
|
|
h.series.locks[i].RUnlock() |
|
} |
|
|
|
// Add tombstones to the snapshot. |
|
tombstonesReader, err := h.Tombstones() |
|
if err != nil { |
|
return stats, errors.Wrap(err, "get tombstones") |
|
} |
|
rec, err := encodeTombstonesToSnapshotRecord(tombstonesReader) |
|
if err != nil { |
|
return stats, errors.Wrap(err, "encode tombstones") |
|
} |
|
recs = append(recs, rec) |
|
// Flush remaining series records and tombstones. |
|
if err := cp.Log(recs...); err != nil { |
|
return stats, errors.Wrap(err, "flush records") |
|
} |
|
buf = buf[:0] |
|
|
|
// Add exemplars in the snapshot. |
|
// We log in batches, with each record having upto 10000 exemplars. |
|
// Assuming 100 bytes (overestimate) per exemplar, that's ~1MB. |
|
maxExemplarsPerRecord := 10000 |
|
batch := make([]record.RefExemplar, 0, maxExemplarsPerRecord) |
|
enc := record.Encoder{} |
|
flushExemplars := func() error { |
|
if len(batch) == 0 { |
|
return nil |
|
} |
|
buf = buf[:0] |
|
encbuf := encoding.Encbuf{B: buf} |
|
encbuf.PutByte(chunkSnapshotRecordTypeExemplars) |
|
enc.EncodeExemplarsIntoBuffer(batch, &encbuf) |
|
if err := cp.Log(encbuf.Get()); err != nil { |
|
return errors.Wrap(err, "log exemplars") |
|
} |
|
buf, batch = buf[:0], batch[:0] |
|
return nil |
|
} |
|
err = h.exemplars.IterateExemplars(func(seriesLabels labels.Labels, e exemplar.Exemplar) error { |
|
if len(batch) >= maxExemplarsPerRecord { |
|
if err := flushExemplars(); err != nil { |
|
return errors.Wrap(err, "flush exemplars") |
|
} |
|
} |
|
|
|
ms := h.series.getByHash(seriesLabels.Hash(), seriesLabels) |
|
if ms == nil { |
|
// It is possible that exemplar refers to some old series. We discard such exemplars. |
|
return nil |
|
} |
|
batch = append(batch, record.RefExemplar{ |
|
Ref: ms.ref, |
|
T: e.Ts, |
|
V: e.Value, |
|
Labels: e.Labels, |
|
}) |
|
return nil |
|
}) |
|
if err != nil { |
|
return stats, errors.Wrap(err, "iterate exemplars") |
|
} |
|
|
|
// Flush remaining exemplars. |
|
if err := flushExemplars(); err != nil { |
|
return stats, errors.Wrap(err, "flush exemplars at the end") |
|
} |
|
|
|
if err := cp.Close(); err != nil { |
|
return stats, errors.Wrap(err, "close chunk snapshot") |
|
} |
|
if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { |
|
return stats, errors.Wrap(err, "rename chunk snapshot directory") |
|
} |
|
|
|
if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, wlast, woffset); err != nil { |
|
// Leftover old chunk snapshots do not cause problems down the line beyond |
|
// occupying disk space. |
|
// They will just be ignored since a higher chunk snapshot exists. |
|
level.Error(h.logger).Log("msg", "delete old chunk snapshots", "err", err) |
|
} |
|
return stats, nil |
|
} |
|
|
|
func chunkSnapshotDir(wlast, woffset int) string { |
|
return fmt.Sprintf(chunkSnapshotPrefix+"%06d.%010d", wlast, woffset) |
|
} |
|
|
|
func (h *Head) performChunkSnapshot() error { |
|
level.Info(h.logger).Log("msg", "creating chunk snapshot") |
|
startTime := time.Now() |
|
stats, err := h.ChunkSnapshot() |
|
elapsed := time.Since(startTime) |
|
if err == nil { |
|
level.Info(h.logger).Log("msg", "chunk snapshot complete", "duration", elapsed.String(), "num_series", stats.TotalSeries, "dir", stats.Dir) |
|
} |
|
return errors.Wrap(err, "chunk snapshot") |
|
} |
|
|
|
// ChunkSnapshotStats holds stats about a created chunk snapshot.
type ChunkSnapshotStats struct {
	// TotalSeries is the number of head series counted while snapshotting.
	TotalSeries int
	// Dir is the final (non-temporary) directory of the snapshot.
	Dir string
}
|
|
|
// LastChunkSnapshot returns the directory name and index of the most recent chunk snapshot. |
|
// If dir does not contain any chunk snapshots, ErrNotFound is returned. |
|
func LastChunkSnapshot(dir string) (string, int, int, error) { |
|
files, err := ioutil.ReadDir(dir) |
|
if err != nil { |
|
return "", 0, 0, err |
|
} |
|
maxIdx, maxOffset := -1, -1 |
|
maxFileName := "" |
|
for i := 0; i < len(files); i++ { |
|
fi := files[i] |
|
|
|
if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) { |
|
continue |
|
} |
|
if !fi.IsDir() { |
|
return "", 0, 0, errors.Errorf("chunk snapshot %s is not a directory", fi.Name()) |
|
} |
|
|
|
splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".") |
|
if len(splits) != 2 { |
|
return "", 0, 0, errors.Errorf("chunk snapshot %s is not in the right format", fi.Name()) |
|
} |
|
|
|
idx, err := strconv.Atoi(splits[0]) |
|
if err != nil { |
|
continue |
|
} |
|
|
|
offset, err := strconv.Atoi(splits[1]) |
|
if err != nil { |
|
continue |
|
} |
|
|
|
if idx > maxIdx || (idx == maxIdx && offset > maxOffset) { |
|
maxIdx, maxOffset = idx, offset |
|
maxFileName = filepath.Join(dir, fi.Name()) |
|
} |
|
} |
|
if maxFileName == "" { |
|
return "", 0, 0, record.ErrNotFound |
|
} |
|
return maxFileName, maxIdx, maxOffset, nil |
|
} |
|
|
|
// DeleteChunkSnapshots deletes all chunk snapshots in a directory below a given index. |
|
func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error { |
|
files, err := ioutil.ReadDir(dir) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
errs := tsdb_errors.NewMulti() |
|
for _, fi := range files { |
|
if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) { |
|
continue |
|
} |
|
|
|
splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".") |
|
if len(splits) != 2 { |
|
continue |
|
} |
|
|
|
idx, err := strconv.Atoi(splits[0]) |
|
if err != nil { |
|
continue |
|
} |
|
|
|
offset, err := strconv.Atoi(splits[1]) |
|
if err != nil { |
|
continue |
|
} |
|
|
|
if idx < maxIndex || (idx == maxIndex && offset < maxOffset) { |
|
if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil { |
|
errs.Add(err) |
|
} |
|
} |
|
|
|
} |
|
return errs.Err() |
|
} |
|
|
|
// loadChunkSnapshot replays the chunk snapshot and restores the Head state from it. If there was any error returned, |
|
// it is the responsibility of the caller to clear the contents of the Head. |
|
func (h *Head) loadChunkSnapshot() (int, int, map[uint64]*memSeries, error) { |
|
dir, snapIdx, snapOffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot) |
|
if err != nil { |
|
if err == record.ErrNotFound { |
|
return snapIdx, snapOffset, nil, nil |
|
} |
|
return snapIdx, snapOffset, nil, errors.Wrap(err, "find last chunk snapshot") |
|
} |
|
|
|
start := time.Now() |
|
sr, err := wal.NewSegmentsReader(dir) |
|
if err != nil { |
|
return snapIdx, snapOffset, nil, errors.Wrap(err, "open chunk snapshot") |
|
} |
|
defer func() { |
|
if err := sr.Close(); err != nil { |
|
level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err) |
|
} |
|
}() |
|
|
|
var ( |
|
numSeries = 0 |
|
unknownRefs = int64(0) |
|
n = runtime.GOMAXPROCS(0) |
|
wg sync.WaitGroup |
|
recordChan = make(chan chunkSnapshotRecord, 5*n) |
|
shardedRefSeries = make([]map[uint64]*memSeries, n) |
|
errChan = make(chan error, n) |
|
refSeries map[uint64]*memSeries |
|
exemplarBuf []record.RefExemplar |
|
dec record.Decoder |
|
) |
|
|
|
wg.Add(n) |
|
for i := 0; i < n; i++ { |
|
go func(idx int, rc <-chan chunkSnapshotRecord) { |
|
defer wg.Done() |
|
defer func() { |
|
// If there was an error, drain the channel |
|
// to unblock the main thread. |
|
for range rc { |
|
} |
|
}() |
|
|
|
shardedRefSeries[idx] = make(map[uint64]*memSeries) |
|
localRefSeries := shardedRefSeries[idx] |
|
|
|
for csr := range rc { |
|
series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset) |
|
if err != nil { |
|
errChan <- err |
|
return |
|
} |
|
localRefSeries[csr.ref] = series |
|
if h.lastSeriesID.Load() < series.ref { |
|
h.lastSeriesID.Store(series.ref) |
|
} |
|
|
|
series.chunkRange = csr.chunkRange |
|
if csr.mc == nil { |
|
continue |
|
} |
|
series.nextAt = csr.mc.maxTime // This will create a new chunk on append. |
|
series.headChunk = csr.mc |
|
for i := range series.sampleBuf { |
|
series.sampleBuf[i].t = csr.sampleBuf[i].t |
|
series.sampleBuf[i].v = csr.sampleBuf[i].v |
|
} |
|
|
|
app, err := series.headChunk.chunk.Appender() |
|
if err != nil { |
|
errChan <- err |
|
return |
|
} |
|
series.app = app |
|
|
|
h.updateMinMaxTime(csr.mc.minTime, csr.mc.maxTime) |
|
} |
|
}(i, recordChan) |
|
} |
|
|
|
r := wal.NewReader(sr) |
|
var loopErr error |
|
Outer: |
|
for r.Next() { |
|
select { |
|
case err := <-errChan: |
|
errChan <- err |
|
break Outer |
|
default: |
|
} |
|
|
|
rec := r.Record() |
|
switch rec[0] { |
|
case chunkSnapshotRecordTypeSeries: |
|
numSeries++ |
|
csr, err := decodeSeriesFromChunkSnapshot(rec) |
|
if err != nil { |
|
loopErr = errors.Wrap(err, "decode series record") |
|
break Outer |
|
} |
|
recordChan <- csr |
|
|
|
case chunkSnapshotRecordTypeTombstones: |
|
tr, err := decodeTombstonesSnapshotRecord(rec) |
|
if err != nil { |
|
loopErr = errors.Wrap(err, "decode tombstones") |
|
break Outer |
|
} |
|
|
|
if err = tr.Iter(func(ref uint64, ivs tombstones.Intervals) error { |
|
h.tombstones.AddInterval(ref, ivs...) |
|
return nil |
|
}); err != nil { |
|
loopErr = errors.Wrap(err, "iterate tombstones") |
|
break Outer |
|
} |
|
|
|
case chunkSnapshotRecordTypeExemplars: |
|
// Exemplars are at the end of snapshot. So all series are loaded at this point. |
|
if len(refSeries) == 0 { |
|
close(recordChan) |
|
wg.Wait() |
|
|
|
refSeries = make(map[uint64]*memSeries, numSeries) |
|
for _, shard := range shardedRefSeries { |
|
for k, v := range shard { |
|
refSeries[k] = v |
|
} |
|
} |
|
} |
|
|
|
decbuf := encoding.Decbuf{B: rec[1:]} |
|
|
|
exemplarBuf = exemplarBuf[:0] |
|
exemplarBuf, err = dec.ExemplarsFromBuffer(&decbuf, exemplarBuf) |
|
if err != nil { |
|
loopErr = errors.Wrap(err, "exemplars from buffer") |
|
break Outer |
|
} |
|
|
|
for _, e := range exemplarBuf { |
|
ms, ok := refSeries[e.Ref] |
|
if !ok { |
|
unknownRefs++ |
|
continue |
|
} |
|
|
|
if err := h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{ |
|
Labels: e.Labels, |
|
Value: e.V, |
|
Ts: e.T, |
|
}); err != nil { |
|
loopErr = errors.Wrap(err, "append exemplar") |
|
break Outer |
|
} |
|
} |
|
|
|
default: |
|
// This is a record type we don't understand. It is either and old format from earlier versions, |
|
// or a new format and the code was rolled back to old version. |
|
loopErr = errors.Errorf("unsuported snapshot record type 0b%b", rec[0]) |
|
break Outer |
|
} |
|
} |
|
if len(refSeries) == 0 { |
|
close(recordChan) |
|
wg.Wait() |
|
} |
|
|
|
close(errChan) |
|
merr := tsdb_errors.NewMulti(errors.Wrap(loopErr, "decode loop")) |
|
for err := range errChan { |
|
merr.Add(errors.Wrap(err, "record processing")) |
|
} |
|
if err := merr.Err(); err != nil { |
|
return -1, -1, nil, err |
|
} |
|
|
|
if r.Err() != nil { |
|
return -1, -1, nil, errors.Wrap(r.Err(), "read records") |
|
} |
|
|
|
if len(refSeries) == 0 { |
|
// We had no exemplar record, so we have to build the map here. |
|
refSeries = make(map[uint64]*memSeries, numSeries) |
|
for _, shard := range shardedRefSeries { |
|
for k, v := range shard { |
|
refSeries[k] = v |
|
} |
|
} |
|
} |
|
|
|
elapsed := time.Since(start) |
|
level.Info(h.logger).Log("msg", "chunk snapshot loaded", "dir", dir, "num_series", numSeries, "duration", elapsed.String()) |
|
if unknownRefs > 0 { |
|
level.Warn(h.logger).Log("msg", "unknown series references during chunk snapshot replay", "count", unknownRefs) |
|
} |
|
|
|
return snapIdx, snapOffset, refSeries, nil |
|
}
|
|
|