mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
2890 lines
82 KiB
2890 lines
82 KiB
// Copyright 2017 The Prometheus Authors |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
package tsdb |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"math" |
|
"path/filepath" |
|
"runtime" |
|
"sort" |
|
"sync" |
|
"time" |
|
|
|
"github.com/go-kit/log" |
|
"github.com/go-kit/log/level" |
|
"github.com/oklog/ulid" |
|
"github.com/pkg/errors" |
|
"github.com/prometheus/client_golang/prometheus" |
|
"go.uber.org/atomic" |
|
|
|
"github.com/prometheus/prometheus/config" |
|
"github.com/prometheus/prometheus/pkg/exemplar" |
|
"github.com/prometheus/prometheus/pkg/labels" |
|
"github.com/prometheus/prometheus/storage" |
|
"github.com/prometheus/prometheus/tsdb/chunkenc" |
|
"github.com/prometheus/prometheus/tsdb/chunks" |
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" |
|
"github.com/prometheus/prometheus/tsdb/index" |
|
"github.com/prometheus/prometheus/tsdb/record" |
|
"github.com/prometheus/prometheus/tsdb/tombstones" |
|
"github.com/prometheus/prometheus/tsdb/tsdbutil" |
|
"github.com/prometheus/prometheus/tsdb/wal" |
|
) |
|
|
|
var ( |
|
// ErrInvalidSample is returned if an appended sample is not valid and can't |
|
// be ingested. |
|
ErrInvalidSample = errors.New("invalid sample") |
|
// ErrInvalidExemplar is returned if an appended exemplar is not valid and can't |
|
// be ingested. |
|
ErrInvalidExemplar = errors.New("invalid exemplar") |
|
// ErrAppenderClosed is returned if an appender has already be successfully |
|
// rolled back or committed. |
|
ErrAppenderClosed = errors.New("appender closed") |
|
) |
|
|
|
type ExemplarStorage interface { |
|
storage.ExemplarQueryable |
|
AddExemplar(labels.Labels, exemplar.Exemplar) error |
|
ValidateExemplar(labels.Labels, exemplar.Exemplar) error |
|
} |
|
|
|
// Head handles reads and writes of time series data within a time window. |
|
type Head struct { |
|
chunkRange atomic.Int64 |
|
numSeries atomic.Uint64 |
|
minTime, maxTime atomic.Int64 // Current min and max of the samples included in the head. |
|
minValidTime atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block. |
|
lastWALTruncationTime atomic.Int64 |
|
lastMemoryTruncationTime atomic.Int64 |
|
lastSeriesID atomic.Uint64 |
|
|
|
metrics *headMetrics |
|
opts *HeadOptions |
|
wal *wal.WAL |
|
exemplarMetrics *ExemplarMetrics |
|
exemplars ExemplarStorage |
|
logger log.Logger |
|
appendPool sync.Pool |
|
exemplarsPool sync.Pool |
|
seriesPool sync.Pool |
|
bytesPool sync.Pool |
|
memChunkPool sync.Pool |
|
|
|
// All series addressable by their ID or hash. |
|
series *stripeSeries |
|
|
|
symMtx sync.RWMutex |
|
symbols map[string]struct{} |
|
|
|
deletedMtx sync.Mutex |
|
deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until. |
|
|
|
postings *index.MemPostings // Postings lists for terms. |
|
|
|
tombstones *tombstones.MemTombstones |
|
|
|
iso *isolation |
|
|
|
cardinalityMutex sync.Mutex |
|
cardinalityCache *index.PostingsStats // Posting stats cache which will expire after 30sec. |
|
lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching. |
|
|
|
// chunkDiskMapper is used to write and read Head chunks to/from disk. |
|
chunkDiskMapper *chunks.ChunkDiskMapper |
|
|
|
closedMtx sync.Mutex |
|
closed bool |
|
|
|
stats *HeadStats |
|
reg prometheus.Registerer |
|
|
|
memTruncationInProcess atomic.Bool |
|
} |
|
|
|
// HeadOptions are parameters for the Head block. |
|
type HeadOptions struct { |
|
ChunkRange int64 |
|
// ChunkDirRoot is the parent directory of the chunks directory. |
|
ChunkDirRoot string |
|
ChunkPool chunkenc.Pool |
|
ChunkWriteBufferSize int |
|
// StripeSize sets the number of entries in the hash map, it must be a power of 2. |
|
// A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. |
|
// A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series. |
|
StripeSize int |
|
SeriesCallback SeriesLifecycleCallback |
|
EnableExemplarStorage bool |
|
|
|
// Runtime reloadable options. |
|
MaxExemplars atomic.Int64 |
|
} |
|
|
|
func DefaultHeadOptions() *HeadOptions { |
|
return &HeadOptions{ |
|
ChunkRange: DefaultBlockDuration, |
|
ChunkDirRoot: "", |
|
ChunkPool: chunkenc.NewPool(), |
|
ChunkWriteBufferSize: chunks.DefaultWriteBufferSize, |
|
StripeSize: DefaultStripeSize, |
|
SeriesCallback: &noopSeriesLifecycleCallback{}, |
|
} |
|
} |
|
|
|
type headMetrics struct { |
|
activeAppenders prometheus.Gauge |
|
series prometheus.GaugeFunc |
|
seriesCreated prometheus.Counter |
|
seriesRemoved prometheus.Counter |
|
seriesNotFound prometheus.Counter |
|
chunks prometheus.Gauge |
|
chunksCreated prometheus.Counter |
|
chunksRemoved prometheus.Counter |
|
gcDuration prometheus.Summary |
|
samplesAppended prometheus.Counter |
|
outOfBoundSamples prometheus.Counter |
|
outOfOrderSamples prometheus.Counter |
|
walTruncateDuration prometheus.Summary |
|
walCorruptionsTotal prometheus.Counter |
|
walTotalReplayDuration prometheus.Gauge |
|
headTruncateFail prometheus.Counter |
|
headTruncateTotal prometheus.Counter |
|
checkpointDeleteFail prometheus.Counter |
|
checkpointDeleteTotal prometheus.Counter |
|
checkpointCreationFail prometheus.Counter |
|
checkpointCreationTotal prometheus.Counter |
|
mmapChunkCorruptionTotal prometheus.Counter |
|
} |
|
|
|
func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { |
|
m := &headMetrics{ |
|
activeAppenders: prometheus.NewGauge(prometheus.GaugeOpts{ |
|
Name: "prometheus_tsdb_head_active_appenders", |
|
Help: "Number of currently active appender transactions", |
|
}), |
|
series: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ |
|
Name: "prometheus_tsdb_head_series", |
|
Help: "Total number of series in the head block.", |
|
}, func() float64 { |
|
return float64(h.NumSeries()) |
|
}), |
|
seriesCreated: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_head_series_created_total", |
|
Help: "Total number of series created in the head", |
|
}), |
|
seriesRemoved: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_head_series_removed_total", |
|
Help: "Total number of series removed in the head", |
|
}), |
|
seriesNotFound: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_head_series_not_found_total", |
|
Help: "Total number of requests for series that were not found.", |
|
}), |
|
chunks: prometheus.NewGauge(prometheus.GaugeOpts{ |
|
Name: "prometheus_tsdb_head_chunks", |
|
Help: "Total number of chunks in the head block.", |
|
}), |
|
chunksCreated: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_head_chunks_created_total", |
|
Help: "Total number of chunks created in the head", |
|
}), |
|
chunksRemoved: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_head_chunks_removed_total", |
|
Help: "Total number of chunks removed in the head", |
|
}), |
|
gcDuration: prometheus.NewSummary(prometheus.SummaryOpts{ |
|
Name: "prometheus_tsdb_head_gc_duration_seconds", |
|
Help: "Runtime of garbage collection in the head block.", |
|
}), |
|
walTruncateDuration: prometheus.NewSummary(prometheus.SummaryOpts{ |
|
Name: "prometheus_tsdb_wal_truncate_duration_seconds", |
|
Help: "Duration of WAL truncation.", |
|
}), |
|
walCorruptionsTotal: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_wal_corruptions_total", |
|
Help: "Total number of WAL corruptions.", |
|
}), |
|
walTotalReplayDuration: prometheus.NewGauge(prometheus.GaugeOpts{ |
|
Name: "prometheus_tsdb_data_replay_duration_seconds", |
|
Help: "Time taken to replay the data on disk.", |
|
}), |
|
samplesAppended: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_head_samples_appended_total", |
|
Help: "Total number of appended samples.", |
|
}), |
|
outOfBoundSamples: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_out_of_bound_samples_total", |
|
Help: "Total number of out of bound samples ingestion failed attempts.", |
|
}), |
|
outOfOrderSamples: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_out_of_order_samples_total", |
|
Help: "Total number of out of order samples ingestion failed attempts.", |
|
}), |
|
headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_head_truncations_failed_total", |
|
Help: "Total number of head truncations that failed.", |
|
}), |
|
headTruncateTotal: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_head_truncations_total", |
|
Help: "Total number of head truncations attempted.", |
|
}), |
|
checkpointDeleteFail: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_checkpoint_deletions_failed_total", |
|
Help: "Total number of checkpoint deletions that failed.", |
|
}), |
|
checkpointDeleteTotal: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_checkpoint_deletions_total", |
|
Help: "Total number of checkpoint deletions attempted.", |
|
}), |
|
checkpointCreationFail: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_checkpoint_creations_failed_total", |
|
Help: "Total number of checkpoint creations that failed.", |
|
}), |
|
checkpointCreationTotal: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_checkpoint_creations_total", |
|
Help: "Total number of checkpoint creations attempted.", |
|
}), |
|
mmapChunkCorruptionTotal: prometheus.NewCounter(prometheus.CounterOpts{ |
|
Name: "prometheus_tsdb_mmap_chunk_corruptions_total", |
|
Help: "Total number of memory-mapped chunk corruptions.", |
|
}), |
|
} |
|
|
|
if r != nil { |
|
r.MustRegister( |
|
m.activeAppenders, |
|
m.series, |
|
m.chunks, |
|
m.chunksCreated, |
|
m.chunksRemoved, |
|
m.seriesCreated, |
|
m.seriesRemoved, |
|
m.seriesNotFound, |
|
m.gcDuration, |
|
m.walTruncateDuration, |
|
m.walCorruptionsTotal, |
|
m.walTotalReplayDuration, |
|
m.samplesAppended, |
|
m.outOfBoundSamples, |
|
m.outOfOrderSamples, |
|
m.headTruncateFail, |
|
m.headTruncateTotal, |
|
m.checkpointDeleteFail, |
|
m.checkpointDeleteTotal, |
|
m.checkpointCreationFail, |
|
m.checkpointCreationTotal, |
|
m.mmapChunkCorruptionTotal, |
|
// Metrics bound to functions and not needed in tests |
|
// can be created and registered on the spot. |
|
prometheus.NewGaugeFunc(prometheus.GaugeOpts{ |
|
Name: "prometheus_tsdb_head_max_time", |
|
Help: "Maximum timestamp of the head block. The unit is decided by the library consumer.", |
|
}, func() float64 { |
|
return float64(h.MaxTime()) |
|
}), |
|
prometheus.NewGaugeFunc(prometheus.GaugeOpts{ |
|
Name: "prometheus_tsdb_head_min_time", |
|
Help: "Minimum time bound of the head block. The unit is decided by the library consumer.", |
|
}, func() float64 { |
|
return float64(h.MinTime()) |
|
}), |
|
prometheus.NewGaugeFunc(prometheus.GaugeOpts{ |
|
Name: "prometheus_tsdb_isolation_low_watermark", |
|
Help: "The lowest TSDB append ID that is still referenced.", |
|
}, func() float64 { |
|
return float64(h.iso.lowWatermark()) |
|
}), |
|
prometheus.NewGaugeFunc(prometheus.GaugeOpts{ |
|
Name: "prometheus_tsdb_isolation_high_watermark", |
|
Help: "The highest TSDB append ID that has been given out.", |
|
}, func() float64 { |
|
return float64(h.iso.lastAppendID()) |
|
}), |
|
) |
|
} |
|
return m |
|
} |
|
|
|
// HeadStats are the statistics for the head component of the DB. |
|
type HeadStats struct { |
|
WALReplayStatus *WALReplayStatus |
|
} |
|
|
|
// NewHeadStats returns a new HeadStats object. |
|
func NewHeadStats() *HeadStats { |
|
return &HeadStats{ |
|
WALReplayStatus: &WALReplayStatus{}, |
|
} |
|
} |
|
|
|
// WALReplayStatus contains status information about the WAL replay. |
|
type WALReplayStatus struct { |
|
sync.RWMutex |
|
Min int |
|
Max int |
|
Current int |
|
} |
|
|
|
// GetWALReplayStatus returns the WAL replay status information. |
|
func (s *WALReplayStatus) GetWALReplayStatus() WALReplayStatus { |
|
s.RLock() |
|
defer s.RUnlock() |
|
|
|
return WALReplayStatus{ |
|
Min: s.Min, |
|
Max: s.Max, |
|
Current: s.Current, |
|
} |
|
} |
|
|
|
const cardinalityCacheExpirationTime = time.Duration(30) * time.Second |
|
|
|
// PostingsCardinalityStats returns top 10 highest cardinality stats By label and value names. |
|
func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.PostingsStats { |
|
h.cardinalityMutex.Lock() |
|
defer h.cardinalityMutex.Unlock() |
|
currentTime := time.Duration(time.Now().Unix()) * time.Second |
|
seconds := currentTime - h.lastPostingsStatsCall |
|
if seconds > cardinalityCacheExpirationTime { |
|
h.cardinalityCache = nil |
|
} |
|
if h.cardinalityCache != nil { |
|
return h.cardinalityCache |
|
} |
|
h.cardinalityCache = h.postings.Stats(statsByLabelName) |
|
h.lastPostingsStatsCall = time.Duration(time.Now().Unix()) * time.Second |
|
|
|
return h.cardinalityCache |
|
} |
|
|
|
// NewHead opens the head block in dir. |
|
func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOptions, stats *HeadStats) (*Head, error) { |
|
var err error |
|
if l == nil { |
|
l = log.NewNopLogger() |
|
} |
|
if opts.ChunkRange < 1 { |
|
return nil, errors.Errorf("invalid chunk range %d", opts.ChunkRange) |
|
} |
|
if opts.SeriesCallback == nil { |
|
opts.SeriesCallback = &noopSeriesLifecycleCallback{} |
|
} |
|
|
|
em := NewExemplarMetrics(r) |
|
es, err := NewCircularExemplarStorage(opts.MaxExemplars.Load(), em) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
if stats == nil { |
|
stats = NewHeadStats() |
|
} |
|
|
|
h := &Head{ |
|
wal: wal, |
|
logger: l, |
|
opts: opts, |
|
exemplarMetrics: em, |
|
exemplars: es, |
|
series: newStripeSeries(opts.StripeSize, opts.SeriesCallback), |
|
symbols: map[string]struct{}{}, |
|
postings: index.NewUnorderedMemPostings(), |
|
tombstones: tombstones.NewMemTombstones(), |
|
iso: newIsolation(), |
|
deleted: map[uint64]int{}, |
|
memChunkPool: sync.Pool{ |
|
New: func() interface{} { |
|
return &memChunk{} |
|
}, |
|
}, |
|
stats: stats, |
|
reg: r, |
|
} |
|
h.chunkRange.Store(opts.ChunkRange) |
|
h.minTime.Store(math.MaxInt64) |
|
h.maxTime.Store(math.MinInt64) |
|
h.lastWALTruncationTime.Store(math.MinInt64) |
|
h.lastMemoryTruncationTime.Store(math.MinInt64) |
|
h.metrics = newHeadMetrics(h, r) |
|
|
|
if opts.ChunkPool == nil { |
|
opts.ChunkPool = chunkenc.NewPool() |
|
} |
|
|
|
h.chunkDiskMapper, err = chunks.NewChunkDiskMapper( |
|
mmappedChunksDir(opts.ChunkDirRoot), |
|
opts.ChunkPool, |
|
opts.ChunkWriteBufferSize, |
|
) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
return h, nil |
|
} |
|
|
|
func mmappedChunksDir(dir string) string { return filepath.Join(dir, "chunks_head") } |
|
|
|
func (h *Head) ApplyConfig(cfg *config.Config) error { |
|
if !h.opts.EnableExemplarStorage { |
|
return nil |
|
} |
|
|
|
// Head uses opts.MaxExemplars in combination with opts.EnableExemplarStorage |
|
// to decide if it should pass exemplars along to it's exemplar storage, so we |
|
// need to update opts.MaxExemplars here. |
|
prevSize := h.opts.MaxExemplars.Load() |
|
h.opts.MaxExemplars.Store(cfg.StorageConfig.ExemplarsConfig.MaxExemplars) |
|
|
|
if prevSize == h.opts.MaxExemplars.Load() { |
|
return nil |
|
} |
|
|
|
migrated := h.exemplars.(*CircularExemplarStorage).Resize(h.opts.MaxExemplars.Load()) |
|
level.Info(h.logger).Log("msg", "Exemplar storage resized", "from", prevSize, "to", h.opts.MaxExemplars, "migrated", migrated) |
|
return nil |
|
} |
|
|
|
func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { |
|
return h.exemplars.ExemplarQuerier(ctx) |
|
} |
|
|
|
// processWALSamples adds a partition of samples it receives to the head and passes |
|
// them on to other workers. |
|
// Samples before the mint timestamp are discarded. |
|
func (h *Head) processWALSamples( |
|
minValidTime int64, |
|
input <-chan []record.RefSample, output chan<- []record.RefSample, |
|
) (unknownRefs uint64) { |
|
defer close(output) |
|
|
|
// Mitigate lock contention in getByID. |
|
refSeries := map[uint64]*memSeries{} |
|
|
|
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) |
|
|
|
for samples := range input { |
|
for _, s := range samples { |
|
if s.T < minValidTime { |
|
continue |
|
} |
|
ms := refSeries[s.Ref] |
|
if ms == nil { |
|
ms = h.series.getByID(s.Ref) |
|
if ms == nil { |
|
unknownRefs++ |
|
continue |
|
} |
|
refSeries[s.Ref] = ms |
|
} |
|
if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated { |
|
h.metrics.chunksCreated.Inc() |
|
h.metrics.chunks.Inc() |
|
} |
|
if s.T > maxt { |
|
maxt = s.T |
|
} |
|
if s.T < mint { |
|
mint = s.T |
|
} |
|
} |
|
output <- samples |
|
} |
|
h.updateMinMaxTime(mint, maxt) |
|
|
|
return unknownRefs |
|
} |
|
|
|
func (h *Head) updateMinMaxTime(mint, maxt int64) { |
|
for { |
|
lt := h.MinTime() |
|
if mint >= lt { |
|
break |
|
} |
|
if h.minTime.CAS(lt, mint) { |
|
break |
|
} |
|
} |
|
for { |
|
ht := h.MaxTime() |
|
if maxt <= ht { |
|
break |
|
} |
|
if h.maxTime.CAS(ht, maxt) { |
|
break |
|
} |
|
} |
|
} |
|
|
|
func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks map[uint64][]*mmappedChunk) (err error) { |
|
// Track number of samples that referenced a series we don't know about |
|
// for error reporting. |
|
var unknownRefs atomic.Uint64 |
|
var unknownExemplarRefs atomic.Uint64 |
|
|
|
// Start workers that each process samples for a partition of the series ID space. |
|
// They are connected through a ring of channels which ensures that all sample batches |
|
// read from the WAL are processed in order. |
|
var ( |
|
wg sync.WaitGroup |
|
n = runtime.GOMAXPROCS(0) |
|
inputs = make([]chan []record.RefSample, n) |
|
outputs = make([]chan []record.RefSample, n) |
|
exemplarsInput chan record.RefExemplar |
|
|
|
dec record.Decoder |
|
shards = make([][]record.RefSample, n) |
|
|
|
decoded = make(chan interface{}, 10) |
|
decodeErr, seriesCreationErr error |
|
seriesPool = sync.Pool{ |
|
New: func() interface{} { |
|
return []record.RefSeries{} |
|
}, |
|
} |
|
samplesPool = sync.Pool{ |
|
New: func() interface{} { |
|
return []record.RefSample{} |
|
}, |
|
} |
|
tstonesPool = sync.Pool{ |
|
New: func() interface{} { |
|
return []tombstones.Stone{} |
|
}, |
|
} |
|
exemplarsPool = sync.Pool{ |
|
New: func() interface{} { |
|
return []record.RefExemplar{} |
|
}, |
|
} |
|
) |
|
|
|
defer func() { |
|
// For CorruptionErr ensure to terminate all workers before exiting. |
|
_, ok := err.(*wal.CorruptionErr) |
|
if ok || seriesCreationErr != nil { |
|
for i := 0; i < n; i++ { |
|
close(inputs[i]) |
|
for range outputs[i] { |
|
} |
|
} |
|
close(exemplarsInput) |
|
wg.Wait() |
|
} |
|
}() |
|
|
|
wg.Add(n) |
|
for i := 0; i < n; i++ { |
|
outputs[i] = make(chan []record.RefSample, 300) |
|
inputs[i] = make(chan []record.RefSample, 300) |
|
|
|
go func(input <-chan []record.RefSample, output chan<- []record.RefSample) { |
|
unknown := h.processWALSamples(h.minValidTime.Load(), input, output) |
|
unknownRefs.Add(unknown) |
|
wg.Done() |
|
}(inputs[i], outputs[i]) |
|
} |
|
|
|
wg.Add(1) |
|
exemplarsInput = make(chan record.RefExemplar, 300) |
|
go func(input <-chan record.RefExemplar) { |
|
defer wg.Done() |
|
for e := range input { |
|
ms := h.series.getByID(e.Ref) |
|
if ms == nil { |
|
unknownExemplarRefs.Inc() |
|
continue |
|
} |
|
|
|
if e.T < h.minValidTime.Load() { |
|
continue |
|
} |
|
// At the moment the only possible error here is out of order exemplars, which we shouldn't see when |
|
// replaying the WAL, so lets just log the error if it's not that type. |
|
err = h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels}) |
|
if err != nil && err == storage.ErrOutOfOrderExemplar { |
|
level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err) |
|
} |
|
} |
|
}(exemplarsInput) |
|
|
|
go func() { |
|
defer close(decoded) |
|
for r.Next() { |
|
rec := r.Record() |
|
switch dec.Type(rec) { |
|
case record.Series: |
|
series := seriesPool.Get().([]record.RefSeries)[:0] |
|
series, err = dec.Series(rec, series) |
|
if err != nil { |
|
decodeErr = &wal.CorruptionErr{ |
|
Err: errors.Wrap(err, "decode series"), |
|
Segment: r.Segment(), |
|
Offset: r.Offset(), |
|
} |
|
return |
|
} |
|
decoded <- series |
|
case record.Samples: |
|
samples := samplesPool.Get().([]record.RefSample)[:0] |
|
samples, err = dec.Samples(rec, samples) |
|
if err != nil { |
|
decodeErr = &wal.CorruptionErr{ |
|
Err: errors.Wrap(err, "decode samples"), |
|
Segment: r.Segment(), |
|
Offset: r.Offset(), |
|
} |
|
return |
|
} |
|
decoded <- samples |
|
case record.Tombstones: |
|
tstones := tstonesPool.Get().([]tombstones.Stone)[:0] |
|
tstones, err = dec.Tombstones(rec, tstones) |
|
if err != nil { |
|
decodeErr = &wal.CorruptionErr{ |
|
Err: errors.Wrap(err, "decode tombstones"), |
|
Segment: r.Segment(), |
|
Offset: r.Offset(), |
|
} |
|
return |
|
} |
|
decoded <- tstones |
|
case record.Exemplars: |
|
exemplars := exemplarsPool.Get().([]record.RefExemplar)[:0] |
|
exemplars, err = dec.Exemplars(rec, exemplars) |
|
if err != nil { |
|
decodeErr = &wal.CorruptionErr{ |
|
Err: errors.Wrap(err, "decode exemplars"), |
|
Segment: r.Segment(), |
|
Offset: r.Offset(), |
|
} |
|
return |
|
} |
|
decoded <- exemplars |
|
default: |
|
// Noop. |
|
} |
|
} |
|
}() |
|
|
|
Outer: |
|
for d := range decoded { |
|
switch v := d.(type) { |
|
case []record.RefSeries: |
|
for _, s := range v { |
|
series, created, err := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) |
|
if err != nil { |
|
seriesCreationErr = err |
|
break Outer |
|
} |
|
|
|
if created { |
|
// If this series gets a duplicate record, we don't restore its mmapped chunks, |
|
// and instead restore everything from WAL records. |
|
series.mmappedChunks = mmappedChunks[series.ref] |
|
|
|
h.metrics.chunks.Add(float64(len(series.mmappedChunks))) |
|
h.metrics.chunksCreated.Add(float64(len(series.mmappedChunks))) |
|
|
|
if len(series.mmappedChunks) > 0 { |
|
h.updateMinMaxTime(series.minTime(), series.maxTime()) |
|
} |
|
} else { |
|
// TODO(codesome) Discard old samples and mmapped chunks and use mmap chunks for the new series ID. |
|
|
|
// There's already a different ref for this series. |
|
multiRef[s.Ref] = series.ref |
|
} |
|
|
|
if h.lastSeriesID.Load() < s.Ref { |
|
h.lastSeriesID.Store(s.Ref) |
|
} |
|
} |
|
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification. |
|
seriesPool.Put(v) |
|
case []record.RefSample: |
|
samples := v |
|
// We split up the samples into chunks of 5000 samples or less. |
|
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise |
|
// cause thousands of very large in flight buffers occupying large amounts |
|
// of unused memory. |
|
for len(samples) > 0 { |
|
m := 5000 |
|
if len(samples) < m { |
|
m = len(samples) |
|
} |
|
for i := 0; i < n; i++ { |
|
var buf []record.RefSample |
|
select { |
|
case buf = <-outputs[i]: |
|
default: |
|
} |
|
shards[i] = buf[:0] |
|
} |
|
for _, sam := range samples[:m] { |
|
if r, ok := multiRef[sam.Ref]; ok { |
|
sam.Ref = r |
|
} |
|
mod := sam.Ref % uint64(n) |
|
shards[mod] = append(shards[mod], sam) |
|
} |
|
for i := 0; i < n; i++ { |
|
inputs[i] <- shards[i] |
|
} |
|
samples = samples[m:] |
|
} |
|
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification. |
|
samplesPool.Put(v) |
|
case []tombstones.Stone: |
|
for _, s := range v { |
|
for _, itv := range s.Intervals { |
|
if itv.Maxt < h.minValidTime.Load() { |
|
continue |
|
} |
|
if m := h.series.getByID(s.Ref); m == nil { |
|
unknownRefs.Inc() |
|
continue |
|
} |
|
h.tombstones.AddInterval(s.Ref, itv) |
|
} |
|
} |
|
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification. |
|
tstonesPool.Put(v) |
|
case []record.RefExemplar: |
|
for _, e := range v { |
|
exemplarsInput <- e |
|
} |
|
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification. |
|
exemplarsPool.Put(v) |
|
default: |
|
panic(fmt.Errorf("unexpected decoded type: %T", d)) |
|
} |
|
} |
|
|
|
if decodeErr != nil { |
|
return decodeErr |
|
} |
|
if seriesCreationErr != nil { |
|
// Drain the channel to unblock the goroutine. |
|
for range decoded { |
|
} |
|
return seriesCreationErr |
|
} |
|
|
|
// Signal termination to each worker and wait for it to close its output channel. |
|
for i := 0; i < n; i++ { |
|
close(inputs[i]) |
|
for range outputs[i] { |
|
} |
|
} |
|
close(exemplarsInput) |
|
wg.Wait() |
|
|
|
if r.Err() != nil { |
|
return errors.Wrap(r.Err(), "read records") |
|
} |
|
|
|
if unknownRefs.Load() > 0 || unknownExemplarRefs.Load() > 0 { |
|
level.Warn(h.logger).Log("msg", "Unknown series references", "samples", unknownRefs.Load(), "exemplars", unknownExemplarRefs.Load()) |
|
} |
|
return nil |
|
} |
|
|
|
// Init loads data from the write ahead log and prepares the head for writes. |
|
// It should be called before using an appender so that it |
|
// limits the ingested samples to the head min valid time. |
|
func (h *Head) Init(minValidTime int64) error { |
|
h.minValidTime.Store(minValidTime) |
|
defer h.postings.EnsureOrder() |
|
defer h.gc() // After loading the wal remove the obsolete data from the head. |
|
|
|
level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any") |
|
start := time.Now() |
|
|
|
mmappedChunks, err := h.loadMmappedChunks() |
|
if err != nil { |
|
level.Error(h.logger).Log("msg", "Loading on-disk chunks failed", "err", err) |
|
if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok { |
|
h.metrics.mmapChunkCorruptionTotal.Inc() |
|
} |
|
// If this fails, data will be recovered from WAL. |
|
// Hence we wont lose any data (given WAL is not corrupt). |
|
mmappedChunks = h.removeCorruptedMmappedChunks(err) |
|
} |
|
|
|
level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(start).String()) |
|
if h.wal == nil { |
|
level.Info(h.logger).Log("msg", "WAL not found") |
|
return nil |
|
} |
|
|
|
level.Info(h.logger).Log("msg", "Replaying WAL, this may take a while") |
|
|
|
checkpointReplayStart := time.Now() |
|
// Backfill the checkpoint first if it exists. |
|
dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir()) |
|
if err != nil && err != record.ErrNotFound { |
|
return errors.Wrap(err, "find last checkpoint") |
|
} |
|
|
|
// Find the last segment. |
|
_, endAt, e := wal.Segments(h.wal.Dir()) |
|
if e != nil { |
|
return errors.Wrap(e, "finding WAL segments") |
|
} |
|
|
|
h.startWALReplayStatus(startFrom, endAt) |
|
|
|
multiRef := map[uint64]uint64{} |
|
if err == nil { |
|
sr, err := wal.NewSegmentsReader(dir) |
|
if err != nil { |
|
return errors.Wrap(err, "open checkpoint") |
|
} |
|
defer func() { |
|
if err := sr.Close(); err != nil { |
|
level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err) |
|
} |
|
}() |
|
|
|
// A corrupted checkpoint is a hard error for now and requires user |
|
// intervention. There's likely little data that can be recovered anyway. |
|
if err := h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks); err != nil { |
|
return errors.Wrap(err, "backfill checkpoint") |
|
} |
|
h.updateWALReplayStatusRead(startFrom) |
|
startFrom++ |
|
level.Info(h.logger).Log("msg", "WAL checkpoint loaded") |
|
} |
|
checkpointReplayDuration := time.Since(checkpointReplayStart) |
|
|
|
walReplayStart := time.Now() |
|
|
|
// Backfill segments from the most recent checkpoint onwards. |
|
for i := startFrom; i <= endAt; i++ { |
|
s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i)) |
|
if err != nil { |
|
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i)) |
|
} |
|
|
|
sr := wal.NewSegmentBufReader(s) |
|
err = h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks) |
|
if err := sr.Close(); err != nil { |
|
level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err) |
|
} |
|
if err != nil { |
|
return err |
|
} |
|
level.Info(h.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", endAt) |
|
h.updateWALReplayStatusRead(i) |
|
} |
|
|
|
walReplayDuration := time.Since(start) |
|
h.metrics.walTotalReplayDuration.Set(walReplayDuration.Seconds()) |
|
level.Info(h.logger).Log( |
|
"msg", "WAL replay completed", |
|
"checkpoint_replay_duration", checkpointReplayDuration.String(), |
|
"wal_replay_duration", time.Since(walReplayStart).String(), |
|
"total_replay_duration", walReplayDuration.String(), |
|
) |
|
|
|
return nil |
|
} |
|
|
|
// SetMinValidTime sets the minimum timestamp the head can ingest. |
|
func (h *Head) SetMinValidTime(minValidTime int64) { |
|
h.minValidTime.Store(minValidTime) |
|
} |
|
|
|
func (h *Head) loadMmappedChunks() (map[uint64][]*mmappedChunk, error) { |
|
mmappedChunks := map[uint64][]*mmappedChunk{} |
|
if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error { |
|
if maxt < h.minValidTime.Load() { |
|
return nil |
|
} |
|
|
|
slice := mmappedChunks[seriesRef] |
|
if len(slice) > 0 { |
|
if slice[len(slice)-1].maxTime >= mint { |
|
return &chunks.CorruptionErr{ |
|
Err: errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef), |
|
} |
|
} |
|
} |
|
|
|
slice = append(slice, &mmappedChunk{ |
|
ref: chunkRef, |
|
minTime: mint, |
|
maxTime: maxt, |
|
numSamples: numSamples, |
|
}) |
|
mmappedChunks[seriesRef] = slice |
|
return nil |
|
}); err != nil { |
|
return nil, errors.Wrap(err, "iterate on on-disk chunks") |
|
} |
|
return mmappedChunks, nil |
|
} |
|
|
|
// removeCorruptedMmappedChunks attempts to delete the corrupted mmapped chunks and if it fails, it clears all the previously |
|
// loaded mmapped chunks. |
|
func (h *Head) removeCorruptedMmappedChunks(err error) map[uint64][]*mmappedChunk { |
|
level.Info(h.logger).Log("msg", "Deleting mmapped chunk files") |
|
|
|
if err := h.chunkDiskMapper.DeleteCorrupted(err); err != nil { |
|
level.Info(h.logger).Log("msg", "Deletion of mmap chunk files failed, discarding chunk files completely", "err", err) |
|
return map[uint64][]*mmappedChunk{} |
|
} |
|
|
|
level.Info(h.logger).Log("msg", "Deletion of mmap chunk files successful, reattempting m-mapping the on-disk chunks") |
|
mmappedChunks, err := h.loadMmappedChunks() |
|
if err != nil { |
|
level.Error(h.logger).Log("msg", "Loading on-disk chunks failed, discarding chunk files completely", "err", err) |
|
mmappedChunks = map[uint64][]*mmappedChunk{} |
|
} |
|
|
|
return mmappedChunks |
|
} |
|
|
|
// Truncate removes old data before mint from the head and WAL. |
|
func (h *Head) Truncate(mint int64) (err error) { |
|
initialize := h.MinTime() == math.MaxInt64 |
|
if err := h.truncateMemory(mint); err != nil { |
|
return err |
|
} |
|
if initialize { |
|
return nil |
|
} |
|
return h.truncateWAL(mint) |
|
} |
|
|
|
// truncateMemory removes old data before mint from the head. |
|
func (h *Head) truncateMemory(mint int64) (err error) { |
|
defer func() { |
|
if err != nil { |
|
h.metrics.headTruncateFail.Inc() |
|
} |
|
}() |
|
|
|
initialize := h.MinTime() == math.MaxInt64 |
|
|
|
if h.MinTime() >= mint && !initialize { |
|
return nil |
|
} |
|
|
|
// The order of these two Store() should not be changed, |
|
// i.e. truncation time is set before in-process boolean. |
|
h.lastMemoryTruncationTime.Store(mint) |
|
h.memTruncationInProcess.Store(true) |
|
defer h.memTruncationInProcess.Store(false) |
|
|
|
// We wait for pending queries to end that overlap with this truncation. |
|
if !initialize { |
|
h.WaitForPendingReadersInTimeRange(h.MinTime(), mint) |
|
} |
|
|
|
h.minTime.Store(mint) |
|
h.minValidTime.Store(mint) |
|
|
|
// Ensure that max time is at least as high as min time. |
|
for h.MaxTime() < mint { |
|
h.maxTime.CAS(h.MaxTime(), mint) |
|
} |
|
|
|
// This was an initial call to Truncate after loading blocks on startup. |
|
// We haven't read back the WAL yet, so do not attempt to truncate it. |
|
if initialize { |
|
return nil |
|
} |
|
|
|
h.metrics.headTruncateTotal.Inc() |
|
start := time.Now() |
|
|
|
actualMint := h.gc() |
|
level.Info(h.logger).Log("msg", "Head GC completed", "duration", time.Since(start)) |
|
h.metrics.gcDuration.Observe(time.Since(start).Seconds()) |
|
if actualMint > h.minTime.Load() { |
|
// The actual mint of the Head is higher than the one asked to truncate. |
|
appendableMinValidTime := h.appendableMinValidTime() |
|
if actualMint < appendableMinValidTime { |
|
h.minTime.Store(actualMint) |
|
h.minValidTime.Store(actualMint) |
|
} else { |
|
// The actual min time is in the appendable window. |
|
// So we set the mint to the appendableMinValidTime. |
|
h.minTime.Store(appendableMinValidTime) |
|
h.minValidTime.Store(appendableMinValidTime) |
|
} |
|
} |
|
|
|
// Truncate the chunk m-mapper. |
|
if err := h.chunkDiskMapper.Truncate(mint); err != nil { |
|
return errors.Wrap(err, "truncate chunks.HeadReadWriter") |
|
} |
|
return nil |
|
} |
|
|
|
// WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying. |
|
// The query timeout limits the max wait time of this function implicitly. |
|
// The mint is inclusive and maxt is the truncation time hence exclusive. |
|
func (h *Head) WaitForPendingReadersInTimeRange(mint, maxt int64) { |
|
maxt-- // Making it inclusive before checking overlaps. |
|
overlaps := func() bool { |
|
o := false |
|
h.iso.TraverseOpenReads(func(s *isolationState) bool { |
|
if s.mint <= maxt && mint <= s.maxt { |
|
// Overlaps with the truncation range. |
|
o = true |
|
return false |
|
} |
|
return true |
|
}) |
|
return o |
|
} |
|
for overlaps() { |
|
time.Sleep(500 * time.Millisecond) |
|
} |
|
} |
|
|
|
// IsQuerierCollidingWithTruncation returns if the current querier needs to be closed and if a new querier |
|
// has to be created. In the latter case, the method also returns the new mint to be used for creating the |
|
// new range head and the new querier. This methods helps preventing races with the truncation of in-memory data. |
|
// |
|
// NOTE: The querier should already be taken before calling this. |
|
func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) (shouldClose bool, getNew bool, newMint int64) { |
|
if !h.memTruncationInProcess.Load() { |
|
return false, false, 0 |
|
} |
|
// Head truncation is in process. It also means that the block that was |
|
// created for this truncation range is also available. |
|
// Check if we took a querier that overlaps with this truncation. |
|
memTruncTime := h.lastMemoryTruncationTime.Load() |
|
if querierMaxt < memTruncTime { |
|
// Head compaction has happened and this time range is being truncated. |
|
// This query doesn't overlap with the Head any longer. |
|
// We should close this querier to avoid races and the data would be |
|
// available with the blocks below. |
|
// Cases: |
|
// 1. |------truncation------| |
|
// |---query---| |
|
// 2. |------truncation------| |
|
// |---query---| |
|
return true, false, 0 |
|
} |
|
if querierMint < memTruncTime { |
|
// The truncation time is not same as head mint that we saw above but the |
|
// query still overlaps with the Head. |
|
// The truncation started after we got the querier. So it is not safe |
|
// to use this querier and/or might block truncation. We should get |
|
// a new querier for the new Head range while remaining will be available |
|
// in the blocks below. |
|
// Case: |
|
// |------truncation------| |
|
// |----query----| |
|
// Turns into |
|
// |------truncation------| |
|
// |---qu---| |
|
return true, true, memTruncTime |
|
} |
|
|
|
// Other case is this, which is a no-op |
|
// |------truncation------| |
|
// |---query---| |
|
return false, false, 0 |
|
} |
|
|
|
// truncateWAL removes old data before mint from the WAL. |
|
func (h *Head) truncateWAL(mint int64) error { |
|
if h.wal == nil || mint <= h.lastWALTruncationTime.Load() { |
|
return nil |
|
} |
|
start := time.Now() |
|
h.lastWALTruncationTime.Store(mint) |
|
|
|
first, last, err := wal.Segments(h.wal.Dir()) |
|
if err != nil { |
|
return errors.Wrap(err, "get segment range") |
|
} |
|
// Start a new segment, so low ingestion volume TSDB don't have more WAL than |
|
// needed. |
|
if err := h.wal.NextSegment(); err != nil { |
|
return errors.Wrap(err, "next segment") |
|
} |
|
last-- // Never consider last segment for checkpoint. |
|
if last < 0 { |
|
return nil // no segments yet. |
|
} |
|
// The lower two thirds of segments should contain mostly obsolete samples. |
|
// If we have less than two segments, it's not worth checkpointing yet. |
|
// With the default 2h blocks, this will keeping up to around 3h worth |
|
// of WAL segments. |
|
last = first + (last-first)*2/3 |
|
if last <= first { |
|
return nil |
|
} |
|
|
|
keep := func(id uint64) bool { |
|
if h.series.getByID(id) != nil { |
|
return true |
|
} |
|
h.deletedMtx.Lock() |
|
_, ok := h.deleted[id] |
|
h.deletedMtx.Unlock() |
|
return ok |
|
} |
|
h.metrics.checkpointCreationTotal.Inc() |
|
if _, err = wal.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil { |
|
h.metrics.checkpointCreationFail.Inc() |
|
if _, ok := errors.Cause(err).(*wal.CorruptionErr); ok { |
|
h.metrics.walCorruptionsTotal.Inc() |
|
} |
|
return errors.Wrap(err, "create checkpoint") |
|
} |
|
if err := h.wal.Truncate(last + 1); err != nil { |
|
// If truncating fails, we'll just try again at the next checkpoint. |
|
// Leftover segments will just be ignored in the future if there's a checkpoint |
|
// that supersedes them. |
|
level.Error(h.logger).Log("msg", "truncating segments failed", "err", err) |
|
} |
|
|
|
// The checkpoint is written and segments before it is truncated, so we no |
|
// longer need to track deleted series that are before it. |
|
h.deletedMtx.Lock() |
|
for ref, segment := range h.deleted { |
|
if segment < first { |
|
delete(h.deleted, ref) |
|
} |
|
} |
|
h.deletedMtx.Unlock() |
|
|
|
h.metrics.checkpointDeleteTotal.Inc() |
|
if err := wal.DeleteCheckpoints(h.wal.Dir(), last); err != nil { |
|
// Leftover old checkpoints do not cause problems down the line beyond |
|
// occupying disk space. |
|
// They will just be ignored since a higher checkpoint exists. |
|
level.Error(h.logger).Log("msg", "delete old checkpoints", "err", err) |
|
h.metrics.checkpointDeleteFail.Inc() |
|
} |
|
h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds()) |
|
|
|
level.Info(h.logger).Log("msg", "WAL checkpoint complete", |
|
"first", first, "last", last, "duration", time.Since(start)) |
|
|
|
return nil |
|
} |
|
|
|
// initTime initializes a head with the first timestamp. This only needs to be called |
|
// for a completely fresh head with an empty WAL. |
|
func (h *Head) initTime(t int64) { |
|
if !h.minTime.CAS(math.MaxInt64, t) { |
|
return |
|
} |
|
// Ensure that max time is initialized to at least the min time we just set. |
|
// Concurrent appenders may already have set it to a higher value. |
|
h.maxTime.CAS(math.MinInt64, t) |
|
} |
|
|
|
type Stats struct { |
|
NumSeries uint64 |
|
MinTime, MaxTime int64 |
|
IndexPostingStats *index.PostingsStats |
|
} |
|
|
|
// Stats returns important current HEAD statistics. Note that it is expensive to |
|
// calculate these. |
|
func (h *Head) Stats(statsByLabelName string) *Stats { |
|
return &Stats{ |
|
NumSeries: h.NumSeries(), |
|
MaxTime: h.MaxTime(), |
|
MinTime: h.MinTime(), |
|
IndexPostingStats: h.PostingsCardinalityStats(statsByLabelName), |
|
} |
|
} |
|
|
|
type RangeHead struct { |
|
head *Head |
|
mint, maxt int64 |
|
} |
|
|
|
// NewRangeHead returns a *RangeHead. |
|
func NewRangeHead(head *Head, mint, maxt int64) *RangeHead { |
|
return &RangeHead{ |
|
head: head, |
|
mint: mint, |
|
maxt: maxt, |
|
} |
|
} |
|
|
|
func (h *RangeHead) Index() (IndexReader, error) { |
|
return h.head.indexRange(h.mint, h.maxt), nil |
|
} |
|
|
|
func (h *RangeHead) Chunks() (ChunkReader, error) { |
|
return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State(h.mint, h.maxt)) |
|
} |
|
|
|
func (h *RangeHead) Tombstones() (tombstones.Reader, error) { |
|
return h.head.tombstones, nil |
|
} |
|
|
|
func (h *RangeHead) MinTime() int64 { |
|
return h.mint |
|
} |
|
|
|
// MaxTime returns the max time of actual data fetch-able from the head. |
|
// This controls the chunks time range which is closed [b.MinTime, b.MaxTime]. |
|
func (h *RangeHead) MaxTime() int64 { |
|
return h.maxt |
|
} |
|
|
|
// BlockMaxTime returns the max time of the potential block created from this head. |
|
// It's different to MaxTime as we need to add +1 millisecond to block maxt because block |
|
// intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes. |
|
func (h *RangeHead) BlockMaxTime() int64 { |
|
return h.MaxTime() + 1 |
|
} |
|
|
|
func (h *RangeHead) NumSeries() uint64 { |
|
return h.head.NumSeries() |
|
} |
|
|
|
func (h *RangeHead) Meta() BlockMeta { |
|
return BlockMeta{ |
|
MinTime: h.MinTime(), |
|
MaxTime: h.MaxTime(), |
|
ULID: h.head.Meta().ULID, |
|
Stats: BlockStats{ |
|
NumSeries: h.NumSeries(), |
|
}, |
|
} |
|
} |
|
|
|
// String returns an human readable representation of the range head. It's important to |
|
// keep this function in order to avoid the struct dump when the head is stringified in |
|
// errors or logs. |
|
func (h *RangeHead) String() string { |
|
return fmt.Sprintf("range head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime()) |
|
} |
|
|
|
// initAppender is a helper to initialize the time bounds of the head |
|
// upon the first sample it receives. |
|
type initAppender struct { |
|
app storage.Appender |
|
head *Head |
|
} |
|
|
|
func (a *initAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) { |
|
if a.app != nil { |
|
return a.app.Append(ref, lset, t, v) |
|
} |
|
|
|
a.head.initTime(t) |
|
a.app = a.head.appender() |
|
return a.app.Append(ref, lset, t, v) |
|
} |
|
|
|
func (a *initAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { |
|
// Check if exemplar storage is enabled. |
|
if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 { |
|
return 0, nil |
|
} |
|
|
|
if a.app != nil { |
|
return a.app.AppendExemplar(ref, l, e) |
|
} |
|
// We should never reach here given we would call Append before AppendExemplar |
|
// and we probably want to always base head/WAL min time on sample times. |
|
a.head.initTime(e.Ts) |
|
a.app = a.head.appender() |
|
|
|
return a.app.AppendExemplar(ref, l, e) |
|
} |
|
|
|
var _ storage.GetRef = &initAppender{} |
|
|
|
func (a *initAppender) GetRef(lset labels.Labels) (uint64, labels.Labels) { |
|
if g, ok := a.app.(storage.GetRef); ok { |
|
return g.GetRef(lset) |
|
} |
|
return 0, nil |
|
} |
|
|
|
func (a *initAppender) Commit() error { |
|
if a.app == nil { |
|
return nil |
|
} |
|
return a.app.Commit() |
|
} |
|
|
|
func (a *initAppender) Rollback() error { |
|
if a.app == nil { |
|
return nil |
|
} |
|
return a.app.Rollback() |
|
} |
|
|
|
// Appender returns a new Appender on the database. |
|
func (h *Head) Appender(_ context.Context) storage.Appender { |
|
h.metrics.activeAppenders.Inc() |
|
|
|
// The head cache might not have a starting point yet. The init appender |
|
// picks up the first appended timestamp as the base. |
|
if h.MinTime() == math.MaxInt64 { |
|
return &initAppender{ |
|
head: h, |
|
} |
|
} |
|
return h.appender() |
|
} |
|
|
|
func (h *Head) appender() *headAppender { |
|
appendID, cleanupAppendIDsBelow := h.iso.newAppendID() |
|
|
|
// Allocate the exemplars buffer only if exemplars are enabled. |
|
var exemplarsBuf []exemplarWithSeriesRef |
|
if h.opts.EnableExemplarStorage { |
|
exemplarsBuf = h.getExemplarBuffer() |
|
} |
|
|
|
return &headAppender{ |
|
head: h, |
|
minValidTime: h.appendableMinValidTime(), |
|
mint: math.MaxInt64, |
|
maxt: math.MinInt64, |
|
samples: h.getAppendBuffer(), |
|
sampleSeries: h.getSeriesBuffer(), |
|
exemplars: exemplarsBuf, |
|
appendID: appendID, |
|
cleanupAppendIDsBelow: cleanupAppendIDsBelow, |
|
} |
|
} |
|
|
|
func (h *Head) appendableMinValidTime() int64 { |
|
// Setting the minimum valid time to whichever is greater, the head min valid time or the compaction window, |
|
// ensures that no samples will be added within the compaction window to avoid races. |
|
return max(h.minValidTime.Load(), h.MaxTime()-h.chunkRange.Load()/2) |
|
} |
|
|
|
func max(a, b int64) int64 { |
|
if a > b { |
|
return a |
|
} |
|
return b |
|
} |
|
|
|
func (h *Head) getAppendBuffer() []record.RefSample { |
|
b := h.appendPool.Get() |
|
if b == nil { |
|
return make([]record.RefSample, 0, 512) |
|
} |
|
return b.([]record.RefSample) |
|
} |
|
|
|
func (h *Head) putAppendBuffer(b []record.RefSample) { |
|
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. |
|
h.appendPool.Put(b[:0]) |
|
} |
|
|
|
func (h *Head) getExemplarBuffer() []exemplarWithSeriesRef { |
|
b := h.exemplarsPool.Get() |
|
if b == nil { |
|
return make([]exemplarWithSeriesRef, 0, 512) |
|
} |
|
return b.([]exemplarWithSeriesRef) |
|
} |
|
|
|
func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) { |
|
if b == nil { |
|
return |
|
} |
|
|
|
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. |
|
h.exemplarsPool.Put(b[:0]) |
|
} |
|
|
|
func (h *Head) getSeriesBuffer() []*memSeries { |
|
b := h.seriesPool.Get() |
|
if b == nil { |
|
return make([]*memSeries, 0, 512) |
|
} |
|
return b.([]*memSeries) |
|
} |
|
|
|
func (h *Head) putSeriesBuffer(b []*memSeries) { |
|
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. |
|
h.seriesPool.Put(b[:0]) |
|
} |
|
|
|
func (h *Head) getBytesBuffer() []byte { |
|
b := h.bytesPool.Get() |
|
if b == nil { |
|
return make([]byte, 0, 1024) |
|
} |
|
return b.([]byte) |
|
} |
|
|
|
func (h *Head) putBytesBuffer(b []byte) { |
|
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty. |
|
h.bytesPool.Put(b[:0]) |
|
} |
|
|
|
type exemplarWithSeriesRef struct { |
|
ref uint64 |
|
exemplar exemplar.Exemplar |
|
} |
|
|
|
type headAppender struct { |
|
head *Head |
|
minValidTime int64 // No samples below this timestamp are allowed. |
|
mint, maxt int64 |
|
|
|
series []record.RefSeries |
|
samples []record.RefSample |
|
exemplars []exemplarWithSeriesRef |
|
sampleSeries []*memSeries |
|
|
|
appendID, cleanupAppendIDsBelow uint64 |
|
closed bool |
|
} |
|
|
|
func (a *headAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) { |
|
if t < a.minValidTime { |
|
a.head.metrics.outOfBoundSamples.Inc() |
|
return 0, storage.ErrOutOfBounds |
|
} |
|
|
|
s := a.head.series.getByID(ref) |
|
if s == nil { |
|
// Ensure no empty labels have gotten through. |
|
lset = lset.WithoutEmpty() |
|
if len(lset) == 0 { |
|
return 0, errors.Wrap(ErrInvalidSample, "empty labelset") |
|
} |
|
|
|
if l, dup := lset.HasDuplicateLabelNames(); dup { |
|
return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l)) |
|
} |
|
|
|
var created bool |
|
var err error |
|
s, created, err = a.head.getOrCreate(lset.Hash(), lset) |
|
if err != nil { |
|
return 0, err |
|
} |
|
if created { |
|
a.series = append(a.series, record.RefSeries{ |
|
Ref: s.ref, |
|
Labels: lset, |
|
}) |
|
} |
|
} |
|
|
|
s.Lock() |
|
if err := s.appendable(t, v); err != nil { |
|
s.Unlock() |
|
if err == storage.ErrOutOfOrderSample { |
|
a.head.metrics.outOfOrderSamples.Inc() |
|
} |
|
return 0, err |
|
} |
|
s.pendingCommit = true |
|
s.Unlock() |
|
|
|
if t < a.mint { |
|
a.mint = t |
|
} |
|
if t > a.maxt { |
|
a.maxt = t |
|
} |
|
|
|
a.samples = append(a.samples, record.RefSample{ |
|
Ref: s.ref, |
|
T: t, |
|
V: v, |
|
}) |
|
a.sampleSeries = append(a.sampleSeries, s) |
|
return s.ref, nil |
|
} |
|
|
|
// AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't |
|
// use getOrCreate or make any of the lset sanity checks that Append does. |
|
func (a *headAppender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Exemplar) (uint64, error) { |
|
// Check if exemplar storage is enabled. |
|
if !a.head.opts.EnableExemplarStorage || a.head.opts.MaxExemplars.Load() <= 0 { |
|
return 0, nil |
|
} |
|
s := a.head.series.getByID(ref) |
|
if s == nil { |
|
return 0, fmt.Errorf("unknown series ref. when trying to add exemplar: %d", ref) |
|
} |
|
|
|
// Ensure no empty labels have gotten through. |
|
e.Labels = e.Labels.WithoutEmpty() |
|
|
|
err := a.head.exemplars.ValidateExemplar(s.lset, e) |
|
if err != nil { |
|
if err == storage.ErrDuplicateExemplar || err == storage.ErrExemplarsDisabled { |
|
// Duplicate, don't return an error but don't accept the exemplar. |
|
return 0, nil |
|
} |
|
return 0, err |
|
} |
|
|
|
a.exemplars = append(a.exemplars, exemplarWithSeriesRef{ref, e}) |
|
|
|
return s.ref, nil |
|
} |
|
|
|
var _ storage.GetRef = &headAppender{} |
|
|
|
func (a *headAppender) GetRef(lset labels.Labels) (uint64, labels.Labels) { |
|
s := a.head.series.getByHash(lset.Hash(), lset) |
|
if s == nil { |
|
return 0, nil |
|
} |
|
// returned labels must be suitable to pass to Append() |
|
return s.ref, s.lset |
|
} |
|
|
|
func (a *headAppender) log() error { |
|
if a.head.wal == nil { |
|
return nil |
|
} |
|
|
|
buf := a.head.getBytesBuffer() |
|
defer func() { a.head.putBytesBuffer(buf) }() |
|
|
|
var rec []byte |
|
var enc record.Encoder |
|
|
|
if len(a.series) > 0 { |
|
rec = enc.Series(a.series, buf) |
|
buf = rec[:0] |
|
|
|
if err := a.head.wal.Log(rec); err != nil { |
|
return errors.Wrap(err, "log series") |
|
} |
|
} |
|
if len(a.samples) > 0 { |
|
rec = enc.Samples(a.samples, buf) |
|
buf = rec[:0] |
|
|
|
if err := a.head.wal.Log(rec); err != nil { |
|
return errors.Wrap(err, "log samples") |
|
} |
|
} |
|
if len(a.exemplars) > 0 { |
|
rec = enc.Exemplars(exemplarsForEncoding(a.exemplars), buf) |
|
buf = rec[:0] |
|
|
|
if err := a.head.wal.Log(rec); err != nil { |
|
return errors.Wrap(err, "log exemplars") |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func exemplarsForEncoding(es []exemplarWithSeriesRef) []record.RefExemplar { |
|
ret := make([]record.RefExemplar, 0, len(es)) |
|
for _, e := range es { |
|
ret = append(ret, record.RefExemplar{ |
|
Ref: e.ref, |
|
T: e.exemplar.Ts, |
|
V: e.exemplar.Value, |
|
Labels: e.exemplar.Labels, |
|
}) |
|
} |
|
return ret |
|
} |
|
|
|
func (a *headAppender) Commit() (err error) { |
|
if a.closed { |
|
return ErrAppenderClosed |
|
} |
|
defer func() { a.closed = true }() |
|
|
|
if err := a.log(); err != nil { |
|
_ = a.Rollback() // Most likely the same error will happen again. |
|
return errors.Wrap(err, "write to WAL") |
|
} |
|
|
|
// No errors logging to WAL, so pass the exemplars along to the in memory storage. |
|
for _, e := range a.exemplars { |
|
s := a.head.series.getByID(e.ref) |
|
// We don't instrument exemplar appends here, all is instrumented by storage. |
|
if err := a.head.exemplars.AddExemplar(s.lset, e.exemplar); err != nil { |
|
if err == storage.ErrOutOfOrderExemplar { |
|
continue |
|
} |
|
level.Debug(a.head.logger).Log("msg", "Unknown error while adding exemplar", "err", err) |
|
} |
|
} |
|
|
|
defer a.head.metrics.activeAppenders.Dec() |
|
defer a.head.putAppendBuffer(a.samples) |
|
defer a.head.putSeriesBuffer(a.sampleSeries) |
|
defer a.head.putExemplarBuffer(a.exemplars) |
|
defer a.head.iso.closeAppend(a.appendID) |
|
|
|
total := len(a.samples) |
|
var series *memSeries |
|
for i, s := range a.samples { |
|
series = a.sampleSeries[i] |
|
series.Lock() |
|
ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper) |
|
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) |
|
series.pendingCommit = false |
|
series.Unlock() |
|
|
|
if !ok { |
|
total-- |
|
a.head.metrics.outOfOrderSamples.Inc() |
|
} |
|
if chunkCreated { |
|
a.head.metrics.chunks.Inc() |
|
a.head.metrics.chunksCreated.Inc() |
|
} |
|
} |
|
|
|
a.head.metrics.samplesAppended.Add(float64(total)) |
|
a.head.updateMinMaxTime(a.mint, a.maxt) |
|
|
|
return nil |
|
} |
|
|
|
func (a *headAppender) Rollback() (err error) { |
|
if a.closed { |
|
return ErrAppenderClosed |
|
} |
|
defer func() { a.closed = true }() |
|
defer a.head.metrics.activeAppenders.Dec() |
|
defer a.head.iso.closeAppend(a.appendID) |
|
defer a.head.putSeriesBuffer(a.sampleSeries) |
|
|
|
var series *memSeries |
|
for i := range a.samples { |
|
series = a.sampleSeries[i] |
|
series.Lock() |
|
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) |
|
series.pendingCommit = false |
|
series.Unlock() |
|
} |
|
a.head.putAppendBuffer(a.samples) |
|
a.head.putExemplarBuffer(a.exemplars) |
|
a.samples = nil |
|
a.exemplars = nil |
|
|
|
// Series are created in the head memory regardless of rollback. Thus we have |
|
// to log them to the WAL in any case. |
|
return a.log() |
|
} |
|
|
|
// Delete all samples in the range of [mint, maxt] for series that satisfy the given |
|
// label matchers. |
|
func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error { |
|
// Do not delete anything beyond the currently valid range. |
|
mint, maxt = clampInterval(mint, maxt, h.MinTime(), h.MaxTime()) |
|
|
|
ir := h.indexRange(mint, maxt) |
|
|
|
p, err := PostingsForMatchers(ir, ms...) |
|
if err != nil { |
|
return errors.Wrap(err, "select series") |
|
} |
|
|
|
var stones []tombstones.Stone |
|
for p.Next() { |
|
series := h.series.getByID(p.At()) |
|
|
|
series.RLock() |
|
t0, t1 := series.minTime(), series.maxTime() |
|
series.RUnlock() |
|
if t0 == math.MinInt64 || t1 == math.MinInt64 { |
|
continue |
|
} |
|
// Delete only until the current values and not beyond. |
|
t0, t1 = clampInterval(mint, maxt, t0, t1) |
|
stones = append(stones, tombstones.Stone{Ref: p.At(), Intervals: tombstones.Intervals{{Mint: t0, Maxt: t1}}}) |
|
} |
|
if p.Err() != nil { |
|
return p.Err() |
|
} |
|
if h.wal != nil { |
|
var enc record.Encoder |
|
if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil { |
|
return err |
|
} |
|
} |
|
for _, s := range stones { |
|
h.tombstones.AddInterval(s.Ref, s.Intervals[0]) |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// gc removes data before the minimum timestamp from the head. |
|
// It returns the actual min times of the chunks present in the Head. |
|
func (h *Head) gc() int64 { |
|
// Only data strictly lower than this timestamp must be deleted. |
|
mint := h.MinTime() |
|
|
|
// Drop old chunks and remember series IDs and hashes if they can be |
|
// deleted entirely. |
|
deleted, chunksRemoved, actualMint := h.series.gc(mint) |
|
seriesRemoved := len(deleted) |
|
|
|
h.metrics.seriesRemoved.Add(float64(seriesRemoved)) |
|
h.metrics.chunksRemoved.Add(float64(chunksRemoved)) |
|
h.metrics.chunks.Sub(float64(chunksRemoved)) |
|
h.numSeries.Sub(uint64(seriesRemoved)) |
|
|
|
// Remove deleted series IDs from the postings lists. |
|
h.postings.Delete(deleted) |
|
|
|
if h.wal != nil { |
|
_, last, _ := wal.Segments(h.wal.Dir()) |
|
h.deletedMtx.Lock() |
|
// Keep series records until we're past segment 'last' |
|
// because the WAL will still have samples records with |
|
// this ref ID. If we didn't keep these series records then |
|
// on start up when we replay the WAL, or any other code |
|
// that reads the WAL, wouldn't be able to use those |
|
// samples since we would have no labels for that ref ID. |
|
for ref := range deleted { |
|
h.deleted[ref] = last |
|
} |
|
h.deletedMtx.Unlock() |
|
} |
|
|
|
// Rebuild symbols and label value indices from what is left in the postings terms. |
|
// symMtx ensures that append of symbols and postings is disabled for rebuild time. |
|
h.symMtx.Lock() |
|
defer h.symMtx.Unlock() |
|
|
|
symbols := make(map[string]struct{}, len(h.symbols)) |
|
if err := h.postings.Iter(func(l labels.Label, _ index.Postings) error { |
|
symbols[l.Name] = struct{}{} |
|
symbols[l.Value] = struct{}{} |
|
return nil |
|
}); err != nil { |
|
// This should never happen, as the iteration function only returns nil. |
|
panic(err) |
|
} |
|
h.symbols = symbols |
|
|
|
return actualMint |
|
} |
|
|
|
// Tombstones returns a new reader over the head's tombstones |
|
func (h *Head) Tombstones() (tombstones.Reader, error) { |
|
return h.tombstones, nil |
|
} |
|
|
|
// Index returns an IndexReader against the block. |
|
func (h *Head) Index() (IndexReader, error) { |
|
return h.indexRange(math.MinInt64, math.MaxInt64), nil |
|
} |
|
|
|
func (h *Head) indexRange(mint, maxt int64) *headIndexReader { |
|
if hmin := h.MinTime(); hmin > mint { |
|
mint = hmin |
|
} |
|
return &headIndexReader{head: h, mint: mint, maxt: maxt} |
|
} |
|
|
|
// Chunks returns a ChunkReader against the block. |
|
func (h *Head) Chunks() (ChunkReader, error) { |
|
return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State(math.MinInt64, math.MaxInt64)) |
|
} |
|
|
|
func (h *Head) chunksRange(mint, maxt int64, is *isolationState) (*headChunkReader, error) { |
|
h.closedMtx.Lock() |
|
defer h.closedMtx.Unlock() |
|
if h.closed { |
|
return nil, errors.New("can't read from a closed head") |
|
} |
|
if hmin := h.MinTime(); hmin > mint { |
|
mint = hmin |
|
} |
|
return &headChunkReader{ |
|
head: h, |
|
mint: mint, |
|
maxt: maxt, |
|
isoState: is, |
|
}, nil |
|
} |
|
|
|
// NumSeries returns the number of active series in the head. |
|
func (h *Head) NumSeries() uint64 { |
|
return h.numSeries.Load() |
|
} |
|
|
|
// Meta returns meta information about the head. |
|
// The head is dynamic so will return dynamic results. |
|
func (h *Head) Meta() BlockMeta { |
|
var id [16]byte |
|
copy(id[:], "______head______") |
|
return BlockMeta{ |
|
MinTime: h.MinTime(), |
|
MaxTime: h.MaxTime(), |
|
ULID: ulid.ULID(id), |
|
Stats: BlockStats{ |
|
NumSeries: h.NumSeries(), |
|
}, |
|
} |
|
} |
|
|
|
// MinTime returns the lowest time bound on visible data in the head. |
|
func (h *Head) MinTime() int64 { |
|
return h.minTime.Load() |
|
} |
|
|
|
// MaxTime returns the highest timestamp seen in data of the head. |
|
func (h *Head) MaxTime() int64 { |
|
return h.maxTime.Load() |
|
} |
|
|
|
// compactable returns whether the head has a compactable range. |
|
// The head has a compactable range when the head time range is 1.5 times the chunk range. |
|
// The 0.5 acts as a buffer of the appendable window. |
|
func (h *Head) compactable() bool { |
|
return h.MaxTime()-h.MinTime() > h.chunkRange.Load()/2*3 |
|
} |
|
|
|
// Close flushes the WAL and closes the head. |
|
func (h *Head) Close() error { |
|
h.closedMtx.Lock() |
|
defer h.closedMtx.Unlock() |
|
h.closed = true |
|
errs := tsdb_errors.NewMulti(h.chunkDiskMapper.Close()) |
|
if h.wal != nil { |
|
errs.Add(h.wal.Close()) |
|
} |
|
return errs.Err() |
|
} |
|
|
|
// String returns an human readable representation of the TSDB head. It's important to |
|
// keep this function in order to avoid the struct dump when the head is stringified in |
|
// errors or logs. |
|
func (h *Head) String() string { |
|
return "head" |
|
} |
|
|
|
type headChunkReader struct { |
|
head *Head |
|
mint, maxt int64 |
|
isoState *isolationState |
|
} |
|
|
|
func (h *headChunkReader) Close() error { |
|
h.isoState.Close() |
|
return nil |
|
} |
|
|
|
// packChunkID packs a seriesID and a chunkID within it into a global 8 byte ID. |
|
// It panicks if the seriesID exceeds 5 bytes or the chunk ID 3 bytes. |
|
func packChunkID(seriesID, chunkID uint64) uint64 { |
|
if seriesID > (1<<40)-1 { |
|
panic("series ID exceeds 5 bytes") |
|
} |
|
if chunkID > (1<<24)-1 { |
|
panic("chunk ID exceeds 3 bytes") |
|
} |
|
return (seriesID << 24) | chunkID |
|
} |
|
|
|
func unpackChunkID(id uint64) (seriesID, chunkID uint64) { |
|
return id >> 24, (id << 40) >> 40 |
|
} |
|
|
|
// Chunk returns the chunk for the reference number. |
|
func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { |
|
sid, cid := unpackChunkID(ref) |
|
|
|
s := h.head.series.getByID(sid) |
|
// This means that the series has been garbage collected. |
|
if s == nil { |
|
return nil, storage.ErrNotFound |
|
} |
|
|
|
s.Lock() |
|
c, garbageCollect, err := s.chunk(int(cid), h.head.chunkDiskMapper) |
|
if err != nil { |
|
s.Unlock() |
|
return nil, err |
|
} |
|
defer func() { |
|
if garbageCollect { |
|
// Set this to nil so that Go GC can collect it after it has been used. |
|
c.chunk = nil |
|
s.memChunkPool.Put(c) |
|
} |
|
}() |
|
|
|
// This means that the chunk is outside the specified range. |
|
if !c.OverlapsClosedInterval(h.mint, h.maxt) { |
|
s.Unlock() |
|
return nil, storage.ErrNotFound |
|
} |
|
s.Unlock() |
|
|
|
return &safeChunk{ |
|
Chunk: c.chunk, |
|
s: s, |
|
cid: int(cid), |
|
isoState: h.isoState, |
|
chunkDiskMapper: h.head.chunkDiskMapper, |
|
}, nil |
|
} |
|
|
|
type safeChunk struct { |
|
chunkenc.Chunk |
|
s *memSeries |
|
cid int |
|
isoState *isolationState |
|
chunkDiskMapper *chunks.ChunkDiskMapper |
|
} |
|
|
|
func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { |
|
c.s.Lock() |
|
it := c.s.iterator(c.cid, c.isoState, c.chunkDiskMapper, reuseIter) |
|
c.s.Unlock() |
|
return it |
|
} |
|
|
|
type headIndexReader struct { |
|
head *Head |
|
mint, maxt int64 |
|
} |
|
|
|
func (h *headIndexReader) Close() error { |
|
return nil |
|
} |
|
|
|
func (h *headIndexReader) Symbols() index.StringIter { |
|
h.head.symMtx.RLock() |
|
res := make([]string, 0, len(h.head.symbols)) |
|
|
|
for s := range h.head.symbols { |
|
res = append(res, s) |
|
} |
|
h.head.symMtx.RUnlock() |
|
|
|
sort.Strings(res) |
|
return index.NewStringListIter(res) |
|
} |
|
|
|
// SortedLabelValues returns label values present in the head for the |
|
// specific label name that are within the time range mint to maxt. |
|
// If matchers are specified the returned result set is reduced |
|
// to label values of metrics matching the matchers. |
|
func (h *headIndexReader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { |
|
values, err := h.LabelValues(name, matchers...) |
|
if err == nil { |
|
sort.Strings(values) |
|
} |
|
return values, err |
|
} |
|
|
|
// LabelValues returns label values present in the head for the |
|
// specific label name that are within the time range mint to maxt. |
|
// If matchers are specified the returned result set is reduced |
|
// to label values of metrics matching the matchers. |
|
func (h *headIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { |
|
if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() { |
|
return []string{}, nil |
|
} |
|
|
|
if len(matchers) == 0 { |
|
h.head.symMtx.RLock() |
|
defer h.head.symMtx.RUnlock() |
|
return h.head.postings.LabelValues(name), nil |
|
} |
|
|
|
return labelValuesWithMatchers(h, name, matchers...) |
|
} |
|
|
|
// LabelNames returns all the unique label names present in the head |
|
// that are within the time range mint to maxt. |
|
func (h *headIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, error) { |
|
if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() { |
|
return []string{}, nil |
|
} |
|
|
|
if len(matchers) == 0 { |
|
h.head.symMtx.RLock() |
|
labelNames := h.head.postings.LabelNames() |
|
h.head.symMtx.RUnlock() |
|
|
|
sort.Strings(labelNames) |
|
return labelNames, nil |
|
} |
|
|
|
return labelNamesWithMatchers(h, matchers...) |
|
} |
|
|
|
// Postings returns the postings list iterator for the label pairs. |
|
func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) { |
|
res := make([]index.Postings, 0, len(values)) |
|
for _, value := range values { |
|
res = append(res, h.head.postings.Get(name, value)) |
|
} |
|
return index.Merge(res...), nil |
|
} |
|
|
|
func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { |
|
series := make([]*memSeries, 0, 128) |
|
|
|
// Fetch all the series only once. |
|
for p.Next() { |
|
s := h.head.series.getByID(p.At()) |
|
if s == nil { |
|
level.Debug(h.head.logger).Log("msg", "Looked up series not found") |
|
} else { |
|
series = append(series, s) |
|
} |
|
} |
|
if err := p.Err(); err != nil { |
|
return index.ErrPostings(errors.Wrap(err, "expand postings")) |
|
} |
|
|
|
sort.Slice(series, func(i, j int) bool { |
|
return labels.Compare(series[i].lset, series[j].lset) < 0 |
|
}) |
|
|
|
// Convert back to list. |
|
ep := make([]uint64, 0, len(series)) |
|
for _, p := range series { |
|
ep = append(ep, p.ref) |
|
} |
|
return index.NewListPostings(ep) |
|
} |
|
|
|
// Series returns the series for the given reference. |
|
func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error { |
|
s := h.head.series.getByID(ref) |
|
|
|
if s == nil { |
|
h.head.metrics.seriesNotFound.Inc() |
|
return storage.ErrNotFound |
|
} |
|
*lbls = append((*lbls)[:0], s.lset...) |
|
|
|
s.Lock() |
|
defer s.Unlock() |
|
|
|
*chks = (*chks)[:0] |
|
|
|
for i, c := range s.mmappedChunks { |
|
// Do not expose chunks that are outside of the specified range. |
|
if !c.OverlapsClosedInterval(h.mint, h.maxt) { |
|
continue |
|
} |
|
*chks = append(*chks, chunks.Meta{ |
|
MinTime: c.minTime, |
|
MaxTime: c.maxTime, |
|
Ref: packChunkID(s.ref, uint64(s.chunkID(i))), |
|
}) |
|
} |
|
if s.headChunk != nil && s.headChunk.OverlapsClosedInterval(h.mint, h.maxt) { |
|
*chks = append(*chks, chunks.Meta{ |
|
MinTime: s.headChunk.minTime, |
|
MaxTime: math.MaxInt64, // Set the head chunks as open (being appended to). |
|
Ref: packChunkID(s.ref, uint64(s.chunkID(len(s.mmappedChunks)))), |
|
}) |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// LabelValueFor returns label value for the given label name in the series referred to by ID. |
|
func (h *headIndexReader) LabelValueFor(id uint64, label string) (string, error) { |
|
memSeries := h.head.series.getByID(id) |
|
if memSeries == nil { |
|
return "", storage.ErrNotFound |
|
} |
|
|
|
value := memSeries.lset.Get(label) |
|
if value == "" { |
|
return "", storage.ErrNotFound |
|
} |
|
|
|
return value, nil |
|
} |
|
|
|
// LabelNamesFor returns all the label names for the series referred to by IDs. |
|
// The names returned are sorted. |
|
func (h *headIndexReader) LabelNamesFor(ids ...uint64) ([]string, error) { |
|
namesMap := make(map[string]struct{}) |
|
for _, id := range ids { |
|
memSeries := h.head.series.getByID(id) |
|
if memSeries == nil { |
|
return nil, storage.ErrNotFound |
|
} |
|
for _, lbl := range memSeries.lset { |
|
namesMap[lbl.Name] = struct{}{} |
|
} |
|
} |
|
names := make([]string, 0, len(namesMap)) |
|
for name := range namesMap { |
|
names = append(names, name) |
|
} |
|
sort.Strings(names) |
|
return names, nil |
|
} |
|
|
|
func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) { |
|
// Just using `getOrCreateWithID` below would be semantically sufficient, but we'd create |
|
// a new series on every sample inserted via Add(), which causes allocations |
|
// and makes our series IDs rather random and harder to compress in postings. |
|
s := h.series.getByHash(hash, lset) |
|
if s != nil { |
|
return s, false, nil |
|
} |
|
|
|
// Optimistically assume that we are the first one to create the series. |
|
id := h.lastSeriesID.Inc() |
|
|
|
return h.getOrCreateWithID(id, hash, lset) |
|
} |
|
|
|
func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool, error) { |
|
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries { |
|
return newMemSeries(lset, id, h.chunkRange.Load(), &h.memChunkPool) |
|
}) |
|
if err != nil { |
|
return nil, false, err |
|
} |
|
if !created { |
|
return s, false, nil |
|
} |
|
|
|
h.metrics.seriesCreated.Inc() |
|
h.numSeries.Inc() |
|
|
|
h.symMtx.Lock() |
|
defer h.symMtx.Unlock() |
|
|
|
for _, l := range lset { |
|
h.symbols[l.Name] = struct{}{} |
|
h.symbols[l.Value] = struct{}{} |
|
} |
|
|
|
h.postings.Add(id, lset) |
|
return s, true, nil |
|
} |
|
|
|
// seriesHashmap is a simple hashmap for memSeries by their label set. It is built |
|
// on top of a regular hashmap and holds a slice of series to resolve hash collisions. |
|
// Its methods require the hash to be submitted with it to avoid re-computations throughout |
|
// the code. |
|
type seriesHashmap map[uint64][]*memSeries |
|
|
|
func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries { |
|
for _, s := range m[hash] { |
|
if labels.Equal(s.lset, lset) { |
|
return s |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (m seriesHashmap) set(hash uint64, s *memSeries) { |
|
l := m[hash] |
|
for i, prev := range l { |
|
if labels.Equal(prev.lset, s.lset) { |
|
l[i] = s |
|
return |
|
} |
|
} |
|
m[hash] = append(l, s) |
|
} |
|
|
|
func (m seriesHashmap) del(hash uint64, lset labels.Labels) { |
|
var rem []*memSeries |
|
for _, s := range m[hash] { |
|
if !labels.Equal(s.lset, lset) { |
|
rem = append(rem, s) |
|
} |
|
} |
|
if len(rem) == 0 { |
|
delete(m, hash) |
|
} else { |
|
m[hash] = rem |
|
} |
|
} |
|
|
|
const ( |
|
// DefaultStripeSize is the default number of entries to allocate in the stripeSeries hash map. |
|
DefaultStripeSize = 1 << 14 |
|
) |
|
|
|
// stripeSeries locks modulo ranges of IDs and hashes to reduce lock contention. |
|
// The locks are padded to not be on the same cache line. Filling the padded space |
|
// with the maps was profiled to be slower – likely due to the additional pointer |
|
// dereferences. |
|
type stripeSeries struct { |
|
size int |
|
series []map[uint64]*memSeries |
|
hashes []seriesHashmap |
|
locks []stripeLock |
|
seriesLifecycleCallback SeriesLifecycleCallback |
|
} |
|
|
|
type stripeLock struct { |
|
sync.RWMutex |
|
// Padding to avoid multiple locks being on the same cache line. |
|
_ [40]byte |
|
} |
|
|
|
func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *stripeSeries { |
|
s := &stripeSeries{ |
|
size: stripeSize, |
|
series: make([]map[uint64]*memSeries, stripeSize), |
|
hashes: make([]seriesHashmap, stripeSize), |
|
locks: make([]stripeLock, stripeSize), |
|
seriesLifecycleCallback: seriesCallback, |
|
} |
|
|
|
for i := range s.series { |
|
s.series[i] = map[uint64]*memSeries{} |
|
} |
|
for i := range s.hashes { |
|
s.hashes[i] = seriesHashmap{} |
|
} |
|
return s |
|
} |
|
|
|
// gc garbage collects old chunks that are strictly before mint and removes |
|
// series entirely that have no chunks left. |
|
func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int, int64) { |
|
var ( |
|
deleted = map[uint64]struct{}{} |
|
deletedForCallback = []labels.Labels{} |
|
rmChunks = 0 |
|
actualMint int64 = math.MaxInt64 |
|
) |
|
// Run through all series and truncate old chunks. Mark those with no |
|
// chunks left as deleted and store their ID. |
|
for i := 0; i < s.size; i++ { |
|
s.locks[i].Lock() |
|
|
|
for hash, all := range s.hashes[i] { |
|
for _, series := range all { |
|
series.Lock() |
|
rmChunks += series.truncateChunksBefore(mint) |
|
|
|
if len(series.mmappedChunks) > 0 || series.headChunk != nil || series.pendingCommit { |
|
seriesMint := series.minTime() |
|
if seriesMint < actualMint { |
|
actualMint = seriesMint |
|
} |
|
series.Unlock() |
|
continue |
|
} |
|
|
|
// The series is gone entirely. We need to keep the series lock |
|
// and make sure we have acquired the stripe locks for hash and ID of the |
|
// series alike. |
|
// If we don't hold them all, there's a very small chance that a series receives |
|
// samples again while we are half-way into deleting it. |
|
j := int(series.ref) & (s.size - 1) |
|
|
|
if i != j { |
|
s.locks[j].Lock() |
|
} |
|
|
|
deleted[series.ref] = struct{}{} |
|
s.hashes[i].del(hash, series.lset) |
|
delete(s.series[j], series.ref) |
|
deletedForCallback = append(deletedForCallback, series.lset) |
|
|
|
if i != j { |
|
s.locks[j].Unlock() |
|
} |
|
|
|
series.Unlock() |
|
} |
|
} |
|
|
|
s.locks[i].Unlock() |
|
|
|
s.seriesLifecycleCallback.PostDeletion(deletedForCallback...) |
|
deletedForCallback = deletedForCallback[:0] |
|
} |
|
|
|
if actualMint == math.MaxInt64 { |
|
actualMint = mint |
|
} |
|
|
|
return deleted, rmChunks, actualMint |
|
} |
|
|
|
func (s *stripeSeries) getByID(id uint64) *memSeries { |
|
i := id & uint64(s.size-1) |
|
|
|
s.locks[i].RLock() |
|
series := s.series[i][id] |
|
s.locks[i].RUnlock() |
|
|
|
return series |
|
} |
|
|
|
func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries { |
|
i := hash & uint64(s.size-1) |
|
|
|
s.locks[i].RLock() |
|
series := s.hashes[i].get(hash, lset) |
|
s.locks[i].RUnlock() |
|
|
|
return series |
|
} |
|
|
|
func (s *stripeSeries) getOrSet(hash uint64, lset labels.Labels, createSeries func() *memSeries) (*memSeries, bool, error) { |
|
// PreCreation is called here to avoid calling it inside the lock. |
|
// It is not necessary to call it just before creating a series, |
|
// rather it gives a 'hint' whether to create a series or not. |
|
preCreationErr := s.seriesLifecycleCallback.PreCreation(lset) |
|
|
|
// Create the series, unless the PreCreation() callback as failed. |
|
// If failed, we'll not allow to create a new series anyway. |
|
var series *memSeries |
|
if preCreationErr == nil { |
|
series = createSeries() |
|
} |
|
|
|
i := hash & uint64(s.size-1) |
|
s.locks[i].Lock() |
|
|
|
if prev := s.hashes[i].get(hash, lset); prev != nil { |
|
s.locks[i].Unlock() |
|
return prev, false, nil |
|
} |
|
if preCreationErr == nil { |
|
s.hashes[i].set(hash, series) |
|
} |
|
s.locks[i].Unlock() |
|
|
|
if preCreationErr != nil { |
|
// The callback prevented creation of series. |
|
return nil, false, preCreationErr |
|
} |
|
// Setting the series in the s.hashes marks the creation of series |
|
// as any further calls to this methods would return that series. |
|
s.seriesLifecycleCallback.PostCreation(series.lset) |
|
|
|
i = series.ref & uint64(s.size-1) |
|
|
|
s.locks[i].Lock() |
|
s.series[i][series.ref] = series |
|
s.locks[i].Unlock() |
|
|
|
return series, true, nil |
|
} |
|
|
|
type sample struct { |
|
t int64 |
|
v float64 |
|
} |
|
|
|
func newSample(t int64, v float64) tsdbutil.Sample { return sample{t, v} } |
|
func (s sample) T() int64 { return s.t } |
|
func (s sample) V() float64 { return s.v } |
|
|
|
// memSeries is the in-memory representation of a series. None of its methods |
|
// are goroutine safe and it is the caller's responsibility to lock it. |
|
type memSeries struct { |
|
sync.RWMutex |
|
|
|
ref uint64 |
|
lset labels.Labels |
|
mmappedChunks []*mmappedChunk |
|
headChunk *memChunk |
|
chunkRange int64 |
|
firstChunkID int |
|
|
|
nextAt int64 // Timestamp at which to cut the next chunk. |
|
sampleBuf [4]sample |
|
pendingCommit bool // Whether there are samples waiting to be committed to this series. |
|
|
|
app chunkenc.Appender // Current appender for the chunk. |
|
|
|
memChunkPool *sync.Pool |
|
|
|
txs *txRing |
|
} |
|
|
|
func newMemSeries(lset labels.Labels, id uint64, chunkRange int64, memChunkPool *sync.Pool) *memSeries { |
|
s := &memSeries{ |
|
lset: lset, |
|
ref: id, |
|
chunkRange: chunkRange, |
|
nextAt: math.MinInt64, |
|
txs: newTxRing(4), |
|
memChunkPool: memChunkPool, |
|
} |
|
return s |
|
} |
|
|
|
func (s *memSeries) minTime() int64 { |
|
if len(s.mmappedChunks) > 0 { |
|
return s.mmappedChunks[0].minTime |
|
} |
|
if s.headChunk != nil { |
|
return s.headChunk.minTime |
|
} |
|
return math.MinInt64 |
|
} |
|
|
|
func (s *memSeries) maxTime() int64 { |
|
c := s.head() |
|
if c == nil { |
|
return math.MinInt64 |
|
} |
|
return c.maxTime |
|
} |
|
|
|
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk { |
|
s.mmapCurrentHeadChunk(chunkDiskMapper) |
|
|
|
s.headChunk = &memChunk{ |
|
chunk: chunkenc.NewXORChunk(), |
|
minTime: mint, |
|
maxTime: math.MinInt64, |
|
} |
|
|
|
// Set upper bound on when the next chunk must be started. An earlier timestamp |
|
// may be chosen dynamically at a later point. |
|
s.nextAt = rangeForTimestamp(mint, s.chunkRange) |
|
|
|
app, err := s.headChunk.chunk.Appender() |
|
if err != nil { |
|
panic(err) |
|
} |
|
s.app = app |
|
return s.headChunk |
|
} |
|
|
|
func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) { |
|
if s.headChunk == nil { |
|
// There is no head chunk, so nothing to m-map here. |
|
return |
|
} |
|
|
|
chunkRef, err := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk) |
|
if err != nil { |
|
if err != chunks.ErrChunkDiskMapperClosed { |
|
panic(err) |
|
} |
|
} |
|
s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{ |
|
ref: chunkRef, |
|
numSamples: uint16(s.headChunk.chunk.NumSamples()), |
|
minTime: s.headChunk.minTime, |
|
maxTime: s.headChunk.maxTime, |
|
}) |
|
} |
|
|
|
// appendable checks whether the given sample is valid for appending to the series. |
|
func (s *memSeries) appendable(t int64, v float64) error { |
|
c := s.head() |
|
if c == nil { |
|
return nil |
|
} |
|
|
|
if t > c.maxTime { |
|
return nil |
|
} |
|
if t < c.maxTime { |
|
return storage.ErrOutOfOrderSample |
|
} |
|
// We are allowing exact duplicates as we can encounter them in valid cases |
|
// like federation and erroring out at that time would be extremely noisy. |
|
if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) { |
|
return storage.ErrDuplicateSampleForTimestamp |
|
} |
|
return nil |
|
} |
|
|
|
// chunk returns the chunk for the chunk id from memory or by m-mapping it from the disk. |
|
// If garbageCollect is true, it means that the returned *memChunk |
|
// (and not the chunkenc.Chunk inside it) can be garbage collected after it's usage. |
|
func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) { |
|
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are |
|
// incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. |
|
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix |
|
// is len(s.mmappedChunks), it represents the next chunk, which is the head chunk. |
|
ix := id - s.firstChunkID |
|
if ix < 0 || ix > len(s.mmappedChunks) { |
|
return nil, false, storage.ErrNotFound |
|
} |
|
if ix == len(s.mmappedChunks) { |
|
if s.headChunk == nil { |
|
return nil, false, errors.New("invalid head chunk") |
|
} |
|
return s.headChunk, false, nil |
|
} |
|
chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref) |
|
if err != nil { |
|
if _, ok := err.(*chunks.CorruptionErr); ok { |
|
panic(err) |
|
} |
|
return nil, false, err |
|
} |
|
mc := s.memChunkPool.Get().(*memChunk) |
|
mc.chunk = chk |
|
mc.minTime = s.mmappedChunks[ix].minTime |
|
mc.maxTime = s.mmappedChunks[ix].maxTime |
|
return mc, true, nil |
|
} |
|
|
|
func (s *memSeries) chunkID(pos int) int { |
|
return pos + s.firstChunkID |
|
} |
|
|
|
// truncateChunksBefore removes all chunks from the series that |
|
// have no timestamp at or after mint. |
|
// Chunk IDs remain unchanged. |
|
func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { |
|
if s.headChunk != nil && s.headChunk.maxTime < mint { |
|
// If head chunk is truncated, we can truncate all mmapped chunks. |
|
removed = 1 + len(s.mmappedChunks) |
|
s.firstChunkID += removed |
|
s.headChunk = nil |
|
s.mmappedChunks = nil |
|
return removed |
|
} |
|
if len(s.mmappedChunks) > 0 { |
|
for i, c := range s.mmappedChunks { |
|
if c.maxTime >= mint { |
|
break |
|
} |
|
removed = i + 1 |
|
} |
|
s.mmappedChunks = append(s.mmappedChunks[:0], s.mmappedChunks[removed:]...) |
|
s.firstChunkID += removed |
|
} |
|
return removed |
|
} |
|
|
|
// append adds the sample (t, v) to the series. The caller also has to provide |
|
// the appendID for isolation. (The appendID can be zero, which results in no |
|
// isolation for this append.) |
|
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. |
|
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { |
|
// Based on Gorilla white papers this offers near-optimal compression ratio |
|
// so anything bigger that this has diminishing returns and increases |
|
// the time range within which we have to decompress all samples. |
|
const samplesPerChunk = 120 |
|
|
|
c := s.head() |
|
|
|
if c == nil { |
|
if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t { |
|
// Out of order sample. Sample timestamp is already in the mmaped chunks, so ignore it. |
|
return false, false |
|
} |
|
// There is no chunk in this series yet, create the first chunk for the sample. |
|
c = s.cutNewHeadChunk(t, chunkDiskMapper) |
|
chunkCreated = true |
|
} |
|
numSamples := c.chunk.NumSamples() |
|
|
|
// Out of order sample. |
|
if c.maxTime >= t { |
|
return false, chunkCreated |
|
} |
|
// If we reach 25% of a chunk's desired sample count, predict an end time |
|
// for this chunk that will try to make samples equally distributed within |
|
// the remaining chunks in the current chunk range. |
|
// At latest it must happen at the timestamp set when the chunk was cut. |
|
if numSamples == samplesPerChunk/4 { |
|
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) |
|
} |
|
if t >= s.nextAt { |
|
c = s.cutNewHeadChunk(t, chunkDiskMapper) |
|
chunkCreated = true |
|
} |
|
s.app.Append(t, v) |
|
|
|
c.maxTime = t |
|
|
|
s.sampleBuf[0] = s.sampleBuf[1] |
|
s.sampleBuf[1] = s.sampleBuf[2] |
|
s.sampleBuf[2] = s.sampleBuf[3] |
|
s.sampleBuf[3] = sample{t: t, v: v} |
|
|
|
if appendID > 0 { |
|
s.txs.add(appendID) |
|
} |
|
|
|
return true, chunkCreated |
|
} |
|
|
|
// cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after |
|
// acquiring lock. |
|
func (s *memSeries) cleanupAppendIDsBelow(bound uint64) { |
|
s.txs.cleanupAppendIDsBelow(bound) |
|
} |
|
|
|
// computeChunkEndTime estimates the end timestamp based the beginning of a |
|
// chunk, its current timestamp and the upper bound up to which we insert data. |
|
// It assumes that the time range is 1/4 full. |
|
// Assuming that the samples will keep arriving at the same rate, it will make the |
|
// remaining n chunks within this chunk range (before max) equally sized. |
|
func computeChunkEndTime(start, cur, max int64) int64 { |
|
n := (max - start) / ((cur - start + 1) * 4) |
|
if n <= 1 { |
|
return max |
|
} |
|
return start + (max-start)/n |
|
} |
|
|
|
// iterator returns a chunk iterator. |
|
// It is unsafe to call this concurrently with s.append(...) without holding the series lock. |
|
func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { |
|
c, garbageCollect, err := s.chunk(id, chunkDiskMapper) |
|
// TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a |
|
// series's chunk, which got then garbage collected before it got |
|
// accessed. We must ensure to not garbage collect as long as any |
|
// readers still hold a reference. |
|
if err != nil { |
|
return chunkenc.NewNopIterator() |
|
} |
|
defer func() { |
|
if garbageCollect { |
|
// Set this to nil so that Go GC can collect it after it has been used. |
|
// This should be done always at the end. |
|
c.chunk = nil |
|
s.memChunkPool.Put(c) |
|
} |
|
}() |
|
|
|
ix := id - s.firstChunkID |
|
|
|
numSamples := c.chunk.NumSamples() |
|
stopAfter := numSamples |
|
|
|
if isoState != nil { |
|
totalSamples := 0 // Total samples in this series. |
|
previousSamples := 0 // Samples before this chunk. |
|
|
|
for j, d := range s.mmappedChunks { |
|
totalSamples += int(d.numSamples) |
|
if j < ix { |
|
previousSamples += int(d.numSamples) |
|
} |
|
} |
|
|
|
if s.headChunk != nil { |
|
totalSamples += s.headChunk.chunk.NumSamples() |
|
} |
|
|
|
// Removing the extra transactionIDs that are relevant for samples that |
|
// come after this chunk, from the total transactionIDs. |
|
appendIDsToConsider := s.txs.txIDCount - (totalSamples - (previousSamples + numSamples)) |
|
|
|
// Iterate over the appendIDs, find the first one that the isolation state says not |
|
// to return. |
|
it := s.txs.iterator() |
|
for index := 0; index < appendIDsToConsider; index++ { |
|
appendID := it.At() |
|
if appendID <= isoState.maxAppendID { // Easy check first. |
|
if _, ok := isoState.incompleteAppends[appendID]; !ok { |
|
it.Next() |
|
continue |
|
} |
|
} |
|
stopAfter = numSamples - (appendIDsToConsider - index) |
|
if stopAfter < 0 { |
|
stopAfter = 0 // Stopped in a previous chunk. |
|
} |
|
break |
|
} |
|
} |
|
|
|
if stopAfter == 0 { |
|
return chunkenc.NewNopIterator() |
|
} |
|
|
|
if id-s.firstChunkID < len(s.mmappedChunks) { |
|
if stopAfter == numSamples { |
|
return c.chunk.Iterator(it) |
|
} |
|
if msIter, ok := it.(*stopIterator); ok { |
|
msIter.Iterator = c.chunk.Iterator(msIter.Iterator) |
|
msIter.i = -1 |
|
msIter.stopAfter = stopAfter |
|
return msIter |
|
} |
|
return &stopIterator{ |
|
Iterator: c.chunk.Iterator(it), |
|
i: -1, |
|
stopAfter: stopAfter, |
|
} |
|
} |
|
// Serve the last 4 samples for the last chunk from the sample buffer |
|
// as their compressed bytes may be mutated by added samples. |
|
if msIter, ok := it.(*memSafeIterator); ok { |
|
msIter.Iterator = c.chunk.Iterator(msIter.Iterator) |
|
msIter.i = -1 |
|
msIter.total = numSamples |
|
msIter.stopAfter = stopAfter |
|
msIter.buf = s.sampleBuf |
|
return msIter |
|
} |
|
return &memSafeIterator{ |
|
stopIterator: stopIterator{ |
|
Iterator: c.chunk.Iterator(it), |
|
i: -1, |
|
stopAfter: stopAfter, |
|
}, |
|
total: numSamples, |
|
buf: s.sampleBuf, |
|
} |
|
} |
|
|
|
func (s *memSeries) head() *memChunk { |
|
return s.headChunk |
|
} |
|
|
|
type memChunk struct { |
|
chunk chunkenc.Chunk |
|
minTime, maxTime int64 |
|
} |
|
|
|
// OverlapsClosedInterval returns true if the chunk overlaps [mint, maxt]. |
|
func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool { |
|
return mc.minTime <= maxt && mint <= mc.maxTime |
|
} |
|
|
|
type stopIterator struct { |
|
chunkenc.Iterator |
|
|
|
i, stopAfter int |
|
} |
|
|
|
func (it *stopIterator) Next() bool { |
|
if it.i+1 >= it.stopAfter { |
|
return false |
|
} |
|
it.i++ |
|
return it.Iterator.Next() |
|
} |
|
|
|
type memSafeIterator struct { |
|
stopIterator |
|
|
|
total int |
|
buf [4]sample |
|
} |
|
|
|
func (it *memSafeIterator) Seek(t int64) bool { |
|
if it.Err() != nil { |
|
return false |
|
} |
|
|
|
ts, _ := it.At() |
|
|
|
for t > ts || it.i == -1 { |
|
if !it.Next() { |
|
return false |
|
} |
|
ts, _ = it.At() |
|
} |
|
|
|
return true |
|
} |
|
|
|
func (it *memSafeIterator) Next() bool { |
|
if it.i+1 >= it.stopAfter { |
|
return false |
|
} |
|
it.i++ |
|
if it.total-it.i > 4 { |
|
return it.Iterator.Next() |
|
} |
|
return true |
|
} |
|
|
|
func (it *memSafeIterator) At() (int64, float64) { |
|
if it.total-it.i > 4 { |
|
return it.Iterator.At() |
|
} |
|
s := it.buf[4-(it.total-it.i)] |
|
return s.t, s.v |
|
} |
|
|
|
type mmappedChunk struct { |
|
ref uint64 |
|
numSamples uint16 |
|
minTime, maxTime int64 |
|
} |
|
|
|
// Returns true if the chunk overlaps [mint, maxt]. |
|
func (mc *mmappedChunk) OverlapsClosedInterval(mint, maxt int64) bool { |
|
return mc.minTime <= maxt && mint <= mc.maxTime |
|
} |
|
|
|
// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series. |
|
// It is always a no-op in Prometheus and mainly meant for external users who import TSDB. |
|
// All the callbacks should be safe to be called concurrently. |
|
// It is up to the user to implement soft or hard consistency by making the callbacks |
|
// atomic or non-atomic. Atomic callbacks can cause degradation performance. |
|
type SeriesLifecycleCallback interface { |
|
// PreCreation is called before creating a series to indicate if the series can be created. |
|
// A non nil error means the series should not be created. |
|
PreCreation(labels.Labels) error |
|
// PostCreation is called after creating a series to indicate a creation of series. |
|
PostCreation(labels.Labels) |
|
// PostDeletion is called after deletion of series. |
|
PostDeletion(...labels.Labels) |
|
} |
|
|
|
type noopSeriesLifecycleCallback struct{} |
|
|
|
func (noopSeriesLifecycleCallback) PreCreation(labels.Labels) error { return nil } |
|
func (noopSeriesLifecycleCallback) PostCreation(labels.Labels) {} |
|
func (noopSeriesLifecycleCallback) PostDeletion(...labels.Labels) {} |
|
|
|
func (h *Head) Size() int64 { |
|
var walSize int64 |
|
if h.wal != nil { |
|
walSize, _ = h.wal.Size() |
|
} |
|
cdmSize, _ := h.chunkDiskMapper.Size() |
|
return walSize + cdmSize |
|
} |
|
|
|
func (h *RangeHead) Size() int64 { |
|
return h.head.Size() |
|
} |
|
|
|
func (h *Head) startWALReplayStatus(startFrom, last int) { |
|
h.stats.WALReplayStatus.Lock() |
|
defer h.stats.WALReplayStatus.Unlock() |
|
|
|
h.stats.WALReplayStatus.Min = startFrom |
|
h.stats.WALReplayStatus.Max = last |
|
h.stats.WALReplayStatus.Current = startFrom |
|
} |
|
|
|
func (h *Head) updateWALReplayStatusRead(current int) { |
|
h.stats.WALReplayStatus.Lock() |
|
defer h.stats.WALReplayStatus.Unlock() |
|
|
|
h.stats.WALReplayStatus.Current = current |
|
}
|
|
|