diff --git a/db.go b/db.go index 01efc1e52..a2d440780 100644 --- a/db.go +++ b/db.go @@ -213,6 +213,13 @@ func (db *DB) compact(blocks []block) error { } tmpdir := blocks[0].dir() + ".tmp" + // TODO(fabxc): find a better place to do this transparently. + for _, b := range blocks { + if h, ok := b.(*HeadBlock); ok { + h.updateMapping() + } + } + if err := db.compactor.compact(tmpdir, blocks...); err != nil { return err } diff --git a/head.go b/head.go index ffe8541eb..0581bb34b 100644 --- a/head.go +++ b/head.go @@ -50,6 +50,7 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { values: map[string]stringset{}, postings: &memPostings{m: make(map[term][]uint32)}, wal: wal, + mapper: newPositionMapper(nil), } b.bstats.MinTime = math.MaxInt64 @@ -81,7 +82,7 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { return nil, err } - b.rewriteMapping() + b.updateMapping() return b, nil } @@ -134,23 +135,6 @@ func (h *HeadBlock) Postings(name, value string) (Postings, error) { return h.postings.get(term{name: name, value: value}), nil } -// remapPostings changes the order of the postings from their ID to the ordering -// of the series they reference. -// Returned postings have no longer monotonic IDs and MUST NOT be used for regular -// postings set operations, i.e. intersect and merge. -func (h *HeadBlock) remapPostings(p Postings) Postings { - list, err := expandPostings(p) - if err != nil { - return errPostings{err: err} - } - - slice.Sort(list, func(i, j int) bool { - return h.mapper.fw[list[i]] < h.mapper.fw[list[j]] - }) - - return newListPostings(list) -} - // Series returns the series for the given reference. func (h *HeadBlock) Series(ref uint32) (labels.Labels, []ChunkMeta, error) { if int(ref) >= len(h.descs) { @@ -288,11 +272,6 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { for i, s := range newSeries { h.create(newHashes[i], s) } - // TODO(fabxc): just mark as dirty instead and trigger a remapping - // periodically and upon querying. - if len(newSeries) > 0 { - h.rewriteMapping() - } for _, s := range samples { cd := h.descs[s.ref] @@ -314,7 +293,14 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { return nil } -func (h *HeadBlock) rewriteMapping() { +func (h *HeadBlock) updateMapping() { + h.mapper.mtx.Lock() + defer h.mapper.mtx.Unlock() + + if h.mapper.sortable != nil && h.mapper.Len() == len(h.descs) { + return + } + cds := make([]*chunkDesc, len(h.descs)) copy(cds, h.descs) @@ -322,31 +308,44 @@ func (h *HeadBlock) rewriteMapping() { return labels.Compare(cds[i].lset, cds[j].lset) < 0 }) - h.mapper = newPositionMapper(s) + h.mapper.update(s) +} + +// remapPostings changes the order of the postings from their ID to the ordering +// of the series they reference. +// Returned postings have no longer monotonic IDs and MUST NOT be used for regular +// postings set operations, i.e. intersect and merge. +func (h *HeadBlock) remapPostings(p Postings) Postings { + list, err := expandPostings(p) + if err != nil { + return errPostings{err: err} + } + + h.mapper.mtx.RLock() + defer h.mapper.mtx.RUnlock() + + h.mapper.Sort(list) + + slice.Sort(list, func(i, j int) bool { + return h.mapper.fw[list[i]] < h.mapper.fw[list[j]] + }) + + return newListPostings(list) } // positionMapper stores a position mapping from unsorted to // sorted indices of a sortable collection. type positionMapper struct { + mtx sync.RWMutex sortable sort.Interface iv, fw []int } func newPositionMapper(s sort.Interface) *positionMapper { - m := &positionMapper{ - sortable: s, - iv: make([]int, s.Len()), - fw: make([]int, s.Len()), - } - for i := range m.iv { - m.iv[i] = i - } - sort.Sort(m) - - for i, k := range m.iv { - m.fw[k] = i + m := &positionMapper{} + if s != nil { + m.update(s) } - return m } @@ -358,3 +357,25 @@ func (m *positionMapper) Swap(i, j int) { m.iv[i], m.iv[j] = m.iv[j], m.iv[i] } + +func (m *positionMapper) Sort(l []uint32) { + slice.Sort(l, func(i, j int) bool { + return m.fw[l[i]] < m.fw[l[j]] + }) +} + +func (m *positionMapper) update(s sort.Interface) { + m.sortable = s + + m.iv = make([]int, s.Len()) + m.fw = make([]int, s.Len()) + + for i := range m.iv { + m.iv[i] = i + } + sort.Sort(m) + + for i, k := range m.iv { + m.fw[k] = i + } +} diff --git a/querier.go b/querier.go index 0bcd336c5..3080a29d1 100644 --- a/querier.go +++ b/querier.go @@ -65,6 +65,7 @@ func (s *DB) Querier(mint, maxt int64) Querier { // TODO(fabxc): find nicer solution. if hb, ok := b.(*HeadBlock); ok { + hb.updateMapping() q.postingsMapper = hb.remapPostings } }