Histogram CT Zero ingestion

Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>
pull/14694/head
Arthur Silva Sens 4 months ago
parent faf5ba29ba
commit 6bd9b1a7cc
No known key found for this signature in database

@ -1597,6 +1597,10 @@ func (n notReadyAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels
return 0, tsdb.ErrNotReady return 0, tsdb.ErrNotReady
} }
func (n notReadyAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}
func (n notReadyAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { func (n notReadyAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady return 0, tsdb.ErrNotReady
} }

@ -55,6 +55,10 @@ func (a nopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *h
return 0, nil return 0, nil
} }
func (a nopAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
return 0, nil
}
func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) { func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
return 0, nil return 0, nil
} }
@ -78,9 +82,10 @@ func equalFloatSamples(a, b floatSample) bool {
} }
type histogramSample struct { type histogramSample struct {
t int64 metric labels.Labels
h *histogram.Histogram t int64
fh *histogram.FloatHistogram h *histogram.Histogram
fh *histogram.FloatHistogram
} }
type collectResultAppendable struct { type collectResultAppendable struct {
@ -146,7 +151,7 @@ func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.L
func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
a.mtx.Lock() a.mtx.Lock()
defer a.mtx.Unlock() defer a.mtx.Unlock()
a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t}) a.pendingHistograms = append(a.pendingHistograms, histogramSample{h: h, fh: fh, t: t, metric: l})
if a.next == nil { if a.next == nil {
return 0, nil return 0, nil
} }
@ -154,6 +159,13 @@ func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.
return a.next.AppendHistogram(ref, l, t, h, fh) return a.next.AppendHistogram(ref, l, t, h, fh)
} }
func (a *collectResultAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h != nil {
return a.AppendHistogram(ref, l, ct, &histogram.Histogram{}, nil)
}
return a.AppendHistogram(ref, l, ct, nil, &histogram.FloatHistogram{})
}
func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
a.mtx.Lock() a.mtx.Lock()
defer a.mtx.Unlock() defer a.mtx.Unlock()

@ -39,8 +39,10 @@ import (
"github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery"
_ "github.com/prometheus/prometheus/discovery/file" _ "github.com/prometheus/prometheus/discovery/file"
"github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/runutil" "github.com/prometheus/prometheus/util/runutil"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
) )
@ -858,6 +860,178 @@ func TestManagerCTZeroIngestion(t *testing.T) {
} }
} }
// generateTestHistogram generates the same thing as tsdbutil.GenerateTestHistogram,
// but in the form of dto.Histogram.
func generateTestHistogram(i int) *dto.Histogram {
helper := tsdbutil.GenerateTestHistogram(i)
h := &dto.Histogram{}
h.SampleCount = proto.Uint64(helper.Count)
h.SampleSum = proto.Float64(helper.Sum)
h.Schema = proto.Int32(helper.Schema)
h.ZeroThreshold = proto.Float64(helper.ZeroThreshold)
h.ZeroCount = proto.Uint64(helper.ZeroCount)
h.PositiveSpan = make([]*dto.BucketSpan, len(helper.PositiveSpans))
for i, span := range helper.PositiveSpans {
h.PositiveSpan[i] = &dto.BucketSpan{
Offset: proto.Int32(span.Offset),
Length: proto.Uint32(span.Length),
}
}
h.PositiveDelta = helper.PositiveBuckets
h.NegativeSpan = make([]*dto.BucketSpan, len(helper.NegativeSpans))
for i, span := range helper.NegativeSpans {
h.NegativeSpan[i] = &dto.BucketSpan{
Offset: proto.Int32(span.Offset),
Length: proto.Uint32(span.Length),
}
}
h.NegativeDelta = helper.NegativeBuckets
return h
}
func TestManagerCTZeroIngestionHistogram(t *testing.T) {
const mName = "expected_histogram"
for _, tc := range []struct {
name string
inputHistSample *dto.Histogram
enableCTZeroIngestion bool
}{
{
name: "disabled with CT on histogram",
inputHistSample: func() *dto.Histogram {
h := generateTestHistogram(0)
h.CreatedTimestamp = timestamppb.Now()
return h
}(),
enableCTZeroIngestion: false,
},
{
name: "enabled with CT on histogram",
inputHistSample: func() *dto.Histogram {
h := generateTestHistogram(0)
h.CreatedTimestamp = timestamppb.Now()
return h
}(),
enableCTZeroIngestion: true,
},
{
name: "enabled without CT on histogram",
inputHistSample: func() *dto.Histogram {
h := generateTestHistogram(0)
return h
}(),
enableCTZeroIngestion: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
app := &collectResultAppender{}
scrapeManager, err := NewManager(
&Options{
EnableCreatedTimestampZeroIngestion: tc.enableCTZeroIngestion,
EnableNativeHistogramsIngestion: true,
skipOffsetting: true,
},
log.NewLogfmtLogger(os.Stderr),
nil,
&collectResultAppendable{app},
prometheus.NewRegistry(),
)
require.NoError(t, err)
require.NoError(t, scrapeManager.ApplyConfig(&config.Config{
GlobalConfig: config.GlobalConfig{
// Disable regular scrapes.
ScrapeInterval: model.Duration(9999 * time.Minute),
ScrapeTimeout: model.Duration(5 * time.Second),
// Ensure the proto is chosen. We need proto as it's the only protocol
// with the CT parsing support.
ScrapeProtocols: []config.ScrapeProtocol{config.PrometheusProto},
},
ScrapeConfigs: []*config.ScrapeConfig{{JobName: "test"}},
}))
once := sync.Once{}
// Start fake HTTP target to that allow one scrape only.
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fail := true // TODO(bwplotka): Kill or use?
once.Do(func() {
fail = false
w.Header().Set("Content-Type", `application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited`)
ctrType := dto.MetricType_HISTOGRAM
w.Write(protoMarshalDelimited(t, &dto.MetricFamily{
Name: proto.String(mName),
Type: &ctrType,
Metric: []*dto.Metric{{Histogram: tc.inputHistSample}},
}))
})
if fail {
w.WriteHeader(http.StatusInternalServerError)
}
}),
)
defer server.Close()
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)
// Add fake target directly into tsets + reload. Normally users would use
// Manager.Run and wait for minimum 5s refresh interval.
scrapeManager.updateTsets(map[string][]*targetgroup.Group{
"test": {{
Targets: []model.LabelSet{{
model.SchemeLabel: model.LabelValue(serverURL.Scheme),
model.AddressLabel: model.LabelValue(serverURL.Host),
}},
}},
})
scrapeManager.reload()
var got []histogramSample
// Wait for one scrape.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
app.mtx.Lock()
defer app.mtx.Unlock()
// Check if scrape happened and grab the relevant histograms, they have to be there - or it's a bug
// and it's not worth waiting.
for _, h := range app.resultHistograms {
if h.metric.Get(model.MetricNameLabel) == mName {
got = append(got, h)
}
}
if len(app.resultHistograms) > 0 {
return nil
}
return fmt.Errorf("expected some histogram samples, got none")
}), "after 1 minute")
scrapeManager.Stop()
// Check for zero samples, assuming we only injected always one histogram sample.
// Did it contain CT to inject? If yes, was CT zero enabled?
if tc.inputHistSample.CreatedTimestamp.IsValid() && tc.enableCTZeroIngestion {
require.Len(t, got, 2)
// Zero sample.
require.Equal(t, histogram.Histogram{}, *got[0].h)
// Quick soft check to make sure it's the same sample or at least not zero.
require.Equal(t, tc.inputHistSample.GetSampleSum(), got[1].h.Sum)
return
}
// Expect only one, valid sample.
require.Len(t, got, 1)
// Quick soft check to make sure it's the same sample or at least not zero.
require.Equal(t, tc.inputHistSample.GetSampleSum(), got[0].h.Sum)
})
}
}
func TestUnregisterMetrics(t *testing.T) { func TestUnregisterMetrics(t *testing.T) {
reg := prometheus.NewRegistry() reg := prometheus.NewRegistry()
// Check that all metrics can be unregistered, allowing a second manager to be created. // Check that all metrics can be unregistered, allowing a second manager to be created.

@ -1701,7 +1701,15 @@ loop:
} else { } else {
if sl.enableCTZeroIngestion { if sl.enableCTZeroIngestion {
if ctMs := p.CreatedTimestamp(); ctMs != nil { if ctMs := p.CreatedTimestamp(); ctMs != nil {
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs) if isHistogram && sl.enableNativeHistogramIngestion {
if h != nil {
ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, *ctMs, h, nil)
} else {
ref, err = app.AppendHistogramCTZeroSample(ref, lset, t, *ctMs, nil, fh)
}
} else {
ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs)
}
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now. if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
// CT is an experimental feature. For now, we don't need to fail the // CT is an experimental feature. For now, we don't need to fail the
// scrape on errors updating the created timestamp, log debug. // scrape on errors updating the created timestamp, log debug.

@ -1999,7 +1999,8 @@ metric: <
`, `,
contentType: "application/vnd.google.protobuf", contentType: "application/vnd.google.protobuf",
histograms: []histogramSample{{ histograms: []histogramSample{{
t: 1234568, t: 1234568,
metric: labels.FromStrings("__name__", "test_histogram"),
h: &histogram.Histogram{ h: &histogram.Histogram{
Count: 175, Count: 175,
ZeroCount: 2, ZeroCount: 2,
@ -2125,7 +2126,8 @@ metric: <
{metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "+Inf"), t: 1234568, f: 175}, {metric: labels.FromStrings("__name__", "test_histogram_bucket", "le", "+Inf"), t: 1234568, f: 175},
}, },
histograms: []histogramSample{{ histograms: []histogramSample{{
t: 1234568, t: 1234568,
metric: labels.FromStrings("__name__", "test_histogram"),
h: &histogram.Histogram{ h: &histogram.Histogram{
Count: 175, Count: 175,
ZeroCount: 2, ZeroCount: 2,

@ -190,6 +190,20 @@ func (f *fanoutAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64
return ref, nil return ref, nil
} }
func (f *fanoutAppender) AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) {
ref, err := f.primary.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
if err != nil {
return ref, err
}
for _, appender := range f.secondaries {
if _, err := appender.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh); err != nil {
return 0, err
}
}
return ref, nil
}
func (f *fanoutAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) { func (f *fanoutAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) {
ref, err := f.primary.UpdateMetadata(ref, l, m) ref, err := f.primary.UpdateMetadata(ref, l, m)
if err != nil { if err != nil {

@ -50,7 +50,8 @@ var (
// NOTE(bwplotka): This can be both an instrumentation failure or commonly expected // NOTE(bwplotka): This can be both an instrumentation failure or commonly expected
// behaviour, and we currently don't have a way to determine this. As a result // behaviour, and we currently don't have a way to determine this. As a result
// it's recommended to ignore this error for now. // it's recommended to ignore this error for now.
ErrOutOfOrderCT = fmt.Errorf("created timestamp out of order, ignoring") ErrOutOfOrderCT = fmt.Errorf("created timestamp out of order, ignoring")
ErrCTNewerThanSample = fmt.Errorf("CT is newer or the same as sample's timestamp, ignoring")
) )
// SeriesRef is a generic series reference. In prometheus it is either a // SeriesRef is a generic series reference. In prometheus it is either a
@ -313,6 +314,20 @@ type HistogramAppender interface {
// pointer. AppendHistogram won't mutate the histogram, but in turn // pointer. AppendHistogram won't mutate the histogram, but in turn
// depends on the caller to not mutate it either. // depends on the caller to not mutate it either.
AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error)
// AppendHistogramCTZeroSample adds synthetic zero sample for the given ct timestamp,
// which will be associated with given series, labels and the incoming
// sample's t (timestamp). AppendHistogramCTZeroSample returns error if zero sample can't be
// appended, for example when ct is too old, or when it would collide with
// incoming sample (sample has priority).
//
// AppendHistogramCTZeroSample has to be called before the corresponding histogram AppendHistogram.
// A series reference number is returned which can be used to modify the
// CT for the given series in the same or later transactions.
// Returned reference numbers are ephemeral and may be rejected in calls
// to AppendHistogramCTZeroSample() at any point.
//
// If the reference is 0 it must not be used for caching.
AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error)
} }
// MetadataUpdater provides an interface for associating metadata to stored series. // MetadataUpdater provides an interface for associating metadata to stored series.

@ -306,6 +306,11 @@ func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels,
return 0, nil return 0, nil
} }
func (t *timestampTracker) AppendHistogramCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
// TODO: Implement
return 0, nil
}
func (t *timestampTracker) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) { func (t *timestampTracker) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) {
// TODO: Add and increment a `metadata` field when we get around to wiring metadata in remote_write. // TODO: Add and increment a `metadata` field when we get around to wiring metadata in remote_write.
// UpdateMetadata is no-op for remote write (where timestampTracker is being used) for now. // UpdateMetadata is no-op for remote write (where timestampTracker is being used) for now.

@ -915,6 +915,13 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t
return 0, nil return 0, nil
} }
func (m *mockAppendable) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
// AppendCTZeroSample is no-op for remote-write for now.
// TODO(bwplotka/arthursens): Add support for PRW 2.0 for CT zero feature (but also we might
// replace this with in-metadata CT storage, see https://github.com/prometheus/prometheus/issues/14218).
return 0, nil
}
func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, l labels.Labels, mp metadata.Metadata) (storage.SeriesRef, error) { func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, l labels.Labels, mp metadata.Metadata) (storage.SeriesRef, error) {
if m.updateMetadataErr != nil { if m.updateMetadataErr != nil {
return 0, m.updateMetadataErr return 0, m.updateMetadataErr

@ -972,6 +972,11 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
return storage.SeriesRef(series.ref), nil return storage.SeriesRef(series.ref), nil
} }
func (a *appender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
// TODO(bwplotka/arthursens): Wire metadata in the Agent's appender.
return 0, nil
}
func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) { func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
// TODO: Wire metadata in the Agent's appender. // TODO: Wire metadata in the Agent's appender.
return 0, nil return 0, nil

@ -79,6 +79,16 @@ func (a *initAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t
return a.app.AppendHistogram(ref, l, t, h, fh) return a.app.AppendHistogram(ref, l, t, h, fh)
} }
func (a *initAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if a.app != nil {
return a.app.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
}
a.head.initTime(t)
a.app = a.head.appender()
return a.app.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
}
func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
if a.app != nil { if a.app != nil {
return a.app.UpdateMetadata(ref, l, m) return a.app.UpdateMetadata(ref, l, m)
@ -388,7 +398,7 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
// storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation. // storage.CreatedTimestampAppender.AppendCTZeroSample for further documentation.
func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) { func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64) (storage.SeriesRef, error) {
if ct >= t { if ct >= t {
return 0, fmt.Errorf("CT is newer or the same as sample's timestamp, ignoring") return 0, storage.ErrCTNewerThanSample
} }
s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
@ -747,6 +757,107 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
return storage.SeriesRef(s.ref), nil return storage.SeriesRef(s.ref), nil
} }
func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if !a.head.opts.EnableNativeHistograms.Load() {
return 0, storage.ErrNativeHistogramsDisabled
}
if ct >= t {
return 0, storage.ErrCTNewerThanSample
}
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
// Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty()
if lset.IsEmpty() {
return 0, fmt.Errorf("empty labelset: %w", ErrInvalidSample)
}
if l, dup := lset.HasDuplicateLabelNames(); dup {
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample)
}
var created bool
var err error
s, created, err = a.head.getOrCreate(lset.Hash(), lset)
if err != nil {
return 0, err
}
if created {
switch {
case h != nil:
s.lastHistogramValue = &histogram.Histogram{}
case fh != nil:
s.lastFloatHistogramValue = &histogram.FloatHistogram{}
}
a.series = append(a.series, record.RefSeries{
Ref: s.ref,
Labels: lset,
})
}
}
switch {
case h != nil:
zeroHistogram := &histogram.Histogram{}
s.Lock()
// Although we call `appendableHistogram` with oooHistogramsEnabled=true, for CTZeroSamples OOO is not allowed.
// We set it to true to make this implementation as close as possible to the float implementation.
isOOO, _, err := s.appendableHistogram(ct, zeroHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow, true)
if err != nil {
s.Unlock()
if errors.Is(err, storage.ErrOutOfOrderSample) {
return 0, storage.ErrOutOfOrderCT
}
}
// OOO is not allowed because after the first scrape, CT will be the same for most (if not all) future samples.
// This is to prevent the injected zero from being marked as OOO forever.
if isOOO {
s.Unlock()
return 0, storage.ErrOutOfOrderCT
}
s.pendingCommit = true
s.Unlock()
a.histograms = append(a.histograms, record.RefHistogramSample{
Ref: s.ref,
T: ct,
H: zeroHistogram,
})
a.histogramSeries = append(a.histogramSeries, s)
case fh != nil:
zeroFloatHistogram := &histogram.FloatHistogram{}
s.Lock()
// Although we call `appendableFloatHistogram` with oooHistogramsEnabled=true, for CTZeroSamples OOO is not allowed.
// We set it to true to make this implementation as close as possible to the float implementation.
isOOO, _, err := s.appendableFloatHistogram(ct, zeroFloatHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow, true) // OOO is not allowed for CTZeroSamples.
if err != nil {
s.Unlock()
if errors.Is(err, storage.ErrOutOfOrderSample) {
return 0, storage.ErrOutOfOrderCT
}
}
// OOO is not allowed because after the first scrape, CT will be the same for most (if not all) future samples.
// This is to prevent the injected zero from being marked as OOO forever.
if isOOO {
s.Unlock()
return 0, storage.ErrOutOfOrderCT
}
s.pendingCommit = true
s.Unlock()
a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{
Ref: s.ref,
T: ct,
FH: zeroFloatHistogram,
})
a.floatHistogramSeries = append(a.floatHistogramSeries, s)
}
if ct > a.maxt {
a.maxt = ct
}
return storage.SeriesRef(s.ref), nil
}
// UpdateMetadata for headAppender assumes the series ref already exists, and so it doesn't // UpdateMetadata for headAppender assumes the series ref already exists, and so it doesn't
// use getOrCreate or make any of the lset sanity checks that Append does. // use getOrCreate or make any of the lset sanity checks that Append does.
func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels, meta metadata.Metadata) (storage.SeriesRef, error) { func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels, meta metadata.Metadata) (storage.SeriesRef, error) {

@ -6363,6 +6363,166 @@ func TestHeadAppender_AppendCTZeroSample(t *testing.T) {
} }
} }
func TestHeadAppender_AppendHistogramCTZeroSample(t *testing.T) {
testHistogram := tsdbutil.GenerateTestHistogram(1)
testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1)
lbls := labels.FromStrings("foo", "bar")
type appendableHistograms struct {
ts int64
h *histogram.Histogram
fh *histogram.FloatHistogram
ct int64
}
for _, tc := range []struct {
name string
appendableHistograms []appendableHistograms
expectedHistograms []chunks.Sample
}{
{
name: "In order ct+normal sample/histogram",
appendableHistograms: []appendableHistograms{
{ts: 100, h: testHistogram, ct: 1},
{ts: 101, h: testHistogram, ct: 1},
},
expectedHistograms: func() []chunks.Sample {
hNoCounterReset := *testHistogram
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, h: &histogram.Histogram{}},
sample{t: 100, h: testHistogram},
sample{t: 101, h: &hNoCounterReset},
}
}(),
},
{
name: "In order ct+normal sample/floathistogram",
appendableHistograms: []appendableHistograms{
{ts: 100, fh: testFloatHistogram, ct: 1},
{ts: 101, fh: testFloatHistogram, ct: 1},
},
expectedHistograms: func() []chunks.Sample {
fhNoCounterReset := *testFloatHistogram
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, fh: &histogram.FloatHistogram{}},
sample{t: 100, fh: testFloatHistogram},
sample{t: 101, fh: &fhNoCounterReset},
}
}(),
},
{
name: "Consecutive appends with same ct ignore ct/histogram",
appendableHistograms: []appendableHistograms{
{ts: 100, h: testHistogram, ct: 1},
{ts: 101, h: testHistogram, ct: 1},
},
expectedHistograms: func() []chunks.Sample {
hNoCounterReset := *testHistogram
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, h: &histogram.Histogram{}},
sample{t: 100, h: testHistogram},
sample{t: 101, h: &hNoCounterReset},
}
}(),
},
{
name: "Consecutive appends with same ct ignore ct/floathistogram",
appendableHistograms: []appendableHistograms{
{ts: 100, fh: testFloatHistogram, ct: 1},
{ts: 101, fh: testFloatHistogram, ct: 1},
},
expectedHistograms: func() []chunks.Sample {
fhNoCounterReset := *testFloatHistogram
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, fh: &histogram.FloatHistogram{}},
sample{t: 100, fh: testFloatHistogram},
sample{t: 101, fh: &fhNoCounterReset},
}
}(),
},
{
name: "Consecutive appends with newer ct do not ignore ct/histogram",
appendableHistograms: []appendableHistograms{
{ts: 100, h: testHistogram, ct: 1},
{ts: 102, h: testHistogram, ct: 101},
},
expectedHistograms: []chunks.Sample{
sample{t: 1, h: &histogram.Histogram{}},
sample{t: 100, h: testHistogram},
sample{t: 101, h: &histogram.Histogram{CounterResetHint: histogram.CounterReset}},
sample{t: 102, h: testHistogram},
},
},
{
name: "Consecutive appends with newer ct do not ignore ct/floathistogram",
appendableHistograms: []appendableHistograms{
{ts: 100, fh: testFloatHistogram, ct: 1},
{ts: 102, fh: testFloatHistogram, ct: 101},
},
expectedHistograms: []chunks.Sample{
sample{t: 1, fh: &histogram.FloatHistogram{}},
sample{t: 100, fh: testFloatHistogram},
sample{t: 101, fh: &histogram.FloatHistogram{CounterResetHint: histogram.CounterReset}},
sample{t: 102, fh: testFloatHistogram},
},
},
{
name: "CT equals to previous sample timestamp is ignored/histogram",
appendableHistograms: []appendableHistograms{
{ts: 100, h: testHistogram, ct: 1},
{ts: 101, h: testHistogram, ct: 100},
},
expectedHistograms: func() []chunks.Sample {
hNoCounterReset := *testHistogram
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, h: &histogram.Histogram{}},
sample{t: 100, h: testHistogram},
sample{t: 101, h: &hNoCounterReset},
}
}(),
},
{
name: "CT equals to previous sample timestamp is ignored/floathistogram",
appendableHistograms: []appendableHistograms{
{ts: 100, fh: testFloatHistogram, ct: 1},
{ts: 101, fh: testFloatHistogram, ct: 100},
},
expectedHistograms: func() []chunks.Sample {
fhNoCounterReset := *testFloatHistogram
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, fh: &histogram.FloatHistogram{}},
sample{t: 100, fh: testFloatHistogram},
sample{t: 101, fh: &fhNoCounterReset},
}
}(),
},
} {
t.Run(tc.name, func(t *testing.T) {
head, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
defer func() {
require.NoError(t, head.Close())
}()
appender := head.Appender(context.Background())
for _, sample := range tc.appendableHistograms {
ref, err := appender.AppendHistogramCTZeroSample(0, lbls, sample.ts, sample.ct, sample.h, sample.fh)
require.NoError(t, err)
_, err = appender.AppendHistogram(ref, lbls, sample.ts, sample.h, sample.fh)
require.NoError(t, err)
}
require.NoError(t, appender.Commit())
q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
result := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.Equal(t, tc.expectedHistograms, result[`{foo="bar"}`])
})
}
}
func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) { func TestHeadCompactableDoesNotCompactEmptyHead(t *testing.T) {
// Use a chunk range of 1 here so that if we attempted to determine if the head // Use a chunk range of 1 here so that if we attempted to determine if the head
// was compactable using default values for min and max times, `Head.compactable()` // was compactable using default values for min and max times, `Head.compactable()`

Loading…
Cancel
Save