diff --git a/head.go b/head.go index 211cadf77..d9b4589ee 100644 --- a/head.go +++ b/head.go @@ -59,6 +59,10 @@ type headBlock struct { metamtx sync.RWMutex meta BlockMeta + + updatec chan struct{} + stopc chan struct{} + donec chan struct{} } func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) { @@ -107,6 +111,9 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { postings: &memPostings{m: make(map[term][]uint32)}, mapper: newPositionMapper(nil), meta: *meta, + updatec: make(chan struct{}, 1), + stopc: make(chan struct{}), + donec: make(chan struct{}), } r := wal.Reader() @@ -131,11 +138,25 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { return nil, errors.Wrap(err, "consume WAL") } - h.updateMapping() + go h.run() + h.updatec <- struct{}{} return h, nil } +func (h *headBlock) run() { + defer close(h.donec) + + for { + select { + case <-h.stopc: + return + case <-h.updatec: + h.updateMapping() + } + } +} + // inBounds returns true if the given timestamp is within the valid // time bounds of the block. func (h *headBlock) inBounds(t int64) bool { @@ -144,6 +165,9 @@ func (h *headBlock) inBounds(t int64) bool { // Close syncs all data and closes underlying resources of the head block. func (h *headBlock) Close() error { + close(h.stopc) + <-h.donec + // Lock mutex and leave it locked so we panic if there's a bug causing // the block to be used afterwards. h.mtx.Lock() @@ -324,6 +348,11 @@ func (a *headAppender) createSeries() { a.mtx.Unlock() a.mtx.RLock() + + select { + case a.updatec <- struct{}{}: + default: + } } func (a *headAppender) Commit() error { @@ -530,6 +559,8 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries { func (h *headBlock) updateMapping() { h.mtx.RLock() + // No need to rlock the mapper as this method is not run concurrently and + // the only one ever modifying the mapper. if h.mapper.sortable != nil && h.mapper.Len() == len(h.series) { h.mtx.RUnlock() return @@ -544,7 +575,9 @@ func (h *headBlock) updateMapping() { return labels.Compare(series[i].lset, series[j].lset) < 0 }) + h.mapper.mtx.Lock() h.mapper.update(s) + h.mapper.mtx.Unlock() } // remapPostings changes the order of the postings from their ID to the ordering @@ -552,18 +585,28 @@ func (h *headBlock) updateMapping() { // Returned postings have no longer monotonic IDs and MUST NOT be used for regular // postings set operations, i.e. intersect and merge. func (h *headBlock) remapPostings(p Postings) Postings { - list, err := expandPostings(p) - if err != nil { - return errPostings{err: err} - } + // Expand the postings but only up until the point where the mapper + // covers existing metrics. + ep := make([]uint32, 0, 64) - h.mapper.mtx.Lock() - defer h.mapper.mtx.Unlock() + h.mapper.mtx.RLock() + defer h.mapper.mtx.RUnlock() + + max := uint32(h.mapper.Len()) + + for p.Next() { + if p.At() > max { + break + } + ep = append(ep, p.At()) + } + if err := p.Err(); err != nil { + return errPostings{err: errors.Wrap(err, "expand postings")} + } - h.updateMapping() - h.mapper.Sort(list) + h.mapper.Sort(ep) - return newListPostings(list) + return newListPostings(ep) } type memSeries struct {