diff --git a/db.go b/db.go index d9382364c..7ca513ea3 100644 --- a/db.go +++ b/db.go @@ -290,7 +290,10 @@ func (s *Shard) appendBatch(samples []hashedSample) error { // TODO(fabxc): distinguish samples between concurrent heads for // different time blocks. Those may occurr during transition to still // allow late samples to arrive for a previous block. - err := s.head.appendBatch(samples, s.metrics.samplesAppended) + err := s.head.appendBatch(samples) + if err != nil { + s.metrics.samplesAppended.Add(float64(len(samples))) + } // TODO(fabxc): randomize over time and use better scoring function. if s.head.stats.SampleCount/(uint64(s.head.stats.ChunkCount)+1) > 400 { @@ -401,6 +404,7 @@ func (s *Shard) persist() error { // chunkDesc wraps a plain data chunk and provides cached meta data about it. type chunkDesc struct { + ref uint32 lset labels.Labels chunk chunks.Chunk diff --git a/head.go b/head.go index a58dd9af4..b5e4f063a 100644 --- a/head.go +++ b/head.go @@ -7,7 +7,6 @@ import ( "github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/labels" - "github.com/prometheus/client_golang/prometheus" ) // HeadBlock handles reads and writes of time series data within a time window. @@ -18,8 +17,8 @@ type HeadBlock struct { // is assigned the index as its ID. descs []*chunkDesc // hashes contains a collision map of label set hashes of chunks - // to their position in the chunk desc slice. - hashes map[uint64][]int + // to their chunk descs. + hashes map[uint64][]*chunkDesc values map[string]stringset // label names to possible values postings *memPostings // postings lists for terms @@ -38,7 +37,7 @@ func OpenHeadBlock(dir string, baseTime int64) (*HeadBlock, error) { b := &HeadBlock{ descs: []*chunkDesc{}, - hashes: map[uint64][]int{}, + hashes: map[uint64][]*chunkDesc{}, values: map[string]stringset{}, postings: &memPostings{m: make(map[term][]uint32)}, wal: wal, @@ -136,15 +135,15 @@ func (h *HeadBlock) Series(ref uint32, mint, maxt int64) (Series, error) { // get retrieves the chunk with the hash and label set and creates // a new one if it doesn't exist yet. -func (h *HeadBlock) get(hash uint64, lset labels.Labels) (*chunkDesc, uint32) { - refs := h.hashes[hash] +func (h *HeadBlock) get(hash uint64, lset labels.Labels) *chunkDesc { + cds := h.hashes[hash] - for _, ref := range refs { - if cd := h.descs[ref]; cd.lset.Equals(lset) { - return cd, uint32(ref) + for _, cd := range cds { + if cd.lset.Equals(lset) { + return cd } } - return nil, 0 + return nil } func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc { @@ -160,10 +159,10 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc { panic(err) } // Index the new chunk. - ref := len(h.descs) + cd.ref = uint32(len(h.descs)) h.descs = append(h.descs, cd) - h.hashes[hash] = append(h.hashes[hash], ref) + h.hashes[hash] = append(h.hashes[hash], cd) // Add each label pair as a term to the inverted index. terms := make([]term, 0, len(lset)) @@ -178,7 +177,7 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc { } valset.set(l.Value) } - h.postings.add(uint32(ref), terms...) + h.postings.add(cd.ref, terms...) // For the head block there's exactly one chunk per series. h.stats.ChunkCount++ @@ -187,7 +186,7 @@ func (h *HeadBlock) create(hash uint64, lset labels.Labels) *chunkDesc { return cd } -func (h *HeadBlock) appendBatch(samples []hashedSample, appended prometheus.Counter) error { +func (h *HeadBlock) appendBatch(samples []hashedSample) error { // Find head chunks for all samples and allocate new IDs/refs for // ones we haven't seen before. var ( @@ -199,11 +198,11 @@ func (h *HeadBlock) appendBatch(samples []hashedSample, appended prometheus.Coun for i := range samples { s := &samples[i] - cd, ref := h.get(s.hash, s.labels) + cd := h.get(s.hash, s.labels) if cd != nil { // TODO(fabxc): sample refs are only scoped within a block for // now and we ignore any previously set value - s.ref = ref + s.ref = cd.ref continue } @@ -226,6 +225,8 @@ func (h *HeadBlock) appendBatch(samples []hashedSample, appended prometheus.Coun return err } + // After the samples were successfully written to the WAL, there may + // be no further failures. for i, s := range newSeries { h.create(newHashes[i], s) } diff --git a/postings.go b/postings.go index 2f4d10975..9dde3e678 100644 --- a/postings.go +++ b/postings.go @@ -15,7 +15,11 @@ type term struct { // Postings returns an iterator over the postings list for s. func (p *memPostings) get(t term) Postings { - return &listPostings{list: p.m[t], idx: -1} + l := p.m[t] + if l == nil { + return emptyPostings + } + return &listPostings{list: l, idx: -1} } // add adds a document to the index. The caller has to ensure that no