# -*- coding: utf-8 -*-
import re
import time

import psutil

from gevent import GreenletExit

from amplify.agent.common.util.plus import traverse_plus_api
from amplify.agent.collectors.abstract import AbstractMetricsCollector
from amplify.agent.collectors.plus.util.api import http_cache as api_http_cache
from amplify.agent.collectors.plus.util.api import http_server_zone as api_http_server_zone
from amplify.agent.collectors.plus.util.api import http_upstream as api_http_upstream
from amplify.agent.collectors.plus.util.api import slab as api_slab
from amplify.agent.collectors.plus.util.api import stream_server_zone as api_stream_server_zone
from amplify.agent.collectors.plus.util.api import stream_upstream as api_stream_upstream
from amplify.agent.collectors.plus.util.status import cache as status_cache
from amplify.agent.collectors.plus.util.status import status_zone as status_http_server_zone
from amplify.agent.collectors.plus.util.status import upstream as status_http_upstream
from amplify.agent.collectors.plus.util.status import slab as status_slab
from amplify.agent.collectors.plus.util.status import stream as status_stream_server_zone
from amplify.agent.collectors.plus.util.status import stream_upstream as status_stream_upstream
from amplify.agent.common.context import context
from amplify.agent.common.errors import AmplifyParseException
from amplify.agent.common.util.ps import Process
from amplify.agent.data.eventd import WARNING

__author__ = "Mike Belov"
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
__license__ = ""
__maintainer__ = "Mike Belov"
__email__ = "dedm@nginx.com"

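# STUB_RE parses the plain-text page served by ngx_http_stub_status_module.
# A typical response looks like this (values are illustrative):
#
#   Active connections: 291
#   server accepts handled requests
#    16630948 16630948 31070465
#   Reading: 6 Writing: 179 Waiting: 106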
STUB_RE = re.compile(r'^Active connections: (?P<connections>\d+)\s+[\w ]+\n'
                     r'\s+(?P<accepts>\d+)'
                     r'\s+(?P<handled>\d+)'
                     r'\s+(?P<requests>\d+)'
                     r'\s+Reading:\s+(?P<reading>\d+)'
                     r'\s+Writing:\s+(?P<writing>\d+)'
                     r'\s+Waiting:\s+(?P<waiting>\d+)')


class NginxMetricsCollector(AbstractMetricsCollector):
    short_name = 'nginx_metrics'
    status_metric_key = 'nginx.status'

    def __init__(self, **kwargs):
        super(NginxMetricsCollector, self).__init__(**kwargs)
        self.processes = [Process(pid) for pid in self.object.workers]
        self.zombies = set()
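
        # register the collector methods that will run on every collection cycle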
        self.register(
            self.workers_count,
            self.memory_info,
            self.workers_fds_count,
            self.workers_cpu,
            self.global_metrics,
            self.reloads_and_restarts_count,
        )
        if not self.in_container:
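            # per-worker rlimit and disk I/O readings are skipped inside containers,
            # where these per-process values are typically unavailable or misleading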
            self.register(
                self.workers_rlimit_nofile,
                self.workers_io
            )

    def handle_exception(self, method, exception):
        if isinstance(exception, psutil.NoSuchProcess):

            # Log exception
            context.log.warning(
                'failed to collect metrics %s due to %s, object restart needed (PID: %s)' %
                (method.__name__, exception.__class__.__name__, exception.pid)
            )
            # since the PID no longer exists, mark the object as needing restart for safety
            self.object.need_restart = True
        else:
            # Fire event warning.
            self.object.eventd.event(
                level=WARNING,
                message="can't obtain worker process metrics (maybe permissions?)",
                onetime=True
            )
            super(NginxMetricsCollector, self).handle_exception(method, exception)

    def reloads_and_restarts_count(self):
        """nginx.master.reloads"""
        self.object.statsd.incr('nginx.master.reloads', self.object.reloads)
        self.object.reloads = 0

    def workers_count(self):
        """nginx.workers.count"""
        self.object.statsd.gauge('nginx.workers.count', len(self.object.workers))

    def handle_zombie(self, pid):
        """
        marks a worker pid as a zombie so subsequent metric collection skips it

        :param pid: zombie pid
        """
        context.log.warning('zombie process %s found' % pid)
        self.zombies.add(pid)

    def memory_info(self):
        """
        memory info

        nginx.workers.mem.rss
        nginx.workers.mem.vms
        nginx.workers.mem.rss_pct
        """
        rss, vms, pct = 0, 0, 0.0
        for p in self.processes:
            if p.pid in self.zombies:
                continue
            try:
                mem_info = p.memory_info()
                rss += mem_info.rss
                vms += mem_info.vms
                pct += p.memory_percent()
            except psutil.ZombieProcess:
                self.handle_zombie(p.pid)

        self.object.statsd.gauge('nginx.workers.mem.rss', rss)
        self.object.statsd.gauge('nginx.workers.mem.vms', vms)
        self.object.statsd.gauge('nginx.workers.mem.rss_pct', pct)

    def workers_fds_count(self):
        """nginx.workers.fds_count"""
        fds = 0
        for p in self.processes:
            if p.pid in self.zombies:
                continue
            try:
                fds += p.num_fds()
            except psutil.ZombieProcess:
                self.handle_zombie(p.pid)
        self.object.statsd.incr('nginx.workers.fds_count', fds)

    def workers_cpu(self):
        """
        cpu

        nginx.workers.cpu.total
        nginx.workers.cpu.user
        nginx.workers.cpu.system
        """
        worker_user, worker_sys = 0.0, 0.0
        for p in self.processes:
            if p.pid in self.zombies:
                continue
            try:
                u, s = p.cpu_percent()
                worker_user += u
                worker_sys += s
            except psutil.ZombieProcess:
                self.handle_zombie(p.pid)
        self.object.statsd.gauge('nginx.workers.cpu.total', worker_user + worker_sys)
        self.object.statsd.gauge('nginx.workers.cpu.user', worker_user)
        self.object.statsd.gauge('nginx.workers.cpu.system', worker_sys)

    def global_metrics(self):
        """
        collects "global" metrics from the N+ API or the extended status, if either
        is available, and skips stub_status in that case;
        otherwise falls back to stub_status
        """
        if self.object.api_enabled and self.object.api_internal_url:
            self.plus_api()
        elif self.object.plus_status_enabled and self.object.plus_status_internal_url \
                and self.object.status_directive_supported:
            self.plus_status()
        elif self.object.stub_status_enabled and self.object.stub_status_url:
            self.stub_status()
        else:
            return

    def stub_status(self):
        """
        stub status metrics

        nginx.http.conn.current = ss.active
        nginx.http.conn.active = ss.active - ss.waiting
        nginx.http.conn.idle = ss.waiting
        nginx.http.request.count = ss.requests ## counter
        nginx.http.request.reading = ss.reading
        nginx.http.request.writing = ss.writing
        nginx.http.conn.dropped = ss.accepts - ss.handled ## counter
        nginx.http.conn.accepted = ss.accepts ## counter
        """
        stub_body = ''
        stub = {}
        stub_time = int(time.time())

        # get stub status body
        try:
            stub_body = context.http_client.get(self.object.stub_status_url, timeout=1, json=False, log=False)
        except GreenletExit:
            # we caught an exit signal in the middle of processing so raise it.
            raise
        except:
            context.log.error('failed to check stub_status url %s' % self.object.stub_status_url)
            context.log.debug('additional info', exc_info=True)
            stub_body = None

        if not stub_body:
            return

        # parse body
        try:
            gre = STUB_RE.match(stub_body)
            if not gre:
                raise AmplifyParseException(message='stub status %s' % stub_body)
            for field in ('connections', 'accepts', 'handled', 'requests', 'reading', 'writing', 'waiting'):
                stub[field] = int(gre.group(field))
        except:
            context.log.error('failed to parse stub_status body')
            raise

        # store some variables for further use
        stub['dropped'] = stub['accepts'] - stub['handled']

        # gauges
        self.object.statsd.gauge('nginx.http.conn.current', stub['connections'])
        self.object.statsd.gauge('nginx.http.conn.active', stub['connections'] - stub['waiting'])
        self.object.statsd.gauge('nginx.http.conn.idle', stub['waiting'])
        self.object.statsd.gauge('nginx.http.request.writing', stub['writing'])
        self.object.statsd.gauge('nginx.http.request.reading', stub['reading'])
        self.object.statsd.gauge('nginx.http.request.current', stub['reading'] + stub['writing'])

        # counters
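        # (sent as deltas: each run compares the current cumulative value with the
        # previous poll's value; the first run only seeds previous_counters)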
        counted_vars = {
            'nginx.http.request.count': 'requests',
            'nginx.http.conn.accepted': 'accepts',
            'nginx.http.conn.dropped': 'dropped'
        }
        for metric_name, stub_name in counted_vars.items():
            stamp, value = stub_time, stub[stub_name]
            prev_stamp, prev_value = self.previous_counters.get(metric_name, (None, None))

            if isinstance(prev_value, (int, float, complex)) and prev_stamp and prev_stamp != stamp:
                value_delta = value - prev_value
                self.object.statsd.incr(metric_name, value_delta)

            self.previous_counters[metric_name] = [stamp, value]

    def plus_status(self):
        """
        plus status metrics

        nginx.http.conn.accepted = connections.accepted ## counter
        nginx.http.conn.dropped = connections.dropped ## counter
        nginx.http.conn.active = connections.active
        nginx.http.conn.current = connections.active + connections.idle
        nginx.http.conn.idle = connections.idle
        nginx.http.request.count = requests.total ## counter
        nginx.http.request.current = requests.current

        plus.http.ssl.handshakes = ssl.handshakes
        plus.http.ssl.failed = ssl.handshakes_failed
        plus.http.ssl.reuses = ssl.session_reuses

        also here we run plus metrics collection
        """
        stamp = int(time.time())

        # get plus status body
        try:
            status = context.http_client.get(self.object.plus_status_internal_url, timeout=1, log=False)

            # modify status to move stream data up a level
            if 'stream' in status:
                status['streams'] = status['stream'].get('server_zones', {})
                status['stream_upstreams'] = status['stream'].get('upstreams', {})

            # Add the status payload to plus_cache so it can be parsed by other collectors (plus objects)
            context.plus_cache.put(self.object.plus_status_internal_url, (status, stamp))
        except GreenletExit:
            raise
        except:
            context.log.error('failed to check plus_status url %s' % self.object.plus_status_internal_url)
            context.log.debug('additional info', exc_info=True)
            status = None

        if not status:
            return

        connections = status.get('connections', {})
        requests = status.get('requests', {})
        ssl = status.get('ssl', {})

        # gauges
        self.object.statsd.gauge('nginx.http.conn.active', connections.get('active'))
        self.object.statsd.gauge('nginx.http.conn.idle', connections.get('idle'))
        self.object.statsd.gauge('nginx.http.conn.current', connections.get('active') + connections.get('idle'))
        self.object.statsd.gauge('nginx.http.request.current', requests.get('current'))

        # counters
        counted_vars = {
            'nginx.http.request.count': requests.get('total'),
            'nginx.http.conn.accepted': connections.get('accepted'),
            'nginx.http.conn.dropped': connections.get('dropped'),
            'plus.http.ssl.handshakes': ssl.get('handshakes'),
            'plus.http.ssl.failed': ssl.get('handshakes_failed'),
            'plus.http.ssl.reuses': ssl.get('session_reuses')
        }
        self.aggregate_counters(counted_vars, stamp=stamp)

        # aggregate plus metrics
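        # each N+ object type below (cache, server zone, upstream/peer, slab,
        # stream zone) has its own list of collector helpers (a *_COLLECT_INDEX);
        # every helper is called with the matching payload slice and the timestamp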
        # caches
        caches = status.get('caches', {})
        for cache in caches.values():
            for method in status_cache.CACHE_COLLECT_INDEX:
                method(self, cache, stamp)

        # status zones
        zones = status.get('server_zones', {})
        for zone in zones.values():
            for method in status_http_server_zone.STATUS_ZONE_COLLECT_INDEX:
                method(self, zone, stamp)

        # upstreams
        upstreams = status.get('upstreams', {})
        for upstream in upstreams.values():
            # workaround for supporting old N+ format
            # http://nginx.org/en/docs/http/ngx_http_status_module.html#compatibility
            peers = upstream['peers'] if 'peers' in upstream else upstream
            for peer in peers:
                for method in status_http_upstream.UPSTREAM_PEER_COLLECT_INDEX:
                    method(self, peer, stamp)
            for method in status_http_upstream.UPSTREAM_COLLECT_INDEX:
                method(self, upstream, stamp)

        # slabs
        slabs = status.get('slabs', {})
        for slab in slabs.values():
            for method in status_slab.SLAB_COLLECT_INDEX:
                method(self, slab, stamp)

        # streams - server_zones of stream
        streams = status.get('streams', {})
        for stream in streams.values():
            for method in status_stream_server_zone.STREAM_COLLECT_INDEX:
                method(self, stream, stamp)

        # stream upstreams - upstreams of stream
        stream_upstreams = status.get('stream_upstreams', {})
        for stream_upstream in stream_upstreams.values():
            peers = stream_upstream['peers'] if 'peers' in stream_upstream else stream_upstream
            for peer in peers:
                for method in status_stream_upstream.STREAM_UPSTREAM_PEER_COLLECT_INDEX:
                    method(self, peer, stamp)
            for method in status_stream_upstream.STREAM_UPSTREAM_COLLECT_INDEX:
                method(self, stream_upstream, stamp)

        self.increment_counters()
        self.finalize_latest()

    def plus_api(self):
        """
        plus api top-level metrics

        nginx.http.conn.accepted = connections.accepted ## counter
        nginx.http.conn.dropped = connections.dropped ## counter
        nginx.http.conn.active = connections.active
        nginx.http.conn.current = connections.active + connections.idle
        nginx.http.conn.idle = connections.idle
        nginx.http.request.count = requests.total ## counter
        nginx.http.request.current = requests.current

        plus.http.ssl.handshakes = ssl.handshakes
        plus.http.ssl.failed = ssl.handshakes_failed
        plus.http.ssl.reuses = ssl.session_reuses
        plus.proc.respawned = processes.respawned

        also here we run plus metrics collection
        """
        stamp = int(time.time())

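        # traverse_plus_api() walks the N+ API tree starting at the internal API url
        # (skipping any configured root endpoints) and returns the responses merged
        # into a single nested dict, which is what the lookups below expect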
        try:
            aggregated_api_payload = traverse_plus_api(
                location_prefix=self.object.api_internal_url,
                root_endpoints_to_skip=self.object.api_endpoints_to_skip
            )
        except GreenletExit:
            raise
        except:
            context.log.error('failed to check plus_api url %s' % self.object.api_internal_url)
            context.log.debug('additional info', exc_info=True)
            aggregated_api_payload = None

        if not aggregated_api_payload:
            return

        context.plus_cache.put(self.object.api_internal_url, (aggregated_api_payload, stamp))

        connections = aggregated_api_payload.get('connections', {})
        http = aggregated_api_payload.get('http', {})
        requests = http.get('requests', {})
        ssl = aggregated_api_payload.get('ssl', {})
        processes = aggregated_api_payload.get('processes', {})
        stream = aggregated_api_payload.get('stream', {})

        # gauges
        self.object.statsd.gauge('nginx.http.conn.active', connections.get('active'))
        self.object.statsd.gauge('nginx.http.conn.idle', connections.get('idle'))
        self.object.statsd.gauge('nginx.http.conn.current', connections.get('active') + connections.get('idle'))
        self.object.statsd.gauge('nginx.http.request.current', requests.get('current'))

        # counters
        counted_vars = {
            'nginx.http.request.count': requests.get('total'),
            'nginx.http.conn.accepted': connections.get('accepted'),
            'nginx.http.conn.dropped': connections.get('dropped'),
            'plus.http.ssl.handshakes': ssl.get('handshakes'),
            'plus.http.ssl.failed': ssl.get('handshakes_failed'),
            'plus.http.ssl.reuses': ssl.get('session_reuses'),
            'plus.proc.respawned': processes.get('respawned')
        }
        self.aggregate_counters(counted_vars, stamp=stamp)
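
        # per-object metrics: the api-flavoured *_COLLECT_INDEX helpers mirror the
        # status-based ones used in plus_status() above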
        caches = http.get('caches', {})
        for cache in caches.values():
            for method in api_http_cache.CACHE_COLLECT_INDEX:
                method(self, cache, stamp)

        http_server_zones = http.get('server_zones', {})
        for server_zone in http_server_zones.values():
            for method in api_http_server_zone.STATUS_ZONE_COLLECT_INDEX:
                method(self, server_zone, stamp)

        http_upstreams = http.get('upstreams', {})
        for upstream in http_upstreams.values():
            for peer in upstream.get('peers', []):
                for method in api_http_upstream.UPSTREAM_PEER_COLLECT_INDEX:
                    method(self, peer, stamp)
            for method in api_http_upstream.UPSTREAM_COLLECT_INDEX:
                method(self, upstream, stamp)

        slabs = aggregated_api_payload.get('slabs', {})
        for slab in slabs.values():
            for method in api_slab.SLAB_COLLECT_INDEX:
                method(self, slab, stamp)

        stream_server_zones = stream.get('server_zones', {})
        for server_zone in stream_server_zones.values():
            for method in api_stream_server_zone.STREAM_COLLECT_INDEX:
                method(self, server_zone, stamp)

        stream_upstreams = stream.get('upstreams', {})
        for upstream in stream_upstreams.values():
            for peer in upstream.get('peers', []):
                for method in api_stream_upstream.STREAM_UPSTREAM_PEER_COLLECT_INDEX:
                    method(self, peer, stamp)
            for method in api_stream_upstream.STREAM_UPSTREAM_COLLECT_INDEX:
                method(self, upstream, stamp)

        self.increment_counters()
        self.finalize_latest()

    def workers_rlimit_nofile(self):
        """
        nginx.workers.rlimit_nofile

        sum for all hard limits (second value of rlimit)
        """
        rlimit = 0
        for p in self.processes:
            if p.pid in self.zombies:
                continue
            try:
                rlimit += p.rlimit_nofile()
            except psutil.ZombieProcess:
                self.handle_zombie(p.pid)
        self.object.statsd.gauge('nginx.workers.rlimit_nofile', rlimit)

    def workers_io(self):
        """
        io

        nginx.workers.io.kbs_r
        nginx.workers.io.kbs_w
        """
        # collect raw data
        read, write = 0, 0
        for p in self.processes:
            if p.pid in self.zombies:
                continue
            try:
                io = p.io_counters()
                read += io.read_bytes
                write += io.write_bytes
            except psutil.ZombieProcess:
                self.handle_zombie(p.pid)
        current_stamp = int(time.time())

        # kilobytes!
        read /= 1024
        write /= 1024

        # get deltas and store metrics
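        # io_counters() values are cumulative since each worker started, so the metric
        # sent is the kilobyte delta between this poll and the previous one; the first
        # poll only seeds previous_counters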
        for metric_name, value in {'nginx.workers.io.kbs_r': read, 'nginx.workers.io.kbs_w': write}.items():
            prev_stamp, prev_value = self.previous_counters.get(metric_name, (None, None))
            if isinstance(prev_value, (int, float, complex)) and prev_stamp and prev_stamp != current_stamp:
                value_delta = value - prev_value
                self.object.statsd.incr(metric_name, value_delta)
            self.previous_counters[metric_name] = (current_stamp, value)


class GenericLinuxNginxMetricsCollector(NginxMetricsCollector):
    pass


class DebianNginxMetricsCollector(NginxMetricsCollector):
    pass


class CentosNginxMetricsCollector(NginxMetricsCollector):
    pass


class GentooNginxMetricsCollector(NginxMetricsCollector):
    pass


class FreebsdNginxMetricsCollector(NginxMetricsCollector):

    def workers_fds_count(self):
        """
        counting worker file descriptors doesn't work on FreeBSD,
        so this override intentionally does nothing
        """
        pass