|
|
|
@ -53,7 +53,7 @@ const (
|
|
|
|
|
type SamplesCB func([]RefSample) error |
|
|
|
|
|
|
|
|
|
// SeriesCB is the callback after reading series.
|
|
|
|
|
type SeriesCB func([]labels.Labels) error |
|
|
|
|
type SeriesCB func([]RefSeries) error |
|
|
|
|
|
|
|
|
|
// DeletesCB is the callback after reading deletes.
|
|
|
|
|
type DeletesCB func([]Stone) error |
|
|
|
@ -63,7 +63,7 @@ type SegmentWAL struct {
|
|
|
|
|
mtx sync.Mutex |
|
|
|
|
|
|
|
|
|
dirFile *os.File |
|
|
|
|
files []*os.File |
|
|
|
|
files []*segmentFile |
|
|
|
|
|
|
|
|
|
logger log.Logger |
|
|
|
|
flushInterval time.Duration |
|
|
|
@ -73,6 +73,10 @@ type SegmentWAL struct {
|
|
|
|
|
cur *bufio.Writer |
|
|
|
|
curN int64 |
|
|
|
|
|
|
|
|
|
// The max time of samples committed last/being committed. Not global or current
|
|
|
|
|
// segment values.
|
|
|
|
|
maxt int64 |
|
|
|
|
|
|
|
|
|
stopc chan struct{} |
|
|
|
|
donec chan struct{} |
|
|
|
|
} |
|
|
|
@ -81,10 +85,10 @@ type SegmentWAL struct {
|
|
|
|
|
// It must be completely read before new entries are logged.
|
|
|
|
|
type WAL interface { |
|
|
|
|
Reader(mint int64) WALReader |
|
|
|
|
LogSeries([]labels.Labels) error |
|
|
|
|
LogSeries([]RefSeries) error |
|
|
|
|
LogSamples([]RefSample) error |
|
|
|
|
LogDeletes([]Stone) error |
|
|
|
|
Truncate(maxt int64) error |
|
|
|
|
Truncate(int64, Postings) error |
|
|
|
|
Close() error |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -92,10 +96,10 @@ type NopWAL struct{}
|
|
|
|
|
|
|
|
|
|
func (NopWAL) Read(SeriesCB, SamplesCB, DeletesCB) error { return nil } |
|
|
|
|
func (w NopWAL) Reader(int64) WALReader { return w } |
|
|
|
|
func (NopWAL) LogSeries([]labels.Labels) error { return nil } |
|
|
|
|
func (NopWAL) LogSeries([]RefSeries) error { return nil } |
|
|
|
|
func (NopWAL) LogSamples([]RefSample) error { return nil } |
|
|
|
|
func (NopWAL) LogDeletes([]Stone) error { return nil } |
|
|
|
|
func (NopWAL) Truncate(int64) error { return nil } |
|
|
|
|
func (NopWAL) Truncate(int64, Postings) error { return nil } |
|
|
|
|
func (NopWAL) Close() error { return nil } |
|
|
|
|
|
|
|
|
|
// WALReader reads entries from a WAL.
|
|
|
|
@ -103,6 +107,12 @@ type WALReader interface {
|
|
|
|
|
Read(SeriesCB, SamplesCB, DeletesCB) error |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// RefSeries is the series labels with the series ID.
|
|
|
|
|
type RefSeries struct { |
|
|
|
|
Ref uint64 |
|
|
|
|
Labels labels.Labels |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// RefSample is a timestamp/value pair associated with a reference to a series.
|
|
|
|
|
type RefSample struct { |
|
|
|
|
Ref uint64 |
|
|
|
@ -110,6 +120,15 @@ type RefSample struct {
|
|
|
|
|
V float64 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type segmentFile struct { |
|
|
|
|
f *os.File |
|
|
|
|
maxt int64 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (f segmentFile) Close() error { |
|
|
|
|
return f.f.Close() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
|
walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB
|
|
|
|
|
) |
|
|
|
@ -163,17 +182,102 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration)
|
|
|
|
|
|
|
|
|
|
// Reader returns a new reader over the the write ahead log data.
|
|
|
|
|
// It must be completely consumed before writing to the WAL.
|
|
|
|
|
func (w *SegmentWAL) Reader(int64) WALReader { |
|
|
|
|
return newWALReader(w, w.logger) |
|
|
|
|
func (w *SegmentWAL) Reader(mint int64) WALReader { |
|
|
|
|
return newWALReader(w, mint, w.logger) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Log writes a batch of new series labels and samples to the log.
|
|
|
|
|
//func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error {
|
|
|
|
|
//return nil
|
|
|
|
|
//}
|
|
|
|
|
// Truncate deletes the values prior to mint and the series entries not in p.
|
|
|
|
|
func (w *SegmentWAL) Truncate(mint int64, p Postings) error { |
|
|
|
|
// TODO(gouthamve): Handle the deletes too.
|
|
|
|
|
delFiles := make([]*segmentFile, 0) |
|
|
|
|
|
|
|
|
|
// All files have to be traversed as there could be two segments for a block
|
|
|
|
|
// with first block having times (10000, 20000) and SECOND one having (0, 10000).
|
|
|
|
|
for _, f := range w.files { |
|
|
|
|
if f.maxt < mint { |
|
|
|
|
delFiles = append(delFiles, f) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if len(delFiles) == 0 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
tempWAL := &SegmentWAL{ |
|
|
|
|
logger: w.logger, |
|
|
|
|
files: delFiles, |
|
|
|
|
} |
|
|
|
|
wr := newWALReader(tempWAL, 0, tempWAL.logger) |
|
|
|
|
|
|
|
|
|
// Create a new tmp file.
|
|
|
|
|
// TODO: Do it properly.
|
|
|
|
|
newF, err := os.Create(delFiles[0].f.Name() + ".tmp") |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrap(err, "create tmp series dump file") |
|
|
|
|
} |
|
|
|
|
// Write header metadata for new file.
|
|
|
|
|
metab := make([]byte, 8) |
|
|
|
|
binary.BigEndian.PutUint32(metab[:4], WALMagic) |
|
|
|
|
metab[4] = WALFormatDefault |
|
|
|
|
if _, err := newF.Write(metab); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
WRLoop: |
|
|
|
|
for wr.next() { |
|
|
|
|
rt, flag, byt := wr.at() |
|
|
|
|
|
|
|
|
|
if rt != WALEntrySeries { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
series, err := wr.decodeSeries(flag, byt) |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrap(err, "decode samples while truncating") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
activeSeries := make([]RefSeries, 0, len(series)) |
|
|
|
|
for _, s := range series { |
|
|
|
|
if !p.Seek(uint32(s.Ref)) { |
|
|
|
|
break WRLoop |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if p.At() == uint32(s.Ref) { |
|
|
|
|
activeSeries = append(activeSeries, s) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if len(activeSeries) == 0 { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
buf := getWALBuffer() |
|
|
|
|
buf = encodeSeries(buf, activeSeries) |
|
|
|
|
_, err = newF.Write(buf) |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrap(err, "write series") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err := newF.Close(); err != nil { |
|
|
|
|
return errors.Wrap(err, "close tmp file") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err := renameFile(newF.Name(), w.files[0].f.Name()); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
delFiles = delFiles[1:] |
|
|
|
|
for _, f := range delFiles { |
|
|
|
|
if err := os.RemoveAll(f.f.Name()); err != nil { |
|
|
|
|
return errors.Wrap(err, "delete WAL segment file") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// TODO: sync parent directory.
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// LogSeries writes a batch of new series labels to the log.
|
|
|
|
|
func (w *SegmentWAL) LogSeries(series []labels.Labels) error { |
|
|
|
|
// The series have to be ordered.
|
|
|
|
|
func (w *SegmentWAL) LogSeries(series []RefSeries) error { |
|
|
|
|
if err := w.encodeSeries(series); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
@ -225,13 +329,14 @@ func (w *SegmentWAL) initSegments() error {
|
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
w.files = append(w.files, f) |
|
|
|
|
w.files = append(w.files, &segmentFile{f: f}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Consume and validate meta headers.
|
|
|
|
|
for _, f := range w.files { |
|
|
|
|
for _, sf := range w.files { |
|
|
|
|
metab := make([]byte, 8) |
|
|
|
|
|
|
|
|
|
f := sf.f |
|
|
|
|
if n, err := f.Read(metab); err != nil { |
|
|
|
|
return errors.Wrapf(err, "validate meta %q", f.Name()) |
|
|
|
|
} else if n != 8 { |
|
|
|
@ -293,7 +398,9 @@ func (w *SegmentWAL) cut() error {
|
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
w.files = append(w.files, f) |
|
|
|
|
w.files = append(w.files, &segmentFile{f: f}) |
|
|
|
|
|
|
|
|
|
// TODO(gouthamve): make the buffer size a constant.
|
|
|
|
|
w.cur = bufio.NewWriterSize(f, 4*1024*1024) |
|
|
|
|
w.curN = 8 |
|
|
|
|
|
|
|
|
@ -304,7 +411,7 @@ func (w *SegmentWAL) tail() *os.File {
|
|
|
|
|
if len(w.files) == 0 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
return w.files[len(w.files)-1] |
|
|
|
|
return w.files[len(w.files)-1].f |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Sync flushes the changes to disk.
|
|
|
|
@ -330,10 +437,6 @@ func (w *SegmentWAL) Sync() error {
|
|
|
|
|
return fileutil.Fdatasync(tail) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *SegmentWAL) Truncate(maxt int64) error { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *SegmentWAL) sync() error { |
|
|
|
|
if err := w.flush(); err != nil { |
|
|
|
|
return err |
|
|
|
@ -396,6 +499,7 @@ const (
|
|
|
|
|
// It should be a multiple of the minimum sector size so that WAL can safely
|
|
|
|
|
// distinguish between torn writes and ordinary data corruption.
|
|
|
|
|
walPageBytes = 16 * minSectorSize |
|
|
|
|
// TODO(gouthamve): What is this?
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error { |
|
|
|
@ -404,6 +508,7 @@ func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error {
|
|
|
|
|
|
|
|
|
|
// Cut to the next segment if the entry exceeds the file size unless it would also
|
|
|
|
|
// exceed the size of a new segment.
|
|
|
|
|
// TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize.
|
|
|
|
|
var ( |
|
|
|
|
// 6-byte header + 4-byte CRC32 + buf.
|
|
|
|
|
sz = int64(6 + 4 + len(buf)) |
|
|
|
@ -439,6 +544,14 @@ func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error {
|
|
|
|
|
w.curN += sz |
|
|
|
|
|
|
|
|
|
putWALBuffer(buf) |
|
|
|
|
|
|
|
|
|
// set the file's maxt.
|
|
|
|
|
if len(w.files) > 0 { |
|
|
|
|
cf := w.files[len(w.files)-1] |
|
|
|
|
if cf.maxt < w.maxt { |
|
|
|
|
cf.maxt = w.maxt |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -463,16 +576,31 @@ func putWALBuffer(b []byte) {
|
|
|
|
|
walBuffers.Put(b) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *SegmentWAL) encodeSeries(series []labels.Labels) error { |
|
|
|
|
func (w *SegmentWAL) encodeSeries(series []RefSeries) error { |
|
|
|
|
if len(series) == 0 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
b := make([]byte, binary.MaxVarintLen32) |
|
|
|
|
buf := getWALBuffer() |
|
|
|
|
buf = encodeSeries(buf, series) |
|
|
|
|
|
|
|
|
|
return w.entry(WALEntrySeries, walSeriesSimple, buf) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func encodeSeries(buf []byte, series []RefSeries) []byte { |
|
|
|
|
b := make([]byte, binary.MaxVarintLen64) |
|
|
|
|
// Store the base reference number of the first series.
|
|
|
|
|
// All series encode their ref as a delta to the first.
|
|
|
|
|
first := series[0] |
|
|
|
|
binary.BigEndian.PutUint64(b, first.Ref) |
|
|
|
|
buf = append(buf, b[:8]...) |
|
|
|
|
|
|
|
|
|
for _, lset := range series { |
|
|
|
|
n := binary.PutUvarint(b, uint64(len(lset))) |
|
|
|
|
for _, s := range series { |
|
|
|
|
n := binary.PutVarint(b, int64(s.Ref)-int64(first.Ref)) |
|
|
|
|
buf = append(buf, b[:n]...) |
|
|
|
|
|
|
|
|
|
lset := s.Labels |
|
|
|
|
n = binary.PutUvarint(b, uint64(len(lset))) |
|
|
|
|
buf = append(buf, b[:n]...) |
|
|
|
|
|
|
|
|
|
for _, l := range lset { |
|
|
|
@ -486,7 +614,7 @@ func (w *SegmentWAL) encodeSeries(series []labels.Labels) error {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return w.entry(WALEntrySeries, walSeriesSimple, buf) |
|
|
|
|
return buf |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *SegmentWAL) encodeSamples(samples []RefSample) error { |
|
|
|
@ -508,7 +636,12 @@ func (w *SegmentWAL) encodeSamples(samples []RefSample) error {
|
|
|
|
|
binary.BigEndian.PutUint64(b, uint64(first.T)) |
|
|
|
|
buf = append(buf, b[:8]...) |
|
|
|
|
|
|
|
|
|
w.maxt = 0 |
|
|
|
|
for _, s := range samples { |
|
|
|
|
if w.maxt < s.T { |
|
|
|
|
w.maxt = s.T |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
n := binary.PutVarint(b, int64(s.Ref)-int64(first.Ref)) |
|
|
|
|
buf = append(buf, b[:n]...) |
|
|
|
|
|
|
|
|
@ -543,6 +676,7 @@ func (w *SegmentWAL) encodeDeletes(stones []Stone) error {
|
|
|
|
|
type walReader struct { |
|
|
|
|
logger log.Logger |
|
|
|
|
|
|
|
|
|
mint int64 |
|
|
|
|
wal *SegmentWAL |
|
|
|
|
cur int |
|
|
|
|
buf []byte |
|
|
|
@ -555,12 +689,13 @@ type walReader struct {
|
|
|
|
|
err error |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newWALReader(w *SegmentWAL, l log.Logger) *walReader { |
|
|
|
|
func newWALReader(w *SegmentWAL, mint int64, l log.Logger) *walReader { |
|
|
|
|
if l == nil { |
|
|
|
|
l = log.NewNopLogger() |
|
|
|
|
} |
|
|
|
|
return &walReader{ |
|
|
|
|
logger: l, |
|
|
|
|
mint: mint, |
|
|
|
|
wal: w, |
|
|
|
|
buf: make([]byte, 0, 128*4096), |
|
|
|
|
crc32: newCRC32(), |
|
|
|
@ -589,7 +724,24 @@ func (r *walReader) Read(seriesf SeriesCB, samplesf SamplesCB, deletesf DeletesC
|
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
samplesf(s) |
|
|
|
|
|
|
|
|
|
// Update the times for the wal segment file and select only valid samples.
|
|
|
|
|
cf := r.wal.files[r.cur] |
|
|
|
|
validSamples := make([]RefSample, 0, len(s)) |
|
|
|
|
|
|
|
|
|
for _, smpl := range s { |
|
|
|
|
if smpl.T < r.mint { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if cf.maxt < smpl.T { |
|
|
|
|
cf.maxt = smpl.T |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
validSamples = append(validSamples, smpl) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
samplesf(validSamples) |
|
|
|
|
case WALEntryDeletes: |
|
|
|
|
s, err := r.decodeDeletes(flag, b) |
|
|
|
|
if err != nil { |
|
|
|
@ -607,7 +759,7 @@ func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) {
|
|
|
|
|
if r.cur >= len(r.wal.files) { |
|
|
|
|
return 0, 0, nil, io.EOF |
|
|
|
|
} |
|
|
|
|
cf := r.wal.files[r.cur] |
|
|
|
|
cf := r.wal.files[r.cur].f |
|
|
|
|
|
|
|
|
|
et, flag, b, err := r.entry(cf) |
|
|
|
|
// If we reached the end of the reader, advance to the next one
|
|
|
|
@ -634,7 +786,7 @@ func (r *walReader) next() bool {
|
|
|
|
|
if r.cur >= len(r.wal.files) { |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
cf := r.wal.files[r.cur] |
|
|
|
|
cf := r.wal.files[r.cur].f |
|
|
|
|
|
|
|
|
|
// Save position after last valid entry if we have to truncate the WAL.
|
|
|
|
|
lastOffset, err := cf.Seek(0, os.SEEK_CUR) |
|
|
|
@ -675,7 +827,7 @@ func (r *walReader) next() bool {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *walReader) current() *os.File { |
|
|
|
|
return r.wal.files[r.cur] |
|
|
|
|
return r.wal.files[r.cur].f |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// truncate the WAL after the last valid entry.
|
|
|
|
@ -684,7 +836,8 @@ func (r *walReader) truncate(lastOffset int64) error {
|
|
|
|
|
"err", r.err, "file", r.current().Name(), "pos", lastOffset) |
|
|
|
|
|
|
|
|
|
// Close and delete all files after the current one.
|
|
|
|
|
for _, f := range r.wal.files[r.cur+1:] { |
|
|
|
|
for _, sf := range r.wal.files[r.cur+1:] { |
|
|
|
|
f := sf.f |
|
|
|
|
if err := f.Close(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
@ -754,9 +907,25 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
|
|
|
|
|
return etype, flag, buf, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *walReader) decodeSeries(flag byte, b []byte) ([]labels.Labels, error) { |
|
|
|
|
series := []labels.Labels{} |
|
|
|
|
func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { |
|
|
|
|
series := []RefSeries{} |
|
|
|
|
if len(b) < 8 { |
|
|
|
|
return nil, errors.Wrap(errInvalidSize, "header length") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
baseRef := binary.BigEndian.Uint64(b) |
|
|
|
|
b = b[8:] |
|
|
|
|
|
|
|
|
|
for len(b) > 0 { |
|
|
|
|
var ser RefSeries |
|
|
|
|
// TODO: Check again.
|
|
|
|
|
dref, n := binary.Varint(b) |
|
|
|
|
if n < 1 { |
|
|
|
|
return nil, errors.Wrap(errInvalidSize, "series ref delta") |
|
|
|
|
} |
|
|
|
|
b = b[n:] |
|
|
|
|
ser.Ref = uint64(int64(baseRef) + dref) |
|
|
|
|
|
|
|
|
|
l, n := binary.Uvarint(b) |
|
|
|
|
if n < 1 { |
|
|
|
|
return nil, errors.Wrap(errInvalidSize, "number of labels") |
|
|
|
@ -779,8 +948,9 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]labels.Labels, error) {
|
|
|
|
|
lset[i].Value = string(b[n : n+int(vl)]) |
|
|
|
|
b = b[n+int(vl):] |
|
|
|
|
} |
|
|
|
|
ser.Labels = lset |
|
|
|
|
|
|
|
|
|
series = append(series, lset) |
|
|
|
|
series = append(series, ser) |
|
|
|
|
} |
|
|
|
|
return series, nil |
|
|
|
|
} |
|
|
|
|