Avoid deadlock when processing duplicate series record (#9170) (#8)

* Avoid deadlock when processing duplicate series record

`processWALSamples()` needs to be able to send on its output channel
before it can read the input channel, so reads to allow this in case the
output channel is full.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* processWALSamples: update comment

Previous text seems to relate to an earlier implementation.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

Co-authored-by: Bryan Boreham <bjboreham@gmail.com>
owilliams/utf8-02-mimir
Marco Pracucci 3 years ago committed by GitHub
parent fbe211d3a8
commit 175efada22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -218,9 +218,17 @@ Outer:
// It is possible that some old sample is being processed in processWALSamples that // It is possible that some old sample is being processed in processWALSamples that
// could cause race below. So we wait for the goroutine to empty input the buffer and finish // could cause race below. So we wait for the goroutine to empty input the buffer and finish
// processing all old samples after emptying the buffer. // processing all old samples after emptying the buffer.
select {
case <-outputs[idx]: // allow output side to drain to avoid deadlock
default:
}
inputs[idx] <- []record.RefSample{} inputs[idx] <- []record.RefSample{}
for len(inputs[idx]) != 0 { for len(inputs[idx]) != 0 {
time.Sleep(1 * time.Millisecond) time.Sleep(1 * time.Millisecond)
select {
case <-outputs[idx]: // allow output side to drain to avoid deadlock
default:
}
} }
// Checking if the new m-mapped chunks overlap with the already existing ones. // Checking if the new m-mapped chunks overlap with the already existing ones.
@ -341,9 +349,9 @@ Outer:
return nil return nil
} }
// processWALSamples adds a partition of samples it receives to the head and passes // processWALSamples adds the samples it receives to the head and passes
// them on to other workers. // the buffer received to an output channel for reuse.
// Samples before the mint timestamp are discarded. // Samples before the minValidTime timestamp are discarded.
func (h *Head) processWALSamples( func (h *Head) processWALSamples(
minValidTime int64, minValidTime int64,
input <-chan []record.RefSample, output chan<- []record.RefSample, input <-chan []record.RefSample, output chan<- []record.RefSample,

Loading…
Cancel
Save