Fix races

pull/5805/head
Fabian Reinartz 2017-01-07 16:20:32 +01:00
parent 54f5027406
commit 6aa922c5a6
1 changed files with 17 additions and 30 deletions

43
head.go
View File

@ -34,7 +34,7 @@ type HeadBlock struct {
wal *WAL wal *WAL
bstats BlockStats bstats *BlockStats
} }
// OpenHeadBlock creates a new empty head block. // OpenHeadBlock creates a new empty head block.
@ -53,9 +53,10 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) {
wal: wal, wal: wal,
mapper: newPositionMapper(nil), mapper: newPositionMapper(nil),
} }
b.bstats = &BlockStats{
b.bstats.MinTime = math.MaxInt64 MinTime: math.MaxInt64,
b.bstats.MaxTime = math.MinInt64 MaxTime: math.MinInt64,
}
err = wal.ReadAll(&walHandler{ err = wal.ReadAll(&walHandler{
series: func(lset labels.Labels) { series: func(lset labels.Labels) {
@ -97,7 +98,7 @@ func (h *HeadBlock) dir() string { return h.d }
func (h *HeadBlock) persisted() bool { return false } func (h *HeadBlock) persisted() bool { return false }
func (h *HeadBlock) index() IndexReader { return h } func (h *HeadBlock) index() IndexReader { return h }
func (h *HeadBlock) series() SeriesReader { return h } func (h *HeadBlock) series() SeriesReader { return h }
func (h *HeadBlock) stats() BlockStats { return h.bstats } func (h *HeadBlock) stats() BlockStats { return *h.bstats }
// Chunk returns the chunk for the reference number. // Chunk returns the chunk for the reference number.
func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) { func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {
@ -111,18 +112,12 @@ func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {
} }
func (h *HeadBlock) interval() (int64, int64) { func (h *HeadBlock) interval() (int64, int64) {
h.mtx.RLock()
defer h.mtx.RUnlock()
return h.bstats.MinTime, h.bstats.MaxTime return h.bstats.MinTime, h.bstats.MaxTime
} }
// Stats returns statisitics about the indexed data. // Stats returns statisitics about the indexed data.
func (h *HeadBlock) Stats() (BlockStats, error) { func (h *HeadBlock) Stats() (BlockStats, error) {
h.mtx.RLock() return *h.bstats, nil
defer h.mtx.RUnlock()
return h.bstats, nil
} }
// LabelValues returns the possible label values // LabelValues returns the possible label values
@ -227,8 +222,8 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
h.postings.add(cd.ref, term{}) h.postings.add(cd.ref, term{})
// For the head block there's exactly one chunk per series. // For the head block there's exactly one chunk per series.
h.bstats.ChunkCount++ atomic.AddUint32(&h.bstats.ChunkCount, 1)
h.bstats.SeriesCount++ atomic.AddUint32(&h.bstats.SeriesCount, 1)
return cd return cd
} }
@ -253,6 +248,7 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
uniqueHashes = map[uint64]uint32{} uniqueHashes = map[uint64]uint32{}
) )
h.mtx.RLock() h.mtx.RLock()
defer h.mtx.RUnlock()
for i := range samples { for i := range samples {
s := &samples[i] s := &samples[i]
@ -286,8 +282,6 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
newSamples = append(newSamples, s) newSamples = append(newSamples, s)
} }
h.mtx.RUnlock()
// Write all new series and samples to the WAL and add it to the // Write all new series and samples to the WAL and add it to the
// in-mem database on success. // in-mem database on success.
if err := h.wal.Log(newSeries, samples); err != nil { if err := h.wal.Log(newSeries, samples); err != nil {
@ -297,6 +291,7 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
// After the samples were successfully written to the WAL, there may // After the samples were successfully written to the WAL, there may
// be no further failures. // be no further failures.
if len(newSeries) > 0 { if len(newSeries) > 0 {
h.mtx.RUnlock()
h.mtx.Lock() h.mtx.Lock()
base := len(h.descs) base := len(h.descs)
@ -309,9 +304,7 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
} }
h.mtx.Unlock() h.mtx.Unlock()
h.mtx.RLock() h.mtx.RLock()
defer h.mtx.RUnlock()
} }
total := len(samples) total := len(samples)
@ -325,23 +318,17 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
} }
cd.append(s.t, s.v) cd.append(s.t, s.v)
if t := h.bstats.MaxTime; s.t > t { for t := h.bstats.MaxTime; s.t > t; t = h.bstats.MaxTime {
// h.bstats.MaxTime = s.t if atomic.CompareAndSwapInt64(&h.bstats.MaxTime, t, s.t) {
for !atomic.CompareAndSwapInt64(&h.bstats.MaxTime, t, s.t) {
if t = h.bstats.MaxTime; s.t <= t {
break break
} }
} }
} for t := h.bstats.MinTime; s.t < t; t = h.bstats.MinTime {
if t := h.bstats.MinTime; s.t < t { if atomic.CompareAndSwapInt64(&h.bstats.MinTime, t, s.t) {
// h.bstats.MinTime = s.t
for !atomic.CompareAndSwapInt64(&h.bstats.MinTime, t, s.t) {
if t = h.bstats.MinTime; s.t >= t {
break break
} }
} }
} }
}
atomic.AddUint64(&h.bstats.SampleCount, uint64(total)) atomic.AddUint64(&h.bstats.SampleCount, uint64(total))