Add initial WAL writing

pull/5805/head
Fabian Reinartz 2016-12-22 12:05:24 +01:00
parent 2a825f6c28
commit 0b8c77361e
3 changed files with 271 additions and 26 deletions

26
db.go
View File

@ -163,6 +163,7 @@ func (ba *bucketAppender) Commit() error {
type hashedSample struct {
hash uint64
labels labels.Labels
ref uint32
t int64
v float64
@ -197,18 +198,25 @@ func OpenShard(path string, logger log.Logger) (*Shard, error) {
return nil, err
}
// TODO(fabxc): get time from client-defined `now` function.
baset := time.Now().UnixNano() / int64(time.Millisecond)
if len(pbs) > 0 {
baset = pbs[0].stats.MaxTime
}
head, err := NewHeadBlock(filepath.Join(path, fmt.Sprintf("%d", baset)), baset)
if err != nil {
return nil, err
}
s := &Shard{
path: path,
persistCh: make(chan struct{}, 1),
logger: logger,
head: head,
persisted: pbs,
// TODO(fabxc): restore from checkpoint.
}
// TODO(fabxc): get base time from pre-existing blocks. Otherwise
// it should come from a user defined start timestamp.
// Use actual time for now.
s.head = NewHeadBlock(time.Now().UnixNano() / int64(time.Millisecond))
return s, nil
}
@ -219,6 +227,7 @@ func (s *Shard) Close() error {
for _, pb := range s.persisted {
e.Add(pb.Close())
}
e.Add(s.head.Close())
return e.Err()
}
@ -292,7 +301,12 @@ func (s *Shard) persist() error {
// Set new head block.
head := s.head
s.head = NewHeadBlock(head.stats.MaxTime)
newHead, err := NewHeadBlock(filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MaxTime)), head.stats.MaxTime)
if err != nil {
s.mtx.Unlock()
return err
}
s.head = newHead
s.mtx.Unlock()

80
head.go
View File

@ -21,24 +21,36 @@ type HeadBlock struct {
// to their position in the chunk desc slice.
hashes map[uint64][]int
symbols []string // all seen strings
values map[string]stringset // label names to possible values
postings *memPostings // postings lists for terms
wal *WAL
stats BlockStats
}
// NewHeadBlock creates a new empty head block.
func NewHeadBlock(baseTime int64) *HeadBlock {
func NewHeadBlock(dir string, baseTime int64) (*HeadBlock, error) {
wal, err := CreateWAL(dir)
if err != nil {
return nil, err
}
b := &HeadBlock{
descs: []*chunkDesc{},
hashes: map[uint64][]int{},
values: map[string]stringset{},
postings: &memPostings{m: make(map[term][]uint32)},
wal: wal,
}
b.stats.MinTime = baseTime
return b
return b, nil
}
// Close syncs all data and closes underlying resources of the head block.
func (h *HeadBlock) Close() error {
return h.wal.Close()
}
// Querier returns a new querier over the head block.
@ -111,15 +123,18 @@ func (h *HeadBlock) Series(ref uint32, mint, maxt int64) (Series, error) {
// get retrieves the chunk with the hash and label set and creates
// a new one if it doesn't exist yet.
func (h *HeadBlock) get(hash uint64, lset labels.Labels) *chunkDesc {
func (h *HeadBlock) get(hash uint64, lset labels.Labels) (*chunkDesc, uint32) {
refs := h.hashes[hash]
for _, ref := range refs {
if cd := h.descs[ref]; cd.lset.Equals(lset) {
return cd
return cd, uint32(ref)
}
}
// None of the given chunks was for the series, create a new one.
return nil, 0
}
func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
cd := &chunkDesc{
lset: lset,
chunk: chunks.NewXORChunk(int(math.MaxInt64)),
@ -128,7 +143,7 @@ func (h *HeadBlock) get(hash uint64, lset labels.Labels) *chunkDesc {
ref := len(h.descs)
h.descs = append(h.descs, cd)
h.hashes[hash] = append(refs, ref)
h.hashes[hash] = append(h.hashes[hash], ref)
// Add each label pair as a term to the inverted index.
terms := make([]term, 0, len(lset))
@ -153,28 +168,53 @@ func (h *HeadBlock) get(hash uint64, lset labels.Labels) *chunkDesc {
}
func (h *HeadBlock) appendBatch(samples []hashedSample) error {
var merr MultiError
// Find head chunks for all samples and allocate new IDs/refs for
// ones we haven't seen before.
var (
newSeries []labels.Labels
newHashes []uint64
)
for _, s := range samples {
merr.Add(h.append(s.hash, s.labels, s.t, s.v))
cd, ref := h.get(s.hash, s.labels)
if cd != nil {
// TODO(fabxc): sample refs are only scoped within a block for
// now and we ignore any previously set value
s.ref = ref
continue
}
s.ref = uint32(len(h.descs) + len(newSeries))
newSeries = append(newSeries, s.labels)
newHashes = append(newHashes, s.hash)
}
return merr.Err()
}
// append adds the sample to the headblock.
func (h *HeadBlock) append(hash uint64, lset labels.Labels, ts int64, v float64) error {
if err := h.get(hash, lset).append(ts, v); err != nil {
// Write all new series and samples to the WAL and add it to the
// in-mem database on success.
if err := h.wal.Log(newSeries, samples); err != nil {
return err
}
h.stats.SampleCount++
if ts > h.stats.MaxTime {
h.stats.MaxTime = ts
for i, s := range newSeries {
h.create(newHashes[i], s)
}
return nil
var merr MultiError
for _, s := range samples {
// TODO(fabxc): ensure that this won't be able to actually error in practice.
if err := h.descs[s.ref].append(s.t, s.v); err != nil {
merr.Add(err)
continue
}
h.stats.SampleCount++
if s.t > h.stats.MaxTime {
h.stats.MaxTime = s.t
}
}
return merr.Err()
}
func (h *HeadBlock) persist(p string) (int64, error) {

191
wal.go Normal file
View File

@ -0,0 +1,191 @@
package tsdb
import (
"encoding/binary"
"hash/crc32"
"io"
"math"
"os"
"path/filepath"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/fabxc/tsdb/labels"
)
// WALEntryType indicates what data a WAL entry contains.
type WALEntryType byte
// The valid WAL entry types.
const (
WALEntrySymbols = 1
WALEntrySeries = 2
WALEntrySamples = 3
)
// WAL is a write ahead log for series data. It can only be written to.
// Use WALReader to read back from a write ahead log.
type WAL struct {
f *fileutil.LockedFile
enc *walEncoder
symbols map[string]uint32
}
// CreateWAL creates a new write ahead log in the given directory.
func CreateWAL(dir string) (*WAL, error) {
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err
}
p := filepath.Join(dir, "wal")
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
if err != nil {
return nil, err
}
if _, err = f.Seek(0, os.SEEK_END); err != nil {
return nil, err
}
w := &WAL{
f: f,
enc: newWALEncoder(f),
symbols: map[string]uint32{},
}
return w, nil
}
// Log writes a batch of new series labels and samples to the log.
func (w *WAL) Log(series []labels.Labels, samples []hashedSample) error {
if err := w.enc.encodeSeries(series); err != nil {
return err
}
if err := w.enc.encodeSamples(samples); err != nil {
return err
}
return nil
}
func (w *WAL) sync() error {
return fileutil.Fdatasync(w.f.File)
}
// Close sync all data and closes the underlying resources.
func (w *WAL) Close() error {
if err := w.sync(); err != nil {
return err
}
return w.f.Close()
}
// OpenWAL does things.
func OpenWAL(dir string) (*WAL, error) {
return nil, nil
}
type walEncoder struct {
w io.Writer
buf []byte
}
func newWALEncoder(w io.Writer) *walEncoder {
return &walEncoder{
w: w,
buf: make([]byte, 1024*1024),
}
}
func (e *walEncoder) entry(et WALEntryType, flag byte, n int) error {
h := crc32.NewIEEE()
w := io.MultiWriter(h, e.w)
b := make([]byte, 6)
b[0] = byte(et)
b[1] = flag
binary.BigEndian.PutUint32(b[2:], uint32(len(e.buf)))
if _, err := w.Write(b); err != nil {
return err
}
if _, err := w.Write(e.buf[:n]); err != nil {
return err
}
if _, err := e.w.Write(h.Sum(nil)); err != nil {
return err
}
return nil
}
const (
walSeriesSimple = 1
walSamplesSimple = 1
)
func (e *walEncoder) encodeSeries(series []labels.Labels) error {
if len(series) == 0 {
return nil
}
var (
b = make([]byte, binary.MaxVarintLen32)
buf = e.buf[:0]
)
for _, lset := range series {
n := binary.PutUvarint(b, uint64(len(lset)))
buf = append(buf, b[:n]...)
for _, l := range lset {
n = binary.PutUvarint(b, uint64(len(l.Name)))
buf = append(buf, b[:n]...)
n = binary.PutUvarint(b, uint64(len(l.Value)))
buf = append(buf, b[:n]...)
}
}
return e.entry(WALEntrySeries, walSeriesSimple, len(buf))
}
func (e *walEncoder) encodeSamples(samples []hashedSample) error {
if len(samples) == 0 {
return nil
}
var (
b = make([]byte, binary.MaxVarintLen64)
buf = e.buf[:0]
)
// Store base timestamp and base reference number of first sample.
// All samples encode their timestamp and ref as delta to those.
//
// TODO(fabxc): optimize for all samples having the same timestamp.
first := samples[0]
binary.BigEndian.PutUint32(b, first.ref)
buf = append(buf, b[:4]...)
binary.BigEndian.PutUint64(b, uint64(first.t))
buf = append(buf, b[:8]...)
for _, s := range samples {
n := binary.PutVarint(b, int64(s.ref)-int64(first.ref))
buf = append(buf, b[:n]...)
n = binary.PutVarint(b, s.t-first.t)
buf = append(buf, b[:n]...)
binary.BigEndian.PutUint64(b, math.Float64bits(s.v))
buf = append(buf, b[:8]...)
}
return e.entry(WALEntrySamples, walSamplesSimple, len(buf))
}
type walDecoder struct {
r io.Reader
handleSeries func(labels.Labels)
handleSample func(hashedSample)
}