Browse Source

federate: Fix PeekBack usage

In most cases, there is no sample at `maxt`, so `PeekBack` has to be
used. So far, `PeekBack` did not return a float histogram, and we
disregarded even any returned normal histogram. This fixes both, and
also tweaks the unit test to discover the problem (by using an earlier
timestamp than "now" for the samples in the TSDB).

Signed-off-by: beorn7 <beorn@grafana.com>
pull/11830/head
beorn7 2 years ago committed by Ganesh Vernekar
parent
commit
d121db7a65
No known key found for this signature in database
GPG Key ID: F056451B52F1DC34
  1. 6
      storage/buffer.go
  2. 2
      storage/buffer_test.go
  3. 12
      web/federate.go
  4. 8
      web/federate_test.go

6
storage/buffer.go

@ -68,9 +68,11 @@ func (b *BufferedSeriesIterator) ReduceDelta(delta int64) bool {
// PeekBack returns the nth previous element of the iterator. If there is none buffered, // PeekBack returns the nth previous element of the iterator. If there is none buffered,
// ok is false. // ok is false.
func (b *BufferedSeriesIterator) PeekBack(n int) (t int64, v float64, h *histogram.Histogram, ok bool) { func (b *BufferedSeriesIterator) PeekBack(n int) (
t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, ok bool,
) {
s, ok := b.buf.nthLast(n) s, ok := b.buf.nthLast(n)
return s.t, s.v, s.h, ok return s.t, s.v, s.h, s.fh, ok
} }
// Buffer returns an iterator over the buffered data. Invalidates previously // Buffer returns an iterator over the buffered data. Invalidates previously

2
storage/buffer_test.go

@ -107,7 +107,7 @@ func TestBufferedSeriesIterator(t *testing.T) {
require.Equal(t, ev, v, "value mismatch") require.Equal(t, ev, v, "value mismatch")
} }
prevSampleEq := func(ets int64, ev float64, eok bool) { prevSampleEq := func(ets int64, ev float64, eok bool) {
ts, v, _, ok := it.PeekBack(1) ts, v, _, _, ok := it.PeekBack(1)
require.Equal(t, eok, ok, "exist mismatch") require.Equal(t, eok, ok, "exist mismatch")
require.Equal(t, ets, ts, "timestamp mismatch") require.Equal(t, ets, ts, "timestamp mismatch")
require.Equal(t, ev, v, "value mismatch") require.Equal(t, ev, v, "value mismatch")

12
web/federate.go

@ -116,7 +116,8 @@ Loop:
var ( var (
t int64 t int64
v float64 v float64
h *histogram.FloatHistogram h *histogram.Histogram
fh *histogram.FloatHistogram
ok bool ok bool
) )
valueType := it.Seek(maxt) valueType := it.Seek(maxt)
@ -124,12 +125,15 @@ Loop:
case chunkenc.ValFloat: case chunkenc.ValFloat:
t, v = it.At() t, v = it.At()
case chunkenc.ValFloatHistogram, chunkenc.ValHistogram: case chunkenc.ValFloatHistogram, chunkenc.ValHistogram:
t, h = it.AtFloatHistogram() t, fh = it.AtFloatHistogram()
default: default:
t, v, _, ok = it.PeekBack(1) t, v, h, fh, ok = it.PeekBack(1)
if !ok { if !ok {
continue Loop continue Loop
} }
if h != nil {
fh = h.ToFloat()
}
} }
// The exposition formats do not support stale markers, so drop them. This // The exposition formats do not support stale markers, so drop them. This
// is good enough for staleness handling of federated data, as the // is good enough for staleness handling of federated data, as the
@ -141,7 +145,7 @@ Loop:
vec = append(vec, promql.Sample{ vec = append(vec, promql.Sample{
Metric: s.Labels(), Metric: s.Labels(),
Point: promql.Point{T: t, V: v, H: h}, Point: promql.Point{T: t, V: v, H: fh},
}) })
} }
if ws := set.Warnings(); len(ws) > 0 { if ws := set.Warnings(); len(ws) > 0 {

8
web/federate_test.go

@ -340,16 +340,16 @@ func TestFederationWithNativeHistograms(t *testing.T) {
l := labels.FromStrings("__name__", "test_metric", "foo", fmt.Sprintf("%d", i)) l := labels.FromStrings("__name__", "test_metric", "foo", fmt.Sprintf("%d", i))
expL := labels.FromStrings("__name__", "test_metric", "instance", "", "foo", fmt.Sprintf("%d", i)) expL := labels.FromStrings("__name__", "test_metric", "instance", "", "foo", fmt.Sprintf("%d", i))
if i%3 == 0 { if i%3 == 0 {
_, err = app.Append(0, l, 101*60*1000, float64(i*100)) _, err = app.Append(0, l, 100*60*1000, float64(i*100))
expVec = append(expVec, promql.Sample{ expVec = append(expVec, promql.Sample{
Point: promql.Point{T: 101 * 60 * 1000, V: float64(i * 100)}, Point: promql.Point{T: 100 * 60 * 1000, V: float64(i * 100)},
Metric: expL, Metric: expL,
}) })
} else { } else {
hist.ZeroCount++ hist.ZeroCount++
_, err = app.AppendHistogram(0, l, 101*60*1000, hist.Copy(), nil) _, err = app.AppendHistogram(0, l, 100*60*1000, hist.Copy(), nil)
expVec = append(expVec, promql.Sample{ expVec = append(expVec, promql.Sample{
Point: promql.Point{T: 101 * 60 * 1000, H: hist.ToFloat()}, Point: promql.Point{T: 100 * 60 * 1000, H: hist.ToFloat()},
Metric: expL, Metric: expL,
}) })
} }

Loading…
Cancel
Save