@ -20,6 +20,7 @@ import (
"fmt"
"hash/crc32"
"io"
"math"
"os"
"path/filepath"
"strconv"
@ -60,19 +61,101 @@ func (p *page) full() bool {
return pageSize - p . alloc < recordHeaderSize
}
// Segment represents a segment file.
type Segment struct {
* os . File
dir string
i int
}
// Index returns the index of the segment.
func ( s * Segment ) Index ( ) int {
return s . i
}
// Dir returns the directory of the segment.
func ( s * Segment ) Dir ( ) string {
return s . dir
}
// CorruptionErr is an error that's returned when corruption is encountered.
type CorruptionErr struct {
Segment int
Offset int
Err error
}
func ( e * CorruptionErr ) Error ( ) string {
if e . Segment < 0 {
return fmt . Sprintf ( "corruption after %d bytes: %s" , e . Offset , e . Err )
}
return fmt . Sprintf ( "corruption in segment %d at %d: %s" , e . Segment , e . Offset , e . Err )
}
// OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends.
func OpenWriteSegment ( dir string , k int ) ( * Segment , error ) {
// Only .active segments are allowed to be opened for write.
f , err := os . OpenFile ( SegmentName ( dir , k ) , os . O_WRONLY | os . O_APPEND , 0666 )
if err != nil {
return nil , err
}
stat , err := f . Stat ( )
if err != nil {
f . Close ( )
return nil , err
}
// If the last page is torn, fill it with zeros.
// In case it was torn after all records were written successfully, this
// will just pad the page and everything will be fine.
// If it was torn mid-record, a full read (which the caller should do anyway
// to ensure integrity) will detect it as a corruption by the end.
if d := stat . Size ( ) % pageSize ; d != 0 {
if _ , err := f . Write ( make ( [ ] byte , pageSize - d ) ) ; err != nil {
f . Close ( )
return nil , errors . Wrap ( err , "zero-pad torn page" )
}
}
return & Segment { File : f , i : k , dir : dir } , nil
}
// CreateSegment creates a new segment k in dir.
func CreateSegment ( dir string , k int ) ( * Segment , error ) {
f , err := os . OpenFile ( SegmentName ( dir , k ) , os . O_WRONLY | os . O_CREATE | os . O_APPEND , 0666 )
if err != nil {
return nil , err
}
return & Segment { File : f , i : k , dir : dir } , nil
}
// OpenReadSegment opens the segment k in dir for reading.
func OpenReadSegment ( fn string ) ( * Segment , error ) {
k , err := strconv . Atoi ( filepath . Base ( fn ) )
if err != nil {
return nil , errors . New ( "not a valid filename" )
}
f , err := os . Open ( fn )
if err != nil {
return nil , err
}
return & Segment { File : f , i : k , dir : filepath . Dir ( fn ) } , nil
}
// WAL is a write ahead log that stores records in segment files.
// It must be read from start to end once before logging new data.
// If an errore occurs during read, the repair procedure must be called
// before it's safe to do further writes.
//
// Segments are written to in pages of 32KB, with records possibly split
// across page boundaries.
// Records are never split across segments to allow full segments to be
// safely truncated.
// Segments are terminated by one full zero page to allow tailing readers
// to detect segment boundaries.
// safely truncated. It also ensures that torn writes never corrupt records
// beyond the most recent segment.
type WAL struct {
dir string
logger log . Logger
segmentSize int
mtx sync . RWMutex
segment * os . File // active segment
segment * Segment // active segment
donePages int // pages written to the segment
page * page // active page
stopc chan chan struct { }
@ -85,10 +168,12 @@ type WAL struct {
// New returns a new WAL over the given directory.
func New ( logger log . Logger , reg prometheus . Registerer , dir string ) ( * WAL , error ) {
return newWAL ( logger , reg , dir , defaultSegmentSize )
return NewSize ( logger , reg , dir , defaultSegmentSize )
}
func newWAL ( logger log . Logger , reg prometheus . Registerer , dir string , segmentSize int ) ( * WAL , error ) {
// NewSize returns a new WAL over the given directory.
// New segments are created with the specified size.
func NewSize ( logger log . Logger , reg prometheus . Registerer , dir string , segmentSize int ) ( * WAL , error ) {
if segmentSize % pageSize != 0 {
return nil , errors . New ( "invalid segment size" )
}
@ -124,16 +209,23 @@ func newWAL(logger log.Logger, reg prometheus.Registerer, dir string, segmentSiz
_ , j , err := w . Segments ( )
if err != nil {
return nil , err
return nil , err ors. Wrap ( err , "get segment range" )
}
// Fresh dir, no segments yet.
if j == - 1 {
w . segment , err = os . OpenFile ( SegmentName ( w . dir , 0 ) , os . O_WRONLY | os . O_CREATE | os . O_APPEND , 0666 )
if w . segment , err = CreateSegment ( w . dir , 0 ) ; err != nil {
return nil , err
}
} else {
w . segment , err = os . OpenFile ( SegmentName ( w . dir , j ) , os . O_WRONLY | os . O_APPEND , 0666 )
}
if err != nil {
return nil , err
if w . segment , err = OpenWriteSegment ( w . dir , j ) ; err != nil {
return nil , err
}
// Correctly initialize donePages.
stat , err := w . segment . Stat ( )
if err != nil {
return nil , err
}
w . donePages = int ( stat . Size ( ) / pageSize )
}
go w . run ( )
@ -146,23 +238,99 @@ func (w *WAL) Dir() string {
}
func ( w * WAL ) run ( ) {
Loop :
for {
// Processing all pending functions has precedence over shutdown.
select {
case f := <- w . actorc :
f ( )
default :
case donec := <- w . stopc :
defer close ( donec )
break Loop
}
}
// Drain and process any remaining functions.
for {
select {
case f := <- w . actorc :
f ( )
case donec := <- w . stopc :
close ( donec )
default :
return
}
}
}
// Repair attempts to repair the WAL based on the error.
// It discards all data behind the corruption
func ( w * WAL ) Repair ( err error ) error {
// We could probably have a mode that only discards torn records right around
// the corruption to preserve as data much as possible.
// But that's not generally applicable if the records have any kind of causality.
// Maybe as an extra mode in the future if mid-WAL corruptions become
// a frequent concern.
cerr , ok := err . ( * CorruptionErr )
if ! ok {
return errors . New ( "cannot handle error" )
}
if cerr . Segment < 0 {
return errors . New ( "corruption error does not specify position" )
}
level . Warn ( w . logger ) . Log ( "msg" , "starting corruption repair" ,
"segment" , cerr . Segment , "offset" , cerr . Offset )
// All segments behind the corruption can no longer be used.
segs , err := listSegments ( w . dir )
if err != nil {
return errors . Wrap ( err , "list segments" )
}
level . Warn ( w . logger ) . Log ( "msg" , "deleting all segments behind corruption" )
for _ , s := range segs {
if s . n <= cerr . Segment {
continue
}
if err := os . Remove ( filepath . Join ( w . dir , s . s ) ) ; err != nil {
return errors . Wrap ( err , "delete segment" )
}
}
// Regardless of the corruption offset, no record reaches into the previous segment.
// So we can safely repair the WAL by removing the segment and re-inserting all
// its records up to the corruption.
level . Warn ( w . logger ) . Log ( "msg" , "rewrite corrupted segment" )
fn := SegmentName ( w . dir , cerr . Segment )
tmpfn := fn + ".repair"
if err := fileutil . Rename ( fn , tmpfn ) ; err != nil {
return err
}
// Create a clean segment and make it the active one.
s , err := CreateSegment ( w . dir , cerr . Segment )
if err != nil {
return err
}
w . segment = s
f , err := os . Open ( tmpfn )
if err != nil {
return errors . Wrap ( err , "open segment" )
}
defer f . Close ( )
r := NewReader ( bufio . NewReader ( f ) )
for r . Next ( ) {
if err := w . Log ( r . Record ( ) ) ; err != nil {
return errors . Wrap ( err , "insert record" )
}
}
// We expect an error here, so nothing to handle.
if err := os . Remove ( tmpfn ) ; err != nil {
return errors . Wrap ( err , "delete corrupted segment" )
}
return nil
}
// SegmentName builds a segment name for the directory.
func SegmentName ( dir string , i int ) string {
return filepath . Join ( dir , fmt . Sprintf ( "%06d" , i ) )
@ -170,15 +338,13 @@ func SegmentName(dir string, i int) string {
// nextSegment creates the next segment and closes the previous one.
func ( w * WAL ) nextSegment ( ) error {
if err := w . flushPage ( true ) ; err != nil {
return err
}
k , err := strconv . Atoi ( filepath . Base ( w . segment . Name ( ) ) )
if err != nil {
return errors . Errorf ( "current segment %q not numerical" , w . segment . Name ( ) )
// Only flush the current page if it actually holds data.
if w . page . alloc > 0 {
if err := w . flushPage ( true ) ; err != nil {
return err
}
}
// TODO(fabxc): write initialization page with meta info?
next , err := os . OpenFile ( SegmentName ( w . dir , k + 1 ) , os . O_WRONLY | os . O_CREATE | os . O_APPEND , 0666 )
next , err := CreateSegment ( w . dir , w . segment . Index ( ) + 1 )
if err != nil {
return errors . Wrap ( err , "create new segment file" )
}
@ -186,8 +352,7 @@ func (w *WAL) nextSegment() error {
w . segment = next
w . donePages = 0
// Don't block further writes by handling the last segment.
// TODO(fabxc): write a termination page as a marker to detect torn segments?
// Don't block further writes by fsyncing the last segment.
w . actorc <- func ( ) {
if err := w . fsync ( prev ) ; err != nil {
level . Error ( w . logger ) . Log ( "msg" , "sync previous segment" , "err" , err )
@ -366,9 +531,9 @@ func (w *WAL) Truncate(i int) error {
return nil
}
func ( w * WAL ) fsync ( f * os. File ) error {
func ( w * WAL ) fsync ( f * Segment ) error {
start := time . Now ( )
err := fileutil . Fsync ( f )
err := fileutil . Fsync ( f .File )
w . fsyncDuration . Observe ( time . Since ( start ) . Seconds ( ) )
return err
}
@ -426,65 +591,65 @@ func listSegments(dir string) (refs []segmentRef, err error) {
return refs , nil
}
type multiReadCloser struct {
io . Reader
files [ ] * os . File
}
// NewSegmentsReader returns a new reader over all segments in the directory.
func NewSegmentsReader ( dir string ) ( io . ReadCloser , error ) {
refs , err := listSegments ( dir )
if err != nil {
return nil , err
}
var rdrs [ ] io . Reader
var files [ ] * os . File
for _ , r := range refs {
f , err := os . Open ( filepath . Join ( dir , r . s ) )
if err != nil {
return nil , err
}
rdrs = append ( rdrs , f )
files = append ( files , f )
}
return & multiReadCloser {
Reader : io . MultiReader ( rdrs ... ) ,
files : files ,
} , nil
return NewSegmentsRangeReader ( dir , 0 , math . MaxInt64 )
}
// NewSegmentsRangeReader returns a new reader over the given WAL segment range.
// If m or n are -1, the range is open on the respective end.
func NewSegmentsRangeReader ( dir string , m , n int ) ( io . ReadCloser , error ) {
refs , err := listSegments ( dir )
if err != nil {
return nil , err
}
var rdrs [ ] io . Reader
var files [ ] * os . File
var segs [ ] * Segment
for _ , r := range refs {
if r. n < m {
if m >= 0 && r . n < m {
continue
}
if r. n > n {
if n >= 0 && r . n > n {
break
}
f, err := os . Open ( filepath . Join ( dir , r . s ) )
s, err := OpenReadSegment ( filepath . Join ( dir , r . s ) )
if err != nil {
return nil , err
}
rdrs = append ( rdrs , f )
files = append ( files , f )
segs = append ( segs , s )
}
return newSegmentBufReader ( segs ... ) , nil
}
// Reader reads WAL records from an io.Reader.
type Reader struct {
rdr io . Reader
err error
rec [ ] byte
total int // total bytes processed.
}
// segmentBufReader is a buffered reader that reads in multiples of pages.
// The main purpose is that we are able to track segment and offset for
// corruption reporting.
type segmentBufReader struct {
buf * bufio . Reader
segs [ ] * Segment
cur int
off int
more bool
}
func newSegmentBufReader ( segs ... * Segment ) * segmentBufReader {
return & segmentBufReader {
buf : bufio . NewReaderSize ( nil , 16 * pageSize ) ,
segs : segs ,
cur : - 1 ,
}
return & multiReadCloser {
Reader : io . MultiReader ( rdrs ... ) ,
files : files ,
} , nil
}
func ( r * multiReadCloser ) Close ( ) ( err error ) {
for _ , s := range r . files {
func ( r * segmentBufReader ) Close ( ) ( err error ) {
for _ , s := range r . segs {
if e := s . Close ( ) ; e != nil {
err = e
}
@ -492,24 +657,42 @@ func (r *multiReadCloser) Close() (err error) {
return err
}
// Reader reads WAL records from an io.Reader.
type Reader struct {
rdr * bufio . Reader
err error
rec [ ] byte
total int // total bytes processed.
func ( r * segmentBufReader ) Read ( b [ ] byte ) ( n int , err error ) {
if ! r . more {
if r . cur + 1 >= len ( r . segs ) {
return 0 , io . EOF
}
r . cur ++
r . off = 0
r . more = true
r . buf . Reset ( r . segs [ r . cur ] )
}
n , err = r . buf . Read ( b )
r . off += n
if err != io . EOF {
return n , err
}
// Just return what we read so far, but don't signal EOF.
// Only unset more so we don't invalidate the current segment and
// offset before the next read.
r . more = false
// If no more segments are left, it's the end for the reader.
if len ( r . segs ) == 0 {
return n , io . EOF
}
return n , nil
}
// NewReader returns a new reader.
func NewReader ( r io . Reader ) * Reader {
return & Reader { rdr : bufio . NewReader ( r ) }
return & Reader { rdr : r}
}
// Next advances the reader to the next records and returns true if it exists.
// It must not be called once after it returned false.
func ( r * Reader ) Next ( ) bool {
err := r . next ( )
if err == io . EOF {
if err ors. Cause ( err ) == io . EOF {
return false
}
r . err = err
@ -523,9 +706,8 @@ func (r *Reader) next() (err error) {
i := 0
for {
hdr [ 0 ] , err = r . rdr . ReadByte ( )
if err != nil {
return err
if _ , err = io . ReadFull ( r . rdr , hdr [ : 1 ] ) ; err != nil {
return errors . Wrap ( err , "read first header byte" )
}
r . total ++
typ := recType ( hdr [ 0 ] )
@ -541,7 +723,7 @@ func (r *Reader) next() (err error) {
}
n , err := io . ReadFull ( r . rdr , buf [ : k ] )
if err != nil {
return err
return err ors. Wrap ( err , "read remaining zeros" )
}
r . total += n
@ -554,7 +736,7 @@ func (r *Reader) next() (err error) {
}
n , err := io . ReadFull ( r . rdr , hdr [ 1 : ] )
if err != nil {
return err
return err ors. Wrap ( err , "read remaining header" )
}
r . total += n
@ -608,9 +790,25 @@ func (r *Reader) next() (err error) {
}
}
// Err returns the last encountered error.
// Err returns the last encountered error wrapped in a corruption error.
// If the reader does not allow to infer a segment index and offset, a total
// offset in the reader stream will be provided.
func ( r * Reader ) Err ( ) error {
return r . err
if r . err == nil {
return nil
}
if b , ok := r . rdr . ( * segmentBufReader ) ; ok {
return & CorruptionErr {
Err : r . err ,
Segment : b . segs [ b . cur ] . Index ( ) ,
Offset : b . off ,
}
}
return & CorruptionErr {
Err : r . err ,
Segment : - 1 ,
Offset : r . total ,
}
}
// Record returns the current record. The returned byte slice is only