From 1492031ef2e04e94bf1d50a7d46eb881aa2b0c0a Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Mon, 2 Oct 2023 16:24:25 +0200 Subject: [PATCH] Optimize ListPostings Next() (#12906) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Next() method of ListPostings was updating two values, while we can just update the position. This is up to 30% faster for a high number of postings. goos: linux goarch: amd64 pkg: github.com/prometheus/prometheus/tsdb/index cpu: 11th Gen Intel(R) Core(TM) i7-11700K @ 3.60GHz │ old │ new │ │ sec/op │ sec/op vs base │ ListPostings/count=100-16 819.2n ± 0% 732.6n ± 0% -10.58% (p=0.000 n=20) ListPostings/count=1000-16 2.685µ ± 1% 2.017µ ± 0% -24.88% (p=0.000 n=20) ListPostings/count=10000-16 21.43µ ± 1% 14.81µ ± 0% -30.91% (p=0.000 n=20) ListPostings/count=100000-16 209.4µ ± 1% 143.3µ ± 0% -31.55% (p=0.000 n=20) ListPostings/count=1000000-16 2.086m ± 1% 1.436m ± 1% -31.18% (p=0.000 n=20) geomean 29.02µ 21.41µ -26.22% We're talking about microseconds here, but they keep adding up. Signed-off-by: Oleg Zaytsev --- tsdb/index/postings.go | 35 ++++----- tsdb/index/postings_test.go | 144 ++++++++++++++++++++++++++++++++++++ 2 files changed, 159 insertions(+), 20 deletions(-) diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index e5fa5fc54..5f24dbfe1 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -747,7 +747,7 @@ func (rp *removedPostings) Err() error { // ListPostings implements the Postings interface over a plain list. 
type ListPostings struct { list []storage.SeriesRef - cur storage.SeriesRef + pos int } func NewListPostings(list []storage.SeriesRef) Postings { @@ -759,39 +759,34 @@ func newListPostings(list ...storage.SeriesRef) *ListPostings { } func (it *ListPostings) At() storage.SeriesRef { - return it.cur + return it.list[it.pos-1] } func (it *ListPostings) Next() bool { - if len(it.list) > 0 { - it.cur = it.list[0] - it.list = it.list[1:] + if it.pos < len(it.list) { + it.pos++ return true } - it.cur = 0 return false } func (it *ListPostings) Seek(x storage.SeriesRef) bool { - // If the current value satisfies, then return. - if it.cur >= x { - return true + if it.pos == 0 { + it.pos++ } - if len(it.list) == 0 { + if it.pos > len(it.list) { return false } + // If the current value satisfies, then return. + if it.list[it.pos-1] >= x { + return true + } // Do binary search between current position and end. - i := sort.Search(len(it.list), func(i int) bool { - return it.list[i] >= x - }) - if i < len(it.list) { - it.cur = it.list[i] - it.list = it.list[i+1:] - return true - } - it.list = nil - return false + it.pos = sort.Search(len(it.list[it.pos-1:]), func(i int) bool { + return it.list[i+it.pos-1] >= x + }) + it.pos + return it.pos-1 < len(it.list) } func (it *ListPostings) Err() error { diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index b2ed1064d..f0f3bb75a 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -1118,3 +1118,147 @@ func TestPostingsWithIndexHeap(t *testing.T) { require.Equal(t, storage.SeriesRef(25), node.p.At()) }) } + +func TestListPostings(t *testing.T) { + t.Run("empty list", func(t *testing.T) { + p := NewListPostings(nil) + require.False(t, p.Next()) + require.False(t, p.Seek(10)) + require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + + t.Run("one posting", func(t *testing.T) { + t.Run("next", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10}) + require.True(t, p.Next()) 
+ require.Equal(t, storage.SeriesRef(10), p.At()) + require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + t.Run("seek less", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10}) + require.True(t, p.Seek(5)) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.True(t, p.Seek(5)) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + t.Run("seek equal", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10}) + require.True(t, p.Seek(10)) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + t.Run("seek more", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10}) + require.False(t, p.Seek(15)) + require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + t.Run("seek after next", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10}) + require.True(t, p.Next()) + require.False(t, p.Seek(15)) + require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + }) + + t.Run("multiple postings", func(t *testing.T) { + t.Run("next", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10, 20}) + require.True(t, p.Next()) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.True(t, p.Next()) + require.Equal(t, storage.SeriesRef(20), p.At()) + require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + t.Run("seek", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10, 20}) + require.True(t, p.Seek(5)) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.True(t, p.Seek(5)) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.True(t, p.Seek(10)) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.True(t, p.Next()) + require.Equal(t, storage.SeriesRef(20), p.At()) + require.True(t, p.Seek(10)) + require.Equal(t, storage.SeriesRef(20), p.At()) + require.True(t, p.Seek(20)) + require.Equal(t, storage.SeriesRef(20), p.At()) + 
require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + t.Run("seek lest than last", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50}) + require.True(t, p.Seek(45)) + require.Equal(t, storage.SeriesRef(50), p.At()) + require.False(t, p.Next()) + }) + t.Run("seek exactly last", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50}) + require.True(t, p.Seek(50)) + require.Equal(t, storage.SeriesRef(50), p.At()) + require.False(t, p.Next()) + }) + t.Run("seek more than last", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50}) + require.False(t, p.Seek(60)) + require.False(t, p.Next()) + }) + }) + + t.Run("seek", func(t *testing.T) { + for _, c := range []int{2, 8, 9, 10} { + t.Run(fmt.Sprintf("count=%d", c), func(t *testing.T) { + list := make([]storage.SeriesRef, c) + for i := 0; i < c; i++ { + list[i] = storage.SeriesRef(i * 10) + } + + t.Run("all one by one", func(t *testing.T) { + p := NewListPostings(list) + for i := 0; i < c; i++ { + require.True(t, p.Seek(storage.SeriesRef(i*10))) + require.Equal(t, storage.SeriesRef(i*10), p.At()) + } + require.False(t, p.Seek(storage.SeriesRef(c*10))) + }) + + t.Run("each one", func(t *testing.T) { + for _, ref := range list { + p := NewListPostings(list) + require.True(t, p.Seek(ref)) + require.Equal(t, ref, p.At()) + } + }) + }) + } + }) +} + +func BenchmarkListPostings(b *testing.B) { + const maxCount = 1e6 + input := make([]storage.SeriesRef, maxCount) + for i := 0; i < maxCount; i++ { + input[i] = storage.SeriesRef(i << 2) + } + + for _, count := range []int{100, 1e3, 10e3, 100e3, maxCount} { + b.Run(fmt.Sprintf("count=%d", count), func(b *testing.B) { + for i := 0; i < b.N; i++ { + p := NewListPostings(input[:count]) + var sum storage.SeriesRef + for p.Next() { + sum += p.At() + } + require.NotZero(b, sum) + } + }) + } +}