|
|
|
@ -98,8 +98,8 @@ func TestSampleDelivery(t *testing.T) {
|
|
|
|
|
c.expectSamples(samples[:len(samples)/2]) |
|
|
|
|
|
|
|
|
|
m := NewQueueManager(QueueManagerConfig{ |
|
|
|
|
Client: c, |
|
|
|
|
Shards: 1, |
|
|
|
|
Client: c, |
|
|
|
|
MaxShards: 1, |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
// These should be received by the client.
|
|
|
|
@ -185,8 +185,10 @@ func (c *TestBlockingStorageClient) Name() string {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (t *QueueManager) queueLen() int { |
|
|
|
|
t.shardsMtx.Lock() |
|
|
|
|
defer t.shardsMtx.Unlock() |
|
|
|
|
queueLength := 0 |
|
|
|
|
for _, shard := range t.shards { |
|
|
|
|
for _, shard := range t.shards.queues { |
|
|
|
|
queueLength += len(shard) |
|
|
|
|
} |
|
|
|
|
return queueLength |
|
|
|
@ -197,7 +199,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
|
|
|
|
|
// `MaxSamplesPerSend*Shards` samples should be consumed by the
|
|
|
|
|
// per-shard goroutines, and then another `MaxSamplesPerSend`
|
|
|
|
|
// should be left on the queue.
|
|
|
|
|
n := defaultMaxSamplesPerSend*defaultShards + defaultMaxSamplesPerSend |
|
|
|
|
n := defaultMaxSamplesPerSend*1 + defaultMaxSamplesPerSend |
|
|
|
|
|
|
|
|
|
samples := make(model.Samples, 0, n) |
|
|
|
|
for i := 0; i < n; i++ { |
|
|
|
@ -214,6 +216,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
|
|
|
|
|
m := NewQueueManager(QueueManagerConfig{ |
|
|
|
|
Client: c, |
|
|
|
|
QueueCapacity: n, |
|
|
|
|
MaxShards: 1, |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
m.Start() |
|
|
|
@ -250,7 +253,7 @@ func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
numCalls := c.NumCalls() |
|
|
|
|
if numCalls != uint64(defaultShards) { |
|
|
|
|
t.Errorf("Saw %d concurrent sends, expected %d", numCalls, defaultShards) |
|
|
|
|
if numCalls != uint64(1) { |
|
|
|
|
t.Errorf("Saw %d concurrent sends, expected 1", numCalls) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|