diff --git a/compact.go b/compact.go index a3bc7d17a..17a4da041 100644 --- a/compact.go +++ b/compact.go @@ -457,7 +457,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexr := b.Index() - all, err := indexr.Postings("", "") + all, err := indexr.Postings(allPostingsKey.Name, allPostingsKey.Value) if err != nil { return err } diff --git a/head.go b/head.go index a5ce94e45..3ead78fc4 100644 --- a/head.go +++ b/head.go @@ -207,7 +207,7 @@ func (h *Head) ReadWAL() error { } ms := h.series.getByID(s.Ref) if ms == nil { - h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref) + h.logger.Log("msg", "unknown series reference in WAL", "ref", s.Ref, "ts", s.T, "mint", mint) continue } _, chunkCreated := ms.append(s.T, s.V) @@ -267,7 +267,7 @@ func (h *Head) Truncate(mint int64) error { start = time.Now() - p, err := h.indexRange(mint, math.MaxInt64).Postings("", "") + p, err := h.indexRange(mint, math.MaxInt64).Postings(allPostingsKey.Name, allPostingsKey.Value) if err != nil { return err } @@ -1038,8 +1038,6 @@ func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, boo return prev, false } s.hashes[i].set(hash, series) - - s.hashes[i][hash] = append(s.hashes[i][hash], series) s.locks[i].Unlock() i = series.ref & stripeMask diff --git a/postings.go b/postings.go index 97a29ab19..0e51b221b 100644 --- a/postings.go +++ b/postings.go @@ -45,7 +45,7 @@ func (p *memPostings) get(name, value string) Postings { return newListPostings(l) } -var allLabel = labels.Label{} +var allPostingsKey = labels.Label{} // add adds a document to the index. The caller has to ensure that no // term argument appears twice. @@ -53,13 +53,36 @@ func (p *memPostings) add(id uint64, lset labels.Labels) { p.mtx.Lock() for _, l := range lset { - p.m[l] = append(p.m[l], id) + p.addFor(id, l) } - p.m[allLabel] = append(p.m[allLabel], id) + p.addFor(id, allPostingsKey) p.mtx.Unlock() } +func (p *memPostings) addFor(id uint64, l labels.Label) { + list := append(p.m[l], id) + p.m[l] = list + + // There is no guarantee that no higher ID was inserted before as they may + // be generated independently before adding them to postings. + // We repair order violations on insert. The invariant is that the first n-1 + // items in the list are already sorted. + for i := len(list) - 1; i >= 1; i-- { + if list[i] >= list[i-1] { + break + } + list[i], list[i-1] = list[i-1], list[i] + } +} + +func expandPostings(p Postings) (res []uint64, err error) { + for p.Next() { + res = append(res, p.At()) + } + return res, p.Err() +} + // Postings provides iterative access over a postings list. type Postings interface { // Next advances the iterator and returns true if another value was found. diff --git a/postings_test.go b/postings_test.go index 5d726ca3a..48cd2b608 100644 --- a/postings_test.go +++ b/postings_test.go @@ -21,6 +21,15 @@ import ( "github.com/stretchr/testify/require" ) +func TestMemPostings_addFor(t *testing.T) { + p := newMemPostings() + p.m[allPostingsKey] = []uint64{1, 2, 3, 4, 6, 7, 8} + + p.addFor(5, allPostingsKey) + + require.Equal(t, []uint64{1, 2, 3, 4, 5, 6, 7, 8}, p.m[allPostingsKey]) +} + type mockPostings struct { next func() bool seek func(uint64) bool @@ -33,13 +42,6 @@ func (m *mockPostings) Seek(v uint64) bool { return m.seek(v) } func (m *mockPostings) Value() uint64 { return m.value() } func (m *mockPostings) Err() error { return m.err() } -func expandPostings(p Postings) (res []uint64, err error) { - for p.Next() { - res = append(res, p.At()) - } - return res, p.Err() -} - func TestIntersect(t *testing.T) { var cases = []struct { a, b []uint64