Merge pull request #6841 from prometheus/beorn7/isolation

Port isolation from old TSDB PR
pull/6901/head
Björn Rabenstein 2020-02-28 17:48:04 +01:00 committed by GitHub
commit d137cddd12
4 changed files with 819 additions and 183 deletions

@ -87,6 +87,10 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
}
testutil.Ok(t, it.Err())
if len(samples) == 0 {
continue
}
name := series.Labels().String()
result[name] = samples
}
@ -1276,20 +1280,29 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
lres, err := expandSeriesSet(ss)
lres, _, err := expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, c.series, lres)
}
}
func expandSeriesSet(ss storage.SeriesSet) ([]labels.Labels, error) {
result := []labels.Labels{}
// expandSeriesSet returns the raw labels in the order they are retrieved from
// the series set and the samples keyed by Labels().String().
func expandSeriesSet(ss storage.SeriesSet) ([]labels.Labels, map[string][]sample, error) {
resultLabels := []labels.Labels{}
resultSamples := map[string][]sample{}
for ss.Next() {
result = append(result, ss.At().Labels())
series := ss.At()
samples := []sample{}
it := series.Iterator()
for it.Next() {
t, v := it.At()
samples = append(samples, sample{t: t, v: v})
}
return result, ss.Err()
resultLabels = append(resultLabels, series.Labels())
resultSamples[series.Labels().String()] = samples
}
return resultLabels, resultSamples, ss.Err()
}
func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
@ -2477,6 +2490,136 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
testutil.Equals(t, 1000.0, sum)
}
func TestDBCannotSeePartialCommits(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)
db, err := Open(tmpdir, nil, nil, nil)
testutil.Ok(t, err)
defer db.Close()
stop := make(chan struct{})
firstInsert := make(chan struct{})
// Insert data in batches.
go func() {
iter := 0
for {
app := db.Appender()
for j := 0; j < 100; j++ {
_, err := app.Add(labels.FromStrings("foo", "bar", "a", strconv.Itoa(j)), int64(iter), float64(iter))
testutil.Ok(t, err)
}
err = app.Commit()
testutil.Ok(t, err)
if iter == 0 {
close(firstInsert)
}
iter++
select {
case <-stop:
return
default:
}
}
}()
<-firstInsert
// This is a race condition, so do a few tests to tickle it.
// Usually most will fail.
inconsistencies := 0
for i := 0; i < 10; i++ {
func() {
querier, err := db.Querier(context.Background(), 0, 1000000)
testutil.Ok(t, err)
defer querier.Close()
ss, _, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err := expandSeriesSet(ss)
testutil.Ok(t, err)
values := map[float64]struct{}{}
for _, series := range seriesSet {
values[series[len(series)-1].v] = struct{}{}
}
if len(values) != 1 {
inconsistencies++
}
}()
}
stop <- struct{}{}
testutil.Equals(t, 0, inconsistencies, "Some queries saw inconsistent results.")
}
func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)
db, err := Open(tmpdir, nil, nil, nil)
testutil.Ok(t, err)
defer db.Close()
querierBeforeAdd, err := db.Querier(context.Background(), 0, 1000000)
testutil.Ok(t, err)
defer querierBeforeAdd.Close()
app := db.Appender()
_, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0)
testutil.Ok(t, err)
querierAfterAddButBeforeCommit, err := db.Querier(context.Background(), 0, 1000000)
testutil.Ok(t, err)
defer querierAfterAddButBeforeCommit.Close()
// None of the queriers should return anything after the Add but before the commit.
ss, _, err := querierBeforeAdd.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err := expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, map[string][]sample{}, seriesSet)
ss, _, err = querierAfterAddButBeforeCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err = expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, map[string][]sample{}, seriesSet)
// This commit is after the queriers are created, so should not be returned.
err = app.Commit()
testutil.Ok(t, err)
// Nothing returned for querier created before the Add.
ss, _, err = querierBeforeAdd.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err = expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, map[string][]sample{}, seriesSet)
// Series exists but has no samples for querier created after Add.
ss, _, err = querierAfterAddButBeforeCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err = expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, map[string][]sample{`{foo="bar"}`: []sample{}}, seriesSet)
querierAfterCommit, err := db.Querier(context.Background(), 0, 1000000)
testutil.Ok(t, err)
defer querierAfterCommit.Close()
// Samples are returned for querier created after Commit.
ss, _, err = querierAfterCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err = expandSeriesSet(ss)
testutil.Ok(t, err)
testutil.Equals(t, map[string][]sample{`{foo="bar"}`: []sample{{t: 0, v: 0}}}, seriesSet)
}
// TestChunkWriter_ReadAfterWrite ensures that chunk segments are cut at the set segment size and
// that the resulting segments include the expected chunk data.
func TestChunkWriter_ReadAfterWrite(t *testing.T) {

@ -81,18 +81,20 @@ type Head struct {
symMtx sync.RWMutex
symbols map[string]struct{}
values map[string]stringset // label names to possible values
values map[string]stringset // Label names to possible values.
deletedMtx sync.Mutex
deleted map[uint64]int // Deleted series, and what WAL segment they must be kept until.
postings *index.MemPostings // postings lists for terms
postings *index.MemPostings // Postings lists for terms.
tombstones *tombstones.MemTombstones
iso *isolation
cardinalityMutex sync.Mutex
cardinalityCache *index.PostingsStats // posting stats cache which will expire after 30sec
lastPostingsStatsCall time.Duration // last posting stats call (PostingsCardinalityStats()) time for caching
cardinalityCache *index.PostingsStats // Posting stats cache which will expire after 30sec.
lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching.
}
type headMetrics struct {
@ -105,8 +107,6 @@ type headMetrics struct {
chunksCreated prometheus.Counter
chunksRemoved prometheus.Counter
gcDuration prometheus.Summary
minTime prometheus.GaugeFunc
maxTime prometheus.GaugeFunc
samplesAppended prometheus.Counter
walTruncateDuration prometheus.Summary
walCorruptionsTotal prometheus.Counter
@ -119,109 +119,93 @@ type headMetrics struct {
}
func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m := &headMetrics{}
m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{
m := &headMetrics{
activeAppenders: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_active_appenders",
Help: "Number of currently active appender transactions",
})
m.series = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
}),
series: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_series",
Help: "Total number of series in the head block.",
}, func() float64 {
return float64(h.NumSeries())
})
m.seriesCreated = prometheus.NewCounter(prometheus.CounterOpts{
}),
seriesCreated: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_series_created_total",
Help: "Total number of series created in the head",
})
m.seriesRemoved = prometheus.NewCounter(prometheus.CounterOpts{
}),
seriesRemoved: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_series_removed_total",
Help: "Total number of series removed in the head",
})
m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{
}),
seriesNotFound: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_series_not_found_total",
Help: "Total number of requests for series that were not found.",
})
m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{
}),
chunks: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_chunks",
Help: "Total number of chunks in the head block.",
})
m.chunksCreated = prometheus.NewCounter(prometheus.CounterOpts{
}),
chunksCreated: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_chunks_created_total",
Help: "Total number of chunks created in the head",
})
m.chunksRemoved = prometheus.NewCounter(prometheus.CounterOpts{
}),
chunksRemoved: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_chunks_removed_total",
Help: "Total number of chunks removed in the head",
})
m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{
}),
gcDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Name: "prometheus_tsdb_head_gc_duration_seconds",
Help: "Runtime of garbage collection in the head block.",
Objectives: map[float64]float64{},
})
m.maxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_max_time",
Help: "Maximum timestamp of the head block. The unit is decided by the library consumer.",
}, func() float64 {
return float64(h.MaxTime())
})
m.minTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_min_time",
Help: "Minimum time bound of the head block. The unit is decided by the library consumer.",
}, func() float64 {
return float64(h.MinTime())
})
m.walTruncateDuration = prometheus.NewSummary(prometheus.SummaryOpts{
}),
walTruncateDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Name: "prometheus_tsdb_wal_truncate_duration_seconds",
Help: "Duration of WAL truncation.",
Objectives: map[float64]float64{},
})
m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{
}),
walCorruptionsTotal: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_corruptions_total",
Help: "Total number of WAL corruptions.",
})
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
}),
samplesAppended: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_samples_appended_total",
Help: "Total number of appended samples.",
})
m.headTruncateFail = prometheus.NewCounter(prometheus.CounterOpts{
}),
headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_truncations_failed_total",
Help: "Total number of head truncations that failed.",
})
m.headTruncateTotal = prometheus.NewCounter(prometheus.CounterOpts{
}),
headTruncateTotal: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_truncations_total",
Help: "Total number of head truncations attempted.",
})
m.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{
}),
checkpointDeleteFail: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_deletions_failed_total",
Help: "Total number of checkpoint deletions that failed.",
})
m.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{
}),
checkpointDeleteTotal: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_deletions_total",
Help: "Total number of checkpoint deletions attempted.",
})
m.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{
}),
checkpointCreationFail: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_creations_failed_total",
Help: "Total number of checkpoint creations that failed.",
})
m.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{
}),
checkpointCreationTotal: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_checkpoint_creations_total",
Help: "Total number of checkpoint creations attempted.",
})
}),
}
if r != nil {
r.MustRegister(
m.activeAppenders,
m.series,
m.chunks,
m.chunksCreated,
m.chunksRemoved,
m.series,
m.seriesCreated,
m.seriesRemoved,
m.seriesNotFound,
m.minTime,
m.maxTime,
m.gcDuration,
m.walTruncateDuration,
m.walCorruptionsTotal,
@ -232,6 +216,34 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
m.checkpointDeleteTotal,
m.checkpointCreationFail,
m.checkpointCreationTotal,
// Metrics bound to functions and not needed in tests
// can be created and registered on the spot.
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_max_time",
Help: "Maximum timestamp of the head block. The unit is decided by the library consumer.",
}, func() float64 {
return float64(h.MaxTime())
}),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_min_time",
Help: "Minimum time bound of the head block. The unit is decided by the library consumer.",
}, func() float64 {
return float64(h.MinTime())
}),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_isolation_low_watermark",
Help: "The lowest TSDB append ID that is still referenced.",
}, func() float64 {
return float64(h.iso.lowWatermark())
}),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_isolation_high_watermark",
Help: "The highest TSDB append ID that has been given out.",
}, func() float64 {
h.iso.appendMtx.Lock()
defer h.iso.appendMtx.Unlock()
return float64(h.iso.lastAppendID)
}),
)
}
return m
@ -279,6 +291,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
symbols: map[string]struct{}{},
postings: index.NewUnorderedMemPostings(),
tombstones: tombstones.NewMemTombstones(),
iso: newIsolation(),
deleted: map[uint64]int{},
}
h.metrics = newHeadMetrics(h, r)
@ -314,8 +327,7 @@ func (h *Head) processWALSamples(
}
refSeries[s.Ref] = ms
}
_, chunkCreated := ms.append(s.T, s.V)
if chunkCreated {
if _, chunkCreated := ms.append(s.T, s.V, 0); chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
@ -564,7 +576,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
}
// Init loads data from the write ahead log and prepares the head for writes.
// It should be called before using an appender so that
// It should be called before using an appender so that it
// limits the ingested samples to the head min valid time.
func (h *Head) Init(minValidTime int64) error {
h.minValidTime = minValidTime
@ -775,7 +787,7 @@ func (h *RangeHead) Index(mint, maxt int64) (IndexReader, error) {
}
func (h *RangeHead) Chunks() (ChunkReader, error) {
return h.head.chunksRange(h.mint, h.maxt), nil
return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State()), nil
}
func (h *RangeHead) Tombstones() (tombstones.Reader, error) {
@ -810,6 +822,8 @@ func (h *RangeHead) Meta() BlockMeta {
type initAppender struct {
app storage.Appender
head *Head
appendID, cleanupAppendIDsBelow uint64
}
func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
@ -817,7 +831,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
return a.app.Add(lset, t, v)
}
a.head.initTime(t)
a.app = a.head.appender()
a.app = a.head.appender(a.appendID, a.cleanupAppendIDsBelow)
return a.app.Add(lset, t, v)
}
@ -847,15 +861,22 @@ func (a *initAppender) Rollback() error {
func (h *Head) Appender() storage.Appender {
h.metrics.activeAppenders.Inc()
appendID := h.iso.newAppendID()
cleanupAppendIDsBelow := h.iso.lowWatermark()
// The head cache might not have a starting point yet. The init appender
// picks up the first appended timestamp as the base.
if h.MinTime() == math.MaxInt64 {
return &initAppender{head: h}
return &initAppender{
head: h,
appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
}
return h.appender()
}
return h.appender(appendID, cleanupAppendIDsBelow)
}
func (h *Head) appender() *headAppender {
func (h *Head) appender(appendID, cleanupAppendIDsBelow uint64) *headAppender {
return &headAppender{
head: h,
// Set the minimum valid time to whichever is greater, the head min valid time or the compaction window.
@ -865,6 +886,8 @@ func (h *Head) appender() *headAppender {
maxt: math.MinInt64,
samples: h.getAppendBuffer(),
sampleSeries: h.getSeriesBuffer(),
appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
}
}
@ -922,6 +945,8 @@ type headAppender struct {
series []record.RefSeries
samples []record.RefSample
sampleSeries []*memSeries
appendID, cleanupAppendIDsBelow uint64
}
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
@ -1023,7 +1048,8 @@ func (a *headAppender) Commit() error {
for i, s := range a.samples {
series = a.sampleSeries[i]
series.Lock()
ok, chunkCreated := series.append(s.T, s.V)
ok, chunkCreated := series.append(s.T, s.V, a.appendID)
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
@ -1038,6 +1064,7 @@ func (a *headAppender) Commit() error {
a.head.metrics.samplesAppended.Add(float64(total))
a.head.updateMinMaxTime(a.mint, a.maxt)
a.head.iso.closeAppend(a.appendID)
return nil
}
@ -1048,14 +1075,16 @@ func (a *headAppender) Rollback() error {
for i := range a.samples {
series = a.sampleSeries[i]
series.Lock()
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
}
a.head.putAppendBuffer(a.samples)
a.samples = nil
a.head.iso.closeAppend(a.appendID)
// Series are created in the head memory regardless of rollback. Thus we have
// to log them to the WAL in any case.
a.samples = nil
return a.log()
}
@ -1182,14 +1211,19 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
// Chunks returns a ChunkReader against the block.
func (h *Head) Chunks() (ChunkReader, error) {
return h.chunksRange(math.MinInt64, math.MaxInt64), nil
return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State()), nil
}
func (h *Head) chunksRange(mint, maxt int64) *headChunkReader {
func (h *Head) chunksRange(mint, maxt int64, is *isolationState) *headChunkReader {
if hmin := h.MinTime(); hmin > mint {
mint = hmin
}
return &headChunkReader{head: h, mint: mint, maxt: maxt}
return &headChunkReader{
head: h,
mint: mint,
maxt: maxt,
isoState: is,
}
}
// NumSeries returns the number of active series in the head.
@ -1240,9 +1274,11 @@ func (h *Head) Close() error {
type headChunkReader struct {
head *Head
mint, maxt int64
isoState *isolationState
}
func (h *headChunkReader) Close() error {
h.isoState.Close()
return nil
}
@ -1287,6 +1323,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
Chunk: c.chunk,
s: s,
cid: int(cid),
isoState: h.isoState,
}, nil
}
@ -1294,11 +1331,12 @@ type safeChunk struct {
chunkenc.Chunk
s *memSeries
cid int
isoState *isolationState
}
func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
c.s.Lock()
it := c.s.iterator(c.cid, reuseIter)
it := c.s.iterator(c.cid, c.isoState, reuseIter)
c.s.Unlock()
return it
}
@ -1698,6 +1736,8 @@ type memSeries struct {
pendingCommit bool // Whether there are samples waiting to be committed to this series.
app chunkenc.Appender // Current appender for the chunk.
txs *txRing
}
func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
@ -1706,6 +1746,7 @@ func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
ref: id,
chunkRange: chunkRange,
nextAt: math.MinInt64,
txs: newTxRing(4),
}
return s
}
@ -1805,8 +1846,9 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
return k
}
// append adds the sample (t, v) to the series.
func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
// append adds the sample (t, v) to the series. The caller also has to provide
// the appendID for isolation.
func (s *memSeries) append(t int64, v float64, appendID uint64) (success, chunkCreated bool) {
// Based on Gorilla white papers this offers near-optimal compression ratio
// so anything bigger than this has diminishing returns and increases
// the time range within which we have to decompress all samples.
@ -1843,11 +1885,19 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
s.sampleBuf[2] = s.sampleBuf[3]
s.sampleBuf[3] = sample{t: t, v: v}
s.txs.add(appendID)
return true, chunkCreated
}
// computeChunkEndTime estimates the end timestamp based on the beginning of a chunk,
// its current timestamp and the upper bound up to which we insert data.
// cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after
// acquiring the lock.
func (s *memSeries) cleanupAppendIDsBelow(bound uint64) {
s.txs.cleanupAppendIDsBelow(bound)
}
// computeChunkEndTime estimates the end timestamp based on the beginning of a
// chunk, its current timestamp and the upper bound up to which we insert data.
// It assumes that the time range is 1/4 full.
func computeChunkEndTime(start, cur, max int64) int64 {
a := (max - start) / ((cur - start + 1) * 4)
@ -1857,31 +1907,92 @@ func computeChunkEndTime(start, cur, max int64) int64 {
return start + (max-start)/a
}
func (s *memSeries) iterator(id int, it chunkenc.Iterator) chunkenc.Iterator {
func (s *memSeries) iterator(id int, isoState *isolationState, it chunkenc.Iterator) chunkenc.Iterator {
c := s.chunk(id)
// TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk,
// which got then garbage collected before it got accessed.
// We must ensure to not garbage collect as long as any readers still hold a reference.
// TODO(fabxc): Work around! A querier may have retrieved a pointer to a
// series's chunk, which got then garbage collected before it got
// accessed. We must ensure to not garbage collect as long as any
// readers still hold a reference.
if c == nil {
return chunkenc.NewNopIterator()
}
ix := id - s.firstChunkID
numSamples := c.chunk.NumSamples()
stopAfter := numSamples
if isoState != nil {
totalSamples := 0 // Total samples in this series.
previousSamples := 0 // Samples before this chunk.
for j, d := range s.chunks {
totalSamples += d.chunk.NumSamples()
if j < ix {
previousSamples += d.chunk.NumSamples()
}
}
// Remove the transactionIDs that belong to samples after this chunk
// from the total number of transactionIDs to consider.
appendIDsToConsider := s.txs.txIDCount - (totalSamples - (previousSamples + numSamples))
// Iterate over the appendIDs, find the first one that the isolation state says not
// to return.
it := s.txs.iterator()
for index := 0; index < appendIDsToConsider; index++ {
appendID := it.At()
if appendID <= isoState.maxAppendID { // Easy check first.
if _, ok := isoState.incompleteAppends[appendID]; !ok {
it.Next()
continue
}
}
stopAfter = numSamples - (appendIDsToConsider - index)
if stopAfter < 0 {
stopAfter = 0 // Stopped in a previous chunk.
}
break
}
}
if stopAfter == 0 {
return chunkenc.NewNopIterator()
}
if id-s.firstChunkID < len(s.chunks)-1 {
if stopAfter == numSamples {
return c.chunk.Iterator(it)
}
if msIter, ok := it.(*stopIterator); ok {
msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
msIter.i = -1
msIter.stopAfter = stopAfter
return msIter
}
return &stopIterator{
Iterator: c.chunk.Iterator(it),
i: -1,
stopAfter: stopAfter,
}
}
// Serve the last 4 samples for the last chunk from the sample buffer
// as their compressed bytes may be mutated by added samples.
if msIter, ok := it.(*memSafeIterator); ok {
msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
msIter.i = -1
msIter.total = c.chunk.NumSamples()
msIter.total = numSamples
msIter.stopAfter = stopAfter
msIter.buf = s.sampleBuf
return msIter
}
return &memSafeIterator{
stopIterator: stopIterator{
Iterator: c.chunk.Iterator(it),
i: -1,
total: c.chunk.NumSamples(),
stopAfter: stopAfter,
},
total: numSamples,
buf: s.sampleBuf,
}
}
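
The isolation-aware branch of the iterator above decides how many samples of the chunk a reader may see before stopping. The following is a simplified, self-contained sketch of that calculation, not part of this change; the function and parameter names are illustrative. It assumes txIDs holds the appendIDs of the newest samples of the series in append order, and that maxAppendID and incomplete come from an isolationState snapshot taken when the query started.

// visibleInChunk mirrors the stopAfter logic above in isolation (illustrative
// names, not the real memSeries.iterator). totalSamples is the number of
// samples in the whole series, chunkEnd the index (exclusive) of the last
// sample of the chunk under inspection, numSamples the chunk's sample count.
func visibleInChunk(txIDs []uint64, totalSamples, chunkEnd, numSamples int, maxAppendID uint64, incomplete map[uint64]struct{}) int {
	stopAfter := numSamples
	// Drop the tracked appendIDs that belong to samples after this chunk.
	toConsider := len(txIDs) - (totalSamples - chunkEnd)
	for i := 0; i < toConsider; i++ {
		id := txIDs[i] // Oldest tracked appendID first.
		if id <= maxAppendID {
			if _, ok := incomplete[id]; !ok {
				continue // Sample is visible to this reader.
			}
		}
		// First invisible sample found; hide it and everything after it.
		stopAfter = numSamples - (toConsider - i)
		if stopAfter < 0 {
			stopAfter = 0 // The cut-off already happened in an earlier chunk.
		}
		break
	}
	return stopAfter
}

// Example: a series with 6 samples written by appends 1..6, the ring tracking
// the last 4 IDs (3, 4, 5, 6), and a chunk holding the last 4 samples. A reader
// that started while append 4 was still open (maxAppendID=4, incomplete={4})
// may only see the first of those samples:
//   visibleInChunk([]uint64{3, 4, 5, 6}, 6, 6, 4, 4, map[uint64]struct{}{4: {}}) == 1
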
@ -1900,16 +2011,29 @@ func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
return mc.minTime <= maxt && mint <= mc.maxTime
}
type memSafeIterator struct {
type stopIterator struct {
chunkenc.Iterator
i int
i, stopAfter int
}
func (it *stopIterator) Next() bool {
if it.i+1 >= it.stopAfter {
return false
}
it.i++
return it.Iterator.Next()
}
type memSafeIterator struct {
stopIterator
total int
buf [4]sample
}
func (it *memSafeIterator) Next() bool {
if it.i+1 >= it.total {
if it.i+1 >= it.stopAfter {
return false
}
it.i++

@ -243,9 +243,9 @@ func TestHead_ReadWAL(t *testing.T) {
testutil.Ok(t, c.Err())
return x
}
testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil)))
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil)))
testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil)))
testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, nil)))
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, nil)))
testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil, nil)))
})
}
}
@ -296,7 +296,16 @@ func TestHead_WALMultiRef(t *testing.T) {
}
func TestHead_Truncate(t *testing.T) {
h, err := NewHead(nil, nil, nil, 10000, DefaultStripeSize)
dir, err := ioutil.TempDir("", "test_truncate")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
w, err := wal.New(nil, nil, dir, false)
testutil.Ok(t, err)
h, err := NewHead(nil, nil, w, 10000, DefaultStripeSize)
testutil.Ok(t, err)
defer h.Close()
@ -308,18 +317,18 @@ func TestHead_Truncate(t *testing.T) {
s4, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1"))
s1.chunks = []*memChunk{
{minTime: 0, maxTime: 999},
{minTime: 1000, maxTime: 1999},
{minTime: 2000, maxTime: 2999},
{minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()},
{minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()},
{minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()},
}
s2.chunks = []*memChunk{
{minTime: 1000, maxTime: 1999},
{minTime: 2000, maxTime: 2999},
{minTime: 3000, maxTime: 3999},
{minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()},
{minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()},
{minTime: 3000, maxTime: 3999, chunk: chunkenc.NewXORChunk()},
}
s3.chunks = []*memChunk{
{minTime: 0, maxTime: 999},
{minTime: 1000, maxTime: 1999},
{minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()},
{minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()},
}
s4.chunks = []*memChunk{}
@ -329,12 +338,12 @@ func TestHead_Truncate(t *testing.T) {
testutil.Ok(t, h.Truncate(2000))
testutil.Equals(t, []*memChunk{
{minTime: 2000, maxTime: 2999},
{minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()},
}, h.series.getByID(s1.ref).chunks)
testutil.Equals(t, []*memChunk{
{minTime: 2000, maxTime: 2999},
{minTime: 3000, maxTime: 3999},
{minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()},
{minTime: 3000, maxTime: 3999, chunk: chunkenc.NewXORChunk()},
}, h.series.getByID(s2.ref).chunks)
testutil.Assert(t, h.series.getByID(s3.ref) == nil, "")
@ -375,7 +384,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000)
for i := 0; i < 4000; i += 5 {
ok, _ := s.append(int64(i), float64(i))
ok, _ := s.append(int64(i), float64(i), 0)
testutil.Assert(t, ok == true, "sample append failed")
}
@ -397,11 +406,11 @@ func TestMemSeries_truncateChunks(t *testing.T) {
// Validate that the series' sample buffer is applied correctly to the last chunk
// after truncation.
it1 := s.iterator(s.chunkID(len(s.chunks)-1), nil)
it1 := s.iterator(s.chunkID(len(s.chunks)-1), nil, nil)
_, ok := it1.(*memSafeIterator)
testutil.Assert(t, ok == true, "")
it2 := s.iterator(s.chunkID(len(s.chunks)-2), nil)
it2 := s.iterator(s.chunkID(len(s.chunks)-2), nil, nil)
_, ok = it2.(*memSafeIterator)
testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer")
}
@ -921,19 +930,19 @@ func TestMemSeries_append(t *testing.T) {
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
// New chunk must correctly be cut at 1000.
ok, chunkCreated := s.append(998, 1)
ok, chunkCreated := s.append(998, 1, 0)
testutil.Assert(t, ok, "append failed")
testutil.Assert(t, chunkCreated, "first sample created chunk")
ok, chunkCreated = s.append(999, 2)
ok, chunkCreated = s.append(999, 2, 0)
testutil.Assert(t, ok, "append failed")
testutil.Assert(t, !chunkCreated, "second sample should use same chunk")
ok, chunkCreated = s.append(1000, 3)
ok, chunkCreated = s.append(1000, 3, 0)
testutil.Assert(t, ok, "append failed")
testutil.Assert(t, chunkCreated, "expected new chunk on boundary")
ok, chunkCreated = s.append(1001, 4)
ok, chunkCreated = s.append(1001, 4, 0)
testutil.Assert(t, ok, "append failed")
testutil.Assert(t, !chunkCreated, "second sample should use same chunk")
@ -943,7 +952,7 @@ func TestMemSeries_append(t *testing.T) {
// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
// at approximately 120 samples per chunk.
for i := 1; i < 1000; i++ {
ok, _ := s.append(1001+int64(i), float64(i))
ok, _ := s.append(1001+int64(i), float64(i), 0)
testutil.Assert(t, ok, "append failed")
}
@ -966,18 +975,18 @@ func TestGCChunkAccess(t *testing.T) {
s, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
// Appending 2 samples for the first chunk.
ok, chunkCreated := s.append(0, 0)
ok, chunkCreated := s.append(0, 0, 0)
testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(999, 999)
ok, chunkCreated = s.append(999, 999, 0)
testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, !chunkCreated, "chunks was created")
// A new chunks should be created here as it's beyond the chunk range.
ok, chunkCreated = s.append(1000, 1000)
ok, chunkCreated = s.append(1000, 1000, 0)
testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(1999, 1999)
ok, chunkCreated = s.append(1999, 1999, 0)
testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, !chunkCreated, "chunks was created")
@ -993,7 +1002,7 @@ func TestGCChunkAccess(t *testing.T) {
}}, lset)
testutil.Equals(t, 2, len(chunks))
cr := h.chunksRange(0, 1500)
cr := h.chunksRange(0, 1500, nil)
_, err = cr.Chunk(chunks[0].Ref)
testutil.Ok(t, err)
_, err = cr.Chunk(chunks[1].Ref)
@ -1018,18 +1027,18 @@ func TestGCSeriesAccess(t *testing.T) {
s, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
// Appending 2 samples for the first chunk.
ok, chunkCreated := s.append(0, 0)
ok, chunkCreated := s.append(0, 0, 0)
testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(999, 999)
ok, chunkCreated = s.append(999, 999, 0)
testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, !chunkCreated, "chunks was created")
// A new chunks should be created here as it's beyond the chunk range.
ok, chunkCreated = s.append(1000, 1000)
ok, chunkCreated = s.append(1000, 1000, 0)
testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(1999, 1999)
ok, chunkCreated = s.append(1999, 1999, 0)
testutil.Assert(t, ok, "series append failed")
testutil.Assert(t, !chunkCreated, "chunks was created")
@ -1045,7 +1054,7 @@ func TestGCSeriesAccess(t *testing.T) {
}}, lset)
testutil.Equals(t, 2, len(chunks))
cr := h.chunksRange(0, 2000)
cr := h.chunksRange(0, 2000, nil)
_, err = cr.Chunk(chunks[0].Ref)
testutil.Ok(t, err)
_, err = cr.Chunk(chunks[1].Ref)
@ -1068,7 +1077,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
h.initTime(0)
app := h.appender()
app := h.appender(0, 0)
lset := labels.FromStrings("a", "1")
_, err = app.Add(lset, 2100, 1)
testutil.Ok(t, err)
@ -1096,7 +1105,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
h.initTime(0)
app := h.appender()
app := h.appender(0, 0)
lset := labels.FromStrings("a", "1")
_, err = app.Add(lset, 2100, 1)
testutil.Ok(t, err)
@ -1408,5 +1417,166 @@ func TestHeadSeriesWithTimeBoundaries(t *testing.T) {
testutil.Equals(t, c.samplesCount, samplesCount, "test samples %d", i)
q.Close()
}
}
func TestMemSeriesIsolation(t *testing.T) {
// Put a series, select it. GC it and then access it.
hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer hb.Close()
lastValue := func(maxAppendID uint64) int {
idx, err := hb.Index(hb.MinTime(), hb.MaxTime())
testutil.Ok(t, err)
iso := hb.iso.State()
iso.maxAppendID = maxAppendID
querier := &blockQuerier{
mint: 0,
maxt: 10000,
index: idx,
chunks: hb.chunksRange(math.MinInt64, math.MaxInt64, iso),
tombstones: tombstones.NewMemTombstones(),
}
testutil.Ok(t, err)
defer querier.Close()
ss, _, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err)
_, seriesSet, err := expandSeriesSet(ss)
testutil.Ok(t, err)
for _, series := range seriesSet {
return int(series[len(series)-1].v)
}
return -1
}
i := 0
for ; i <= 1000; i++ {
var app storage.Appender
// To initialize bounds.
if hb.MinTime() == math.MaxInt64 {
app = &initAppender{head: hb, appendID: uint64(i), cleanupAppendIDsBelow: 0}
} else {
app = hb.appender(uint64(i), 0)
}
_, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
}
// Test simple cases in different chunks when no appendID cleanup has been performed.
testutil.Equals(t, 10, lastValue(10))
testutil.Equals(t, 130, lastValue(130))
testutil.Equals(t, 160, lastValue(160))
testutil.Equals(t, 240, lastValue(240))
testutil.Equals(t, 500, lastValue(500))
testutil.Equals(t, 750, lastValue(750))
testutil.Equals(t, 995, lastValue(995))
testutil.Equals(t, 999, lastValue(999))
// Cleanup appendIDs below 500.
app := hb.appender(uint64(i), 500)
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
i++
// We should not get queries with a maxAppendID below 500 after the cleanup,
// but they only take the remaining appendIDs into account.
testutil.Equals(t, 499, lastValue(10))
testutil.Equals(t, 499, lastValue(130))
testutil.Equals(t, 499, lastValue(160))
testutil.Equals(t, 499, lastValue(240))
testutil.Equals(t, 500, lastValue(500))
testutil.Equals(t, 995, lastValue(995))
testutil.Equals(t, 999, lastValue(999))
// Cleanup appendIDs below 1000, which means the sample buffer is
// the only thing with appendIDs.
app = hb.appender(uint64(i), 1000)
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Equals(t, 999, lastValue(998))
testutil.Equals(t, 999, lastValue(999))
testutil.Equals(t, 1000, lastValue(1000))
testutil.Equals(t, 1001, lastValue(1001))
testutil.Equals(t, 1002, lastValue(1002))
testutil.Equals(t, 1002, lastValue(1003))
i++
// Cleanup appendIDs below 1001, but with a rollback.
app = hb.appender(uint64(i), 1001)
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err)
testutil.Ok(t, app.Rollback())
testutil.Equals(t, 1000, lastValue(999))
testutil.Equals(t, 1000, lastValue(1000))
testutil.Equals(t, 1001, lastValue(1001))
testutil.Equals(t, 1002, lastValue(1002))
testutil.Equals(t, 1002, lastValue(1003))
}
func TestIsolationRollback(t *testing.T) {
// Rollback after a failed append and test if the low watermark has progressed anyway.
hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer hb.Close()
app := hb.Appender()
_, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Equals(t, uint64(1), hb.iso.lowWatermark())
app = hb.Appender()
_, err = app.Add(labels.FromStrings("foo", "bar"), 1, 1)
testutil.Ok(t, err)
_, err = app.Add(labels.FromStrings("foo", "bar", "foo", "baz"), 2, 2)
testutil.NotOk(t, err)
testutil.Ok(t, app.Rollback())
testutil.Equals(t, uint64(2), hb.iso.lowWatermark())
app = hb.Appender()
_, err = app.Add(labels.FromStrings("foo", "bar"), 3, 3)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Equals(t, uint64(3), hb.iso.lowWatermark(), "Low watermark should progress to 3 even if append #2 was rolled back.")
}
func TestIsolationLowWatermarkMonotonous(t *testing.T) {
hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer hb.Close()
app1 := hb.Appender()
_, err = app1.Add(labels.FromStrings("foo", "bar"), 0, 0)
testutil.Ok(t, err)
testutil.Ok(t, app1.Commit())
testutil.Equals(t, uint64(1), hb.iso.lowWatermark())
app1 = hb.Appender()
_, err = app1.Add(labels.FromStrings("foo", "bar"), 1, 1)
testutil.Ok(t, err)
testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should be two, even if append is not commited yet.")
app2 := hb.Appender()
_, err = app2.Add(labels.FromStrings("foo", "baz"), 1, 1)
testutil.Ok(t, err)
testutil.Ok(t, app2.Commit())
testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should stay two because app1 is not commited yet.")
is := hb.iso.State()
testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "After simulated read (iso state retrieved), low watermark should stay at 2.")
testutil.Ok(t, app1.Commit())
testutil.Equals(t, uint64(2), hb.iso.lowWatermark(), "Even after app1 is commited, low watermark should stay at 2 because read is still ongoing.")
is.Close()
testutil.Equals(t, uint64(3), hb.iso.lowWatermark(), "After read has finished (iso state closed), low watermark should jump to three.")
}

tsdb/isolation.go (new file, 199 lines)

@ -0,0 +1,199 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"sync"
)
// isolationState holds the isolation information.
type isolationState struct {
// We will ignore all appends above the max, or that are incomplete.
maxAppendID uint64
incompleteAppends map[uint64]struct{}
lowWatermark uint64 // Lowest of incompleteAppends/maxAppendID.
isolation *isolation
// Doubly linked list of active reads.
next *isolationState
prev *isolationState
}
// Close closes the state.
func (i *isolationState) Close() {
i.isolation.readMtx.Lock()
defer i.isolation.readMtx.Unlock()
i.next.prev = i.prev
i.prev.next = i.next
}
// isolation is the global isolation state.
type isolation struct {
// Mutex for accessing lastAppendID and appendsOpen.
appendMtx sync.Mutex
// Each append is given an internal id.
lastAppendID uint64
// Which appends are currently in progress.
appendsOpen map[uint64]struct{}
// Mutex for accessing readsOpen.
// If taking both appendMtx and readMtx, take appendMtx first.
readMtx sync.Mutex
// All isolationStates currently in use, kept as a doubly-linked list.
readsOpen *isolationState
}
func newIsolation() *isolation {
isoState := &isolationState{}
isoState.next = isoState
isoState.prev = isoState
return &isolation{
appendsOpen: map[uint64]struct{}{},
readsOpen: isoState,
}
}
// lowWatermark returns the appendID below which we no longer need to track
// which appends were from which appendID.
func (i *isolation) lowWatermark() uint64 {
i.appendMtx.Lock() // Take appendMtx first.
defer i.appendMtx.Unlock()
i.readMtx.Lock()
defer i.readMtx.Unlock()
if i.readsOpen.prev != i.readsOpen {
return i.readsOpen.prev.lowWatermark
}
lw := i.lastAppendID
for k := range i.appendsOpen {
if k < lw {
lw = k
}
}
return lw
}
// State returns an object used to control isolation
// between a query and appends. Must be closed when complete.
func (i *isolation) State() *isolationState {
i.appendMtx.Lock() // Take append mutex before read mutex.
defer i.appendMtx.Unlock()
isoState := &isolationState{
maxAppendID: i.lastAppendID,
lowWatermark: i.lastAppendID,
incompleteAppends: make(map[uint64]struct{}, len(i.appendsOpen)),
isolation: i,
}
for k := range i.appendsOpen {
isoState.incompleteAppends[k] = struct{}{}
if k < isoState.lowWatermark {
isoState.lowWatermark = k
}
}
i.readMtx.Lock()
defer i.readMtx.Unlock()
isoState.prev = i.readsOpen
isoState.next = i.readsOpen.next
i.readsOpen.next.prev = isoState
i.readsOpen.next = isoState
return isoState
}
// newAppendID increments the transaction counter and returns a new transaction ID.
func (i *isolation) newAppendID() uint64 {
i.appendMtx.Lock()
defer i.appendMtx.Unlock()
i.lastAppendID++
i.appendsOpen[i.lastAppendID] = struct{}{}
return i.lastAppendID
}
func (i *isolation) closeAppend(appendID uint64) {
i.appendMtx.Lock()
defer i.appendMtx.Unlock()
delete(i.appendsOpen, appendID)
}
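
Taken together, the functions above give writers and readers the following lifecycle. The sketch below is not part of the change; it only strings the calls together to show the intent, using the function names defined in this file.

// isolationLifecycleSketch shows how appenders and queriers are expected to
// use the isolation type above (illustrative helper, not part of the change).
func isolationLifecycleSketch() {
	iso := newIsolation()

	// Writer: every head appender gets its own appendID and stamps the
	// samples it appends with it.
	appendID := iso.newAppendID()

	// Reader: a querier snapshots which appends were still open when it
	// started; samples from those appendIDs stay invisible to it.
	state := iso.State()
	_, open := state.incompleteAppends[appendID]
	_ = open // true here, so this reader will not see appendID's samples.

	// Commit (or Rollback) closes the append; readers created from now on
	// will see its samples.
	iso.closeAppend(appendID)

	// Closing the read state lets the low watermark advance, so the head can
	// forget appendIDs that no active reader cares about anymore.
	state.Close()
	_ = iso.lowWatermark()
}
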
// The transactionID ring buffer.
type txRing struct {
txIDs []uint64
txIDFirst int // Position of the first id in the ring.
txIDCount int // How many ids in the ring.
}
func newTxRing(cap int) *txRing {
return &txRing{
txIDs: make([]uint64, cap),
}
}
func (txr *txRing) add(appendID uint64) {
if txr.txIDCount == len(txr.txIDs) {
// Ring buffer is full, expand by doubling.
newRing := make([]uint64, txr.txIDCount*2)
idx := copy(newRing[:], txr.txIDs[txr.txIDFirst:])
copy(newRing[idx:], txr.txIDs[:txr.txIDFirst])
txr.txIDs = newRing
txr.txIDFirst = 0
}
txr.txIDs[(txr.txIDFirst+txr.txIDCount)%len(txr.txIDs)] = appendID
txr.txIDCount++
}
func (txr *txRing) cleanupAppendIDsBelow(bound uint64) {
pos := txr.txIDFirst
for txr.txIDCount > 0 {
if txr.txIDs[pos] < bound {
txr.txIDFirst++
txr.txIDCount--
} else {
break
}
pos++
if pos == len(txr.txIDs) {
pos = 0
}
}
txr.txIDFirst %= len(txr.txIDs)
}
func (txr *txRing) iterator() *txRingIterator {
return &txRingIterator{
pos: txr.txIDFirst,
ids: txr.txIDs,
}
}
// txRingIterator lets you iterate over the ring. It doesn't terminate; the
// caller has to stop after reading as many IDs as the ring holds.
type txRingIterator struct {
ids []uint64
pos int
}
func (it *txRingIterator) At() uint64 {
return it.ids[it.pos]
}
func (it *txRingIterator) Next() {
it.pos++
if it.pos == len(it.ids) {
it.pos = 0
}
}
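
To illustrate the ring buffer above: a quick sketch of adding appendIDs, letting the ring grow, and cleaning up the IDs no reader can still need. This is not part of the change; the helper name is only for the example, and it would have to live in package tsdb next to txRing.

// txRingSketch walks through the txRing defined above (illustrative helper).
func txRingSketch() []uint64 {
	txs := newTxRing(4)
	for id := uint64(1); id <= 6; id++ {
		txs.add(id) // The ring doubles from 4 to 8 slots on the 5th add.
	}
	// Appends 1-3 are older than every open read, so their IDs can go;
	// 4, 5 and 6 stay tracked.
	txs.cleanupAppendIDsBelow(4)

	remaining := []uint64{}
	it := txs.iterator()
	for i := 0; i < txs.txIDCount; i++ {
		remaining = append(remaining, it.At())
		it.Next()
	}
	return remaining // [4 5 6]
}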