mirror of https://github.com/prometheus/prometheus
Fix wrong comparison in head block resorting
parent
55ee4b5b3b
commit
789e8224ff
|
@ -258,10 +258,6 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// TODO(fabxc): find more transparent way of handling this.
|
|
||||||
if hb, ok := b.(*headBlock); ok {
|
|
||||||
all = hb.remapPostings(all)
|
|
||||||
}
|
|
||||||
s := newCompactionSeriesSet(b.Index(), b.Chunks(), all)
|
s := newCompactionSeriesSet(b.Index(), b.Chunks(), all)
|
||||||
|
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
|
|
44
head.go
44
head.go
|
@ -186,12 +186,30 @@ func (h *headBlock) Querier(mint, maxt int64) Querier {
|
||||||
if h.closed {
|
if h.closed {
|
||||||
panic(fmt.Sprintf("block %s already closed", h.dir))
|
panic(fmt.Sprintf("block %s already closed", h.dir))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reference on the original slice to use for postings mapping.
|
||||||
|
series := h.series[:]
|
||||||
|
|
||||||
return &blockQuerier{
|
return &blockQuerier{
|
||||||
mint: mint,
|
mint: mint,
|
||||||
maxt: maxt,
|
maxt: maxt,
|
||||||
index: h.Index(),
|
index: h.Index(),
|
||||||
chunks: h.Chunks(),
|
chunks: h.Chunks(),
|
||||||
postingsMapper: h.remapPostings,
|
postingsMapper: func(p Postings) Postings {
|
||||||
|
ep := make([]uint32, 0, 64)
|
||||||
|
|
||||||
|
for p.Next() {
|
||||||
|
ep = append(ep, p.At())
|
||||||
|
}
|
||||||
|
if err := p.Err(); err != nil {
|
||||||
|
return errPostings{err: errors.Wrap(err, "expand postings")}
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Slice(ep, func(i, j int) bool {
|
||||||
|
return labels.Compare(series[ep[i]].lset, series[ep[j]].lset) < 0
|
||||||
|
})
|
||||||
|
return newListPostings(ep)
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -554,30 +572,6 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries {
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
// remapPostings changes the order of the postings from their ID to the ordering
|
|
||||||
// of the series they reference.
|
|
||||||
// Returned postings have no longer monotonic IDs and MUST NOT be used for regular
|
|
||||||
// postings set operations, i.e. intersect and merge.
|
|
||||||
func (h *headBlock) remapPostings(p Postings) Postings {
|
|
||||||
ep := make([]uint32, 0, 64)
|
|
||||||
|
|
||||||
for p.Next() {
|
|
||||||
ep = append(ep, p.At())
|
|
||||||
}
|
|
||||||
if err := p.Err(); err != nil {
|
|
||||||
return errPostings{err: errors.Wrap(err, "expand postings")}
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Slice(ep, func(i, j int) bool {
|
|
||||||
return labels.Compare(h.series[i].lset, h.series[j].lset) < 0
|
|
||||||
})
|
|
||||||
// TODO(fabxc): this is a race! have to either rlock head through entire
|
|
||||||
// query or make access to h.series atomic. Potentially reference sub Slice
|
|
||||||
// in querier?
|
|
||||||
|
|
||||||
return newListPostings(ep)
|
|
||||||
}
|
|
||||||
|
|
||||||
type memSeries struct {
|
type memSeries struct {
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue