Instrument unwritten samples queue.

Change-Id: Id77387314d340a5118490cf08e7bbc37c7366b25
pull/413/head
Bjoern Rabenstein 2014-11-10 19:42:55 +01:00
parent c087ee35f7
commit a5f56639b8
1 changed files with 49 additions and 6 deletions

55
main.go
View File

@ -45,7 +45,7 @@ var (
configFile = flag.String("configFile", "prometheus.conf", "Prometheus configuration file name.")
metricsStoragePath = flag.String("metricsStoragePath", "/tmp/metrics", "Base path for metrics storage.")
alertmanagerUrl = flag.String("alertmanager.url", "", "The URL of the alert manager to send notifications to.")
alertmanagerURL = flag.String("alertmanager.url", "", "The URL of the alert manager to send notifications to.")
remoteTSDBUrl = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.")
remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.")
@ -67,6 +67,20 @@ var (
printVersion = flag.Bool("version", false, "print version information")
)
// Instrumentation.
var (
samplesQueueCapDesc = registry.NewDesc(
"prometheus_samples_queue_capacity",
"Capacity of the queue for unwritten samples.",
nil, nil,
)
samplesQueueLenDesc = registry.NewDesc(
"prometheus_samples_queue_length",
"Current number of items in the queue for unwritten samples. Each item comprises all samples exposed by one target as one metric family (i.e. metrics of the same name).",
nil, nil,
)
)
type prometheus struct {
unwrittenSamples chan *extraction.Result
@ -99,8 +113,7 @@ func NewPrometheus() *prometheus {
targetManager := retrieval.NewTargetManager(ingester)
targetManager.AddTargetsFromConfig(conf)
notificationHandler := notification.NewNotificationHandler(*alertmanagerUrl, *notificationQueueCapacity)
registry.MustRegister(notificationHandler)
notificationHandler := notification.NewNotificationHandler(*alertmanagerURL, *notificationQueueCapacity)
o := &local.MemorySeriesStorageOptions{
MemoryEvictionInterval: *memoryEvictionInterval,
@ -115,7 +128,6 @@ func NewPrometheus() *prometheus {
if err != nil {
glog.Fatal("Error opening memory series storage: ", err)
}
registry.MustRegister(memStorage)
ruleManager := manager.NewRuleManager(&manager.RuleManagerOptions{
Results: unwrittenSamples,
@ -134,7 +146,6 @@ func NewPrometheus() *prometheus {
} else {
openTSDB := opentsdb.NewClient(*remoteTSDBUrl, *remoteTSDBTimeout)
remoteTSDBQueue = remote.NewTSDBQueueManager(openTSDB, 512)
registry.MustRegister(remoteTSDBQueue)
}
flags := map[string]string{}
@ -259,6 +270,36 @@ func (p *prometheus) close() {
// remaining shut-downs happen in Serve().
}
// Describe implements registry.Collector.
func (p *prometheus) Describe(ch chan<- *registry.Desc) {
ch <- samplesQueueCapDesc
ch <- samplesQueueLenDesc
p.notificationHandler.Describe(ch)
p.storage.Describe(ch)
if p.remoteTSDBQueue != nil {
p.remoteTSDBQueue.Describe(ch)
}
}
// Collect implements registry.Collector.
func (p *prometheus) Collect(ch chan<- registry.Metric) {
ch <- registry.MustNewConstMetric(
samplesQueueCapDesc,
registry.GaugeValue,
float64(cap(p.unwrittenSamples)),
)
ch <- registry.MustNewConstMetric(
samplesQueueLenDesc,
registry.GaugeValue,
float64(len(p.unwrittenSamples)),
)
p.notificationHandler.Collect(ch)
p.storage.Collect(ch)
if p.remoteTSDBQueue != nil {
p.remoteTSDBQueue.Collect(ch)
}
}
func main() {
flag.Parse()
versionInfoTmpl.Execute(os.Stdout, BuildInfo)
@ -267,5 +308,7 @@ func main() {
os.Exit(0)
}
NewPrometheus().Serve()
p := NewPrometheus()
registry.MustRegister(p)
p.Serve()
}