From 2a825f6c283d022767d05325a923e8cce1f09634 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 22 Dec 2016 01:12:28 +0100 Subject: [PATCH] Consolidate mem index into HeadBlock --- db.go | 13 +++---- head.go | 84 ++++++++++++++++++++++++++++++----------- index.go => postings.go | 54 +------------------------- 3 files changed, 70 insertions(+), 81 deletions(-) rename index.go => postings.go (74%) diff --git a/db.go b/db.go index 823ccf1c0..16def55b1 100644 --- a/db.go +++ b/db.go @@ -227,13 +227,12 @@ func (s *Shard) appendBatch(samples []hashedSample) error { s.mtx.Lock() defer s.mtx.Unlock() - var merr MultiError - - for _, sm := range samples { - merr.Add(s.head.append(sm.hash, sm.labels, sm.t, sm.v)) - } + // TODO(fabxc): distinguish samples between concurrent heads for + // different time blocks. Those may occur during transition to still + // allow late samples to arrive for a previous block. + err := s.head.appendBatch(samples) - // TODO(fabxc): randomize over time + // TODO(fabxc): randomize over time and use better scoring function. if s.head.stats.SampleCount/(uint64(s.head.stats.ChunkCount)+1) > 400 { select { case s.persistCh <- struct{}{}: @@ -246,7 +245,7 @@ func (s *Shard) appendBatch(samples []hashedSample) error { } } - return merr.Err() + return err } func intervalOverlap(amin, amax, bmin, bmax int64) bool { diff --git a/head.go b/head.go index dfe17b1e2..d5ad515db 100644 --- a/head.go +++ b/head.go @@ -12,9 +12,18 @@ import ( // HeadBlock handles reads and writes of time series data within a time window. type HeadBlock struct { - mtx sync.RWMutex - descs map[uint64][]*chunkDesc // labels hash to possible chunks descs - index *memIndex + mtx sync.RWMutex + + // descs holds all chunk descs for the head block. Each chunk implicitly + // is assigned the index as its ID. + descs []*chunkDesc + // hashes contains a collision map of label set hashes of chunks + // to their position in the chunk desc slice. 
+ hashes map[uint64][]int + + symbols []string // all seen strings + values map[string]stringset // label names to possible values + postings *memPostings // postings lists for terms stats BlockStats } @@ -22,8 +31,10 @@ type HeadBlock struct { // NewHeadBlock creates a new empty head block. func NewHeadBlock(baseTime int64) *HeadBlock { b := &HeadBlock{ - descs: make(map[uint64][]*chunkDesc, 2048), - index: newMemIndex(), + descs: []*chunkDesc{}, + hashes: map[uint64][]int{}, + values: map[string]stringset{}, + postings: &memPostings{m: make(map[term][]uint32)}, } b.stats.MinTime = baseTime @@ -37,11 +48,10 @@ func (h *HeadBlock) Querier(mint, maxt int64) Querier { // Chunk returns the chunk for the reference number. func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) { - c, ok := h.index.forward[ref] - if !ok { + if int(ref) >= len(h.descs) { return nil, errNotFound } - return c.chunk, nil + return h.descs[int(ref)].chunk, nil } func (h *HeadBlock) interval() (int64, int64) { @@ -60,7 +70,7 @@ func (h *HeadBlock) LabelValues(names ...string) (StringTuples, error) { } var sl []string - for s := range h.index.values[names[0]] { + for s := range h.values[names[0]] { sl = append(sl, s) } sort.Strings(sl) @@ -74,15 +84,16 @@ func (h *HeadBlock) LabelValues(names ...string) (StringTuples, error) { // Postings returns the postings list iterator for the label pair. func (h *HeadBlock) Postings(name, value string) (Postings, error) { - return h.index.Postings(term{name, value}), nil + return h.postings.get(term{name: name, value: value}), nil } // Series returns the series for the given reference. 
func (h *HeadBlock) Series(ref uint32, mint, maxt int64) (Series, error) { - cd, ok := h.index.forward[ref] - if !ok { + if int(ref) >= len(h.descs) { return nil, errNotFound } + cd := h.descs[ref] + if !intervalOverlap(cd.firsTimestamp, cd.lastTimestamp, mint, maxt) { return nil, nil } @@ -101,9 +112,10 @@ func (h *HeadBlock) Series(ref uint32, mint, maxt int64) (Series, error) { // get retrieves the chunk with the hash and label set and creates // a new one if it doesn't exist yet. func (h *HeadBlock) get(hash uint64, lset labels.Labels) *chunkDesc { - cds := h.descs[hash] - for _, cd := range cds { - if cd.lset.Equals(lset) { + refs := h.hashes[hash] + + for _, ref := range refs { + if cd := h.descs[ref]; cd.lset.Equals(lset) { return cd } } @@ -112,16 +124,44 @@ func (h *HeadBlock) get(hash uint64, lset labels.Labels) *chunkDesc { lset: lset, chunk: chunks.NewXORChunk(int(math.MaxInt64)), } - h.index.add(cd) + // Index the new chunk. + ref := len(h.descs) + + h.descs = append(h.descs, cd) + h.hashes[hash] = append(refs, ref) + + // Add each label pair as a term to the inverted index. + terms := make([]term, 0, len(lset)) + + for _, l := range lset { + terms = append(terms, term{name: l.Name, value: l.Value}) + + valset, ok := h.values[l.Name] + if !ok { + valset = stringset{} + h.values[l.Name] = valset + } + valset.set(l.Value) + } + h.postings.add(uint32(ref), terms...) // For the head block there's exactly one chunk per series. h.stats.ChunkCount++ h.stats.SeriesCount++ - h.descs[hash] = append(cds, cd) return cd } +func (h *HeadBlock) appendBatch(samples []hashedSample) error { + var merr MultiError + + for _, s := range samples { + merr.Add(h.append(s.hash, s.labels, s.t, s.v)) + } + + return merr.Err() +} + // append adds the sample to the headblock. 
func (h *HeadBlock) append(hash uint64, lset labels.Labels, ts int64, v float64) error { if err := h.get(hash, lset).append(ts, v); err != nil { @@ -153,8 +193,8 @@ func (h *HeadBlock) persist(p string) (int64, error) { defer sw.Close() defer iw.Close() - for ref, cd := range h.index.forward { - if err := sw.WriteSeries(ref, cd.lset, []*chunkDesc{cd}); err != nil { + for ref, cd := range h.descs { + if err := sw.WriteSeries(uint32(ref), cd.lset, []*chunkDesc{cd}); err != nil { return 0, err } } @@ -162,7 +202,7 @@ func (h *HeadBlock) persist(p string) (int64, error) { if err := iw.WriteStats(h.stats); err != nil { return 0, err } - for n, v := range h.index.values { + for n, v := range h.values { s := make([]string, 0, len(v)) for x := range v { s = append(s, x) @@ -173,8 +213,8 @@ func (h *HeadBlock) persist(p string) (int64, error) { } } - for t := range h.index.postings.m { - if err := iw.WritePostings(t.name, t.value, h.index.postings.get(t)); err != nil { + for t := range h.postings.m { + if err := iw.WritePostings(t.name, t.value, h.postings.get(t)); err != nil { return 0, err } } diff --git a/index.go b/postings.go similarity index 74% rename from index.go rename to postings.go index e16ab3c25..fc4d24dfa 100644 --- a/index.go +++ b/postings.go @@ -5,64 +5,14 @@ import ( "strings" ) -type memIndex struct { - lastID uint32 - - forward map[uint32]*chunkDesc // chunk ID to chunk desc - values map[string]stringset // label names to possible values - postings *memPostings // postings lists for terms -} - -// newMemIndex returns a new in-memory index. 
-func newMemIndex() *memIndex { - return &memIndex{ - lastID: 0, - forward: make(map[uint32]*chunkDesc), - values: make(map[string]stringset), - postings: &memPostings{m: make(map[term][]uint32)}, - } -} - -func (ix *memIndex) numSeries() int { - return len(ix.forward) -} - -func (ix *memIndex) Postings(t term) Postings { - return ix.postings.get(t) +type memPostings struct { + m map[term][]uint32 } type term struct { name, value string } -func (ix *memIndex) add(chkd *chunkDesc) { - // Add each label pair as a term to the inverted index. - terms := make([]term, 0, len(chkd.lset)) - - for _, l := range chkd.lset { - terms = append(terms, term{name: l.Name, value: l.Value}) - - // Add to label name to values index. - valset, ok := ix.values[l.Name] - if !ok { - valset = stringset{} - ix.values[l.Name] = valset - } - valset.set(l.Value) - } - ix.lastID++ - id := ix.lastID - - ix.postings.add(id, terms...) - - // Store forward index for the returned ID. - ix.forward[id] = chkd -} - -type memPostings struct { - m map[term][]uint32 -} - // Postings returns an iterator over the postings list for s. func (p *memPostings) get(t term) Postings { return &listPostings{list: p.m[t], idx: -1}