mirror of https://github.com/prometheus/prometheus
Merge pull request #128 from Gouthamve/expose-types
Expose types for easier interface implementations.pull/5805/head
commit
e2e2947ee8
4
block.go
4
block.go
|
@ -250,7 +250,7 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
|
||||||
ir := pb.indexr
|
ir := pb.indexr
|
||||||
|
|
||||||
// Choose only valid postings which have chunks in the time-range.
|
// Choose only valid postings which have chunks in the time-range.
|
||||||
stones := map[uint32]intervals{}
|
stones := map[uint32]Intervals{}
|
||||||
|
|
||||||
var lset labels.Labels
|
var lset labels.Labels
|
||||||
var chks []ChunkMeta
|
var chks []ChunkMeta
|
||||||
|
@ -272,7 +272,7 @@ Outer:
|
||||||
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
|
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
|
||||||
// Delete only until the current vlaues and not beyond.
|
// Delete only until the current vlaues and not beyond.
|
||||||
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
|
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
|
||||||
stones[p.At()] = intervals{{tmin, tmax}}
|
stones[p.At()] = Intervals{{tmin, tmax}}
|
||||||
continue Outer
|
continue Outer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,7 +59,7 @@ func (cm *ChunkMeta) writeHash(h hash.Hash) error {
|
||||||
type deletedIterator struct {
|
type deletedIterator struct {
|
||||||
it chunks.Iterator
|
it chunks.Iterator
|
||||||
|
|
||||||
intervals intervals
|
intervals Intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *deletedIterator) At() (int64, float64) {
|
func (it *deletedIterator) At() (int64, float64) {
|
||||||
|
@ -76,7 +76,7 @@ Outer:
|
||||||
continue Outer
|
continue Outer
|
||||||
}
|
}
|
||||||
|
|
||||||
if ts > tr.maxt {
|
if ts > tr.Maxt {
|
||||||
it.intervals = it.intervals[1:]
|
it.intervals = it.intervals[1:]
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,18 +50,18 @@ func TestDeletedIterator(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
r intervals
|
r Intervals
|
||||||
}{
|
}{
|
||||||
{r: intervals{{1, 20}}},
|
{r: Intervals{{1, 20}}},
|
||||||
{r: intervals{{1, 10}, {12, 20}, {21, 23}, {25, 30}}},
|
{r: Intervals{{1, 10}, {12, 20}, {21, 23}, {25, 30}}},
|
||||||
{r: intervals{{1, 10}, {12, 20}, {20, 30}}},
|
{r: Intervals{{1, 10}, {12, 20}, {20, 30}}},
|
||||||
{r: intervals{{1, 10}, {12, 23}, {25, 30}}},
|
{r: Intervals{{1, 10}, {12, 23}, {25, 30}}},
|
||||||
{r: intervals{{1, 23}, {12, 20}, {25, 30}}},
|
{r: Intervals{{1, 23}, {12, 20}, {25, 30}}},
|
||||||
{r: intervals{{1, 23}, {12, 20}, {25, 3000}}},
|
{r: Intervals{{1, 23}, {12, 20}, {25, 3000}}},
|
||||||
{r: intervals{{0, 2000}}},
|
{r: Intervals{{0, 2000}}},
|
||||||
{r: intervals{{500, 2000}}},
|
{r: Intervals{{500, 2000}}},
|
||||||
{r: intervals{{0, 200}}},
|
{r: Intervals{{0, 200}}},
|
||||||
{r: intervals{{1000, 20000}}},
|
{r: Intervals{{1000, 20000}}},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
|
@ -72,7 +72,7 @@ func TestDeletedIterator(t *testing.T) {
|
||||||
i++
|
i++
|
||||||
for _, tr := range ranges {
|
for _, tr := range ranges {
|
||||||
if tr.inBounds(i) {
|
if tr.inBounds(i) {
|
||||||
i = tr.maxt + 1
|
i = tr.Maxt + 1
|
||||||
ranges = ranges[1:]
|
ranges = ranges[1:]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -87,7 +87,7 @@ func TestDeletedIterator(t *testing.T) {
|
||||||
i++
|
i++
|
||||||
for _, tr := range ranges {
|
for _, tr := range ranges {
|
||||||
if tr.inBounds(i) {
|
if tr.inBounds(i) {
|
||||||
i = tr.maxt + 1
|
i = tr.Maxt + 1
|
||||||
ranges = ranges[1:]
|
ranges = ranges[1:]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
14
compact.go
14
compact.go
|
@ -458,7 +458,7 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu
|
||||||
if len(dranges) > 0 {
|
if len(dranges) > 0 {
|
||||||
// Re-encode the chunk to not have deleted values.
|
// Re-encode the chunk to not have deleted values.
|
||||||
for _, chk := range chks {
|
for _, chk := range chks {
|
||||||
if intervalOverlap(dranges[0].mint, dranges[len(dranges)-1].maxt, chk.MinTime, chk.MaxTime) {
|
if intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) {
|
||||||
newChunk := chunks.NewXORChunk()
|
newChunk := chunks.NewXORChunk()
|
||||||
app, err := newChunk.Appender()
|
app, err := newChunk.Appender()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -542,7 +542,7 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu
|
||||||
|
|
||||||
type compactionSet interface {
|
type compactionSet interface {
|
||||||
Next() bool
|
Next() bool
|
||||||
At() (labels.Labels, []ChunkMeta, intervals)
|
At() (labels.Labels, []ChunkMeta, Intervals)
|
||||||
Err() error
|
Err() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -555,7 +555,7 @@ type compactionSeriesSet struct {
|
||||||
|
|
||||||
l labels.Labels
|
l labels.Labels
|
||||||
c []ChunkMeta
|
c []ChunkMeta
|
||||||
intervals intervals
|
intervals Intervals
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -582,7 +582,7 @@ func (c *compactionSeriesSet) Next() bool {
|
||||||
if len(c.intervals) > 0 {
|
if len(c.intervals) > 0 {
|
||||||
chks := make([]ChunkMeta, 0, len(c.c))
|
chks := make([]ChunkMeta, 0, len(c.c))
|
||||||
for _, chk := range c.c {
|
for _, chk := range c.c {
|
||||||
if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) {
|
if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) {
|
||||||
chks = append(chks, chk)
|
chks = append(chks, chk)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -609,7 +609,7 @@ func (c *compactionSeriesSet) Err() error {
|
||||||
return c.p.Err()
|
return c.p.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, intervals) {
|
func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) {
|
||||||
return c.l, c.c, c.intervals
|
return c.l, c.c, c.intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -619,7 +619,7 @@ type compactionMerger struct {
|
||||||
aok, bok bool
|
aok, bok bool
|
||||||
l labels.Labels
|
l labels.Labels
|
||||||
c []ChunkMeta
|
c []ChunkMeta
|
||||||
intervals intervals
|
intervals Intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
type compactionSeries struct {
|
type compactionSeries struct {
|
||||||
|
@ -700,7 +700,7 @@ func (c *compactionMerger) Err() error {
|
||||||
return c.b.Err()
|
return c.b.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactionMerger) At() (labels.Labels, []ChunkMeta, intervals) {
|
func (c *compactionMerger) At() (labels.Labels, []ChunkMeta, Intervals) {
|
||||||
return c.l, c.c, c.intervals
|
return c.l, c.c, c.intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -161,11 +161,11 @@ func TestDeleteSimple(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, app.Commit())
|
require.NoError(t, app.Commit())
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
intervals intervals
|
intervals Intervals
|
||||||
remaint []int64
|
remaint []int64
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
intervals: intervals{{1, 3}, {4, 7}},
|
intervals: Intervals{{1, 3}, {4, 7}},
|
||||||
remaint: []int64{0, 8, 9},
|
remaint: []int64{0, 8, 9},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -175,7 +175,7 @@ Outer:
|
||||||
// TODO(gouthamve): Reset the tombstones somehow.
|
// TODO(gouthamve): Reset the tombstones somehow.
|
||||||
// Delete the ranges.
|
// Delete the ranges.
|
||||||
for _, r := range c.intervals {
|
for _, r := range c.intervals {
|
||||||
require.NoError(t, db.Delete(r.mint, r.maxt, labels.NewEqualMatcher("a", "b")))
|
require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b")))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compare the result.
|
// Compare the result.
|
||||||
|
|
2
head.go
2
head.go
|
@ -249,7 +249,7 @@ Outer:
|
||||||
|
|
||||||
// Delete only until the current values and not beyond.
|
// Delete only until the current values and not beyond.
|
||||||
tmin, tmax := clampInterval(mint, maxt, h.series[ref].chunks[0].minTime, h.series[ref].head().maxTime)
|
tmin, tmax := clampInterval(mint, maxt, h.series[ref].chunks[0].minTime, h.series[ref].head().maxTime)
|
||||||
stones = append(stones, Stone{ref, intervals{{tmin, tmax}}})
|
stones = append(stones, Stone{ref, Intervals{{tmin, tmax}}})
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.Err() != nil {
|
if p.Err() != nil {
|
||||||
|
|
26
head_test.go
26
head_test.go
|
@ -399,27 +399,27 @@ func TestHBDeleteSimple(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, app.Commit())
|
require.NoError(t, app.Commit())
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
intervals intervals
|
intervals Intervals
|
||||||
remaint []int64
|
remaint []int64
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
intervals: intervals{{0, 3}},
|
intervals: Intervals{{0, 3}},
|
||||||
remaint: []int64{4, 5, 6, 7, 8, 9},
|
remaint: []int64{4, 5, 6, 7, 8, 9},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
intervals: intervals{{1, 3}},
|
intervals: Intervals{{1, 3}},
|
||||||
remaint: []int64{0, 4, 5, 6, 7, 8, 9},
|
remaint: []int64{0, 4, 5, 6, 7, 8, 9},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
intervals: intervals{{1, 3}, {4, 7}},
|
intervals: Intervals{{1, 3}, {4, 7}},
|
||||||
remaint: []int64{0, 8, 9},
|
remaint: []int64{0, 8, 9},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
intervals: intervals{{1, 3}, {4, 700}},
|
intervals: Intervals{{1, 3}, {4, 700}},
|
||||||
remaint: []int64{0},
|
remaint: []int64{0},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
intervals: intervals{{0, 9}},
|
intervals: Intervals{{0, 9}},
|
||||||
remaint: []int64{},
|
remaint: []int64{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -431,7 +431,7 @@ Outer:
|
||||||
|
|
||||||
// Delete the ranges.
|
// Delete the ranges.
|
||||||
for _, r := range c.intervals {
|
for _, r := range c.intervals {
|
||||||
require.NoError(t, hb.Delete(r.mint, r.maxt, labels.NewEqualMatcher("a", "b")))
|
require.NoError(t, hb.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b")))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compare the result.
|
// Compare the result.
|
||||||
|
@ -596,18 +596,18 @@ func TestDelete_e2e(t *testing.T) {
|
||||||
// Delete a time-range from each-selector.
|
// Delete a time-range from each-selector.
|
||||||
dels := []struct {
|
dels := []struct {
|
||||||
ms []labels.Matcher
|
ms []labels.Matcher
|
||||||
drange intervals
|
drange Intervals
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")},
|
ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")},
|
||||||
drange: intervals{{300, 500}, {600, 670}},
|
drange: Intervals{{300, 500}, {600, 670}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ms: []labels.Matcher{
|
ms: []labels.Matcher{
|
||||||
labels.NewEqualMatcher("a", "b"),
|
labels.NewEqualMatcher("a", "b"),
|
||||||
labels.NewEqualMatcher("job", "prom-k8s"),
|
labels.NewEqualMatcher("job", "prom-k8s"),
|
||||||
},
|
},
|
||||||
drange: intervals{{300, 500}, {100, 670}},
|
drange: Intervals{{300, 500}, {100, 670}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ms: []labels.Matcher{
|
ms: []labels.Matcher{
|
||||||
|
@ -615,7 +615,7 @@ func TestDelete_e2e(t *testing.T) {
|
||||||
labels.NewEqualMatcher("instance", "localhost:9090"),
|
labels.NewEqualMatcher("instance", "localhost:9090"),
|
||||||
labels.NewEqualMatcher("job", "prometheus"),
|
labels.NewEqualMatcher("job", "prometheus"),
|
||||||
},
|
},
|
||||||
drange: intervals{{300, 400}, {100, 6700}},
|
drange: Intervals{{300, 400}, {100, 6700}},
|
||||||
},
|
},
|
||||||
// TODO: Add Regexp Matchers.
|
// TODO: Add Regexp Matchers.
|
||||||
}
|
}
|
||||||
|
@ -626,7 +626,7 @@ func TestDelete_e2e(t *testing.T) {
|
||||||
hb.tombstones = newEmptyTombstoneReader()
|
hb.tombstones = newEmptyTombstoneReader()
|
||||||
|
|
||||||
for _, r := range del.drange {
|
for _, r := range del.drange {
|
||||||
require.NoError(t, hb.Delete(r.mint, r.maxt, del.ms...))
|
require.NoError(t, hb.Delete(r.Mint, r.Maxt, del.ms...))
|
||||||
}
|
}
|
||||||
|
|
||||||
matched := labels.Slice{}
|
matched := labels.Slice{}
|
||||||
|
@ -716,7 +716,7 @@ func boundedSamples(full []sample, mint, maxt int64) []sample {
|
||||||
return full
|
return full
|
||||||
}
|
}
|
||||||
|
|
||||||
func deletedSamples(full []sample, dranges intervals) []sample {
|
func deletedSamples(full []sample, dranges Intervals) []sample {
|
||||||
ds := make([]sample, 0, len(full))
|
ds := make([]sample, 0, len(full))
|
||||||
Outer:
|
Outer:
|
||||||
for _, s := range full {
|
for _, s := range full {
|
||||||
|
|
37
querier.go
37
querier.go
|
@ -128,6 +128,18 @@ func (q *querier) Close() error {
|
||||||
return merr.Err()
|
return merr.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewBlockQuerier returns a queries against the readers.
|
||||||
|
func NewBlockQuerier(ir IndexReader, cr ChunkReader, tr TombstoneReader, mint, maxt int64) Querier {
|
||||||
|
return &blockQuerier{
|
||||||
|
index: ir,
|
||||||
|
chunks: cr,
|
||||||
|
tombstones: tr,
|
||||||
|
|
||||||
|
mint: mint,
|
||||||
|
maxt: maxt,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// blockQuerier provides querying access to a single block database.
|
// blockQuerier provides querying access to a single block database.
|
||||||
type blockQuerier struct {
|
type blockQuerier struct {
|
||||||
index IndexReader
|
index IndexReader
|
||||||
|
@ -348,6 +360,13 @@ type mergedSeriesSet struct {
|
||||||
adone, bdone bool
|
adone, bdone bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewMergedSeriesSet takes two series sets as a single series set. The input series sets
|
||||||
|
// must be sorted and sequential in time, i.e. if they have the same label set,
|
||||||
|
// the datapoints of a must be before the datapoints of b.
|
||||||
|
func NewMergedSeriesSet(a, b SeriesSet) SeriesSet {
|
||||||
|
return newMergedSeriesSet(a, b)
|
||||||
|
}
|
||||||
|
|
||||||
func newMergedSeriesSet(a, b SeriesSet) *mergedSeriesSet {
|
func newMergedSeriesSet(a, b SeriesSet) *mergedSeriesSet {
|
||||||
s := &mergedSeriesSet{a: a, b: b}
|
s := &mergedSeriesSet{a: a, b: b}
|
||||||
// Initialize first elements of both sets as Next() needs
|
// Initialize first elements of both sets as Next() needs
|
||||||
|
@ -403,7 +422,7 @@ func (s *mergedSeriesSet) Next() bool {
|
||||||
|
|
||||||
type chunkSeriesSet interface {
|
type chunkSeriesSet interface {
|
||||||
Next() bool
|
Next() bool
|
||||||
At() (labels.Labels, []ChunkMeta, intervals)
|
At() (labels.Labels, []ChunkMeta, Intervals)
|
||||||
Err() error
|
Err() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -417,11 +436,11 @@ type baseChunkSeries struct {
|
||||||
|
|
||||||
lset labels.Labels
|
lset labels.Labels
|
||||||
chks []ChunkMeta
|
chks []ChunkMeta
|
||||||
intervals intervals
|
intervals Intervals
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, intervals) {
|
func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) {
|
||||||
return s.lset, s.chks, s.intervals
|
return s.lset, s.chks, s.intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -455,7 +474,7 @@ Outer:
|
||||||
// Only those chunks that are not entirely deleted.
|
// Only those chunks that are not entirely deleted.
|
||||||
chks := make([]ChunkMeta, 0, len(s.chks))
|
chks := make([]ChunkMeta, 0, len(s.chks))
|
||||||
for _, chk := range s.chks {
|
for _, chk := range s.chks {
|
||||||
if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(s.intervals)) {
|
if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(s.intervals)) {
|
||||||
chks = append(chks, chk)
|
chks = append(chks, chk)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -482,10 +501,10 @@ type populatedChunkSeries struct {
|
||||||
err error
|
err error
|
||||||
chks []ChunkMeta
|
chks []ChunkMeta
|
||||||
lset labels.Labels
|
lset labels.Labels
|
||||||
intervals intervals
|
intervals Intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta, intervals) {
|
func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) {
|
||||||
return s.lset, s.chks, s.intervals
|
return s.lset, s.chks, s.intervals
|
||||||
}
|
}
|
||||||
func (s *populatedChunkSeries) Err() error { return s.err }
|
func (s *populatedChunkSeries) Err() error { return s.err }
|
||||||
|
@ -570,7 +589,7 @@ type chunkSeries struct {
|
||||||
|
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
|
|
||||||
intervals intervals
|
intervals Intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *chunkSeries) Labels() labels.Labels {
|
func (s *chunkSeries) Labels() labels.Labels {
|
||||||
|
@ -676,10 +695,10 @@ type chunkSeriesIterator struct {
|
||||||
|
|
||||||
maxt, mint int64
|
maxt, mint int64
|
||||||
|
|
||||||
intervals intervals
|
intervals Intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
func newChunkSeriesIterator(cs []ChunkMeta, dranges intervals, mint, maxt int64) *chunkSeriesIterator {
|
func newChunkSeriesIterator(cs []ChunkMeta, dranges Intervals, mint, maxt int64) *chunkSeriesIterator {
|
||||||
it := cs[0].Chunk.Iterator()
|
it := cs[0].Chunk.Iterator()
|
||||||
if len(dranges) > 0 {
|
if len(dranges) > 0 {
|
||||||
it = &deletedIterator{it: it, intervals: dranges}
|
it = &deletedIterator{it: it, intervals: dranges}
|
||||||
|
|
|
@ -555,10 +555,10 @@ func TestBlockQuerierDelete(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
tombstones: newTombstoneReader(
|
tombstones: newTombstoneReader(
|
||||||
map[uint32]intervals{
|
map[uint32]Intervals{
|
||||||
1: intervals{{1, 3}},
|
1: Intervals{{1, 3}},
|
||||||
2: intervals{{1, 3}, {6, 10}},
|
2: Intervals{{1, 3}, {6, 10}},
|
||||||
3: intervals{{6, 10}},
|
3: Intervals{{6, 10}},
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
|
||||||
|
@ -1205,7 +1205,7 @@ func (m *mockChunkSeriesSet) Next() bool {
|
||||||
return m.i < len(m.l)
|
return m.i < len(m.l)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockChunkSeriesSet) At() (labels.Labels, []ChunkMeta, intervals) {
|
func (m *mockChunkSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) {
|
||||||
return m.l[m.i], m.cm[m.i], nil
|
return m.l[m.i], m.cm[m.i], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -61,8 +61,8 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error {
|
||||||
for _, itv := range v {
|
for _, itv := range v {
|
||||||
buf.reset()
|
buf.reset()
|
||||||
buf.putUvarint32(k)
|
buf.putUvarint32(k)
|
||||||
buf.putVarint64(itv.mint)
|
buf.putVarint64(itv.Mint)
|
||||||
buf.putVarint64(itv.maxt)
|
buf.putVarint64(itv.Maxt)
|
||||||
|
|
||||||
_, err = mw.Write(buf.get())
|
_, err = mw.Write(buf.get())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -83,12 +83,12 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error {
|
||||||
// that is deleted.
|
// that is deleted.
|
||||||
type Stone struct {
|
type Stone struct {
|
||||||
ref uint32
|
ref uint32
|
||||||
intervals intervals
|
intervals Intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
// TombstoneReader is the iterator over tombstones.
|
// TombstoneReader is the iterator over tombstones.
|
||||||
type TombstoneReader interface {
|
type TombstoneReader interface {
|
||||||
Get(ref uint32) intervals
|
Get(ref uint32) Intervals
|
||||||
}
|
}
|
||||||
|
|
||||||
func readTombstones(dir string) (tombstoneReader, error) {
|
func readTombstones(dir string) (tombstoneReader, error) {
|
||||||
|
@ -131,41 +131,42 @@ func readTombstones(dir string) (tombstoneReader, error) {
|
||||||
return nil, d.err()
|
return nil, d.err()
|
||||||
}
|
}
|
||||||
|
|
||||||
stonesMap.add(k, interval{mint, maxt})
|
stonesMap.add(k, Interval{mint, maxt})
|
||||||
}
|
}
|
||||||
|
|
||||||
return newTombstoneReader(stonesMap), nil
|
return newTombstoneReader(stonesMap), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type tombstoneReader map[uint32]intervals
|
type tombstoneReader map[uint32]Intervals
|
||||||
|
|
||||||
func newTombstoneReader(ts map[uint32]intervals) tombstoneReader {
|
func newTombstoneReader(ts map[uint32]Intervals) tombstoneReader {
|
||||||
return tombstoneReader(ts)
|
return tombstoneReader(ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newEmptyTombstoneReader() tombstoneReader {
|
func newEmptyTombstoneReader() tombstoneReader {
|
||||||
return tombstoneReader(make(map[uint32]intervals))
|
return tombstoneReader(make(map[uint32]Intervals))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t tombstoneReader) Get(ref uint32) intervals {
|
func (t tombstoneReader) Get(ref uint32) Intervals {
|
||||||
return t[ref]
|
return t[ref]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t tombstoneReader) add(ref uint32, itv interval) {
|
func (t tombstoneReader) add(ref uint32, itv Interval) {
|
||||||
t[ref] = t[ref].add(itv)
|
t[ref] = t[ref].add(itv)
|
||||||
}
|
}
|
||||||
|
|
||||||
type interval struct {
|
// Interval represents a single time-interval.
|
||||||
mint, maxt int64
|
type Interval struct {
|
||||||
|
Mint, Maxt int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tr interval) inBounds(t int64) bool {
|
func (tr Interval) inBounds(t int64) bool {
|
||||||
return t >= tr.mint && t <= tr.maxt
|
return t >= tr.Mint && t <= tr.Maxt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tr interval) isSubrange(dranges intervals) bool {
|
func (tr Interval) isSubrange(dranges Intervals) bool {
|
||||||
for _, r := range dranges {
|
for _, r := range dranges {
|
||||||
if r.inBounds(tr.mint) && r.inBounds(tr.maxt) {
|
if r.inBounds(tr.Mint) && r.inBounds(tr.Maxt) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -173,43 +174,44 @@ func (tr interval) isSubrange(dranges intervals) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
type intervals []interval
|
// Intervals represents a set of increasing and non-overlapping time-intervals.
|
||||||
|
type Intervals []Interval
|
||||||
|
|
||||||
// This adds the new time-range to the existing ones.
|
// This adds the new time-range to the existing ones.
|
||||||
// The existing ones must be sorted.
|
// The existing ones must be sorted.
|
||||||
func (itvs intervals) add(n interval) intervals {
|
func (itvs Intervals) add(n Interval) Intervals {
|
||||||
for i, r := range itvs {
|
for i, r := range itvs {
|
||||||
// TODO(gouthamve): Make this codepath easier to digest.
|
// TODO(gouthamve): Make this codepath easier to digest.
|
||||||
if r.inBounds(n.mint-1) || r.inBounds(n.mint) {
|
if r.inBounds(n.Mint-1) || r.inBounds(n.Mint) {
|
||||||
if n.maxt > r.maxt {
|
if n.Maxt > r.Maxt {
|
||||||
itvs[i].maxt = n.maxt
|
itvs[i].Maxt = n.Maxt
|
||||||
}
|
}
|
||||||
|
|
||||||
j := 0
|
j := 0
|
||||||
for _, r2 := range itvs[i+1:] {
|
for _, r2 := range itvs[i+1:] {
|
||||||
if n.maxt < r2.mint {
|
if n.Maxt < r2.Mint {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
j++
|
j++
|
||||||
}
|
}
|
||||||
if j != 0 {
|
if j != 0 {
|
||||||
if itvs[i+j].maxt > n.maxt {
|
if itvs[i+j].Maxt > n.Maxt {
|
||||||
itvs[i].maxt = itvs[i+j].maxt
|
itvs[i].Maxt = itvs[i+j].Maxt
|
||||||
}
|
}
|
||||||
itvs = append(itvs[:i+1], itvs[i+j+1:]...)
|
itvs = append(itvs[:i+1], itvs[i+j+1:]...)
|
||||||
}
|
}
|
||||||
return itvs
|
return itvs
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.inBounds(n.maxt+1) || r.inBounds(n.maxt) {
|
if r.inBounds(n.Maxt+1) || r.inBounds(n.Maxt) {
|
||||||
if n.mint < r.maxt {
|
if n.Mint < r.Maxt {
|
||||||
itvs[i].mint = n.mint
|
itvs[i].Mint = n.Mint
|
||||||
}
|
}
|
||||||
return itvs
|
return itvs
|
||||||
}
|
}
|
||||||
|
|
||||||
if n.mint < r.mint {
|
if n.Mint < r.Mint {
|
||||||
newRange := make(intervals, i, len(itvs[:i])+1)
|
newRange := make(Intervals, i, len(itvs[:i])+1)
|
||||||
copy(newRange, itvs[:i])
|
copy(newRange, itvs[:i])
|
||||||
newRange = append(newRange, n)
|
newRange = append(newRange, n)
|
||||||
newRange = append(newRange, itvs[i:]...)
|
newRange = append(newRange, itvs[i:]...)
|
||||||
|
|
|
@ -29,15 +29,15 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
|
||||||
|
|
||||||
ref := uint32(0)
|
ref := uint32(0)
|
||||||
|
|
||||||
stones := make(map[uint32]intervals)
|
stones := make(map[uint32]Intervals)
|
||||||
// Generate the tombstones.
|
// Generate the tombstones.
|
||||||
for i := 0; i < 100; i++ {
|
for i := 0; i < 100; i++ {
|
||||||
ref += uint32(rand.Int31n(10)) + 1
|
ref += uint32(rand.Int31n(10)) + 1
|
||||||
numRanges := rand.Intn(5) + 1
|
numRanges := rand.Intn(5) + 1
|
||||||
dranges := make(intervals, 0, numRanges)
|
dranges := make(Intervals, 0, numRanges)
|
||||||
mint := rand.Int63n(time.Now().UnixNano())
|
mint := rand.Int63n(time.Now().UnixNano())
|
||||||
for j := 0; j < numRanges; j++ {
|
for j := 0; j < numRanges; j++ {
|
||||||
dranges = dranges.add(interval{mint, mint + rand.Int63n(1000)})
|
dranges = dranges.add(Interval{mint, mint + rand.Int63n(1000)})
|
||||||
mint += rand.Int63n(1000) + 1
|
mint += rand.Int63n(1000) + 1
|
||||||
}
|
}
|
||||||
stones[ref] = dranges
|
stones[ref] = dranges
|
||||||
|
@ -54,64 +54,64 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
|
||||||
|
|
||||||
func TestAddingNewIntervals(t *testing.T) {
|
func TestAddingNewIntervals(t *testing.T) {
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
exist intervals
|
exist Intervals
|
||||||
new interval
|
new Interval
|
||||||
|
|
||||||
exp intervals
|
exp Intervals
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
new: interval{1, 2},
|
new: Interval{1, 2},
|
||||||
exp: intervals{{1, 2}},
|
exp: Intervals{{1, 2}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
exist: intervals{{1, 2}},
|
exist: Intervals{{1, 2}},
|
||||||
new: interval{1, 2},
|
new: Interval{1, 2},
|
||||||
exp: intervals{{1, 2}},
|
exp: Intervals{{1, 2}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
exist: intervals{{1, 4}, {6, 6}},
|
exist: Intervals{{1, 4}, {6, 6}},
|
||||||
new: interval{5, 6},
|
new: Interval{5, 6},
|
||||||
exp: intervals{{1, 6}},
|
exp: Intervals{{1, 6}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
exist: intervals{{1, 10}, {12, 20}, {25, 30}},
|
exist: Intervals{{1, 10}, {12, 20}, {25, 30}},
|
||||||
new: interval{21, 23},
|
new: Interval{21, 23},
|
||||||
exp: intervals{{1, 10}, {12, 23}, {25, 30}},
|
exp: Intervals{{1, 10}, {12, 23}, {25, 30}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
exist: intervals{{1, 2}, {3, 5}, {7, 7}},
|
exist: Intervals{{1, 2}, {3, 5}, {7, 7}},
|
||||||
new: interval{6, 7},
|
new: Interval{6, 7},
|
||||||
exp: intervals{{1, 2}, {3, 7}},
|
exp: Intervals{{1, 2}, {3, 7}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
exist: intervals{{1, 10}, {12, 20}, {25, 30}},
|
exist: Intervals{{1, 10}, {12, 20}, {25, 30}},
|
||||||
new: interval{21, 25},
|
new: Interval{21, 25},
|
||||||
exp: intervals{{1, 10}, {12, 30}},
|
exp: Intervals{{1, 10}, {12, 30}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
exist: intervals{{1, 10}, {12, 20}, {25, 30}},
|
exist: Intervals{{1, 10}, {12, 20}, {25, 30}},
|
||||||
new: interval{18, 23},
|
new: Interval{18, 23},
|
||||||
exp: intervals{{1, 10}, {12, 23}, {25, 30}},
|
exp: Intervals{{1, 10}, {12, 23}, {25, 30}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
exist: intervals{{1, 10}, {12, 20}, {25, 30}},
|
exist: Intervals{{1, 10}, {12, 20}, {25, 30}},
|
||||||
new: interval{9, 23},
|
new: Interval{9, 23},
|
||||||
exp: intervals{{1, 23}, {25, 30}},
|
exp: Intervals{{1, 23}, {25, 30}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
exist: intervals{{1, 10}, {12, 20}, {25, 30}},
|
exist: Intervals{{1, 10}, {12, 20}, {25, 30}},
|
||||||
new: interval{9, 230},
|
new: Interval{9, 230},
|
||||||
exp: intervals{{1, 230}},
|
exp: Intervals{{1, 230}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
exist: intervals{{5, 10}, {12, 20}, {25, 30}},
|
exist: Intervals{{5, 10}, {12, 20}, {25, 30}},
|
||||||
new: interval{1, 4},
|
new: Interval{1, 4},
|
||||||
exp: intervals{{1, 10}, {12, 20}, {25, 30}},
|
exp: Intervals{{1, 10}, {12, 20}, {25, 30}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
exist: intervals{{5, 10}, {12, 20}, {25, 30}},
|
exist: Intervals{{5, 10}, {12, 20}, {25, 30}},
|
||||||
new: interval{11, 14},
|
new: Interval{11, 14},
|
||||||
exp: intervals{{5, 20}, {25, 30}},
|
exp: Intervals{{5, 20}, {25, 30}},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
6
wal.go
6
wal.go
|
@ -491,8 +491,8 @@ func (w *SegmentWAL) encodeDeletes(stones []Stone) error {
|
||||||
for _, itv := range s.intervals {
|
for _, itv := range s.intervals {
|
||||||
eb.reset()
|
eb.reset()
|
||||||
eb.putUvarint32(s.ref)
|
eb.putUvarint32(s.ref)
|
||||||
eb.putVarint64(itv.mint)
|
eb.putVarint64(itv.Mint)
|
||||||
eb.putVarint64(itv.maxt)
|
eb.putVarint64(itv.Maxt)
|
||||||
buf = append(buf, eb.get()...)
|
buf = append(buf, eb.get()...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -794,7 +794,7 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
|
||||||
for db.len() > 0 {
|
for db.len() > 0 {
|
||||||
var s Stone
|
var s Stone
|
||||||
s.ref = db.uvarint32()
|
s.ref = db.uvarint32()
|
||||||
s.intervals = intervals{{db.varint64(), db.varint64()}}
|
s.intervals = Intervals{{db.varint64(), db.varint64()}}
|
||||||
if db.err() != nil {
|
if db.err() != nil {
|
||||||
return nil, db.err()
|
return nil, db.err()
|
||||||
}
|
}
|
||||||
|
|
|
@ -220,7 +220,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
|
||||||
|
|
||||||
for j := 0; j < i*20; j++ {
|
for j := 0; j < i*20; j++ {
|
||||||
ts := rand.Int63()
|
ts := rand.Int63()
|
||||||
stones = append(stones, Stone{rand.Uint32(), intervals{{ts, ts + rand.Int63n(10000)}}})
|
stones = append(stones, Stone{rand.Uint32(), Intervals{{ts, ts + rand.Int63n(10000)}}})
|
||||||
}
|
}
|
||||||
|
|
||||||
lbls := series[i : i+stepSize]
|
lbls := series[i : i+stepSize]
|
||||||
|
|
Loading…
Reference in New Issue