From 5be9a1426ffb12544355f26304818458465e546c Mon Sep 17 00:00:00 2001
From: Vasily Sliouniaev
Date: Tue, 16 Apr 2019 11:25:19 +0100
Subject: [PATCH] Prevent reshard concurrent with calling stop (#5460)

* Prevent reshard concurrent with calling stop

Signed-off-by: Vasily
---
 storage/remote/queue_manager.go      |  5 ++++-
 storage/remote/queue_manager_test.go | 24 ++++++++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index e9a5dba87..9dd50ee60 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -322,9 +322,12 @@ func (t *QueueManager) Stop() {
 	defer level.Info(t.logger).Log("msg", "Remote storage stopped.")
 
 	close(t.quit)
+	t.wg.Wait()
+	// Wait for all QueueManager routines to end before stopping shards and WAL watcher. This
+	// is to ensure we don't end up executing a reshard and shards.stop() at the same time, which
+	// causes a closed channel panic.
 	t.shards.stop()
 	t.watcher.Stop()
-	t.wg.Wait()
 
 	// On shutdown, release the strings in the labels from the intern pool.
 	t.seriesMtx.Lock()
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index 48ecc5052..f709c452f 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -233,6 +233,30 @@ func TestReshard(t *testing.T) {
 	c.waitForExpectedSamples(t)
 }
 
+func TestReshardRaceWithStop(t *testing.T) {
+	c := NewTestStorageClient()
+	var m *QueueManager
+	h := sync.Mutex{}
+
+	h.Lock()
+
+	go func() {
+		for {
+			m = NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
+			m.Start()
+			h.Unlock()
+			h.Lock()
+			m.Stop()
+		}
+	}()
+
+	for i := 1; i < 100; i++ {
+		h.Lock()
+		m.reshardChan <- i
+		h.Unlock()
+	}
+}
+
 func createTimeseries(n int) ([]tsdb.RefSample, []tsdb.RefSeries) {
 	samples := make([]tsdb.RefSample, 0, n)
 	series := make([]tsdb.RefSeries, 0, n)
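
Below is a minimal, self-contained Go sketch of the shutdown ordering this patch enforces. It is not the Prometheus implementation: the manager/shards types, fields, and the samples channel here are illustrative stand-ins for QueueManager and its shards. The point it demonstrates is the same as the added comment above: signal quit, wait for the WaitGroup-tracked loops to exit, and only then stop the shards, so no background loop can still send on a channel that stop() has already closed.

// race_sketch.go -- illustrative only; names do not match storage/remote/queue_manager.go.
package main

import (
	"sync"
	"time"
)

// shards stands in for QueueManager.shards: stop() closes the channel that
// background loops would otherwise keep sending to.
type shards struct {
	samples chan int
	done    chan struct{}
}

func newShards() *shards {
	s := &shards{samples: make(chan int), done: make(chan struct{})}
	go func() {
		for range s.samples { // drain until the channel is closed
		}
		close(s.done)
	}()
	return s
}

func (s *shards) stop() {
	close(s.samples) // any later send on samples panics: "send on closed channel"
	<-s.done
}

type manager struct {
	quit   chan struct{}
	wg     sync.WaitGroup
	shards *shards
}

func (m *manager) start() {
	m.quit = make(chan struct{})
	m.shards = newShards()
	m.wg.Add(1)
	go func() { // stands in for the WaitGroup-tracked QueueManager loops
		defer m.wg.Done()
		for {
			select {
			case <-m.quit:
				return
			default:
				m.shards.samples <- 1
			}
		}
	}()
}

// stop mirrors the ordering the patch introduces: signal quit, wait for the
// tracked goroutines so nothing can still touch shards, then stop shards.
// Calling m.shards.stop() before m.wg.Wait() (the pre-patch order) lets the
// loop above race with the close and panic on a send to a closed channel.
func (m *manager) stop() {
	close(m.quit)
	m.wg.Wait()
	m.shards.stop()
}

func main() {
	m := &manager{}
	m.start()
	time.Sleep(10 * time.Millisecond)
	m.stop()
}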