mirror of https://github.com/prometheus/prometheus
Remove double-reference in chunk hashmap
parent
675f0886f0
commit
1e1a37b15b
6
db.go
6
db.go
|
@ -290,7 +290,10 @@ func (s *Shard) appendBatch(samples []hashedSample) error {
|
|||
// TODO(fabxc): distinguish samples between concurrent heads for
|
||||
// different time blocks. Those may occurr during transition to still
|
||||
// allow late samples to arrive for a previous block.
|
||||
err := s.head.appendBatch(samples, s.metrics.samplesAppended)
|
||||
err := s.head.appendBatch(samples)
|
||||
if err != nil {
|
||||
s.metrics.samplesAppended.Add(float64(len(samples)))
|
||||
}
|
||||
|
||||
// TODO(fabxc): randomize over time and use better scoring function.
|
||||
if s.head.stats.SampleCount/(uint64(s.head.stats.ChunkCount)+1) > 400 {
|
||||
|
@ -401,6 +404,7 @@ func (s *Shard) persist() error {
|
|||
|
||||
// chunkDesc wraps a plain data chunk and provides cached meta data about it.
|
||||
type chunkDesc struct {
|
||||
ref uint32
|
||||
lset labels.Labels
|
||||
chunk chunks.Chunk
|
||||
|
||||
|
|
33
head.go
33
head.go
|
@ -7,7 +7,6 @@ import (
|
|||
|
||||
"github.com/fabxc/tsdb/chunks"
|
||||
"github.com/fabxc/tsdb/labels"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// HeadBlock handles reads and writes of time series data within a time window.
|
||||
|
@ -18,8 +17,8 @@ type HeadBlock struct {
|
|||
// is assigned the index as its ID.
|
||||
descs []*chunkDesc
|
||||
// hashes contains a collision map of label set hashes of chunks
|
||||
// to their position in the chunk desc slice.
|
||||
hashes map[uint64][]int
|
||||
// to their chunk descs.
|
||||
hashes map[uint64][]*chunkDesc
|
||||
|
||||
values map[string]stringset // label names to possible values
|
||||
postings *memPostings // postings lists for terms
|
||||
|
@ -38,7 +37,7 @@ func OpenHeadBlock(dir string, baseTime int64) (*HeadBlock, error) {
|
|||
|
||||
b := &HeadBlock{
|
||||
descs: []*chunkDesc{},
|
||||
hashes: map[uint64][]int{},
|
||||
hashes: map[uint64][]*chunkDesc{},
|
||||
values: map[string]stringset{},
|
||||
postings: &memPostings{m: make(map[term][]uint32)},
|
||||
wal: wal,
|
||||
|
@ -136,15 +135,15 @@ func (h *HeadBlock) Series(ref uint32, mint, maxt int64) (Series, error) {
|
|||
|
||||
// get retrieves the chunk with the hash and label set and creates
|
||||
// a new one if it doesn't exist yet.
|
||||
func (h *HeadBlock) get(hash uint64, lset labels.Labels) (*chunkDesc, uint32) {
|
||||
refs := h.hashes[hash]
|
||||
func (h *HeadBlock) get(hash uint64, lset labels.Labels) *chunkDesc {
|
||||
cds := h.hashes[hash]
|
||||
|
||||
for _, ref := range refs {
|
||||
if cd := h.descs[ref]; cd.lset.Equals(lset) {
|
||||
return cd, uint32(ref)
|
||||
for _, cd := range cds {
|
||||
if cd.lset.Equals(lset) {
|
||||
return cd
|
||||
}
|
||||
}
|
||||
return nil, 0
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
|
||||
|
@ -160,10 +159,10 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
|
|||
panic(err)
|
||||
}
|
||||
// Index the new chunk.
|
||||
ref := len(h.descs)
|
||||
cd.ref = uint32(len(h.descs))
|
||||
|
||||
h.descs = append(h.descs, cd)
|
||||
h.hashes[hash] = append(h.hashes[hash], ref)
|
||||
h.hashes[hash] = append(h.hashes[hash], cd)
|
||||
|
||||
// Add each label pair as a term to the inverted index.
|
||||
terms := make([]term, 0, len(lset))
|
||||
|
@ -178,7 +177,7 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
|
|||
}
|
||||
valset.set(l.Value)
|
||||
}
|
||||
h.postings.add(uint32(ref), terms...)
|
||||
h.postings.add(cd.ref, terms...)
|
||||
|
||||
// For the head block there's exactly one chunk per series.
|
||||
h.stats.ChunkCount++
|
||||
|
@ -187,7 +186,7 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
|
|||
return cd
|
||||
}
|
||||
|
||||
func (h *HeadBlock) appendBatch(samples []hashedSample, appended prometheus.Counter) error {
|
||||
func (h *HeadBlock) appendBatch(samples []hashedSample) error {
|
||||
// Find head chunks for all samples and allocate new IDs/refs for
|
||||
// ones we haven't seen before.
|
||||
var (
|
||||
|
@ -199,11 +198,11 @@ func (h *HeadBlock) appendBatch(samples []hashedSample, appended prometheus.Coun
|
|||
for i := range samples {
|
||||
s := &samples[i]
|
||||
|
||||
cd, ref := h.get(s.hash, s.labels)
|
||||
cd := h.get(s.hash, s.labels)
|
||||
if cd != nil {
|
||||
// TODO(fabxc): sample refs are only scoped within a block for
|
||||
// now and we ignore any previously set value
|
||||
s.ref = ref
|
||||
s.ref = cd.ref
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -226,6 +225,8 @@ func (h *HeadBlock) appendBatch(samples []hashedSample, appended prometheus.Coun
|
|||
return err
|
||||
}
|
||||
|
||||
// After the samples were successfully written to the WAL, there may
|
||||
// be no further failures.
|
||||
for i, s := range newSeries {
|
||||
h.create(newHashes[i], s)
|
||||
}
|
||||
|
|
|
@ -15,7 +15,11 @@ type term struct {
|
|||
|
||||
// Postings returns an iterator over the postings list for s.
|
||||
func (p *memPostings) get(t term) Postings {
|
||||
return &listPostings{list: p.m[t], idx: -1}
|
||||
l := p.m[t]
|
||||
if l == nil {
|
||||
return emptyPostings
|
||||
}
|
||||
return &listPostings{list: l, idx: -1}
|
||||
}
|
||||
|
||||
// add adds a document to the index. The caller has to ensure that no
|
||||
|
|
Loading…
Reference in New Issue