Merge pull request #827 from prometheus/fabxc/rmt-cleanup

main: cleanup initialization of remote storage.
pull/837/head
Fabian Reinartz 2015-06-24 09:53:47 +02:00
commit 3e811ad7a4
3 changed files with 113 additions and 52 deletions

View File

@ -27,6 +27,7 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/local/index"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/web"
)
@ -42,13 +43,7 @@ var cfg = struct {
notification notification.NotificationHandlerOptions
queryEngine promql.EngineOptions
web web.Options
// Remote storage.
remoteStorageTimeout time.Duration
influxdbURL string
influxdbRetentionPolicy string
influxdbDatabase string
opentsdbURL string
remote remote.Options
prometheusURL string
}{}
@ -167,23 +162,23 @@ func init() {
// Remote storage.
cfg.fs.StringVar(
&cfg.opentsdbURL, "storage.remote.opentsdb-url", "",
&cfg.remote.OpentsdbURL, "storage.remote.opentsdb-url", "",
"The URL of the remote OpenTSDB server to send samples to. None, if empty.",
)
cfg.fs.StringVar(
&cfg.influxdbURL, "storage.remote.influxdb-url", "",
&cfg.remote.InfluxdbURL, "storage.remote.influxdb-url", "",
"The URL of the remote InfluxDB server to send samples to. None, if empty.",
)
cfg.fs.StringVar(
&cfg.influxdbRetentionPolicy, "storage.remote.influxdb.retention-policy", "default",
&cfg.remote.InfluxdbRetentionPolicy, "storage.remote.influxdb.retention-policy", "default",
"The InfluxDB retention policy to use.",
)
cfg.fs.StringVar(
&cfg.influxdbDatabase, "storage.remote.influxdb.database", "prometheus",
&cfg.remote.InfluxdbDatabase, "storage.remote.influxdb.database", "prometheus",
"The name of the database to use for storing samples in InfluxDB.",
)
cfg.fs.DurationVar(
&cfg.remoteStorageTimeout, "storage.remote.timeout", 30*time.Second,
&cfg.remote.StorageTimeout, "storage.remote.timeout", 30*time.Second,
"The timeout to use when sending samples to the remote storage.",
)

View File

@ -28,7 +28,7 @@ import (
"github.com/prometheus/log"
registry "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/notification"
@ -38,8 +38,6 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/storage/remote/influxdb"
"github.com/prometheus/prometheus/storage/remote/opentsdb"
"github.com/prometheus/prometheus/version"
"github.com/prometheus/prometheus/web"
)
@ -58,39 +56,17 @@ func Main() int {
return 0
}
memStorage := local.NewMemorySeriesStorage(&cfg.storage)
var (
sampleAppender storage.SampleAppender
remoteStorageQueues []*remote.StorageQueueManager
)
if cfg.opentsdbURL == "" && cfg.influxdbURL == "" {
log.Warnf("No remote storage URLs provided; not sending any samples to long-term storage")
sampleAppender = memStorage
} else {
fanout := storage.Fanout{memStorage}
addRemoteStorage := func(c remote.StorageClient) {
qm := remote.NewStorageQueueManager(c, 100*1024)
fanout = append(fanout, qm)
remoteStorageQueues = append(remoteStorageQueues, qm)
}
if cfg.opentsdbURL != "" {
addRemoteStorage(opentsdb.NewClient(cfg.opentsdbURL, cfg.remoteStorageTimeout))
}
if cfg.influxdbURL != "" {
addRemoteStorage(influxdb.NewClient(cfg.influxdbURL, cfg.remoteStorageTimeout, cfg.influxdbDatabase, cfg.influxdbRetentionPolicy))
}
sampleAppender = fanout
}
var (
memStorage = local.NewMemorySeriesStorage(&cfg.storage)
remoteStorage = remote.New(&cfg.remote)
sampleAppender = storage.Fanout{memStorage}
notificationHandler = notification.NewNotificationHandler(&cfg.notification)
targetManager = retrieval.NewTargetManager(sampleAppender)
queryEngine = promql.NewEngine(memStorage, &cfg.queryEngine)
)
if remoteStorage != nil {
sampleAppender = append(sampleAppender, remoteStorage)
}
ruleManager := rules.NewManager(&rules.ManagerOptions{
SampleAppender: sampleAppender,
@ -115,7 +91,7 @@ func Main() int {
webHandler := web.New(memStorage, queryEngine, ruleManager, status, &cfg.web)
if !reloadConfig(cfg.configFile, status, targetManager, ruleManager) {
os.Exit(1)
return 1
}
// Wait for reload or termination signals. Start the handler for SIGHUP as
@ -142,16 +118,15 @@ func Main() int {
}
}()
// The storage has to be fully initialized before registering.
registry.MustRegister(memStorage)
registry.MustRegister(notificationHandler)
if remoteStorage != nil {
prometheus.MustRegister(remoteStorage)
for _, q := range remoteStorageQueues {
registry.MustRegister(q)
go q.Run()
defer q.Stop()
go remoteStorage.Run()
defer remoteStorage.Stop()
}
// The storage has to be fully initialized before registering.
prometheus.MustRegister(memStorage)
prometheus.MustRegister(notificationHandler)
go ruleManager.Run()
defer ruleManager.Stop()

91
storage/remote/remote.go Normal file
View File

@ -0,0 +1,91 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"time"
"github.com/prometheus/prometheus/storage/remote/influxdb"
"github.com/prometheus/prometheus/storage/remote/opentsdb"
"github.com/prometheus/client_golang/prometheus"
clientmodel "github.com/prometheus/client_golang/model"
)
// Storage collects multiple remote storage queues.
type Storage struct {
queues []*StorageQueueManager
}
// New returns a new remote Storage.
func New(o *Options) *Storage {
s := &Storage{}
if o.OpentsdbURL != "" {
c := opentsdb.NewClient(o.OpentsdbURL, o.StorageTimeout)
s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
}
if o.InfluxdbURL != "" {
c := influxdb.NewClient(o.InfluxdbURL, o.StorageTimeout, o.InfluxdbDatabase, o.InfluxdbRetentionPolicy)
s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
}
if len(s.queues) == 0 {
return nil
}
return s
}
// Options contains configuration parameters for a remote storage.
type Options struct {
StorageTimeout time.Duration
InfluxdbURL string
InfluxdbRetentionPolicy string
InfluxdbDatabase string
OpentsdbURL string
}
// Run starts the background processing of the storage queues.
func (s *Storage) Run() {
for _, q := range s.queues {
go q.Run()
}
}
// Stop the background processing of the storage queues.
func (s *Storage) Stop() {
for _, q := range s.queues {
q.Stop()
}
}
// Append implements storage.SampleAppender.
func (s *Storage) Append(smpl *clientmodel.Sample) {
for _, q := range s.queues {
q.Append(smpl)
}
}
// Describe implements prometheus.Collector.
func (s *Storage) Describe(ch chan<- *prometheus.Desc) {
for _, q := range s.queues {
q.Describe(ch)
}
}
// Collect implements prometheus.Collector.
func (s *Storage) Collect(ch chan<- prometheus.Metric) {
for _, q := range s.queues {
q.Collect(ch)
}
}