From 0b8c77361e89ca5d920f5a12773aa0ee6b896beb Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 22 Dec 2016 12:05:24 +0100 Subject: [PATCH] Add initial WAL writing --- db.go | 26 ++++++-- head.go | 78 +++++++++++++++++------ wal.go | 191 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 270 insertions(+), 25 deletions(-) create mode 100644 wal.go diff --git a/db.go b/db.go index 16def55b1..3d990cdbf 100644 --- a/db.go +++ b/db.go @@ -163,6 +163,7 @@ func (ba *bucketAppender) Commit() error { type hashedSample struct { hash uint64 labels labels.Labels + ref uint32 t int64 v float64 @@ -197,18 +198,25 @@ func OpenShard(path string, logger log.Logger) (*Shard, error) { return nil, err } + // TODO(fabxc): get time from client-defined `now` function. + baset := time.Now().UnixNano() / int64(time.Millisecond) + if len(pbs) > 0 { + baset = pbs[0].stats.MaxTime + } + + head, err := NewHeadBlock(filepath.Join(path, fmt.Sprintf("%d", baset)), baset) + if err != nil { + return nil, err + } + s := &Shard{ path: path, persistCh: make(chan struct{}, 1), logger: logger, + head: head, persisted: pbs, // TODO(fabxc): restore from checkpoint. } - // TODO(fabxc): get base time from pre-existing blocks. Otherwise - // it should come from a user defined start timestamp. - // Use actual time for now. - s.head = NewHeadBlock(time.Now().UnixNano() / int64(time.Millisecond)) - return s, nil } @@ -219,6 +227,7 @@ func (s *Shard) Close() error { for _, pb := range s.persisted { e.Add(pb.Close()) } + e.Add(s.head.Close()) return e.Err() } @@ -292,7 +301,12 @@ func (s *Shard) persist() error { // Set new head block. head := s.head - s.head = NewHeadBlock(head.stats.MaxTime) + newHead, err := NewHeadBlock(filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MaxTime)), head.stats.MaxTime) + if err != nil { + s.mtx.Unlock() + return err + } + s.head = newHead s.mtx.Unlock() diff --git a/head.go b/head.go index d5ad515db..afd9f39aa 100644 --- a/head.go +++ b/head.go @@ -21,24 +21,36 @@ type HeadBlock struct { // to their position in the chunk desc slice. hashes map[uint64][]int - symbols []string // all seen strings values map[string]stringset // label names to possible values postings *memPostings // postings lists for terms + wal *WAL + stats BlockStats } // NewHeadBlock creates a new empty head block. -func NewHeadBlock(baseTime int64) *HeadBlock { +func NewHeadBlock(dir string, baseTime int64) (*HeadBlock, error) { + wal, err := CreateWAL(dir) + if err != nil { + return nil, err + } + b := &HeadBlock{ descs: []*chunkDesc{}, hashes: map[uint64][]int{}, values: map[string]stringset{}, postings: &memPostings{m: make(map[term][]uint32)}, + wal: wal, } b.stats.MinTime = baseTime - return b + return b, nil +} + +// Close syncs all data and closes underlying resources of the head block. +func (h *HeadBlock) Close() error { + return h.wal.Close() } // Querier returns a new querier over the head block. @@ -111,15 +123,18 @@ func (h *HeadBlock) Series(ref uint32, mint, maxt int64) (Series, error) { // get retrieves the chunk with the hash and label set and creates // a new one if it doesn't exist yet. -func (h *HeadBlock) get(hash uint64, lset labels.Labels) *chunkDesc { +func (h *HeadBlock) get(hash uint64, lset labels.Labels) (*chunkDesc, uint32) { refs := h.hashes[hash] for _, ref := range refs { if cd := h.descs[ref]; cd.lset.Equals(lset) { - return cd + return cd, uint32(ref) } } - // None of the given chunks was for the series, create a new one. + return nil, 0 +} + +func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc { cd := &chunkDesc{ lset: lset, chunk: chunks.NewXORChunk(int(math.MaxInt64)), @@ -128,7 +143,7 @@ func (h *HeadBlock) get(hash uint64, lset labels.Labels) *chunkDesc { ref := len(h.descs) h.descs = append(h.descs, cd) - h.hashes[hash] = append(refs, ref) + h.hashes[hash] = append(h.hashes[hash], ref) // Add each label pair as a term to the inverted index. terms := make([]term, 0, len(lset)) @@ -153,28 +168,53 @@ func (h *HeadBlock) get(hash uint64, lset labels.Labels) *chunkDesc { } func (h *HeadBlock) appendBatch(samples []hashedSample) error { - var merr MultiError + // Find head chunks for all samples and allocate new IDs/refs for + // ones we haven't seen before. + var ( + newSeries []labels.Labels + newHashes []uint64 + ) for _, s := range samples { - merr.Add(h.append(s.hash, s.labels, s.t, s.v)) - } + cd, ref := h.get(s.hash, s.labels) + if cd != nil { + // TODO(fabxc): sample refs are only scoped within a block for + // now and we ignore any previously set value + s.ref = ref + continue + } - return merr.Err() -} + s.ref = uint32(len(h.descs) + len(newSeries)) + newSeries = append(newSeries, s.labels) + newHashes = append(newHashes, s.hash) + } -// append adds the sample to the headblock. -func (h *HeadBlock) append(hash uint64, lset labels.Labels, ts int64, v float64) error { - if err := h.get(hash, lset).append(ts, v); err != nil { + // Write all new series and samples to the WAL and add it to the + // in-mem database on success. + if err := h.wal.Log(newSeries, samples); err != nil { return err } - h.stats.SampleCount++ + for i, s := range newSeries { + h.create(newHashes[i], s) + } - if ts > h.stats.MaxTime { - h.stats.MaxTime = ts + var merr MultiError + for _, s := range samples { + // TODO(fabxc): ensure that this won't be able to actually error in practice. + if err := h.descs[s.ref].append(s.t, s.v); err != nil { + merr.Add(err) + continue + } + + h.stats.SampleCount++ + + if s.t > h.stats.MaxTime { + h.stats.MaxTime = s.t + } } - return nil + return merr.Err() } func (h *HeadBlock) persist(p string) (int64, error) { diff --git a/wal.go b/wal.go new file mode 100644 index 000000000..0d3cbda97 --- /dev/null +++ b/wal.go @@ -0,0 +1,191 @@ +package tsdb + +import ( + "encoding/binary" + "hash/crc32" + "io" + "math" + "os" + "path/filepath" + + "github.com/coreos/etcd/pkg/fileutil" + "github.com/fabxc/tsdb/labels" +) + +// WALEntryType indicates what data a WAL entry contains. +type WALEntryType byte + +// The valid WAL entry types. +const ( + WALEntrySymbols = 1 + WALEntrySeries = 2 + WALEntrySamples = 3 +) + +// WAL is a write ahead log for series data. It can only be written to. +// Use WALReader to read back from a write ahead log. +type WAL struct { + f *fileutil.LockedFile + enc *walEncoder + + symbols map[string]uint32 +} + +// CreateWAL creates a new write ahead log in the given directory. +func CreateWAL(dir string) (*WAL, error) { + if err := os.MkdirAll(dir, 0777); err != nil { + return nil, err + } + + p := filepath.Join(dir, "wal") + + f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode) + if err != nil { + return nil, err + } + if _, err = f.Seek(0, os.SEEK_END); err != nil { + return nil, err + } + + w := &WAL{ + f: f, + enc: newWALEncoder(f), + symbols: map[string]uint32{}, + } + return w, nil +} + +// Log writes a batch of new series labels and samples to the log. +func (w *WAL) Log(series []labels.Labels, samples []hashedSample) error { + if err := w.enc.encodeSeries(series); err != nil { + return err + } + if err := w.enc.encodeSamples(samples); err != nil { + return err + } + return nil +} + +func (w *WAL) sync() error { + return fileutil.Fdatasync(w.f.File) +} + +// Close sync all data and closes the underlying resources. +func (w *WAL) Close() error { + if err := w.sync(); err != nil { + return err + } + return w.f.Close() +} + +// OpenWAL does things. +func OpenWAL(dir string) (*WAL, error) { + return nil, nil +} + +type walEncoder struct { + w io.Writer + + buf []byte +} + +func newWALEncoder(w io.Writer) *walEncoder { + return &walEncoder{ + w: w, + buf: make([]byte, 1024*1024), + } +} + +func (e *walEncoder) entry(et WALEntryType, flag byte, n int) error { + h := crc32.NewIEEE() + w := io.MultiWriter(h, e.w) + + b := make([]byte, 6) + b[0] = byte(et) + b[1] = flag + + binary.BigEndian.PutUint32(b[2:], uint32(len(e.buf))) + + if _, err := w.Write(b); err != nil { + return err + } + if _, err := w.Write(e.buf[:n]); err != nil { + return err + } + if _, err := e.w.Write(h.Sum(nil)); err != nil { + return err + } + + return nil +} + +const ( + walSeriesSimple = 1 + walSamplesSimple = 1 +) + +func (e *walEncoder) encodeSeries(series []labels.Labels) error { + if len(series) == 0 { + return nil + } + var ( + b = make([]byte, binary.MaxVarintLen32) + buf = e.buf[:0] + ) + + for _, lset := range series { + n := binary.PutUvarint(b, uint64(len(lset))) + buf = append(buf, b[:n]...) + + for _, l := range lset { + n = binary.PutUvarint(b, uint64(len(l.Name))) + buf = append(buf, b[:n]...) + + n = binary.PutUvarint(b, uint64(len(l.Value))) + buf = append(buf, b[:n]...) + } + } + + return e.entry(WALEntrySeries, walSeriesSimple, len(buf)) +} + +func (e *walEncoder) encodeSamples(samples []hashedSample) error { + if len(samples) == 0 { + return nil + } + var ( + b = make([]byte, binary.MaxVarintLen64) + buf = e.buf[:0] + ) + + // Store base timestamp and base reference number of first sample. + // All samples encode their timestamp and ref as delta to those. + // + // TODO(fabxc): optimize for all samples having the same timestamp. + first := samples[0] + + binary.BigEndian.PutUint32(b, first.ref) + buf = append(buf, b[:4]...) + binary.BigEndian.PutUint64(b, uint64(first.t)) + buf = append(buf, b[:8]...) + + for _, s := range samples { + n := binary.PutVarint(b, int64(s.ref)-int64(first.ref)) + buf = append(buf, b[:n]...) + + n = binary.PutVarint(b, s.t-first.t) + buf = append(buf, b[:n]...) + + binary.BigEndian.PutUint64(b, math.Float64bits(s.v)) + buf = append(buf, b[:8]...) + } + + return e.entry(WALEntrySamples, walSamplesSimple, len(buf)) +} + +type walDecoder struct { + r io.Reader + + handleSeries func(labels.Labels) + handleSample func(hashedSample) +}