mirror of https://github.com/prometheus/prometheus
Merge pull request #2419 from prometheus/remove-legacy-remotes
Remove legacy remote storage implementationspull/2427/head
commit
980586d183
|
@ -34,7 +34,6 @@ import (
|
|||
"github.com/prometheus/prometheus/storage/local"
|
||||
"github.com/prometheus/prometheus/storage/local/chunk"
|
||||
"github.com/prometheus/prometheus/storage/local/index"
|
||||
"github.com/prometheus/prometheus/storage/remote"
|
||||
"github.com/prometheus/prometheus/web"
|
||||
)
|
||||
|
||||
|
@ -52,15 +51,37 @@ var cfg = struct {
|
|||
notifierTimeout time.Duration
|
||||
queryEngine promql.EngineOptions
|
||||
web web.Options
|
||||
remote remote.Options
|
||||
|
||||
alertmanagerURLs stringset
|
||||
prometheusURL string
|
||||
influxdbURL string
|
||||
}{
|
||||
alertmanagerURLs: stringset{},
|
||||
}
|
||||
|
||||
// Value type for flags that are now unused, but which are kept around to
|
||||
// fulfill 1.0 stability guarantees.
|
||||
type unusedFlag struct {
|
||||
name string
|
||||
value string
|
||||
help string
|
||||
}
|
||||
|
||||
func (f *unusedFlag) Set(v string) error {
|
||||
f.value = v
|
||||
log.Warnf("Flag %q is unused, but set to %q! See the flag's help message: %s", f.name, f.value, f.help)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f unusedFlag) String() string {
|
||||
return f.value
|
||||
}
|
||||
|
||||
func registerUnusedFlags(fs *flag.FlagSet, help string, flags []string) {
|
||||
for _, name := range flags {
|
||||
fs.Var(&unusedFlag{name: name, help: help}, name, help)
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
cfg.fs = flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
|
||||
cfg.fs.Usage = usage
|
||||
|
@ -190,44 +211,19 @@ func init() {
|
|||
"Local storage engine. Supported values are: 'persisted' (full local storage with on-disk persistence) and 'none' (no local storage).",
|
||||
)
|
||||
|
||||
// Remote storage.
|
||||
cfg.fs.StringVar(
|
||||
&cfg.remote.GraphiteAddress, "storage.remote.graphite-address", "",
|
||||
"The host:port of the remote Graphite server to send samples to. None, if empty.",
|
||||
)
|
||||
cfg.fs.StringVar(
|
||||
&cfg.remote.GraphiteTransport, "storage.remote.graphite-transport", "tcp",
|
||||
"Transport protocol to use to communicate with Graphite. 'tcp', if empty.",
|
||||
)
|
||||
cfg.fs.StringVar(
|
||||
&cfg.remote.GraphitePrefix, "storage.remote.graphite-prefix", "",
|
||||
"The prefix to prepend to all metrics exported to Graphite. None, if empty.",
|
||||
)
|
||||
cfg.fs.StringVar(
|
||||
&cfg.remote.OpentsdbURL, "storage.remote.opentsdb-url", "",
|
||||
"The URL of the remote OpenTSDB server to send samples to. None, if empty.",
|
||||
)
|
||||
cfg.fs.StringVar(
|
||||
&cfg.influxdbURL, "storage.remote.influxdb-url", "",
|
||||
"The URL of the remote InfluxDB server to send samples to. None, if empty.",
|
||||
)
|
||||
cfg.fs.StringVar(
|
||||
&cfg.remote.InfluxdbRetentionPolicy, "storage.remote.influxdb.retention-policy", "default",
|
||||
"The InfluxDB retention policy to use.",
|
||||
)
|
||||
cfg.fs.StringVar(
|
||||
&cfg.remote.InfluxdbUsername, "storage.remote.influxdb.username", "",
|
||||
"The username to use when sending samples to InfluxDB. The corresponding password must be provided via the INFLUXDB_PW environment variable.",
|
||||
)
|
||||
cfg.fs.StringVar(
|
||||
&cfg.remote.InfluxdbDatabase, "storage.remote.influxdb.database", "prometheus",
|
||||
"The name of the database to use for storing samples in InfluxDB.",
|
||||
)
|
||||
|
||||
cfg.fs.DurationVar(
|
||||
&cfg.remote.StorageTimeout, "storage.remote.timeout", 30*time.Second,
|
||||
"The timeout to use when sending samples to the remote storage.",
|
||||
)
|
||||
// Unused flags for removed remote storage code.
|
||||
const remoteStorageFlagsHelp = "WARNING: THIS FLAG IS UNUSED! Built-in support for InfluxDB, Graphite, and OpenTSDB has been removed. Use Prometheus's generic remote write feature for building remote storage integrations. See https://prometheus.io/docs/operating/configuration/#<remote_write>"
|
||||
registerUnusedFlags(cfg.fs, remoteStorageFlagsHelp, []string{
|
||||
"storage.remote.graphite-address",
|
||||
"storage.remote.graphite-transport",
|
||||
"storage.remote.graphite-prefix",
|
||||
"storage.remote.opentsdb-url",
|
||||
"storage.remote.influxdb-url",
|
||||
"storage.remote.influxdb.retention-policy",
|
||||
"storage.remote.influxdb.username",
|
||||
"storage.remote.influxdb.database",
|
||||
"storage.remote.timeout",
|
||||
})
|
||||
|
||||
// Alertmanager.
|
||||
cfg.fs.Var(
|
||||
|
@ -287,17 +283,12 @@ func parse(args []string) error {
|
|||
// RoutePrefix must always be at least '/'.
|
||||
cfg.web.RoutePrefix = "/" + strings.Trim(cfg.web.RoutePrefix, "/")
|
||||
|
||||
if err := parseInfluxdbURL(); err != nil {
|
||||
return err
|
||||
}
|
||||
for u := range cfg.alertmanagerURLs {
|
||||
if err := validateAlertmanagerURL(u); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
cfg.remote.InfluxdbPassword = os.Getenv("INFLUXDB_PW")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -332,24 +323,6 @@ func parsePrometheusURL() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func parseInfluxdbURL() error {
|
||||
if cfg.influxdbURL == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
if ok := govalidator.IsURL(cfg.influxdbURL); !ok {
|
||||
return fmt.Errorf("invalid InfluxDB URL: %s", cfg.influxdbURL)
|
||||
}
|
||||
|
||||
url, err := url.Parse(cfg.influxdbURL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cfg.remote.InfluxdbURL = url
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateAlertmanagerURL(u string) error {
|
||||
if u == "" {
|
||||
return nil
|
||||
|
|
|
@ -36,18 +36,6 @@ func TestParse(t *testing.T) {
|
|||
input: []string{"-web.external-url", "'https://url/prometheus'"},
|
||||
valid: false,
|
||||
},
|
||||
{
|
||||
input: []string{"-storage.remote.influxdb-url", ""},
|
||||
valid: true,
|
||||
},
|
||||
{
|
||||
input: []string{"-storage.remote.influxdb-url", "http://localhost:8086/"},
|
||||
valid: true,
|
||||
},
|
||||
{
|
||||
input: []string{"-storage.remote.influxdb-url", "'https://some-url/'"},
|
||||
valid: false,
|
||||
},
|
||||
{
|
||||
input: []string{"-alertmanager.url", ""},
|
||||
valid: true,
|
||||
|
@ -69,7 +57,6 @@ func TestParse(t *testing.T) {
|
|||
for i, test := range tests {
|
||||
// reset "immutable" config
|
||||
cfg.prometheusURL = ""
|
||||
cfg.influxdbURL = ""
|
||||
cfg.alertmanagerURLs = stringset{}
|
||||
|
||||
err := parse(test.input)
|
||||
|
|
|
@ -92,19 +92,9 @@ func Main() int {
|
|||
return 1
|
||||
}
|
||||
|
||||
remoteStorage, err := remote.New(&cfg.remote)
|
||||
if err != nil {
|
||||
log.Errorf("Error initializing remote storage: %s", err)
|
||||
return 1
|
||||
}
|
||||
if remoteStorage != nil {
|
||||
sampleAppender = append(sampleAppender, remoteStorage)
|
||||
reloadables = append(reloadables, remoteStorage)
|
||||
}
|
||||
|
||||
reloadableRemoteStorage := remote.NewConfigurable()
|
||||
sampleAppender = append(sampleAppender, reloadableRemoteStorage)
|
||||
reloadables = append(reloadables, reloadableRemoteStorage)
|
||||
remoteStorage := &remote.Storage{}
|
||||
sampleAppender = append(sampleAppender, remoteStorage)
|
||||
reloadables = append(reloadables, remoteStorage)
|
||||
|
||||
var (
|
||||
notifier = notifier.New(&cfg.notifier)
|
||||
|
@ -188,12 +178,7 @@ func Main() int {
|
|||
}
|
||||
}()
|
||||
|
||||
if remoteStorage != nil {
|
||||
remoteStorage.Start()
|
||||
defer remoteStorage.Stop()
|
||||
}
|
||||
|
||||
defer reloadableRemoteStorage.Stop()
|
||||
defer remoteStorage.Stop()
|
||||
|
||||
// The storage has to be fully initialized before registering.
|
||||
if instrumentedStorage, ok := localStorage.(prometheus.Collector); ok {
|
||||
|
|
|
@ -29,12 +29,13 @@ import (
|
|||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/log"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/storage/remote"
|
||||
"github.com/prometheus/prometheus/storage/remote/graphite"
|
||||
"github.com/prometheus/prometheus/storage/remote/influxdb"
|
||||
"github.com/prometheus/prometheus/storage/remote/opentsdb"
|
||||
|
||||
influx "github.com/influxdb/influxdb/client"
|
||||
|
||||
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/graphite"
|
||||
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/influxdb"
|
||||
"github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/opentsdb"
|
||||
"github.com/prometheus/prometheus/storage/remote"
|
||||
)
|
||||
|
||||
type config struct {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
// Copyright 2015 The Prometheus Authors
|
||||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
|
@ -14,101 +14,67 @@
|
|||
package remote
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
influx "github.com/influxdb/influxdb/client"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/relabel"
|
||||
"github.com/prometheus/prometheus/storage/remote/graphite"
|
||||
"github.com/prometheus/prometheus/storage/remote/influxdb"
|
||||
"github.com/prometheus/prometheus/storage/remote/opentsdb"
|
||||
)
|
||||
|
||||
// Storage collects multiple remote storage queues.
|
||||
// Storage allows queueing samples for remote writes.
|
||||
type Storage struct {
|
||||
queues []*StorageQueueManager
|
||||
externalLabels model.LabelSet
|
||||
relabelConfigs []*config.RelabelConfig
|
||||
mtx sync.RWMutex
|
||||
externalLabels model.LabelSet
|
||||
conf config.RemoteWriteConfig
|
||||
|
||||
queue *StorageQueueManager
|
||||
}
|
||||
|
||||
// ApplyConfig updates the status state as the new config requires.
|
||||
// ApplyConfig updates the state as the new config requires.
|
||||
func (s *Storage) ApplyConfig(conf *config.Config) error {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
s.externalLabels = conf.GlobalConfig.ExternalLabels
|
||||
s.relabelConfigs = conf.RemoteWriteConfig.WriteRelabelConfigs
|
||||
return nil
|
||||
}
|
||||
// TODO: we should only stop & recreate queues which have changes,
|
||||
// as this can be quite disruptive.
|
||||
var newQueue *StorageQueueManager
|
||||
|
||||
// New returns a new remote Storage.
|
||||
func New(o *Options) (*Storage, error) {
|
||||
s := &Storage{}
|
||||
if o.GraphiteAddress != "" {
|
||||
c := graphite.NewClient(
|
||||
o.GraphiteAddress, o.GraphiteTransport,
|
||||
o.StorageTimeout, o.GraphitePrefix)
|
||||
s.queues = append(s.queues, NewStorageQueueManager(c, nil))
|
||||
}
|
||||
if o.OpentsdbURL != "" {
|
||||
c := opentsdb.NewClient(o.OpentsdbURL, o.StorageTimeout)
|
||||
s.queues = append(s.queues, NewStorageQueueManager(c, nil))
|
||||
}
|
||||
if o.InfluxdbURL != nil {
|
||||
conf := influx.Config{
|
||||
URL: *o.InfluxdbURL,
|
||||
Username: o.InfluxdbUsername,
|
||||
Password: o.InfluxdbPassword,
|
||||
Timeout: o.StorageTimeout,
|
||||
if conf.RemoteWriteConfig.URL != nil {
|
||||
c, err := NewClient(conf.RemoteWriteConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c := influxdb.NewClient(conf, o.InfluxdbDatabase, o.InfluxdbRetentionPolicy)
|
||||
prometheus.MustRegister(c)
|
||||
s.queues = append(s.queues, NewStorageQueueManager(c, nil))
|
||||
newQueue = NewStorageQueueManager(c, nil)
|
||||
}
|
||||
if len(s.queues) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Options contains configuration parameters for a remote storage.
|
||||
type Options struct {
|
||||
StorageTimeout time.Duration
|
||||
InfluxdbURL *url.URL
|
||||
InfluxdbRetentionPolicy string
|
||||
InfluxdbUsername string
|
||||
InfluxdbPassword string
|
||||
InfluxdbDatabase string
|
||||
OpentsdbURL string
|
||||
GraphiteAddress string
|
||||
GraphiteTransport string
|
||||
GraphitePrefix string
|
||||
}
|
||||
|
||||
// Start starts the background processing of the storage queues.
|
||||
func (s *Storage) Start() {
|
||||
for _, q := range s.queues {
|
||||
q.Start()
|
||||
if s.queue != nil {
|
||||
s.queue.Stop()
|
||||
}
|
||||
s.queue = newQueue
|
||||
s.conf = conf.RemoteWriteConfig
|
||||
s.externalLabels = conf.GlobalConfig.ExternalLabels
|
||||
if s.queue != nil {
|
||||
s.queue.Start()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop the background processing of the storage queues.
|
||||
func (s *Storage) Stop() {
|
||||
for _, q := range s.queues {
|
||||
q.Stop()
|
||||
if s.queue != nil {
|
||||
s.queue.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// Append implements storage.SampleAppender. Always returns nil.
|
||||
func (s *Storage) Append(smpl *model.Sample) error {
|
||||
s.mtx.RLock()
|
||||
defer s.mtx.RUnlock()
|
||||
|
||||
if s.queue == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var snew model.Sample
|
||||
snew = *smpl
|
||||
|
@ -120,16 +86,12 @@ func (s *Storage) Append(smpl *model.Sample) error {
|
|||
}
|
||||
}
|
||||
snew.Metric = model.Metric(
|
||||
relabel.Process(model.LabelSet(snew.Metric), s.relabelConfigs...))
|
||||
s.mtx.RUnlock()
|
||||
relabel.Process(model.LabelSet(snew.Metric), s.conf.WriteRelabelConfigs...))
|
||||
|
||||
if snew.Metric == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, q := range s.queues {
|
||||
q.Append(&snew)
|
||||
}
|
||||
s.queue.Append(&snew)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,108 +0,0 @@
|
|||
// Copyright 2016 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package remote
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/relabel"
|
||||
)
|
||||
|
||||
// Storage collects multiple remote storage queues.
|
||||
type ReloadableStorage struct {
|
||||
mtx sync.RWMutex
|
||||
externalLabels model.LabelSet
|
||||
conf config.RemoteWriteConfig
|
||||
|
||||
queue *StorageQueueManager
|
||||
}
|
||||
|
||||
// New returns a new remote Storage.
|
||||
func NewConfigurable() *ReloadableStorage {
|
||||
return &ReloadableStorage{}
|
||||
}
|
||||
|
||||
// ApplyConfig updates the state as the new config requires.
|
||||
func (s *ReloadableStorage) ApplyConfig(conf *config.Config) error {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
// TODO: we should only stop & recreate queues which have changes,
|
||||
// as this can be quite disruptive.
|
||||
var newQueue *StorageQueueManager
|
||||
|
||||
if conf.RemoteWriteConfig.URL != nil {
|
||||
c, err := NewClient(conf.RemoteWriteConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newQueue = NewStorageQueueManager(c, nil)
|
||||
}
|
||||
|
||||
if s.queue != nil {
|
||||
s.queue.Stop()
|
||||
}
|
||||
s.queue = newQueue
|
||||
s.conf = conf.RemoteWriteConfig
|
||||
s.externalLabels = conf.GlobalConfig.ExternalLabels
|
||||
if s.queue != nil {
|
||||
s.queue.Start()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop the background processing of the storage queues.
|
||||
func (s *ReloadableStorage) Stop() {
|
||||
if s.queue != nil {
|
||||
s.queue.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// Append implements storage.SampleAppender. Always returns nil.
|
||||
func (s *ReloadableStorage) Append(smpl *model.Sample) error {
|
||||
s.mtx.RLock()
|
||||
defer s.mtx.RUnlock()
|
||||
|
||||
if s.queue == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var snew model.Sample
|
||||
snew = *smpl
|
||||
snew.Metric = smpl.Metric.Clone()
|
||||
|
||||
for ln, lv := range s.externalLabels {
|
||||
if _, ok := smpl.Metric[ln]; !ok {
|
||||
snew.Metric[ln] = lv
|
||||
}
|
||||
}
|
||||
snew.Metric = model.Metric(
|
||||
relabel.Process(model.LabelSet(snew.Metric), s.conf.WriteRelabelConfigs...))
|
||||
|
||||
if snew.Metric == nil {
|
||||
return nil
|
||||
}
|
||||
s.queue.Append(&snew)
|
||||
return nil
|
||||
}
|
||||
|
||||
// NeedsThrottling implements storage.SampleAppender. It will always return
|
||||
// false as a remote storage drops samples on the floor if backlogging instead
|
||||
// of asking for throttling.
|
||||
func (s *ReloadableStorage) NeedsThrottling() bool {
|
||||
return false
|
||||
}
|
Loading…
Reference in New Issue