Encode and Decode method for tombstones (#7967)

* Encode and Decode method for tombstones

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix review comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
pull/7993/head
Ganesh Vernekar 4 years ago committed by GitHub
parent 916dbd4c8a
commit f0d87b5d86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -18,7 +18,6 @@ import (
"fmt" "fmt"
"hash" "hash"
"hash/crc32" "hash/crc32"
"io"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
@ -98,33 +97,27 @@ func WriteFile(logger log.Logger, dir string, tr Reader) (int64, error) {
buf.Reset() buf.Reset()
// Write the meta. // Write the meta.
buf.PutBE32(MagicTombstone) buf.PutBE32(MagicTombstone)
buf.PutByte(tombstoneFormatV1)
n, err := f.Write(buf.Get()) n, err := f.Write(buf.Get())
if err != nil { if err != nil {
return 0, err return 0, err
} }
size += n size += n
mw := io.MultiWriter(f, hash) bytes, err := Encode(tr)
if err != nil {
if err := tr.Iter(func(ref uint64, ivs Intervals) error { return 0, errors.Wrap(err, "encoding tombstones")
for _, iv := range ivs { }
buf.Reset()
buf.PutUvarint64(ref) // Ignore first byte which is the format type. We do this for compatibility.
buf.PutVarint64(iv.Mint) if _, err := hash.Write(bytes[1:]); err != nil {
buf.PutVarint64(iv.Maxt) return 0, errors.Wrap(err, "calculating hash for tombstones")
}
n, err = mw.Write(buf.Get()) n, err = f.Write(bytes)
if err != nil { if err != nil {
return err return 0, errors.Wrap(err, "writing tombstones")
}
size += n
}
return nil
}); err != nil {
return 0, fmt.Errorf("error writing tombstones: %v", err)
} }
size += n
n, err = f.Write(hash.Sum(nil)) n, err = f.Write(hash.Sum(nil))
if err != nil { if err != nil {
@ -145,6 +138,48 @@ func WriteFile(logger log.Logger, dir string, tr Reader) (int64, error) {
return int64(size), fileutil.Replace(tmp, path) return int64(size), fileutil.Replace(tmp, path)
} }
// Encode encodes the tombstones from the reader.
// It does not attach any magic number or checksum.
func Encode(tr Reader) ([]byte, error) {
buf := encoding.Encbuf{}
buf.PutByte(tombstoneFormatV1)
err := tr.Iter(func(ref uint64, ivs Intervals) error {
for _, iv := range ivs {
buf.PutUvarint64(ref)
buf.PutVarint64(iv.Mint)
buf.PutVarint64(iv.Maxt)
}
return nil
})
return buf.Get(), err
}
// Decode decodes the tombstones from the bytes
// which was encoded using the Encode method.
func Decode(b []byte) (Reader, error) {
d := &encoding.Decbuf{B: b}
if flag := d.Byte(); flag != tombstoneFormatV1 {
return nil, errors.Errorf("invalid tombstone format %x", flag)
}
if d.Err() != nil {
return nil, d.Err()
}
stonesMap := NewMemTombstones()
for d.Len() > 0 {
k := d.Uvarint64()
mint := d.Varint64()
maxt := d.Varint64()
if d.Err() != nil {
return nil, d.Err()
}
stonesMap.AddInterval(k, Interval{mint, maxt})
}
return stonesMap, nil
}
// Stone holds the information on the posting and time-range // Stone holds the information on the posting and time-range
// that is deleted. // that is deleted.
type Stone struct { type Stone struct {
@ -168,34 +203,24 @@ func ReadTombstones(dir string) (Reader, int64, error) {
if mg := d.Be32(); mg != MagicTombstone { if mg := d.Be32(); mg != MagicTombstone {
return nil, 0, fmt.Errorf("invalid magic number %x", mg) return nil, 0, fmt.Errorf("invalid magic number %x", mg)
} }
if flag := d.Byte(); flag != tombstoneFormatV1 {
return nil, 0, fmt.Errorf("invalid tombstone format %x", flag)
}
if d.Err() != nil {
return nil, 0, d.Err()
}
// Verify checksum. // Verify checksum.
hash := newCRC32() hash := newCRC32()
if _, err := hash.Write(d.Get()); err != nil { // Ignore first byte which is the format type.
if _, err := hash.Write(d.Get()[1:]); err != nil {
return nil, 0, errors.Wrap(err, "write to hash") return nil, 0, errors.Wrap(err, "write to hash")
} }
if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() { if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() {
return nil, 0, errors.New("checksum did not match") return nil, 0, errors.New("checksum did not match")
} }
stonesMap := NewMemTombstones() if d.Err() != nil {
return nil, 0, d.Err()
for d.Len() > 0 { }
k := d.Uvarint64()
mint := d.Varint64()
maxt := d.Varint64()
if d.Err() != nil {
return nil, 0, d.Err()
}
stonesMap.AddInterval(k, Interval{mint, maxt}) stonesMap, err := Decode(d.Get())
if err != nil {
return nil, 0, err
} }
return stonesMap, int64(len(b)), nil return stonesMap, int64(len(b)), nil

Loading…
Cancel
Save