mirror of https://github.com/prometheus/prometheus
Prevent reshard concurrent with calling stop (#5460)
* Prevent reshard concurrent with calling stop Signed-off-by: Vasily <v.sliouniaev@gmail.com>pull/5465/head
parent
559237cc4f
commit
5be9a1426f
|
@ -322,9 +322,12 @@ func (t *QueueManager) Stop() {
|
|||
defer level.Info(t.logger).Log("msg", "Remote storage stopped.")
|
||||
|
||||
close(t.quit)
|
||||
t.wg.Wait()
|
||||
// Wait for all QueueManager routines to end before stopping shards and WAL watcher. This
|
||||
// is to ensure we don't end up executing a reshard and shards.stop() at the same time, which
|
||||
// causes a closed channel panic.
|
||||
t.shards.stop()
|
||||
t.watcher.Stop()
|
||||
t.wg.Wait()
|
||||
|
||||
// On shutdown, release the strings in the labels from the intern pool.
|
||||
t.seriesMtx.Lock()
|
||||
|
|
|
@ -233,6 +233,30 @@ func TestReshard(t *testing.T) {
|
|||
c.waitForExpectedSamples(t)
|
||||
}
|
||||
|
||||
func TestReshardRaceWithStop(t *testing.T) {
|
||||
c := NewTestStorageClient()
|
||||
var m *QueueManager
|
||||
h := sync.Mutex{}
|
||||
|
||||
h.Lock()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
m = NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline)
|
||||
m.Start()
|
||||
h.Unlock()
|
||||
h.Lock()
|
||||
m.Stop()
|
||||
}
|
||||
}()
|
||||
|
||||
for i := 1; i < 100; i++ {
|
||||
h.Lock()
|
||||
m.reshardChan <- i
|
||||
h.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func createTimeseries(n int) ([]tsdb.RefSample, []tsdb.RefSeries) {
|
||||
samples := make([]tsdb.RefSample, 0, n)
|
||||
series := make([]tsdb.RefSeries, 0, n)
|
||||
|
|
Loading…
Reference in New Issue