Merge pull request #14694 from prometheus/ct-histogram

Histogram CT Zero ingestion
pull/14989/head
Arthur Silva Sens 2024-09-26 12:48:46 -03:00 committed by GitHub
commit d5f65cfce0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 532 additions and 36 deletions

View File

@ -1597,6 +1597,10 @@ func (n notReadyAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels
return 0, tsdb.ErrNotReady
}
func (n notReadyAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}
func (n notReadyAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}

View File

@ -55,6 +55,10 @@ func (a nopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *h
return 0, nil
}
func (a nopAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
return 0, nil
}
func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
return 0, nil
}
@ -78,6 +82,7 @@ func equalFloatSamples(a, b floatSample) bool {
}
type histogramSample struct {
metric labels.Labels
t int64
h *histogram.Histogram
fh *histogram.FloatHistogram
@ -146,7 +151,7 @@ func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.L
func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t})
a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t, metric: l})
if a.next == nil {
return 0, nil
}
@ -154,6 +159,13 @@ func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.
return a.next.AppendHistogram(ref, l, t, h, fh)
}
func (a *collectResultAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h != nil {
return a.AppendHistogram(ref, l, ct, &histogram.Histogram{}, nil)
}
return a.AppendHistogram(ref, l, ct, nil, &histogram.FloatHistogram{})
}
func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()

View File

@ -39,8 +39,10 @@ import (
"github.com/prometheus/prometheus/discovery"
_ "github.com/prometheus/prometheus/discovery/file"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/runutil"
"github.com/prometheus/prometheus/util/testutil"
)
@ -858,6 +860,178 @@ func TestManagerCTZeroIngestion(t *testing.T) {
}
}
// generateTestHistogram generates the same thing as tsdbutil.GenerateTestHistogram,
// but in the form of dto.Histogram.
func generateTestHistogram(i int) *dto.Histogram {
helper := tsdbutil.GenerateTestHistogram(i)
h := &dto.Histogram{}
h.SampleCount = proto.Uint64(helper.Count)
h.SampleSum = proto.Float64(helper.Sum)
h.Schema = proto.Int32(helper.Schema)
h.ZeroThreshold = proto.Float64(helper.ZeroThreshold)
h.ZeroCount = proto.Uint64(helper.ZeroCount)
h.PositiveSpan = make([]*dto.BucketSpan, len(helper.PositiveSpans))
for i, span := range helper.PositiveSpans {
h.PositiveSpan[i] = &dto.BucketSpan{
Offset: proto.Int32(span.Offset),
Length: proto.Uint32(span.Length),
}
}
h.PositiveDelta = helper.PositiveBuckets
h.NegativeSpan = make([]*dto.BucketSpan, len(helper.NegativeSpans))
for i, span := range helper.NegativeSpans {
h.NegativeSpan[i] = &dto.BucketSpan{
Offset: proto.Int32(span.Offset),
Length: proto.Uint32(span.Length),
}
}
h.NegativeDelta = helper.NegativeBuckets
return h
}
func TestManagerCTZeroIngestionHistogram(t *testing.T) {
const mName = "expected_histogram"
for _, tc := range []struct {
name string
inputHistSample *dto.Histogram
enableCTZeroIngestion bool
}{
{
name: "disabled with CT on histogram",
inputHistSample: func() *dto.Histogram {
h := generateTestHistogram(0)
h.CreatedTimestamp = timestamppb.Now()
return h
}(),
enableCTZeroIngestion: false,
},
{
name: "enabled with CT on histogram",
inputHistSample: func() *dto.Histogram {
h := generateTestHistogram(0)
h.CreatedTimestamp = timestamppb.Now()
return h
}(),
enableCTZeroIngestion: true,
},
{
name: "enabled without CT on histogram",
inputHistSample: func() *dto.Histogram {
h := generateTestHistogram(0)
return h
}(),
enableCTZeroIngestion: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
app := &collectResultAppender{}
scrapeManager, err := NewManager(
&Options{
EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion,
EnableNativeHistogramsIngestion: true,
skipOffsetting: true,
},
log.NewLogfmtLogger(os.Stderr),
nil,
&collectResultAppendable{app},
prometheus.NewRegistry(),
)
require.NoError(t, err)
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
GlobalConfig: config.GlobalConfig{
// Disable regular scrapes.
ScrapeInterval: model.Duration(9999 * time.Minute),
ScrapeTimeout: model.Duration(5 * time.Second),
// Ensure the proto is chosen. We need proto as it's the only protocol
// with the CT parsing support.
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
}))
once := sync.Once{}
// Start fake HTTP target to that allow one scrape only.
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fail := true // TODO(bwplotka): Kill or use?
once.Do(func() {
fail = false
w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)
ctrType := dto.MetricType_HISTOGRAM
w.Write(protoMarshalDelimited(t, &dto.MetricFamily{
Name: proto.String(mName),
Type: &ctrType,
Metric: []*dto.Metric{{Histogram: tc.inputHistSample}},
}))
})
if fail {
w.WriteHeader(http.StatusInternalServerError)
}
}),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)
// Add fake target directly into tsets + reload. Normally users would use
// Manager.Run and wait for minimum 5s refresh interval.
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
"test": {{
Targets: []model.LabelSet{{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
}},
}},
})
scrapeManager.reload()
var got []histogramSample
// Wait for one scrape.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
app.mtx.Lock()
defer app.mtx.Unlock()
// Check if scrape happened and grab the relevant histograms, they have to be there - or it's a bug
// and it's not worth waiting.
for _, h := range app.resultHistograms {
if h.metric.Get(model.MetricNameLabel) == mName {
got = append(got, h)
}
}
if len(app.resultHistograms) > 0 {
return nil
}
return fmt.Errorf("expected some histogram samples, got none")
}), "after 1 minute")
scrapeManager.Stop()
// Check for zero samples, assuming we only injected always one histogram sample.
// Did it contain CT to inject? If yes, was CT zero enabled?
if tc.inputHistSample.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion {
require.Len(t, got, 2)
// Zero sample.
require.Equal(t, histogram.Histogram{}, *got[0].h)
// Quick soft check to make sure it's the same sample or at least not zero.
require.Equal(t, tc.inputHistSample.GetSampleSum(), got[1].h.Sum)
return
}
// Expect only one, valid sample.
require.Len(t, got, 1)
// Quick soft check to make sure it's the same sample or at least not zero.
require.Equal(t, tc.inputHistSample.GetSampleSum(), got[0].h.Sum)
})
}
}
func TestUnregisterMetrics(t *testing.T) {
reg := prometheus.NewRegistry()
// Check that all metrics can be unregistered, allowing a second manager to be created.

View File

@ -1701,7 +1701,15 @@ loop:
} else {
if sl.enableCTZeroIngestion {
if ctMs := p.CreatedTimestamp(); ctMs != nil {
if isHistogram && sl.enableNativeHistogramIngestion {
if h != nil {
ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, *ctMs, h, nil)
} else {
ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, *ctMs, nil, fh)
}
} else {
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
}
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
// CT is an experimental feature. For now, we don't need to fail the
// scrape on errors updating the created timestamp, log debug.

View File

@ -2000,6 +2000,7 @@ metric: <
contentType: "application/vnd.google.protobuf",
histograms: []histogramSample{{
t: 1234568,
metric: labels.FromStrings("__name__", "test_histogram"),
h: &histogram.Histogram{
Count: 175,
ZeroCount: 2,
@ -2126,6 +2127,7 @@ metric: <
},
histograms: []histogramSample{{
t: 1234568,
metric: labels.FromStrings("__name__", "test_histogram"),
h: &histogram.Histogram{
Count: 175,
ZeroCount: 2,

View File

@ -190,6 +190,20 @@ func (f *fanoutAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64
return ref, nil
}
func (f *fanoutAppender) AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) {
ref, err := f.primary.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
if err != nil {
return ref, err
}
for _, appender := range f.secondaries {
if _, err := appender.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh); err != nil {
return 0, err
}
}
return ref, nil
}
func (f *fanoutAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) {
ref, err := f.primary.UpdateMetadata(ref, l, m)
if err != nil {

View File

@ -51,6 +51,7 @@ var (
// behaviour, and we currently don't have a way to determine this. As a result
// it's recommended to ignore this error for now.
ErrOutOfOrderCT = fmt.Errorf("created timestamp out of order, ignoring")
ErrCTNewerThanSample = fmt.Errorf("CT is newer or the same as sample's timestamp, ignoring")
)
// SeriesRef is a generic series reference. In prometheus it is either a
@ -313,6 +314,20 @@ type HistogramAppender interface {
// pointer. AppendHistogram won't mutate the histogram, but in turn
// depends on the caller to not mutate it either.
AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error)
// AppendHistogramCTZeroSample adds synthetic zero sample for the given ct timestamp,
// which will be associated with given series, labels and the incoming
// sample's t (timestamp). AppendHistogramCTZeroSample returns error if zero sample can't be
// appended, for example when ct is too old, or when it would collide with
// incoming sample (sample has priority).
//
// AppendHistogramCTZeroSample has to be called before the corresponding histogram AppendHistogram.
// A series reference number is returned which can be used to modify the
// CT for the given series in the same or later transactions.
// Returned reference numbers are ephemeral and may be rejected in calls
// to AppendHistogramCTZeroSample() at any point.
//
// If the reference is 0 it must not be used for caching.
AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error)
}
// MetadataUpdater provides an interface for associating metadata to stored series.

View File

@ -306,6 +306,11 @@ func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels,
return 0, nil
}
func (t *timestampTracker) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
// TODO: Implement
return 0, nil
}
func (t *timestampTracker) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) {
// TODO: Add and increment a `metadata` field when we get around to wiring metadata in remote_write.
// UpdateMetadata is no-op for remote write (where timestampTracker is being used) for now.

View File

@ -915,6 +915,13 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t
return 0, nil
}
func (m *mockAppendable) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
// AppendCTZeroSample is no-op for remote-write for now.
// TODO(bwplotka/arthursens): Add support for PRW 2.0 for CT zero feature (but also we might
// replace this with in-metadata CT storage, see https://github.com/prometheus/prometheus/issues/14218).
return 0, nil
}
func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, l labels.Labels, mp metadata.Metadata) (storage.SeriesRef, error) {
if m.updateMetadataErr != nil {
return 0, m.updateMetadataErr

View File

@ -972,6 +972,11 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
return storage.SeriesRef(series.ref), nil
}
func (a *appender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
// TODO(bwplotka/arthursens): Wire metadata in the Agent's appender.
return 0, nil
}
func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
// TODO: Wire metadata in the Agent's appender.
return 0, nil

View File

@ -79,6 +79,16 @@ func (a *initAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t
return a.app.AppendHistogram(ref, l, t, h, fh)
}
func (a *initAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if a.app != nil {
return a.app.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
}
a.head.initTime(t)
a.app = a.head.appender()
return a.app.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
}
func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
if a.app != nil {
return a.app.UpdateMetadata(ref, l, m)
@ -388,7 +398,7 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
// storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation.
func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) {
if ct >= t {
return 0, fmt.Errorf("CT is newer or the same as sample's timestamp, ignoring")
return 0, storage.ErrCTNewerThanSample
}
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
@ -747,6 +757,107 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
return storage.SeriesRef(s.ref), nil
}
func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if !a.head.opts.EnableNativeHistograms.Load() {
return 0, storage.ErrNativeHistogramsDisabled
}
if ct >= t {
return 0, storage.ErrCTNewerThanSample
}
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
// Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty()
if lset.IsEmpty() {
return 0, fmt.Errorf("empty labelset: %w", ErrInvalidSample)
}
if l, dup := lset.HasDuplicateLabelNames(); dup {
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
}
var created bool
var err error
s, created, err = a.head.getOrCreate(lset.Hash(), lset)
if err != nil {
return 0, err
}
if created {
switch {
case h != nil:
s.lastHistogramValue = &histogram.Histogram{}
case fh != nil:
s.lastFloatHistogramValue = &histogram.FloatHistogram{}
}
a.series = append(a.series, record.RefSeries{
Ref: s.ref,
Labels: lset,
})
}
}
switch {
case h != nil:
zeroHistogram := &histogram.Histogram{}
s.Lock()
// Although we call `appendableHistogram` with oooHistogramsEnabled=true, for CTZeroSamples OOO is not allowed.
// We set it to true to make this implementation as close as possible to the float implementation.
isOOO, _, err := s.appendableHistogram(ct, zeroHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow, true)
if err != nil {
s.Unlock()
if errors.Is(err, storage.ErrOutOfOrderSample) {
return 0, storage.ErrOutOfOrderCT
}
}
// OOO is not allowed because after the first scrape, CT will be the same for most (if not all) future samples.
// This is to prevent the injected zero from being marked as OOO forever.
if isOOO {
s.Unlock()
return 0, storage.ErrOutOfOrderCT
}
s.pendingCommit = true
s.Unlock()
a.histograms = append(a.histograms, record.RefHistogramSample{
Ref: s.ref,
T: ct,
H: zeroHistogram,
})
a.histogramSeries = append(a.histogramSeries, s)
case fh != nil:
zeroFloatHistogram := &histogram.FloatHistogram{}
s.Lock()
// Although we call `appendableFloatHistogram` with oooHistogramsEnabled=true, for CTZeroSamples OOO is not allowed.
// We set it to true to make this implementation as close as possible to the float implementation.
isOOO, _, err := s.appendableFloatHistogram(ct, zeroFloatHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow, true) // OOO is not allowed for CTZeroSamples.
if err != nil {
s.Unlock()
if errors.Is(err, storage.ErrOutOfOrderSample) {
return 0, storage.ErrOutOfOrderCT
}
}
// OOO is not allowed because after the first scrape, CT will be the same for most (if not all) future samples.
// This is to prevent the injected zero from being marked as OOO forever.
if isOOO {
s.Unlock()
return 0, storage.ErrOutOfOrderCT
}
s.pendingCommit = true
s.Unlock()
a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{
Ref: s.ref,
T: ct,
FH: zeroFloatHistogram,
})
a.floatHistogramSeries = append(a.floatHistogramSeries, s)
}
if ct > a.maxt {
a.maxt = ct
}
return storage.SeriesRef(s.ref), nil
}
// UpdateMetadata for headAppender assumes the series ref already exists, and so it doesn't
// use getOrCreate or make any of the lset sanity checks that Append does.
func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels, meta metadata.Metadata) (storage.SeriesRef, error) {

View File

@ -6281,10 +6281,14 @@ func TestHeadAppender_AppendFloatWithSameTimestampAsPreviousHistogram(t *testing
require.ErrorIs(t, err, storage.NewDuplicateHistogramToFloatErr(2_000, 10.0))
}
func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
func TestHeadAppender_AppendCT(t *testing.T) {
testHistogram := tsdbutil.GenerateTestHistogram(1)
testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1)
type appendableSamples struct {
ts int64
val float64
fSample float64
h *histogram.Histogram
fh *histogram.FloatHistogram
ct int64
}
for _, tc := range []struct {
@ -6293,20 +6297,10 @@ func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
expectedSamples []chunks.Sample
}{
{
name: "In order ct+normal sample",
name: "In order ct+normal sample/floatSample",
appendableSamples: []appendableSamples{
{ts: 100, val: 10, ct: 1},
},
expectedSamples: []chunks.Sample{
sample{t: 1, f: 0},
sample{t: 100, f: 10},
},
},
{
name: "Consecutive appends with same ct ignore ct",
appendableSamples: []appendableSamples{
{ts: 100, val: 10, ct: 1},
{ts: 101, val: 10, ct: 1},
{ts: 100, fSample: 10, ct: 1},
{ts: 101, fSample: 10, ct: 1},
},
expectedSamples: []chunks.Sample{
sample{t: 1, f: 0},
@ -6315,10 +6309,86 @@ func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
},
},
{
name: "Consecutive appends with newer ct do not ignore ct",
name: "In order ct+normal sample/histogram",
appendableSamples: []appendableSamples{
{ts: 100, val: 10, ct: 1},
{ts: 102, val: 10, ct: 101},
{ts: 100, h: testHistogram, ct: 1},
{ts: 101, h: testHistogram, ct: 1},
},
expectedSamples: func() []chunks.Sample {
hNoCounterReset := *testHistogram
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, h: &histogram.Histogram{}},
sample{t: 100, h: testHistogram},
sample{t: 101, h: &hNoCounterReset},
}
}(),
},
{
name: "In order ct+normal sample/floathistogram",
appendableSamples: []appendableSamples{
{ts: 100, fh: testFloatHistogram, ct: 1},
{ts: 101, fh: testFloatHistogram, ct: 1},
},
expectedSamples: func() []chunks.Sample {
fhNoCounterReset := *testFloatHistogram
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, fh: &histogram.FloatHistogram{}},
sample{t: 100, fh: testFloatHistogram},
sample{t: 101, fh: &fhNoCounterReset},
}
}(),
},
{
name: "Consecutive appends with same ct ignore ct/floatSample",
appendableSamples: []appendableSamples{
{ts: 100, fSample: 10, ct: 1},
{ts: 101, fSample: 10, ct: 1},
},
expectedSamples: []chunks.Sample{
sample{t: 1, f: 0},
sample{t: 100, f: 10},
sample{t: 101, f: 10},
},
},
{
name: "Consecutive appends with same ct ignore ct/histogram",
appendableSamples: []appendableSamples{
{ts: 100, h: testHistogram, ct: 1},
{ts: 101, h: testHistogram, ct: 1},
},
expectedSamples: func() []chunks.Sample {
hNoCounterReset := *testHistogram
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, h: &histogram.Histogram{}},
sample{t: 100, h: testHistogram},
sample{t: 101, h: &hNoCounterReset},
}
}(),
},
{
name: "Consecutive appends with same ct ignore ct/floathistogram",
appendableSamples: []appendableSamples{
{ts: 100, fh: testFloatHistogram, ct: 1},
{ts: 101, fh: testFloatHistogram, ct: 1},
},
expectedSamples: func() []chunks.Sample {
fhNoCounterReset := *testFloatHistogram
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, fh: &histogram.FloatHistogram{}},
sample{t: 100, fh: testFloatHistogram},
sample{t: 101, fh: &fhNoCounterReset},
}
}(),
},
{
name: "Consecutive appends with newer ct do not ignore ct/floatSample",
appendableSamples: []appendableSamples{
{ts: 100, fSample: 10, ct: 1},
{ts: 102, fSample: 10, ct: 101},
},
expectedSamples: []chunks.Sample{
sample{t: 1, f: 0},
@ -6328,10 +6398,36 @@ func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
},
},
{
name: "CT equals to previous sample timestamp is ignored",
name: "Consecutive appends with newer ct do not ignore ct/histogram",
appendableSamples: []appendableSamples{
{ts: 100, val: 10, ct: 1},
{ts: 101, val: 10, ct: 100},
{ts: 100, h: testHistogram, ct: 1},
{ts: 102, h: testHistogram, ct: 101},
},
expectedSamples: []chunks.Sample{
sample{t: 1, h: &histogram.Histogram{}},
sample{t: 100, h: testHistogram},
sample{t: 101, h: &histogram.Histogram{CounterResetHint: histogram.CounterReset}},
sample{t: 102, h: testHistogram},
},
},
{
name: "Consecutive appends with newer ct do not ignore ct/floathistogram",
appendableSamples: []appendableSamples{
{ts: 100, fh: testFloatHistogram, ct: 1},
{ts: 102, fh: testFloatHistogram, ct: 101},
},
expectedSamples: []chunks.Sample{
sample{t: 1, fh: &histogram.FloatHistogram{}},
sample{t: 100, fh: testFloatHistogram},
sample{t: 101, fh: &histogram.FloatHistogram{CounterResetHint: histogram.CounterReset}},
sample{t: 102, fh: testFloatHistogram},
},
},
{
name: "CT equals to previous sample timestamp is ignored/floatSample",
appendableSamples: []appendableSamples{
{ts: 100, fSample: 10, ct: 1},
{ts: 101, fSample: 10, ct: 100},
},
expectedSamples: []chunks.Sample{
sample{t: 1, f: 0},
@ -6339,6 +6435,38 @@ func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
sample{t: 101, f: 10},
},
},
{
name: "CT equals to previous sample timestamp is ignored/histogram",
appendableSamples: []appendableSamples{
{ts: 100, h: testHistogram, ct: 1},
{ts: 101, h: testHistogram, ct: 100},
},
expectedSamples: func() []chunks.Sample {
hNoCounterReset := *testHistogram
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, h: &histogram.Histogram{}},
sample{t: 100, h: testHistogram},
sample{t: 101, h: &hNoCounterReset},
}
}(),
},
{
name: "CT equals to previous sample timestamp is ignored/floathistogram",
appendableSamples: []appendableSamples{
{ts: 100, fh: testFloatHistogram, ct: 1},
{ts: 101, fh: testFloatHistogram, ct: 100},
},
expectedSamples: func() []chunks.Sample {
fhNoCounterReset := *testFloatHistogram
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, fh: &histogram.FloatHistogram{}},
sample{t: 100, fh: testFloatHistogram},
sample{t: 101, fh: &fhNoCounterReset},
}
}(),
},
} {
t.Run(tc.name, func(t *testing.T) {
h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
@ -6348,11 +6476,22 @@ func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
a := h.Appender(context.Background())
lbls := labels.FromStrings("foo", "bar")
for _, sample := range tc.appendableSamples {
// Append float if it's a float test case
if sample.fSample != 0 {
_, err := a.AppendCTZeroSample(0, lbls, sample.ts, sample.ct)
require.NoError(t, err)
_, err = a.Append(0, lbls, sample.ts, sample.val)
_, err = a.Append(0, lbls, sample.ts, sample.fSample)
require.NoError(t, err)
}
// Append histograms if it's a histogram test case
if sample.h != nil || sample.fh != nil {
ref, err := a.AppendHistogramCTZeroSample(0, lbls, sample.ts, sample.ct, sample.h, sample.fh)
require.NoError(t, err)
_, err = a.AppendHistogram(ref, lbls, sample.ts, sample.h, sample.fh)
require.NoError(t, err)
}
}
require.NoError(t, a.Commit())
q, err := NewBlockQuerier(h, math.MinInt64, math.MaxInt64)