mirror of https://github.com/prometheus/prometheus
Adapt storage APIs to uint64 references
parent
a007eb2e1e
commit
0efecea6d4
|
@ -27,8 +27,8 @@ func (a nopAppendable) Appender() (storage.Appender, error) {
|
|||
|
||||
type nopAppender struct{}
|
||||
|
||||
func (a nopAppender) Add(labels.Labels, int64, float64) (string, error) { return "", nil }
|
||||
func (a nopAppender) AddFast(labels.Labels, string, int64, float64) error { return nil }
|
||||
func (a nopAppender) Add(labels.Labels, int64, float64) (uint64, error) { return 0, nil }
|
||||
func (a nopAppender) AddFast(labels.Labels, uint64, int64, float64) error { return nil }
|
||||
func (a nopAppender) Commit() error { return nil }
|
||||
func (a nopAppender) Rollback() error { return nil }
|
||||
|
||||
|
@ -36,18 +36,18 @@ type collectResultAppender struct {
|
|||
result []sample
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) AddFast(m labels.Labels, ref string, t int64, v float64) error {
|
||||
func (a *collectResultAppender) AddFast(m labels.Labels, ref uint64, t int64, v float64) error {
|
||||
// Not implemented.
|
||||
return storage.ErrNotFound
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (string, error) {
|
||||
func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64, error) {
|
||||
a.result = append(a.result, sample{
|
||||
metric: m,
|
||||
t: t,
|
||||
v: v,
|
||||
})
|
||||
return "", nil
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (a *collectResultAppender) Commit() error { return nil }
|
||||
|
|
|
@ -465,7 +465,7 @@ type lsetCacheEntry struct {
|
|||
}
|
||||
|
||||
type refEntry struct {
|
||||
ref string
|
||||
ref uint64
|
||||
lastIter uint64
|
||||
}
|
||||
|
||||
|
@ -490,7 +490,7 @@ type scrapeCache struct {
|
|||
iter uint64 // Current scrape iteration.
|
||||
|
||||
refs map[string]*refEntry // Parsed string to ref.
|
||||
lsets map[string]*lsetCacheEntry // Ref to labelset and string.
|
||||
lsets map[uint64]*lsetCacheEntry // Ref to labelset and string.
|
||||
|
||||
// seriesCur and seriesPrev store the labels of series that were seen
|
||||
// in the current and previous scrape.
|
||||
|
@ -502,7 +502,7 @@ type scrapeCache struct {
|
|||
func newScrapeCache() *scrapeCache {
|
||||
return &scrapeCache{
|
||||
refs: map[string]*refEntry{},
|
||||
lsets: map[string]*lsetCacheEntry{},
|
||||
lsets: map[uint64]*lsetCacheEntry{},
|
||||
seriesCur: map[uint64]labels.Labels{},
|
||||
seriesPrev: map[uint64]labels.Labels{},
|
||||
}
|
||||
|
@ -530,17 +530,17 @@ func (c *scrapeCache) iterDone() {
|
|||
c.iter++
|
||||
}
|
||||
|
||||
func (c *scrapeCache) getRef(met string) (string, bool) {
|
||||
func (c *scrapeCache) getRef(met string) (uint64, bool) {
|
||||
e, ok := c.refs[met]
|
||||
if !ok {
|
||||
return "", false
|
||||
return 0, false
|
||||
}
|
||||
e.lastIter = c.iter
|
||||
return e.ref, true
|
||||
}
|
||||
|
||||
func (c *scrapeCache) addRef(met, ref string, lset labels.Labels, hash uint64) {
|
||||
if ref == "" {
|
||||
func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash uint64) {
|
||||
if ref == 0 {
|
||||
return
|
||||
}
|
||||
// Clean up the label set cache before overwriting the ref for a previously seen
|
||||
|
@ -826,7 +826,7 @@ loop:
|
|||
hash = lset.Hash()
|
||||
}
|
||||
|
||||
var ref string
|
||||
var ref uint64
|
||||
ref, err = app.Add(lset, t, v)
|
||||
// TODO(fabxc): also add a dropped-cache?
|
||||
switch err {
|
||||
|
|
|
@ -872,20 +872,20 @@ type errorAppender struct {
|
|||
collectResultAppender
|
||||
}
|
||||
|
||||
func (app *errorAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
|
||||
func (app *errorAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
switch lset.Get(model.MetricNameLabel) {
|
||||
case "out_of_order":
|
||||
return "", storage.ErrOutOfOrderSample
|
||||
return 0, storage.ErrOutOfOrderSample
|
||||
case "amend":
|
||||
return "", storage.ErrDuplicateSampleForTimestamp
|
||||
return 0, storage.ErrDuplicateSampleForTimestamp
|
||||
case "out_of_bounds":
|
||||
return "", storage.ErrOutOfBounds
|
||||
return 0, storage.ErrOutOfBounds
|
||||
default:
|
||||
return app.collectResultAppender.Add(lset, t, v)
|
||||
}
|
||||
}
|
||||
|
||||
func (app *errorAppender) AddFast(lset labels.Labels, ref string, t int64, v float64) error {
|
||||
func (app *errorAppender) AddFast(lset labels.Labels, ref uint64, t int64, v float64) error {
|
||||
return app.collectResultAppender.AddFast(lset, ref, t, v)
|
||||
}
|
||||
|
||||
|
|
|
@ -198,21 +198,21 @@ type limitAppender struct {
|
|||
i int
|
||||
}
|
||||
|
||||
func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
|
||||
func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
if !value.IsStaleNaN(v) {
|
||||
app.i++
|
||||
if app.i > app.limit {
|
||||
return "", errSampleLimit
|
||||
return 0, errSampleLimit
|
||||
}
|
||||
}
|
||||
ref, err := app.Appender.Add(lset, t, v)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return 0, err
|
||||
}
|
||||
return ref, nil
|
||||
}
|
||||
|
||||
func (app *limitAppender) AddFast(lset labels.Labels, ref string, t int64, v float64) error {
|
||||
func (app *limitAppender) AddFast(lset labels.Labels, ref uint64, t int64, v float64) error {
|
||||
if !value.IsStaleNaN(v) {
|
||||
app.i++
|
||||
if app.i > app.limit {
|
||||
|
@ -231,19 +231,19 @@ type timeLimitAppender struct {
|
|||
maxTime int64
|
||||
}
|
||||
|
||||
func (app *timeLimitAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
|
||||
func (app *timeLimitAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
if t > app.maxTime {
|
||||
return "", storage.ErrOutOfBounds
|
||||
return 0, storage.ErrOutOfBounds
|
||||
}
|
||||
|
||||
ref, err := app.Appender.Add(lset, t, v)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return 0, err
|
||||
}
|
||||
return ref, nil
|
||||
}
|
||||
|
||||
func (app *timeLimitAppender) AddFast(lset labels.Labels, ref string, t int64, v float64) error {
|
||||
func (app *timeLimitAppender) AddFast(lset labels.Labels, ref uint64, t int64, v float64) error {
|
||||
if t > app.maxTime {
|
||||
return storage.ErrOutOfBounds
|
||||
}
|
||||
|
@ -260,7 +260,7 @@ type ruleLabelsAppender struct {
|
|||
labels labels.Labels
|
||||
}
|
||||
|
||||
func (app ruleLabelsAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
|
||||
func (app ruleLabelsAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
lb := labels.NewBuilder(lset)
|
||||
|
||||
for _, l := range app.labels {
|
||||
|
@ -282,7 +282,7 @@ type honorLabelsAppender struct {
|
|||
// Merges the sample's metric with the given labels if the label is not
|
||||
// already present in the metric.
|
||||
// This also considers labels explicitly set to the empty string.
|
||||
func (app honorLabelsAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
|
||||
func (app honorLabelsAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
lb := labels.NewBuilder(lset)
|
||||
|
||||
for _, l := range app.labels {
|
||||
|
@ -302,10 +302,10 @@ type relabelAppender struct {
|
|||
|
||||
var errSeriesDropped = errors.New("series dropped")
|
||||
|
||||
func (app relabelAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
|
||||
func (app relabelAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
lset = relabel.Process(lset, app.relabelings...)
|
||||
if lset == nil {
|
||||
return "", errSeriesDropped
|
||||
return 0, errSeriesDropped
|
||||
}
|
||||
return app.Appender.Add(lset, t, v)
|
||||
}
|
||||
|
|
|
@ -101,7 +101,7 @@ type fanoutAppender struct {
|
|||
secondaries []Appender
|
||||
}
|
||||
|
||||
func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (string, error) {
|
||||
func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) {
|
||||
ref, err := f.primary.Add(l, t, v)
|
||||
if err != nil {
|
||||
return ref, err
|
||||
|
@ -109,13 +109,13 @@ func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (string, error
|
|||
|
||||
for _, appender := range f.secondaries {
|
||||
if _, err := appender.Add(l, t, v); err != nil {
|
||||
return "", err
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
return ref, nil
|
||||
}
|
||||
|
||||
func (f *fanoutAppender) AddFast(l labels.Labels, ref string, t int64, v float64) error {
|
||||
func (f *fanoutAppender) AddFast(l labels.Labels, ref uint64, t int64, v float64) error {
|
||||
if err := f.primary.AddFast(l, ref, t, v); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -54,9 +54,9 @@ type Querier interface {
|
|||
|
||||
// Appender provides batched appends against a storage.
|
||||
type Appender interface {
|
||||
Add(l labels.Labels, t int64, v float64) (string, error)
|
||||
Add(l labels.Labels, t int64, v float64) (uint64, error)
|
||||
|
||||
AddFast(l labels.Labels, ref string, t int64, v float64) error
|
||||
AddFast(l labels.Labels, ref uint64, t int64, v float64) error
|
||||
|
||||
// Commit submits the collected samples and purges the batch.
|
||||
Commit() error
|
||||
|
|
|
@ -23,7 +23,7 @@ func (s *Storage) Appender() (storage.Appender, error) {
|
|||
return s, nil
|
||||
}
|
||||
|
||||
func (s *Storage) Add(l labels.Labels, t int64, v float64) (string, error) {
|
||||
func (s *Storage) Add(l labels.Labels, t int64, v float64) (uint64, error) {
|
||||
s.mtx.RLock()
|
||||
defer s.mtx.RUnlock()
|
||||
for _, q := range s.queues {
|
||||
|
@ -33,7 +33,7 @@ func (s *Storage) Add(l labels.Labels, t int64, v float64) (string, error) {
|
|||
Value: model.SampleValue(v),
|
||||
})
|
||||
}
|
||||
return "", nil
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func labelsToMetric(ls labels.Labels) model.Metric {
|
||||
|
@ -44,7 +44,7 @@ func labelsToMetric(ls labels.Labels) model.Metric {
|
|||
return metric
|
||||
}
|
||||
|
||||
func (s *Storage) AddFast(l labels.Labels, _ string, t int64, v float64) error {
|
||||
func (s *Storage) AddFast(l labels.Labels, _ uint64, t int64, v float64) error {
|
||||
_, err := s.Add(l, t, v)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -129,23 +129,23 @@ type appender struct {
|
|||
a tsdb.Appender
|
||||
}
|
||||
|
||||
func (a appender) Add(lset labels.Labels, t int64, v float64) (string, error) {
|
||||
func (a appender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
ref, err := a.a.Add(toTSDBLabels(lset), t, v)
|
||||
|
||||
switch errors.Cause(err) {
|
||||
case tsdb.ErrNotFound:
|
||||
return "", storage.ErrNotFound
|
||||
return 0, storage.ErrNotFound
|
||||
case tsdb.ErrOutOfOrderSample:
|
||||
return "", storage.ErrOutOfOrderSample
|
||||
return 0, storage.ErrOutOfOrderSample
|
||||
case tsdb.ErrAmendSample:
|
||||
return "", storage.ErrDuplicateSampleForTimestamp
|
||||
return 0, storage.ErrDuplicateSampleForTimestamp
|
||||
case tsdb.ErrOutOfBounds:
|
||||
return "", storage.ErrOutOfBounds
|
||||
return 0, storage.ErrOutOfBounds
|
||||
}
|
||||
return ref, err
|
||||
}
|
||||
|
||||
func (a appender) AddFast(_ labels.Labels, ref string, t int64, v float64) error {
|
||||
func (a appender) AddFast(_ labels.Labels, ref uint64, t int64, v float64) error {
|
||||
err := a.a.AddFast(ref, t, v)
|
||||
|
||||
switch errors.Cause(err) {
|
||||
|
|
Loading…
Reference in New Issue