
Introduce TSDB changes for appending metadata to the WAL (#10972)

* Append metadata to the WAL

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Remove extra whitespace; Reword some docstrings and comments

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Use RLock() for hasNewMetadata check

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Use single byte for metric type in RefMetadata

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Update proposed WAL format for single-byte type metadata

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Implement a MetadataAppender interface for the Agent

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Address first round of review comments

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Amend description of metadata in wal.md

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Correct key used to retrieve metadata from cache

When we're setting metadata entries in the scrapeCache, we're using the
p.Help(), p.Unit(), p.Type() helpers, which retrieve the series name and
use it as the cache key. When checking for cache entries though, we used
p.Series() as the key, which included the metric name _with_ its labels.
That meant that we were never actually hitting the cache. We're fixing
this by using the __name__ internal label to correctly retrieve the
cache entries after they've been set by setHelp(), setType() or
setUnit(); see the sketch below.

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>
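
The essence of that fix, as a hedged, self-contained sketch (the `scrapeCache` shape, field names, and `lookup` helper here are illustrative stand-ins, not the actual scrape-loop code; only the choice of cache key is taken from the message above):

```go
package main

import "fmt"

// metaEntry loosely mirrors what the scrape cache stores per metric;
// the name and fields are hypothetical.
type metaEntry struct {
	typ, unit, help string
}

type scrapeCache struct {
	// Entries are stored under the bare metric name, because setHelp(),
	// setType() and setUnit() key them via p.Help()/p.Type()/p.Unit(),
	// which carry only the series name.
	metadata map[string]metaEntry
}

func (c *scrapeCache) lookup(metricName, seriesWithLabels string) (metaEntry, bool) {
	// Buggy lookup: p.Series() includes the labels, so that key never
	// matches what the setters stored.
	//   e, ok := c.metadata[seriesWithLabels]
	// Fixed lookup: key by the __name__ label value alone.
	e, ok := c.metadata[metricName]
	return e, ok
}

func main() {
	c := &scrapeCache{metadata: map[string]metaEntry{
		"http_requests_total": {typ: "counter", help: "Total HTTP requests."},
	}}
	e, ok := c.lookup("http_requests_total", `http_requests_total{code="200"}`)
	fmt.Println(ok, e.help) // true Total HTTP requests.
}
```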

* Put feature behind a feature flag

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Fix AppendMetadata docstring

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Reorder WAL format document

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Change error message of AppendMetadata; Fix access of s.meta in AppendMetadata

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Reuse temporary buffer in Metadata encoder

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Only keep latest metadata for each refID during checkpointing

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Fix test that's referencing decoding metadata

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Avoid creating metadata block if no new metadata are present

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Add tests for corrupt metadata block and relevant record type

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Fix CR comments

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Extract logic about changing metadata into an anonymous function

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Implement new proposed WAL format and amend relevant tests

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Use 'const' for metadata field names

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Apply metadata to head memSeries in Commit, not in AppendMetadata

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Add docstring and rename extracted helper in scrape.go

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Add tests for tsdb-related cases

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Fix linter issues vol1

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Fix linter issues vol2

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Fix Windows test by closing WAL reader files

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Use switch instead of two if statements in metadata decoding

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Fix review comments around TestMetadata* tests

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Add code for replaying WAL; test correctness of in-memory data after a replay

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Remove scrape-loop related code from PR

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Address first round of comments

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Simplify tests by sorting slices before comparison

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Fix test to use separate transactions

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Empty out buffer and record slices after encoding latest metadata

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Fix linting issue

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Update calculation for DroppedMetadata metric

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Rename MetadataAppender interface and AppendMetadata method to MetadataUpdater/UpdateMetadata

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Reuse buffer when encoding latest metadata for each series

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Fix review comments; Check all returned error values using two helpers

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Simplify use of helpers

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>

* Satisfy linter

Signed-off-by: Paschalis Tsilias <paschalist0@gmail.com>
Commit d1122e0743 by Paschalis Tsilias, committed via GitHub 2 years ago.
18 files changed (lines changed in parentheses):

1. cmd/prometheus/main.go (5)
2. model/metadata/metadata.go (23)
3. scrape/helpers_test.go (22)
4. storage/fanout.go (15)
5. storage/interface.go (14)
6. storage/remote/write.go (7)
7. storage/remote/write_handler_test.go (7)
8. tsdb/agent/db.go (6)
9. tsdb/db_test.go (252)
10. tsdb/docs/format/wal.md (30)
11. tsdb/head.go (3)
12. tsdb/head_append.go (83)
13. tsdb/head_test.go (9)
14. tsdb/head_wal.go (38)
15. tsdb/record/record.go (141)
16. tsdb/record/record_test.go (105)
17. tsdb/wal/checkpoint.go (39)
18. tsdb/wal/checkpoint_test.go (37)

cmd/prometheus/main.go (5 lines changed)

@ -58,6 +58,7 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/notifier"
_ "github.com/prometheus/prometheus/plugins" // Register plugins.
@ -1403,6 +1404,10 @@ func (n notReadyAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels,
return 0, tsdb.ErrNotReady
}
func (n notReadyAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}
func (n notReadyAppender) Commit() error { return tsdb.ErrNotReady }
func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady }

model/metadata/metadata.go (23 lines changed)

@ -0,0 +1,23 @@
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metadata
import "github.com/prometheus/prometheus/model/textparse"
// Metadata stores a series' metadata information.
type Metadata struct {
Type textparse.MetricType
Unit string
Help string
}

scrape/helpers_test.go (22 lines changed)

@ -21,6 +21,7 @@ import (
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
)
@ -39,6 +40,11 @@ func (a nopAppender) Append(storage.SeriesRef, labels.Labels, int64, float64) (s
func (a nopAppender) AppendExemplar(storage.SeriesRef, labels.Labels, exemplar.Exemplar) (storage.SeriesRef, error) {
return 0, nil
}
func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
return 0, nil
}
func (a nopAppender) Commit() error { return nil }
func (a nopAppender) Rollback() error { return nil }
@ -57,6 +63,8 @@ type collectResultAppender struct {
rolledbackResult []sample
pendingExemplars []exemplar.Exemplar
resultExemplars []exemplar.Exemplar
pendingMetadata []metadata.Metadata
resultMetadata []metadata.Metadata
}
func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
@ -89,11 +97,25 @@ func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.L
return a.next.AppendExemplar(ref, l, e)
}
func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
a.pendingMetadata = append(a.pendingMetadata, m)
if ref == 0 {
ref = storage.SeriesRef(rand.Uint64())
}
if a.next == nil {
return ref, nil
}
return a.next.UpdateMetadata(ref, l, m)
}
func (a *collectResultAppender) Commit() error {
a.result = append(a.result, a.pendingResult...)
a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...)
a.resultMetadata = append(a.resultMetadata, a.pendingMetadata...)
a.pendingResult = nil
a.pendingExemplars = nil
a.pendingMetadata = nil
if a.next == nil {
return nil
}

storage/fanout.go (15 lines changed)

@ -22,6 +22,7 @@ import (
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
)
@ -172,6 +173,20 @@ func (f *fanoutAppender) AppendExemplar(ref SeriesRef, l labels.Labels, e exempl
return ref, nil
}
func (f *fanoutAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) {
ref, err := f.primary.UpdateMetadata(ref, l, m)
if err != nil {
return ref, err
}
for _, appender := range f.secondaries {
if _, err := appender.UpdateMetadata(ref, l, m); err != nil {
return 0, err
}
}
return ref, nil
}
func (f *fanoutAppender) Commit() (err error) {
err = f.primary.Commit()

storage/interface.go (14 lines changed)

@ -20,6 +20,7 @@ import (
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
)
@ -222,6 +223,7 @@ type Appender interface {
// Appender has to be discarded after rollback.
Rollback() error
ExemplarAppender
MetadataUpdater
}
// GetRef is an extra interface on Appenders used by downstream projects
@ -250,6 +252,18 @@ type ExemplarAppender interface {
AppendExemplar(ref SeriesRef, l labels.Labels, e exemplar.Exemplar) (SeriesRef, error)
}
// MetadataUpdater provides an interface for associating metadata to stored series.
type MetadataUpdater interface {
// UpdateMetadata updates a metadata entry for the given series and labels.
// A series reference number is returned which can be used to modify the
// metadata of the given series in the same or later transactions.
// Returned reference numbers are ephemeral and may be rejected in calls
// to UpdateMetadata() at any point. If the series does not exist,
// UpdateMetadata returns an error.
// If the reference is 0 it must not be used for caching.
UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error)
}
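
A minimal sketch of driving this contract from caller code, assuming only the interface above plus the standard `storage.Appendable` entry point (the helper name and series are made up):

```go
package main

import (
	"context"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/metadata"
	"github.com/prometheus/prometheus/storage"
)

// annotateSeries is a hypothetical caller: pass ref 0 on the first call,
// reuse the returned (ephemeral) reference for follow-up updates, and note
// that the change only takes effect on Commit.
func annotateSeries(ctx context.Context, a storage.Appendable) error {
	app := a.Appender(ctx)
	lset := labels.FromStrings("__name__", "http_requests_total")

	ref, err := app.UpdateMetadata(0, lset, metadata.Metadata{
		Type: "counter", Help: "Total HTTP requests.",
	})
	if err != nil {
		return err
	}
	// The ref skips the labels lookup on later calls; since references are
	// ephemeral, a rejected ref means falling back to ref 0 with labels.
	if _, err := app.UpdateMetadata(ref, lset, metadata.Metadata{
		Type: "counter", Help: "Total HTTP requests served.",
	}); err != nil {
		return err
	}
	return app.Commit()
}
```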
// SeriesSet contains a set of series.
type SeriesSet interface {
Next() bool

storage/remote/write.go (7 lines changed)

@ -27,6 +27,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/wal"
)
@ -268,6 +269,12 @@ func (t *timestampTracker) AppendExemplar(_ storage.SeriesRef, _ labels.Labels,
return 0, nil
}
func (t *timestampTracker) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) {
// TODO: Add and increment a `metadata` field when we get around to wiring metadata in remote_write.
// UpdateMetadata is a no-op for remote write (where timestampTracker is being used) for now.
return 0, nil
}
// Commit implements storage.Appender.
func (t *timestampTracker) Commit() error {
t.writeStorage.samplesIn.incr(t.samples + t.exemplars)

storage/remote/write_handler_test.go (7 lines changed)

@ -27,6 +27,7 @@ import (
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
)
@ -186,3 +187,9 @@ func (m *mockAppendable) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e
m.exemplars = append(m.exemplars, mockExemplar{l, e.Labels, e.Ts, e.Value})
return 0, nil
}
func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) {
// TODO: Wire metadata in a mockAppendable field when we get around to handling metadata in remote_write.
// UpdateMetadata is a no-op for remote write (where mockAppendable is used for testing) for now.
return 0, nil
}

tsdb/agent/db.go (6 lines changed)

@ -31,6 +31,7 @@ import (
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
@ -812,6 +813,11 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem
return storage.SeriesRef(s.ref), nil
}
func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
// TODO: Wire metadata in the Agent's appender.
return 0, nil
}
// Commit submits the collected samples and purges the batch.
func (a *appender) Commit() error {
a.mtx.RLock()

tsdb/db_test.go (252 lines changed)

@ -41,6 +41,7 @@ import (
"go.uber.org/goleak"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
@ -3489,3 +3490,254 @@ func TestDBPanicOnMmappingHeadChunk(t *testing.T) {
require.NoError(t, db.Close())
}
func TestMetadataInWAL(t *testing.T) {
updateMetadata := func(t *testing.T, app storage.Appender, s labels.Labels, m metadata.Metadata) {
_, err := app.UpdateMetadata(0, s, m)
require.NoError(t, err)
}
db := newTestDB(t)
ctx := context.Background()
// Add some series so we can append metadata to them.
app := db.Appender(ctx)
s1 := labels.FromStrings("a", "b")
s2 := labels.FromStrings("c", "d")
s3 := labels.FromStrings("e", "f")
s4 := labels.FromStrings("g", "h")
for _, s := range []labels.Labels{s1, s2, s3, s4} {
_, err := app.Append(0, s, 0, 0)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
// Add a first round of metadata to the first three series.
// Re-take the Appender, as the previous Commit will have closed it.
m1 := metadata.Metadata{Type: "gauge", Unit: "unit_1", Help: "help_1"}
m2 := metadata.Metadata{Type: "gauge", Unit: "unit_2", Help: "help_2"}
m3 := metadata.Metadata{Type: "gauge", Unit: "unit_3", Help: "help_3"}
app = db.Appender(ctx)
updateMetadata(t, app, s1, m1)
updateMetadata(t, app, s2, m2)
updateMetadata(t, app, s3, m3)
require.NoError(t, app.Commit())
// Add a replicated metadata entry to the first series,
// a completely new metadata entry for the fourth series,
// and a changed metadata entry to the second series.
m4 := metadata.Metadata{Type: "counter", Unit: "unit_4", Help: "help_4"}
m5 := metadata.Metadata{Type: "counter", Unit: "unit_5", Help: "help_5"}
app = db.Appender(ctx)
updateMetadata(t, app, s1, m1)
updateMetadata(t, app, s4, m4)
updateMetadata(t, app, s2, m5)
require.NoError(t, app.Commit())
// Read the WAL to see if the disk storage format is correct.
recs := readTestWAL(t, path.Join(db.Dir(), "wal"))
var gotMetadataBlocks [][]record.RefMetadata
for _, rec := range recs {
if mr, ok := rec.([]record.RefMetadata); ok {
gotMetadataBlocks = append(gotMetadataBlocks, mr)
}
}
expectedMetadata := []record.RefMetadata{
{Ref: 1, Type: record.GetMetricType(m1.Type), Unit: m1.Unit, Help: m1.Help},
{Ref: 2, Type: record.GetMetricType(m2.Type), Unit: m2.Unit, Help: m2.Help},
{Ref: 3, Type: record.GetMetricType(m3.Type), Unit: m3.Unit, Help: m3.Help},
{Ref: 4, Type: record.GetMetricType(m4.Type), Unit: m4.Unit, Help: m4.Help},
{Ref: 2, Type: record.GetMetricType(m5.Type), Unit: m5.Unit, Help: m5.Help},
}
require.Len(t, gotMetadataBlocks, 2)
require.Equal(t, expectedMetadata[:3], gotMetadataBlocks[0])
require.Equal(t, expectedMetadata[3:], gotMetadataBlocks[1])
}
func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) {
updateMetadata := func(t *testing.T, app storage.Appender, s labels.Labels, m metadata.Metadata) {
_, err := app.UpdateMetadata(0, s, m)
require.NoError(t, err)
}
ctx := context.Background()
numSamples := 10000
hb, w := newTestHead(t, int64(numSamples)*10, false)
// Add some series so we can append metadata to them.
app := hb.Appender(ctx)
s1 := labels.FromStrings("a", "b")
s2 := labels.FromStrings("c", "d")
s3 := labels.FromStrings("e", "f")
s4 := labels.FromStrings("g", "h")
for _, s := range []labels.Labels{s1, s2, s3, s4} {
_, err := app.Append(0, s, 0, 0)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
// Add a first round of metadata to the first three series.
// Re-take the Appender, as the previous Commit will have closed it.
m1 := metadata.Metadata{Type: "gauge", Unit: "unit_1", Help: "help_1"}
m2 := metadata.Metadata{Type: "gauge", Unit: "unit_2", Help: "help_2"}
m3 := metadata.Metadata{Type: "gauge", Unit: "unit_3", Help: "help_3"}
m4 := metadata.Metadata{Type: "gauge", Unit: "unit_4", Help: "help_4"}
app = hb.Appender(ctx)
updateMetadata(t, app, s1, m1)
updateMetadata(t, app, s2, m2)
updateMetadata(t, app, s3, m3)
updateMetadata(t, app, s4, m4)
require.NoError(t, app.Commit())
// Update metadata for first series.
m5 := metadata.Metadata{Type: "counter", Unit: "unit_5", Help: "help_5"}
app = hb.Appender(ctx)
updateMetadata(t, app, s1, m5)
require.NoError(t, app.Commit())
// Switch the metadata for the second series back and forth.
// Since it ended on a new metadata record, we expect a single new entry.
m6 := metadata.Metadata{Type: "counter", Unit: "unit_6", Help: "help_6"}
app = hb.Appender(ctx)
updateMetadata(t, app, s2, m6)
require.NoError(t, app.Commit())
app = hb.Appender(ctx)
updateMetadata(t, app, s2, m2)
require.NoError(t, app.Commit())
app = hb.Appender(ctx)
updateMetadata(t, app, s2, m6)
require.NoError(t, app.Commit())
app = hb.Appender(ctx)
updateMetadata(t, app, s2, m2)
require.NoError(t, app.Commit())
app = hb.Appender(ctx)
updateMetadata(t, app, s2, m6)
require.NoError(t, app.Commit())
// Let's create a checkpoint.
first, last, err := wal.Segments(w.Dir())
require.NoError(t, err)
keep := func(id chunks.HeadSeriesRef) bool {
return id != 3
}
_, err = wal.Checkpoint(log.NewNopLogger(), w, first, last-1, keep, 0)
require.NoError(t, err)
// Confirm there's been a checkpoint.
cdir, _, err := wal.LastCheckpoint(w.Dir())
require.NoError(t, err)
// Read in checkpoint and WAL.
recs := readTestWAL(t, cdir)
var gotMetadataBlocks [][]record.RefMetadata
for _, rec := range recs {
if mr, ok := rec.([]record.RefMetadata); ok {
gotMetadataBlocks = append(gotMetadataBlocks, mr)
}
}
// There should only be 1 metadata block present, with only the latest
// metadata kept around.
wantMetadata := []record.RefMetadata{
{Ref: 1, Type: record.GetMetricType(m5.Type), Unit: m5.Unit, Help: m5.Help},
{Ref: 2, Type: record.GetMetricType(m6.Type), Unit: m6.Unit, Help: m6.Help},
{Ref: 4, Type: record.GetMetricType(m4.Type), Unit: m4.Unit, Help: m4.Help},
}
require.Len(t, gotMetadataBlocks, 1)
require.Len(t, gotMetadataBlocks[0], 3)
gotMetadataBlock := gotMetadataBlocks[0]
sort.Slice(gotMetadataBlock, func(i, j int) bool { return gotMetadataBlock[i].Ref < gotMetadataBlock[j].Ref })
require.Equal(t, wantMetadata, gotMetadataBlock)
require.NoError(t, hb.Close())
}
func TestMetadataAssertInMemoryData(t *testing.T) {
updateMetadata := func(t *testing.T, app storage.Appender, s labels.Labels, m metadata.Metadata) {
_, err := app.UpdateMetadata(0, s, m)
require.NoError(t, err)
}
db := openTestDB(t, nil, nil)
ctx := context.Background()
// Add some series so we can append metadata to them.
app := db.Appender(ctx)
s1 := labels.FromStrings("a", "b")
s2 := labels.FromStrings("c", "d")
s3 := labels.FromStrings("e", "f")
s4 := labels.FromStrings("g", "h")
for _, s := range []labels.Labels{s1, s2, s3, s4} {
_, err := app.Append(0, s, 0, 0)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
// Add a first round of metadata to the first three series.
// The in-memory data held in the db Head should hold the metadata.
m1 := metadata.Metadata{Type: "gauge", Unit: "unit_1", Help: "help_1"}
m2 := metadata.Metadata{Type: "gauge", Unit: "unit_2", Help: "help_2"}
m3 := metadata.Metadata{Type: "gauge", Unit: "unit_3", Help: "help_3"}
app = db.Appender(ctx)
updateMetadata(t, app, s1, m1)
updateMetadata(t, app, s2, m2)
updateMetadata(t, app, s3, m3)
require.NoError(t, app.Commit())
series1 := db.head.series.getByHash(s1.Hash(), s1)
series2 := db.head.series.getByHash(s2.Hash(), s2)
series3 := db.head.series.getByHash(s3.Hash(), s3)
series4 := db.head.series.getByHash(s4.Hash(), s4)
require.Equal(t, series1.meta, m1)
require.Equal(t, series2.meta, m2)
require.Equal(t, series3.meta, m3)
require.Equal(t, series4.meta, metadata.Metadata{})
// Add a replicated metadata entry to the first series,
// a changed metadata entry to the second series,
// and a completely new metadata entry for the fourth series.
// The in-memory data held in the db Head should be correctly updated.
m4 := metadata.Metadata{Type: "counter", Unit: "unit_4", Help: "help_4"}
m5 := metadata.Metadata{Type: "counter", Unit: "unit_5", Help: "help_5"}
app = db.Appender(ctx)
updateMetadata(t, app, s1, m1)
updateMetadata(t, app, s4, m4)
updateMetadata(t, app, s2, m5)
require.NoError(t, app.Commit())
series1 = db.head.series.getByHash(s1.Hash(), s1)
series2 = db.head.series.getByHash(s2.Hash(), s2)
series3 = db.head.series.getByHash(s3.Hash(), s3)
series4 = db.head.series.getByHash(s4.Hash(), s4)
require.Equal(t, series1.meta, m1)
require.Equal(t, series2.meta, m5)
require.Equal(t, series3.meta, m3)
require.Equal(t, series4.meta, m4)
require.NoError(t, db.Close())
// Reopen the DB, replaying the WAL. The Head must have been replayed
// correctly in memory.
reopenDB, err := Open(db.Dir(), nil, nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, reopenDB.Close())
})
_, err = reopenDB.head.wal.Size()
require.NoError(t, err)
require.Equal(t, reopenDB.head.series.getByHash(s1.Hash(), s1).meta, m1)
require.Equal(t, reopenDB.head.series.getByHash(s2.Hash(), s2).meta, m5)
require.Equal(t, reopenDB.head.series.getByHash(s3.Hash(), s3).meta, m3)
require.Equal(t, reopenDB.head.series.getByHash(s4.Hash(), s4).meta, m4)
}

tsdb/docs/format/wal.md (30 lines changed)

@ -118,3 +118,33 @@ See: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/Op
│ . . . │
└──────────────────────────────────────────────────────────────────┘
```
### Metadata records
Metadata records encode the metadata updates associated with a series.
```
┌────────────────────────────────────────────┐
│ type = 6 <1b>                              │
├────────────────────────────────────────────┤
│ ┌────────────────────────────────────────┐ │
│ │ series_id <uvarint> │ │
│ ├────────────────────────────────────────┤ │
│ │ metric_type <1b> │ │
│ ├────────────────────────────────────────┤ │
│ │ num_fields <uvarint> │ │
│ ├───────────────────────┬────────────────┤ │
│ │ len(name_1) <uvarint> │ name_1 <bytes> │ │
│ ├───────────────────────┼────────────────┤ │
│ │ len(val_1) <uvarint> │ val_1 <bytes> │ │
│ ├───────────────────────┴────────────────┤ │
│ │ . . . │ │
│ ├───────────────────────┬────────────────┤ │
│ │ len(name_n) <uvarint> │ name_n <bytes> │ │
│ ├───────────────────────┼────────────────┤ │
│ │ len(val_n) <uvarint> │ val_n <bytes> │ │
│ └───────────────────────┴────────────────┘ │
│ . . . │
└────────────────────────────────────────────┘
```
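
For illustration, a stdlib-only sketch that hand-assembles one entry following this layout (values are made up; the real encoder is `record.Encoder.Metadata` in this change, and the record type byte matches `record.Metadata = 6`):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// putUvarint and putUvarintStr mimic the <uvarint> and length-prefixed
// <bytes> fields of the diagram; this illustrates the layout only.
func putUvarint(b []byte, v uint64) []byte {
	var tmp [binary.MaxVarintLen64]byte
	return append(b, tmp[:binary.PutUvarint(tmp[:], v)]...)
}

func putUvarintStr(b []byte, s string) []byte {
	return append(putUvarint(b, uint64(len(s))), s...)
}

func main() {
	var b []byte
	b = append(b, 6)      // record type = 6 (Metadata)
	b = putUvarint(b, 42) // series_id
	b = append(b, 1)      // metric_type, single byte (1 = counter)
	b = putUvarint(b, 2)  // num_fields: UNIT and HELP
	b = putUvarintStr(b, "UNIT")
	b = putUvarintStr(b, "seconds")
	b = putUvarintStr(b, "HELP")
	b = putUvarintStr(b, "CPU time counter")
	fmt.Printf("% x\n", b)
}
```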

tsdb/head.go (3 lines changed)

@ -31,6 +31,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
@ -75,6 +76,7 @@ type Head struct {
logger log.Logger
appendPool sync.Pool
exemplarsPool sync.Pool
metadataPool sync.Pool
seriesPool sync.Pool
bytesPool sync.Pool
memChunkPool sync.Pool
@ -1514,6 +1516,7 @@ type memSeries struct {
ref chunks.HeadSeriesRef
lset labels.Labels
meta metadata.Metadata
// Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
// When compaction runs, chunks get moved into a block and all pointers are shifted like so:

tsdb/head_append.go (83 lines changed)

@ -23,6 +23,7 @@ import (
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
@ -65,6 +66,15 @@ func (a *initAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e
return a.app.AppendExemplar(ref, l, e)
}
func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
if a.app != nil {
return a.app.UpdateMetadata(ref, l, m)
}
a.app = a.head.appender()
return a.app.UpdateMetadata(ref, l, m)
}
// initTime initializes a head with the first timestamp. This only needs to be called
// for a completely fresh head with an empty WAL.
func (h *Head) initTime(t int64) {
@ -130,6 +140,7 @@ func (h *Head) appender() *headAppender {
samples: h.getAppendBuffer(),
sampleSeries: h.getSeriesBuffer(),
exemplars: exemplarsBuf,
metadata: h.getMetadataBuffer(),
appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
}
@ -196,6 +207,19 @@ func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) {
h.exemplarsPool.Put(b[:0])
}
func (h *Head) getMetadataBuffer() []record.RefMetadata {
b := h.metadataPool.Get()
if b == nil {
return make([]record.RefMetadata, 0, 512)
}
return b.([]record.RefMetadata)
}
func (h *Head) putMetadataBuffer(b []record.RefMetadata) {
//nolint:staticcheck // Ignore SA6002 safe to ignore and actually fixing it has some performance penalty.
h.metadataPool.Put(b[:0])
}
func (h *Head) getSeriesBuffer() []*memSeries {
b := h.seriesPool.Get()
if b == nil {
@ -232,10 +256,12 @@ type headAppender struct {
minValidTime int64 // No samples below this timestamp are allowed.
mint, maxt int64
series []record.RefSeries // New series held by this appender.
samples []record.RefSample // New samples held by this appender.
exemplars []exemplarWithSeriesRef // New exemplars held by this appender.
sampleSeries []*memSeries // Series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
series []record.RefSeries // New series held by this appender.
metadata []record.RefMetadata // New metadata held by this appender.
samples []record.RefSample // New samples held by this appender.
exemplars []exemplarWithSeriesRef // New exemplars held by this appender.
sampleSeries []*memSeries // Series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
metadataSeries []*memSeries // Series corresponding to the metadata held by this appender.
appendID, cleanupAppendIDsBelow uint64
closed bool
@ -358,6 +384,37 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels,
return storage.SeriesRef(s.ref), nil
}
// UpdateMetadata for headAppender assumes the series ref already exists, and so it doesn't
// use getOrCreate or make any of the lset sanity checks that Append does.
func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels, meta metadata.Metadata) (storage.SeriesRef, error) {
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
s = a.head.series.getByHash(lset.Hash(), lset)
if s != nil {
ref = storage.SeriesRef(s.ref)
}
}
if s == nil {
return 0, fmt.Errorf("unknown series when trying to add metadata with HeadSeriesRef: %d and labels: %s", ref, lset)
}
s.RLock()
hasNewMetadata := s.meta != meta
s.RUnlock()
if hasNewMetadata {
a.metadata = append(a.metadata, record.RefMetadata{
Ref: s.ref,
Type: record.GetMetricType(meta.Type),
Unit: meta.Unit,
Help: meta.Help,
})
a.metadataSeries = append(a.metadataSeries, s)
}
return ref, nil
}
var _ storage.GetRef = &headAppender{}
func (a *headAppender) GetRef(lset labels.Labels) (storage.SeriesRef, labels.Labels) {
@ -389,6 +446,14 @@ func (a *headAppender) log() error {
return errors.Wrap(err, "log series")
}
}
if len(a.metadata) > 0 {
rec = enc.Metadata(a.metadata, buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return errors.Wrap(err, "log metadata")
}
}
if len(a.samples) > 0 {
rec = enc.Samples(a.samples, buf)
buf = rec[:0]
@ -449,6 +514,7 @@ func (a *headAppender) Commit() (err error) {
defer a.head.putAppendBuffer(a.samples)
defer a.head.putSeriesBuffer(a.sampleSeries)
defer a.head.putExemplarBuffer(a.exemplars)
defer a.head.putMetadataBuffer(a.metadata)
defer a.head.iso.closeAppend(a.appendID)
total := len(a.samples)
@ -471,6 +537,13 @@ func (a *headAppender) Commit() (err error) {
}
}
for i, m := range a.metadata {
series = a.metadataSeries[i]
series.Lock()
series.meta = metadata.Metadata{Type: record.ToTextparseMetricType(m.Type), Unit: m.Unit, Help: m.Help}
series.Unlock()
}
a.head.metrics.samplesAppended.Add(float64(total))
a.head.updateMinMaxTime(a.mint, a.maxt)
@ -619,8 +692,10 @@ func (a *headAppender) Rollback() (err error) {
}
a.head.putAppendBuffer(a.samples)
a.head.putExemplarBuffer(a.exemplars)
a.head.putMetadataBuffer(a.metadata)
a.samples = nil
a.exemplars = nil
a.metadata = nil
// Series are created in the head memory regardless of rollback. Thus we have
// to log them to the WAL in any case.

tsdb/head_test.go (9 lines changed)

@ -123,6 +123,10 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
tstones, err := dec.Tombstones(rec, nil)
require.NoError(t, err)
recs = append(recs, tstones)
case record.Metadata:
meta, err := dec.Metadata(rec, nil)
require.NoError(t, err)
recs = append(recs, meta)
default:
t.Fatalf("unknown record type")
}
@ -1004,7 +1008,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
recs := readTestWAL(t, cdir)
recs = append(recs, readTestWAL(t, w.Dir())...)
var series, samples, stones int
var series, samples, stones, metadata int
for _, rec := range recs {
switch rec.(type) {
case []record.RefSeries:
@ -1013,6 +1017,8 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
samples++
case []tombstones.Stone:
stones++
case []record.RefMetadata:
metadata++
default:
t.Fatalf("unknown record type")
}
@ -1020,6 +1026,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
require.Equal(t, 1, series)
require.Equal(t, 9999, samples)
require.Equal(t, 1, stones)
require.Equal(t, 0, metadata)
}
func TestDelete_e2e(t *testing.T) {

tsdb/head_wal.go (38 lines changed)

@ -30,6 +30,7 @@ import (
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
@ -46,6 +47,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
// for error reporting.
var unknownRefs atomic.Uint64
var unknownExemplarRefs atomic.Uint64
var unknownMetadataRefs atomic.Uint64
// Track number of series records that had overlapping m-map chunks.
var mmapOverlappingChunks uint64
@ -81,6 +83,11 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
return []record.RefExemplar{}
},
}
metadataPool = sync.Pool{
New: func() interface{} {
return []record.RefMetadata{}
},
}
)
defer func() {
@ -184,6 +191,18 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
return
}
decoded <- exemplars
case record.Metadata:
meta := metadataPool.Get().([]record.RefMetadata)[:0]
meta, err := dec.Metadata(rec, meta)
if err != nil {
decodeErr = &wal.CorruptionErr{
Err: errors.Wrap(err, "decode metadata"),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decoded <- meta
default:
// Noop.
}
@ -307,6 +326,21 @@ Outer:
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
exemplarsPool.Put(v)
case []record.RefMetadata:
for _, m := range v {
s := h.series.getByID(chunks.HeadSeriesRef(m.Ref))
if s == nil {
unknownMetadataRefs.Inc()
continue
}
s.meta = metadata.Metadata{
Type: record.ToTextparseMetricType(m.Type),
Unit: m.Unit,
Help: m.Help,
}
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
metadataPool.Put(v)
default:
panic(fmt.Errorf("unexpected decoded type: %T", d))
}
@ -333,8 +367,8 @@ Outer:
return errors.Wrap(r.Err(), "read records")
}
if unknownRefs.Load() > 0 || unknownExemplarRefs.Load() > 0 {
level.Warn(h.logger).Log("msg", "Unknown series references", "samples", unknownRefs.Load(), "exemplars", unknownExemplarRefs.Load())
if unknownRefs.Load() > 0 || unknownExemplarRefs.Load() > 0 || unknownMetadataRefs.Load() > 0 {
level.Warn(h.logger).Log("msg", "Unknown series references", "samples", unknownRefs.Load(), "exemplars", unknownExemplarRefs.Load(), "metadata", unknownMetadataRefs.Load())
}
if mmapOverlappingChunks > 0 {
level.Info(h.logger).Log("msg", "Overlapping m-map chunks on duplicate series records", "count", mmapOverlappingChunks)

tsdb/record/record.go (141 lines changed)

@ -22,6 +22,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/encoding"
@ -42,6 +43,8 @@ const (
Tombstones Type = 3
// Exemplars is used to match WAL records of type Exemplars.
Exemplars Type = 4
// Metadata is used to match WAL records of type Metadata.
Metadata Type = 6
)
func (rt Type) String() string {
@ -54,11 +57,74 @@ func (rt Type) String() string {
return "exemplars"
case Tombstones:
return "tombstones"
case Metadata:
return "metadata"
default:
return "unknown"
}
}
// MetricType represents the type of a series.
type MetricType uint8
const (
UnknownMT MetricType = 0
Counter MetricType = 1
Gauge MetricType = 2
Histogram MetricType = 3
GaugeHistogram MetricType = 4
Summary MetricType = 5
Info MetricType = 6
Stateset MetricType = 7
)
func GetMetricType(t textparse.MetricType) uint8 {
switch t {
case textparse.MetricTypeCounter:
return uint8(Counter)
case textparse.MetricTypeGauge:
return uint8(Gauge)
case textparse.MetricTypeHistogram:
return uint8(Histogram)
case textparse.MetricTypeGaugeHistogram:
return uint8(GaugeHistogram)
case textparse.MetricTypeSummary:
return uint8(Summary)
case textparse.MetricTypeInfo:
return uint8(Info)
case textparse.MetricTypeStateset:
return uint8(Stateset)
default:
return uint8(UnknownMT)
}
}
func ToTextparseMetricType(m uint8) textparse.MetricType {
switch m {
case uint8(Counter):
return textparse.MetricTypeCounter
case uint8(Gauge):
return textparse.MetricTypeGauge
case uint8(Histogram):
return textparse.MetricTypeHistogram
case uint8(GaugeHistogram):
return textparse.MetricTypeGaugeHistogram
case uint8(Summary):
return textparse.MetricTypeSummary
case uint8(Info):
return textparse.MetricTypeInfo
case uint8(Stateset):
return textparse.MetricTypeStateset
default:
return textparse.MetricTypeUnknown
}
}
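Because the WAL stores the metric type as a single byte, these two helpers should round-trip for every known value; a quick property-style check, sketched on the assumption that it sits in this package's test file (which already imports `testing` and `require`):

```go
// Round-trip sketch for the single-byte metric type mapping; hypothetical
// test, assumed to live alongside record_test.go.
func TestMetricTypeRoundTrip(t *testing.T) {
	for b := uint8(UnknownMT); b <= uint8(Stateset); b++ {
		require.Equal(t, b, GetMetricType(ToTextparseMetricType(b)), "type byte %d", b)
	}
}
```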
const (
unitMetaName = "UNIT"
helpMetaName = "HELP"
)
// ErrNotFound is returned if a looked up resource was not found. Duplicate ErrNotFound from head.go.
var ErrNotFound = errors.New("not found")
@ -75,6 +141,14 @@ type RefSample struct {
V float64
}
// RefMetadata is the metadata associated with a series ID.
type RefMetadata struct {
Ref chunks.HeadSeriesRef
Type uint8
Unit string
Help string
}
// RefExemplar is an exemplar with its labels, timestamp, value the exemplar was collected/observed with, and a reference to a series.
type RefExemplar struct {
Ref chunks.HeadSeriesRef
@ -83,7 +157,7 @@ type RefExemplar struct {
Labels labels.Labels
}
// Decoder decodes series, sample, and tombstone records.
// Decoder decodes series, sample, metadata and tombstone records.
// The zero value is ready to use.
type Decoder struct{}
@ -94,7 +168,7 @@ func (d *Decoder) Type(rec []byte) Type {
return Unknown
}
switch t := Type(rec[0]); t {
case Series, Samples, Tombstones, Exemplars:
case Series, Samples, Tombstones, Exemplars, Metadata:
return t
}
return Unknown
@ -132,6 +206,49 @@ func (d *Decoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) {
return series, nil
}
// Metadata appends metadata in rec to the given slice.
func (d *Decoder) Metadata(rec []byte, metadata []RefMetadata) ([]RefMetadata, error) {
dec := encoding.Decbuf{B: rec}
if Type(dec.Byte()) != Metadata {
return nil, errors.New("invalid record type")
}
for len(dec.B) > 0 && dec.Err() == nil {
ref := dec.Uvarint64()
typ := dec.Byte()
numFields := dec.Uvarint()
// We're currently aware of two more metadata fields other than TYPE, namely UNIT and HELP.
// We can skip the rest of the fields (if we encounter any), but we must decode them anyway
// so we can correctly align with the start of the next metadata record.
var unit, help string
for i := 0; i < numFields; i++ {
fieldName := dec.UvarintStr()
fieldValue := dec.UvarintStr()
switch fieldName {
case unitMetaName:
unit = fieldValue
case helpMetaName:
help = fieldValue
}
}
metadata = append(metadata, RefMetadata{
Ref: chunks.HeadSeriesRef(ref),
Type: typ,
Unit: unit,
Help: help,
})
}
if dec.Err() != nil {
return nil, dec.Err()
}
if len(dec.B) > 0 {
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
}
return metadata, nil
}
// Samples appends samples in rec to the given slice.
func (d *Decoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) {
dec := encoding.Decbuf{B: rec}
@ -259,6 +376,26 @@ func (e *Encoder) Series(series []RefSeries, b []byte) []byte {
return buf.Get()
}
// Metadata appends the encoded metadata to b and returns the resulting slice.
func (e *Encoder) Metadata(metadata []RefMetadata, b []byte) []byte {
buf := encoding.Encbuf{B: b}
buf.PutByte(byte(Metadata))
for _, m := range metadata {
buf.PutUvarint64(uint64(m.Ref))
buf.PutByte(m.Type)
buf.PutUvarint(2) // num_fields: We currently have two more metadata fields, UNIT and HELP.
buf.PutUvarintStr(unitMetaName)
buf.PutUvarintStr(m.Unit)
buf.PutUvarintStr(helpMetaName)
buf.PutUvarintStr(m.Help)
}
return buf.Get()
}
// Samples appends the encoded samples to b and returns the resulting slice.
func (e *Encoder) Samples(samples []RefSample, b []byte) []byte {
buf := encoding.Encbuf{B: b}

tsdb/record/record_test.go (105 lines changed)

@ -45,6 +45,30 @@ func TestRecord_EncodeDecode(t *testing.T) {
require.NoError(t, err)
require.Equal(t, series, decSeries)
metadata := []RefMetadata{
{
Ref: 100,
Type: uint8(Counter),
Unit: "",
Help: "some magic counter",
},
{
Ref: 1,
Type: uint8(Counter),
Unit: "seconds",
Help: "CPU time counter",
},
{
Ref: 147741,
Type: uint8(Gauge),
Unit: "percentage",
Help: "current memory usage",
},
}
decMetadata, err := dec.Metadata(enc.Metadata(metadata, nil), nil)
require.NoError(t, err)
require.Equal(t, metadata, decMetadata)
samples := []RefSample{
{Ref: 0, T: 12423423, V: 1.2345},
{Ref: 123, T: -1231, V: -123},
@ -136,6 +160,16 @@ func TestRecord_Corrupted(t *testing.T) {
_, err := dec.Exemplars(corrupted, nil)
require.Equal(t, errors.Cause(err), encoding.ErrInvalidSize)
})
t.Run("Test corrupted metadata record", func(t *testing.T) {
meta := []RefMetadata{
{Ref: 147, Type: uint8(Counter), Unit: "unit", Help: "help"},
}
corrupted := enc.Metadata(meta, nil)[:8]
_, err := dec.Metadata(corrupted, nil)
require.Equal(t, errors.Cause(err), encoding.ErrInvalidSize)
})
}
func TestRecord_Type(t *testing.T) {
@ -154,9 +188,80 @@ func TestRecord_Type(t *testing.T) {
recordType = dec.Type(enc.Tombstones(tstones, nil))
require.Equal(t, Tombstones, recordType)
metadata := []RefMetadata{{Ref: 147, Type: uint8(Counter), Unit: "unit", Help: "help"}}
recordType = dec.Type(enc.Metadata(metadata, nil))
require.Equal(t, Metadata, recordType)
recordType = dec.Type(nil)
require.Equal(t, Unknown, recordType)
recordType = dec.Type([]byte{0})
require.Equal(t, Unknown, recordType)
}
func TestRecord_MetadataDecodeUnknownExtraFields(t *testing.T) {
var enc encoding.Encbuf
var dec Decoder
// Write record type.
enc.PutByte(byte(Metadata))
// Write first metadata entry, all known fields.
enc.PutUvarint64(101)
enc.PutByte(byte(Counter))
enc.PutUvarint(2)
enc.PutUvarintStr(unitMetaName)
enc.PutUvarintStr("")
enc.PutUvarintStr(helpMetaName)
enc.PutUvarintStr("some magic counter")
// Write second metadata entry, known fields + unknown fields.
enc.PutUvarint64(99)
enc.PutByte(byte(Counter))
enc.PutUvarint(3)
// Known fields.
enc.PutUvarintStr(unitMetaName)
enc.PutUvarintStr("seconds")
enc.PutUvarintStr(helpMetaName)
enc.PutUvarintStr("CPU time counter")
// Unknown fields.
enc.PutUvarintStr("an extra field name to be skipped")
enc.PutUvarintStr("with its value")
// Write third metadata entry, with unknown fields and different order.
enc.PutUvarint64(47250)
enc.PutByte(byte(Gauge))
enc.PutUvarint(4)
enc.PutUvarintStr("extra name one")
enc.PutUvarintStr("extra value one")
enc.PutUvarintStr(helpMetaName)
enc.PutUvarintStr("current memory usage")
enc.PutUvarintStr("extra name two")
enc.PutUvarintStr("extra value two")
enc.PutUvarintStr(unitMetaName)
enc.PutUvarintStr("percentage")
// Should yield known fields for all entries and skip over unknown fields.
expectedMetadata := []RefMetadata{
{
Ref: 101,
Type: uint8(Counter),
Unit: "",
Help: "some magic counter",
}, {
Ref: 99,
Type: uint8(Counter),
Unit: "seconds",
Help: "CPU time counter",
}, {
Ref: 47250,
Type: uint8(Gauge),
Unit: "percentage",
Help: "current memory usage",
},
}
decMetadata, err := dec.Metadata(enc.Get(), nil)
require.NoError(t, err)
require.Equal(t, expectedMetadata, decMetadata)
}

tsdb/wal/checkpoint.go (39 lines changed)

@ -41,10 +41,12 @@ type CheckpointStats struct {
DroppedSamples int
DroppedTombstones int
DroppedExemplars int
DroppedMetadata int
TotalSeries int // Processed series including dropped ones.
TotalSamples int // Processed samples including dropped ones.
TotalTombstones int // Processed tombstones including dropped ones.
TotalExemplars int // Processed exemplars including dropped ones.
TotalMetadata int // Processed metadata including dropped ones.
}
// LastCheckpoint returns the directory name and index of the most recent checkpoint.
@ -84,7 +86,8 @@ const checkpointPrefix = "checkpoint."
// Checkpoint creates a compacted checkpoint of segments in range [from, to] in the given WAL.
// It includes the most recent checkpoint if it exists.
// All series not satisfying keep and samples/tombstones/exemplars below mint are dropped.
// All series not satisfying keep, samples/tombstones/exemplars below mint, and
// metadata entries that are not the latest for their series are dropped.
//
// The checkpoint is stored in a directory named checkpoint.N in the same
// segmented format as the original WAL itself.
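As a hedged usage sketch mirroring how the tests in this change invoke it (logger and keep predicate are caller-supplied; the helper name is made up):

```go
package main

import (
	"github.com/go-kit/log"

	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/wal"
)

// checkpointAllButLast compacts every segment except the newest one,
// keeping series the predicate accepts and only the latest metadata
// entry per series, as described above.
func checkpointAllButLast(w *wal.WAL, keep func(chunks.HeadSeriesRef) bool) error {
	first, last, err := wal.Segments(w.Dir())
	if err != nil {
		return err
	}
	_, err = wal.Checkpoint(log.NewNopLogger(), w, first, last-1, keep, 0 /* mint */)
	return err
}
```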
@ -149,13 +152,16 @@ func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id chunks.Hea
samples []record.RefSample
tstones []tombstones.Stone
exemplars []record.RefExemplar
metadata []record.RefMetadata
dec record.Decoder
enc record.Encoder
buf []byte
recs [][]byte
latestMetadataMap = make(map[chunks.HeadSeriesRef]record.RefMetadata)
)
for r.Next() {
series, samples, tstones, exemplars = series[:0], samples[:0], tstones[:0], exemplars[:0]
series, samples, tstones, exemplars, metadata = series[:0], samples[:0], tstones[:0], exemplars[:0], metadata[:0]
// We don't reset the buffer since we batch up multiple records
// before writing them to the checkpoint.
@ -238,6 +244,23 @@ func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id chunks.Hea
}
stats.TotalExemplars += len(exemplars)
stats.DroppedExemplars += len(exemplars) - len(repl)
case record.Metadata:
metadata, err := dec.Metadata(rec, metadata)
if err != nil {
return nil, errors.Wrap(err, "decode metadata")
}
// Only keep reference to the latest found metadata for each refID.
repl := 0
for _, m := range metadata {
if keep(m.Ref) {
if _, ok := latestMetadataMap[m.Ref]; !ok {
repl++
}
latestMetadataMap[m.Ref] = m
}
}
stats.TotalMetadata += len(metadata)
stats.DroppedMetadata += len(metadata) - repl
default:
// Unknown record type, probably from a future Prometheus version.
continue
@ -265,6 +288,18 @@ func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id chunks.Hea
if err := cp.Log(recs...); err != nil {
return nil, errors.Wrap(err, "flush records")
}
// Flush latest metadata records for each series.
if len(latestMetadataMap) > 0 {
latestMetadata := make([]record.RefMetadata, 0, len(latestMetadataMap))
for _, m := range latestMetadataMap {
latestMetadata = append(latestMetadata, m)
}
if err := cp.Log(enc.Metadata(latestMetadata, buf[:0])); err != nil {
return nil, errors.Wrap(err, "flush metadata records")
}
}
if err := cp.Close(); err != nil {
return nil, errors.Wrap(err, "close checkpoint")
}

tsdb/wal/checkpoint_test.go (37 lines changed)

@ -18,6 +18,7 @@ import (
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"testing"
@ -153,6 +154,14 @@ func TestCheckpoint(t *testing.T) {
{Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")},
}, nil)
require.NoError(t, w.Log(b))
b = enc.Metadata([]record.RefMetadata{
{Ref: 2, Unit: "unit", Help: "help"},
{Ref: 3, Unit: "unit", Help: "help"},
{Ref: 4, Unit: "unit", Help: "help"},
{Ref: 5, Unit: "unit", Help: "help"},
}, nil)
require.NoError(t, w.Log(b))
}
// Write samples until the WAL has enough segments.
// Make them have drifting timestamps within a record to see that they
@ -170,6 +179,16 @@ func TestCheckpoint(t *testing.T) {
}, nil)
require.NoError(t, w.Log(b))
// Write changing metadata for each series. In the end, only the latest
// version should end up in the checkpoint.
b = enc.Metadata([]record.RefMetadata{
{Ref: 0, Unit: fmt.Sprintf("%d", last), Help: fmt.Sprintf("%d", last)},
{Ref: 1, Unit: fmt.Sprintf("%d", last), Help: fmt.Sprintf("%d", last)},
{Ref: 2, Unit: fmt.Sprintf("%d", last), Help: fmt.Sprintf("%d", last)},
{Ref: 3, Unit: fmt.Sprintf("%d", last), Help: fmt.Sprintf("%d", last)},
}, nil)
require.NoError(t, w.Log(b))
last += 100
}
require.NoError(t, w.Close())
@ -193,6 +212,7 @@ func TestCheckpoint(t *testing.T) {
var dec record.Decoder
var series []record.RefSeries
var metadata []record.RefMetadata
r := NewReader(sr)
for r.Next() {
@ -214,14 +234,27 @@ func TestCheckpoint(t *testing.T) {
for _, e := range exemplars {
require.GreaterOrEqual(t, e.T, last/2, "exemplar with wrong timestamp")
}
case record.Metadata:
metadata, err = dec.Metadata(rec, metadata)
require.NoError(t, err)
}
}
require.NoError(t, r.Err())
require.Equal(t, []record.RefSeries{
expectedRefSeries := []record.RefSeries{
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
}, series)
}
require.Equal(t, expectedRefSeries, series)
expectedRefMetadata := []record.RefMetadata{
{Ref: 0, Unit: fmt.Sprintf("%d", last-100), Help: fmt.Sprintf("%d", last-100)},
{Ref: 2, Unit: fmt.Sprintf("%d", last-100), Help: fmt.Sprintf("%d", last-100)},
{Ref: 4, Unit: "unit", Help: "help"},
}
sort.Slice(metadata, func(i, j int) bool { return metadata[i].Ref < metadata[j].Ref })
require.Equal(t, expectedRefMetadata, metadata)
})
}
}
