Browse Source

Fix merge conflicts

Signed-off-by: Jesus Vazquez <jesus.vazquez@grafana.com>
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
pull/11420/head
Signed-off-by: Jesus Vazquez 2 years ago committed by Ganesh Vernekar
parent
commit
3362bf6d79
No known key found for this signature in database
GPG Key ID: F056451B52F1DC34
  1. 2
      documentation/examples/remote_storage/go.mod
  2. 28
      documentation/examples/remote_storage/go.sum
  3. 3
      storage/merge.go
  4. 62
      storage/merge_test.go
  5. 12
      tsdb/chunkenc/chunk.go
  6. 8
      tsdb/chunkenc/histogram.go
  7. 44
      tsdb/chunkenc/histogram_meta.go
  8. 6
      tsdb/compact_test.go
  9. 1
      tsdb/db.go
  10. 48
      tsdb/db_test.go
  11. 65
      tsdb/head.go
  12. 151
      tsdb/head_append.go
  13. 115
      tsdb/head_read.go
  14. 72
      tsdb/head_read_test.go
  15. 56
      tsdb/head_test.go
  16. 6
      tsdb/head_wal.go
  17. 4
      tsdb/ooo_head.go
  18. 4
      tsdb/ooo_head_read_test.go
  19. 2
      tsdb/ooo_head_test.go
  20. 25
      tsdb/tsdbutil/chunks.go

2
documentation/examples/remote_storage/go.mod

@ -54,7 +54,7 @@ require (
)
require (
github.com/prometheus/prometheus v0.38.0
github.com/prometheus/prometheus v0.37.1-0.20221011120840-430bdc9dd099
golang.org/x/oauth2 v0.0.0-20220808172628-8227340efae7 // indirect
)

28
documentation/examples/remote_storage/go.sum

@ -19,7 +19,7 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/armon/go-metrics v0.3.10 h1:FR+drcQStOe+32sYyJYyZ7FIdgoGGBnwLl+flodp8Uo=
github.com/armon/go-metrics v0.3.3 h1:a9F4rlj7EWWrbj7BYw8J8+x+ZZkJeqzNyRk8hdPF+ro=
github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.44.72 h1:i7J5XT7pjBjtl1OrdIhiQHzsG89wkZCcM1HhyK++3DI=
github.com/aws/aws-sdk-go v1.44.72/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
@ -103,19 +103,19 @@ github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/gophercloud/gophercloud v0.25.0 h1:C3Oae7y0fUVQGSsBrb3zliAjdX+riCSEh4lNMejFNI4=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 h1:uirlL/j72L93RhV4+mkWhjv0cov2I0MIgPOG9rMDr1k=
github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2/go.mod h1:M5qHK+eWfAv8VR/265dIuEpL3fNfeC21tXXp9itM24A=
github.com/hashicorp/consul/api v1.14.0 h1:Y64GIJ8hYTu+tuGekwO4G4ardXoiCivX9wv1iP/kihk=
github.com/hashicorp/consul/api v1.13.1 h1:r5cPdVFUy+pFF7nt+0ArLD9hm+E39OewJkvNdjKXcL4=
github.com/hashicorp/cronexpr v1.1.1 h1:NJZDd87hGXjoZBdvyCF9mX4DCq5Wy7+A/w+A7q0wn6c=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
github.com/hashicorp/go-hclog v0.14.1 h1:nQcJDQwIAGnmoUWp8ubocEX40cCml/17YkF6csQLReU=
github.com/hashicorp/go-immutable-radix v1.3.0 h1:8exGP7ego3OmkfksihtSouGMZ+hQrhxx+FVELeXpVPE=
github.com/hashicorp/go-hclog v0.12.2 h1:F1fdYblUEsxKiailtkhCCG2g4bipEgaHiDc8vffNpD4=
github.com/hashicorp/go-immutable-radix v1.2.0 h1:l6UW37iCXwZkZoAbEYnptSHVE/cQ5bOTPYG5W3vf9+8=
github.com/hashicorp/go-retryablehttp v0.7.1 h1:sUiuQAnLlbvmExtFQs72iFW/HXeUn8Z1aJLQ4LJJbTQ=
github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5Oi2viEzc=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/nomad/api v0.0.0-20220809212729-939d643fec2c h1:lV5A4cLQr1Bh1xGSSQ2R0fDRK4GZnfXxYia4Q7aaTXc=
github.com/hashicorp/serf v0.9.7 h1:hkdgbqizGQHuU5IPqYM1JdSMV8nKfpuOnZYXssk9muY=
github.com/hashicorp/nomad/api v0.0.0-20220629141207-c2428e1673ec h1:jAF71e0KoaY2LJlRsRxxGz6MNQOG5gTBIc+rklxfNO0=
github.com/hashicorp/serf v0.9.6 h1:uuEX1kLR6aoda1TBttmJQKDLZE1Ob7KN0NPdE7EtCDc=
github.com/hetznercloud/hcloud-go v1.35.2 h1:eEDtmDiI2plZ2UQmj4YpiYse5XbtpXOUBpAdIOLxzgE=
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/influxdata/influxdb v1.10.0 h1:8xDpt8KO3lzrzf/ss+l8r42AGUZvoITu5824berK7SE=
@ -157,7 +157,7 @@ github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182aff
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@ -205,10 +205,10 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo=
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/prometheus/prometheus v0.38.0 h1:YSiJ5gDZmXnOntPRyHn1wb/6I1Frasj9dw57XowIqeA=
github.com/prometheus/prometheus v0.38.0/go.mod h1:2zHO5FtRhM+iu995gwKIb99EXxjeZEuXpKUTIRq4YI0=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/prometheus/prometheus v0.37.1-0.20221011120840-430bdc9dd099 h1:ISpgxhFfSrMztQTw0Za6xDDC3Fwe4kciR8Pwv3Sz9yE=
github.com/prometheus/prometheus v0.37.1-0.20221011120840-430bdc9dd099/go.mod h1:dfkjkdCd3FhLE0BiBIKwwwkZiDQnTnDThE1Zex1UwbA=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.9 h1:0roa6gXKgyta64uqh52AQG3wzZXH21unn+ltzQSXML0=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
@ -322,7 +322,7 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
golang.org/x/tools v0.1.13-0.20220908144252-ce397412b6a4 h1:glzimF7qHZuKVEiMbE7UqBu44MyTjt5u6j3Jz+rfMRM=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -331,7 +331,7 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 h1:7PEE9xCtufpGJzrqweakEEnTh7YFELmnKm/ee+5jmfQ=
google.golang.org/genproto v0.0.0-20220802133213-ce4fa296bf78 h1:QntLWYqZeuBtJkth3m/6DLznnI0AHJr+AgJXvVh/izw=
google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
@ -351,7 +351,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI=
gopkg.in/ini.v1 v1.66.4 h1:SsAcf+mM7mRZo2nJNGt8mZCjG8ZRaNGMURJw7BsIST4=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

3
storage/merge.go

@ -20,12 +20,13 @@ import (
"math"
"sync"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"golang.org/x/exp/slices"
)
type mergeGenericQuerier struct {

62
storage/merge_test.go

@ -560,9 +560,9 @@ func TestConcatenatingChunkSeriesMerger(t *testing.T) {
{
name: "single series",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}}, []tsdbutil.Sample{sample{3, 3, nil, nil}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}}, []tsdbutil.Sample{sample{3, 3, nil, nil}}),
},
{
name: "two empty series",
@ -575,70 +575,70 @@ func TestConcatenatingChunkSeriesMerger(t *testing.T) {
{
name: "two non overlapping",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}}, []tsdbutil.Sample{sample{3, 3, nil, nil}, sample{5, 5, nil, nil}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7, nil, nil}, sample{9, 9, nil, nil}}, []tsdbutil.Sample{sample{10, 10, nil, nil}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}, []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}),
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}}, []tsdbutil.Sample{sample{3, 3, nil, nil}, sample{5, 5, nil, nil}}, []tsdbutil.Sample{sample{7, 7, nil, nil}, sample{9, 9, nil, nil}}, []tsdbutil.Sample{sample{10, 10, nil, nil}}),
},
{
name: "two overlapping",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}}, []tsdbutil.Sample{sample{3, 3, nil, nil}, sample{8, 8, nil, nil}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7, nil, nil}, sample{9, 9, nil, nil}}, []tsdbutil.Sample{sample{10, 10, nil, nil}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}},
[]tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}},
[]tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}}, []tsdbutil.Sample{sample{3, 3, nil, nil}, sample{8, 8, nil, nil}},
[]tsdbutil.Sample{sample{7, 7, nil, nil}, sample{9, 9, nil, nil}}, []tsdbutil.Sample{sample{10, 10, nil, nil}},
),
},
{
name: "two duplicated",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{5, 5}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 5, nil, nil}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 5, nil, nil}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}},
[]tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{5, 5}},
[]tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 5, nil, nil}},
[]tsdbutil.Sample{sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 5, nil, nil}},
),
},
{
name: "three overlapping",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{6, 6}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{4, 4}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 5, nil, nil}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{6, 6, nil, nil}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{4, 4, nil, nil}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}},
[]tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{6, 6}},
[]tsdbutil.Sample{sample{0, 0}, sample{4, 4}},
[]tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 5, nil, nil}},
[]tsdbutil.Sample{sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{6, 6, nil, nil}},
[]tsdbutil.Sample{sample{0, 0, nil, nil}, sample{4, 4, nil, nil}},
),
},
{
name: "three in chained overlap",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{4, 4}, sample{6, 66}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{6, 6}, sample{10, 10}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 5, nil, nil}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{4, 4, nil, nil}, sample{6, 66, nil, nil}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{6, 6, nil, nil}, sample{10, 10, nil, nil}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}},
[]tsdbutil.Sample{sample{4, 4}, sample{6, 66}},
[]tsdbutil.Sample{sample{6, 6}, sample{10, 10}},
[]tsdbutil.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 5, nil, nil}},
[]tsdbutil.Sample{sample{4, 4, nil, nil}, sample{6, 66, nil, nil}},
[]tsdbutil.Sample{sample{6, 6, nil, nil}, sample{10, 10, nil, nil}},
),
},
{
name: "three in chained overlap complex",
input: []ChunkSeries{
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, []tsdbutil.Sample{sample{10, 10}, sample{15, 15}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{20, 20}}, []tsdbutil.Sample{sample{25, 25}, sample{30, 30}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{18, 18}, sample{26, 26}}, []tsdbutil.Sample{sample{31, 31}, sample{35, 35}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{5, 5, nil, nil}}, []tsdbutil.Sample{sample{10, 10, nil, nil}, sample{15, 15, nil, nil}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2, nil, nil}, sample{20, 20, nil, nil}}, []tsdbutil.Sample{sample{25, 25, nil, nil}, sample{30, 30, nil, nil}}),
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{18, 18, nil, nil}, sample{26, 26, nil, nil}}, []tsdbutil.Sample{sample{31, 31, nil, nil}, sample{35, 35, nil, nil}}),
},
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, []tsdbutil.Sample{sample{10, 10}, sample{15, 15}},
[]tsdbutil.Sample{sample{2, 2}, sample{20, 20}}, []tsdbutil.Sample{sample{25, 25}, sample{30, 30}},
[]tsdbutil.Sample{sample{18, 18}, sample{26, 26}}, []tsdbutil.Sample{sample{31, 31}, sample{35, 35}},
[]tsdbutil.Sample{sample{0, 0, nil, nil}, sample{5, 5, nil, nil}}, []tsdbutil.Sample{sample{10, 10, nil, nil}, sample{15, 15, nil, nil}},
[]tsdbutil.Sample{sample{2, 2, nil, nil}, sample{20, 20, nil, nil}}, []tsdbutil.Sample{sample{25, 25, nil, nil}, sample{30, 30, nil, nil}},
[]tsdbutil.Sample{sample{18, 18, nil, nil}, sample{26, 26, nil, nil}}, []tsdbutil.Sample{sample{31, 31, nil, nil}, sample{35, 35, nil, nil}},
),
},
{

12
tsdb/chunkenc/chunk.go

@ -44,15 +44,6 @@ func (e Encoding) String() string {
return "<unknown>"
}
// IsValidEncoding returns true for supported encodings.
func IsValidEncoding(e Encoding) bool {
switch e {
case EncXOR, EncHistogram:
return true
}
return false
}
// Chunk encodings for out-of-order chunks.
// These encodings must be only used by the Head block for its internal bookkeeping.
const (
@ -64,8 +55,9 @@ func IsOutOfOrderChunk(e Encoding) bool {
return (e & OutOfOrderMask) != 0
}
// IsValidEncoding returns true for supported encodings.
func IsValidEncoding(e Encoding) bool {
return e == EncXOR || e == EncOOOXOR
return e == EncXOR || e == EncOOOXOR || e == EncHistogram
}
// Chunk holds a sequence of sample pairs that can be iterated over and appended to.

8
tsdb/chunkenc/histogram.go

@ -29,10 +29,10 @@ import (
// delta of the delta to the previous number, xor = what we do for regular
// sample values):
//
// field → ts count zeroCount sum []posbuckets []negbuckets
// sample 1 raw raw raw raw []raw []raw
// sample 2 delta delta delta xor []delta []delta
// sample >2 dod dod dod xor []dod []dod
// field → ts count zeroCount sum []posbuckets []negbuckets
// sample 1 raw raw raw raw []raw []raw
// sample 2 delta delta delta xor []delta []delta
// sample >2 dod dod dod xor []dod []dod
type HistogramChunk struct {
b bstream
}

44
tsdb/chunkenc/histogram_meta.go

@ -94,18 +94,18 @@ func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) {
//
// * If the threshold is 0, store a single zero byte.
//
// * If the threshold is a power of 2 between (and including) 2^-243 and 2^10,
// take the exponent from the IEEE 754 representation of the threshold, which
// covers a range between (and including) -242 and 11. (2^-243 is 0.5*2^-242
// in IEEE 754 representation, and 2^10 is 0.5*2^11.) Add 243 to the exponent
// and store the result (which will be between 1 and 254) as a single
// byte. Note that small powers of two are preferred values for the zero
// threshold. The default value for the zero threshold is 2^-128 (or
// 0.5*2^-127 in IEEE 754 representation) and will therefore be encoded as a
// single byte (with value 116).
// - If the threshold is a power of 2 between (and including) 2^-243 and 2^10,
// take the exponent from the IEEE 754 representation of the threshold, which
// covers a range between (and including) -242 and 11. (2^-243 is 0.5*2^-242
// in IEEE 754 representation, and 2^10 is 0.5*2^11.) Add 243 to the exponent
// and store the result (which will be between 1 and 254) as a single
// byte. Note that small powers of two are preferred values for the zero
// threshold. The default value for the zero threshold is 2^-128 (or
// 0.5*2^-127 in IEEE 754 representation) and will therefore be encoded as a
// single byte (with value 116).
//
// * In all other cases, store 255 as a single byte, followed by the 8 bytes of
// the threshold as a float64, i.e. taking 9 bytes in total.
// - In all other cases, store 255 as a single byte, followed by the 8 bytes of
// the threshold as a float64, i.e. taking 9 bytes in total.
func putZeroThreshold(b *bstream, threshold float64) {
if threshold == 0 {
b.writeByte(0)
@ -199,11 +199,11 @@ type Interjection struct {
//
// Let's say the old buckets look like this:
//
// span syntax: [offset, length]
// spans : [ 0 , 2 ] [2,1] [ 3 , 2 ] [3,1] [1,1]
// bucket idx : [0] [1] 2 3 [4] 5 6 7 [8] [9] 10 11 12 [13] 14 [15]
// raw values 6 3 3 2 4 5 1
// deltas 6 -3 0 -1 2 1 -4
// span syntax: [offset, length]
// spans : [ 0 , 2 ] [2,1] [ 3 , 2 ] [3,1] [1,1]
// bucket idx : [0] [1] 2 3 [4] 5 6 7 [8] [9] 10 11 12 [13] 14 [15]
// raw values 6 3 3 2 4 5 1
// deltas 6 -3 0 -1 2 1 -4
//
// But now we introduce a new bucket layout. (Carefully chosen example where we
// have a span appended, one unchanged[*], one prepended, and two merge - in
@ -213,12 +213,12 @@ type Interjection struct {
// that, their offset needs to change if "disrupted" by spans changing ahead of
// them
//
// \/ this one is "unchanged"
// spans : [ 0 , 3 ] [1,1] [ 1 , 4 ] [ 3 , 3 ]
// bucket idx : [0] [1] [2] 3 [4] 5 [6] [7] [8] [9] 10 11 12 [13] [14] [15]
// raw values 6 3 0 3 0 0 2 4 5 0 1
// deltas 6 -3 -3 3 -3 0 2 2 1 -5 1
// delta mods: / \ / \ / \
// \/ this one is "unchanged"
// spans : [ 0 , 3 ] [1,1] [ 1 , 4 ] [ 3 , 3 ]
// bucket idx : [0] [1] [2] 3 [4] 5 [6] [7] [8] [9] 10 11 12 [13] [14] [15]
// raw values 6 3 0 3 0 0 2 4 5 0 1
// deltas 6 -3 -3 3 -3 0 2 2 1 -5 1
// delta mods: / \ / \ / \
//
// Note that whenever any new buckets are introduced, the subsequent "old"
// bucket needs to readjust its delta to the new base of 0. Thus, for the caller

6
tsdb/compact_test.go

@ -1298,7 +1298,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
}
func TestHeadCompactionWithHistograms(t *testing.T) {
head, _ := newTestHead(t, DefaultBlockDuration, false)
head, _ := newTestHead(t, DefaultBlockDuration, false, false)
require.NoError(t, head.Init(0))
t.Cleanup(func() {
require.NoError(t, head.Close())
@ -1462,11 +1462,11 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
c.numBuckets,
),
func(t *testing.T) {
oldHead, _ := newTestHead(t, DefaultBlockDuration, false)
oldHead, _ := newTestHead(t, DefaultBlockDuration, false, false)
t.Cleanup(func() {
require.NoError(t, oldHead.Close())
})
sparseHead, _ := newTestHead(t, DefaultBlockDuration, false)
sparseHead, _ := newTestHead(t, DefaultBlockDuration, false, false)
t.Cleanup(func() {
require.NoError(t, sparseHead.Close())
})

1
tsdb/db.go

@ -81,6 +81,7 @@ func DefaultOptions() *Options {
StripeSize: DefaultStripeSize,
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
IsolationDisabled: defaultIsolationDisabled,
HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize,
OutOfOrderCapMax: DefaultOutOfOrderCapMax,
}
}

48
tsdb/db_test.go

@ -4069,8 +4069,8 @@ func TestOOOCompaction(t *testing.T) {
fromMins, toMins := r[0], r[1]
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
series1Samples = append(series1Samples, sample{ts, float64(ts)})
series2Samples = append(series2Samples, sample{ts, float64(2 * ts)})
series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil})
}
}
expRes := map[string][]tsdbutil.Sample{
@ -4137,8 +4137,8 @@ func TestOOOCompaction(t *testing.T) {
series2Samples := make([]tsdbutil.Sample, 0, toMins-fromMins+1)
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
series1Samples = append(series1Samples, sample{ts, float64(ts)})
series2Samples = append(series2Samples, sample{ts, float64(2 * ts)})
series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil})
}
expRes := map[string][]tsdbutil.Sample{
series1.String(): series1Samples,
@ -4269,8 +4269,8 @@ func TestOOOCompactionWithNormalCompaction(t *testing.T) {
series2Samples := make([]tsdbutil.Sample, 0, toMins-fromMins+1)
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
series1Samples = append(series1Samples, sample{ts, float64(ts)})
series2Samples = append(series2Samples, sample{ts, float64(2 * ts)})
series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
series2Samples = append(series2Samples, sample{ts, float64(2 * ts), nil, nil})
}
expRes := map[string][]tsdbutil.Sample{
series1.String(): series1Samples,
@ -4457,7 +4457,7 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) {
var gotSamples []tsdbutil.Sample
for _, chunk := range chks[series1.String()] {
it := chunk.Chunk.Iterator(nil)
for it.Next() {
for it.Next() == chunkenc.ValFloat {
ts, v := it.At()
gotSamples = append(gotSamples, sample{t: ts, v: v})
}
@ -4643,7 +4643,7 @@ func TestOOODisabled(t *testing.T) {
require.Equal(t, expSamples, seriesSet)
require.Equal(t, float64(0), prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamplesAppended), "number of ooo appended samples mismatch")
require.Equal(t, float64(failedSamples),
prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)+prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples),
prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat))+prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat)),
"number of ooo/oob samples mismatch")
// Verifying that no OOO artifacts were generated.
@ -4723,7 +4723,7 @@ func TestWBLAndMmapReplay(t *testing.T) {
chk, err := db.head.chunkDiskMapper.Chunk(mc.ref)
require.NoError(t, err)
it := chk.Iterator(nil)
for it.Next() {
for it.Next() == chunkenc.ValFloat {
ts, val := it.At()
s1MmapSamples = append(s1MmapSamples, sample{t: ts, v: val})
}
@ -4952,7 +4952,7 @@ func TestOOOCompactionFailure(t *testing.T) {
series1Samples := make([]tsdbutil.Sample, 0, toMins-fromMins+1)
for min := fromMins; min <= toMins; min++ {
ts := min * time.Minute.Milliseconds()
series1Samples = append(series1Samples, sample{ts, float64(ts)})
series1Samples = append(series1Samples, sample{ts, float64(ts), nil, nil})
}
expRes := map[string][]tsdbutil.Sample{
series1.String(): series1Samples,
@ -5860,6 +5860,17 @@ func TestHistogramAppendAndQuery(t *testing.T) {
})
t.Run("new buckets incoming", func(t *testing.T) {
// In the previous unit test, during the last histogram append, we
// changed the schema and that caused a new chunk creation. Because
// of the next append the layout of the last histogram will change
// because the chunk will be re-encoded. So this forces us to modify
// the last histogram in exp1 so when we query we get the expected
// results.
lh := exp1[len(exp1)-1].H().Copy()
lh.PositiveSpans[1].Length++
lh.PositiveBuckets = append(lh.PositiveBuckets, -2) // -2 makes the last bucket 0.
exp1[len(exp1)-1] = sample{t: exp1[len(exp1)-1].T(), h: lh}
// This histogram with new bucket at the end causes the re-encoding of the previous histogram.
// Hence the previous histogram is recoded into this new layout.
// But the query returns the histogram from the in-memory buffer, hence we don't see the recode here yet.
@ -5869,6 +5880,21 @@ func TestHistogramAppendAndQuery(t *testing.T) {
appendHistogram(series1, 104, h.Copy(), &exp1)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
// Because of the previous two histograms being on the active chunk,
// and the next append is only adding a new bucket, the active chunk
// will be re-encoded to the new layout.
lh = exp1[len(exp1)-2].H().Copy()
lh.PositiveSpans[0].Length++
lh.PositiveSpans[1].Offset--
lh.PositiveBuckets = []int64{2, 1, -3, 2, 0, -2}
exp1[len(exp1)-2] = sample{t: exp1[len(exp1)-2].T(), h: lh}
lh = exp1[len(exp1)-1].H().Copy()
lh.PositiveSpans[0].Length++
lh.PositiveSpans[1].Offset--
lh.PositiveBuckets = []int64{2, 1, -3, 2, 0, 1}
exp1[len(exp1)-1] = sample{t: exp1[len(exp1)-1].T(), h: lh}
// Now we add the new buckets in between. Empty bucket is again not present for the old histogram.
h.PositiveSpans[0].Length++
h.PositiveSpans[1].Offset--
@ -5973,7 +5999,7 @@ func TestQueryHistogramFromBlocks(t *testing.T) {
t.Helper()
opts := DefaultOptions()
opts.AllowOverlappingBlocks = true
opts.AllowOverlappingCompaction = true // TODO(jesus.vazquez) This replaced AllowOverlappingBlocks, make sure that works
db := openTestDB(t, opts, nil)
t.Cleanup(func() {
require.NoError(t, db.Close())

65
tsdb/head.go

@ -295,31 +295,6 @@ func (h *Head) resetInMemoryState() error {
}
type headMetrics struct {
<<<<<<< HEAD
activeAppenders prometheus.Gauge
series prometheus.GaugeFunc
seriesCreated prometheus.Counter
seriesRemoved prometheus.Counter
seriesNotFound prometheus.Counter
chunks prometheus.Gauge
chunksCreated prometheus.Counter
chunksRemoved prometheus.Counter
gcDuration prometheus.Summary
samplesAppended *prometheus.CounterVec
outOfBoundSamples *prometheus.CounterVec
outOfOrderSamples *prometheus.CounterVec
walTruncateDuration prometheus.Summary
walCorruptionsTotal prometheus.Counter
walTotalReplayDuration prometheus.Gauge
headTruncateFail prometheus.Counter
headTruncateTotal prometheus.Counter
checkpointDeleteFail prometheus.Counter
checkpointDeleteTotal prometheus.Counter
checkpointCreationFail prometheus.Counter
checkpointCreationTotal prometheus.Counter
mmapChunkCorruptionTotal prometheus.Counter
snapshotReplayErrorTotal prometheus.Counter // Will be either 0 or 1.
=======
activeAppenders prometheus.Gauge
series prometheus.GaugeFunc
seriesCreated prometheus.Counter
@ -329,11 +304,11 @@ type headMetrics struct {
chunksCreated prometheus.Counter
chunksRemoved prometheus.Counter
gcDuration prometheus.Summary
samplesAppended prometheus.Counter
samplesAppended *prometheus.CounterVec
outOfOrderSamplesAppended prometheus.Counter
outOfBoundSamples prometheus.Counter
outOfOrderSamples prometheus.Counter
tooOldSamples prometheus.Counter
outOfBoundSamples *prometheus.CounterVec
outOfOrderSamples *prometheus.CounterVec
tooOldSamples *prometheus.CounterVec
walTruncateDuration prometheus.Summary
walCorruptionsTotal prometheus.Counter
dataTotalReplayDuration prometheus.Gauge
@ -346,7 +321,6 @@ type headMetrics struct {
mmapChunkCorruptionTotal prometheus.Counter
snapshotReplayErrorTotal prometheus.Counter // Will be either 0 or 1.
oooHistogram prometheus.Histogram
>>>>>>> main
}
const (
@ -409,35 +383,23 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
samplesAppended: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_samples_appended_total",
Help: "Total number of appended samples.",
<<<<<<< HEAD
}, []string{"type"}),
outOfBoundSamples: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "prometheus_tsdb_out_of_bound_samples_total",
Help: "Total number of out of bound samples ingestion failed attempts.",
}, []string{"type"}),
outOfOrderSamples: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "prometheus_tsdb_out_of_order_samples_total",
Help: "Total number of out of order samples ingestion failed attempts.",
}, []string{"type"}),
=======
}),
outOfOrderSamplesAppended: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_out_of_order_samples_appended_total",
Help: "Total number of appended out of order samples.",
}),
outOfBoundSamples: prometheus.NewCounter(prometheus.CounterOpts{
outOfBoundSamples: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "prometheus_tsdb_out_of_bound_samples_total",
Help: "Total number of out of bound samples ingestion failed attempts with out of order support disabled.",
}),
outOfOrderSamples: prometheus.NewCounter(prometheus.CounterOpts{
}, []string{"type"}),
outOfOrderSamples: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "prometheus_tsdb_out_of_order_samples_total",
Help: "Total number of out of order samples ingestion failed attempts due to out of order being disabled.",
}),
tooOldSamples: prometheus.NewCounter(prometheus.CounterOpts{
}, []string{"type"}),
tooOldSamples: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "prometheus_tsdb_too_old_samples_total",
Help: "Total number of out of order samples ingestion failed attempts with out of support enabled, but sample outside of time window.",
}),
>>>>>>> main
}, []string{"type"}),
headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_truncations_failed_total",
Help: "Total number of head truncations that failed.",
@ -1886,6 +1848,9 @@ type memSeries struct {
// We keep the last value here (in addition to appending it to the chunk) so we can check for duplicates.
lastValue float64
// We keep the last histogram value here (in addition to appending it to the chunk) so we can check for duplicates.
lastHistogramValue *histogram.Histogram
// Current appender for the head chunk. Set when a new head chunk is cut.
// It is nil only if headChunk is nil. E.g. if there was an appender that created a new series, but rolled back the commit
// (the first sample would create a headChunk, hence appender, but rollback skipped it while the Append() call would create a series).
@ -1894,13 +1859,11 @@ type memSeries struct {
// txs is nil if isolation is disabled.
txs *txRing
<<<<<<< HEAD
// TODO(beorn7): The only reason we track this is to create a staleness
// marker as either histogram or float sample. Perhaps there is a better way.
isHistogramSeries bool
=======
pendingCommit bool // Whether there are samples waiting to be committed to this series.
>>>>>>> main
}
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool) *memSeries {

151
tsdb/head_append.go

@ -301,15 +301,10 @@ type headAppender struct {
}
func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
<<<<<<< HEAD
if t < a.minValidTime {
a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
=======
// For OOO inserts, this restriction is irrelevant and will be checked later once we confirm the sample is an in-order append.
// If OOO inserts are disabled, we may as well as check this as early as we can and avoid more work.
if a.oooTimeWindow == 0 && t < a.minValidTime {
a.head.metrics.outOfBoundSamples.Inc()
>>>>>>> main
a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
return 0, storage.ErrOutOfBounds
}
@ -344,12 +339,6 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
}
s.Lock()
<<<<<<< HEAD
if err := s.appendable(t, v); err != nil {
s.Unlock()
if err == storage.ErrOutOfOrderSample {
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
=======
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow)
@ -363,10 +352,9 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
if err != nil {
switch err {
case storage.ErrOutOfOrderSample:
a.head.metrics.outOfOrderSamples.Inc()
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
case storage.ErrTooOldSample:
a.head.metrics.tooOldSamples.Inc()
>>>>>>> main
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
}
return 0, err
}
@ -445,7 +433,7 @@ func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram) error {
// We are allowing exact duplicates as we can encounter them in valid cases
// like federation and erroring out at that time would be extremely noisy.
if !h.Equals(s.sampleBuf[3].h) {
if !h.Equals(s.lastHistogramValue) {
return storage.ErrDuplicateSampleForTimestamp
}
return nil
@ -767,10 +755,6 @@ func (a *headAppender) Commit() (err error) {
defer a.head.putHistogramBuffer(a.histograms)
defer a.head.putMetadataBuffer(a.metadata)
defer a.head.iso.closeAppend(a.appendID)
<<<<<<< HEAD
total := len(a.samples)
var series *memSeries
=======
var (
samplesAppended = len(a.samples)
@ -827,35 +811,10 @@ func (a *headAppender) Commit() (err error) {
wblSamples = nil
oooMmapMarkers = nil
}
>>>>>>> main
for i, s := range a.samples {
series = a.sampleSeries[i]
series.Lock()
<<<<<<< HEAD
if !ok {
total--
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
}
if chunkCreated {
a.head.metrics.chunks.Inc()
a.head.metrics.chunksCreated.Inc()
}
}
histogramsTotal := len(a.histograms)
for i, s := range a.histograms {
series = a.histogramSeries[i]
series.Lock()
ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper)
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
if !ok {
histogramsTotal--
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
=======
oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow)
switch err {
case storage.ErrOutOfOrderSample:
@ -871,7 +830,6 @@ func (a *headAppender) Commit() (err error) {
// Do nothing.
default:
samplesAppended--
>>>>>>> main
}
var ok, chunkCreated bool
@ -939,6 +897,33 @@ func (a *headAppender) Commit() (err error) {
series.Unlock()
}
histogramsTotal := len(a.histograms)
histoOOORejected := 0
for i, s := range a.histograms {
series = a.histogramSeries[i]
series.Lock()
ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper, chunkRange)
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
if ok {
if s.T < inOrderMint {
inOrderMint = s.T
}
if s.T > inOrderMaxt {
inOrderMaxt = s.T
}
} else {
histogramsTotal--
histoOOORejected++
}
if chunkCreated {
a.head.metrics.chunks.Inc()
a.head.metrics.chunksCreated.Inc()
}
}
for i, m := range a.metadata {
series = a.metadataSeries[i]
series.Lock()
@ -946,19 +931,15 @@ func (a *headAppender) Commit() (err error) {
series.Unlock()
}
<<<<<<< HEAD
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(total))
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(oooRejected))
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histoOOORejected))
a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(oobRejected))
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(tooOldRejected))
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(samplesAppended))
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histogramsTotal))
a.head.updateMinMaxTime(a.mint, a.maxt)
=======
a.head.metrics.outOfOrderSamples.Add(float64(oooRejected))
a.head.metrics.outOfBoundSamples.Add(float64(oobRejected))
a.head.metrics.tooOldSamples.Add(float64(tooOldRejected))
a.head.metrics.samplesAppended.Add(float64(samplesAppended))
a.head.metrics.outOfOrderSamplesAppended.Add(float64(oooAccepted))
a.head.updateMinMaxTime(inOrderMint, inOrderMaxt)
a.head.updateMinOOOMaxOOOTime(ooomint, ooomaxt)
>>>>>>> main
collectOOORecords()
if a.head.wbl != nil {
@ -998,9 +979,8 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk
// the appendID for isolation. (The appendID can be zero, which results in no
// isolation for this append.)
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
<<<<<<< HEAD
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, chunkDiskMapper)
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) {
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, chunkDiskMapper, chunkRange)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1009,10 +989,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
c.maxTime = t
s.sampleBuf[0] = s.sampleBuf[1]
s.sampleBuf[1] = s.sampleBuf[2]
s.sampleBuf[2] = s.sampleBuf[3]
s.sampleBuf[3] = sample{t: t, v: v}
s.lastValue = v
if appendID > 0 {
s.txs.add(appendID)
@ -1023,7 +1000,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
// appendHistogram adds the histogram.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper
// chunk reference afterwards. We check for Appendable before
// appendPreprocessor because in case it ends up creating a new chunk,
@ -1034,7 +1011,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
positiveInterjections, negativeInterjections []chunkenc.Interjection
okToAppend, counterReset bool
)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1050,7 +1027,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
// recoding before we can append our histogram.
// - okToAppend and no interjections → Chunk is ready to support our histogram.
if !okToAppend || counterReset {
c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, chunkDiskMapper)
c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange)
chunkCreated = true
} else if len(positiveInterjections) > 0 || len(negativeInterjections) > 0 {
// New buckets have appeared. We need to recode all
@ -1081,10 +1058,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
c.maxTime = t
s.sampleBuf[0] = s.sampleBuf[1]
s.sampleBuf[1] = s.sampleBuf[2]
s.sampleBuf[2] = s.sampleBuf[3]
s.sampleBuf[3] = sample{t: t, h: h}
s.lastHistogramValue = h
if appendID > 0 {
s.txs.add(appendID)
@ -1097,11 +1071,8 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// This should be called only when appending data.
func (s *memSeries) appendPreprocessor(
t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper,
t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64,
) (c *memChunk, sampleInOrder, chunkCreated bool) {
=======
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) {
>>>>>>> main
// Based on Gorilla white papers this offers near-optimal compression ratio
// so anything bigger that this has diminishing returns and increases
// the time range within which we have to decompress all samples.
@ -1114,13 +1085,8 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
// Out of order sample. Sample timestamp is already in the mmapped chunks, so ignore it.
return c, false, false
}
<<<<<<< HEAD
// There is no chunk in this series yet, create the first chunk for the sample.
c = s.cutNewHeadChunk(t, e, chunkDiskMapper)
=======
// There is no head chunk in this series yet, create the first chunk for the sample.
c = s.cutNewHeadChunk(t, chunkDiskMapper, chunkRange)
>>>>>>> main
c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange)
chunkCreated = true
}
@ -1132,7 +1098,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
if c.chunk.Encoding() != e {
// The chunk encoding expected by this append is different than the head chunk's
// encoding. So we cut a new chunk with the expected encoding.
c = s.cutNewHeadChunk(t, e, chunkDiskMapper)
c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange)
chunkCreated = true
}
@ -1157,26 +1123,11 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
// as we expect more chunks to come.
// Note that next chunk will have its nextAt recalculated for the new rate.
if t >= s.nextAt || numSamples >= samplesPerChunk*2 {
<<<<<<< HEAD
c = s.cutNewHeadChunk(t, e, chunkDiskMapper)
c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange)
chunkCreated = true
}
return c, true, chunkCreated
=======
c = s.cutNewHeadChunk(t, chunkDiskMapper, chunkRange)
chunkCreated = true
}
s.app.Append(t, v)
c.maxTime = t
s.lastValue = v
if appendID > 0 && s.txs != nil {
s.txs.add(appendID)
}
return true, chunkCreated
>>>>>>> main
return c, true, chunkCreated
}
// computeChunkEndTime estimates the end timestamp based the beginning of a
@ -1192,13 +1143,9 @@ func computeChunkEndTime(start, cur, max int64) int64 {
return start + (max-start)/n
}
<<<<<<< HEAD
func (s *memSeries) cutNewHeadChunk(
mint int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper,
mint int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64,
) *memChunk {
=======
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) *memChunk {
>>>>>>> main
s.mmapCurrentHeadChunk(chunkDiskMapper)
s.headChunk = &memChunk{

115
tsdb/head_read.go

@ -23,7 +23,6 @@ import (
"github.com/pkg/errors"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
@ -487,7 +486,7 @@ func (o mergedOOOChunks) Bytes() []byte {
panic(err)
}
it := o.Iterator(nil)
for it.Next() {
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
app.Append(t, v)
}
@ -536,7 +535,7 @@ func (b boundedChunk) Bytes() []byte {
xor := chunkenc.NewXORChunk()
a, _ := xor.Appender()
it := b.Iterator(nil)
for it.Next() {
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
a.Append(t, v)
}
@ -565,33 +564,35 @@ type boundedIterator struct {
// until its able to find a sample within the bounds minT and maxT.
// If there are samples within bounds it will advance one by one amongst them.
// If there are no samples within bounds it will return false.
func (b boundedIterator) Next() bool {
for b.Iterator.Next() {
func (b boundedIterator) Next() chunkenc.ValueType {
for b.Iterator.Next() == chunkenc.ValFloat {
t, _ := b.Iterator.At()
if t < b.minT {
continue
} else if t > b.maxT {
return false
return chunkenc.ValNone
}
return true
return chunkenc.ValFloat
}
return false
return chunkenc.ValNone
}
func (b boundedIterator) Seek(t int64) bool {
func (b boundedIterator) Seek(t int64) chunkenc.ValueType {
if t < b.minT {
// We must seek at least up to b.minT if it is asked for something before that.
ok := b.Iterator.Seek(b.minT)
if !ok {
return false
val := b.Iterator.Seek(b.minT)
if !(val == chunkenc.ValFloat) {
return chunkenc.ValNone
}
t, _ := b.Iterator.At()
return t <= b.maxT
if t <= b.maxT {
return chunkenc.ValFloat
}
}
if t > b.maxT {
// We seek anyway so that the subsequent Next() calls will also return false.
b.Iterator.Seek(t)
return false
return chunkenc.ValNone
}
return b.Iterator.Seek(t)
}
@ -685,92 +686,6 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch
return makeStopIterator(c.chunk, it, stopAfter)
}
// memSafeIterator returns values from the wrapped stopIterator
// except the last 4, which come from buf.
type memSafeIterator struct {
stopIterator
total int
buf [4]sample
}
func (it *memSafeIterator) Seek(t int64) chunkenc.ValueType {
if it.Err() != nil {
return chunkenc.ValNone
}
var valueType chunkenc.ValueType
var ts int64 = math.MinInt64
if it.i > -1 {
ts = it.AtT()
}
if t <= ts {
// We are already at the right sample, but we have to find out
// its ValueType.
if it.total-it.i > 4 {
return it.Iterator.Seek(ts)
}
return it.buf[4-(it.total-it.i)].Type()
}
for t > ts || it.i == -1 {
if valueType = it.Next(); valueType == chunkenc.ValNone {
return chunkenc.ValNone
}
ts = it.AtT()
}
return valueType
}
func (it *memSafeIterator) Next() chunkenc.ValueType {
if it.i+1 >= it.stopAfter {
return chunkenc.ValNone
}
it.i++
if it.total-it.i > 4 {
return it.Iterator.Next()
}
return it.buf[4-(it.total-it.i)].Type()
}
func (it *memSafeIterator) At() (int64, float64) {
if it.total-it.i > 4 {
return it.Iterator.At()
}
s := it.buf[4-(it.total-it.i)]
return s.t, s.v
}
func (it *memSafeIterator) AtHistogram() (int64, *histogram.Histogram) {
if it.total-it.i > 4 {
return it.Iterator.AtHistogram()
}
s := it.buf[4-(it.total-it.i)]
return s.t, s.h
}
func (it *memSafeIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
if it.total-it.i > 4 {
return it.Iterator.AtFloatHistogram()
}
s := it.buf[4-(it.total-it.i)]
if s.fh != nil {
return s.t, s.fh
}
return s.t, s.h.ToFloat()
}
func (it *memSafeIterator) AtT() int64 {
if it.total-it.i > 4 {
return it.Iterator.AtT()
}
s := it.buf[4-(it.total-it.i)]
return s.t
}
// stopIterator wraps an Iterator, but only returns the first
// stopAfter values, if initialized with i=-1.
type stopIterator struct {

72
tsdb/head_read_test.go

@ -41,7 +41,7 @@ func TestBoundedChunk(t *testing.T) {
name: "bounds represent a single sample",
inputChunk: newTestChunk(10),
expSamples: []sample{
{0, 0},
{0, 0, nil, nil},
},
},
{
@ -50,14 +50,14 @@ func TestBoundedChunk(t *testing.T) {
inputMinT: 1,
inputMaxT: 8,
expSamples: []sample{
{1, 1},
{2, 2},
{3, 3},
{4, 4},
{5, 5},
{6, 6},
{7, 7},
{8, 8},
{1, 1, nil, nil},
{2, 2, nil, nil},
{3, 3, nil, nil},
{4, 4, nil, nil},
{5, 5, nil, nil},
{6, 6, nil, nil},
{7, 7, nil, nil},
{8, 8, nil, nil},
},
},
{
@ -66,12 +66,12 @@ func TestBoundedChunk(t *testing.T) {
inputMinT: 0,
inputMaxT: 5,
expSamples: []sample{
{0, 0},
{1, 1},
{2, 2},
{3, 3},
{4, 4},
{5, 5},
{0, 0, nil, nil},
{1, 1, nil, nil},
{2, 2, nil, nil},
{3, 3, nil, nil},
{4, 4, nil, nil},
{5, 5, nil, nil},
},
},
{
@ -80,11 +80,11 @@ func TestBoundedChunk(t *testing.T) {
inputMinT: 5,
inputMaxT: 9,
expSamples: []sample{
{5, 5},
{6, 6},
{7, 7},
{8, 8},
{9, 9},
{5, 5, nil, nil},
{6, 6, nil, nil},
{7, 7, nil, nil},
{8, 8, nil, nil},
{9, 9, nil, nil},
},
},
{
@ -95,11 +95,11 @@ func TestBoundedChunk(t *testing.T) {
initialSeek: 1,
seekIsASuccess: true,
expSamples: []sample{
{3, 3},
{4, 4},
{5, 5},
{6, 6},
{7, 7},
{3, 3, nil, nil},
{4, 4, nil, nil},
{5, 5, nil, nil},
{6, 6, nil, nil},
{7, 7, nil, nil},
},
},
{
@ -110,9 +110,9 @@ func TestBoundedChunk(t *testing.T) {
initialSeek: 5,
seekIsASuccess: true,
expSamples: []sample{
{5, 5},
{6, 6},
{7, 7},
{5, 5, nil, nil},
{6, 6, nil, nil},
{7, 7, nil, nil},
},
},
{
@ -144,23 +144,23 @@ func TestBoundedChunk(t *testing.T) {
if tc.initialSeek != 0 {
// Testing Seek()
ok := it.Seek(tc.initialSeek)
require.Equal(t, tc.seekIsASuccess, ok)
if ok {
val := it.Seek(tc.initialSeek)
require.Equal(t, tc.seekIsASuccess, val == chunkenc.ValFloat)
if val == chunkenc.ValFloat {
t, v := it.At()
samples = append(samples, sample{t, v})
samples = append(samples, sample{t, v, nil, nil})
}
}
// Testing Next()
for it.Next() {
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
samples = append(samples, sample{t, v})
samples = append(samples, sample{t, v, nil, nil})
}
// it.Next() should keep returning false.
// it.Next() should keep returning no value.
for i := 0; i < 10; i++ {
require.False(t, it.Next())
require.True(t, it.Next() == chunkenc.ValNone)
}
require.Equal(t, tc.expSamples, samples)

56
tsdb/head_test.go

@ -61,13 +61,10 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL, oooEnabled bool) (
opts.ChunkDirRoot = dir
opts.EnableExemplarStorage = true
opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
<<<<<<< HEAD
opts.EnableNativeHistograms.Store(true)
=======
if oooEnabled {
opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds())
}
>>>>>>> main
h, err := NewHead(nil, nil, wlog, nil, opts, nil)
require.NoError(t, err)
@ -526,19 +523,11 @@ func TestHead_ReadWAL(t *testing.T) {
require.NoError(t, c.Err())
return x
}
<<<<<<< HEAD
require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil)))
require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil)))
require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
// The samples before the new series record should be discarded since a duplicate record
// is only possible when old samples were compacted.
require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil)))
=======
require.Equal(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
require.Equal(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
// The samples before the new series record should be discarded since a duplicate record
// is only possible when old samples were compacted.
require.Equal(t, []sample{{101, 7}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
>>>>>>> main
require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
q, err := head.ExemplarQuerier(context.Background())
require.NoError(t, err)
@ -1335,8 +1324,9 @@ func TestMemSeries_appendHistogram(t *testing.T) {
defer func() {
require.NoError(t, chunkDiskMapper.Close())
}()
chunkRange := int64(1000)
s := newMemSeries(labels.Labels{}, 1, 500, nil, defaultIsolationDisabled)
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
histograms := GenerateTestHistograms(4)
histogramWithOneMoreBucket := histograms[3].Copy()
@ -1348,19 +1338,19 @@ func TestMemSeries_appendHistogram(t *testing.T) {
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
// New chunk must correctly be cut at 1000.
ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, chunkDiskMapper)
ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, chunkDiskMapper, chunkRange)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "first sample created chunk")
ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, chunkDiskMapper)
ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, chunkDiskMapper, chunkRange)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, chunkDiskMapper)
ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, chunkDiskMapper, chunkRange)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "expected new chunk on boundary")
ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, chunkDiskMapper)
ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, chunkDiskMapper, chunkRange)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
@ -1370,7 +1360,7 @@ func TestMemSeries_appendHistogram(t *testing.T) {
require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range")
require.Equal(t, int64(1001), s.headChunk.maxTime, "wrong chunk range")
ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, chunkDiskMapper)
ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, chunkDiskMapper, chunkRange)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk")
@ -2564,12 +2554,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
it := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil, nil)
// First point.
<<<<<<< HEAD
require.Equal(t, chunkenc.ValFloat, it.Seek(0))
=======
ok := it.Seek(0)
require.True(t, ok)
>>>>>>> main
ts, val := it.At()
require.Equal(t, int64(0), ts)
require.Equal(t, float64(0), val)
@ -2824,7 +2809,7 @@ func TestAppendHistogram(t *testing.T) {
l := labels.Labels{{Name: "a", Value: "b"}}
for _, numHistograms := range []int{1, 10, 150, 200, 250, 300} {
t.Run(fmt.Sprintf("%d", numHistograms), func(t *testing.T) {
head, _ := newTestHead(t, 1000, false)
head, _ := newTestHead(t, 1000, false, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -2869,7 +2854,7 @@ func TestAppendHistogram(t *testing.T) {
}
func TestHistogramInWALAndMmapChunk(t *testing.T) {
head, _ := newTestHead(t, 1000, false)
head, _ := newTestHead(t, 1000, false, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -2940,7 +2925,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
require.NoError(t, head.Close())
w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, head.opts, nil)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(0))
@ -3259,7 +3244,7 @@ func TestSnapshotError(t *testing.T) {
}
func TestHistogramMetrics(t *testing.T) {
head, _ := newTestHead(t, 1000, false)
head, _ := newTestHead(t, 1000, false, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -3284,7 +3269,7 @@ func TestHistogramMetrics(t *testing.T) {
require.NoError(t, head.Close())
w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
require.NoError(t, err)
head, err = NewHead(nil, nil, w, head.opts, nil)
head, err = NewHead(nil, nil, w, nil, head.opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(0))
@ -3294,7 +3279,7 @@ func TestHistogramMetrics(t *testing.T) {
func TestHistogramStaleSample(t *testing.T) {
l := labels.Labels{{Name: "a", Value: "b"}}
numHistograms := 20
head, _ := newTestHead(t, 100000, false)
head, _ := newTestHead(t, 100000, false, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -3388,7 +3373,7 @@ func TestHistogramStaleSample(t *testing.T) {
func TestHistogramCounterResetHeader(t *testing.T) {
l := labels.Labels{{Name: "a", Value: "b"}}
head, _ := newTestHead(t, 1000, false)
head, _ := newTestHead(t, 1000, false, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
@ -3799,7 +3784,7 @@ func TestOOOWalReplay(t *testing.T) {
it := xor.Iterator(nil)
actOOOSamples := make([]sample, 0, len(expOOOSamples))
for it.Next() {
for it.Next() == chunkenc.ValFloat {
ts, v := it.At()
actOOOSamples = append(actOOOSamples, sample{t: ts, v: v})
}
@ -4108,7 +4093,6 @@ func TestReplayAfterMmapReplayError(t *testing.T) {
require.NoError(t, h.Close())
}
<<<<<<< HEAD
func TestHistogramValidation(t *testing.T) {
tests := map[string]struct {
h *histogram.Histogram
@ -4240,7 +4224,8 @@ func generateBigTestHistograms(n int) []*histogram.Histogram {
histograms = append(histograms, h)
}
return histograms
=======
}
func TestOOOAppendWithNoSeries(t *testing.T) {
dir := t.TempDir()
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
@ -4367,5 +4352,4 @@ func TestHeadMinOOOTimeUpdate(t *testing.T) {
// So the lowest among them, 295, is set as minOOOTime.
require.NoError(t, h.truncateOOO(0, 2))
require.Equal(t, 295*time.Minute.Milliseconds(), h.MinOOOTime())
>>>>>>> main
}

6
tsdb/head_wal.go

@ -314,6 +314,7 @@ Outer:
exemplarsPool.Put(v)
case []record.RefHistogramSample:
samples := v
minValidTime := h.minValidTime.Load()
// We split up the samples into chunks of 5000 samples or less.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
@ -329,6 +330,9 @@ Outer:
}
}
for _, sam := range samples[:m] {
if sam.T < minValidTime {
continue // Before minValidTime: discard.
}
if r, ok := multiRef[sam.Ref]; ok {
sam.Ref = r
}
@ -336,7 +340,7 @@ Outer:
histogramShards[mod] = append(histogramShards[mod], sam)
}
for i := 0; i < n; i++ {
if len(shards[i]) > 0 {
if len(histogramShards[i]) > 0 {
processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
histogramShards[i] = nil
}

4
tsdb/ooo_head.go

@ -41,7 +41,7 @@ func (o *OOOChunk) Insert(t int64, v float64) bool {
if i >= len(o.samples) {
// none found. append it at the end
o.samples = append(o.samples, sample{t, v})
o.samples = append(o.samples, sample{t, v, nil, nil})
return true
}
@ -52,7 +52,7 @@ func (o *OOOChunk) Insert(t int64, v float64) bool {
// Expand length by 1 to make room. use a zero sample, we will overwrite it anyway.
o.samples = append(o.samples, sample{})
copy(o.samples[i+1:], o.samples[i:])
o.samples[i] = sample{t, v}
o.samples[i] = sample{t, v, nil, nil}
return true
}

4
tsdb/ooo_head_read_test.go

@ -860,7 +860,7 @@ func TestOOOHeadChunkReader_Chunk(t *testing.T) {
var resultSamples tsdbutil.SampleSlice
it := c.Iterator(nil)
for it.Next() {
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
resultSamples = append(resultSamples, sample{t: t, v: v})
}
@ -1031,7 +1031,7 @@ func TestOOOHeadChunkReader_Chunk_ConsistentQueryResponseDespiteOfHeadExpanding(
var resultSamples tsdbutil.SampleSlice
it := c.Iterator(nil)
for it.Next() {
for it.Next() == chunkenc.ValFloat {
ts, v := it.At()
resultSamples = append(resultSamples, sample{t: ts, v: v})
}

2
tsdb/ooo_head_test.go

@ -25,7 +25,7 @@ const testMaxSize int = 32
func valEven(pos int) int { return pos*2 + 2 } // s[0]=2, s[1]=4, s[2]=6, ..., s[31]=64 - Predictable pre-existing values
func valOdd(pos int) int { return pos*2 + 1 } // s[0]=1, s[1]=3, s[2]=5, ..., s[31]=63 - New values will interject at chosen position because they sort before the pre-existing vals.
func samplify(v int) sample { return sample{int64(v), float64(v)} }
func samplify(v int) sample { return sample{int64(v), float64(v), nil, nil} }
func makeEvenSampleSlice(n int) []sample {
s := make([]sample, n)

25
tsdb/tsdbutil/chunks.go

@ -84,8 +84,10 @@ func ChunkFromSamplesGeneric(s Samples) chunks.Meta {
}
type sample struct {
t int64
v float64
t int64
v float64
h *histogram.Histogram
fh *histogram.FloatHistogram
}
func (s sample) T() int64 {
@ -96,6 +98,25 @@ func (s sample) V() float64 {
return s.v
}
func (s sample) H() *histogram.Histogram {
return s.h
}
func (s sample) FH() *histogram.FloatHistogram {
return s.fh
}
func (s sample) Type() chunkenc.ValueType {
switch {
case s.h != nil:
return chunkenc.ValHistogram
case s.fh != nil:
return chunkenc.ValFloatHistogram
default:
return chunkenc.ValFloat
}
}
// PopulatedChunk creates a chunk populated with samples every second starting at minTime
func PopulatedChunk(numSamples int, minTime int64) chunks.Meta {
samples := make([]Sample, numSamples)

Loading…
Cancel
Save