From adaf4d2099c025f30f52b8737a9974571c1d8c76 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Wed, 3 May 2017 02:36:40 +0530 Subject: [PATCH 1/2] Handle duplicate & out of order values in same txn Add docs about not erroring out on exact dupes. Moved tests to require.* Signed-off-by: Goutham Veeramachaneni --- head.go | 7 +++- head_test.go | 103 +++++++++++++++++++++++++++++++++------------------ 2 files changed, 72 insertions(+), 38 deletions(-) diff --git a/head.go b/head.go index b350af799..cd6cf0728 100644 --- a/head.go +++ b/head.go @@ -357,6 +357,9 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { if t < c.maxTime { return ErrOutOfOrderSample } + + // We are allowing exact duplicates as we can encounter them in valid cases + // like federation and erroring out at that time would be extremely noisy. if c.maxTime == t && math.Float64bits(ms.lastValue) != math.Float64bits(v) { return ErrAmendSample } @@ -634,8 +637,8 @@ func (s *memSeries) append(t int64, v float64) bool { c.minTime = t } else { c = s.head() - // Skip duplicate samples. - if c.maxTime == t && s.lastValue != v { + // Skip duplicate and out of order samples. + if c.maxTime >= t { return false } } diff --git a/head_test.go b/head_test.go index 63f6da6f7..cd8939fd1 100644 --- a/head_test.go +++ b/head_test.go @@ -92,24 +92,16 @@ func TestAmendDatapointCausesError(t *testing.T) { defer os.RemoveAll(tmpdir) hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) - if err != nil { - t.Fatalf("Error creating head block: %s", err) - } + require.NoError(t, err, "Error creating head block") app := hb.Appender() _, err = app.Add(labels.Labels{}, 0, 0) - if err != nil { - t.Fatalf("Failed to add sample: %s", err) - } - if err = app.Commit(); err != nil { - t.Fatalf("Unexpected error committing appender: %s", err) - } + require.NoError(t, err, "Failed to add sample") + require.NoError(t, app.Commit(), "Unexpected error committing appender") app = hb.Appender() _, err = app.Add(labels.Labels{}, 0, 1) - if err != ErrAmendSample { - t.Fatalf("Expected error amending sample, got: %s", err) - } + require.Equal(t, ErrAmendSample, err) } func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { @@ -117,24 +109,16 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { defer os.RemoveAll(tmpdir) hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) - if err != nil { - t.Fatalf("Error creating head block: %s", err) - } + require.NoError(t, err, "Error creating head block") app := hb.Appender() _, err = app.Add(labels.Labels{}, 0, math.NaN()) - if err != nil { - t.Fatalf("Failed to add sample: %s", err) - } - if err = app.Commit(); err != nil { - t.Fatalf("Unexpected error committing appender: %s", err) - } + require.NoError(t, err, "Failed to add sample") + require.NoError(t, app.Commit(), "Unexpected error committing appender") app = hb.Appender() _, err = app.Add(labels.Labels{}, 0, math.NaN()) - if err != nil { - t.Fatalf("Unexpected error adding duplicate NaN sample, got: %s", err) - } + require.NoError(t, err) } func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { @@ -142,22 +126,69 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { defer os.RemoveAll(tmpdir) hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) - if err != nil { - t.Fatalf("Error creating head block: %s", err) - } + require.NoError(t, err, "Error creating head block") app := hb.Appender() _, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001)) - if err != nil { - t.Fatalf("Failed to add sample: %s", err) - } - if err = app.Commit(); err != nil { - t.Fatalf("Unexpected error committing appender: %s", err) - } + require.NoError(t, err, "Failed to add sample") + require.NoError(t, app.Commit(), "Unexpected error committing appender") app = hb.Appender() _, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000002)) - if err != ErrAmendSample { - t.Fatalf("Expected error amending sample, got: %s", err) - } + require.Equal(t, ErrAmendSample, err) +} + +func TestSkippingInvalidValuesInSameTxn(t *testing.T) { + tmpdir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(tmpdir) + + hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) + require.NoError(t, err) + + // Append AmendedValue + app := hb.Appender() + _, err = app.Add(labels.Labels{{"a", "b"}}, 0, 1) + require.NoError(t, err) + _, err = app.Add(labels.Labels{{"a", "b"}}, 0, 2) + require.NoError(t, err) + require.NoError(t, app.Commit()) + require.Equal(t, uint64(1), hb.Meta().Stats.NumSamples) + + // Make sure the right value is stored. + q := hb.Querier(0, 10) + ss := q.Select(labels.NewEqualMatcher("a", "b")) + require.True(t, ss.Next()) + it := ss.At().Iterator() + require.True(t, it.Next()) + ts, v := it.At() + require.Equal(t, int64(0), ts) + require.Equal(t, float64(1), v) + require.False(t, it.Next()) + require.False(t, ss.Next()) + require.NoError(t, q.Close()) + + // Append Out of Order Value + app = hb.Appender() + _, err = app.Add(labels.Labels{{"a", "b"}}, 10, 3) + require.NoError(t, err) + _, err = app.Add(labels.Labels{{"a", "b"}}, 7, 5) + require.NoError(t, err) + require.NoError(t, app.Commit()) + require.Equal(t, uint64(2), hb.Meta().Stats.NumSamples) + + q = hb.Querier(0, 10) + ss = q.Select(labels.NewEqualMatcher("a", "b")) + require.True(t, ss.Next()) + it = ss.At().Iterator() + require.True(t, it.Next()) + ts, v = it.At() + require.Equal(t, int64(0), ts) + require.Equal(t, float64(1), v) + require.True(t, it.Next()) + ts, v = it.At() + require.Equal(t, int64(10), ts) + require.Equal(t, float64(3), v) + require.False(t, it.Next()) + require.False(t, ss.Next()) + require.NoError(t, q.Close()) } From 0becfc8eb720d4a0e32a8211dd8361f96586847b Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Wed, 3 May 2017 16:04:50 +0530 Subject: [PATCH 2/2] Simplify checking by using readSeriesSet Signed-off-by: Goutham Veeramachaneni --- head_test.go | 37 +++++++++++++++---------------------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/head_test.go b/head_test.go index cd8939fd1..38f68a1bd 100644 --- a/head_test.go +++ b/head_test.go @@ -145,7 +145,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) require.NoError(t, err) - // Append AmendedValue + // Append AmendedValue. app := hb.Appender() _, err = app.Add(labels.Labels{{"a", "b"}}, 0, 1) require.NoError(t, err) @@ -157,17 +157,16 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { // Make sure the right value is stored. q := hb.Querier(0, 10) ss := q.Select(labels.NewEqualMatcher("a", "b")) - require.True(t, ss.Next()) - it := ss.At().Iterator() - require.True(t, it.Next()) - ts, v := it.At() - require.Equal(t, int64(0), ts) - require.Equal(t, float64(1), v) - require.False(t, it.Next()) - require.False(t, ss.Next()) + ssMap, err := readSeriesSet(ss) + require.NoError(t, err) + + require.Equal(t, map[string][]sample{ + labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}}, + }, ssMap) + require.NoError(t, q.Close()) - // Append Out of Order Value + // Append Out of Order Value. app = hb.Appender() _, err = app.Add(labels.Labels{{"a", "b"}}, 10, 3) require.NoError(t, err) @@ -178,17 +177,11 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { q = hb.Querier(0, 10) ss = q.Select(labels.NewEqualMatcher("a", "b")) - require.True(t, ss.Next()) - it = ss.At().Iterator() - require.True(t, it.Next()) - ts, v = it.At() - require.Equal(t, int64(0), ts) - require.Equal(t, float64(1), v) - require.True(t, it.Next()) - ts, v = it.At() - require.Equal(t, int64(10), ts) - require.Equal(t, float64(3), v) - require.False(t, it.Next()) - require.False(t, ss.Next()) + ssMap, err = readSeriesSet(ss) + require.NoError(t, err) + + require.Equal(t, map[string][]sample{ + labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}, {10, 3}}, + }, ssMap) require.NoError(t, q.Close()) }