mirror of https://github.com/prometheus/prometheus
Add ShardedPostings() support to TSDB (#10421)
This PR is a reference implementation of the proposal described in #10420. In addition to what described in #10420, in this PR I've introduced labels.StableHash(). The idea is to offer an hashing function which doesn't change over time, and that's used by query sharding in order to get a stable behaviour over time. The implementation of labels.StableHash() is the hashing function used by Prometheus before stringlabels, and what's used by Grafana Mimir for query sharding (because built before stringlabels was a thing). Follow up work As mentioned in #10420, if this PR is accepted I'm also open to upload another foundamental piece used by Grafana Mimir query sharding to accelerate the query execution: an optional, configurable and fast in-memory cache for the series hashes. Signed-off-by: Marco Pracucci <marco@pracucci.com>pull/13492/head
parent
19efd0a675
commit
501bc6419e
|
@ -0,0 +1,47 @@
|
|||
// Copyright 2020 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !stringlabels
|
||||
|
||||
package labels
|
||||
|
||||
import (
|
||||
"github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
// StableHash is a labels hashing implementation which is guaranteed to not change over time.
|
||||
// This function should be used whenever labels hashing backward compatibility must be guaranteed.
|
||||
func StableHash(ls Labels) uint64 {
|
||||
// Use xxhash.Sum64(b) for fast path as it's faster.
|
||||
b := make([]byte, 0, 1024)
|
||||
for i, v := range ls {
|
||||
if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) {
|
||||
// If labels entry is 1KB+ do not allocate whole entry.
|
||||
h := xxhash.New()
|
||||
_, _ = h.Write(b)
|
||||
for _, v := range ls[i:] {
|
||||
_, _ = h.WriteString(v.Name)
|
||||
_, _ = h.Write(seps)
|
||||
_, _ = h.WriteString(v.Value)
|
||||
_, _ = h.Write(seps)
|
||||
}
|
||||
return h.Sum64()
|
||||
}
|
||||
|
||||
b = append(b, v.Name...)
|
||||
b = append(b, seps[0])
|
||||
b = append(b, v.Value...)
|
||||
b = append(b, seps[0])
|
||||
}
|
||||
return xxhash.Sum64(b)
|
||||
}
|
|
@ -0,0 +1,54 @@
|
|||
// Copyright 2020 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build stringlabels
|
||||
|
||||
package labels
|
||||
|
||||
import (
|
||||
"github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
// StableHash is a labels hashing implementation which is guaranteed to not change over time.
|
||||
// This function should be used whenever labels hashing backward compatibility must be guaranteed.
|
||||
func StableHash(ls Labels) uint64 {
|
||||
// Use xxhash.Sum64(b) for fast path as it's faster.
|
||||
b := make([]byte, 0, 1024)
|
||||
var h *xxhash.Digest
|
||||
for i := 0; i < len(ls.data); {
|
||||
var v Label
|
||||
v.Name, i = decodeString(ls.data, i)
|
||||
v.Value, i = decodeString(ls.data, i)
|
||||
if h == nil && len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) {
|
||||
// If labels entry is 1KB+, switch to Write API. Copy in the values up to this point.
|
||||
h = xxhash.New()
|
||||
_, _ = h.Write(b)
|
||||
}
|
||||
if h != nil {
|
||||
_, _ = h.WriteString(v.Name)
|
||||
_, _ = h.Write(seps)
|
||||
_, _ = h.WriteString(v.Value)
|
||||
_, _ = h.Write(seps)
|
||||
continue
|
||||
}
|
||||
|
||||
b = append(b, v.Name...)
|
||||
b = append(b, seps[0])
|
||||
b = append(b, v.Value...)
|
||||
b = append(b, seps[0])
|
||||
}
|
||||
if h != nil {
|
||||
return h.Sum64()
|
||||
}
|
||||
return xxhash.Sum64(b)
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
// Copyright 2020 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package labels
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestStableHash tests that StableHash is stable.
|
||||
// The hashes this test asserts should not be changed.
|
||||
func TestStableHash(t *testing.T) {
|
||||
for expectedHash, lbls := range map[uint64]Labels{
|
||||
0xef46db3751d8e999: EmptyLabels(),
|
||||
0x347c8ee7a9e29708: FromStrings("hello", "world"),
|
||||
0xcbab40540f26097d: FromStrings(MetricName, "metric", "label", "value"),
|
||||
} {
|
||||
require.Equal(t, expectedHash, StableHash(lbls))
|
||||
}
|
||||
}
|
|
@ -197,6 +197,20 @@ type SelectHints struct {
|
|||
By bool // Indicate whether it is without or by.
|
||||
Range int64 // Range vector selector range in milliseconds.
|
||||
|
||||
// ShardCount is the total number of shards that series should be split into
|
||||
// at query time. Then, only series in the ShardIndex shard will be returned
|
||||
// by the query.
|
||||
//
|
||||
// ShardCount equal to 0 means that sharding is disabled.
|
||||
ShardCount uint64
|
||||
|
||||
// ShardIndex is the series shard index to query. The index must be between 0 and ShardCount-1.
|
||||
// When ShardCount is set to a value > 0, then a query will only process series within the
|
||||
// ShardIndex's shard.
|
||||
//
|
||||
// Series are sharded by "labels stable hash" mod "ShardCount".
|
||||
ShardIndex uint64
|
||||
|
||||
// DisableTrimming allows to disable trimming of matching series chunks based on query Start and End time.
|
||||
// When disabled, the result may contain samples outside the queried time range but Select() performances
|
||||
// may be improved.
|
||||
|
|
|
@ -81,6 +81,11 @@ type IndexReader interface {
|
|||
// by the label set of the underlying series.
|
||||
SortedPostings(index.Postings) index.Postings
|
||||
|
||||
// ShardedPostings returns a postings list filtered by the provided shardIndex
|
||||
// out of shardCount. For a given posting, its shard MUST be computed hashing
|
||||
// the series labels mod shardCount, using a hash function which is consistent over time.
|
||||
ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings
|
||||
|
||||
// Series populates the given builder and chunk metas for the series identified
|
||||
// by the reference.
|
||||
// Returns storage.ErrNotFound if the ref does not resolve to a known series.
|
||||
|
@ -517,6 +522,10 @@ func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
|
|||
return r.ir.SortedPostings(p)
|
||||
}
|
||||
|
||||
func (r blockIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
|
||||
return r.ir.ShardedPostings(p, shardIndex, shardCount)
|
||||
}
|
||||
|
||||
func (r blockIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
|
||||
if err := r.ir.Series(ref, builder, chks); err != nil {
|
||||
return fmt.Errorf("block: %s: %w", r.b.Meta().ULID, err)
|
||||
|
|
|
@ -84,6 +84,7 @@ func DefaultOptions() *Options {
|
|||
HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize,
|
||||
OutOfOrderCapMax: DefaultOutOfOrderCapMax,
|
||||
EnableOverlappingCompaction: true,
|
||||
EnableSharding: false,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -186,6 +187,9 @@ type Options struct {
|
|||
// they'd rather keep overlapping blocks and let another component do the overlapping compaction later.
|
||||
// For Prometheus, this will always be true.
|
||||
EnableOverlappingCompaction bool
|
||||
|
||||
// EnableSharding enables query sharding support in TSDB.
|
||||
EnableSharding bool
|
||||
}
|
||||
|
||||
type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
|
||||
|
@ -875,6 +879,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
|||
headOpts.EnableNativeHistograms.Store(opts.EnableNativeHistograms)
|
||||
headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow)
|
||||
headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax)
|
||||
headOpts.EnableSharding = opts.EnableSharding
|
||||
if opts.WALReplayConcurrency > 0 {
|
||||
headOpts.WALReplayConcurrency = opts.WALReplayConcurrency
|
||||
}
|
||||
|
|
17
tsdb/head.go
17
tsdb/head.go
|
@ -176,6 +176,9 @@ type HeadOptions struct {
|
|||
// The default value is GOMAXPROCS.
|
||||
// If it is set to a negative value or zero, the default value is used.
|
||||
WALReplayConcurrency int
|
||||
|
||||
// EnableSharding enables ShardedPostings() support in the Head.
|
||||
EnableSharding bool
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -1663,7 +1666,12 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
|
|||
|
||||
func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
|
||||
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
|
||||
return newMemSeries(lset, id, h.opts.IsolationDisabled)
|
||||
shardHash := uint64(0)
|
||||
if h.opts.EnableSharding {
|
||||
shardHash = labels.StableHash(lset)
|
||||
}
|
||||
|
||||
return newMemSeries(lset, id, shardHash, h.opts.IsolationDisabled)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
|
@ -2022,6 +2030,10 @@ type memSeries struct {
|
|||
lset labels.Labels
|
||||
meta *metadata.Metadata
|
||||
|
||||
// Series labels hash to use for sharding purposes. The value is always 0 when sharding has not
|
||||
// been explicitly enabled in TSDB.
|
||||
shardHash uint64
|
||||
|
||||
// Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
|
||||
// When compaction runs, chunks get moved into a block and all pointers are shifted like so:
|
||||
//
|
||||
|
@ -2071,11 +2083,12 @@ type memSeriesOOOFields struct {
|
|||
firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0].
|
||||
}
|
||||
|
||||
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool) *memSeries {
|
||||
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, isolationDisabled bool) *memSeries {
|
||||
s := &memSeries{
|
||||
lset: lset,
|
||||
ref: id,
|
||||
nextAt: math.MinInt64,
|
||||
shardHash: shardHash,
|
||||
}
|
||||
if !isolationDisabled {
|
||||
s.txs = newTxRing(0)
|
||||
|
|
|
@ -149,7 +149,35 @@ func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
|
|||
return index.NewListPostings(ep)
|
||||
}
|
||||
|
||||
// ShardedPostings implements IndexReader. This function returns an failing postings list if sharding
|
||||
// has not been enabled in the Head.
|
||||
func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
|
||||
if !h.head.opts.EnableSharding {
|
||||
return index.ErrPostings(errors.New("sharding is disabled"))
|
||||
}
|
||||
|
||||
out := make([]storage.SeriesRef, 0, 128)
|
||||
|
||||
for p.Next() {
|
||||
s := h.head.series.getByID(chunks.HeadSeriesRef(p.At()))
|
||||
if s == nil {
|
||||
level.Debug(h.head.logger).Log("msg", "Looked up series not found")
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the series belong to the shard.
|
||||
if s.shardHash%shardCount != shardIndex {
|
||||
continue
|
||||
}
|
||||
|
||||
out = append(out, storage.SeriesRef(s.ref))
|
||||
}
|
||||
|
||||
return index.NewListPostings(out)
|
||||
}
|
||||
|
||||
// Series returns the series for the given reference.
|
||||
// Chunks are skipped if chks is nil.
|
||||
func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
|
||||
s := h.head.series.getByID(chunks.HeadSeriesRef(ref))
|
||||
|
||||
|
@ -159,6 +187,10 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
|
|||
}
|
||||
builder.Assign(s.lset)
|
||||
|
||||
if chks == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
|
|
|
@ -526,7 +526,7 @@ func TestMemSeries_chunk(t *testing.T) {
|
|||
require.NoError(t, chunkDiskMapper.Close())
|
||||
}()
|
||||
|
||||
series := newMemSeries(labels.EmptyLabels(), 1, true)
|
||||
series := newMemSeries(labels.EmptyLabels(), 1, 0, true)
|
||||
|
||||
if tc.setup != nil {
|
||||
tc.setup(t, series, chunkDiskMapper)
|
||||
|
|
|
@ -52,20 +52,30 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||
)
|
||||
|
||||
func newTestHead(t testing.TB, chunkRange int64, compressWAL wlog.CompressionType, oooEnabled bool) (*Head, *wlog.WL) {
|
||||
dir := t.TempDir()
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL)
|
||||
require.NoError(t, err)
|
||||
|
||||
// newTestHeadDefaultOptions returns the HeadOptions that should be used by default in unit tests.
|
||||
func newTestHeadDefaultOptions(chunkRange int64, oooEnabled bool) *HeadOptions {
|
||||
opts := DefaultHeadOptions()
|
||||
opts.ChunkRange = chunkRange
|
||||
opts.ChunkDirRoot = dir
|
||||
opts.EnableExemplarStorage = true
|
||||
opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
|
||||
opts.EnableNativeHistograms.Store(true)
|
||||
if oooEnabled {
|
||||
opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds())
|
||||
}
|
||||
return opts
|
||||
}
|
||||
|
||||
func newTestHead(t testing.TB, chunkRange int64, compressWAL wlog.CompressionType, oooEnabled bool) (*Head, *wlog.WL) {
|
||||
return newTestHeadWithOptions(t, compressWAL, newTestHeadDefaultOptions(chunkRange, oooEnabled))
|
||||
}
|
||||
|
||||
func newTestHeadWithOptions(t testing.TB, compressWAL wlog.CompressionType, opts *HeadOptions) (*Head, *wlog.WL) {
|
||||
dir := t.TempDir()
|
||||
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Override the chunks dir with the testing one.
|
||||
opts.ChunkDirRoot = dir
|
||||
|
||||
h, err := NewHead(nil, nil, wal, nil, opts, nil)
|
||||
require.NoError(t, err)
|
||||
|
@ -342,7 +352,7 @@ func BenchmarkLoadWLs(b *testing.B) {
|
|||
}
|
||||
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
|
||||
// Create one mmapped chunk per series, with one sample at the given time.
|
||||
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled)
|
||||
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled)
|
||||
s.append(c.mmappedChunkT, 42, 0, cOpts)
|
||||
// There's only one head chunk because only a single sample is appended. mmapChunks()
|
||||
// ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with
|
||||
|
@ -912,7 +922,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.FromStrings("a", "b"), 1, defaultIsolationDisabled)
|
||||
s := newMemSeries(labels.FromStrings("a", "b"), 1, 0, defaultIsolationDisabled)
|
||||
|
||||
for i := 0; i < 4000; i += 5 {
|
||||
ok, _ := s.append(int64(i), float64(i), 0, cOpts)
|
||||
|
@ -1053,7 +1063,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
|
|||
require.NoError(t, chunkDiskMapper.Close())
|
||||
}()
|
||||
|
||||
series := newMemSeries(labels.EmptyLabels(), 1, true)
|
||||
series := newMemSeries(labels.EmptyLabels(), 1, 0, true)
|
||||
|
||||
cOpts := chunkOpts{
|
||||
chunkDiskMapper: chunkDiskMapper,
|
||||
|
@ -1631,7 +1641,7 @@ func TestMemSeries_append(t *testing.T) {
|
|||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
|
||||
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled)
|
||||
|
||||
// Add first two samples at the very end of a chunk range and the next two
|
||||
// on and after it.
|
||||
|
@ -1692,7 +1702,7 @@ func TestMemSeries_appendHistogram(t *testing.T) {
|
|||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
|
||||
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled)
|
||||
|
||||
histograms := tsdbutil.GenerateTestHistograms(4)
|
||||
histogramWithOneMoreBucket := histograms[3].Copy()
|
||||
|
@ -1754,7 +1764,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
|
|||
samplesPerChunk: samplesPerChunk,
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
|
||||
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled)
|
||||
|
||||
// At this slow rate, we will fill the chunk in two block durations.
|
||||
slowRate := (DefaultBlockDuration * 2) / samplesPerChunk
|
||||
|
@ -2900,6 +2910,71 @@ func TestHeadLabelNamesWithMatchers(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHeadShardedPostings(t *testing.T) {
|
||||
headOpts := newTestHeadDefaultOptions(1000, false)
|
||||
headOpts.EnableSharding = true
|
||||
head, _ := newTestHeadWithOptions(t, wlog.CompressionNone, headOpts)
|
||||
defer func() {
|
||||
require.NoError(t, head.Close())
|
||||
}()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Append some series.
|
||||
app := head.Appender(ctx)
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := app.Append(0, labels.FromStrings("unique", fmt.Sprintf("value%d", i), "const", "1"), 100, 0)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
ir := head.indexRange(0, 200)
|
||||
|
||||
// List all postings for a given label value. This is what we expect to get
|
||||
// in output from all shards.
|
||||
p, err := ir.Postings(ctx, "const", "1")
|
||||
require.NoError(t, err)
|
||||
|
||||
var expected []storage.SeriesRef
|
||||
for p.Next() {
|
||||
expected = append(expected, p.At())
|
||||
}
|
||||
require.NoError(t, p.Err())
|
||||
require.NotEmpty(t, expected)
|
||||
|
||||
// Query the same postings for each shard.
|
||||
const shardCount = uint64(4)
|
||||
actualShards := make(map[uint64][]storage.SeriesRef)
|
||||
actualPostings := make([]storage.SeriesRef, 0, len(expected))
|
||||
|
||||
for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ {
|
||||
p, err = ir.Postings(ctx, "const", "1")
|
||||
require.NoError(t, err)
|
||||
|
||||
p = ir.ShardedPostings(p, shardIndex, shardCount)
|
||||
for p.Next() {
|
||||
ref := p.At()
|
||||
|
||||
actualShards[shardIndex] = append(actualShards[shardIndex], ref)
|
||||
actualPostings = append(actualPostings, ref)
|
||||
}
|
||||
require.NoError(t, p.Err())
|
||||
}
|
||||
|
||||
// We expect the postings merged out of shards is the exact same of the non sharded ones.
|
||||
require.ElementsMatch(t, expected, actualPostings)
|
||||
|
||||
// We expect the series in each shard are the expected ones.
|
||||
for shardIndex, ids := range actualShards {
|
||||
for _, id := range ids {
|
||||
var lbls labels.ScratchBuilder
|
||||
|
||||
require.NoError(t, ir.Series(id, &lbls, nil))
|
||||
require.Equal(t, shardIndex, labels.StableHash(lbls.Labels())%shardCount)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestErrReuseAppender(t *testing.T) {
|
||||
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||
defer func() {
|
||||
|
@ -3038,7 +3113,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
|
|||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
|
||||
s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled)
|
||||
|
||||
for i := 0; i < 7; i++ {
|
||||
ok, _ := s.append(int64(i), float64(i), 0, cOpts)
|
||||
|
|
|
@ -1744,6 +1744,33 @@ func (r *Reader) SortedPostings(p Postings) Postings {
|
|||
return p
|
||||
}
|
||||
|
||||
// ShardedPostings returns a postings list filtered by the provided shardIndex out of shardCount.
|
||||
func (r *Reader) ShardedPostings(p Postings, shardIndex, shardCount uint64) Postings {
|
||||
var (
|
||||
out = make([]storage.SeriesRef, 0, 128)
|
||||
bufLbls = labels.ScratchBuilder{}
|
||||
)
|
||||
|
||||
for p.Next() {
|
||||
id := p.At()
|
||||
|
||||
// Get the series labels (no chunks).
|
||||
err := r.Series(id, &bufLbls, nil)
|
||||
if err != nil {
|
||||
return ErrPostings(fmt.Errorf("series %d not found", id))
|
||||
}
|
||||
|
||||
// Check if the series belong to the shard.
|
||||
if labels.StableHash(bufLbls.Labels())%shardCount != shardIndex {
|
||||
continue
|
||||
}
|
||||
|
||||
out = append(out, id)
|
||||
}
|
||||
|
||||
return NewListPostings(out)
|
||||
}
|
||||
|
||||
// Size returns the size of an index file.
|
||||
func (r *Reader) Size() int64 {
|
||||
return int64(r.b.Len())
|
||||
|
@ -1864,9 +1891,12 @@ func (dec *Decoder) LabelValueFor(ctx context.Context, b []byte, label string) (
|
|||
|
||||
// Series decodes a series entry from the given byte slice into builder and chks.
|
||||
// Previous contents of builder can be overwritten - make sure you copy before retaining.
|
||||
// Skips reading chunks metadata if chks is nil.
|
||||
func (dec *Decoder) Series(b []byte, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
|
||||
builder.Reset()
|
||||
if chks != nil {
|
||||
*chks = (*chks)[:0]
|
||||
}
|
||||
|
||||
d := encoding.Decbuf{B: b}
|
||||
|
||||
|
@ -1892,6 +1922,11 @@ func (dec *Decoder) Series(b []byte, builder *labels.ScratchBuilder, chks *[]chu
|
|||
builder.Add(ln, lv)
|
||||
}
|
||||
|
||||
// Skip reading chunks metadata if chks is nil.
|
||||
if chks == nil {
|
||||
return d.Err()
|
||||
}
|
||||
|
||||
// Read the chunks meta data.
|
||||
k = d.Uvarint()
|
||||
|
||||
|
|
|
@ -242,6 +242,58 @@ func TestIndexRW_Postings(t *testing.T) {
|
|||
}, labelIndices)
|
||||
|
||||
require.NoError(t, ir.Close())
|
||||
|
||||
t.Run("ShardedPostings()", func(t *testing.T) {
|
||||
ir, err := NewFileReader(fn)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, ir.Close())
|
||||
})
|
||||
|
||||
// List all postings for a given label value. This is what we expect to get
|
||||
// in output from all shards.
|
||||
p, err = ir.Postings(ctx, "a", "1")
|
||||
require.NoError(t, err)
|
||||
|
||||
var expected []storage.SeriesRef
|
||||
for p.Next() {
|
||||
expected = append(expected, p.At())
|
||||
}
|
||||
require.NoError(t, p.Err())
|
||||
require.NotEmpty(t, expected)
|
||||
|
||||
// Query the same postings for each shard.
|
||||
const shardCount = uint64(4)
|
||||
actualShards := make(map[uint64][]storage.SeriesRef)
|
||||
actualPostings := make([]storage.SeriesRef, 0, len(expected))
|
||||
|
||||
for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ {
|
||||
p, err = ir.Postings(ctx, "a", "1")
|
||||
require.NoError(t, err)
|
||||
|
||||
p = ir.ShardedPostings(p, shardIndex, shardCount)
|
||||
for p.Next() {
|
||||
ref := p.At()
|
||||
|
||||
actualShards[shardIndex] = append(actualShards[shardIndex], ref)
|
||||
actualPostings = append(actualPostings, ref)
|
||||
}
|
||||
require.NoError(t, p.Err())
|
||||
}
|
||||
|
||||
// We expect the postings merged out of shards is the exact same of the non sharded ones.
|
||||
require.ElementsMatch(t, expected, actualPostings)
|
||||
|
||||
// We expect the series in each shard are the expected ones.
|
||||
for shardIndex, ids := range actualShards {
|
||||
for _, id := range ids {
|
||||
var lbls labels.ScratchBuilder
|
||||
|
||||
require.NoError(t, ir.Series(id, &lbls, nil))
|
||||
require.Equal(t, shardIndex, labels.StableHash(lbls.Labels())%shardCount)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestPostingsMany(t *testing.T) {
|
||||
|
@ -565,6 +617,55 @@ func TestSymbols(t *testing.T) {
|
|||
require.NoError(t, iter.Err())
|
||||
}
|
||||
|
||||
func BenchmarkReader_ShardedPostings(b *testing.B) {
|
||||
const (
|
||||
numSeries = 10000
|
||||
numShards = 16
|
||||
)
|
||||
|
||||
dir, err := os.MkdirTemp("", "benchmark_reader_sharded_postings")
|
||||
require.NoError(b, err)
|
||||
defer func() {
|
||||
require.NoError(b, os.RemoveAll(dir))
|
||||
}()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Generate an index.
|
||||
fn := filepath.Join(dir, indexFilename)
|
||||
|
||||
iw, err := NewWriter(ctx, fn)
|
||||
require.NoError(b, err)
|
||||
|
||||
for i := 1; i <= numSeries; i++ {
|
||||
require.NoError(b, iw.AddSymbol(fmt.Sprintf("%10d", i)))
|
||||
}
|
||||
require.NoError(b, iw.AddSymbol("const"))
|
||||
require.NoError(b, iw.AddSymbol("unique"))
|
||||
|
||||
for i := 1; i <= numSeries; i++ {
|
||||
require.NoError(b, iw.AddSeries(storage.SeriesRef(i),
|
||||
labels.FromStrings("const", fmt.Sprintf("%10d", 1), "unique", fmt.Sprintf("%10d", i))))
|
||||
}
|
||||
|
||||
require.NoError(b, iw.Close())
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
// Create a reader to read back all postings from the index.
|
||||
ir, err := NewFileReader(fn)
|
||||
require.NoError(b, err)
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
for n := 0; n < b.N; n++ {
|
||||
allPostings, err := ir.Postings(ctx, "const", fmt.Sprintf("%10d", 1))
|
||||
require.NoError(b, err)
|
||||
|
||||
ir.ShardedPostings(allPostings, uint64(n%numShards), numShards)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecoder_Postings_WrongInput(t *testing.T) {
|
||||
_, _, err := (&Decoder{}).Postings([]byte("the cake is a lie"))
|
||||
require.Error(t, err)
|
||||
|
|
|
@ -440,6 +440,10 @@ func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.P
|
|||
return p
|
||||
}
|
||||
|
||||
func (ir *OOOCompactionHeadIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
|
||||
return ir.ch.oooIR.ShardedPostings(p, shardIndex, shardCount)
|
||||
}
|
||||
|
||||
func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
|
||||
return ir.ch.oooIR.series(ref, builder, chks, 0, ir.ch.lastMmapRef)
|
||||
}
|
||||
|
|
|
@ -131,11 +131,15 @@ func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *stora
|
|||
mint := q.mint
|
||||
maxt := q.maxt
|
||||
disableTrimming := false
|
||||
sharded := hints != nil && hints.ShardCount > 0
|
||||
|
||||
p, err := PostingsForMatchers(ctx, q.index, ms...)
|
||||
if err != nil {
|
||||
return storage.ErrSeriesSet(err)
|
||||
}
|
||||
if sharded {
|
||||
p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
|
||||
}
|
||||
if sortSeries {
|
||||
p = q.index.SortedPostings(p)
|
||||
}
|
||||
|
@ -171,6 +175,8 @@ func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *
|
|||
mint := q.mint
|
||||
maxt := q.maxt
|
||||
disableTrimming := false
|
||||
sharded := hints != nil && hints.ShardCount > 0
|
||||
|
||||
if hints != nil {
|
||||
mint = hints.Start
|
||||
maxt = hints.End
|
||||
|
@ -180,6 +186,9 @@ func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *
|
|||
if err != nil {
|
||||
return storage.ErrChunkSeriesSet(err)
|
||||
}
|
||||
if sharded {
|
||||
p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
|
||||
}
|
||||
if sortSeries {
|
||||
p = q.index.SortedPostings(p)
|
||||
}
|
||||
|
|
|
@ -2326,6 +2326,27 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings {
|
|||
return index.NewListPostings(ep)
|
||||
}
|
||||
|
||||
func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
|
||||
out := make([]storage.SeriesRef, 0, 128)
|
||||
|
||||
for p.Next() {
|
||||
ref := p.At()
|
||||
s, ok := m.series[ref]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the series belong to the shard.
|
||||
if s.l.Hash()%shardCount != shardIndex {
|
||||
continue
|
||||
}
|
||||
|
||||
out = append(out, ref)
|
||||
}
|
||||
|
||||
return index.NewListPostings(out)
|
||||
}
|
||||
|
||||
func (m mockIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
|
||||
s, ok := m.series[ref]
|
||||
if !ok {
|
||||
|
@ -3272,6 +3293,10 @@ func (m mockMatcherIndex) SortedPostings(p index.Postings) index.Postings {
|
|||
return index.EmptyPostings()
|
||||
}
|
||||
|
||||
func (m mockMatcherIndex) ShardedPostings(ps index.Postings, shardIndex, shardCount uint64) index.Postings {
|
||||
return ps
|
||||
}
|
||||
|
||||
func (m mockMatcherIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error {
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue