Fix min/max time handling and concurrent crc32 usage

pull/5805/head
Fabian Reinartz 7 years ago
parent 970bffec8d
commit 0db4c227b7

@@ -240,11 +240,16 @@ func (h *Head) Truncate(mint int64) error {
 	if mint%h.chunkRange != 0 {
 		return errors.Errorf("truncating at %d not aligned", mint)
 	}
-	if h.minTime >= mint {
+	if h.MinTime() >= mint {
 		return nil
 	}
 	atomic.StoreInt64(&h.minTime, mint)
 
+	// Ensure that max time is at least as high as min time.
+	for h.MaxTime() < mint {
+		atomic.CompareAndSwapInt64(&h.maxTime, h.MaxTime(), mint)
+	}
+
 	// This was an initial call to Truncate after loading blocks on startup.
 	// We haven't read back the WAL yet, so do not attempt to truncate it.
 	if initialize {
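The added loop is the standard compare-and-swap retry for raising a shared value monotonically: maxTime is re-read on every iteration, and the swap is only attempted while it is still below mint, so a higher value stored by a concurrent appender is never clobbered. A self-contained sketch of the pattern (the raiseTo name is hypothetical):

	package sketch

	import "sync/atomic"

	// raiseTo atomically raises *v to at least floor. Re-reading *v on
	// every attempt means a concurrent writer that stored a value >= floor
	// wins; a larger value is never replaced with a smaller one.
	func raiseTo(v *int64, floor int64) {
		for {
			cur := atomic.LoadInt64(v)
			if cur >= floor {
				return // already high enough
			}
			if atomic.CompareAndSwapInt64(v, cur, floor) {
				return // raised successfully
			}
			// CAS lost a race with another writer; re-read and retry.
		}
	}
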
@@ -279,15 +284,15 @@ func (h *Head) Truncate(mint int64) error {
 // Returns true if the initialization took an effect.
 func (h *Head) initTime(t int64) (initialized bool) {
 	// In the init state, the head has a high timestamp of math.MinInt64.
-	if h.MaxTime() != math.MinInt64 {
-		return false
-	}
 	mint, _ := rangeForTimestamp(t, h.chunkRange)
 
-	if !atomic.CompareAndSwapInt64(&h.maxTime, math.MinInt64, t) {
+	if !atomic.CompareAndSwapInt64(&h.minTime, math.MinInt64, mint) {
 		return false
 	}
-	atomic.StoreInt64(&h.minTime, mint-h.chunkRange)
+	// Ensure that max time is initialized to at least the min time we just set.
+	// Concurrent appenders may already have set it to a higher value.
+	atomic.CompareAndSwapInt64(&h.maxTime, math.MinInt64, t)
 
 	return true
 }
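After this rewrite, the CAS on minTime is the single point deciding which caller initializes the head, and maxTime is then settled with a CAS instead of a plain store so that a higher value already set by a concurrent appender survives. A self-contained sketch of the two-field pattern (names hypothetical):

	package sketch

	import "sync/atomic"

	// claimInit returns true for exactly one caller: the CAS on min
	// succeeds only for whoever observes the sentinel first. max is
	// CAS'd rather than stored, because a concurrent writer may already
	// have advanced it past the sentinel and must not be overwritten.
	func claimInit(min, max *int64, sentinel, mint, maxt int64) bool {
		if !atomic.CompareAndSwapInt64(min, sentinel, mint) {
			return false // another goroutine initialized first
		}
		atomic.CompareAndSwapInt64(max, sentinel, maxt)
		return true
	}

Since minTime is now the field that holds the math.MinInt64 sentinel until initialization, it is also the right field for the uninitialized check, which is what the Appender hunk below changes.
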
@@ -335,7 +340,7 @@ func (h *Head) Appender() Appender {
 	// The head cache might not have a starting point yet. The init appender
 	// picks up the first appended timestamp as the base.
-	if h.MaxTime() == math.MinInt64 {
+	if h.MinTime() == math.MinInt64 {
 		return &initAppender{head: h}
 	}
 	return h.appender()

@@ -308,9 +308,11 @@ func (w *SegmentWAL) Truncate(mint int64, p Postings) error {
 	if err != nil {
 		return errors.Wrap(err, "create compaction segment")
 	}
-	csf := newSegmentFile(f)
-
-	activeSeries := []RefSeries{}
+	var (
+		csf          = newSegmentFile(f)
+		crc32        = newCRC32()
+		activeSeries = []RefSeries{}
+	)
 
 Loop:
 	for r.next() {
@@ -337,7 +339,7 @@ Loop:
 		buf := w.getBuffer()
 
 		flag = w.encodeSeries(buf, activeSeries)
-		_, err = w.writeTo(csf, WALEntrySeries, flag, buf.get())
+		_, err = w.writeTo(csf, crc32, WALEntrySeries, flag, buf.get())
 		w.putBuffer(buf)
 
 		if err != nil {
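These two hunks carry the commit's second fix: a hash.Hash has internal state and is not safe for concurrent use, and SegmentWAL.Truncate can run concurrently with the regular write path, which owns w.crc32. Threading an explicit hash through writeTo lets the compaction path bring its own instance. A minimal sketch of the idea; newCRC32 is assumed to match tsdb's Castagnoli-table constructor:

	package sketch

	import (
		"hash"
		"hash/crc32"
	)

	// newCRC32 returns a CRC-32 using the Castagnoli polynomial
	// (assumed to mirror the WAL's checksum convention).
	func newCRC32() hash.Hash32 {
		return crc32.New(crc32.MakeTable(crc32.Castagnoli))
	}

	// checksum shows the per-caller discipline: each goroutine passes
	// its own hash, so Reset/Write/Sum never interleave across goroutines.
	func checksum(h hash.Hash32, buf []byte) uint32 {
		h.Reset()
		h.Write(buf) // hash.Hash writes are documented to never fail
		return h.Sum32()
	}
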
@@ -355,20 +357,17 @@ Loop:
 	if err := csf.Truncate(off); err != nil {
 		return err
 	}
-	if err := csf.Close(); err != nil {
-		return errors.Wrap(err, "close tmp file")
-	}
+	csf.Sync()
+	csf.Close()
 
 	if err := renameFile(csf.Name(), candidates[0].Name()); err != nil {
 		return err
 	}
 	for _, f := range candidates[1:] {
-		if err := f.Close(); err != nil {
-			return errors.Wrap(err, "close obsolete WAL segment file")
-		}
 		if err := os.RemoveAll(f.Name()); err != nil {
 			return errors.Wrap(err, "delete WAL segment file")
 		}
+		f.Close()
 	}
 
 	if err := w.dirFile.Sync(); err != nil {
 		return err
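The new sequence is the usual durable-replace idiom: sync the finished temporary segment so its contents reach disk before the rename makes it visible, rename over the oldest candidate (atomic on POSIX), then fsync the directory (the w.dirFile.Sync just above) so the rename itself survives a crash. A standalone sketch of the sequence, with the error checks the diff drops for Sync and Close restored:

	package sketch

	import "os"

	// atomicReplace makes dst durably hold the contents of tmp:
	// file data is flushed before the rename, and the directory entry
	// after it, so a crash never leaves a visible-but-incomplete dst.
	func atomicReplace(tmp *os.File, dst, dir string) error {
		if err := tmp.Sync(); err != nil { // flush file contents to disk
			return err
		}
		if err := tmp.Close(); err != nil {
			return err
		}
		if err := os.Rename(tmp.Name(), dst); err != nil { // atomic on POSIX
			return err
		}
		d, err := os.Open(dir)
		if err != nil {
			return err
		}
		defer d.Close()
		return d.Sync() // persist the rename in the directory entry
	}
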
@@ -381,9 +380,7 @@ Loop:
 		return err
 	}
 	// We don't need it to be open.
-	if err := csf.Close(); err != nil {
-		return err
-	}
+	csf.Close()
 
 	w.mtx.Lock()
 	w.files = append([]*segmentFile{csf}, w.files[len(candidates):]...)
@@ -674,19 +671,19 @@ func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error {
 			return err
 		}
 	}
-	n, err := w.writeTo(w.cur, t, flag, buf)
+	n, err := w.writeTo(w.cur, w.crc32, t, flag, buf)
 
 	w.curN += int64(n)
 
 	return err
 }
 
-func (w *SegmentWAL) writeTo(wr io.Writer, t WALEntryType, flag uint8, buf []byte) (int, error) {
+func (w *SegmentWAL) writeTo(wr io.Writer, crc32 hash.Hash, t WALEntryType, flag uint8, buf []byte) (int, error) {
 	if len(buf) == 0 {
 		return 0, nil
 	}
-	w.crc32.Reset()
-	wr = io.MultiWriter(w.crc32, wr)
+	crc32.Reset()
+	wr = io.MultiWriter(crc32, wr)
 
 	var b [6]byte
 	b[0] = byte(t)
@@ -702,7 +699,7 @@ func (w *SegmentWAL) writeTo(wr io.Writer, t WALEntryType, flag uint8, buf []byt
 	if err != nil {
 		return n1 + n2, err
 	}
-	n3, err := wr.Write(w.crc32.Sum(b[:0]))
+	n3, err := wr.Write(crc32.Sum(b[:0]))
 
 	return n1 + n2 + n3, err
 }

@@ -122,8 +122,8 @@ func TestSegmentWAL_cut(t *testing.T) {
 
 func TestSegmentWAL_Truncate(t *testing.T) {
 	const (
-		numMetrics = 50
-		batch      = 10
+		numMetrics = 20000
+		batch      = 100
 	)
 	series, err := readPrometheusLabels("testdata/20k.series", numMetrics)
 	require.NoError(t, err)
@@ -134,7 +134,7 @@ func TestSegmentWAL_Truncate(t *testing.T) {
 	w, err := OpenSegmentWAL(dir, nil, 0)
 	require.NoError(t, err)
-	w.segmentSize = 1000
+	w.segmentSize = 10000
 
 	for i := 0; i < numMetrics; i += batch {
 		var rs []RefSeries
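The bumped constants make the test consume the full 20k-series fixture and, with a segment size that is still small relative to the data written, spread the WAL across many segment files, so the truncation path above compacts across several segments rather than touching just one.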
