# nginx-amplify-agent/amplify/agent/data/statsd.py
#
# 213 lines
# 8.2 KiB
# Python

# -*- coding: utf-8 -*-
import copy
import time
from amplify.agent.common.util.math import median
from collections import defaultdict
# Module metadata: authorship, copyright, license (left empty here) and maintainer contact.
__author__ = "Mike Belov"
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
__license__ = ""
__maintainer__ = "Mike Belov"
__email__ = "dedm@nginx.com"
class StatsdClient(object):
    """
    In-memory, statsd-like metric buffer.

    The recording methods (latest, average, timer, incr, object_status, gauge) accumulate samples in
    ``self.current``, a dict keyed by metric kind ('gauge', 'average', 'timer', 'counter').
    flush() aggregates everything collected so far, empties the buffer and returns the result.
    """

    def __init__(self, address=None, port=None, interval=None, object=None):
        """
        :param address: stored on the client as-is (not used by the methods of this class)
        :param port: stored on the client as-is (not used by the methods of this class)
        :param interval: base interval; multiplied by ``rate`` in incr() to size a counter slot
        :param object: owner object; its ``definition`` attribute is returned by flush()
        """
        # Import context as a class object to avoid circular import on statsd. This could be refactored later.
        from amplify.agent.common.context import context
        self.context = context
        self.address = address
        self.port = port
        self.object = object
        self.interval = interval
        self.current = defaultdict(dict)
        self.delivery = defaultdict(dict)

    def latest(self, metric_name, value, stamp=None):
        """
        Stores the most recent value of a gauge.
        The stored value is replaced only if the metric is new or the given timestamp is newer than
        the timestamp of the first stored sample.

        :param metric_name: metric name
        :param value: metric value
        :param stamp: timestamp (current timestamp will be used if this is not specified)
        """
        timestamp = stamp or int(time.time())
        gauges = self.current['gauge']

        if metric_name not in gauges or timestamp > gauges[metric_name][0][0]:
            gauges[metric_name] = [(timestamp, value)]

    def average(self, metric_name, value):
        """
        Same thing as histogram but without p95: values are collected and averaged on flush().

        :param metric_name: metric name
        :param value: metric value
        """
        self.current['average'].setdefault(metric_name, []).append(value)

    def timer(self, metric_name, value):
        """
        Histogram with 95 percentile.

        The algorithm is as follows:
        Collect all the data samples for a period of time (here: until the next flush()).
        Sort the data set by value from highest to lowest and discard the highest 5% of the sorted samples.
        The next highest sample is the 95th percentile value for the data set.

        :param metric_name: metric name
        :param value: metric value
        """
        self.current['timer'].setdefault(metric_name, []).append(value)

    def incr(self, metric_name, value=None, rate=None, stamp=None):
        """
        Simple counter with rate.

        When both the client's interval and ``rate`` are set, increments are grouped into slots that
        span ``interval * rate`` timestamp units; otherwise everything is accumulated in one slot.
        Negative deltas are logged at debug level and skipped.

        :param metric_name: metric name
        :param value: metric value (defaults to 1)
        :param rate: rate
        :param stamp: timestamp (current timestamp will be used if this is not specified)
        """
        timestamp = stamp or int(time.time())

        if value is None:
            value = 1
        elif value < 0:
            self.context.default_log.debug(
                'negative delta (%s) passed for metric %s, skipping' %
                (value, metric_name)
            )
            return

        counters = self.current['counter']

        # new metric
        if metric_name not in counters:
            counters[metric_name] = [[timestamp, value]]
            return

        # metric exists; slots are lists (not tuples) because the last one is updated in place
        slots = counters[metric_name]
        last_slot = slots[-1]

        if self.interval and rate and timestamp >= last_slot[0] + self.interval * rate:
            # The current slot's window is over: open a new slot stamped with the *current* timestamp.
            # Re-using the old stamp here would keep the window expired forever and make every later
            # call append yet another slot.
            slots.append([timestamp, value])
        else:
            # still inside the current slot (or no rate-based slotting at all)
            last_slot[1] += value

    def object_status(self, metric_name, value=1, stamp=None):
        """
        Object status metrics. Stored as a gauge; any previously stored samples are replaced.

        :param metric_name: metric
        :param value: value
        :param stamp: timestamp (current timestamp will be used if this is not specified)
        """
        timestamp = stamp or int(time.time())
        self.current['gauge'][metric_name] = [(timestamp, value)]

    def gauge(self, metric_name, value, delta=False, prefix=False, stamp=None):
        """
        Gauge

        :param metric_name: metric name
        :param value: metric value
        :param delta: metric delta (applicable only if we have previous values)
        :param prefix: not used by this method; kept so existing callers keep working
        :param stamp: timestamp (current timestamp will be used if this is not specified)
        """
        timestamp = stamp or int(time.time())
        gauges = self.current['gauge']

        if metric_name not in gauges:
            gauges[metric_name] = [(timestamp, value)]
            return

        samples = gauges[metric_name]
        if delta:
            # a delta is applied on top of the most recently stored value
            value = samples[-1][1] + value
        samples.append((timestamp, value))

    def flush(self):
        """
        Aggregates everything collected since the previous flush and resets the buffer.

        :return: dict with the owner object's definition under 'object' and, if anything was collected,
                 the aggregated metrics under 'metrics' (grouped by 'timer', 'counter', 'gauge', 'average')
        """
        if not self.current:
            return {'object': self.object.definition}

        # Take the collected data and start a fresh buffer; the old buffer is owned by this call from
        # now on, so it does not have to be copied.
        delivery, self.current = self.current, defaultdict(dict)

        results = {}
        if 'timer' in delivery:
            results['timer'] = self._flush_timers(delivery['timer'])
        if 'counter' in delivery:
            results['counter'] = self._flush_counters(delivery['counter'])
        if 'gauge' in delivery:
            results['gauge'] = self._flush_gauges(delivery['gauge'])
        if 'average' in delivery:
            results['average'] = self._flush_averages(delivery['average'])

        return {
            'metrics': results,
            'object': self.object.definition
        }

    @staticmethod
    def _flush_timers(timers):
        """Turns collected timer samples into mean/count/max/median/pctl95 records."""
        flushed = {}
        timestamp = int(time.time())  # Timer samples carry no timestamps of their own.

        for metric_name, metric_values in timers.items():
            if not metric_values:
                continue

            metric_values.sort()
            length = len(metric_values)

            # The mean is reported under the full metric name (including any "||..." suffix).
            flushed['G|%s' % metric_name] = [[timestamp, sum(metric_values) / float(length)]]

            # Everything from "||" on is moved behind the derived metric's own suffix,
            # e.g. "name||filter" -> "name.count||filter".
            filter_suffix = ""
            filter_suffix_index = metric_name.find("||")
            if filter_suffix_index > 0:
                filter_suffix = metric_name[filter_suffix_index:]
                metric_name = metric_name[:filter_suffix_index]

            # Discard the highest 5% of the samples (length // 20 of them); the next highest one is the
            # 95th percentile. With fewer than 20 samples nothing is discarded and this is the maximum.
            pctl95 = metric_values[length - 1 - length // 20]

            flushed['C|%s.count%s' % (metric_name, filter_suffix)] = [[timestamp, length]]
            flushed['G|%s.max%s' % (metric_name, filter_suffix)] = [[timestamp, metric_values[-1]]]
            flushed['G|%s.median%s' % (metric_name, filter_suffix)] = [
                [timestamp, median(metric_values, presorted=True)]
            ]
            flushed['G|%s.pctl95%s' % (metric_name, filter_suffix)] = [[timestamp, pctl95]]

        return flushed

    @staticmethod
    def _flush_counters(counters):
        """Condenses every counter's slots into a single [timestamp, total] record."""
        flushed = {}
        for name, slots in counters.items():
            # Use the oldest timestamp (the first slot's) and the sum over all slots.
            oldest_stamp = slots[0][0]
            total_value = sum(slot_value for _, slot_value in slots)
            flushed['C|%s' % name] = [[oldest_stamp, total_value]]
        return flushed

    @staticmethod
    def _flush_gauges(gauges):
        """Condenses every gauge's samples into a single (timestamp, average) record."""
        flushed = {}
        for name, samples in gauges.items():
            # Use the timestamp of the most recently appended sample and the average of all samples.
            last_stamp = samples[-1][0]
            total_value = sum(sample_value for _, sample_value in samples)
            flushed['G|%s' % name] = [(last_stamp, float(total_value) / len(samples))]
        return flushed

    @staticmethod
    def _flush_averages(averages):
        """Turns collected average samples into a single mean record per metric."""
        flushed = {}
        timestamp = int(time.time())  # Take a new timestamp here because it is not collected previously.
        for metric_name, metric_values in averages.items():
            if metric_values:
                flushed['G|%s' % metric_name] = [[timestamp, sum(metric_values) / float(len(metric_values))]]
        return flushed