diff --git a/head.go b/head.go index 1cc654bd3..0fbf747a3 100644 --- a/head.go +++ b/head.go @@ -298,23 +298,25 @@ func (h *headBlock) appendBatch(samples []hashedSample) (int, error) { // be no further failures. if len(newSeries) > 0 { newLabels = make([]labels.Labels, 0, len(newSeries)) + base0 := len(h.series) h.mtx.RUnlock() h.mtx.Lock() - base := len(h.series) + base1 := len(h.series) i := 0 for hash, ser := range newSeries { lset := ser[0].labels // We switched locks and have to re-validate that the series were not // created by another goroutine in the meantime. - ms := h.get(hash, lset) - if ms != nil { - for _, s := range ser { - s.ref = ms.ref + if base1 != base0 { + if ms := h.get(hash, lset); ms != nil { + for _, s := range ser { + s.ref = ms.ref + } + continue } - continue } // Series is still new. newLabels = append(newLabels, lset) @@ -322,7 +324,7 @@ func (h *headBlock) appendBatch(samples []hashedSample) (int, error) { h.create(hash, lset) // Set sample references to the series we just created. for _, s := range ser { - s.ref = uint32(base + i) + s.ref = uint32(base1 + i) } i++ } @@ -343,9 +345,11 @@ func (h *headBlock) appendBatch(samples []hashedSample) (int, error) { ) for _, s := range samples { ser := h.series[s.ref] + ser.mtx.Lock() ok := ser.append(s.t, s.v) ser.mtx.Unlock() + if !ok { total-- continue