Browse Source

Allow metric metadata to be propagated via Remote Write. (#6815)

* Introduce a metadata watcher

Similarly to the WAL watcher, its purpose is to observe the scrape manager and pull metadata. Then, send it to a remote storage.

Signed-off-by: gotjosh <josue@grafana.com>

* Additional fixes after rebasing.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Rework samples/metadata metrics.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Use more descriptive variable names in MetadataWatcher collect.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fix issues caused during rebasing.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fix missing metric add and unneeded config code.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Address some review comments.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fix metrics and docs

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Replace assert with require

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Bring back max_samples_per_send metric

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix tests

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

Co-authored-by: Callum Styan <callumstyan@gmail.com>
Co-authored-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
pull/8209/head
gotjosh 4 years ago committed by GitHub
parent
commit
4eca4dffb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 34
      cmd/prometheus/main.go
  2. 21
      config/config.go
  3. 12
      config/config_test.go
  4. 10
      docs/configuration/configuration.md
  5. 133
      prompb/remote.pb.go
  6. 4
      prompb/remote.proto
  7. 467
      prompb/types.pb.go
  8. 20
      prompb/types.proto
  9. 1
      scrape/target.go
  10. 13
      storage/remote/codec.go
  11. 32
      storage/remote/codec_test.go
  12. 163
      storage/remote/metadata_watcher.go
  13. 155
      storage/remote/metadata_watcher_test.go
  14. 262
      storage/remote/queue_manager.go
  15. 98
      storage/remote/queue_manager_test.go
  16. 2
      storage/remote/read_test.go
  17. 9
      storage/remote/storage.go
  18. 4
      storage/remote/storage_test.go
  19. 6
      storage/remote/write.go
  20. 14
      storage/remote/write_test.go
  21. 4
      web/api/v1/api_test.go

34
cmd/prometheus/main.go

@ -369,7 +369,8 @@ func main() {
var (
localStorage = &readyStorage{}
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline))
scraper = &readyScrapeManager{}
remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)
@ -414,6 +415,8 @@ func main() {
})
)
scraper.Set(scrapeManager)
cfg.web.Context = ctxWeb
cfg.web.TSDBRetentionDuration = cfg.tsdb.RetentionDuration
cfg.web.TSDBMaxBytes = cfg.tsdb.MaxBytes
@ -1095,6 +1098,35 @@ func (s *readyStorage) Stats(statsByLabelName string) (*tsdb.Stats, error) {
return nil, tsdb.ErrNotReady
}
// ErrNotReady is returned if the underlying scrape manager is not ready yet.
var ErrNotReady = errors.New("Scrape manager not ready")
// ReadyScrapeManager allows a scrape manager to be retrieved. Even if it's set at a later point in time.
type readyScrapeManager struct {
mtx sync.RWMutex
m *scrape.Manager
}
// Set the scrape manager.
func (rm *readyScrapeManager) Set(m *scrape.Manager) {
rm.mtx.Lock()
defer rm.mtx.Unlock()
rm.m = m
}
// Get the scrape manager. If is not ready, return an error.
func (rm *readyScrapeManager) Get() (*scrape.Manager, error) {
rm.mtx.RLock()
defer rm.mtx.RUnlock()
if rm.m != nil {
return rm.m, nil
}
return nil, ErrNotReady
}
// tsdbOptions is tsdb.Option version with defined units.
// This is required as tsdb.Option fields are unit agnostic (time).
type tsdbOptions struct {

21
config/config.go

@ -98,8 +98,9 @@ var (
// DefaultRemoteWriteConfig is the default remote write configuration.
DefaultRemoteWriteConfig = RemoteWriteConfig{
RemoteTimeout: model.Duration(30 * time.Second),
QueueConfig: DefaultQueueConfig,
RemoteTimeout: model.Duration(30 * time.Second),
QueueConfig: DefaultQueueConfig,
MetadataConfig: DefaultMetadataConfig,
}
// DefaultQueueConfig is the default remote queue configuration.
@ -121,6 +122,12 @@ var (
MaxBackoff: model.Duration(100 * time.Millisecond),
}
// DefaultMetadataConfig is the default metadata configuration for a remote write endpoint.
DefaultMetadataConfig = MetadataConfig{
Send: true,
SendInterval: model.Duration(1 * time.Minute),
}
// DefaultRemoteReadConfig is the default remote read configuration.
DefaultRemoteReadConfig = RemoteReadConfig{
RemoteTimeout: model.Duration(1 * time.Minute),
@ -570,6 +577,7 @@ type RemoteWriteConfig struct {
// values arbitrarily into the overflow maps of further-down types.
HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`
QueueConfig QueueConfig `yaml:"queue_config,omitempty"`
MetadataConfig MetadataConfig `yaml:"metadata_config,omitempty"`
}
// SetDirectory joins any relative file paths with dir.
@ -623,6 +631,15 @@ type QueueConfig struct {
MaxBackoff model.Duration `yaml:"max_backoff,omitempty"`
}
// MetadataConfig is the configuration for sending metadata to remote
// storage.
type MetadataConfig struct {
// Send controls whether we send metric metadata to remote storage.
Send bool `yaml:"send"`
// SendInterval controls how frequently we send metric metadata.
SendInterval model.Duration `yaml:"send_interval"`
}
// RemoteReadConfig is the configuration for reading from remote storage.
type RemoteReadConfig struct {
URL *config.URL `yaml:"url"`

12
config/config_test.go

@ -87,13 +87,15 @@ var expectedConf = &Config{
Action: relabel.Drop,
},
},
QueueConfig: DefaultQueueConfig,
QueueConfig: DefaultQueueConfig,
MetadataConfig: DefaultMetadataConfig,
},
{
URL: mustParseURL("http://remote2/push"),
RemoteTimeout: model.Duration(30 * time.Second),
QueueConfig: DefaultQueueConfig,
Name: "rw_tls",
URL: mustParseURL("http://remote2/push"),
RemoteTimeout: model.Duration(30 * time.Second),
QueueConfig: DefaultQueueConfig,
MetadataConfig: DefaultMetadataConfig,
Name: "rw_tls",
HTTPClientConfig: config.HTTPClientConfig{
TLSConfig: config.TLSConfig{
CertFile: filepath.FromSlash("testdata/valid_cert_file"),

10
docs/configuration/configuration.md

@ -1769,7 +1769,15 @@ queue_config:
[ min_backoff: <duration> | default = 30ms ]
# Maximum retry delay.
[ max_backoff: <duration> | default = 100ms ]
# Configures the sending of series metadata to remote storage.
# Metadata configuration is subject to change at any point
# or be removed in future releases.
metadata_config:
# Whether metric metadata is sent to remote storage or not.
[ send: <boolean> | default = true ]
# How frequently metric metadata is sent to remote storage.
[ send_interval: <duration> | default = 1m ]
```
There is a list of

133
prompb/remote.pb.go

@ -62,10 +62,11 @@ func (ReadRequest_ResponseType) EnumDescriptor() ([]byte, []int) {
}
type WriteRequest struct {
Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
Metadata []MetricMetadata `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *WriteRequest) Reset() { *m = WriteRequest{} }
@ -108,6 +109,13 @@ func (m *WriteRequest) GetTimeseries() []TimeSeries {
return nil
}
func (m *WriteRequest) GetMetadata() []MetricMetadata {
if m != nil {
return m.Metadata
}
return nil
}
// ReadRequest represents a remote read request.
type ReadRequest struct {
Queries []*Query `protobuf:"bytes,1,rep,name=queries,proto3" json:"queries,omitempty"`
@ -410,37 +418,38 @@ func init() {
func init() { proto.RegisterFile("remote.proto", fileDescriptor_eefc82927d57d89b) }
var fileDescriptor_eefc82927d57d89b = []byte{
// 466 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0xcf, 0x6e, 0xd3, 0x40,
0x10, 0xc6, 0xbb, 0x4d, 0xdb, 0xa0, 0x71, 0x88, 0xc2, 0xb6, 0x25, 0xa6, 0x87, 0x34, 0xb2, 0x38,
0x58, 0x2a, 0x0a, 0x22, 0x54, 0x9c, 0x38, 0x90, 0x96, 0x48, 0x45, 0x24, 0xfc, 0x59, 0x07, 0x81,
0x10, 0x92, 0xe5, 0xd8, 0xa3, 0xc6, 0xa2, 0xfe, 0xd3, 0xdd, 0xb5, 0xd4, 0xbc, 0x1e, 0xa7, 0x9e,
0x10, 0x4f, 0x80, 0x50, 0x9e, 0x04, 0xed, 0xda, 0x0e, 0x1b, 0xb8, 0x70, 0x5b, 0x7f, 0xdf, 0x37,
0x3f, 0xef, 0x8c, 0xc7, 0xd0, 0xe2, 0x98, 0x64, 0x12, 0x07, 0x39, 0xcf, 0x64, 0x46, 0x21, 0xe7,
0x59, 0x82, 0x72, 0x81, 0x85, 0x38, 0xb2, 0xe4, 0x32, 0x47, 0x51, 0x1a, 0x47, 0x07, 0x97, 0xd9,
0x65, 0xa6, 0x8f, 0x8f, 0xd5, 0xa9, 0x54, 0x9d, 0x09, 0xb4, 0x3e, 0xf2, 0x58, 0x22, 0xc3, 0xeb,
0x02, 0x85, 0xa4, 0xcf, 0x01, 0x64, 0x9c, 0xa0, 0x40, 0x1e, 0xa3, 0xb0, 0x49, 0xbf, 0xe1, 0x5a,
0xc3, 0xfb, 0x83, 0x3f, 0xcc, 0xc1, 0x2c, 0x4e, 0xd0, 0xd3, 0xee, 0xd9, 0xce, 0xed, 0xcf, 0xe3,
0x2d, 0x66, 0xe4, 0x9d, 0xef, 0x04, 0x2c, 0x86, 0x41, 0x54, 0xd3, 0x4e, 0xa0, 0x79, 0x5d, 0x98,
0xa8, 0x7b, 0x26, 0xea, 0x7d, 0x81, 0x7c, 0xc9, 0xea, 0x04, 0xfd, 0x02, 0xdd, 0x20, 0x0c, 0x31,
0x97, 0x18, 0xf9, 0x1c, 0x45, 0x9e, 0xa5, 0x02, 0x7d, 0xdd, 0x81, 0xbd, 0xdd, 0x6f, 0xb8, 0xed,
0xe1, 0x43, 0xb3, 0xd8, 0x78, 0xcd, 0x80, 0x55, 0xe9, 0xd9, 0x32, 0x47, 0x76, 0x58, 0x43, 0x4c,
0x55, 0x38, 0xa7, 0xd0, 0x32, 0x05, 0x6a, 0x41, 0xd3, 0x1b, 0x4d, 0xdf, 0x4d, 0xc6, 0x5e, 0x67,
0x8b, 0x76, 0x61, 0xdf, 0x9b, 0xb1, 0xf1, 0x68, 0x3a, 0x7e, 0xe9, 0x7f, 0x7a, 0xcb, 0xfc, 0xf3,
0x8b, 0x0f, 0x6f, 0x5e, 0x7b, 0x1d, 0xe2, 0x8c, 0x54, 0x55, 0xb0, 0x46, 0xd1, 0x27, 0xd0, 0xe4,
0x28, 0x8a, 0x2b, 0x59, 0x37, 0xd4, 0xfd, 0xb7, 0x21, 0xed, 0xb3, 0x3a, 0xe7, 0x7c, 0x23, 0xb0,
0xab, 0x0d, 0xfa, 0x08, 0xa8, 0x90, 0x01, 0x97, 0xbe, 0x9e, 0x98, 0x0c, 0x92, 0xdc, 0x4f, 0x14,
0x87, 0xb8, 0x0d, 0xd6, 0xd1, 0xce, 0xac, 0x36, 0xa6, 0x82, 0xba, 0xd0, 0xc1, 0x34, 0xda, 0xcc,
0x6e, 0xeb, 0x6c, 0x1b, 0xd3, 0xc8, 0x4c, 0x9e, 0xc2, 0x9d, 0x24, 0x90, 0xe1, 0x02, 0xb9, 0xb0,
0x1b, 0xfa, 0x56, 0xb6, 0x79, 0xab, 0x49, 0x30, 0xc7, 0xab, 0x69, 0x19, 0x60, 0xeb, 0x24, 0x3d,
0x81, 0xdd, 0x45, 0x9c, 0x4a, 0x61, 0xef, 0xf4, 0x89, 0x6b, 0x0d, 0x0f, 0xff, 0x1e, 0xee, 0x85,
0x32, 0x59, 0x99, 0x71, 0xc6, 0x60, 0x19, 0xcd, 0xd1, 0x67, 0xff, 0xbf, 0x25, 0x1b, 0xfb, 0x71,
0x03, 0xfb, 0xe7, 0x8b, 0x22, 0xfd, 0xaa, 0x3e, 0x8e, 0x31, 0xd5, 0x17, 0xd0, 0x0e, 0x4b, 0xd9,
0xdf, 0x40, 0x3e, 0x30, 0x91, 0x55, 0x61, 0x45, 0xbd, 0x1b, 0x9a, 0x8f, 0xf4, 0x18, 0x2c, 0xb5,
0x46, 0x4b, 0x3f, 0x4e, 0x23, 0xbc, 0xa9, 0xe6, 0x04, 0x5a, 0x7a, 0xa5, 0x94, 0xb3, 0x83, 0xdb,
0x55, 0x8f, 0xfc, 0x58, 0xf5, 0xc8, 0xaf, 0x55, 0x8f, 0x7c, 0xde, 0x53, 0xdc, 0x7c, 0x3e, 0xdf,
0xd3, 0x3f, 0xc1, 0xd3, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x9a, 0xb6, 0x6b, 0xcd, 0x43, 0x03,
0x00, 0x00,
// 496 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0xcd, 0x6e, 0xd3, 0x40,
0x10, 0xee, 0x26, 0x69, 0x13, 0x8d, 0x43, 0x14, 0xb6, 0x2d, 0x09, 0x39, 0xa4, 0x91, 0xc5, 0x21,
0x52, 0x51, 0x10, 0xa1, 0xe2, 0xd4, 0x03, 0x69, 0x89, 0x54, 0xa0, 0xe6, 0x67, 0x13, 0x04, 0x42,
0x48, 0xd6, 0xc6, 0x1e, 0x35, 0x16, 0xf5, 0x4f, 0x77, 0xd7, 0x52, 0xf3, 0x16, 0x3c, 0x13, 0xa7,
0x9e, 0x10, 0x4f, 0x80, 0x50, 0x9e, 0x04, 0x79, 0x6d, 0x87, 0x2d, 0x5c, 0xb8, 0xad, 0xbf, 0x3f,
0xcf, 0xcc, 0xce, 0x42, 0x53, 0x60, 0x18, 0x2b, 0x1c, 0x25, 0x22, 0x56, 0x31, 0x85, 0x44, 0xc4,
0x21, 0xaa, 0x25, 0xa6, 0xb2, 0x67, 0xa9, 0x55, 0x82, 0x32, 0x27, 0x7a, 0x7b, 0x17, 0xf1, 0x45,
0xac, 0x8f, 0x8f, 0xb2, 0x53, 0x8e, 0xda, 0x5f, 0x09, 0x34, 0x3f, 0x88, 0x40, 0x21, 0xc3, 0xab,
0x14, 0xa5, 0xa2, 0xc7, 0x00, 0x2a, 0x08, 0x51, 0xa2, 0x08, 0x50, 0x76, 0xc9, 0xa0, 0x3a, 0xb4,
0xc6, 0xf7, 0x46, 0x7f, 0x42, 0x47, 0xf3, 0x20, 0xc4, 0x99, 0x66, 0x4f, 0x6a, 0x37, 0x3f, 0x0f,
0xb6, 0x98, 0xa1, 0xa7, 0xc7, 0xd0, 0x08, 0x51, 0x71, 0x9f, 0x2b, 0xde, 0xad, 0x6a, 0x6f, 0xcf,
0xf4, 0x3a, 0xa8, 0x44, 0xe0, 0x39, 0x85, 0xa2, 0xf0, 0x6f, 0x1c, 0x2f, 0x6b, 0x8d, 0x4a, 0xbb,
0x6a, 0x7f, 0x27, 0x60, 0x31, 0xe4, 0x7e, 0x59, 0xd1, 0x21, 0xd4, 0xaf, 0x52, 0xb3, 0x9c, 0xbb,
0x66, 0xe4, 0xbb, 0x14, 0xc5, 0x8a, 0x95, 0x0a, 0xfa, 0x19, 0x3a, 0xdc, 0xf3, 0x30, 0x51, 0xe8,
0xbb, 0x02, 0x65, 0x12, 0x47, 0x12, 0x5d, 0x3d, 0x86, 0x6e, 0x65, 0x50, 0x1d, 0xb6, 0xc6, 0x0f,
0x4c, 0xb3, 0xf1, 0x9b, 0x11, 0x2b, 0xd4, 0xf3, 0x55, 0x82, 0x6c, 0xbf, 0x0c, 0x31, 0x51, 0x69,
0x1f, 0x41, 0xd3, 0x04, 0xa8, 0x05, 0xf5, 0xd9, 0xc4, 0x79, 0x7b, 0x3e, 0x9d, 0xb5, 0xb7, 0x68,
0x07, 0x76, 0x67, 0x73, 0x36, 0x9d, 0x38, 0xd3, 0xe7, 0xee, 0xc7, 0x37, 0xcc, 0x3d, 0x3d, 0x7b,
0xff, 0xfa, 0xd5, 0xac, 0x4d, 0xec, 0x49, 0xe6, 0xe2, 0x9b, 0x28, 0xfa, 0x18, 0xea, 0x02, 0x65,
0x7a, 0xa9, 0xca, 0x86, 0x3a, 0xff, 0x36, 0xa4, 0x79, 0x56, 0xea, 0xec, 0x6f, 0x04, 0xb6, 0x35,
0x41, 0x1f, 0x02, 0x95, 0x8a, 0x0b, 0xe5, 0xea, 0xa9, 0x2b, 0x1e, 0x26, 0x6e, 0x98, 0xe5, 0x90,
0x61, 0x95, 0xb5, 0x35, 0x33, 0x2f, 0x09, 0x47, 0xd2, 0x21, 0xb4, 0x31, 0xf2, 0x6f, 0x6b, 0x2b,
0x5a, 0xdb, 0xc2, 0xc8, 0x37, 0x95, 0x47, 0xd0, 0x08, 0xb9, 0xf2, 0x96, 0x28, 0x64, 0x71, 0x73,
0x5d, 0xb3, 0xaa, 0x73, 0xbe, 0xc0, 0x4b, 0x27, 0x17, 0xb0, 0x8d, 0x92, 0x1e, 0xc2, 0xf6, 0x32,
0x88, 0x94, 0xec, 0xd6, 0x06, 0x64, 0x68, 0x8d, 0xf7, 0xff, 0x1e, 0xee, 0x59, 0x46, 0xb2, 0x5c,
0x63, 0x4f, 0xc1, 0x32, 0x9a, 0xa3, 0x4f, 0xff, 0x7f, 0xd3, 0xcc, 0x1d, 0xb3, 0xaf, 0x61, 0xf7,
0x74, 0x99, 0x46, 0x5f, 0xb2, 0xcb, 0x31, 0xa6, 0xfa, 0x0c, 0x5a, 0x5e, 0x0e, 0xbb, 0xb7, 0x22,
0xef, 0x9b, 0x91, 0x85, 0xb1, 0x48, 0xbd, 0xe3, 0x99, 0x9f, 0xf4, 0x00, 0xac, 0x6c, 0x8d, 0x56,
0x6e, 0x10, 0xf9, 0x78, 0x5d, 0xcc, 0x09, 0x34, 0xf4, 0x22, 0x43, 0x4e, 0xf6, 0x6e, 0xd6, 0x7d,
0xf2, 0x63, 0xdd, 0x27, 0xbf, 0xd6, 0x7d, 0xf2, 0x69, 0x27, 0xcb, 0x4d, 0x16, 0x8b, 0x1d, 0xfd,
0x92, 0x9e, 0xfc, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x13, 0x18, 0x12, 0x0a, 0x88, 0x03, 0x00, 0x00,
}
func (m *WriteRequest) Marshal() (dAtA []byte, err error) {
@ -467,6 +476,20 @@ func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Metadata) > 0 {
for iNdEx := len(m.Metadata) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Metadata[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintRemote(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
}
if len(m.Timeseries) > 0 {
for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- {
{
@ -757,6 +780,12 @@ func (m *WriteRequest) Size() (n int) {
n += 1 + l + sovRemote(uint64(l))
}
}
if len(m.Metadata) > 0 {
for _, e := range m.Metadata {
l = e.Size()
n += 1 + l + sovRemote(uint64(l))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@ -942,6 +971,40 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRemote
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthRemote
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthRemote
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Metadata = append(m.Metadata, MetricMetadata{})
if err := m.Metadata[len(m.Metadata)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipRemote(dAtA[iNdEx:])

4
prompb/remote.proto

@ -21,6 +21,10 @@ import "gogoproto/gogo.proto";
message WriteRequest {
repeated prometheus.TimeSeries timeseries = 1 [(gogoproto.nullable) = false];
// Cortex uses this field to determine the source of the write request.
// We reserve it to avoid any compatibility issues.
reserved 2;
repeated prometheus.MetricMetadata metadata = 3 [(gogoproto.nullable) = false];
}
// ReadRequest represents a remote read request.

467
prompb/types.pb.go

@ -25,6 +25,49 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type MetricMetadata_MetricType int32
const (
MetricMetadata_UNKNOWN MetricMetadata_MetricType = 0
MetricMetadata_COUNTER MetricMetadata_MetricType = 1
MetricMetadata_GAUGE MetricMetadata_MetricType = 2
MetricMetadata_HISTOGRAM MetricMetadata_MetricType = 3
MetricMetadata_GAUGEHISTOGRAM MetricMetadata_MetricType = 4
MetricMetadata_SUMMARY MetricMetadata_MetricType = 5
MetricMetadata_INFO MetricMetadata_MetricType = 6
MetricMetadata_STATESET MetricMetadata_MetricType = 7
)
var MetricMetadata_MetricType_name = map[int32]string{
0: "UNKNOWN",
1: "COUNTER",
2: "GAUGE",
3: "HISTOGRAM",
4: "GAUGEHISTOGRAM",
5: "SUMMARY",
6: "INFO",
7: "STATESET",
}
var MetricMetadata_MetricType_value = map[string]int32{
"UNKNOWN": 0,
"COUNTER": 1,
"GAUGE": 2,
"HISTOGRAM": 3,
"GAUGEHISTOGRAM": 4,
"SUMMARY": 5,
"INFO": 6,
"STATESET": 7,
}
func (x MetricMetadata_MetricType) String() string {
return proto.EnumName(MetricMetadata_MetricType_name, int32(x))
}
func (MetricMetadata_MetricType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{0, 0}
}
type LabelMatcher_Type int32
const (
@ -53,7 +96,7 @@ func (x LabelMatcher_Type) String() string {
}
func (LabelMatcher_Type) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{4, 0}
return fileDescriptor_d938547f84707355, []int{5, 0}
}
// We require this to match chunkenc.Encoding.
@ -79,7 +122,80 @@ func (x Chunk_Encoding) String() string {
}
func (Chunk_Encoding) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{6, 0}
return fileDescriptor_d938547f84707355, []int{7, 0}
}
type MetricMetadata struct {
// Represents the metric type, these match the set from Prometheus.
// Refer to pkg/textparse/interface.go for details.
Type MetricMetadata_MetricType `protobuf:"varint,1,opt,name=type,proto3,enum=prometheus.MetricMetadata_MetricType" json:"type,omitempty"`
MetricFamilyName string `protobuf:"bytes,2,opt,name=metric_family_name,json=metricFamilyName,proto3" json:"metric_family_name,omitempty"`
Help string `protobuf:"bytes,4,opt,name=help,proto3" json:"help,omitempty"`
Unit string `protobuf:"bytes,5,opt,name=unit,proto3" json:"unit,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MetricMetadata) Reset() { *m = MetricMetadata{} }
func (m *MetricMetadata) String() string { return proto.CompactTextString(m) }
func (*MetricMetadata) ProtoMessage() {}
func (*MetricMetadata) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{0}
}
func (m *MetricMetadata) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *MetricMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_MetricMetadata.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *MetricMetadata) XXX_Merge(src proto.Message) {
xxx_messageInfo_MetricMetadata.Merge(m, src)
}
func (m *MetricMetadata) XXX_Size() int {
return m.Size()
}
func (m *MetricMetadata) XXX_DiscardUnknown() {
xxx_messageInfo_MetricMetadata.DiscardUnknown(m)
}
var xxx_messageInfo_MetricMetadata proto.InternalMessageInfo
func (m *MetricMetadata) GetType() MetricMetadata_MetricType {
if m != nil {
return m.Type
}
return MetricMetadata_UNKNOWN
}
func (m *MetricMetadata) GetMetricFamilyName() string {
if m != nil {
return m.MetricFamilyName
}
return ""
}
func (m *MetricMetadata) GetHelp() string {
if m != nil {
return m.Help
}
return ""
}
func (m *MetricMetadata) GetUnit() string {
if m != nil {
return m.Unit
}
return ""
}
type Sample struct {
@ -94,7 +210,7 @@ func (m *Sample) Reset() { *m = Sample{} }
func (m *Sample) String() string { return proto.CompactTextString(m) }
func (*Sample) ProtoMessage() {}
func (*Sample) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{0}
return fileDescriptor_d938547f84707355, []int{1}
}
func (m *Sample) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -150,7 +266,7 @@ func (m *TimeSeries) Reset() { *m = TimeSeries{} }
func (m *TimeSeries) String() string { return proto.CompactTextString(m) }
func (*TimeSeries) ProtoMessage() {}
func (*TimeSeries) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{1}
return fileDescriptor_d938547f84707355, []int{2}
}
func (m *TimeSeries) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -205,7 +321,7 @@ func (m *Label) Reset() { *m = Label{} }
func (m *Label) String() string { return proto.CompactTextString(m) }
func (*Label) ProtoMessage() {}
func (*Label) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{2}
return fileDescriptor_d938547f84707355, []int{3}
}
func (m *Label) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -259,7 +375,7 @@ func (m *Labels) Reset() { *m = Labels{} }
func (m *Labels) String() string { return proto.CompactTextString(m) }
func (*Labels) ProtoMessage() {}
func (*Labels) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{3}
return fileDescriptor_d938547f84707355, []int{4}
}
func (m *Labels) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -309,7 +425,7 @@ func (m *LabelMatcher) Reset() { *m = LabelMatcher{} }
func (m *LabelMatcher) String() string { return proto.CompactTextString(m) }
func (*LabelMatcher) ProtoMessage() {}
func (*LabelMatcher) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{4}
return fileDescriptor_d938547f84707355, []int{5}
}
func (m *LabelMatcher) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -376,7 +492,7 @@ func (m *ReadHints) Reset() { *m = ReadHints{} }
func (m *ReadHints) String() string { return proto.CompactTextString(m) }
func (*ReadHints) ProtoMessage() {}
func (*ReadHints) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{5}
return fileDescriptor_d938547f84707355, []int{6}
}
func (m *ReadHints) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -470,7 +586,7 @@ func (m *Chunk) Reset() { *m = Chunk{} }
func (m *Chunk) String() string { return proto.CompactTextString(m) }
func (*Chunk) ProtoMessage() {}
func (*Chunk) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{6}
return fileDescriptor_d938547f84707355, []int{7}
}
func (m *Chunk) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -542,7 +658,7 @@ func (m *ChunkedSeries) Reset() { *m = ChunkedSeries{} }
func (m *ChunkedSeries) String() string { return proto.CompactTextString(m) }
func (*ChunkedSeries) ProtoMessage() {}
func (*ChunkedSeries) Descriptor() ([]byte, []int) {
return fileDescriptor_d938547f84707355, []int{7}
return fileDescriptor_d938547f84707355, []int{8}
}
func (m *ChunkedSeries) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -586,8 +702,10 @@ func (m *ChunkedSeries) GetChunks() []Chunk {
}
func init() {
proto.RegisterEnum("prometheus.MetricMetadata_MetricType", MetricMetadata_MetricType_name, MetricMetadata_MetricType_value)
proto.RegisterEnum("prometheus.LabelMatcher_Type", LabelMatcher_Type_name, LabelMatcher_Type_value)
proto.RegisterEnum("prometheus.Chunk_Encoding", Chunk_Encoding_name, Chunk_Encoding_value)
proto.RegisterType((*MetricMetadata)(nil), "prometheus.MetricMetadata")
proto.RegisterType((*Sample)(nil), "prometheus.Sample")
proto.RegisterType((*TimeSeries)(nil), "prometheus.TimeSeries")
proto.RegisterType((*Label)(nil), "prometheus.Label")
@ -601,41 +719,104 @@ func init() {
func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) }
var fileDescriptor_d938547f84707355 = []byte{
// 539 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0xcd, 0x6e, 0xd3, 0x40,
0x10, 0xee, 0xda, 0x89, 0x9d, 0x4c, 0x4a, 0x95, 0xae, 0x8a, 0x30, 0x15, 0x04, 0xcb, 0x27, 0x9f,
0x5c, 0x35, 0x9c, 0x90, 0x38, 0x15, 0x45, 0x42, 0xa2, 0x4e, 0xd5, 0x6d, 0x11, 0x88, 0x4b, 0xb5,
0x89, 0x17, 0xc7, 0x22, 0x5e, 0xbb, 0xde, 0x0d, 0x6a, 0x1e, 0x84, 0xc7, 0xe0, 0xc0, 0x5b, 0xf4,
0xc8, 0x13, 0x20, 0x94, 0x27, 0x41, 0x3b, 0x76, 0x7e, 0xa4, 0x72, 0x81, 0xdb, 0xfc, 0x7c, 0xf3,
0x7d, 0x9f, 0x77, 0xc6, 0xd0, 0xd3, 0xcb, 0x52, 0xa8, 0xa8, 0xac, 0x0a, 0x5d, 0x50, 0x28, 0xab,
0x22, 0x17, 0x7a, 0x26, 0x16, 0xea, 0xf8, 0x28, 0x2d, 0xd2, 0x02, 0xcb, 0x27, 0x26, 0xaa, 0x11,
0xc1, 0x6b, 0x70, 0xae, 0x78, 0x5e, 0xce, 0x05, 0x3d, 0x82, 0xf6, 0x57, 0x3e, 0x5f, 0x08, 0x8f,
0xf8, 0x24, 0x24, 0xac, 0x4e, 0xe8, 0x33, 0xe8, 0xea, 0x2c, 0x17, 0x4a, 0xf3, 0xbc, 0xf4, 0x2c,
0x9f, 0x84, 0x36, 0xdb, 0x16, 0x82, 0x5b, 0x80, 0xeb, 0x2c, 0x17, 0x57, 0xa2, 0xca, 0x84, 0xa2,
0x27, 0xe0, 0xcc, 0xf9, 0x44, 0xcc, 0x95, 0x47, 0x7c, 0x3b, 0xec, 0x0d, 0x0f, 0xa3, 0xad, 0x7c,
0x74, 0x6e, 0x3a, 0x67, 0xad, 0xfb, 0x5f, 0x2f, 0xf6, 0x58, 0x03, 0xa3, 0x43, 0x70, 0x15, 0x8a,
0x2b, 0xcf, 0xc2, 0x09, 0xba, 0x3b, 0x51, 0xfb, 0x6a, 0x46, 0xd6, 0xc0, 0xe0, 0x14, 0xda, 0x48,
0x45, 0x29, 0xb4, 0x24, 0xcf, 0x6b, 0xbb, 0x5d, 0x86, 0xf1, 0xf6, 0x1b, 0x2c, 0x2c, 0xd6, 0x49,
0xf0, 0x0a, 0x9c, 0xf3, 0x5a, 0xf0, 0x5f, 0x1d, 0x06, 0xdf, 0x08, 0xec, 0x63, 0x3d, 0xe6, 0x7a,
0x3a, 0x13, 0x15, 0x3d, 0x85, 0x96, 0x79, 0x60, 0x54, 0x3d, 0x18, 0x3e, 0x7f, 0x30, 0xdf, 0xe0,
0xa2, 0xeb, 0x65, 0x29, 0x18, 0x42, 0x37, 0x46, 0xad, 0xbf, 0x19, 0xb5, 0x77, 0x8d, 0x86, 0xd0,
0x32, 0x73, 0xd4, 0x01, 0x6b, 0x74, 0xd9, 0xdf, 0xa3, 0x2e, 0xd8, 0xe3, 0xd1, 0x65, 0x9f, 0x98,
0x02, 0x1b, 0xf5, 0x2d, 0x2c, 0xb0, 0x51, 0xdf, 0x0e, 0x7e, 0x10, 0xe8, 0x32, 0xc1, 0x93, 0xb7,
0x99, 0xd4, 0x8a, 0x3e, 0x01, 0x57, 0x69, 0x51, 0xde, 0xe4, 0x0a, 0x7d, 0xd9, 0xcc, 0x31, 0x69,
0xac, 0x8c, 0xf4, 0xe7, 0x85, 0x9c, 0xae, 0xa5, 0x4d, 0x4c, 0x9f, 0x42, 0x47, 0x69, 0x5e, 0x69,
0x83, 0xb6, 0x11, 0xed, 0x62, 0x1e, 0x2b, 0xfa, 0x18, 0x1c, 0x21, 0x13, 0xd3, 0x68, 0x61, 0xa3,
0x2d, 0x64, 0x12, 0x2b, 0x7a, 0x0c, 0x9d, 0xb4, 0x2a, 0x16, 0x65, 0x26, 0x53, 0xaf, 0xed, 0xdb,
0x61, 0x97, 0x6d, 0x72, 0x7a, 0x00, 0xd6, 0x64, 0xe9, 0x39, 0x3e, 0x09, 0x3b, 0xcc, 0x9a, 0x2c,
0x0d, 0x7b, 0xc5, 0x65, 0x2a, 0x0c, 0x89, 0x5b, 0xb3, 0x63, 0x1e, 0xab, 0xe0, 0x3b, 0x81, 0xf6,
0x9b, 0xd9, 0x42, 0x7e, 0xa1, 0x03, 0xe8, 0xe5, 0x99, 0xbc, 0x31, 0x77, 0xb4, 0xf5, 0xdc, 0xcd,
0x33, 0x69, 0x8e, 0x29, 0x56, 0xd8, 0xe7, 0x77, 0x9b, 0x7e, 0x73, 0x76, 0x39, 0xbf, 0x6b, 0xfa,
0x51, 0xb3, 0x04, 0x1b, 0x97, 0x70, 0xbc, 0xbb, 0x04, 0x14, 0x88, 0x46, 0x72, 0x5a, 0x24, 0x99,
0x4c, 0xb7, 0x1b, 0x48, 0xb8, 0xe6, 0xf8, 0x55, 0xfb, 0x0c, 0xe3, 0xc0, 0x87, 0xce, 0x1a, 0x45,
0x7b, 0xe0, 0xbe, 0x1f, 0xbf, 0x1b, 0x5f, 0x7c, 0x18, 0xd7, 0x8f, 0xfe, 0xf1, 0x82, 0xf5, 0x49,
0x70, 0x0b, 0x8f, 0x90, 0x4d, 0x24, 0xff, 0x7b, 0xdf, 0x27, 0xe0, 0x4c, 0x0d, 0xc3, 0xfa, 0xbc,
0x0f, 0x1f, 0x38, 0x5d, 0x0f, 0xd4, 0xb0, 0xb3, 0xa3, 0xfb, 0xd5, 0x80, 0xfc, 0x5c, 0x0d, 0xc8,
0xef, 0xd5, 0x80, 0x7c, 0x72, 0x0c, 0xba, 0x9c, 0x4c, 0x1c, 0xfc, 0x55, 0x5f, 0xfe, 0x09, 0x00,
0x00, 0xff, 0xff, 0xed, 0x99, 0x84, 0x88, 0xdb, 0x03, 0x00, 0x00,
// 690 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0xcd, 0x6e, 0xda, 0x40,
0x10, 0xce, 0xfa, 0x17, 0x86, 0x04, 0x39, 0xab, 0x54, 0x75, 0xa3, 0x96, 0x22, 0x4b, 0x95, 0x38,
0x54, 0x44, 0x49, 0x4f, 0x91, 0x7a, 0x21, 0x91, 0xf3, 0xa3, 0xc6, 0xa0, 0x2c, 0xa0, 0xfe, 0x5c,
0xd0, 0x02, 0x1b, 0xb0, 0x8a, 0x8d, 0xe3, 0x5d, 0xaa, 0xf0, 0x20, 0xbd, 0xf5, 0x15, 0x7a, 0xe8,
0x5b, 0xe4, 0xd8, 0x27, 0xa8, 0xaa, 0x3c, 0x49, 0xb5, 0x6b, 0x13, 0x13, 0xa5, 0x97, 0xf6, 0x36,
0xf3, 0x7d, 0xdf, 0xfc, 0xec, 0xcc, 0xd8, 0x50, 0x11, 0xcb, 0x84, 0xf1, 0x66, 0x92, 0xce, 0xc5,
0x1c, 0x43, 0x92, 0xce, 0x23, 0x26, 0xa6, 0x6c, 0xc1, 0x77, 0x77, 0x26, 0xf3, 0xc9, 0x5c, 0xc1,
0x7b, 0xd2, 0xca, 0x14, 0xde, 0x37, 0x0d, 0xaa, 0x01, 0x13, 0x69, 0x38, 0x0a, 0x98, 0xa0, 0x63,
0x2a, 0x28, 0x3e, 0x04, 0x43, 0xe6, 0x70, 0x51, 0x1d, 0x35, 0xaa, 0x07, 0xaf, 0x9a, 0x45, 0x8e,
0xe6, 0x43, 0x65, 0xee, 0xf6, 0x96, 0x09, 0x23, 0x2a, 0x04, 0xbf, 0x06, 0x1c, 0x29, 0x6c, 0x70,
0x45, 0xa3, 0x70, 0xb6, 0x1c, 0xc4, 0x34, 0x62, 0xae, 0x56, 0x47, 0x8d, 0x32, 0x71, 0x32, 0xe6,
0x44, 0x11, 0x6d, 0x1a, 0x31, 0x8c, 0xc1, 0x98, 0xb2, 0x59, 0xe2, 0x1a, 0x8a, 0x57, 0xb6, 0xc4,
0x16, 0x71, 0x28, 0x5c, 0x33, 0xc3, 0xa4, 0xed, 0x2d, 0x01, 0x8a, 0x4a, 0xb8, 0x02, 0x76, 0xbf,
0xfd, 0xae, 0xdd, 0x79, 0xdf, 0x76, 0x36, 0xa4, 0x73, 0xdc, 0xe9, 0xb7, 0x7b, 0x3e, 0x71, 0x10,
0x2e, 0x83, 0x79, 0xda, 0xea, 0x9f, 0xfa, 0x8e, 0x86, 0xb7, 0xa0, 0x7c, 0x76, 0xde, 0xed, 0x75,
0x4e, 0x49, 0x2b, 0x70, 0x74, 0x8c, 0xa1, 0xaa, 0x98, 0x02, 0x33, 0x64, 0x68, 0xb7, 0x1f, 0x04,
0x2d, 0xf2, 0xd1, 0x31, 0x71, 0x09, 0x8c, 0xf3, 0xf6, 0x49, 0xc7, 0xb1, 0xf0, 0x26, 0x94, 0xba,
0xbd, 0x56, 0xcf, 0xef, 0xfa, 0x3d, 0xc7, 0xf6, 0xde, 0x82, 0xd5, 0xa5, 0x51, 0x32, 0x63, 0x78,
0x07, 0xcc, 0x2f, 0x74, 0xb6, 0xc8, 0xc6, 0x82, 0x48, 0xe6, 0xe0, 0xe7, 0x50, 0x16, 0x61, 0xc4,
0xb8, 0xa0, 0x51, 0xa2, 0xde, 0xa9, 0x93, 0x02, 0xf0, 0xae, 0x01, 0x7a, 0x61, 0xc4, 0xba, 0x2c,
0x0d, 0x19, 0xc7, 0x7b, 0x60, 0xcd, 0xe8, 0x90, 0xcd, 0xb8, 0x8b, 0xea, 0x7a, 0xa3, 0x72, 0xb0,
0xbd, 0x3e, 0xd9, 0x0b, 0xc9, 0x1c, 0x19, 0xb7, 0xbf, 0x5e, 0x6e, 0x90, 0x5c, 0x86, 0x0f, 0xc0,
0xe6, 0xaa, 0x38, 0x77, 0x35, 0x15, 0x81, 0xd7, 0x23, 0xb2, 0xbe, 0xf2, 0x90, 0x95, 0xd0, 0xdb,
0x07, 0x53, 0xa5, 0x92, 0x83, 0x54, 0xc3, 0x47, 0xd9, 0x20, 0xa5, 0x5d, 0xbc, 0x21, 0xdb, 0x48,
0xe6, 0x78, 0x87, 0x60, 0x5d, 0x64, 0x05, 0xff, 0xb5, 0x43, 0xef, 0x2b, 0x82, 0x4d, 0x85, 0x07,
0x54, 0x8c, 0xa6, 0x2c, 0xc5, 0xfb, 0x0f, 0x6e, 0xe7, 0xc5, 0xa3, 0xf8, 0x5c, 0xd7, 0x5c, 0xbb,
0x99, 0x55, 0xa3, 0xda, 0xdf, 0x1a, 0xd5, 0xd7, 0x1b, 0x6d, 0x80, 0xa1, 0x2e, 0xc0, 0x02, 0xcd,
0xbf, 0x74, 0x36, 0xb0, 0x0d, 0x7a, 0xdb, 0xbf, 0x74, 0x90, 0x04, 0x88, 0xdc, 0xba, 0x04, 0x88,
0xef, 0xe8, 0xde, 0x0f, 0x04, 0x65, 0xc2, 0xe8, 0xf8, 0x2c, 0x8c, 0x05, 0xc7, 0x4f, 0xc1, 0xe6,
0x82, 0x25, 0x83, 0x88, 0xab, 0xbe, 0x74, 0x62, 0x49, 0x37, 0xe0, 0xb2, 0xf4, 0xd5, 0x22, 0x1e,
0xad, 0x4a, 0x4b, 0x1b, 0x3f, 0x83, 0x12, 0x17, 0x34, 0x15, 0x52, 0xad, 0x2b, 0xb5, 0xad, 0xfc,
0x80, 0xe3, 0x27, 0x60, 0xb1, 0x78, 0x2c, 0x09, 0x43, 0x11, 0x26, 0x8b, 0xc7, 0x01, 0xc7, 0xbb,
0x50, 0x9a, 0xa4, 0xf3, 0x45, 0x12, 0xc6, 0x13, 0xd7, 0xac, 0xeb, 0x8d, 0x32, 0xb9, 0xf7, 0x71,
0x15, 0xb4, 0xe1, 0xd2, 0xb5, 0xea, 0xa8, 0x51, 0x22, 0xda, 0x70, 0x29, 0xb3, 0xa7, 0x34, 0x9e,
0x30, 0x99, 0xc4, 0xce, 0xb2, 0x2b, 0x3f, 0xe0, 0xde, 0x77, 0x04, 0xe6, 0xf1, 0x74, 0x11, 0x7f,
0xc6, 0x35, 0xa8, 0x44, 0x61, 0x3c, 0x90, 0x77, 0x54, 0xf4, 0x5c, 0x8e, 0xc2, 0x58, 0x1e, 0x53,
0xc0, 0x15, 0x4f, 0x6f, 0xee, 0xf9, 0xfc, 0xec, 0x22, 0x7a, 0x93, 0xf3, 0xcd, 0x7c, 0x09, 0xba,
0x5a, 0xc2, 0xee, 0xfa, 0x12, 0x54, 0x81, 0xa6, 0x1f, 0x8f, 0xe6, 0xe3, 0x30, 0x9e, 0x14, 0x1b,
0x90, 0x9f, 0xb3, 0x7a, 0xd5, 0x26, 0x51, 0xb6, 0x57, 0x87, 0xd2, 0x4a, 0xf5, 0xf0, 0x8b, 0xb3,
0x41, 0xff, 0xd0, 0x21, 0x0e, 0xf2, 0xae, 0x61, 0x4b, 0x65, 0x63, 0xe3, 0xff, 0xbd, 0xef, 0x3d,
0xb0, 0x46, 0x32, 0xc3, 0xea, 0xbc, 0xb7, 0x1f, 0x75, 0xba, 0x0a, 0xc8, 0x64, 0x47, 0x3b, 0xb7,
0x77, 0x35, 0xf4, 0xf3, 0xae, 0x86, 0x7e, 0xdf, 0xd5, 0xd0, 0x27, 0x4b, 0xaa, 0x93, 0xe1, 0xd0,
0x52, 0x7f, 0xb2, 0x37, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0xf3, 0xb7, 0x12, 0x44, 0xfa, 0x04,
0x00, 0x00,
}
func (m *MetricMetadata) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *MetricMetadata) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *MetricMetadata) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Unit) > 0 {
i -= len(m.Unit)
copy(dAtA[i:], m.Unit)
i = encodeVarintTypes(dAtA, i, uint64(len(m.Unit)))
i--
dAtA[i] = 0x2a
}
if len(m.Help) > 0 {
i -= len(m.Help)
copy(dAtA[i:], m.Help)
i = encodeVarintTypes(dAtA, i, uint64(len(m.Help)))
i--
dAtA[i] = 0x22
}
if len(m.MetricFamilyName) > 0 {
i -= len(m.MetricFamilyName)
copy(dAtA[i:], m.MetricFamilyName)
i = encodeVarintTypes(dAtA, i, uint64(len(m.MetricFamilyName)))
i--
dAtA[i] = 0x12
}
if m.Type != 0 {
i = encodeVarintTypes(dAtA, i, uint64(m.Type))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func (m *Sample) Marshal() (dAtA []byte, err error) {
@ -1047,6 +1228,33 @@ func encodeVarintTypes(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
return base
}
func (m *MetricMetadata) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Type != 0 {
n += 1 + sovTypes(uint64(m.Type))
}
l = len(m.MetricFamilyName)
if l > 0 {
n += 1 + l + sovTypes(uint64(l))
}
l = len(m.Help)
if l > 0 {
n += 1 + l + sovTypes(uint64(l))
}
l = len(m.Unit)
if l > 0 {
n += 1 + l + sovTypes(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *Sample) Size() (n int) {
if m == nil {
return 0
@ -1242,6 +1450,175 @@ func sovTypes(x uint64) (n int) {
func sozTypes(x uint64) (n int) {
return sovTypes(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *MetricMetadata) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: MetricMetadata: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: MetricMetadata: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType)
}
m.Type = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Type |= MetricMetadata_MetricType(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field MetricFamilyName", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.MetricFamilyName = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Help", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Help = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Unit", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Unit = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipTypes(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthTypes
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthTypes
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Sample) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0

20
prompb/types.proto

@ -18,6 +18,26 @@ option go_package = "prompb";
import "gogoproto/gogo.proto";
message MetricMetadata {
enum MetricType {
UNKNOWN = 0;
COUNTER = 1;
GAUGE = 2;
HISTOGRAM = 3;
GAUGEHISTOGRAM = 4;
SUMMARY = 5;
INFO = 6;
STATESET = 7;
}
// Represents the metric type, these match the set from Prometheus.
// Refer to pkg/textparse/interface.go for details.
MetricType type = 1;
string metric_family_name = 2;
string help = 4;
string unit = 5;
}
message Sample {
double value = 1;
int64 timestamp = 2;

1
scrape/target.go

@ -75,6 +75,7 @@ func (t *Target) String() string {
return t.URL().String()
}
// MetricMetadataStore represents a storage for metadata.
type MetricMetadataStore interface {
ListMetadata() []MetricMetadata
GetMetadata(metric string) (MetricMetadata, bool)

13
storage/remote/codec.go

@ -19,6 +19,7 @@ import (
"io/ioutil"
"net/http"
"sort"
"strings"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
@ -26,6 +27,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
@ -484,3 +486,14 @@ func labelsToLabelsProto(labels labels.Labels, buf []prompb.Label) []prompb.Labe
}
return result
}
// metricTypeToMetricTypeProto transforms a Prometheus metricType into prompb metricType. Since the former is a string we need to transform it to an enum.
func metricTypeToMetricTypeProto(t textparse.MetricType) prompb.MetricMetadata_MetricType {
mt := strings.ToUpper(string(t))
v, ok := prompb.MetricMetadata_MetricType_value[mt]
if !ok {
return prompb.MetricMetadata_UNKNOWN
}
return prompb.MetricMetadata_MetricType(v)
}

32
storage/remote/codec_test.go

@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
)
@ -230,3 +231,34 @@ func TestMergeLabels(t *testing.T) {
require.Equal(t, tc.expected, MergeLabels(tc.primary, tc.secondary))
}
}
func TestMetricTypeToMetricTypeProto(t *testing.T) {
tc := []struct {
desc string
input textparse.MetricType
expected prompb.MetricMetadata_MetricType
}{
{
desc: "with a single-word metric",
input: textparse.MetricTypeCounter,
expected: prompb.MetricMetadata_COUNTER,
},
{
desc: "with a two-word metric",
input: textparse.MetricTypeStateset,
expected: prompb.MetricMetadata_STATESET,
},
{
desc: "with an unknown metric",
input: "not-known",
expected: prompb.MetricMetadata_UNKNOWN,
},
}
for _, tt := range tc {
t.Run(tt.desc, func(t *testing.T) {
m := metricTypeToMetricTypeProto(tt.input)
require.Equal(t, tt.expected, m)
})
}
}

163
storage/remote/metadata_watcher.go

@ -0,0 +1,163 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"context"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/scrape"
)
// MetadataAppender is an interface used by the Metadata Watcher to send metadata, It is read from the scrape manager, on to somewhere else.
type MetadataAppender interface {
AppendMetadata(context.Context, []scrape.MetricMetadata)
}
// Watchable represents from where we fetch active targets for metadata.
type Watchable interface {
TargetsActive() map[string][]*scrape.Target
}
type noopScrapeManager struct{}
func (noop *noopScrapeManager) Get() (*scrape.Manager, error) {
return nil, errors.New("Scrape manager not ready")
}
// MetadataWatcher watches the Scrape Manager for a given WriteMetadataTo.
type MetadataWatcher struct {
name string
logger log.Logger
managerGetter ReadyScrapeManager
manager Watchable
writer MetadataAppender
interval model.Duration
deadline time.Duration
done chan struct{}
softShutdownCtx context.Context
softShutdownCancel context.CancelFunc
hardShutdownCancel context.CancelFunc
hardShutdownCtx context.Context
}
// NewMetadataWatcher builds a new MetadataWatcher.
func NewMetadataWatcher(l log.Logger, mg ReadyScrapeManager, name string, w MetadataAppender, interval model.Duration, deadline time.Duration) *MetadataWatcher {
if l == nil {
l = log.NewNopLogger()
}
if mg == nil {
mg = &noopScrapeManager{}
}
return &MetadataWatcher{
name: name,
logger: l,
managerGetter: mg,
writer: w,
interval: interval,
deadline: deadline,
done: make(chan struct{}),
}
}
// Start the MetadataWatcher.
func (mw *MetadataWatcher) Start() {
level.Info(mw.logger).Log("msg", "Starting scraped metadata watcher")
mw.hardShutdownCtx, mw.hardShutdownCancel = context.WithCancel(context.Background())
mw.softShutdownCtx, mw.softShutdownCancel = context.WithCancel(mw.hardShutdownCtx)
go mw.loop()
}
// Stop the MetadataWatcher.
func (mw *MetadataWatcher) Stop() {
level.Info(mw.logger).Log("msg", "Stopping metadata watcher...")
defer level.Info(mw.logger).Log("msg", "Scraped metadata watcher stopped")
mw.softShutdownCancel()
select {
case <-mw.done:
return
case <-time.After(mw.deadline):
level.Error(mw.logger).Log("msg", "Failed to flush metadata")
}
mw.hardShutdownCancel()
<-mw.done
}
func (mw *MetadataWatcher) loop() {
ticker := time.NewTicker(time.Duration(mw.interval))
defer ticker.Stop()
defer close(mw.done)
for {
select {
case <-mw.softShutdownCtx.Done():
return
case <-ticker.C:
mw.collect()
}
}
}
func (mw *MetadataWatcher) collect() {
if !mw.ready() {
return
}
// We create a set of the metadata to help deduplicating based on the attributes of a
// scrape.MetricMetadata. In this case, a combination of metric name, help, type, and unit.
metadataSet := map[scrape.MetricMetadata]struct{}{}
metadata := []scrape.MetricMetadata{}
for _, tset := range mw.manager.TargetsActive() {
for _, target := range tset {
for _, entry := range target.MetadataList() {
if _, ok := metadataSet[entry]; !ok {
metadata = append(metadata, entry)
metadataSet[entry] = struct{}{}
}
}
}
}
// Blocks until the metadata is sent to the remote write endpoint or hardShutdownContext is expired.
mw.writer.AppendMetadata(mw.hardShutdownCtx, metadata)
}
func (mw *MetadataWatcher) ready() bool {
if mw.manager != nil {
return true
}
m, err := mw.managerGetter.Get()
if err != nil {
return false
}
mw.manager = m
return true
}

155
storage/remote/metadata_watcher_test.go

@ -0,0 +1,155 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"context"
"testing"
"time"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/scrape"
"github.com/stretchr/testify/require"
)
var (
interval = model.Duration(1 * time.Millisecond)
deadline = 1 * time.Millisecond
)
// TestMetaStore satisfies the MetricMetadataStore interface.
// It is used to inject specific metadata as part of a test case.
type TestMetaStore struct {
Metadata []scrape.MetricMetadata
}
func (s *TestMetaStore) ListMetadata() []scrape.MetricMetadata {
return s.Metadata
}
func (s *TestMetaStore) GetMetadata(metric string) (scrape.MetricMetadata, bool) {
for _, m := range s.Metadata {
if metric == m.Metric {
return m, true
}
}
return scrape.MetricMetadata{}, false
}
func (s *TestMetaStore) SizeMetadata() int { return 0 }
func (s *TestMetaStore) LengthMetadata() int { return 0 }
type writeMetadataToMock struct {
metadataAppended int
}
func (mwtm *writeMetadataToMock) AppendMetadata(_ context.Context, m []scrape.MetricMetadata) {
mwtm.metadataAppended += len(m)
}
func newMetadataWriteToMock() *writeMetadataToMock {
return &writeMetadataToMock{}
}
type scrapeManagerMock struct {
manager *scrape.Manager
ready bool
}
func (smm *scrapeManagerMock) Get() (*scrape.Manager, error) {
if smm.ready {
return smm.manager, nil
}
return nil, errors.New("not ready")
}
type fakeManager struct {
activeTargets map[string][]*scrape.Target
}
func (fm *fakeManager) TargetsActive() map[string][]*scrape.Target {
return fm.activeTargets
}
func TestWatchScrapeManager_NotReady(t *testing.T) {
wt := newMetadataWriteToMock()
smm := &scrapeManagerMock{
ready: false,
}
mw := NewMetadataWatcher(nil, smm, "", wt, interval, deadline)
require.Equal(t, false, mw.ready())
mw.collect()
require.Equal(t, 0, wt.metadataAppended)
}
func TestWatchScrapeManager_ReadyForCollection(t *testing.T) {
wt := newMetadataWriteToMock()
metadata := &TestMetaStore{
Metadata: []scrape.MetricMetadata{
{
Metric: "prometheus_tsdb_head_chunks_created_total",
Type: textparse.MetricTypeCounter,
Help: "Total number",
Unit: "",
},
{
Metric: "prometheus_remote_storage_retried_samples_total",
Type: textparse.MetricTypeCounter,
Help: "Total number",
Unit: "",
},
},
}
metadataDup := &TestMetaStore{
Metadata: []scrape.MetricMetadata{
{
Metric: "prometheus_tsdb_head_chunks_created_total",
Type: textparse.MetricTypeCounter,
Help: "Total number",
Unit: "",
},
},
}
target := &scrape.Target{}
target.SetMetadataStore(metadata)
targetWithDup := &scrape.Target{}
targetWithDup.SetMetadataStore(metadataDup)
manager := &fakeManager{
activeTargets: map[string][]*scrape.Target{
"job": []*scrape.Target{target},
"dup": []*scrape.Target{targetWithDup},
},
}
smm := &scrapeManagerMock{
ready: true,
}
mw := NewMetadataWatcher(nil, smm, "", wt, interval, deadline)
mw.manager = manager
mw.collect()
require.Equal(t, 2, wt.metadataAppended)
}

262
storage/remote/queue_manager.go

@ -26,13 +26,14 @@ import (
"github.com/golang/snappy"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wal"
)
@ -50,21 +51,25 @@ const (
type queueManagerMetrics struct {
reg prometheus.Registerer
succeededSamplesTotal prometheus.Counter
failedSamplesTotal prometheus.Counter
retriedSamplesTotal prometheus.Counter
droppedSamplesTotal prometheus.Counter
enqueueRetriesTotal prometheus.Counter
sentBatchDuration prometheus.Histogram
highestSentTimestamp *maxTimestamp
pendingSamples prometheus.Gauge
shardCapacity prometheus.Gauge
numShards prometheus.Gauge
maxNumShards prometheus.Gauge
minNumShards prometheus.Gauge
desiredNumShards prometheus.Gauge
bytesSent prometheus.Counter
maxSamplesPerSend prometheus.Gauge
samplesTotal prometheus.Counter
metadataTotal prometheus.Counter
failedSamplesTotal prometheus.Counter
failedMetadataTotal prometheus.Counter
retriedSamplesTotal prometheus.Counter
retriedMetadataTotal prometheus.Counter
droppedSamplesTotal prometheus.Counter
enqueueRetriesTotal prometheus.Counter
sentBatchDuration prometheus.Histogram
highestSentTimestamp *maxTimestamp
pendingSamples prometheus.Gauge
shardCapacity prometheus.Gauge
numShards prometheus.Gauge
maxNumShards prometheus.Gauge
minNumShards prometheus.Gauge
desiredNumShards prometheus.Gauge
samplesBytesTotal prometheus.Counter
metadataBytesTotal prometheus.Counter
maxSamplesPerSend prometheus.Gauge
}
func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManagerMetrics {
@ -76,31 +81,52 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
endpoint: e,
}
m.succeededSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{
m.samplesTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "samples_total",
Help: "Total number of samples sent to remote storage.",
ConstLabels: constLabels,
})
m.metadataTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "succeeded_samples_total",
Help: "Total number of samples successfully sent to remote storage.",
Name: "metadata_total",
Help: "Total number of metadata entries sent to remote storage.",
ConstLabels: constLabels,
})
m.failedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "failed_samples_total",
Name: "samples_failed_total",
Help: "Total number of samples which failed on send to remote storage, non-recoverable errors.",
ConstLabels: constLabels,
})
m.failedMetadataTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "metadata_failed_total",
Help: "Total number of metadata entries which failed on send to remote storage, non-recoverable errors.",
ConstLabels: constLabels,
})
m.retriedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "retried_samples_total",
Name: "samples_retried_total",
Help: "Total number of samples which failed on send to remote storage but were retried because the send error was recoverable.",
ConstLabels: constLabels,
})
m.retriedMetadataTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "metadata_retried_total",
Help: "Total number of metadata entries which failed on send to remote storage but were retried because the send error was recoverable.",
ConstLabels: constLabels,
})
m.droppedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "dropped_samples_total",
Name: "samples_dropped_total",
Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write.",
ConstLabels: constLabels,
})
@ -115,7 +141,7 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Namespace: namespace,
Subsystem: subsystem,
Name: "sent_batch_duration_seconds",
Help: "Duration of sample batch send calls to the remote storage.",
Help: "Duration of send calls to the remote storage.",
Buckets: append(prometheus.DefBuckets, 25, 60, 120, 300),
ConstLabels: constLabels,
})
@ -131,7 +157,7 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
m.pendingSamples = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "pending_samples",
Name: "samples_pending",
Help: "The number of samples pending in the queues shards to be sent to the remote storage.",
ConstLabels: constLabels,
})
@ -170,11 +196,18 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Help: "The number of shards that the queues shard calculation wants to run based on the rate of samples in vs. samples out.",
ConstLabels: constLabels,
})
m.bytesSent = prometheus.NewCounter(prometheus.CounterOpts{
m.samplesBytesTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "samples_bytes_total",
Help: "The total number of bytes of samples sent by the queue after compression.",
ConstLabels: constLabels,
})
m.metadataBytesTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "sent_bytes_total",
Help: "The total number of bytes sent by the queue.",
Name: "metadata_bytes_total",
Help: "The total number of bytes of metadata sent by the queue after compression.",
ConstLabels: constLabels,
})
m.maxSamplesPerSend = prometheus.NewGauge(prometheus.GaugeOpts{
@ -191,9 +224,12 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
func (m *queueManagerMetrics) register() {
if m.reg != nil {
m.reg.MustRegister(
m.succeededSamplesTotal,
m.samplesTotal,
m.metadataTotal,
m.failedSamplesTotal,
m.failedMetadataTotal,
m.retriedSamplesTotal,
m.retriedMetadataTotal,
m.droppedSamplesTotal,
m.enqueueRetriesTotal,
m.sentBatchDuration,
@ -204,7 +240,8 @@ func (m *queueManagerMetrics) register() {
m.maxNumShards,
m.minNumShards,
m.desiredNumShards,
m.bytesSent,
m.samplesBytesTotal,
m.metadataBytesTotal,
m.maxSamplesPerSend,
)
}
@ -212,9 +249,12 @@ func (m *queueManagerMetrics) register() {
func (m *queueManagerMetrics) unregister() {
if m.reg != nil {
m.reg.Unregister(m.succeededSamplesTotal)
m.reg.Unregister(m.samplesTotal)
m.reg.Unregister(m.metadataTotal)
m.reg.Unregister(m.failedSamplesTotal)
m.reg.Unregister(m.failedMetadataTotal)
m.reg.Unregister(m.retriedSamplesTotal)
m.reg.Unregister(m.retriedMetadataTotal)
m.reg.Unregister(m.droppedSamplesTotal)
m.reg.Unregister(m.enqueueRetriesTotal)
m.reg.Unregister(m.sentBatchDuration)
@ -225,7 +265,8 @@ func (m *queueManagerMetrics) unregister() {
m.reg.Unregister(m.maxNumShards)
m.reg.Unregister(m.minNumShards)
m.reg.Unregister(m.desiredNumShards)
m.reg.Unregister(m.bytesSent)
m.reg.Unregister(m.samplesBytesTotal)
m.reg.Unregister(m.metadataBytesTotal)
m.reg.Unregister(m.maxSamplesPerSend)
}
}
@ -247,12 +288,14 @@ type WriteClient interface {
type QueueManager struct {
lastSendTimestamp atomic.Int64
logger log.Logger
flushDeadline time.Duration
cfg config.QueueConfig
externalLabels labels.Labels
relabelConfigs []*relabel.Config
watcher *wal.Watcher
logger log.Logger
flushDeadline time.Duration
cfg config.QueueConfig
mcfg config.MetadataConfig
externalLabels labels.Labels
relabelConfigs []*relabel.Config
watcher *wal.Watcher
metadataWatcher *MetadataWatcher
clientMtx sync.RWMutex
storeClient WriteClient
@ -284,12 +327,14 @@ func NewQueueManager(
walDir string,
samplesIn *ewmaRate,
cfg config.QueueConfig,
mCfg config.MetadataConfig,
externalLabels labels.Labels,
relabelConfigs []*relabel.Config,
client WriteClient,
flushDeadline time.Duration,
interner *pool,
highestRecvTimestamp *maxTimestamp,
sm ReadyScrapeManager,
) *QueueManager {
if logger == nil {
logger = log.NewNopLogger()
@ -300,6 +345,7 @@ func NewQueueManager(
logger: logger,
flushDeadline: flushDeadline,
cfg: cfg,
mcfg: mCfg,
externalLabels: externalLabels,
relabelConfigs: relabelConfigs,
storeClient: client,
@ -323,11 +369,77 @@ func NewQueueManager(
}
t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, walDir)
if t.mcfg.Send {
t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline)
}
t.shards = t.newShards()
return t
}
// AppendMetadata sends metadata the remote storage. Metadata is sent all at once and is not parallelized.
func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.MetricMetadata) {
mm := make([]prompb.MetricMetadata, 0, len(metadata))
for _, entry := range metadata {
mm = append(mm, prompb.MetricMetadata{
MetricFamilyName: entry.Metric,
Help: entry.Help,
Type: metricTypeToMetricTypeProto(entry.Type),
Unit: entry.Unit,
})
}
err := t.sendMetadataWithBackoff(ctx, mm)
if err != nil {
t.metrics.failedMetadataTotal.Add(float64(len(metadata)))
level.Error(t.logger).Log("msg", "non-recoverable error while sending metadata", "count", len(metadata), "err", err)
}
}
func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata) error {
// Build the WriteRequest with no samples.
req, _, err := buildWriteRequest(nil, metadata, nil)
if err != nil {
return err
}
metadataCount := len(metadata)
attemptStore := func(try int) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "Remote Metadata Send Batch")
defer span.Finish()
span.SetTag("metadata", metadataCount)
span.SetTag("try", try)
span.SetTag("remote_name", t.storeClient.Name())
span.SetTag("remote_url", t.storeClient.Endpoint())
begin := time.Now()
err := t.storeClient.Store(ctx, req)
t.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
if err != nil {
span.LogKV("error", err)
ext.Error.Set(span, true)
return err
}
return nil
}
retry := func() {
t.metrics.retriedMetadataTotal.Add(float64(len(metadata)))
}
err = sendWriteRequestWithBackoff(ctx, t.cfg, t.client(), t.logger, req, attemptStore, retry)
if err != nil {
return err
}
t.metrics.metadataTotal.Add(float64(len(metadata)))
t.metrics.metadataBytesTotal.Add(float64(len(req)))
return nil
}
// Append queues a sample to be sent to the remote storage. Blocks until all samples are
// enqueued on their shards or a shutdown signal is received.
func (t *QueueManager) Append(samples []record.RefSample) bool {
@ -386,6 +498,9 @@ func (t *QueueManager) Start() {
t.shards.start(t.numShards)
t.watcher.Start()
if t.mcfg.Send {
t.metadataWatcher.Start()
}
t.wg.Add(2)
go t.updateShardsLoop()
@ -400,11 +515,14 @@ func (t *QueueManager) Stop() {
close(t.quit)
t.wg.Wait()
// Wait for all QueueManager routines to end before stopping shards and WAL watcher. This
// Wait for all QueueManager routines to end before stopping shards, metadata watcher, and WAL watcher. This
// is to ensure we don't end up executing a reshard and shards.stop() at the same time, which
// causes a closed channel panic.
t.shards.stop()
t.watcher.Stop()
if t.mcfg.Send {
t.metadataWatcher.Stop()
}
// On shutdown, release the strings in the labels from the intern pool.
t.seriesMtx.Lock()
@ -868,23 +986,22 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, b
// sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, buf *[]byte) error {
req, highest, err := buildWriteRequest(samples, *buf)
// Build the WriteRequest with no metadata.
req, highest, err := buildWriteRequest(samples, nil, *buf)
if err != nil {
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
return err
}
backoff := s.qm.cfg.MinBackoff
reqSize := len(*buf)
sampleCount := len(samples)
*buf = req
try := 0
// An anonymous function allows us to defer the completion of our per-try spans
// without causing a memory leak, and it has the nice effect of not propagating any
// parameters for sendSamplesWithBackoff/3.
attemptStore := func() error {
attemptStore := func(try int) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "Remote Send Batch")
defer span.Finish()
@ -895,6 +1012,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
span.SetTag("remote_url", s.qm.storeClient.Endpoint())
begin := time.Now()
s.qm.metrics.samplesTotal.Add(float64(sampleCount))
err := s.qm.client().Store(ctx, *buf)
s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())
@ -907,6 +1025,23 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
return nil
}
onRetry := func() {
s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
}
err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.client(), s.qm.logger, req, attemptStore, onRetry)
if err != nil {
return err
}
s.qm.metrics.samplesBytesTotal.Add(float64(reqSize))
s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
return nil
}
func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, s WriteClient, l log.Logger, req []byte, attempt func(int) error, onRetry func()) error {
backoff := cfg.MinBackoff
try := 0
for {
select {
case <-ctx.Done():
@ -914,37 +1049,34 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
default:
}
err = attemptStore()
err := attempt(try)
if err != nil {
// If the error is unrecoverable, we should not retry.
if _, ok := err.(RecoverableError); !ok {
return err
}
if err == nil {
return nil
}
// If we make it this far, we've encountered a recoverable error and will retry.
s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount))
level.Warn(s.qm.logger).Log("msg", "Failed to send batch, retrying", "err", err)
time.Sleep(time.Duration(backoff))
backoff = backoff * 2
// If the error is unrecoverable, we should not retry.
if _, ok := err.(RecoverableError); !ok {
return err
}
if backoff > s.qm.cfg.MaxBackoff {
backoff = s.qm.cfg.MaxBackoff
}
// If we make it this far, we've encountered a recoverable error and will retry.
onRetry()
level.Debug(l).Log("msg", "failed to send batch, retrying", "err", err)
try++
continue
time.Sleep(time.Duration(backoff))
backoff = backoff * 2
if backoff > cfg.MaxBackoff {
backoff = cfg.MaxBackoff
}
// Since we retry forever on recoverable errors, this needs to stay inside the loop.
s.qm.metrics.succeededSamplesTotal.Add(float64(sampleCount))
s.qm.metrics.bytesSent.Add(float64(reqSize))
s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000))
return nil
try++
continue
}
}
func buildWriteRequest(samples []prompb.TimeSeries, buf []byte) ([]byte, int64, error) {
func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, buf []byte) ([]byte, int64, error) {
var highest int64
for _, ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample in it.
@ -952,8 +1084,10 @@ func buildWriteRequest(samples []prompb.TimeSeries, buf []byte) ([]byte, int64,
highest = ts.Samples[0].Timestamp
}
}
req := &prompb.WriteRequest{
Timeseries: samples,
Metadata: metadata,
}
data, err := proto.Marshal(req)

98
storage/remote/queue_manager_test.go

@ -39,8 +39,10 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb/record"
)
@ -78,7 +80,7 @@ func TestSampleDelivery(t *testing.T) {
require.NoError(t, os.RemoveAll(dir))
}()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
defer s.Close()
writeConfig := config.DefaultRemoteWriteConfig
@ -110,6 +112,33 @@ func TestSampleDelivery(t *testing.T) {
c.waitForExpectedSamples(t)
}
func TestMetadataDelivery(t *testing.T) {
c := NewTestWriteClient()
dir, err := ioutil.TempDir("", "TestMetadataDelivery")
require.NoError(t, err)
defer os.RemoveAll(dir)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m.Start()
defer m.Stop()
m.AppendMetadata(context.Background(), []scrape.MetricMetadata{
scrape.MetricMetadata{
Metric: "prometheus_remote_storage_sent_metadata_bytes_total",
Type: textparse.MetricTypeCounter,
Help: "a nice help text",
Unit: "",
},
})
require.Equal(t, len(c.receivedMetadata), 1)
}
func TestSampleDeliveryTimeout(t *testing.T) {
// Let's send one less sample than batch size, and wait the timeout duration
n := 9
@ -117,6 +146,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
c := NewTestWriteClient()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
@ -127,7 +157,7 @@ func TestSampleDeliveryTimeout(t *testing.T) {
}()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric())
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m.StoreSeries(series, 0)
m.Start()
defer m.Stop()
@ -169,8 +199,11 @@ func TestSampleDeliveryOrder(t *testing.T) {
require.NoError(t, os.RemoveAll(dir))
}()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline, newPool(), nil)
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m.StoreSeries(series, 0)
m.Start()
@ -190,9 +223,11 @@ func TestShutdown(t *testing.T) {
require.NoError(t, os.RemoveAll(dir))
}()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline, newPool(), newHighestTimestampMetric())
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil)
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0)
@ -231,8 +266,10 @@ func TestSeriesReset(t *testing.T) {
require.NoError(t, os.RemoveAll(dir))
}()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline, newPool(), newHighestTimestampMetric())
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil)
for i := 0; i < numSegments; i++ {
series := []record.RefSeries{}
for j := 0; j < numSeries; j++ {
@ -255,6 +292,7 @@ func TestReshard(t *testing.T) {
c.expectSamples(samples, series)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
dir, err := ioutil.TempDir("", "TestReshard")
@ -264,7 +302,7 @@ func TestReshard(t *testing.T) {
}()
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric())
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m.StoreSeries(series, 0)
m.Start()
@ -294,10 +332,12 @@ func TestReshardRaceWithStop(t *testing.T) {
h.Lock()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
go func() {
for {
metrics := newQueueManagerMetrics(nil, "", "")
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric())
m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m.Start()
h.Unlock()
h.Lock()
@ -313,9 +353,11 @@ func TestReshardRaceWithStop(t *testing.T) {
}
func TestReleaseNoninternedString(t *testing.T) {
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric())
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m.Start()
for i := 1; i < 1000; i++ {
@ -360,10 +402,13 @@ func TestShouldReshard(t *testing.T) {
expectedToReshard: true,
},
}
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
for _, c := range cases {
metrics := newQueueManagerMetrics(nil, "", "")
client := NewTestWriteClient()
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric())
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m.numShards = c.startingShards
m.samplesIn.incr(c.samplesIn)
m.samplesOut.incr(c.samplesOut)
@ -411,19 +456,21 @@ func getSeriesNameFromRef(r record.RefSeries) string {
}
type TestWriteClient struct {
receivedSamples map[string][]prompb.Sample
expectedSamples map[string][]prompb.Sample
withWaitGroup bool
wg sync.WaitGroup
mtx sync.Mutex
buf []byte
receivedSamples map[string][]prompb.Sample
expectedSamples map[string][]prompb.Sample
receivedMetadata map[string][]prompb.MetricMetadata
withWaitGroup bool
wg sync.WaitGroup
mtx sync.Mutex
buf []byte
}
func NewTestWriteClient() *TestWriteClient {
return &TestWriteClient{
withWaitGroup: true,
receivedSamples: map[string][]prompb.Sample{},
expectedSamples: map[string][]prompb.Sample{},
withWaitGroup: true,
receivedSamples: map[string][]prompb.Sample{},
expectedSamples: map[string][]prompb.Sample{},
receivedMetadata: map[string][]prompb.MetricMetadata{},
}
}
@ -510,6 +557,11 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
if c.withWaitGroup {
c.wg.Add(-count)
}
for _, m := range reqProto.Metadata {
c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m)
}
return nil
}
@ -560,6 +612,7 @@ func BenchmarkSampleDelivery(b *testing.B) {
c := NewTestWriteClient()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond)
cfg.MaxShards = 1
@ -568,7 +621,7 @@ func BenchmarkSampleDelivery(b *testing.B) {
defer os.RemoveAll(dir)
metrics := newQueueManagerMetrics(nil, "", "")
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric())
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
m.StoreSeries(series, 0)
// These should be received by the client.
@ -607,12 +660,14 @@ func BenchmarkStartup(b *testing.B) {
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
logger = log.With(logger, "caller", log.DefaultCaller)
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
for n := 0; n < b.N; n++ {
metrics := newQueueManagerMetrics(nil, "", "")
c := NewTestBlockedWriteClient()
m := NewQueueManager(metrics, nil, nil, logger, dir,
newEWMARate(ewmaWeight, shardUpdateDuration),
config.DefaultQueueConfig, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric())
cfg, mcfg, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil)
m.watcher.SetStartTime(timestamp.Time(math.MaxInt64))
m.watcher.MaxSegment = segments[len(segments)-2]
err := m.watcher.Run()
@ -654,6 +709,7 @@ func TestProcessExternalLabels(t *testing.T) {
func TestCalculateDesiredShards(t *testing.T) {
c := NewTestWriteClient()
cfg := config.DefaultQueueConfig
mcfg := config.DefaultMetadataConfig
dir, err := ioutil.TempDir("", "TestCalculateDesiredShards")
require.NoError(t, err)
@ -663,7 +719,7 @@ func TestCalculateDesiredShards(t *testing.T) {
metrics := newQueueManagerMetrics(nil, "", "")
samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration)
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric())
m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil)
// Need to start the queue manager so the proper metrics are initialized.
// However we can stop it right away since we don't need to do any actual

2
storage/remote/read_test.go

@ -95,7 +95,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) {
for _, tc := range cases {
t.Run("", func(t *testing.T) {
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteReadConfigs: tc.cfgs,

9
storage/remote/storage.go

@ -29,6 +29,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/logging"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
)
@ -40,6 +41,10 @@ const (
endpoint = "url"
)
type ReadyScrapeManager interface {
Get() (*scrape.Manager, error)
}
// startTimeCallback is a callback func that return the oldest timestamp stored in a storage.
type startTimeCallback func() (int64, error)
@ -57,7 +62,7 @@ type Storage struct {
}
// NewStorage returns a remote.Storage.
func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration) *Storage {
func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager) *Storage {
if l == nil {
l = log.NewNopLogger()
}
@ -66,7 +71,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
logger: logging.Dedupe(l, 1*time.Minute),
localStartTimeCallback: stCallback,
}
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline)
s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm)
return s
}

4
storage/remote/storage_test.go

@ -30,7 +30,7 @@ func TestStorageLifecycle(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -69,7 +69,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline)
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},

6
storage/remote/write.go

@ -52,13 +52,14 @@ type WriteStorage struct {
samplesIn *ewmaRate
flushDeadline time.Duration
interner *pool
scraper ReadyScrapeManager
// For timestampTracker.
highestTimestamp *maxTimestamp
}
// NewWriteStorage creates and runs a WriteStorage.
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string, flushDeadline time.Duration) *WriteStorage {
func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager) *WriteStorage {
if logger == nil {
logger = log.NewNopLogger()
}
@ -72,6 +73,7 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string
samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration),
walDir: walDir,
interner: newPool(),
scraper: sm,
highestTimestamp: &maxTimestamp{
Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
@ -155,12 +157,14 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
rws.walDir,
rws.samplesIn,
rwConf.QueueConfig,
rwConf.MetadataConfig,
conf.GlobalConfig.ExternalLabels,
rwConf.WriteRelabelConfigs,
c,
rws.flushDeadline,
rws.interner,
rws.highestTimestamp,
rws.scraper,
)
// Keep track of which queues are new so we know which to start.
newHashes = append(newHashes, hash)

14
storage/remote/write_test.go

@ -115,7 +115,7 @@ func TestNoDuplicateWriteConfigs(t *testing.T) {
}
for _, tc := range cases {
s := NewWriteStorage(nil, nil, dir, time.Millisecond)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: tc.cfgs,
@ -139,7 +139,7 @@ func TestRestartOnNameChange(t *testing.T) {
hash, err := toHash(cfg)
require.NoError(t, err)
s := NewWriteStorage(nil, nil, dir, time.Millisecond)
s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -167,7 +167,7 @@ func TestUpdateWithRegisterer(t *testing.T) {
require.NoError(t, os.RemoveAll(dir))
}()
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil)
c1 := &config.RemoteWriteConfig{
Name: "named",
URL: &common_config.URL{
@ -211,7 +211,7 @@ func TestWriteStorageLifecycle(t *testing.T) {
require.NoError(t, os.RemoveAll(dir))
}()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
conf := &config.Config{
GlobalConfig: config.DefaultGlobalConfig,
RemoteWriteConfigs: []*config.RemoteWriteConfig{
@ -232,7 +232,7 @@ func TestUpdateExternalLabels(t *testing.T) {
require.NoError(t, os.RemoveAll(dir))
}()
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second)
s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil)
externalLabels := labels.FromStrings("external", "true")
conf := &config.Config{
@ -265,7 +265,7 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) {
require.NoError(t, os.RemoveAll(dir))
}()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
conf := &config.Config{
GlobalConfig: config.GlobalConfig{},
@ -301,7 +301,7 @@ func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) {
require.NoError(t, os.RemoveAll(dir))
}()
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline)
s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil)
c0 := &config.RemoteWriteConfig{
RemoteTimeout: model.Duration(10 * time.Second),

4
web/api/v1/api_test.go

@ -367,7 +367,9 @@ func TestEndpoints(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dbDir)
remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, nil, dbDir, 1*time.Second)
remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) {
return 0, nil
}, dbDir, 1*time.Second, nil)
err = remote.ApplyConfig(&config.Config{
RemoteReadConfigs: []*config.RemoteReadConfig{

Loading…
Cancel
Save