Add headers and format flag for WAL segments

pull/5805/head
Fabian Reinartz 2017-02-14 00:24:53 -08:00
parent 79944a5912
commit eeb03e97e2
1 changed files with 36 additions and 5 deletions

41
wal.go
View File

@ -3,7 +3,6 @@ package tsdb
import ( import (
"bufio" "bufio"
"encoding/binary" "encoding/binary"
"fmt"
"hash/crc32" "hash/crc32"
"io" "io"
"math" "math"
@ -21,8 +20,13 @@ import (
// WALEntryType indicates what data a WAL entry contains. // WALEntryType indicates what data a WAL entry contains.
type WALEntryType byte type WALEntryType byte
// The valid WAL entry types.
const ( const (
WALMagic = 0x43AF00EF
// Format versioning flag of a WAL segment file.
WALFormatDefault byte = 1
// Entry types in a segment file.
WALEntrySymbols = 1 WALEntrySymbols = 1
WALEntrySeries = 2 WALEntrySeries = 2
WALEntrySamples = 3 WALEntrySamples = 3
@ -93,9 +97,7 @@ type walHandler struct {
// ReadAll consumes all entries in the WAL and triggers the registered handlers. // ReadAll consumes all entries in the WAL and triggers the registered handlers.
func (w *WAL) ReadAll(h *walHandler) error { func (w *WAL) ReadAll(h *walHandler) error {
fmt.Println("readall", w.files)
for _, f := range w.files { for _, f := range w.files {
fmt.Println(" ", f.Name())
dec := newWALDecoder(f, h) dec := newWALDecoder(f, h)
for { for {
@ -151,6 +153,24 @@ func (w *WAL) initSegments() error {
} }
w.files = append(w.files, lf) w.files = append(w.files, lf)
// Consume and validate meta headers.
for _, f := range w.files {
metab := make([]byte, 8)
if n, err := f.Read(metab); err != nil {
return errors.Wrapf(err, "validate meta %q", f.Name())
} else if n != 8 {
return errors.Errorf("invalid header size %d in %q", n, f.Name())
}
if m := binary.BigEndian.Uint32(metab[:4]); m != WALMagic {
return errors.Errorf("invalid magic header %x in %q", m, f.Name())
}
if metab[4] != WALFormatDefault {
return errors.Errorf("unknown WAL segment format %d in %q", metab[4], f.Name())
}
}
return nil return nil
} }
@ -190,9 +210,18 @@ func (w *WAL) cut() error {
return err return err
} }
// Write header metadata for new file.
metab := make([]byte, 8)
binary.BigEndian.PutUint32(metab[:4], WALMagic)
metab[4] = WALFormatDefault
if _, err := f.Write(metab); err != nil {
return err
}
w.files = append(w.files, f) w.files = append(w.files, f)
w.cur = bufio.NewWriterSize(f, 4*1024*1024) w.cur = bufio.NewWriterSize(f, 4*1024*1024)
w.curN = 0 w.curN = len(metab)
return nil return nil
} }
@ -391,6 +420,8 @@ type walDecoder struct {
buf []byte buf []byte
} }
// newWALDecoder returns a new decoder for the default WAL format. The meta
// headers of a segment must already have been consumed.
func newWALDecoder(r io.Reader, h *walHandler) *walDecoder { func newWALDecoder(r io.Reader, h *walHandler) *walDecoder {
return &walDecoder{ return &walDecoder{
r: r, r: r,