mirror of https://github.com/prometheus/prometheus
HistogramAppender interface for sparse histograms (#9007)
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>pull/9009/head
parent
df3b674f01
commit
64bea6999e
|
@ -60,6 +60,7 @@ import (
|
|||
_ "github.com/prometheus/prometheus/discovery/install" // Register service discovery implementations.
|
||||
"github.com/prometheus/prometheus/notifier"
|
||||
"github.com/prometheus/prometheus/pkg/exemplar"
|
||||
"github.com/prometheus/prometheus/pkg/histogram"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/pkg/logging"
|
||||
"github.com/prometheus/prometheus/pkg/relabel"
|
||||
|
@ -1163,6 +1164,10 @@ func (n notReadyAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar
|
|||
return 0, tsdb.ErrNotReady
|
||||
}
|
||||
|
||||
func (n notReadyAppender) AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) {
|
||||
return 0, tsdb.ErrNotReady
|
||||
}
|
||||
|
||||
func (n notReadyAppender) Commit() error { return tsdb.ErrNotReady }
|
||||
|
||||
func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady }
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package histogram
|
||||
|
||||
type SparseHistogram struct {
|
||||
Ts int64
|
||||
Count, ZeroCount uint64
|
||||
Sum, ZeroThreshold float64
|
||||
Schema int32
|
||||
PositiveSpans, NegativeSpans []Span
|
||||
PositiveBuckets, NegativeBuckets []int64
|
||||
}
|
||||
|
||||
type Span struct {
|
||||
Offset int32
|
||||
Length uint32
|
||||
}
|
|
@ -18,6 +18,7 @@ import (
|
|||
"math/rand"
|
||||
|
||||
"github.com/prometheus/prometheus/pkg/exemplar"
|
||||
"github.com/prometheus/prometheus/pkg/histogram"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
@ -34,6 +35,9 @@ func (a nopAppender) Append(uint64, labels.Labels, int64, float64) (uint64, erro
|
|||
func (a nopAppender) AppendExemplar(uint64, labels.Labels, exemplar.Exemplar) (uint64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
func (a nopAppender) AppendHistogram(uint64, labels.Labels, histogram.SparseHistogram) (uint64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
func (a nopAppender) Commit() error { return nil }
|
||||
func (a nopAppender) Rollback() error { return nil }
|
||||
|
||||
|
@ -46,12 +50,15 @@ type sample struct {
|
|||
// collectResultAppender records all samples that were added through the appender.
|
||||
// It can be used as its zero value or be backed by another appender it writes samples through.
|
||||
type collectResultAppender struct {
|
||||
next storage.Appender
|
||||
result []sample
|
||||
pendingResult []sample
|
||||
rolledbackResult []sample
|
||||
pendingExemplars []exemplar.Exemplar
|
||||
resultExemplars []exemplar.Exemplar
|
||||
next storage.Appender
|
||||
result []sample
|
||||
pendingResult []sample
|
||||
rolledbackResult []sample
|
||||
pendingExemplars []exemplar.Exemplar
|
||||
resultExemplars []exemplar.Exemplar
|
||||
resultHistograms []histogram.SparseHistogram
|
||||
pendingHistograms []histogram.SparseHistogram
|
||||
rolledbackHistograms []histogram.SparseHistogram
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) Append(ref uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
|
@ -84,11 +91,22 @@ func (a *collectResultAppender) AppendExemplar(ref uint64, l labels.Labels, e ex
|
|||
return a.next.AppendExemplar(ref, l, e)
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) {
|
||||
a.pendingHistograms = append(a.pendingHistograms, sh)
|
||||
if a.next == nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
return a.next.AppendHistogram(ref, l, sh)
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) Commit() error {
|
||||
a.result = append(a.result, a.pendingResult...)
|
||||
a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...)
|
||||
a.resultHistograms = append(a.resultHistograms, a.pendingHistograms...)
|
||||
a.pendingResult = nil
|
||||
a.pendingExemplars = nil
|
||||
a.pendingHistograms = nil
|
||||
if a.next == nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -97,7 +115,9 @@ func (a *collectResultAppender) Commit() error {
|
|||
|
||||
func (a *collectResultAppender) Rollback() error {
|
||||
a.rolledbackResult = a.pendingResult
|
||||
a.rolledbackHistograms = a.pendingHistograms
|
||||
a.pendingResult = nil
|
||||
a.pendingHistograms = nil
|
||||
if a.next == nil {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/prometheus/pkg/exemplar"
|
||||
"github.com/prometheus/prometheus/pkg/histogram"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
)
|
||||
|
@ -172,6 +173,20 @@ func (f *fanoutAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.
|
|||
return ref, nil
|
||||
}
|
||||
|
||||
func (f *fanoutAppender) AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) {
|
||||
ref, err := f.primary.AppendHistogram(ref, l, sh)
|
||||
if err != nil {
|
||||
return ref, err
|
||||
}
|
||||
|
||||
for _, appender := range f.secondaries {
|
||||
if _, err := appender.AppendHistogram(ref, l, sh); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
return ref, nil
|
||||
}
|
||||
|
||||
func (f *fanoutAppender) Commit() (err error) {
|
||||
err = f.primary.Commit()
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/prometheus/prometheus/pkg/exemplar"
|
||||
"github.com/prometheus/prometheus/pkg/histogram"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
|
@ -181,6 +182,7 @@ type Appender interface {
|
|||
Rollback() error
|
||||
|
||||
ExemplarAppender
|
||||
HistogramAppender
|
||||
}
|
||||
|
||||
// GetRef is an extra interface on Appenders used by downstream projects
|
||||
|
@ -209,6 +211,19 @@ type ExemplarAppender interface {
|
|||
AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error)
|
||||
}
|
||||
|
||||
// HistogramAppender provides an interface for adding sparse histogram to the Prometheus.
|
||||
type HistogramAppender interface {
|
||||
// AppendHistogram adds a sparse histogram for the given series labels.
|
||||
// An optional reference number can be provided to accelerate calls.
|
||||
// A reference number is returned which can be used to add further
|
||||
// histograms in the same or later transactions.
|
||||
// Returned reference numbers are ephemeral and may be rejected in calls
|
||||
// to Append() at any point. Adding the sample via Append() returns a new
|
||||
// reference number.
|
||||
// If the reference is 0 it must not be used for caching.
|
||||
AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error)
|
||||
}
|
||||
|
||||
// SeriesSet contains a set of series.
|
||||
type SeriesSet interface {
|
||||
Next() bool
|
||||
|
|
|
@ -15,6 +15,7 @@ package remote
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -25,6 +26,7 @@ import (
|
|||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/pkg/exemplar"
|
||||
"github.com/prometheus/prometheus/pkg/histogram"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/wal"
|
||||
|
@ -236,6 +238,10 @@ func (t *timestampTracker) AppendExemplar(_ uint64, _ labels.Labels, _ exemplar.
|
|||
return 0, nil
|
||||
}
|
||||
|
||||
func (t *timestampTracker) AppendHistogram(_ uint64, _ labels.Labels, _ histogram.SparseHistogram) (uint64, error) {
|
||||
return 0, errors.New("not implemented")
|
||||
}
|
||||
|
||||
// Commit implements storage.Appender.
|
||||
func (t *timestampTracker) Commit() error {
|
||||
t.writeStorage.samplesIn.incr(t.samples + t.exemplars)
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
|
||||
"github.com/go-kit/log"
|
||||
"github.com/prometheus/prometheus/pkg/exemplar"
|
||||
"github.com/prometheus/prometheus/pkg/histogram"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
|
@ -138,3 +139,8 @@ func (*mockAppendable) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Ex
|
|||
// noop until we implement exemplars over remote write
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (*mockAppendable) AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) {
|
||||
// noop until we implement sparse histograms over remote write
|
||||
return 0, nil
|
||||
}
|
||||
|
|
16
tsdb/head.go
16
tsdb/head.go
|
@ -16,6 +16,7 @@ package tsdb
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/prometheus/prometheus/pkg/histogram"
|
||||
"math"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
|
@ -1194,6 +1195,16 @@ func (a *initAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Ex
|
|||
return a.app.AppendExemplar(ref, l, e)
|
||||
}
|
||||
|
||||
func (a *initAppender) AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) {
|
||||
if a.app != nil {
|
||||
return a.app.AppendHistogram(ref, l, sh)
|
||||
}
|
||||
a.head.initTime(sh.Ts)
|
||||
a.app = a.head.appender()
|
||||
|
||||
return a.app.AppendHistogram(ref, l, sh)
|
||||
}
|
||||
|
||||
var _ storage.GetRef = &initAppender{}
|
||||
|
||||
func (a *initAppender) GetRef(lset labels.Labels) (uint64, labels.Labels) {
|
||||
|
@ -1446,6 +1457,11 @@ func (a *headAppender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Ex
|
|||
return s.ref, nil
|
||||
}
|
||||
|
||||
func (a *headAppender) AppendHistogram(ref uint64, _ labels.Labels, sh histogram.SparseHistogram) (uint64, error) {
|
||||
// TODO.
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
var _ storage.GetRef = &headAppender{}
|
||||
|
||||
func (a *headAppender) GetRef(lset labels.Labels) (uint64, labels.Labels) {
|
||||
|
|
Loading…
Reference in New Issue