mirror of https://github.com/prometheus/prometheus
Merge remote-tracking branch 'prometheus/main' into arve/wlog-histograms
commit
90f08de96a
|
@ -48,7 +48,7 @@ const (
|
|||
Drop Action = "drop"
|
||||
// KeepEqual drops targets for which the input does not match the target.
|
||||
KeepEqual Action = "keepequal"
|
||||
// Drop drops targets for which the input does match the target.
|
||||
// DropEqual drops targets for which the input does match the target.
|
||||
DropEqual Action = "dropequal"
|
||||
// HashMod sets a label to the modulus of a hash of labels.
|
||||
HashMod Action = "hashmod"
|
||||
|
|
|
@ -55,8 +55,8 @@ func NewListSeries(lset labels.Labels, s []chunks.Sample) *SeriesEntry {
|
|||
}
|
||||
}
|
||||
|
||||
// NewListChunkSeriesFromSamples returns chunk series entry that allows to iterate over provided samples.
|
||||
// NOTE: It uses inefficient chunks encoding implementation, not caring about chunk size.
|
||||
// NewListChunkSeriesFromSamples returns a chunk series entry that allows to iterate over provided samples.
|
||||
// NOTE: It uses an inefficient chunks encoding implementation, not caring about chunk size.
|
||||
// Use only for testing.
|
||||
func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]chunks.Sample) *ChunkSeriesEntry {
|
||||
chksFromSamples := make([]chunks.Meta, 0, len(samples))
|
||||
|
|
|
@ -42,7 +42,7 @@ type BlockWriter struct {
|
|||
// ErrNoSeriesAppended is returned if the series count is zero while flushing blocks.
|
||||
var ErrNoSeriesAppended = errors.New("no series appended, aborting")
|
||||
|
||||
// NewBlockWriter create a new block writer.
|
||||
// NewBlockWriter creates a new block writer.
|
||||
//
|
||||
// The returned writer accumulates all the series in the Head block until `Flush` is called.
|
||||
//
|
||||
|
|
|
@ -61,7 +61,7 @@ func putVarbitInt(b *bstream, val int64) {
|
|||
}
|
||||
}
|
||||
|
||||
// readVarbitInt reads an int64 encoced with putVarbitInt.
|
||||
// readVarbitInt reads an int64 encoded with putVarbitInt.
|
||||
func readVarbitInt(b *bstreamReader) (int64, error) {
|
||||
var d byte
|
||||
for i := 0; i < 8; i++ {
|
||||
|
@ -166,7 +166,7 @@ func putVarbitUint(b *bstream, val uint64) {
|
|||
}
|
||||
}
|
||||
|
||||
// readVarbitUint reads a uint64 encoced with putVarbitUint.
|
||||
// readVarbitUint reads a uint64 encoded with putVarbitUint.
|
||||
func readVarbitUint(b *bstreamReader) (uint64, error) {
|
||||
var d byte
|
||||
for i := 0; i < 8; i++ {
|
||||
|
|
|
@ -233,7 +233,7 @@ func ChunkMetasToSamples(chunks []Meta) (result []Sample) {
|
|||
// Iterator iterates over the chunks of a single time series.
|
||||
type Iterator interface {
|
||||
// At returns the current meta.
|
||||
// It depends on implementation if the chunk is populated or not.
|
||||
// It depends on the implementation whether the chunk is populated or not.
|
||||
At() Meta
|
||||
// Next advances the iterator by one.
|
||||
Next() bool
|
||||
|
@ -478,7 +478,7 @@ func (w *Writer) WriteChunks(chks ...Meta) error {
|
|||
// the batch is too large to fit in the current segment.
|
||||
cutNewBatch := (i != 0) && (batchSize+SegmentHeaderSize > w.segmentSize)
|
||||
|
||||
// When the segment already has some data than
|
||||
// If the segment already has some data then
|
||||
// the first batch size calculation should account for that.
|
||||
if firstBatch && w.n > SegmentHeaderSize {
|
||||
cutNewBatch = batchSize+w.n > w.segmentSize
|
||||
|
@ -717,7 +717,7 @@ func nextSequenceFile(dir string) (string, int, error) {
|
|||
}
|
||||
// It is not necessary that we find the files in number order,
|
||||
// for example with '1000000' and '200000', '1000000' would come first.
|
||||
// Though this is a very very race case, we check anyway for the max id.
|
||||
// Though this is a very very rare case, we check anyway for the max id.
|
||||
if j > i {
|
||||
i = j
|
||||
}
|
||||
|
|
|
@ -188,8 +188,8 @@ func (f *chunkPos) bytesToWriteForChunk(chkLen uint64) uint64 {
|
|||
return bytes
|
||||
}
|
||||
|
||||
// ChunkDiskMapper is for writing the Head block chunks to the disk
|
||||
// and access chunks via mmapped file.
|
||||
// ChunkDiskMapper is for writing the Head block chunks to disk
|
||||
// and access chunks via mmapped files.
|
||||
type ChunkDiskMapper struct {
|
||||
/// Writer.
|
||||
dir *os.File
|
||||
|
@ -231,7 +231,7 @@ type ChunkDiskMapper struct {
|
|||
closed bool
|
||||
}
|
||||
|
||||
// mmappedChunkFile provides mmapp access to an entire head chunks file that holds many chunks.
|
||||
// mmappedChunkFile provides mmap access to an entire head chunks file that holds many chunks.
|
||||
type mmappedChunkFile struct {
|
||||
byteSlice ByteSlice
|
||||
maxt int64 // Max timestamp among all of this file's chunks.
|
||||
|
@ -240,7 +240,7 @@ type mmappedChunkFile struct {
|
|||
// NewChunkDiskMapper returns a new ChunkDiskMapper against the given directory
|
||||
// using the default head chunk file duration.
|
||||
// NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper
|
||||
// to set the maxt of all the file.
|
||||
// to set the maxt of all files.
|
||||
func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Pool, writeBufferSize, writeQueueSize int) (*ChunkDiskMapper, error) {
|
||||
// Validate write buffer size.
|
||||
if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize {
|
||||
|
@ -425,7 +425,7 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro
|
|||
return files, nil
|
||||
}
|
||||
|
||||
// WriteChunk writes the chunk to the disk.
|
||||
// WriteChunk writes the chunk to disk.
|
||||
// The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
|
||||
func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, isOOO bool, callback func(err error)) (chkRef ChunkDiskMapperRef) {
|
||||
// cdm.evtlPosMtx must be held to serialize the calls to cdm.evtlPos.getNextChunkRef() and the writing of the chunk (either with or without queue).
|
||||
|
@ -784,7 +784,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
|
|||
// IterateAllChunks iterates all mmappedChunkFiles (in order of head chunk file name/number) and all the chunks within it
|
||||
// and runs the provided function with information about each chunk. It returns on the first error encountered.
|
||||
// NOTE: This method needs to be called at least once after creating ChunkDiskMapper
|
||||
// to set the maxt of all the file.
|
||||
// to set the maxt of all files.
|
||||
func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16, encoding chunkenc.Encoding, isOOO bool) error) (err error) {
|
||||
cdm.writePathMtx.Lock()
|
||||
defer cdm.writePathMtx.Unlock()
|
||||
|
@ -904,7 +904,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
|
|||
return nil
|
||||
}
|
||||
|
||||
// Truncate deletes the head chunk files whose file number is less than given fileNo.
|
||||
// Truncate deletes the head chunk files with numbers less than the given fileNo.
|
||||
func (cdm *ChunkDiskMapper) Truncate(fileNo uint32) error {
|
||||
cdm.readPathMtx.RLock()
|
||||
|
||||
|
|
|
@ -272,7 +272,7 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
|
|||
meta := dms[i].meta
|
||||
if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
|
||||
// If the block is entirely deleted, then we don't care about the block being big enough.
|
||||
// TODO: This is assuming single tombstone is for distinct series, which might be no true.
|
||||
// TODO: This is assuming a single tombstone is for a distinct series, which might not be true.
|
||||
if meta.Stats.NumTombstones > 0 && meta.Stats.NumTombstones >= meta.Stats.NumSeries {
|
||||
return []string{dms[i].dir}, nil
|
||||
}
|
||||
|
@ -372,7 +372,7 @@ func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
|
|||
t0 = tr * ((m.MinTime - tr + 1) / tr)
|
||||
}
|
||||
// Skip blocks that don't fall into the range. This can happen via mis-alignment or
|
||||
// by being the multiple of the intended range.
|
||||
// by being a multiple of the intended range.
|
||||
if m.MaxTime > t0+tr {
|
||||
i++
|
||||
continue
|
||||
|
@ -395,7 +395,7 @@ func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
|
|||
return splitDirs
|
||||
}
|
||||
|
||||
// CompactBlockMetas merges many block metas into one, combining it's source blocks together
|
||||
// CompactBlockMetas merges many block metas into one, combining its source blocks together
|
||||
// and adjusting compaction level. Min/Max time of result block meta covers all input blocks.
|
||||
func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
|
||||
res := &BlockMeta{
|
||||
|
@ -833,7 +833,7 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
|
|||
chksIter = s.Iterator(chksIter)
|
||||
chks = chks[:0]
|
||||
for chksIter.Next() {
|
||||
// We are not iterating in streaming way over chunk as
|
||||
// We are not iterating in a streaming way over chunks as
|
||||
// it's more efficient to do bulk write for index and
|
||||
// chunk file purposes.
|
||||
chks = append(chks, chksIter.At())
|
||||
|
@ -842,7 +842,7 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
|
|||
return fmt.Errorf("chunk iter: %w", err)
|
||||
}
|
||||
|
||||
// Skip the series with all deleted chunks.
|
||||
// Skip series with all deleted chunks.
|
||||
if len(chks) == 0 {
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -206,7 +206,7 @@ type DB struct {
|
|||
compactor Compactor
|
||||
blocksToDelete BlocksToDeleteFunc
|
||||
|
||||
// Mutex for that must be held when modifying the general block layout or lastGarbageCollectedMmapRef.
|
||||
// mtx must be held when modifying the general block layout or lastGarbageCollectedMmapRef.
|
||||
mtx sync.RWMutex
|
||||
blocks []*Block
|
||||
|
||||
|
@ -1431,7 +1431,7 @@ func (db *DB) reloadBlocks() (err error) {
|
|||
db.metrics.reloads.Inc()
|
||||
}()
|
||||
|
||||
// Now that we reload TSDB every minute, there is high chance for race condition with a reload
|
||||
// Now that we reload TSDB every minute, there is a high chance for a race condition with a reload
|
||||
// triggered by CleanTombstones(). We need to lock the reload to avoid the situation where
|
||||
// a normal reload and CleanTombstones try to delete the same block.
|
||||
db.mtx.Lock()
|
||||
|
|
|
@ -27,10 +27,10 @@ in-file offset (lower 4 bytes) and segment sequence number (upper 4 bytes).
|
|||
|
||||
# Chunk
|
||||
|
||||
Unlike chunks in the on-disk blocks, here we additionally store series
|
||||
reference that the chunks belongs to and the mint/maxt of the chunks. This is
|
||||
because we don't have an index associated with these chunks, hence these meta
|
||||
information are used while replaying the chunks.
|
||||
Unlike chunks in the on-disk blocks, here we additionally store the series
|
||||
reference that each chunk belongs to and the mint/maxt of the chunks. This is
|
||||
because we don't have an index associated with these chunks, hence this metadata
|
||||
is used while replaying the chunks.
|
||||
|
||||
```
|
||||
┌─────────────────────┬───────────────────────┬───────────────────────┬───────────────────┬───────────────┬──────────────┬────────────────┐
|
||||
|
|
|
@ -40,7 +40,7 @@ Most of the sections described below start with a `len` field. It always specifi
|
|||
|
||||
### Symbol Table
|
||||
|
||||
The symbol table holds a sorted list of deduplicated strings that occurred in label pairs of the stored series. They can be referenced from subsequent sections and significantly reduce the total index size.
|
||||
The symbol table holds a sorted list of deduplicated strings that occur in label pairs of the stored series. They can be referenced from subsequent sections and significantly reduce the total index size.
|
||||
|
||||
The section contains a sequence of the string entries, each prefixed with the string's length in raw bytes. All strings are utf-8 encoded.
|
||||
Strings are referenced by sequential indexing. The strings are sorted in lexicographically ascending order.
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# Usage
|
||||
|
||||
TSDB can be - and is - used by other applications such as [Cortex](https://cortexmetrics.io/) and [Thanos](https://thanos.io/).
|
||||
TSDB can be - and is - used by other applications such as [Cortex](https://cortexmetrics.io/), [Thanos](https://thanos.io/), and [Grafana Mimir](https://grafana.com/oss/mimir/).
|
||||
This directory contains documentation for any developers who wish to work on or with TSDB.
|
||||
|
||||
For a full example of instantiating a database, adding and querying data, see the [tsdb example in the docs](https://pkg.go.dev/github.com/prometheus/prometheus/tsdb).
|
||||
|
@ -18,7 +18,7 @@ A `DB` has the following main components:
|
|||
* [`Head`](https://pkg.go.dev/github.com/prometheus/prometheus/tsdb#DB.Head)
|
||||
* [Blocks (persistent blocks)](https://pkg.go.dev/github.com/prometheus/prometheus/tsdb#DB.Blocks)
|
||||
|
||||
The `Head` is responsible for a lot. Here are its main components:
|
||||
The `Head` is responsible for a lot. Here are its main components:
|
||||
|
||||
* [WAL](https://pkg.go.dev/github.com/prometheus/prometheus/tsdb/wal#WAL) (Write Ahead Log).
|
||||
* [`stripeSeries`](https://github.com/prometheus/prometheus/blob/411021ada9ab41095923b8d2df9365b632fd40c3/tsdb/head.go#L1292):
|
||||
|
|
|
@ -111,7 +111,7 @@ func NewExemplarMetrics(reg prometheus.Registerer) *ExemplarMetrics {
|
|||
return &m
|
||||
}
|
||||
|
||||
// NewCircularExemplarStorage creates an circular in memory exemplar storage.
|
||||
// NewCircularExemplarStorage creates a circular in memory exemplar storage.
|
||||
// If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in
|
||||
// 1GB of extra memory, accounting for the fact that this is heap allocated space.
|
||||
// If len <= 0, then the exemplar storage is essentially a noop storage but can later be
|
||||
|
|
|
@ -1467,8 +1467,8 @@ func (s *memSeries) mmapChunks(chunkDiskMapper *chunks.ChunkDiskMapper) (count i
|
|||
return
|
||||
}
|
||||
|
||||
// Write chunks starting from the oldest one and stop before we get to current s.headChunk.
|
||||
// If we have this chain: s.headChunk{t4} -> t3 -> t2 -> t1 -> t0
|
||||
// Write chunks starting from the oldest one and stop before we get to current s.headChunks.
|
||||
// If we have this chain: s.headChunks{t4} -> t3 -> t2 -> t1 -> t0
|
||||
// then we need to write chunks t0 to t3, but skip s.headChunks.
|
||||
for i := s.headChunks.len() - 1; i > 0; i-- {
|
||||
chk := s.headChunks.atOffset(i)
|
||||
|
|
|
@ -1496,7 +1496,7 @@ Outer:
|
|||
}
|
||||
|
||||
default:
|
||||
// This is a record type we don't understand. It is either and old format from earlier versions,
|
||||
// This is a record type we don't understand. It is either an old format from earlier versions,
|
||||
// or a new format and the code was rolled back to old version.
|
||||
loopErr = fmt.Errorf("unsupported snapshot record type 0b%b", rec[0])
|
||||
break Outer
|
||||
|
|
|
@ -158,7 +158,7 @@ type Writer struct {
|
|||
postingsEncoder PostingsEncoder
|
||||
}
|
||||
|
||||
// TOC represents index Table Of Content that states where each section of index starts.
|
||||
// TOC represents the index Table Of Contents that states where each section of the index starts.
|
||||
type TOC struct {
|
||||
Symbols uint64
|
||||
Series uint64
|
||||
|
@ -168,7 +168,7 @@ type TOC struct {
|
|||
PostingsTable uint64
|
||||
}
|
||||
|
||||
// NewTOCFromByteSlice return parsed TOC from given index byte slice.
|
||||
// NewTOCFromByteSlice returns a parsed TOC from the given index byte slice.
|
||||
func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
|
||||
if bs.Len() < indexTOCLen {
|
||||
return nil, encoding.ErrInvalidSize
|
||||
|
|
|
@ -163,7 +163,7 @@ type RefMetadata struct {
|
|||
Help string
|
||||
}
|
||||
|
||||
// RefExemplar is an exemplar with it's labels, timestamp, value the exemplar was collected/observed with, and a reference to a series.
|
||||
// RefExemplar is an exemplar with the labels, timestamp, value the exemplar was collected/observed with, and a reference to a series.
|
||||
type RefExemplar struct {
|
||||
Ref chunks.HeadSeriesRef
|
||||
T int64
|
||||
|
@ -798,7 +798,7 @@ func (e *Encoder) FloatHistogramSamples(histograms []RefFloatHistogramSample, b
|
|||
return buf.Get()
|
||||
}
|
||||
|
||||
// Encode encodes the Float Histogram into a byte slice.
|
||||
// EncodeFloatHistogram encodes the Float Histogram into a byte slice.
|
||||
func EncodeFloatHistogram(buf *encoding.Encbuf, h *histogram.FloatHistogram) {
|
||||
buf.PutByte(byte(h.CounterResetHint))
|
||||
|
||||
|
|
Loading…
Reference in New Issue