diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 29131c52b..52cccfabd 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -87,6 +87,10 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str } testutil.Ok(t, it.Err()) + if len(samples) == 0 { + continue + } + name := series.Labels().String() result[name] = samples } @@ -1276,20 +1280,29 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) - lres, err := expandSeriesSet(ss) + lres, _, err := expandSeriesSet(ss) testutil.Ok(t, err) - testutil.Equals(t, c.series, lres) } } -func expandSeriesSet(ss storage.SeriesSet) ([]labels.Labels, error) { - result := []labels.Labels{} +// expandSeriesSet returns the raw labels in the order they are retrieved from +// the series set and the samples keyed by Labels().String(). +func expandSeriesSet(ss storage.SeriesSet) ([]labels.Labels, map[string][]sample, error) { + resultLabels := []labels.Labels{} + resultSamples := map[string][]sample{} for ss.Next() { - result = append(result, ss.At().Labels()) + series := ss.At() + samples := []sample{} + it := series.Iterator() + for it.Next() { + t, v := it.At() + samples = append(samples, sample{t: t, v: v}) + } + resultLabels = append(resultLabels, series.Labels()) + resultSamples[series.Labels().String()] = samples } - - return result, ss.Err() + return resultLabels, resultSamples, ss.Err() } func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) { @@ -2477,6 +2490,136 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { testutil.Equals(t, 1000.0, sum) } +func TestDBCannotSeePartialCommits(t *testing.T) { + tmpdir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(tmpdir) + + db, err := Open(tmpdir, nil, nil, nil) + testutil.Ok(t, err) + defer db.Close() + + stop := make(chan struct{}) + firstInsert := make(chan struct{}) + + // Insert data in batches. + go func() { + iter := 0 + for { + app := db.Appender() + + for j := 0; j < 100; j++ { + _, err := app.Add(labels.FromStrings("foo", "bar", "a", strconv.Itoa(j)), int64(iter), float64(iter)) + testutil.Ok(t, err) + } + err = app.Commit() + testutil.Ok(t, err) + + if iter == 0 { + close(firstInsert) + } + iter++ + + select { + case <-stop: + return + default: + } + } + }() + + <-firstInsert + + // This is a race condition, so do a few tests to tickle it. + // Usually most will fail. + inconsistencies := 0 + for i := 0; i < 10; i++ { + func() { + querier, err := db.Querier(context.Background(), 0, 1000000) + testutil.Ok(t, err) + defer querier.Close() + + ss, _, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + testutil.Ok(t, err) + + _, seriesSet, err := expandSeriesSet(ss) + testutil.Ok(t, err) + values := map[float64]struct{}{} + for _, series := range seriesSet { + values[series[len(series)-1].v] = struct{}{} + } + if len(values) != 1 { + inconsistencies++ + } + }() + } + stop <- struct{}{} + + testutil.Equals(t, 0, inconsistencies, "Some queries saw inconsistent results.") +} + +func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) { + tmpdir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(tmpdir) + + db, err := Open(tmpdir, nil, nil, nil) + testutil.Ok(t, err) + defer db.Close() + + querierBeforeAdd, err := db.Querier(context.Background(), 0, 1000000) + testutil.Ok(t, err) + defer querierBeforeAdd.Close() + + app := db.Appender() + _, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0) + testutil.Ok(t, err) + + querierAfterAddButBeforeCommit, err := db.Querier(context.Background(), 0, 1000000) + testutil.Ok(t, err) + defer querierAfterAddButBeforeCommit.Close() + + // None of the queriers should return anything after the Add but before the commit. + ss, _, err := querierBeforeAdd.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + testutil.Ok(t, err) + _, seriesSet, err := expandSeriesSet(ss) + testutil.Ok(t, err) + testutil.Equals(t, map[string][]sample{}, seriesSet) + + ss, _, err = querierAfterAddButBeforeCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + testutil.Ok(t, err) + _, seriesSet, err = expandSeriesSet(ss) + testutil.Ok(t, err) + testutil.Equals(t, map[string][]sample{}, seriesSet) + + // This commit is after the queriers are created, so should not be returned. + err = app.Commit() + testutil.Ok(t, err) + + // Nothing returned for querier created before the Add. + ss, _, err = querierBeforeAdd.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + testutil.Ok(t, err) + _, seriesSet, err = expandSeriesSet(ss) + testutil.Ok(t, err) + testutil.Equals(t, map[string][]sample{}, seriesSet) + + // Series exists but has no samples for querier created after Add. + ss, _, err = querierAfterAddButBeforeCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + testutil.Ok(t, err) + _, seriesSet, err = expandSeriesSet(ss) + testutil.Ok(t, err) + testutil.Equals(t, map[string][]sample{`{foo="bar"}`: []sample{}}, seriesSet) + + querierAfterCommit, err := db.Querier(context.Background(), 0, 1000000) + testutil.Ok(t, err) + defer querierAfterCommit.Close() + + // Samples are returned for querier created after Commit. + ss, _, err = querierAfterCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + testutil.Ok(t, err) + _, seriesSet, err = expandSeriesSet(ss) + testutil.Ok(t, err) + testutil.Equals(t, map[string][]sample{`{foo="bar"}`: []sample{{t: 0, v: 0}}}, seriesSet) +} + // TestChunkWriter_ReadAfterWrite ensures that chunk segment are cut at the set segment size and // that the resulted segments includes the expected chunks data. func TestChunkWriter_ReadAfterWrite(t *testing.T) { diff --git a/tsdb/head.go b/tsdb/head.go index 726e70690..812f44d31 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -81,18 +81,20 @@ type Head struct { symMtx sync.RWMutex symbols map[string]struct{} - values map[string]stringset // label names to possible values + values map[string]stringset // Label names to possible values. deletedMtx sync.Mutex deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until. - postings *index.MemPostings // postings lists for terms + postings *index.MemPostings // Postings lists for terms. tombstones *tombstones.MemTombstones + iso *isolation + cardinalityMutex sync.Mutex - cardinalityCache *index.PostingsStats // posting stats cache which will expire after 30sec - lastPostingsStatsCall time.Duration // last posting stats call (PostingsCardinalityStats()) time for caching + cardinalityCache *index.PostingsStats // Posting stats cache which will expire after 30sec. + lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching. } type headMetrics struct { @@ -105,8 +107,6 @@ type headMetrics struct { chunksCreated prometheus.Counter chunksRemoved prometheus.Counter gcDuration prometheus.Summary - minTime prometheus.GaugeFunc - maxTime prometheus.GaugeFunc samplesAppended prometheus.Counter walTruncateDuration prometheus.Summary walCorruptionsTotal prometheus.Counter @@ -119,109 +119,93 @@ type headMetrics struct { } func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { - m := &headMetrics{} - - m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_head_active_appenders", - Help: "Number of currently active appender transactions", - }) - m.series = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_head_series", - Help: "Total number of series in the head block.", - }, func() float64 { - return float64(h.NumSeries()) - }) - m.seriesCreated = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_head_series_created_total", - Help: "Total number of series created in the head", - }) - m.seriesRemoved = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_head_series_removed_total", - Help: "Total number of series removed in the head", - }) - m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_head_series_not_found_total", - Help: "Total number of requests for series that were not found.", - }) - m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_head_chunks", - Help: "Total number of chunks in the head block.", - }) - m.chunksCreated = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_head_chunks_created_total", - Help: "Total number of chunks created in the head", - }) - m.chunksRemoved = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_head_chunks_removed_total", - Help: "Total number of chunks removed in the head", - }) - m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "prometheus_tsdb_head_gc_duration_seconds", - Help: "Runtime of garbage collection in the head block.", - Objectives: map[float64]float64{}, - }) - m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_head_max_time", - Help: "Maximum timestamp of the head block. The unit is decided by the library consumer.", - }, func() float64 { - return float64(h.MaxTime()) - }) - m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_head_min_time", - Help: "Minimum time bound of the head block. The unit is decided by the library consumer.", - }, func() float64 { - return float64(h.MinTime()) - }) - m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "prometheus_tsdb_wal_truncate_duration_seconds", - Help: "Duration of WAL truncation.", - Objectives: map[float64]float64{}, - }) - m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_corruptions_total", - Help: "Total number of WAL corruptions.", - }) - m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_head_samples_appended_total", - Help: "Total number of appended samples.", - }) - m.headTruncateFail = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_head_truncations_failed_total", - Help: "Total number of head truncations that failed.", - }) - m.headTruncateTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_head_truncations_total", - Help: "Total number of head truncations attempted.", - }) - m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_checkpoint_deletions_failed_total", - Help: "Total number of checkpoint deletions that failed.", - }) - m.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_checkpoint_deletions_total", - Help: "Total number of checkpoint deletions attempted.", - }) - m.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_checkpoint_creations_failed_total", - Help: "Total number of checkpoint creations that failed.", - }) - m.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_checkpoint_creations_total", - Help: "Total number of checkpoint creations attempted.", - }) + m := &headMetrics{ + activeAppenders: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_head_active_appenders", + Help: "Number of currently active appender transactions", + }), + series: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_head_series", + Help: "Total number of series in the head block.", + }, func() float64 { + return float64(h.NumSeries()) + }), + seriesCreated: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_head_series_created_total", + Help: "Total number of series created in the head", + }), + seriesRemoved: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_head_series_removed_total", + Help: "Total number of series removed in the head", + }), + seriesNotFound: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_head_series_not_found_total", + Help: "Total number of requests for series that were not found.", + }), + chunks: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_head_chunks", + Help: "Total number of chunks in the head block.", + }), + chunksCreated: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_head_chunks_created_total", + Help: "Total number of chunks created in the head", + }), + chunksRemoved: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_head_chunks_removed_total", + Help: "Total number of chunks removed in the head", + }), + gcDuration: prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "prometheus_tsdb_head_gc_duration_seconds", + Help: "Runtime of garbage collection in the head block.", + }), + walTruncateDuration: prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "prometheus_tsdb_wal_truncate_duration_seconds", + Help: "Duration of WAL truncation.", + }), + walCorruptionsTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_corruptions_total", + Help: "Total number of WAL corruptions.", + }), + samplesAppended: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_head_samples_appended_total", + Help: "Total number of appended samples.", + }), + headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_head_truncations_failed_total", + Help: "Total number of head truncations that failed.", + }), + headTruncateTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_head_truncations_total", + Help: "Total number of head truncations attempted.", + }), + checkpointDeleteFail: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_checkpoint_deletions_failed_total", + Help: "Total number of checkpoint deletions that failed.", + }), + checkpointDeleteTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_checkpoint_deletions_total", + Help: "Total number of checkpoint deletions attempted.", + }), + checkpointCreationFail: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_checkpoint_creations_failed_total", + Help: "Total number of checkpoint creations that failed.", + }), + checkpointCreationTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_checkpoint_creations_total", + Help: "Total number of checkpoint creations attempted.", + }), + } if r != nil { r.MustRegister( m.activeAppenders, + m.series, m.chunks, m.chunksCreated, m.chunksRemoved, - m.series, m.seriesCreated, m.seriesRemoved, m.seriesNotFound, - m.minTime, - m.maxTime, m.gcDuration, m.walTruncateDuration, m.walCorruptionsTotal, @@ -232,6 +216,34 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { m.checkpointDeleteTotal, m.checkpointCreationFail, m.checkpointCreationTotal, + // Metrics bound to functions and not needed in tests + // can be created and registered on the spot. + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_head_max_time", + Help: "Maximum timestamp of the head block. The unit is decided by the library consumer.", + }, func() float64 { + return float64(h.MaxTime()) + }), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_head_min_time", + Help: "Minimum time bound of the head block. The unit is decided by the library consumer.", + }, func() float64 { + return float64(h.MinTime()) + }), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_isolation_low_watermark", + Help: "The lowest TSDB append ID that is still referenced.", + }, func() float64 { + return float64(h.iso.lowWatermark()) + }), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_isolation_high_watermark", + Help: "The highest TSDB append ID that has been given out.", + }, func() float64 { + h.iso.appendMtx.Lock() + defer h.iso.appendMtx.Unlock() + return float64(h.iso.lastAppendID) + }), ) } return m @@ -279,6 +291,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), tombstones: tombstones.NewMemTombstones(), + iso: newIsolation(), deleted: map[uint64]int{}, } h.metrics = newHeadMetrics(h, r) @@ -314,8 +327,7 @@ func (h *Head) processWALSamples( } refSeries[s.Ref] = ms } - _, chunkCreated := ms.append(s.T, s.V) - if chunkCreated { + if _, chunkCreated := ms.append(s.T, s.V, 0); chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() } @@ -564,7 +576,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { } // Init loads data from the write ahead log and prepares the head for writes. -// It should be called before using an appender so that +// It should be called before using an appender so that it // limits the ingested samples to the head min valid time. func (h *Head) Init(minValidTime int64) error { h.minValidTime = minValidTime @@ -775,7 +787,7 @@ func (h *RangeHead) Index(mint, maxt int64) (IndexReader, error) { } func (h *RangeHead) Chunks() (ChunkReader, error) { - return h.head.chunksRange(h.mint, h.maxt), nil + return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State()), nil } func (h *RangeHead) Tombstones() (tombstones.Reader, error) { @@ -810,6 +822,8 @@ func (h *RangeHead) Meta() BlockMeta { type initAppender struct { app storage.Appender head *Head + + appendID, cleanupAppendIDsBelow uint64 } func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { @@ -817,7 +831,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro return a.app.Add(lset, t, v) } a.head.initTime(t) - a.app = a.head.appender() + a.app = a.head.appender(a.appendID, a.cleanupAppendIDsBelow) return a.app.Add(lset, t, v) } @@ -847,24 +861,33 @@ func (a *initAppender) Rollback() error { func (h *Head) Appender() storage.Appender { h.metrics.activeAppenders.Inc() + appendID := h.iso.newAppendID() + cleanupAppendIDsBelow := h.iso.lowWatermark() + // The head cache might not have a starting point yet. The init appender // picks up the first appended timestamp as the base. if h.MinTime() == math.MaxInt64 { - return &initAppender{head: h} + return &initAppender{ + head: h, + appendID: appendID, + cleanupAppendIDsBelow: cleanupAppendIDsBelow, + } } - return h.appender() + return h.appender(appendID, cleanupAppendIDsBelow) } -func (h *Head) appender() *headAppender { +func (h *Head) appender(appendID, cleanupAppendIDsBelow uint64) *headAppender { return &headAppender{ head: h, // Set the minimum valid time to whichever is greater the head min valid time or the compaction window. // This ensures that no samples will be added within the compaction window to avoid races. - minValidTime: max(atomic.LoadInt64(&h.minValidTime), h.MaxTime()-h.chunkRange/2), - mint: math.MaxInt64, - maxt: math.MinInt64, - samples: h.getAppendBuffer(), - sampleSeries: h.getSeriesBuffer(), + minValidTime: max(atomic.LoadInt64(&h.minValidTime), h.MaxTime()-h.chunkRange/2), + mint: math.MaxInt64, + maxt: math.MinInt64, + samples: h.getAppendBuffer(), + sampleSeries: h.getSeriesBuffer(), + appendID: appendID, + cleanupAppendIDsBelow: cleanupAppendIDsBelow, } } @@ -922,6 +945,8 @@ type headAppender struct { series []record.RefSeries samples []record.RefSample sampleSeries []*memSeries + + appendID, cleanupAppendIDsBelow uint64 } func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { @@ -1023,7 +1048,8 @@ func (a *headAppender) Commit() error { for i, s := range a.samples { series = a.sampleSeries[i] series.Lock() - ok, chunkCreated := series.append(s.T, s.V) + ok, chunkCreated := series.append(s.T, s.V, a.appendID) + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) series.pendingCommit = false series.Unlock() @@ -1038,6 +1064,7 @@ func (a *headAppender) Commit() error { a.head.metrics.samplesAppended.Add(float64(total)) a.head.updateMinMaxTime(a.mint, a.maxt) + a.head.iso.closeAppend(a.appendID) return nil } @@ -1048,14 +1075,16 @@ func (a *headAppender) Rollback() error { for i := range a.samples { series = a.sampleSeries[i] series.Lock() + series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) series.pendingCommit = false series.Unlock() } a.head.putAppendBuffer(a.samples) + a.samples = nil + a.head.iso.closeAppend(a.appendID) // Series are created in the head memory regardless of rollback. Thus we have // to log them to the WAL in any case. - a.samples = nil return a.log() } @@ -1182,14 +1211,19 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader { // Chunks returns a ChunkReader against the block. func (h *Head) Chunks() (ChunkReader, error) { - return h.chunksRange(math.MinInt64, math.MaxInt64), nil + return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State()), nil } -func (h *Head) chunksRange(mint, maxt int64) *headChunkReader { +func (h *Head) chunksRange(mint, maxt int64, is *isolationState) *headChunkReader { if hmin := h.MinTime(); hmin > mint { mint = hmin } - return &headChunkReader{head: h, mint: mint, maxt: maxt} + return &headChunkReader{ + head: h, + mint: mint, + maxt: maxt, + isoState: is, + } } // NumSeries returns the number of active series in the head. @@ -1240,9 +1274,11 @@ func (h *Head) Close() error { type headChunkReader struct { head *Head mint, maxt int64 + isoState *isolationState } func (h *headChunkReader) Close() error { + h.isoState.Close() return nil } @@ -1284,21 +1320,23 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { s.Unlock() return &safeChunk{ - Chunk: c.chunk, - s: s, - cid: int(cid), + Chunk: c.chunk, + s: s, + cid: int(cid), + isoState: h.isoState, }, nil } type safeChunk struct { chunkenc.Chunk - s *memSeries - cid int + s *memSeries + cid int + isoState *isolationState } func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { c.s.Lock() - it := c.s.iterator(c.cid, reuseIter) + it := c.s.iterator(c.cid, c.isoState, reuseIter) c.s.Unlock() return it } @@ -1698,6 +1736,8 @@ type memSeries struct { pendingCommit bool // Whether there are samples waiting to be committed to this series. app chunkenc.Appender // Current appender for the chunk. + + txs *txRing } func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries { @@ -1706,6 +1746,7 @@ func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries { ref: id, chunkRange: chunkRange, nextAt: math.MinInt64, + txs: newTxRing(4), } return s } @@ -1805,8 +1846,9 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { return k } -// append adds the sample (t, v) to the series. -func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { +// append adds the sample (t, v) to the series. The caller also has to provide +// the appendID for isolation. +func (s *memSeries) append(t int64, v float64, appendID uint64) (success, chunkCreated bool) { // Based on Gorilla white papers this offers near-optimal compression ratio // so anything bigger that this has diminishing returns and increases // the time range within which we have to decompress all samples. @@ -1843,11 +1885,19 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { s.sampleBuf[2] = s.sampleBuf[3] s.sampleBuf[3] = sample{t: t, v: v} + s.txs.add(appendID) + return true, chunkCreated } -// computeChunkEndTime estimates the end timestamp based the beginning of a chunk, -// its current timestamp and the upper bound up to which we insert data. +// cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after +// acquiring lock. +func (s *memSeries) cleanupAppendIDsBelow(bound uint64) { + s.txs.cleanupAppendIDsBelow(bound) +} + +// computeChunkEndTime estimates the end timestamp based the beginning of a +// chunk, its current timestamp and the upper bound up to which we insert data. // It assumes that the time range is 1/4 full. func computeChunkEndTime(start, cur, max int64) int64 { a := (max - start) / ((cur - start + 1) * 4) @@ -1857,32 +1907,93 @@ func computeChunkEndTime(start, cur, max int64) int64 { return start + (max-start)/a } -func (s *memSeries) iterator(id int, it chunkenc.Iterator) chunkenc.Iterator { +func (s *memSeries) iterator(id int, isoState *isolationState, it chunkenc.Iterator) chunkenc.Iterator { c := s.chunk(id) - // TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk, - // which got then garbage collected before it got accessed. - // We must ensure to not garbage collect as long as any readers still hold a reference. + // TODO(fabxc): Work around! A querier may have retrieved a pointer to a + // series's chunk, which got then garbage collected before it got + // accessed. We must ensure to not garbage collect as long as any + // readers still hold a reference. if c == nil { return chunkenc.NewNopIterator() } + ix := id - s.firstChunkID + + numSamples := c.chunk.NumSamples() + stopAfter := numSamples + + if isoState != nil { + totalSamples := 0 // Total samples in this series. + previousSamples := 0 // Samples before this chunk. + + for j, d := range s.chunks { + totalSamples += d.chunk.NumSamples() + if j < ix { + previousSamples += d.chunk.NumSamples() + } + } + + // Removing the extra transactionIDs that are relevant for samples that + // come after this chunk, from the total transactionIDs. + appendIDsToConsider := s.txs.txIDCount - (totalSamples - (previousSamples + numSamples)) + + // Iterate over the appendIDs, find the first one that the isolation state says not + // to return. + it := s.txs.iterator() + for index := 0; index < appendIDsToConsider; index++ { + appendID := it.At() + if appendID <= isoState.maxAppendID { // Easy check first. + if _, ok := isoState.incompleteAppends[appendID]; !ok { + it.Next() + continue + } + } + stopAfter = numSamples - (appendIDsToConsider - index) + if stopAfter < 0 { + stopAfter = 0 // Stopped in a previous chunk. + } + break + } + } + + if stopAfter == 0 { + return chunkenc.NewNopIterator() + } + if id-s.firstChunkID < len(s.chunks)-1 { - return c.chunk.Iterator(it) + if stopAfter == numSamples { + return c.chunk.Iterator(it) + } + if msIter, ok := it.(*stopIterator); ok { + msIter.Iterator = c.chunk.Iterator(msIter.Iterator) + msIter.i = -1 + msIter.stopAfter = stopAfter + return msIter + } + return &stopIterator{ + Iterator: c.chunk.Iterator(it), + i: -1, + stopAfter: stopAfter, + } } // Serve the last 4 samples for the last chunk from the sample buffer // as their compressed bytes may be mutated by added samples. if msIter, ok := it.(*memSafeIterator); ok { msIter.Iterator = c.chunk.Iterator(msIter.Iterator) msIter.i = -1 - msIter.total = c.chunk.NumSamples() + msIter.total = numSamples + msIter.stopAfter = stopAfter msIter.buf = s.sampleBuf return msIter } return &memSafeIterator{ - Iterator: c.chunk.Iterator(it), - i: -1, - total: c.chunk.NumSamples(), - buf: s.sampleBuf, + stopIterator: stopIterator{ + Iterator: c.chunk.Iterator(it), + i: -1, + stopAfter: stopAfter, + }, + total: numSamples, + buf: s.sampleBuf, } } @@ -1900,16 +2011,29 @@ func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool { return mc.minTime <= maxt && mint <= mc.maxTime } -type memSafeIterator struct { +type stopIterator struct { chunkenc.Iterator - i int + i, stopAfter int +} + +func (it *stopIterator) Next() bool { + if it.i+1 >= it.stopAfter { + return false + } + it.i++ + return it.Iterator.Next() +} + +type memSafeIterator struct { + stopIterator + total int buf [4]sample } func (it *memSafeIterator) Next() bool { - if it.i+1 >= it.total { + if it.i+1 >= it.stopAfter { return false } it.i++ diff --git a/tsdb/head_test.go b/tsdb/head_test.go index f6b0d088d..0a274ca54 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -243,9 +243,9 @@ func TestHead_ReadWAL(t *testing.T) { testutil.Ok(t, c.Err()) return x } - testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil))) - testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil))) - testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil))) + testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, nil))) + testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, nil))) + testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil, nil))) }) } } @@ -296,7 +296,16 @@ func TestHead_WALMultiRef(t *testing.T) { } func TestHead_Truncate(t *testing.T) { - h, err := NewHead(nil, nil, nil, 10000, DefaultStripeSize) + dir, err := ioutil.TempDir("", "test_truncate") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + + w, err := wal.New(nil, nil, dir, false) + testutil.Ok(t, err) + + h, err := NewHead(nil, nil, w, 10000, DefaultStripeSize) testutil.Ok(t, err) defer h.Close() @@ -308,18 +317,18 @@ func TestHead_Truncate(t *testing.T) { s4, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1")) s1.chunks = []*memChunk{ - {minTime: 0, maxTime: 999}, - {minTime: 1000, maxTime: 1999}, - {minTime: 2000, maxTime: 2999}, + {minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()}, + {minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()}, + {minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()}, } s2.chunks = []*memChunk{ - {minTime: 1000, maxTime: 1999}, - {minTime: 2000, maxTime: 2999}, - {minTime: 3000, maxTime: 3999}, + {minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()}, + {minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()}, + {minTime: 3000, maxTime: 3999, chunk: chunkenc.NewXORChunk()}, } s3.chunks = []*memChunk{ - {minTime: 0, maxTime: 999}, - {minTime: 1000, maxTime: 1999}, + {minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()}, + {minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()}, } s4.chunks = []*memChunk{} @@ -329,12 +338,12 @@ func TestHead_Truncate(t *testing.T) { testutil.Ok(t, h.Truncate(2000)) testutil.Equals(t, []*memChunk{ - {minTime: 2000, maxTime: 2999}, + {minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()}, }, h.series.getByID(s1.ref).chunks) testutil.Equals(t, []*memChunk{ - {minTime: 2000, maxTime: 2999}, - {minTime: 3000, maxTime: 3999}, + {minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()}, + {minTime: 3000, maxTime: 3999, chunk: chunkenc.NewXORChunk()}, }, h.series.getByID(s2.ref).chunks) testutil.Assert(t, h.series.getByID(s3.ref) == nil, "") @@ -375,7 +384,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000) for i := 0; i < 4000; i += 5 { - ok, _ := s.append(int64(i), float64(i)) + ok, _ := s.append(int64(i), float64(i), 0) testutil.Assert(t, ok == true, "sample append failed") } @@ -397,11 +406,11 @@ func TestMemSeries_truncateChunks(t *testing.T) { // Validate that the series' sample buffer is applied correctly to the last chunk // after truncation. - it1 := s.iterator(s.chunkID(len(s.chunks)-1), nil) + it1 := s.iterator(s.chunkID(len(s.chunks)-1), nil, nil) _, ok := it1.(*memSafeIterator) testutil.Assert(t, ok == true, "") - it2 := s.iterator(s.chunkID(len(s.chunks)-2), nil) + it2 := s.iterator(s.chunkID(len(s.chunks)-2), nil, nil) _, ok = it2.(*memSafeIterator) testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer") } @@ -921,19 +930,19 @@ func TestMemSeries_append(t *testing.T) { // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. - ok, chunkCreated := s.append(998, 1) + ok, chunkCreated := s.append(998, 1, 0) testutil.Assert(t, ok, "append failed") testutil.Assert(t, chunkCreated, "first sample created chunk") - ok, chunkCreated = s.append(999, 2) + ok, chunkCreated = s.append(999, 2, 0) testutil.Assert(t, ok, "append failed") testutil.Assert(t, !chunkCreated, "second sample should use same chunk") - ok, chunkCreated = s.append(1000, 3) + ok, chunkCreated = s.append(1000, 3, 0) testutil.Assert(t, ok, "append failed") testutil.Assert(t, chunkCreated, "expected new chunk on boundary") - ok, chunkCreated = s.append(1001, 4) + ok, chunkCreated = s.append(1001, 4, 0) testutil.Assert(t, ok, "append failed") testutil.Assert(t, !chunkCreated, "second sample should use same chunk") @@ -943,7 +952,7 @@ func TestMemSeries_append(t *testing.T) { // Fill the range [1000,2000) with many samples. Intermediate chunks should be cut // at approximately 120 samples per chunk. for i := 1; i < 1000; i++ { - ok, _ := s.append(1001+int64(i), float64(i)) + ok, _ := s.append(1001+int64(i), float64(i), 0) testutil.Assert(t, ok, "append failed") } @@ -966,18 +975,18 @@ func TestGCChunkAccess(t *testing.T) { s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0) + ok, chunkCreated := s.append(0, 0, 0) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999) + ok, chunkCreated = s.append(999, 999, 0) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, !chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. - ok, chunkCreated = s.append(1000, 1000) + ok, chunkCreated = s.append(1000, 1000, 0) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999) + ok, chunkCreated = s.append(1999, 1999, 0) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, !chunkCreated, "chunks was created") @@ -993,7 +1002,7 @@ func TestGCChunkAccess(t *testing.T) { }}, lset) testutil.Equals(t, 2, len(chunks)) - cr := h.chunksRange(0, 1500) + cr := h.chunksRange(0, 1500, nil) _, err = cr.Chunk(chunks[0].Ref) testutil.Ok(t, err) _, err = cr.Chunk(chunks[1].Ref) @@ -1018,18 +1027,18 @@ func TestGCSeriesAccess(t *testing.T) { s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0) + ok, chunkCreated := s.append(0, 0, 0) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999) + ok, chunkCreated = s.append(999, 999, 0) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, !chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. - ok, chunkCreated = s.append(1000, 1000) + ok, chunkCreated = s.append(1000, 1000, 0) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999) + ok, chunkCreated = s.append(1999, 1999, 0) testutil.Assert(t, ok, "series append failed") testutil.Assert(t, !chunkCreated, "chunks was created") @@ -1045,7 +1054,7 @@ func TestGCSeriesAccess(t *testing.T) { }}, lset) testutil.Equals(t, 2, len(chunks)) - cr := h.chunksRange(0, 2000) + cr := h.chunksRange(0, 2000, nil) _, err = cr.Chunk(chunks[0].Ref) testutil.Ok(t, err) _, err = cr.Chunk(chunks[1].Ref) @@ -1068,7 +1077,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { h.initTime(0) - app := h.appender() + app := h.appender(0, 0) lset := labels.FromStrings("a", "1") _, err = app.Add(lset, 2100, 1) testutil.Ok(t, err) @@ -1096,7 +1105,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { h.initTime(0) - app := h.appender() + app := h.appender(0, 0) lset := labels.FromStrings("a", "1") _, err = app.Add(lset, 2100, 1) testutil.Ok(t, err) @@ -1408,5 +1417,166 @@ func TestHeadSeriesWithTimeBoundaries(t *testing.T) { testutil.Equals(t, c.samplesCount, samplesCount, "test samples %d", i) q.Close() } - +} + +func TestMemSeriesIsolation(t *testing.T) { + // Put a series, select it. GC it and then access it. + hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + testutil.Ok(t, err) + defer hb.Close() + + lastValue := func(maxAppendID uint64) int { + idx, err := hb.Index(hb.MinTime(), hb.MaxTime()) + testutil.Ok(t, err) + + iso := hb.iso.State() + iso.maxAppendID = maxAppendID + + querier := &blockQuerier{ + mint: 0, + maxt: 10000, + index: idx, + chunks: hb.chunksRange(math.MinInt64, math.MaxInt64, iso), + tombstones: tombstones.NewMemTombstones(), + } + + testutil.Ok(t, err) + defer querier.Close() + + ss, _, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + testutil.Ok(t, err) + + _, seriesSet, err := expandSeriesSet(ss) + testutil.Ok(t, err) + for _, series := range seriesSet { + return int(series[len(series)-1].v) + } + return -1 + } + + i := 0 + for ; i <= 1000; i++ { + var app storage.Appender + // To initialize bounds. + if hb.MinTime() == math.MaxInt64 { + app = &initAppender{head: hb, appendID: uint64(i), cleanupAppendIDsBelow: 0} + } else { + app = hb.appender(uint64(i), 0) + } + + _, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + } + + // Test simple cases in different chunks when no appendID cleanup has been performed. + testutil.Equals(t, 10, lastValue(10)) + testutil.Equals(t, 130, lastValue(130)) + testutil.Equals(t, 160, lastValue(160)) + testutil.Equals(t, 240, lastValue(240)) + testutil.Equals(t, 500, lastValue(500)) + testutil.Equals(t, 750, lastValue(750)) + testutil.Equals(t, 995, lastValue(995)) + testutil.Equals(t, 999, lastValue(999)) + + // Cleanup appendIDs below 500. + app := hb.appender(uint64(i), 500) + _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + i++ + + // We should not get queries with a maxAppendID below 500 after the cleanup, + // but they only take the remaining appendIDs into account. + testutil.Equals(t, 499, lastValue(10)) + testutil.Equals(t, 499, lastValue(130)) + testutil.Equals(t, 499, lastValue(160)) + testutil.Equals(t, 499, lastValue(240)) + testutil.Equals(t, 500, lastValue(500)) + testutil.Equals(t, 995, lastValue(995)) + testutil.Equals(t, 999, lastValue(999)) + + // Cleanup appendIDs below 1000, which means the sample buffer is + // the only thing with appendIDs. + app = hb.appender(uint64(i), 1000) + _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Equals(t, 999, lastValue(998)) + testutil.Equals(t, 999, lastValue(999)) + testutil.Equals(t, 1000, lastValue(1000)) + testutil.Equals(t, 1001, lastValue(1001)) + testutil.Equals(t, 1002, lastValue(1002)) + testutil.Equals(t, 1002, lastValue(1003)) + + i++ + // Cleanup appendIDs below 1001, but with a rollback. + app = hb.appender(uint64(i), 1001) + _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + testutil.Ok(t, err) + testutil.Ok(t, app.Rollback()) + testutil.Equals(t, 1000, lastValue(999)) + testutil.Equals(t, 1000, lastValue(1000)) + testutil.Equals(t, 1001, lastValue(1001)) + testutil.Equals(t, 1002, lastValue(1002)) + testutil.Equals(t, 1002, lastValue(1003)) +} + +func TestIsolationRollback(t *testing.T) { + // Rollback after a failed append and test if the low watermark has progressed anyway. + hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + testutil.Ok(t, err) + defer hb.Close() + + app := hb.Appender() + _, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Equals(t, uint64(1), hb.iso.lowWatermark()) + + app = hb.Appender() + _, err = app.Add(labels.FromStrings("foo", "bar"), 1, 1) + testutil.Ok(t, err) + _, err = app.Add(labels.FromStrings("foo", "bar", "foo", "baz"), 2, 2) + testutil.NotOk(t, err) + testutil.Ok(t, app.Rollback()) + testutil.Equals(t, uint64(2), hb.iso.lowWatermark()) + + app = hb.Appender() + _, err = app.Add(labels.FromStrings("foo", "bar"), 3, 3) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Equals(t, uint64(3), hb.iso.lowWatermark(), "Low watermark should proceed to 3 even if append #2 was rolled back.") +} + +func TestIsolationLowWatermarkMonotonous(t *testing.T) { + hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize) + testutil.Ok(t, err) + defer hb.Close() + + app1 := hb.Appender() + _, err = app1.Add(labels.FromStrings("foo", "bar"), 0, 0) + testutil.Ok(t, err) + testutil.Ok(t, app1.Commit()) + testutil.Equals(t, uint64(1), hb.iso.lowWatermark()) + + app1 = hb.Appender() + _, err = app1.Add(labels.FromStrings("foo", "bar"), 1, 1) + testutil.Ok(t, err) + testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should be two, even if append is not commited yet.") + + app2 := hb.Appender() + _, err = app2.Add(labels.FromStrings("foo", "baz"), 1, 1) + testutil.Ok(t, err) + testutil.Ok(t, app2.Commit()) + testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should stay two because app1 is not commited yet.") + + is := hb.iso.State() + testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "After simulated read (iso state retrieved), low watermark should stay at 2.") + + testutil.Ok(t, app1.Commit()) + testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Even after app1 is commited, low watermark should stay at 2 because read is still ongoing.") + + is.Close() + testutil.Equals(t, uint64(3), hb.iso.lowWatermark(), "After read has finished (iso state closed), low watermark should jump to three.") } diff --git a/tsdb/isolation.go b/tsdb/isolation.go new file mode 100644 index 000000000..beaecfc48 --- /dev/null +++ b/tsdb/isolation.go @@ -0,0 +1,199 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "sync" +) + +// isolationState holds the isolation information. +type isolationState struct { + // We will ignore all appends above the max, or that are incomplete. + maxAppendID uint64 + incompleteAppends map[uint64]struct{} + lowWatermark uint64 // Lowest of incompleteAppends/maxAppendID. + isolation *isolation + + // Doubly linked list of active reads. + next *isolationState + prev *isolationState +} + +// Close closes the state. +func (i *isolationState) Close() { + i.isolation.readMtx.Lock() + defer i.isolation.readMtx.Unlock() + i.next.prev = i.prev + i.prev.next = i.next +} + +// isolation is the global isolation state. +type isolation struct { + // Mutex for accessing lastAppendID and appendsOpen. + appendMtx sync.Mutex + // Each append is given an internal id. + lastAppendID uint64 + // Which appends are currently in progress. + appendsOpen map[uint64]struct{} + // Mutex for accessing readsOpen. + // If taking both appendMtx and readMtx, take appendMtx first. + readMtx sync.Mutex + // All current in use isolationStates. This is a doubly-linked list. + readsOpen *isolationState +} + +func newIsolation() *isolation { + isoState := &isolationState{} + isoState.next = isoState + isoState.prev = isoState + + return &isolation{ + appendsOpen: map[uint64]struct{}{}, + readsOpen: isoState, + } +} + +// lowWatermark returns the appendID below which we no longer need to track +// which appends were from which appendID. +func (i *isolation) lowWatermark() uint64 { + i.appendMtx.Lock() // Take appendMtx first. + defer i.appendMtx.Unlock() + i.readMtx.Lock() + defer i.readMtx.Unlock() + if i.readsOpen.prev != i.readsOpen { + return i.readsOpen.prev.lowWatermark + } + lw := i.lastAppendID + for k := range i.appendsOpen { + if k < lw { + lw = k + } + } + return lw +} + +// State returns an object used to control isolation +// between a query and appends. Must be closed when complete. +func (i *isolation) State() *isolationState { + i.appendMtx.Lock() // Take append mutex before read mutex. + defer i.appendMtx.Unlock() + isoState := &isolationState{ + maxAppendID: i.lastAppendID, + lowWatermark: i.lastAppendID, + incompleteAppends: make(map[uint64]struct{}, len(i.appendsOpen)), + isolation: i, + } + for k := range i.appendsOpen { + isoState.incompleteAppends[k] = struct{}{} + if k < isoState.lowWatermark { + isoState.lowWatermark = k + } + } + + i.readMtx.Lock() + defer i.readMtx.Unlock() + isoState.prev = i.readsOpen + isoState.next = i.readsOpen.next + i.readsOpen.next.prev = isoState + i.readsOpen.next = isoState + return isoState +} + +// newAppendID increments the transaction counter and returns a new transaction ID. +func (i *isolation) newAppendID() uint64 { + i.appendMtx.Lock() + defer i.appendMtx.Unlock() + i.lastAppendID++ + i.appendsOpen[i.lastAppendID] = struct{}{} + return i.lastAppendID +} + +func (i *isolation) closeAppend(appendID uint64) { + i.appendMtx.Lock() + defer i.appendMtx.Unlock() + delete(i.appendsOpen, appendID) +} + +// The transactionID ring buffer. +type txRing struct { + txIDs []uint64 + txIDFirst int // Position of the first id in the ring. + txIDCount int // How many ids in the ring. +} + +func newTxRing(cap int) *txRing { + return &txRing{ + txIDs: make([]uint64, cap), + } +} + +func (txr *txRing) add(appendID uint64) { + if txr.txIDCount == len(txr.txIDs) { + // Ring buffer is full, expand by doubling. + newRing := make([]uint64, txr.txIDCount*2) + idx := copy(newRing[:], txr.txIDs[txr.txIDFirst:]) + copy(newRing[idx:], txr.txIDs[:txr.txIDFirst]) + txr.txIDs = newRing + txr.txIDFirst = 0 + } + + txr.txIDs[(txr.txIDFirst+txr.txIDCount)%len(txr.txIDs)] = appendID + txr.txIDCount++ +} + +func (txr *txRing) cleanupAppendIDsBelow(bound uint64) { + pos := txr.txIDFirst + + for txr.txIDCount > 0 { + if txr.txIDs[pos] < bound { + txr.txIDFirst++ + txr.txIDCount-- + } else { + break + } + + pos++ + if pos == len(txr.txIDs) { + pos = 0 + } + } + + txr.txIDFirst %= len(txr.txIDs) +} + +func (txr *txRing) iterator() *txRingIterator { + return &txRingIterator{ + pos: txr.txIDFirst, + ids: txr.txIDs, + } +} + +// txRingIterator lets you iterate over the ring. It doesn't terminate, +// it DOESN'T terminate. +type txRingIterator struct { + ids []uint64 + + pos int +} + +func (it *txRingIterator) At() uint64 { + return it.ids[it.pos] +} + +func (it *txRingIterator) Next() { + it.pos++ + if it.pos == len(it.ids) { + it.pos = 0 + } +}