|
|
|
@ -297,32 +297,38 @@ func (h *headBlock) appendBatch(samples []hashedSample) (int, error) {
|
|
|
|
|
// After the samples were successfully written to the WAL, there may
|
|
|
|
|
// be no further failures.
|
|
|
|
|
if len(newSeries) > 0 {
|
|
|
|
|
// TODO(fabxc): re-check if we actually have to create a new series
|
|
|
|
|
// after acquiring the write lock.
|
|
|
|
|
// If concurrent appenders attempt to create the same series, there's
|
|
|
|
|
// a semantical race between switching locks.
|
|
|
|
|
newLabels = make([]labels.Labels, 0, len(newSeries))
|
|
|
|
|
|
|
|
|
|
h.mtx.RUnlock()
|
|
|
|
|
h.mtx.Lock()
|
|
|
|
|
|
|
|
|
|
base := len(h.series)
|
|
|
|
|
i := 0
|
|
|
|
|
|
|
|
|
|
for hash, ser := range newSeries {
|
|
|
|
|
h.create(hash, ser[0].labels)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
h.mtx.Unlock()
|
|
|
|
|
h.mtx.RLock()
|
|
|
|
|
|
|
|
|
|
newLabels = make([]labels.Labels, 0, len(newSeries))
|
|
|
|
|
lset := ser[0].labels
|
|
|
|
|
// We switched locks and have to re-validate that the series were not
|
|
|
|
|
// created by another goroutine in the meantime.
|
|
|
|
|
ms := h.get(hash, lset)
|
|
|
|
|
if ms != nil {
|
|
|
|
|
for _, s := range ser {
|
|
|
|
|
s.ref = ms.ref
|
|
|
|
|
}
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
// Series is still new.
|
|
|
|
|
newLabels = append(newLabels, lset)
|
|
|
|
|
|
|
|
|
|
i := 0
|
|
|
|
|
for _, ser := range newSeries {
|
|
|
|
|
newLabels = append(newLabels, ser[0].labels)
|
|
|
|
|
h.create(hash, lset)
|
|
|
|
|
// Set sample references to the series we just created.
|
|
|
|
|
for _, s := range ser {
|
|
|
|
|
s.ref = uint32(base + i)
|
|
|
|
|
}
|
|
|
|
|
i++
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
h.mtx.Unlock()
|
|
|
|
|
h.mtx.RLock()
|
|
|
|
|
}
|
|
|
|
|
// Write all new series and samples to the WAL and add it to the
|
|
|
|
|
// in-mem database on success.
|
|
|
|
|