Prevent reshard concurrent with calling stop (#5460)

* Prevent reshard concurrent with calling stop

Signed-off-by: Vasily <v.sliouniaev@gmail.com>
pull/5464/head
Vasily Sliouniaev 6 years ago committed by Brian Brazil
parent 1a9cdbd024
commit ef19ede14a

@ -322,9 +322,12 @@ func (t *QueueManager) Stop() {
defer level.Info(t.logger).Log("msg", "Remote storage stopped.")
close(t.quit)
t.wg.Wait()
// Wait for all QueueManager routines to end before stopping shards and WAL watcher. This
// is to ensure we don't end up executing a reshard and shards.stop() at the same time, which
// causes a closed channel panic.
t.shards.stop()
t.watcher.Stop()
t.wg.Wait()
// On shutdown, release the strings in the labels from the intern pool.
t.seriesMtx.Lock()

@ -233,6 +233,30 @@ func TestReshard(t *testing.T) {
c.waitForExpectedSamples(t)
}
func TestReshardRaceWithStop(t *testing.T) {
c := NewTestStorageClient()
var m *QueueManager
h := sync.Mutex{}
h.Lock()
go func() {
for {
m = NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
m.Start()
h.Unlock()
h.Lock()
m.Stop()
}
}()
for i := 1; i < 100; i++ {
h.Lock()
m.reshardChan <- i
h.Unlock()
}
}
func createTimeseries(n int) ([]tsdb.RefSample, []tsdb.RefSeries) {
samples := make([]tsdb.RefSample, 0, n)
series := make([]tsdb.RefSeries, 0, n)

Loading…
Cancel
Save