|
|
|
@ -185,13 +185,18 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
|
|
|
|
|
return h, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ReadWAL initializes the head by consuming the write ahead log.
|
|
|
|
|
func (h *Head) ReadWAL() error {
|
|
|
|
|
r := h.wal.Reader()
|
|
|
|
|
mint := h.MinTime()
|
|
|
|
|
|
|
|
|
|
seriesFunc := func(series []RefSeries) error {
|
|
|
|
|
for _, s := range series {
|
|
|
|
|
h.create(s.Labels.Hash(), s.Labels)
|
|
|
|
|
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
|
|
|
|
|
|
|
|
|
|
if h.lastSeriesID < s.Ref {
|
|
|
|
|
h.lastSeriesID = s.Ref
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
@ -202,7 +207,8 @@ func (h *Head) ReadWAL() error {
|
|
|
|
|
}
|
|
|
|
|
ms := h.series.getByID(s.Ref)
|
|
|
|
|
if ms == nil {
|
|
|
|
|
return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref)
|
|
|
|
|
h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
_, chunkCreated := ms.append(s.T, s.V)
|
|
|
|
|
if chunkCreated {
|
|
|
|
@ -210,7 +216,6 @@ func (h *Head) ReadWAL() error {
|
|
|
|
|
h.metrics.chunks.Inc()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
deletesFunc := func(stones []Stone) error {
|
|
|
|
@ -222,7 +227,6 @@ func (h *Head) ReadWAL() error {
|
|
|
|
|
h.tombstones.add(s.ref, itv)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -379,17 +383,12 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
|
|
|
|
|
if t < a.mint {
|
|
|
|
|
return 0, ErrOutOfBounds
|
|
|
|
|
}
|
|
|
|
|
hash := lset.Hash()
|
|
|
|
|
|
|
|
|
|
s := a.head.series.getByHash(hash, lset)
|
|
|
|
|
|
|
|
|
|
if s == nil {
|
|
|
|
|
s = a.head.create(hash, lset)
|
|
|
|
|
|
|
|
|
|
s, created := a.head.getOrCreate(lset.Hash(), lset)
|
|
|
|
|
if created {
|
|
|
|
|
a.series = append(a.series, RefSeries{
|
|
|
|
|
Ref: s.ref,
|
|
|
|
|
Labels: lset,
|
|
|
|
|
hash: hash,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
return s.ref, a.AddFast(s.ref, t, v)
|
|
|
|
@ -839,20 +838,32 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) {
|
|
|
|
|
return res, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *Head) create(hash uint64, lset labels.Labels) *memSeries {
|
|
|
|
|
h.metrics.series.Inc()
|
|
|
|
|
h.metrics.seriesCreated.Inc()
|
|
|
|
|
func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) {
|
|
|
|
|
// Just using `getOrSet` below would be semantically sufficient, but we'd create
|
|
|
|
|
// a new series on every sample inserted via Add(), which causes allocations
|
|
|
|
|
// and makes our series IDs rather random and harder to compress in postings.
|
|
|
|
|
s := h.series.getByHash(hash, lset)
|
|
|
|
|
if s != nil {
|
|
|
|
|
return s, false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Optimistically assume that we are the first one to create the series.
|
|
|
|
|
id := atomic.AddUint64(&h.lastSeriesID, 1)
|
|
|
|
|
|
|
|
|
|
return h.getOrCreateWithID(id, hash, lset)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) {
|
|
|
|
|
s := newMemSeries(lset, id, h.chunkRange)
|
|
|
|
|
|
|
|
|
|
s, created := h.series.getOrSet(hash, s)
|
|
|
|
|
// Skip indexing if we didn't actually create the series.
|
|
|
|
|
if !created {
|
|
|
|
|
return s
|
|
|
|
|
return s, false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
h.metrics.series.Inc()
|
|
|
|
|
h.metrics.seriesCreated.Inc()
|
|
|
|
|
|
|
|
|
|
h.postings.add(id, lset)
|
|
|
|
|
|
|
|
|
|
h.symMtx.Lock()
|
|
|
|
@ -870,7 +881,7 @@ func (h *Head) create(hash uint64, lset labels.Labels) *memSeries {
|
|
|
|
|
h.symbols[l.Value] = struct{}{}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return s
|
|
|
|
|
return s, true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
|
|
|
|
@ -1023,6 +1034,7 @@ func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, boo
|
|
|
|
|
s.locks[i].Lock()
|
|
|
|
|
|
|
|
|
|
if prev := s.hashes[i].get(hash, series.lset); prev != nil {
|
|
|
|
|
s.locks[i].Unlock()
|
|
|
|
|
return prev, false
|
|
|
|
|
}
|
|
|
|
|
s.hashes[i].set(hash, series)
|
|
|
|
|