mirror of https://github.com/hashicorp/consul
150 lines
3.1 KiB
Go
150 lines
3.1 KiB
Go
package metrics
|
|
|
|
import (
|
|
"bufio"
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// flushInterval is the period after which buffered statsite metrics are
// force-flushed. It keeps stats from sitting in the write buffer forever
// when traffic is idle.
const flushInterval = 100 * time.Millisecond
|
|
|
|
// NewStatsiteSinkFromURL creates an StatsiteSink from a URL. It is used
|
|
// (and tested) from NewMetricSinkFromURL.
|
|
func NewStatsiteSinkFromURL(u *url.URL) (MetricSink, error) {
|
|
return NewStatsiteSink(u.Host)
|
|
}
|
|
|
|
// StatsiteSink is a MetricSink implementation that forwards metrics to a
// statsite metrics server.
type StatsiteSink struct {
	// addr is the network address of the statsite server.
	addr string

	// metricQueue carries formatted metric lines waiting to be written.
	metricQueue chan string
}
|
|
|
|
// NewStatsiteSink is used to create a new StatsiteSink
|
|
func NewStatsiteSink(addr string) (*StatsiteSink, error) {
|
|
s := &StatsiteSink{
|
|
addr: addr,
|
|
metricQueue: make(chan string, 4096),
|
|
}
|
|
go s.flushMetrics()
|
|
return s, nil
|
|
}
|
|
|
|
// Close is used to stop flushing to statsite
|
|
func (s *StatsiteSink) Shutdown() {
|
|
close(s.metricQueue)
|
|
}
|
|
|
|
func (s *StatsiteSink) SetGauge(key []string, val float32) {
|
|
flatKey := s.flattenKey(key)
|
|
s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
|
|
}
|
|
|
|
func (s *StatsiteSink) EmitKey(key []string, val float32) {
|
|
flatKey := s.flattenKey(key)
|
|
s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
|
|
}
|
|
|
|
func (s *StatsiteSink) IncrCounter(key []string, val float32) {
|
|
flatKey := s.flattenKey(key)
|
|
s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
|
|
}
|
|
|
|
func (s *StatsiteSink) AddSample(key []string, val float32) {
|
|
flatKey := s.flattenKey(key)
|
|
s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
|
|
}
|
|
|
|
// Flattens the key for formatting, removes spaces
|
|
func (s *StatsiteSink) flattenKey(parts []string) string {
|
|
joined := strings.Join(parts, ".")
|
|
return strings.Map(func(r rune) rune {
|
|
switch r {
|
|
case ':':
|
|
fallthrough
|
|
case ' ':
|
|
return '_'
|
|
default:
|
|
return r
|
|
}
|
|
}, joined)
|
|
}
|
|
|
|
// Does a non-blocking push to the metrics queue
|
|
func (s *StatsiteSink) pushMetric(m string) {
|
|
select {
|
|
case s.metricQueue <- m:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// Flushes metrics
|
|
func (s *StatsiteSink) flushMetrics() {
|
|
var sock net.Conn
|
|
var err error
|
|
var wait <-chan time.Time
|
|
var buffered *bufio.Writer
|
|
ticker := time.NewTicker(flushInterval)
|
|
defer ticker.Stop()
|
|
|
|
CONNECT:
|
|
// Attempt to connect
|
|
sock, err = net.Dial("tcp", s.addr)
|
|
if err != nil {
|
|
log.Printf("[ERR] Error connecting to statsite! Err: %s", err)
|
|
goto WAIT
|
|
}
|
|
|
|
// Create a buffered writer
|
|
buffered = bufio.NewWriter(sock)
|
|
|
|
for {
|
|
select {
|
|
case metric, ok := <-s.metricQueue:
|
|
// Get a metric from the queue
|
|
if !ok {
|
|
goto QUIT
|
|
}
|
|
|
|
// Try to send to statsite
|
|
_, err := buffered.Write([]byte(metric))
|
|
if err != nil {
|
|
log.Printf("[ERR] Error writing to statsite! Err: %s", err)
|
|
goto WAIT
|
|
}
|
|
case <-ticker.C:
|
|
if err := buffered.Flush(); err != nil {
|
|
log.Printf("[ERR] Error flushing to statsite! Err: %s", err)
|
|
goto WAIT
|
|
}
|
|
}
|
|
}
|
|
|
|
WAIT:
|
|
// Wait for a while
|
|
wait = time.After(time.Duration(5) * time.Second)
|
|
for {
|
|
select {
|
|
// Dequeue the messages to avoid backlog
|
|
case _, ok := <-s.metricQueue:
|
|
if !ok {
|
|
goto QUIT
|
|
}
|
|
case <-wait:
|
|
goto CONNECT
|
|
}
|
|
}
|
|
QUIT:
|
|
s.metricQueue = nil
|
|
}
|