diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index e3d7ac03a..203eafb07 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -60,6 +60,7 @@ import ( _ "github.com/prometheus/prometheus/discovery/install" // Register service discovery implementations. "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/logging" "github.com/prometheus/prometheus/pkg/relabel" @@ -1163,6 +1164,10 @@ func (n notReadyAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar return 0, tsdb.ErrNotReady } +func (n notReadyAppender) AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) { + return 0, tsdb.ErrNotReady +} + func (n notReadyAppender) Commit() error { return tsdb.ErrNotReady } func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady } diff --git a/pkg/histogram/sparse_histogram.go b/pkg/histogram/sparse_histogram.go new file mode 100644 index 000000000..6ea1cb76e --- /dev/null +++ b/pkg/histogram/sparse_histogram.go @@ -0,0 +1,28 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package histogram + +type SparseHistogram struct { + Ts int64 + Count, ZeroCount uint64 + Sum, ZeroThreshold float64 + Schema int32 + PositiveSpans, NegativeSpans []Span + PositiveBuckets, NegativeBuckets []int64 +} + +type Span struct { + Offset int32 + Length uint32 +} diff --git a/scrape/helpers_test.go b/scrape/helpers_test.go index da29d6c12..f49695877 100644 --- a/scrape/helpers_test.go +++ b/scrape/helpers_test.go @@ -18,6 +18,7 @@ import ( "math/rand" "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" ) @@ -34,6 +35,9 @@ func (a nopAppender) Append(uint64, labels.Labels, int64, float64) (uint64, erro func (a nopAppender) AppendExemplar(uint64, labels.Labels, exemplar.Exemplar) (uint64, error) { return 0, nil } +func (a nopAppender) AppendHistogram(uint64, labels.Labels, histogram.SparseHistogram) (uint64, error) { + return 0, nil +} func (a nopAppender) Commit() error { return nil } func (a nopAppender) Rollback() error { return nil } @@ -46,12 +50,15 @@ type sample struct { // collectResultAppender records all samples that were added through the appender. // It can be used as its zero value or be backed by another appender it writes samples through. type collectResultAppender struct { - next storage.Appender - result []sample - pendingResult []sample - rolledbackResult []sample - pendingExemplars []exemplar.Exemplar - resultExemplars []exemplar.Exemplar + next storage.Appender + result []sample + pendingResult []sample + rolledbackResult []sample + pendingExemplars []exemplar.Exemplar + resultExemplars []exemplar.Exemplar + resultHistograms []histogram.SparseHistogram + pendingHistograms []histogram.SparseHistogram + rolledbackHistograms []histogram.SparseHistogram } func (a *collectResultAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) { @@ -84,11 +91,22 @@ func (a *collectResultAppender) AppendExemplar(ref uint64, l labels.Labels, e ex return a.next.AppendExemplar(ref, l, e) } +func (a *collectResultAppender) AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) { + a.pendingHistograms = append(a.pendingHistograms, sh) + if a.next == nil { + return 0, nil + } + + return a.next.AppendHistogram(ref, l, sh) +} + func (a *collectResultAppender) Commit() error { a.result = append(a.result, a.pendingResult...) a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...) + a.resultHistograms = append(a.resultHistograms, a.pendingHistograms...) a.pendingResult = nil a.pendingExemplars = nil + a.pendingHistograms = nil if a.next == nil { return nil } @@ -97,7 +115,9 @@ func (a *collectResultAppender) Commit() error { func (a *collectResultAppender) Rollback() error { a.rolledbackResult = a.pendingResult + a.rolledbackHistograms = a.pendingHistograms a.pendingResult = nil + a.pendingHistograms = nil if a.next == nil { return nil } diff --git a/storage/fanout.go b/storage/fanout.go index 206105fa5..df323a316 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -21,6 +21,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" ) @@ -172,6 +173,20 @@ func (f *fanoutAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar. return ref, nil } +func (f *fanoutAppender) AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) { + ref, err := f.primary.AppendHistogram(ref, l, sh) + if err != nil { + return ref, err + } + + for _, appender := range f.secondaries { + if _, err := appender.AppendHistogram(ref, l, sh); err != nil { + return 0, err + } + } + return ref, nil +} + func (f *fanoutAppender) Commit() (err error) { err = f.primary.Commit() diff --git a/storage/interface.go b/storage/interface.go index e017d9317..a435c6060 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" @@ -181,6 +182,7 @@ type Appender interface { Rollback() error ExemplarAppender + HistogramAppender } // GetRef is an extra interface on Appenders used by downstream projects @@ -209,6 +211,19 @@ type ExemplarAppender interface { AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) } +// HistogramAppender provides an interface for adding sparse histogram to the Prometheus. +type HistogramAppender interface { + // AppendHistogram adds a sparse histogram for the given series labels. + // An optional reference number can be provided to accelerate calls. + // A reference number is returned which can be used to add further + // histograms in the same or later transactions. + // Returned reference numbers are ephemeral and may be rejected in calls + // to Append() at any point. Adding the sample via Append() returns a new + // reference number. + // If the reference is 0 it must not be used for caching. + AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) +} + // SeriesSet contains a set of series. type SeriesSet interface { Next() bool diff --git a/storage/remote/write.go b/storage/remote/write.go index 2d96f70ae..f17505852 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -15,6 +15,7 @@ package remote import ( "context" + "errors" "fmt" "sync" "time" @@ -25,6 +26,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/wal" @@ -236,6 +238,10 @@ func (t *timestampTracker) AppendExemplar(_ uint64, _ labels.Labels, _ exemplar. return 0, nil } +func (t *timestampTracker) AppendHistogram(_ uint64, _ labels.Labels, _ histogram.SparseHistogram) (uint64, error) { + return 0, errors.New("not implemented") +} + // Commit implements storage.Appender. func (t *timestampTracker) Commit() error { t.writeStorage.samplesIn.incr(t.samples + t.exemplars) diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 5e086fe2f..bc8581b01 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -24,6 +24,7 @@ import ( "github.com/go-kit/log" "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/histogram" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" @@ -138,3 +139,8 @@ func (*mockAppendable) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Ex // noop until we implement exemplars over remote write return 0, nil } + +func (*mockAppendable) AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) { + // noop until we implement sparse histograms over remote write + return 0, nil +} diff --git a/tsdb/head.go b/tsdb/head.go index 1fce0368f..296fa56c0 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -16,6 +16,7 @@ package tsdb import ( "context" "fmt" + "github.com/prometheus/prometheus/pkg/histogram" "math" "path/filepath" "runtime" @@ -1194,6 +1195,16 @@ func (a *initAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Ex return a.app.AppendExemplar(ref, l, e) } +func (a *initAppender) AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) { + if a.app != nil { + return a.app.AppendHistogram(ref, l, sh) + } + a.head.initTime(sh.Ts) + a.app = a.head.appender() + + return a.app.AppendHistogram(ref, l, sh) +} + var _ storage.GetRef = &initAppender{} func (a *initAppender) GetRef(lset labels.Labels) (uint64, labels.Labels) { @@ -1446,6 +1457,11 @@ func (a *headAppender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Ex return s.ref, nil } +func (a *headAppender) AppendHistogram(ref uint64, _ labels.Labels, sh histogram.SparseHistogram) (uint64, error) { + // TODO. + return 0, nil +} + var _ storage.GetRef = &headAppender{} func (a *headAppender) GetRef(lset labels.Labels) (uint64, labels.Labels) {