Make mapper updates asynchronous

pull/5805/head
Fabian Reinartz 8 years ago
parent 32c32013a6
commit e825a0b40c

@ -59,6 +59,10 @@ type headBlock struct {
metamtx sync.RWMutex
meta BlockMeta
updatec chan struct{}
stopc chan struct{}
donec chan struct{}
}
func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) {
@ -107,6 +111,9 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
postings: &memPostings{m: make(map[term][]uint32)},
mapper: newPositionMapper(nil),
meta: *meta,
updatec: make(chan struct{}, 1),
stopc: make(chan struct{}),
donec: make(chan struct{}),
}
r := wal.Reader()
@ -131,11 +138,25 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
return nil, errors.Wrap(err, "consume WAL")
}
h.updateMapping()
go h.run()
h.updatec <- struct{}{}
return h, nil
}
func (h *headBlock) run() {
defer close(h.donec)
for {
select {
case <-h.stopc:
return
case <-h.updatec:
h.updateMapping()
}
}
}
// inBounds returns true if the given timestamp is within the valid
// time bounds of the block.
func (h *headBlock) inBounds(t int64) bool {
@ -144,6 +165,9 @@ func (h *headBlock) inBounds(t int64) bool {
// Close syncs all data and closes underlying resources of the head block.
func (h *headBlock) Close() error {
close(h.stopc)
<-h.donec
// Lock mutex and leave it locked so we panic if there's a bug causing
// the block to be used afterwards.
h.mtx.Lock()
@ -324,6 +348,11 @@ func (a *headAppender) createSeries() {
a.mtx.Unlock()
a.mtx.RLock()
select {
case a.updatec <- struct{}{}:
default:
}
}
func (a *headAppender) Commit() error {
@ -530,6 +559,8 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries {
func (h *headBlock) updateMapping() {
h.mtx.RLock()
// No need to rlock the mapper as this method is not run concurrently and
// the only one ever modifying the mapper.
if h.mapper.sortable != nil && h.mapper.Len() == len(h.series) {
h.mtx.RUnlock()
return
@ -544,7 +575,9 @@ func (h *headBlock) updateMapping() {
return labels.Compare(series[i].lset, series[j].lset) < 0
})
h.mapper.mtx.Lock()
h.mapper.update(s)
h.mapper.mtx.Unlock()
}
// remapPostings changes the order of the postings from their ID to the ordering
@ -552,18 +585,28 @@ func (h *headBlock) updateMapping() {
// Returned postings have no longer monotonic IDs and MUST NOT be used for regular
// postings set operations, i.e. intersect and merge.
func (h *headBlock) remapPostings(p Postings) Postings {
list, err := expandPostings(p)
if err != nil {
return errPostings{err: err}
}
// Expand the postings but only up until the point where the mapper
// covers existing metrics.
ep := make([]uint32, 0, 64)
h.mapper.mtx.Lock()
defer h.mapper.mtx.Unlock()
h.mapper.mtx.RLock()
defer h.mapper.mtx.RUnlock()
max := uint32(h.mapper.Len())
for p.Next() {
if p.At() > max {
break
}
ep = append(ep, p.At())
}
if err := p.Err(); err != nil {
return errPostings{err: errors.Wrap(err, "expand postings")}
}
h.updateMapping()
h.mapper.Sort(list)
h.mapper.Sort(ep)
return newListPostings(list)
return newListPostings(ep)
}
type memSeries struct {

Loading…
Cancel
Save