Rw testability improvements (#6537)

* Change createTimeseries to take values for number of series and number
of samples per series.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Take num of samples to expect in expectSampleCount instead of array of
samples.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Add field to TestStorageClient to ignore samples sent waitgroup for
potential tests where we don't care about delivery of all samples.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fix up tests a little bit.

Signed-off-by: Callum Styan <callumstyan@gmail.com>
pull/6874/head
Callum Styan 2020-02-25 11:10:57 -08:00 committed by GitHub
parent c4e74e241f
commit 1518083168
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 41 additions and 20 deletions

View File

@ -47,7 +47,7 @@ func TestSampleDelivery(t *testing.T) {
// Let's create an even number of send batches so we don't run into the // Let's create an even number of send batches so we don't run into the
// batch timeout case. // batch timeout case.
n := config.DefaultQueueConfig.MaxSamplesPerSend * 2 n := config.DefaultQueueConfig.MaxSamplesPerSend * 2
samples, series := createTimeseries(n) samples, series := createTimeseries(n, n)
c := NewTestStorageClient() c := NewTestStorageClient()
c.expectSamples(samples[:len(samples)/2], series) c.expectSamples(samples[:len(samples)/2], series)
@ -70,15 +70,15 @@ func TestSampleDelivery(t *testing.T) {
defer m.Stop() defer m.Stop()
c.waitForExpectedSamples(t) c.waitForExpectedSamples(t)
m.Append(samples[len(samples)/2:])
c.expectSamples(samples[len(samples)/2:], series) c.expectSamples(samples[len(samples)/2:], series)
m.Append(samples[len(samples)/2:])
c.waitForExpectedSamples(t) c.waitForExpectedSamples(t)
} }
func TestSampleDeliveryTimeout(t *testing.T) { func TestSampleDeliveryTimeout(t *testing.T) {
// Let's send one less sample than batch size, and wait the timeout duration // Let's send one less sample than batch size, and wait the timeout duration
n := 9 n := 9
samples, series := createTimeseries(n) samples, series := createTimeseries(n, n)
c := NewTestStorageClient() c := NewTestStorageClient()
cfg := config.DefaultQueueConfig cfg := config.DefaultQueueConfig
@ -151,7 +151,8 @@ func TestShutdown(t *testing.T) {
metrics := newQueueManagerMetrics(nil) metrics := newQueueManagerMetrics(nil)
m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) m := NewQueueManager(nil, metrics, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline)
samples, series := createTimeseries(2 * config.DefaultQueueConfig.MaxSamplesPerSend) n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0) m.StoreSeries(series, 0)
m.Start() m.Start()
@ -202,8 +203,9 @@ func TestSeriesReset(t *testing.T) {
func TestReshard(t *testing.T) { func TestReshard(t *testing.T) {
size := 10 // Make bigger to find more races. size := 10 // Make bigger to find more races.
n := config.DefaultQueueConfig.Capacity * size nSeries := 6
samples, series := createTimeseries(n) nSamples := config.DefaultQueueConfig.Capacity * size
samples, series := createTimeseries(nSamples, nSeries)
c := NewTestStorageClient() c := NewTestStorageClient()
c.expectSamples(samples, series) c.expectSamples(samples, series)
@ -335,16 +337,19 @@ func TestCalculateDesiredsShards(t *testing.T) {
} }
} }
func createTimeseries(n int) ([]record.RefSample, []record.RefSeries) { func createTimeseries(numSamples, numSeries int) ([]record.RefSample, []record.RefSeries) {
samples := make([]record.RefSample, 0, n) samples := make([]record.RefSample, 0, numSamples)
series := make([]record.RefSeries, 0, n) series := make([]record.RefSeries, 0, numSeries)
for i := 0; i < n; i++ { for i := 0; i < numSeries; i++ {
name := fmt.Sprintf("test_metric_%d", i) name := fmt.Sprintf("test_metric_%d", i)
samples = append(samples, record.RefSample{ for j := 0; j < numSamples; j++ {
Ref: uint64(i), samples = append(samples, record.RefSample{
T: int64(i), Ref: uint64(i),
V: float64(i), T: int64(j),
}) V: float64(i),
})
}
series = append(series, record.RefSeries{ series = append(series, record.RefSeries{
Ref: uint64(i), Ref: uint64(i),
Labels: labels.Labels{{Name: "__name__", Value: name}}, Labels: labels.Labels{{Name: "__name__", Value: name}},
@ -365,6 +370,7 @@ func getSeriesNameFromRef(r record.RefSeries) string {
type TestStorageClient struct { type TestStorageClient struct {
receivedSamples map[string][]prompb.Sample receivedSamples map[string][]prompb.Sample
expectedSamples map[string][]prompb.Sample expectedSamples map[string][]prompb.Sample
withWaitGroup bool
wg sync.WaitGroup wg sync.WaitGroup
mtx sync.Mutex mtx sync.Mutex
buf []byte buf []byte
@ -372,12 +378,16 @@ type TestStorageClient struct {
func NewTestStorageClient() *TestStorageClient { func NewTestStorageClient() *TestStorageClient {
return &TestStorageClient{ return &TestStorageClient{
withWaitGroup: true,
receivedSamples: map[string][]prompb.Sample{}, receivedSamples: map[string][]prompb.Sample{},
expectedSamples: map[string][]prompb.Sample{}, expectedSamples: map[string][]prompb.Sample{},
} }
} }
func (c *TestStorageClient) expectSamples(ss []record.RefSample, series []record.RefSeries) { func (c *TestStorageClient) expectSamples(ss []record.RefSample, series []record.RefSeries) {
if !c.withWaitGroup {
return
}
c.mtx.Lock() c.mtx.Lock()
defer c.mtx.Unlock() defer c.mtx.Unlock()
@ -395,6 +405,9 @@ func (c *TestStorageClient) expectSamples(ss []record.RefSample, series []record
} }
func (c *TestStorageClient) waitForExpectedSamples(tb testing.TB) { func (c *TestStorageClient) waitForExpectedSamples(tb testing.TB) {
if !c.withWaitGroup {
return
}
c.wg.Wait() c.wg.Wait()
c.mtx.Lock() c.mtx.Lock()
defer c.mtx.Unlock() defer c.mtx.Unlock()
@ -405,13 +418,19 @@ func (c *TestStorageClient) waitForExpectedSamples(tb testing.TB) {
} }
} }
func (c *TestStorageClient) expectSampleCount(ss []record.RefSample) { func (c *TestStorageClient) expectSampleCount(numSamples int) {
if !c.withWaitGroup {
return
}
c.mtx.Lock() c.mtx.Lock()
defer c.mtx.Unlock() defer c.mtx.Unlock()
c.wg.Add(len(ss)) c.wg.Add(numSamples)
} }
func (c *TestStorageClient) waitForExpectedSampleCount() { func (c *TestStorageClient) waitForExpectedSampleCount() {
if !c.withWaitGroup {
return
}
c.wg.Wait() c.wg.Wait()
} }
@ -447,7 +466,9 @@ func (c *TestStorageClient) Store(_ context.Context, req []byte) error {
c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample) c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample)
} }
} }
c.wg.Add(-count) if c.withWaitGroup {
c.wg.Add(-count)
}
return nil return nil
} }
@ -493,7 +514,7 @@ func BenchmarkSampleDelivery(b *testing.B) {
// Let's create an even number of send batches so we don't run into the // Let's create an even number of send batches so we don't run into the
// batch timeout case. // batch timeout case.
n := config.DefaultQueueConfig.MaxSamplesPerSend * 10 n := config.DefaultQueueConfig.MaxSamplesPerSend * 10
samples, series := createTimeseries(n) samples, series := createTimeseries(n, n)
c := NewTestStorageClient() c := NewTestStorageClient()
@ -515,7 +536,7 @@ func BenchmarkSampleDelivery(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
c.expectSampleCount(samples) c.expectSampleCount(len(samples))
m.Append(samples) m.Append(samples)
c.waitForExpectedSampleCount() c.waitForExpectedSampleCount()
} }